Protocol Buffer
Protocol Buffer
是Google开发的一种与语言无关、平台无关、可扩展的用于序列化结构化数据的机制。 使用protobuf
有以下几个步骤:
使用protobuf compiler
编译.proto
文件生成对应语言的数据结构代码
我这里使用的Go,让我们跟着官方的《Protocol Buffer Basics: Go》 来使用一下protobuf
Hello Protobuf
定义proto文件
首先先创建一个grpc/helloproto
目录,并在grpc
使用go mod
初始化
复制 $ mkdir -p grpc/helloproto && cd grpc
$ go mod init grpc
然后在grpc/helloproto
目录下创建hellp_proto.proto
文件:
复制 syntax = "proto3" ;
package helloproto ; // proto文件不是通过每个文件来区分命名空间的,而是通过package
import "google/protobuf/timestamp.proto" ;
option go_package = "grpc/helloproto" ;
message Person {
string name = 1 ;
int32 id = 2 ;
string email = 3 ;
enum PhoneType {
MOBILE = 0 ;
HOME = 1 ;
WORK = 2 ;
}
message PhoneNumber {
string number = 1 ;
PhoneType type = 2 ;
}
repeated PhoneNumber phones = 4 ;
google.protobuf.Timestamp last_updated = 5 ;
}
message AddressBook {
repeated Person people = 1 ;
}
这里需要注意的是,我们引入了一个官方提供的proto
文件import "google/protobuf/timestamp.proto";
之后这将会影响到使用protobuf compiler
编译。其他官方提供的类型可以在《Package google.protobuf》 找到 此外我这里使用的Go,添加了可选的配置option go_package = "grpc/helloproto";
,这也会影响到protobuf complier
的编译
编译proto文件
首先我们需要安装protobuf compiler
:
复制 $ brew install protobuf
$ protoc --version
libprotoc 3.17.3
然后由于我这里使用的Go,则需要安装Go相关的插件,其他语言也可以在这里 找到:
复制 $ go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
然后运行compiler:
复制 $ cd ~/grpc
$ protoc --proto_path=. \ # 指定扫描import文件的路径
--go_out = ../ \ # 指定生成文件的路径
./helloproto/hello_proto.proto google/protobuf/timestamp.proto # 指定编译的文件
运行完以后生成了文件hello_proto.pb.go
其中包含了我们定义的数据结构,和其他相关代码。
这里需要解释一下,--proto_path
选项制定了扫描proto文件中国import文件的路径。但是对于我们引入的google/protobuf/timestamp.proto
这个文件,在我们本地并没有,于是在最后指定编译文件的时候需要额外引入。 此外,让我们先看一下当前~/grpc
下的目录结构:
复制 $ tree
.
├── go.mod
├── go.sum
└── helloproto
├── hello_proto.pb.go
└── hello_proto.proto
--go_out
选项指定了,生成文件的开始路径。我们是在~/grpc
路径下执行的,此时protoc
生成文件的路径就是从~
同级目录开始的。然而我们发现生成的文件却在~/grpc/helloproto
目录下,这就是之前设置了option go_package = "grpc/helloproto";
的结果。 prorotc
插件google.golang.org/protobuf/cmd/protoc-gen-go
要求我们必须设置option go_package
参数并且指定有效的Go导入路径。一下的提示信息将会在设置错误的go_package
时提示出来:
复制 protoc-gen-go: invalid Go import path "helloproto" for "helloproto/hello_proto.proto"
The import path must contain at least one period ( '.' ) or forward slash ( '/' ) character.
也就是说,在--go_out
设置的同级目录中,import
生成的XXX_pb.go
文件时指定的包名就是在proto
文件中设置的option go_package
包名。
为了方便之后修改proto
文件的时候重新编译,这里稍作修改,并将命令写入了Makefile
文件:
复制 $ touch ~/grpc/Makefile
$ cat Makefile
PROTO_FILES = $( shell find ./helloproto/* -name *.proto )
proto:
protoc --proto_path=. --go_out=../ $( PROTO_FILES ) google/protobuf/timestamp.proto
序列化结构体
然后编写测试hello_proto_test.go
:
复制 package grpc
import (
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"
pb "grpc/helloproto"
"testing"
)
func TestHelloProto (t * testing . T ) {
// The whole purpose of using protocol buffers is to serialize your data
bookOut := & pb . AddressBook {
People: [] * pb . Person {{
Id: 1234 ,
Name: "John Doe" ,
Email: "jdoe@example.com" ,
Phones: [] * pb . Person_PhoneNumber {
{Number: "555-4321" , Type: pb.Person_HOME},
},
}},
}
data, err := proto. Marshal (bookOut)
assert. NoError (t, err)
bookIn := & pb . AddressBook {}
assert. NoError (t, proto. Unmarshal (data, bookIn))
}
其他序列化方式
其他还有如XML
、JSON
等比较通用的序列化方式。XML
一直被人诟病的就是信息密度太低,JSON
由于采用字符串保存数据,可读性很高,性能却不太理想。 protobuf
推出之后相较于以上两者性能有了很大提升,但是在官方推出的第一版protobuf
之后,社区有了性能更强的魔改版本gogoprotobuf
。
go_serialization_benchmarks 比较了Go中多种序列化方式的性能。
2020年3月份,protobuf
官方又发文《A new Go API for Protocol Buffers》 推出了一个v2版本,性能上与gogoprotobuf
相比依然不尽人意。具体可参考《go protobuf v1败给了gogo protobuf,那v2呢?》 。但是在官方推出的v2版本支持了动态反射,这使得我们生成一些编译时未知的message。 关于官方发文的翻译可以参考一下:《Go Protobuf APIv2 动态反射 Protobuf 使用指南》 这篇文章中也介绍了protoreflect
的一些使用。接下来让我们用两个例子简单看一下v2的动态反射。
Protoreflect
第一个例子很简单,直接在hello_proto_test.go
中添加如下代码:
复制 func TestHelloProtoReflect (t * testing . T ) {
book := & pb . AddressBook {
People: [] * pb . Person {{
Id: 1234 ,
Name: "John Doe" ,
Email: "jdoe@example.com" ,
Phones: [] * pb . Person_PhoneNumber {
{Number: "555-4321" , Type: pb.Person_HOME},
},
}},
}
data, err := proto. Marshal (book)
assert. NoError (t, err)
// Get message type by full name
msgType, err := protoregistry.GlobalTypes. FindMessageByName ( "helloproto.AddressBook" )
assert. NoError (t, err)
// Deserialize into helloproto.AddressBook message
msg := msgType. New (). Interface ()
err = proto. Unmarshal (data, msg)
assert. NoError (t, err)
t. Log (msg)
}
我们直接根据字面量"helloproto.AddressBook"
从protoregistry.GlobalTypes
获取了一个protoreflect.MessageType
,并将提前准备好的序列化数据,反序列化到由msgType
新生成的实例中。 需要提一下的是,protoregistry.GlobalTypes
是根据生成的hello_proto.pb.go
文件来反射生成实例的。
第二个例子是通过反射,遍历helloproto.Person
的所有字段,并将名字改成"zhangsan"
:
复制 func TestHelloProtoReflect2 (t * testing . T ) {
person := & pb . Person {
Id: 1234 ,
Name: "John Doe" ,
Email: "jdoe@example.com" ,
Phones: [] * pb . Person_PhoneNumber {
{Number: "555-4321" , Type: pb.Person_HOME},
},
}
// 获取Massage
msg := person. ProtoReflect ()
msg. Range ( func (fd protoreflect . FieldDescriptor , v protoreflect . Value ) bool {
if fd. Name () == "name" {
msg. Set (fd, protoreflect. ValueOfString ( "zhangsan" ))
}
return true
})
t. Log (msg. Interface ())
}
甚至我们还可以利用Message.Clear()
在Range
中删除指定的字段,具体可以参考《A new Go API for Protocol Buffers》。
通过下图可以看到ProtoMessage
和Message
之间的转换关系
复制 ┌──────────────── New () ─────────────────┐
│ │
│ ┌─── Descriptor () ─────┐ │ ┌── Interface () ───┐
│ │ V V │ V
╔═════════════╗ ╔═══════════════════╗ ╔═════════╗ ╔══════════════╗
║ MessageType ║ ║ MessageDescriptor ║ ║ Message ║ ║ ProtoMessage ║
╚═════════════╝ ╚═══════════════════╝ ╚═════════╝ ╚══════════════╝
Λ Λ │ │ Λ │
│ └──── Descriptor () ────┘ │ └─ ProtoReflect () ─┘
│ │
└─────────────────── Type () ─────────┘
gogo/protobuf
最后简单介绍一下gogo/protobuf
的使用。该项目提供了protoc-gen-gofast
的生成工具,配套需要引入一下mod:
复制 go get github.com/gogo/protobuf/protoc-gen-gofast
复制 go get github.com/gogo/protobuf/proto
go get github.com/gogo/protobuf/gofast
go get github.com/gogo/protobuf/gogoproto
protoc
命令也需要做相应的修改:
复制 PROTO_FILES = $( shell find ./helloproto/ * -name * .proto)
gofast_proto :
protoc --proto_path=. \
--gofast_out = Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types:../ \
$(PROTO_FILES)
需要注意的是,这里我们使用到了google/protobuf/*.proto
文件,需要将其替换成gogo/protobuf/types
除此之外,gogo/protobuf
还提供多种不同的生成工具,来适应不同的场景:
复制 protoc-gen-gogofast (same as gofast, but imports gogoprotobuf)
protoc-gen-gogofaster (same as gogofast, without XXX_unrecognized, less pointer fields)
protoc-gen-gogoslick (same as gogofaster, but with generated string, gostring and equal methods)
具体可以到其github仓库查看。
最后想再提一下,官方提供的两个protobuf
版本在两个不同的仓库。v1版本在https://github.com/golang/protobuf
,v2版本在https://github.com/protocolbuffers/protobuf-go
。gogo/protobuf
目前并没有兼容v2版本,也就是说,如果需要使用反射等功能则不能使用gogo/protobuf
gRPC
RPC 全称 (Remote Procedure Call),远程过程调用,指的是一台计算机通过网络请求另一台计算机的上服务,RPC 是构建在已经存在的协议(TCP/IP,HTTP 等)之上的,RPC 采用的是客户端,服务器模式。
gRPC 是一款能运行在多平台上开源高效的RPC框架,可以有效地连接数据中心和跨数据中心的服务,支持负载均衡、链路跟踪、心跳检查和身份验证等特点。
Hello gRPC
让我们定义一个最简单的proto文件来实现gRPC调用。首先我们需要安装protocl插件:
复制 $ go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.26
$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.1
然后定义名为hellogrpc.proto
的文件,内容如下:
复制 syntax = "proto3" ;
package helloworld ;
option go_package = "grpc/hellogrpc" ;
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1 ;
}
// The response message containing the greetings
message HelloReply {
string message = 1 ;
}
编译生成文件:
复制 $ protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative hellogrpc.proto
$ tree -L 1
.
├── hellogrpc.pb.go
├── hellogrpc.proto
├── hellogrpc_grpc.pb.go
可以看到,除了生成了我们熟知的helogrpc.pb.go
文件,还生成了hellogrpc_grpc.pb.go
文件。
接下来,在服务端我们需要实现定义的方法,然后将其注册为grpc服务。我们可以引入生成的文件包"grpc/hellogrpc"
并继承其中的UnimplementedGreeterServer
接口体,然后实现SayHello()
方法:
复制 type Server struct {
Addr string
UnimplementedGreeterServer
}
// SayHello implements hellogrpc.GreeterServer
func (s * Server ) SayHello (ctx context . Context , in * HelloRequest ) ( * HelloReply , error ) {
log. Printf ( "Received: %v " , in. GetName ())
return & HelloReply {Message: s.Addr + ":Hello " + in. GetName ()}, nil
}
然后我们需要初始化一个grpc服务,并为其注册我们的server实现,然后绑定监听端口:
复制 const addr = "localhost:50055"
func TestGrpcServer (t * testing . T ) {
var s * grpc . Server
// Create the insecure server
{
s = grpc. NewServer ()
}
pb. RegisterGreeterServer (s, & Server {})
lis, _ := net. Listen ( "tcp" , addr)
log. Printf ( "server listening at %v " , lis. Addr ())
s. Serve (lis)
}
在客户端这边,我们需要创建一个grpc链接,并通过hellogrpc_grpc.pb.go
中提供的代码生成一个grpc客户端:
复制 const addr = "localhost:50055"
func TestGrpcClient (t * testing . T ) {
var conn * grpc . ClientConn
var err error
//Set up a connection to the server.
{
conn, _ = grpc. Dial (addr, grpc. WithTransportCredentials (insecure. NewCredentials ()))
}
c := pb. NewGreeterClient (conn)
// Contact the server and print out its response.
ctx, cancel := context. WithTimeout (context. Background (), time.Second)
defer cancel ()
const name = "zhangsan"
r, _ := c. SayHello (ctx, & pb . HelloRequest {Name: name})
log. Printf ( "Greeting: %s " , r. GetMessage ())
}
可以看到,gRPC的调用非常简单。
流式调用
得益于http2.0的流式响应,除了上面例子中的简单调用,gRPC还有提供了三种流式调用server-side streaming RPC 、client-side streaming RPC 、bidirectional streaming RPC 。让我们直接看一个bidirectional streaming RPC 的例子。
首先我们在刚才hellogrpc.proto
文件的基础上添加一个新的方法,并执行编译:
复制 // The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
// Sends more greetings
rpc SayMoreHello ( stream HelloRequest) returns ( stream HelloReply) {}
}
然后在服务端实现SayMoreHello()
方法:
复制 // SayMoreHello implements hellogrpc.GreeterServer
func (s * server ) SayMoreHello (stream pb . Greeter_SayMoreHelloServer ) error {
for {
in, err := stream. Recv ()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
log. Printf ( "Received: %v " , in. GetName ())
reply := & pb . HelloReply {Message: "Hello " + in. GetName ()}
for i := 0 ; i < 3 ; i ++ {
if err := stream. Send (reply); err != nil {
return err
}
time. Sleep (time.Second)
}
}
}
客户端也需要相应的修改为流式调用的方式:
复制 func TestGrpcClient2 (t * testing . T ) {
//Set up a connection to the server.
conn, _ := grpc. Dial (addr, grpc. WithInsecure ())
c := pb. NewGreeterClient (conn)
// Contact the server and print out its response.
stream, _ := c. SayMoreHello (context. Background ())
waitc := make ( chan struct {})
go func () {
for {
in, err := stream. Recv ()
if err == io.EOF {
// read done.
close (waitc)
return
}
if err != nil {
log. Fatalf ( "could not greet: %v " , err)
}
log. Printf ( "Greeting: %s " , in. GetMessage ())
}
}()
names := [] string { "zhangsan" , "lisi" , "wangwu" }
for _, name := range names {
if err := stream. Send ( & pb . HelloRequest {Name: name}); err != nil {
log. Fatalf ( "Failed to send a req: %v " , err)
}
time. Sleep (time.Second)
}
stream. CloseSend ()
<- waitc
}
gRPC插件
gRPC还提供了完备的插件接口,可以通过下图看到:
gRPC默认使用protobuf作为数据传输格式,并采用gzip进行数据压缩。我们可以通过google.golang.org/grpc/encoding
包下的proto
和gzip
的init()
方法中看到:
复制 package proto
...
func init () {
encoding. RegisterCodec ( codec {})
}
复制 package gzip
...
func init () {
c := & compressor {}
...
encoding. RegisterCompressor (c)
}
在ServerOption
和DialOption
中也提供了方法,设置我们自定义的编码和压缩方式:
复制 // ServerOption
func ForceServerCodec (codec encoding . Codec ) ServerOption
func RPCCompressor (cp Compressor ) ServerOption
// DialOption
func WithCodec (c Codec ) DialOption
func WithCompressor (cp Compressor ) DialOption
当然其他的插件gRPC也提供了一系列接口,提供我们自己去实现,接下来让我们自己来实现一些常用的接口。
服务发现 & 负载均衡
常用的方式有两种,一种是集中式LB方案,Consumer直接请求代理服务器,由代理服务器去处理服务发现逻辑,并根据负载均衡策略转发请求到对应的ServiceProvider:
Consumer和ServiceProvider通过LB解藕,通常由运维在LB上配置注册所有服务的地址映射,并为LB配置一个DNS域名,提供给Consumer发现LB。当收到来自Consumer的请求时,LB根据某种策略(比如Round-Robin)做负载均衡后,将请求转发到对应的ServiceProvider。 这种方式的缺点就在于,单点的LB成了系统的瓶颈,如果对LB做分布式处理,部署多个实例会增加系统的维护成本。
另一种是进程内LB方案,将处理服务发现和负载均衡的策略交由Consumer处理:
这种方式下,需要有一个额外的服务注册中心,ServiceProvider的启动,需要主动到ServiceRegistry注册。并且,ServiceRegistry需要时时的向Consumer推送,ServiceProvider的服务节点列表。Consumer发送请求时,根据从ServiceRegistry获取到的服务列表,然后使用某种配置做负载均衡后请求到对应的ServiceProvider。
gRPC的服务发现和负载均衡可以通过下图看到,使用的是第二种方式:
其基本实现原理如下:
1、当服务端启动的时候,会向注册中心注册自己的IP地址等信息 2、客户端实例启动的时候会通过Name Resolver将连接信息,通过设置的策略获取到服务端地址 3、客户端的LB会为每一个返回的服务端地址,建立一个channel 4、当进行rpc请求时会根据LB策略,选择其中一个channel对服务端进行调用,如果没有可用的服务,请求将会被阻塞
Resolver
首先我们来看一下,gRPC的服务发现,让我们定位到google.golang.org/grpc/resolver
包下的resolver.go
文件,看一看gRPC提供的接口。其中最核心的两个接口如下:
复制 type Builder interface {
Build (target Target , cc ClientConn , opts BuildOptions ) ( Resolver , error )
Scheme () string
}
// Resolver watches for the updates on the specified target.
// Updates include address updates and service config updates.
type Resolver interface {
ResolveNow ( ResolveNowOptions )
Close ()
}
Builder
可用根据在客户端Dial()
方法传入的target
和一些配置信息创建一个Resolver
,Resolver
用于监听和更新服务节点的变化,并在处理完相应逻辑以后,将得到一个target
所对应的IP地址上报给ClientConn
。 进入ClientConn
接口,可以看到其中有一个UpdateState(State)
方法,就是用于上报地址状态的。如果我们的服务发现是静态的话,可以直接在Builder
的Build()
方法中直接配置一套规则,并通过ClientConn
上报。 让我看看一个静态Resolver
的简单实现:
复制 /*
Server discovery
*/
const scheme = "myresolve"
// Implement ResolveBuilder
type MyResolveBuilder struct {}
func (self *MyResolveBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
// 发现服务
var state resolver . State
if target.Endpoint == "mytarget" {
state = resolver . State {
Addresses: [] resolver . Address {{Addr: "localhost:50055" }, {Addr: "localhost:50056" }},
}
}
err := cc. UpdateState (state)
if err != nil {
cc. ReportError (errors. Wrapf (err, "更新State失败:" ))
}
return & MyResolver {}, nil
}
func (self * MyResolveBuilder ) Scheme () string { return scheme }
// Implement Resolver
type MyResolver struct {}
func (self * MyResolver ) ResolveNow ( resolver . ResolveNowOptions ) {}
func (self * MyResolver ) Close () {}
使用的时候我们需要将其注册到grpc中,并在建立客户端连接到时候指定target
的schema
:
复制 func init () {
resolver. Register ( & MyResolveBuilder {})
}
func main () {
var conn * grpc . ClientConn
var err error
conn, _ = grpc. Dial ( "myresolve:///mytarget" ,
grpc. WithTransportCredentials (insecure. NewCredentials ()),
)
c := hellogrpc. NewGreeterClient (conn)
r, _ := c. SayHello (context. Background (), & hellogrpc . HelloRequest {Name: "zhangsan" })
log. Printf ( "Greeting: %s " , r. GetMessage ())
}
这里target
到解析规则,可以在google.golang.org/grpc/clientconn
包中的parseTarget()
方法查看。我们将原来的Server
相关代码稍作修改单独作为一个包下面的main函数启动起来,并启动:
复制 var addr = flag. String ( "addr" , "localhost:50055" , "http service address" )
func main () {
flag. Parse ()
var s * grpc . Server
s = grpc. NewServer ()
hellogrpc. RegisterGreeterServer (s, & hellogrpc . Server {Addr: * addr})
lis, _ := net. Listen ( "tcp" , * addr)
log. Printf ( "server listening at %v " , lis. Addr ())
s. Serve (lis)
}
复制 $ go build .
$ ./server
然后运行客户端代码,即可看到调用返回的信息:
复制 $ go build .
$ ./client
2022/02/17 18:01:53 Greeting: localhost:50055:Hello zhangsan
Load Balancer
参考:0x00 再看 RR-Picker 实现
gRPC负载均衡相关的代码在google.golang.org/grpc/balancer
包下,其中最关键的两个接口:
复制 type Picker interface {
Pick (info PickInfo ) ( PickResult , error )
}
type Balancer interface {
UpdateClientConnState ( ClientConnState ) error
ResolverError ( error )
UpdateSubConnState ( SubConn , SubConnState )
Close ()
}
和Resolver
一样,Balancer
也是由Builder
创建。进入LientConnState
结构体我们可以看到有两个参数,其中ResolverState
正是我们上一节中根据target
去解析的resolver.State
。进入这个接口方法的baseBalancer
实现,我们可以看到:
复制 for _, a := range s.ResolverState.Addresses {
addrsSet. Set (a, nil )
if _, ok := b.subConns. Get (a); ! ok {
// a is a new address (not existing in b.subConns).
sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
...
sc. Connect ()
}
}
每一个ResolverState
中的Address
都被建立了连接。UpdateSubConnState()
方法顾名思义就是每个连接状态变化时,用于上报的方法。
Picker
接口则是用于真正去实现负载均衡算法的接口。我们进入baseBalancer
可以看到,其Build()
方法的实现中,成员变量picker
定义成了一个ErrPicker
,并且在baseBuilder
中有个接口类型的成员变量pickerBuilder
。找到该接口我们可以发现,这个接口是用于创建Picker
的。 所以关于自定义LB我们只需要实现Picker接口并将其用于创建一个baseBuilder
即可。
首先,我们需要实现Picker
, 这里就做一个简单的轮训。需要两个字段,一个是当前所有连接的数组,一个是下一次应该被请求的index:
复制 // Implement Picker
type MyPicker struct {
subConns [] balancer . SubConn
next int
}
func (self * MyPicker ) Pick (_ balancer . PickInfo ) ( balancer . PickResult , error ) {
sc := self.subConns[self.next]
self.next = (self.next + 1 ) % len (self.subConns)
return balancer . PickResult {SubConn: sc}, nil
}
balancer.SubConn
我们可以根据接口PickerBuilder
的Build()
方法如参得到。 然后,我们需要实现PickerBuilder
即可:
复制 // Implement PickerBuilder
type MyPickerBuilder struct {}
func (self * MyPickerBuilder ) Build (info base . PickerBuildInfo ) balancer . Picker {
scs := make ([] balancer . SubConn , 0 , len (info.ReadySCs))
for sc := range info.ReadySCs {
scs = append (scs, sc)
}
return & MyPicker {subConns: scs}
}
让我们来试验一下。为了验证,我们在之前的MyResolverBuilder
中,添加两个Address
:
复制 func (self *MyResolveBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
// 发现服务
//state = resolver.State{
// Addresses: []resolver.Address{{Addr: addrM[target.Endpoint]}},
//}
var state resolver . State
if target.Endpoint == "mytarget" {
state = resolver . State {
Addresses: [] resolver . Address {{Addr: "localhost:50055" }, {Addr: "localhost:50056" }},
}
}
err := cc. UpdateState (state)
if err != nil {
cc. ReportError (errors. Wrapf (err, "更新State失败:" ))
}
return & MyResolver {}, nil
}
之后我们启动两个Server分别监听以上两个端口:
复制 $ ./server -addr= "localhost:50055"
$ ./server -addr= "localhost:50056"
在客户端,我们首先需要将自定义的baseBalancer
注册到grpc中:
复制 const lbName = "mybalancer"
func init () {
resolver. Register ( & MyResolveBuilder {})
balancer. Register ( newMyBalanceBuilder ())
}
func newMyBalanceBuilder () balancer . Builder {
return base. NewBalancerBuilder (lbName, & MyPickerBuilder {}, base . Config {HealthCheck: true })
}
然后在建立grpc客户端连接到时候,需要指定Balancer:
复制 conn, err = grpc. Dial ( "myresolve:///mytarget" ,
grpc. WithTransportCredentials (insecure. NewCredentials ()),
grpc. WithDefaultServiceConfig ( `{"loadBalancingPolicy":"mybalancer"}` ),
//grpc.WithBalancerName("mybalancer"), // same as above
)
完整客户端代码如下:
复制 func init () {
resolver. Register ( & MyResolveBuilder {})
balancer. Register ( newMyBalanceBuilder ())
}
func main () {
var conn * grpc . ClientConn
var err error
conn, err = grpc. Dial ( "myresolve:///mytarget" ,
grpc. WithTransportCredentials (insecure. NewCredentials ()),
grpc. WithDefaultServiceConfig ( `{"loadBalancingPolicy":"mybalancer"}` ),
//grpc.WithBalancerName("mybalancer"), // same as above
)
if err != nil {
log. Fatalf ( "failed to Dial: %v " , err)
}
c := hellogrpc. NewGreeterClient (conn)
for i := 0 ; i < 5 ; i ++ {
r, err := c. SayHello (context. Background (), & hellogrpc . HelloRequest {Name: "zhangsan" })
if err != nil {
log. Fatal (err)
}
log. Printf ( "Greeting: %s " , r. GetMessage ())
}
}
启动之后可以看到两个服务返回的消息:
复制 2022 / 02 / 17 20 : 22 : 17 Greeting: localhost: 50055 :Hello zhangsan
2022 / 02 / 17 20 : 22 : 17 Greeting: localhost: 50056 :Hello zhangsan
2022 / 02 / 17 20 : 22 : 17 Greeting: localhost: 50055 :Hello zhangsan
2022 / 02 / 17 20 : 22 : 17 Greeting: localhost: 50056 :Hello zhangsan
2022 / 02 / 17 20 : 22 : 17 Greeting: localhost: 50055 :Hello zhangsan
其实,以上的Balancer
就是google.golang.org/grpc/balancer
包下roundrobin
的简易版。
拦截器
除此之外,gRPC还在客户端和服务端都提供了拦截器的接口,使得我们可以对发送/接收的请求做统一处理。 gRPC分别提供了单一请求的拦截器和流式调用的拦截器,可以通过以下方法添加到gRPC配置中:
复制 // 添加单一请求到拦截器
func UnaryInterceptor (i UnaryServerInterceptor ) ServerOption
// 添加单一请求多个拦截器
func ChainUnaryInterceptor (interceptors ... UnaryServerInterceptor ) ServerOption
// 添加流式调用拦截器
func StreamInterceptor (i StreamServerInterceptor ) ServerOption
// 添加流式调用多个拦截器
func ChainStreamInterceptor (interceptors ... StreamServerInterceptor ) ServerOption
当然客户端也提供了对应方法,方法名在此基础上添加了With
前缀,并接收UnaryClientInterceptor
,这里只使用服务端作为例子,具体可以在google.golang.org/grpc/dialoptions
包下查看。
让我们在之前的基础上定义一个拦截器,使得调用方法前后分别输出一些信息。 首先,我们需要根据UnaryServerInterceptor
定义一个拦截器,进入这个类型,我们可以看到,其实就是一个方法:
复制 type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
其中UnaryHandler
则是真正执行grpc调用的方法。所有我们需要先实现一个UnaryServerInterceptor
:
复制 interceptor := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// 前置处理
fmt. Printf ( "Before RPC handling. Info: %+v " , info)
// 调用方法
resp, err := handler (ctx, req)
fmt. Printf ( "After RPC handling. resp: %+v " , resp)
// 后置处理
return resp, err
}
然后通过grpc.UnaryInterceptor()
或者grpc.ChainUnaryInterceptor()
传入Server配置即可:
复制 s = grpc. NewServer (grpc. UnaryInterceptor (interceptor))
每次重新启动我们都需要再手动输入两次命令来启动服务端,我们可以使用goreman
这样的工具来管理多进程。具体使用可以参阅《Goreman 基本用法》 。 首先我们定义Procfile文件:
复制 server55: ./server -addr="localhost:50055"
server56: ./server -addr="localhost:50056"
然后在命令行启动:
复制 $ goreman start
15:45:40 server55 | Starting server55 on port 5000
15:45:40 server56 | Starting server56 on port 5100
15:45:40 server56 | 222/02/23 15:45:40 server listening at 127.0.0.1:50056
15:45:40 server55 | 2022/02/23 15:45:40 server listening at 127.0.0.1:50055
然后启动客户端请求,查看服务端控制台输出:
复制 15:45:54 server56 | Before RPC handling. Info: &{Server:0xc000126618 FullMethod:/helloworld.Greeter/SayHello}2022/02/23 15:45:54 Received: zhangsan
15:45:54 server55 | Before RPC handling. Info: &{Server:0xc0000ae618 FullMethod:/helloworld.Greeter/SayHello}2022/02/23 15:45:54 Received: zhangsan
15:45:54 server56 | After RPC handling. resp: message:"localhost:50056:Hello zhangsan"Before RPC handling. Info: &{Server:0xc000126618 FullMethod:/helloworld.Greeter/SayHello}2022/02/23 15:45:54 Received: zhangsan
15:45:54 server55 | After RPC handling. resp: message:"localhost:50055:Hello zhangsan"Before RPC handling. Info: &{Server:0xc0000ae618 FullMethod:/helloworld.Greeter/SayHello}2022/02/23 15:45:54 Received: zhangsan
15:45:54 server56 | After RPC handling. resp: message:"localhost:50056:Hello zhangsan"Before RPC handling. Info: &{Server:0xc000126618 FullMethod:/helloworld.Greeter/SayHello}2022/02/23 15:45:54 Received: zhangsan
15 : 45 : 54 server55 | After RPC handling. resp: message: "localhost:50055:Hello zhangsan"
15 : 45 : 54 server56 | After RPC handling. resp: message: "localhost:50056:Hello zhangsan"
gRPC-Gateway
Gateway
除了gRPC,有时候我们也需要进行RESTful调用,gRPC-Gateway就是一个用于解决这个问题的工具。其会根据proto文件,生成对应的代码,反向代理来自RESTful的请求,并转换成gRPC调用。上一张官网的经典图:
首先我们需要安装gRPC-Gateway:
复制 $ go install github.com / grpc - ecosystem / grpc - gateway / v2 / protoc - gen - grpc - gateway
然后修改原来的proto文件:
复制 + import "google/api/annotations.proto" ;
// The greeting service definition.
service Greeter {
// Sends a greeting
- rpc SayHello (HelloRequest) returns (HelloReply) {}
+ rpc SayHello (HelloRequest) returns (HelloReply) {
+ option (google.api.http) = {
+ post: "/v1/say_hello"
+ body: "*"
+ };
+ }
}
Makefile添加执行命令生成相关代码:
复制 # gRPC Gateway
PROTO_GRPC_FILES = $( shell find ./gateway/* -name *.proto )
proto_gateway:
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
--grpc-gateway_out ./ \
--grpc-gateway_opt logtostderr= true \
--grpc-gateway_opt paths=source_relative \
--grpc-gateway_opt generate_unbound_methods= true \
$( PROTO_GRPC_FILES )
这里需要注意的是,在proto文件中,我们添加了引入import "google/api/annotations.proto";
,protoc默认会从执行命令的路径查找引入,所以我们需要从googleapis 上下载对应的文件到执行命令的目录,然后再执行protoc命令:
复制 $ tree - L 3
.
├── Makefile
├── gateway
│ └── hellogrpc.proto
├── go .mod
├── go .sum
└── google
└── api
├── annotations.proto
├── gogo.proto
├── http.proto
└── httpbody.proto
执行完之后我们可以看到生成了四个文件:
复制 hellogrpc.pb. go
hellogrpc.pb.gw. go
hellogrpc.proto
hellogrpc_grpc.pb. go
其中hellogrpc.pb.gw.go
包含的就是处理反向代理的相关代码。
我们还是需要先实现Server,然后启动服务端代码:
复制 type Server struct {
UnimplementedGreeterServer
}
func (s * Server ) SayHello (ctx context . Context , in * HelloRequest ) ( * HelloReply , error ) {
log. Printf ( "Received: %v " , in. GetName ())
return & HelloReply {Message: "Hello " + in. GetName ()}, nil
}
func TestServer (t * testing . T ) {
var addr = "localhost:50052"
var s * grpc . Server
s = grpc. NewServer ()
RegisterGreeterServer (s, & Server {})
log. Printf ( "server listening at %v " , lis. Addr ())
lis, _ := net. Listen ( "tcp" , addr)
s. Serve (lis);
}
然后在客户端通过hellogrpc.pb.gw.go
中的方法RegisterGreeterHandlerFromEndpoint
直接启动RESTful客户端:
复制 func TestGateway (t * testing . T ) {
endpoint := "localhost:50052"
gatewayMux := runtime. NewServeMux ()
dopts := [] grpc . DialOption {grpc. WithInsecure ()}
err := RegisterGreeterHandlerFromEndpoint (context. Background (), gatewayMux, endpoint, dopts)
if err != nil {
t. Fatal (err)
}
err = http. ListenAndServe ( ":8081" , gatewayMux)
if err != nil {
t. Fatal (err)
}
}
最后通过http请求:
复制 curl --location --request POST 'http://localhost:8081/v1/say_hello' \
--header 'Content-Type: application/json' \
--data-raw '{
"name":"zhangsan"
}'
{ "message" : "Hello zhangsan" }
成功获取到返回信息。
OpenAPI
通常我们的HTTP服务都需要提供相应的API文档,我们可以通过插件protoc-gen-openapi
帮助我们通过proto
文件来自动生成文档。在此之前,需要先了解一些背景知识。OpenAPI
是Linux基金会下的一个开源项目,其实就是一个关于HTTP接口描述的规范。比如在postman或者apifox中导入接口时都支持OpenAPI
的格式文件。
我们需要先安装插件:
复制 $ go install github.com/google/gnostic/cmd/protoc-gen-openapi@v0.6.8
还是使用刚才的proto文件,执行一下命令:
复制 # gRPC Gateway
PROTO_GRPC_FILES = $( shell find ./gateway/* -name *.proto )
proto_gateway:
protoc --openapi_out=./gateway $( PROTO_GRPC_FILES )
会发现在./gateway
目录下生成了一个openapi.yaml
文件。可以使用swagger-ui来渲染该文件,通过docker可以快速的在本地启动起来:
复制 docker run -p 19999:8080 -e SWAGGER_JSON=/foo/openapi.yaml -v ` pwd ` /gateway:/foo swaggerapi/swagger-ui
etcd Discovery
etcd
是一个分布式的key-value存储系统,并且提供了一个gRPC resolver
来根据服务名来找到对应的gRPC服务。底层机制是监听并修改以服务名作为前缀的一系列key。
让我们快速启动一个本地etcd集群,并用它来实现gRPC的服务发现。etcd的下载和安装可以参见这里 。 首先,需要准备Procfile
,然后通过goreman
快速启动etcd集群:
复制 etcd1: ./etcd --name infra1 --listen-client-urls http://127.0.0.1:2379 --advertise-client-urls http://127.0.0.1:2379 --listen-peer-urls http://127.0.0.1:2380 --initial-advertise-peer-urls http://127.0.0.1:2380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:2380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --enable-pprof --logger=zap --log-outputs=stderr
etcd2: ./etcd --name infra2 --listen-client-urls http://127.0.0.1:22379 --advertise-client-urls http://127.0.0.1:22379 --listen-peer-urls http://127.0.0.1:22380 --initial-advertise-peer-urls http://127.0.0.1:22380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:2380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --enable-pprof --logger=zap --log-outputs=stderr
etcd3: ./etcd --name infra3 --listen-client-urls http://127.0.0.1:32379 --advertise-client-urls http://127.0.0.1:32379 --listen-peer-urls http://127.0.0.1:32380 --initial-advertise-peer-urls http://127.0.0.1:32380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:2380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --enable-pprof --logger=zap --log-outputs=stderr
然后需要对之前的gRPC-Gateway中的代码进行简单改造。当启动Server
的时候,需要通过etcd client
连接etcd集群,并将自己的服务名和地址,注册到etcd中。如果该Server
不能再提供服务,那么应该从etcd中删除相关信息。考虑到一些意外情况导致Server
不能提供服务,我们可以利用etcd的租约,定期向etcd续租服务时间。 注册服务需要在启动Server
的时候调用,代码如下:
复制 func registerServer (ctx context . Context ) error {
// 连接etcd
client, err := etcd. New ( etcd . Config {
Endpoints: [] string { "localhost:2379" , "localhost:22379" , "localhost:32379" },
DialTimeout: time.Second * 5 ,
})
if err != nil {
return err
}
serverName := "grpc/hello_etcd"
addr := "localhost:50052"
manager, err := endpoints. NewManager (client, serverName)
if err != nil {
return err
}
lease := etcd. NewLease (client)
leaseResp, err := lease. Grant (ctx, 30 )
if err != nil {
return errors. Wrapf (err, "EtcdClient获取租约失败!" )
}
go func () { lease. KeepAlive (ctx, leaseResp.ID) }() // 续约
return manager. AddEndpoint (ctx, serverName + "/" + addr, endpoints . Endpoint {Addr: addr}, etcd. WithLease (leaseResp.ID))
}
在启动Client
的时候,需要为连接创建一个etcd提供的resolver
:
复制 func TestGateway (t * testing . T ) {
etcdClient, err := etcd. New ( etcd . Config {
Endpoints: [] string { "localhost:2379" , "localhost:22379" , "localhost:32379" },
DialTimeout: time.Second * 5 ,
})
builder, err := resolver. NewBuilder (etcdClient)
if err != nil {
log. Fatal (err)
}
endpoint := "etcd:///grpc/hello_etcd"
gatewayMux := runtime. NewServeMux ()
dopts := [] grpc . DialOption {
grpc. WithTransportCredentials (insecure. NewCredentials ()),
grpc. WithResolvers (builder),
}
err = RegisterGreeterHandlerFromEndpoint (context. Background (), gatewayMux, endpoint, dopts)
if err != nil {
t. Fatal (err)
}
err = http. ListenAndServe ( ":8081" , gatewayMux)
if err != nil {
t. Fatal (err)
}
}
值得注意的是,这里传入以创建连接的endpoint
需要添加Scheme
和Authority
前缀,才能被我们创建的resolver
处理。
将Server
和Client
启动起来,然后通过curl请求:
复制 curl --location --request POST 'http://localhost:8081/v1/say_hello' \
--header 'Content-Type: application/json' \
--data-raw '{
"name":"zhangsan"
}'
{ "message" : "Hello zhangsan" }
成功返回结果。
Protogen
我们可以使用gRPC
提供的编译器,来实现一些自定义的生成器。其代码在包google.golang.org/protobuf/compiler/protogen
中。我们先通过一个简单的例子来了解一下protogen
的使用。 首先创建一个单独的mod(命名需要以protoc-gen为前缀)
并编写如下main
函数:
复制 func main () {
v := "1"
options := protogen . Options {
ParamFunc: func (name, value string ) error {
if name == "version" {
v = value
}
return nil
},
}
options. Run ( func (gen * protogen . Plugin ) error {
gen.SupportedFeatures = uint64 (pluginpb.CodeGeneratorResponse_FEATURE_PROTO3_OPTIONAL)
for _, f := range gen.Files {
if ! f.Generate {
continue
}
g := gen. NewGeneratedFile ( "aaa.go" , f.GoImportPath)
g. P ( "// Code generated by protoc-gen-my. DO NOT EDIT." )
g. P ( "// Version: " , v)
g. P ()
g. P ( "package " , "gen" )
g. P ()
g. P ( "// " , protogen. GoImportPath ( "context" ). Ident ( "" ))
}
return nil
})
}
然后我们将该mod
安装,然后通过protoc
便可以使用:
复制 go install .
protoc --proto_path=./protoc-gen-my --my_out=version=1.1,paths=source_relative:./protoc-gen-my/gen ./protoc-gen-my/gen/hellogrpc.proto
可以看到,--my_out
选项设置的值会被传递到我们自定义的插件protoc-gen-my
。即上面的代码protogen.Options.ParamFunc
的方法。paths
参数属于protocgen
的默认参数,不需要我们手动处理。 执行后我们可以看到生成的文件aaa.go
:
复制 // Code generated by protoc-gen-my. DO NOT EDIT.
// Version: 1.1
package gen
import (
context "context"
)
// context.
这些内容都是根据Options.Run()
方法中的内容输出的,我们着重看一下import
的部分。这一部分是通过protogen.GoImportPath()
来添加的。进入*GeneratedFile.P()
->*GeneratedFile.QualifiedGoIdent()
我们可以看到添加的包被添加到了GeneratedFile
:
复制 func (g * GeneratedFile ) QualifiedGoIdent (ident GoIdent ) string {
...
packageName := cleanPackageName (path. Base ( string (ident.GoImportPath)))
for i, orig := 1 , packageName; g.usedPackageNames[packageName]; i ++ {
packageName = orig + GoPackageName (strconv. Itoa (i))
}
g.packageNames[ident.GoImportPath] = packageName
g.usedPackageNames[packageName] = true
return string (packageName) + "." + ident.GoName
}
而具体在哪里用到呢?我们可以在Options.Run()
中看到最终输出的文件是在执行了我们传入的方法后,通过调用gen.Response()
来得到的:
复制 func (opts Options ) Run (f func ( * Plugin ) error ) {
if err := run (opts, f); err != nil {
fmt. Fprintf (os.Stderr, " %s : %v \n" , filepath. Base (os.Args[ 0 ]), err)
os. Exit ( 1 )
}
}
func run (opts Options , f func ( * Plugin ) error ) error {
...
if err := f (gen); err != nil {
...
}
resp := gen. Response () //
out, err := proto. Marshal (resp)
if err != nil {
return err
}
if _, err := os.Stdout. Write (out); err != nil {
return err
}
return nil
}
进入*Plugin.Response()
方法,可以看到对genFiles
的遍历,并且可以看到通过genFiles
的Content()
方法获取到了返回到内容:
复制 func (gen * Plugin ) Response () * pluginpb . CodeGeneratorResponse {
...
for _, g := range gen.genFiles {
...
content, err := g. Content ()
...
resp.File = append (resp.File, & pluginpb . CodeGeneratorResponse_File {
Name: proto. String (filename),
Content: proto. String ( string (content)),
})
...
}
进入*GeneratedFile.Content()
方法:
复制 func (g * GeneratedFile ) Content () ([] byte , error ) {
...
for importPath := range g.packageNames { // 来源于protogen.GoImportPath()的添加
pkgName := string (g.packageNames[ GoImportPath (importPath)])
pkgPath := rewriteImport ( string (importPath))
importPaths = append (importPaths, [ 2 ] string {pkgName, pkgPath})
}
...
// 通过AST添加引入的包
if len (importPaths) > 0 {
...
impDecl := & ast . GenDecl {
Tok: token.IMPORT,
TokPos: pos,
Lparen: pos,
Rparen: pos,
}
...
file.Decls = append ([] ast . Decl {impDecl}, file.Decls ... ) //
}
var out bytes . Buffer
if err = (&printer.Config{Mode: printer.TabIndent | printer.UseSpaces, Tabwidth: 8}).Fprint(&out, fset, file); err != nil {
return nil , fmt. Errorf ( " %v : can not reformat Go source: %v " , g.filename, err)
}
return out. Bytes (), nil
}
可以看到,最后通过AST将文件最后输出,所以在我们生成内容有语法错误的时候会有错误提示。
此外,我们这里的例子相对简单,大部分情况我们需要根据proto
文件中的内容来生成相应的内容。在Options.Run()
方法的*protogen.Plugin.Files
中,提供了proto
文件中的一些内容,我们可以直接获取:
复制 type File struct {
Desc protoreflect . FileDescriptor
Proto * descriptorpb . FileDescriptorProto
GoDescriptorIdent GoIdent // name of Go variable for the file descriptor
GoPackageName GoPackageName // name of this file's Go package
GoImportPath GoImportPath // import path of this file's Go package
Enums [] * Enum // top-level enum declarations
Messages [] * Message // top-level message declarations
Extensions [] * Extension // top-level extension declarations
Services [] * Service // top-level service declarations
Generate bool // true if we should generate code for this file
// GeneratedFilenamePrefix is used to construct filenames for generated
// files associated with this source file.
//
// For example, the source file "dir/foo.proto" might have a filename prefix
// of "dir/foo". Appending ".pb.go" produces an output file of "dir/foo.pb.go".
GeneratedFilenamePrefix string
location Location
}