源码案例:https://github.com/tenqaz/python-examples (opens new window)
官方文档:https://grpc.io/docs/languages/python/quickstart (opens new window)
在目录proto目录下创建user.proto
文件,创建User的rpc服务定义,该服务中包含AddUser和GetUser两个调用,并使用下面创建的对应的结构体作为请求体和响应体。 注意:需要添加package proto
,否则下面编译生成的python文件引用路径则不正确。
syntax = "proto3";
// 包名
package proto;
// 定义User rpc服务
service User {
// 定义rpc服务的方法
rpc AddUser (UserRequest) returns (UserResponse);
rpc GetUser (GetUserRequest) returns (GetUserResponse);
}
// 请求的结构体
message UserRequest {
string name = 1;
uint32 age = 2;
}
// 响应的结构体
message UserResponse {
string msg = 1;
int32 code = 2;
}
message GetUserRequest {
string name = 1;
}
message GetUserResponse {
string name = 1;
string age = 2;
}
首选需要安装grpc的库和工具
python -m pip install grpcio #安装grpc
python -m pip install grpcio-tools #安装grpc tools
然后,运行命令对proto文件进行编译,会根据上面的proto文件生成对应的python文件,你会发现在proto目录下创建了user_pb2.py
和user_pb2_grpc.py
两个文件
python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. ./proto/user.proto
编译后:
我们创建proto_test.py文件,创建User对象,填充值,并将该对象序列化成字符串输出
from proto import user_pb2
# 创建Student对象,将该对象序列化成字符串
s = user_pb2.UserRequest()
s.name = "zhangsan"
s.age = 12
req_str = s.SerializeToString()
print(req_str)
输出:
b'\n\x08zhangsan\x10\x0c'
然后我们再创建User对象将将上面的输出的序列化字符串反序列化进来。
# 将上面的输出的序列化字符串反序列化成对象
s2 = user_pb2.UserRequest()
s2.ParseFromString(req_str)
print(s2.name)
print(s2.age)
输出:
zhangsan
12
下面是使用之前创建的protobuf和grpc文件来构建grpc服务端代码。
import logging
from concurrent import futures
import grpc
from proto import user_pb2, user_pb2_grpc
class UserService(user_pb2_grpc.UserServicer):
# 实现proto文件中rpc的调用
def AddUser(self, request: user_pb2.UserRequest, context):
return user_pb2.UserResponse(msg='add user(name={},age={}) success'.format(request.name, request.age), code=0)
def GetUser(self, request: user_pb2.GetUserRequest, context):
return user_pb2.GetUserResponse(name=request.name, age="1888")
def serve():
# 使用线程池来完成grpc的请求
server = grpc.server(futures.ThreadPoolExecutor(max_workers=5))
user_pb2_grpc.add_UserServicer_to_server(UserService(), server)
server.add_insecure_port('[::]:50051') # 绑定端口
server.start()
server.wait_for_termination()
if __name__ == '__main__':
logging.basicConfig()
serve()
运行该服务端,会阻塞等待客户端的请求。
import logging
import grpc
from proto import user_pb2, user_pb2_grpc
def run():
# 连接rpc服务
with grpc.insecure_channel('localhost:50051') as channel:
stub = user_pb2_grpc.UserStub(channel)
# 调用rpc服务的AddUser方法
response: user_pb2.UserResponse = stub.AddUser(user_pb2.UserRequest(name="zhangsan", age=18))
print("add user, response is 'msg={}, code={}'".format(response.msg, response.code))
# 调用rpc服务的GetUser方法
response: user_pb2.GetUserResponse = stub.GetUser(user_pb2.GetUserRequest(name="lisi"))
print("get user[name={}, age={}]".format(response.name, response.age))
if __name__ == '__main__':
logging.basicConfig()
run()
运行客户端,调用rpc服务,输出:
add user, response is 'msg=add user(name=zhangsan,age=18) success, code=0'
get user[name=lisi, age=1888]