Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >rabbitmq消息队列——"路由"

rabbitmq消息队列——"路由"

作者头像
用户1141560
发布于 2017-12-26 03:33:26
发布于 2017-12-26 03:33:26
69100
代码可运行
举报
文章被收录于专栏:西安-晁州西安-晁州
运行总次数:0
代码可运行

在之前的教程中,我们创建了一个简单的日志系统。我们能够向许多交换器转发日志消息。

在本教程中,我们将添加一个功能——我们让它仅仅接收我们感兴趣的日志类别。举例:我们 实现仅将严重级别的错误日志写入磁盘(为了节省磁盘空间),其余日志级别的日志直接打印到控制台。

绑定

之前的章节中我们已经创建过绑定,你可能还会记得:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
err = ch.QueueBind(
  q.Name, // queue name
  "",     // routing key
  "logs", // exchange
  false,
  nil)

绑定是用来维系交换器和队列关系的,这可以被简单地理解为:队列仅仅对从交换器中传的消息感兴趣。

绑定有个额外参数叫做routing_key,为了避免与Channel.Publish方法中的参数相混淆,我们称之为binding key(绑定键)。使用绑定键创建绑定如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
err = ch.QueueBind(
  q.Name,    // queue name
  "black",   // routing key
  "logs",    // exchange
  false,
  nil)

绑定键的含义取决于交换器的类型。我们之前使用的fanout类型的交换器,就会直接忽略这个参数。

Direct型交换器

我们之前的教程中的日志系统是广播所有的消息到所有消费者。我们希望以此拓展来实现根据消息严重性来过滤消息。比如我们希望 写日志到硬盘的代码仅仅接收严重级别的,不要浪费磁盘存储在warning或者info级别的日志。

之前使用的是fanout类型交换器,没有更好的拓展性或者说灵活性——它只能盲目的广播。

现在 使用direct型交换器替代。Direct型的路由算法 比较简单——消息会被派发到某个队列,该队列的绑定键恰好和消息的路由键一致。

为了阐述,考虑如下设置:

该设置中,可以看到direct型的交换器X被绑定到了两个队列:Q1、Q2。Q1使用绑定键orange绑定,Q2包含两个绑定键:black和green。

基于如上设置的话,使用路由键orange发布的消息会被路由到Q1队列,而使用black或者green路由键的消息均会被路由到Q2,所有其余消息将被丢弃。

备注:这里的交换器X和队列的绑定是多对多的关系,也就是说一个交换器可以到绑定多个队列,一个队列也可以被多个交换器绑定,消息只会被路由一次,不能因为两个绑定键都匹配上了路由键消息就会被路由两次,这种是不存在的。

多个绑定

用相同的绑定键去绑定多个队列是完全合法的,我们可以再添加一个black绑定键来绑定X和Q1,这样Q1和Q2都使用black绑定到了交换器X,这其实和fanout类型的交换器直接绑定到队列Q1、Q2功能相同:使用black路由键的消息会被直接路由到Q1和Q2。

发送日志

我们将使用该模型来构建日志系统。使用direct型的交换器替换fanout型的,我们将日志的严重级别作为路由键,这样的话接收端程序可以选择日志接收级别进行接收,首先聚焦下日志发送端:

首先创建一个交换器:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
err = ch.ExchangeDeclare(
  "logs_direct", // name
  "direct",      // type
  true,          // durable
  false,         // auto-deleted
  false,         // internal
  false,         // no-wait
  nil,           // arguments
)

然后是发送消息:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
err = ch.ExchangeDeclare(
  "logs_direct", // name
  "direct",      // type
  true,          // durable
  false,         // auto-deleted
  false,         // internal
  false,         // no-wait
  nil,           // arguments
)
failOnError(err, "Failed to declare an exchange")

body := bodyFrom(os.Args)
err = ch.Publish(
  "logs_direct",         // exchange
  severityFrom(os.Args), // routing key
  false, // mandatory
  false, // immediate
  amqp.Publishing{
    ContentType: "text/plain",
    Body:        []byte(body),
  })

为了简单起见,我们假设日志严重级别如下:'info', 'warning', 'error'。

订阅

接收还和之前章节接收一样,只有一个例外:我们将为每一个感兴趣的严重级别创建一个绑定:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
q, err := ch.QueueDeclare(
  "",    // name
  false, // durable
  false, // delete when usused
  true,  // exclusive
  false, // no-wait
  nil,   // arguments
)
failOnError(err, "Failed to declare a queue")

if len(os.Args) < 2 {
  log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])
  os.Exit(0)
}
for _, s := range os.Args[1:] {
  log.Printf("Binding queue %s to exchange %s with routing key %s",
     q.Name, "logs_direct", s)
  err = ch.QueueBind(
    q.Name,        // queue name
    s,             // routing key
    "logs_direct", // exchange
    false,
    nil)
  failOnError(err, "Failed to bind a queue")
}

糅合在一起

发送端:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// rabbitmq_4_emit_log_direct.go project main.go
package main

import (
	"fmt"
	"log"
	"os"
	"strings"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
		panic(fmt.Sprintf("%s: %s", msg, err))
	}
}

func main() {
	//链接队列服务
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	//声明一个channel
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	//声明一个direct类型交换器
	err = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil)
	failOnError(err, "Failed to declare an exchange")

	body := bodyFrom(os.Args)
	ch.Publish("logs_direct", severityFrom(os.Args), false, false, amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte(body),
	})
	failOnError(err, "Failed to publish a message")

	log.Printf(" [x] Sent %s", body)

}

//接收消息发送内容
func bodyFrom(args []string) string {
	var s string
	if (len(args) < 3) || os.Args[2] == "" {
		s = "hello"
	} else {
		s = strings.Join(args[2:], " ")
	}
	return s
}

//接收日志级别,作为路由键使用
func severityFrom(args []string) string {
	var s string
	if len(args) < 2 || args[1] == "" {
		s = "info"
	} else {
		s = args[1]
	}
	return s
}

接收端:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// rabbitmq_4_receive_logs_direct.go project main.go
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
		panic(fmt.Sprintf("%s: %s", msg, err))
	}
}

func main() {
	//链接队列服务
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	//声明一个channel
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	//声明一个direct类型交换器
	err = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil)
	failOnError(err, "Failed to declare an exchange")

	//声明一个队列
	q, err := ch.QueueDeclare("", false, false, true, false, nil)
	failOnError(err, "Failed to declare a queue")

	//判断cmd窗口接收参数是否足够
	if len(os.Args) < 2 {
		log.Printf("Usage:%s [info] [warning] [error]", os.Args[0])
		os.Exit(0)
	}

	//cmd窗口输入的多个日志级别,分别循环处理—进行绑定
	for _, s := range os.Args[1:] {
		log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_direct", s)
		ch.QueueBind(q.Name, s, "logs_direct", false, nil)
		failOnError(err, "Failed to bind a queue")
	}

	msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
	failOnError(err, "Failed to register a consumer")

	forever := make(chan bool)
	go func() {
		for d := range msgs {
			log.Printf(" [x] %s", d.Body)
		}
	}()
	log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
	<-forever
}

如果您只想保存“警告”和“错误”(而不是“信息”)日志消息到文件,只需要打开一个控制台然后输入:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
go run receive_logs_direct.go warning error > logs_from_rabbit.log

如果你想看到所有的日志消息在你的屏幕上,打开一个新的终端,输入:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
go run receive_logs_direct.go info warning error

发出一个错误日志消息类型如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
go run emit_log_direct.go error "Run. Run. Or it will explode."

可以观察到:

消息可以进行分类接收了, 只有error级别的消息才会被存入log日志文件,而info、warning级别的都不存入。

实际效果如下:

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2016-11-16 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
rabbitmq消息队列——"topic型交换器"
本文介绍了RabbitMQ中Topic Exchange和Direct Exchange的区别以及它们在实际使用中需要注意的事项。同时,还提供了一个使用Topic Exchange的日志系统示例。
用户1141560
2017/12/26
8050
rabbitmq消息队列——"topic型交换器"
【实践】消息队列RabbitMQ从入门安装到精通原理
从安装环境,配置入门,到HelloWorld实操,各种类型消息传递的演示代码,原理介绍,答疑解惑,面试题,全面介绍RabbitMQ消息队列。 RabbitMQ集群搭建另外一篇文章介绍。
辉哥
2021/01/29
1.2K0
RabbitMQ中文系列教程五
在上一教程中,我们构建了一个简单的日志记录系统。我们能够向许多消息消费者推送广播日志消息。
兔云小新LM
2023/02/28
3280
RabbitMQ中文系列教程五
rabbitmq消息队列——"发布订阅"
三、”发布订阅” 上一节的练习中我们创建了一个工作队列。队列中的每条消息都会被发送至一个工作进程。这节,我们将做些完全不同的事情——我们将发送单个消息发送至多个消费者。这种模式就是广为人知的“发布订阅
用户1141560
2017/12/26
1K0
rabbitmq消息队列——"发布订阅"
RabbitMQ中文系列教程四
在前面的教程中,我们创建了一个工作队列。工作队列背后的假设是每个任务都是只交付给一个消费者。在这一部分中,我们将做一些完全不同的事情。我们将向多个消费者传递消息。此模式被称为“发布/订阅”。
兔云小新LM
2023/02/28
5480
RabbitMQ中文系列教程四
rabbitmq消息队列——"工作队列"
二、”工作队列” 在第一节中我们发送接收消息直接从队列中进行。这节中我们会创建一个工作队列来分发处理多个工作者中的耗时性任务。 工作队列主要是为了避免进行一些必须同步等待的资源密集型的任务。实际上我们
用户1141560
2017/12/26
1.5K0
rabbitmq消息队列——"工作队列"
rabbitmq消息队列——"Hello World!"
用户1141560
2017/12/26
1.2K0
rabbitmq消息队列——"Hello World!"
RabbitMQ系列笔记主题订阅模式
昨天的内容主要讲了RabbitMQ的发布订阅模式和路由模式,都很好的满足了我们的日志打印,但是如果说,我对日志的打印,希望可以过滤掉一些内容呢,比如说,在打印错误日志的时候,只打印login时的错误?这个时候,就需要我们使用主题订阅的模式,可以说,主题订阅模式可以完全代替路由模式,因为在主题订阅模式中,如果没有响应的关键词,便和路由模式完全一样。
陌无崖
2019/08/16
6030
RabbitMQ系列笔记主题订阅模式
Go之RabbitMQ(三)优先级队列
RabbitMQ3.5.0之后官方版本已经实现了优先级队列。数值越大则优先级越高。
灰子学技术
2020/06/03
9440
【深度知识】RabbitMQ死信队列的原理及GO实现
本文按照以下目前讲解RabbitMQ死信队列的内容,包括: (1)死信队列是什么? (2)如何配置死信队列? (3)死信队列代码实现演示(GO版本/JAV版本) (3)死信队列的应用场景? 网上Java版本的死信队列演示代码较多,特定找了GO版本的代码供大家演示使用。
辉哥
2021/01/29
1.8K0
RabbitMQ系列笔记广播模式和路由模式
上一节介绍了简单的工作模式,即一个队列可以被多个消费者进行消费,只有一条消息被送到消费者,采用公平调度的方式,在以往的例子中似乎我们还没用到交换器进行发送消息,我们都知道,往队列里发送消息,是需要用交换器进行分发的消息的,为什么我们没有申请交换器仍然可以发送消息呢?因为在RabbitMQ服务器中,如果不申请交换器,服务器会使用默认的交换器,所以说,交换器在发送消息的时候必不可少,今天我们学习两种交换器分别为fanout(扇形交换器)和direct(直连交换器)
陌无崖
2019/08/16
1.1K0
RabbitMQ系列笔记广播模式和路由模式
RabbitMQ中文系列教程三
在第一个教程中,我们 编写程序以从命名队列发送和接收消息。在此 我们将创建一个将用于分发的工作队列 多个工作人员之间的耗时任务。
兔云小新LM
2023/02/28
7260
RabbitMQ中文系列教程三
RabbitMQ性能测试
作者一直在寻找一个极低延时的消息队列,从目前的测试结果来看,只有nats达到了<1ms的水平,本文旨在测试rabbitmq的国latency,撰文记录并与大家分享。
王亚昌
2018/11/22
2.4K0
RabbitMQ 学习笔记3 - 使用amqp库连接RabbitMQ
使用Go 操作RabbitMQ 收发消息,可以 使用Go RabbitMQ客户端库 连接 RabbitMQ 来实现。
张云飞Vir
2020/04/09
2.6K0
RabbitMQ系列笔记入门篇
在了解本篇文章之前,先复习以下知识点,如果你对RabbitMQ还不了解,请先查看RabbitMQ系列笔记介绍篇这篇文章。
陌无崖
2019/08/16
4850
RabbitMQ系列笔记入门篇
RabbitMQ系列笔记终极封装篇
在阅读本篇笔记时,如果你还不熟悉RabbitMQ,请查看公众号中关于RabbitMQ系列笔记相关文章,如果你已经熟悉了,还请在本篇文章多多指教。本文使用go mod进行获取相关包,使用Go1.12.6版本进行编写,编译器工具使用Vscode。
陌无崖
2019/08/16
1.6K0
RabbitMQ系列笔记终极封装篇
Rabbitmq 简单介绍,安装和go客户端使用
消息队列(英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列互交。消息会保存在队列中,直到接收者取回它。 消息队列,一般我们会简称他为MQ(Message Queue),消息队列可以简单的理解为:把要传输的数据放在队列中
张琳兮
2019/09/16
1.1K0
Rabbitmq 简单介绍,安装和go客户端使用
[译]RabbitMQ教程C#版 - 路由
在本教程中,我们会日志系统其再添加一个特性,使其可以只订阅消息的一个子集。例如,将所有日志消息打印到 控制台的同时,只会将严重错误消息写入日志文件(保存到磁盘空间)。
Esofar
2018/09/05
6960
[译]RabbitMQ教程C#版 - 路由
详解SpringCloud中RabbitMQ消息队列原理及配置,一篇就够!
公众号改版后文章乱序推荐,希望你可以点击上方“Java进阶架构师”,点击右上角,将我们设为★“星标”!这样才不会错过每日进阶架构文章呀。
java进阶架构师
2020/09/22
3.8K0
详解SpringCloud中RabbitMQ消息队列原理及配置,一篇就够!
【MQ】什么是 MQ
A message queue is a form of asynchronous service-to-service communication used in serverless and microservices architectures. Messages are stored on the queue until they are processed and deleted. Each message is processed only once, by a single consumer.
JuneBao
2022/10/26
2.3K0
【MQ】什么是 MQ
相关推荐
rabbitmq消息队列——"topic型交换器"
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验