Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >消息队列

消息队列

作者头像
橘子君丶
发布于 2023-03-06 05:39:09
发布于 2023-03-06 05:39:09
3K00
代码可运行
举报
文章被收录于专栏:springBoot3.0springBoot3.0
运行总次数:0
代码可运行

关于消息队列

???? 文章简介:Kafka ???? 创作目的:消息队列 ☀️ 今日天气:天气很好 ???? 每日一言:“所行皆坦途 所求皆如愿。”


kafka常用于构建TB级别的异步消息系统

首先谈到对于框架的含义 :

Java 框架由一系列可重用的预编写代码组成,它们起着模板的作用,开发人员可以根据需要通过填充自定义代码来创建应用。

框架创建后可反复使用,这样开发人员即可以在一定的结构上来编写应用,而无需从头开始手动创建。

Java 框架中可以包含预定义类(例如对象类别)和函数,用于处理、输入和管理硬件设备,以及与系统软件进行交互。当然,具体的框架内容要取决于框架的类型、Java 开发人员的技能水平、他们所要完成的工作以及自己的偏好。

框架(Framework )的本质:对特定的类或方法进行封装的集成

如果我们不使用框架是否还能解决类似的问题呢 ?
答案是:

可以的,比如Kafka框架。在我们不使用Kafka的情况下,我们也能通过Java自带的API:BlockingQueue解决阻塞队列、实现消息系统或解决类似的问题、 ![6NUAJZIC_RJ`5NI4TDRZS.png](http://blog-dm-01.oss-cn-hangzhou.aliyuncs.com/articles/5509dbc218cd3763fa8bdd4298d9f36f.png)

关于Kafka使用的冷知识 现象:在windows的命令行里启动kafka之后,当关闭命令行窗口时,就会强制关闭kafka。这种关闭方式为暴力关闭,很可能会导致kafka无法完成对日志文件的解锁。届时,再次启动kafka的时候,就会提示日志文件被锁,无法成功启动。 方案:将kafka的日志文件全部删除,再次启动即可。 建议:不要暴力关闭kafka,建议通过在命令行执行kafka-server-stop命令来关闭它。 其他:将来在Linux上部署kafka之后,采用后台运行的方式,就会避免这样的问题

那么什么是阻塞队列呢 ?

阻塞队列—BlockingQueue(Java自带的API)

生产者&消费者

生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,如下图所示,生产者向空间里存放数据,而消费者取用数据,如果不加以协调可能会出现以下情况:

存储空间已满,而生产者占用着它,消费者等着生产者让出空间从而去除产品,生产者等着消费者消费产品,从而向空间中添加产品。互相等待,从而发生死锁。

上代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class ProducerConsumerTest {
   public static void main(String[] args) {
      CubbyHole c = new CubbyHole();
      Producer p1 = new Producer(c, 1);
      Consumer c1 = new Consumer(c, 1);
      p1.start();
      c1.start();
   }
}
class CubbyHole {
   private int contents;
   private boolean available = false;
   public synchronized int get() {
      while (available == false) {
         try {
            wait();
         }
         catch (InterruptedException e) {
         }
      }
      available = false;
      notifyAll();
      return contents;
   }
   public synchronized void put(int value) {
      while (available == true) {
         try {
            wait();
         }
         catch (InterruptedException e) {
         }
      }
      contents = value;
      available = true;
      notifyAll();
   }
}

class Consumer extends Thread {
   private CubbyHole cubbyhole;
   private int number;
   public Consumer(CubbyHole c, int number) {
      cubbyhole = c;
      this.number = number;
   }
   public void run() {
      int value = 0;
         for (int i = 0; i < 10; i++) {
            value = cubbyhole.get();
            System.out.println("消费者 #" + this.number+ " got: " + value);
         }
    }
}

class Producer extends Thread {
   private CubbyHole cubbyhole;
   private int number;

   public Producer(CubbyHole c, int number) {
      cubbyhole = c;
      this.number = number;
   }

   public void run() {
      for (int i = 0; i < 10; i++) {
         cubbyhole.put(i);
         System.out.println("生产者 #" + this.number + " put: " + i);
         try {
            sleep((int)(Math.random() * 100));
         } catch (InterruptedException e) { }
      }
   }
}

运行结果:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
消费者 #1 got: 0
生产者 #1 put: 0
生产者 #1 put: 1
消费者 #1 got: 1
生产者 #1 put: 2
消费者 #1 got: 2
生产者 #1 put: 3
消费者 #1 got: 3
生产者 #1 put: 4
消费者 #1 got: 4
生产者 #1 put: 5
消费者 #1 got: 5
生产者 #1 put: 6
消费者 #1 got: 6
生产者 #1 put: 7
消费者 #1 got: 7
生产者 #1 put: 8
消费者 #1 got: 8
生产者 #1 put: 9
消费者 #1 got: 9

同等案例(二)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * @program: BlockingQueue
 * @description:
 * @author: DM
 * @create: 2023
 **/

public class BlockingQueueTests {
    public static void main(String[] args) {
        //因为数组实现所以要求设定队列容量
        BlockingQueue queue=new ArrayBlockingQueue(10);
        new Thread(new Producer(queue)).start();//启动生产者线程

        new Thread(new Consumer(queue)).start();//三个消费者并发消费数据
        new Thread(new Consumer(queue)).start();
        new Thread(new Consumer(queue)).start();


    }
}

//生成者
class Producer implements Runnable{
    //传入阻塞队列
    private BlockingQueue<Integer> queue;

    public Producer(BlockingQueue<Integer> queue){
        this.queue=queue;
    }

    @Override
    public void run() {
        try {
            for (int i=0;i<100;i++){
                //不管企业和组件中间都有间隔
                Thread.sleep(20);//停顿20毫秒
                queue.put(i);
                System.out.println(Thread.currentThread().getName()+"生产:"+queue.size());
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

//消费者
class Consumer implements Runnable{
    private BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue){
        this.queue=queue;
    }

    @Override
    public void run() {
        try {
            while (true){
                //不管企业和组件中间都有间隔
                Thread.sleep(new Random().nextInt(1000));//停顿0~1000毫秒
                //使用数据
                queue.take();
                System.out.println(Thread.currentThread().getName()+"消费:"+queue.size());
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

运行结果:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Thread-0生产:1
Thread-0生产:2
Thread-0生产:3
Thread-0生产:4
Thread-0生产:5
Thread-0生产:6
Thread-0生产:7
Thread-0生产:8
Thread-0生产:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-0生产:10
Thread-1消费:10
Thread-2消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-2消费:9
Thread-0生产:10
//中间为生产者生产与消费者消费的过程(由于生产数据量相对较长所以省略)
Thread-1消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-2消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-2消费:9
Thread-3消费:8
Thread-2消费:7
Thread-1消费:6
Thread-2消费:5
Thread-3消费:4
Thread-1消费:3
Thread-2消费:2
Thread-3消费:1
Thread-1消费:0

Process finished with exit code 130

BlockingQueue实现类

BlockingQueue常见的有下面5个实现类,主要是应用场景不同。

  • ArrayBlockingQueue 基于数组实现的阻塞队列,创建队列时需指定容量大小,是有界队列。
  • LinkedBlockingQueue 基于链表实现的阻塞队列,默认是无界队列,创建可以指定容量大小
  • SynchronousQueue 一种没有缓冲的阻塞队列,生产出的数据需要立刻被消费
  • PriorityBlockingQueue 实现了优先级的阻塞队列,基于数据显示,是无界队列
  • DelayQueue 实现了延迟功能的阻塞队列,基于PriorityQueue实现的,是无界队列

BlockingQueue源码解析

BlockingQueue的5种子类实现方式大同小异,这次就以最常用的ArrayBlockingQueue做源码解析。

ArrayBlockingQueue类属性

先看一下ArrayBlockingQueue类里面有哪些属性:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 用来存放数据的数组
final Object[] items;

// 下次取数据的数组下标位置
int takeIndex;

// 下次放数据的数组下标位置
int putIndex;

// 当前已有元素的个数
int count;

// 独占锁,用来保证存取数据安全
final ReentrantLock lock;

// 取数据的条件
private final Condition notEmpty;

// 放数据的条件
private final Condition notFull;

ArrayBlockingQueue中4组存取数据的方法实现也是大同小异,本次以put和take方法进行解析。

put方法源码解析

无论是放数据还是取数据都是从队头开始,逐渐往队尾移动。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 放数据,如果队列已满,就一直阻塞,直到有其他线程从队列中取走数据
public void put(E e) throws InterruptedException {
    // 校验元素不能为空
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
      // 加锁,加可中断的锁
    lock.lockInterruptibly();
    try {
        // 如果队列已满,就一直阻塞,直到被唤醒
        while (count == items.length)
            notFull.await();
          // 如果队列未满,就往队列添加元素
        enqueue(e);
    } finally {
          // 结束后,别忘了释放锁
        lock.unlock();
    }
}

// 实际往队列添加数据的方法
private void enqueue(E x) {
    // 获取数组
    final Object[] items = this.items;
    // putIndex 表示本次插入的位置
    items[putIndex] = x;
    // ++putIndex 计算下次插入的位置
    // 如果本次插入的位置,正好等于队尾,下次插入就从 0 开始
    if (++putIndex == items.length)
        putIndex = 0;
      // 元素数量加一
    count++;
    // 唤醒因为队列空等待的线程
    notEmpty.signal();
}

源码中有个有意思的设计,添加元素的时候如果已经到了队尾,下次就从队头开始添加,相当于做成了一个循环队列。

像下面这样:

4.3 take方法源码

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 取数据,如果队列为空,就一直阻塞,直到有其他线程往队列中放数据
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
      // 加锁,加可中断的锁
    lock.lockInterruptibly();
    try {
        // 如果队列为空,就一直阻塞,直到被唤醒
        while (count == 0)
            notEmpty.await();
        // 如果队列不为空,就从队列取数据
        return dequeue();
    } finally {
          // 结束后,别忘了释放锁
        lock.unlock();
    }
}

// 实际从队列取数据的方法
private E dequeue() {
      // 获取数组
    final Object[] items = this.items;
    // takeIndex 表示本次取数据的位置,是上一次取数据时计算好的
    E x = (E) items[takeIndex];
    // 取完之后,就把队列该位置的元素删除
    items[takeIndex] = null;
    // ++takeIndex 计算下次拿数据的位置
    // 如果本次取数据的位置,正好是队尾,下次就从 0 开始取数据
    if (++takeIndex == items.length)
        takeIndex = 0;
    // 元素数量减一
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    // 唤醒被队列满所阻塞的线程
    notFull.signal();
    return x;
}

总结

  1. ArrayBlockingQueue基于数组实现的阻塞队列,创建队列时需指定容量大小,是有界队列。
  2. ArrayBlockingQueue底层采用循环队列的形式,保证数组位置可以重复使用。
  3. ArrayBlockingQueue存取都采用ReentrantLock加锁,保证线程安全,在多线程环境下也可以放心使用。
  4. 使用ArrayBlockingQueue的时候,预估好队列长度,保证生产者和消费者速率相匹配。
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023/01/01 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Carson带你学Java:解决生产者、消费者问题的五种实现方式
1. 简介 生产者 & 消费者之间存在 强耦合问题 2. 解决方案 采用 生产者 & 消费者 模式,具体介绍如下: 3. 具体解决方式介绍 方式1:wait() / notify() // Object类里的两个方法,所有Object子类都可使用这2个方法 // 对象的监视器对锁对象的锁定(也就是代码中的lock对象),注意是调用锁对象的wait() / nofity() public class Test { private static Integer count = 0;
Carson.Ho
2021/12/06
2490
Carson带你学Java:解决生产者、消费者问题的五种实现方式
java线程通信的三种方式「建议收藏」
在synchronized修饰的同步方法或者修饰的同步代码块中使用Object类提供的wait(),notify()和notifyAll()3个方法进行线程通信。
全栈程序员站长
2022/09/06
5690
并发工具箱 concurrent包的原理分析以及使用
BlockingQueue 通常用于一个线程生产对象,而另外一个线程消费这些对象的场景。下图是对这个原理的阐述:
小勇DW3
2018/08/30
8700
并发工具箱 concurrent包的原理分析以及使用
java并发编程学习: 阻塞队列 使用 及 实现原理
队列(Queue)与栈(Stack)是数据结构中的二种常用结构,队列的特点是先进先出(First In First Out),而Stack是先进后出(First In Last Out),说得通俗点:Queue就是电影院入场时人们排起来的进场队伍,先来的人(即:前排在前面的人)先入场,而Statck则是一队人依次进入了一个死胡同想出来,先进去(最里面)的人,必须等后面的人(后进入的人)出来了,自己才能出来。 队列在多线程应用中,常用于生产-消费场景,打个通俗的比方:很多人早上喜欢去买油条,买油条的人相当于消
菩提树下的杨过
2018/01/18
9700
Java中的设计模式(二):生产者-消费者模式与观察者模式
  在上一篇 Java中的设计模式(一):观察者模式 中我们了解了 观察者模式 的基本原理和使用场景,在今天的这篇文章中我们要做一点简单的延伸性学习——对比一下 生产者-消费者模式 和 观察者模式 的异同。
闲宇非鱼
2022/02/08
8110
Java中的设计模式(二):生产者-消费者模式与观察者模式
七种阻塞队列
  在前面我们接触的队列都是非阻塞队列,比如PriorityQueue、LinkedList(LinkedList是双向链表,它实现了Dequeue接口)。
用户7353950
2022/05/10
1.6K0
七种阻塞队列
Java并发编程:阻塞队列
Java并发编程:阻塞队列   在前面几篇文章中,我们讨论了同步容器(Hashtable、Vector),也讨论了并发容器(ConcurrentHashMap、CopyOnWriteArrayList),这些工具都为我们编写多线程程序提供了很大的方便。今天我们来讨论另外一类容器:阻塞队列。   在前面我们接触的队列都是非阻塞队列,比如PriorityQueue、LinkedList(LinkedList是双向链表,它实现了Dequeue接口)。   使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产
陈树义
2018/04/13
1K0
Java多线程之并发协作生产者消费者设计模式
两个线程一个生产者个一个消费者 需求情景 两个线程,一个负责生产,一个负责消费,生产者生产一个,消费者消费一个 涉及问题 同步问题:如何保证同一资源被多个线程并发访问时的完整性。常用的同步方法是采用标记或加锁机制 wait() / nofity() 方法是基类Object的两个方法,也就意味着所有Java类都会拥有这两个方法,这样,我们就可以为任何对象实现同步机制。 wait()方法:当缓冲区已满/空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等等状态,让其他线程执行。 notify()方法:当
Java团长
2018/07/23
9440
Java多线程之并发协作生产者消费者设计模式
Java多线程高并发学习笔记——阻塞队列
在探讨可重入锁之后,接下来学习阻塞队列,这篇文章也是断断续续的写了很久,因为最近开始学ssm框架,准备做一个自己的小网站,后续可能更新自己写网站的技术分享。 请尊重作者劳动成果,转载请标明原文链接: http://www.cnblogs.com/superfj/p/7757876.html 阻塞队列是什么? 首先了解队列,队列是数据先进先出的一种数据结构。阻塞队列,关键字是阻塞,先理解阻塞的含义,在阻塞队列中,线程阻塞有这样的两种情况: 1.当阻塞队列为空时,获取队列元素的线程将等待,直到该则塞队列非空;
Janti
2018/04/10
1.3K0
Java多线程高并发学习笔记——阻塞队列
ArrayBlockingQueue
上一篇我们手写了一个阻塞队列,今天我们接着开始学习之旅,让我们一起来看看ArrayBlockingQueue的源码吧。ArrayBlockingQueue是JDK中提供的工业级的通过数组实现的阻塞队列。
码农飞哥
2021/08/18
3980
ArrayBlockingQueue 阻塞队列
一直都在写业务代码,对于 jdk 底层的代码难免有些疏忽,所以决定把一些比较重要的源码过一遍……
haifeiWu
2020/02/10
5410
信号量及其应用
上述代码中的若把s为当前可用资源的数量,P操作表示获取一个资源,V操作表示释放一个资源。使用资源前先进行P操作,使用完后进行V操作。
你的益达
2020/08/05
7080
深入理解阻塞队列
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。
Java识堂
2019/08/13
2500
生产者-消费者模式的多种实现
二者都是Object类的方法,使用这套方法时必须获得同步锁synchronized。
青山师
2023/05/05
2470
生产者-消费者模式的多种实现
一文带你彻底掌握阻塞队列!
在之前的文章中,我们介绍了生产者和消费者模型的最基本实现思路,相信大家对它已经有一个初步的认识。
Java极客技术
2023/11/16
8080
一文带你彻底掌握阻塞队列!
简单易懂的Java Queue入门教程!
今天我要给大家分享一些自己日常学习到的一些知识点,并以文字的形式跟大家一起交流,互相学习,一个人虽可以走的更快,但一群人可以走的更远。
喵手
2023/11/17
3180
简单易懂的Java Queue入门教程!
BlockingQueue 源码分析
BlockingQueue 首先是一个队列,其次提供了阻塞功能。它看起来很像消息队列可让消息解耦,但其在生产者-消费者模型中通过阻塞又可使二者速度达到平衡。使用阻塞队列无需过多考虑线程安全问题,专注业务逻辑的实现即可
晚上没宵夜
2022/05/09
2610
BlockingQueue 源码分析
手写一个阻塞队列
前面我们手写实现了一个单向队列,一个循环队列,今天我们再手写实现一个阻塞队列。顾名思义,阻塞队列就是在普通队列的基础上加了阻塞功能。本文是为了后面看ArrayBlockingQueue的源码做的前期准备。
码农飞哥
2021/08/18
8560
Java并发编程--BlockingQueue
  BlockingQueue支持两个附加操作的Queue:1)当Queue为空时,获取元素线程被阻塞直到Queue变为非空;2)当Queue满时,添加元素线程被阻塞直到Queue不满。BlockingQueue不允许元素为null,如果入队一个null元素,会抛NullPointerException。常用于生产者消费者模式。
在周末
2019/09/11
5680
Java 7 种阻塞队列详解
队列(Queue)是一种经常使用的集合。Queue 实际上是实现了一个先进先出(FIFO:First In First Out)的有序表。和 List、Set 一样都继承自 Collection。它和 List 的区别在于,List可以在任意位置添加和删除元素,而Queue 只有两个操作:
海星
2020/09/27
9.7K0
相关推荐
Carson带你学Java:解决生产者、消费者问题的五种实现方式
更多 >
LV.1
这个人很懒,什么都没有留下~
作者相关精选
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验