← Protobuf | Etcd →

gRPC - Go RPC 框架

gRPC 是 Google 开源的高性能 RPC 框架,基于 HTTP/2 和 Protocol Buffers。掌握 gRPC 是开发微服务、分布式系统的基础。

📌 核心概念

🚀

高性能

基于 HTTP/2

二进制协议
📝

IDL 定义

.proto 文件

protobuf
🔄

四种服务

Unary/Stream

RPC 类型
🔌

中间件

Interceptor

拦截器

快速开始

📝 定义服务

// service.proto
syntax = "proto3";

package user;

// 请求消息
message GetUserRequest {
    int64 user_id = 1;
}

// 响应消息
message GetUserResponse {
    int64 user_id = 1;
    string name = 2;
    string email = 3;
}

// 服务定义
service UserService {
    // 一元 RPC
    rpc GetUser(GetUserRequest) returns (GetUserResponse);
    
    // 流式 RPC
    rpc ListUsers(ListUsersRequest) returns (stream User);
}

📝 生成 Go 代码

# 安装 protoc 和插件
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

# 生成代码
protoc --go_out=. --go_opt=paths=source_relative \
       --go-grpc_out=. --go-grpc_opt=paths=source_relative \
       service.proto

# 生成文件:
# - service.pb.go (消息定义)
# - service_grpc.pb.go (服务接口)

💡 gRPC 要点

  • HTTP/2: 多路复用、头部压缩
  • Protobuf: 二进制序列化,高效紧凑
  • 代码生成: 从 .proto 生成客户端/服务端代码
  • 四种 RPC: Unary、Server Stream、Client Stream、Bidirectional

gRPC 服务端

📝 实现服务

package main

import (
    "context"
    "fmt"
    "net"
    "google.golang.org/grpc"
    pb "path/to/proto"
)

// 实现服务接口
type userServiceServer struct {
    pb.UnimplementedUserServiceServer
}

// 实现一元 RPC
func (s *userServiceServer) GetUser(
    ctx context.Context, 
    req *pb.GetUserRequest,
) (*pb.GetUserResponse, error) {
    // 业务逻辑
    return &pb.GetUserResponse{
        UserId: req.UserId,
        Name:   "Alice",
        Email:  "alice@example.com",
    }, nil
}

func main() {
    // 创建监听
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        panic(err)
    }
    
    // 创建 gRPC 服务器
    server := grpc.NewServer()
    
    // 注册服务
    pb.RegisterUserServiceServer(server, &userServiceServer{})
    
    fmt.Println("Server starting on :50051")
    
    // 启动服务
    if err := server.Serve(lis); err != nil {
        panic(err)
    }
}

gRPC 客户端

📝 调用服务

package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    pb "path/to/proto"
)

func main() {
    // 建立连接
    conn, err := grpc.Dial(
        "localhost:50051",
        grpc.WithInsecure(),
    )
    if err != nil {
        panic(err)
    }
    defer conn.Close()
    
    // 创建客户端
    client := pb.NewUserServiceClient(conn)
    
    // 调用一元 RPC
    ctx := context.Background()
    resp, err := client.GetUser(ctx, &pb.GetUserRequest{
        UserId: 123,
    })
    if err != nil {
        panic(err)
    }
    
    fmt.Printf("User: %s (%s)\n", resp.Name, resp.Email)
}

拦截器 (中间件)

📝 客户端/服务端拦截器

// 客户端拦截器
func clientInterceptor(
    ctx context.Context,
    method string,
    req, reply interface{},
    cc *grpc.ClientConn,
    invoker grpc.UnaryInvoker,
    opts ...grpc.CallOption,
) error {
    fmt.Printf("Calling: %s\n", method)
    
    // 添加认证头
    ctx = metadata.AppendToOutgoingContext(
        ctx, "authorization", "Bearer token",
    )
    
    // 调用实际 RPC
    err := invoker(ctx, method, req, reply, cc, opts...)
    
    fmt.Printf("Response: %v\n", reply)
    return err
}

// 服务端拦截器
func serverInterceptor(
    ctx context.Context,
    req interface{},
    info *grpc.UnaryServerInfo,
    handler grpc.UnaryHandler,
) (interface{}, error) {
    // 日志记录
    fmt.Printf("Handling: %s\n", info.FullMethod)
    
    // 认证检查
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        return nil, grpc.Errorf(codes.Unauthenticated, "missing metadata")
    }
    
    // 调用实际处理函数
    return handler(ctx, req)
}

// 使用拦截器
func main() {
    // 服务端
    server := grpc.NewServer(
        grpc.UnaryInterceptor(serverInterceptor),
    )
    
    // 客户端
    conn, _ := grpc.Dial(
        "localhost:50051",
        grpc.WithInsecure(),
        grpc.WithUnaryInterceptor(clientInterceptor),
    )
}

流式 RPC

📝 服务端流式

// .proto 定义
service ChatService {
    // 服务端流式
    rpc Subscribe(ChatRequest) returns (stream ChatMessage);
    
    // 客户端流式
    rpc SendMessages(stream ChatMessage) returns (Summary);
    
    // 双向流式
    rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}

// 服务端实现
func (s *chatServer) Subscribe(
    req *ChatRequest,
    stream ChatService_SubscribeServer,
) error {
    for i := 0; i < 5; i++ {
        msg := &ChatMessage{
            Content: fmt.Sprintf("Message %d", i),
        }
        if err := stream.Send(msg); err != nil {
            return err
        }
        time.Sleep(time.Second)
    }
    return nil
}

// 客户端调用
func main() {
    stream, _ := client.Subscribe(ctx, &ChatRequest{})
    
    for {
        msg, err := stream.Recv()
        if err == io.EOF {
            break
        }
        fmt.Println(msg.Content)
    }
}

最佳实践

✅ gRPC 使用建议

  • TLS 加密: 生产环境使用 TLS
  • 超时控制: 设置合理的超时时间
  • 错误处理: 使用 status 包返回错误
  • 拦截器: 统一日志、认证、监控
  • 连接池: 复用 gRPC 连接