生产者:
# 新增一个生产者(单个client下可以创建多个生产者,请尽量复用)
producer = mq_client.create_producer('persistent://pulsar-kxxxx/dev/test')
for i in range(10):
producer.send(('Message-%d' % i).encode('utf-8'), deliver_at=int(datetime.timestamp(datetime.now() + timedelta(seconds=10))*1000))
# 关闭客户端(长时间不使用一定要记得关闭客户端,及时回收连接池资源)
mq_client.close()
消费者:
consumer = mq_client.subscribe('persistent://pulsar-kxxxx/dev/test', 'sub')
while True:
msg = consumer.receive()
try:
print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
# 确认消息已经成功收到和处理
consumer.acknowledge(msg)
except:
# 消息未被成功处理
consumer.negative_acknowledge(msg)
不管使用deliver_after还是deliver_at, 发现没有延迟效果,只要生产者一推消息,消费者就直接消费掉了,是我用的不对吗?还是sdk有问题
python3.8.8 sdk:pulsar-client-2.8.0.post0
相似问题