<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.20.Final</version>
</dependency>
package com.dance.netty.netty.simple;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
/*
* 创建BossGroup 和 WorkerGroup
* 说明:
* 1. 创建连个线程组 bossGroup 和 workerGroup
* 2. bossGroup只处理连接请求(accept), 真正的和客户端业务处理, 会交给workerGroup完成
* 3. 两个都是无线循环
* 4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数
* 默认: 实际CPU核数 * 2
*/
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 创建服务器端的启动对象, 配置参数
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup) //设置两个线程组
.channel(NioServerSocketChannel.class) // 使用NioServerSocketChannel作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列等待连接个数
.childOption(ChannelOption.SO_KEEPALIVE, true) // 设置保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() {
/**
* 初始化通道
* 创建一个通道初始化对象(匿名对象)
* 给pipeline设置处理器
* @param socketChannel SocketChannel
* @throws Exception err
*/
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 设置 处理器
socketChannel.pipeline().addLast(new NettyServerHandler());
}
}); // 给我们的workerGroup 的 EventLoop 对应的管道(pipeline)设置处理器
System.out.println("server is ready......");
// 绑定一个接口 并且同步 生成一个 ChannelFuture 对象
// 启动服务器(并绑定端口)
ChannelFuture channelFuture = serverBootstrap.bind(6668).sync();
// 对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
package com.dance.netty.netty.simple;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
/**
* 说明:
* 1. 我们自定义一个handler 需要集成Netty 规定好的 HandlerAdapter
* 2. 这时我们自定义一个Handler 才能称为一个Handler
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 读取消息
* @param ctx 通道处理器上下文 含有管道 pipeline对象 地址
* @param msg 消息
* @throws Exception 异常
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("ctx = " + ctx);
// 将msg 转换为 byteBuf
// 这个是Netty的 ByteBuf 不是 NIO的ByteBuffer
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("客户端发送的消息是: " + byteBuf.toString(StandardCharsets.UTF_8));
System.out.println("客户端地址: " + ctx.channel().remoteAddress());
}
/**
* 数据读取完毕
* @param ctx 通道处理器上下文
* @throws Exception 异常
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// write + flush
// 将数据写入到缓存并刷新
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello 客户端", StandardCharsets.UTF_8));
}
/**
* 处理异常
* @param ctx 上下文对象
* @param cause 异常
* @throws Exception 异常
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 异常一般是关闭通道
ctx.close();
cause.printStackTrace();
}
}
package com.dance.netty.netty.simple;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
// 客户端需要一个事件循环组
EventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
// 创建客户端启动对象
// 注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventExecutors)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyClientHandler());
}
});
System.out.println("客户端 ok !");
// 启动客户端去连接服务端
// 关于 ChannelFuture 要分析, 涉及到Netty的异步模型
ChannelFuture sync = bootstrap.connect("127.0.0.1", 6668).sync();
// 给关闭通道进行监听
sync.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
eventExecutors.shutdownGracefully();
}
}
}
package com.dance.netty.netty.simple;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.StandardCharsets;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 当通道被激活触发
* @param ctx 上下文
* @throws Exception 异常
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client ctx is " + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello 服务器!", StandardCharsets.UTF_8));
}
/**
* 当有通道读取事件触发
* @param ctx 上下文
* @param msg 消息
* @throws Exception 异常
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("服务器回复的消息: " + byteBuf.toString(StandardCharsets.UTF_8));
System.out.println("服务器地址: " + ctx.channel().remoteAddress());
}
/**
* 异常
* @param ctx 上下文
* @param cause 异常
* @throws Exception 异常
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
cause.printStackTrace();
}
}
启动服务器端
启动客户端
服务器端输出
server is ready......
ctx = ChannelHandlerContext(NettyServerHandler#0, [id: 0x565d4ec5, L:/127.0.0.1:6668 - R:/127.0.0.1:51028])
客户端发送的消息是: Hello 服务器!
客户端地址: /127.0.0.1:51028
客户端输出
客户端 ok !
client ctx is ChannelHandlerContext(NettyClientHandler#0, [id: 0xd0a47750, L:/127.0.0.1:51028 - R:/127.0.0.1:6668])
服务器回复的消息: Hello 客户端
服务器地址: /127.0.0.1:6668