目录
阻塞队列指的就是在队列的基础上附加了两个操作的队列。 两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
放入数据:
获取数据:
在java中提供了7个阻塞队列,分别如下
ArryBlockingQueue
它是用数组实现的有界阻塞队列,并按照先进先出(FIFO) 的原则对元素进行排序。默认情况下不保证线程公平的访问队列。 公平访问队列就是指阻塞的所有生产者线程或消费线程,当队列可用是,可以按照阻塞的先后顺序访问队列。即先阻塞的生产者线程,可以先往队列里插入元素;先阻塞的消费者线程,可以先从队列里获取元素 。通常情况下为了保证公平性会降低通吐量。我们可以使用以下代码创建一个公平的阻塞队列。
ArrayBlockingQueue fairQueue=new ArrayBlockingQueue(2000,true);
LinkedBlockingQueue
它是基于链表的阻塞队列,同ArrayListBlockingQueue类似,此队列按照先进先出(FIFO)的原则对元素进行排序,其内部也维持着一个数据缓冲队列(该队列由一个链表构成). 当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓冲在队列内部,而生产者立即返回;只有当队列缓冲区达到缓存容量的最大值是(LinkedBlockingQueue可以通过构造方法指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒。反之对于消费者这端的处理也基于同样的原理。 而LinkedBlockingQueue 之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步。 这也意味着在高并发的情况下生产者和消费者可以并行的操作队列中的数据,以此来提高整个队列的并发性能。 我们需要注意的是,如果构造一个 LinkedBlockingQueue 对象,而没有指定其容量大小,LinkedBlockingQueue 会默认一个类似无限大小的容量(Integer.MAX_VALUE).这样一来,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。ArrayBlockingQueue 和 LinkedBlockingQueue 是两个最普通也是最常用的阻塞队列。一般情况下,在处理多线程的 生产者-消费者问题是,使用这两个类足以。 Demo
//生产者
public class Shengchan implements Runnable{
private volatile boolean isRunnage =true; //是否在运行状态
private BlockingQueue<String> queue; //阻塞队列
//原子方式更新
private static AtomicInteger cout=new AtomicInteger(); //自动更新的值
private static final int De=1000;
@Override
public void run() {
String data=null;
Random r=new Random();
System.out.println("启动生产者线程");
while (isRunnage){
System.out.println("正在生产");
try {
Thread.sleep(r.nextInt(De)); //取0-1000的一个随机数
data="data"+cout.incrementAndGet(); //以原子方式将 cout+1
System.out.println("将数据:"+data+"放入队列...");
if (!queue.offer(data,2, TimeUnit.SECONDS)){ //设定的等待时间,如果超过2S还没加进去返回true
System.out.println("放入数据失败:"+data);
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}finally {
System.out.println("退出生产者线程");
}
}
}
public void stop(){
isRunnage=false;
}
public Shengchan(BlockingQueue<String> queue) {
this.queue = queue;
}
}
//消费者
public class Xiaofeizhe implements Runnable{
private BlockingQueue<String> queue;
private static final int DEFAULT=1000;
//构造函数
public Xiaofeizhe(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
System.out.println("启动消费者线程");
Random r=new Random();
boolean isRunnag=true;
while(isRunnag){
System.out.println("正在从队列获取数据");
try {
String data=queue.poll(2, TimeUnit.SECONDS); //有数据是直接从队列的对首取走,无数据是阻塞,在2s内有数据,取走,超过2s还没数据,返回失败
if (data != null) {
System.out.println("拿到数据"+data);
System.out.println("正在消费数据"+data);
Thread.sleep(r.nextInt(DEFAULT));
}else {
//超过2s还没数据,认为所有生产线程都已经退出,自动退出消费线程。
isRunnag=false;
}
} catch (InterruptedException e) {
e.printStackTrace();
//中断线程
Thread.currentThread().interrupt();
}finally {
System.out.println("退出消费者线程");
}
}
}
}
//测试类
public class BlockingQueueTest {
public static void main(String[] args) throws InterruptedException {
//声明一个容量为10的缓存队列
BlockingQueue<String> queue=new LinkedBlockingDeque<>(10);
//new了3个生产者和一个消费者
Shengchan s1=new Shengchan(queue);
Shengchan s2=new Shengchan(queue);
Shengchan s3=new Shengchan(queue);
Xiaofeizhe x1=new Xiaofeizhe(queue);
//借助Excutors
//创建线程池
ExecutorService service= Executors.newCachedThreadPool();
service.execute(s1);
service.execute(s2);
service.execute(s3);
service.execute(x1);
//执行10s
Thread.sleep(10*1000);
s1.stop();
s2.stop();
s3.stop();
Thread.sleep(2000);
//退出Excutor
service.shutdown();
}
}
PriorityBlockingQueue
它是一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列。这里可以自定义实现 comepareTo() 方法来指定元素进行排序规则;或者初始化PriorityBlockingQueue 时,指定构造参数 Comparator 来对元素进行排序。但其不能保证痛优先级元素的顺序 .
DelayQueue
它是一个支持延时获取元素的无界阻塞队列。队列使用 PriorityQueue 来实现。队列中的元素必须实现Delayed接口。创建元素时,可以指定元素到期的时间,只有在元素到期时才能从队列中取走。
SynchronousQueue
它是一个不存储元素的阻塞队列。每个插入操作必须等待另一个线程的移除操作,同样任何一个移除操作都等待另一个线程的插入操作。因此队列内部其实没有任何一个元素,或者说容量是0,严格来说它并不是一种容器。由于队列没有容量,因此不能调用peek操作(返回队列的头元素)。
LinkedTransferQueue
它是一个由链表结构组成的无界阻塞TransferQueue队列。LinkedTransferQueue实现了一个重要的接口TransferQueue。该接口含有5个方法,其中有3个重要方法,分别如下。
LinkBlockingDeque
它是一个由链表结构组成的双向阻塞队列。双向队列可以从队列的两端插入和移除元素,因此在多线程同时入队时,也就减少了一半的竞争。由于是双向的,因此 LinkedBlockingDeque 多了addFrist,addLast,offerFitrst,offerLast,peekFirst,peekLast等方法。其中,以Frist单词结尾的方法,表示插入,获取或移除双端队列的第一个元素;以Last单词结尾的方法,表示插入,获取或移除双端队列的最后一个元素。
我们以ArryBlockingQueue 为例子
我们可以看到,ArrayBockingQueue 维护了一个 Object 类型的数组,takeIndex 和 putIndex 分别表示队首元素和队尾元素的下标,count表示队列中元素的个数,接着往下看
lock是一个可重入锁,notEmpty和 notFull 是等待条件,接着我们看put 方法
它先获取了锁,并且是一个可中断锁,然后判断当前线程个数是否等于数组长度,如果相等,则嗲用 notFulawait()进行等待。当次线程被其他线程唤醒时,通过 enqueue(e) 方法插入元素,接着看 enqueue 方法
插入成功后,通过notEntry唤醒正在等待元素的线程。 下面再来看看take方法
和 put 方法类似,put 方法是等待 notFull 信号,而 Take是等待 notEmpty 的信号,在 take 方法中,如果可以取元素,则通过 dequeue 方法取得元素,下面看看dequeue 方法
和 enqueue 方法类似,在获取元素后,通过notFull 的signal 唤醒正在等待插入元素的线程。
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有