Java NIO 的相关资料很多,对 channel,buffer,selector 如何相关概念也有详细的阐述。但是,不亲自写代码调试一遍,对这些概念的理解仍然是一知半解。
即使代码跑起来,也不见得有多懂这些概念,因为只是肤浅的尝试,但肤浅的尝试胜过于纸上谈兵,至少迈出了第一步,后续深入,可能要等到真的有实际应用时,才会深入研究。先贴示例代码。
一个典型的服务端:
Server端代码
public class MyServer {
private Selector selector;
private ServerSocketChannel serverChannel;
public void start() throws Exception {
int port = 9527;
// 创建选择器
selector = Selector.open();
// 打开监听通道
serverChannel = ServerSocketChannel.open();
// 如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
serverChannel.configureBlocking(false);// 开启非阻塞模式
// 绑定端口 backlog设为1024
serverChannel.socket().bind(new InetSocketAddress(port), 1024);
// 监听客户端连接请求
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务器已启动,端口号:" + port);
while (true) {
// 无论是否有读写事件发生,selector每隔1s被唤醒一次
selector.select(1000);
// 阻塞,只有当至少一个注册的事件发生的时候才会继续.
// selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();
handleInput(key);
}
}
}
private void handleInput(SelectionKey key) throws Exception {
if (key.isValid()) {
// 处理新接入的请求消息
if (key.isAcceptable()) {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
// 通过ServerSocketChannel的accept创建SocketChannel实例
// 完成该操作意味着完成TCP三次握手,TCP物理链路正式建立
SocketChannel sc = ssc.accept();
// 设置为非阻塞的
sc.configureBlocking(false);
// 注册为读
sc.register(selector, SelectionKey.OP_READ);
}
// 读消息
if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
// 创建ByteBuffer,并开辟一个1M的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 读取请求码流,返回读取到的字节数
int readBytes = sc.read(buffer);
// 读取到字节,对字节进行编解码
if (readBytes > 0) {
// 将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
buffer.flip();
// 根据缓冲区可读字节数创建字节数组
byte[] bytes = new byte[buffer.remaining()];
// 将缓冲区可读字节数组复制到新建的数组中
buffer.get(bytes);
String input = new String(bytes, "UTF-8");
System.out.println("服务器收到消息:" + input);
// 发送应答消息
doWrite(sc, LocalTime.now().toString());
}
} else if (key.isWritable()) {
ByteBuffer sendbuffer = ByteBuffer.allocate(1024);
sendbuffer.clear();
SocketChannel sc = (SocketChannel) key.channel();
sc.write(sendbuffer);
}
}
}
// 异步发送应答消息
private void doWrite(SocketChannel channel, String response) throws IOException {
// 将消息编码为字节数组
byte[] bytes = response.getBytes();
// 根据数组容量创建ByteBuffer
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
// 将字节数组复制到缓冲区
writeBuffer.put(bytes);
// flip操作
writeBuffer.flip();
// 发送缓冲区的字节数组
channel.write(writeBuffer);
}
}
启动Server端
public class Main {
public static void main(String[] args) throws Exception {
MyServer myserver = new MyServer();
myserver.start();
}
}
关键代码:
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
把 channel 注册到 selector 上,如此,一个 selector 可以管理多个 channel。
selector.select(1000);
该方法功能就是阻塞直到该选择器中的通道所关注的事件就绪,最多阻塞 1000 毫秒,使得程序可以继续往下运行。
while(true){
while (it.hasNext()) {}
}
这样的循环结构,使得服务器不断的轮询是否有请求事件发生,如果有发生,则会获得这个请求中的 channel,并且往这个 channel 中读写数据。
handleInput 方法中,就是处理对应的请求。
客户端:
Client端代码
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class MyClient {
private Selector selector;
private SocketChannel socketChannel;
public void start() throws Exception {
int port = 9527;
String host = "127.0.0.1";
// 创建选择器
selector = Selector.open();
// 打开监听通道
socketChannel = SocketChannel.open();
// 如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
socketChannel.configureBlocking(false);// 开启非阻塞模式
socketChannel.connect(new InetSocketAddress(host, port));
// 等待100毫秒直到连接上服务器
while (!socketChannel.finishConnect()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
socketChannel.register(selector, SelectionKey.OP_CONNECT);
while (true) {
try {
// 无论是否有读写事件发生,selector每隔1s被唤醒一次
selector.select(1000);
// 阻塞,只有当至少一个注册的事件发生的时候才会继续.
// selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
try {
handleInput(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
}
private void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
SocketChannel sc = (SocketChannel) key.channel();
// 读消息
if (key.isReadable()) {
// 创建ByteBuffer,并开辟一个1M的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 读取请求码流,返回读取到的字节数
int readBytes = sc.read(buffer);
// 读取到字节,对字节进行编解码
if (readBytes > 0) {
// 将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
buffer.flip();
// 根据缓冲区可读字节数创建字节数组
byte[] bytes = new byte[buffer.remaining()];
// 将缓冲区可读字节数组复制到新建的数组中
buffer.get(bytes);
String result = new String(bytes, "UTF-8");
System.out.println("客户端收到消息:" + result);
}
}
}
}
// 异步发送消息
private void doWrite(SocketChannel channel, String request) throws IOException {
// 将消息编码为字节数组
byte[] bytes = request.getBytes();
// 根据数组容量创建ByteBuffer
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
// 将字节数组复制到缓冲区
writeBuffer.put(bytes);
// flip操作
writeBuffer.flip();
// 发送缓冲区的字节数组
channel.write(writeBuffer);
}
public void sendMsg(String msg) throws Exception {
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel, msg);
}
}
启动Server端Client
public class Main {
public static void main(String[] args) throws Exception {
final MyClient myclient = new MyClient();
FutureTask<Integer> Task2 = new FutureTask<>(() -> {
myclient.start();
return 0;
});// 用FutureTask包裹
Thread Thread2 = new Thread(Task2);// 用Thread包裹
Thread2.start();
Thread.sleep(1000);
myclient.sendMsg("time");
}
}
客户端代码和服务器端代码,原理是类似的。
上述例子中,先启动服务器端代码,然后启动客户端代码,就能跑起来。例子中,客户端发送任意字符到服务器端,服务器返回当前时间给客户端。
原创文章,转载请注明出处!https://cloud.tencent.com/developer/article/1362838