前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >RabbitMQ系列笔记终极封装篇

RabbitMQ系列笔记终极封装篇

作者头像
陌无崖
发布2019-08-16 17:34:55
发布2019-08-16 17:34:55
1.5K00
代码可运行
举报
运行总次数:0
代码可运行

不忘初心,砥砺前行

作者 | 陌无崖

转载请联系授权

导语

在阅读本篇笔记时,如果你还不熟悉RabbitMQ,请查看公众号中关于RabbitMQ系列笔记相关文章,如果你已经熟悉了,还请在本篇文章多多指教。本文使用go mod进行获取相关包,使用Go1.12.6版本进行编写,编译器工具使用Vscode。

封装思路

首先为我们的RabbitMQ的工作模式用变量进行区别,分别代表工作模式、广播模式、路由模式、主题模式。

代码语言:javascript
代码运行次数:0
复制
const (
    SimpleQueueType = "SimpleQueue"
    BroadQueueType  = "BroadQueue"
    DirectQueueType = "DirectQueue"
    TopicQueueType  = "TopicQueue"
)

定义一个客户端,该客户端包含连接,工作模式的类型,和所需队列的属性,我们使用这个结构体实现相关方法

代码语言:javascript
代码运行次数:0
复制
type MsgClient struct {
    Conn *amqp.Connection
    Type string `json:"type"` //消息类型
    Data string `json:"data"` //队列数据
}

定义一个接口,这里需要注意的是消费者里是一个函数,函数中的参数是将来我们获取的消息。

代码语言:javascript
代码运行次数:0
复制
// 定义rabbitMQ的接口方法
type IMMessageClient interface {
    // 连接RabbitMQ,并获取连接
    ConnectToRabbitmq(Connection string)
    // 发送消息
    PublishToQueue(msg []byte) error
    // 消费消息
    ConsumeFromQueue(handlerfunc func(d amqp.Delivery)) error
}

定义我们的四种工作模式的结构体,这里需要注意的是,因为对于广播模式和路由模式属性基本相同,用了同一个结构体,Topic主题模式由于路由和绑定的路由可能不同,故单独分离了出来。

代码语言:javascript
代码运行次数:0
复制
type SimpleQueue struct {
    Rout_key      string `json:"rout_key"`      //路由
    Queue         string `json:"queue"`         //队列的名字
    Is_persistent bool   `json:"is_persistent"` //队列是否持久化
}
type ComplexQueue struct {
    ExchangeName  string `json:"exchangeName"`
    Rout_key      string `json:"rout_key"`      //路由
    Queue         string `json:"queue"`         //队列的名字
    Is_persistent bool   `json:"is_persistent"` //队列是否持久化
}

type TopicQueue struct {
    ExchangeName  string `json:"exchangeName"`
    Rout_key      string `json:"rout_key"`      //路由
    Queue         string `json:"queue"`         //队列的名字
    Is_persistent bool   `json:"is_persistent"` //队列是否持久化
    Bind_key      string `json:"bind_key"`      //绑定的路由
}

实现接口的相关方法,这里只是部分思路代码,如果想看封装源码,我已经上传到github上,有需要的可以直接拉取下来,当然也可以提交更好的代码到分支上。

代码语言:javascript
代码运行次数:0
复制
//获取连接
func (m *MsgClient) ConnectToRabbitmq(Connection string) {

    var err error
    m.Conn, err = amqp.Dial(fmt.Sprintf("%s/", Connection))
    if err != nil {
        log.Fatal(err)
    }
}
// 发消息时判断其类型,注意使用json进行反序列化
if m.Type == SimpleQueueType {
            var s SimpleQueue
            json.Unmarshal([]byte(m.Data), &s)
            q, err := ch.QueueDeclare(
                s.Queue,         //name队列的名称
                s.Is_persistent, //durble是否持久化
                false,           //delete when unused是否自动删除
                false,           //exclusive是否设置排他,如果设置为true,则队列仅对首次声明他的连接可见,并在连接断开的时候自动删除
                false,           //no-wait是否阻塞
                nil,             //arguments
            )
            FailOnError(err, "队列申请失败")
            err = ch.Publish(
                "",
                q.Name, // 路由,即队列的名字
                false,  //mandatory
                false,  //immediate
                amqp.Publishing{
                    DeliveryMode: amqp.Persistent, //消息的持久化
                    ContentType:  "text/plain",
                    Body:         msg,
                },
            )
            FailOnError(err, "发送消息失败")
}
//接收消息时需要绑定路由
// 队列绑定
err = ch.QueueBind(
    q.Name,         //队列的名字
    s.Rout_key,     //routing key
    s.ExchangeName, //所绑定的交换器
    false,
    nil,
)

读取我们的消息

代码语言:javascript
代码运行次数:0
复制
func consumeLoop(deliveries <-chan amqp.Delivery, handlerfunc func(d amqp.Delivery)) {

    for d := range deliveries {
        fmt.Println("有数据:", string(d.Body))
        handlerfunc(d)
    }
}

现在我们来测试一下吧

测试需要编写一个消费者收到消息后处理消息

代码语言:javascript
代码运行次数:0
复制
func recive(d amqp.Delivery) {
    fmt.Println(string(d.Body))
    d.Acknowledger.Ack(d.DeliveryTag, true)
}

测试我们的work模式,这里为了持续测试,我们使用一个协程,并用http监听防止我们的程序退出。

代码语言:javascript
代码运行次数:0
复制
simplequeue := client.NewSimpleQueue("user", "Login", true)
body, _ := json.Marshal(simplequeue)
fmt.Println(string(body))
Simple := &client.MsgClient{
    Type: client.SimpleQueueType,
    Data: string(body),
}
body, _ = json.Marshal(Simple)
fmt.Println(string(body))
Simple.ConnectToRabbitmq("amqp://admin:admin@192.168.10.252:5672"
go Simple.ConsumeFromQueue(recive)
http.ListenAndServe("0.0.0.0:8200", nil)

输出进行了公平调度

测试我们的广播模式

代码语言:javascript
代码运行次数:0
复制
broadqueue := client.NewComplexQueue("broadqueue_exchange", "broadqueue_route", "", true)
body, _ := json.Marshal(broadqueue)
Simple := &client.MsgClient{
    Type: client.BroadQueueType,
    Data: string(body),
}
Simple.ConnectToRabbitmq("amqp://admin:admin@192.168.10.252:5672")
go Simple.ConsumeFromQueue(recive)

两个消费者同时收到了消息进行打印

END

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-08-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 golang技术杂文 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 导语
  • 封装思路
  • 现在我们来测试一下吧
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档