在讲述Netty的高性能特性之前,基于之前的epoll技术分析中谈到C10K问题,其实是属于一个性能优化问题,目的是为了能够在单台机器上支撑更多的并发连接调度所做的性能优化,为了达到上述目标,需要要求我们设计的web服务采用合理的IO模型,并在对应的IO模型基础上引入多线程与并发库技术的使用来支撑更多的连接调度,同时考虑到计算机资源的限制,我们需要在设计web服务的时候合理对资源进行分配优化,比如内存,网络带宽以及CPU核数的充分利用,也就是说我们还需要考虑到可伸缩性的问题,通过增加资源来使得我们的web服务能够得到线性提升效果.接下来我们就来结合部分源码分析Netty技术是如何体现高性能这一个特性.
C10K&C10M问题分析
C10K问题
C10K,即要让单个web服务能够支撑1w的并发连接请求的资源调度,关于C10K的性能与可伸缩性问题,基于自身的理解如下:
因此,优化一个C10K的问题可以从以下几个方面考虑:
C10M问题
随着互联网技术发展,又提出了一个C10M的优化问题,即如何让我们的单台机器支撑1000w的并发连接,这个时候Errata Security首席执行官Robert Graham从历史的角度出发讲述Unix最开始设计不是通用的服务器OS,而是作为电话网络的控制系统,实际上是电话网络在控制数据传输,因而控制平面与数据平面存在清晰的分隔,于是指出一个问题,即当前我们使用的Unix服务器是作为数据平面的一部分,这也是他所说的内核不是解决方案,而是问题所在,什么意思呢?就是不要让内核承担所有繁重的工作.将数据包处理,内存管理和处理器调度从内核中移出,并将其放入应用程序中,可以在其中高效地完成它.让Linux处理控制平面,让应用程序处理数据平面.因而可以从以下几个方面来了解一个C10M问题面临的挑战:
为了构建一个能够支撑1000w/s的并发连接系统,我们需要让数据平面系统能够处理1000w/s个数据包,而对于一个控制平面系统而言,持续10s的最多也就只能处理100w个并发连接,为了实现这个目标,我们借鉴C10K问题的解决方案,C10K问题主要是从构建一个可伸缩性的IO模型的web服务来达到支撑10K并发连接的目的,同时也引入线程模型与性能优化手段来配合实现达到目的,从这里我们也可以看到可伸缩性是我们设计的目标,同时为了支撑1000w的连接,我们不能将性能优化外包给操作系统,那么我们要编写一个可伸缩性的软件来达到上述的目标,这个时候就需要解决以下的问题:
因此,我们可以借鉴C10K与C10M的优化思路来推导一个具备高并发且可伸缩性的web服务设计,高并发连接调度我们可以从IO模型以及线程模型思考,高性能的指标我们可以从计算机资源分配管理与资源优化方面思考(比如内存/无锁编程),而一个可伸缩性的web服务,我们则会从资源方面来考虑,通过增加相关的资源配置是否能够得到线性的性能提升.
高并发关注指标
并发连接/QPS/TPS
基于上述的高并发指标的理解,现将并发连接/QPS/TPS的区分通过以下图解的方式展开:
IO与线程模型实现高并发连接调度
结合之前的高性能IO编程文章以及C10K与C10M问题,我们可以考虑设计一个高性能的Web服务可以从以下几个方面思考:
数据包的收发
应用程序的处理能力
对于处理处理能力,我们可以用一个词来说明,那就是吞吐量,既然想要提升吞吐量,那么我们的目标其实也是很明确的,即“快速响应”.
Netty高并发机制
在Netty技术中主要是采用NIO实现多连接的单线程复用机制以及借助多线程异步处理方式来提升支撑并发连接调度的处理能力,在C10M问题中已经指出,为了优化C10M问题,我们应该考虑在应用程序方面去设计数据平面系统来构建一个支撑1000W/s并发连接的调度处理机制.
关于Netty框架的线程模式架构设计图如下所示:
现在我们基于宏观上对Netty的线程模型有一个基本认知之后,结合先前文章对Netty组件源码以及事件流程的分析可知,在Netty中存在EventLoopGroup,通过EventLoopGroup来分配EventLoop,而每一个EventLoop既具备线程池的功能又承担着事件轮询的工作,同时每个EventLoop都分配对应的一个FastThread专有线程来负责对处理当前EventLoop的pipeline的流水工作,由于每一个启动EventLoop都绑定专有的一个线程FastThread,那么对于EventLoop处理的一系列流水工作也将会在当前的线程执行,从而保证了单线程资源无竞争高效串行化流水任务的执行,简单点就是无锁串行化地进行流水工作,这个在我们上述讲到的C10M优化方案中体现的一个多核扩展问题,Netty框架很好地运用这一理念来提升我们web服务支撑高并发连接的调度处理.
关于Netty处理的单线程无锁串行化的流水工作流程示意图如下:
在了解上述的无锁串行化任务执行流程之后,我们还需要关注Netty另一个问题,即在多Reactor模式中,我们看到服务端channel其实只完成一次创建,初始化以及注册,相比客户端channel,提供给客户端的EventLoopGroup由于在客户端有新连接进来的时候就会在Acceptor进行注册,同时我们也分析channel的注册流程,注册的时候会在EventLoopGroup根据选举策略分配一个EventLoop来完成channel到EventLoop的绑定,对此,我们知道对于客户端channel而言,EventLoopGroup的作用是类似于我们分布式的“集群”机器服务来对外提供服务的,分担高并发的连接压力,那么对于服务端channel而言呢,提供EventLoopGroup如果指定的线程数量大于1,这个时候EventLoopGroup又起到什么作用呢?其实对于服务端的channel,我们很多时候并不仅仅是处理连接的接收,还要在处理连接之前做一些鉴权校验抑或是风控等安全措施的处理,如果这些过程会比较耗时,那么就需要在我们处理的handler上添加从EventLoopGroup选举一个新的EventLoop事件轮询活动来缓解我们并发连接调度处理能力,其实说到底EventLoopGroup还是类似于分布式系统中的“集群”来缓解并发调度的压力,
于是基于上述的分析,我们对Netty支撑高并发采用的技术手段总结如下:
高性能的ByteBuf
对于linux操作系统读取数据块一般流程是:先从硬件设备将数据块加载数据到内核缓冲区,然后由内核将内核缓冲区的数据复制到用户空间的缓冲区,最后唤醒应用程序读取用户空间的缓冲区,对于Java程序而言,其无法直接操作OS系统内存区域,必须通过JVM堆申请内存区域来存放数据块,于是需要再从OS内存中的数据缓冲区将数据块复制到JVM堆中才能够进行操作数据,于是对于JVM操作socket的数据包,数据包拷贝的路径如下图示:
网卡设备接收到数据包流量事件,内核将数据块加载到内核缓冲区中,并且通过socket传输数据到用户空间的缓冲区,最后JVM要操作socket缓冲区的数据,需要将其读取到JVM堆中存储,这个时候需要再JVM堆中申请一个内存区域用于存放数据包数据,而如果直接通过堆外内存读取数据,则可以减少一次数据的拷贝以及内存资源的损耗,如下图所示:
Netty的堆外内存操作通过底层操作系统Unsfe的方式获取其内存位置来直接操作内存,相比使用堆内存分配更为高性能便利,同时也减少了数据拷贝,直接通过Unsafe指向的堆外内存引用来进行操作.
// 假设buffer1以及buffer2都存储在堆外内存,堆内内存同理(只是在JVM中)
ByteBuf httpHeader = buffer1.silice(OFFSET_PAYLOAD, buffer1.readableBytes() - OFFSET_PAYLOAD);
ByteBuf httpBody = buffer2.silice(OFFSET_PAYLOAD, buffer2.readableBytes() - OFFSET_PAYLOAD);
// 逻辑上的复制,header与body仍然存储在原有的内存区域中,http为JVM在堆中创建的对象,指向一个逻辑结构上的ByteBuf
ByteBuf http = ChannelBuffers.wrappedBuffer(httpHeader, httpBody);
上述的零拷贝机制示意图如下:
这个时候在应用程序中可以直接通过http的ByteBuf操作合并之后的header+body的ByteBuf缓冲区,http的byteBuf是属于逻辑上的合并,实际上并没有发生数据拷贝,只是在JVM中创建一个http的ByteBuf引用指向并操作合并之后的bytebuf.
核心源代码
@Override
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
checkPositiveOrZero(minNewCapacity, "minNewCapacity");
if (minNewCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
minNewCapacity, maxCapacity));
}
final int threshold = CALCULATE_THRESHOLD; // 4 MiB page
if (minNewCapacity == threshold) {
return threshold;
}
// If over threshold, do not double but just increase by threshold.
// > 4M
// cap / 4 * 4 + 4
if (minNewCapacity > threshold) {
int newCapacity = minNewCapacity / threshold * threshold;
if (newCapacity > maxCapacity - threshold) {
newCapacity = maxCapacity;
} else {
newCapacity += threshold;
}
return newCapacity;
}
// Not over threshold. Double up to 4 MiB, starting from 64.
// < 4M 进行以2的倍数进行增长
// 2096,此时分配的内存为3072byte
int newCapacity = 64;
while (newCapacity < minNewCapacity) {
newCapacity <<= 1;
}
return Math.min(newCapacity, maxCapacity);
}
通过上述源码可知:
newCapacity = capacity/4*4+4
进行计算在ByteBuf添加引用计数能够计算当前对象持有的资源引用活动情况,通常以活动的引用计数为1作为开始,当引用计数大于0的时候,就能够保证对象不会被释放,当引用计数减少到0的时候说明当前对象实例就会被释放,将会被JVM的GC进行回收,对于池化技术而言则是存放到内存池中以便于重复利用.因此使用池化技术的PooledByteBufAllocator
而言,使用引用计数能够降低内存分配的开销,有助于优化内存使用和性能的提升.
// ByteBuf.java
public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf>{
boolean isAccessible() {
return refCnt() != 0;
}
}
//AbstractByteBuf.java
public abstract class AbstractByteBuf extends ByteBuf {
// 对于引用计数为0的实例将无法访问,会抛出异常IllegalReferenceCountException
protected final void ensureAccessible() {
if (checkAccessible && !isAccessible()) {
throw new IllegalReferenceCountException(0);
}
}
}
//ReferenceCounted.java
public interface ReferenceCounted {
// 调用retain(increament) 将会增加引用计数increament
// 调用release(increament)将会减少引用计数increament
}
// 对于入站事件,如果当前消费入站数据并且没有事件进行传播的话,那么就需要手动释放资源
public void channelRead(ChannelHandlerContext ctx, Object msg){
// ...
// not call fireChannelRead,事件传播在当前handler终止,这个时候需要手动清除
ReferenceCountUtil.release(msg);
// SimpleChannelInboundHandler能够手动清除,但是一般入站事件我个人习惯用ChannelInboundHandlerAdapter并且自己手动管理,方法单一,处理简单,可以手动管理,同理出站事件也是用Adapter
}
// 对于出站事件,如果当前需要对非法消息采取丢弃操作,则也需要手动进行处理释放资源
public void channelWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise){
ReferenceCountUtil.release(msg);
promise.setSuccess(); // 丢弃消息意味着不会将数据传输到出站事件的责任链上,这个时候FutureListener无法监听到消息处理情况,需要手动通知处理结果
}
## 关于监控类的级别详细可查看Netty类下的ResourceLeakDetector
## 通过java配置并执行可以查看资源泄漏情况以及输出报告
java -Dio.netty.leakDetection.level=ADVANCED
内存分配策略
// buf = directArena.allocate(cache, initialCapacity, maxCapacity);
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
// 创建池化的ByteBuf
// 核心处理流程: 先从线程缓存获取栈,从栈获取buf,如果不存在则将创建ByteBuf并存储栈中,最后更新栈数据并一并更新到到线程的cache中最后返回的时候会重置buf的引用计数器
PooledByteBuf<T> buf = newByteBuf(maxCapacity);
// 从内存中申请资源存储数据
allocate(cache, buf, reqCapacity);
return buf;
}
// 分配策略
// PoolArena.java
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
// 计算合适的一个区域
// 比如现在申请一个资源为19byte,则会为其创建一个2的临近整数方,这个时候会分配一个32byte的数据
final int normCapacity = normalizeCapacity(reqCapacity);
// 申请的容量小于8kb
if (isTinyOrSmall(normCapacity)) {
int tableIdx;
PoolSubpage<T>[] table;
boolean tiny = isTiny(normCapacity);
// 容量小于512byte
if (tiny) {
// 从缓存中获取
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
return;
}
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
// 分配的容量大于512byte小于8kb
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
return;
}
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
final PoolSubpage<T> head = table[tableIdx];
synchronized (head) {
final PoolSubpage<T> s = head.next;
if (s != head) {
assert s.doNotDestroy && s.elemSize == normCapacity;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
incTinySmallAllocation(tiny);
return;
}
}
// 如果线程缓存没有数据则直接申请OS内存资源,存在竞争,需要同步加锁保证线程安全
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
}
// 并发原子操作更新数据
incTinySmallAllocation(tiny);
return;
}
// 容量大于等于8kb小于16M
if (normCapacity <= chunkSize) {
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
return;
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
++allocationsNormal;
}
} else {
// > 16M,直接从操作系统中申请资源并且不做缓存和池化处理,于是不会添加到arena中
// Huge allocations are never served via the cache so just call allocateHuge
allocateHuge(buf, reqCapacity);
}
}
// 最后使用ByteBuf进行回收添加到cache中
void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {
if (chunk.unpooled) {
int size = chunk.chunkSize();
destroyChunk(chunk);
activeBytesHuge.add(-size);
deallocationsHuge.increment();
} else {
SizeClass sizeClass = sizeClass(normCapacity);
// 可以看到这里将数据添加到线程缓存中
if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
// cached so not free it.
return;
}
freeChunk(chunk, handle, sizeClass, nioBuffer, false);
}
}
至此,Netty从分配到回收一个池化的ByteBuf工作流程如下:
Netty内存分配逻辑结构视图
上述的Tiny MemoryRegionCache对应于TinySubPageCache,Small MemoryRegionCache对应于SmallSubPageCache,而Normal MemoryRegionCache对应于NormalCache.
高效的程序处理能力
// NioEventLoop.java
// 仅摘录部分代码
static{
// 可配置select的循环次数,当网络数据包一直不可达的时候,通过次数控制减少当前selector不断无结果的空轮询,一旦超过次数将会重建selector,将原有的selector关闭,避免cpu飙升.
int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
selectorAutoRebuildThreshold = 0;
}
SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
}
void run(){
for(;;){
try{
select();
}catch(){}
selectCnt++;
// 处理就绪事件
processSelectedKeys();
// 执行任务
ranTasks = runAllTasks();
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug(...);
selectCnt = 0;
// unexpectedSelectorWakeup处理空轮询
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
selectCnt = 0;
}
}
}
// unexpectedSelectorWakeup方法
// 解决空轮询根据轮询次数判断是否重建selector,丢弃原有的selector,降低CPU的负载
private boolean unexpectedSelectorWakeup(int selectCnt) {
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// 超过一定的次数之后重建selector,如何重建这里不贴代码
rebuildSelector();
return true;
}
return false;
}
基于事件轮询器的源码与线程模型可知,分配给每个EventLoop的专属线程都会负责处理select之后的就绪事件集合以及所有在阻塞队列中的任务,且线程与EventLoop通过FastThreadLocal进行绑定,也就是说所有事件的处理与任务的执行都是处于一个线程中,从而保证事件处理与任务处理都是保持在同一个线程中,同时为了保持一个channelHandler实例能够共享于多个pipeline中,需要通过注解@Shareble方式来保证线程安全.于是对于Netty处理的任务还是channelHandler下的完成事件处理都是能够得到线程安全的保证,于是对于无锁串行化的描述如下图:
在先前的高性能IO设计一文中有说到,在资源竞争的环境下,使用并发库甚至是无锁编程能够提升程序的性能,避免锁的争抢与等待.
C10K & C10M
高性能
基于可伸缩性的jemalloc算法
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有