定义一个线程,然后 while 循环
public static void main(String[] args) { final long timeInterval = 5000; new Thread(new Runnable() { @Override public void run() { while (true) { System.out.println(Thread.currentThread().getName() + "每隔5秒执行一次"); try { Thread.sleep(timeInterval); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start();}这种实现方式下多个定时任务需要开启多个线程,而且线程在做无意义sleep,消耗资源,性能低下。
实现代码,调度两个任务
public static void main(String[] args) { Timer timer = new Timer(); //每隔1秒调用一次 timer.schedule(new TimerTask() { @Override public void run() { System.out.println("test1"); } }, 1000, 1000); //每隔3秒调用一次 timer.schedule(new TimerTask() { @Override public void run() { System.out.println("test2"); } }, 3000, 3000);}schedule实现源码
public void schedule(TimerTask task, long delay, long period) { if (delay < 0) throw new IllegalArgumentException("Negative delay."); if (period <= 0) throw new IllegalArgumentException("Non-positive period."); sched(task, System.currentTimeMillis()+delay, -period); }shed里面将任务add到最小堆,然后fixUp进行调整
TimerThread其实就是一个任务调度线程,首先从TaskQueue里面获取排在最前面的任务,然后判断它是否到达任务执行时间点,如果已到达,就会立刻执行任务
class TimerThread extends Thread { boolean newTasksMayBeScheduled = true; private TaskQueue queue; TimerThread(TaskQueue queue) { this.queue = queue; } public void run() { try { mainLoop(); } finally { // Someone killed this Thread, behave as if Timer cancelled synchronized(queue) { newTasksMayBeScheduled = false; queue.clear(); // Eliminate obsolete references } } } /** * The main timer loop. (See class comment.)
*/
private void mainLoop() {
while (true) {
try {
TimerTask task;
boolean taskFired;
synchronized(queue) {
// Wait for queue to become non-empty
while (queue.isEmpty() && newTasksMayBeScheduled)
queue.wait();
if (queue.isEmpty())
break; // Queue is empty and will forever remain; die // Queue nonempty; look at first evt and do the right thing long currentTime, executionTime; task = queue.getMin(); synchronized(task.lock) { if (task.state == TimerTask.CANCELLED) { queue.removeMin(); continue; // No action required, poll queue again } currentTime = System.currentTimeMillis(); executionTime = task.nextExecutionTime; if (taskFired = (executionTime<=currentTime)) { if (task.period == 0) { // Non-repeating, remove queue.removeMin(); task.state = TimerTask.EXECUTED; } else { // Repeating task, reschedule queue.rescheduleMin( task.period<0 ? currentTime - task.period : executionTime + task.period); } } } if (!taskFired) // Task hasn't yet fired; wait queue.wait(executionTime - currentTime); } if (taskFired) // Task fired; run it, holding no locks task.run(); } catch(InterruptedException e) { } } }}总结这个利用最小堆实现的方案,相比 while + sleep
方案,多了一个线程来管理所有的任务,优点就是减少了线程之间的性能开销,提升了执行效率;但是同样也带来的了一些缺点,整体的新加任务写入效率变成了
O(log(n))。
同时,细心的发现,这个方案还有以下几个缺点:
串行阻塞:调度线程只有一个,长任务会阻塞短任务的执行,例如,A任务跑了一分钟,B任务至少需要等1分钟才能跑
容错能力差:没有异常处理能力,一旦一个任务执行故障,后续任务都无法执行
鉴于 Timer 的上述缺陷,从 Java 5 开始,推出了基于线程池设计的 ScheduledThreadPoolExecutor 。
image
其设计思想是,每一个被调度的任务都会由线程池来管理执行,因此任务是并发执行的,相互之间不会受到干扰。需要注意的是,只有当任务的执行时间到来时,ScheduledThreadPoolExecutor
才会真正启动一个线程,其余时间 ScheduledThreadPoolExecutor 都是在轮询任务的状态。
简单的使用示例:
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(3); //启动1秒之后,每隔1秒执行一次 executor.scheduleAtFixedRate(()-> System.out.println("test3"),1,1, TimeUnit.SECONDS); //启动1秒之后,每隔3秒执行一次 executor.scheduleAtFixedRate((() -> System.out.println("test4")),1,3, TimeUnit.SECONDS);同样的,我们首先打开源码,看看里面到底做了啥
首先是校验基本参数,然后将任务作为封装到ScheduledFutureTask线程中,ScheduledFutureTask继承自RunnableScheduledFuture,并作为参数调用delayedExecute()方法进行预处理
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t;}可以很清晰的看到,当线程池没有关闭的时候,会通过super.getQueue().add(task)操作,将任务加入到队列,同时调用ensurePrestart()方法做预处理
private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else //预处理 ensurePrestart(); }}其中super.getQueue()得到的是一个自定义的new DelayedWorkQueue()阻塞队列,数据存储方面也是一个最小堆结构的队列,这一点在初始化new ScheduledThreadPoolExecutor()的时候,可以看出!
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());}打开源码可以看到,DelayedWorkQueue其实是ScheduledThreadPoolExecutor中的一个静态内部类,在添加的时候,会将任务加入到RunnableScheduledFuture数组中。然后调用线程池的ensurePrestart方法将任务添加到线程池。调用链:addWorker->t.run->new Worker.run-> runWorker->Runnable r = timed ?undefined workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :undefined workQueue.take();->task.run->RunnableScheduledFuture.run
static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> { private static final int INITIAL_CAPACITY = 16; private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; private final ReentrantLock lock = new ReentrantLock(); private int size = 0; //.... public boolean add(Runnable e) { return offer(e); } public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; if (i >= queue.length) grow(); size = i + 1; if (i == 0) { queue[0] = e; setIndex(e, 0); } else { siftUp(i, e); } if (queue[0] == e) { leader = null; available.signal(); } } finally { lock.unlock(); } return true; } public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0]; if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }}ScheduledFutureTask任务线程,才是真正执行任务的线程类,只是绕了一圈,做了很多包装,run()方法就是真正执行定时任务的方法。
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { /** Sequence number to break ties FIFO */ private final long sequenceNumber; /** The time the task is enabled to execute in nanoTime units */ private long time; /** * Period in nanoseconds for repeating tasks. A positive
* value indicates fixed-rate execution. A negative value
* indicates fixed-delay execution. A value of 0 indicates a
* non-repeating task.
*/
private final long period; /** The actual task to be re-enqueued by reExecutePeriodic */ RunnableScheduledFuture<V> outerTask = this; /** * Overrides FutureTask version so as to reset/requeue if periodic.
*/
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)//非周期性定时任务
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {//周期性定时任务,需要重置
setNextRunTime();
reExecutePeriodic(outerTask);
}
} //...}ScheduledExecutorService 相比 Timer 定时器,完美的解决上面说到的 Timer 存在的两个缺点!
在单体应用里面,使用 ScheduledExecutorService 可以解决大部分需要使用定时任务的业务需求!
但是这是否意味着它是最佳的解决方案呢?
我们发现线程池中 ScheduledExecutorService 的排序容器跟 Timer
一样,都是采用最小堆的存储结构,新任务加入排序效率是O(log(n)),执行取任务是O(1)。
这里的写入排序效率其实是有空间可提升的,有可能优化到O(1)的时间复杂度,也就是我们下面要介绍的 时间轮实现 !
DelayQueue是一个无界延时队列,内部有一个优先队列,可以重写compare接口,按照我们想要的方式进行排序。
实现Demo
public static void main(String[] args) throws Exception { DelayQueue<Order> orders = new DelayQueue<>(); Order order1 = new Order(1000, "1x"); Order order2 = new Order(2000, "2x"); Order order3 = new Order(3000, "3x"); Order order4 = new Order(4000, "4x"); orders.add(order1); orders.add(order2); orders.add(order3); orders.add(order4); for (; ; ) { //没有到期会阻塞 Order take = orders.take(); System.out.println(take); } }}class Order implements Delayed { @Override public String toString() { return "DelayedElement{" + "delay=" + delayTime + ", expire=" + expire + ", data='" + data + '\'' + '}'; } Order(long delay, String data) { delayTime = delay; this.data = data; expire = System.currentTimeMillis() + delay; } private final long delayTime; //延迟时间 private final long expire; //到期时间 private String data; //数据 /** * 剩余时间=到期时间-当前时间
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
} /** * 优先队列里面优先级规则
*/
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}从源码可以看出,DelayQueue的offer和take方法调用的是优先队列的offer和take。并且使用了ReetrtantLock保证线程安全
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } }public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return q.poll(); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }https://my.oschina.net/u/2474629/blog/1919127
代码实现:支持秒级别的循环队列,从下标最小的任务集合开始,提交到线程池执行。然后休眠1s,指针移动到下一个下标处。
所谓时间轮(RingBuffer)实现,从数据结构上看,简单的说就是循环队列,从名称上看可能感觉很抽象。
它其实就是一个环形的数组,如图所示,假设我们创建了一个长度为 8 的时间轮。
image
插入、取值流程:
因此,总结起来有两个核心的变量:
通过这张图可以更直观的理解!
image
当我们需要取出延时任务时,只需要每秒往下移动这个指针,然后取出该位置的所有任务即可,取任务的时间消耗为O(1)。
当我们需要插入任务,也只需要计算出对应的下表和圈数,即可将任务插入到对应的数组位置中,插入任务的时间消耗为O(1)。
如果时间轮的槽比较少,会导致某一个槽上的任务非常多,那么效率也比较低,这就和 HashMap 的 hash
冲突是一样的,因此在设计槽的时候不能太大也不能太小。
package com.hui.hui;import java.util.Collection;import java.util.HashSet;import java.util.Map;import java.util.Set;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicBoolean;import java.util.concurrent.atomic.AtomicInteger;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class RingBuffer { private static final int STATIC_RING_SIZE = 64; private Object[] ringBuffer; private int bufferSize; /** * business thread pool
*/
private ExecutorService executorService; private volatile int size = 0; /*** * task stop sign
*/
private volatile boolean stop = false; /** * task start sign
*/
private volatile AtomicBoolean start = new AtomicBoolean(false); /** * total tick times
*/
private AtomicInteger tick = new AtomicInteger(); private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); private AtomicInteger taskId = new AtomicInteger(); private Map<Integer, Task> taskMap = new ConcurrentHashMap<>(16); /** * Create a new delay task ring buffer by default size
*
* @param executorService the business thread pool
*/
public RingBuffer(ExecutorService executorService) {
this.executorService = executorService;
this.bufferSize = STATIC_RING_SIZE;
this.ringBuffer = new Object[bufferSize];
} /** * Create a new delay task ring buffer by custom buffer size
*
* @param executorService the business thread pool
* @param bufferSize custom buffer size
*/
public RingBuffer(ExecutorService executorService, int bufferSize) {
this(executorService); if (!powerOf2(bufferSize)) { throw new RuntimeException("bufferSize=[" + bufferSize + "] must be a power of 2"); } this.bufferSize = bufferSize; this.ringBuffer = new Object[bufferSize]; } /** * Add a task into the ring buffer(thread safe)
*
* @param task business task extends {@link Task}
*/
public int addTask(Task task) {
int key = task.getKey();
int id; try { lock.lock(); int index = mod(key, bufferSize); task.setIndex(index); Set<Task> tasks = get(index); int cycleNum = cycleNum(key, bufferSize); if (tasks != null) { task.setCycleNum(cycleNum); tasks.add(task); } else { task.setIndex(index); task.setCycleNum(cycleNum); Set<Task> sets = new HashSet<>(); sets.add(task); put(key, sets); } id = taskId.incrementAndGet(); task.setTaskId(id); taskMap.put(id, task); size++; } finally { lock.unlock(); } start(); return id; } /** * Cancel task by taskId
*
* @param id unique id through {@link #addTask(Task)}
* @return
*/
public boolean cancel(int id) { boolean flag = false; Set<Task> tempTask = new HashSet<>(); try { lock.lock(); Task task = taskMap.get(id); if (task == null) { return false; } Set<Task> tasks = get(task.getIndex()); for (Task tk : tasks) { if (tk.getKey() == task.getKey() && tk.getCycleNum() == task.getCycleNum()) { size--; flag = true; taskMap.remove(id); } else { tempTask.add(tk); } } //update origin data ringBuffer[task.getIndex()] = tempTask; } finally { lock.unlock(); } return flag; } /** * Thread safe
*
* @return the size of ring buffer
*/
public int taskSize() {
return size;
} /** * Same with method {@link #taskSize}
*
* @return
*/
public int taskMapSize() {
return taskMap.size();
} /** * Start background thread to consumer wheel timer, it will always run until you call method {@link #stop}
*/
public void start() {
if (!start.get()) {
System.out.println("Delay task is starting");
if (start.compareAndSet(start.get(), true)) {
Thread job = new Thread(new TriggerJob());
job.setName("consumer RingBuffer thread");
job.start();
start.set(true);
} } } /** * Stop consumer ring buffer thread
*
* @param force True will force close consumer thread and discard all pending tasks
* otherwise the consumer thread waits for all tasks to completes before closing.
*/
public void stop(boolean force) {
if (force) {
stop = true;
executorService.shutdownNow();
} else {
System.out.println("Delay task is stopping");
if (taskSize() > 0) {
try {
lock.lock();
condition.await();
stop = true;
} catch (InterruptedException e) {
System.out.println("InterruptedException" + e);
} finally {
lock.unlock();
}
}
executorService.shutdown();
} } private Set<Task> get(int index) { return (Set<Task>) ringBuffer[index]; } private void put(int key, Set<Task> tasks) { int index = mod(key, bufferSize); ringBuffer[index] = tasks; } /** * Remove and get task list.
*
* @param key
* @return task list
*/
private Set<Task> remove(int key) {
Set<Task> tempTask = new HashSet<>();
Set<Task> result = new HashSet<>(); Set<Task> tasks = (Set<Task>) ringBuffer[key]; if (tasks == null) { return result; } for (Task task : tasks) { if (task.getCycleNum() == 0) { result.add(task); size2Notify(); } else { // decrement 1 cycle number and update origin data task.setCycleNum(task.getCycleNum() - 1); tempTask.add(task); } // remove task, and free the memory. taskMap.remove(task.getTaskId()); } //update origin data ringBuffer[key] = tempTask; return result; } private void size2Notify() { try { lock.lock(); size--; if (size == 0) { condition.signal(); } } finally { lock.unlock(); } } private boolean powerOf2(int target) { if (target < 0) { return false; } int value = target & (target - 1); if (value != 0) { return false; } return true; } private int mod(int target, int mod) { // equals target % mod target = target + tick.get(); return target & (mod - 1); } private int cycleNum(int target, int mod) { //equals target/mod return target >> Integer.bitCount(mod - 1); } /** * An abstract class used to implement business.
*/
public abstract static class Task extends Thread { private int index; private int cycleNum; private int key; /** * The unique ID of the task
*/
private int taskId; @Override public void run() { } public int getKey() { return key; } /** * @param key Delay time(seconds)
*/
public void setKey(int key) {
this.key = key;
} public int getCycleNum() { return cycleNum; } private void setCycleNum(int cycleNum) { this.cycleNum = cycleNum; } public int getIndex() { return index; } private void setIndex(int index) { this.index = index; } public int getTaskId() { return taskId; } public void setTaskId(int taskId) { this.taskId = taskId; } } private class TriggerJob implements Runnable { @Override public void run() { int index = 0; while (!stop) { try { Set<Task> tasks = remove(index); for (Task task : tasks) { executorService.submit(task); } if (++index > bufferSize - 1) { index = 0; } //Total tick number of records tick.incrementAndGet(); TimeUnit.SECONDS.sleep(1); } catch (Exception e) { System.out.println("Exception" + e); } } System.out.println("Delay task has stopped"); } } public static void main(String[] args) { RingBuffer ringBufferWheel = new RingBuffer(Executors.newFixedThreadPool(2)); for (int i = 0; i < 3; i++) { RingBuffer.Task job = new Job(); job.setKey(i); ringBufferWheel.addTask(job); } } public static class Job extends RingBuffer.Task { @Override public void run() { System.out.println("test5"+getIndex()); } }}之前说的单机实现,一旦服务器重启,那么延时任务会丢失,而分布式的方案则不会丢失任务。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。