package com.qfedu.b_niochat;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
* NIO 非阻塞状态的TCP聊天室客户端核心代码
*
* @author Anonymous 2020/3/16 16:20
*/
public class ChatClient {
/**
* 服务器IP地址
*/
private static final String HOST = "192.168.31.154";
/**
* 服务器连接对应的端口号
*/
private static final int PORT = 8848;
/**
* 返回NIO要求是ScoketChannel对象
*/
private SocketChannel socket;
/**
* 用户名
*/
private String userName;
/**
* 客户端构造方法,创建客户端对象
*
* @param userName 指定的用户名
*/
public ChatClient(String userName) throws IOException, InterruptedException {
// 1. 打开SocketChannel
socket = SocketChannel.open();
// 2. 设置非阻塞状态
socket.configureBlocking(false);
// 3. 根据指定的HOST IP地址和对应PORT端口号创建对应的 InetSocketAddress
InetSocketAddress address = new InetSocketAddress(HOST, PORT);
// 4. 连接服务器
if (!socket.connect(address)) {
// 如果没有连接到服务器,保持请求连接的状态
while (!socket.finishConnect()) {
System.out.println("服务器请求连接失败,等待2s继续请求连接...");
Thread.sleep(2000);
}
}
this.userName = userName;
System.out.println("客户端 " + userName + " 准备就绪");
}
/*
这里需要完成两个方法,一个是发送数据到服务器,一个是接受服务器发送的数据
*/
/**
* 发送数据到服务器,用于广播消息,群聊
*
* @param message 指定的消息
*/
public void sendMsg(String message) throws IOException {
// 断开服务器连接 close
if ("close".equals(message)) {
socket.close();
return;
}
/*
StringBuffer
线程安全,效率低
StringBuilder
线程不安全,效率高
*/
message = userName + ":" + message;
ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
socket.write(buffer);
}
public void receiveMsg() throws IOException {
// 准备ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
int length = socket.read(buffer);
if (length > 0) {
System.out.println(new String(buffer.array()));
}
}
}
package com.qfedu.b_niochat;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
/**
* NIO 非阻塞状态的TCP聊天室服务端核心代码
*
* @author Anonymous 2020/3/16 16:59
*/
public class ChatServer {
/**
* 服务器核心模块,ServerSocketChannel
*/
private ServerSocketChannel serverSocket;
/**
* 服务端NIO Selector选择器
*/
private Selector selector;
/**
* 服务端监听服务指定端口号
*/
private static final int PORT = 8848;
/*
1. 构造方法
2. 接收方法
3. 发送方法(广播)
4. 同时启动接收和发送功能 start
*/
/**
* 服务器构造方法,开启ServerSocketChannel,同时开启Selector,注册操作
*
* @throws IOException 异常
*/
public ChatServer() throws IOException {
// 1. 启动服务器SocketServer NIO服务器
serverSocket = ServerSocketChannel.open();
// 2. 启动选择器
selector = Selector.open();
// 3. 端口绑定
serverSocket.bind(new InetSocketAddress(PORT));
// 4. 选择NIO方式为非阻塞状态
serverSocket.configureBlocking(false);
// 5. 注册SeverSocket,确定当前监听的状态是OP_ACCEPT
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
}
/**
* 服务器干活方法,指定客户端绑定,数据接收和转发
*/
public void start(){
try {
while (true) {
if (0 == selector.select(2000)) {
System.out.println("服务器默默的等待连接,无人访问...");
continue;
}
/*
selectedKeys:
获取当前所有发生事件操作的对应SelectionKey Set集合
*/
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 1. 连接
if (key.isAcceptable()) {
// 连接客户端,获取对应的SocketChannel对象
SocketChannel socket = serverSocket.accept();
socket.configureBlocking(false);
socket.register(selector, SelectionKey.OP_READ);
// 广播上线
broadcast(socket, socket.getRemoteAddress().toString() + "上线了");
}
// 2. 接收数据转发
if (key.isReadable()) {
readMsg(key);
}
iterator.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 从指定的SelectionKey中读取数据
*
* @param key 符合OP_READ 要求的SelectionKey
*/
public void readMsg(SelectionKey key) throws IOException {
// 根据指定SelectionKey获取对应的SocketChannel对象
SocketChannel socket = (SocketChannel) key.channel();
// 创建缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 从缓冲区中读取数据,返回值类型是读取到的字节数
int length = socket.read(buffer);
// 因为读取的数据可能存在0的情况
if (length > 0) {
String message = new String(buffer.array());
// 广播数据
broadcast(socket, message);
}
}
/**
* 广播方法,该方法是群发消息,但是不要发给自己
*
* @param self 当前发送数据的客户端
* @param message 消息
*/
public void broadcast(SocketChannel self, String message) throws IOException {
/*
获取当前Selector选择器中所有的SelectionKey
Selector中注册的内容有SocketChannel对应的SelectionKey
已经ServerSocketChannel对应的SelectionKey
*/
Set<SelectionKey> keys = selector.keys();
// 遍历整个SelectionKey Set集合
for (SelectionKey key : keys) {
// 获取对应SelectionKey的Channel对象
SelectableChannel channel = key.channel();
// 第一: channel对应的是一个SocketChannel对象,并且不是当前发送消息的SocketChannel对象
if (channel instanceof SocketChannel && !channel.equals(self)) {
SocketChannel socketChannel = (SocketChannel) channel;
// 根据指定的Byte类型数组,创建对应的ByteBuffer缓冲区
ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
// 发送数据
socketChannel.write(buffer);
}
}
}
public static void main(String[] args) throws IOException {
new ChatServer().start();
}
}
package com.qfedu.b_niochat;
import java.io.IOException;
import java.util.Scanner;
/**
* 客户端线程代码
*
* @author Anonymous 2020/3/16 17:37
*/
public class ChatClientThread {
public static void main(String[] args) throws IOException, InterruptedException {
Scanner scanner = new Scanner(System.in);
System.out.println("请输入用户名:");
String userName = scanner.nextLine();
if (0 == userName.length()) {
return;
}
ChatClient chatClient = new ChatClient(userName);
// 接收消息
new Thread(() -> {
while (true) {
try {
chatClient.receiveMsg();
Thread.sleep(2000);
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}).start();
// 发送消息
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
chatClient.sendMsg(msg);
}
}
}