前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Netty入门之可写事件以及多线程版的通信

Netty入门之可写事件以及多线程版的通信

作者头像
@派大星
发布2023-07-15 13:37:35
发布2023-07-15 13:37:35
22000
代码可运行
举报
文章被收录于专栏:码上遇见你码上遇见你
运行总次数:0
代码可运行

往期文章简单讲解了Netty入门基础篇的相关基本概念:

本次主要讲解如何处理ByteBuffer的可写事件.

先上代码:

  • Server
代码语言:javascript
代码运行次数:0
运行
复制
public static void main(String[] args) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);

        Selector selector = Selector.open();
        // 注册并绑定accept事件
        ssc.register(selector, SelectionKey.OP_ACCEPT);
        ssc.bind(new InetSocketAddress(8080));

        while(true){
            selector.select();
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while(iter.hasNext()){
                SelectionKey key = iter.next();
                // 这里记得要移除key
                iter.remove();
                if(key.isAcceptable()) {
                    // 因为serverSocketChannel的key只有一个。所以这里简写了。直接
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);

                    StringBuilder sb  = new StringBuilder();
                    for(int i=0;i<=3000000; i++){
                        sb.append("a");
                    }
                    ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
                    while (buffer.hasRemaining()){
                        // 这里不能保证一次全部写入进去 返回实际写入的字节数
                        int write = sc.write(buffer);
                        System.out.println(write);
                    }
                }
            }

        }
    }
  • Client
代码语言:javascript
代码运行次数:0
运行
复制
    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost",8080));
        int count = 0;

        while(true){
            // 接收数据
            ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
            count += sc.read(buffer);
            System.out.println(count);
            buffer.clear();

        }
    }

上述代码的运行结果如图所示:

  • server
  • client

通过上述结果我们不难发现这个server端发送数据的时候并不是一次全部发送出去的,他尝试了很多次,效率很低, 并且有的时候Buffer是满的( server端打印0的时候,它是无法写的)他也无法发送,这样其实无法满足非阻塞模式的,接下来进行一个优化: 当buffer满的时候,我去进行别的操作,当buffer清空了触发一个写事件 上代码:

  • server(就是对上述代码进行了优化)
代码语言:javascript
代码运行次数:0
运行
复制
public class WriteServer {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);

        Selector selector = Selector.open();
        // 注册并绑定accept事件
        ssc.register(selector, SelectionKey.OP_ACCEPT);
        ssc.bind(new InetSocketAddress(8080));

        while(true){
            selector.select();
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while(iter.hasNext()){
                SelectionKey key = iter.next();
                // 这里记得要移除key
                iter.remove();
                if(key.isAcceptable()) {
                    // 因为serverSocketChannel的key只有一个。所以这里简写了。直接
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    SelectionKey sckey = sc.register(selector,0,null);
                    sckey.interestOps(SelectionKey.OP_READ);
                    StringBuilder sb  = new StringBuilder();
                    for(int i=0;i<=3000000; i++){
                        sb.append("a");
                    }
                    ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());



                    // 判断是否有剩余内容
                    if ( buffer.hasRemaining()){
                        // 关注可写事件  这里需要注意以下,避免替换掉之前关注的 可读事件
                        sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE );
//                        sckey.interestOps(sckey.interestOps() | SelectionKey.OP_WRITE );
                        // 把未写完的数据挂到sckey上  通过附件的方式
                        sckey.attach(buffer);

                    }
                }else if (key.isWritable()){
                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    SocketChannel sc = (SocketChannel) key.channel();
                    int write = sc.write(buffer);
                    System.out.println(write);
                    // 清理操作
                    if (!buffer.hasRemaining()){
                        // 清除buffer
                        key.attach(null);
                        // 不需要关注可写事件
                        key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
                    }
                }
            }

        }
    }
}

主要就是利用附件的特性和关注可写事件

关于可读事件就讲这些,接下来给大家说一下如何利用多线程来进行优化通信,充分利用多核CPU

如图所示:

说明
  • 黄色框框代表客户端
  • Boss建立连接 accept事件
  • worker 关注读写事件
单个worker 版本
  • server
代码语言:javascript
代码运行次数:0
运行
复制
@Slf4j
public class MultiThreadServer {
    public static void main(String[] args) throws IOException {
        Thread.currentThread().setName("Boss");
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        Selector boss = Selector.open();
        SelectionKey bossKey = ssc.register(boss, 0, null);
        bossKey.interestOps(SelectionKey.OP_ACCEPT);
        ssc.bind(new InetSocketAddress(8080));
        //1. 创建固定数量的worker 并初始化
        Worker worker = new Worker("worker-0");
        while (true){
            boss.select();
            Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                iterator.remove();
                if (key.isAcceptable()){
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    log.debug("connected...{}",sc.getRemoteAddress());
                    //2. 关联selector
                    // 初始化selector 启动worker-0
                    log.debug("before register...{}",sc.getRemoteAddress());
                    worker.register(sc);
                    log.debug("after register...{}",sc.getRemoteAddress());

                }
            }
        }
    }
    static class Worker implements Runnable{
        private Thread thread;
        private Selector selector;
        private String name;
        private volatile boolean start = false; // 还未初始化
        private ConcurrentLinkedDeque<Runnable> queue = new ConcurrentLinkedDeque<>();
        public Worker(String name){
            this.name = name;
        }
        // 初始化线程 和selector
        public void register(SocketChannel sc) throws IOException {
            if (!start){
                selector = Selector.open();
                thread = new Thread(this,name);
                thread.start();
                start =true;
            }
            // 此时这里还是boss线程执行的  因为run方法才是worker-0线程 可以利用消息队列 ConcurrentLinkedDeque
//            sc.register(selector,SelectionKey.OP_READ,null);
            // 像队列添加任务 但是这个任务并没有立即执行  我们在worker-0线程中取出来执行
            queue.add(()->{
                try {
                    sc.register(selector,SelectionKey.OP_READ,null);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            });
            selector.wakeup();// 唤醒selector
        }

        @Override
        public void run() {
            while (true){
                try {
                    selector.select();  // worker-0 阻塞 wakeup
                    //  取出来的可能为空
                    Runnable task = queue.poll();
                    if (task !=null) {
                        task.run();// 执行了   sc.register(selector,SelectionKey.OP_READ,null);
                    }
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()){
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        if (key.isReadable()) {
                            ByteBuffer buffer = ByteBuffer.allocate(16);
                            SocketChannel channel = (SocketChannel) key.channel();
                            log.debug("read ....{}",channel.getLocalAddress());
                            // 这里有很多细节 具体的我不在这里进行赘述,往期文章有写  比如这里的buffer可能会出现黏包半包
                            // 客户端异常断开 检测异常  还有写的数据量过多 等等问题  往期文章都有写这里简单写一下关于多线程的逻辑
                            channel.read(buffer);
                            buffer.flip();
                            debugAll(buffer);
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // worker.register();log.debug("before register...{}",sc.getLocalAddress()); 启动worker-0
    // sc.register(worker.selector,SelectionKey.OP_READ,null);

    //*这两个代码是运行在两个线程中  当有客户端连接的时候它会阻塞住  这里如何解决? 方法有很多,  接下来模仿Netty的
    // 让register 也运行在worker-0线程中*/
}
  • Client
代码语言:javascript
代码运行次数:0
运行
复制
public class Client {
    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost",8080));
        sc.write(Charset.defaultCharset().encode("1234567890abcdef"));
        System.in.read();
//        System.out.println("waiting.......");
    }

}
  • 注意:

这里采用了队列的方式

多个worker版本

主要修改的地方

  • 完整代码
代码语言:javascript
代码运行次数:0
运行
复制
public class MultiThreadServer {
    public static void main(String[] args) throws IOException {
        Thread.currentThread().setName("Boss");
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        Selector boss = Selector.open();
        SelectionKey bossKey = ssc.register(boss, 0, null);
        bossKey.interestOps(SelectionKey.OP_ACCEPT);
        ssc.bind(new InetSocketAddress(8080));
        //1. 创建固定数量的worker 并初始化
        Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Worker("worker-" + i);
        }
        // 计时器
        AtomicInteger index = new AtomicInteger();
        while (true){
            boss.select();
            Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();
            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                iterator.remove();
                if (key.isAcceptable()){
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    log.debug("connected...{}",sc.getRemoteAddress());
                    //2. 关联selector
                    // 初始化selector 启动worker-0
                    log.debug("before register...{}",sc.getRemoteAddress());
                    // round robin 负载均衡算法
                    workers[index.getAndIncrement() % workers.length].register(sc);
                    log.debug("after register...{}",sc.getRemoteAddress());

                }
            }
        }
    }
    static class Worker implements Runnable{
        private Thread thread;
        private Selector selector;
        private String name;
        private volatile boolean start = false; // 还未初始化
        private ConcurrentLinkedDeque<Runnable> queue = new ConcurrentLinkedDeque<>();
        public Worker(String name){
            this.name = name;
        }
        // 初始化线程 和selector
        public void register(SocketChannel sc) throws IOException {
            if (!start){
                selector = Selector.open();
                thread = new Thread(this,name);
                thread.start();
                start =true;
            }
            queue.add(()->{
                try {
                    sc.register(selector,SelectionKey.OP_READ,null);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            });
            selector.wakeup();// 唤醒selector
        }

        @Override
        public void run() {
            while (true){
                try {
                    selector.select();  // worker-0 阻塞 wakeup
                    //  取出来的可能为空
                    Runnable task = queue.poll();
                    if (task !=null) {
                        task.run();// 执行了   sc.register(selector,SelectionKey.OP_READ,null);
                    }
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()){
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        if (key.isReadable()) {
                            ByteBuffer buffer = ByteBuffer.allocate(16);
                            SocketChannel channel = (SocketChannel) key.channel();
                            log.debug("read ....{}",channel.getLocalAddress());
                            // 这里有很多细节 具体的我不在这里进行赘述,往期文章有写  比如这里的buffer可能会出现黏包半包
                            // 客户端异常断开 检测异常  还有写的数据量过多 等等问题  往期文章都有写这里简单写一下关于多线程的逻辑
                            channel.read(buffer);
                            buffer.flip();
                            debugAll(buffer);
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

多个worker版本的个数获取Runtime.getRuntime().availableProcessors() 这里需要注意

  • Runtime.getRuntime().availableProcessors() 如果工作在 docker 容器下,因为容器不是物理隔离的,会拿到物理 cpu 个数,而不是容器申请时的个数
  • 这个问题直到 jdk 10 才修复,使用 jvm 参数 UseContainerSupport 配置, 默认开启

关于多线程版的通信讲到这里就告一段落了。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-09-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码上遇见你 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 说明
  • 单个worker 版本
  • 多个worker版本
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档