本篇示例代码仓库:learn-netty
在 I/O 操作中有这么两组概念,其中同步/异步 要和线程中的同步线程/异步线程要区分开,这里指的是同步IO / 异步IO
阻塞/非阻塞:
同步/异步:
常见的 IO 模型:
BIO 是 blocking I/O 的简称,它是同步阻塞型 IO,其相关的类和接口在 java.io 下,简单来讲:
编写一个简单的 BioServer:
public class BioServer {
public static void main (String[] args) throws IOException {
// BIO 模型的服务端要为每一个客户端建立一个对应的连接
ServerSocket serverSocket = new ServerSocket(1145);
while (true) {
// 持续接受客户端的连接
Socket accept = serverSocket.accept();
// 为每一个客户端连接新开一个线程,执行对应的业务
new Thread(new ClientService(accept)).start();
}
}
static class ClientService implements Runnable {
private Socket socket;
public ClientService (Socket socket) {
this.socket = socket;
}
@Override
public void run() {
System.out.println("执行对应的业务操作:" + socket);
}
}
}
对应来一个简单的 Client:
public class Client {
public static void main(String[] args) throws IOException {
Socket socket = new Socket("127.0.0.1", 1145);
System.out.println("建立连接:" + socket);
}
}
这种 IO 模型的弊端十分明显:
NIO,称之为 New IO 或是 non-block IO(非阻塞IO),这两种说法都可以,其实称之为非阻塞 IO 更恰当一些
NIO的三大核心组件:
在应用层面,数据从网络传递给 Buffer,我们操作 Buffer 中的数据,之后再通过 NIO 的 api 将处理后的 Buffer 中的数据写回到网络中:
查看一下 java.nio 包下的 Buffer.java 源码中的几个私有属性:
// Invariants: mark <= position <= limit <= capacity
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;
mark <= position <= limit <= capacity 这个大小关系是在写模式下的:
当 Buffer 需要读数据时会进行读写模式切换:
public Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
Channel是一个通道,管道,网络数据通过 Channel 读取和写入
Channel 和流 Stream 的不同之处:
Java 提供两个网络读写相关的 Channel:
这里 SocketChannel 在不同端上所支持的事件是不一样的:
端类型 | Channel 类型 | OP_ACCEPT | OP_CONNECT | OP_WRITE | OP_READ |
---|---|---|---|---|---|
Client 端 | SocketChannel | 支持 | 支持 | 支持 | |
Server 端 | ServerSocketChannel | 支持 | |||
Server 端 | SocketChannel | 支持 | 支持 |
Selector(选择器/多路复用器):
基于 NIO 来实现一个服务端:
public class NioServer {
public static void main(String[] args) {
try {
//1、打开ServerSocketChannel,用于监听客户端的连接,它是所有客户端连接的父管道(代表客户端连接的管道都是通过它创建的)
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//2、绑定监听端口,设置连接为非阻塞模式
serverSocketChannel.socket().bind(new InetSocketAddress(1145));
serverSocketChannel.configureBlocking(false);
//3、创建多路复用器Selector
Selector selector = Selector.open();
//4、将ServlerSocketChannel注册到selector上,监听客户端连接事件ACCEPT
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//5、创建 Reactor线程,让多路复用器在 Reactor 线程中执行多路复用程序
new Thread(new SingleReactor(selector)).start();
} catch (IOException e) {
e.printStackTrace();
}
}
}
class SingleReactor implements Runnable{
private final Selector selector;
public SingleReactor(Selector selector) {
this.selector = selector;
}
public void run() {
//6、selector轮询准备就绪的事件
while (true) {
try {
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();
try {
processKey(selectionKey);
} catch (IOException e) {
e.printStackTrace();
if (selectionKey !=null ) {
selectionKey.cancel();
SelectableChannel channel = selectionKey.channel();
if (channel !=null) {
channel.close();
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void processKey(SelectionKey key) throws IOException {
if (key.isValid()) {
//7、根据准备就绪的事件类型分别处理
if (key.isAcceptable()) {//客户端请求连接事件就绪
//7.1、接收一个新的客户端连接,创建对应的SocketChannel,
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
//7.2、设置socketChannel的非阻塞模式,并将其注册到Selector上,监听读事件
socketChannel.configureBlocking(false);
socketChannel.register(this.selector,SelectionKey.OP_READ);
}
if (key.isReadable()) {//读事件准备继续
//7.1、读客户端发送过来的数据
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer readBufer = ByteBuffer.allocate(1024);
int readBytes = socketChannel.read(readBufer);
//前面设置过socketChannel是非阻塞的,故要通过返回值判断读取到的字节数
if (readBytes > 0) {
readBufer.flip();//读写模式切换
byte[] bytes = new byte[readBufer.remaining()];
readBufer.get(bytes);
String msg = new String(bytes,"utf-8");
//进行业务处理
String response = doService(msg);
//给客户端响应数据
System.out.println("服务端开始向客户端响应数据");
byte[] responseBytes = response.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(responseBytes.length);
writeBuffer.put(responseBytes);
writeBuffer.flip();
socketChannel.write(writeBuffer);
}else if (readBytes < 0) {
//值为-1表示链路通道已经关闭
key.cancel();
socketChannel.close();
}else {
//没读取到数据,忽略
}
}
}
}
private String doService(String msg) {
System.out.println("成功接收来自客户端发送过来的数据:"+msg);
return "hello client,i am server!";
}
}
对应的客户端实现:
public class NioClient {
public static void main(String[] args) {
try {
//1、窗口客户端SocketChannel,绑定客户端本地地址(不选默认随机分配一个可用地址)
SocketChannel socketChannel = SocketChannel.open();
//2、设置非阻塞模式,
socketChannel.configureBlocking(false);
//3、创建Selector
Selector selector = Selector.open();
//3、创建Reactor线程
new Thread(new SingleReactorClient(socketChannel,selector)).start();
} catch (IOException e) {
e.printStackTrace();
}
}
}
class SingleReactorClient implements Runnable{
private final SocketChannel socketChannel;
private final Selector selector;
public SingleReactorClient(SocketChannel socketChannel, Selector selector) {
this.socketChannel = socketChannel;
this.selector = selector;
}
public void run() {
try {
//连接服务端
doConnect(socketChannel,selector);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
//5、多路复用器执行多路复用程序
while (true) {
try {
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
processKey(selectionKey);
iterator.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void doConnect(SocketChannel sc, Selector selector) throws IOException {
System.out.println("客户端成功启动,开始连接服务端");
//3、连接服务端
boolean connect = sc.connect(new InetSocketAddress("127.0.0.1", 1145));
//4、将socketChannel注册到selector并判断是否连接成功,连接成功监听读事件,没有继续监听连接事件
System.out.println("connect="+connect);
if (connect) {
sc.register(selector, SelectionKey.OP_READ);
System.out.println("客户端成功连上服务端,准备发送数据");
//开始进行业务处理,向服务端发送数据
doService(sc);
}else {
sc.register(selector,SelectionKey.OP_CONNECT);
}
}
private void processKey(SelectionKey key) throws IOException {
if (key.isValid()) {
//6、根据准备就绪的事件类型分别处理
if (key.isConnectable()) {//服务端可连接事件准备就绪
SocketChannel sc = (SocketChannel) key.channel();
if (sc.finishConnect()) {
//6.1、向selector注册可读事件(接收来自服务端的数据)
sc.register(selector,SelectionKey.OP_READ);
//6.2、处理业务 向服务端发送数据
doService(sc);
}else {
//连接失败,退出
System.exit(1);
}
}
if (key.isReadable()) {//读事件准备继续
//6.1、读服务端返回的数据
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBufer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBufer);
//前面设置过socketChannel是非阻塞的,故要通过返回值判断读取到的字节数
if (readBytes > 0) {
readBufer.flip();//读写模式切换
byte[] bytes = new byte[readBufer.remaining()];
readBufer.get(bytes);
String msg = new String(bytes,"utf-8");
//接收到服务端返回的数据后进行相关操作
doService(msg);
}else if (readBytes < 0) {
//值为-1表示链路通道已经关闭
key.cancel();
sc.close();
}else {
//没读取到数据,忽略
}
}
}
}
private static void doService(SocketChannel socketChannel) throws IOException {
System.out.println("客户端开始向服务端发送数据:");
//向服务端发送数据
byte[] bytes = "hello nioServer,i am nioClient !".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
socketChannel.write(writeBuffer);
}
private String doService(String msg) {
System.out.println("成功接收来自服务端响应的数据:"+msg);
return "";
}
}
在NIO中,Selector 多路复用器在做轮询时,如果没有事件发生,也会进行阻塞,如何优化?
这里提出 AIO,它是 Asynchronous l/O 的简称(异步非阻塞 IO),是异步IO,该异步 IO 是需要依赖于操作系统底层的异步 IO 实现
目前该技术在 Windows 下实现成熟,但很少作为百万级以上或者说高并发应用的服务器操作系统来使用
Liux系统下,异步 IO 模型在 2.6 版本才引入,目前并不完善。所以 Liux 下,实现高并发网络编程时都是以 NIO 多路复用模型模式为主
Reactor 线程模型不是 Java专属,也不是 Netty 专属,它其实是一种并发编程模型,是一种思想,具有指导意义。
Reactor 模型中定义了三种角色:
我们之前在 Java NIO 中实现的代码其实就是一个类似的 Reactor 单线程模型:
在 Reactor 单线程模型中:
这样的模型好处是编码简单,实现容易,但是所有的业务都需要依赖单线程执行,很容易达到性能瓶颈,因此可以将业务抽离出来放到线程池中执行,这就是单 Reactor 多线程模型:
对于单 Reactor 多线程模型中,虽然我们已经将业务进行了分离,但是仍然存在缺陷:
对于服务器来说,接收客户端的连接是比较重要的,因此将这部分操作单独用线程去操作:
这里的 subReactor 可以有多个,但都只负责对连接建立事件的监听,已建立连接的 SocketChannel 将会注册到 MainReactor 中。
Netty 是由 JBOSS 提供的一个 java 开源框架,现为 Github上的独立项目,项目地址。
Netty 提供非阻塞的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序:
核心:
传输服务:
协议支持:
API隔离:
简化开发:
高可用机制:
缺陷处理
Netty 线程模型是基于 Reactor 模型实现的,对 Reactor 的三种模式都有非常好的支持,并做了一定的改进,也非常的灵活,一般情况,在服务端会采用主从架构模型。
对于主从 Reactor 架构:
Netty 抽象出两组线程池:
EventLoop 表示一个不断循环的执行事件处理的线程,每个 EventLoop 都包含一个 Selector,用于监听注册在其上的 Socket 网络连接(Channel)
每个 Boss EventLoop 中循环执行以下三个步骤:
每个 WorkerEventLoop 中循环执行以下三个步骤:
在以上两个 processSelectedKeys 步骤中,会使用 Pipeline(管道),Pipeline中引用了 Channel,即通过 Pipeline 可以获取到对 应的Channel,Pipeline 中维护了很多的处理器(拦截处理器、过滤处理器、自定义处理器等)
ChannelPipeline 提供了 ChannelHandler 链的容器:
以服务端程序为例:
Handler 的头节点和尾节点都是初始化好的,用户无需自己实现,只需要实现中间的 Handler 即可
当一个事件如接收到数据或者异常发生时,这个事件会按照 ChannelPipeline 中的 ChannelHandler 的顺序被处理,每个ChannelHandler 可以传递给下一个,直到有一个处理器处理它或者 Pipeline 中没有更多的处理器了,这个处理过程是责任链设计模式的体现。
在 ChannelPipeline 的处理流程中,对于入站和出站的数据,对应的 ChannelHandler 的类型不同:
inbound 入站事件处理顺序(方向)是由链表的头到链表尾,outbound 事件的处理顺序是由链表尾到链表头:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.42.Final</version>
</dependency>
这里为了后续便于演示,添加 sl4j:
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
public class NettyServer {
private static final Logger log = LoggerFactory.getLogger(NettyServer.class);
public static void main(String[] args) {
NettyServer NettyServer = new NettyServer();
NettyServer.start(8888);
}
public void start(int port) {
//创建 bossGroup workerGroup 分别管理连接建立事件和具体的业务处理事件
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try {
//创建启动引导类
ServerBootstrap serverBootstrap = new ServerBootstrap();
//配置参数
serverBootstrap.group(boss,worker)
.channel(NioServerSocketChannel.class) //指定服务端通道,用于接收并创建新连接
.handler(new LoggingHandler(LogLevel.DEBUG)) // 给 boss group 配置 handler
.childHandler(new ChannelInitializer<SocketChannel>() {
//每个客户端 channel 初始化时都会执行该方法来配置该 channel 的相关 handler
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//获取与该 channel 绑定的 pipeline
ChannelPipeline pipeline = ch.pipeline();
//向 pipeline 中添加 handler,如果没有注册到这里则不会生效
pipeline.addLast(new ServerOutboundHandler1());
pipeline.addLast(new ServerInboundHandler1());
pipeline.addLast(new ServerInboundHandler2());
}
}); //给 worker group 配置 handler
//服务端绑定端口启动
ChannelFuture future = serverBootstrap.bind(port).sync();
//服务端监听端口关闭
future.channel().closeFuture().sync();
} catch (Exception e) {
log.error("netty server error ,{}",e.getMessage());
} finally {
//优雅关闭 boss worker
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
这样我们就配置好了服务端,我们需要做的就是完成 worker 的 Pipeline 中各个 Handler 的处理逻辑即可
对于入站处理数据,需要一个 Inbound 类型的 Handler:
public class ServerInboundHandler1 extends ChannelInboundHandlerAdapter {
private static final Logger log = LoggerFactory.getLogger(ServerInboundHandler1.class);
/**
* 通道准备就绪时
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("ServerInboundHandler1 channelActive-----");
//将事件向下传递
ctx.fireChannelActive();
}
/**
* 通道有数据可读时
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("ServerInboundHandler1 channelRead----,remoteAddress={}", ctx.channel().remoteAddress());
//处理接收的数据
ByteBuf buf = (ByteBuf) msg;
log.info("ServerInboundHandler1:received client data = {}", buf.toString(StandardCharsets.UTF_8));
//将事件消息向下传递,如果不传递则 msg 不会到达下一个 handler
ctx.fireChannelRead(msg);
}
/**
* 数据读取完毕时
*
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
log.info("channelReadComplete----");
//数据读取结束后向客户端写回数据
byte[] data = "hello client , i am server".getBytes(StandardCharsets.UTF_8);
ByteBuf buffer = Unpooled.buffer(data.length);
buffer.writeBytes(data);//以bytebuf为中心,看是写到bytebuf中还是从bytebuf中读
ByteBuf buf = Unpooled.copiedBuffer("hello client , i am server", StandardCharsets.UTF_8);
ctx.writeAndFlush(buf);//通过ctx写,事件会从当前handler向pipeline头部移动
//ctx.channel().writeAndFlush(buf);//通过Channel写,事件会从通道尾部向头部移动
}
/**
* 发生异常时
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.info("ServerInboundHandler1 exceptionCaught----,cause={}", cause.getMessage());
}
}
这里要注意,如果该 Handler 需要向下传递数据,即要让他之后的 Handler 也拿到 msg,需要在 channelRead 内 ChannelHandlerContext 的 fireChannelRead 方法
再来一个 ServerInboundHandler2 进行 msg 传递测试:
public class ServerInboundHandler2 extends ChannelInboundHandlerAdapter {
private static final Logger log = LoggerFactory.getLogger(ServerInboundHandler2.class);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("ServerInboundHandler2 channelActive-----");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("ServerInboundHandler2 channelRead----,remoteAddress={}", ctx.channel().remoteAddress());
//处理接收的数据
ByteBuf buf = (ByteBuf) msg;
log.info("ServerInboundHandler2:received client data = {}", buf.toString(StandardCharsets.UTF_8));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
log.info("ServerInboundHandler2 channelReadComplete----");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
}
}
在数据处理完后,会由 tail 节点写回,我们也可以编写 Outbound 类型的 Handler 来添加对出站数据的处理:
public class ServerInboundHandler1 extends ChannelInboundHandlerAdapter {
private static final Logger log = LoggerFactory.getLogger(ServerInboundHandler1.class);
/**
* 通道准备就绪时
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("ServerInboundHandler1 channelActive-----");
//将事件向下传递
//ctx.fireChannelActive();
super.channelActive(ctx);
}
/**
* 通道有数据可读时
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("ServerInboundHandler1 channelRead----,remoteAddress={}", ctx.channel().remoteAddress());
//处理接收的数据
ByteBuf buf = (ByteBuf) msg;
log.info("ServerInboundHandler1:received client data = {}", buf.toString(StandardCharsets.UTF_8));
//将事件消息向下传递,如果不传递则 msg 不会到达下一个 handler
ctx.fireChannelRead(msg);
// super.channelRead(ctx, msg);
}
/**
* 数据读取完毕时
*
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
log.info("channelReadComplete----");
//数据读取结束后向客户端写回数据
byte[] data = "hello client , i am server".getBytes(StandardCharsets.UTF_8);
ByteBuf buffer = Unpooled.buffer(data.length);
buffer.writeBytes(data);//以bytebuf为中心,看是写到bytebuf中还是从bytebuf中读
ByteBuf buf = Unpooled.copiedBuffer("hello client , i am server", StandardCharsets.UTF_8);
ctx.writeAndFlush(buf);//通过ctx写,事件会从当前handler向pipeline头部移动
//ctx.channel().writeAndFlush(buf);//通过Channel写,事件会从通道尾部向头部移动
}
/**
* 发生异常时
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.info("ServerInboundHandler1 exceptionCaught----,cause={}", cause.getMessage());
}
}
这里注意,在写回数据时;
ctx.channel().writeAndFlush()
:则会从 tail 节点从后往前寻找 Outbound 类型的 Handler 节点处理ctx.writeAndFlush()
:则会从当前的 Handler 流向 headpublic class NettyClient {
private static final Logger log = LoggerFactory.getLogger(NettyClient.class);
public static void main(String[] args) {
NettyClient client = new NettyClient();
client.start("127.0.0.1", 8888);
}
public void start(String host, int port) {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//添加客户端 channel 对应的 handler
pipeline.addLast(new ClientInboundHandler1());
pipeline.addLast(new ClientSimpleInboundHandler2());
}
});
//连接远程启动
ChannelFuture future = bootstrap.connect(host, port).sync();
//监听通道关闭
future.channel().closeFuture().sync();
} catch (Exception e) {
log.error("netty client error ,msg={}", e.getMessage());
} finally {
//优雅关闭
group.shutdownGracefully();
}
}
}
和服务端一样,只不过客户端不需要 worker,只需要完成当前 Pipeline 中各个 Handler 的处理逻辑即可
public class ClientInboundHandler1 extends ChannelInboundHandlerAdapter {
private static final Logger log = LoggerFactory.getLogger(ClientInboundHandler1.class);
/**
* 通道准备就绪
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("ClientInboundHandler1 channelActive begin send data");
//通道准备就绪后开始向服务端发送数据
ByteBuf buf = Unpooled.copiedBuffer("hello server,i am client".getBytes(StandardCharsets.UTF_8));
ctx.writeAndFlush(buf);
}
/**
* 通道有数据可读(服务端返回了数据)
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("ClientInboundHandler1 channelRead");
ByteBuf buf = (ByteBuf) msg;
log.info("ClientInboundHandler1: received server data ={}", buf.toString(StandardCharsets.UTF_8));
// 接着传递消息给下一个 ChannelInboundHandler
ctx.fireChannelRead(msg);
}
/**
* 数据读取完毕
*
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
super.channelReadComplete(ctx);
}
/**
* 产生了异常
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}
同样的,Client 的 ChannelInboundHandler 在 channelRead 也需要 fireChannelRead 才能将 msg 向后传递
这里继续编写一个 Handler 用于测试 msg 传递:
public class ClientSimpleInboundHandler2 extends SimpleChannelInboundHandler<ByteBuf> {
private static final Logger log = LoggerFactory.getLogger(ClientSimpleInboundHandler2.class);
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
log.info("ClientSimpleInboundHandler2 channelRead");
log.info("ClientSimpleInboundHandler2: received server data ={}", msg.toString(StandardCharsets.UTF_8));
}
}
这里我们仍然基于上述的 Netty 线程模型来看:
Bootstrap 是引导的意思,它的作用是配置整个 Netty 程序,将各个组件都串起来,最后绑定端口、启动 Netty 服务
Netty 中提供了两种类型的引导类:
ServerBootstrap 将绑定到一个端口,因为服务器必须要监听连接,而 Bootstrap 则是由想要连接到远程节点的客户端应用程序所使用的
引导一个客户端只需要一个 EventLoopGroup,但是一个 ServerBootstrap 则需要两个
Netty 中的 Channel 是与网络套接字相关的,可以理解为是 socket 连接
在客户端与服务端连接的时候就会建立一个Channel,它负责基本的 IO 操作,比如:bind()、connecti()、read()、write()等
主要作用:
不同协议、不同的 I/O 类型的连接都有不同的 Channel 类型与之对应
Netty 是基于事件驱动的,比如:连接注册,连接激活;数据读取;异常事件等等,有了事件,就需要一个组件去监控事件的产生和事件的协调处理——这个组件就是 EventLoop(事件循环/EventExecutor)
在 Netty 中,每个 Channel 都会被分配到一个 EventLoop,一个 EventLoop 可以服务于多个 Channel,每个 EventLoop 会占用一个Thread,同时这个 Thread 会处理 EventLoop 上面发生的所有 IO 操作和事件
EventLoopGroup 是用来生成 EventLoop 的,包含了一组 EventLoop,可以初步理解成 Netty 线程池
在我们之前的示例代码中,EventLoopGroup 是接口,我们采用的实现是 NioEventLoopGroup:
// 主线程,不处理任何业务逻辑,只是接收客户的连接请求
EventLoopGroup boss = new NioEventLoopGroup();
// 工作线程,处理注册其上 Channel的 I/O 事件及其他 Task
EventLoopGroup worker = new NioEventLoopGroup();
这里查看 NioEventLoopGroup 源码,继承自 MultithreadEventLoopGroup:
private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
其中 DEFAULT_EVENT_LOOP_THREADS
表示默认的核心线程数:
对于 bossgroup,我们其实也只用到了其中的一个线程,因为服务端一般只会绑定一个端口启动
每个客户端 Channel 创建后初始化时均会向与该 Channel 绑定的 Pipeline 中添加 Handler,此种模式下,每个 Channel 享有的是各自独立的 Handler,例如之前 NettyServer 中的配置初始化:
.childHandler(new ChannelInitializer<SocketChannel>() {
//每个客户端 channel 初始化时都会执行该方法来配置该 channel 的相关 handler
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//获取与该 channel 绑定的 pipeline
ChannelPipeline pipeline = ch.pipeline();
//向 pipeline 中添加 handler,如果没有注册到这里则不会生效
pipeline.addLast(new ServerOutboundHandler1());
pipeline.addLast(new ServerInboundHandler1());
pipeline.addLast(new ServerInboundHandler2());
}
原先上述方式会给每次新注册进来的 Channel 初始化新的 Handler,如果我们稍作修改:
public class NettyServer {
private static final Logger log = LoggerFactory.getLogger(NettyServer.class);
public static void main(String[] args) {
NettyServer NettyServer = new NettyServer();
NettyServer.start(8888);
}
public void start(int port) {
//创建 bossGroup workerGroup 分别管理连接建立事件和具体的业务处理事件
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
// 只创建一次 serverInboundHandler2 对象
ServerInboundHandler2 serverInboundHandler2 = new ServerInboundHandler2();
try {
//创建启动引导类
ServerBootstrap serverBootstrap = new ServerBootstrap();
//配置参数
serverBootstrap.group(boss,worker)
.channel(NioServerSocketChannel.class) //指定服务端通道,用于接收并创建新连接
.handler(new LoggingHandler(LogLevel.DEBUG)) // 给 boss group 配置 handler
.childHandler(new ChannelInitializer<SocketChannel>() {
//每个客户端 channel 初始化时都会执行该方法来配置该 channel 的相关 handler
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//获取与该 channel 绑定的 pipeline
ChannelPipeline pipeline = ch.pipeline();
//向 pipeline 中添加 handler,如果没有注册到这里则不会生效
pipeline.addLast(new ServerOutboundHandler1());
pipeline.addLast(new ServerInboundHandler1());
// 在这里对 serverInboundHandler2 进行复用
pipeline.addLast(serverInboundHandler2);
}
}); //给 worker group 配置 handler
ChannelFuture future = serverBootstrap.bind(port).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
log.error("netty server error ,{}",e.getMessage());
} finally {
//优雅关闭 boss worker
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
如果我们此时直接运行两个 NettyClient 实例并且绑定到这个 NettyServer,则第二个运行的实例将会报错:
[nioEventLoopGroup-2-1] INFO handler.client.ClientInboundHandler1 - ClientInboundHandler1 channelActive begin send data
[nioEventLoopGroup-2-1] WARN io.netty.channel.DefaultChannelPipeline - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
java.io.IOException: 你的主机中的软件中止了一个已建立的连接。
at java.base/sun.nio.ch.SocketDispatcher.read0(Native Method)
at java.base/sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:46)
at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:330)
at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:284)
at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:259)
at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:417)
at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:247)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1147)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:347)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:840)
Process finished with exit code 0
如果想要实现 ChannelHandler 复用,则只需要在对应需要复用的 Handler 上添加 @Shareble 注解即可:
对 ServerInboundHandler2 添加注解即可:
@ChannelHandler.Sharable
public class ServerInboundHandler2 extends ChannelInboundHandlerAdapter {
private static final Logger log = LoggerFactory.getLogger(ServerInboundHandler2.class);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("ServerInboundHandler2 channelActive-----");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("ServerInboundHandler2 channelRead----,remoteAddress={}", ctx.channel().remoteAddress());
//处理接收的数据
ByteBuf buf = (ByteBuf) msg;
log.info("ServerInboundHandler2:received client data = {}", buf.toString(StandardCharsets.UTF_8));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
log.info("ServerInboundHandler2 channelReadComplete----");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
}
}
对于编写 Netty 数据入站处理器,可以选择继承 ChannellnboundHandlerAdapter
,也可以选择继 SimpleChannellnboundHandlers<I>
区别是什么?
对于继承了 ChannellnboundHandlerAdapter 的 channelRead 方法:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("ServerInboundHandler1 channelRead----,remoteAddress={}", ctx.channel().remoteAddress());
//处理接收的数据
ByteBuf buf = (ByteBuf) msg;
log.info("ServerInboundHandler1:received client data = {}", buf.toString(StandardCharsets.UTF_8));
//将事件消息向下传递,如果不传递则 msg 不会到达下一个 handler
ctx.fireChannelRead(msg);
}
其中 msg 是 Object 类型的,因此在当前 Handler 处理时需要判断上一个 Handler 处理的 msg 是什么类型的,在之前的示例中,我们默认了处理的 msg 都是 ByteBuf 类型,每次处理都要做强制转换。
对于 SimpleChannellnboundHandlers<I>
,本质上也是继承自 ChannellnboundHandlerAdapter
,但仅仅对其中的 channelRead 方法进行了重写:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
// 判断是否是接收
if (this.acceptInboundMessage(msg)) {
// 调用 channelRead0 方法
this.channelRead0(ctx, msg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
if (this.autoRelease && release) {
// 对原始资源释放
ReferenceCountUtil.release(msg);
}
}
}
继承 SimpleChannellnboundHandler 需要重写channelRead0方法,且可以通过泛型指定 msg 类型:
protected abstract void channelRead0(ChannelHandlerContext var1, I var2) throws Exception;
是一个抽象方法,参数将 var2 作为泛型指定,因此在使用 SimpleChannellnboundHandlers<I>
指定的类型后,只需要重写 channelRead0 方法可以帮我们把 msg 来转换。
注意:
Java NIO 提供了 ByteBuffer 作为它的字节容器,但是这个类使用起来过于复杂,而且也有些繁琐。
Netty 使用 ByteBuf 来替代 ByteBuffer,它是一个强大的实现,既解决了 JDK API 的局限性,又为网络应用程序的开发者提供了更好的 API。
从结构上来说:
如果 writerlndex 与 capacity() 容量相等时继续向 ByteBuf 中写数据,Netty 会自动扩容 ByteBuf,直到扩容到底层的内存大小为 maxCapacity
Netty 默认使用的是DirectByteBuf,如果需要使用HeapByteBuf模式,则需要进行系统参数的设置:
// 设置 HeapByteBuf 模式,但 ByteBuf 的分配器 ByteBufAllocator 要设置为非池化,否则不能切换到堆缓冲器模式
System.setProperty("io.netty.noUnsafe","true");
Netty 提供了两种 ByteBufAllocator 的实现,分别是:
Netty 默认使用了PooledByteBufAllocator,但可以通过引导类设置非池化模式
参考源码 DefaultChannelConfig 中的 allocator属性:
//引导类中设置非池化模式
bootstrap.childOption(ChannelOption.ALLOCATOR,UnpooledByteBufAllocator.DEFAULT)
//或者通过系统参数设置
System.setProperty("io.netty.allocator.type","pooled");
System.setProperty("io.netty.allocator.type","unpooled");
对于 Pooled 类型的 ByteBuf :
Netty 提供 Unpooled 工具类创建的 ByteBuf 都是 unpooled 类型,默认采用的 Allocator 是 direct 类型;当然用户可以自己选择创建 UnpooledDirectByteBuf 和 UnpooledHeapByteBuf
ByteBuf 不同模式下的释放:
Netty 自身引入了引用计数,提供了 ReferenceCounted 接口,当对象的引用计数 > 0 时要保证对象不被释放,当为 0 时需要被释放,这里分为手动释放和自动释放:
总结:
对于 JDK5 的 Future 来说,在调用 Future 后想要得到任务执行的返回值必须要通过 future. get() 方法监听 future 对象中 result 字段,因此这种方式会导致线程阻塞。
对于 Netty 来说,其的异步模型为 Future/Promise 异步模型:
对于JDK8来说,新增加了一个类:CompletableFuture,它提供了非常强大的 Future 的扩展功能,最重要的是实现了回调的功能
Netty 中使用了一个 ChannelFuture 来实现异步操作,其中 ChannelFuture 继承自 io.netty.util.concurrent.Future 接口:
public interface Future<V> extends java.util.concurrent.Future<V> {
// 只有IO操作完成时才返回true
boolean isSuccess();
// 只有当cancel(boolean)成功取消时才返回true
boolean isCancellable();
// IO操作发生异常时,返回导致IO操作以此的原因,如果没有异常,返回null
Throwable cause();
// 向Future添加事件,future完成时,会执行这些事件,如果add时future已经完成,会立即执行监听事件
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 移除监听事件,future完成时,不会触发
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 等待future done
Future<V> sync() throws InterruptedException;
// 等待future done,不可打断
Future<V> syncUninterruptibly();
// 等待future完成
Future<V> await() throws InterruptedException;
// 等待future 完成,不可打断
Future<V> awaitUninterruptibly();
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
boolean await(long timeoutMillis) throws InterruptedException;
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
boolean awaitUninterruptibly(long timeoutMillis);
// 立刻获得结果,如果没有完成,返回null
V getNow();
// 如果成功取消,future会失败,导致CancellationException
@Override
boolean cancel(boolean mayInterruptIfRunning);
Netty 自己实现的 Future 继承了 JDK 的 Future,新增了 sync()
和await()
用于阻塞等待,还加了 Listeners,只要任务结束去回调 Listener 就可以了,那么我们就不一定要主动调用 isDone()
来获取状态,或通过 get()
阻塞方法来获取值。
Netty的 Future 与 Java 的 Future 虽然类名相同,但功能上略有不同,Netty 中引入了 Promise 机制。
Netty 的Future,只是增加了监听器。整个异步的状态,是不能进行设置和修改的,于是 Netty 的 Promise 接口扩展了 Netty 的 Future接口,可以设置异步执行的结果。在IO操作过程,如果顺利完成、或者发生异常,都可以设置Promise的结果,并且通知Promise的Listener 们,示例如下:
package netty;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.LocalDateTime;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class NettyFutureTest {
private static final Logger log = LoggerFactory.getLogger(NettyFutureTest.class);
@Test
public void testFuture() throws InterruptedException, ExecutionException {
EventLoopGroup group = new NioEventLoopGroup();
Future<String> future = group.submit( () ->{
log.info("---异步线程执行任务开始----,time={}", LocalDateTime.now().toString());
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("---异步线程执行任务结束----,time={}", LocalDateTime.now().toString());
return "hello netty future";
});
/*String result = future.get();
log.info("----主线程阻塞等待异步线程执行结果:{}",result);*/
//设置监听
future.addListener( future1 -> {
log.info("---收到异步线程执行任务结果通知----执行结果是;{},time={}",future1.get(), LocalDateTime.now().toString());
});
log.info("---主线程----");
TimeUnit.SECONDS.sleep(10);
}
@Test
public void testPromise() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
Promise promise = new DefaultPromise(group.next());//promise绑定到eventloop上
group.submit(()->{
log.info("---异步线程执行任务开始----,time={}", LocalDateTime.now().toString());
try {
int i = 1/0;
TimeUnit.SECONDS.sleep(3);
promise.setSuccess("hello netty promise");
TimeUnit.SECONDS.sleep(3);
log.info("---异步线程执行任务结束----,time={}", LocalDateTime.now().toString());
return;
} catch (Exception e) {
promise.setFailure(e);
}
});
//设置监听回调
promise.addListener(future -> {
log.info("----异步任务执行结果:{}",future.isSuccess());
});
promise.addListener(future2 -> {
log.info("----异步任务执行结果:{}",future2.isSuccess());
});
log.info("---主线程----");
TimeUnit.SECONDS.sleep(10);
}
}
在 Java 的 Future 中,业务逻辑为一个 Callable 或 Runnable 实现类,该类的 call()
或 run()
执行完毕意味着业务逻辑的完结,在 Promise 机制中,可以在业务逻辑中人工设置业务逻辑的成功与失败,这样更加方便的监控自己的业务逻辑。
Netty 底层是基于 TCP 协议来处理网络数据传输,而对于 TCP 协议而言,它传输数据是基于字节流传输的。
应用层在传输数据时:
SO_SNDBUF:
SO_RCVBUF:
接收缓冲区保存收到的数据一直到应用进程读走为止。对于 TCP,如果应用进程一直没有读取,buffer 满了之后发生的动作是:通知对端 TCP 协议中的窗口关闭。这个便是滑动窗口的实现。保证 TCP 套接口接收缓冲区不会溢出,从而保证了 TCP 是可靠传输。因为对方不允许发出超过所通告窗口大小的数据。 这就是 TCP 的流量控制,如果对方无视窗口大小而发出了超过窗口大小的数据,则接收方 TCP 将丢弃它。
滑动窗口:
MTU (Maxitum Transmission Unit,最大传输单元)是链路层对一次可以发送的最大数据的限制。MSS(Maxitum Segment Size,最大分段大小)是 TCP 报文中 data 部分的最大长度,是传输层对一次可以发送的最大数据的限制。
数据在传输过程中,每经过一层,都会加上一些额外的信息:
MTU 是以太网传输数据方面的限制,每个以太网帧最大不能超过 1518bytes。刨去以太网帧的帧头(DMAC+SMAC+Type域) 14Bytes 和帧尾 (CRC校验 ) 4 Bytes,那么剩下承载上层协议的地方也就是 data 域最大就只能有 1500 Bytes 这个值 我们就把它称之为 MTU。
因此,对于过大的数据传输,TCP 必须对其进行拆包。
TCP/IP 协议中,无论发送多少数据,总是要在数据(data)前面加上协议头(TCP Header+IP Header),同时,对方接收到数据,也需要发送 ACK 表示确认。
为了尽可能的利用网络带宽,TCP 总是希望尽可能的发送足够大的数据。(一个连接会设置 MSS 参数,因此,TCP/IP 希望每次都能够以 MSS 尺寸的数据块来发送数据)。Nagle 算法就是为了尽可能发送大块数据,避免网络中充斥着许多小数据块。
Nagle 算法的基本定义是任意时刻,最多只能有一个未被确认的小段。 所谓 “小段”,指的是小于 MSS 尺寸的数据块;所谓“未被确认”,是指一个数据块发送出去后,没有收到对方发送的 ACK 确认该数据已收到。
Nagle 算法的规则:
TCP_NODELAY=true
选项,则允许发送。TCP_NODELAY 是取消 TCP 的确认延迟机制,相当于禁用了 Negale 算法。正常情况下,当 Server 端收到数据之后,它并不会马上向 client 端发送 ACK,而是会将 ACK 的发送延迟一段时间(一般是 40ms),它希望在 t 时间内 server 端会向 client 端发送应答数据,这样 ACK 就能够和应答数据一起发送,就像是应答数据捎带着 ACK 过去。当然,TCP 确认延迟 40ms 并不是一直不变的, TCP 连接的延迟确认时间一般初始化为最小值 40ms,随后根据连接的重传超时时间(RTO)、上次收到数据包与本次接收数据包的时间间隔等参数进行不断调整。另外可以通过设置 TCP_QUICKACK 选项来取消确认延迟;因此,综合分析来看,TCP 层肯定是会出现当次接收到的数据是不完整数据的情况。
出现粘包可能的原因有:
出现半包的可能原因有:
解决问题肯定不是在4层来做而是在应用层,通过定义通信协议来解决粘包和拆包的问题。发送方 和 接收方约定某个规则:
由此可知:发生这种现象的根本原因是因为,TCP协议是面向连接的、可靠的、基于字节流的传输层通信协议,是一种流式协议,消息无边界。
经过上述分析,解决TCP粘包,半包问题的根本在于找出消息的边界:
Netty 提供了针对封装成帧这种形式下不同方式的拆包器。
所谓的拆包其实就是数据的解码,所谓解码就是将网络中的一些原始数据解码成上层应用的数据,那对应在发送数据的时候要按照同样的方式进行数据的编码操作然后发送到网络中。
以下针对 Netty 的编码器示例完整代码:learn-netty-demo
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new FixedLengthFrameDecoder(10));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
// 自己的逻辑Handler
pipeline.addLast("handler", new PeServerHandler());
}
}
public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new FixedLengthFrameDecoder(10));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
// 客户端的逻辑
pipeline.addLast("handler", new PeClientHandler());
}
}
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new LineBasedFrameDecoder(2048));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
// 自己的逻辑Handler
pipeline.addLast("handler", new PeServerHandler());
}
}
public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new LineBasedFrameDecoder(2048));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
// 客户端的逻辑
pipeline.addLast("handler", new PeClientHandler());
}
}
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, // 帧的最大长度,即每个数据包最大限度
0, // 长度字段偏移量
4, // 长度字段所占的字节数
0, // 消息头的长度,可以为负数
4) // 需要忽略的字节数,从消息头开始,这里是指整个包
);
// pipeline.addLast("decoder", new StringDecoder());
//pipeline.addLast("encoder", new StringEncoder());
// 自己的逻辑Handler
pipeline.addLast("handler", new PeServerHandler());
}
}
public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new LineBasedFrameDecoder(10));
// pipeline.addLast("decoder", new StringDecoder());
// pipeline.addLast("encoder", new StringEncoder());
// 客户端的逻辑
pipeline.addLast("handler", new PeClientHandler());
}
}
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new MyProtocolEncoder());
/**
*
* MsgReq 对象 :
* byte type 字段占一个字节
* int length 字段 占4个字节
* 剩下的是消息体
* 那么 这里的 参数设置:
*
* int maxFrameLength:可以设置你认为的小题最大上限
* int lengthFieldOffset:长度字段从哪个位置开始读:第0个字节是type,长度是从1开始的
* int lengthFieldLength:长度字段所占的字节数,int类型占4个字节
* int lengthAdjustment:是否需要调整消息头的长度,即读取消息头是否需要偏移一下,我们这里不需要
* int initialBytesToStrip:消息体是否需要忽略一些字节数,比如忽略消息头的长度,我们这里消息头也算在对象里面了所以不忽略
* boolean failFast:如果读取长度不够是否快速失败
*
*/
pipeline.addLast(new MyProtocolDecoder(1024, // 帧的最大长度,即每个数据包最大限度
1, // 长度字段偏移量
4, // 长度字段所占的字节数
0, // 消息头的长度,可以为负数
0, // 需要忽略的字节数,从消息头开始,这里是指整个包
true)
);
// 自己的逻辑Handler
pipeline.addLast("handler", new PeServerHandler());
}
}
public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new MyProtocolEncoder());
/**
*
* MsgReq 对象 :
* byte type 字段占一个字节
* int length 字段 占4个字节
* 剩下的是消息体
* 那么 这里的 参数设置:
*
* int maxFrameLength:可以设置你认为的小题最大上限
* int lengthFieldOffset:长度字段从哪个位置开始读:第0个字节是type,长度是从1开始的
* int lengthFieldLength:长度字段所占的字节数,int类型占4个字节
* int lengthAdjustment:是否需要调整消息头的长度,即读取消息头是否需要偏移一下,我们这里不需要
* int initialBytesToStrip:消息体是否需要忽略一些字节数,比如忽略消息头的长度,我们这里消息头也算在对象里面了所以不忽略
* boolean failFast:如果读取长度不够是否快速失败
*
*/
pipeline.addLast(new MyProtocolDecoder(1024,
1,
4,
0,
0,
true));
// 客户端的逻辑
pipeline.addLast("handler", new PeClientHandler());
}
}
我们把解决半包粘包问题的常用三种解码器叫一次解码器。
其作用是将原始数据流(可能会出现粘包和半包的数据流)转换为用户数据(ByteBuf中存储),但仍然是字节数据。
因此我们需要二次解码器将字节数组转换为java对象,或者将一种格式转化为另一种格式,方便上层应用程序使用。
一次解码器继承自:ByteToMessageDecoder
二次解码器继承自:MessageToMessageDecoder
但他们的本质都是继承: ChannellnboundHandlerAdapter
用户数据(ByteBuf)Java Object之间的转换,或者将将一种格式转化为另一种格式(譬如将应用数据转化成某种 协议数据)。
protected void initChannel(SocketChannel ch) throws Exception {
//获取与该channel绑定的pipeline
ChannelPipeline pipeline = ch.pipeline();
//测试 http service
pipeline.addLast(new HttpResponseEncoder());
pipeline.addLast(new HttpRequestDecoder());
//文件上传需要设置大点儿 单位是字节
pipeline.addLast(new HttpObjectAggregator(1024*1024*8));
pipeline.addLast(new MyHttpServerHandler());
}
public class MyHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private static final Logger log = LoggerFactory.getLogger(MyHttpServerHandler.class);
private static final HttpDataFactory HTTP_DATA_FACTORY = new DefaultHttpDataFactory(DefaultHttpDataFactory.MAXSIZE);
static {
DiskFileUpload.baseDirectory = "/opt/netty/fileupload";
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest) throws Exception {
//HttpRequest request = fullHttpRequest;
//String uri = fullHttpRequest.uri();
//获取method
HttpMethod method = fullHttpRequest.method();
//根据method解析参数,封装数据,
if (HttpMethod.GET.equals(method)) {
parseGet(fullHttpRequest);
}else if (HttpMethod.POST.equals(method)) {
parsePost(fullHttpRequest);
}else {
log.error("{} method is not supported ,please change http method for get or post!");
}
//service
//response client
StringBuilder sb = new StringBuilder();
sb.append("<html>");
sb.append("<head>");
sb.append("</head>");
sb.append("<body>");
sb.append("<h3>success</h3>");
sb.append("</body>");
sb.append("</html>");
writeResponse(ctx,fullHttpRequest,HttpResponseStatus.OK,sb.toString());
}
private void writeResponse(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest, HttpResponseStatus status, String msg) {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,status);
response.content().writeBytes(msg.getBytes(StandardCharsets.UTF_8));
response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/html;charset=utf-8");
HttpUtil.setContentLength(response,response.content().readableBytes());
boolean keepAlive = HttpUtil.isKeepAlive(fullHttpRequest);
if (keepAlive) {
response.headers().set(HttpHeaderNames.CONNECTION,"keep-alive");
ctx.writeAndFlush(response);
}else {
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
}
private void parsePost(FullHttpRequest fullHttpRequest) {
//获取content-type
String contentType = getContentType(fullHttpRequest);
switch (contentType) {
case "application/json":
parseJson(fullHttpRequest.content());
break;
case "application/x-www-form-urlencoded":
parseForm(fullHttpRequest);
break;
case "multipart/form-data":
parseMultipart(fullHttpRequest);
break;
default:
}
}
private void parseMultipart(FullHttpRequest fullHttpRequest) {
HttpPostRequestDecoder postRequestDecoder = new HttpPostRequestDecoder(HTTP_DATA_FACTORY,fullHttpRequest);
//判断是否是multipart
if (postRequestDecoder.isMultipart()) {
//获取 body中的数据
List<InterfaceHttpData> bodyHttpDatas = postRequestDecoder.getBodyHttpDatas();
bodyHttpDatas.forEach(dataItem ->{
//获取数据项的类型
InterfaceHttpData.HttpDataType dataType = dataItem.getHttpDataType();
//判断是普通表达项还是文件上传项
if (dataType.equals(InterfaceHttpData.HttpDataType.Attribute)) {
//普通表单项 直接获取数据
Attribute attribute = (Attribute) dataItem;
try {
log.info("表单项名称:{},表单项值:{}",attribute.getName(),attribute.getValue());
} catch (IOException e) {
log.error("获取表单项数据错误,msg={}",e.getMessage());
}
}else if (dataType.equals(InterfaceHttpData.HttpDataType.FileUpload)) {
//文件上传项 处理待上传的数据
FileUpload fileUpload = (FileUpload) dataItem;
//获取原始文件名称
String filename = fileUpload.getFilename();
//获取表单name属性
String name = fileUpload.getName();
log.info("文件名称:{},表单项名称:{}",filename,name);
//将文件数据保存到磁盘
if (fileUpload.isCompleted()) {
try {
String path = DiskFileUpload.baseDirectory + File.separator + filename;
//File file = fileUpload.getFile();
fileUpload.renameTo(new File(path));
} catch (IOException e) {
log.error("文件转存失败,msg={}",e.getMessage());
}
}
}else {
}
});
}
}
private void parseForm(FullHttpRequest fullHttpRequest) {
//post请求时uri中也可能携带参数
parseKVstr(fullHttpRequest.uri(),true);
//解析请求体中的表单数据
parseFormData(fullHttpRequest.content());
}
private void parseFormData(ByteBuf body) {
String bodystr = body.toString(StandardCharsets.UTF_8);
parseKVstr(bodystr,false);
}
private void parseJson(ByteBuf jsonbody) {
String jsonstr = jsonbody.toString(StandardCharsets.UTF_8);
//使用json工具反序列化
JSONObject jsonObject = JSONObject.parseObject(jsonstr);
//打印 json数据
jsonObject.entrySet().stream().forEach(entry ->{
log.info("json key={},json value={}",entry.getKey(),entry.getValue());
});
}
private String getContentType(FullHttpRequest request) {
HttpHeaders headers = request.headers();
String contentType = headers.get(HttpHeaderNames.CONTENT_TYPE);// text/plain;charset=UTF-8
//List<String> acceptEncoding = headers.getAll(HttpHeaderNames.ACCEPT_ENCODING);//accept-encoding:gzip, deflate, br
return contentType.split(";")[0];
}
private void parseGet(FullHttpRequest request) {
//通过uri解析请求参数
parseKVstr(request.uri(),true);
}
private void parseKVstr(String str,boolean hasPath) {
//通过QueryStringDecoder解析kv字符串
QueryStringDecoder qsd = new QueryStringDecoder(str, StandardCharsets.UTF_8,hasPath);////get请求的uri是: path?k=v
Map<String, List<String>> parameters = qsd.parameters();
//封装参数,执行业务 此处打印即可
if (parameters!=null && parameters.size() > 0) {
parameters.entrySet().stream().forEach(entry->{
log.info("参数名:{},参数值:{}",entry.getKey(),entry.getValue());
});
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("MyHttpServerHandler Exception,{}",cause.getMessage());
}
}
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>测试 netty http server</title>
</head>
<body>
<form action="http://localhost:8888/test" method="post" enctype="multipart/form-data">
<label for="username">用户名:</label><input id="username" type="text" name="username"><br/>
<label for="password">密码:</label><input id="password" type="password" name="password"><br/>
<label for="email">邮箱:</label><input id="email" type="email" name="email"><br/>
<label for="address">地址:</label><input id="address" type="text" name="address"><br/>
<label for="pic">选择文件:</label><input id="pic" type="file" name="pic"><br/>
<input type="submit" value="提交">
</form>
</body>
</html>
.childHandler(new ChannelInitializer<SocketChannel>() {
//每个客户端channel初始化时都会执行该方法来配置该channel的相关handler
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//获取与该channel绑定的pipeline
ChannelPipeline pipeline = ch.pipeline();
//基于netty开发websocket 服务端
pipeline.addLast(new HttpServerCodec());//HttpServerCodec = HttpRequestDecoder + HttpResponseEncoder
pipeline.addLast(new HttpObjectAggregator(1024*1024*8));
//pipeline.addLast(new ChunkedWriteHandler());
//添加业务处理器
pipeline.addLast(new MyWebSocketServerHandler());
}
});
public class MyWebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
private static final Logger log = LoggerFactory.getLogger(MyWebSocketServerHandler.class);
private WebSocketServerHandshaker serverHandshaker;
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
//判断是http握手请求还是websocket请求
if (msg instanceof FullHttpRequest) {
boolean handShaker = handleHttpRequest(ctx, (FullHttpRequest) msg);
if (handShaker) {
//握手成功后 服务端主动推送消息,每隔5s推送一次
new Thread(()->{
while (true) {
try {
ctx.channel().writeAndFlush(new TextWebSocketFrame("你好,这是服务器主动推送回来的数据,当前时间为:"+new Date().toString()));
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
log.error("push msg exception ,{}",e.getMessage());
}
}
}).start();
}
}else if (msg instanceof WebSocketFrame) {
handleWebSocketFrame(ctx,(WebSocketFrame)msg);
}
}
/**
* 接收到的消息是已经解码的WebSocketFrame消息
* @param ctx
* @param frame
*/
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
// 判断链路消息类型
if (frame instanceof CloseWebSocketFrame) { // 关闭链路指令
serverHandshaker.close(ctx.channel(),((CloseWebSocketFrame) frame).retain());
return;
}
if (frame instanceof PingWebSocketFrame) { // 维持链路的ping 指令
ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
return;
}
if (frame instanceof TextWebSocketFrame) { // 普通文本消息
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
String message = textFrame.text();
log.info("receive text msg is {}",message);
//构造返回
ctx.channel().writeAndFlush(new TextWebSocketFrame("你好,欢迎使用netty websocket 服务,当前时间为:"+new Date().toString()));
return;
}
if (frame instanceof BinaryWebSocketFrame) { //二进制消息
log.info("frame is binarywebsocketframe");
}
}
private boolean handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest) {
//先判断是否解码成功,
if (!fullHttpRequest.decoderResult().isSuccess()) {
sendHttpResponse(ctx,fullHttpRequest,new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return false;
}
// 然后判断是否要建立websocket连接
//构造握手工厂 创建握手处理类,并且构造握手响应给客户端
WebSocketServerHandshakerFactory serverHandshakerFactory = new WebSocketServerHandshakerFactory("ws://localhost:8888/mywebsocket",null,false);
if (!fullHttpRequest.headers().contains(HttpHeaderNames.UPGRADE,"websocket",true)) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
return false;
}
serverHandshaker = serverHandshakerFactory.newHandshaker(fullHttpRequest);
serverHandshaker.handshake(ctx.channel(),fullHttpRequest);
return true;
}
private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request, FullHttpResponse response) {
if (!response.status().equals(HttpResponseStatus.OK)) {
ByteBuf byteBuf = Unpooled.wrappedBuffer("error".getBytes(StandardCharsets.UTF_8));
response.content().writeBytes(byteBuf);
byteBuf.release();
HttpUtil.setContentLength(response,response.content().readableBytes());
}
ChannelFuture future = ctx.channel().writeAndFlush(response);
if (!HttpUtil.isKeepAlive(request) || !response.status().equals(HttpResponseStatus.OK) ) {
future.addListener(ChannelFutureListener.CLOSE);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("server error,msg is {}",cause.getMessage());
ctx.close();
}
}
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>test netty websocket </title>
</head>
<body>
<br>
<script type="text/javascript">
var socket;
if(!window.WebSocket){
window.WebSocket=window.MozWebSocket;
}
if(window.WebSocket){
socket=new WebSocket("ws://localhost:8888/mywebSocket");
socket.onmessage=function(event){
var ta=document.getElementById('responseText');
ta.value="";
ta.value=event.data;
};
socket.onopen=function(event){
var ta=document.getElementById('responseText');
ta.value='打开WebSocket服务器正常,浏览器支持WebSocket!';
};
socket.onclose=function(event){
var ta=document.getElementById('responseText');
ta.value='';
ta.value="WebSocket 关闭!";
};
}else{
alert("抱歉,您的浏览器不支持WebSocket协议!");
}
function send(message){
if(!window.WebSocket){
return;
}
if(socket!=null){
socket.send(message);
}else{
alert("WebSocket连接没有建立成功,请刷新页面!");
}
/* if(socket.readyState==WebSocket.open){
socket.send(message);
}else{
alert("WebSocket连接没有建立成功!");
} */
}
</script>
<form onsubmit="return false;">
<input type="text" name="message" value="Netty WebSocket实战"/>
<br><br>
<input type="button" value="发送WebSocket请求消息" onclick="send(this.form.message.value)"/>
<hr color="blue"/>
<h3>服务端返回的应答消息</h3>
<textarea id="responseText" style="width:500px;height:300px;"></textarea>
</form>
</body>
</html>
主要由如下三个参数:
当启用(默认关闭)keepalive时:
所以总耗时一般为:2小时11分钟(7200秒+75秒*9次
除了在tcp网络层开启 keepalive 之外,我们普遍还需要在应用层启动 keepalive,一般称之为:应用心跳(心跳机制)
原因如下:
另外需要注意:http虽然属于应用层协议,因此会经常听到HTTP的头信息:Connection:Keep-Alive,HTTP/1.1默认使 Connection:keep-alive 进行长连接。
在一次TCP连接中可以完成多个HTTP请求,但是对每个请求仍然要单独发header,Keep-Alive不会永久保持连接,它有一个保持时间,可以在不同的服务器软件(如Apache)中设定这个时间。
这种长连接是一种“伪链接”,而且只能由客户端发送请求,服务端响应。HTTP协议的长连接和短连接,实质上是TCP协议的长连接和短连接。
ldle监测,只是负责诊断,诊断后,做出不同的行为,决定Idle监测的最终用途,一般用来配合keepalive,减少 keepalive 消息:
这样的机制优点在于:
首先,将NettyClient
和NettyServer
拷贝一份得到NettyClientV2
和NettyServerV2
,其次初始化服务端和客户端的pipeline
//Server端
protected void initChannel(SocketChannel ch) throws Exception {
//获取与该channel绑定的pipeline
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
pipeline.addLast(new LengthFieldPrepender(2));
pipeline.addLast(new StringEncoder());
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024,0,2,0,2));
pipeline.addLast(new StringDecoder());
}
//Client端
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
pipeline.addLast(new LengthFieldPrepender(2));
pipeline.addLast(new StringEncoder());
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024,0,2,0,2));
pipeline.addLast(new StringDecoder());
}
服务端编写用于idle监测的handler
@Slf4j
public class ServerReadIdleCheckHandler extends IdleStateHandler {
public ServerReadIdleCheckHandler() {
super(10, 0, 0, TimeUnit.SECONDS);
}
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
log.info("server channel idle----");
if (evt == IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT) {
ctx.close();
log.info("server read idle , close channel.....");
return;
}
super.channelIdle(ctx, evt);
}
}
服务端添加 ServerIdleCheckHandler
到 pipeline
中
protected void initChannel(SocketChannel ch) throws Exception {
//获取与该channel绑定的pipeline
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
pipeline.addLast(new ServerReadIdleCheckHandler());//添加ServerReadIdleCheckHandler
pipeline.addLast(new LengthFieldPrepender(2));
pipeline.addLast(new StringEncoder());
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024,0,2,0,2));
pipeline.addLast(new StringDecoder());
}
客户端完成,5s的 write 监测,超过5s不发送数据,就发送一个 keepalive 消息,避免被服务端断掉连接,故,编写客户端的 idlehandler
@Slf4j
public class ClientWriteCheckIdleHandler extends IdleStateHandler {
public ClientWriteCheckIdleHandler() {
super(0, 5, 0, TimeUnit.SECONDS);
}
//也可在channelIdle方法中直接处理
}
@Slf4j
@ChannelHandler.Sharable
public class KeepaliveHandler extends ChannelDuplexHandler {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt == IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT) {
log.info("client write idle,so send keepalive msg to server");
ctx.writeAndFlush("this is keepalive msg");
}
super.userEventTriggered(ctx, evt);
}
}
客户端向 pipeline 中添加 handler
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
pipeline.addLast(new ClientWriteCheckIdleHandler());//write idle监测
pipeline.addLast(new LengthFieldPrepender(2));
pipeline.addLast(new StringEncoder());
pipeline.addLast(new KeepaliveHandler());//发送keepalvie消息
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024,0,2,0,2));
pipeline.addLast(new StringDecoder());
}
设置 linux 系统参数,例如:/proc/sys/net/ipv4/tcp_keepalive_time
Linux 参数:
netty支持的系统参数设置,例如:serverbootstrap.option(ChannelOption.S0 BACKLOG,1024),且设置形式有两种:
ScoketChannel:有 7 个参数可以设置,常用的两个:
ServerSocketChannel:有 3 个参数可以设置,常用的一个如下:
根据实际需求将业务进行分类:
系统内核处理 IO 操作分为两个阶段:
在 OS 层面上的 Zero-copy 通常指避免在用户态(User-space)与内核态(Kernel-space)之间来回拷贝数据
例如,Linux 提供的 mmap 系统调用:
正因为有这样的映射关系, 我们就不需要在 用户态(User-space)与 内核态(Kernel-space)之间拷贝数据,提高了数据传输的效率
通过java的 FileChannel.transferTo 方法(底层基于NIO),可以避免多余的拷贝(当然这需要底层操作系统的支持)
Netty 中的 Zero-copy 与上面我们所提到到 OS 层面上的 Zero-copy 不太一样。
Netty 的 Zero-coyp 完全是在用户态(Java层面)的,它的 Zero-copy 的更多的是偏向于优化数据操作这样的概念,Netty 的 Zero-copy 主要体现在如下几个方面: