前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Go 语言微服务框架 Kratos 集成第三方库 kafka-go 操作消息队列 Kafka

Go 语言微服务框架 Kratos 集成第三方库 kafka-go 操作消息队列 Kafka

作者头像
frank.
发布于 2025-01-06 04:29:38
发布于 2025-01-06 04:29:38
10600
代码可运行
举报
运行总次数:0
代码可运行

大家好,我是 frank。「Golang语言开发栈」公众号作者。

01 、介绍

Go 语言微服务框架 Kratos 不限制使用任何第三方库,Go 语言操作消息队列 Kafka 有很多优秀的第三方库,比如 sarama 和 kafka-go,我们在之前的文章中介绍过 Go 语言怎么使用 sarama 操作消息队列 Kafka。

本文我们介绍 Go 微服务框架 Kratos 怎么集成第三方库 kafka-go[1] 操作消息队列 Kafka。

02 、Kratos 集成第三方库 kafka-go

我们在本地搭建 Go 运行环境,并安装 kratos 工具,使用 kratos 工具创建项目 blog。

在 blog 项目中,集成第三方库 kafka-go。

创建项目

示例代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
kratos new blog

安装 kafka-go

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
go get github.com/segmentio/kafka-go

集成 Kafka Producer(生产者)和 Kafka Consumer(消费者)

编写文件 blog/internal/data/data.go

导入第三方库:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import (
 "github.com/segmentio/kafka-go"
)

添加 Kafka Producer(生产者)和 Kafka Consumer(消费者):

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// Data .
type Data struct {
 // TODO wrapped database client
 dbEngine *xorm.Engine
 kp       *kafkaProducer
 kc       *KafkaConsumer
}

// NewData .
func NewData(c *conf.Data, logger log.Logger, dbEngin *xorm.Engine, kp *kafkaProducer, kc *KafkaConsumer) (*Data, func(), error) {
 cleanup := func() {
  log.NewHelper(logger).Info("closing the data resources")
 }
 return &Data{
  dbEngine: dbEngin,
  kp:       kp,
  kc:       kc,
 }, cleanup, nil
}

Kafka Producer(生产者):

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
type kafkaProducer struct {
 writer *kafka.Writer
}

func NewKafkaProducer(c *conf.Data) *kafkaProducer {
 brokers := c.Kafka.Brokers
 topic := c.Kafka.Topic
 writer := &kafka.Writer{
  Addr:     kafka.TCP(brokers...),
  Topic:    topic,
  Balancer: &kafka.LeastBytes{},
 }
 return &kafkaProducer{writer: writer}
}

func (p *kafkaProducer) SendMessage(ctx context.Context, key, value []byte) error {
 err := p.writer.WriteMessages(ctx, kafka.Message{
  Key:   key,
  Value: value,
 })
 if err != nil {
  return err
 }
 return nil
}

func (p *kafkaProducer) Close() error {
 return p.writer.Close()
}

Kafka Consumer(消费者):

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
type KafkaConsumer struct {
 reader *kafka.Reader
}

func NewKafkaConsumer(c *conf.Data) *KafkaConsumer {
 brokers := c.Kafka.Brokers
 topic := c.Kafka.Topic
 groupId := c.Kafka.GroupId
 reader := kafka.NewReader(kafka.ReaderConfig{
  Brokers: brokers,
  Topic:   topic,
  GroupID: groupId,
 })
 return &KafkaConsumer{
  reader: reader,
 }
}

func (c *KafkaConsumer) Start(ctx context.Context) {
 for {
  msg, err := c.reader.ReadMessage(ctx)
  if err != nil {
   return
  }
  log.Debugf("key=%s || value=%s", string(msg.Key), string(msg.Value))
 }
}

func (c *KafkaConsumer) Close() error {
 return c.reader.Close()
}

生产 kafka 消息的方法:

创建文件 blog/internal/data/kafka.go

示例代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func (u *userRepository) KafkaSendMessage(ctx context.Context, key []byte, value []byte) (err error) {
 defer u.data.kp.Close()
 // 设置超时时间
 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 defer cancel()
 err = u.data.kp.SendMessage(ctx, key, value)
 if err != nil {
  log.Errorf("KafkaSendMessage() || err=%v", err)
  return
 }
 return
}

阅读上面这段代码,我们可以发现 KafkaSendMessage 方法封装了生产 kafka 消息的方法 u.data.kp.SendMessage

需要注意的是,我们需要设置超时时间,否则,会返回错误消息 context deadline exceeded

添加 wire 提供者:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// ProviderSet is data providers.
var ProviderSet = wire.NewSet(NewData, NewGreeterRepo, NewDbEngine, NewUserRepository, NewKafkaProducer, NewKafkaConsumer)

生成 wire 代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
cd blog/cmd/blog
wire

03 、操作 Kafka

在 Kratos 项目中,一般在项目的 bizservice 层使用 Kafka 的生产逻辑;在 service 层使用 Kafka 的消费逻辑。

限于篇幅,我们以 Kafka 的生产逻辑为例,介绍怎么在 biz 层生产 Kafka 消息。

编写文件 blog/internal/biz/user.go,在 CreateUser 方法中添加生产 Kafka 消息的代码。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
type UserRepository interface {
 Create(ctx context.Context, user *User) (int64, error)
 KafkaSendMessage(ctx context.Context, key []byte, value []byte) (err error)
}

func (u *UserUsecase) CreateUser(ctx context.Context, user *User) (id int64, err error) {
 id, err = u.userRepo.Create(ctx, user)
 if err != nil {
  return
 }
 if id > 0 {
  var b []byte
  b, err = json.Marshal(user)
  if err != nil {
   return
  }
  err = u.userRepo.KafkaSendMessage(ctx, []byte(user.Name), b)
  if err != nil {
   return
  }
 }
 return
}

阅读上面这段代码,我们可以发现 UserRepository 接口中的方法 KafkaSendMessage,就是我们在 blog/internal/data/kafka.go 文件中实现的方法。

项目运行和测试:

Kratos 运行:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
kratos run

curl 请求示例:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
curl -H "Content-Type: application/json" -X POST -d '{"name":"mac", "email":"mac@gmail.com", "password":"123456"}' http://192.168.110.209:8000/user/create

kafka 消费者:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
kafka_2.13-3.9.0/bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
{"Id":10,"Name":"mac","Email":"mac@gmail.com","Password":"123456","Created":1735972949,"Updated":1735972949}

04 、总结

本文我们通过示例代码,介绍 Kratos 微服务框架怎么集成第三方库 kafka-go,操作 Kafka。

参考资料

[1]

kafka-go: https://github.com/segmentio/kafka-go

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-01-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Golang语言开发栈 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Kafka 并发消费单个 partition
kafka可以通过多个partition实现并发,但是针对单个partition,必须顺序提交。假如消息发送顺序为1,2,3,如果先提交3,会导致1,2被提交。所以不能并发执行后立即提交。
Yuyy
2023/05/01
1K0
Go 语言微服务框架 Kratos 开发 HTTP API
本文我们分为开发环境,创建项目,代码目录,HTTP API,四个部分介绍 Kratos 微服务框架。
frank.
2024/12/23
3810
Go 语言微服务框架 Kratos 开发 HTTP API
kafka-go 读取kafka消息丢失数据的问题定位和解决
segmentio/kafka-go 是一款开源的golang kafka读写sdk,开源地址为:https://github.com/segmentio/kafka-go 。截止写文章时,这个开源代码库收获了3.3K的star,在很多公司内外部项目广泛使用。与 https://github.com/confluentinc/confluent-kafka-go 和 https://github.com/Shopify/sarama 一起,作为最常用的三个golang kafka sdk。
谢盼
2021/04/06
7.3K1
kafka-go 读取kafka消息丢失数据的问题定位和解决
砥砺前行 | Kratos 框架 v2 版本架构演进之路
Kratos 是一套轻量级 Go 微服务框架,包含大量微服务相关功能及工具。名字来源于游戏《战神》,该游戏以希腊神话为背景,讲述了奎托斯(Kratos)由凡人成为战神并展开弑神屠杀的冒险历程。
从大数据到人工智能
2022/06/15
1.6K0
砥砺前行 | Kratos 框架 v2 版本架构演进之路
Go 语言微服务框架 Kratos 服务注册与发现
Go 语言微服务框架 Kratos 服务注册与发现,支持多种注册中心,本文我们以 Consul 为例,介绍 Kratos 项目怎么实现服务注册与发现。
frank.
2025/01/20
1550
Go 语言微服务框架 Kratos 服务注册与发现
Golang实现非常好用的第三方库(一)
正如 这里 和 这里所描述的, Go语言原生的map类型并不支持并发读写。concurrent-map提供了一种高性能的解决方案:通过对内部map进行分片,降低锁粒度,从而达到最少的锁等待时间(锁冲突)
KunkkaWu
2023/03/14
9K5
Golang实现非常好用的第三方库(一)
kratos源码分析系列(6)
直接获取当前节点:selector/node/direct/direct.go
golangLeetcode
2023/09/06
5660
kratos源码分析系列(6)
Go 语言微服务框架 Kratos 操作 MySQL 和 Redis
Go 语言微服务框架 Kratos 不限制使用任何第三方库,我们可以根据个人喜好选择第三方库。
frank.
2024/12/30
1540
Go 语言微服务框架 Kratos 操作 MySQL 和 Redis
go: 优雅处理kafka消费退出
在业务中,kafka的消费者服务非常常见。主要流程是从kafka中取出消息,处理消息。
超级大猪
2023/11/08
1.1K0
go使用消息队列优化接口
在我们编写后端接口时,通常有些接口对于实时性的要求并不是那么高,但其中有些函数却相当占用接口调用时间,如调用第三方接口、发送短信、发送邮件等等。为了提升用户的体验感、系统的稳定性,此时我们就可以使用消息队列对于接口进行优化,对于实时性要求不高的接口使用消息队列来进行处理,提高api响应速度,优化用户体验。本文将以go语言使用rabbitMQ来演示如何对于一个接口进行优化。
陈杪秋
2024/07/22
1110
milvus对象存储和消息中间件设计模式分析
对于这种类型一个是mq,一个是存储,相比工厂方法设计模式,使用抽象工厂设计模式更合理。
melodyshu
2024/04/23
1730
基于Kafka构建事件溯源模式的微服务
微服务本身并不算什么新概念,它要解决的问题在软件工程历史中早已经有人提出:解耦、扩展性、灵活性,解决“烂架构”膨胀后带来的复杂度问题。
RiboseYim
2018/01/12
1.9K0
基于Kafka构建事件溯源模式的微服务
go微服务框架go-micro深度学习(二) 入门例子
    上一篇帖子简单介绍了go-micro的整体框架结构,这一篇主要写go-micro使用方式的例子,中间会穿插一些go-micro的源码,和调用流程图,帮大家更好的理解go-micro的底层。更详细更具体的调用流程和细节,会在以后的帖子里详细讲解。
lpxxn
2018/10/09
3.2K0
go微服务框架go-micro深度学习(二) 入门例子
Golang超实用第三方库合集
正如 这里 和 这里所描述的, Go语言原生的map类型并不支持并发读写。concurrent-map提供了一种高性能的解决方案:通过对内部map进行分片,降低锁粒度,从而达到最少的锁等待时间(锁冲突)
luckpunk
2025/01/08
3940
Go 语言微服务框架 Kratos 怎么读取配置?
微服务框架 Kratos 提供了一个强大的配置管理模块 - config 组件,它支持加载、解析、动态更新配置。
frank.
2025/01/13
1890
Go 语言微服务框架 Kratos 怎么读取配置?
[Go实战]golang使用mysql实例和第三方库Gendry
Builder用于生成sql语句,手写sql简单直观但是可维护性差,并且硬编码容易出错,如果遇到大where in查询,且in的集合内容又是动态的就很麻烦了。
TOMOCAT
2020/06/10
1.2K0
如何将第三方库与业务代码解耦
日常开发中我们经常会用到各种第三方库,而如何使用别人的代码其实也有一点讲究。如果直接在业务代码中使用第三方库,导致项目对某个第三方库的依赖过重,那一旦因为各种原因需要更换方案的时候,所需要修改的代码量之大可能还不如直接重写了。
Sheepy
2018/09/10
8360
Strimzi Kafka Bridge(桥接)实战之三:自制sdk(golang版本)
欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos 本篇概览 本文是《Strimzi Kafka Bridge(桥接)实战》的第三篇,前文咱们掌握了Strimzi Kafka Bridge的基本功能:基于http提供各种kafka消息的服务 此刻,如果想通过http接口调用bridge的服务,势必要写不少代码(请求数据的生成、响应数据的解析),好在Strimzi已经提供了标准OpenApi的配置文件,咱们可以
程序员欣宸
2022/12/19
7870
Strimzi Kafka Bridge(桥接)实战之三:自制sdk(golang版本)
使用go-zero微服务框架实现云监控后台(一.后台微服务搭建)
接上一篇文章,按照”终端出厂实现自动化运维方案",https://blog.csdn.net/yyz_1987/article/details/118358038
杨永贞
2022/01/07
1.1K0
使用go-zero微服务框架实现云监控后台(一.后台微服务搭建)
Kratos技术系列|从Kratos设计看Go微服务工程实践
导读 github.com/go-kratos/kratos(以下简称Kratos)是一套轻量级 Go 微服务框架,致力于提供完整的微服务研发体验,整合相关框架及周边工具后,微服务治理相关部分可对整体业务开发周期无感,从而更加聚焦于业务交付。Kratos在设计之初就考虑到了高可扩展性,组件化,工程化,规范化等。对每位开发者而言,整套 Kratos 框架也是不错的学习仓库,可以了解和参考微服务的技术积累和经验。 接下来我们从Protobuf、开放性、规范、依赖注入这4个点了解一下Kratos 在Go
腾讯云中间件团队
2021/07/14
2.8K0
推荐阅读
相关推荐
Kafka 并发消费单个 partition
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验