首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >PriorityBlockingQueue源码解析

PriorityBlockingQueue源码解析

原创
作者头像
后台技术汇
修改2024-11-25 08:38:09
修改2024-11-25 08:38:09
19600
代码可运行
举报
运行总次数:0
代码可运行

“好事”文章分享

作者:繁依Fanyi

文章:构建加载状态与流畅交互的精妙艺术

**文章总结**

本文探讨了如何通过技术手段实现加载状态与流畅交互的优化,旨在提升用户体验。文章详细介绍了在应用开发中,如何通过合理的架构设计、异步处理和性能优化等策略,减少用户等待时间,提高系统的响应速度和稳定性。

1、底层数据结构

完全二叉树

完全二叉树是一种特殊的二叉树,在需要高效存储和检索数据的情况下,完全二叉树在计算机科学中有着广泛的应用。它具有以下特点:

  1. 定义:对于深度为k的,有n个节点的二叉树,当且仅当其每一个节点都与深度为k的满二叉树中编号从1至n的节点一一对应时称之为完全二叉树。
  2. 特点:
    1. 除了最后一层外,每一层的节点都被填满(即,每一层节点数量满足2的i-1次方)。
    2. 如果最后一层存在节点,那么这些节点从左到右依次填充,不留空缺(即,最下层节点数<=2的i-1次方,最后一层至少1个节点)。
    3. 完全二叉树的高度通常较小,具有良好的平衡性。

二叉堆

二叉堆和完全二叉树之间存在密切的关系,主要体现在以下几个方面:

二叉堆是一种特殊的完全二叉树

  • 定义:二叉堆是一种特殊的完全二叉树,其中每个父节点的值都大于或等于(最大堆)或小于或等于(最小堆)其子节点的值。
  • 结构性质:二叉堆的节点按照完全二叉树的规则排列,除了最后一层外,其他层的节点都是满的,且最后一层的节点从左到右依次排列

完全二叉树是二叉堆的底层数据结构

  • 数组表示:二叉堆通常用数组来表示,数组的第一个位置(索引为1)为空,第二个位置是根节点,后面的节点按照完全二叉树的顺序排列
  • 节点关系:对于数组中的第i个元素,其左子节点的位置是2i,右子节点的位置是2i+1,父节点的位置是i/2(根节点除外)

大顶堆和小顶堆

大顶堆、小顶堆都是二叉堆的变种,二叉堆则是一种特殊的完全二叉树。

二叉堆、大顶堆和小顶堆都是基于完全二叉树的概念,它们之间的主要区别在于节点值的大小关系以及应用场景。

大顶堆
  • 定义:在大顶堆中,根结点的值必须大于它的孩子结点的值,对于二叉树中所有子树也应递归地满足这一特性
  • 应用:大顶堆通常用于实现优先队列,其中堆顶元素是最优先的元素
小顶堆
  • 定义:在小顶堆中,根结点的值必须小于它的孩子结点的值,且对于二叉树的所有子树也均递归地满足同一特性
  • 应用:小顶堆也常用于实现优先队列,其中堆顶元素是最不优先的元素
两者区别
  • 大小关系:大顶堆中父节点的值大于等于子节点的值,而小顶堆中父节点的值小于等于子节点的值
  • 应用场景:大顶堆常用于需要获取最大值的场景,小顶堆则常用于需要获取最小值的场景

长度为len,堆节点个数为size,size<=len。

大根堆:最大值在根节点,其余节点都小

小堆根:最小值在根节点,其余节点都大

小顶堆的操作

堆的初始化

初始化如下的4层小顶堆,可以通过数组heap[11]来支撑实现小顶堆。

数组下标:表示元素节点,数组元素值:表示节点。

堆的增加操作(针对堆底)

在小顶堆中,堆底增加元素,可以分解为以下步骤:

  • 添加新元素
    • 首先,将新元素添加到堆的末尾,即堆底。
  • 上浮操作(Sift Up)
    • 为了保持小顶堆的属性,需要将新添加的元素上浮到正确的位置。
    • 比较新元素和父亲parent节点(parent下标=当前下标/2):
      • 如果新元素的值小于其父节点的值,则交换它们的位置。
      • 如果新元素的值大于或等于其父节点的值,则停止上浮操作。
    • 一直递归到对顶,重复上述比较和交换过程,直到新元素到达正确的位置或成为根节点。

核心代码算法如下:

堆的删除操作(针对堆顶)

在小顶堆中,删除操作通常是指删除堆顶元素(即最小元素),并保持堆属性不变。以下是删除堆顶元素的步骤:

  1. 删除堆顶元素
    1. 首先,将堆顶元素删除。由于堆顶元素是最小元素,删除它后需要找到一个新的最小元素来替代它。
  2. 替换堆顶元素
    1. 将堆的最后一个元素(即堆底元素)移动到堆顶位置。
  3. 下沉操作(Sift Down)
    1. 为了保持小顶堆的属性,需要将新的堆顶元素下沉到正确的位置。具体步骤如下:
      1. 比较新堆顶元素与其子节点:
        1. 如果新堆顶元素的值大于其子节点中的最小值,则交换它们的位置。
        2. 如果新堆顶元素的值小于或等于其子节点的值,则停止下沉操作。
    2. 递归重复上述比较和交换过程,直到新堆顶元素到达正确的位置或成为叶子节点。

核心代码如下:

总结一下

  • 堆是如何确定parent下标?按照当前元素的二分之一,即 (parent下标=当前下标/2)
  • 堆是如何判断比较parent和son?在上面案例里,我们是直接比较字面值;但是PriorityBlockingQueue应用里是通过comparable接口的实现,即比较器。

2、PriorityBlockingQueue源码分析

代码语言:javascript
代码运行次数:0
运行
复制
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    // 默认容量为11
    private static final int DEFAULT_INITIAL_CAPACITY = 11;
    // 最大数组大小
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    // 存储元素的地方
    private transient Object[] queue;
    // 元素个数
    private transient int size;
    // 比较器
    private transient Comparator<? super E> comparator;
    // 重入锁
    private final ReentrantLock lock;
    // 非空条件
    private final Condition notEmpty;
    // 扩容的时候使用的控制变量,CAS更新这个值,谁更新成功了谁扩容,其它线程让出CPU
    private transient volatile int allocationSpinLock;
    // 不阻塞的优先级队列,非存储元素的地方,仅用于序列化/反序列化时
    private PriorityQueue<E> q;
}

成员定义:

  • DEFAULT_INITIAL_CAPACITY: 默认容量为11
  • MAX_ARRAY_SIZE:最大数组大小是Integer.MAX_VALUE - 8,说明了PriorityBlockingQueue是一个无界队列;另外之所以减掉8,是因为要兼容JVM版本
  • Object[] queue:优先级队列的存储元素容器
  • size:元素个数
  • comparator:比较器
  • ReentrantLock lock:重入锁
  • Condition notEmpty:非空条件
  • allocationSpinLock:队列容器扩容的时候使用的控制变量,CAS更新这个值,谁更新成功了谁扩容,其它线程让出CPU
  • PriorityQueue<E> q:不阻塞的优先级队列,非存储元素的地方,仅用于序列化/反序列化时

构造器

代码语言:javascript
代码运行次数:0
运行
复制
// 默认容量为11
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}
// 传入初始容量
public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}
// 传入初始容量和比较器
// 初始化各变量
public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.comparator = comparator;
    this.queue = new Object[Math.max(1, initialCapacity)];
}

代码解析:

  • 可控参数:initialCapacity、比较器

入队操作:offer/put

每个阻塞队列都有四个方法,offer()put() 方法是 PriorityBlockingQueue 中两种用于向队列中插入元素的方法,它们之间的主要区别在于它们处理队列已满时的行为:

  1. offer() 方法
  • 非阻塞:offer(E e) 方法尝试将指定的元素插入队列。如果队列未满,元素会被插入并返回 true;如果队列已满,则不会插入元素并返回 false
  • 超时版本:还有一个带超时的版本 offer(E e, long timeout, TimeUnit unit),该方法会在指定的时间内尝试插入元素。如果在指定时间内队列未满,元素会被插入并返回 true;如果超时时间内队列仍然已满,则不会插入元素并返回 false
  1. put() 方法
  • 阻塞:put(E e) 方法将指定的元素插入队列。如果队列未满,元素会被插入;如果队列已满,该方法会阻塞当前线程,直到队列中有可用空间。
  • 无超时:put() 方法没有超时版本,它会一直阻塞直到有空间可用。

我们这里只分析一个offer(E e)方法:

代码语言:javascript
代码运行次数:0
运行
复制
public boolean offer(E e) {
    // 元素不能为空
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    // 加锁
    lock.lock();
    int n, cap;
    Object[] array;
    // 判断是否需要扩容,即元素个数达到了数组容量
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        // 根据是否有比较器选择不同的方法
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
            siftUpUsingComparator(n, e, array, cmp);
        // 插入元素完毕,元素个数加1            
        size = n + 1;
        // 唤醒notEmpty条件
        notEmpty.signal();
    } finally {
        // 解锁
        lock.unlock();
    }
    return true;
}

private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
        // 取父节点
        int parent = (k - 1) >>> 1;
        // 父节点的元素值
        Object e = array[parent];
        // 如果key大于父节点,堆化结束
        if (key.compareTo((T) e) >= 0)
            break;
        // 否则,交换二者的位置,继续下一轮比较
        array[k] = e;
        k = parent;
    }
    // 找到了应该放的位置,放入元素
    array[k] = key;
}

代码解析:

入队的整个操作跟PriorityQueue几乎一致:

  1. 加锁;
  2. 判断是否需要扩容;
  3. 添加元素并做自下而上的堆化;
  4. 元素个数加1并唤醒notEmpty条件,唤醒取元素的线程;
  5. 解锁;

扩容操作:tryGrow

代码语言:javascript
代码运行次数:0
运行
复制
private void tryGrow(Object[] array, int oldCap) {
    // 先释放锁,因为是从offer()方法的锁内部过来的
    // 这里先释放锁,使用allocationSpinLock变量控制扩容的过程
    // 防止阻塞的线程过多
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    // CAS更新allocationSpinLock变量为1的线程获得扩容资格
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            // 旧容量小于64则翻倍,旧容量大于64则增加一半
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            // 判断新容量是否溢出
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            // 创建新数组
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            // 相当于解锁
            allocationSpinLock = 0;
        }
    }
    // 只有进入了上面条件的才会满足这个条件
    // 意思是让其它线程让出CPU
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    // 再次加锁
    lock.lock();
    // 判断新数组创建成功并且旧数组没有被替换过
    if (newArray != null && queue == array) {
        // 队列赋值为新数组
        queue = newArray;
        // 并拷贝旧数组元素到新数组中
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

代码解析:

  1. 解锁,解除offer()方法中加的锁;
  2. 使用allocationSpinLock变量的CAS操作来控制扩容的过程;
  3. 旧容量小于64则翻倍,旧容量大于64则增加一半;
  4. 创建新数组;
  5. 修改allocationSpinLock0,相当于解锁;
  6. 其它线程在扩容的过程中要让出CPU
  7. 再次加锁;
  8. 新数组创建成功,把旧数组元素拷贝过来,并返回到offer()方法中继续添加元素操作;

出队操作:take&poll

PriorityBlockingQueue 支持线程安全的插入、删除和查找操作。

take()poll() 方法是 PriorityBlockingQueue 中两种用于检索并移除队列头部的元素的方法,它们之间的主要区别在于它们处理队列为空时的行为:

  1. take() 方法:当队列为空时,take() 方法会阻塞当前线程,直到队列中有新的元素可用。这种行为使得 take() 方法非常适合在需要等待可用元素的生产者-消费者场景中使用。
  2. poll() 方法:当队列为空时,poll() 方法会立即返回 null,而不会阻塞当前线程。如果需要在指定的时间内等待可用元素,可以使用 poll(long timeout, TimeUnit unit) 方法,该方法会在指定的超时时间内等待元素,如果在超时时间内没有找到元素,则返回 null

这里我们主要分析take方法。

代码语言:javascript
代码运行次数:0
运行
复制
public E take() throws InterruptedException {
    //申请可重入锁
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        // 判断结果是否为空
        while ( (result = dequeue()) == null)
            // 如果为空,则挂起当前线程
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

private E dequeue() {
    // 元素个数减1
    int n = size - 1;
    if (n < 0)
        // 数组元素不足,返回null
        return null;
    else {
        Object[] array = queue;
        // 弹出堆顶元素
        E result = (E) array[0];
        // 把堆尾元素拿到堆顶
        E x = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        // 并做自上而下的堆化
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        // 修改size
        size = n;
        // 返回出队的元素
        return result;
    }
}

private static <T> void siftDownComparable(int k, T x, Object[] array,
                                           int n) {
    if (n > 0) {
        Comparable<? super T> key = (Comparable<? super T>)x;
        int half = n >>> 1;           // loop while a non-leaf
        // 只需要遍历到叶子节点就够了
        while (k < half) {
            // 左子节点
            int child = (k << 1) + 1; // assume left child is least
            // 左子节点的值
            Object c = array[child];
            // 右子节点
            int right = child + 1;
            // 取左右子节点中最小的值
            if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                c = array[child = right];
            // key如果比左右子节点都小,则堆化结束
            if (key.compareTo((T) c) <= 0)
                break;
            // 否则,交换key与左右子节点中最小的节点的位置
            array[k] = c;
            k = child;
        }
        // 找到了放元素的位置,放置元素
        array[k] = key;
    }
}

代码分析:出队的过程与PriorityQueue基本类似:

  1. 加锁;
  2. 判断是否出队成功,未成功就阻塞在notEmpty条件上;
  3. 出队时弹出堆顶元素,并把堆尾元素拿到堆顶;
  4. 再做自上而下的堆化;
    1. 如何比较parent和Min(左节点,右节点)呢?
  5. 解锁;

总结一下

【1】PriorityBlockingQueue是堆的一个应用场景,底层就是用了小顶堆

体现小顶堆的逻辑是:

  • 删除节点逻辑,针对堆顶:siftDownComparable/siftDownUsingComparator
  • 增加节点逻辑,针对堆底:siftUpComparable/siftUpUsingComparator

【2】为什么PriorityQueue中的add(e)方法没有做异常检查呢?

因为PriorityQueue是无限增长的队列,元素不够用了会扩容,所以添加元素不会失败。

3、总结

  1. PriorityBlockingQueue是一个小顶堆,只有堆顶存储着最小的元素;
  2. PriorityBlockingQueue整个入队出队的过程与PriorityQueue基本是保持一致的;
  3. PriorityBlockingQueue使用一个锁+一个notEmpty条件控制并发安全;
  4. PriorityBlockingQueue扩容时使用一个单独变量的CAS操作来控制只有一个线程进行扩容;
  5. 入队先加堆底,然后使用自下而上的堆化;出队先删对顶,然后使用自上而下的堆化;这里也就满足了队列的入口和出口了

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • “好事”文章分享
  • 1、底层数据结构
    • 完全二叉树
    • 二叉堆
      • 大顶堆和小顶堆
    • 小顶堆的操作
      • 堆的初始化
      • 堆的增加操作(针对堆底)
      • 堆的删除操作(针对堆顶)
    • 总结一下
  • 2、PriorityBlockingQueue源码分析
    • 构造器
    • 入队操作:offer/put
    • 扩容操作:tryGrow
    • 出队操作:take&poll
    • 总结一下
  • 3、总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档