前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >Java避坑指南:ScheduledThreadPoolExecutor避坑

Java避坑指南:ScheduledThreadPoolExecutor避坑

作者头像
崔认知
发布2023-06-20 11:16:45
发布2023-06-20 11:16:45
1.3K00
代码可运行
举报
文章被收录于专栏:nobodynobody
运行总次数:0
代码可运行

简介


单机场景下的延迟任务或周期性的调度任务,开发者经常使用ScheduledThreadPoolExecutor来完成。尤其是Apache RocketMQ 的很多功能都利用了定时任务来完成的。

ScheduledThreadPoolExecutor支持的延迟任务类型有:

【关注公众号:认知科技技术团队】

1、一次性延迟执行任务;

代码语言:javascript
代码运行次数:0
运行
复制
ScheduledThreadPoolExecutor#schedule(java.lang.Runnable, long, java.util.concurrent.TimeUnit)
ScheduledThreadPoolExecutor#schedule(java.util.concurrent.Callable<V>, long, java.util.concurrent.TimeUnit)
ScheduledThreadPoolExecutor#schedule(java.util.concurrent.Callable<V>, long, java.util.concurrent.TimeUnit)
ScheduledThreadPoolExecutor#execute
ScheduledThreadPoolExecutor#submit(java.lang.Runnable)
ScheduledThreadPoolExecutor#submit(java.util.concurrent.Callable<T>)

2、按照固定的周期执行(fixed rate);

下次任务的执行时间:上次任务执行【开始】的时间 + period。

代码语言:javascript
代码运行次数:0
运行
复制
ScheduledThreadPoolExecutor#scheduleAtFixedRate

3、固定延迟的定期执行(fixed delay);

下次任务的执行时间:上次任务执行【完毕】当前的时间即System.nanoTime() + delay。

代码语言:javascript
代码运行次数:0
运行
复制
ScheduledThreadPoolExecutor#scheduleWithFixedDelay

ScheduledThreadPoolExecutor 继承自ThreadPoolExecutor,参数基本相同:

代码语言:javascript
代码运行次数:0
运行
复制
public ScheduledThreadPoolExecutor(int corePoolSize,
                      ThreadFactory threadFactory,
                     RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
    }

1、corePoolSize 核心线程数;

2、ThreadFactory ThreadFactory 线程工厂;

3、RejectedExecutionHandler handler 任务拒绝策略;

4、线程池默认参数:maximumPoolSize 为Integer.MAX_VALUE,workQueue为延迟队列DelayedWorkQueue,keepAliveTime 为DEFAULT_KEEPALIVE_MILLIS;

延迟队列DelayedWorkQueue是基于数组实现的最小堆,堆顶是延迟最小的任务元素。DelayedWorkQueue容量是不受限的,最大为Integer.MAX_VALUE,默认初始化容量为16,会伴随着扩容操作由于任务队列容量无限,此时线程的maximumPoolSize和RejectedExecutionHandler设置与不设置都没什么用处。

避坑:提交的任务内部不处理异常,异常信息会丢失,任务不再继续被调度


提交的延迟任务被封装为ScheduledFutureTask,此类继承FutureTask,在任务处理过程中发生的异常会保存在java.util.concurrent.FutureTask#outcome中:

代码语言:javascript
代码运行次数:0
运行
复制
java.util.concurrent.FutureTask#run

如果我们不调用方法:

代码语言:javascript
代码运行次数:0
运行
复制
java.util.concurrent.FutureTask#get()

获取任务执行结果,异常信息就是丢失,调用方无法感知异常的发生。

Java避坑指南:ThreadPoolExecutor提交任务出现异常,异常是否吞掉,线程是否退出的不同影响

由于是调度任务,此方法大多不会被开发者调用,所以提交的任务内部需要处理异常

正确处理任务调度的异常案例:

代码语言:javascript
代码运行次数:0
运行
复制
org.apache.rocketmq.broker.BrokerController#initializeBrokerScheduledTasks

避坑:被周期性调度的任务,每次开始执行的时间不是很精确,不要以配置的周期时间去从DB拉取时间范围内的数据做业务处理


所有提交的延迟任务都会先保存至延迟队列中,等待线程池中的线程获取执行。

代码语言:javascript
代码运行次数:0
运行
复制
java.util.concurrent.ScheduledThreadPoolExecutor#delayedExecute

这个与ThreadPoolExecutor提交任务的流程是不一样的。

按照固定的周期执行(fixed rate)和固定延迟的定期执行(fixed delay)的任务执行是不会重叠的。

周期性的任务,当本次执行完毕,重置任务,重新添加到延迟队列,等待下次执行

如果任务执行时间大于周期,则下次任务的执行不再是上次任务执行【开始】的时间 + period,或者上次任务执行【完毕】当前的时间即System.nanoTime() + delay。如果此时业务根据当前时间(数据库为准)查询前period或delay时间范围内的数据,就会导致业务会处理不到之前稍久的数据。

避坑:不要初始化corePoolSize过小,或设置allowCoreThreadTimeOut


设置线程池数目过小或者核心线程池超时,可能导致任务不能及时被调度执行。

设置过小,由于线程池都在忙碌,即时延迟队列中的堆顶任务需要立即被调度执行,但是没有剩余的线程来执行,等忙碌的线程池执行完任务才有机会得到调度。

虽然周期性延迟任务在重新调度时,会按需创建核心线程,但是失去了线程池的好处

小结


ScheduledThreadPoolExecutor方便了任务的调度,但是任务的异常处理和任务调度的精度问题,需要开发者自己妥善处理。


本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-10-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 认知科技技术团队 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档