首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

pika:RabbitMQ客户端,消息队列必学!

想玩转RabbitMQ,pika这个Python客户端库是绕不开的。它是个纯Python写的AMQP协议实现,跟RabbitMQ配合特别好。那些大厂的消息队列系统,底层很多都在用它。今天带你实操一把,保证写完就能上手用。

安装配置

装个pika超简单,pip一把梭就完事:

1pip install pika

连接RabbitMQ也就几行代码的事:

1import pika

3# 建立连接

4connection = pika.BlockingConnection(

5 pika.ConnectionParameters(host='localhost')

6)

7channel = connection.channel()

9# 声明队列

10channel.queue_declare(queue='hello')

小贴士:

记得先把RabbitMQ服务启动了再连

本地测试用localhost,生产环境记得改成真实IP

默认端口是5672,要改端口得在ConnectionParameters里指定

发送消息

整个发消息的逻辑贼简单,写个生产者代码:

1# producer.py

2import pika

4connection = pika.BlockingConnection(

5 pika.ConnectionParameters('localhost')

6)

7channel = connection.channel()

9# 队列不存在才建,已存在就跳过

10channel.queue_declare(queue='hello', durable=True)

12# 发送消息

13message = “老铁们,这是一条测试消息”

14channel.basic_publish(

15 exchange='', # 默认交换机

16 routing_key='hello', # 队列名

17 body=message,

18 properties=pika.BasicProperties(

19 delivery_mode=2 # 消息持久化

20 )

21)

23print(f“发送消息: {message}”)

24connection.close()

接收消息

消费者这边也不难,整个回调函数处理消息就成:

1# consumer.py

2import pika

4def callback(ch, method, properties, body):

5 print(f“收到消息: {body.decode()}”)

6 ch.basic_ack(delivery_tag=method.delivery_tag)

8connection = pika.BlockingConnection(

9 pika.ConnectionParameters('localhost')

10)

11channel = connection.channel()

13channel.queue_declare(queue='hello', durable=True)

15# 每次只处理一条消息

16channel.basic_qos(prefetch_count=1)

18channel.basic_consume(

19 queue='hello',

20 on_message_callback=callback

21)

23print(“等待消息中...”)

24channel.start_consuming()

小贴士:

basic_ack确认消息已处理,不然消息会一直重发

prefetch_count限制未确认的消息数,防止消费者压力太大

加上durable=True让队列持久化,重启不丢数据

进阶玩法

来点高端操作,用交换机实现发布订阅:

1# 声明交换机

2channel.exchange_declare(

3 exchange='logs',

4 exchange_type='fanout' # 广播模式

5)

7# 发送消息到交换机

8channel.basic_publish(

9 exchange='logs',

10 routing_key='',

11 body=message

12)

消费者这边要绑定临时队列:

1# 创建临时队列

2result = channel.queue_declare(queue='', exclusive=True)

5# 绑定到交换机

6channel.queue_bind(

7 exchange='logs',

8 queue=queue_name

9)

小贴士:

fanout是广播模式,所有绑定的队列都能收到消息

direct可以根据routing_key路由消息

topic支持通配符匹配routing_key

异常处理

生产环境必须要处理连接断开的情况:

1try:

2 channel.start_consuming()

3except pika.exceptions.ConnectionClosedByBroker:

4 print(“连接被broker关闭”)

5except pika.exceptions.AMQPChannelError:

6 print(“channel异常”)

7except KeyboardInterrupt:

8 print(“手动停止消费”)

9finally:

10 channel.close()

11 connection.close()

用pika写消息队列就讲到这,贴心提醒下坑点:

默认是不持久化的,重启数据就没了

消息没确认的话会一直重发,小心内存爆了

生产环境记得加连接重试和心跳检测

长连接最好加个定时重连机制

代码写得不错,但也别忘了看看监控面板,关注下队列堆积情况。好了,整个pika实战就到这,上手写代码去吧!

  • 发表于:
  • 原文链接https://page.om.qq.com/page/O0vdsuKC7EtBRp2D5HQu71zA0
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券