本文源码基于: JDK13
一个用链表实现的无界的线程安全的队列. 这个队列提供FIFO
的元素顺序.
当多个线程需要共享一个集合的访问时, ConcurrentLinkedQueue
是一个合适的选择. 向其他的并发集合实现一样,这个类不接受null元素.
这个实现使用了高效的无锁算法. 来源于paper Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms
迭代器是弱同步的,返回元素是创建迭代器时的元素快照. 不会抛出ConcurrentModificationException
. 可以和其他操作一起并发的进行. 迭代器创建时的元素,将会被精准的返回一次.
需要注意的是,和其他大多数集合不同,size
方法不是常量时间的操作. 因为队列的异步特性,决定了计数当前的元素需要遍历所有元素,因此如果有别的线程正在更改,size
方法可能返回不准确的数字.
批量操作不保证原子性,比如addAll
等. 当foreach
和addAll
一起运行时,可能foreach
只能观察到部分的元素.
这个类和他的迭代器实现了Queue
和Iterator
的所有可选方法.
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable {
这是一个队列.
static final class Node<E> {
volatile E item;
volatile Node<E> next;
/**
* Constructs a node holding item. Uses relaxed write because
* item can only be seen after piggy-backing publication via CAS.
*/
Node(E item) {
ITEM.set(this, item);
}
/** Constructs a dead dummy node. */
Node() {}
void appendRelaxed(Node<E> next) {
// assert next != null;
// assert this.next == null;
NEXT.set(this, next);
}
boolean casItem(E cmp, E val) {
// assert item == cmp || item == null;
// assert cmp != null;
// assert val == null;
return ITEM.compareAndSet(this, cmp, val);
}
}
保存了当前节点的数据Item
及指向下一个节点的指针next
.
提供了两个cas方法,分别用来更改数据以及指针.
transient volatile Node<E> head;
private transient volatile Node<E> tail;
保存了链表的头尾节点.
public ConcurrentLinkedQueue() {
head = tail = new Node<E>();
}
public ConcurrentLinkedQueue(Collection<? extends E> c) {
Node<E> h = null, t = null;
for (E e : c) {
Node<E> newNode = new Node<E>(Objects.requireNonNull(e));
if (h == null)
h = t = newNode;
else
t.appendRelaxed(t = newNode);
}
if (h == null)
h = t = new Node<E>();
head = h;
tail = t;
}
提供了两个构造方法,支持创建空的队列和将给定的集合全部初始化进队列.
public boolean add(E e) {
return offer(e);
}
public boolean offer(E e) {
// 创建新的节点
final Node<E> newNode = new Node<E>(Objects.requireNonNull(e));
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
// 尾节点的下一个为空. 直接cas更新,且成功了
if (q == null) {
// p is last node
if (NEXT.compareAndSet(p, null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time; failure is OK
TAIL.weakCompareAndSet(this, t, newNode);
return true;
}
// Lost CAS race to another thread; re-read next
}
// p节点被删除了,也就是出队了. 重新设置p节点的值
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
// 如果别的地方更新了尾节点,看一下应该继续向后找还是.
p = (p != t && t != (t = tail)) ? t : q;
}
}
offer
方法进行实际的添加操作,将给定的节点,链接到已有队列的尾部. 过程中要充分考虑到与其他线程产生竞争的情况.
public E poll() {
restartFromHead: for (;;) {
for (Node<E> h = head, p = h, q;; p = q) {
// 从队头开始
final E item;
// 如果队头就OK,直接cas更新,并且返回结果
if ((item = p.item) != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// 头结点的下一个为空,队列为空了
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
// 当前节点出队了,重新从队头开始
else if (p == q)
continue restartFromHead;
}
}
}
从队头开始遍历,如果成功拿到头结点,且CAS更新成功,就返回. 否则继续找到下一个.
public E peek() {
restartFromHead: for (;;) {
for (Node<E> h = head, p = h, q;; p = q) {
final E item;
// 队头元素OK. 返回队头元素
if ((item = p.item) != null
|| (q = p.next) == null) {
updateHead(h, p);
return item;
}
// 当前节点出队了,重新找队头
else if (p == q)
continue restartFromHead;
}
}
}
比较简单,不断尝试获取队头元素.
public int size() {
restartFromHead: for (;;) {
int count = 0;
// 从队头开始计数
for (Node<E> p = first(); p != null;) {
if (p.item != null)
if (++count == Integer.MAX_VALUE)
break; // @see Collection.size()
// 当前元素出队了,从头开始计数
if (p == (p = p.next))
continue restartFromHead;
}
return count;
}
}
每次都从队头开始计数, 如果中间与双开被别人更改的情况,就重新从队头开始计数.
一个非阻塞的,线程安全的队列,全程无锁,采用CAS+自旋实现.
完。
最后,欢迎关注我的个人公众号【 呼延十 】,会不定期更新很多后端工程师的学习笔记。 也欢迎直接公众号私信或者邮箱联系我,一定知无不言,言无不尽。
以上皆为个人所思所得,如有错误欢迎评论区指正。
欢迎转载,烦请署名并保留原文链接。
联系邮箱:huyanshi2580@gmail.com
更多学习笔记见个人博客或关注微信公众号 <呼延十 >——>呼延十