前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >Java并发指南: 线程池ThreadPoolTaskExecutor的工作原理解析及避坑

Java并发指南: 线程池ThreadPoolTaskExecutor的工作原理解析及避坑

作者头像
崔认知
发布2023-06-20 11:15:18
发布2023-06-20 11:15:18
5.2K10
代码可运行
举报
文章被收录于专栏:nobodynobody
运行总次数:0
代码可运行

简介


Java的线程与操作系统的线程一一对应,每创建一个Java线程对象,操作系统会负责创建与之对应的系统线程线程是操作系统中宝贵的资源,创建和销毁非常昂贵且低效。(目前JDK19已经出现了虚拟线程-Virtual Threads 预览版 )。

为了提升性能和方便线程管理,在开发中,我们一般规定必须使用线程池,不允许自己手动创建线程

在微服务场景下,使用线程池时,为了避免链路追踪信息丢失,必须使用经过链路信息封装的线程池,如Spring Cloud 环境下的TraceableExecutorService

【关注公众号:认知科技技术团队】

线程池ThreadPoolTaskExecutor的7大核心参数及解析


创建线程池【必须使用】的一个包含7大核心参数的构造函数

代码语言:javascript
代码运行次数:0
复制
【关注公众号:认知科技技术团队】

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    

1、核心线程数(corePoolSize) 和最大线程数(maximumPoolSize)

线程池根据corePoolSize和maximumPoolSize的值,自动维护线程池中工作线程的数量,大致规则如下:

(1)当向线程池提交任务时,如果当前线程池中工作线程数小于corePoolSize,就会创建一个新线程来执行该任务,即使线程池中其它的工作线程处于空闲状态

如果核心线程提前创建:prestartAllCoreThreads:

代码语言:javascript
代码运行次数:0
复制
java.util.concurrent.ThreadPoolExecutor#prestartCoreThread
java.util.concurrent.ThreadPoolExecutor#prestartAllCoreThreads

则直接进入步骤(2)。

(2)当向线程池提交任务时,如果当前线程池中工作线程数大于corePoolSize,但小于maximumPoolSize,则仅当任务工作队列workQueue满时,才会创建一个新线程来执行该任务

(3)corePoolSize和maximumPoolSize的值不仅能在构造函数指定,而且支持线程池运行时动态设值

代码语言:javascript
代码运行次数:0
复制
java.util.concurrent.ThreadPoolExecutor#setCorePoolSize
java.util.concurrent.ThreadPoolExecutor#setMaximumPoolSize

2、keepAliveTime和unit ,空闲线程回收策略时间

默认情况下,非核心线程空闲一定的时间后,是需要释放回收的,这个空闲时间是由keepAliveTime和unit共同决定的:

代码语言:javascript
代码运行次数:0
复制
this.keepAliveTime = unit.toNanos(keepAliveTime);

我们也可以根据任务情况动态设值:

代码语言:javascript
代码运行次数:0
复制
java.util.concurrent.ThreadPoolExecutor#setKeepAliveTime

空闲线程回收策略,默认情况下只针对非核心线程,如果想应用于核心线程,我们必须显示设置:

代码语言:javascript
代码运行次数:0
复制
java.util.concurrent.ThreadPoolExecutor#allowsCoreThreadTimeOut

线程池中的工作线程运行时,

代码语言:javascript
代码运行次数:0
复制
java.util.concurrent.ThreadPoolExecutor.Worker#run

只要第一个任务或者从任务队列中能获取到任务,此工作线程一直运行。keepAliveTime主要应用于从任务队列阻塞超时获取队头任务。

如果此工作线程空闲keepAliveTime,即任务队列阻塞超时keepAliveTime获取队头任务,获取不到任务时候,设置超时标志,下次for循环,根据下面超时策略判断,是否进入for循环再次从任务队列获取任务,或者超时返回NULL,导致此工作线程运行结束,线程被回收。

3、workQueue,任务队列

任务队列workQueue的作用主要是缓存提交的任务。当线程池线程数目超过非核心线程数且不超过最大线程数,提交的任务被缓存在此任务队列中。线程池中的线程会一直从此任务队列消费任务。

避坑:建议使用容量有限的任务队列,防止任务堆积导致OOM的发生。

4、threadFactory线程工厂

主要是在创建线程时设置线程的名字、daemon属性、优先级等属性。良好的线程名字在jstack命令下很好的分析解决问题

java.util.concurrent.Executors.DefaultThreadFactory的默认实现:

建议使用org.apache.commons.lang3.concurrent.BasicThreadFactory.Builder:

代码语言:javascript
代码运行次数:0
复制


BasicThreadFactory factory = new BasicThreadFactory.Builder()
                .namingPattern("认知科技thread-%d")
                .daemon(true)
                .priority(Thread.MAX_PRIORITY)
                .build();

5、handler-线程池的任务拒绝策略

向线程池提交任务时,当前线程数超过最大线程数maximumPoolSize,此时核心线程都在忙碌,而且任务队列也满了,提交的任务就会被拒绝。当然,如果线程池正处于关闭时,也会被拒绝提交

JUC提供了几种默认实现:

(1)ThreadPoolExecutor.AbortPolicy

如果线程池不设置任务拒绝策略,默认策略为AbortPolicy,通过抛出异常:

RejectedExecutionException拒绝任务提交。

(2)ThreadPoolExecutor.CallerRunsPolicy

提交任务的线程去执行此任务即当前提交任务的线程直接通过调用Runnable.run()执行任务,会阻塞当前线程。

一般情况下使用此策略,要打印日志上报。

代码语言:javascript
代码运行次数:0
复制
new ThreadPoolExecutor.CallerRunsPolicy() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                        log.info("rejectedExecution ");
                        super.rejectedExecution(r, e);
                    }
                }

使用场景一般为:串行调用改为并行调用,防止线程池设置不够用,退化为串行调用。当有任务被拒绝时,动态调整线程池大小,找到合适的参数。

(3)ThreadPoolExecutor.DiscardPolicy

被拒绝的任务默默地被丢弃,什么都不会去做,连日志都不打印。

此策略在开发中禁止使用。

如果向线程池提交任务后返回Future,使用不用带超时的get方法获取结果,可能永远会被阻塞。

避坑 崔认知,公众号:认知科技技术团队Java并发:FutureTask如何完成多线程并发执行、任务结果的异步获取?以及如何避其坑

(4)ThreadPoolExecutor.DiscardOldestPolicy

丢失任务队列丢弃队头任务,再次尝试向线程池提交任务。

不建议使用此策略。

上述线程池的任务拒绝策略实现,我们一般都需要继承,增加一个打印日志的操作。

任务提交至线程池ThreadPoolTaskExecutor流程


如下图所示:

(1)当向线程池提交任务时,如果当前线程池中工作线程数小于corePoolSize,就会创建一个新线程来执行该任务,即使线程池中其它的工作线程处于空闲状态。

(2)当向线程池提交任务时,如果当前线程池中工作线程数大于corePoolSize,当前任务被存储至任务工作队列workQueue中。

(3)当向线程池提交任务时,如果当前线程池中工作线程数大于corePoolSize,但小于maximumPoolSize,而且任务工作队列workQueue已满,则创建一个新线程来执行该任务。

(4)当向线程池提交任务时,如果当前线程池中工作线程数大于corePoolSize,并且任务工作队列workQueue已满,而且当前线程池中工作线程数大于maximumPoolSize,则执行任务拒绝策略拒绝任务提交。

线程池ThreadPoolTaskExecutor避坑


本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-10-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 认知科技技术团队 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档