剖析tomcat线程池的源码,本文以源码来解析tomcat的线程池使用策略
首先先在tomcat官网找到对应的tomcat线程池配置,具体定位在:Tomcat线程池
然后对其配置的默认参数进行解释:
threadPriority :优先级,默认是Normal
daemon :是否守护线程,默认是true
namePrefix:线程名字:tomcat-exc-1+
maxThreads:最大线程:默认 200
minSpareThreads :最小在线线程:默认25
maxIdleTime :最大在线时间:默认60s(线程执行完成之后60s就会被shutdown)
maxQueueSize:队列的最大值:Integer的最大值> Integer.MAX_VALUE
prestartminSpareThreads :是否在启动的时候占用最小在线线程:默认 false(如果为true,即在启动tomcat的时候就会启动minSpareThreads个线程)
threadRenewalDelay:重建线程池内的线程:默认值为1000(为了避免线程同时重建,每隔threadRenewalDelay(单位: ms )重建一个线程)
好了,看完这些参数之后,来进行分析tomcat启动线程的源码,这里以springboot内置的tomcat源码分析为主~
首先找到这个类,这个类即为启动的核心
org.apache.catalina.core.StandardThreadExecutor
protected void startInternal() throws LifecycleException {
//创建一个任务队列 容量为Int的最大值
taskqueue = new TaskQueue(maxQueueSize);
//创建线程factory
TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());
//创建一个线程池 核心线程25个 最大200个 存活60s
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
//设置重建线程时间 1s
executor.setThreadRenewalDelay(threadRenewalDelay);
//是否开启占用核心线程,默认关闭
if (prestartminSpareThreads) {
//如果开启 见下面的代码@1.1
executor.prestartAllCoreThreads();
}
//@1.0 设置parent,关联线程池对象
taskqueue.setParent(executor);
setState(LifecycleState.STARTING);
}
@1.1
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}
继续跟源码:
当有一个请求执行自然也是会执行execute方法 找到StandardThreadExecutor#execute
public void execute(Runnable command, long timeout, TimeUnit unit) {
if (executor != null) {
//执行线程操作
executor.execute(command,timeout,unit);
} else {
throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted"));
}
}
它这个是将jdk的execute二次封装了一下,我们不用管,继续跟源码……
public void execute(Runnable command, long timeout, TimeUnit unit) {
//这个字段是记录的线程提交的数量,如果线程执行完毕,这个字段会减1 等下会用到
submittedCount.incrementAndGet();
try {
//执行jdk的线程池代码
super.execute(command);
} catch (RejectedExecutionException rx) {
//拒绝策略,发生的情况是线程超过最大值(maxThreads),并且队列也已经满了,也就是(Integer.MAX+maxThreads)
if (super.getQueue() instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue)super.getQueue();
try {
//这里其实就是讲这个线程(60s后)丢到队列里面,如果60s后队列还是满的,那就没办法了,抛异常……
if (!queue.force(command, timeout, unit)) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
}
} catch (InterruptedException x) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} else {
submittedCount.decrementAndGet();
throw rx;
}
}
}
@1.2
/**
* The number of tasks submitted but not yet finished. This includes tasks
* in the queue and tasks that have been handed to a worker thread but the
* latter did not start executing the task yet.
*/
private final AtomicInteger submittedCount = new AtomicInteger(0);
大概意思是任务提交了但是没有执行结束,统计的是队列里的任务和已经在执行但是还没有执行完的任务
(其实前面已经分析过了:线程池原理分析)
// ctl 中保存的线程池当前的一些状态信息
int c = ctl.get();
// 下面会涉及到 3 步 操作
// 1.首先判断当前线程池中之行的任务数量是否小于 corePoolSize
// 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.注意这里是重点!!!
// 如果当前之行的任务数量大于等于 corePoolSize 的时候就会走到这里
// 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态才会被并且队列可以加入任务,该任务才会被加入进去
//断点达到workQueue,发现workQueue对象是我们tomcat启动类的TaskQueue对象,那么我们源码打到taskQueue的offer方法
//@1.3 workQueue.offer(command)如果返回false 则会执行@1.4操作
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
if (!isRunning(recheck) && remove(command))
reject(command);
// 如果当前线程池为空就新创建一个线程并执行。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
//如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
@1.4
else if (!addWorker(command, false))
reject(command);
}
public boolean offer(Runnable o) {
//we can't do any checks >>> 如果parent为null直接丢到队列里面 当然,parent肯定不会为null,本页面搜@1.0
if (parent==null) return super.offer(o);
//we are maxed out on threads, simply queue the object>>>如果工作线程数量等于最大线程数量>>>丢到队列里面
if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
//we have idle threads, just add it to the queue>>>如果队列里的任务加正在执行的任务(见@1.2) 小于等于正在执行的任务,丢到队列里面
if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
//if we have less threads than maximum force creation of a new thread >>> 如果正在执行的任务小于最大任务数量,返回false???
//这里看看,返回false是什么意思呢?见@1.3
if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
//if we reached here, we need to add it to the queue
return super.offer(o);
}
因此,这里我们就分析完了,原来他是这么搞的,再用一张流程图来解释一下线程池中的队列执行流程
拒绝策略: 具体自己写个demo(工作线程大于将队列的容量+最大线程数量,则会触发拒绝策略)
最后:看一下dubbo的拒绝策略: 类:
org.apache.dubbo.common.threadpool.support.eager.EagerThreadPoolExecutor
跟源码…… 类名一样,还是TaskQueue,不过它是org.apache.dubbo.common.threadpool.support.eager.TaskQueue
public boolean offer(Runnable runnable) {
if (executor == null) {
throw new RejectedExecutionException("The task queue does not have executor!");
}
//当前工作线程大小
int currentPoolThreadSize = executor.getPoolSize();
// have free worker. put task into queue to let the worker deal with task.>>已经提交的线程如果小于工作线程 丢到队列里面
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super.offer(runnable);
}
// return false to let executor create new worker.>> 当前线程小于最大线程容量 可以进行创建线程并执行
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}
// currentPoolThreadSize >= max
return super.offer(runnable);
}
然后再看一下它的拒绝策略,具体在下面那行代码
queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)
/**
* retry offer task
*
* @param o task
* @return offer success or not
* @throws RejectedExecutionException if executor is terminated.
*/
public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
if (executor.isShutdown()) {
throw new RejectedExecutionException("Executor is shutdown!");
}
//不就是立马丢到队列里面???感觉又包装了一层……可能它想的是公用性吧
return super.offer(o, timeout, unit);
}
//dubbo的线程池拒绝策略
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("Thread pool is EXHAUSTED!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
url.getProtocol(), url.getIp(), url.getPort());
logger.warn(msg);
//dump栈信息
dumpJStack();
throw new RejectedExecutionException(msg);
}
private void dumpJStack() {
long now = System.currentTimeMillis();
//dump every 10 minutes
if (now - lastPrintTime < 10 * 60 * 1000) {
return;
}
if (!guard.tryAcquire()) {
return;
}
//开启一个新的线程将jstack信息dump下来,并保存到服务器上
ExecutorService pool = Executors.newSingleThreadExecutor();
pool.execute(() -> {
String dumpPath = url.getParameter(Constants.DUMP_DIRECTORY, System.getProperty("user.home"));
SimpleDateFormat sdf;
String os = System.getProperty("os.name").toLowerCase();
// window system don't support ":" in file name
if (os.contains("win")) {
sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
} else {
sdf = new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss");
}
String dateStr = sdf.format(new Date());
//try-with-resources
//栈文件日志名为……
try (FileOutputStream jStackStream = new FileOutputStream(new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr))) {
JVMUtil.jstack(jStackStream);
} catch (Throwable t) {
logger.error("dump jStack error", t);
} finally {
guard.release();
}
lastPrintTime = System.currentTimeMillis();
});
//must shutdown thread pool ,if not will lead to OOM
//这里使用完线程池之后一定要shutdown,不然会oom
pool.shutdown();
}
这里的话tomcat线程池的源码和拒绝策略就解释完了
tomcat线程池是怎样执行的?拒绝策略是如何?
看完这个源码后,balabala……其实我们也可以先把最大线程数用完,然后再让任务进入队列。通过自定义队列,重写其 offer 方法就可以实现。目前我知道的 Tomcat 和 Dubbo 都提供了这样策略的线程池,扯一堆。 面试官:好了,我们聊聊其他的吧!