前言
抱歉好久没更原创文章了,看了下上篇更新时间,已经拖更一个多月了。
这段时间也一直在学习Netty
相关知识,因为涉及知识点比较多,也走了不少弯路。目前网上关于Netty学习资料玲琅满目,不知如何下手,其实大家都是一样的,学习方法和技巧都是总结出来的,我们在没有找到很好的方法之前不如按部就班先从基础开始,一般从总分总的渐进方式,既观森林,又见草木。
之前恰巧跟杭州一个朋友小飞也提到过,两者在这方面的初衷是一致的,也希望更多的朋友能够加入一起学习和探讨。(PS:本篇文章是和小飞一起学习整理所得~)
Netty
是一款提供异步的、事件驱动的网络应用程序框架和工具,是基于NIO
客户端、服务器端的编程框架。所以这里我们先以NIO
和依赖相关的基础铺垫来进行剖析讲解,从而作为Netty
学习之旅的一个开端。
Socket
本身有“插座”的意思,不是Java中特有的概念,而是一个语言无关的标准,任何可以实现网络编程的编程语言都有Socket
。在Linux
环境下,用于表示进程间网络通信的特殊文件类型,其本质为内核借助缓冲区形成的伪文件。既然是文件,那么理所当然的,我们可以使用文件描述符引用套接字。
与管道类似的,Linux
系统将其封装成文件的目的是为了统一接口,使得读写套接字和读写文件的操作一致。区别是管道主要应用于本地进程间通信,而套接字多应用于网络进程间数据的传递。
可以这么理解:Socket
就是网络上的两个应用程序通过一个双向通信连接实现数据交换的编程接口API。
Socket
通信的基本流程具体步骤如下所示:
(1)服务端通过Listen
开启监听,等待客户端接入。
(2)客户端的套接字通过Connect
连接服务器端的套接字,服务端通过Accept
接收客户端连接。在connect-accept
过程中,操作系统将会进行三次握手。
(3)客户端和服务端通过write
和read
发送和接收数据,操作系统将会完成TCP
数据的确认、重发等步骤。
(4)通过close
关闭连接,操作系统会进行四次挥手。
针对Java编程语言,java.net
包是网络编程的基础类库。其中ServerSocket
和Socket
是网络编程的基础类型。
SeverSocket
是服务端应用类型。Socket
是建立连接的类型。当连接建立成功后,服务器和客户端都会有一个Socket
对象示例,可以通过这个Socket
对象示例,完成会话的所有操作。对于一个完整的网络连接来说,Socket
是平等的,没有服务器客户端分级情况。
对于一次IO操作,数据会先拷贝到内核空间中,然后再从内核空间拷贝到用户空间中,所以一次read
操作,会经历两个阶段:
(1)等待数据准备
(2)数据从内核空间拷贝到用户空间
基于以上两个阶段就产生了五种不同的IO模式。
这五种IO模式不难发现存在这两对关系:同步和异步、阻塞和非阻塞。那么稍微解释一下:
同步和异步的区别最大在于异步的话调用者不需要等待处理结果,被调用者会通过回调等机制来通知调用者其返回结果。
阻塞和非阻塞是针对进程在访问数据的时候,根据IO操作的就绪状态来采取的不同方式,说白了是一种读取或者写入操作方法的实现方式,阻塞方式下读取或者写入函数将一直等待,而非阻塞方式下,读取或者写入方法会立即返回一个状态值。
如果组合后的同步阻塞(blocking-IO
)简称BIO
、同步非阻塞(non-blocking-IO
)简称NIO
和异步非阻塞(asynchronous-non-blocking-IO
)简称AIO
又代表什么意思呢?
java
中的 BIO
、NIO
和AIO
理解为是 Java 语言
在操作系统层面对这三种 IO
模型的封装。程序员在使用这些 封装API 的时候,不需要关心操作系统层面的知识,也不需要根据不同操作系统编写不同的代码,只需要使用Java
的API就可以了。由此,为了使读者对这三种模型有个比较具体和递推式的了解,并且和本文主题NIO
有个清晰的对比,下面继续延伸。
BIO
编程方式通常是是Java的上古产品,自JDK 1.0-JDK1.4就有的东西。编程实现过程为:首先在服务端启动一个ServerSocket
来监听网络请求,客户端启动Socket
发起网络请求,默认情况下SeverSocket
会建立一个线程来处理此请求,如果服务端没有线程可用,客户端则会阻塞等待或遭到拒绝。服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理。大致结构如下:
如果要让 BIO
通信模型能够同时处理多个客户端请求,就必须使用多线程(主要原因是 socket.accept()
、socket.read()
、 socket.write()
涉及的三个主要函数都是同步阻塞的),也就是说它在接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理,处理完成之后,通过输出流返回应答给客户端,线程销毁。这就是典型的 一请求一应答通信模型 。我们可以设想一下如果这个连接不做任何事情的话就会造成不必要的线程开销,不过可以通过线程池机制改善,线程池还可以让线程的创建和回收成本相对较低。使用线程池机制改善后的 BIO
模型图如下:
BIO
方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,是JDK1.4以前的唯一选择,但程序直观简单易懂。Java BIO
编程示例网上很多,这里就不进行coding举例了,毕竟后面NIO
才是重点。
NIO
(New IO或者No-Blocking IO),从JDK1.4 开始引入的非阻塞IO
,是一种非阻塞
+ 同步
的通信模式。这里的No Blocking IO
用于区分上面的BIO
。
NIO
本身想解决 BIO
的并发问题,通过Reactor模式
的事件驱动机制来达到Non Blocking
的。当 socket
有流可读或可写入 socket
时,操作系统会相应的通知应用程序进行处理,应用再将流读取到缓冲区或写入操作系统。
也就是说,这个时候,已经不是一个连接就 要对应一个处理线程了,而是有效的请求,对应一个线程,当连接没有数据时,是没有工作线程来处理的。
当一个连接创建后,不需要对应一个线程,这个连接会被注册到 多路复用器
上面,所以所有的连接只需要一个线程就可以搞定,当这个线程中的多路复用器
进行轮询的时候,发现连接上有请求的话,才开启一个线程进行处理,也就是一个请求一个线程模式。
NIO
提供了与传统BIO模型中的Socket
和ServerSocket
相对应的SocketChannel
和ServerSocketChannel
两种不同的套接字通道实现,如下图结构所示。这里涉及的Reactor
设计模式、多路复用Selector
、Buffer
等暂时不用管,后面会讲到。
NIO 方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局 限于应用中,编程复杂,JDK1.4 开始支持。同时,NIO
和普通IO的区别主要可以从存储数据的载体、是否阻塞等来区分:
与 NIO
不同,当进行读写操作时,只须直接调用 API 的 read
或 write
方法即可。这两种方法均为异步的,对于读操作而言,当有流可读取时,操作系统会将可读的流传入 read
方 法的缓冲区,并通知应用程序;对于写操作而言,当操作系统将 write
方法传递的流写入完毕时,操作系统主动通知应用程序。即可以理解为,read/write
方法都是异步的,完成后会主动调用回调函数。在 JDK7
中,提供了异步文件通道和异步套接字通道的实现,这部分内容被称作 NIO
.
AIO
方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用 OS
参与并发操作,编程比较复杂,JDK7
开始支持。
目前来说 AIO
的应用还不是很广泛,Netty
之前也尝试使用过 AIO
,不过又放弃了。
在NIO
中,基本所有的IO操作都是从Channel
开始的,Channel
通过Buffer(缓冲区)
进行读写操作。
read()
表示读取通道中数据到缓冲区,write()
表示把缓冲区数据写入到通道。
Channel
有好多实现类,这里有三个最常用:
SocketChannel
:一个客户端发起TCP连接的ChannelServerSocketChannel
:一个服务端监听新连接的TCP Channel,对于每一个新的Client连接,都会建立一个对应的SocketChannelFileChannel
:从文件中读写数据其中SocketChannel
和ServerSocketChannel
是网络编程中最常用的,一会在最后的示例代码中会有讲解到具体用法。
Buffer
也被成为内存缓冲区,本质上就是内存中的一块,我们可以将数据写入这块内存,之后从这块内存中读取数据。也可以将这块内存封装成NIO Buffer
对象,并提供一组常用的方法,方便我们对该块内存进行读写操作。
Buffer
在java.nio
中被定义为抽象类:
我们可以将Buffer
理解为一个数组的封装,我们最常用的ByteBuffer
对应的数据结构就是byte[]
Buffer
中有4个非常重要的属性:capacity、limit、position、mark
capacity
属性:容量,Buffer能够容纳的数据元素的最大值,在Buffer初始化创建的时候被赋值,而且不能被修改。上图中,初始化Buffer的容量为8(图中从0~7,共8个元素),所以capacity = 8
limit
属性:代表Buffer可读可写的上限。
limit
代表能写入数据的上限位置,这个时候limit = capacity
读模式下:在Buffer
完成所有数据写入后,通过调用flip()
方法,切换到读模式,此时limit
等于Buffer
中实际已经写入的数据大小。因为Buffer
可能没有被写满,所以limit<=capacityposition
属性:代表读取或者写入Buffer
的位置。默认为0。
Buffer
中写入一个值,position
就会自动加1,代表下一次写入的位置。Buffer
中读取一个值,position
就自动加1,代表下一次读取的位置。从上图就能很清晰看出,读写模式下capacity、limit、position的关系了。
mark
属性:代表标记,通过mark()方法,记录当前position值,将position值赋值给mark,在后续的写入或读取过程中,可以通过reset()方法恢复当前position为mark记录的值。这几个重要属性讲完,我们可以再来回顾下:
0 <= mark <= position <= limit <= capacity
现在应该很清晰这几个属性的关系了~
allocate(int capacity)
ByteBuffer buffer = ByteBuffer.allocate(1024);
int count = channel.read(buffer);
例子中创建的ByteBuffer
是基于堆内存的一个对象。
wrap(array)
wrap
方法可以将数组包装成一个Buffer
对象:
ByteBuffer buffer = ByteBuffer.wrap("hello world".getBytes());
channel.write(buffer);
allocateDirect(int capacity)
通过allocateDirect
方法也可以快速实例化一个Buffer
对象,和allocate
很相似,这里区别的是allocateDirect
创建的是基于堆外内存的对象。
堆外内存不在JVM堆上,不受GC的管理。堆外内存进行一些底层系统的IO操作时,效率会更高。
Buffer
写入可以通过put()
和channel.read(buffer)
两种方式写入。
通常我们NIO的读操作的时候,都是从Channel
中读取数据写入Buffer
,这个对应的是Buffer
的写操作。
Buffer
读取可以通过get()
和channel.write(buffer)
两种方式读入。
还是同上,我们对Buffer
的读入操作,反过来说就是对Channel
的写操作。读取Buffer
中的数据然后写入Channel
中。
rewind()
:重置position位置为0,可以重新读取和写入buffer,一般该方法适用于读操作,可以理解为对buffer的重复读。public final Buffer rewind() {
position = 0;
mark = -1;
return this;
}
flip()
:很常用的一个方法,一般在写模式切换到读模式的时候会经常用到。也会将position设置为0,然后设置limit等于原来写入的position。public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
clear()
:重置buffer中的数据,该方法主要是针对于写模式,因为limit设置为了capacity,读模式下会出问题。public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}
mark()&reset()
: mark()
方法是保存当前position
到变量mark
z中,然后通过reset()
方法恢复当前position
为mark
,实现代码很简单,如下:public final Buffer mark() {
mark = position;
return this;
}
public final Buffer reset() {
int m = mark;
if (m < 0)
throw new InvalidMarkException();
position = m;
return this;
}
常用的读写方法可以用一张图总结一下:
Selector
是NIO中最为重要的组件之一,我们常常说的多路复用器
就是指的Selector
组件。
Selector
组件用于轮询一个或多个NIO Channel
的状态是否处于可读、可写。通过轮询的机制就可以管理多个Channel,也就是说可以管理多个网络连接。
通过open()
方法,我们可以创建一个Selector
对象。
Selector selector = Selector.open();
我们需要将Channel
注册到Selector
中,才能够被Selector
管理。
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
某个Channel
要注册到Selector
中,那么该Channel必须是非阻塞,所有上面代码中有个configureBlocking()
的配置操作。
在register(Selector selector, int interestSet)
方法的第二个参数,标识一个interest
集合,意思是Selector对哪些事件感兴趣,可以监听四种不同类型的事件:
public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << ;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;
Connect事件
:连接完成事件( TCP 连接 ),仅适用于客户端,对应 SelectionKey.OP_CONNECT。Accept事件
:接受新连接事件,仅适用于服务端,对应 SelectionKey.OP_ACCEPT 。Read事件
:读事件,适用于两端,对应 SelectionKey.OP_READ ,表示 Buffer 可读。Write事件
:写时间,适用于两端,对应 SelectionKey.OP_WRITE ,表示 Buffer 可写。Channel
触发了一个事件,表明该时间已经准备就绪:
当然,Selector
是可以同时对多个事件感兴趣的,我们使用或运算即可组合多个事件:
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
public abstract int select() throws IOException;
public abstract int select(long timeout) throws IOException;
public abstract int selectNow() throws IOException;
当Selector执行select()
方法就会产生阻塞,等到注册在其上的Channel准备就绪就会立即返回,返回准备就绪的数量。
select(long timeout)
则是在select()
的基础上增加了超时机制。
selectNow()
立即返回,不产生阻塞。
有一点非常需要注意: select
方法返回的 int
值,表示有多少 Channel
已经就绪。
自上次调用select
方法后有多少 Channel
变成就绪状态。如果调用 select
方法,因为有一个 Channel
变成就绪状态则返回了 1 ;
若再次调用 select
方法,如果另一个 Channel
就绪了,它会再次返回1。
Set selectedKeys = selector.selectedKeys();
当有新增就绪的Channel
,调用select()
方法,就会将key添加到Set集合中。
前面铺垫了这么多,主要是想让大家能够看懂NIO
代码示例,也方便后续大家来自己手写NIO
网络编程的程序。创建NIO服务端的主要步骤如下:
1. 打开ServerSocketChannel,监听客户端连接 2. 绑定监听端口,设置连接为非阻塞模式 3. 创建Reactor线程,创建多路复用器并启动线程 4. 将ServerSocketChannel注册到Reactor线程中的Selector上,监听ACCEPT事件 5. Selector轮询准备就绪的key 6. Selector监听到新的客户端接入,处理新的接入请求,完成TCP三次握手,建立物理链路 7. 设置客户端链路为非阻塞模式 8. 将新接入的客户端连接注册到Reactor线程的Selector上,监听读操作,读取客户端发送的网络消息 9. 异步读取客户端消息到缓冲区 10.对Buffer编解码,处理半包消息,将解码成功的消息封装成Task 11.将应答消息编码为Buffer,调用SocketChannel的write将消息异步发送给客户端
NIOServer.java
:
public class NIOServer {
private static Selector selector;
public static void main(String[] args) {
init();
listen();
}
private static void init() {
ServerSocketChannel serverSocketChannel = null;
try {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(9000));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("NioServer 启动完成");
} catch (IOException e) {
e.printStackTrace();
}
}
private static void listen() {
while (true) {
try {
selector.select();
Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();
while (keysIterator.hasNext()) {
SelectionKey key = keysIterator.next();
keysIterator.remove();
handleRequest(key);
}
} catch (Throwable t) {
t.printStackTrace();
}
}
}
private static void handleRequest(SelectionKey key) throws IOException {
SocketChannel channel = null;
try {
if (key.isAcceptable()) {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
channel = serverSocketChannel.accept();
channel.configureBlocking(false);
System.out.println("接受新的 Channel");
channel.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int count = channel.read(buffer);
if (count > 0) {
System.out.println("服务端接收请求:" + new String(buffer.array(), 0, count));
channel.register(selector, SelectionKey.OP_WRITE);
}
}
if (key.isWritable()) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("收到".getBytes());
buffer.flip();
channel = (SocketChannel) key.channel();
channel.write(buffer);
channel.register(selector, SelectionKey.OP_READ);
}
} catch (Throwable t) {
t.printStackTrace();
if (channel != null) {
channel.close();
}
}
}
}
NIOClient.java
:
public class NIOClient {
public static void main(String[] args) {
new Worker().start();
}
static class Worker extends Thread {
@Override
public void run() {
SocketChannel channel = null;
Selector selector = null;
try {
channel = SocketChannel.open();
channel.configureBlocking(false);
selector = Selector.open();
channel.register(selector, SelectionKey.OP_CONNECT);
channel.connect(new InetSocketAddress(9000));
while (true) {
selector.select();
Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();
while (keysIterator.hasNext()) {
SelectionKey key = keysIterator.next();
keysIterator.remove();
if (key.isConnectable()) {
System.out.println();
channel = (SocketChannel) key.channel();
if (channel.isConnectionPending()) {
channel.finishConnect();
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("你好".getBytes());
buffer.flip();
channel.write(buffer);
}
channel.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int len = channel.read(buffer);
if (len > 0) {
System.out.println("[" + Thread.currentThread().getName()
+ "]收到响应:" + new String(buffer.array(), 0, len));
Thread.sleep(5000);
channel.register(selector, SelectionKey.OP_WRITE);
}
}
if(key.isWritable()) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("你好".getBytes());
buffer.flip();
channel = (SocketChannel) key.channel();
channel.write(buffer);
channel.register(selector, SelectionKey.OP_READ);
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally{
if(channel != null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(selector != null){
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
}
打印结果:
// Server端
NioServer 启动完成
接受新的 Channel
服务端接收请求:你好
服务端接收请求:你好
服务端接收请求:你好
// Client端
[Thread-0]收到响应:收到
[Thread-0]收到响应:收到
[Thread-0]收到响应:收到
回顾一下使用 NIO
开发服务端程序的步骤:
ServerSocketChannel
和业务处理线程池。Selector
,将之前创建的 ServerSocketChannel
注册到 Selector
上,监听 SelectionKey.OP_ACCEPT
。Selector.select()`` 方法,轮询就绪的
Channel`。Channel
时,如果是处于 OP_ACCEPT
状态,说明是新的客户端接入,调用 ServerSocketChannel.accept
接收新的客户端。SocketChannel
为非阻塞模式,并注册到 Selector
上,监听 OP_READ
。Channel
状态是 OP_READ
,说明有新的就绪数据包需要读取,则构造 ByteBuffer
对象,读取数据。那从这些步骤中基本知道开发者需要熟悉的知识点有:
jdk-nio
提供的几个关键类:Selector
, SocketChannel
, ServerSocketChannel
, FileChannel
,ByteBuffer
,SelectionKey
linux
底层实现,如何正确的关闭channel
,如何退出注销selector
,如何避免selector
太过于频繁client
端获得server
端的返回值,然后才返回给前端,需要如何等待或在怎样作熔断机制NIO 原生 API 的弊端 :
① NIO 组件复杂 : 使用原生 NIO
开发服务器端与客户端 , 需要涉及到 服务器套接字通道 ( ServerSocketChannel
) , 套接字通道 ( SocketChannel
) , 选择器 ( Selector
) , 缓冲区 ( ByteBuffer
) 等组件 , 这些组件的原理 和API 都要熟悉 , 才能进行 NIO
的开发与调试 , 之后还需要针对应用进行调试优化
② NIO 开发基础 : NIO
门槛略高 , 需要开发者掌握多线程、网络编程等才能开发并且优化 NIO
网络通信的应用程序
③ 原生 API 开发网络通信模块的基本的传输处理 : 网络传输不光是实现服务器端和客户端的数据传输功能 , 还要处理各种异常情况 , 如 连接断开重连机制 , 网络堵塞处理 , 异常处理 , 粘包处理 , 拆包处理 , 缓存机制 等方面的问题 , 这是所有成熟的网络应用程序都要具有的功能 , 否则只能说是入门级的 Demo
④ NIO BUG : NIO
本身存在一些 BUG , 如 Epoll
, 导致 选择器 ( Selector
) 空轮询 , 在 JDK 1.7 中还没有解决
Netty
在 NIO
的基础上 , 封装了 Java 原生的 NIO API
, 解决了上述哪些问题呢 ?
相比 Java NIO,使用 Netty
开发程序,都简化了哪些步骤呢?...等等这系列问题也都是我们要问的问题。不过因为这篇只是介绍NIO
相关知识,没有介绍Netty API
的使用,所以介绍Netty API
使用简单开发门槛低等优点有点站不住脚。那么就留到后面跟大家一起开启Netty
学习之旅,探讨人人说好的Netty
到底是不是江湖传言的那么好。
一起期待后续的Netty
之旅吧!