前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >go中间件:基于mc协议写kafka

go中间件:基于mc协议写kafka

原创
作者头像
李子健
修改2022-09-17 17:56:36
5820
修改2022-09-17 17:56:36
举报
文章被收录于专栏:每日一善每日一善

概论

有没有一种场景,你需要写kafka,但是你不想使用kafka的包,你想要一种简单的方式写入kafka

整体的结构

整体的结构是这样的

  1. 客户端调用域名发起请求
  2. 通过nginx stream代理到指定服务器
  3. 代理服务器写入kafka

如何写入代理服务器呢?

  1. 需要监听tcp协议
  2. 写入kafka

使用golang去做io密集性的事情真是简单又方便啊

具体实现

  1. 使用cobra 处理命令行模式
代码语言:txt
复制
package cmd

import (

 "ho/pkg/global"

 "ho/pkg/kafka"

 "ho/pkg/memcache"

 "log"

 "os"

 "os/signal"

 "syscall"

 "github.com/spf13/cobra"

)

var mcCmd = &cobra.Command{

 Use:   "mc",

 Short: "运行服务",

 Long:  "运行服务",

 Run:   func(cmd \*cobra.Command, args []string) {},

}

var mcKafkaCmd = &cobra.Command{

 Use:   "kafka",

 Short: "配置文件",

 Long:  "配置文件地址",

 Run: func(cmd \*cobra.Command, args []string) {

 log.Println("mc to kafka")

 log.Println("configFilePath:" + configFilePath + " configFileName:" + configFileName)

 //init config

 global.InitConfig(configFilePath, configFileName)

 //init kafka

 kafka.InitKafka()

 //init server

 mcServer := memcache.GetMemcacheServer()

 mcServer.RegisterFunc("set", memcache.McSendToKafka)

 mcServer.Start()

 //sign

 signChan := make(chan os.Signal, 1)

 signal.Notify(signChan, syscall.SIGINT, syscall.SIGTERM)

 //wait

 for {

 select {

 case <-signChan: //stop sign

 global.LOGGER.Info("end")

 mcServer.Stop()

 kafka.StopAll()

 return

            }

        }

    },

}

var configFilePath string

var configFileName string

func init() {

 mcCmd.AddCommand(mcKafkaCmd)

 mcKafkaCmd.Flags().StringVarP(&configFilePath, "path", "p", "", `配置文件路径`)

 mcKafkaCmd.Flags().StringVarP(&configFileName, "filename", "f", "", `配置文件名称`)

}
  1. 使用rpcxio/gomemcached 包解析tcp协议,解包和压缩包
代码语言:txt
复制
func McSendToKafka(ctx context.Context, req \*mc.Request, res \*mc.Response) error {

 key := req.Key

 value := req.Data

 sendData := string(value)

 global.LOGGER.Info("send-to-kafka", zap.String(key, sendData))

 kafka.SendTokafka(key, sendData)

 res.Response = mc.RespStored

 return nil

}
  1. 使用https://github.com/NetEase-Media/ngo 的kafka库写入kafka

不过这个库我做了改造

  1. 需要支持安全认证
  2. 他的kafka必须有消费者这个是不合理的。
代码语言:txt
复制
func McSendToKafka(ctx context.Context, req \*mc.Request, res \*mc.Response) error {

 key := req.Key

 value := req.Data

 sendData := string(value)

 global.LOGGER.Info("send-to-kafka", zap.String(key, sendData))

 kafka.SendTokafka(key, sendData)

 res.Response = mc.RespStored

 return nil

}





func SendTokafka(key, value string) {

 kafkaProduce := GetProducer(key)

 if kafkaProduce != nil {

 kafkaProduce.Send(key, value, func(meta \*RecordMetadata, err error) {

 if err != nil {

 global.LOGGER.Sugar().Errorf("error:%v", err)

            } else {

 global.LOGGER.Sugar().Info(meta)

            }

        })

    } else {

 global.LOGGER.Sugar().Errorf("topic %s not exist", key)

    }

}

源码地址: https://github.com/beckbikang/ho

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 概论
  • 整体的结构
    • 具体实现
    相关产品与服务
    消息队列 CKafka 版
    消息队列 CKafka 版(TDMQ for CKafka)是一个分布式、高吞吐量、高可扩展性的消息系统,100%兼容开源 Kafka API 2.4、2.8、3.2 版本。CKafka 基于发布/订阅模式,通过消息解耦,使生产者和消费者异步交互,无需彼此等待。CKafka 具有高可用、数据压缩、同时支持离线和实时数据处理等优点,适用于日志压缩收集、监控数据聚合、流式数据集成等场景。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档