前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java 的 NIO 是如何工作的?

Java 的 NIO 是如何工作的?

作者头像
水货程序员
修改2018-11-16 09:59:07
1.6K0
修改2018-11-16 09:59:07
举报
文章被收录于专栏:javathings

在这个数据爆炸的时代,有大量的数据在系统中流动,一个应用系统的瓶颈往往都是 IO 瓶颈。传统的 javaIO 模型是 BIO,也就是同步阻塞 IO,数据在写入 OutputStream 或者从 InputStream 读取时,如果没有数据没有读到或写完,线程都会被阻塞,处于等待状态,直到数据读取完成或写入完成。而在网络编程中,每一个客户端连接发出后,服务端都会有一个对应线程来处理请求,服务器线程与并发数成 1:1 关系,然而一个服务器所能处理的线程是有限的,处理高并发时就会有问题。

NIO 是一种非阻塞同步 IO,它是一种 Reactor 模式的编程模型,简单来讲,就是当服务端有多个连接接入时,并不为每个连接单独创建线程,而是创建一个 Reactor 线程,用多路复用器来不断的轮询每一个接入的连接,找出有数据需要读写的连接进行后续操作,从而具有处理多个连接的能力。

java 原生的 NIO 实现有很多类和组件,但其核心组件有三个,其他的都是一些相关的工具类:

  • Channel    与 BIO 中的流不同,NIO 用 Chananl 来抽象数据通道,数据通过 Channel 来读取和写入,从 Channle 的类图来看,通道分为两大类:用于网络读写的 SelectableChannel 和用于文件读写的 FileChannel
  • Buffer     在 NIO 中,数据与 Channel 之间的交互是通过 buffer 来进行的,数据读写先经过 buffer 再进入通道
  • Selector   多路复用器 Selector 是 NIO 的基础。Selector 会不断轮询注册在其上面的 Channel,如果某个 Channel 上发生读或写事件,这个 Channel 就处于就绪状态,会被 Selector 轮询出来,然后通过 SelectionKey 可以获取就绪 Channel 集合,让后异步的将 Channel 的数据读入缓冲区

下面是一个简单 NIO 服务器,用来演示 NIO 的编程模型

代码语言:javascript
复制
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
 
import javax.swing.text.html.HTMLDocument.Iterator;
 
/**
* Simple echo-back server which listens for incoming stream connections and
* echoes back whatever it reads. A single Selector object is used to listen to
* the server socket (to accept new connections) and all the active socket
* channels.
* @author zale (zalezone.cn)
*/
public class SelectSockets {
    public static int PORT_NUMBER = 1234;
    public static void main(String[] argv) throws Exception 
    {
        new SelectSockets().go(argv);
    }
    public void go(String[] argv) throws Exception 
    {
        int port = PORT_NUMBER;
        if (argv.length > 0) 
        { // 覆盖默认的监听端口
            port = Integer.parseInt(argv[0]);
        }
        System.out.println("Listening on port " + port);
        ServerSocketChannel serverChannel = ServerSocketChannel.open();// 打开一个未绑定的serversocketchannel
        ServerSocket serverSocket = serverChannel.socket();// 得到一个ServerSocket去和它绑定 
        Selector selector = Selector.open();// 创建一个Selector供下面使用
        serverSocket.bind(new InetSocketAddress(port));//设置server channel将会监听的端口
        serverChannel.configureBlocking(false);//设置非阻塞模式
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);//将ServerSocketChannel注册到Selector
        while (true) 
        {
            // This may block for a long time. Upon returning, the
            // selected set contains keys of the ready channels.
            int n = selector.select();
            if (n == 0) 
            {
                continue; // nothing to do
            }           
            java.util.Iterator<SelectionKey> it = selector.selectedKeys().iterator();// Get an iterator over the set of selected keys
            //在被选择的set中遍历全部的key
            while (it.hasNext()) 
            {
                SelectionKey key = (SelectionKey) it.next();
                // 判断是否是一个连接到来
                if (key.isAcceptable()) 
                {
                    ServerSocketChannel server =(ServerSocketChannel) key.channel();
                    SocketChannel channel = server.accept();
                    registerChannel(selector, channel,SelectionKey.OP_READ);//注册读事件
                    sayHello(channel);//对连接进行处理
                }
                //判断这个channel上是否有数据要读
                if (key.isReadable()) 
                {
                    readDataFromSocket(key);
                }
                //从selected set中移除这个key,因为它已经被处理过了
                it.remove();
            }
        }
    }
    // ----------------------------------------------------------
    /**
    * Register the given channel with the given selector for the given
    * operations of interest
    */
    protected void registerChannel(Selector selector,SelectableChannel channel, int ops) throws Exception
    {
        if (channel == null) 
        {
            return; // 可能会发生
        }
        // 设置通道为非阻塞
        channel.configureBlocking(false);
        // 将通道注册到选择器上
        channel.register(selector, ops);
    }
    // ----------------------------------------------------------
    // Use the same byte buffer for all channels. A single thread is
    // servicing all the channels, so no danger of concurrent acccess.
    //对所有的通道使用相同的缓冲区。单线程为所有的通道进行服务,所以并发访问没有风险
    private ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
    /**
    * Sample data handler method for a channel with data ready to read.
    * 对于一个准备读入数据的通道的简单的数据处理方法
    * @param key
    *
    A SelectionKey object associated with a channel determined by
    the selector to be ready for reading. If the channel returns
    an EOF condition, it is closed here, which automatically
    invalidates the associated key. The selector will then
    de-register the channel on the next select call.
 
    一个选择器决定了和通道关联的SelectionKey object是准备读状态。如果通道返回EOF,通道将被关闭。
    并且会自动使相关的key失效,选择器然后会在下一次的select call时取消掉通道的注册
    */
    protected void readDataFromSocket(SelectionKey key) throws Exception 
    {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        int count;
        buffer.clear(); // 清空Buffer
        // Loop while data is available; channel is nonblocking
        //当可以读到数据时一直循环,通道为非阻塞
        while ((count = socketChannel.read(buffer)) > 0) 
        {
            buffer.flip(); // 将缓冲区置为可读
            // Send the data; don't assume it goes all at once
            //发送数据,不要期望能一次将数据发送完
            while (buffer.hasRemaining()) 
            {
                socketChannel.write(buffer);
            }
            // WARNING: the above loop is evil. Because
            // it's writing back to the same nonblocking
            // channel it read the data from, this code can
            // potentially spin in a busy loop. In real life
            // you'd do something more useful than this.
            //这里的循环是无意义的,具体按实际情况而定
            buffer.clear(); // Empty buffer
        }
        if (count < 0) 
        {
            // Close channel on EOF, invalidates the key
            //读取结束后关闭通道,使key失效
            socketChannel.close();
        }
    }
    // ----------------------------------------------------------
    /**
    * Spew a greeting to the incoming client connection.
    *
    * @param channel
    *
    The newly connected SocketChannel to say hello to.
    */
    private void sayHello(SocketChannel channel) throws Exception 
    {
        buffer.clear();
        buffer.put("Hi there!\r\n".getBytes());
        buffer.flip();
        channel.write(buffer);
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云服务器
云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档