首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >ArrayBlockingQueue 分析

ArrayBlockingQueue 分析

原创
作者头像
ruochen
发布2021-12-15 14:28:17
发布2021-12-15 14:28:17
9640
举报

ArrayBlockingQueue底层使用环形数组实现阻塞队列,因此为有界队列,其容量上限在实例化时通过传入的参数capacity决定,本质上就是实例化了一个长度为capacity的数组。

代码语言:txt
复制
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
代码语言:txt
复制
        implements BlockingQueue<E>, java.io.Serializable {
  • 使用变量:
代码语言:txt
复制
* Condition 对象简介:  Condition是在java
1.5中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition的await()、signal()这种方式实现线程间协作更加安全和高效。undefinedCondition是个接口,基本的方法就是await()和signal()方法;Condition依赖于Lock接口,生成一个Condition的基本代码是lock.newCondition();调用Condition的await()和signal()方法,都必须在lock保护之内,就是说必须在lock.lock()和lock.unlock之间才可以使用。undefinedConditon中的await()对应Object的wait();Condition中的signal()对应Object的notify();Condition中的signalAll()对应Object的notifyAll()。undefined引用自:
代码语言:txt
复制
* Itrs 对象简介:  ArrayBlockingQueue队列集合中所有的迭代器都在Itrs迭代器组中进行管理,这些迭代器将在Itrs迭代器组中以单向链表的方式进行排列。所以ArrayBlockingQueue队列需要在特定的场景下,对已经失效、甚至已经被垃圾回收的迭代器管理节点进行清理。undefined例如,当ArrayBlockingQueue队列有新的迭代器被创建时(并为非独立/无效工作模式),Itrs迭代器组就会尝试清理那些无效的迭代器,其工作逻辑主要由Itrs.doSomeSweeping(boolean)方法进行实现。undefined引用自:
代码语言:txt
复制
    final Object[] items; //底层数组实现
代码语言:txt
复制
    int takeIndex; //队列头指针
代码语言:txt
复制
    int putIndex; //队列尾指针
代码语言:txt
复制
    int count; // 当前队列中的对象(任务)数
代码语言:txt
复制
    final ReentrantLock lock; //使用可重入(默认非公平)锁对象加锁
代码语言:txt
复制
    private final Condition notEmpty; // 用于在队列满发生写阻塞时进行线程通信
代码语言:txt
复制
    private final Condition notFull; //  用于在队列空发生读阻塞时进行线程通信
代码语言:txt
复制
    transient Itrs itrs = null; // 迭代器组对象
  • 底层调用方法:
代码语言:txt
复制
* checkNotNull(Object v):检查当前传入的任务对象是否为null,若为null报空指针异常
* enqueue(E x):向队列尾插入元素,内部构建了环形队列,并维护了当前任务数
* dequeue():从队列头取出元素,内部构建了环形队列,并维护了当前任务数
代码语言:txt
复制
    private static void checkNotNull(Object v) {
代码语言:txt
复制
        if (v == null)
代码语言:txt
复制
            throw new NullPointerException();
代码语言:txt
复制
    }
代码语言:txt
复制
    private void enqueue(E x) {
代码语言:txt
复制
        // 断言当前线程持有锁且队列尾指针为空
代码语言:txt
复制
        final Object[] items = this.items;
代码语言:txt
复制
        items[putIndex] = x; // 将对象插入尾指针指向位置
代码语言:txt
复制
        if (++putIndex == items.length) // 此处构建环形队列
代码语言:txt
复制
            putIndex = 0;
代码语言:txt
复制
        count++; // 任务数 +1
代码语言:txt
复制
        notEmpty.signal(); // 归还锁对象,并唤醒阻塞的线程
代码语言:txt
复制
    }
代码语言:txt
复制
    private E dequeue() {
代码语言:txt
复制
        // 断言当前线程持有锁且队列头指针不为空
代码语言:txt
复制
        final Object[] items = this.items;
代码语言:txt
复制
        @SuppressWarnings("unchecked")
代码语言:txt
复制
        E x = (E) items[takeIndex];
代码语言:txt
复制
        items[takeIndex] = null; // 获取当前头指针对应对象,并将指针位置置空
代码语言:txt
复制
        if (++takeIndex == items.length) // 此处构建环形队列
代码语言:txt
复制
            takeIndex = 0;
代码语言:txt
复制
        count--; // 任务数 -1
代码语言:txt
复制
        if (itrs != null)
代码语言:txt
复制
            itrs.elementDequeued();
代码语言:txt
复制
        notFull.signal(); // 归还锁对象,并唤醒阻塞的线程
代码语言:txt
复制
        return x;
代码语言:txt
复制
    }
  • 构造方法:
代码语言:txt
复制
* ArrayBlockingQueue(int capacity):默认非公平可重入锁实现
* ArrayBlockingQueue(int capacity, boolean fair):可通过fair参数选择是否使用公平锁
* ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c):构造时添加集合中的对象到队列中·
代码语言:txt
复制
    /**
代码语言:txt
复制
    *  capacity:队列容量
    *  默认非公平锁
    */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
代码语言:txt
复制
    /**
代码语言:txt
复制
    *  capacity:队列容量    fair:是否为公平加锁
    */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair); // 获取可重入锁(fair为true表示公平锁)
        notEmpty = lock.newCondition(); //从锁对象获取读阻塞的线程通信对象
        notFull =  lock.newCondition(); // 从锁对象获取写阻塞的线程通信对象
    }
代码语言:txt
复制
    /**
代码语言:txt
复制
    *  capacity:队列容量    fair:是否为公平加锁    c:将集合中的元素放入队列
    */
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);// 调用构造方法
代码语言:txt
复制
        final ReentrantLock lock = this.lock;
代码语言:txt
复制
        lock.lock(); // 加锁仅为了可见性,不为互斥性
代码语言:txt
复制
        try {
代码语言:txt
复制
            int i = 0;
代码语言:txt
复制
            try {
代码语言:txt
复制
                for (E e : c) { // 将集合元素写入队列
代码语言:txt
复制
                    checkNotNull(e);
代码语言:txt
复制
                    items[i++] = e;
代码语言:txt
复制
                }
代码语言:txt
复制
            } catch (ArrayIndexOutOfBoundsException ex) {
代码语言:txt
复制
                throw new IllegalArgumentException();
代码语言:txt
复制
            }
代码语言:txt
复制
            count = i; // 初始化元素数量为i
代码语言:txt
复制
            putIndex = (i == capacity) ? 0 : i; //初始化队列尾指针
代码语言:txt
复制
        } finally {
代码语言:txt
复制
            lock.unlock(); //解锁
代码语言:txt
复制
        }
代码语言:txt
复制
    }
  • 入队列方法 :
代码语言:txt
复制
* put(E e):在阻塞时触发wait使线程等待,适用于并发量较小的情形
* offer(E e):若队列满直接false,适用于并发量极大的情形
* offer(E e, long timeout, TimeUnit unit):会在超时后直接false,适用于并发量较大的情形
代码语言:txt
复制
    public void put(E e) throws InterruptedException {
代码语言:txt
复制
        checkNotNull(e); // 检查 e 不为 null,若为 null 报空指针异常
代码语言:txt
复制
        final ReentrantLock lock = this.lock;
代码语言:txt
复制
        lock.lockInterruptibly(); // 可中断加锁
代码语言:txt
复制
        try {
代码语言:txt
复制
            while (count == items.length)
代码语言:txt
复制
                notFull.await(); // 若队列满,线程 wait()
代码语言:txt
复制
            enqueue(e); // 若队列不满,放入对象,并唤醒读阻塞的线程
代码语言:txt
复制
        } finally {
代码语言:txt
复制
            lock.unlock();
代码语言:txt
复制
        }
代码语言:txt
复制
    }
代码语言:txt
复制
    public boolean offer(E e) {
代码语言:txt
复制
        checkNotNull(e); // 确保 e 不为 null
代码语言:txt
复制
        final ReentrantLock lock = this.lock;
代码语言:txt
复制
        lock.lock();
代码语言:txt
复制
        try {
代码语言:txt
复制
            if (count == items.length) //若队列满直接放弃 false
代码语言:txt
复制
                return false;
代码语言:txt
复制
            else {
代码语言:txt
复制
                enqueue(e); // 队列不满直接插入
代码语言:txt
复制
                return true;
代码语言:txt
复制
            }
代码语言:txt
复制
        } finally {
代码语言:txt
复制
            lock.unlock();
代码语言:txt
复制
        }
代码语言:txt
复制
    }
代码语言:txt
复制
    public boolean offer(E e, long timeout, TimeUnit unit)
代码语言:txt
复制
        throws InterruptedException {
代码语言:txt
复制
        checkNotNull(e);
代码语言:txt
复制
        long nanos = unit.toNanos(timeout); // 获取 nano 等待时间
代码语言:txt
复制
        final ReentrantLock lock = this.lock;
代码语言:txt
复制
        lock.lockInterruptibly();
代码语言:txt
复制
        try {
代码语言:txt
复制
            while (count == items.length) {
代码语言:txt
复制
                if (nanos <= 0)
代码语言:txt
复制
                    return false;
代码语言:txt
复制
                nanos = notFull.awaitNanos(nanos); // 定时等待
代码语言:txt
复制
            }
代码语言:txt
复制
            enqueue(e);
代码语言:txt
复制
            return true;
代码语言:txt
复制
        } finally {
代码语言:txt
复制
            lock.unlock();
代码语言:txt
复制
        }
代码语言:txt
复制
    }
  • 出队列方法:
代码语言:txt
复制
* take():会在阻塞时触发wait使线程等待,适用于并发量较小的情形
* poll():若队列空直接false,适用于并发量极大的情形
* poll(long timeout, TimeUnit unit):会在超时后直接false,适用于并发量较大的情形
代码语言:txt
复制
    public E take() throws InterruptedException {
代码语言:txt
复制
        final ReentrantLock lock = this.lock;
代码语言:txt
复制
        lock.lockInterruptibly(); // 可中断加锁
代码语言:txt
复制
        try {
代码语言:txt
复制
            while (count == 0) //若队列空,线程 wait()
代码语言:txt
复制
                notEmpty.await(); 
代码语言:txt
复制
            return dequeue(); // 若队列不空,取出元素,并唤醒写阻塞的线程(可能当前取出后队列才不满)
代码语言:txt
复制
        } finally {
代码语言:txt
复制
            lock.unlock();
代码语言:txt
复制
        }
代码语言:txt
复制
    }
代码语言:txt
复制
    public E poll() {
代码语言:txt
复制
        final ReentrantLock lock = this.lock;
代码语言:txt
复制
        lock.lock();
代码语言:txt
复制
        try {
代码语言:txt
复制
            return (count == 0) ? null : dequeue(); // 取出队列头元素,若队列为空返回 null
代码语言:txt
复制
        } finally {
代码语言:txt
复制
            lock.unlock();
代码语言:txt
复制
        }
代码语言:txt
复制
    }
代码语言:txt
复制
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
代码语言:txt
复制
        long nanos = unit.toNanos(timeout); // 获取 nano 时间
代码语言:txt
复制
        final ReentrantLock lock = this.lock;
代码语言:txt
复制
        lock.lockInterruptibly();
代码语言:txt
复制
        try {
代码语言:txt
复制
            while (count == 0) {
代码语言:txt
复制
                if (nanos <= 0)
代码语言:txt
复制
                    return null;
代码语言:txt
复制
                nanos = notEmpty.awaitNanos(nanos); // 定时等待,若 nanos 时间耗尽,则返回 null 
代码语言:txt
复制
            }
代码语言:txt
复制
            return dequeue(); // 若队列不空,取出元素,并唤醒写阻塞的线程(可能当前取出后队列才不满)
代码语言:txt
复制
        } finally {
代码语言:txt
复制
            lock.unlock();
代码语言:txt
复制
        }
代码语言:txt
复制
    }

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
作者已关闭评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档