一个gRPC Proxy的实现方案
可用性、可靠性和扩展性是衡量后台服务的基本标准,HTTP反向代理,是任何一个提供大型Web服务后台所必备的,用以提高服务的这些基础参数,且通过支持到负载均衡而进一步提升服务性能。然而,随着微服务框架的盛行,RPC技术也已经开始承载大量的微服务之间的通信,在众多RPC技术中,gRPC是Google开源的通用高性能RPC框架,因此,一个支持gRPC的反向代理的需求应运而生。
gRPC底层通信使用了HTTP/2技术,关于HTTP/2相对于HTTP/1.1的区别和优势,有兴趣的同学,可以通过下面的链接详细了解HTTP/2.0 相比1.0有哪些重大改进,有助于增加本文的理解。在此,我们不会详细的去谈HTTP/2的特性,简单的总结,之所以使用HTTP/2,主要是由于HTTP/2的这两点:
Stream是HTTP/2中是一个逻辑上的概念,指的是在一个TCP连接上,我们可以向对方不断发送一个个的消息,这里每一个消息即是一个frame,而每一个frame中有个stream identifier的字段标明这一帧属于哪个“stream”。当对方接收到每一个frame时,根据stream identifier拼接每个“stream”的所有frame组成一整块数据。
众所周知,HTTP/1.1中的已经有了keepAlive功能,相比于KeepAlive,在HTTP/2中的通过分帧技术实现了Multiplexing,从而更高效的实现了数据传输,一个速度展示的示例:http2响应速度展示
下图是HTTP/1.1与HTTP/2的数据传输区别图示:
接下来的内容,建立在读者对gRPC已经有了一个很好的使用和了解的基础上。同时,也依然强烈建议通过阅读从源码透析gRPC调用原理一文来较深入地理解gRPC调用过程中的一些细节实现,从而更好地理解为什么会有下文的实现思路。
在说到具体如何实现grpc的proxy之前,我们先来分析一下为了实现gRPC的反向代理,我们需要做到什么。作为代理,一言以蔽之,需要做到的是两件事:
在gRPC中,为了实现上述功能,主要有这些特性需要实现:
我们知道,gRPC在收到消息后,会根据protobuf的codec来对消息进行编解码。然而,在proxy的内部,其既需要作为server端接收数据,又需要作为client端发送数据。而对于数据在proxy内部转发的时候,不需要对其进行任何形式的编解码(换句话说,只需要将数据当成原始裸数据,直接转发即可),不可以采用gRPC自带的codec。因此,在gRPC中,对于收到的protobuf消息,我们需要采用默认的protobuf codec,而对于proxy内部的数据转发我们需要一个对协议的无感知仅作转发的codec。
为了实现在编解码中说到的,在前后端服务之间做到数据的正确转发,当proxy接收到任何一个请求流之后,需要根据该流携带的信息,判断出正确的对应的目的方,并建立到该目的方的链接connection,从而将请求流通过该connection发送到目的方。
当获得到特性2中的connection之后,便需要一个数据处理的handler,通过该connection去真正操作数据的转发。关于该handler的实现,根据在上文中分析,需要同时做到一边接收请求方数据并推送到响应方,一边接收响应方数据并回复到请求方。
当实现了上述三点之后,proxy的基本功能已经可以满足了。但是,负载均衡的功能一般都是会附带支持的,这里可以可以实现一个balancer的接口,通过不同的负载均衡算法对balancer的实现来支持到不同的负载均衡算法。
无论是请求方的数据流,还是响应方的数据流,对于proxy服务来说都是数据流的进入,也即是proxy需要作为一个server的身份来处理这些请求。因此,proxy需要作为gRPC Server来启动,也即是:
s := grpc.NewServer()
在gRPC中,通过从源码透析gRPC调用原理一文,我们了解到作为server启动时,支持一系列的ServerOption来实现gRPC的一些服务端配置。为了实现特性1中说到的特殊的编解码,我们需要借助grpc.CustomCodec()方法来实现:
func CustomCodec(codec Codec) ServerOption {
return func(o *options) {
o.codec = codec
}
}
type Codec interface {
// Marshal returns the wire format of v.
Marshal(v interface{}) ([]byte, error)
// Unmarshal parses the wire format into v.
Unmarshal(data []byte, v interface{}) error
// String returns the name of the Codec implementation. This is unused by
// gRPC.
String() string
}
CustomCodec()函数返回一个ServerOption类型的实例在NewServer()时作为参数传入,从而用于设定grpc中消息的Marshal和Unmarshal,也即是在初始化server时我们需要这样来写:
grpc.NewServer(
grpc.CustomCodec(myCodec))
其中,myCodec即是一个需要我们去实现的,一个支持到了grpc.Codec接口的类型的实例。我们将通过下面的Codec()函数来返回该实例,从而支持到了自定义codec:
// 返回了一个grpc.Codec类型的实例,
// 以protobuf原生codec为默认codec,实现了一个透明的Marshal和UnmarshMal
func Codec() grpc.Codec {
return CodecWithParent(&protoCodec{})
}
// 一个协议无感知的codec实现,返回一个grpc.Codec类型的实例
// 该函数尝试将gRPC消息当作raw bytes来实现,当尝试失败后,会有fallback作为一个后退的codec
func CodecWithParent(fallback grpc.Codec) grpc.Codec {
return &rawCodec{fallback}
}
// 自定义codec类型,
// 实现了grpc.Codec接口中的Marshal和Unmarshal
// 成员变量parentCodec用于当自定义Marshal和Unmarshal失败时的回退codec
type rawCodec struct {
parentCodec grpc.Codec
}
type frame struct {
payload []byte
}
// 序列化函数,
// 尝试将消息转换为*frame类型,并返回frame的payload实现序列化
// 若失败,则采用变量parentCodec中的Marshal进行序列化
func (c *rawCodec) Marshal(v interface{}) ([]byte, error) {
out, ok := v.(*frame)
if !ok {
return c.parentCodec.Marshal(v)
}
return out.payload, nil
}
// 反序列化函数,
// 尝试通过将消息转为*frame类型,提取出payload到[]byte,实现反序列化
// 若失败,则采用变量parentCodec中的Unmarshal进行反序列化
func (c *rawCodec) Unmarshal(data []byte, v interface{}) error {
dst, ok := v.(*frame)
if !ok {
return c.parentCodec.Unmarshal(data, v)
}
dst.payload = data
return nil
}
func (c *rawCodec) String() string {
return fmt.Sprintf("proxy>%s", c.parentCodec.String())
}
//-----------------------
// protoCodec实现protobuf的默认的codec
type protoCodec struct{}
func (protoCodec) Marshal(v interface{}) ([]byte, error) {
return proto.Marshal(v.(proto.Message))
}
func (protoCodec) Unmarshal(data []byte, v interface{}) error {
return proto.Unmarshal(data, v.(proto.Message))
}
func (protoCodec) String() string {
return "proto"
}
关于特性2中的流转发器,需要根据请求流的meta信息,判断出正确的对应的目的方,返回一个正确到目的方的connection,从而方便之后的handler在拿到正确的connection之后,实现数据包的透明转发。
为了去获取到请求流的meta信息,我们有两种方式:
第一,我们可以结合gRPC协议的特性,根据grpc.MethodFromServerStream()函数,从grpc client的请求中剥离出调用的接口名fullMethodName:
// 该函数根据请求的strem,返回请求的方法名(string类型)
// 返回的字符串格式为:"/service/method".
func MethodFromServerStream(stream ServerStream) (string, bool) {
return Method(stream.Context())
}
第二,同样是依托于gRPC的特性,我们利用了其metadata,通过在请求方传入metadata参数,当proxy收到请求流之后在读取出metadata,从而根据metada来进行endpoint的选择。
无论采用哪种方式,当我们找到endpoint并与之建立链接后,便可以得到期望的connection,下方是一个StreamDirector的实现:
func GetDirector() func(context.Context, string) (context.Context, *grpc.ClientConn, error) {
return func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
// 获取配置信息
appConfig := config.GetViper(inited.ConfigPrefix)
var cfg lconfig.Config
err := appConfig.Unmarshal(&cfg)
if err != nil {
log.LoggerWrapperWithCaller().Errorf(err.Error())
return nil, nil, err
}
// 寻找对应到fullMethodName的endpoint
for _, backend := range cfg.Backends {
// 仅转发外部的请求
if strings.HasPrefix(fullMethodName, backend.Filter) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
err := fmt.Errorf("incoming metadata is empty")
log.LoggerWrapperWithCaller().Errorf(err.Error())
return nil, nil, err
}
outCtx, _ := context.WithCancel(ctx)
outCtx = metadata.NewOutgoingContext(outCtx, md.Copy())
if ok {
// 根据gRPC中的metadata,获取到对应的endpoint
if val, exists := md[MODULE_TEXT]; exists && inSlice(val[0], backend.Module) {
log.LoggerWrapperWithCaller().Debugf("Found md caller [%v]", val)
// 做一个负载均衡
endpoint, err := getBackendByRR(backend)
if err != nil {
log.LoggerWrapperWithCaller().Errorf("grpc.DialContext failed: ", err.Error())
return nil, nil, err
}
log.LoggerWrapperWithCaller().Debugf("balanced, redirecting to [%v]", endpoint)
// 根据获取到的endpoint,建立到目的方的connection
// 同时,需要配置客户端codec为我们自定义的codec
conn, err := grpc.DialContext(ctx, endpoint, grpc.WithCodec(proxy.Codec()), grpc.WithInsecure())
if err != nil {
log.LoggerWrapperWithCaller().Errorf("grpc.DialContext failed: ", err.Error())
return nil, nil, err
}
return outCtx, conn, err
}
}
}
}
return nil, nil, fmt.Errorf("Unknown method")
}
}
在上述代码中,有三点需要额外解释的:
首先,在调用gRPC.DialContext()创建连接的时候,我们传入了自定义codec用以配置client端的编解码,在从源码透析gRPC调用原理一文中有对该实现的原理的具体分析。
其次,appConfig是从配置文件加载到的数据,下面列了一个示例配置。在demo中,proxy代理了两种类型的服务:
且两种服务均有两份实例进行运行着。对于crypto服务,运行在"127.0.0.1:12001"和"127.0.0.1:12002",对于cos服务的两个运行实例,则是"127.0.0.1:33001"和"127.0.0.1:33002"。多份实例存在的原因是为了实现负载均衡,下文将会单独介绍这一功能。
[project]
name = "grpc-proxy"
version = "1.0.0"
[server]
ip = "0.0.0.0"
port = 50051
[[backends]]
tag = 0
backend = ["127.0.0.1:12001","127.0.0.1:12002"]
filter = "/cloud.crypto.pb.CryptoService"
module = ["crypto"]
[[backends]]
tag = 1
backend = ["127.0.0.1:33001","127.0.0.1:33002"]
filter = "/cloud.cos.pb.CosService"
module = ["cos"]
最后,需要注意的是,通过gRPC请求的fullMethodName,显而易见的是我们完全能够支持到接口级别的代理,根据不同的接口扔到不同的后端服务中。与此同时,通过支持了metadata来进行endpoint的选择,那么就完全可以实现任意自定义的路由组合。
作为server来启动时,为了实现一个协议无感知的代理,我们需要利用到了grpc中的UnknownServiceHandler()接口。同样地,UnknownServiceHandler()是返回一个ServerOption类型的实例在NewServer()时作为参数传入,其主要功能是支持了一个自定义的对未知服务的handler。通过配置了该方法,当grpc server接收到一个未注册的服务时,不再返回一个“unimplemented”的gRPC错误,而是通过我们实现的handler来进行服务,从而实现了协议无感知的proxy。也是因此,我们的handler一定是一个bidi-streaming RPC handler。因此,我们启动服务时的命令变成了这样:
grpc.NewServer(
grpc.CustomCodec(proxy.Codec()),
grpc.UnknownServiceHandler(proxy.TransparentHandler(director)))
下面,就让我们来看下,如何实现这样一个bidi-streaming的handler,来完成流的转发。
// 该handler以gRPC server的模式来接受数据流,并将受到的数据转发到指定的connection中
func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error {
// 获取请求流的目的接口名称
fullMethodName, ok := grpc.MethodFromServerStream(serverStream)
if !ok {
return grpc.Errorf(codes.Internal, "failed to get method from server stream")
}
// 该director即为上述的StreamDirector,获取到对应的目的方connection
outgoingCtx, backendConn, err := s.director(serverStream.Context(), fullMethodName)
if err != nil {
return err
}
defer backendConn.Close()
// 封装为clientStream
clientCtx, clientCancel := context.WithCancel(outgoingCtx)
clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, backendConn, fullMethodName)
if err != nil {
return err
}
// 启动流控,目的方->请求方
s2cErrChan := s.forwardServerToClient(serverStream, clientStream)
// 启动流控,请求方->目的方
c2sErrChan := s.forwardClientToServer(clientStream, serverStream)
// 数据流结束处理 & 错误处理
for i := 0; i < 2; i++ {
select {
case s2cErr := <-s2cErrChan:
if s2cErr == io.EOF {
// 正常结束
clientStream.CloseSend()
break
} else {
// 错误处理 (如链接断开、读错误等)
clientCancel()
return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
}
case c2sErr := <-c2sErrChan:
// 设置Trailer
serverStream.SetTrailer(clientStream.Trailer())
if c2sErr != io.EOF {
return c2sErr
}
return nil
}
}
return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
}
func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan error {
ret := make(chan error, 1)
go func() {
// 设置*frame结构作为RecvMsg的参数,
// *frame即为我们自定义codec中使用到的数据结构
f := &frame{}
for i := 0; ; i++ {
if err := src.RecvMsg(f); err != nil {
ret <- err
break
}
if i == 0 {
// grpc中客户端到服务器的header只能在第一个客户端消息后才可以读取到,
// 同时又必须在flush第一个msg之前写入到流中。
md, err := src.Header()
if err != nil {
ret <- err
break
}
if err := dst.SendHeader(md); err != nil {
ret <- err
break
}
}
if err := dst.SendMsg(f); err != nil {
ret <- err
break
}
}
}()
return ret
}
func (s *handler) forwardServerToClient(src grpc.ServerStream, dst grpc.ClientStream) chan error {
ret := make(chan error, 1)
go func() {
f := &frame{}
for i := 0; ; i++ {
if err := src.RecvMsg(f); err != nil {
ret <- err
break
}
if err := dst.SendMsg(f); err != nil {
ret <- err
break
}
}
}()
return ret
}
其中,实现流控的两个函数分别是:
我们通过forwardServerToClient()来详细介绍如何实现转发的。在forwardServerToClient()中,启动了一个go程,持续着接收来自于src(也就是server端)的数据流,并在收到后将数据转发到dst(client端)。
对于src.RecvMsg(f),最终会调用grpc/rpc_util.go中的recv()函数,除去一些不关键的代码,我们看到,其通过codec中的Unmarshal()将接收到的data反序列化到了参数m中,而m就是src.RecvMsg(f)中的f实参。
func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, inPayload *stats.InPayload, compressor encoding.Compressor) error {
pf, d, err := p.recvMsg(maxReceiveMessageSize)
...
if err := c.Unmarshal(d, m); err != nil {
return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)
}
...
return nil
}
而在这儿使用到的codec,即是我们上文中实现的codec,因为f是我们定义的frame{}结构,所以自定义的dst, ok := v.(*frame)会得到成功执行,并通过frame.payload承载原始的data数据。
func (c *rawCodec) Unmarshal(data []byte, v interface{}) error {
dst, ok := v.(*frame)
if !ok {
return c.parentCodec.Unmarshal(data, v)
}
dst.payload = data
return nil
}
同样,在dst.SendMsg(f)时,也将会通过rawCodec实现的Marshal()方法,将frame中的payload取出并发送。从而也就实现了对于proxy内部的数据的透明传输。
func (c *rawCodec) Marshal(v interface{}) ([]byte, error) {
out, ok := v.(*frame)
if !ok {
return c.parentCodec.Marshal(v)
}
return out.payload, nil
}
如上所说,负载均衡并不是proxy所必须的,可以说是一个锦上添花的附加功能。为了实现负责均衡,就需要在director中获取到connection之前运用负载均衡算法来获取到endpoint:
// a roundrobin balancer
endpoint, err := getBackendByRR(backend)
我们在这儿实现的是一个简单的RoundRobin的负载均衡,通过RoundRobin算法拿到合适的目的方endpoint并通过grpc.Dial()建立到目的方的链接,获取connection。
func getBackendByRR(backend lconfig.BackendConfig) (endpoint string, err error) {
tag := backend.Tag
endpoints := backend.Backend
if rrPicker == nil {
err = fmt.Errorf("getBackendByRR failed, picker is nil")
log.LoggerWrapperWithCaller().Errorf(err.Error())
return
}
iendpoints := make([]interface{}, 0)
for _, v := range endpoints {
iendpoints = append(iendpoints, v)
}
rrPicker.SetCandidates(tag, iendpoints)
end, err := rrPicker.Pick(tag)
if err != nil {
log.LoggerWrapperWithCaller().Errorf(err.Error())
return
}
endpoint, ok := end.(string)
if !ok {
err = fmt.Errorf("Pick reply format failed, [%v]", end)
log.LoggerWrapperWithCaller().Errorf(err.Error())
return
}
return
}
func (r *RoundRobinPicker) Pick(tag int) (interface{}, error) {
r.mu.Lock()
defer r.mu.Unlock()
candidate, ok := r.candidates[tag]
if !ok || len(candidate) <= 0 {
err := fmt.Errorf("roundrobin candidates [%v] is empty", tag)
return nil, err
}
if r.next[tag] >= len(candidate) {
r.next[tag] = 0
}
sc := candidate[r.next[tag]]
r.next[tag] = (r.next[tag] + 1) % len(candidate)
return sc, nil
}
gRPC,作为google开源的高性能RPC方案,已经日趋成熟且被众多框架中使用了。这次因为开发项目中有了对gRPC proxy的需求,结合互联网这一广大的资源集合处来实现,同时也是借助了这次机会,更多的了解了gRPC的实现。不得不说,gRPC中的知识点还是很多的,其深层的处理数据的方式,诸如数据包的接收、HTTP/2协议的处理等,都是有着很多细节的实现以至于在本文中没有更多的精力去一一解读。如果大家在阅读本文中有一些实现的疑问,可以留言,也可以去读一读源码,我相信,一定是会收获很多的!
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。