NIO(Non-blocking I/O,在Java领域,也称为New I/O),是一种同步非阻塞的I/O模型,也是I/O多路复用的基础,已经被越来越多地应用到大型应用服务器,成为解决高并发与大量连接、I/O处理问题的有效方式。
核心组件主要包括:
标准的IO基于字节流和字符流进行操作的,而NIO是基于通道(Channel)和缓冲区(Buffer)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。
Java NIO可以让你异步的使用IO,例如:当线程从通道读取数据到缓冲区时,线程还是可以进行其他事情。当数据被写入到缓冲区时,线程可以继续处理它。从缓冲区写入通道也类似。
Java NIO引入了选择器的概念,选择器用于监听多个通道的事件(比如:连接打开,数据到达)。因此,单个的线程可以监听多个数据通道。
Channel
与 Stream
的区别
Channel
进行读和写,但只能从Stream
中,单向获取数据(读或写)Channels
可以进行异步 读 或 写Channel
通常都是基于 Buffer
进行读或写Channel
在 Java NIO
中的一些 具体实现
FileChannel Demo
RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw");
FileChannel inChannel = aFile.getChannel();
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buf);
while (bytesRead != -1) {
System.out.println("Read " + bytesRead);
buf.flip();
while(buf.hasRemaining()){
System.out.print((char) buf.get());
}
buf.clear();
bytesRead = inChannel.read(buf);
}
aFile.close();
Buffer
本质上是一个内存块,您可以在其中写入数据,然后可以在以后再次读取。该内存块包装在NIO Buffer对象中,该对象提供了一组方法,可以更轻松地使用该部分数据。
Buffer
中三个主要属性:
常用方法:
limit
设置到数据末端, position
设置为0,进行读操作position
为0 ,进行读操作Buffer
在 Java NIO
中的一些 具体实现
Selector
的优势在于,只需要使用一个线程,就可以管控多个 channel
,线程之间的切换对于操作系统来说是昂贵的,并且每个线程也占用操作系统中的一些资源(内存)。因此,使用的线程越少越好。
需要注意的是, channel 必须设置成 non-blocking
的模式,因此对于 FileChannel 不适用当前模式
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
SelectionKey 的 四种状态
1、创建 ServerSocketChannel 监听信息
public class RPCServer {
private Map<String, Object> services = new HashMap<>();
private Selector selector;
private ServerSocketChannel ssc;
public RPCServer() {
try {
ssc = ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(3003);
ssc.configureBlocking(false);
ssc.bind(address);
selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
2、提供服务类(HelloService.class
)
public interface HelloService {
String sayHello();
void sayHi(String temp);
}
public class HelloServiceImpl implements HelloService {
@Override
public String sayHello() {
return "jiuling";
}
@Override
public void sayHi(String temp) {
System.out.println(temp);
}
}
3、ServerSocketChannel
监听请求
public void start() {
System.out.println("-----开始监听请求------");
try {
while (selector.select() > 0) {
for (SelectionKey sk : selector.selectedKeys()) {
selector.selectedKeys().remove(sk);
if (sk.isAcceptable()) {
// 通过accept方法,获取对应的SocketChannel
SocketChannel sc = ssc.accept();
// 设置采用非阻塞模式
sc.configureBlocking(false);
// 将channel 状态设置为可读状态
sc.register(selector, SelectionKey.OP_READ);
sk.interestOps(SelectionKey.OP_ACCEPT);
} else if (sk.isReadable()) {
SocketChannel sc = (SocketChannel) sk.channel();
try {
//调用反射方法
remoteHandMethod(sk, sc);
} catch (Exception e) {
//从Selector中删除指定的SelectionKey
sk.cancel();
if (sk.channel() != null) {
sk.channel().close();
}
}
}
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
4、remoteHandMethod
反射调用
private void remoteHandMethod(SelectionKey sk,
SocketChannel sc) throws Exception {
//1、从流中读取数据
ByteBuffer buff = ByteBuffer.allocate(1024);
sc.read(buff);
int postion = buff.position();//这里获取它真正的大小
byte[] data = buff.array();
String message = new String(data, 0, postion);// class/方法名(参数类型:参数,参数类型:参数)
message = message.trim();
//2、解析数据,获取请求内容()
String[] clazzInfo = message.split("/");
String className = clazzInfo[0];
String methodName = clazzInfo[1].substring(0,
clazzInfo[1].indexOf("("));
String temp = clazzInfo[1].substring(
clazzInfo[1].indexOf("(") + 1,
clazzInfo[1].indexOf(")"));
String typeValues = decodeParamsTypeAndValue(temp);
//3、反射调用
Object object = services.get(className);
Class clazz = object.getClass();
Object result = null;
if (typeValues == null) {
Method method = clazz.getDeclaredMethod(methodName,
null);
result = method.invoke(object, null);
} else {
Class[] types = new Class[typeValues.length];
Object[] values = new Object[typeValues.length];
for (int i = 0; i < typeValues.length; i++) {
String[] tv = typeValues[i].split(":");
String type = tv[0];
String value = tv[1];
types[i] = Class.forName(type);
if (type.contains("Integer") || type.contains("int"))
values[i] = Integer.parseInt(value);
else if (type.contains("Float") || type.contains("float"))
values[i] = Float.parseFloat(value);
else if (type.contains("Double") || type.contains("double"))
values[i] = Double.parseDouble(value);
else if (type.contains("Long") || type.contains("long"))
values[i] = Long.parseLong(value);
else
values[i] = value;
}
Method method = clazz.getDeclaredMethod(methodName,
types);
result = method.invoke(object, values);
}
//4、 返回内容
sc.write(ByteBuffer.wrap(result.toString()
.getBytes()));
sk.interestOps(SelectionKey.OP_READ);
}
// 它返回的格式是 参数类型:参数值
private String[] decodeParamsTypeAndValue(String params) {
if (params == null || params.equals(""))
return null;
if (params.indexOf(",") < 0)
return new String[]{params};
return params.split(",");
}
1、创建客户端
public class RPCClient {
private SocketChannel channel;
private ByteBuffer buffer = ByteBuffer.allocate(1024);
private static RPCClient client = new RPCClient();
private Selector selector = null;
public RPCClient(String serverIp) {
try {
System.out.println("------客户端要启动了--------");
selector = Selector.open();
InetSocketAddress isa = new InetSocketAddress(serverIp, 3003);
// 获取socket通道
channel = SocketChannel.open(isa);
// 连接服务器
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
2、获取代理类
// 获取代理
public Object getRemoteProxy(final Class clazz) {
//动态产生实现类
return Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{clazz}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
String methodName = method.getName();
String clazzName = clazz
.getSimpleName();
Object result = null;
if (args == null || args.length == 0) {// 表示没有参数 它传递的类型
// 接口名/方法名()
channel.write(ByteBuffer
.wrap((clazzName + "/" + methodName + "()")
.getBytes()));
} else {
int size = args.length;
String[] types = new String[size];
StringBuffer content = new StringBuffer(clazzName)
.append("/").append(methodName).append("(");
for (int i = 0; i < size; i++) {
types[i] = args[i].getClass().getName();
content.append(types[i]).append(":").append(args[i]);
if (i != size - 1)
content.append(",");
}
content.append(")");
channel.write(ByteBuffer
.wrap(content.toString().getBytes()));
}
// 获取结果
result = getresult();
return result;
}
});
}
3、获取返回结果
private Object getresult() {
// 解析结果 如果结尾为null或NULL则忽略
try {
while (selector.select() > 0) {
for (SelectionKey sk : selector.selectedKeys()) {
selector.selectedKeys().remove(sk);
if (sk.isReadable()) {
SocketChannel sc = (SocketChannel) sk.channel();
buffer.clear();
sc.read(buffer);
int postion = buffer.position();
String result = new String(buffer.array(), 0, postion);
result = result.trim();
buffer.clear();
if (result.endsWith("null") || result.endsWith("NULL"))
return null;
String[] typeValue = result.split(":");
String type = typeValue[0];
String value = typeValue[1];
if (type.contains("Integer") || type.contains("int"))
return Integer.parseInt(value);
else if (type.contains("Float")
|| type.contains("float"))
return Float.parseFloat(value);
else if (type.contains("Long") || type.contains("long"))
return Long.parseLong(value);
else
return value;
}
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return null;
}
完整代码可以关注公众号获取~
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。