客户端接入后,下面一步操作就是读取客户端传输过来的数据,这一节我们就来分析下服务端读取客户端数据流程。从前面分析来看,channel
的事件轮询、事件处理是在NioEventLoop
的run
方法中,从这里我们就很容易找我服务端读流程的入口方法:processSelectedKeys()
。
从processSelectedKeys()
一直追踪下去,可以看到OP_READ
处理逻辑分支:
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
可能你会比较奇怪:为什么OP_READ
和OP_ACCEPT
都会走这个分支?
OP_ACCEPT
是NioServerSocketChannel
处理的事件,而OP_READ
是NioSocketChannel
处理的事件,所以,虽然它们都走这个分支,但是channel类型确是不一样的,即这里的unsafe
类型也不一样,一个是:NioMessageUnsafe
,另一个是:NioSocketChannelUnsafe
。NioServerSocketChannel
负责监听客户端连接,当有客户端连接进入时,对它来说就是有个读入消息需要被处理。这里我们是处理client channle
的OP_READ
,所以,unsafe
是NioSocketChannelUnsafe
类型实例。
AbstractNioByteChannel.NioByteUnsafe#read
方法代码如下:
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
// 申请ByteBuf对象
byteBuf = allocHandle.allocate(allocator);
//doReadBytes(byteBuf):将数据读取到ByteBuf中
//lastBytesRead()将读取的字节数设置到lastBytesRead
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
//触发pipeline channelRead事件,将读入数据ByteBuf传入到handler中
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());//判断是否继续读取
allocHandle.readComplete();
//触发pipeline channelReadComplete
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
这个方法刨除其它逻辑,关于客户端数据处理逻辑主要包括3个步骤:
allocHandle.lastBytesRead(doReadBytes(byteBuf))
:调用java api
,从channel
中读取字节数据到ByteBuf
缓存中;pipeline.fireChannelRead(byteBuf)
:触发pipeline
的channelRead
事件,并将带有读入数据的ByteBuf
通过参数传入;pipeline.fireChannelReadComplete()
:触发pipeline
的channelReadComplete
事件;调用pipeline
的fireChannelRead()
就可触发channelRead
事件在handler
之间传播,事件传播这块代码比较绕,给人感觉不停的来回调用容易绕晕,下面通过图可以更加直观的看出调用流程,再配合代码就很好理解了。
关键点就在于HandlerContext
中提供了一个静态方法:invokeChannelRead(final AbstractChannelHandlerContext next, Object msg)
,第一个是在哪个handler
上触发事件,第二个参数就是数据本身,通过这个方法就可以指定在哪个handler
上触发channelRead
事件。由于pipeline
中的handler
是被包装成HandlerContext
放入的,所以,可以通过handler()
方法找到真正的handler
对象进行触发。
比如pipeline
的fireChannelRead()
就是触发head
的channelRead
事件,如果处理完成需要把事件继续传播给下一个handler
,就需要调用ctx.fireChannelRead(msg)
方法即可,该方法中通过next
属性获取到下一个节点,然后执行static invokeChannelRead(next, msg)
这个方法就可以将事件传播到下一个节点上。
pipeline.fireChannelRead(byteBuf)
运行完成后会调用pipeline.fireChannelReadComplete()
方法,触发channelReadComplete
事件,执行机制和channelRead
事件一样,就不再赘述。
搞清楚上面原理,就很容易理解
ctx.fireChannelRead()
和ctx.pipeline().fireChannelRead()
之间的区别了,避免误用。
上面分析的都是常规模式,没有给handler
指定额外线程情况下channelRead
和channelReadComplete
传播机制,大致如下图:
先触发channelRead
事件,按照pipeline
中顺序依次触发,当所有handler
都触发完后,再触发channelReadComplete
事件,按照pipeline
中的顺序依次触发。这些所有流程采用的都是同步方式,在同一个线程中执行,这个线程就是channel
注册的NioEventLoop
。
我们来看下static void invokeChannelRead()
这个方法:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
在执行next.invokeChannelRead(m)
方法前有个executor.inEventLoop()
判断,判断当前执行线程是不是就是handler
执行所需的线程。执行handler
方法是不能随便线程都可以去执行的,必须使用handler
内部指定的executor
线程执行器中执行才行。如下图,也就是说红色框框中的内容必须在executor
线程执行器中执行,如果当前线程和handler
执行线程不是同一个,就需要进行线程切换:则调用封装成一个任务,提交到executor
的任务队列中让其执行。
executor
线程执行器是通过next.executor()
方法获取到的,从这个方法源码中可以看到获取逻辑:如果HandlerContext
中executor
有值则直接返回;否则返回channel
注册的NioEventLoop
作为线程执行器。
在添加handler
时可以指定一个EventGroup
:pipeline.addLast( bizGroup, "handler2", new OtherTest02());
,这样,再把handler
包装成HandlerContext
过程中会从这个EventGroup
根据chooser
选取策略获得一个EventLoop
赋值给executor
。
所以,从上面分析,默认情况下handler
都是在channel
注册的NioEventLoop
线程中执行的,除非在addLast
添加handloer
时特别指定。
下面我们通过一个案例分析下pipeline
线程模型,如下,给handler02
添加一个额外的线程池:
EventLoopGroup bizGroup = new NioEventLoopGroup(10, new ThreadFactoryBuilder().setNameFormat("biz-%s").build());
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast( "handler01", new OtherTest01());
pipeline.addLast( bizGroup, "handler02", new OtherTest02());
pipeline.addLast( "handler03", new OtherTest03());
}
这时,channelRead
和channelReadComplete
事件触发流程见下图:
channelRead
事件执行流程说明:
handler01
的channelRead
事件,本身当前线程和handler01
是同一个线程,所以,直接调用handler#channelRead()
方法;handler01#channelRead()
方法执行完成后,事件继续向下传播,需要调用handler02#channelRead()
方法,但是handler02
执行线程并不是默认的channel
的注册线程,而是额外设置的biz
线程,需要将调用包装成一个任务提交到biz
线程的任务队列taskQueue
中,然后直接返回;taskQueue
中获取任务执行,这样就完成了线程切换效果;handler02#channelRead()
方法执行完成后,需要执行handler03#channelRead()
,它们又不在同一个线程中执行,这时有需要切换线程,所以会把handler03#channelRead()
的调用封装成一个任务提交到register eventLoop的taskQueue
中,待其内部线程提取执行;下面再来看下channelReadComplete
事件执行流程:
a1
将任务提交给taskQueue
任务队列后直接返回了,而不是等其执行完成再返回;a1
返回后,从源码分析来看,会立即触发channelReadComplete
事件,涉及到线程切换,同理b1
这里也是将handler02#channelReadComplete()
调用封装成任务放入到biz eventLoop
的taskQueue
中的,然后也直接返回了;biz eventLoop
线程执行器taskQueue
中就有两个任务,会按照顺序依次执行:先执行channelRead()
调用,再执行channelReadComplete()
调用;a3、b3
时同理;从上面可以看出,Pipeline
中handler
可以在不同线程间切换得到关键是:taskQueue
。还要一点非常重要:handler
线程池执行器默认使用的channel
注册的NioEventLoop
这个,NioEventLoop
采用的是单线程工作模式,同时还需要处理selector.select()
事件轮询,所以,handler
里肯定不能有耗时、特别是IO
阻塞等操作,不然卡在handler
中,selector#select()
执行不到,无法及时接收到客户端传送过来的数据。