在日常开发中, 定时任务是一个比较关键的功能。 Java 中一般使用 JDK 中 Timer
、ScheduledExecutorService
和调度框架 Quartz
等。 通常用于实现延时任务, 周期性任务等, 一般会有两种需求:
轮询的方式在数据量大的时候性能会比较差, 通常我们会选择第二种方式。
Timer 调度任务有一次性调度和循环调度,循环调度有分为:
固定速率调度(fixRate)
固定时延调度(fixDelay):
Timer timer = new Timer();
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
System.out.println(LocalDateTime.now());
}
};
// 延迟 1s 打印(只打印一次)
timer.schedule(timerTask, 1000);
// 周期性每隔 1s 打印一次
timer.schedule(timerTask, 1000, 1000);
Timer 类里包含一个任务队列和一个异步轮训线程。
Timer 类:
class Timer {
// 任务队列, 任务通过 Timer.schedule 方法将任务加入 TaskQueue
// taskQueue 是数组实现的一个最小堆
TaskQueue queue = new TaskQueue();
// 执行任务的 timer 线程
TimerThread thread = new TimerThread(queue);
}
class TaskQueue {
TimerTask[] queue = new TimerTask[128];
int size;
}
任务调度过程:
// 调度任务
private void sched(TimerTask task, long time, long period) {
synchronized(task.lock) {
if (task.state != TimerTask.VIRGIN)
throw new IllegalStateException(
"Task already scheduled or cancelled");
task.nextExecutionTime = time;
task.period = period;
task.state = TimerTask.SCHEDULED;
}
queue.add(task);
if (queue.getMin() == task)
queue.notify();
}
// new Timer 的时候 内部的 TimerThread 已经开始 run 调用 mainLoop 了
// 运行任务
private void mainLoop() {
while (true) {
try {
TimerTask task;
boolean taskFired;
synchronized(queue) {
// 队列是空并且还有其他任务, 则 wait
while (queue.isEmpty() && newTasksMayBeScheduled)
queue.wait();
// 如果队列没有任务, 直接结束
if (queue.isEmpty())
break;
// taskQueue 按照 nextExecutionTime 进行堆排序, 取出最小的(应该执行的)任务
task = queue.getMin();
synchronized(task.lock) {
if (task.state == TimerTask.CANCELLED) {
queue.removeMin();
continue;
}
currentTime = System.currentTimeMillis();
executionTime = task.nextExecutionTime;
if (taskFired = (executionTime<=currentTime)) {
// 如果没有设置执行间隔, 就移除该任务
if (task.period == 0) {
queue.removeMin();
task.state = TimerTask.EXECUTED;
} else {
// 否则加入队列等待下一次执行(这里是修改下一次任务的执行时间)
// 计算下一次调度时间
queue.rescheduleMin(
task.period<0 ? currentTime - task.period
: executionTime + task.period);
}
}
}
if (!taskFired) // Task hasn't yet fired; wait
queue.wait(executionTime - currentTime);
}
if (taskFired) // Task fired; run it, holding no locks
task.run();
} catch(InterruptedException e) {
}
}
注意:
使用:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
//提交一个一次性触发的Runnable任务,在给定时延后执行。
public ScheduledFuture<?> schedule(
Runnable command,
long delay,
TimeUnit unit
);
//提交一个定期重复的Runnable任务,在initialDelay时间后第一次执行,此后每隔period时间执行一次。如果某次执行抛出异常,之后所有任务取消执行。
// 如果一次执行时间过长,完成时已经超过下次执行开始时间,下一次执行会等待上一次执行结束后马上执行
public ScheduledFuture<?> scheduleAtFixedRate(
Runnable command,
long initialDelay,
long period,
TimeUnit unit
);
// 跟 scheduleAtFixRate 类似
// 区别是在上一次执行结束之后,再等待delay时间,然后再执行下一次任务。
public ScheduledFuture<?> scheduleWithFixedDelay(
Runnable command,
long initialDelay,
long delay,
TimeUnit unit
);
无论是 scheduleAtFixedRate
还是 scheduleWithFixedDelay
都会把任务包装成 ScheduledFutureTask
, 然后调用delayedExecute(RunnableScheduledFuture<?> task
处理,
这里以scheduleAtFixedRate
为例:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
// 包装成 ScheduledFutureTask
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
// 处理任务
delayedExecute(t);
return t;
}
delayedExecute 逻辑如下:
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
//如果线程池已经关闭,拒绝该任务
reject(task);
else {
//将任务加入队列并再次检查线程池状态,不可运行则取消任务
super.getQueue().add(task);
//如果任务刚入队后线程池关闭了且不允许执行任务,
//从任务队列移除该任务并取消之
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 保证核心线程运行任务
ensurePrestart();
}
}
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
//如果worker数量小于corePoolSize,新建Worker
//但是不设置firstTask而是让Worker从延迟队列获取任务
addWorker(null, true);
else if (wc == 0)
//保证至少有一个Worker在运行
addWorker(null, false);
}
可以看到以上代码并没有包含任何控制或响应延时的代码,因此这些逻辑应该是由延迟队列本身来控制的,这样就可以直接使用继承自ThreadPoolExecutor的方法完成其他相同的部分, 构造函数显示队列类型是DelayedWorkQueue
那我们回到加入队列的任务 ScheduledFutureTask
的 run 方法:
public void run() {
//根据periodic是否为0判断是否是周期性任务
boolean periodic = isPeriodic();
//判断为非可执行任务状态时,取消任务
if (!canRunInCurrentRunState(periodic))
cancel(false);
//不是周期性任务,直接执行
else if (!periodic)
ScheduledFutureTask.super.run();
//是周期性任务runAndReset方法会执行在执行结束时将任务的状态重置为NEW,便于下次再次执行
else if (ScheduledFutureTask.super.runAndReset()) {
//设置下周执行的时间
setNextRunTime();
//再次执行周期行任务
reExecutePeriodic(outerTask);
}
}
private void setNextRunTime() {
long p = period;
// 这里就是 fixRate 和 fixDelay 的区别, fixRate preiod > 0, 以固定频率执行(上次任务时间+执行周期)
if (p > 0)
time += p;
// 固定延迟, 则是当前系统时间+执行周期
else
time = triggerTime(-p);
}
long triggerTime(long delay) {
// 返回当前系统时间加执行周期
return now() +
// 这里是为了防止溢出
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
//将任务重新入队
super.getQueue().add(task);
//如果线程池已关闭,将任务取消
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();//再次执行任务
}
}
TimeWheel时间轮算法,是一种实现延迟队列的巧妙且高效的算法,被应用在Netty,Zookeeper,Kafka等各种框架中, 应用场景广泛。
比如在Dubbo中,为增强系统的容错能力,会有相应的监听判断处理机制。比如RPC调用的超时机制的实现,消费者判断RPC调用是否超时,如果超时会将超时结果返回给应用层。
在Dubbo最开始的实现中,是将所有的返回结果(DefaultFuture)都放入一个集合中,并且通过一个定时任务,每隔一定时间间隔就扫描所有的future,逐个判断是否超时。
这样的实现方式虽然比较简单,但是存在一个问题就是会有很多无意义的遍历操作开销。比如一个RPC调用的超时时间是10秒,而设置的超时判定的定时任务是2秒执行一次,那么可能会有4次左右无意义的循环检测判断操作。
对于以上问题, 目的就是要减少额外扫描的次数,这样能减少 CPU 的开销, 时间轮可以很好的解决这个问题
单时间轮只有一个由bucket串起来的轮子,每个bucket下链接着未来对应时刻到期的节点。
假设相邻bucket到期时间的间隔为slot=1s,从当前时刻0s开始计时,1s时到期的定时器节点挂在bucket[1]下,2s时到期的定时器节点挂在bucket[2]下……
当tick检查到时间过去了1s时,bucket[1]下所有节点执行超时动作,当时间到了2s时,bucket[2]下所有节点执行超时动作……
上图只有 8 个 bucket, 如果按照 slot=expire 来算, 只能挂 8s 的定时任务, 超过 8s 可以使用 slot = expire % N, 这里需要引入 rotation 的概念,定时器中expire表示到期时间,rotation表示节点在时间轮转了几圈后才到期
)
Linux的多时间轮算法,借鉴了日常生活中水表的度量方法,通过低刻度走得快的轮子带动高一级刻度轮子走动的方法,达到了仅使用较少刻度即可表示很大范围度量值的效果
HashedWheelTimer
是接口 io.netty.util.Timer
的实现:
public interface Timer {
// 创建一个定时任务
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
// 停止所有的还没有被执行的定时任务
Set<Timeout> stop();
}
Timeout 是一个接口类, TimerTask 非常简单,就一个 run()
方法:
public interface TimerTask {
void run(Timeout timeout) throws Exception;
}
public interface Timeout {
Timer timer();
TimerTask task();
boolean isExpired();
boolean isCancelled();
boolean cancel();
}
构造函数太多,摘要一下几个重要的参数:
RejectedExecutionException
异常。默认不限制。HashedWheelTimer 提交任务:
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
if (task == null) {
throw new NullPointerException("task");
}
if (unit == null) {
throw new NullPointerException("unit");
}
// 校验等待任务数是否达到阈值 maxPendingTimeouts
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}
// 如果工作线程没有启动,这里负责启动(如果是第一个任务, 还会 await 在这里)
start();
/** 下面的代码,构建 Timeout 实例,将其放到任务队列中。 **/
// deadline 是一个相对时间,相对于 HashedWheelTimer 的启动时间
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
// 防止 deadline 溢出
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}
// timeout 实例,一个上层依赖 timer,一个下层依赖 task,另一个是任务到期时间
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
// 放到 timeouts 队列中
timeouts.add(timeout);
return timeout;
}
这里的操作很简单:实例化 Timeout,然后放到任务队列中。需要注意的是,任务队列是 MPSC(Multiple Producer Single Consumer)队列, 刚好适用于这里的多生产线程,单消费线程的场景(这个队列是 JCTools 提供的一个并发数据结构)
Worker 线程工作原理:
private final class Worker implements Runnable {
private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
// tick 过的次数,时针每 100ms tick 一次
private long tick;
@Override
public void run() {
// 在 HashedWheelTimer 中,用的都是相对时间,所以需要启动时间作为基准,并且要用 volatile 修饰
// 这里跟 newTimeout 中 start 的部分结合理解一下, start 希望工作线程真正启动之后, 并赋值完 startTime 才返回, 因为后面的逻辑需要一个正确的 startTime
// 所以在 start 方法中使用 while(startTime ==0 ) startTimeInitialized.awit 这样做保证
startTime = System.nanoTime();
if (startTime == 0) {
startTime = 1;
}
// 唤醒工作线程启动的时候提交的任务(在提交任务的start里面 await 的)
startTimeInitialized.countDown();
// 执行逻辑的地方
do {
// 等待并获取下个 tick 相对于开始时间的纳秒数
final long deadline = waitForNextTick();
if (deadline > 0) {
// 该次 tick,bucket 数组对应的 index
int idx = (int) (tick & mask);
// 处理一下已经取消的任务
processCancelledTasks();
// 取得当前 bucket
HashedWheelBucket bucket = wheel[idx];
// 将队列中所有的任务转移到相应的 buckets 中
transferTimeoutsToBuckets();
// 执行进入到这个 bucket 中的任务
bucket.expireTimeouts(deadline);
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
/* 到这里,说明这个 timer 要关闭了,做一些清理工作 */
// 将所有 bucket 中没有执行的任务,添加到 unprocessedTimeouts 这个 HashSet 中,
// 主要目的是用于 stop() 方法返回
for (HashedWheelBucket bucket: wheel) {
bucket.clearTimeouts(unprocessedTimeouts);
}
// 将任务队列中的任务也添加到 unprocessedTimeouts 中
for (;;) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
processCancelledTasks();
}
在细看一下 run 流程中几个重要的方法, waitForNextTick
在每个 tick
到期返回:
/**
* 前面说过,这里用的都是相对时间,所以:
* 第一次进来的时候,工作线程会在 100ms 的时候返回,返回值是 100*10^6 (纳秒)
* 第二次进来的时候,工作线程会在 200ms 的时候返回,依次类推
* 另外就是注意极端情况,比如第二次进来的时候,由于被前面的任务阻塞,导致进来的时候就已经是 250ms,
* 那么,一进入这个方法就要立即返回,返回值是 250ms,而不是 200ms
*/
private long waitForNextTick() {
// 下一个 tick 的毫秒数
long deadline = tickDuration * (tick + 1);
// 嵌套在一个死循环里面
for (;;) {
final long currentTime = System.nanoTime() - startTime;
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
if (sleepTimeMs <= 0) {
// 这里可能是 HashedWheelTimer 启动到现在经过了太久, currentTime 已经溢出了
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {
// 这里是出口,所以返回值是当前时间(相对时间)
return currentTime;
}
}
// Check if we run on windows, as if thats the case we will need
// to round the sleepTime as workaround for a bug that only affect
// the JVM if it runs on windows.
//
// See https://github.com/netty/netty/issues/356
if (PlatformDependent.isWindows()) {
sleepTimeMs = sleepTimeMs / 10 * 10;
}
try {
Thread.sleep(sleepTimeMs);
} catch (InterruptedException ignored) {
// 如果 timer 已经 shutdown,那么返回 Long.MIN_VALUE
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}
接着看看转移队列到 bucket 中的方法 transferTimeoutsToBuckets
:
private void transferTimeoutsToBuckets() {
// 这里一个 for 循环,还特地限制了 10 万次, 怕一直往队列里面仍任务, netty 的源码还是很细节的
for (int i = 0; i < 100000; i++) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
// 没有任务了
break;
}
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
// 该任务刚刚被取消了(在transfer之前其实已经做过一次了)
continue;
}
// 计算还需要等待几个 tick
long calculated = timeout.deadline / tickDuration;
// 计算出轮次
timeout.remainingRounds = (calculated - tick) / wheel.length;
final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
int stopIndex = (int) (ticks & mask);
HashedWheelBucket bucket = wheel[stopIndex];
// 单个 bucket,是由 HashedWheelTimeout 实例组成的一个链表,
// 放入 bucket 链表
bucket.addTimeout(timeout);
}
}
bucket 中任务的执行:
/**
* 这里会执行这个 bucket 中,轮次为 0 的任务,也就是到期的任务。
* 这个方法的入参 deadline 其实没什么用,因为轮次为 0 的都是应该被执行的。
*/
public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;
// 处理链表上的所有 timeout 实例
while (timeout != null) {
HashedWheelTimeout next = timeout.next;
if (timeout.remainingRounds <= 0) {
next = remove(timeout);
if (timeout.deadline <= deadline) {
// 这行代码负责执行具体的任务
timeout.expire();
} else {
// 这里的代码注释也说,不可能进入到这个分支
// The timeout was placed into a wrong slot. This should never happen.
throw new IllegalStateException(String.format(
"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
} else if (timeout.isCancelled()) {
next = remove(timeout);
} else {
// 轮次减 1
timeout.remainingRounds --;
}
timeout = next;
}
}
总的来说 HashedWheelTimer
工作原理如下: