Golang 使用 protobuf 实现 grpc 通信

发布时间: 更新时间: 总字数:2060 阅读时间:5m 作者: IP上海 分享 网址

gRPC 是一个高性能、开源和通用的 RPC 框架,面向移动和 HTTP/2 设计。本文介绍 Golang 使用 protobuf 实现 gRPC 通信示例。

介绍

  • gRPC 支持多语言
    • C++、C#、Dart、Go、Java、Node.js、Objective-C、PHP、Python、Ruby
  • 使用 golang + protobuf 实现 rpc 通信
  • 当前使用原生 golang 开发,也可以参考 gogo/protobuf 实现
  • 概念
    • grpc 中给服务端叫 gRPC server,Server 端需要实现对应的 RPC,所有的 RPC 组成了 Service
    • 客户端叫 grpc Stub(不叫 grpc client)通过 Stub 可以真正的调用 RPC 请求
    • Channel 提供一个与特定 gRPC server 的主机和端口建立的连接
    • Stub 是在 Channel 的基础上封装

特点

  • HTTP/2
  • Protobuf
  • 客户端、服务端基于同一份 IDL
  • 移动网络的良好支持
  • 支持多语言

gRPC 的流式

gRPC 分为三种流类型:

  • Server-side streaming RPC 服务器端流式 RPC
  • Client-side streaming RPC 客户端流式 RPC
  • Bidirectional streaming RPC 双向流式 RPC

适用于:

  • 大规模数据包
  • 实时场景
  • 解决数据包过大造成的瞬时压力

gRPC 的拦截器

按功能分为:

  • 一元拦截器(grpc.UnaryInterceptor) 适用于普通方法
  • 流拦截器(grpc.StreamInterceptor) 适用于流方法

按端分为

  • 客户端拦截器(ClientInterceptor)
  • 服务端拦截器(ServerInterceptor)

说明:

  • gRPC 原来只能设置一个拦截器
  • 多个拦截器可以使用 google.golang.org/grpc.ChainUnaryInterceptor
import (
    "github.com/grpc-ecosystem/go-grpc-middleware"
		...
)

	opts := []grpc.ServerOption{
		grpc.ChainUnaryInterceptor(
			interceptor.RecoveryInterceptor,
			interceptor.LogInterceptor,
		),
		、、 grpc.ChainStreamInterceptor()
	}
	server := grpc.NewServer(opts...)
  • grpc.WithPerRPCCredentials 可以添加认证拦截,需要实现 PerRPCCredentials
// PerRPCCredentials defines the common interface for the credentials which need to
// attach security information to every RPC (e.g., oauth2).
type PerRPCCredentials interface {
	// GetRequestMetadata gets the current request metadata, refreshing tokens
	// if required. This should be called by the transport layer on each
	// request, and the data should be populated in headers or other
	// context. If a status code is returned, it will be used as the status for
	// the RPC (restricted to an allowable set of codes as defined by gRFC
	// A54). uri is the URI of the entry point for the request.  When supported
	// by the underlying implementation, ctx can be used for timeout and
	// cancellation. Additionally, RequestInfo data will be available via ctx
	// to this call.  TODO(zhaoq): Define the set of the qualified keys instead
	// of leaving it as an arbitrary string.
	GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error)
	// RequireTransportSecurity indicates whether the credentials requires
	// transport security.
	RequireTransportSecurity() bool
}

grpc 库其他

常见模块作用:

  • google.golang.org/grpc/credentials gRPC 库支持的各种凭证
  • google.golang.org/grpc/codes Package codes defines the canonical error codes used by gRPC.
  • google.golang.org/grpc/metadata Package metadata define the structure of the metadata supported by gRPC library.
    • gRPC Metadata 通过 HTTP header 传输
  • google.golang.org/grpc/status Package status implements errors returned by gRPC.
  • grpc-ecosystem/grpc-health-probe 为 Kubernetes 和其他系统中的 gRPC 应用程序执行健康检查的命令行工具

开源软件使用:

环境准备

编写 proto 文件

使用 vscode 插件 vscode-proto3 开发 demo.proto

syntax = "proto3";
import "google/protobuf/empty.proto";

package main;
option go_package = "github.com/xiexianbin/go-grpc-demo/grpc";

message NumRequest {
    repeated int64 Nums = 1;
}

message NumResponse {
    int64 Result = 1;
}

message VersionResponse {
    string Version = 1;
}

message FilePath {
    string path = 1;
}

message FileResponse {
    bytes content = 1;
}

service Service {
    rpc Sum(NumRequest) returns (NumResponse) {}
    rpc Diff(NumRequest) returns (NumResponse) {}

    rpc Version(google.protobuf.Empty) returns (VersionResponse) {}
    rpc ReadFile(FilePath) returns (FileResponse) {}
}

编译生成结构体和方法

此次编译,生成 golang 对应的结构体和方法

  • 运行如下命令安装 Go protocol buffers plugin(protoc-gen-go 会被安装到 $GOBIN 目录下)
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
  • 生成结构体 .pb.go 文件

语法: - -go_out=. 设置 Go 代码输出的目录 - plugins=plugin1+plugin2 指定要加载的子插件列表 - : 冒号充当分隔符的作用,后跟所需要的参数集

protoc --go_out=plugins=grpc:. *.proto

生成:

SRC_DIR=.
DST_DIR=.
protoc -I=$SRC_DIR --go_out=$DST_DIR $SRC_DIR/grpc/demo.proto
# protoc -I=$SRC_DIR --go_out=$DST_DIR --go_opt=paths=source_relative $SRC_DIR/grpc/demo.proto
# --plugin=$GOBIN/protoc-gen-go

在当前目录下生成 github.com/xiexianbin/go-grpc-demo/proto/demo.pb.go 文件:

  • NumRequest、NumResponse、VersionResponse、FilePath、FileResponse 结构体和对应的方法
  • fileDescriptor 代表对应方法具体的 Message Field

编译生成 gRPC 接口

  • 安装 protobuf 服务( Service )的 Golang 代码生成插件
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
  • 生成 grpc 文件
SRC_DIR=.
DST_DIR=.
protoc -I=$SRC_DIR --go-grpc_out=$DST_DIR $SRC_DIR/grpc/demo.proto
# protoc -I=$SRC_DIR --go-grpc_out=$DST_DIR --go-grpc_opt=paths=source_relative $SRC_DIR/grpc/demo.proto

在目录 github.com/xiexianbin/go-grpc-demo/proto/demo_grpc.pb.go 文件,里面封装了 rpc 调用需要的方法

实现 gRPC Server 类

type DemoServiceServer struct {
	dgrpc.UnimplementedServiceServer
}

func (s *DemoServiceServer) Sum(ctx context.Context, numRequest *dgrpc.NumRequest) (*dgrpc.NumResponse, error) {
	...
	return numResponse, nil
}

func (s *DemoServiceServer) Diff(ctx context.Context, numRequest *dgrpc.NumRequest) (*dgrpc.NumResponse, error) {
	...
	return numResponse, nil
}

func (s *DemoServiceServer) Version(ctx context.Context, empty *emptypb.Empty) (*dgrpc.VersionResponse, error) {
	...
	return version, nil
}

func (s *DemoServiceServer) ReadFile(ctx context.Context, filePath *dgrpc.FilePath) (*dgrpc.FileResponse, error) {
	...
	return &dgrpc.FileResponse{Content: fileContent}, nil
}

服务监听

rpc 客户端工具

  • wombat rpc 客户端工具安装:
brew install --cask wombat

通过是否采用SSL证书、指定 proto 文件,可以调试 rpc 相关接口

  • grpcurl 像 cURL 一样,但用于 gRPC:用于与gRPC服务器交互的命令行工具
# 安装
brew install grpcurl

go install github.com/fullstorydev/grpcurl/cmd/grpcurl@latest

# 使用
grpcurl grpc.server.com:443 my.custom.server.Service/Method

# no TLS
grpcurl -plaintext grpc.server.com:80 my.custom.server.Service/Method

TSL 通信

证书签发

install and use xca to create tsl cert.

# 生成根证书
xca -create-ca true \
  -root-cert x-ca/ca/root-ca.crt \
  -root-key x-ca/ca/root-ca/private/root-ca.key \
  -tls-cert x-ca/ca/tls-ca.crt \
  -tls-key x-ca/ca/tls-ca/private/tls-ca.key

# 生成 server 证书
xca -cn server \
  --domains "localhost" \
  --ips 127.0.0.1 \
  -tls-cert x-ca/ca/tls-ca.crt \
  -tls-key x-ca/ca/tls-ca/private/tls-ca.key

# 生成 client 证书
xca -cn client \
  --domains "localhost" \
  --ips 127.0.0.1 \
  -tls-cert x-ca/ca/tls-ca.crt \
  -tls-key x-ca/ca/tls-ca/private/tls-ca.key

服务端加载证书

func loadServerTSLCert() (credentials.TransportCredentials, error) {
	caPEMFile, err := ioutil.ReadFile(rootCACrtPath)
	if err != nil {
		return nil, err
	}

	caPool := x509.NewCertPool()
	if !caPool.AppendCertsFromPEM(caPEMFile) {
		return nil, fmt.Errorf("load %s cert fail", rootCACrtPath)
	}

	localCert, err := tls.LoadX509KeyPair(serverCrtPath, serverKeyPath)
	if err != nil {
		return nil, fmt.Errorf("load server cert and key file fail: %s", err.Error())
	}

	config := &tls.Config{
		Certificates: []tls.Certificate{localCert},
		ClientAuth:   tls.RequireAndVerifyClientCert, // check client's certificate
		ClientCAs:    caPool,
	}

	return credentials.NewTLS(config), nil
}

	var s *googlerpc.Server
	if rootCACrtPath != "" && serverCrtPath != "" && serverKeyPath != "" {
		tlsCrt, err := loadServerTSLCert()
		if err != nil {
			log.Fatalf("load server cert err: %s", err.Error())
		}
		s = googlerpc.NewServer(googlerpc.Creds(tlsCrt))
	} else {
		s = googlerpc.NewServer()
	}

客户端加载证书

func loadClientTSLCert() (credentials.TransportCredentials, error) {
	caPEMFile, err := ioutil.ReadFile(rootCACrtPath2)
	if err != nil {
		return nil, err
	}

	caPool := x509.NewCertPool()
	if !caPool.AppendCertsFromPEM(caPEMFile) {
		return nil, fmt.Errorf("load %s cert fail", rootCACrtPath2)
	}

	localCert, err := tls.LoadX509KeyPair(clientCrtPath, clientKeyPath)
	if err != nil {
		return nil, fmt.Errorf("load client cert and key file fail: %s", err.Error())
	}

	config := &tls.Config{
		Certificates: []tls.Certificate{localCert},
		ServerName:   "localhost", // client cn name
		RootCAs:      caPool,
	}

	return credentials.NewTLS(config), nil
}

	var creds credentials.TransportCredentials
	var err error
	if rootCACrtPath2 != "" && clientCrtPath != "" && clientKeyPath != "" {
		creds, err = loadClientTSLCert()
		if err != nil {
			log.Fatalf("load client cert err: %s", err.Error())
		}
	} else {
		creds = insecure.NewCredentials()
	}
	conn, err := googlerpc.Dial("127.0.0.1:8000", googlerpc.WithTransportCredentials(creds))

说明:

  • grpc.Dial 通过异步连接的服务器,可以通过 grpc.WithBlock 选项,配置阻塞等待
  • grpc.ClientConn 不关闭连接,会导致 goroutine 和 Memory 等泄露
  • 客户端请求失败后会默认会通过 backoff 算法重试,默认的最大重试间隔是 120s
  • Kubernetes 中 gRPC 负载均衡问题
    • 原因:gRPC 的 RPC 协议是基于 HTTP/2 标准实现的,HTTP/2 的链路复用特性决定调度到一个后端,即复用原有的连接
  • Server 监听是循环等待连接,若没有则休眠,最大休眠时间 1s;若接收到新请求则起一个新的 goroutine 去处理
  • 任何调用,如果不加超时控制,会出现泄漏和客户端不断重试

启动服务

由于 xca 采用二级证书签发,因此服务端和客户端启动服务时,需要加载证书链,签发和使用详见源代码README.md

源代码

更多实现,如客户端、grpc+TSL实现,参考源代码:go-grpc-demo

其他

  • gRPC 压测工具:ghz

参考

  1. https://grpc.io/about/
Home Archives Categories Tags Statistics
本文总阅读量 次 本站总访问量 次 本站总访客数