前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >02-Reactor线程模型-(单线程)

02-Reactor线程模型-(单线程)

原创
作者头像
spbreak
修改2023-10-21 14:30:14
1840
修改2023-10-21 14:30:14
举报
文章被收录于专栏:netty

Reactor线程模型-反应器线程模型

  • 网络IO设计中的高性能模型
  • 事件驱动(IO的读/写/接受....)
  • ACCEPT与READ等功能不一的IO事件分离, 交由不同角色处理

Reactor的角色

  • Reactor : 反应器负责注册事件等待与分发(IO多路复用), 解决因传统IO等待而出现的性能等问题
  • Acceptor : 接收器负责首次接收accept事件的处理, 并注册新事件给Reactor, 给Reactor增加需要等待与分发的事件.
  • Handler : 处理器负责实际业务的处理, 承接Reactor分发的事件的下一发加工动作
Rector线程模型
Rector线程模型
代码语言:javascript
复制
// Reactor 反应器
// - 由Selector IO多路复用选择器提供事件注册与捕获
// - 被捕获事件进行统一处理分发给下游处理
public class DefaultReactor implements Reactor {
    
    private final static int PORT = 8080;
    
    private final Selector selector;
    private final Server server;

    public DefaultReactor() throws IOException {
        this(PORT);
    }
    
    public DefaultReactor(int port) throws IOException {
        selector = Selector.open();//IO多路复用
        server = new Server(selector, port);
    }

    @Override
    public void run() {
        while (!Thread.interrupted()) {
            try {
                // 获取发生的事件(阻塞)
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterable = selectionKeys.iterator();
                while (iterable.hasNext()) {
                    // 对事件进行分发
                    dispatch(iterable.next());
                    iterable.remove();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            LockSupport.parkNanos(1000 * 1000 * 1000);
        }
    }

    private void dispatch(SelectionKey selectionKey) {
        // 获取事件的附加器
        // ACCEPT 事件的附加器是 Acceptor, 故由 Acceptor 来处理 ACCEPT 事件
        // READ 事件的附加器是 Handler, 故由 Handler 来处理 READ 事件
        Object attachment = selectionKey.attachment();
        if (attachment instanceof Acceptor) {
            ((Acceptor) attachment).run();
            return;
        }
        if (attachment instanceof Handler) {
            ((Handler) attachment).run();
            return;
        }
    }
}
代码语言:javascript
复制
// Acceptor接收器
// - 类似于ServerSocketChannel服务器
// - 专门处理ACCEPT首次访问的IO事件
// - 每次有ACCEPT访问时, 就创建新IO Channel(SocketChannel)注册到Reactor反应器的Selector中, 等待捕获
public class Server implements Acceptor{
    
    private final ServerSocketChannel serverSocketChannel;
    private final Selector selector;
    
    public Server(Selector selector, int port) throws IOException {
        this.selector = selector;
        // 服务端创建 listen-socket 管道
        this.serverSocketChannel = ServerSocketChannel.open();
        // 绑定端口
        this.serverSocketChannel.socket().bind(new InetSocketAddress(port));
        // 设置为非阻塞模式
        this.serverSocketChannel.configureBlocking(false);
        // ACCEPT 事件的附加器是 Acceptor
        this.serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, this);
    }

    @Override
    public void run() {
        try {
            // 处理ACCEPT事件
            // 为连接的客户端创建 client-socket 管道
            SocketChannel clientSocketChannel = serverSocketChannel.accept();
            // 设置为非阻塞
            clientSocketChannel.configureBlocking(false);
            // READ 事件的附加器是 Handler
            clientSocketChannel.register(selector, SelectionKey.OP_READ,
                    new DefaultHandler(clientSocketChannel));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
代码语言:javascript
复制
// Handler 处理器
// - 由SocketChannel实现
// - 处理客户端发送过来的真正业务内容
public class DefaultHandler implements Handler {
    private final SocketChannel clientSocketChannel;

    public DefaultHandler(SocketChannel clientSocketChannel) {
        this.clientSocketChannel = clientSocketChannel;
    }


    @Override
    public void run() {
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        try {
            // 读取数据
            int read = clientSocketChannel.read(byteBuffer);
            if (read <= 0) {
                clientSocketChannel.close();
            } else {
                System.out.println("----" + new String(byteBuffer.array()) + "----");
                // 响应结果 200, 模拟请求响应
                String response = "HTTP/1.1 200 OK\r\n" +
                        "Content-Length: 11\r\n\r\n" +
                        "Yes, He is";
                ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());// 数据存放在byte数组
                while (buffer.hasRemaining()) {
                    // hasRemaining() 返回是否有剩余的可用长度
                    clientSocketChannel.write(buffer); // 非阻塞
                }
            }
        } catch (IOException e1) {
            try {
                clientSocketChannel.close();
            } catch (IOException e2) {
                e2.printStackTrace();
            }
            e1.printStackTrace();
        }
    }
}

GITHUB源码

https://github.com/spbreak/i-netty/tree/master/02-reactor

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Reactor线程模型-反应器线程模型
  • Reactor的角色
  • GITHUB源码
相关产品与服务
云服务器
云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档