生产者消费者模型具体来讲,就是在一个系统中,存在生产者和消费者两种角色,他们通过内存缓冲区进行通信,生产者生产消费者需要的资料,消费者把资料做成产品。
再具体一点:
实现生产者消费者模型有两种方式:
实现过程并不复杂,直接上代码:
这里设置了生产者生产速度大于消费者消费速度(通过 sleep()
方法实现)。
缓冲区 BufferArea.java
:
public class BufferArea {
// 当前资源数量的计数值
private int currNum = 0;
// 资源池中允许存放的资源数目
private int maxSize = 10;
/**
* 从资源池中取走资源
*/
public synchronized void get() {
if (currNum > 0) {
currNum--;
System.out.println("Cosumer_" + Thread.currentThread().getName() + "消耗一件资源," + "当前缓冲区有" + currNum + "个");
// 通知生产者生产资源
notifyAll();
} else {
try {
// 如果没有资源,则 Cosumer_ 进入等待状态
System.out.println("Cosumer_" + Thread.currentThread().getName() + ": 当前缓冲区资源不足,进入等待状态");
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 向缓冲区中添加资源
*/
public synchronized void put() {
// 若当前缓冲区内的资源计数小于最大 size 数,才加
if (currNum < maxSize) {
currNum++;
System.out.println(Thread.currentThread().getName() + "生产一件资源,当前资源池有" + currNum + "个");
// 通知等待的消费者
notifyAll();
} else {
// 若当前缓冲区的资源计数大于最大 size 数,则等待
try {
System.out.println(Thread.currentThread().getName() + "线程进入等待 << 当前缓冲区的资源计数大于最大 size 数");
// 生产者进入等待状态,并释放锁
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
生产者 Producer.java
:
public class Producer extends Thread {
private BlockQueueBufferArea mBufferArea;
public Producer(BlockQueueBufferArea bufferArea) {
this.mBufferArea = bufferArea;
setName("Producer_" + getName());
}
@Override
public void run() {
// 不断的生产资源
while (true) {
sleepSomeTime();
mBufferArea.put();
}
}
private void sleepSomeTime() {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
消费者 Consumer
:
public class Consumer extends Thread {
private BlockQueueBufferArea mBufferArea;
public Consumer(BlockQueueBufferArea bufferArea) {
this.mBufferArea = bufferArea;
setName("Consumer_" + getName());
}
@Override
public void run() {
// 不断的取出资源
while (true) {
sleepSomeTime();
mBufferArea.get();
}
}
private void sleepSomeTime() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
测试 Test.java
:
public class Test {
public static void main(String[] args) {
BlockQueueBufferArea bufferArea = new BlockQueueBufferArea();
Consumer consumer1 = new Consumer(bufferArea);
Consumer consumer2 = new Consumer(bufferArea);
Consumer consumer3 = new Consumer(bufferArea);
Producer producer1 = new Producer(bufferArea);
Producer producer2 = new Producer(bufferArea);
Producer producer3 = new Producer(bufferArea);
consumer1.start();
consumer2.start();
consumer3.start();
producer1.start();
producer2.start();
producer3.start();
}
}
打印结果如下:
ProducerThread-5生产一件资源,当前资源池有1个
ProducerThread-4生产一件资源,当前资源池有2个
ProducerThread-3生产一件资源,当前资源池有3个
ProducerThread-5生产一件资源,当前资源池有4个
ProducerThread-4生产一件资源,当前资源池有5个
ProducerThread-3生产一件资源,当前资源池有6个
ProducerThread-5生产一件资源,当前资源池有7个
ProducerThread-4生产一件资源,当前资源池有8个
ProducerThread-3生产一件资源,当前资源池有9个
ProducerThread-3生产一件资源,当前资源池有10个
ProducerThread-4线程进入等待 << 当前缓冲区的资源计数大于最大 size 数
ProducerThread-5线程进入等待 << 当前缓冲区的资源计数大于最大 size 数
ProducerThread-3线程进入等待 << 当前缓冲区的资源计数大于最大 size 数
>> 注释:3个生产者线程生产满了10个(maxSize)产品,然后就都进入了等待
Cosumer_Consumer_Thread-0消耗一件资源,当前缓冲区有9个
Cosumer_Consumer_Thread-1消耗一件资源,当前缓冲区有8个
Cosumer_Consumer_Thread-2消耗一件资源,当前缓冲区有7个
>> 注释:3个消费者消费了3个产品
ProducerThread-3生产一件资源,当前资源池有8个
ProducerThread-5生产一件资源,当前资源池有9个
ProducerThread-4生产一件资源,当前资源池有10个
>> 注释:生产者立马又生产3个
...
>> 然后一直循环往复这个过程
ArrayBlockingQueue 与 LinkedBlockingQueue 都是支持 FIFO (先进先出),但是 LinkedBlockingQueue 是无界的,而ArrayBlockingQueue 是有界的。
这里我们采用无界阻塞队列来演示生产者消费者模式。
还是设置生产者生产速度大于消费者消费速度(通过 sleep()
方法实现)
缓冲区 BlockQueueBufferArea.java
:
public class BlockQueueBufferArea {
BlockingQueue<Integer> mProductPoll = new LinkedBlockingQueue(10);
public void put() {
try {
System.out.println(Thread.currentThread().getName() + "产品池被放入了一个资源");
mProductPoll.put(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void get() {
try {
System.out.println(Thread.currentThread().getName() + "产品池被取走了一个资源");
mProductPoll.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
生产者 Producer.java
:
public class Producer extends Thread {
private BlockQueueBufferArea mBufferArea;
public Producer(BlockQueueBufferArea bufferArea) {
this.mBufferArea = bufferArea;
setName("Producer_" + getName());
}
@Override
public void run() {
// 不断的生产资源
while (true) {
sleepSomeTime();
mBufferArea.put();
}
}
private void sleepSomeTime() {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
消费者 Consumer.java
:
public class Consumer extends Thread {
private BlockQueueBufferArea mBufferArea;
public Consumer(BlockQueueBufferArea bufferArea) {
this.mBufferArea = bufferArea;
setName("Consumer_" + getName());
}
@Override
public void run() {
// 不断的取出资源
while (true) {
sleepSomeTime();
mBufferArea.get();
}
}
private void sleepSomeTime() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
测试 Test.java
:
public class Test {
public static void main(String[] args) {
BlockQueueBufferArea bufferArea = new BlockQueueBufferArea();
Consumer consumer1 = new Consumer(bufferArea);
Consumer consumer2 = new Consumer(bufferArea);
Consumer consumer3 = new Consumer(bufferArea);
Producer producer1 = new Producer(bufferArea);
Producer producer2 = new Producer(bufferArea);
Producer producer3 = new Producer(bufferArea);
consumer1.start();
consumer2.start();
consumer3.start();
producer1.start();
producer2.start();
producer3.start();
}
}
打印结果如下:
Producer_Thread-5产品池被放入了一个资源
Producer_Thread-4产品池被放入了一个资源
Producer_Thread-3产品池被放入了一个资源
Producer_Thread-3产品池被放入了一个资源
Producer_Thread-4产品池被放入了一个资源
Producer_Thread-5产品池被放入了一个资源
Producer_Thread-3产品池被放入了一个资源
Producer_Thread-4产品池被放入了一个资源
Producer_Thread-5产品池被放入了一个资源
Producer_Thread-3产品池被放入了一个资源
Producer_Thread-4产品池被放入了一个资源
Producer_Thread-5产品池被放入了一个资源
Producer_Thread-3产品池被放入了一个资源
Consumer_Thread-0产品池被取走了一个资源
Consumer_Thread-1产品池被取走了一个资源
Consumer_Thread-2产品池被取走了一个资源
Producer_Thread-4产品池被放入了一个资源
Producer_Thread-5产品池被放入了一个资源
Producer_Thread-3产品池被放入了一个资源