A message queue is a form of asynchronous service-to-service communication used in serverless and microservices architectures. Messages are stored on the queue until they are processed and deleted. Each message is processed only once, by a single consumer.
MQ 全称 Message Queue,中文译为消息队列,其实质是一个队列,队列是一种先进先出的数据结构,所以我们可以简单理解 MQ 是一种存储消息的容器,MQ 一般包括三类参与者:
一般来说, MQ 的使用场景有以下几个方面:
比如一个评论系统,往往在评论之后要发邮件通知原作者,在不使用 MQ 时,我们首先会想到串行处理,即:
func Talk(ctx *gin.Context, req BaseReqInter) BaseRespInter {
// 存储评论数据
dao.InsertNewTalk(req.Talk)
// 发邮件
utils.SendEmail(...)
return SuccessResp
}
上面的实现在功能上当然是可以的,但性能却不是很好,由于评论这个事件本事不依赖于发邮件这个事件,也就是从正常角度来说,一旦新评论落库,就应该响应成功,而不是在邮件发成功后才响应成功。
另一个解决方案就是异步地去处理发邮件这个事件:
func Talk(ctx *gin.Context, req BaseReqInter) BaseRespInter {
// 存储评论数据
dao.InsertNewTalk(req.Talk)
// 异步发邮件
go utils.SendEmail(...)
return SuccessResp
}
我们当然可以简单地使用携程或线程去异步处理发邮件,但总不能每一次评论都搞一个新线程去处理啊,携程还好,想想Java的线程,如果 QPS 很大,那一次性得开多少线程,并且这种异步的代码往往不是像上面加个 go
就完事,需要涉及很多后续异常情况的处理,现在只有发邮件,如果还要发短信,还要发站内信,这种代码往往面临维护困难的问题,所以上面的异步并不是一种好的处理方法。
最后的解决办法就是 MQ, 我们开启一个邮件消费者,持续读取并消费MQ中邮件队列里的消息,然后在评论数据落库之后就把要发的邮件扔到 MQ 中,然后直接对客户端响应成功:
func Talk(ctx *gin.Context, req BaseReqInter) BaseRespInter {
// 存储评论数据
dao.InsertNewTalk(req.Talk)
// 将邮件推送到 MQ 中
ch.Publish(q.Name, []byte(email))
return SuccessResp
}
这样一来 Talk
作为生产者只是负责把消息放到队列中而并不需要关心邮件消费者的消费情况,以此来实现异步处理。
还是上面异步的例子,串行操作的实质问题在于 Talk
方法耦合了本不应该耦合的 SendEmail
方法,一方面导致调用链过长从而使响应时间增多,另一方面还会导致接口稳定性降低,试想如果邮件服务出了问题,那就会导致评论接口挂掉,使用 MQ 的另一个好处就是可以降低程序耦合度, 因为 MQ 屏蔽了生产和消费的双方,双方都只需要和 Queue 交互而不用管消息产生和消费的细节
比如某个接口的 QPS 突然达到 3000,但服务器只能处理 2000 的,如果任由请求打进来,那服务器可能就会由于扛不住 QPS 而挂掉,这时的解决办法就是先把请求放在 MQ 中,让服务器以自己能接受的量去从 MQ 中消费请求,以此避免突然的高 QPS 打挂服务器。
AMQP, 全称 Advanced Message Queuing Protocol, 中文译为高级消息队列协议, 是一个用于在进程间传递异步消息的应用层协议。
AMQP 一般被分为下面三个层次:
AMQP 服务器:
在 AMQP 的服务器(broker)中,三个主要功能模块连接成一个处理链以完成预期的功能,他们分别是:
Channel:
总结:
AMQP 是一种应用层协议,和普通的 HTTP, SMTP 没有什么区别,用在进程间传递异步消息上,如果一个客户端程序 AMQP 协议就可以和服务器(消息中间件代理)进行通信。
RabbitMQ 是 使用 Erlang 语言对 AMQP 协议的一种实现,其客户端支持几乎所有的主流编程语言。
掘金 - 消息中间件部署及比较:rabbitMQ、activeMQ、zeroMQ、rocketMQ、Kafka、redis
RabbitMQ 使用 erlang 语言编写,因此在安装 RabbitMQ 之前需要下载 erlang 相关依赖,具体下载方法参考 官网:
然后就可以安装使用 RabbitMQ 了,安装方法见官网
ps:
下载好 RabbitMQ 之后, 使用 rabbitmq-plugins enable rabbitmq_management
命令添加 rabbitmq_management
插件,以方便使用图像化界面配置,添加之后使用 rabbitmq-sever start
命令启动 RabbitMQ 服务端,之后访问本地 http://localhost:15672
,使用默认用户名密码(都是 guest)登录即可看见如下界面:
由于我是个垃圾, 不会 SpringBoot, 只能用 Go 来学
按照官方教程,Golang 使用 RabbitMQ 需要 amqp 依赖:
go get github.com/streadway/amqp
下面这是官网给出的例子
package main
import (
`github.com/streadway/amqp`
`log`
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 连接 RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 获取管道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明要发送到的队列
q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
// 向队列中发送数据
body := "Hello World!"
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
}
运行生产者程序,刷新图形化界面,不出意外可以发现队列中的消息数应该从 0 变化成了 1
可以抓包看到 AMQP 报文的具体内容:
同样是官网的例子, 前面的部分没有变化,任然需要连接 RabbitMQ, 获取管道,声明要消费的队列,但在编写 Receiver 时,需要声明一个消费者:
// 声明一个消费者
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
ch.Consume
会返回一个只读的管道(chan),我们只需要遍历这个管道就可以从 MQ 中持续读取数据:
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
运行 Receiver 后,原先 MQ 中的一个消息应该就会被消费,队列中的消息数应该会重新为 0