前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty之bossGroup接收请求转给workGroup

Netty之bossGroup接收请求转给workGroup

作者头像
克虏伯
发布2022-09-23 11:18:18
5180
发布2022-09-23 11:18:18
举报
文章被收录于专栏:软件开发-青出于蓝

    bossGroup和workGroup是分开的,bossGroup负责accept请求,而workGroup负责read/write事件,bossGroup accept之后转交给workGroup具体是怎么实现的呢。

BossGroup

    NioEventLoop#processSelectedKey()方法中如下List-1,当bossGroup收到accept事件后,调用unsafe.read()

List-1

代码语言:javascript
复制
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
}

    AbstractNioMessageChannel.NioMessageUnsafe.read(),如下List-2,1处获取JDK channel,2处pipeline.fireChannelRead(readBuf.get(i))会调用ServerBootstrapAcceptor.channelRead()。

List-2

代码语言:javascript
复制
private final class NioMessageUnsafe extends AbstractNioUnsafe {

    private final List<Object> readBuf = new ArrayList<Object>();

    @Override
    public void read() {
        assert eventLoop().inEventLoop();
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.reset(config);

        boolean closed = false;
        Throwable exception = null;
        try {
            try {
                do {
                    //1
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }

                    allocHandle.incMessagesRead(localRead);
                } while (allocHandle.continueReading());
            } catch (Throwable t) {
                exception = t;
            }

            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                readPending = false;
                //2
                pipeline.fireChannelRead(readBuf.get(i));
            }
            readBuf.clear();
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();

    NioServerSocketChannel.doReadMessages()如下获取JDK channel,之后封装后NioSocketChannel,放入到buf中

List-3 

代码语言:javascript
复制
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel());

    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}

    bossGroup的pipeline,是head->ServerBootstrapAcceptor->tail,ServerBootstrapAcceptor.channelRead()中,childGroup.register(child),这里的child是netty封装后的JDK channel,转交给childGroup,如下List-5

List-4

代码语言:javascript
复制
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    for (Entry<ChannelOption<?>, Object> e: childOptions) {
        try {
            if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                logger.warn("Unknown channel option: " + e);
            }
        } catch (Throwable t) {
            logger.warn("Failed to set a channel option: " + child, t);
        }
    }

    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

WorkGroup

    MultithreadEventLoopGroup,如下所示的方法中,next()从众多的child group中选出一个来处理这个新的channel,默认是轮询。

List-5

代码语言:javascript
复制
@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

    List-5中register()只是在这个channel上注册read事件监听,待这个channel上有read事件,即有数据可读时就会进行read。

    List-6中bootstrap设置了childHandler,initializer里对pipeline添加了channelHandler,workGroup从channel中读取数据后,会顺着channel handdler链表进行处理,也就是我们设置的这些channel handler会收到对应的数据。

List-6

代码语言:javascript
复制
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
try {
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
            ...
            .childHandler(new ServerInitializer(...));

...

public class TrpcServerInitializer extends ChannelInitializer<SocketChannel> {
    private Map<String, Object> handlerMap;
    private List<TrpcFilter> filters;

    public TrpcServerInitializer(Map<String, Object> handlerMap, List<TrpcFilter> filters) {
        this.handlerMap = handlerMap;
        this.filters = filters;
    }

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast(new IdleStateHandler(30, 30, PingPongRequest.BEAT_INTERVAL, TimeUnit.SECONDS));
        pipeline.addLast(new TrpcDecoder(TrpcRequest.class));
        pipeline.addLast(new TrpcEncoder(TrpcResponse.class));
        pipeline.addLast(new TrpcServerHandler(handlerMap, filters));
    }

    workGroup从bossGroup获取channel,之后注册read事件,之后如List-1,如果有read事件,则会调用List-2中的方法,1处从JDK channel中获取byte数据写入ByteBBuf,2处pipeline.fireChannelRead(),将读取的Byte数据转交给我们自定义的channelHandler,比如ByteToMessageDecoder中就可以进行反序列化byte,之后的业务ChannelHandler才能处理业务请求

List-7

代码语言:javascript
复制
protected class NioByteUnsafe extends AbstractNioUnsafe {
...

    @Override
    public final void read() {
        final ChannelConfig config = config();
        if (shouldBreakReadReady(config)) {
            clearReadPending();
            return;
        }
        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        allocHandle.reset(config);

        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            do {
                byteBuf = allocHandle.allocate(allocator);
                //1
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) {
                    // nothing was read. release the buffer.
                    byteBuf.release();
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    if (close) {
                        // There is nothing left to read as we received an EOF.
                        readPending = false;
                    }
                    break;
                }

                allocHandle.incMessagesRead(1);
                readPending = false;
                //2
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
            } while (allocHandle.continueReading());

            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-08-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • BossGroup
  • WorkGroup
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档