前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【干货】看看我司消息队列用啥,全网最接地气pulsar教程(含业务解耦demo源码)

【干货】看看我司消息队列用啥,全网最接地气pulsar教程(含业务解耦demo源码)

原创
作者头像
JavaDog程序狗
修改2024-09-19 18:05:28
180
修改2024-09-19 18:05:28
举报
文章被收录于专栏:消息队列

前言

🍊缘由

消息队列一出手,pulsar就知有没有
🐣闪亮主角

大家好,我是【JavaDog程序狗】

今天跟大家分享pulsar,一个分布式的消息发布/订阅传递平台。

本狗以身入局,将pulsar的使用场景,结合实际使用案例,进行大白话分析

通过简单代码demo进行讲解,pulsar在java中如何使用?如何通过pulsar进行异步解耦?......等

😈你想听的故事

狗哥最近在整理学习笔记时,偶然在百度发现pulsar相关的教程竟然大部分付费会员才能看,淦!

首先,我不反对知识付费,但是花钱才能看总感觉差那么一点意思。

于是乎,狗哥将我司日常使用的消息队列pulsar进行总结整理,让大家一次性免费学个够,不付费也能学的酣畅淋漓

正文

🎯主要目标

1.pulsar是什么
2.pulsar有什么核心特性
3.pulsar与其他消息队列区别
4.docker如何安装pulsar
5.docker如何安装pulsar-manager
6.实际电商业务分析
7.java中如何使用pulsar解决业务问题

🍪目标讲解

一.pulsar是什么?
1.官网地址

https://pulsar.apache.org/

2.pulsar简介

Apache Pulsar 是一个高性能、可扩展且灵活的分布式消息传递和流处理平台

👽人话解释

Pulsar 就是一个消息中间件,和Kafka、RocketMQ功能差不多,多用于削峰解耦

3.pulsar发展历程
  • Pulsar 最初由 Yahoo 开发,并在内部大规模使用。
  • 2016 年,Yahoo 将 Pulsar 开源,并将其捐赠给 Apache 软件基金会。
  • 2018 年,Pulsar 成为了 Apache 软件基金会的顶级项目

******

二.pulsar核心特性?
1.消息传递
  • 支持多种消息订阅模式 如独占(Exclusive)、灾备(Failover)、共享(Shared)和 Key_Shared 订阅。
  • 支持消息的延迟发送, 即消息可以在发布后的一段时间内被消费者消费。
2.存储与计算分离架构
  • Pulsar 采用了存储与计算分离的设计,这意味着消息的处理逻辑(Broker)与数据存储可以独立扩展。
  • 这种架构使得 Pulsar 在扩展性和可靠性方面表现优异。
3.多租户支持
  • Pulsar 支持多租户环境,允许不同的应用程序或团队在同一个集群中运行而互不影响。
4.持久化存储
  • Pulsar 提供了持久化的消息存储能力,确保即使在故障发生时消息也不会丢失。
5.轻量级函数式计算
  • Pulsar 支持在消息传递之上构建简单的函数式计算,这使得它不仅仅是一个消息队列,还可以用于实现流处理应用。
6.跨地域复制
  • Pulsar 允许跨多个地理位置的数据中心进行数据复制,以实现高可用性和灾难恢复。

👽人话解释

Pulsar 就像是一个快递中转站,但它不仅能高效地处理大量的包裹(消息),还能确保每个包裹都能准确无误地送达目的地(消费者)

******

三.pulsar与其他消息队列区别?

Pulsar

ActiveMQ

RabbitMQ

RocketMQ

Kafka

单机吞吐量

十万级

万级

万级

十万级

十万级

开发语言

Java

Java

Erlang

Java

Java/Scala

维护者

Apache

Apache

Spring

Apache

Apache

社区活跃度

消费模式

独占、共享、灾备、key共享

P2P、Pub-Sub

direct、topic、Headers、fanout

基于Topic和MessageTag

基于Topic的Pub-Sub

顺序消息

支持

不支持

不支持

支持

支持

稳定性

一般

一般

较差

集群支持

集群模式

主备模式

复制模式

主备模式

集群可扩展性强

关于消息队列的选取,在实际案例中取决于你的具体需求和技术背景

  1. 大规模数据流处理 推荐使用:Apache Pulsar 或 Apache Kafka 理由: Pulsar:支持存储与计算分离,可以独立扩展存储和处理能力,非常适合大规模数据流处理。 Kafka:以其高性能和低延迟著称,适合处理大量实时数据流。
  2. 多租户环境 推荐使用:Apache Pulsar 理由: Pulsar:提供了强大的多租户支持,可以为不同的应用程序和服务分配独立的命名空间和资源。
  3. 高可用性和容错性 推荐使用:Apache Pulsar 或 Apache Kafka 理由: Pulsar:具有内置的高可用性支持,消息持久化到磁盘,支持消息复制。 Kafka:同样具有高可用性,支持消息持久化和副本机制。
  4. 灵活的消息路由 推荐使用:Apache RabbitMQ 理由: RabbitMQ:基于 AMQP 协议,支持复杂的路由规则和消息模式,非常适合需要灵活消息路由的场景。
  5. 简单易用性 推荐使用:Apache RabbitMQ 或 RocketMQ 理由: RabbitMQ:提供了丰富的客户端库,易于集成。 RocketMQ:API 设计简洁,易于理解和使用。

******

四.docker如何安装pulsar

小伙伴如果不会在windows上安装docker,请查看狗哥之前文章

https://mp.weixin.qq.com/s/kZXSpU8Cc2yswEglozAvTQ

1.安装pulsar

拉取pulsar2.9.2版本镜像

在Windows PowerShell,输入拉取pulsar镜像命令

代码语言:shell
复制
docker pull apachepulsar/pulsar:2.9.2
2.启动pulsar

启动pulsar2.9.2单机版

代码语言:shell
复制
docker run -it -p 6650:6650 -p 8080:8080 apachepulsar/pulsar:2.9.2 bin/pulsar standalone
3.关闭对话框

使用Ctrl+P+Q组合键。当你在容器内部时,按下这个组合键可以将当前的终端会话断开,同时保持容器继续运行。

代码语言:shell
复制
Ctrl+P+Q

******

五.docker如何安装pulsar-manager

什么是pulsar-manager?

Pulsar Manager 是 Apache Pulsar 的一个管理工具,它提供了一个用户界面和 RESTful API 用于管理和监控 Pulsar 集群。 Pulsar Manager 可以帮助管理员执行一系列集群管理任务,例如创建和删除命名空间、管理 topic、查看集群状态等。

👽人话解释

pulsar-manager就是一个pulsar的可视化工具,功能就像Navicat差不多

1.安装pulsar-manager

拉取pulsar-manager0.2.0版本镜像

在Windows PowerShell,输入拉取pulsar镜像命令

代码语言:shell
复制
docker pull apachepulsar/pulsar-manager:v0.2.0
2.启动pulsar-manager

启动pulsar-manager v0.2.0

代码语言:shell
复制
docker run -it -p 9527:9527 -p 7750:7750 -e SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties apachepulsar/pulsar-manager:v0.2.0
3.关闭对话框

使用Ctrl+P+Q组合键。当你在容器内部时,按下这个组合键可以将当前的终端会话断开,同时保持容器继续运行。

代码语言:shell
复制
Ctrl+P+Q
4.设置账号和密码

设置账号为admin,密码为apachepulsar

代码语言:shell
复制
CSRF_TOKEN=$(curl http://localhost:7750/pulsar-manager/csrf-token)
curl \
   -H 'X-XSRF-TOKEN: $CSRF_TOKEN' \
   -H 'Cookie: XSRF-TOKEN=$CSRF_TOKEN;' \
   -H "Content-Type: application/json" \
   -X PUT http://localhost:7750/pulsar-manager/users/superuser \
   -d '{"name": "admin", "password": "apachepulsar", "description": "test", "email": "xxx@test.org"}'
5.登录控制台验证

访问 http://localhost:9527/

输入账号:admin 密码:apachepulsar

6.新增环境

新增环境,填写环境名,及启动的pulsar单体应用地址

7.查看控制台
  • Clusters (集群) 定义: Pulsar 集群是一组运行 Pulsar Broker 的服务器集合。 作用: 多个集群可以被配置来实现地理分布式的部署,以支持数据复制和灾难恢复。 管理: 使用 pulsar-admin clusters 命令来管理集群。
  • Tenants (租户) 定义: 租户代表了组织或应用程序的逻辑分隔。 作用: 租户用于隔离不同组织的数据,并且可以为每个租户设置访问控制策略。 管理: 使用 pulsar-admin tenants 命令来管理租户。
  • Namespaces (命名空间) 定义: 命名空间是在租户内部进一步划分资源的方式。 作用: 命名空间用于隔离不同应用程序的数据,并且可以为每个命名空间设置不同的策略和配置。 管理: 使用 pulsar-admin namespaces 命令来管理命名空间。
  • Topics (主题) 定义: 主题是消息的实际载体,是生产者发送消息和消费者接收消息的地方。 作用: 每个主题都属于一个特定的命名空间,主题可以有多个消费者订阅。 管理: 使用 pulsar-admin topics 命令来管理主题。
  • Tokens (令牌) 定义: 令牌是一种认证机制,允许客户端通过提供一个安全令牌来访问 Pulsar 资源。 作用: 令牌通常用于简化客户端认证过程,特别是对于不需要长期凭证的情况。 管理: 使用 pulsar-admin tokens 命令来管理令牌。

******

六.实际电商业务分析

🌰场景实例

张三在电商平台买了一个产品,支付成功后,张三等着收货就好...

但是在程序业务视角来看,支付成功后,其实还有很多下游服务在默默执行。

如库存、物流、订单服务都会有相应逻辑执行。

我们就模拟真实电商案例,来讲解下如何使用pulsar以及pulsar能解决什么痛点问题

情况一:没有使用pulsar消息队列,业务正常串行执行

张三支付成功后,更新订单,更新库存,更新物流...串行操作,每个服务都耗时2秒

这种串行的执行方式有很大的问题,如果整个链路串行执行,那么响应的时间就是每个业务执行时间想加,更新订单(2秒),更新库存(2秒),更新物流(2秒),总共耗时6秒

如果还有其他下游业务,链路时间会一直叠加,造成张三用户访问等待时长,并且如果链路中有失败,则会导致整个链路异常

情况二:使用pulsar消息队列,实现异步解耦

张三支付成功后,更新订单,更新库存,更新物流...并行操作

这种使用异步解耦方式,每个服务都异步执行,响应立刻返回,用户体验绝佳

******

七.java中如何使用pulsar解决业务问题

我们将上面六中的两种情况进行代码实操,串行执行和使用pulsar异步解耦

1.代码结构
2.关键代码
  • 引入pulsar依赖
代码语言:xml
复制
  <dependency>
      <groupId>org.apache.pulsar</groupId>
      <artifactId>pulsar-client</artifactId>
      <version>${pulsar.version}</version>
  </dependency>
  • pulsar配置
代码语言:yml
复制
pulsar:
  #支付topic
  pay-topic: persistent://public/default/pay-topic
  #支付subscription
  pay-subscription: pay-subscription
  #订单topic
  order-topic: persistent://public/default/order-topic
  #订单subscription
  order-subscription: order-subscription
  #库存topic
  stock-topic: persistent://public/default/stock-topic
  #库存subscription
  stock-subscription: stock-subscription
  #物流topic
  logistics-topic: persistent://public/default/logistics-topic
  #物流subscription
  logistics-subscription: logistics-subscription
代码语言:yml
复制
# Pulsar配置
pulsar:
  url: pulsar://192.168.31.27:6650
  • pulsar生产者
代码语言:java
复制
package net.javadog.pulsar.pay.service.pulsar.impl;

import cn.hutool.core.util.ObjectUtil;
import lombok.extern.slf4j.Slf4j;
import net.javadog.pulsar.logistics.service.pulsar.LogisticsPulsarProducer;
import net.javadog.pulsar.order.service.pulsar.OrderPulsarProducer;
import net.javadog.pulsar.stock.service.pulsar.StockPulsarProducer;
import org.apache.pulsar.client.api.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.net.InetAddress;
import java.util.concurrent.TimeUnit;

/**
 * 支付异步通知-消费者
 *
 * @author hdx
 * @version 1.0
 * @since 2024.07
 */
@Slf4j
@Component
public class PayPulsarConsumer {

    @Value("${pulsar.url}")
    private String url;
    @Value("${pulsar.pay-topic}")
    private String topic;
    @Value("${pulsar.pay-subscription}")
    private String subscription;

    private PulsarClient client = null;
    private Consumer<byte[]> consumer = null;

    private OrderPulsarProducer orderPulsarProducer;

    private StockPulsarProducer stockPulsarProducer;

    private LogisticsPulsarProducer logisticsPulsarProducer;


    @Resource
    public void setStockPulsarProducer(StockPulsarProducer stockPulsarProducer) {
        this.stockPulsarProducer = stockPulsarProducer;
    }

    @Resource
    public void setOrderPulsarProducer(OrderPulsarProducer orderPulsarProducer) {
        this.orderPulsarProducer = orderPulsarProducer;
    }

    @Resource
    public void setLogisticsPulsarProducer(LogisticsPulsarProducer logisticsPulsarProducer) {
        this.logisticsPulsarProducer = logisticsPulsarProducer;
    }

    /**
     * 使用@PostConstruct注解用于在依赖关系注入完成之后需要执行的方法上,以执行任何初始化
     */
    @PostConstruct
    public void initPulsar() throws Exception {
        try {
            //构造Pulsar client
            client = PulsarClient.builder()
                    .serviceUrl(url)
                    .build();
            final String ip = InetAddress.getLocalHost().getHostAddress().replaceAll("\\.", "");
            //创建consumer
            consumer = client.newConsumer()
                    .topic(topic.split(","))
                    .consumerName("payPulsarConsumer" + ip)
                    .subscriptionName(subscription)
                    //指定消费模式,包含:Exclusive,Failover,Shared,Key_Shared。默认Exclusive模式
                    .subscriptionType(SubscriptionType.Shared)
                    //指定从哪里开始消费还有Latest,valueof可选,默认Latest
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
                    //指定消费失败后延迟多久broker重新发送消息给consumer,默认60s
                    .negativeAckRedeliveryDelay(60, TimeUnit.SECONDS)
                    .batchReceivePolicy(BatchReceivePolicy.builder()
                    .maxNumBytes(10*1024*1024)
                    .maxNumMessages(-1)
                    .timeout(100, TimeUnit.MILLISECONDS)
                    .build())
                    .subscribe();
        } catch (Exception e) {
            log.error("Pulsar初始化异常:", e);
            throw e;
        }
    }

    public void start() throws Exception {
        //消费消息
        log.debug("支付消费者启动");
        while (true) {
            Message<byte[]> message = consumer.receive();
            log.debug("消费消息中,message:{}", message);
            final String id = new String(message.getValue());

            if (ObjectUtil.isNotNull(id)) {
                try {
                    messageHandle(id);
                    consumer.acknowledge(message);
                } catch (Exception e) {
                    log.error("消费Pulsar支付数据异常,key【{}】,orderId【{}】:", message.getKey(), id, e);
                }
            }

        }
    }

    private void messageHandle(String orderId) {
        log.info("【支付消费】,orderId:{}", orderId);

        // 订单生产消息
        orderPulsarProducer.handler(orderId, 0);

        // 库存生产消息
        stockPulsarProducer.handler(orderId, 0);

        // 物流生产消息
        logisticsPulsarProducer.handler(orderId, 0);
    }

    public void close() {
        try {
            consumer.close();
        } catch (PulsarClientException e) {
            log.error("关闭Pulsar消费者失败:", e);
        }
        try {
            client.close();
        } catch (PulsarClientException e) {
            log.error("关闭Pulsar连接失败:", e);
        }
    }

}
3.启动调试
  • 程序启动

在pulsar-boot-main下找到Application并运行。注意:pulsar一定先起来!!

  • 打开swagger接口文档

在控制台打开打印的swagger地址,进行接口调试

  • 访问串行调用方法A

随意录入orderId,点击支付-普通串行支付-方式A接口【Execute】按钮,查看结果

  • 访问消息队列解耦方法B

随意录入orderId,点击采用消息队列解耦-方式B接口【Execute】按钮,查看结果

4.演示总结

demo代码逻辑简单,业务逻辑清晰,以最直观的响应时间来展示使用pulsar异步解耦优势,并使用优雅的分层使代码结构干净整洁

希望大家能够下载demo实操一下,好记性不如烂笔头;彻底掌握使用pulsar的小技巧,将其运用到实战中,真正体现它的优点。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
    • 🍊缘由
      • 消息队列一出手,pulsar就知有没有
  • 正文
    • 🎯主要目标
      • 1.pulsar是什么
      • 2.pulsar有什么核心特性
      • 3.pulsar与其他消息队列区别
      • 4.docker如何安装pulsar
      • 5.docker如何安装pulsar-manager
      • 6.实际电商业务分析
      • 7.java中如何使用pulsar解决业务问题
    • 🍪目标讲解
      • 一.pulsar是什么?
      • 二.pulsar核心特性?
      • 三.pulsar与其他消息队列区别?
      • 四.docker如何安装pulsar
      • 五.docker如何安装pulsar-manager
      • 六.实际电商业务分析
      • 七.java中如何使用pulsar解决业务问题
相关产品与服务
消息队列 Pulsar 版
消息队列 Pulsar 版(TDMQ for Apache Pulsar,简称 TDMQ Pulsar 版)是基于 Apache Pulsar 自研的消息中间件,具备极好的云原生和 Serverless 特性,计算存储分离的架构使其在扩缩容方面具备良好的底层优势。目前已应用在腾讯计费绝大部分场景,包括支付主路径、实时对账、实时监控、大数据实时分析等方面。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档