// Reactor 反应器
// - 由Selector IO多路复用选择器提供事件注册与捕获
// - 被捕获事件进行统一处理分发给下游处理
public class DefaultReactor implements Reactor {
private final static int PORT = 8080;
private final Selector selector;
private final Server server;
public DefaultReactor() throws IOException {
this(PORT);
}
public DefaultReactor(int port) throws IOException {
selector = Selector.open();//IO多路复用
server = new Server(selector, port);
}
@Override
public void run() {
while (!Thread.interrupted()) {
try {
// 获取发生的事件(阻塞)
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterable = selectionKeys.iterator();
while (iterable.hasNext()) {
// 对事件进行分发
dispatch(iterable.next());
iterable.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
LockSupport.parkNanos(1000 * 1000 * 1000);
}
}
private void dispatch(SelectionKey selectionKey) {
// 获取事件的附加器
// ACCEPT 事件的附加器是 Acceptor, 故由 Acceptor 来处理 ACCEPT 事件
// READ 事件的附加器是 Handler, 故由 Handler 来处理 READ 事件
Object attachment = selectionKey.attachment();
if (attachment instanceof Acceptor) {
((Acceptor) attachment).run();
return;
}
if (attachment instanceof Handler) {
((Handler) attachment).run();
return;
}
}
}
// Acceptor接收器
// - 类似于ServerSocketChannel服务器
// - 专门处理ACCEPT首次访问的IO事件
// - 每次有ACCEPT访问时, 就创建新IO Channel(SocketChannel)注册到Reactor反应器的Selector中, 等待捕获
public class Server implements Acceptor{
private final ServerSocketChannel serverSocketChannel;
private final Selector selector;
public Server(Selector selector, int port) throws IOException {
this.selector = selector;
// 服务端创建 listen-socket 管道
this.serverSocketChannel = ServerSocketChannel.open();
// 绑定端口
this.serverSocketChannel.socket().bind(new InetSocketAddress(port));
// 设置为非阻塞模式
this.serverSocketChannel.configureBlocking(false);
// ACCEPT 事件的附加器是 Acceptor
this.serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, this);
}
@Override
public void run() {
try {
// 处理ACCEPT事件
// 为连接的客户端创建 client-socket 管道
SocketChannel clientSocketChannel = serverSocketChannel.accept();
// 设置为非阻塞
clientSocketChannel.configureBlocking(false);
// READ 事件的附加器是 Handler
clientSocketChannel.register(selector, SelectionKey.OP_READ,
new DefaultHandler(clientSocketChannel));
} catch (IOException e) {
e.printStackTrace();
}
}
}
// Handler 处理器
// - 由SocketChannel实现
// - 处理客户端发送过来的真正业务内容
public class DefaultHandler implements Handler {
private final SocketChannel clientSocketChannel;
public DefaultHandler(SocketChannel clientSocketChannel) {
this.clientSocketChannel = clientSocketChannel;
}
@Override
public void run() {
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
try {
// 读取数据
int read = clientSocketChannel.read(byteBuffer);
if (read <= 0) {
clientSocketChannel.close();
} else {
System.out.println("----" + new String(byteBuffer.array()) + "----");
// 响应结果 200, 模拟请求响应
String response = "HTTP/1.1 200 OK\r\n" +
"Content-Length: 11\r\n\r\n" +
"Yes, He is";
ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());// 数据存放在byte数组
while (buffer.hasRemaining()) {
// hasRemaining() 返回是否有剩余的可用长度
clientSocketChannel.write(buffer); // 非阻塞
}
}
} catch (IOException e1) {
try {
clientSocketChannel.close();
} catch (IOException e2) {
e2.printStackTrace();
}
e1.printStackTrace();
}
}
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。