基于 Apache Pulsar 消息队列的弹性伸缩

最近更新时间:2024-05-14 14:47:01

我的收藏

概述

KEDA 的触发器支持 Apache Pulsar,即根据 Pulsar 消息队列中的未消费的消息数量进行水平伸缩,用法请参见 KEDA Scalers: Apache Pulsar
腾讯云也有商业版的 Pulsar 产品,即 TDMQ for Pulsar。本文举例介绍配置基于 TDMQ for Pulsar 消息队列中未消费的消息数量进行水平伸缩。当然,如果您使用的是自建的开源 Apache Pulsar,配置方法也是类似的。

操作步骤

下面使用 pulsar-demo 来模拟 Pulsar 生产者和消费者,再结合 KEDA 配置实现 Pulsar 消费者基于 Pulsar 消息数量的水平伸缩,在实际使用中,可根据自己的情况进行相应替换。

1. 获取 Pulsar API 调用地址

1. Pulsar 集群管理页面 找到需要使用的 Pulsar 集群,单击接入地址可获取 Pulsar 的 URL,通常使用 VPC 内网接入地址(解析出来是169保留网段的 IP,在任意 VPC 都可用)。如下图所示:



2. 复制并记录 API 调用地址。

获取 Pulsar Topic

Pulsar Topic 管理页面,复制需要使用的 Topic 名称。如下图所示:



注意:
只支持持久化类型的 Topic,配置所需的 Topic 是在已复制的 Topic 名称前面加 persistent://

获取 Pulsar JWT Token

1. 确保在 Pulsar 角色管理 创建了所需的角色,并在 Pulsar 命名空间 中配置了相应的权限,确保所需角色有相应的生产消息或消费消息的权限。
2. 复制密钥,即 Pulsar 客户端所需的 JWT Token。如下图所示:




获取订阅名称

在 Topic 管理的消费者页面,根据需要,查看已有的订阅,或者新建订阅,并记录下需要使用的订阅名称。如下图所示:




部署生产者

1. 准备生产者配置,根据之前获取的 Pulsar 相关信息替换配置。示例如下:
apiVersion: v1
kind: Secret
type: Opaque
metadata:
name: producer-secret
stringData:
URL: http://pulsar-xxxxxxxxxxxx.tdmq.ap-cd.qcloud.tencenttdmq.com:5005 # 替换 API 调用地址
TOPIC: persistent://pulsar-xxxxxxxxxxxx/test-ns/test-topic # 替换 Topic
TOKEN: xxx # 替换角色密钥 (JWT Token)
2. 部署生产者持续发送新消息:
apiVersion: apps/v1
kind: Deployment
metadata:
name: producer
spec:
replicas: 1
selector:
matchLabels:
app: producer
template:
metadata:
labels:
app: producer
spec:
containers:
- name: producer
image: imroc/pulsar-demo:main
imagePullPolicy: Always
args:
- producer
- --produce-duration
- 2s # 每 2s 生产一条消息
envFrom:
- secretRef:
name: producer-secret
terminationGracePeriodSeconds: 1

部署消费者

1. 准备消费者配置,根据前面获取的 Pulsar 相关信息替换配置。示例如下:
apiVersion: v1
kind: Secret
type: Opaque
metadata:
name: consumer-secret
stringData:
URL: http://pulsar-xxxxxxxxxxxx.tdmq.ap-cd.qcloud.tencenttdmq.com:5005 # 替换 API 调用地址
TOPIC: persistent://pulsar-xxxxxxxxxxxx/test-ns/test-topic # 替换 Topic
TOKEN: xxx # 替换角色密钥 (JWT Token)
SUBSCRIPTION: xxx # 替换订阅名称
2. 通过 Deployment 部署消费者,持续消费消息:
apiVersion: apps/v1
kind: Deployment
metadata:
name: consumer
spec:
replicas: 1
selector:
matchLabels:
app: consumer
template:
metadata:
labels:
app: consumer
spec:
containers:
- args:
- consumer
- --consume-duration
- 10s # 单个消费者每 10s 处理完一条消息
envFrom:
- secretRef:
name: consumer-secret
image: imroc/pulsar-demo:main
imagePullPolicy: Always
name: consumer
terminationGracePeriodSeconds: 1

配置 ScaledObject

1. 先创建 TriggerAuthentication 并引用 consumer-secret 中的 TOKEN:
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
name: consumer-auth
spec:
secretTargetRef:
- parameter: bearerToken
name: consumer-secret
key: TOKEN
2. 创建 ScaledObject(替换高亮行配置):
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: consumer-scaledobject
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: consumer
pollingInterval: 15
idleReplicaCount: 0 # 没有消息时缩到 0
minReplicaCount: 1
maxReplicaCount: 100
triggers:
- type: pulsar
metadata:
adminURL: http://pulsar-xxxxxxxxxxxx.tdmq.ap-cd.qcloud.tencenttdmq.com:5005 # 替换 API 调用地址
topic: persistent://pulsar-xxxxxxxxxxxx/test/persist-topic # 替换 Topic
subscription: my-sub # 替换订阅名称
isPartitionedTopic: "true" # 如果分区数大于 1,这里就置为 true
msgBacklogThreshold: "5" # 伸缩阈值,副本数=CEIL(消息堆积数/msgBacklogThreshold)
activationMsgBacklogThreshold: "1" # 如果当前副本数为 0,只要队列里来新消息了,就将副本置为 1 并启用伸缩
authModes: bearer # 角色密钥(JWT Token)本质上是 bearer 的认证模式
authenticationRef:
name: consumer-auth # 引用前面创建的 TriggerAuthentication

查看 HPA

如果配置正确,会自动创建出对应的 HPA 资源。执行如下命令,查看 HPA。
$ kubectl get hpa
NAME REFERENCE TARGETS MINPODS MAXPODS REPLICAS AGE
keda-hpa-consumer-scaledobject Deployment/consumer 4600m/5 (avg) 1 10 5 31m
说明:
在上述输出中,可以通过 "TARGETS" 字段反推当前消息堆积数量。以上述输出为例,堆积消息数为4.6 * 5 = 23。

ScaledJob + 超级节点

如果单条消息处理耗时较大,但又需要尽量及时获取处理结果,可以配置 ScaledJob,每当队列中有新消息时,将自动创建一个 Job 来消费,让 Job 的 Pod 调度到超级节点,以实现按需使用计算资源和按量计费。
触发器的配置对于 ScaledObject 与 ScaledJob 完全一致,如需配置 ScaledJob,可参考 ScaledObject 的配置。