前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java NIO 实现网络通信

Java NIO 实现网络通信

作者头像
水货程序员
修改2018-11-15 22:47:41
1K0
修改2018-11-15 22:47:41
举报
文章被收录于专栏:javathings

Java NIO 的相关资料很多,对 channel,buffer,selector 如何相关概念也有详细的阐述。但是,不亲自写代码调试一遍,对这些概念的理解仍然是一知半解。

即使代码跑起来,也不见得有多懂这些概念,因为只是肤浅的尝试,但肤浅的尝试胜过于纸上谈兵,至少迈出了第一步,后续深入,可能要等到真的有实际应用时,才会深入研究。先贴示例代码。

一个典型的服务端:

Server端代码

代码语言:javascript
复制
public class MyServer {
 
	private Selector selector;
	private ServerSocketChannel serverChannel;
 
	public void start() throws Exception {
		int port = 9527;
		// 创建选择器
		selector = Selector.open();
		// 打开监听通道
		serverChannel = ServerSocketChannel.open();
		// 如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
		serverChannel.configureBlocking(false);// 开启非阻塞模式
		// 绑定端口 backlog设为1024
		serverChannel.socket().bind(new InetSocketAddress(port), 1024);
		// 监听客户端连接请求
		serverChannel.register(selector, SelectionKey.OP_ACCEPT);
		System.out.println("服务器已启动,端口号:" + port);
		while (true) {
			// 无论是否有读写事件发生,selector每隔1s被唤醒一次
			selector.select(1000);
			// 阻塞,只有当至少一个注册的事件发生的时候才会继续.
			// selector.select();
			Set<SelectionKey> keys = selector.selectedKeys();
			Iterator<SelectionKey> it = keys.iterator();
			while (it.hasNext()) {
				SelectionKey key = it.next();
				it.remove();
				handleInput(key);
			}
		}
	}
 
	private void handleInput(SelectionKey key) throws Exception {
		if (key.isValid()) {
			// 处理新接入的请求消息
			if (key.isAcceptable()) {
				ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
				// 通过ServerSocketChannel的accept创建SocketChannel实例
				// 完成该操作意味着完成TCP三次握手,TCP物理链路正式建立
				SocketChannel sc = ssc.accept();
				// 设置为非阻塞的
				sc.configureBlocking(false);
				// 注册为读
				sc.register(selector, SelectionKey.OP_READ);
			}
			// 读消息
			if (key.isReadable()) {
				SocketChannel sc = (SocketChannel) key.channel();
				// 创建ByteBuffer,并开辟一个1M的缓冲区
				ByteBuffer buffer = ByteBuffer.allocate(1024);
				// 读取请求码流,返回读取到的字节数
				int readBytes = sc.read(buffer);
				// 读取到字节,对字节进行编解码
				if (readBytes > 0) {
					// 将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
					buffer.flip();
					// 根据缓冲区可读字节数创建字节数组
					byte[] bytes = new byte[buffer.remaining()];
					// 将缓冲区可读字节数组复制到新建的数组中
					buffer.get(bytes);
					String input = new String(bytes, "UTF-8");
					System.out.println("服务器收到消息:" + input);
					// 发送应答消息
					doWrite(sc, LocalTime.now().toString());
				}
 
			} else if (key.isWritable()) {
				ByteBuffer sendbuffer = ByteBuffer.allocate(1024);
				sendbuffer.clear();
				SocketChannel sc = (SocketChannel) key.channel();
				sc.write(sendbuffer);
			}
		}
	}
 
	// 异步发送应答消息
	private void doWrite(SocketChannel channel, String response) throws IOException {
		// 将消息编码为字节数组
		byte[] bytes = response.getBytes();
		// 根据数组容量创建ByteBuffer
		ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
		// 将字节数组复制到缓冲区
		writeBuffer.put(bytes);
		// flip操作
		writeBuffer.flip();
		// 发送缓冲区的字节数组
		channel.write(writeBuffer);
	}
}

启动Server端

代码语言:java
复制
public class Main {
 	public static void main(String[] args) throws Exception {
		MyServer myserver = new MyServer();
		myserver.start();
	}
}

关键代码:

serverChannel.register(selector, SelectionKey.OP_ACCEPT);

把 channel 注册到 selector 上,如此,一个 selector 可以管理多个 channel。

selector.select(1000);

该方法功能就是阻塞直到该选择器中的通道所关注的事件就绪,最多阻塞 1000 毫秒,使得程序可以继续往下运行。

while(true){

while (it.hasNext()) {}

}

这样的循环结构,使得服务器不断的轮询是否有请求事件发生,如果有发生,则会获得这个请求中的 channel,并且往这个 channel 中读写数据。

handleInput 方法中,就是处理对应的请求。

客户端:

Client端代码

代码语言:javascript
复制
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
 
public class MyClient {
	private Selector selector;
	private SocketChannel socketChannel;
 
	public void start() throws Exception {
		int port = 9527;
		String host = "127.0.0.1";
		// 创建选择器
		selector = Selector.open();
		// 打开监听通道
		socketChannel = SocketChannel.open();
		// 如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
		socketChannel.configureBlocking(false);// 开启非阻塞模式
 
		socketChannel.connect(new InetSocketAddress(host, port));
		// 等待100毫秒直到连接上服务器
		while (!socketChannel.finishConnect()) {
			try {
				Thread.sleep(100);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		socketChannel.register(selector, SelectionKey.OP_CONNECT);
 
		while (true) {
			try {
				// 无论是否有读写事件发生,selector每隔1s被唤醒一次
				selector.select(1000);
				// 阻塞,只有当至少一个注册的事件发生的时候才会继续.
				// selector.select();
				Set<SelectionKey> keys = selector.selectedKeys();
				Iterator<SelectionKey> it = keys.iterator();
				SelectionKey key = null;
				while (it.hasNext()) {
					key = it.next();
					it.remove();
					try {
						handleInput(key);
					} catch (Exception e) {
						if (key != null) {
							key.cancel();
							if (key.channel() != null) {
								key.channel().close();
							}
						}
					}
				}
			} catch (Exception e) {
				e.printStackTrace();
				System.exit(1);
			}
		}
 
	}
 
	private void handleInput(SelectionKey key) throws IOException {
		if (key.isValid()) {
			SocketChannel sc = (SocketChannel) key.channel();
			// 读消息
			if (key.isReadable()) {
				// 创建ByteBuffer,并开辟一个1M的缓冲区
				ByteBuffer buffer = ByteBuffer.allocate(1024);
				// 读取请求码流,返回读取到的字节数
				int readBytes = sc.read(buffer);
				// 读取到字节,对字节进行编解码
				if (readBytes > 0) {
					// 将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
					buffer.flip();
					// 根据缓冲区可读字节数创建字节数组
					byte[] bytes = new byte[buffer.remaining()];
					// 将缓冲区可读字节数组复制到新建的数组中
					buffer.get(bytes);
					String result = new String(bytes, "UTF-8");
					System.out.println("客户端收到消息:" + result);
				}
			}
		}
	}
 
	// 异步发送消息
	private void doWrite(SocketChannel channel, String request) throws IOException {
		// 将消息编码为字节数组
		byte[] bytes = request.getBytes();
		// 根据数组容量创建ByteBuffer
		ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
		// 将字节数组复制到缓冲区
		writeBuffer.put(bytes);
		// flip操作
		writeBuffer.flip();
		// 发送缓冲区的字节数组
		channel.write(writeBuffer);
	}
 
	public void sendMsg(String msg) throws Exception {
		socketChannel.register(selector, SelectionKey.OP_READ);
		doWrite(socketChannel, msg);
	}
} 

启动Server端Client

代码语言:javascript
复制
public class Main {
 
	public static void main(String[] args) throws Exception {
		final MyClient myclient = new MyClient();
		FutureTask<Integer> Task2 = new FutureTask<>(() -> {
			myclient.start();
			return 0;
		});// 用FutureTask包裹
		Thread Thread2 = new Thread(Task2);// 用Thread包裹
		Thread2.start();
		
		Thread.sleep(1000);
		myclient.sendMsg("time");
		
	}
 
}

客户端代码和服务器端代码,原理是类似的。

上述例子中,先启动服务器端代码,然后启动客户端代码,就能跑起来。例子中,客户端发送任意字符到服务器端,服务器返回当前时间给客户端。

原创文章,转载请注明出处!https://cloud.tencent.com/developer/article/1362838

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云服务器
云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档