首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

如何使用Python操作RabbitMQ实现direct关键字发布订阅模式?有录播直播私教课视频教程

基本用法

发布者

import json

from rabbitmq import pika

import rabbitmq

# 建立连接

credentials = rabbitmq.PlainCredentials(

'zhangdapeng',

'zhangdapeng520',

)  # mq用户名和密码

connection_target = rabbitmq.ConnectionParameters(

host='127.0.0.1',

port=5672,

virtual_host='/',

credentials=credentials,

)

connection = rabbitmq.BlockingConnection(connection_target)

# 队列信息

exchange_name = "user_manager_direct"

# 创建管道

channel = connection.channel()

# 声明一个交换机

channel.exchange_declare(exchange=exchange_name, exchange_type=rabbitmq.ExchangeType.direct)

# 向队列中写入数据

user = {"id": 1, "name": "张三", "age": 23}

message = json.dumps(user, ensure_ascii=True)

channel.basic_publish(

exchange=exchange_name,

routing_key="error",  # 这里不再是队列名了,而是关键字

body=message.encode('utf8'),

properties=pika.BasicProperties(delivery_mode=2),  # 声明消息在队列中持久化

)

channel.basic_publish(

exchange=exchange_name,

routing_key="info",  # 这里不再是队列名了,而是关键字

body=message.encode('utf8'),

properties=pika.BasicProperties(delivery_mode=2),  # 声明消息在队列中持久化

)

print(message)

# 关闭连接

connection.close()

消费者

import rabbitmq

import json

# 创建连接

credentials = rabbitmq.PlainCredentials(

'zhangdapeng',

'zhangdapeng520',

)

target = rabbitmq.ConnectionParameters(

host='127.0.0.1',

port=5672,

virtual_host='/',

credentials=credentials,

)

connection = rabbitmq.BlockingConnection(target)

# 创建管道

channel = connection.channel()

# 队列信息

exchange_name = "user_manager_direct"

# 绑定交换机

channel.exchange_declare(

exchange=exchange_name,

exchange_type=rabbitmq.ExchangeType.direct,

)

# 绑定队列

result = channel.queue_declare(

queue="",  # 这里不要指定队列名

exclusive=True,

)

queue_name = result.method.queue  # 通过result获取队列名

channel.queue_bind(

exchange=exchange_name,

queue=queue_name,

routing_key="error",  # 可以通过routing_key绑定多个关键字

)

channel.queue_bind(

exchange=exchange_name,

queue=queue_name,

routing_key="info",  # 可以通过routing_key绑定多个关键字

)

def callback(ch, method, properties, body):

"""每次接收到消息的消费回调方法"""

ch.basic_ack(delivery_tag=method.delivery_tag)

data = body.decode("utf8")

print(json.loads(data))

# 开始消费

channel.basic_consume(

queue=queue_name,

on_message_callback=callback,

auto_ack=False,

)

try:

channel.start_consuming()

finally:

connection.close()

简化代码

生产者

import json

from rabbitmq import pika

import rabbitmq

# 建立连接

connection = rabbitmq.get_connection()

# 队列信息

exchange_name = "user_manager_direct"

# 创建管道

channel = connection.channel()

# 声明一个交换机

channel.exchange_declare(exchange=exchange_name, exchange_type=rabbitmq.ExchangeType.direct)

# 向队列中写入数据

user = {"id": 1, "name": "张三", "age": 23}

rabbitmq.send_json(channel, user, exchange_name, "error")

rabbitmq.send_json(channel, user, exchange_name, "info")

# 关闭连接

connection.close()

消费者

import rabbitmq

import json

# 创建连接

connection = rabbitmq.get_connection()

# 创建管道

channel = connection.channel()

# 队列信息

exchange_name = "user_manager_direct"

# 绑定交换机

channel.exchange_declare(

exchange=exchange_name,

exchange_type=rabbitmq.ExchangeType.direct,

)

# 绑定队列

result = channel.queue_declare(

queue="",  # 这里不要指定队列名

exclusive=True,

)

queue_name = result.method.queue  # 通过result获取队列名

channel.queue_bind(

exchange=exchange_name,

queue=queue_name,

routing_key="error",  # 可以通过routing_key绑定多个关键字

)

channel.queue_bind(

exchange=exchange_name,

queue=queue_name,

routing_key="info",  # 可以通过routing_key绑定多个关键字

)

def callback(ch, method, properties, body):

"""每次接收到消息的消费回调方法"""

print(rabbitmq.receive_json(ch, method, body))

# 开始消费

rabbitmq.consume(connection, queue_name, callback)

进一步简化代码

生产者

import rabbitmq

# 建立连接

connection = rabbitmq.get_connection()

# 队列信息

exchange_name = "user_manager_direct"

# 创建管道

channel, _ = rabbitmq.get_direct_channel(connection, exchange_name)

# 向队列中写入数据

user = {"id": 1, "name": "张三", "age": 23}

rabbitmq.send_json(channel, user, exchange_name, "error")

rabbitmq.send_json(channel, user, exchange_name, "info")

# 关闭连接

connection.close()

消费者

import rabbitmq

import json

# 创建连接

connection = rabbitmq.get_connection()

# 队列信息

exchange_name = "user_manager_direct"

# 创建管道

channel, queue_name = rabbitmq.get_direct_channel(connection, exchange_name, ["error", "info"])

def callback(ch, method, properties, body):

"""每次接收到消息的消费回调方法"""

print(rabbitmq.receive_json(ch, method, body))

# 开始消费

rabbitmq.consume(connection, queue_name, callback)

  • 发表于:
  • 原文链接https://page.om.qq.com/page/OajYGui6AbzN7pqWV-BYoblQ0
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券