前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Netty源码分析】04 服务端读流程

【Netty源码分析】04 服务端读流程

作者头像
Reactor2020
发布2023-03-22 19:00:40
4110
发布2023-03-22 19:00:40
举报
文章被收录于专栏:【云原生 • Prometheus】

读流程

客户端接入后,下面一步操作就是读取客户端传输过来的数据,这一节我们就来分析下服务端读取客户端数据流程。从前面分析来看,channel的事件轮询、事件处理是在NioEventLooprun方法中,从这里我们就很容易找我服务端读流程的入口方法:processSelectedKeys()

processSelectedKeys()一直追踪下去,可以看到OP_READ处理逻辑分支:

代码语言:javascript
复制
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
 unsafe.read();
}

可能你会比较奇怪:为什么OP_READOP_ACCEPT都会走这个分支?

  • OP_ACCEPTNioServerSocketChannel处理的事件,而OP_READNioSocketChannel处理的事件,所以,虽然它们都走这个分支,但是channel类型确是不一样的,即这里的unsafe类型也不一样,一个是:NioMessageUnsafe,另一个是:NioSocketChannelUnsafe
  • NioServerSocketChannel负责监听客户端连接,当有客户端连接进入时,对它来说就是有个读入消息需要被处理。

这里我们是处理client channleOP_READ,所以,unsafeNioSocketChannelUnsafe类型实例。

AbstractNioByteChannel.NioByteUnsafe#read方法代码如下:

代码语言:javascript
复制
public final void read() {
 final ChannelConfig config = config();
    if (shouldBreakReadReady(config)) {
     clearReadPending();
         return;
    }
    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            // 申请ByteBuf对象
            byteBuf = allocHandle.allocate(allocator);
            //doReadBytes(byteBuf):将数据读取到ByteBuf中
            //lastBytesRead()将读取的字节数设置到lastBytesRead
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            if (allocHandle.lastBytesRead() <= 0) {
                byteBuf.release();
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;
                if (close) {
                    readPending = false;
                }
                break;
            }

            allocHandle.incMessagesRead(1);
            readPending = false;
            //触发pipeline channelRead事件,将读入数据ByteBuf传入到handler中
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } while (allocHandle.continueReading());//判断是否继续读取
  
        allocHandle.readComplete();
        //触发pipeline channelReadComplete
        pipeline.fireChannelReadComplete();

        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}

这个方法刨除其它逻辑,关于客户端数据处理逻辑主要包括3个步骤:

  • allocHandle.lastBytesRead(doReadBytes(byteBuf)):调用java api,从channel中读取字节数据到ByteBuf缓存中;
  • pipeline.fireChannelRead(byteBuf):触发pipelinechannelRead事件,并将带有读入数据的ByteBuf通过参数传入;
  • pipeline.fireChannelReadComplete():触发pipelinechannelReadComplete事件;

事件传播

调用pipelinefireChannelRead()就可触发channelRead事件在handler之间传播,事件传播这块代码比较绕,给人感觉不停的来回调用容易绕晕,下面通过图可以更加直观的看出调用流程,再配合代码就很好理解了。

关键点就在于HandlerContext中提供了一个静态方法:invokeChannelRead(final AbstractChannelHandlerContext next, Object msg),第一个是在哪个handler上触发事件,第二个参数就是数据本身,通过这个方法就可以指定在哪个handler上触发channelRead事件。由于pipeline中的handler是被包装成HandlerContext放入的,所以,可以通过handler()方法找到真正的handler对象进行触发。

比如pipelinefireChannelRead()就是触发headchannelRead事件,如果处理完成需要把事件继续传播给下一个handler,就需要调用ctx.fireChannelRead(msg)方法即可,该方法中通过next属性获取到下一个节点,然后执行static invokeChannelRead(next, msg)这个方法就可以将事件传播到下一个节点上。

pipeline.fireChannelRead(byteBuf)运行完成后会调用pipeline.fireChannelReadComplete()方法,触发channelReadComplete事件,执行机制和channelRead事件一样,就不再赘述。

搞清楚上面原理,就很容易理解ctx.fireChannelRead()ctx.pipeline().fireChannelRead()之间的区别了,避免误用。

Pipeline线程模型

上面分析的都是常规模式,没有给handler指定额外线程情况下channelReadchannelReadComplete传播机制,大致如下图:

先触发channelRead事件,按照pipeline中顺序依次触发,当所有handler都触发完后,再触发channelReadComplete事件,按照pipeline中的顺序依次触发。这些所有流程采用的都是同步方式,在同一个线程中执行,这个线程就是channel注册的NioEventLoop

我们来看下static void invokeChannelRead()这个方法:

代码语言:javascript
复制
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

在执行next.invokeChannelRead(m)方法前有个executor.inEventLoop()判断,判断当前执行线程是不是就是handler执行所需的线程。执行handler方法是不能随便线程都可以去执行的,必须使用handler内部指定的executor线程执行器中执行才行。如下图,也就是说红色框框中的内容必须在executor线程执行器中执行,如果当前线程和handler执行线程不是同一个,就需要进行线程切换:则调用封装成一个任务,提交到executor的任务队列中让其执行。

executor线程执行器是通过next.executor()方法获取到的,从这个方法源码中可以看到获取逻辑:如果HandlerContextexecutor有值则直接返回;否则返回channel注册的NioEventLoop作为线程执行器。

在添加handler时可以指定一个EventGrouppipeline.addLast( bizGroup, "handler2", new OtherTest02());,这样,再把handler包装成HandlerContext过程中会从这个EventGroup根据chooser选取策略获得一个EventLoop赋值给executor

所以,从上面分析,默认情况下handler都是在channel注册的NioEventLoop线程中执行的,除非在addLast添加handloer时特别指定。

下面我们通过一个案例分析下pipeline线程模型,如下,给handler02添加一个额外的线程池:

代码语言:javascript
复制
EventLoopGroup bizGroup = new NioEventLoopGroup(10, new ThreadFactoryBuilder().setNameFormat("biz-%s").build());

protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    pipeline.addLast( "handler01", new OtherTest01());
    pipeline.addLast( bizGroup, "handler02", new OtherTest02());
    pipeline.addLast( "handler03", new OtherTest03());
}

这时,channelReadchannelReadComplete事件触发流程见下图:

channelRead事件执行流程说明:

  • 上下两部分代表两个线程,上面是channel注册的eventLoop,下面是添加handler02指定的eventLoop;
  • 首先触发handler01channelRead事件,本身当前线程和handler01是同一个线程,所以,直接调用handler#channelRead()方法;
  • handler01#channelRead()方法执行完成后,事件继续向下传播,需要调用handler02#channelRead()方法,但是handler02执行线程并不是默认的channel的注册线程,而是额外设置的biz线程,需要将调用包装成一个任务提交到biz线程的任务队列taskQueue中,然后直接返回;
  • biz线程执行器内部线程会一直循环从taskQueue中获取任务执行,这样就完成了线程切换效果;
  • handler02#channelRead()方法执行完成后,需要执行handler03#channelRead(),它们又不在同一个线程中执行,这时有需要切换线程,所以会把handler03#channelRead()的调用封装成一个任务提交到register eventLoop的taskQueue中,待其内部线程提取执行;

下面再来看下channelReadComplete事件执行流程:

  • 上图a1将任务提交给taskQueue任务队列后直接返回了,而不是等其执行完成再返回;
  • a1返回后,从源码分析来看,会立即触发channelReadComplete事件,涉及到线程切换,同理b1这里也是将handler02#channelReadComplete()调用封装成任务放入到biz eventLooptaskQueue中的,然后也直接返回了;
  • 这样,biz eventLoop线程执行器taskQueue中就有两个任务,会按照顺序依次执行:先执行channelRead()调用,再执行channelReadComplete()调用;
  • 执行a3、b3时同理;

总结

从上面可以看出,Pipelinehandler可以在不同线程间切换得到关键是:taskQueue。还要一点非常重要:handler线程池执行器默认使用的channel注册的NioEventLoop这个,NioEventLoop采用的是单线程工作模式,同时还需要处理selector.select()事件轮询,所以,handler里肯定不能有耗时、特别是IO阻塞等操作,不然卡在handler中,selector#select()执行不到,无法及时接收到客户端传送过来的数据。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-04-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Reactor2020 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 读流程
  • 事件传播
  • Pipeline线程模型
  • 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档