<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha1</version>
</dependency>
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.ReadTimeoutHandler;
public class ClientInitializer extends ChannelInitializer<SocketChannel>{
private CountDownLatch lathc;
public ClientInitializer(CountDownLatch lathc) {
this.lathc = lathc;
}
private ClientHandler handler;
@Override
protected void initChannel(SocketChannel sc) throws Exception {
handler = new ClientHandler(lathc);
sc.pipeline().addLast(new StringDecoder());//进行字符串的编解码设置
sc.pipeline().addLast(new StringEncoder());
sc.pipeline().addLast(new ReadTimeoutHandler(60));//设置超时时间
sc.pipeline().addLast(handler);
}
public String getServerResult(){
return handler.getResult();
}
public void resetLathc(CountDownLatch lathc) {
handler.resetLatch(lathc);
}
}
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.CountDownLatch;
public class ClientHandler extends ChannelHandlerAdapter{
private CountDownLatch lathc;
private String result;
public ClientHandler(CountDownLatch lathc) {
this.lathc = lathc;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
result = (String) msg;
lathc.countDown();// 消息接收后释放同步锁,lathc是从Client加一传回来的
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
public void resetLatch(CountDownLatch lathc) {
this.lathc = lathc;
}
public String getResult() {
return result;
}
}
import java.util.concurrent.CountDownLatch;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;;
public class Client {//编写客户端单例模式方便系统调用
private static class SingletonHolder {
static final Client instance = new Client();
}
public static Client getInstance(){
return SingletonHolder.instance;
}
private EventLoopGroup group;
private Bootstrap b;
private ChannelFuture cf ;
private ClientInitializer clientInitializer;
private CountDownLatch lathc;
private Client(){
lathc = new CountDownLatch(0);
clientInitializer = new ClientInitializer(lathc);
group = new NioEventLoopGroup();
b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(clientInitializer);
}
public void connect(){
//192.168.43.51测试端口8766 192.168.43.102 线上端口8765
try {
this.cf = b.connect("127.0.01", 8888).sync();
System.out.println("远程服务器已经连接, 可以进行数据交换..");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
}
}
public ChannelFuture getChannelFuture(){
if(this.cf == null) {
this.connect();
}
if(!this.cf.channel().isActive()){
this.connect();
}
return this.cf;
}
public void close(){
try {
this.cf.channel().closeFuture().sync();
this.group.shutdownGracefully();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public String setMessage(String msg) throws InterruptedException{
ChannelFuture cf =getInstance().getChannelFuture();//单例模式获取ChannelFuture对象
cf.channel().writeAndFlush(msg);
//发送数据控制门闩加一
lathc = new CountDownLatch(1);
clientInitializer.resetLathc(lathc);
lathc.await();//开始等待结果返回后执行下面的代码
return clientInitializer.getServerResult();
}
public static void main(String[] args) throws Exception {
System.out.println(Client.getInstance().setMessage("123"));//测试等待数据返回
}
}
欢迎大家有问题和意见可以留言