前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >深入浅出Tomcat网络通信的高并发处理机制

深入浅出Tomcat网络通信的高并发处理机制

原创
作者头像
菜菜的后端私房菜
发布2024-08-06 09:08:11
3041
发布2024-08-06 09:08:11
举报
文章被收录于专栏:深入浅出Tomcat(Web服务器)

深入浅出Tomcat网络通信的高并发处理机制

随着互联网应用的快速发展,Web服务器面临的访问压力日益增大,如何高效处理高并发的网络请求成为关键

Tomcat作为Java世界中最受欢迎的Web容器之一,可以灵活选择不同的IO模型来处理网络通信,确保面对高并发的网络请求时能够快速处理

上篇文章21张图解析Tomcat运行原理与架构全貌,我们说到Tomcat中通过Connector来处理网络通信,其中Connector的职责主要由组件EndPoint、Processor、Adapter来完成

EndPoint负责网络通信、Processor负责解析、Adapter负责将请求转换为Servlet的请求并交给容器处理

image.png
image.png

本篇文章就来重点聊聊AbstractEndpoint的多种实现类是如何处理网络通信的

AbstractEndpoint有三种实现类:NioEndPoint、Nio2EndPoint、AprEndPoint

其中默认使用NioEndPoint(多路复用模型),Nio2EndPoint使用异步IO模型,而AprEndPoint为早期提供高性能(Tomcat 10时被弃用)

NioEndPoint

NioEndPoint将处理网络通信分离为三个步骤,分别使用三个组件进行执行:接收连接、检测IO事件、处理请求

image.png
image.png

我们先大致对这些组件的作用进行描述,后续再通过源码分析~

Acceptor用于接收连接(循环执行):使用LimitLatch限制最大连接数量,等待客户端完成TCP三次握手连接后,将连接交给Poller

Poller用于检测IO事件是否就绪(循环执行):将连接注册到Selector上,使用Selector监听IO事件,当事件发生(读就绪)时交给Executor进行处理

Executor池化管理线程,使用线程执行后续流程(解析请求、封装适配、交给容器处理...)

Acceptor

Acceptor使用LimitLatch限制连接数量,收到连接后将其逐步封装为PollerEvent并放到Poller的队列中

NioEndpoint.initServerSocket 在启动组件时初始化socket

代码语言:java
复制
//服务端Channel
ServerSocketChannel serverSock = ServerSocketChannel.open();
socketProperties.setProperties(serverSock.socket());
InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
//服务端channel绑定端口,getAcceptCount为连接上限
serverSock.bind(addr, getAcceptCount());

ServerSocketChannel作为服务端Channel只监听一个端口(连接器中设置的端口)

无论Acceptor线程数量为多少,都共享该服务端channel

getAcceptCount() 为建立连接后最大积压的数量,acceptCount默认为100

TCP三次握手完成后会将客户端连接放到accept队列中等待服务端拿走,acceptCount就是accept队列最大存放的连接数量

Acceptor.run Acceptor接收连接

为了简化流程,只保留了较重要的流程:

  1. 使用LimitLatch限制连接数量,如果达到最大值则等待
  2. 等待获取客户端连接socket channel(TCP三次握手完成)
  3. 把socket channel交给Poller处理
代码语言:java
复制
public void run() {
    
    //...
    while (!stopCalled) {
    	//1.使用limit latch限制连接数量,如果达到最大值则等待
        endpoint.countUpOrAwaitConnection();

        U socket = null;
                
        //2.等待获取客户端socket channel
        socket = endpoint.serverSocketAccept();
                
        //3.交给Poller处理
        endpoint.setSocketOptions(socket);             
    } 
}

对于实现限制连接数量,当达到最大值时则进行等待,实现的关键为计数、等待

熟悉并发包的同学会立马想到信号量Semaphore,但Tomcat没有直接使用信号量组件,而是基于AQS自己实现同步组件LimitLatch

LimitLatch直接使用原子类进行计数,利用AQS来实现等待

(LimitLatch基于AQS实现比较简单,这里就不进行分析,不熟悉AQS的同学可以查看这篇文章:10分钟从源码级别搞懂AQS

image.png
image.png

如果没有超过限制则会获取下一个完成连接(TCP三次握手)的客户端连接SocketChannel result = serverSock.accept();

并将Channel包装成SocketWrapper,再包装为PollerEvent,加入Poller的队列中

代码语言:java
复制
public void register(final NioSocketWrapper socketWrapper) {
    //监听读事件(读就绪时poller能够继续处理)
    socketWrapper.interestOps(SelectionKey.OP_READ);
    //包装为PollerEvent 指定关心注册事件OP_REGISTER(后续poller将该通道注册到select上)
    PollerEvent pollerEvent = createPollerEvent(socketWrapper, OP_REGISTER);
    //放入poller的队列
    addEvent(pollerEvent);
}

Poller的队列SynchronizedQueue也是Tomcat自己实现的

Acceptor与Poller之间通过队列通信,SynchronizedQueue使用synchronized保证并发操作下的原子性

Poller

Poller循环处理队列中的PollerEvent事件,当Selector上监听的连接发生IO事件时迭代处理,将事件封装为SocketProcessor交给线程池处理

Poller.run

Poller主要循环检测是否有IO事件发生,主要流程为:

  1. 轮询处理队列中的事件PollerEvent,比如将通道注册到Selector上
  2. Selector阻塞到事件发生或超时
  3. 迭代遍历处理事件,交给线程池处理
代码语言:java
复制
public void run() {
    while (true) {
        //...
        
        //1.轮询处理队列中的事件
    	events();
        
    	//2.select 阻塞直到事件发生
        selector.select(selectorTimeout);
        
        //3.迭代遍历处理事件
        Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;
		while (iterator != null && iterator.hasNext()) {
            SelectionKey sk = iterator.next();
            iterator.remove();
            //从附件中拿到连接的包装
            NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
            if (socketWrapper != null) {
                //交给线程池处理
                processKey(sk, socketWrapper);
            }
       }                
                
    }   
}

交给线程池处理前会将其封装为SocketProcessor

AbstractEndpoint.processSocket

代码语言:java
复制
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
        SocketEvent event, boolean dispatch) {
    SocketProcessorBase<S> sc = null;
    if (processorCache != null) {
        sc = processorCache.pop();
    }
    //封装SocketProcessor
    if (sc == null) {
        sc = createSocketProcessor(socketWrapper, event);
    } else {
        sc.reset(socketWrapper, event);
    }
    
    Executor executor = getExecutor();
    if (dispatch && executor != null) {
        //交给线程池执行
        executor.execute(sc);
    } else {
        //当前线程执行(使用AIO时走这里)
        sc.run();
    }
}

该方法用于封装SocketProcessor和调用后续执行,放于父类中,所有实现类通用该方法

Executor

线程池中的线程执行SocketProcessor时会去交给ProtocolHandler处理 getHandler().process(socketWrapper, event)

SocketProcessor中存在连接等包装组件,通过它能够处理请求(读取数据)和响应(写回数据)

需要注意的是Tomcat中的线程池是自己实现的,而不是并发包下的ThreadPoolExecutor

(对于Tomcat的线程池,我们后续文章再进行分析)

NioEndPoint大致的运行流程如下图:

image.png
image.png

连接的包装类(NioSocketWrapper)为核心贯穿全文,封装流程如下:

Acceptor接收连接:NioChannel(拿到客户端连接) -> NioSocketWrapper -> PollerEvent(放入Poller的SynchronizedQueue)

Poller处理PollerEvent注册到Selector,并阻塞监听IO事件:

PollerEvent(从SynchronizedQueue中取出) -> NioSocketWrapper(注册到Selector) -> SocketProcessor(事件发生时交给Executor)

Nio2EndPoint

Nio2EndPoint使用异步IO模型(AIO)来处理网络通信

AIO的特点就是异步,使用回调函数,当数据就绪时使用异步线程调用回调函数

无需再像NIO中使用Selector阻塞,让应用线程来触发读取数据,阻塞到数据拷贝到应用缓冲区

Nio2实际上指的就是AIO,NIO2表明这是对原有NIO的一个升级版本

Nio2EndPoint处理网络通信时不再需要检测IO事件,把这件事交给内核去做,当事件发生(数据就绪)时使用异步线程调用回调函数即可

相比于NioEndPoint,Nio2EndPoint在处理网络通信时,不需要再用Poller检测IO事件

Nio2Acceptor

Nio2EndPoint中使用Nio2Acceptor接收连接,Nio2Acceptor继承Acceptor并实现回调接口CompletionHandler

代码语言:java
复制
class Nio2Acceptor 
extends Acceptor<AsynchronousSocketChannel> 
implements CompletionHandler<AsynchronousSocketChannel,Void>

回调接口第一个参数为IO操作的结果,第二个泛型为操作使用的附件,其中有两个方法分别代表着成功/失败后执行的回调

代码语言:java
复制
public interface CompletionHandler<V,A> {
    void completed(V result, A attachment);
    void failed(Throwable exc, A attachment);
}

Nio2Acceptor.run

由于使用AIO,Nio2Acceptor在执行任务时不再需要循环,只需要携带回调函数,当客户端连接完成时触发回调

在执行时主要做两件事:

  1. 使用LimitLatch限制连接数
  2. 接收连接
代码语言:java
复制
public void run() {
    if (!isPaused()) {
        try {
            //1.使用LimitLatch限制连接数
            countUpOrAwaitConnection();
        } catch (InterruptedException e) {
        }
        if (!isPaused()) {
            //2.接收连接
            serverSock.accept(null, this);
        } else {
            state = AcceptorState.PAUSED;
        }
    } else {
        state = AcceptorState.PAUSED;
    }
}

serverSock.accept(null, this); 在接收连接时,使用的服务端channel为AsynchronousServerSocketChannel,并把当前对象作为回调传入

这样在下次收到连接后的回调又可以调用该方法,以此来达到不需要循环调用

Nio2Acceptor.completed

回调成功的方法中主要做几件事:

  1. 是否限制连接数量
  2. 调用accept,方便接收下次连接
  3. 调用后续处理
代码语言:java
复制
public void completed(AsynchronousSocketChannel socket,Void attachment) {
    errorDelay = 0;
    if (isRunning() && !isPaused()) {
        //1.是否限制连接数量
        if (getMaxConnections() == -1) {
            //不限制连接数量,方便接收下一次连接
            serverSock.accept(null, this);
        } else if (getConnectionCount() < getMaxConnections()) {
            try {
                //当前连接数小于最大限制连接数,不阻塞,主要是去自增计数
                countUpOrAwaitConnection();
            } catch (InterruptedException e) {
                // Ignore
            }
            //方便接收下次连接
            serverSock.accept(null, this);
        } else {
            //当前连接数大于等于最大限制连接数,再调用limitlatch会阻塞,为了避免阻塞使用线程池去执行(排队)
            getExecutor().execute(this);
        }
        //setSocketOptions后续处理
        if (!setSocketOptions(socket)) {
            closeSocket(socket);
        }
    } else {
        if (isRunning()) {
            state = AcceptorState.PAUSED;
        }
        destroySocket(socket);
    }
}

在Acceptor中也会调用setSocketOptions方法,那时会将连接包装NioSocketWrapper,然后封装为PollerEvent放入poller队列

在AIO中由于不再存在poller,该方法会将连接包装为Nio2SocketWrapper,然后调用父类AbstractEndpoint.processSocket方法去执行

AbstractEndpoint.processSocket

在此方法中会先将包装类封装为SocketProcessor再去执行

代码语言:java
复制
	public boolean processSocket(SocketWrapperBase<S> socketWrapper,SocketEvent event, boolean dispatch) {
        	//...
        
        	//封装为SocketProcessor
            SocketProcessorBase<S> sc = null;
            if (processorCache != null) {
                sc = processorCache.pop();
            }
            if (sc == null) {
                sc = createSocketProcessor(socketWrapper, event);
            } else {
                sc.reset(socketWrapper, event);
            }
            Executor executor = getExecutor();
            if (dispatch && executor != null) {
                executor.execute(sc);
            } else {
                //当前线程执行
                sc.run();
            }
        
        	return true;
	}

需要注意的是,当前线程本来就是异步回调线程,参数dispatch会为false

也就是这里不会使用线程池去执行,而是由当前异步回调的线程去执行SocketProcessor

Nio2SocketWrapper

当前线程是连接完成执行异步回调的线程,去执行SocketProcessor也就是会使用Processor解析数据,但此时数据可能还未准备好

为了不让Processor阻塞等待,这里会失败,直到数据就绪时触发的异步回调来执行时才能够读到数据

Nio2SocketWrapper中包含一些读写事件的回调

比如读回调中:当数据就绪时,会去执行processSocket,也就是封装SocketProcessor进行后续调用(此时会第二次使用Processor进行读数据,这样确保数据已就绪)

代码语言:java
复制
this.readCompletionHandler = new CompletionHandler<Integer, ByteBuffer>() {
    @Override
    public void completed(Integer nBytes, ByteBuffer attachment) {
        //...
        getEndpoint().processSocket(Nio2SocketWrapper.this, SocketEvent.OPEN_READ, false);
    }
}    
Http11Processor

在Processor处理HTTP协议的实现类Http11Processor中,执行service解析请求时,会先解析请求头parseRequestLine

代码语言:java
复制
public SocketState service(SocketWrapperBase<?> socketWrapper)  throws IOException {
    //...
    inputBuffer.parseRequestLine(keptAlive, protocol.getConnectionTimeout(),protocol.getKeepAliveTimeout())
    //...    
}

当parseRequestLine返回false时,说明数据未就绪,不会执行后续操作,因此第一次读数据时由于数据未就绪不会再往后执行

NioEndPoint大致的运行流程如下图:

image.png
image.png

在Nio2EndPoint中,使用异步回调的方式,避免poller中的操作,能够提升效率,但是大量异步线程的引入又会带来线程上下文切换的开销

连接的包装类(Nio2SocketWrapper)为核心贯穿全文,封装流程如下:

Nio2Channel(回调拿到客户端连接) -> Nio2SocketWrapper -> SocketProcessor(事件发生时)

连接完成的回调:Nio2Channel -> Nio2SocketWrapper -> SocketProcessor -> Http11Processor(解析失败,数据未就绪)

读事件就绪的回调:Nio2SocketWrapper -> SocketProcessor -> Http11Processor

AprEndPoint

APR(Apache Portable Runtime)是Apache提供的可移植运行库,是为了早期的Tomcat的提供高性能的

早期NIO还不成熟,使用APR通过JNI调用本地C语言实现的库,能够使用操作系统的epoll来实现多路复用模型,旨在提高高性能

AprEndPoint 流程与NioEndPoint相同,只是其中调用的方法不同,NioEndPoint调用JDK NIO的API,而AprEndPoint调用APR库

AprEndPoint在通道上使用的缓冲区是基于直接内存的(DirectByteBuffer),而NioEndPoint与Nio2EndPoint都是使用堆内存的(HeapByteBuffer)

使用直接内存的好处是能够减少数据拷贝带来的开销,但无法使用JVM来进行管理内存

并且AprEndPoint还能使用零拷贝sendfile,将数据从磁盘读到网卡发送时减少各种拷贝开销

image.png
image.png

但在后来NIO、AIO逐渐成熟,AprEndPoint带来的好处逐渐被追平,在Tomcat 10时被遗弃

总结

NioEndPoint将处理网络通信分为接收连接、监听事件、处理请求三个步骤

其中Acceptor负责接收连接,使用LimitLatch限制连接数量(若超过上限则等待),获取客户端连接NioChannel,包装为NioSocketWrapper,再封装为PollerEvent放入Poller的队列中

Poller会轮询处理PollerEvent,通过PollerEvent拿到NioSocketWrapper将连接注册到Selector上,使用Selector监听事件,有事件触发时,从附件中获取连接的包装NioSocketWrapper,将其封装为SocketProcessor交给线程池处理

线程池的线程处理SocketProcessor时,则会使用Processor解析协议,后续再封装请求/响应调用容器处理

Nio2EndPoint 使用AIO,由内核监听事件(数据就绪)后使用异步线程执行回调

其中Nio2Acceptor继承Acceptor,接收连接不再循环处理,而是使用异步回调:当连接完成后再使用LimitLatch判断是否限制连接,调用非阻塞accept便于接收下次连接(回调),然后将客户端连接Nio2Channel封装为Nio2SocketWrapper再封装为SocketProcessor处理(后续调用processor无法解析,因为当前是连接完成的回调线程,数据还未就绪)

当数据就绪时,通过Nio2SocketWrapper的回调继续封装为SocketProcessor向后处理(后续调用processor可以解析,因为当前为读数据就绪的回调线程,第二次读)

早期的APR通过本地库、直接内存、零拷贝等多种方式进行性能优化

🌠最后(不要白嫖,一键三连求求拉~)

本篇文章被收入专栏 Tomcat全解析:架构设计与核心组件实现,感兴趣的同学可以持续关注喔

本篇文章笔记以及案例被收入 Gitee-CaiCaiJavaGithub-CaiCaiJava,除此之外还有更多Java进阶相关知识,感兴趣的同学可以starred持续关注喔~

有什么问题可以在评论区交流,如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~

关注菜菜,分享更多技术干货,公众号:菜菜的后端私房菜

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 深入浅出Tomcat网络通信的高并发处理机制
    • NioEndPoint
      • Acceptor
      • Poller
      • Executor
    • Nio2EndPoint
      • Nio2Acceptor
      • Nio2SocketWrapper
      • Http11Processor
    • AprEndPoint
      • 总结
        • 🌠最后(不要白嫖,一键三连求求拉~)
    相关产品与服务
    容器服务
    腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档