
Netty Private Protocol: A Sticky-Packet / Split-Packet Example

Original article by 鲍远林
Last modified: 2022-03-27 15:20:58
Column: TPlus

1. Example Description

I recently came across a component whose client and server communicate using the following protocol:

  • Header: 80 bytes. The first 16 bytes hold the body length as a string of decimal digits; the remaining 64 bytes hold the service name.
  • Body: starts at byte 81 and is a URL-encoded string.
0                15                                                                   79
+----------------+---------------------------------------------------------------------+
|   body length  |                         service name                                |
+----------------+---------------------------------------------------------------------+
|                                                                                      |
|                                        body                                          |
|                                                                                      |
+--------------------------------------------------------------------------------------+

The whole message is UTF-8 encoded, and the server side is written in C++.
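
The layout above can be sketched in plain Java, without Netty, to make the byte offsets concrete. This is only an illustrative sketch: the class and method names are hypothetical, and it assumes unused header bytes are padded with 0x00, which the protocol description does not state (it is what the demo client in section 4 appears to rely on).

```java
import java.nio.charset.StandardCharsets;

final class FrameLayoutSketch {
    static final int LENGTH_FIELD_SIZE = 16;   // body length as decimal digits
    static final int SERVICE_FIELD_SIZE = 64;  // service name
    static final int HEADER_SIZE = LENGTH_FIELD_SIZE + SERVICE_FIELD_SIZE; // 80

    /** Builds one frame: 16-byte length field, 64-byte service name, then the body. */
    static byte[] encode(String serviceName, String body) {
        byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8);
        byte[] lengthBytes = String.valueOf(bodyBytes.length).getBytes(StandardCharsets.UTF_8);
        byte[] serviceBytes = serviceName.getBytes(StandardCharsets.UTF_8);
        if (serviceBytes.length > SERVICE_FIELD_SIZE) {
            throw new IllegalArgumentException("service name exceeds 64 bytes");
        }
        // A new byte[] is zero-filled, so unused header bytes stay 0x00 (assumed padding).
        byte[] frame = new byte[HEADER_SIZE + bodyBytes.length];
        System.arraycopy(lengthBytes, 0, frame, 0, lengthBytes.length);
        System.arraycopy(serviceBytes, 0, frame, LENGTH_FIELD_SIZE, serviceBytes.length);
        System.arraycopy(bodyBytes, 0, frame, HEADER_SIZE, bodyBytes.length);
        return frame;
    }

    /** Reads the body length back out of the header; trim() drops the 0x00 padding. */
    static int bodyLength(byte[] frame) {
        String digits = new String(frame, 0, LENGTH_FIELD_SIZE, StandardCharsets.UTF_8).trim();
        return Integer.parseInt(digits);
    }

    public static void main(String[] args) {
        byte[] frame = encode("DemoService", "name=Michael&age=1");
        System.out.println("frame size  = " + frame.length);
        System.out.println("body length = " + bodyLength(frame));
    }
}
```

Because the length is stored as text rather than as a binary integer, the receiver has to parse the digits itself before it knows how many body bytes to wait for.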

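Below, Java + Netty is used to mimic this component and to demonstrate how sticky packets and split packets (粘包/拆包) can be handled in Netty for a private protocol.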

2. Decoder

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.CharsetUtil;

import java.nio.charset.StandardCharsets;
import java.util.List;


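public class DemoPrivateProtocolDecoder extends ByteToMessageDecoder {

    private static final int LENGTH_FIELD_SIZE = 16;   // body length as decimal digits
    private static final int SERVICE_FIELD_SIZE = 64;  // service name
    private static final int HEADER_SIZE = LENGTH_FIELD_SIZE + SERVICE_FIELD_SIZE;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

        System.out.println("ByteBuf.readableBytes " + in.readableBytes());

        // One read may carry several frames (sticky packets), so keep looping
        // for as long as a complete header is available.
        while (in.readableBytes() >= HEADER_SIZE) {
            // Remember where this frame starts so we can rewind on a partial frame.
            in.markReaderIndex();

            String strLen = in.readCharSequence(LENGTH_FIELD_SIZE, CharsetUtil.UTF_8).toString().trim();
            int bodyLength = Integer.parseInt(strLen);

            in.skipBytes(SERVICE_FIELD_SIZE);
            if (in.readableBytes() < bodyLength) {
                // Half packet: rewind to the frame start and wait for more bytes.
                in.resetReaderIndex();
                break;
            }

            String body = in.readCharSequence(bodyLength, StandardCharsets.UTF_8).toString();
            out.add(body);
        }
    }
}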
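
The accumulate-then-split idea behind this decoder can also be tried without Netty. The sketch below is a plain-Java stand-in, not the Netty implementation: `FrameSplitterSketch`, `feed` and `frame` are hypothetical names, the `pending` array plays the role that the cumulation buffer plays in `ByteToMessageDecoder`, and the zero-padded header is an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FrameSplitterSketch {
    private static final int LENGTH_FIELD_SIZE = 16;
    private static final int HEADER_SIZE = 80;

    // Bytes received so far that do not yet form a complete frame.
    private byte[] pending = new byte[0];

    /** Appends a chunk as it arrives and returns every body completed by it. */
    List<String> feed(byte[] chunk) {
        byte[] data = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, data, pending.length, chunk.length);

        List<String> bodies = new ArrayList<>();
        int pos = 0;
        while (data.length - pos >= HEADER_SIZE) {
            String digits = new String(data, pos, LENGTH_FIELD_SIZE, StandardCharsets.UTF_8).trim();
            int bodyLength = Integer.parseInt(digits);
            if (data.length - pos - HEADER_SIZE < bodyLength) {
                break; // half packet: keep the bytes and wait for more
            }
            bodies.add(new String(data, pos + HEADER_SIZE, bodyLength, StandardCharsets.UTF_8));
            pos += HEADER_SIZE + bodyLength; // next frame may follow immediately (sticky packet)
        }
        pending = Arrays.copyOfRange(data, pos, data.length);
        return bodies;
    }

    /** Helper: builds a frame with a zero-padded header and an empty service name. */
    static byte[] frame(String body) {
        byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8);
        byte[] len = String.valueOf(bodyBytes.length).getBytes(StandardCharsets.UTF_8);
        byte[] out = new byte[HEADER_SIZE + bodyBytes.length];
        System.arraycopy(len, 0, out, 0, len.length);
        System.arraycopy(bodyBytes, 0, out, HEADER_SIZE, bodyBytes.length);
        return out;
    }

    public static void main(String[] args) {
        byte[] a = frame("age=1");
        byte[] b = frame("age=2");
        byte[] stream = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, stream, a.length, b.length);

        FrameSplitterSketch splitter = new FrameSplitterSketch();
        // The first chunk ends in the middle of the second frame's header.
        System.out.println(splitter.feed(Arrays.copyOfRange(stream, 0, a.length + 10)));
        System.out.println(splitter.feed(Arrays.copyOfRange(stream, a.length + 10, stream.length)));
    }
}
```

The chunk boundaries are arbitrary on purpose: wherever TCP happens to cut the stream, each body should come out exactly once and in order.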

3. Server

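import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;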

public class SocketServer {

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new DemoPrivateProtocolDecoder());
                            pipeline.addLast(new DemoServerHandler());
                        }
                    });

            int port = 9000;
            ChannelFuture channelFuture = serverBootstrap.bind(port).addListener(future -> {
                if (future.isSuccess()) {
                    System.out.println("Bound to port " + port + " successfully!");
                } else {
                    System.err.println("Failed to bind to port " + port + "!");
                }
            }).sync();

            channelFuture.channel().closeFuture().sync();
        }
        finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class DemoServerHandler extends ChannelInboundHandlerAdapter {

    // Shared by all handler instances: counts every decoded message received.
    private static final AtomicInteger count = new AtomicInteger(0);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        String body = (String)msg;

        Map<String, String> results = new HashMap<>();

        String[] resultTokens = body.split("&");
        for (String result : resultTokens) {
            String[] tokens = result.split("=");
            if (tokens.length > 0) {
                if (tokens.length == 2) {
                    results.put(tokens[0], URLDecoder.decode(tokens[1], StandardCharsets.UTF_8.name()));
                }
                else {
                    results.put(tokens[0], "");
                }
            }
        }

        System.out.println("---> received: " + count.incrementAndGet() );
        for (Map.Entry<String, String> entry : results.entrySet()) {
            System.out.println("    ( " + entry.getKey() + ", " + entry.getValue() + " )");
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
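
The body parsing done in `channelRead` can be isolated and tried on its own. The following is a minimal sketch under assumptions, not the handler's code: `BodyParserSketch` and `parse` are hypothetical names, it uses `split("=", 2)` so a value that happens to contain `=` is not truncated, and it uses a `LinkedHashMap` to keep the field order.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

final class BodyParserSketch {

    /** Splits "k1=v1&k2=v2&" into a map and URL-decodes each value. */
    static Map<String, String> parse(String body) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (String pair : body.split("&")) {
            if (pair.isEmpty()) {
                continue; // e.g. produced by "&&"
            }
            String[] kv = pair.split("=", 2); // limit 2: keep any '=' inside the value
            String value = kv.length == 2 ? decode(kv[1]) : "";
            fields.put(kv[0], value);
        }
        return fields;
    }

    private static String decode(String value) {
        try {
            return URLDecoder.decode(value, StandardCharsets.UTF_8.name());
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always supported by the JVM, so this should not happen.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Same shape as the demo client's body, including the trailing '&'.
        System.out.println(parse("name=Michael+1&sex=M&age=&"));
    }
}
```

Keys are left undecoded here, mirroring the handler above, which only decodes values.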

4. Client

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class SocketClient {

    public static void main(String[] args) throws InterruptedException {

        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new SocketClientHandler());
                        }
                    });

            ChannelFuture channelFuture = bootstrap.connect("localhost", 9000).addListener(future -> {
                if (future.isSuccess()) {
                    System.out.println("Connected to the server!");
                } else {
                    System.err.println("Failed to connect to the server!");
                }
            }).sync();

            channelFuture.channel().closeFuture().sync();
        }
        finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class SocketClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        Map<String, String> msgMap = new HashMap<>();

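        // 1000 batches of 1000 messages, sent back to back so that frames
        // get coalesced and split by TCP on the way to the server.
        for (int j = 0; j < 1000; j++) {
            for (int i = 1; i <= 1000; i++) {
                // Running message number across all batches.
                System.out.println("---> send: " + (i + 1000 * j));

                msgMap.put("name", "Michael " + i);
                msgMap.put("sex", i % 2 == 0 ? "F" : "M");
                msgMap.put("age", "" + i);

                ByteBuf byteBuf = this.encode(msgMap);
                ctx.writeAndFlush(byteBuf);
            }

            // Note: this sleep blocks the channel's event loop thread; tolerable in a demo only.
            TimeUnit.MILLISECONDS.sleep(5);
        }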
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    private ByteBuf encode(Map<String, String> data) throws Exception {

        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> entry : data.entrySet()) {
            sb.append(entry.getKey()).append("=")
                    .append(URLEncoder.encode(entry.getValue(), CharsetUtil.UTF_8.name()))
                    .append("&");
        }

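        String body = sb.toString();
        // The length field counts bytes on the wire, so measure the UTF-8
        // encoding rather than the number of chars in the string.
        int bodyLength = body.getBytes(StandardCharsets.UTF_8).length;

        System.out.println("---> [send packet] body length: " +  bodyLength );

        ByteBuf out = Unpooled.buffer(80 + bodyLength);

        // Bytes 0-15: body length as decimal digits. The rest of the field is left
        // as allocated (expected to be 0x00), which the decoder's trim() strips.
        out.writerIndex(0);
        out.writeCharSequence(String.valueOf(bodyLength), StandardCharsets.UTF_8);

        // Bytes 16-79: service name.
        out.writerIndex(16);
        out.writeCharSequence("DemoService", StandardCharsets.UTF_8);

        // Byte 80 onward: URL-encoded body.
        out.writerIndex(80);
        out.writeCharSequence(body, StandardCharsets.UTF_8);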

        return out;
    }
}
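
One detail in `encode` deserves a note: the header carries a length that the decoder compares against readable bytes, while `String.length()` counts UTF-16 chars. For the all-ASCII body built in this demo the two are the same number, but they diverge as soon as a key (which is not URL-encoded here) contains non-ASCII characters. A small sketch, with a hypothetical class name and a made-up non-ASCII key:

```java
import java.nio.charset.StandardCharsets;

final class BodyLengthSketch {

    /** Number of UTF-16 chars, which is what String.length() reports. */
    static int charLength(String body) {
        return body.length();
    }

    /** Number of bytes the body occupies on the wire when encoded as UTF-8. */
    static int byteLength(String body) {
        return body.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        String ascii = "name=Michael+1&";
        // Hypothetical key made of two CJK characters (written as escapes), 3 bytes each in UTF-8.
        String mixed = "\u59d3\u540d=Michael+1&";

        System.out.println(charLength(ascii) + " chars, " + byteLength(ascii) + " bytes");
        System.out.println(charLength(mixed) + " chars, " + byteLength(mixed) + " bytes");
    }
}
```

If the header were filled with the char count in the second case, the decoder would read too few bytes and the following frame would start at the wrong offset.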

5. Results

The original post shows a screenshot of the run at this point. It shows that Netty handled the sticky and split TCP packets correctly.

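Original-work statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com for removal.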
