今天把昨天的一个示例写完整了,运行一下还挺好玩。
这段代码是一个经典的生产者-消费者模型实现,用于在多线程环境下控制生产和消费的平衡,避免缓冲区溢出或空耗。
调整一下生产者的生产时间,消费者的消费时间,就会产生不一样的运行结果,还可以加入多个生产者或消费者。
生产者-消费者模型主要分为四个部分,第一部分是缓冲区,缓冲区用于存放生产者生产的数据,也是消费者获取数据的地方;第二部分生产者,它负责数据的生产;第三部分消费者,它负责数据的消费,第四部分主代码,开启生产-消费模式。
下面对代码中的各个部分进行详细解读:
1. BoundedBuffer 类
BoundedBuffer 是一个有界缓冲区,负责管理生产者和消费者之间的共享数据。它的主要作用是确保在生产者写入数据时不超过缓冲区的最大容量,在消费者读取数据时不从空缓冲区读取。
成员变量:
// 缓冲队列
private final Queue<Integer> buffer = new LinkedList<>();
// 队列最大容量
private final int capacity;
// 可重入锁
private final ReentrantLock lock = new ReentrantLock();
// 非空条件
private final Condition notEmpty = lock.newCondition();
// 未满条件
private final Condition notFull = lock.newCondition();
buffer:使用 Queue<Integer> 存储数据。
capacity:表示缓冲区的最大容量。
lock:使用 ReentrantLock 实现线程同步,保证同一时刻只有一个线程可以修改缓冲区。
notEmpty 和 notFull:条件变量,用于控制缓冲区的非空和未满状态,从而实现线程间的协调。
构造方法:
初始化缓冲区的最大容量。
// 初始化队列大小
public BoundedBuffer(int capacity) {
this.capacity = capacity;
}
put 方法:
负责将生产者的数据写入缓冲区。
// 放入数据
public void put(int value) throws InterruptedException {
// 上锁
lock.lock();
try {
while (buffer.size() == capacity) {
System.out.println("缓存区满了,稍等在放..." );
notFull.await(); // 等待直到有空位
}
buffer.add(value);
System.out.println(Thread.currentThread().getName() + "刚刚生产了: " + value);
notEmpty.signal(); // 通知消费者可以消费了
} finally {
// 释放锁
lock.unlock();
}
}
首先上锁(lock.lock()),然后检查缓冲区是否已满。
如果缓冲区已满,调用 notFull.await() 进入等待状态,直到有空位出现。
如果有空位,将数据添加到 buffer 中,并通过 notEmpty.signal() 通知消费者有数据可消费。
take 方法:
// 取出数据
public int take() throws InterruptedException {
// 上锁
lock.lock();
try {
while (buffer.isEmpty()) {
System.out.println("缓存区没了,稍等在取..." );
notEmpty.await(); // 等待直到有数据
}
int value = buffer.remove();
System.out.println(Thread.currentThread().getName() + "刚刚消费了: " + value);
notFull.signal(); // 通知生产者可以生产了
return value;
} finally {
// 释放锁
lock.unlock();
}
}
负责从缓冲区读取数据供消费者使用。
首先上锁,然后检查缓冲区是否为空。
如果缓冲区为空,调用 notEmpty.await() 进入等待状态,直到有数据可供消费。
如果有数据,从 buffer 中移除并返回该数据,同时通过 notFull.signal() 通知生产者可以继续生产。
完整代码:
public class BoundedBuffer {
// 缓冲队列
private final Queue<Integer> buffer = new LinkedList<>();
// 队列最大容量
private final int capacity;
// 可重入锁
private final ReentrantLock lock = new ReentrantLock();
// 非空条件
private final Condition notEmpty = lock.newCondition();
// 未满条件
private final Condition notFull = lock.newCondition();
// 初始化队列大小
public BoundedBuffer(int capacity) {
this.capacity = capacity;
}
// 放入数据
public void put(int value) throws InterruptedException {
// 上锁
lock.lock();
try {
while (buffer.size() == capacity) {
System.out.println("缓存区满了,稍等在放..." );
notFull.await(); // 等待直到有空位
}
buffer.add(value);
System.out.println(Thread.currentThread().getName() + "刚刚生产了: " + value);
notEmpty.signal(); // 通知消费者可以消费了
} finally {
// 释放锁
lock.unlock();
}
}
// 取出数据
public int take() throws InterruptedException {
// 上锁
lock.lock();
try {
while (buffer.isEmpty()) {
System.out.println("缓存区没了,稍等在取..." );
notEmpty.await(); // 等待直到有数据
}
int value = buffer.remove();
System.out.println(Thread.currentThread().getName() + "刚刚消费了: " + value);
notFull.signal(); // 通知生产者可以生产了
return value;
} finally {
// 释放锁
lock.unlock();
}
}
}
2. Producer 类
Producer 类实现了 Runnable 接口,用于创建生产者线程,它负责将数据写入缓冲区。
成员变量:
// 缓冲区
private final BoundedBuffer buffer;
buffer:生产者所共享的缓冲区。
run 方法:
@Override
public void run() {
try {
int i=0;
while (true) {
buffer.put(i++);
Thread.sleep(200);// 模拟生产时间
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
以无限循环的方式不断调用 buffer.put(i++) 生成数据。
每次生产数据后,线程休眠 200 毫秒(模拟生产时间)。
完整代码:
// 生产者
pubic class Producer implements Runnable {
// 缓冲区
private final BoundedBuffer buffer;
public Producer(BoundedBuffer buffer) {
this.buffer = buffer;
}
@Override
public void run() {
try {
int i=0;
while (true) {
buffer.put(i++);
Thread.sleep(200);// 模拟生产时间
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
3. Consumer 类
Consumer 类实现了 Runnable 接口,用于创建消费者线程,它负责从缓冲区读取数据。
成员变量:
// 缓冲区 private final BoundedBuffer buffer;
buffer:消费者所共享的缓冲区。
run 方法:
@Override
public void run() {
try {
while(true){
buffer.take();
Thread.sleep(100); // 模拟消费时间
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
以无限循环的方式不断调用 buffer.take() 消费数据。
每次消费数据后,线程休眠 100 毫秒(模拟消费时间)。
完整代码:
// 消费者
public class Consumer implements Runnable {
// 缓冲区
private final BoundedBuffer buffer;
public Consumer(BoundedBuffer buffer) {
this.buffer = buffer;
}
@Override
public void run() {
try {
while(true){
buffer.take();
Thread.sleep(100); // 模拟消费时间
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
4. ProducerConsumerExample 类
这是主类,用于运行生产者-消费者示例。
main 方法:
// 应用示例
public class ProducerConsumerExample {
public static void main(String[] args) {
BoundedBuffer buffer = new BoundedBuffer(5); // 设置缓冲区容量为5
Thread producerThread = new Thread(new Producer(buffer));
Thread consumerThread = new Thread(new Consumer(buffer));
producerThread.start();
consumerThread.start();
}
}
创建一个容量为 5 的 BoundedBuffer 实例。
创建并启动 Producer 和 Consumer 线程。
程序执行流程
1)生产者线程启动后,尝试往 BoundedBuffer 中放入数据。
如果缓冲区未满,则放入数据,并唤醒等待的消费者。
如果缓冲区已满,则生产者线程等待,直到消费者取出数据腾出空间。
2)消费者线程启动后,尝试从 BoundedBuffer 中取出数据。
如果缓冲区不为空,则取出数据,并唤醒等待的生产者。
如果缓冲区为空,则消费者线程等待,直到生产者放入新数据。
3)同步和通知机制:生产者和消费者通过 Condition 的 await 和 signal 方法实现同步,避免了不必要的忙等(轮询)。
运行结果:
Thread-0刚刚生产了: 0
Thread-1刚刚消费了: 0
缓存区没了,稍等在取...
Thread-0刚刚生产了: 1
Thread-1刚刚消费了: 1
缓存区没了,稍等在取...
在上面的代码里,我把生产时间设置为200毫秒,消费时间设置为100毫秒,明显的供不应求。刚生产的被消费了,所以消费者经常在等待。
把生产者变为两个:
BoundedBuffer buffer = new BoundedBuffer(5); // 设置缓冲区容量为5
Thread producerThread = new Thread(new Producer(buffer));
Thread producerThread1 = new Thread(new Producer(buffer));
Thread consumerThread = new Thread(new Consumer(buffer));
producerThread.start();
producerThread1.start();
consumerThread.start();
运行结果:
Thread-0刚刚生产了: 0
Thread-1刚刚生产了: 0
Thread-2刚刚消费了: 0
Thread-2刚刚消费了: 0
Thread-1刚刚生产了: 1
Thread-2刚刚消费了: 1
Thread-0刚刚生产了: 1
Thread-2刚刚消费了: 1
两个生产者的生产力刚好匹配一个消费者的消费力,基本达到了供需平衡。
最后总结
线程通过 lock.lock() 和 lock.unlock() 保证了互斥访问,避免了线程安全问题。
通过 notEmpty 和 notFull 条件变量,程序实现了生产者等待缓冲区有空位和消费者等待缓冲区有数据的机制。
Thread.sleep() 模拟了生产和消费的时间间隔,使程序更接近实际的生产-消费场景。
ReentrantLock 与 Condition 结合使用,实现了生产者和消费者的协调机制。
await 和 signal 方法避免了忙等,确保资源使用的高效性。
本文示例使用生产者-消费者模型,基于显式锁和条件变量,以实现多线程间的安全通信与协调。这种模型广泛应用于需要控制资源访问的场景,比如任务队列和数据缓冲系统。
这个代码运行起来,像不像流水线上相邻两个工位的供求关系?
领取专属 10元无门槛券
私享最新 技术干货