Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >【Java】实现生产者消费者模型

【Java】实现生产者消费者模型

作者头像
后端码匠
发布于 2023-02-27 01:15:37
发布于 2023-02-27 01:15:37
86800
代码可运行
举报
文章被收录于专栏:后端码匠后端码匠
运行总次数:0
代码可运行

【Java】生产者消费者模型

0x1 前言

生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品,当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。

0x2 实现

以下用4种方式来实现生产者消费者模型

0x21 wait()和notify()方法

这也是最简单最基础的实现,缓冲区满和为空时都调用wait()方法等待,当生产者生产了一个产品或者消费者消费了一个产品之后会唤醒所有线程。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package cn.com.codingce.juc.生产者消费者模型;

import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 生产者和消费者, wait()和notify()方式实现
 * <p>
 * 后端码匠
 *
 * @author mxz
 */
public class WaitNotifyExample {

    private static Integer count = 0;
    private static final Integer maxCount = 10;
    private final Object LOCK = new Object();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newCachedThreadPool();
        WaitNotifyExample example = new WaitNotifyExample();
        service.execute(example.new Producer());
        service.execute(example.new Producer());
        service.execute(example.new Consumer());
        service.shutdown();
        System.out.println(service.awaitTermination(10L, TimeUnit.SECONDS));
    }

    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(2000L);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                synchronized (LOCK) {
                    // 缓冲区满调用wait()方法等待
                    while (Objects.equals(count, maxCount)) {
                        try {
                            LOCK.wait();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                    count++;
                    System.out.println("*************** " + Thread.currentThread().getName() + ", 生产者生产, 目前总共有: " + count + " ***************");
                    LOCK.notifyAll();
                }
            }
        }
    }

    class Consumer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(2000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (LOCK) {
                    // 缓冲区为空调用wait()方法等待
                    while (count == 0) {
                        try {
                            LOCK.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    count--;
                    System.out.println("--------------- " + Thread.currentThread().getName() + ", 消费者消费, 目前总共有: " + count + " ---------------");
                    LOCK.notifyAll();
                }
            }
        }
    }
    
}

output

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
*************** pool-1-thread-2, 生产者生产, 目前总共有: 1 ***************
--------------- pool-1-thread-3, 消费者消费, 目前总共有: 0 ---------------
*************** pool-1-thread-1, 生产者生产, 目前总共有: 1 ***************
--------------- pool-1-thread-3, 消费者消费, 目前总共有: 0 ---------------
*************** pool-1-thread-1, 生产者生产, 目前总共有: 1 ***************
*************** pool-1-thread-2, 生产者生产, 目前总共有: 2 ***************
*************** pool-1-thread-2, 生产者生产, 目前总共有: 3 ***************
--------------- pool-1-thread-3, 消费者消费, 目前总共有: 2 ---------------
*************** pool-1-thread-1, 生产者生产, 目前总共有: 3 ***************
*************** pool-1-thread-2, 生产者生产, 目前总共有: 4 ***************
--------------- pool-1-thread-3, 消费者消费, 目前总共有: 3 ---------------
*************** pool-1-thread-1, 生产者生产, 目前总共有: 4 ***************
false
*************** pool-1-thread-1, 生产者生产, 目前总共有: 5 ***************
*************** pool-1-thread-2, 生产者生产, 目前总共有: 6 ***************
--------------- pool-1-thread-3, 消费者消费, 目前总共有: 5 ---------------
--------------- pool-1-thread-3, 消费者消费, 目前总共有: 4 ---------------
*************** pool-1-thread-2, 生产者生产, 目前总共有: 5 ***************
*************** pool-1-thread-1, 生产者生产, 目前总共有: 6 ***************
*************** pool-1-thread-1, 生产者生产, 目前总共有: 7 ***************
--------------- pool-1-thread-3, 消费者消费, 目前总共有: 6 ---------------
*************** pool-1-thread-2, 生产者生产, 目前总共有: 7 ***************
*************** pool-1-thread-1, 生产者生产, 目前总共有: 8 ***************
--------------- pool-1-thread-3, 消费者消费, 目前总共有: 7 ---------------
*************** pool-1-thread-2, 生产者生产, 目前总共有: 8 ***************
*************** pool-1-thread-1, 生产者生产, 目前总共有: 9 ***************
*************** pool-1-thread-2, 生产者生产, 目前总共有: 10 ***************
--------------- pool-1-thread-3, 消费者消费, 目前总共有: 9 ---------------
*************** pool-1-thread-2, 生产者生产, 目前总共有: 10 ***************
--------------- pool-1-thread-3, 消费者消费, 目前总共有: 9 ---------------
*************** pool-1-thread-1, 生产者生产, 目前总共有: 10 ***************

0x22 可重入锁ReentrantLock

java.util.concurrent.lock 中的 Lock 框架是锁定的一个抽象,通过对locklock()方法和unlock()方法实现了对锁的显示控制,而synchronize()则是对锁的隐性控制。 可重入锁,也叫做递归锁,指的是同一线程 外层函数获得锁之后 ,内层递归函数仍然有获取该锁的代码,但不受影响,简单来说,该锁维护这一个与获取锁相关的计数器,如果拥有锁的某个线程再次得到锁,那么获取计数器就加1,函数调用结束计数器就减1,然后锁需要被释放两次才能获得真正释放。已经获取锁的线程进入其他需要相同锁的同步代码块不会被阻塞。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package cn.com.codingce.juc.生产者消费者模型;

import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 生产者和消费者, {@link ReentrantLock} 方式实现
 * <p>
 * 后端码匠
 *
 * @author mxz
 */
public class ReentrantLockExample {

    private static Integer count = 0;
    private static final Integer maxCount = 10;
    // 创建一个锁对象
    private final Lock lock = new ReentrantLock();
    // 使用 Condition 对象来实现 wait 和 notify 的功能
    // 创建两个条件变量, 一个为缓冲区非满, 一个为缓冲区非空
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newCachedThreadPool();
        ReentrantLockExample example = new ReentrantLockExample();
        service.execute(example.new Producer());
        service.execute(example.new Consumer());
        service.shutdown();
        System.out.println(service.awaitTermination(15L, TimeUnit.SECONDS));
    }

    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000L);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                // 获取锁
                lock.lock();
                try {
                    while (Objects.equals(count, maxCount)) {
                        try {
                            notFull.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    count++;
                    System.out.println("*************** " + Thread.currentThread().getName() + ", 生产者生产, 目前总共有: " + count + " ***************");
                    // 唤醒消费者
                    notEmpty.signal();
                } finally {
                    // 释放锁
                    lock.unlock();
                }
            }
        }
    }

    class Consumer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000L);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
                lock.lock();
                try {
                    while (count == 0) {
                        try {
                            notEmpty.await();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                    count--;
                    System.out.println("--------------- " + Thread.currentThread().getName() + ", 消费者消费, 目前总共有: " + count + " ---------------");
                    notFull.signal();
                } finally {
                    lock.unlock();
                }
            }
        }
    }

}

output

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
*************** pool-1-thread-1, 生产者生产, 目前总共有: 1 ***************
--------------- pool-1-thread-2, 消费者消费, 目前总共有: 0 ---------------
*************** pool-1-thread-1, 生产者生产, 目前总共有: 1 ***************
--------------- pool-1-thread-2, 消费者消费, 目前总共有: 0 ---------------
*************** pool-1-thread-1, 生产者生产, 目前总共有: 1 ***************
--------------- pool-1-thread-2, 消费者消费, 目前总共有: 0 ---------------
*************** pool-1-thread-1, 生产者生产, 目前总共有: 1 ***************
--------------- pool-1-thread-2, 消费者消费, 目前总共有: 0 ---------------
false
*************** pool-1-thread-1, 生产者生产, 目前总共有: 1 ***************
--------------- pool-1-thread-2, 消费者消费, 目前总共有: 0 ---------------
*************** pool-1-thread-1, 生产者生产, 目前总共有: 1 ***************
--------------- pool-1-thread-2, 消费者消费, 目前总共有: 0 ---------------
*************** pool-1-thread-1, 生产者生产, 目前总共有: 1 ***************
--------------- pool-1-thread-2, 消费者消费, 目前总共有: 0 ---------------
*************** pool-1-thread-1, 生产者生产, 目前总共有: 1 ***************
--------------- pool-1-thread-2, 消费者消费, 目前总共有: 0 ---------------
*************** pool-1-thread-1, 生产者生产, 目前总共有: 1 ***************
--------------- pool-1-thread-2, 消费者消费, 目前总共有: 0 ---------------
*************** pool-1-thread-1, 生产者生产, 目前总共有: 1 ***************
--------------- pool-1-thread-2, 消费者消费, 目前总共有: 0 ---------------

0x23 阻塞队列BlockingQueue

BlockingQueue阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种:

  • 当队列满了的时候进行入队列操作;
  • 当队列空了的时候进行出队列操作。

因此,当一个线程对已经满了的阻塞队列进行入队操作时会阻塞,除非有另外一个线程进行了出队操作,当一个线程对一个空的阻塞队列进行出队操作时也会阻塞,除非有另外一个线程进行了入队操作。从上可知,阻塞队列是线程安全的。 下面是BlockingQueue接口的一些方法:

操作

抛异常

特定值

阻塞

超时

插入

add(o)

offer(o)

put(o)

offer(o, timeout, timeunit)

移除

remove(o)

poll(o)

take(o)

poll(timeout, timeunit)

检查

element(o)

peek(o)

这四类方法分别对应的是:

  • ThrowsException:如果操作不能马上进行,则抛出异常;
  • Special Value:如果操作不能马上进行,将会返回一个特殊的值,一般是true或者false;
  • Blocks:如果操作不能马上进行,操作会被阻塞;
  • TimesOut:如果操作不能马上进行,操作会被阻塞指定的时间,如果指定时间没执行,则返回一个特殊值,一般是true或者false。

下面来看由阻塞队列实现的生产者消费者模型,这里使用 take() 和 put() 方法,这里生产者和生产者,消费者和消费者之间不存在同步,所以会出现连续生成和连续消费的现象。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package cn.com.codingce.juc.生产者消费者模型;

import java.util.concurrent.*;

/**
 * 使用 {@link BlockingQueue} 实现生产者消费者模型
 * <p>
 * 后端码匠
 *
 * @author mxz
 */
public class BlockingQueueExample {

    private static Integer count = 0;
    // 创建一个阻塞队列
    final BlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(10);

    public static void main(String[] args) throws InterruptedException {
        BlockingQueueExample example = new BlockingQueueExample();
        ExecutorService service = Executors.newCachedThreadPool();
        service.execute(example.new Producer());
        service.execute(example.new Producer());
        service.execute(example.new Consumer());
        service.shutdown();
        System.out.println(service.awaitTermination(15L, TimeUnit.SECONDS));
    }

    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000L);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                try {
                    blockingQueue.put(1);
                    count++;
                    System.out.println("*************** " + Thread.currentThread().getName() + ", 生产者生产, 目前总共有: " + count + " ***************");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class Consumer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000L);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
                try {
                    blockingQueue.take();
                    count--;
                    System.out.println("--------------- " + Thread.currentThread().getName() + ", 消费者消费, 目前总共有: " + count + " ---------------");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

}

output

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
*************** pool-1-thread-1, 生产者生产, 目前总共有: 1 ***************
--------------- pool-1-thread-3, 消费者消费, 目前总共有: 1 ---------------
*************** pool-1-thread-2, 生产者生产, 目前总共有: 1 ***************
*************** pool-1-thread-2, 生产者生产, 目前总共有: 2 ***************
*************** pool-1-thread-1, 生产者生产, 目前总共有: 2 ***************
--------------- pool-1-thread-3, 消费者消费, 目前总共有: 1 ---------------
--------------- pool-1-thread-3, 消费者消费, 目前总共有: 1 ---------------
*************** pool-1-thread-2, 生产者生产, 目前总共有: 3 ***************
*************** pool-1-thread-1, 生产者生产, 目前总共有: 2 ***************
*************** pool-1-thread-2, 生产者生产, 目前总共有: 5 ***************
*************** pool-1-thread-1, 生产者生产, 目前总共有: 5 ***************
--------------- pool-1-thread-3, 消费者消费, 目前总共有: 4 ---------------
false
*************** pool-1-thread-1, 生产者生产, 目前总共有: 5 ***************
*************** pool-1-thread-2, 生产者生产, 目前总共有: 6 ***************
--------------- pool-1-thread-3, 消费者消费, 目前总共有: 5 ---------------
*************** pool-1-thread-1, 生产者生产, 目前总共有: 6 ***************
--------------- pool-1-thread-3, 消费者消费, 目前总共有: 6 ---------------
*************** pool-1-thread-2, 生产者生产, 目前总共有: 6 ***************
*************** pool-1-thread-1, 生产者生产, 目前总共有: 7 ***************
--------------- pool-1-thread-3, 消费者消费, 目前总共有: 7 ---------------
*************** pool-1-thread-2, 生产者生产, 目前总共有: 8 ***************
--------------- pool-1-thread-3, 消费者消费, 目前总共有: 6 ---------------
*************** pool-1-thread-1, 生产者生产, 目前总共有: 7 ***************
*************** pool-1-thread-2, 生产者生产, 目前总共有: 8 ***************
*************** pool-1-thread-2, 生产者生产, 目前总共有: 8 ***************
*************** pool-1-thread-1, 生产者生产, 目前总共有: 9 ***************
--------------- pool-1-thread-3, 消费者消费, 目前总共有: 8 ---------------
*************** pool-1-thread-2, 生产者生产, 目前总共有: 10 ***************
--------------- pool-1-thread-3, 消费者消费, 目前总共有: 9 ---------------
*************** pool-1-thread-1, 生产者生产, 目前总共有: 10 ***************

0x24 信号量Semaphore

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。Java中的Semaphore维护了一个许可集,一开始先设定这个许可集的数量,可以使用 acquire() 方法获得一个许可,当许可不足时会被阻塞,release() 添加一个许可。在下列代码中,还加入了另外一个 mutex 信号量,维护生产者消费者之间的同步关系,保证生产者和消费者之间的交替进行。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package cn.com.codingce.juc.生产者消费者模型;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * {@link Semaphore} (信号量)
 * <p>
 * 后端码匠
 *
 * @author mxz
 */
public class SemaphoreExample {

    private static Integer count = 0;
    // 创建三个信号量
    final Semaphore notFull = new Semaphore(10);
    final Semaphore notEmpty = new Semaphore(0);
    final Semaphore mutex = new Semaphore(1);

    public static void main(String[] args) throws InterruptedException {
        SemaphoreExample example = new SemaphoreExample();
        ExecutorService service = Executors.newCachedThreadPool();
        service.execute(example.new Producer());
        service.execute(example.new Producer());
        service.execute(example.new Consumer());
        service.shutdown();
        System.out.println(service.awaitTermination(15L, TimeUnit.SECONDS));
    }

    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    notFull.acquire();
                    mutex.acquire();
                    count++;
                    System.out.println("*************** " + Thread.currentThread().getName() + ", 生产者生产, 目前总共有: " + count + " ***************");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    mutex.release();
                    notEmpty.release();
                }
            }
        }
    }

    class Consumer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    Thread.sleep(3000L);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
                try {
                    notEmpty.acquire();
                    mutex.acquire();
                    count--;
                    System.out.println("--------------- " + Thread.currentThread().getName() + ", 消费者消费, 目前总共有: " + count + " ---------------");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    mutex.release();
                    notFull.release();
                }
            }
        }
    }

}

output

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
*************** pool-1-thread-1, 生产者生产, 目前总共有: 1 ***************
*************** pool-1-thread-2, 生产者生产, 目前总共有: 2 ***************
--------------- pool-1-thread-3, 消费者消费, 目前总共有: 1 ---------------
--------------- pool-1-thread-3, 消费者消费, 目前总共有: 0 ---------------
*************** pool-1-thread-1, 生产者生产, 目前总共有: 1 ***************
*************** pool-1-thread-2, 生产者生产, 目前总共有: 2 ***************
--------------- pool-1-thread-3, 消费者消费, 目前总共有: 1 ---------------
*************** pool-1-thread-1, 生产者生产, 目前总共有: 2 ***************
*************** pool-1-thread-2, 生产者生产, 目前总共有: 3 ***************
*************** pool-1-thread-1, 生产者生产, 目前总共有: 4 ***************
*************** pool-1-thread-2, 生产者生产, 目前总共有: 5 ***************
--------------- pool-1-thread-3, 消费者消费, 目前总共有: 4 ---------------
false
--------------- pool-1-thread-3, 消费者消费, 目前总共有: 3 ---------------
*************** pool-1-thread-1, 生产者生产, 目前总共有: 4 ***************
*************** pool-1-thread-2, 生产者生产, 目前总共有: 5 ***************
*************** pool-1-thread-2, 生产者生产, 目前总共有: 6 ***************
--------------- pool-1-thread-3, 消费者消费, 目前总共有: 5 ---------------
*************** pool-1-thread-1, 生产者生产, 目前总共有: 6 ***************
*************** pool-1-thread-2, 生产者生产, 目前总共有: 7 ***************
*************** pool-1-thread-1, 生产者生产, 目前总共有: 8 ***************
--------------- pool-1-thread-3, 消费者消费, 目前总共有: 7 ---------------
--------------- pool-1-thread-3, 消费者消费, 目前总共有: 6 ---------------
*************** pool-1-thread-2, 生产者生产, 目前总共有: 7 ***************
*************** pool-1-thread-1, 生产者生产, 目前总共有: 8 ***************
*************** pool-1-thread-1, 生产者生产, 目前总共有: 9 ***************
--------------- pool-1-thread-3, 消费者消费, 目前总共有: 8 ---------------
*************** pool-1-thread-2, 生产者生产, 目前总共有: 9 ***************
*************** pool-1-thread-1, 生产者生产, 目前总共有: 10 ***************
--------------- pool-1-thread-3, 消费者消费, 目前总共有: 9 ---------------
*************** pool-1-thread-2, 生产者生产, 目前总共有: 10 ***************

0x3 拓展

0x31 管道输入输出流PipedInputStream和PipedOutputStream

Javaio包下,PipedOutputStreamPipedInputStream 分别是管道输出流管道输入流。 它们的作用是让多线程可以通过管道进行线程间的通讯。在使用管道通信时,必须将PipedOutputStream 和 PipedInputStream 配套使用使用方法:先创建一个管道输入流和管道输出流,然后将输入流和输出流进行连接,用生产者线程往管道输出流中写入数据,消费者在管道输入流中读取数据,这样就可以实现了不同线程间的相互通讯,但是这种方式在生产者和生产者、消费者和消费者之间不能保证同步,也就是说在一个生产者和一个消费者的情况下是可以生产者和消费者之间交替运行的多个生产者和多个消费者者之间则不行

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package cn.com.codingce.juc.生产者消费者模型;

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * {@link PipedInputStream} {@link PipedOutputStream}
 * <p>
 * 后端码匠
 *
 * @author mxz
 */
public class PipedInputStreamExample {

    final PipedInputStream pis = new PipedInputStream();
    final PipedOutputStream pos = new PipedOutputStream();

    {
        try {
            pis.connect(pos);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PipedInputStreamExample example = new PipedInputStreamExample();
        ExecutorService service = Executors.newCachedThreadPool();
        service.execute(example.new Producer());
        service.execute(example.new Producer());
        service.execute(example.new Consumer());
        service.shutdown();
        System.out.println(service.awaitTermination(15L, TimeUnit.SECONDS));
    }

    class Producer implements Runnable {
        @Override
        public void run() {
            try {
                while (true) {
                    Thread.sleep(1000L);
                    int num = (int) (Math.random() * 10);
                    System.out.println("*************** " + Thread.currentThread().getName() + ", 生产者生产了一个数字, 该数字为: " + num + " ***************");
                    pos.write(num);
                    pos.flush();
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    pos.close();
                    pis.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class Consumer implements Runnable {
        @Override
        public void run() {
            try {
                while (true) {
                    Thread.sleep(1000L);
                    int num = pis.read();
                    System.out.println("--------------- " + Thread.currentThread().getName() + ", 消费者消费了一个数字, 该数字为: " + num + " ---------------");
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    pos.close();
                    pis.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

}

output

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
*************** pool-1-thread-1, 生产者生产了一个数字, 该数字为: 1 ***************
*************** pool-1-thread-2, 生产者生产了一个数字, 该数字为: 7 ***************
--------------- pool-1-thread-3, 消费者消费了一个数字, 该数字为: 1 ---------------
*************** pool-1-thread-2, 生产者生产了一个数字, 该数字为: 2 ***************
*************** pool-1-thread-1, 生产者生产了一个数字, 该数字为: 7 ***************
--------------- pool-1-thread-3, 消费者消费了一个数字, 该数字为: 7 ---------------
--------------- pool-1-thread-3, 消费者消费了一个数字, 该数字为: 2 ---------------
*************** pool-1-thread-1, 生产者生产了一个数字, 该数字为: 7 ***************
*************** pool-1-thread-2, 生产者生产了一个数字, 该数字为: 8 ***************
*************** pool-1-thread-2, 生产者生产了一个数字, 该数字为: 6 ***************
*************** pool-1-thread-1, 生产者生产了一个数字, 该数字为: 4 ***************
--------------- pool-1-thread-3, 消费者消费了一个数字, 该数字为: 7 ---------------
*************** pool-1-thread-2, 生产者生产了一个数字, 该数字为: 2 ***************
--------------- pool-1-thread-3, 消费者消费了一个数字, 该数字为: 7 ---------------
*************** pool-1-thread-1, 生产者生产了一个数字, 该数字为: 3 ***************
*************** pool-1-thread-1, 生产者生产了一个数字, 该数字为: 0 ***************
*************** pool-1-thread-2, 生产者生产了一个数字, 该数字为: 1 ***************
--------------- pool-1-thread-3, 消费者消费了一个数字, 该数字为: 8 ---------------
--------------- pool-1-thread-3, 消费者消费了一个数字, 该数字为: 6 ---------------
*************** pool-1-thread-1, 生产者生产了一个数字, 该数字为: 1 ***************
*************** pool-1-thread-2, 生产者生产了一个数字, 该数字为: 7 ***************
*************** pool-1-thread-1, 生产者生产了一个数字, 该数字为: 9 ***************
*************** pool-1-thread-2, 生产者生产了一个数字, 该数字为: 1 ***************
--------------- pool-1-thread-3, 消费者消费了一个数字, 该数字为: 4 ---------------
*************** pool-1-thread-2, 生产者生产了一个数字, 该数字为: 7 ***************
--------------- pool-1-thread-3, 消费者消费了一个数字, 该数字为: 2 ---------------
*************** pool-1-thread-1, 生产者生产了一个数字, 该数字为: 5 ***************
*************** pool-1-thread-1, 生产者生产了一个数字, 该数字为: 1 ***************
*************** pool-1-thread-2, 生产者生产了一个数字, 该数字为: 5 ***************
--------------- pool-1-thread-3, 消费者消费了一个数字, 该数字为: 3 ---------------
*************** pool-1-thread-2, 生产者生产了一个数字, 该数字为: 2 ***************
--------------- pool-1-thread-3, 消费者消费了一个数字, 该数字为: 0 ---------------
*************** pool-1-thread-1, 生产者生产了一个数字, 该数字为: 5 ***************
*************** pool-1-thread-1, 生产者生产了一个数字, 该数字为: 8 ***************
*************** pool-1-thread-2, 生产者生产了一个数字, 该数字为: 0 ***************
--------------- pool-1-thread-3, 消费者消费了一个数字, 该数字为: 1 ---------------
*************** pool-1-thread-2, 生产者生产了一个数字, 该数字为: 1 ***************
*************** pool-1-thread-1, 生产者生产了一个数字, 该数字为: 8 ***************
--------------- pool-1-thread-3, 消费者消费了一个数字, 该数字为: 1 ---------------
*************** pool-1-thread-1, 生产者生产了一个数字, 该数字为: 3 ***************
--------------- pool-1-thread-3, 消费者消费了一个数字, 该数字为: 7 ---------------
*************** pool-1-thread-2, 生产者生产了一个数字, 该数字为: 0 ***************
false
*************** pool-1-thread-2, 生产者生产了一个数字, 该数字为: 1 ***************
*************** pool-1-thread-1, 生产者生产了一个数字, 该数字为: 0 ***************
--------------- pool-1-thread-3, 消费者消费了一个数字, 该数字为: 9 ---------------
*************** pool-1-thread-2, 生产者生产了一个数字, 该数字为: 5 ***************
--------------- pool-1-thread-3, 消费者消费了一个数字, 该数字为: 1 ---------------
*************** pool-1-thread-1, 生产者生产了一个数字, 该数字为: 7 ***************
--------------- pool-1-thread-3, 消费者消费了一个数字, 该数字为: 7 ---------------
*************** pool-1-thread-2, 生产者生产了一个数字, 该数字为: 7 ***************
*************** pool-1-thread-1, 生产者生产了一个数字, 该数字为: 4 ***************
*************** pool-1-thread-1, 生产者生产了一个数字, 该数字为: 7 ***************
*************** pool-1-thread-2, 生产者生产了一个数字, 该数字为: 5 ***************
--------------- pool-1-thread-3, 消费者消费了一个数字, 该数字为: 5 ---------------
*************** pool-1-thread-2, 生产者生产了一个数字, 该数字为: 2 ***************
--------------- pool-1-thread-3, 消费者消费了一个数字, 该数字为: 1 ---------------
*************** pool-1-thread-1, 生产者生产了一个数字, 该数字为: 1 ***************
--------------- pool-1-thread-3, 消费者消费了一个数字, 该数字为: 5 ---------------
*************** pool-1-thread-2, 生产者生产了一个数字, 该数字为: 4 ***************
*************** pool-1-thread-1, 生产者生产了一个数字, 该数字为: 0 ***************
*************** pool-1-thread-1, 生产者生产了一个数字, 该数字为: 2 ***************
--------------- pool-1-thread-3, 消费者消费了一个数字, 该数字为: 2 ---------------
*************** pool-1-thread-2, 生产者生产了一个数字, 该数字为: 9 ***************
--------------- pool-1-thread-3, 消费者消费了一个数字, 该数字为: 5 ---------------
*************** pool-1-thread-2, 生产者生产了一个数字, 该数字为: 8 ***************
*************** pool-1-thread-1, 生产者生产了一个数字, 该数字为: 7 ***************
--------------- pool-1-thread-3, 消费者消费了一个数字, 该数字为: 8 ---------------
*************** pool-1-thread-1, 生产者生产了一个数字, 该数字为: 2 ***************
*************** pool-1-thread-2, 生产者生产了一个数字, 该数字为: 2 ***************
*************** pool-1-thread-1, 生产者生产了一个数字, 该数字为: 5 ***************
--------------- pool-1-thread-3, 消费者消费了一个数字, 该数字为: 0 ---------------
*************** pool-1-thread-2, 生产者生产了一个数字, 该数字为: 3 ***************
*************** pool-1-thread-1, 生产者生产了一个数字, 该数字为: 7 ***************
--------------- pool-1-thread-3, 消费者消费了一个数字, 该数字为: 1 ---------------
*************** pool-1-thread-2, 生产者生产了一个数字, 该数字为: 0 ***************
--------------- pool-1-thread-3, 消费者消费了一个数字, 该数字为: 8 ---------------
### 省略.......

项目地址:https://gitee.com/codingce/codingce-leetcode

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-01-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端码匠 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
生产者-消费者模式的多种实现
二者都是Object类的方法,使用这套方法时必须获得同步锁synchronized。
青山师
2023/05/05
2450
生产者-消费者模式的多种实现
Java实现生产者消费者的两种方式(r12笔记第66天)
我在8年前去面试程序员的时候,一个不大的公司,里面的开发主管接待了我们,给我的题目就是写一段程序模拟生产者消费者问题,当时可把我难坏了,一下子感觉自己的知识储备竟然如此的匮乏。 而在我从事DBA工作之后,经常会有大批量并发的环境,有的需要排障,有的需要优化,在很多并发的场景中,发现生产者消费者问题可以模拟出很多实际中的问题,所以生产者消费者问题非常重要,也是我想不断改进和探索的一类问题。 引入仓库的必要性 要想使用程序来模拟,其实也不用花太多的时间,我们简单说说需要考虑的地方。首先生产者,
jeanron100
2018/03/21
8210
使用BlockingQueue的生产者消费者模式
BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。使用场景。
用户3003813
2018/09/06
1.3K0
使用BlockingQueue的生产者消费者模式
Java实现生产者消费者问题
实现一个队列。 队列的应用场景为: 一个生产者线程将int类型的数入列,一个消费者线程将int类型的数出列 image.png 1、Consumer.java package com.week.pv; import java.util.LinkedList; import java.util.List; import java.util.Queue; /** * 消费者 * @author xingqijiang */ public class Consumer implements Runn
week
2018/08/24
4810
Java实现生产者消费者问题
生产者消费者模式的三种实现方式
java中生产者消费者模式的三种实现方式 生产者消费者的实现 生产者生产数据到缓冲区中,消费者从缓冲区中取数据。 如果缓冲区已经满了,则生产者线程阻塞; 如果缓冲区为空,那么消费者线程阻塞。 wait notify方式实现 1. 生产者 package com.earthchen.ProducerConsumer.waitnotify; import java.util.Queue; /** * @author earthchen * @date 2018/9/20 **/ public cla
earthchen
2020/09/24
3350
Java 多线程基础
多线程(multithreading):指从软件或者硬件上实现多个线程并发执行的技术。
星姮十织
2022/01/02
3860
面试官让我手写一个生产者消费者模式?
不知道你是否遇到过面试官让你手写生产者消费者代码。别说,前段时间有小伙伴还真的遇到了这种情况,当时是一脸懵逼。
烟雨星空
2020/06/16
1.4K0
面试官让我手写一个生产者消费者模式?
Java生产者消费者的三种实现
Java生产者消费者是最基础的线程同步问题,java岗面试中还是很容易遇到的,之前没写过多线程的代码,面试中被问到很尬啊,面完回来恶补下。在网上查到大概有5种生产者消费者的写法,分别如下。
xindoo
2021/01/21
2.4K0
用java语言实现生产者消费者问题[Java生产者消费者模型一对一]
大家好,我是架构君,一个会写代码吟诗的架构师。今天说一说用java语言实现生产者消费者问题[Java生产者消费者模型一对一],希望能够帮助大家进步!!!
Java架构师必看
2022/02/28
5550
用java语言实现生产者消费者问题[Java生产者消费者模型一对一]
Semaphore和生产者-消费者模型
这里后面打算出一期,品质比较高的文章系列,分类以语言为主,在这个文章系统里,基本是一个比较热门的知识点或者是一个比较大的知识点,我会复现,然后谈谈自己的理解
废江_小江
2022/12/01
6980
Carson带你学Java:解决生产者、消费者问题的五种实现方式
1. 简介 生产者 & 消费者之间存在 强耦合问题 2. 解决方案 采用 生产者 & 消费者 模式,具体介绍如下: 3. 具体解决方式介绍 方式1:wait() / notify() // Object类里的两个方法,所有Object子类都可使用这2个方法 // 对象的监视器对锁对象的锁定(也就是代码中的lock对象),注意是调用锁对象的wait() / nofity() public class Test { private static Integer count = 0;
Carson.Ho
2021/12/06
2460
Carson带你学Java:解决生产者、消费者问题的五种实现方式
线程同步之生产者消费者
  前面因时间关系,未将“生产者消费者问题”实例的介绍发布在博客随笔中,故本文作为对之前“多线程”一文的补充。 概念:   生产者消费者问题(Bounded-buffer problem),是一个多线程同步问题的经典案例。这个案例中主要实现的是两个角色协同对同一资源进行访问。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
云海谷天
2022/08/09
2360
线程同步之生产者消费者
Java实现生产者消费者
生产/消费者问题是个非常典型的多线程问题,涉及到的对象包括“生产者”、“消费者”、“仓库”和“产品”。他们之间的关系如下: (01) 生产者仅仅在仓储未满时候生产,仓满则停止生产。 (02) 消费者仅仅在仓储有产品时候才能消费,仓空则等待。 (03) 当消费者发现仓储没产品可消费时候会通知生产者生产。 (04) 生产者在生产出可消费产品时候,应该通知等待的消费者去消费。
说故事的五公子
2020/04/10
7090
Callable接口实现多线程,生产者消费者问题,多线下载(复制)文件
一.通过Callable接口实现多线程 1.Callable接口介绍: (1)java.util.concurrent.Callable是一个泛型接口,只有一个call()方法 (2)call()方法抛出异常Exception异常,且返回一个指定的泛型类对象 2.Callable接口实现多线程的应用场景 (1)当父线程想要获取子线程的运行结果时 3.使用Callable接口实现多线程的步骤 (1)第一步:创建Callable子类的实例化对象 (2)第二步:创建FutureTask对象,并将Callable
IT架构圈
2018/06/01
7050
Java面试——写一个生产者与消费者
【1】我们将公共的属性和方法放在 Resouce 类中,在资源类中使用 Lock 中的 lock()进行加锁,控制并发操作。使用 await()方法阻塞线程。使用 signalAll()唤醒线程。
Java架构师必看
2021/05/14
1.8K0
Java面试——写一个生产者与消费者
Java—多线程实现生产消费者
代码说明:Resource中使用synchronized代码块儿进行加锁的操作,线程需要等待的时候使用wait()方法进行等待操作,唤醒使用notify()方法进行唤醒,需要注意的是notify()唤醒是对其他等待线程进行随机唤醒操作,所以在TestMain中只设置了一个生产类和一个消费类,这都是有意而为之的。 存在问题: 如果我们的共有资源只有一份的时候并且存在多个消费者和生产者的时候,以上的代码就会出现问题。为什么呢?因为notify()是随机唤醒一个等待线程,可能消费者线程还会唤醒一个消费者线程,同理可能生产线程还会唤醒一个生产线程,所以在只有一份资源的情况下,就可能多次消费或者多次生产的情况,那么怎么解决这个问题呢?看下文version2。
科技新语
2024/12/25
1190
Java—多线程实现生产消费者
线程之生产者消费者模式
前几篇复习了下《线程的创建方式》、《线程的状态》、《Thread 的源码解析》、《wait、notify/notifyAll 源码解析》这几篇文章。这篇是第五篇生产者消费者模式在我们日常工作中用得非常多,比如:在模块解耦、消息队列、分布式场景中都很常见。这个模式里有三个角色,他们之间的关系是如下图这样的:
JavaFish
2020/09/24
9850
线程之生产者消费者模式
AQS Condition使用
标题 概述 1.Condition简单使用 2.Condition生产者消费者模型 第1节 Condition简单使用 Condition使用方式如下。 public cl
黑洞代码
2021/01/14
4290
AQS Condition使用
生产者和消费者模型
生产者消费者模型具体来讲,就是在一个系统中,存在生产者和消费者两种角色,他们通过内存缓冲区进行通信,生产者生产消费者需要的资料,消费者把资料做成产品。
希希里之海
2019/08/30
6880
Java多线程编程(四)- 阻塞队列,生产者消费者模型,线程池
1.1.当队列满的时候, 继续入队列就会阻塞, 直到有其他线程从队列中取走元素 
用户11305962
2024/11/21
4980
Java多线程编程(四)- 阻塞队列,生产者消费者模型,线程池
相关推荐
生产者-消费者模式的多种实现
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档