操作场景
本文以调用 PHP SDK 为例介绍通过 HTTP SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。
注意:
暂不支持通过使用 HTTP 协议实现事务消息。
前提条件
完成 资源创建与准备。
安装 PHP。
更多示例可以参见开源社区的 Demo 示例。
重试机制
HTTP 采用固定重试间隔的机制:
重试间隔 | 最大重试次数 |
5分钟 | 可通过修改消费组配置实现自定义最大重试次数,默认 16 次。 |
说明:
客户端在重试间隔内 ACK 这条消息,表示消费成功,不会重试。
重试间隔到期后客户端仍未 ACK,客户端会重新消费到这条消息。
每次消费的消息句柄只在重试间隔内有效,过期无效。
操作步骤
步骤1:安装 PHP 依赖库
在 PHP 项目中引入相关依赖:
{"require": {"aliyunmq/mq-http-sdk": ">=1.0.4"}}
步骤2:获取参数
1. 登录消息队列 RocketMQ 控制台,选择集群。
2. 复制集群 ID、接入地址等参数。
步骤3:生产消息
创建消息生产者
private $client;private $producer;public function __construct(){// 获取 client$this->client = new MQClient(endpoint,accessKey,secretKey);$topic = topicName;$instanceId = instanceId;// 获取 producer$this->producer = $this->client->getProducer($instanceId, $topic);}
参数 | 说明 |
endpoint | 接入地址,在基本信息页面获取。 |
accessKey | 角色 AccessKey,在集群权限页面获取。 |
secretKey | 角色 SecretKey,在集群权限页面获取。 |
instanceId | 集群 ID。 |
topicName | 主题名称,在 Topic 页面获取。 |
发送消息
public function run(){try{for ($i=0; $i<8; $i++){$publishMessage = new TopicMessage("hello mq!");// 设置属性$publishMessage->putProperty("a", $i);$result = $this->producer->publishMessage($publishMessage);print "Send success. msgId is:" . $result->getMessageId() . ", bodyMD5 is:" . $result->getMessageBodyMD5() . "\\n";}} catch (\\Exception $e) {print_r($e->getMessage() . "\\n");}}
步骤3:消费消息
创建消费者
private $client;private $consumer;public function __construct(){// 获取 client$this->client = new MQClient(endpoint,accessKey,secretKey);$topic = topicName;$groupId = groupName;$instanceId = instanceId;// 获取consumer$this->consumer = $this->client->getConsumer($instanceId, $topic, $groupId);}
参数 | 说明 |
endpoint | 接入地址,在基本信息页面获取。 |
accessKey | 角色 AccessKey,在集群权限页面获取。 |
secretKey | 角色 SecretKey,在集群权限页面获取。 |
instanceId | 集群 ID。 |
topicName | 主题名称,在 Topic 页面获取。 |
groupName | 消费组名称,在 Group 页面获取。 |
订阅消息
public function run(){while (True) {try {// 长轮询消费消息// 长轮询表示如果 topic 没有消息则请求会在服务端等待,如果有消息可以消费则立即返回// 如果对消费延迟比较敏感,强烈建议并发拉取消息$messages = $this->consumer->consumeMessage(batchSize,waitSeconds);} catch (\\Exception $e) {if ($e instanceof MQ\\Exception\\MessageNotExistException) {printf("No message, contine long polling!RequestId:%s\\n", $e->getRequestId());continue;}print_r($e->getMessage() . "\\n");sleep(1);continue;}print "consume finish, messages:\\n";$receiptHandles = array();foreach ($messages as $message) {$receiptHandles[] = $message->getReceiptHandle();printf("MessageID:%s TAG:%s BODY:%s \\nPublishTime:%d, FirstConsumeTime:%d, \\nConsumedTimes:%d, NextConsumeTime:%d,MessageKey:%s\\n",$message->getMessageId(), $message->getMessageTag(), $message->getMessageBody(),$message->getPublishTime(), $message->getFirstConsumeTime(), $message->getConsumedTimes(), $message->getNextConsumeTime(),$message->getMessageKey());print_r($message->getProperties());// 处理业务}print_r($receiptHandles);try {$this->consumer->ackMessage($receiptHandles);} catch (\\Exception $e) {if ($e instanceof MQ\\Exception\\AckMessageException) {printf("Ack Error, RequestId:%s\\n", $e->getRequestId());foreach ($e->getAckMessageErrorItems() as $errorItem) {printf("\\tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%s\\n", $errorItem->getReceiptHandle(), $errorItem->getErrorCode(), $errorItem->getErrorCode());}}}print "ack finish\\n";}}
参数 | 说明 |
batchSize | 一次拉取的消息条数,支持最多16条。 |
waitSeconds | 一次拉取的轮询等待时间,支持最长30秒。 |