前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java GRPC 双向通信

Java GRPC 双向通信

原创
作者头像
8菠萝
发布2022-06-21 23:50:52
1.3K0
发布2022-06-21 23:50:52
举报
文章被收录于专栏:菠萝上市没有

背景

使用grpc的stream流模式,让服务器具备推送消息的能力。

例子

简单例子,实现双向通信

proto 文件

使用stream 关键字

代码语言:txt
复制
message CommandMessage {
  required int32  type  = 1;
  optional string data  = 2;
}

service CommandStreamService {
  rpc CommandDispatch(stream CommandMessage) returns (stream CommandMessage){}
}

服务端

代码语言:java
复制
public class CommandStreamServerImpl extends CommandStreamServiceGrpc.CommandStreamServiceImplBase {
    // 用来向客户端推送消息
    private StreamObserver sendCmdObServer;

    public io.grpc.stub.StreamObserver<com.zdpower.grpc.Hello.CommandMessage> commandDispatch(
            io.grpc.stub.StreamObserver<com.zdpower.grpc.Hello.CommandMessage> responseObserver) {
        // server  => client
        sendCmdObServer = responseObserver;

        // client => server 
        return  new StreamObserver<Hello.CommandMessage>() {
            @Override
            public void onNext(Hello.CommandMessage value) {
                System.out.println("服务端:" + value.getData());
            }
            @Override
            public void onError(Throwable t) {
            }
            @Override
            public void onCompleted() {

            }
        };
    }

    // contoller 测试用
    public void sendUpdateCmd() {
        sendCmdObServer.onNext(Hello.CommandMessage.newBuilder().setType(1).setData("update").build());
    }
}

客户端

代码语言:java
复制
public class AgentRunApp {
    private StreamObserver receivedEnd;

    public static void main(String[] args) {
        DefaultEventLoopGroup loopGroup = new DefaultEventLoopGroup();
        try {
            AgentRunApp app = new AgentRunApp();
            app.run();
            // 堵塞 netty
            loopGroup.awaitTermination(30, TimeUnit.DAYS);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    public void run() throws Exception {
        // 连接
        ManagedChannel channel = ManagedChannelBuilder.forTarget("127.0.0.1:50054").usePlaintext().build();
        CommandStreamServiceGrpc.CommandStreamServiceStub stub = CommandStreamServiceGrpc.newStub(channel);
        // 客户端处理服务器消息  server=>client
        receivedEnd = new StreamObserver<Hello.CommandMessage>() {
            @Override
            public void onNext(Hello.CommandMessage message) {
                System.out.println("客户端收到消息:" + message.getData());

            }

            @Override
            public void onError(Throwable throwable) {
            }

            @Override
            public void onCompleted() {
            }
        };

        // 模拟客户端发送消息
        while (true){
            // client => server
            StreamObserver<Hello.CommandMessage> sendEnd =  stub.commandDispatch(receivedEnd);
            sendEnd.onNext(Hello.CommandMessage.newBuilder().setType(0).setData("client send").build());
            sendEnd.onCompleted();
            Thread.sleep(1000);
        }
    }
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景
  • 例子
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档