前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >《最佳实践》-NIO知识梳理

《最佳实践》-NIO知识梳理

原创
作者头像
九灵
修改2020-12-14 15:24:30
3320
修改2020-12-14 15:24:30
举报
文章被收录于专栏:Jaycekon

1、概述

NIO(Non-blocking I/O,在Java领域,也称为New I/O),是一种同步非阻塞的I/O模型,也是I/O多路复用的基础,已经被越来越多地应用到大型应用服务器,成为解决高并发与大量连接、I/O处理问题的有效方式。

核心组件主要包括:

  • Channel
  • Buffer
  • Selector
Java NIO: Channels and Buffers(通道和缓冲区)

标准的IO基于字节流和字符流进行操作的,而NIO是基于通道(Channel)和缓冲区(Buffer)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。

Java NIO: Asynchronous IO(异步IO)

Java NIO可以让你异步的使用IO,例如:当线程从通道读取数据到缓冲区时,线程还是可以进行其他事情。当数据被写入到缓冲区时,线程可以继续处理它。从缓冲区写入通道也类似。

Java NIO: Selectors(选择器)

Java NIO引入了选择器的概念,选择器用于监听多个通道的事件(比如:连接打开,数据到达)。因此,单个的线程可以监听多个数据通道。

2、基础组件

2.1、Channel

ChannelStream 的区别

  • 你可以通过 Channel 进行读和写,但只能从Stream中,单向获取数据(读或写)
  • Channels 可以进行异步
  • Channel 通常都是基于 Buffer 进行读或写

ChannelJava NIO 中的一些 具体实现

  • FileChannel : 通过文件获取数据
  • DatagramChannel : 通过UDP进行网络数据传输
  • SocketChannel : 通过TCP进行网络数据传输
  • ServerSocketChannel : 通监听TCP链接,进行数据链接。每个链接都会创建一个 ServerSocketChannel。

FileChannel Demo

代码语言:txt
复制
    RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw");
    FileChannel inChannel = aFile.getChannel();

    ByteBuffer buf = ByteBuffer.allocate(48);

    int bytesRead = inChannel.read(buf);
    while (bytesRead != -1) {

      System.out.println("Read " + bytesRead);
      buf.flip();

      while(buf.hasRemaining()){
          System.out.print((char) buf.get());
      }

      buf.clear();
      bytesRead = inChannel.read(buf);
    }
    aFile.close();

2.2、Buffer

Buffer 本质上是一个内存块,您可以在其中写入数据,然后可以在以后再次读取。该内存块包装在NIO Buffer对象中,该对象提供了一组方法,可以更轻松地使用该部分数据。

Buffer 中三个主要属性:

  • capacity : Buffer 大小
  • position : 当前操作下标
  • limit : 数据大小

常用方法:

  • flip() : 写数据完成后,将limit设置到数据末端, position设置为0,进行读操作
  • rewind() : 重新设置 position为0 ,进行读操作

BufferJava NIO 中的一些 具体实现

  • ByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

2.3、Selector

Selector 的优势在于,只需要使用一个线程,就可以管控多个 channel ,线程之间的切换对于操作系统来说是昂贵的,并且每个线程也占用操作系统中的一些资源(内存)。因此,使用的线程越少越好。

需要注意的是, channel 必须设置成 non-blocking 的模式,因此对于 FileChannel 不适用当前模式

代码语言:txt
复制
channel.configureBlocking(false);

SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

SelectionKey 的 四种状态

  • SelectionKey.OP_CONNECT
  • SelectionKey.OP_ACCEPT
  • SelectionKey.OP_READ
  • SelectionKey.OP_WRITE

3、NIO 实现RPC通信

3.1、RpcServer

1、创建 ServerSocketChannel 监听信息

代码语言:txt
复制
public class RPCServer {
    private Map<String, Object> services = new HashMap<>();
    private Selector selector;
    private ServerSocketChannel ssc;

    public RPCServer() {
        try {
            ssc = ServerSocketChannel.open();
            InetSocketAddress address = new InetSocketAddress(3003);
            ssc.configureBlocking(false);
            ssc.bind(address);
            selector = Selector.open();
            ssc.register(selector, SelectionKey.OP_ACCEPT);

        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

2、提供服务类(HelloService.class)

代码语言:txt
复制
public interface HelloService {

    String sayHello();

    void sayHi(String temp);
}

public class HelloServiceImpl implements HelloService {

    @Override
    public String sayHello() {
        return "jiuling";
    }

    @Override
    public void sayHi(String temp) {
        System.out.println(temp);
    }
}

3、ServerSocketChannel 监听请求

代码语言:txt
复制
 public void start() {
        System.out.println("-----开始监听请求------");
        try {
            while (selector.select() > 0) {
                for (SelectionKey sk : selector.selectedKeys()) {
                    selector.selectedKeys().remove(sk);
                    if (sk.isAcceptable()) {
						// 通过accept方法,获取对应的SocketChannel
                        SocketChannel sc = ssc.accept();
						// 设置采用非阻塞模式
                        sc.configureBlocking(false);
                        // 将channel 状态设置为可读状态
                        sc.register(selector, SelectionKey.OP_READ);
                        sk.interestOps(SelectionKey.OP_ACCEPT);
                    } else if (sk.isReadable()) {
					
                        SocketChannel sc = (SocketChannel) sk.channel();
                        try {
                        	//调用反射方法
                            remoteHandMethod(sk, sc);
                        } catch (Exception e) {
                            //从Selector中删除指定的SelectionKey
                            sk.cancel();
                            if (sk.channel() != null) {
                                sk.channel().close();
                            }
                        }
                    }
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

4、remoteHandMethod反射调用

代码语言:txt
复制
private void remoteHandMethod(SelectionKey sk,
                                  SocketChannel sc) throws Exception {
      //1、从流中读取数据                            
      ByteBuffer buff = ByteBuffer.allocate(1024);
        sc.read(buff);

        int postion = buff.position();//这里获取它真正的大小
        byte[] data = buff.array();
        String message = new String(data, 0, postion);// class/方法名(参数类型:参数,参数类型:参数)
        message = message.trim();
	
      //2、解析数据,获取请求内容()
        String[] clazzInfo = message.split("/");
        String className = clazzInfo[0];
        String methodName = clazzInfo[1].substring(0,
                clazzInfo[1].indexOf("("));
        String temp = clazzInfo[1].substring(
                clazzInfo[1].indexOf("(") + 1,
                clazzInfo[1].indexOf(")"));
		String typeValues = decodeParamsTypeAndValue(temp);
       
       //3、反射调用
      Object object = services.get(className);
      Class clazz = object.getClass();
      Object result = null;                    
     
     if (typeValues == null) {
            Method method = clazz.getDeclaredMethod(methodName,
                    null);
            result = method.invoke(object, null);
        } else {
            Class[] types = new Class[typeValues.length];
            Object[] values = new Object[typeValues.length];
            for (int i = 0; i < typeValues.length; i++) {
                String[] tv = typeValues[i].split(":");
                String type = tv[0];
                String value = tv[1];
                types[i] = Class.forName(type);
                if (type.contains("Integer") || type.contains("int"))
                    values[i] = Integer.parseInt(value);
                else if (type.contains("Float") || type.contains("float"))
                    values[i] = Float.parseFloat(value);
                else if (type.contains("Double") || type.contains("double"))
                    values[i] = Double.parseDouble(value);
                else if (type.contains("Long") || type.contains("long"))
                    values[i] = Long.parseLong(value);
                else
                    values[i] = value;
            }
            Method method = clazz.getDeclaredMethod(methodName,
                    types);
            result = method.invoke(object, values);
        }
        
        
       //4、 返回内容
        sc.write(ByteBuffer.wrap(result.toString()
                .getBytes()));
        sk.interestOps(SelectionKey.OP_READ);
                                  
}


    // 它返回的格式是 参数类型:参数值
    private String[] decodeParamsTypeAndValue(String params) {
        if (params == null || params.equals(""))
            return null;
        if (params.indexOf(",") < 0)
            return new String[]{params};
        return params.split(",");


    }

3.2、RpcClient

1、创建客户端

代码语言:txt
复制
public class RPCClient {
    private SocketChannel channel;
    private ByteBuffer buffer = ByteBuffer.allocate(1024);
    private static RPCClient client = new RPCClient();
    private Selector selector = null;

	public RPCClient(String serverIp) {
     try {
            System.out.println("------客户端要启动了--------");
            selector = Selector.open();
            InetSocketAddress isa = new InetSocketAddress(serverIp, 3003);
// 获取socket通道
            channel = SocketChannel.open(isa);
// 连接服务器
            channel.configureBlocking(false);


            channel.register(selector, SelectionKey.OP_READ);


        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    
    }

}

2、获取代理类

代码语言:txt
复制
   // 获取代理
    public Object getRemoteProxy(final Class clazz) {
//动态产生实现类

        return Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{clazz}, new InvocationHandler() {


            @Override
            public Object invoke(Object proxy, Method method, Object[] args)
                    throws Throwable {

                String methodName = method.getName();
                String clazzName = clazz
                        .getSimpleName();
                Object result = null;
                if (args == null || args.length == 0) {// 表示没有参数 它传递的类型
// 接口名/方法名()
                    channel.write(ByteBuffer
                            .wrap((clazzName + "/" + methodName + "()")
                                    .getBytes()));
                } else {
                    int size = args.length;
                    String[] types = new String[size];
                    StringBuffer content = new StringBuffer(clazzName)
                            .append("/").append(methodName).append("(");
                    for (int i = 0; i < size; i++) {
                        types[i] = args[i].getClass().getName();
                        content.append(types[i]).append(":").append(args[i]);
                        if (i != size - 1)
                            content.append(",");
                    }
                    content.append(")");
                    channel.write(ByteBuffer
                            .wrap(content.toString().getBytes()));
                }
// 获取结果
                result = getresult();


                return result;
            }
        });


    }

3、获取返回结果

代码语言:txt
复制
private Object getresult() {
// 解析结果 如果结尾为null或NULL则忽略
        try {
            while (selector.select() > 0) {
                for (SelectionKey sk : selector.selectedKeys()) {
                    selector.selectedKeys().remove(sk);
                    if (sk.isReadable()) {
                        SocketChannel sc = (SocketChannel) sk.channel();
                        buffer.clear();
                        sc.read(buffer);
                        int postion = buffer.position();

                        String result = new String(buffer.array(), 0, postion);
                        result = result.trim();
                        buffer.clear();

                        if (result.endsWith("null") || result.endsWith("NULL"))
                            return null;


                        String[] typeValue = result.split(":");
                        String type = typeValue[0];
                        String value = typeValue[1];
                        if (type.contains("Integer") || type.contains("int"))
                            return Integer.parseInt(value);
                        else if (type.contains("Float")
                                || type.contains("float"))
                            return Float.parseFloat(value);
                        else if (type.contains("Long") || type.contains("long"))
                            return Long.parseLong(value);
                        else
                            return value;
                    }
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return null;
    }

完整代码可以关注公众号获取~

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、概述
  • 2、基础组件
    • 2.1、Channel
      • 2.2、Buffer
        • 2.3、Selector
        • 3、NIO 实现RPC通信
          • 3.1、RpcServer
            • 3.2、RpcClient
            相关产品与服务
            云服务器
            云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档