前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java AIO 异步IO应用实例

Java AIO 异步IO应用实例

作者头像
WindWant
发布2020-09-11 10:42:08
6420
发布2020-09-11 10:42:08
举报
文章被收录于专栏:后端码事

项目地址:https://github.com/windwant/windwant-demo/tree/master/io-service

Server:

代码语言:javascript
复制
package org.windwant.io.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Demo server built on {@link AsynchronousServerSocketChannel}.
 *
 * <p>Every accepted connection is served to completion (one read, a simulated
 * processing delay, one reply, close) before the next accept is issued, so
 * connections are handled one at a time.
 */
public class AIOServer implements Runnable{

    private int port = 8889;
    private int threadSize = 10;
    protected AsynchronousChannelGroup asynchronousChannelGroup;

    protected AsynchronousServerSocketChannel serverChannel;

    /**
     * Creates the server and immediately tries to bind it.
     *
     * @param port       TCP port to listen on
     * @param threadSize initial number of tasks submitted to the channel group's
     *                   thread pool; negative values are treated as 0
     */
    public AIOServer(int port, int threadSize) {
        this.port = port;
        this.threadSize = threadSize;
        init();
    }

    /**
     * Opens the channel group and the listening channel. On failure the error is
     * printed, everything opened so far is released and {@link #serverChannel}
     * is left {@code null} so that {@link #run()} becomes a no-op.
     */
    private void init(){
        try {
            // Honour the configured thread size instead of a hard-coded value;
            // withCachedThreadPool rejects negative sizes, hence the clamp.
            asynchronousChannelGroup = AsynchronousChannelGroup.withCachedThreadPool(
                    Executors.newCachedThreadPool(), Math.max(threadSize, 0));
            serverChannel = AsynchronousServerSocketChannel.open(asynchronousChannelGroup);
            serverChannel.bind(new InetSocketAddress(port));
            System.out.println("listening on port: " + port);
        } catch (IOException e) {
            e.printStackTrace();
            releaseAfterFailedInit();
        }
    }

    /** Releases the group (and any channel bound to it) after a failed {@link #init()}. */
    private void releaseAfterFailedInit() {
        serverChannel = null;
        if (asynchronousChannelGroup != null) {
            try {
                // shutdownNow also closes every channel that was opened in the group.
                asynchronousChannelGroup.shutdownNow();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Starts accepting connections, then blocks the calling thread on
     * {@code System.in.read()}.
     */
    @Override
    public void run() {
        try{
            if(serverChannel == null) return;
            serverChannel.accept(this, new CompletionHandler<AsynchronousSocketChannel, AIOServer>() {
                // Reused for every connection; safe because the next accept is only
                // issued after the current connection has been fully handled.
                final ByteBuffer echoBuffer = ByteBuffer.allocateDirect(1024);

                @Override
                public void completed(AsynchronousSocketChannel result, AIOServer attachment) {
                    System.out.println("==============================================================");
                    System.out.println("server process begin ...");
                    try {
                        System.out.println("client host: " + result.getRemoteAddress());
                        echoBuffer.clear();
                        result.read(echoBuffer).get();
                        echoBuffer.flip();
                        System.out.println("received : " + Charset.defaultCharset().decode(echoBuffer));

                        int random = ThreadLocalRandom.current().nextInt(5);
                        printProcess(random);
                        System.out.println("server deal request execute: " + random + "s");

                        String msg = "server test msg-" + Math.random();
                        System.out.println("server send data: " + msg);
                        // Wait for the whole reply to be written before the channel is closed.
                        ByteBuffer reply = ByteBuffer.wrap(msg.getBytes());
                        while (reply.hasRemaining()) {
                            result.write(reply).get();
                        }
                        System.out.println("server process end ...");
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        // Preserve the interrupt for whoever owns this thread.
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    } finally {
                        closeQuietly(result);
                        // Listen for the next connection, reusing this handler.
                        attachment.acceptNext(this);
                    }

                }

                @Override
                public void failed(Throwable exc, AIOServer attachment) {
                    System.out.println("received failed");
                    exc.printStackTrace();
                    attachment.acceptNext(this);
                }
            });
            System.in.read();
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    /**
     * Issues the next accept with the given handler, unless the listening channel
     * has been closed. Without this check a closed channel would make every accept
     * fail, and the failure callback would re-issue it again without end.
     */
    private void acceptNext(CompletionHandler<AsynchronousSocketChannel, AIOServer> handler) {
        if (serverChannel.isOpen()) {
            serverChannel.accept(this, handler);
        }
    }

    /** Closes a client channel, printing (not propagating) any I/O error. */
    private static void closeQuietly(AsynchronousSocketChannel channel) {
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Simulates {@code s} seconds of work, printing a growing line of dots once
     * per second.
     *
     * @param s number of seconds to sleep
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private void printProcess(int s) throws InterruptedException {
        String dot = "";
        for (int i = 0; i < s; i++) {
            Thread.sleep(1000);
            dot += ".";
            System.out.println(dot);

        }
    }

    public static void main(String[] args) throws IOException {
        new Thread(new AIOServer(8989, 19)).start();
    }
}

Client:

代码语言:javascript
复制
package org.windwant.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

/**
 * One-shot demo client built on {@link AsynchronousSocketChannel}: connects,
 * sends one message, prints the single reply it reads, then closes the channel.
 */
public class AIOClient implements Runnable{

    private AsynchronousSocketChannel client;
    private String host;
    private int port;

    /**
     * @param host server host name or address
     * @param port server TCP port
     * @throws IOException if the channel cannot be opened
     */
    public AIOClient(String host, int port) throws IOException {
        this.client = AsynchronousSocketChannel.open();
        this.host = host;
        this.port = port;
    }

    public static void main(String[] args) {
        try {
            new Thread(new AIOClient("127.0.0.1", 8989)).start();
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    /**
     * Starts the asynchronous connect. The send and the read are issued from the
     * connect callback, because reading from a channel that is not yet connected
     * throws {@code NotYetConnectedException}.
     */
    @Override
    public void run() {
        client.connect(new InetSocketAddress(host, port), null, new CompletionHandler<Void, Object>() {
            @Override
            public void completed(Void result, Object attachment) {
                String msg = "client test msg-" + Math.random();
                client.write(ByteBuffer.wrap(msg.getBytes()));
                System.out.println("client send data:" + msg);
                readReply();
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.println("client send field...");
                exc.printStackTrace();
                closeQuietly();
            }
        });
    }

    /** Reads one reply from the server, prints it and closes the channel. */
    private void readReply() {
        final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        client.read(byteBuffer, null, new CompletionHandler<Integer, Object>() {
            @Override
            public void completed(Integer result, Object attachment) {
                System.out.println(result);
                // Decode only the bytes actually received; result is -1 at end of stream.
                int length = Math.max(result, 0);
                System.out.println("client read data: " + new String(byteBuffer.array(), 0, length));
                closeQuietly();
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.println("read faield");
                exc.printStackTrace();
                closeQuietly();
            }
        });
    }

    /** Closes the channel, printing (not propagating) any I/O error. */
    private void closeQuietly() {
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

2017-12-11 改造client: AsynchronousChannelGroup

代码语言:javascript
复制
package org.windwant.io.aio;
 
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
 
/**
 * Demo client that opens a new {@link AsynchronousSocketChannel} per request, all
 * bound to one shared {@link AsynchronousChannelGroup}.
 *
 * <p>{@link #run()} sends 100 requests, one per second. Call {@link #shutdown()}
 * when the client is no longer needed so the group's threads are released.
 */
public class AIOClient implements Runnable{

    private AsynchronousChannelGroup group;   // channel group shared by every channel this client opens
    private String host;
    private int port;

    /**
     * @param host server host name or address
     * @param port server TCP port
     */
    public AIOClient(String host, int port) {
        this.host = host;
        this.port = port;
        initGroup();
    }

    /**
     * Creates the channel group on top of a 5-thread executor. On failure the
     * error is printed and {@code group} stays {@code null}.
     */
    private void initGroup(){
        if(group == null) {
            try {
                group = AsynchronousChannelGroup.withCachedThreadPool(Executors.newFixedThreadPool(5), 5);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }


    /**
     * Opens a channel in the group, connects, sends one message and reads one
     * reply. The channel is closed once the exchange finishes or fails.
     */
    private void send(){
        try {
            // New channel per request, bound to the shared group.
            final AsynchronousSocketChannel client = AsynchronousSocketChannel.open(group);
            client.connect(new InetSocketAddress(host, port), null, new CompletionHandler<Void, Object>() {
                @Override
                public void completed(Void result, Object attachment) {
                    String msg = "client test msg-" + Math.random();
                    client.write(ByteBuffer.wrap(msg.getBytes()));
                    System.out.println(Thread.currentThread().getName() + " client send data:" + msg);
                    readReply(client);
                }

                @Override
                public void failed(Throwable exc, Object attachment) {
                    System.out.println("client send field...");
                    exc.printStackTrace();
                    closeQuietly(client);
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Reads one reply from {@code client}, prints it and closes the channel. */
    private void readReply(final AsynchronousSocketChannel client) {
        final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        client.read(byteBuffer, null, new CompletionHandler<Integer, Object>() {
            @Override
            public void completed(Integer result, Object attachment) {
                // Decode only the bytes actually received; result is -1 at end of stream.
                int length = Math.max(result, 0);
                System.out.println(Thread.currentThread().getName() + " client read data: "
                        + new String(byteBuffer.array(), 0, length));
                closeQuietly(client);
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.println("read faield");
                exc.printStackTrace();
                closeQuietly(client);
            }
        });
    }

    /** Closes a channel, printing (not propagating) any I/O error. */
    private static void closeQuietly(AsynchronousSocketChannel channel) {
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Sends 100 requests, one per second. Stops early if the thread is
     * interrupted or the group has been shut down.
     */
    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            if (group != null && group.isShutdown()) {
                return;
            }
            send();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the flag and stop instead of continuing to send.
                Thread.currentThread().interrupt();
                e.printStackTrace();
                return;
            }
        }
    }

    /**
     * Shuts the channel group down: waits up to 10 seconds for in-flight
     * exchanges to finish, then forcibly closes whatever is left. Replaces the
     * former {@code finalize()} override, which only waited for termination
     * without ever initiating a shutdown.
     */
    public void shutdown() {
        if (group == null || group.isShutdown()) {
            return;
        }
        group.shutdown();
        try {
            if (!group.awaitTermination(10, TimeUnit.SECONDS)) {
                group.shutdownNow();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            try {
                group.shutdownNow();
            } catch (IOException io) {
                io.printStackTrace();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        AIOClient aioClient = new AIOClient("127.0.0.1", 8989);
        Thread worker = new Thread(aioClient);
        worker.start();
        try {
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            // Let the request loop finish before releasing the group.
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            aioClient.shutdown();
        }

    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2016-10-20 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档