本文源码基于: JDK13
用于延迟元素的一个无界的阻塞队列实现. 延迟元素只有在他的延迟过期之后,才可以被获取.
队头的元素,是队列中过期最早的元素。如果没有元素过期,那么将没有队头元素,poll
方法将会返回一个null.
过期操作只有元素的getDelay
方法返回一个小于等于0的数值时才会起作用.
尽管没有过期的元素,不能通过take
或者poll
来获取, 其他方面和正常的元素是一样的.
比如,size()
返回过期和未过期的元素的计数,同时,这个队列也是不接受空元素.
这个类和他的迭代器实现了Collection
和Iterator
接口的所有可选方法.
这个类也是Java集合框架的一部分噢。
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
首先是一个普通队列, 且还是阻塞队列. 拥有他们的所有属性,同时,还要求放入的元素,是实现了Delayed
接口的. 该接口定义如下:
public interface Delayed extends Comparable<Delayed> {
/**
* Returns the remaining delay associated with this object, in the
* given time unit.
*
* @param unit the time unit
* @return the remaining delay; zero or negative values indicate
* that the delay has already elapsed
*/
long getDelay(TimeUnit unit);
}
根据给定的时间单位,返回剩余的延迟时间.
// 锁
private final transient ReentrantLock lock = new ReentrantLock();
// 优先级队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 正在等待队头元素的线程
private Thread leader;
// 有元素可用的等待条件
private final Condition available = lock.newCondition();
使用优先级队列来保存元素,同时记录等待队首元素的线程.
这个优先级队列,是java.util
包里的,暂不做详细解释,相信大家都懂哈.
提供了等待条件available
来负责阻塞线程与唤醒.
public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
提供两个构造方法,分别构造一个空的延迟队列和一个加载给定集合的阻塞队列.
public boolean add(E e) {
return offer(e);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 插入元素
q.offer(e);
// 队头元素是刚才插入的元素,说明有可用元素,唤醒等待线程们
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
public void put(E e) {
offer(e);
}
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e);
}
4个入队系列的方法,本质上都是调用了offer
. 直接调用内部优先级队列
的offer,无脑写入即可.
可以看到,该方法永远返回ture. 因为这个延迟队列也是无界的,因此不需要阻塞,不会插入失败.
插入只有两种可能:
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取第一个元素
E first = q.peek();
// 第一个元素为空,或者第一个元素的延迟时间没有到期,返回null.
// 否则返回该元素
return (first == null || first.getDelay(NANOSECONDS) > 0)
? null
: q.poll();
} finally {
lock.unlock();
}
}
首先查看第一个元素,如果不为空且已经过期了,那就弹出进行返回. 否则就返回null.
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 自旋
for (;;) {
// 查看第一个元素
E first = q.peek();
// 第一个元素为空,直接等待
if (first == null)
available.await();
else {
// 第一个元素已经超时,可用了,就进行弹出
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0L)
return q.poll();
// 等待
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
// 如果当前是第一个等待队首元素的线程,记录一下当前线程,且只阻塞剩余的时间,就苏醒来检查一下是否可用了
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 拿到元素后,协助唤醒一下等待线程
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
这个阻塞版本的获取元素复杂一点.
和上面的take
代码很像,只是在每一个线程的阻塞时都加上了时间限制,就不重复讲了.
这个简单的方法为啥要写呢,因为要注意: 返回的size,是所有过期的,未过期的总数.
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.size();
} finally {
lock.unlock();
}
}
直接调用了内部的优先级队列的size()
方法,没有判断是否过期.
由于在延迟队列中,总是需要看一下,队首元素,如果已经过期,就弹出,没过期,就不处理. 因此也简单看一下peek()
方法.
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.peek();
} finally {
lock.unlock();
}
}
没啥,加锁,然后调用优先级队列的peek
完事了。
延迟队列,本质上是一个带有优先级的阻塞队列,且根据延迟限制队首元素的出队.
java.util.PriorityQueue
,本质上实现应该也是一个堆实现的.Condition
条件. 由于是无界队列,入队操作不会阻塞. 出队行为在条件上等待,当有符合条件的元素时,唤醒所有等待线程.java.util.PriorityQueue
不是线程安全的,因此使用额外的一个ReentrantLock
来限制对数据的读写访问.完。
最后,欢迎关注我的个人公众号【 呼延十 】,会不定期更新很多后端工程师的学习笔记。 也欢迎直接公众号私信或者邮箱联系我,一定知无不言,言无不尽。
以上皆为个人所思所得,如有错误欢迎评论区指正。
欢迎转载,烦请署名并保留原文链接。
联系邮箱:huyanshi2580@gmail.com
更多学习笔记见个人博客或关注微信公众号 <呼延十 >——>呼延十