现如今,我们使用通用的应用程序或库来相互通信。例如,我们经常使用HTTP客户端库从服务器上获取信息并通过web服务执行远程过程调用。但是,通用协议或它的实现有时并不能很好的伸缩。这就像我们不会使用通用HTTP服务器来交换大文件、电子邮件、还有像金融信息、游戏数据等实时信息。这些业务所需要的是高度优化实现协议,用于专门的目的。例如,您可能希望实现一个针对基于ajax的聊天应用程序、媒体流应用、大文件传输进行优化的http服务器。您甚至可能想要设计并实现一个完全符合您的需求的新协议。另一个不可避免的情况是,你不得不去处理一个遗留的专有协议,来保证和旧系统的互操作性。在这些情况下,重要的是在不牺牲最终应用程序的稳定性和性能的前提前,如何尽可能快的实现该协议。
Netty项目致力于提供一个异步事件驱动的网络应用框架和工具,以便快速开发可维护的高性能和高扩展性协议的服务器和客户端。
换句话说,Netty是一个NIO服务器客户端框架,它支持快速简单的开发协议服务器和客户端等网络应用程序。它极大的简化和流线化了网络开发(例如TCP和UDP安全套接字服务器开发)。
快速和简单并不意味着最终的应用程序会出现可维护性和性能问题。Netty经过精心设计,从许多已实现协议(例如FTP、SMTP、HTTP)和众多二进制和基于文本的协议中吸取经验。最终,Netty在不妥协的前提下成功找到一个方法来实现简单的开发、高性能、稳定、灵活的应用。
一些开发者可能已经找到其他声称具有相同优势的网络应用框架,那么你可能会问Netty与它们有什么不同。答案是它所基于的哲学,Netty旨在从第一天起就为你提供最舒服的API和实现体验。它并不是什么有形的东西,但是当你阅读这份指南并使用Netty时你会发现:这个哲学将使你的生活更加的简单。
本章将围绕Netty的核心构造和简单的案例来让你快速入门。在本章结束时,你将能够立即在Netty上编写服务器和客户端。
如果你喜欢自上而下的学习方法,你可能需要从第二章:结构概览开始,然后在回到这里。
运行本章中的案例有两点最低要求:Netty和JDK 1.6或以上的最低版本。Netty最新版本可以在项目下载页下载。为了下载和JDK版本相匹配的版本,请参考您首选的jdk运营商网站。
当你阅读的时候,你可能会对本章中介绍的一些类有一些疑问,当你想要了解更多关于这些类的信息的时候,你可以参考API。为了方便,文中的所有类都有链接到显示API文档。并且,如果哪里有不正确的信息、错误的语法或错别字或你有一些好的建议来提高这篇文档,请毫不犹豫的联系Netty项目社区让我们知道。
世界上最简单的协议不是“Hello,World!”而是“丢弃”,它是一种丢弃任何接收到的数据而没有任何响应的协议。
想要实现“丢弃”协议,你唯一要做的一件事就是忽略所有接收到的数据。让我们直接从处理器实现者开始,它处理Netty生成的I/O事件。
package io.netty.example.discard;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* Handles a server-side channel.
*/
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
// Discard the received data silently.
((ByteBuf) msg).release(); // (3)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
到目前还好,我们已经实现了丢弃服务器的前半部分,剩下的就是写main()方法来启动丢弃服务器处理器这个服务器。
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* Discards any incoming data.
*/
public class DiscardServer {
private int port;
public DiscardServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // (7)
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
new DiscardServer(port).run();
}
}
恭喜您,你刚刚完成了Netty上的第一个服务器。
现在我们已经编写了第一个服务器,我们需要测试它是否真的可以工作。测试的最简单的方法就是使用telnet命令,例如,你可以在命令行界面输入 telnet localhost 8080,回车,然后再输入一些其他的内容。
但是,我们能说这个服务器是正常工作的吗?我们并不能真正知道,因为这是一个丢弃服务器,你根本就不会得到任何响应。为了证明它真的工作,我们来修改下服务器让它打印它接收到的数据。
我们已经知道当数据到达后channelRead方法会被调用。让我们向DiscardServerHandler的channelRead方法中添加下面这些代码。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
try {
while (in.isReadable()) { // (1)
System.out.print((char) in.readByte());
System.out.flush();
}
} finally {
ReferenceCountUtil.release(msg); // (2)
}
}
丢弃服务器的完整源码位于发行版的io.netty.example.discard包中。
到目前为止,我们消费的数据完全没有应答。但是,一个服务器,通常都是用来响应请求的。现在我们来学习如何通过实现应答协议来向客户端响应消息,将接收到的数据发回原处。
应答服务器与我们在前面的章节中实现的丢弃服务器的唯一区别是:它将接收到的数据发回原处,而不是打印到控制台。因此,仅需要修改channelRead方法就可以了。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg); // (1)
ctx.flush(); // (2)
}
如果你再次运行telnet命令,你会看到服务器将你发送的任何消息返回。
应答服务器的完整源码的位置在发行版的io.netty.example.echo包里。
本节要实现的协议是时间协议。它与前面的例子不同,它会发送一个32位整数的消息,不接收任何请求,并在发送消息之后关闭连接。在这个例子中,你会学到如何构造并发送一个消息,当发送完成后关闭连接。
因为我们会忽略任何接收到的数据,并且,当一个连接建立后,尽可能快的发送一个消息,因而这次我们不能使用channelRead方法了。替代的是覆写channelActive()方法。具体实现如下:
package io.netty.example.time;
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(final ChannelHandlerContext ctx) { // (1)
final ByteBuf time = ctx.alloc().buffer(4); // (2)
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
final ChannelFuture f = ctx.writeAndFlush(time); // (3)
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
}); // (4)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
但是,等等,flip在哪呢?在NIO中,我们发送消息之前不是要调用java.nio.ByteBuffer.flip()方法的吗?ByteBuf并没有这样的方法,因为它有两个指针:一个用于读操作,一个用于写操作。当你往ByteBuf写入数据的时候写索引增加,但是读索引并未改变。读索引和写索引各自代表着消息的读写位置。
相比之下,如果不调用flip方法,NIO buffer并没有提供一种清晰发方法来确定消息内容的开始位置和结束位置。当你忘记反转(flip)buffer的时候你会困惑,因为错误数据甚至空数据会被发送。这样的错误在Netty中不会发生,因为对于不同类型的操作(读和写)有不同的指针。当你使用它后,你会发现他使得你的生活非常简单——一个没有反转的生活。
另一点需要注意的是,ChannelHandlerContext.write()(包括writeAndFlush())方法返回一个ChannelFuture。一个ChannelFuture代表一个还未发生的I/O操作。这意味着,由于Netty是异步操作,所以可能还没有对请求执行任何处理操作。
例如,下面的代码可能在消息到来之前就把连接关闭了。
Channel ch = ...;
ch.writeAndFlush(message);
ch.close();
因此,你需要在ChannelFuture完成之后再调用close()方法,ChannelFuture是write()方法返回的,当写操作完成后,它通知它的监听器。请注意,close()方法也可能不会立即关闭,它也返回一个ChannelFuture方法。
或者,你可以使用预定义的监听器简化代码:
f.addListener(ChannelFutureListener.CLOSE);
为了测试我们的时间服务器像我们期望的那样工作,你可以使用UNIX命令 rdate:
$ rdate -o <port> -p <host>
<port>标签替换成main()方法中指定的端口号,<host>标签一般使用localhost。
不同于丢弃服务器和应答服务器,我们需要一个时间协议的客户端,因为用户不可能让用户将一个32位二进制数据转化为一个时间。在本节中,我们学习如何确定服务端是否正常工作和如何编写一个Netty客户端。
Netty客户端和服务端的最大区别也是唯一区别就是它们使用不同的Bootstrap(启动器)和Channel(通道)实现。请看下面的代码:
public class TimeClient {
public static void main(String[] args) throws Exception {
String host = args[0];
int port = Integer.parseInt(args[1]);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync(); // (5)
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
正如你所能看到的,它真的和服务端的代码不一样。ChannelHandler的实现应该是怎样的呢?它应该从服务端接收32位整数,然后转换为用户可读的格式,并打印转换后的时间,然后关闭连接:
import java.util.Date;
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg; // (1)
try {
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
} finally {
m.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
它看起来非常简单,并且和服务端示例没有任何不同。但是,这个处理器有时会拒绝工作,并抛出一个IndexOutBoundsException异常。我们在后续的章节中研究为何会这样。
在像TCP/IP那样基于流的传输中,接收到的数据被存入一个套接字接收缓冲区中。不幸的是,基于流的传输所传输的缓冲不是数据包队列,而是字节队列。也就是说,即使你发送的两条消息是独立的数据包,操作系统不会将它们当做两条消息,而是仅将它们当做一组字节处理。因此,并不能保证你读到的内容与远程对等点写的内容完全一致。例如,让我们假设操作系统的TCP/IP栈已经接收到三个数据包:
因为这是基于流的协议的一般属性,在您的应用程序中,很有可能以以下片段形式阅读它们:
因此,接收到的部分,不管是服务端还是客户端,都应该将接收到的数据整理成一个或多个有意义的数据帧,以便应用程序能够轻松理解。在上面的例子种,接收到的数据应该像下面的帧:
现在让我们回到时间客户端的例子中,例子中有着同样的问题。一个32位的整数是非常小的数据,通常不会被碎片化。但是,问题是,它还是可以被碎片化,随着流量的增加,碎片化的可能性也会增加。
最简单的解决方法就是创建一个内部累积缓冲,一直等待到4byte数据全部被接受近内部缓冲里。下面是时间客户端处理器实现的修改,这样就解决了这个碎片化问题:
import java.util.Date;
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
private ByteBuf buf;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
buf = ctx.alloc().buffer(4); // (1)
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
buf.release(); // (1)
buf = null;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg;
buf.writeBytes(m); // (2)
m.release();
if (buf.readableBytes() >= 4) { // (3)
long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
尽管第一种方法解决了时间客户端的问题,修改后的处理器看起来不是很整洁。设想一个更复杂的协议,它由多个字段(例如可变长度的字段)组成。你的ChannelInboundHandler实现很快会变得无法维护。
正如你注意到的,你可以给通道管道线添加不止一个ChannelHandler(通道处理器),因此,你可以将单个ChannelHandler拆分成多个模块处理程序,以降低应用程序的复杂性。例如,你可以将TimeClientHandler拆分成两个处理器:
幸运的是,Netty提供了扩展类,帮助你编写第一个开箱即用的类:
public class TimeDecoder extends ByteToMessageDecoder { // (1)
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
if (in.readableBytes() < 4) {
return; // (3)
}
out.add(in.readBytes(4)); // (4)
}
}
既然我们有另一个处理器可以插入ChannelPipeline(通道管道线)中,我们应该在时间客户端中修改ChannelInitializer的实现了:
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
}
});
如果你是一个喜欢探索的人,你可能想尝试使用ReplayingDecoder,它极大简化了解码器。更多信息你可以参考API文档。
public class TimeDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(
ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
out.add(in.readBytes(4));
}
}
此外,Netty提供了开箱即用的解码器可以让你非常容易的实现更多的协议,并且帮助你避免最终得到不可维护的单一处理器实现,更多详细的案例可以参考下面包里的案例:
到目前为止,我们看到的所有案例的协议消息使用的数据结构都是ByteBuf。在本节中,我们会通过使用POJO替代ByteBuf来改进时间协议的客户端和服务端案例。
在你的通道处理器(ChannelHandler)中使用POJO的好处是很明显的。通过将ByteBuf信息代码从处理器的中抽取出来,你的处理器变得可维护和重用。在时间客户端和服务端例子中,我们直接使用ByteBuf来读取32位整数并没有什么问题,但是,你会发现,在实现实际协议时,有必要进行分离。
首先,我们来定义一个新的类,叫做:UnixTime
import java.util.Date;
public class UnixTime {
private final long value;
public UnixTime() {
this(System.currentTimeMillis() / 1000L + 2208988800L);
}
public UnixTime(long value) {
this.value = value;
}
public long value() {
return value;
}
@Override
public String toString() {
return new Date((value() - 2208988800L) * 1000L).toString();
}
}
我们现在可以修改时间解码器来生成一个UnixTime而不是ByteBuf.
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
if (in.readableBytes() < 4) {
return;
}
out.add(new UnixTime(in.readUnsignedInt()));
}
由于修改了解码器,时间客户端处理器不在需要使用ByteBuf:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
UnixTime m = (UnixTime) msg;
System.out.println(m);
ctx.close();
}
非常简单而优雅,是不是?同样的技术可以应用在服务端。这次我们首先更新时间服务器处理器:
@Override
public void channelActive(ChannelHandlerContext ctx) {
ChannelFuture f = ctx.writeAndFlush(new UnixTime());
f.addListener(ChannelFutureListener.CLOSE);
}
现在,唯一缺少的就是编码器了,它是ChannelOutboundHandler的实现,用来将UnixTime转成ByteBuf。相比较编写一个解码器,编写编码器非常简单,因为当编码一个消息时不需要处理数据包的碎片化和装配。
public class TimeEncoder extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
UnixTime m = (UnixTime) msg;
ByteBuf encoded = ctx.alloc().buffer(4);
encoded.writeInt((int)m.value());
ctx.write(encoded, promise); // (1)
}
}
关闭一个Netty应用通常就像通过shutdownGracefully()关闭你所创建的所有EventLoopGroup一样简单。它将返回一个Future,当EventLoopGroup完全被终止,并且所有的通道已被关闭,它将通知你。
在本章中,我们简要介绍了Netty,并演示了如何在Netty上编写一个完整工作的网络应用程序。
在下面的章节中将有更多关于Netty的详细信息。我们也鼓励你复习io.netty.example包中的Netty案例。
请注意,社区永远都期待你的问题和建议,你的反馈可以帮助你理解Netty也帮助改进Netty和它的文档。
本文系外文翻译,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文系外文翻译,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。