简介
单机场景下的延迟任务或周期性的调度任务,开发者经常使用ScheduledThreadPoolExecutor来完成。尤其是Apache RocketMQ 的很多功能都利用了定时任务来完成的。
ScheduledThreadPoolExecutor支持的延迟任务类型有:
【关注公众号:认知科技技术团队】
1、一次性延迟执行任务;
ScheduledThreadPoolExecutor#schedule(java.lang.Runnable, long, java.util.concurrent.TimeUnit)
ScheduledThreadPoolExecutor#schedule(java.util.concurrent.Callable<V>, long, java.util.concurrent.TimeUnit)
ScheduledThreadPoolExecutor#schedule(java.util.concurrent.Callable<V>, long, java.util.concurrent.TimeUnit)
ScheduledThreadPoolExecutor#execute
ScheduledThreadPoolExecutor#submit(java.lang.Runnable)
ScheduledThreadPoolExecutor#submit(java.util.concurrent.Callable<T>)
2、按照固定的周期执行(fixed rate);
下次任务的执行时间:上次任务执行【开始】的时间 + period。
ScheduledThreadPoolExecutor#scheduleAtFixedRate
3、固定延迟的定期执行(fixed delay);
下次任务的执行时间:上次任务执行【完毕】当前的时间即System.nanoTime() + delay。
ScheduledThreadPoolExecutor#scheduleWithFixedDelay
ScheduledThreadPoolExecutor 继承自ThreadPoolExecutor,参数基本相同:
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
1、corePoolSize 核心线程数;
2、ThreadFactory ThreadFactory 线程工厂;
3、RejectedExecutionHandler handler 任务拒绝策略;
4、线程池默认参数:maximumPoolSize 为Integer.MAX_VALUE,workQueue为延迟队列DelayedWorkQueue,keepAliveTime 为DEFAULT_KEEPALIVE_MILLIS;
延迟队列DelayedWorkQueue是基于数组实现的最小堆,堆顶是延迟最小的任务元素。DelayedWorkQueue容量是不受限的,最大为Integer.MAX_VALUE,默认初始化容量为16,会伴随着扩容操作。由于任务队列容量无限,此时线程的maximumPoolSize和RejectedExecutionHandler设置与不设置都没什么用处。
避坑:提交的任务内部不处理异常,异常信息会丢失,任务不再继续被调度
提交的延迟任务被封装为ScheduledFutureTask,此类继承FutureTask,在任务处理过程中发生的异常会保存在java.util.concurrent.FutureTask#outcome中:
java.util.concurrent.FutureTask#run
如果我们不调用方法:
java.util.concurrent.FutureTask#get()
获取任务执行结果,异常信息就是丢失,调用方无法感知异常的发生。
Java避坑指南:ThreadPoolExecutor提交任务出现异常,异常是否吞掉,线程是否退出的不同影响
由于是调度任务,此方法大多不会被开发者调用,所以提交的任务内部需要处理异常。
正确处理任务调度的异常案例:
org.apache.rocketmq.broker.BrokerController#initializeBrokerScheduledTasks
避坑:被周期性调度的任务,每次开始执行的时间不是很精确,不要以配置的周期时间去从DB拉取时间范围内的数据做业务处理
所有提交的延迟任务都会先保存至延迟队列中,等待线程池中的线程获取执行。
java.util.concurrent.ScheduledThreadPoolExecutor#delayedExecute
这个与ThreadPoolExecutor提交任务的流程是不一样的。
按照固定的周期执行(fixed rate)和固定延迟的定期执行(fixed delay)的任务执行是不会重叠的。
周期性的任务,当本次执行完毕,重置任务,重新添加到延迟队列,等待下次执行。
如果任务执行时间大于周期,则下次任务的执行不再是上次任务执行【开始】的时间 + period,或者上次任务执行【完毕】当前的时间即System.nanoTime() + delay。如果此时业务根据当前时间(数据库为准)查询前period或delay时间范围内的数据,就会导致业务会处理不到之前稍久的数据。
避坑:不要初始化corePoolSize过小,或设置allowCoreThreadTimeOut
设置线程池数目过小或者核心线程池超时,可能导致任务不能及时被调度执行。
设置过小,由于线程池都在忙碌,即时延迟队列中的堆顶任务需要立即被调度执行,但是没有剩余的线程来执行,等忙碌的线程池执行完任务才有机会得到调度。
虽然周期性延迟任务在重新调度时,会按需创建核心线程,但是失去了线程池的好处:
小结
ScheduledThreadPoolExecutor方便了任务的调度,但是任务的异常处理和任务调度的精度问题,需要开发者自己妥善处理。