问题: 1. 代码生成怎么做? 2. 如何实现一个类似的parser
代码生成本质上是实现 protoc 的一个插件,而 protoc 对插件的调用方式比较直接:protoc 会按照 protobuf 的相关定义解析 .proto 文件,然后把解析的结果(编码后的 CodeGeneratorRequest)写入插件的 stdin,
然后从插件的 stdout 读取插件返回的 protobuf 二进制结构。
├── descriptor: descriptor.proto 对应的 Go 类型(FileDescriptorProto 等描述 proto 文件的结构)
├── jsonpb: protobuf 消息与 JSON 之间的编解码
├── proto: library部分
├── protoc-gen-go: 插件部分
├── ptypes: protobuf里面的各种类型
// Message is implemented by generated protocol buffer messages; every
// generated struct satisfies it.
//
// Defined in lib.go.
type Message interface {
	// Reset sets the message to its zero value.
	Reset()
	// String returns the message rendered in the protobuf text format.
	// It serializes the message; it does not unmarshal anything.
	String() string
	// ProtoMessage has an empty body in generated code; it only tags the
	// type as a protobuf message.
	ProtoMessage()
}
// Buffer holds the data used while marshaling and unmarshaling: the byte
// stream plus a read cursor.
//
// Defined in lib.go.
type Buffer struct {
	buf           []byte // encode/decode byte stream
	index         int    // read point
	deterministic bool   // best-effort deterministic mode: same message => same bytes
}
// DecodeVarint reads a varint-encoded integer. Varints are used for the
// int32, int64, uint32, uint64, bool, and enum protocol buffer types.
// Similar functions: DecodeFixed64, DecodeFixed32, DecodeZigzag64,
// DecodeRawBytes, DecodeStringBytes, ...
//
// They follow the protobuf wire format; see
// https://blog.csdn.net/erlib/article/details/46345111
// Key design points of the format:
//  1. Base 128 varints (base 128, i.e. 7 payload bits per byte; not 128-bit
//     storage): a value is written as one or more consecutive bytes, so small
//     values fit in a single byte. The most significant bit of each byte says
//     whether more bytes follow (1: continues, 0: last byte); the low 7 bits
//     carry the data.
//  2. Fields are identified by their field number, key-value style: the field
//     number is the key, which makes it essential.
//  3. Signed integers can be represented through unsigned ones (ZigZag encoding).
//  4. The stream is a sequence of key + data pairs: key1 + data1, key2 + data2, ...
//     A one-byte key has the layout 0yyyyxxx: xxx is the wire (data) type and
//     yyyy is the field number.
//
// Defined in decode.go; only the signature is shown here.
func (p *Buffer) DecodeVarint() (x uint64, err error)
// tagMap maps a protobuf tag (a field number in the .proto definition) to a
// struct field number; StructProperties.decoderTags is a tagMap.
type tagMap struct {
	// fastTags is an optimization indexed directly by tag. Entries default
	// to -1; an entry >= 0 is the struct field number for that tag.
	fastTags []int
	// slowTags is presumably the fallback for tags not covered by fastTags.
	slowTags map[int]int
}
// 代表结构体struct
// StructProperties represents properties for all the fields of a struct.
// decoderTags and decoderOrigNames should only be used by the decoder.
type StructProperties struct {
Prop []*Properties // properties for each field
reqCount int // required count
decoderTags tagMap // map from proto tag to struct field number
decoderOrigNames map[string]int // map from original name to struct field number
order []int // list of struct field numbers in tag order
// OneofTypes contains information about the oneof fields in this message.
// It is keyed by the original name of a field.
OneofTypes map[string]*OneofProperties
// 代表结构体的一个field
// Properties represents the protocol-specific behavior of a single struct field.
// 例如一个字段转化成go加上了 `protobuf:"varint,1,opt,name=Id,proto3" json:"Id,omitempty"` 这里面的就是Properties
// 有Parse函数能解析,String函数生成, 定义proto => 生成 => go结构体 => 解析 => 数据到二进制
type Properties struct {
Name string // name of the field, for error messages
OrigName string // original name before protocol compiler (always set)
JSONName string // name to use for JSON; determined by protoc
Wire string
WireType int
Tag int
Required bool
Optional bool
Repeated bool
Packed bool // relevant for repeated primitives only
Enum string // set for enum types only
proto3 bool // whether this is known to be a proto3 field
oneof bool // whether this is a oneof field
Default string // default value
HasDefault bool // whether an explicit default was provided
stype reflect.Type // set for struct types only
sprop *StructProperties // set for struct types only
mtype reflect.Type // set for map types only
MapKeyProp *Properties // set for map types only
MapValProp *Properties // set for map types only
// 生成的代码里面使用这个结构做如 Unmarshal/Marshal/Merge等等操作
// 一个例子: xxx_messageInfo_ServiceConfig 就是InternalMessageInfo类型
// func (m *S) XXX_Unmarshal(b []byte) error {
// return xxx_messageInfo_S.Unmarshal(m, b)
// }
// InternalMessageInfo is a type used internally by generated .pb.go files.
// This type is not intended to be used by non-generated code.
// This type is not subject to any compatibility guarantee.
type InternalMessageInfo struct {
marshal *marshalInfo
unmarshal *unmarshalInfo
merge *mergeInfo
discard *discardInfo
// table_unmarshal.go
// Unmarshal is the entry point from the generated .pb.go files.
// This function is not intended to be used by non-generated code.
// This function is not subject to any compatibility guarantee.
// msg contains a pointer to a protocol buffer struct.
// b is the data to be unmarshaled into the protocol buffer.
// a is a pointer to a place to store cached unmarshal information.
func (a *InternalMessageInfo) Unmarshal(msg Message, b []byte) error {
// Load the unmarshal information for this message type.
// The atomic load ensures memory consistency.
u := atomicLoadUnmarshalInfo(&a.unmarshal)
if u == nil {
// Slow path: find unmarshal info for msg, update a with it.
u = getUnmarshalInfo(reflect.TypeOf(msg).Elem())
atomicStoreUnmarshalInfo(&a.unmarshal, u)
// Then do the unmarshaling.
err := u.unmarshal(toPointer(&msg), b)
return err
// 一个go结构 UnMarshal => InternalMessageInfo.UnMarshal => unmarshalInfo.UnMarshal (类型specific, 有cache)
// => 根据field,tag,对不同类型有 unmarshalFieldInfo.unmarshaler 用对应的 unmarshaler做unmarshal
// => decodeVarint (主要就用这个函数,因为各种数字都是这个函数处理,string类型直接读buf)
// 这里的decode相关函数和deocode.go里面的不一样。这里会写到 go结构体的field里面去(pointer)
// decode.go里面的函数可以作为lib 用来做debug
// text_parser.go
// text.go 是解析文本格式的protobuf的方法,并不是parse .proto文件
// grpc/grpc.go
// The grpc plugin of protoc-gen-go: besides the Go structs it generates the
// gRPC code (server, client, and the interface the user must implement).
// The code is produced by concatenating text fragments with g.P rather than
// with templates, which makes the generator hard to read. For example
// (excerpt; "// ...." marks elided code, where outType and sname are defined):
func (g *grpc) generateClientMethod(servName, fullServName, serviceDescVar string, method *pb.MethodDescriptorProto, descExpr string) {
// ....
// Emit the method header: func (c *<unexported servName>Client) <signature> {
g.P("func (c *", unexport(servName), "Client) ", g.generateClientSignature(servName, method), "{")
// Unary RPC (neither client nor server streams): allocate the reply and
// call Invoke on the client connection.
if !method.GetServerStreaming() && !method.GetClientStreaming() {
g.P("out := new(", outType, ")")
// TODO: Pass descExpr to Invoke.
g.P(`err := c.cc.Invoke(ctx, "`, sname, `", in, out, opts...)`)
g.P("if err != nil { return nil, err }")
g.P("return out, nil")
// ....
插件通过 stdout 输出 protobuf 二进制结构:插件返回的是编码后的 CodeGeneratorResponse。
// The version number of protocol compiler.
message Version {
  optional int32 major = 1;
  optional int32 minor = 2;
  optional int32 patch = 3;
  // A suffix for alpha, beta or rc release, e.g., "alpha-1", "rc2". It should
  // be empty for mainline stable releases.
  optional string suffix = 4;
}
// An encoded CodeGeneratorRequest is written to the plugin's stdin.
message CodeGeneratorRequest {
  // The .proto files explicitly listed on the protoc command line; the plugin
  // should generate code only for these (not for every file in proto_file).
  repeated string file_to_generate = 1;
  // The plugin's parameter string from the command line, e.g.
  // "logtostderr=true" in --mq-gateway_out=logtostderr=true:.
  optional string parameter = 2;
  // Parsed descriptors for the files to generate and everything they import:
  // which messages and services each file defines, the fields of each
  // message, how each field is declared, and so on.
  repeated FileDescriptorProto proto_file = 15;
  // The version number of protocol compiler.
  optional Version compiler_version = 3;
}
// The plugin writes an encoded CodeGeneratorResponse to stdout.
message CodeGeneratorResponse {
  // Error message. If non-empty, code generation failed. The plugin process
  // should still exit with status code zero when it reports an error this way;
  // it is meant for problems in the .proto files that prevent correct code
  // generation.
  optional string error = 1;

  // Represents a single generated file.
  message File {
    // Name of the generated file.
    optional string name = 1;
    // If set, the file already exists and content is inserted at this
    // insertion point in it instead of creating a new file.
    optional string insertion_point = 2;
    // File contents.
    optional string content = 15;
  }
  repeated File file = 15;
}
// Describes a complete .proto file.
message FileDescriptorProto {
  optional string name = 1;     // file name, relative to root of source tree
  optional string package = 2;  // e.g. "foo", "foo.bar", etc.

  // Names of files imported by this file.
  repeated string dependency = 3;
  // Indexes of the public imported files in the dependency list above.
  repeated int32 public_dependency = 10;
  // Indexes of the weak imported files in the dependency list.
  repeated int32 weak_dependency = 11;

  // All top-level definitions in this file.
  repeated DescriptorProto message_type = 4;
  repeated EnumDescriptorProto enum_type = 5;
  repeated ServiceDescriptorProto service = 6;
  repeated FieldDescriptorProto extension = 7;

  optional FileOptions options = 8;
  optional SourceCodeInfo source_code_info = 9;

  // The syntax of the proto file, e.g. "proto2" or "proto3".
  optional string syntax = 12;
}
protoc-gen-go 的代码生成采用字符串拼接的方式:这种方式较为灵活,但是可读性比较差。
// generate fills the response with the generated Go output for one file
// (the file argument).
// (Excerpt from protoc-gen-go's generator.go: loop bodies, closing braces and
// other statements are elided, so this fragment does not compile on its own.)
func (g *Generator) generate(file *FileDescriptor) {
g.P("// This is a compile-time assertion to ensure that this generated file")
g.P("// is compatible with the proto package it is being compiled against.")
g.P("// A compilation error at this line likely means your copy of the")
g.P("// proto package needs to be updated.")
g.P("const _ = ", g.Pkg["proto"], ".ProtoPackageIsVersion", generatedCodeVersion, " // please upgrade the proto package")
// Emit code for each entry of g.file.imp, then for every enum, message and
// extension of the file (loop bodies elided).
for _, td := range g.file.imp {
for _, enum := range g.file.enum {
for _, desc := range g.file.desc {
// Don't generate virtual messages for maps.
if desc.GetOptions().GetMapEntry() {
for _, ext := range g.file.ext {
// Run the plugins before the imports so we know which imports are necessary.
// Generate header and imports last, though they appear first in the output.
// The body generated so far is set aside in rem/remAnno, and a fresh buffer
// is started for the header and imports.
rem := g.Buffer
remAnno := g.annotations
g.Buffer = new(bytes.Buffer)
g.annotations = nil
// ...
// Reformat generated code and patch annotation locations.
// The generated text is parsed back with go/parser so the AST can be
// adjusted (e.g. ast.SortImports below) before printing.
fset := token.NewFileSet()
// ...
fileAST, err := parser.ParseFile(fset, "", original, parser.ParseComments)
// ...
ast.SortImports(fset, fileAST)
// ...
if g.annotateCode {
// Generate the type, methods and default constant definitions for this Descriptor.
// (Excerpt: only the signature is shown; the body is elided.)
func (g *Generator) generateMessage(message *Descriptor) {
// For a message such as the following:
message Book {
  string title = 1;
  bytes raw_data = 2;
}
// 会生成
type Book struct {
Title string `protobuf:"bytes,1,opt,name=title,proto3" json:"title,omitempty"`
RawData []byte `protobuf:"bytes,2,opt,name=raw_data,json=rawData,proto3" json:"raw_data,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
func (m *Book) Reset() { *m = Book{} }
func (m *Book) String() string { return proto.CompactTextString(m) }
func (*Book) ProtoMessage() {}
func (*Book) Descriptor() ([]byte, []int) {
return fileDescriptor_ab04eb4084a521db, []int{1}
func (m *Book) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Book.Unmarshal(m, b)
func (m *Book) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Book.Marshal(b, m, deterministic)
func (m *Book) XXX_Merge(src proto.Message) {
xxx_messageInfo_Book.Merge(m, src)
func (m *Book) XXX_Size() int {
return xxx_messageInfo_Book.Size(m)
func (m *Book) XXX_DiscardUnknown() {
var xxx_messageInfo_Book proto.InternalMessageInfo
func (m *Book) GetTitle() string {
if m != nil {
return m.Title
return ""
func (m *Book) GetRawData() []byte {
if m != nil {
return m.RawData
return nil
单靠生成的 .pb.go 文件并不能完成所有 Unmarshal、Marshal 功能,生成的 .pb.go 文件里面会引用 github.com/golang/protobuf/proto
这个 Go 语言的 protobuf 功能库,由它配合完成 Unmarshal、Marshal 等功能,对 grpc 的支持更是如此。
proto.plugin -> go文件 引用 -> proto/lib
grpc-gateway 是 protoc 的另一个插件,同时它对 protobuf 的描述也做了扩展(google.api.http 选项),用于生成 REST 风格的 http 处理函数和 server:生成的代码将 http 请求打包转发给 grpc server,再将返回结果解包成 json 格式,从而完成对 http 的支持。通过这个插件,只需要添加很少量的代码,就可以给一个 grpc 服务添加 http 支持。
// 这是核心的生成输入文件的语法结构定义,可以看出和golang/protobuf是类似的,无非是protoc解析完成的proto语法结构
// File wraps descriptor.FileDescriptorProto for richer features.
type File struct {
// GoPkg is the go package of the go file generated from this file..
GoPkg GoPackage
// Messages is the list of messages defined in this file.
Messages []*Message
// Enums is the list of enums defined in this file.
Enums []*Enum
// Services is the list of services defined in this file.
Services []*Service
// applyTemplate is the core generation function of protoc-gen-grpc-gateway
// (excerpt: closing braces and several statements are elided). All output
// comes from three templates executed in order into one buffer:
// headerTemplate once, handlerTemplate once per binding of every method of
// every service, and trailerTemplate once at the end.
// methodWithBindingsSeen, the rest of the binding{...} literal and tp are
// defined or completed in the elided code.
func applyTemplate(p param, reg *descriptor.Registry) (string, error) {
w := bytes.NewBuffer(nil)
if err := headerTemplate.Execute(w, p); err != nil {
return "", err
for _, svc := range p.Services {
for _, meth := range svc.Methods {
for _, b := range meth.Bindings {
methodWithBindingsSeen = true
if err := handlerTemplate.Execute(w, binding{
if err := trailerTemplate.Execute(w, tp); err != nil {
return "", err
return w.String(), nil
// Parts of headerTemplate and handlerTemplate (excerpt; the template text is
// partially elided). headerTemplate emits the generated file's header comment,
// package clause, imports and blank references. handlerTemplate chooses the
// request-function template according to the method's streaming mode.
headerTemplate = template.Must(template.New("header").Parse(`
// Code generated by protoc-gen-grpc-gateway. DO NOT EDIT.
// source: {{.GetName}}
Package {{.GoPkg.Name}} is a reverse proxy.
It translates gRPC into RESTful JSON APIs.
package {{.GoPkg.Name}}
import (
{{range $i := .Imports}}{{if $i.Standard}}{{$i | printf "%s\n"}}{{end}}{{end}}
{{range $i := .Imports}}{{if not $i.Standard}}{{$i | printf "%s\n"}}{{end}}{{end}}
var _ codes.Code
var _ io.Reader
var _ status.Status
var _ = runtime.String
var _ = utilities.NewDoubleArray
handlerTemplate = template.Must(template.New("handler").Parse(`
{{if and .Method.GetClientStreaming .Method.GetServerStreaming}}
{{template "bidi-streaming-request-func" .}}
{{else if .Method.GetClientStreaming}}
{{template "client-streaming-request-func" .}}
{{template "client-rpc-request-func" .}}
假设现在有一个类似go-grpc-gateway的需求,不同的是,输入是一个消息队列,要求生成的代码完成这样的功能:从消息队列取数据 -> 根据不同的topic/service发送给不同的grpc server -> 处理完成之后返回给消息队列
为了简单,沿用 go-grpc-gateway 对 http handler 的定义,如 'post /hello',只是不把它作为一个 http path,而是当成一个 mq topic:比如 post /hello 对应的 topic 即 post_/hello(生成代码中方法名为大写,如 POST_/v1/echo),
返回数据的 topic 为 post_/hello_out。
和grpc-gateway不同,runtime的定义变成处理mq相关的逻辑,比如定义、注册handler处理不同的topic,转发给对应的grpc client
func NewServeMux(amqpURI, exchangeName, exchangeType string) *ServeMux {
serveMux := &ServeMux{
exchangeType: exchangeType,
exchangeName: exchangeName,
handlers: make(map[string]handler),
mqClient: newMqclient(amqpURI, exchangeName, exchangeType),
return serveMux
// Handle registers h for queueName: messages consumed from that queue are
// passed to h, and the result is published (see consume).
// (Excerpt: the QueueDeclare/QueueBind/Consume arguments, closing braces and
// error handling are elided.)
// It declares the input queue and an output queue (queue_out; the startup log
// shows it named "<queueName>_out"), binds both to the exchange, and starts a
// goroutine that consumes the input queue.
// NOTE(review): s.handlers is written without any locking visible here; if
// Handle can run concurrently with other users of the map, guard it. Confirm.
func (s *ServeMux) Handle(queueName string, h HandlerFunc) {
handle := handler{queue: queueName, h: h}
s.handlers[queueName] = handle
log.Printf("declaring Queue %q", queueName)
queue, err := s.mqClient.channel.QueueDeclare(
log.Printf("declared Queue (%q %d messages, %d consumers), binding to Exchange",
queue.Name, queue.Messages, queue.Consumers)
queue_out, err := s.mqClient.channel.QueueDeclare(
if err = s.mqClient.channel.QueueBind(
if err = s.mqClient.channel.QueueBind(
deliveries, err := s.mqClient.channel.Consume(
if err != nil {
go s.consume(deliveries, handle)
func (s *ServeMux) consume(deliveries <-chan amqp.Delivery, handle handler) {
for d := range deliveries {
out := handle.h(d.Body)
s.publish(handle.queue, out)
_ = d.Ack(false)
log.Printf("handle: deliveries channel closed")
可以复用 protoc-gen-grpc-gateway 的大部分生成逻辑,只需修改生成用的 template:去掉其中与 http 耦合的逻辑,输入输出都使用 []byte。为了简单,这里省略了大量元数据以及错误处理的逻辑。
proto定义, 定义了一个echo service,希望实现的逻辑是根据echo请求,返回一个结果,输入返回都通过mq完成
message EchoRequest {
  // Text echoed back first in the reply.
  string Hello = 1;
  // Names appended to the reply, joined with commas.
  repeated string Names = 2;
}

message EchoResponse {
  string Hello = 1;
}

// The google.api.http option requires importing
// "google/api/annotations.proto" (presumably done above this excerpt).
service EchoService {
  rpc echo(EchoRequest) returns (EchoResponse) {
    option (google.api.http) = {
      post : "/v1/echo"
      body : "*"
    };
  }
}
# Generate the MQ gateway code with the custom protoc-gen-mq-gateway plugin
# (selected by --mq-gateway_out).
# NOTE: $(GOPATH) is Makefile syntax; when typing this in a shell, use
# ${GOPATH} instead (in a shell, $(GOPATH) runs a command named GOPATH).
# NOTE(review): the command is truncated here; the trailing "\" expects more
# arguments (e.g. the input .proto file), which are not shown.
protoc -I/usr/local/include -I. \
-I$(GOPATH)/src \
-I$(GOPATH)/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis \
--mq-gateway_out=logtostderr=true:. \
package main
import (
// Blank references that keep each of these imports in use, even when the
// generated handlers do not need all of them.
var (
	_ codes.Code
	_ io.Reader
	_ status.Status
	_ = runtime.String
	_ = utilities.NewDoubleArray
)
func request_EchoService_Echo_0(ctx context.Context, marshaler runtime.Marshaler, client EchoServiceClient, req []byte, pathParams map[string]string) (proto.Message, error) {
var protoReq EchoRequest
if err := marshaler.NewDecoder(bytes.NewReader(req)).Decode(&protoReq); err != nil && err != io.EOF {
return nil, status.Errorf(codes.InvalidArgument, "%v", err)
msg, err := client.Echo(ctx, &protoReq)
return msg, err
func RegisterEchoServiceHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error) {
conn, err := grpc.Dial(endpoint, opts...)
if err != nil {
return err
defer func() {
if err != nil {
if cerr := conn.Close(); cerr != nil {
grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr)
go func() {
if cerr := conn.Close(); cerr != nil {
grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr)
return RegisterEchoServiceHandler(ctx, mux, conn)
func RegisterEchoServiceHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error {
return RegisterEchoServiceHandlerClient(ctx, mux, NewEchoServiceClient(conn))
func RegisterEchoServiceHandlerClient(ctx context.Context, mux *runtime.ServeMux, client EchoServiceClient) error {
mux.Handle("POST"+"_/v1/echo", func(r []byte) (out []byte) {
inboundMarshaler, outboundMarshaler := &runtime.JSONPb{OrigName: true}, &runtime.JSONPb{OrigName: true}
resp, _ := request_EchoService_Echo_0(ctx, inboundMarshaler, client, r, nil)
out, _ = outboundMarshaler.Marshal(resp)
return nil
package main
import (
// Command-line flags selecting the AMQP broker and exchange.
var (
	uri          = flag.String("uri", "amqp://guest:guest@localhost:5672/", "AMQP URI")
	exchangeName = flag.String("exchange", "test-exchange", "Durable AMQP exchange name")
	exchangeType = flag.String("exchange-type", "direct", "Exchange type - direct|fanout|topic|x-custom")
)
func main() {
rpcAddress := "localhost:9008"
// Establish gateways for incoming HTTP requests.
mux := runtime.NewServeMux(*uri, *exchangeName, *exchangeType)
ctx := context.Background()
dialOpts := []grpc.DialOption{grpc.WithInsecure()}
err := RegisterEchoServiceHandlerFromEndpoint(ctx, mux, rpcAddress, dialOpts)
if err != nil {
go mux.Start()
// RunRPCServer ...
func runRPCServer(rpcAddress string) {
listen, err := net.Listen("tcp", rpcAddress)
if err != nil {
log.Fatalf("failed to listen: %v", err)
rpcs := grpc.NewServer()
RegisterEchoServiceServer(rpcs, NewEchoService())
err = rpcs.Serve(listen)
if err != nil {
log.Fatalf("failed to serve: %v", err)
// EchoService ...
type EchoService struct {
// NewEchoService ...
func NewEchoService() *EchoService {
return &EchoService{}
// DescribeEvent ...
func (s *EchoService) Echo(ctx context.Context, req *EchoRequest) (res *EchoResponse, err error) {
return &EchoResponse{
Hello: req.GetHello() + " " + strings.Join(req.GetNames(), ","),
}, nil
mq-http-gateway:为了便于测试,创建一个 http server。这个 server 比较简单:接收 http 请求,把请求放入 mq,同时从 mq 中取结果,作为 http 请求的结果返回。
import pika
from flask import Flask, request
import _thread
import queue
import uuid
# Flask application that exposes the HTTP side of this test gateway.
app = Flask(__name__)
# Client sends one request body to the gateway over the message queue and
# blocks until a reply is received.
# (Excerpt: the connection/channel setup in __init__ and the publish/poll
# steps in call() are elided; "// ..." below is a placeholder, not valid Python.)
# NOTE(review): on_response keeps whichever message arrives first, without
# checking a correlation id. If concurrent requests share a reply queue, a
# client could receive another request's reply. uuid is imported, so such a
# check may exist in the elided code. Confirm.
class Client:
def __init__(self):
// ...
# Consumer callback: store the reply body so call() can return it.
def on_response(self, ch, method, props, body):
self.response = body
# Send body and wait until on_response has stored a reply.
def call(self, body):
self.response = None
# NOTE(review): the loop body (presumably polling the connection) is elided.
while self.response is None:
return self.response
@app.route('/v1/echo', methods=['POST'])
def echo():
    """Forward the raw POST body to the MQ gateway and return its reply.

    A new Client is created for every request.
    """
    return Client().call(request.data)
# Script entry point: declares the gateway's input queue ('POST_/v1/echo')
# and then, per the trailing comment, starts the server.
# (Excerpt: the broker connection that creates `channel` and the server
# start call are elided.)
# NOTE(review): the credentials are hard-coded (root/root). That is fine for
# local testing, but they should come from configuration otherwise.
if __name__ == '__main__':
credentials = pika.PlainCredentials('root', 'root')
channel.queue_declare(queue='POST_/v1/echo', durable=True)
print("start app")
# Start the server (the call itself is elided in this excerpt).
➜ simple git:(master) ✗ ./simple -uri amqp://root:root@xxxx
2019/07/28 16:46:48 dialing "amqp://root:root@xxx
2019/07/28 16:46:48 got Connection, getting Channel
2019/07/28 16:46:48 got Channel, declaring Exchange ("test-exchange")
2019/07/28 16:46:48 declaring Queue "POST_/v1/echo"
2019/07/28 16:46:48 declared Queue ("POST_/v1/echo" 0 messages, 0 consumers), binding to Exchange
2019/07/28 16:46:48 declared Queue ("POST_/v1/echo_out" 0 messages, 0 consumers), binding to Exchange
2019/07/28 16:46:48 Starting Consume
# 在另一个console启动mq-gateway.py
➜ simple git:(master) ✗ python mq-gateway.py
start app
# 在另一个console测试,一切正常!
➜ simple git:(master) ✗ curl -H "Content-Type: application/json" localhost:5000/v1/echo -d '{"Hello": "x", "Names":["a", "b"] }'
{"Hello":"x a,b"}%
