老铁不管用过hfs,还是dubbo等等RPC框架,对nio,bio,aio多熟悉,咱们一起以初学者的心态,一起学习下netty。大家都知道netty是基于nio,为什么会有nio,是之前的bio有写问题无法解决,所以出现了nio,nio也有自身的问题,例如:堵塞。源码:https://github.com/limingios/netFuture/tree/master
传统的BIO方式进行通信堵塞
1.serversocket 就是建立服务端绑定端口。 2.serverSocket.accept()建立一个accept线程,一直等待客户端的连接事件,连接之后他才会返回,所以说这是阻塞的。
Socket socket = serverSocket.appept(); in data = is.read(b); 2个阻塞点,也就是在同一个时间只能为一个客户服务,如果是你餐厅的老板,在同一个时间你只能为一个就餐的人服务,你这个餐厅是不是开不下去。如何改进,就是请多个服务生为客户服务。 传统的BIO方式进行解决堵塞
每次来一个请求开辟一个线程,互相不影响,主线程一直是空闲的。也就是就餐的人来一个就让一个服务员专门为你服务,是不是海底捞的感觉。
早期互联网还不太发达的时候可以这么搞,现在都是互联网大爆炸的时候,如果成千上万个,这时候你需要new thread 成千上万个,线程其实是系统的资源,而且是稀有的资源,请求client一直增长,线程也就一直的增长,此时我们操作系统的资源越来越紧张,最后server端无法使用。这跟餐厅是一样的,一个餐厅成百上千个人来吃饭,你就请成白上千个人,最后餐厅就倒闭了。其实很多的程序设计跟现实生活息息相关。小应用其实没问题,多用户肯定不能这样。这时候肯定有老铁说可以用线程池啊。 多线程伪异步IO
并发编程jdk1.5之后。ExecutorService可以new一个newCachedTheadPool,首先它是一个线程池,它是缓存,无限大是它的特点,既然是无限大,可以回收线程,如果此时有100个请求,也就是有1000线程在哪里,底层依然是new Thread。底层还是m:m的概念。m:m = 请求:线程。最重要点就是复用。
虽然可以复用,但是还是解决不了m个请求n个线程的问题。需要m要大于n。如果设置 ExecutorService threadPool = Excutors.newCachedThreadPool(100); 本身这个机制是队列,如果1000个请求来了,需要排队,也就是100个人服务1000个人。所以就是伪异步IO。 底层还是堵塞的。
在Java1.4之前的I/O系统中,提供的都是面向流的I/O系统,系统一次一个字节地处理数据,一个输入流产生一个字节的数据,一个输出流消费一个字节的数据,面向流的I/O速度非常慢,而在Java 1.4中推出了NIO,这是一个面向块的I/O系统,系统以块的方式处理处理,每一个操作在一步中产生或者消费一个数据库,按块处理要比按字节处理数据快的多。在NIO中有几个核心对象需要掌握:缓冲区(Buffer)、通道(Channel)、选择器(Selector)。
import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;
/** * nio */public class NioSocketDemo { // 通道管理器(选择器),多个用户共用的,所以需要放到这里 private Selector selector;
/** * 初始化服务端ServerSocketChannel通道,并初始化选择器 * 获得一个ServerSocket通道,并对该通道做一些初始化的工作 */ public void initServer(int port) throws IOException { // 获取ServerSocket通道 , 相对于传统的ServerSocket ServerSocketChannel serverChannel = ServerSocketChannel.open(); // 设置通道为非阻塞 serverChannel.configureBlocking(false); // 将该通道对应的ServerSocket绑定到port端口 serverChannel.socket().bind(new InetSocketAddress(port)); // 获得一个通道选择器(管理器) this.selector = Selector.open(); // 将通道选择器和该通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件,注册该事件后, // 当该事件到达时,selector.select()会返回,如果该事件没到达selector.select()会一直阻塞。 // 意思是大门交给selector看着,给我监听是否有accpet事件 serverChannel.register(this.selector, SelectionKey.OP_ACCEPT); System.out.println("服务端启动成功..."); /* ***SelectionKey中定义的4中事件 *** OP_ACCEPT —— 接收连接继续事件,表示服务器监听到了客户连接,服务器可以接收这个连接了 OP_CONNECT —— 连接就绪事件,表示客户与服务器的连接已经建立成功 OP_READ —— 读就绪事件,表示通道中已经有了可读的数据,可以执行读操作了(通道目前有数据,可以进行读操作了) OP_WRITE —— 写就绪事件,表示已经可以向通道写数据了(通道目前可以用于写操作) */
}
/** * 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理 */ public void listenSelector() throws IOException { // 轮询访问selector while (true) { // 当注册的事件到达时,方法返回;否则,该方法会一直阻塞 // 多路复用 Reactor模型 this.selector.select(); // 无论是否有读写事件发生,selector每隔1s被唤醒一次 //this.selector.select(1000); //this.selector.selectNow(); // 获得selector中选中的项的迭代器,选中的项为注册的事件 Iterator<?> iteratorKey = this.selector.selectedKeys().iterator(); while (iteratorKey.hasNext()) { SelectionKey selectionKey = (SelectionKey) iteratorKey.next(); // 删除已选的key,以防重复处理 iteratorKey.remove();
new Thread(new Runnable() {
@Override public void run() { // 处理请求 try { handler(selectionKey); } catch (IOException e) { e.printStackTrace(); }
} });
} } }
/** * 处理请求 */ public void handler(SelectionKey selectionKey) throws IOException { if (selectionKey.isAcceptable()) {//处理客户端连接请求事件 System.out.println("新的客户端连接..."); ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel(); // 获得和客户端连接的通道 // 完成该操作意味着完成TCP三次握手,TCP物理链路正式建立 SocketChannel channel = server.accept(); // 设置成非阻塞 channel.configureBlocking(false); // 在和客户端连接成功之后,为了可以接收到客户端的信息,需要给通道设置读的权限。 channel.register(this.selector, SelectionKey.OP_READ); } else if (selectionKey.isReadable()) {// 处理读的事件 // 服务器可读取消息:得到事件发生的Socket通道 SocketChannel channel = (SocketChannel) selectionKey.channel(); // 创建读取的缓冲区 ByteBuffer buffer = ByteBuffer.allocate(1024);//1kb int readData = channel.read(buffer); if(readData > 0){ String msg = new String(buffer.array(),"GBK").trim();// 先讲缓冲区数据转化成byte数组,再转化成String System.out.println("服务端收到信息:" + msg);
//回写数据 ByteBuffer writeBackBuffer = ByteBuffer.wrap("receive data".getBytes("GBK")); channel.write(writeBackBuffer);// 将消息回送给客户端 }else{ System.out.println("客户端关闭咯..."); //SelectionKey对象会失效,这意味着Selector再也不会监控与它相关的事件 selectionKey.cancel(); } } } /** * 启动服务端测试 */ public static void main(String[] args) throws IOException { NioSocketDemo server = new NioSocketDemo(); // 初始化服务端 server.initServer(8888); // 服务器端监听Selector事件 server.listenSelector(); }}
PS:NIO不需要的代码里面根本没有多线程,实际上nio只有一个工作线程,一个线程可以为多个客人服务。
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有