高清思维导图原件(xmind/pdf/jpg
)可以关注公众号:一枝花算不算浪漫
回复netty01
即可。
上一篇文章讲了NIO
相关的知识点,相比于传统IO
,NIO
已经做得很优雅了,为什么我们还要使用Netty
?
上篇文章最后留了很多坑,讲了NIO
使用的弊端,也是为了引出Netty
而设立的,这篇文章我们就来好好揭开Netty
的神秘面纱。
本篇文章的目的很简单,希望看过后你能看懂Netty
的示例代码,针对于简单的网络通信,自己也能用Netty
手写一个开发应用出来!
以下是一个简单聊天室Server端的程序,代码参考自:http://www.imooc.com/read/82/article/2166
代码有点长,主要核心代码是在main()
方法中,这里代码也希望大家看懂,后面也会一步步剖析。
PS:我是用mac
系统,直接在终端输入telnet 127.0.0.1 8007
即可启动一个聊天框,如果提示找不到telnet
命令,可以通过brew
进行安装,具体步骤请自行百度。
/**
* @Description netty简易聊天室
*
* @Author 一枝花算不算浪漫
* @Date 2020/8/10 6:52 上午
*/
public final class NettyChatServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// 1. EventLoopGroup
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 2. 服务端引导器
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 3. 设置线bootStrap信息
serverBootstrap.group(bossGroup, workerGroup)
// 4. 设置ServerSocketChannel的类型
.channel(NioServerSocketChannel.class)
// 5. 设置参数
.option(ChannelOption.SO_BACKLOG, 100)
// 6. 设置ServerSocketChannel对应的Handler,只能设置一个
.handler(new LoggingHandler(LogLevel.INFO))
// 7. 设置SocketChannel对应的Handler
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
// 可以添加多个子Handler
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new ChatNettyHandler());
}
});
// 8. 绑定端口
ChannelFuture f = serverBootstrap.bind(PORT).sync();
// 9. 等待服务端监听端口关闭,这里会阻塞主线程
f.channel().closeFuture().sync();
} finally {
// 10. 优雅地关闭两个线程池
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private static class ChatNettyHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("one conn active: " + ctx.channel());
// channel是在ServerBootstrapAcceptor中放到EventLoopGroup中的
ChatHolder.join((SocketChannel) ctx.channel());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
byte[] bytes = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(bytes);
String content = new String(bytes, StandardCharsets.UTF_8);
System.out.println(content);
if (content.equals("quit\r\n")) {
ctx.channel().close();
} else {
ChatHolder.propagate((SocketChannel) ctx.channel(), content);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
System.out.println("one conn inactive: " + ctx.channel());
ChatHolder.quit((SocketChannel) ctx.channel());
}
}
private static class ChatHolder {
static final Map<SocketChannel, String> USER_MAP = new ConcurrentHashMap<>();
/**
* 加入群聊
*/
static void join(SocketChannel socketChannel) {
// 有人加入就给他分配一个id
String userId = "用户"+ ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
send(socketChannel, "您的id为:" + userId + "\n\r");
for (SocketChannel channel : USER_MAP.keySet()) {
send(channel, userId + " 加入了群聊" + "\n\r");
}
// 将当前用户加入到map中
USER_MAP.put(socketChannel, userId);
}
/**
* 退出群聊
*/
static void quit(SocketChannel socketChannel) {
String userId = USER_MAP.get(socketChannel);
send(socketChannel, "您退出了群聊" + "\n\r");
USER_MAP.remove(socketChannel);
for (SocketChannel channel : USER_MAP.keySet()) {
if (channel != socketChannel) {
send(channel, userId + " 退出了群聊" + "\n\r");
}
}
}
/**
* 扩散说话的内容
*/
public static void propagate(SocketChannel socketChannel, String content) {
String userId = USER_MAP.get(socketChannel);
for (SocketChannel channel : USER_MAP.keySet()) {
if (channel != socketChannel) {
send(channel, userId + ": " + content);
}
}
}
/**
* 发送消息
*/
static void send(SocketChannel socketChannel, String msg) {
try {
ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
ByteBuf writeBuffer = allocator.buffer(msg.getBytes().length);
writeBuffer.writeCharSequence(msg, Charset.defaultCharset());
socketChannel.writeAndFlush(writeBuffer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
代码有点长,执行完的效果如上图所示,下面所有内容都是围绕着如何看懂
以及如何写出
这样的代码来展开的,希望你看完 也能轻松手写Netty
服务端代码~。通过简单demo开发让大家体验了Netty
实现相比NIO
确实要简单的多,但优点不限于此,只需要知道选择Netty就对了。
对应着文章开头的思维导图,我们知道Netty
的核心组件主要有:
类图如下:
一看到BootStrap
大家就应该想到启动类、引导类这样的词汇,之前分析过EurekaServer项目启动类时介绍过EurekaBootstrap
, 他的作用就是上下文初始化、配置初始化。
在Netty
中我们也有类似的类,Bootstrap
和ServerBootstrap
它们都是Netty
程序的引导类,主要用于配置各种参数,并启动整个Netty
服务,我们看下文章开头的示例代码:
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new ChatNettyHandler());
}
});
Bootstrap
和ServerBootstrap
是针对于Client
和Server
端定义的两套启动类,区别如下:
Bootstrap
是客户端引导类,而ServerBootstrap
是服务端引导类。Bootstrap
通常使用connect()
方法连接到远程的主机和端口,作为一个TCP客户端
。ServerBootstrap
通常使用bind()
方法绑定本地的端口,等待客户端来连接。ServerBootstrap
可以处理Accept
事件,这里面childHandler
是用来处理Channel
请求的,我们可以查看chaildHandler()
方法的注解:Bootstrap
客户端引导只需要一个EventLoopGroup
,但是一个ServerBootstrap
通常需要两个(上面的boosGroup
和workerGroup
)。EventLoopGroup
及EventLoop
这两个类名称定义的很奇怪,对于初学者来说往往无法通过名称来了解其中的含义,包括我也是这样。
EventLoopGroup
可以理解为一个线程池,对于服务端程序,我们一般会绑定两个线程池,一个用于处理 Accept
事件,一个用于处理读写事件,看下EventLoop
系列的类目录:
通过上面的类图,我们才恍然大悟,我的亲娘咧,这不就是一个线程池嘛?(名字气的犄角拐弯的真是难认)
EventLoopGroup
是EventLoop
的集合,一个EventLoopGroup
包含一个或者多个EventLoop
。我们可以将EventLoop
看做EventLoopGroup
线程池中的一个个工作线程。
至于这里为什么要用到两个线程池,具体的其实可以参考Reactor
设计模式,这里暂时不做过多的讲解。
当一个连接到达时,Netty
就会创建一个 Channel
,然后从 EventLoopGroup
中分配一个 EventLoop
来给这个 Channel
绑定上,在该 Channel
的整个生命周期中都是有这个绑定的 EventLoop
来服务的。
在Java NIO
中我们有 ByteBuffer
缓冲池,对于它的操作我们应该印象深刻,往Buffer
中写数据时我们需要关注写入的位置,切换成读模式时我们还要切换读写状态,不然将会出现大问题。
针对于NIO
中超级难用的Buffer
类, Netty
提供了ByteBuf
来替代。ByteBuf
声明了两个指针:一个读指针,一个写指针,使得读写操作进行分离,简化buffer
的操作流程。
另外Netty
提供了发几种ByteBuf
的实现以供我们选择,ByteBuf
可以分为:
Pooled
和Unpooled
池化和非池化对于这么多种创建Buffer
的方式该怎么选择呢?Netty
也为我们处理好了,我们可以直接使用(真是暖男Ntetty
):
ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
ByteBuf buffer = allocator.buffer(length);
使用这种方式,Netty将最大努力的使用池化、Unsafe、对外内存的方式为我们创建buffer。
提起Channel
并不陌生,上一篇讲NIO
的三大组件提到过,最常见的就是java.nio.SocketChannel
和java.nio.ServerSocketChannel
,他们用于非阻塞的I/0操作。类似于NIO
的Channel
,Netty提供了自己的Channel
和其子类实现,用于异步I/0操作和其他相关的操作。
在 Netty
中, Channel
是一个 Socket
连接的抽象, 它为用户提供了关于底层 Socket
状态(是否是连接还是断开) 以及对 Socket
的读写等操作。每当 Netty
建立了一个连接后, 都会有一个对应的 Channel
实例。并且,有父子channel
的概念。 服务器连接监听的channel
,也叫 parent channel
。 对应于每一个 Socket
连接的channel
,也叫 child channel
。
既然channel
是 Netty 抽象出来的网络 I/O 读写相关的接口,为什么不使用 JDK NIO
原生的 Channel
而要另起炉灶呢,主要原因如下:
JDK
的 SocketChannel
和 ServersocketChannel
没有统一的 Channel
接口供业务开发者使用,对一于用户而言,没有统一的操作视图,使用起来并不方便。JDK
的 SocketChannel
和 ScrversockctChannel
的主要职责就是网络 I/O 操作,由于他们是 SPI
类接口,由具体的虚拟机厂家来提供,所以通过继承 SPI 功能直接实现 ServersocketChannel
和 SocketChannel
来扩展其工作量和重新 Channel
功类是差不多的。ChannelPipeline Channel
需要够跟 Netty 的整体架构融合在一起,例如 I/O 模型、基的定制模型,以及基于元数据描述配置化的 TCP 参数等,这些 JDK SocketChannel
和ServersocketChannel
都没有提供,需要重新封装。Channel
,功实现更加灵活。基于上述 4 原因,它的设计原理比较简单, Netty 重新设计了 Channel
接口,并且给予了很多不同的实现。但是功能却比较繁杂,主要的设计理念如下:
Channel
接口层,相关联的其他操作封装起来,采用 Facade
模式进行统一封装,将网络 I/O 操作、网络 I/O 统一对外提供。Channel
接口的定义尽量大而全,统一的视图,由不同子类实现不同的功能,公共功能在抽象父类中实现,最大程度上实现接口的重用。Channel
中,由 Channel
统一负责分配和调度,功能实现更加灵活。Channel
的实现类非常多,继承关系复杂,从学习的角度我们抽取最重要的两个 NioServerSocketChannel
和 NioSocketChannel
。
服务端 NioServerSocketChannel
的继承关系类图如下:
客户端 NioSocketChannel
的继承关系类图如下:
后面文章源码系列会具体分析,这里就不进一步阐述分析了。
ChannelHandler
是Netty
中最常用的组件。ChannelHandler
主要用来处理各种事件,这里的事件很广泛,比如可以是连接、数据接收、异常、数据转换等。
ChannelHandler
有两个核心子类 ChannelInboundHandler
和 ChannelOutboundHandler
,其中 ChannelInboundHandler
用于接收、处理入站( Inbound
)的数据和事件,而 ChannelOutboundHandler
则相反,用于接收、处理出站( Outbound
)的数据和事件。
ChannelInboundHandler
处理入站数据以及各种状态变化,当Channel
状态发生改变会调用ChannelInboundHandler
中的一些生命周期方法.这些方法与Channel
的生命密切相关。
入站数据,就是进入socket
的数据。下面展示一些该接口的生命周期API
:
当某个 ChannelInboundHandler
的实现重写 channelRead()
方法时,它将负责显式地释放与池化的 ByteBuf
实例相关的内存。 Netty 为此提供了一个实用方法ReferenceCountUtil.release()
。
@Sharable
public class DiscardHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ReferenceCountUtil.release(msg);
}
}
这种方式还挺繁琐的,Netty提供了一个SimpleChannelInboundHandler
,重写channelRead0()
方法,就可以在调用过程中会自动释放资源.
public class SimpleDiscardHandler
extends SimpleChannelInboundHandler<Object> {
@Override
public void channelRead0(ChannelHandlerContext ctx,
Object msg) {
// 不用调用ReferenceCountUtil.release(msg)也会释放资源
}
}
出站操作和数据将由 ChannelOutboundHandler
处理。它的方法将被 Channel
、 ChannelPipeline
以及 ChannelHandlerContext
调用。
ChannelOutboundHandler
的一个强大的功能是可以按需推迟操作或者事件,这使得可以通过一些复杂的方法来处理请求。例如, 如果到远程节点的写入被暂停了, 那么你可以推迟冲刷操作并在稍后继续。
ChannelPromise
与ChannelFuture
: ChannelOutboundHandler
中的大部分方法都需要一个ChannelPromise
参数, 以便在操作完成时得到通知。 ChannelPromise
是ChannelFuture
的一个子类,其定义了一些可写的方法,如setSuccess()
和setFailure()
,从而使ChannelFuture
不可变。
ChannelHandlerAdapter
顾名思义,就是handler
的适配器。你需要知道什么是适配器模式,假设有一个A接口,我们需要A的subclass
实现功能,但是B类中正好有我们需要的功能,不想复制粘贴B中的方法和属性了,那么可以写一个适配器类Adpter
继承B实现A,这样一来Adapter
是A的子类并且能直接使用B中的方法,这种模式就是适配器模式。
就比如Netty中的SslHandler
类,想使用ByteToMessageDecoder
中的方法进行解码,但是必须是ChannelHandler
子类对象才能加入到ChannelPipeline
中,通过如下签名和其实现细节(SslHandler
实现细节就不贴了)就能够作为一个handler
去处理消息了。
public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundHandler
ChannelHandlerAdapter
提供了一些实用方法isSharable()
如果其对应的实现被标注为 Sharable
, 那么这个方法将返回 true
, 表示它可以被添加到多个 ChannelPipeline
中 。如果想在自己的ChannelHandler
中使用这些适配器类,只需要扩展他们,重写那些想要自定义的方法即可。
每一个新创建的 Channel
都将会被分配一个新的 ChannelPipeline
。这项关联是永久性的; Channel
既不能附加另外一个 ChannelPipeline
,也不能分离其当前的。在 Netty 组件的生命周期中,这是一项固定的操作,不需要开发人员的任何干预。
Netty 的 ChannelHandler
为处理器提供了基本的抽象, 目前你可以认为每个 ChannelHandler
的实例都类似于一种为了响应特定事件而被执行的回调。从应用程序开发人员的角度来看, 它充当了所有处理入站和出站数据的应用程序逻辑的拦截载体。ChannelPipeline
提供了 ChannelHandler
链的容器,并定义了用于在该链上传播入站和出站事件流的 API
。当 Channel
被创建时,它会被自动地分配到它专属的 ChannelPipeline
。
ChannelHandler
安装到 ChannelPipeline
中的过程如下所示:
ChannelInitializer
的实现被注册到了ServerBootstrap
中ChannelInitializer.initChannel()
方法被调用时,ChannelInitializer
将在 ChannelPipeline
中安装一组自定义的 ChannelHandler
ChannelInitializer
将它自己从 ChannelPipeline
中移除如上图所示:这是一个同时具有入站和出站 ChannelHandler
的 ChannelPipeline
的布局,并且印证了我们之前的关于 ChannelPipeline
主要由一系列的 ChannelHandler
所组成的说法。 ChannelPipeline
还提供了通过 ChannelPipeline
本身传播事件的方法。如果一个入站事件被触发,它将被从 ChannelPipeline
的头部开始一直被传播到 Channel Pipeline 的尾端。
你可能会说, 从事件途经 ChannelPipeline
的角度来看, ChannelPipeline
的头部和尾端取决于该事件是入站的还是出站的。然而 Netty 总是将 ChannelPipeline
的入站口(图 的左侧)作为头部,而将出站口(该图的右侧)作为尾端。
当你完成了通过调用 ChannelPipeline.add*()
方法将入站处理器( ChannelInboundHandler
)和 出 站 处 理 器 ( ChannelOutboundHandler
) 混 合 添 加 到 ChannelPipeline
之 后 , 每 一 个ChannelHandler
从头部到尾端的顺序位置正如同我们方才所定义它们的一样。因此,如果你将图 6-3 中的处理器( ChannelHandler
)从左到右进行编号,那么第一个被入站事件看到的 ChannelHandler
将是1,而第一个被出站事件看到的 ChannelHandler
将是 5。
在 ChannelPipeline
传播事件时,它会测试 ChannelPipeline
中的下一个 ChannelHandler 的类型是否和事件的运动方向相匹配。如果不匹配, ChannelPipeline
将跳过该ChannelHandler
并前进到下一个,直到它找到和该事件所期望的方向相匹配的为止。 (当然, ChannelHandler
也可以同时实现ChannelInboundHandler
接口和 ChannelOutboundHandler
接口。)
ChannelPipeline
修改指的是添加或删除ChannelHandler
,见代码示例:
ChannelPipeline pipeline = ..;
FirstHandler firstHandler = new FirstHandler();
// 先添加一个Handler到ChannelPipeline中
pipeline.addLast("handler1", firstHandler);
// 这个Handler放在了first,意味着放在了handler1之前
pipeline.addFirst("handler2", new SecondHandler());
// 这个Handler被放到了last,意味着在handler1之后
pipeline.addLast("handler3", new ThirdHandler());
...
// 通过名称删除
pipeline.remove("handler3");
// 通过对象删除
pipeline.remove(firstHandler);
// 名称"handler2"替换成名称"handler4",并切handler2的实例替换成了handler4的实例
pipeline.replace("handler2", "handler4", new ForthHandler());
ChannelPipeline
的出入站API
入站API
所示:
图片上传失败...(image-6037f5-1598167949595)
出站API
所示:
ChannelPipeline
这个组件上面所讲的大致只需要记住这三点即可:
ChannelPipeline
保存了与 Channel
相关联的 ChannelHandler
ChannelPipeline
可以根据需要,通过添加或者删除 ChannelHandler
来动态地修改ChannelPipeline
有着丰富的API
用以被调用,以响应入站和出站事件当 ChannelHandler
被添加到 ChannelPipeline
时,它将会被分配一个 ChannelHandlerContext
,它代表了 ChannelHandler
和 ChannelPipeline
之间的绑定。ChannelHandlerContext
的主要功能是管理它所关联的ChannelHandler
和在同一个 ChannelPipeline
中的其他ChannelHandler
之间的交互。
如果调用Channel
或ChannelPipeline
上的方法,会沿着整个ChannelPipeline
传播,如果调用ChannelHandlerContext
上的相同方法,则会从对应的当前ChannelHandler
进行传播。
ChannelHandlerContext API
如下表所示:
ChannelHandlerContext
和 ChannelHandler
之间的关联(绑定)是永远不会改变的,所以缓存对它的引用是安全的;ChannelHandlerContext
的方法将产生更短的事件流, 应该尽可能地利用这个特性来获得最大的性能。ChannelHandler
、ChannelPipeline
的关联使用从ChannelHandlerContext
访问channel
ChannelHandlerContext ctx = ..;
// 获取channel引用
Channel channel = ctx.channel();
// 通过channel写入缓冲区
channel.write(Unpooled.copiedBuffer("Netty in Action",
CharsetUtil.UTF_8));
从ChannelHandlerContext
访问ChannelPipeline
ChannelHandlerContext ctx = ..;
// 获取ChannelHandlerContext
ChannelPipeline pipeline = ctx.pipeline();
// 通过ChannelPipeline写入缓冲区
pipeline.write(Unpooled.copiedBuffer("Netty in Action",
CharsetUtil.UTF_8));
有时候我们不想从头传递数据,想跳过几个handler
,从某个handler
开始传递数据.我们必须获取目标handler
之前的handler
关联的ChannelHandlerContext
。
ChannelHandlerContext ctx = ..;
// 直接通过ChannelHandlerContext写数据,发送到下一个handler
ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
好了,ChannelHandlerContext
的基本使用应该掌握了,但是你真的理解ChannelHandlerContext
,ChannelPipeline
和Channelhandler
之间的关系了吗?不理解也没关系,因为源码以后会帮你理解的更为深刻。
Channel
对应一个 ChannelPipeline
ChannelPipeline
包含一条双向的 ChannelHandlerContext
链ChannelHandlerContext
中包含一个 ChannelHandler
Channel
会绑定到一个EventLoop
上NioEventLoop
维护了一个 Selector(
使用的是 Java 原生的 Selector)NioEventLoop
相当于一个线程粘包拆包问题是处于网络比较底层的问题,在数据链路层、网络层以及传输层都有可能发生。我们日常的网络应用开发大都在传输层进行,由于UDP
有消息保护边界,不会发生粘包拆包问题,而因此粘包拆包问题只发生在TCP
协议中。具体讲TCP
是个”流"协议,只有流的概念,没有包的概念,对于业务上层数据的具体含义和边界并不了解,它只会根据TCP
缓冲区的实际情况进行包的划分。所以在业务上认为,一个完整的包可能会被TCP
拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP
粘包和拆包问题。
下面针对客户端分别发送了两个数据表Packet1
和Packet2
给服务端的时候,TCP
粘包和拆包会出现的情况进行列举说明:
(1)第一种情况,服务端分两次正常收到两个独立数据包,即没有发生拆包和粘包的现象;
(2)第二种情况,接收端只收到一个数据包,由于TCP
是不会出现丢包的,所以这一个数据包中包含了客户端发送的两个数据包的信息,这种现象即为粘包。这种情况由于接收端不知道这两个数据包的界限,所以对于服务接收端来说很难处理。
(3)第三种情况,服务端分两次读取到了两个数据包,第一次读取到了完整的Packet1
和Packet2
包的部分内容,第二次读取到了Packet2
的剩余内容,这被称为TCP拆包;
(4)第四种情况,服务端分两次读取到了两个数据包,第一次读取到了部分的Packet1
内容,第二次读取到了Packet1
剩余内容和Packet2
的整包。
如果此时服务端TCP接收滑窗非常小,而数据包Packet1
和Packet2
比较大,很有可能服务端需要分多次才能将两个包接收完全,期间发生多次拆包。以上列举情况的背后原因分别如下:
MSS
(最大报文长度)大小的TCP
分段,当TCP
报文长度-TCP
头部长度>MSS
的时候将发生拆包。由于底层的TCP
无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下:
FTP
协议;int32
来表示消息的总长度;之前Netty示例中其实并没有考虑读半包问题,这在功能测试往往没有问题,但是一旦请求数过多或者发送大报文之后,就会存在该问题。如果代码没有考虑,往往就会出现解码错位或者错误,导致程序不能正常工作,下面看看Netty是如何根据主流的解决方案进行抽象实现来帮忙解决这一问题的。
如下表所示,Netty为了找出消息的边界,采用封帧方式:
方式 | 解码 | 编码 |
---|---|---|
固定长度 | FixedLengthFrameDecoder | 简单 |
分隔符 | DelimiterBasedFrameDecoder | 简单 |
专门的 length 字段 | LengthFieldBasedFrameDecoder | LengthFieldPrepender |
注意到,Netty提供了对应的解码器来解决对应的问题,有了这些解码器,用户不需要自己对读取的报文进行人工解码,也不需要考虑TCP的粘包和半包问题。为什么这么说呢?下面列举一个包尾增加分隔符的例子:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Author: wuxiaofei
* @Date: 2020/8/15 0015 19:15
* @Version: 1.0
* @Description:入站处理器
*/
@ChannelHandler.Sharable
public class DelimiterServerHandler extends ChannelInboundHandlerAdapter {
private AtomicInteger counter = new AtomicInteger(0);
private AtomicInteger completeCounter = new AtomicInteger(0);
/*** 服务端读取到网络数据后的处理*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf)msg;
String request = in.toString(CharsetUtil.UTF_8);
System.out.println("Server Accept["+request
+"] and the counter is:"+counter.incrementAndGet());
String resp = "Hello,"+request+". Welcome to Netty World!"
+ DelimiterEchoServer.DELIMITER_SYMBOL;
ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));
}
/*** 服务端读取完成网络数据后的处理*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {
ctx.fireChannelReadComplete();
System.out.println("the ReadComplete count is "
+completeCounter.incrementAndGet());
}
/*** 发生异常后的处理*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import java.net.InetSocketAddress;
/**
* @Author: wuxiaofei
* @Date: 2020/8/15 0015 19:17
* @Version: 1.0
* @Description:服务端
*/
public class DelimiterEchoServer {
public static final String DELIMITER_SYMBOL = "@~";
public static final int PORT = 9997;
public static void main(String[] args) throws InterruptedException {
DelimiterEchoServer delimiterEchoServer = new DelimiterEchoServer();
System.out.println("服务器即将启动");
delimiterEchoServer.start();
}
public void start() throws InterruptedException {
final DelimiterServerHandler serverHandler = new DelimiterServerHandler();
EventLoopGroup group = new NioEventLoopGroup();/*线程组*/
try {
ServerBootstrap b = new ServerBootstrap();/*服务端启动必须*/
b.group(group)/*将线程组传入*/
.channel(NioServerSocketChannel.class)/*指定使用NIO进行网络传输*/
.localAddress(new InetSocketAddress(PORT))/*指定服务器监听端口*/
/*服务端每接收到一个连接请求,就会新启一个socket通信,也就是channel,
所以下面这段代码的作用就是为这个子channel增加handle*/
.childHandler(new ChannelInitializerImp());
ChannelFuture f = b.bind().sync();/*异步绑定到服务器,sync()会阻塞直到完成*/
System.out.println("服务器启动完成,等待客户端的连接和数据.....");
f.channel().closeFuture().sync();/*阻塞直到服务器的channel关闭*/
} finally {
group.shutdownGracefully().sync();/*优雅关闭线程组*/
}
}
private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER_SYMBOL
.getBytes());
//服务端收到数据包后经过DelimiterBasedFrameDecoder即分隔符基础框架解码器解码为一个个带有分隔符的数据包。
ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024,
delimiter));
ch.pipeline().addLast(new DelimiterServerHandler());
}
}
}
添加到ChannelPipeline
的DelimiterBasedFrameDecoder
用于对使用分隔符结尾的消息进行自动解码,当然还有没有用到的FixedLengthFrameDecoder
用于对固定长度的消息进行自动解码等解码器。正如上门的代码使用案例,有了Netty提供的几码器可以轻松地完成对很多消息的自动解码,而且不需要考虑TCP粘包/拆包导致的读半包问题,极大地提升了开发效率。
相信看完上面的铺垫,你对Netty编码有了一定的了解了,下面再来整体梳理一遍吧。
1、设置EventLoopGroup
线程组(Reactor
线程组)
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
上面我们说过Netty
中使用Reactor
模式,bossGroup
表示服务器连接监听线程组,专门接受 Accept
新的客户端client
连接。另一个workerGroup
表示处理每一连接的数据收发的线程组,来处理消息的读写事件。
2、服务端引导器
ServerBootstrap serverBootstrap = new ServerBootstrap();
集成所有配置,用来启动Netty
服务端。
3、设置ServerBootstrap
信息
serverBootstrap.group(bossGroup, workerGroup);
将两个线程组设置到ServerBootstrap
中。
4、设置ServerSocketChannel
类型
serverBootstrap.channel(NioServerSocketChannel.class);
设置通道的IO
类型,Netty
不止支持Java NIO
,也支持阻塞式IO
,例如OIO
OioServerSocketChannel.class)
5、设置参数
serverBootstrap.option(ChannelOption.SO_BACKLOG, 100);
通过option()
方法可以设置很多参数,这里SO_BACKLOG
标识服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝。默认值,Windows
为200,其他为128,这里设置的是100。
6、设置Handler
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
设置 ServerSocketChannel
对应的Handler
,这里只能设置一个,它会在SocketChannel
建立起来之前执行。
7、设置子Handler
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new ChatNettyHandler());
}
});
Netty
中提供了一种可以设置多个Handler
的途径,即使用ChannelInitializer
方式。ChannelPipeline
是Netty
处理请求的责任链,这是一个ChannelHandler
的链表,而ChannelHandler
就是用来处理网络请求的内容的。
每一个channel
,都有一个处理器流水线。装配child channel
流水线,调用childHandler()
方法,传递一个ChannelInitializer
的实例。
在 child channel
创建成功,开始通道初始化的时候,在bootstrap启动器中配置的ChannelInitializer
实例就会被调用。
这个时候,才真正的执行去执行 initChannel
初始化方法,开始通道流水线装配。
流水线装配,主要是在流水线pipeline
的后面,增加负责数据读写、处理业务逻辑的handler
。
处理器 ChannelHandler
用来处理网络请求内容,有ChannelInboundHandler
和ChannelOutboundHandler
两种,ChannlPipeline
会从头到尾顺序调用ChannelInboundHandler
处理网络请求内容,从尾到头调用ChannelOutboundHandler
处理网络请求内容
8、绑定端口号
ChannelFuture f = serverBootstrap.bind(PORT).sync();
绑定端口号
9、等待服务端端口号关闭
f.channel().closeFuture().sync();
等待服务端监听端口关闭,sync()
会阻塞主线程,内部调用的是 Object
的 wait()
方法
10、关闭EventLoopGroup线程组
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
这篇文章主要是从一个demo
作为引子,然后介绍了Netty
的包结构、Reactor
模型、编程规范等等,目的很简单,希望你能够读懂这段demo
并写出来。
后面开始继续Netty
源码解析部分,敬请期待。