阻塞队里是在普通的队列(先进先出队列)基础上,做出了扩充
基于阻塞队列,最大的应用场景,就是实现“生产者消费者模型”(日常开发中,常见的编程手法)
比如:
小猪佩奇一家准备包饺子,成员有佩奇,猪爸爸和猪妈妈,外加一个桌子
此时:
为什么是是阻塞队列而不是普通队列?
因为阻塞队列可以很好的协调生产者和消费者
上述生产者消费者模型在后端开发中,经常会涉及到
当下后端开发,常见的结构——“分布式系统”,不是一台服务器解决所有问题,而是分成了多个服务器,服务器之间相互调用
主要有两方面的好处
我们希望见到“低耦合”
通常谈到的“阻塞队列”是代码中的一个数据结构
但是由于这个东西太好用了,以至于会把这样的数据结构单独封装成一个服务器程序,并且在单独的服务器机器上进行部署
此时,这样的饿阻塞队列有了一个新的名字,“消息队列”(Message Queue,MQ)
编写 A 和 B 代码中,会出现很多对方服务器相关的代码
并且,此时如果 B 服务器挂了,A 可能也会直接受到影响
再并且,如果后续想加入一个 C 服务器,此时对 A 的改动就很大
如果是通过阻塞队列:
A 之和队列通信
B 也只和队列通信
A 和 B 互相不知道对方的存在,代码中就更没有对方的影子
看起来,A 和 B 之间是解耦合了,但是 A 和队列,B 和队列之间,不是引入了新的耦合吗?
耦合的代码,在后续的变更工程中,比较复杂,容易产生 bug
但消息队列是成熟稳定的产品,代码是稳定的,不会频繁更改。A、B 和队列之间的耦合,对我们的影响微乎其微
再增加 C 服务器也很方便,也不会影响到原有的 A 和 B 服务器
通过中间的阻塞队列,可以起到削峰填谷的效果,在遇到请求量激增突发的情况下,可以有效保护下游服务器,不会被请求冲垮
阻塞队列的作用就相当与三峡大坝在三峡的防汛作用
A 向队列中写入数据变快了,但是 B 仍然可以按照原有的速度来消费数据
阻塞队列扛下了这样的压力,就像三峡大坝抗住上游的大量水量的压力
如果是直接调用,A 收到多少请求,B 也收到多少,那很可能直接就把 B 给搞挂了
当 A 不再写入数据的时候,但队列中还存有数据,可以继续工给 B
类似的,像 MySQL 这样的数据库,处理每个请求的时候,做的工作就是比较多的,消耗的硬件资源也是比较多的,因此 MySQL 也是后端系统中,容易挂的部分
对应的,像 Redis 这种内存数据库,处理请求,做的工作远远少于 MySQL,消耗的资源更少,Redis 就比 MySQL 硬朗很多,不容易挂
每个技术都有优缺点,不能无脑吹,也不能无脑黑
比如:微服务
阻塞队列在 Java 标准库中也提供了现成的封装——BlockingQueue
BlockingQueue
本质上是一个接口,不能直接new
,只能new
一个类因为是继承与
Queue
,所以Queue
的一些操作,offer
、poll
这些,在BlockingQueue
中同样可以使用(不过不建议使用,因为都不能阻塞)
BlockingQueue
提供了另外两个专属方法,都能阻塞- `put`——入列
- `take`——出队列
BlockingQueue<String> queue = new ArrayBlockingQueue<>(1000);
capacity 指的是容量,是一个需要加上的参数
public class Demo10 {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
queue.put("111");
System.out.println("put成功");
queue.put("111");
System.out.println("put成功");
}
}
//运行结果
put成功
put成功
put成功
只打印了三个,说明第四次 put 的时候容量不够,阻塞了
public class Demo10 {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
queue.put("111");
System.out.println("put 成功");
queue.put("111");
System.out.println("put 成功");
queue.take();
System.out.println("take 成功");
queue.take();
System.out.println("take 成功");
queue.take();
System.out.println("take 成功");
}
}
//运行结果
put 成功
put 成功
take 成功
take 成功
由于只有
put
了两次,所以也只有两次take
,随后阻塞住了
public class Demo11 {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1000);
Thread t1 = new Thread(() -> {
int i = 1;
while(true){
try {
queue.put(i);
System.out.println("生产者元素"+i);
i++;
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
Thread t2 = new Thread(() -> {
while(true) {
try {
Integer i = queue.take();
System.out.println("消费者元素"+i);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t1.start();
t2.start();
}
}
上述程序中,一个线程生产,一个线程消费
实际开发中,通常可能是多个线程生产,多个线程消费
基于数组的队列
实现一个基础的队列
//此处不考虑泛型参数,只是基于 String 进行存储
class MyBlockingQueue {
private String[] data = null;
private int head = 0;
private int tail = 0;
private int size = 0;
public MyBlockingQueue(int capacity) {
data = new String[capacity];
}
public void put(String s) {
if(size == data.length) {
//队列满了
return;
}
data[tail] = s;
tail++;
if(tail >= data.length){
tail = 0;
}
size++;
}
public String take() {
if(size == 0) {
//队列为空
return null;
}
String ret = data[head];
head++;
if(head >= data.length){
head = 0;
}
size--;
return ret;
}
}
take
就要阻塞,在其他线程 put
的时候唤醒//此处不考虑泛型参数,只是基于 String 进行存储
class MyBlockingQueue {
private String[] data = null;
private int head = 0;
private int tail = 0;
private int size = 0;
private Object locker = new Object();
public MyBlockingQueue(int capacity) {
data = new String[capacity];
}
public void put(String s) throws InterruptedException {
//加锁的对象,可以单独定义一个,也可以直接就地使用this
synchronized (locker) {
if (size == data.length) {
//队列满了,需要阻塞
//return;
locker.wait();
}
data[tail] = s;
tail++;
if (tail >= data.length) {
tail = 0;
}
size++;
//唤醒 take 的阻塞
locker.notify();
}
}
public String take() throws InterruptedException {
String ret = "";
synchronized (locker) {
if (size == 0) {
//队列为空,需要阻塞
//return null;
locker.wait();
}
ret = data[head];
head++;
if (head >= data.length) {
head = 0;
}
size--;
//唤醒 put 的阻塞
locker.notify();
}
return ret;
}
}put
就要阻塞,在其他线程 take
的时候唤醒原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。