操作场景
本文以调用 C++ SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。
前提条件
操作步骤
1. 准备环境。
1.1 在客户端环境安装 Pulsar C++ client,安装过程可参考官方教程 Pulsar C++ client。
1.2 在项目中引入 Pulsar C++ client 相关头文件及动态库。
2. 创建客户端。
// 客户端配置信息ClientConfiguration config;// 设置授权角色密钥AuthenticationPtr auth = pulsar::AuthToken::createWithToken(AUTHENTICATION);config.setAuth(auth);// 创建客户端Client client(SERVICE_URL, config);
3. 创建生产者。
// 生产者配置ProducerConfiguration producerConf;producerConf.setBlockIfQueueFull(true);producerConf.setSendTimeout(5000);// 生产者Producer producer;// 创建生产者Result result = client.createProducer(// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称"persistent://pulsar-xxx/sdk_cpp/topic1",producerConf,producer);if (result != ResultOk) {std::cout << "Error creating producer: " << result << std::endl;return -1;}
说明
Topic 名称需要填入完整路径,即
persistent://clusterid/namespace/Topic
,clusterid/namespace/topic
的部分可以从控制台上 Topic管理 页面直接复制。3. 发送消息。
// 消息内容std::string content = "hello cpp client, this is a msg";// 构建消息对象Message msg = MessageBuilder().setContent(content).setPartitionKey("mykey") // 业务key.setProperty("x", "1") // 设置消息参数.build();// 发送消息Result result = producer.send(msg);if (result != ResultOk) {// 发送失败std::cout << "The message " << content << " could not be sent, received code: " << result << std::endl;} else {// 发送成功std::cout << "The message " << content << " sent successfully" << std::endl;}
4. 创建消费者。
// 消费者配置信息ConsumerConfiguration consumerConfiguration;consumerConfiguration.setSubscriptionInitialPosition(pulsar::InitialPositionEarliest);// 消费者Consumer consumer;// 订阅topicResult result = client.subscribe(// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称"persistent://pulsar-xxx/sdk_cpp/topic1",// 订阅名称"sub_topic1",consumerConfiguration,consumer);if (result != ResultOk) {std::cout << "Failed to subscribe: " << result << std::endl;return -1;}
说明
Topic 名称需要填入完整路径,即
persistent://clusterid/namespace/Topic
,clusterid/namespace/topic
的部分可以从控制台上 Topic管理 页面直接复制。
subscriptionName 需要写入订阅名,可在消费管理界面查看。
5. 消费消息。
Message msg;// 获取消息consumer.receive(msg);// 模拟业务std::cout << "Received: " << msg << " with payload '" << msg.getDataAsString() << "'" << std::endl;// 回复ackconsumer.acknowledge(msg);// 消费失败回复nack, 消息将会重新投递// consumer.negativeAcknowledge(msg);
6. 登录 TDMQ Pulsar 版控制台,依次点击 Topic 管理 > Topic 名称进入消费管理页面,点开订阅名下方右三角号,可查看生产消费记录。
说明