前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >(juc系列)并发集合之concurrentlinkedqueue源码

(juc系列)并发集合之concurrentlinkedqueue源码

作者头像
呼延十
发布2021-11-10 11:19:37
4490
发布2021-11-10 11:19:37
举报
文章被收录于专栏:呼延

本文源码基于: JDK13

ConcurrentLinkedQueue

官方注释翻译

一个用链表实现的无界的线程安全的队列. 这个队列提供FIFO的元素顺序.

当多个线程需要共享一个集合的访问时, ConcurrentLinkedQueue是一个合适的选择. 向其他的并发集合实现一样,这个类不接受null元素.

这个实现使用了高效的无锁算法. 来源于paper Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms

迭代器是弱同步的,返回元素是创建迭代器时的元素快照. 不会抛出ConcurrentModificationException. 可以和其他操作一起并发的进行. 迭代器创建时的元素,将会被精准的返回一次.

需要注意的是,和其他大多数集合不同,size方法不是常量时间的操作. 因为队列的异步特性,决定了计数当前的元素需要遍历所有元素,因此如果有别的线程正在更改,size方法可能返回不准确的数字.

批量操作不保证原子性,比如addAll等. 当foreachaddAll一起运行时,可能foreach只能观察到部分的元素.

这个类和他的迭代器实现了QueueIterator的所有可选方法.

源码

定义
代码语言:javascript
复制
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable {

这是一个队列.

链表节点
代码语言:javascript
复制
    static final class Node<E> {
        volatile E item;
        volatile Node<E> next;

        /**
         * Constructs a node holding item.  Uses relaxed write because
         * item can only be seen after piggy-backing publication via CAS.
         */
        Node(E item) {
            ITEM.set(this, item);
        }

        /** Constructs a dead dummy node. */
        Node() {}

        void appendRelaxed(Node<E> next) {
            // assert next != null;
            // assert this.next == null;
            NEXT.set(this, next);
        }

        boolean casItem(E cmp, E val) {
            // assert item == cmp || item == null;
            // assert cmp != null;
            // assert val == null;
            return ITEM.compareAndSet(this, cmp, val);
        }
    }

保存了当前节点的数据Item及指向下一个节点的指针next.

提供了两个cas方法,分别用来更改数据以及指针.

属性
代码语言:javascript
复制
    transient volatile Node<E> head;

    private transient volatile Node<E> tail;

保存了链表的头尾节点.

构造方法
代码语言:javascript
复制
    public ConcurrentLinkedQueue() {
        head = tail = new Node<E>();
    }

    public ConcurrentLinkedQueue(Collection<? extends E> c) {
        Node<E> h = null, t = null;
        for (E e : c) {
            Node<E> newNode = new Node<E>(Objects.requireNonNull(e));
            if (h == null)
                h = t = newNode;
            else
                t.appendRelaxed(t = newNode);
        }
        if (h == null)
            h = t = new Node<E>();
        head = h;
        tail = t;
    }

提供了两个构造方法,支持创建空的队列和将给定的集合全部初始化进队列.

入队方法 offer
代码语言:javascript
复制
    public boolean add(E e) {
        return offer(e);
    }

    public boolean offer(E e) {
        // 创建新的节点
        final Node<E> newNode = new Node<E>(Objects.requireNonNull(e));

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            // 尾节点的下一个为空. 直接cas更新,且成功了
            if (q == null) {
                // p is last node
                if (NEXT.compareAndSet(p, null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (p != t) // hop two nodes at a time; failure is OK
                        TAIL.weakCompareAndSet(this, t, newNode);
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            // p节点被删除了,也就是出队了. 重新设置p节点的值
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                // 如果别的地方更新了尾节点,看一下应该继续向后找还是.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

offer方法进行实际的添加操作,将给定的节点,链接到已有队列的尾部. 过程中要充分考虑到与其他线程产生竞争的情况.

出队方法 poll
代码语言:javascript
复制
    public E poll() {
        restartFromHead: for (;;) {
            for (Node<E> h = head, p = h, q;; p = q) {
                // 从队头开始
                final E item;
                // 如果队头就OK,直接cas更新,并且返回结果
                if ((item = p.item) != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                // 头结点的下一个为空,队列为空了
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                // 当前节点出队了,重新从队头开始
                else if (p == q)
                    continue restartFromHead;
            }
        }
    }

从队头开始遍历,如果成功拿到头结点,且CAS更新成功,就返回. 否则继续找到下一个.

查看队首 peek
代码语言:javascript
复制
    public E peek() {
        restartFromHead: for (;;) {
            for (Node<E> h = head, p = h, q;; p = q) {
                final E item;
                // 队头元素OK. 返回队头元素
                if ((item = p.item) != null
                    || (q = p.next) == null) {
                    updateHead(h, p);
                    return item;
                }
                // 当前节点出队了,重新找队头
                else if (p == q)
                    continue restartFromHead;
            }
        }
    }

比较简单,不断尝试获取队头元素.

查看数量 size
代码语言:javascript
复制
    public int size() {
        restartFromHead: for (;;) {
            int count = 0;
            // 从队头开始计数
            for (Node<E> p = first(); p != null;) {
                if (p.item != null)
                    if (++count == Integer.MAX_VALUE)
                        break;  // @see Collection.size()
                // 当前元素出队了,从头开始计数
                if (p == (p = p.next))
                    continue restartFromHead;
            }
            
            return count;
        }
    }

每次都从队头开始计数, 如果中间与双开被别人更改的情况,就重新从队头开始计数.

总结

一个非阻塞的,线程安全的队列,全程无锁,采用CAS+自旋实现.

参考文章

完。

联系我

最后,欢迎关注我的个人公众号【 呼延十 】,会不定期更新很多后端工程师的学习笔记。 也欢迎直接公众号私信或者邮箱联系我,一定知无不言,言无不尽。

以上皆为个人所思所得,如有错误欢迎评论区指正。

欢迎转载,烦请署名并保留原文链接。

联系邮箱:huyanshi2580@gmail.com

更多学习笔记见个人博客或关注微信公众号 <呼延十 >——>呼延十

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ConcurrentLinkedQueue
    • 官方注释翻译
      • 源码
        • 定义
        • 链表节点
        • 属性
        • 构造方法
        • 入队方法 offer
        • 出队方法 poll
        • 查看队首 peek
        • 查看数量 size
        • 总结
    • 参考文章
    • 联系我
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档