TDMQ RocketMQ 版兼容了社区版 HTTP SDK 的接入,如果您此前使用的客户端使用了社区版 HTTP SDK,您在切换到 TDMQ RocketMQ 版后,您无需在客户端进行任何代码改造。
操作场景
如果当前您已使用了 HTTP 协议进行消息的收发,在您的客户端引入开源 HTTP SDK 后,TDMQ RocketMQ 版支持用户通过内网或公网使用 HTTP 协议接入。
本文以调用 Java SDK 为例介绍通过 HTTP SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。
注意:
暂不支持通过使用 HTTP 协议实现事务消息。
在创建 Group(消费组)时需要制定类型(TCP 或者 HTTP,详情请参见 创建 Group 说明 ),因此,同一个 Group(消费组)不支持 TCP 和 HTTP 客户端同时消费。
前提条件
通过 Maven 方式引入依赖,在 pom.xml 文件中添加对应语言的 SDK 依赖。
重试机制
HTTP 采用固定重试间隔的机制,暂不支持自定义。
消息类型 | 重试间隔 | 最大重试次数 |
普通消息 | 5分钟 | 288 |
顺序消息 | 1分钟 | 288 |
说明:
客户端在重试间隔内 ACK 这条消息,表示消费成功,不会重试。
重试间隔到期后客户端仍未 ACK,客户端会重新消费到这条消息。
每次消费的消息句柄只在重试间隔内有效,过期无效。
操作步骤
步骤1:安装 Java 依赖库
在 Java 项目中引入社区版 HTTP SDK 依赖。
步骤2:获取参数
1. 登录 TDMQ 控制台,选择所在的集群,单击集群名进入集群详情页。
2. 如下图所示,选择顶部的命名空间页签,单击右侧的配置权限进入权限配置页面,如当前角色列表为空,可以单击新建,新建一个角色,详细描述请参见 完成资源创建与准备。
3. 在页面上复制对应的 AK 和 SK,以备在接下来的步骤中使用。
步骤3:生产消息
创建消息生产者
// 获取ClientMQClient mqClient = new MQClient(endpoint, accessKey, secretKey);// 获取Topic的ProducerMQProducer producer = mqClient.getProducer(namespace, topicName);
发送消息
try {for (int i = 0; i < 10; i++) {TopicMessage pubMsg;pubMsg = new TopicMessage(("Hello RocketMQ " + i).getBytes(),"TAG");TopicMessage pubResultMsg = producer.publishMessage(pubMsg);System.out.println("Send mq message success. MsgId is: " + pubResultMsg.getMessageId());}} catch (Throwable e) {System.out.println("Send mq message failed.");e.printStackTrace();}
参数 | 说明 |
TAG | 设置消息的 TAG。 |
步骤3:消费消息
创建消费者
// 获取ClientMQClient mqClient = new MQClient(endpoint, accessKey, secretKey);// 获取Topic的ConsumerMQConsumer consumer = mqClient.getConsumer(namespace, topicName, groupName, "TAG");
订阅消息
do {List<Message> messages = null;try {// 长轮询消费消息// 长轮询表示如果 topic 没有消息则请求会在服务端等待,如果有消息可以消费则立即返回messages = consumer.consumeMessage(Integer.parseInt(batchSize),Integer.parseInt(waitSeconds));} catch (Throwable e) {e.printStackTrace();}if (messages == null || messages.isEmpty()) {System.out.println(Thread.currentThread().getName() + ": no new message, continue!");continue;}for (Message message : messages) {System.out.println("Receive message: " + message);}{List<String> handles = new ArrayList<String>();for (Message message : messages) {handles.add(message.getReceiptHandle());}try {consumer.ackMessage(handles);} catch (Throwable e) {if (e instanceof AckMessageException) {AckMessageException errors = (AckMessageException) e;System.out.println("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:");if (errors.getErrorMessages() != null) {for (String errorHandle :errors.getErrorMessages().keySet()) {System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()+ ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());}}continue;}e.printStackTrace();}}} while (true);
参数 | 说明 |
batchSize | 一次拉取的消息条数,支持最多16条。 |
waitSeconds | 一次拉取的轮询等待时间,支持最长30秒。 |