最近遇到一个组件,其 client 和 server 端的通信协议如下:
 0             15                                                                    79
+----------------+---------------------------------------------------------------------+
|  body length   |                            service name                             |
+----------------+---------------------------------------------------------------------+
|                                                                                      |
|                                         body                                         |
|                                                                                      |
+--------------------------------------------------------------------------------------+
整个报文采用 UTF-8 编码:报文头固定为 80 字节,其中前 16 字节是 body length(以十进制文本表示的 body 字节数),随后的 64 字节是 service name,报文头之后紧跟 body。Server 端使用 C++ 开发。
接下来,采用 Java + Netty 模拟该组件的功能,以演示私有协议下 Netty 粘包/拆包处理的实现。
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.CharsetUtil;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
 * Splits the inbound byte stream into frames of the demo private protocol and emits each
 * frame's body as a {@link String}.
 *
 * <p>Frame layout: a 16-byte length field holding the body size in bytes as decimal text
 * (surrounding whitespace and NUL padding are trimmed), a 64-byte service-name field that this
 * decoder skips, then the UTF-8 encoded body.
 */
public class DemoPrivateProtocolDecoder extends ByteToMessageDecoder {

    /** Width in bytes of the decimal-text body-length field at the start of every frame. */
    private static final int LENGTH_FIELD_BYTES = 16;

    /** Width in bytes of the service-name field that follows the length field. */
    private static final int SERVICE_NAME_BYTES = 64;

    /** Size in bytes of the fixed header (length field plus service name). */
    private static final int HEADER_BYTES = LENGTH_FIELD_BYTES + SERVICE_NAME_BYTES;

    /**
     * Extracts every complete frame currently buffered in {@code in}; an incomplete trailing
     * frame is left untouched so it can be completed by later reads.
     *
     * @param ctx the handler context (unused)
     * @param in  the accumulated inbound bytes
     * @param out receives one {@link String} body per complete frame
     * @throws IllegalArgumentException if a length field is not a non-negative decimal integer
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("ByteBuf.readableBytes " + in.readableBytes());

        while (in.readableBytes() >= HEADER_BYTES) {
            int bodyLength = peekBodyLength(in);

            // Subtract on the left instead of adding on the right so that a huge declared
            // length cannot overflow int and slip past the availability check.
            if (in.readableBytes() - HEADER_BYTES < bodyLength) {
                break;
            }

            in.skipBytes(HEADER_BYTES);
            out.add(in.readCharSequence(bodyLength, StandardCharsets.UTF_8).toString());
        }
    }

    /**
     * Reads the body length from the frame starting at the current reader index without
     * moving the reader index, so the caller needs no mark/reset when the frame is incomplete.
     *
     * @param in buffer holding at least {@link #LENGTH_FIELD_BYTES} readable bytes
     * @return the declared body length in bytes, never negative
     * @throws IllegalArgumentException if the field is not a non-negative decimal integer
     */
    private static int peekBodyLength(ByteBuf in) {
        String lengthText = in.getCharSequence(in.readerIndex(), LENGTH_FIELD_BYTES, StandardCharsets.UTF_8)
                .toString()
                .trim();

        int bodyLength;
        try {
            bodyLength = Integer.parseInt(lengthText);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Malformed body length field: '" + lengthText + "'", e);
        }

        if (bodyLength < 0) {
            throw new IllegalArgumentException("Negative body length: " + bodyLength);
        }
        return bodyLength;
    }
}
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Entry point of the demo server: binds port 9000 and installs, for every accepted
 * connection, a {@code DemoPrivateProtocolDecoder} followed by a {@code DemoServerHandler}.
 */
public class SocketServer {

    /**
     * Starts the server and blocks until the listening channel is closed.
     *
     * @param args ignored
     * @throws InterruptedException if the thread is interrupted while waiting for the bind
     *                              to complete or for the server channel to close
     */
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            // Per-connection pipeline: frame decoder first, then the business handler.
            ChannelInitializer<SocketChannel> childInitializer = new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                            .addLast(new DemoPrivateProtocolDecoder())
                            .addLast(new DemoServerHandler());
                }
            };

            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workGroup);
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
            serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
            serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);
            serverBootstrap.childHandler(childInitializer);

            final int port = 9000;
            ChannelFuture bindFuture = serverBootstrap.bind(port);
            bindFuture.addListener(outcome -> {
                if (!outcome.isSuccess()) {
                    System.err.println("绑定端口 " + port + " 失败!");
                } else {
                    System.out.println("绑定端口 " + port + " 成功!");
                }
            });
            bindFuture.sync();

            // Keep main alive until the listening channel is closed.
            bindFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Business handler of the demo server: parses each decoded body of the form
 * {@code key=value&key=value&...} and prints the resulting pairs.
 */
public class DemoServerHandler extends ChannelInboundHandlerAdapter {

    /** Number of messages received so far, shared by all instances of this handler. */
    private static final AtomicInteger count = new AtomicInteger(0);

    /**
     * Parses one message body into key/value pairs and prints them.
     *
     * <p>Each {@code &}-separated segment is split at its first {@code =}, so a value that
     * itself contains {@code =} is kept intact. Values are URL-decoded as UTF-8; keys are used
     * verbatim. A segment without {@code =} maps its text to the empty string, and segments
     * with an empty key (including the empty segments of an empty body) are ignored.
     *
     * @param ctx the handler context
     * @param msg the inbound message; anything that is not a {@link String} is forwarded to
     *            the next handler unchanged
     * @throws Exception if a value contains a malformed percent-escape
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!(msg instanceof String)) {
            // Not ours to interpret: hand it on instead of failing with a ClassCastException.
            ctx.fireChannelRead(msg);
            return;
        }
        String body = (String) msg;

        Map<String, String> results = new HashMap<>();
        for (String pair : body.split("&")) {
            int separator = pair.indexOf('=');
            String key = separator < 0 ? pair : pair.substring(0, separator);
            if (key.isEmpty()) {
                continue;
            }
            String rawValue = separator < 0 ? "" : pair.substring(separator + 1);
            results.put(key, URLDecoder.decode(rawValue, StandardCharsets.UTF_8.name()));
        }

        System.out.println("---> received: " + count.incrementAndGet() );
        for (Map.Entry<String, String> entry : results.entrySet()) {
            System.out.println(" ( " + entry.getKey() + ", " + entry.getValue() + " )");
        }
    }

    /**
     * Prints the failure and closes the connection.
     *
     * @param ctx   the handler context whose channel is closed
     * @param cause the error raised while processing the connection
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
 * Entry point of the demo client: connects to {@code localhost:9000} and installs a
 * {@code SocketClientHandler} on the connection.
 *
 * <p>NOTE(review): this class references {@code ChannelOption}, which is not among the imports
 * listed for it; {@code import io.netty.channel.ChannelOption;} is required for it to compile.
 */
public class SocketClient {

    /**
     * Connects to the server and blocks until the connection is closed.
     *
     * @param args ignored
     * @throws InterruptedException if the thread is interrupted while waiting for the connect
     *                              attempt to complete or for the channel to close
     */
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new SocketClientHandler());
                }
            };

            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.handler(initializer);

            ChannelFuture connectFuture = bootstrap.connect("localhost", 9000);
            connectFuture.addListener(outcome -> {
                if (!outcome.isSuccess()) {
                    System.err.println("连接服务器失败!");
                } else {
                    System.out.println("连接服务器成功!");
                }
            });
            connectFuture.sync();

            // Keep main alive until the connection is closed.
            connectFuture.channel().closeFuture().sync();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
 * Client-side handler that, once the connection is active, sends 1000 batches of 1000 demo
 * messages each, pausing 5 ms between batches.
 *
 * <p>Every message is framed as a 16-byte body-length field (decimal text), a 64-byte
 * service-name field, and the UTF-8 body; unused header bytes are NUL padding.
 */
public class SocketClientHandler extends ChannelInboundHandlerAdapter {

    /** Number of batches to send. */
    private static final int BATCH_COUNT = 1000;

    /** Number of messages in each batch. */
    private static final int MESSAGES_PER_BATCH = 1000;

    /** Pause between two consecutive batches, in milliseconds. */
    private static final long BATCH_INTERVAL_MILLIS = 5;

    /** Width in bytes of the body-length header field. */
    private static final int LENGTH_FIELD_BYTES = 16;

    /** Width in bytes of the service-name header field. */
    private static final int SERVICE_NAME_BYTES = 64;

    /** Total size in bytes of the fixed header. */
    private static final int HEADER_BYTES = LENGTH_FIELD_BYTES + SERVICE_NAME_BYTES;

    /** Service name written into every frame header. */
    private static final String SERVICE_NAME = "DemoService";

    /**
     * Starts sending as soon as the connection is established.
     *
     * @param ctx the handler context of the newly active channel
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        sendBatch(ctx, 0);
    }

    /**
     * Prints the failure and closes the connection.
     *
     * @param ctx   the handler context whose channel is closed
     * @param cause the error raised while processing the connection
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Sends one batch of messages and schedules the next batch on the channel's executor.
     *
     * <p>The pause between batches is a scheduled task rather than a sleep: this method runs
     * on the channel's event loop, and sleeping there would stall all I/O of the channel
     * (including draining pending writes) until every batch had been queued.
     *
     * @param ctx   the handler context used for writing and scheduling
     * @param batch zero-based index of the batch to send
     */
    private void sendBatch(ChannelHandlerContext ctx, int batch) {
        if (batch >= BATCH_COUNT || !ctx.channel().isActive()) {
            return;
        }

        try {
            Map<String, String> msgMap = new HashMap<>();
            for (int i = 1; i <= MESSAGES_PER_BATCH; i++) {
                // Running sequence number across batches: 1, 2, ..., BATCH_COUNT * MESSAGES_PER_BATCH.
                System.out.println("---> send: " + (i + MESSAGES_PER_BATCH * batch));
                msgMap.put("name", "Michael " + i);
                msgMap.put("sex", i % 2 == 0 ? "F" : "M");
                msgMap.put("age", "" + i);
                ctx.writeAndFlush(this.encode(msgMap));
            }
        } catch (Exception e) {
            // A scheduled task is not covered by the pipeline's exception routing, so route
            // the failure through the pipeline explicitly to reach exceptionCaught above.
            ctx.pipeline().fireExceptionCaught(e);
            return;
        }

        ctx.executor().schedule(() -> sendBatch(ctx, batch + 1), BATCH_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
    }

    /**
     * Encodes the given pairs into one protocol frame.
     *
     * <p>The body is {@code key=value&} repeated for every entry, with values URL-encoded as
     * UTF-8 and keys written verbatim. The length field carries the size of the encoded body
     * in bytes, not in characters, so it stays correct when a key contains non-ASCII text.
     *
     * @param data the key/value pairs to send
     * @return a new buffer containing header and body, ready to be written
     * @throws Exception if the values cannot be URL-encoded
     */
    private ByteBuf encode(Map<String, String> data) throws Exception {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> entry : data.entrySet()) {
            sb.append(entry.getKey()).append("=")
              .append(URLEncoder.encode(entry.getValue(), StandardCharsets.UTF_8.name()))
              .append("&");
        }

        byte[] bodyBytes = sb.toString().getBytes(StandardCharsets.UTF_8);
        int bodyLength = bodyBytes.length;
        System.out.println("---> [send packet] body length: " + bodyLength );

        ByteBuf out = Unpooled.buffer(HEADER_BYTES + bodyLength);

        // Pad each header field explicitly so the layout does not depend on the buffer
        // having been zero-filled by the allocator.
        int written = out.writeCharSequence(String.valueOf(bodyLength), StandardCharsets.UTF_8);
        out.writeZero(LENGTH_FIELD_BYTES - written);

        written = out.writeCharSequence(SERVICE_NAME, StandardCharsets.UTF_8);
        out.writeZero(SERVICE_NAME_BYTES - written);

        out.writeBytes(bodyBytes);
        return out;
    }
}
下面是代码运行后的截图,可以看出 TCP 报文被 Netty 正确地进行了粘包和拆包处理。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。