前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >生产者消费者问题Java实现

生产者消费者问题Java实现

作者头像
里克贝斯
发布2021-05-21 16:08:26
发布2021-05-21 16:08:26
45900
代码可运行
举报
文章被收录于专栏:图灵技术域图灵技术域
运行总次数:0
代码可运行

生产消费者模型

多线程并发应用程序有一个经典的模型,即生产者/消费者模型。系统中,产生消息的是生产者,处理消息的是消费者,消费者和生产者通过一个缓冲区进行消息传递。生产者产生消息后提交到缓冲区,然后通知消费者可以从中取出消息进行处理。消费者处理完信息后,通知生产者可以继续提供消息。

要实现这个模型,关键在于消费者和生产者这两个线程进行同步。也就是说:只有缓冲区中有消息时,消费者才能够提取消息;只有消息已被处理,生产者才能产生消息提交到缓冲区。

生产消费者模式如下图。

缓冲区实际上是一个先进先出的队列,锁(信号量)的条件notEmpty和notFull。

Java实现:

代码语言:javascript
代码运行次数:0
复制
import java.util.concurrent.*;
import java.util.concurrent.locks.*;

public class ConsumerProducer {
  private static Buffer buffer = new Buffer();

  public static void main(String[] args) {
    // Create a thread pool with two threads
    ExecutorService executor = Executors.newFixedThreadPool(2);
    executor.execute(new ProducerTask());
    executor.execute(new ConsumerTask());
    executor.shutdown();
  }

  // A task for adding an int to the buffer
  private static class ProducerTask implements Runnable {
    public void run() {
      try {
        int i = 1;
        while (true) {
          System.out.println("生产者生产 " + i);
          buffer.write(i++); // Add a value to the buffer
          // Put the thread into sleep
          Thread.sleep((int)(Math.random() * 1000));
        }
      } 
      catch (InterruptedException ex) {
        ex.printStackTrace();
      }
    }
  }

  // A task for reading and deleting an int from the buffer
  private static class ConsumerTask implements Runnable {
    public void run() {
      try {
        while (true) {
          System.out.println("\t\t\t消费者消费 " + buffer.read());
          // Put the thread into sleep
          Thread.sleep((int)(Math.random() * 1000));
        }
      } 
      catch (InterruptedException ex) {
        ex.printStackTrace();
      }
    }
  }

  // An inner class for buffer
  private static class Buffer {
    private static final int CAPACITY = 1; // buffer size
    private java.util.LinkedList<Integer> queue =
      new java.util.LinkedList<>();

    // Create a new lock
    private static Lock lock = new ReentrantLock();

    // Create two conditions
    private static Condition notEmpty = lock.newCondition();
    private static Condition notFull = lock.newCondition();

    public void write(int value) {
      lock.lock(); // Acquire the lock
      try {
        while (queue.size() == CAPACITY) {
          System.out.println("缓冲区已满!");
          notFull.await();
        }

        queue.offer(value);
        notEmpty.signal(); // Signal notEmpty condition
      } 
      catch (InterruptedException ex) {
        ex.printStackTrace();
      } 
      finally {
        lock.unlock(); // Release the lock
      }
    }

    public int read() {
      int value = 0;
      lock.lock(); // Acquire the lock
      try {
        while (queue.isEmpty()) {
          System.out.println("\t\t\t缓冲区为空");
          notEmpty.await();
        }

        value = queue.remove();
        notFull.signal(); // Signal notFull condition
      } 
      catch (InterruptedException ex) {
        ex.printStackTrace();
      } 
      finally {
        lock.unlock(); // Release the lock
        return value;
      }
    }
  }
}

效果:

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018-04-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 生产消费者模型
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档