POP 消费模式的说明

最近更新时间:2024-09-05 17:18:11

我的收藏

问题背景

RocketMQ 以其高性能,低延迟和抗积压的特性被不少客户和开发者熟知,但是在 RocketMQ 4.x 客户端 SDK 的使用中,不少客户会反馈消费者客户端在实际消费消息过程中,4.x 的客户端(例如常用的 Push Comsumer)在消费的过程中遇到一些问题:
SDK 承担了太多功能,例如拉消息,负载均衡,消息位点管理和新增客户端时的 Rebalance 等等,这点对多语言开发开发者不友好。
队列独占的负载均衡策略容易导致消费瓶颈:Broker 上的每个队列只能分配到相同 Group 的一台消费者客户端上。因此当队列数固定时,单纯增加消费者客户端的数量并不能提升消费性能。假设某个 Topic 共有10个队列,Group 最多有 10 个客户端进行消费(即最多每个客户端消费一个队列)。在业务高峰期,即使客户想增加新的客户端去消费消息,新上线的第11个客户端也无法消费消息。
单个客户端异常导致堆积。假设单个客户端因为异常 “hang 机” 时,由于和服务端的心跳没有断开,因此该客户端会被分配到队列进行消费,而此时因为客户端异常实际并不能消费机器,导致异常堆积产生,且因为上一条的原因,单纯加客户端数量并不能解决问题。

解决方案

鉴于以上原因,5.x 推出了 POP 消费模式。
POP 模式下,消费位点由服务端进行管理,因此多个客户端可以消费同一个队列。使用 POP 消费模式的客户端,每个客户端都会从所有的队列去拉消息,因此解决了上述的单个客户端异常和消费瓶颈的问题。
同时,服务端维护消费信息,使得客户端 SDK 更加轻量,方便进行多语言移植。


代码示例

那么我们如何使用 POP 消费模式呢?
需要使用 5.x 的 gRPC SDK,引入相关依赖如下:
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.6</version>
</dependency>
</dependencies>
同时参考开源社区的 DEMO 如下(以 Java 代码为例):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.client.java.example;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleConsumerExample {
private static final Logger log = LoggerFactory.getLogger(SimpleConsumerExample.class);

private SimpleConsumerExample() {
}

@SuppressWarnings({"resource", "InfiniteLoopStatement"})
public static void main(String[] args) throws ClientException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();

// Credential provider is optional for client configuration.
String accessKey = "用户ak";
String secretKey = "用户sk";
SessionCredentialsProvider sessionCredentialsProvider =
new StaticSessionCredentialsProvider(accessKey, secretKey);

String endpoints = "腾讯云页面接入点";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
// On some Windows platforms, you may encounter SSL compatibility issues. Try turning off the SSL option in
// client configuration to solve the problem please if SSL is not essential.
// .enableSsl(false)
.setCredentialProvider(sessionCredentialsProvider)
.build();
String consumerGroup = "消费组";
// 默认消费时间,30s,也就是说,对于拉到的消息,30s内如果消费没完成,该条消息会被别的客户端重新拉到,
// 需要用户根据自己的场景配置
Duration awaitDuration = Duration.ofSeconds(30);
String tag = "*";
String topic = "topic名字";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// In most case, you don't need to create too many consumers, singleton pattern is recommended.
SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// Set the consumer group name.
.setConsumerGroup(consumerGroup)
// set await duration for long-polling.
.setAwaitDuration(awaitDuration)
// Set the subscription for the consumer.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.build();
// Max message num for each long polling.
int maxMessageNum = 16;
// Set message invisible duration after it is received.
Duration invisibleDuration = Duration.ofSeconds(15);
// Receive message, multi-threading is more recommended.
do {
final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
log.info("Received {} message(s)", messages.size());
for (MessageView message : messages) {
final MessageId messageId = message.getMessageId();
try {
consumer.ack(message);
log.info("Message is acknowledged successfully, messageId={}", messageId);
} catch (Throwable t) {
log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
}
}
} while (true);
// Close the simple consumer when you don't need it anymore.
// You could close it manually or add this into the JVM shutdown hook.
// consumer.close();
}
}
在这种情况下,同一个消费组内,一个消费者将不会再和队列一一绑定,也能最大程度避免之前4.x中单个消费者阻塞导致队列堆积的问题。