前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >在Golang中使用Kafka实现消息队列

在Golang中使用Kafka实现消息队列

作者头像
Petrochor
发布2022-09-19 14:37:09
7180
发布2022-09-19 14:37:09
举报
文章被收录于专栏:StephenStephen

安装JDK1.8

1、搜索jdk安装包

代码语言:javascript
复制
yum search java|grep jdk

2、下载jdk1.8,下载之后默认的目录为: /usr/lib/jvm/

代码语言:javascript
复制
yum install java-1.8.0-openjdk

下载安装zookeeper

安装zookeeper

kafka依赖zookeeper,所以需要下载安装zookeeper

代码语言:javascript
复制
# 下载压缩包
wget http://archive.apache.org/dist/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz
# 解压
tar -zxvf apache-zookeeper-3.7.0-bin.tar.gz

修改配置文件

代码语言:javascript
复制
cd apache-zookeeper-3.7.0-bin/conf/
mv zoo_sample.cfg zoo.cfg

启动zookeeper

代码语言:javascript
复制
cd ../bin/
./zkServer.sh start

出现以下信息表示启动成功

代码语言:javascript
复制
[root@localhost apache-zookeeper-3.7.0-bin]# bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /root/apache-zookeeper-3.7.0-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

启动异常

如果出现 already running as process 错误,这个一般是因为机器异常关闭缓存目录中残留PID文件导致的(为关闭进程强行关机等导致的)

解决方案:到配置文件 conf/zoo.cfg 查找 dataDir 配置的目录

代码语言:javascript
复制
dataDir=/tmp/zookeeper

dataDir 目录下,清理缓存文件

代码语言:javascript
复制
cd /tmp/zookeeper
rm -rf zookeeper_server.pid

下载安装kafka

下载并解压

代码语言:javascript
复制
wget https://downloads.apache.org/kafka/3.2.1/kafka_2.13-3.2.1.tgz
tar -zxvf kafka_2.13-3.2.1.tgz

启动kafka

代码语言:javascript
复制
bin/kafka-server-start.sh config/server.properties

创建主题

代码语言:javascript
复制
bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic topic1 --bootstrap-server localhost:9092

发送消息

代码语言:javascript
复制
bin/kafka-console-producer.sh --topic topic1 --bootstrap-server localhost:9092

接收消息

代码语言:javascript
复制
bin/kafka-console-consumer.sh --topic topic1 --from-beginning --bootstrap-server localhost:9092

golang中简单使用kafka

安装golang客户端

代码语言:javascript
复制
go get github.com/Shopify/sarama

使用golang创建消息生产者

代码语言:javascript
复制
package main

import (
    "fmt"
    "log"
    "os"
    "time"

    "github.com/Shopify/sarama"
)

var Address = []string{"192.168.18.128:9092"}

func main() {
    syncProducer(Address)
    // aSyncProducer()
}

//同步消息模式
func syncProducer(address []string) {
    // 配置
    config := sarama.NewConfig()
    // 属性设置
    config.Producer.Return.Successes = true
    config.Producer.Timeout = 5 * time.Second
    // 创建生成者
    p, err := sarama.NewSyncProducer(address, config)
    // 判断错误
    if err != nil {
        log.Printf("sarama.NewSyncProducer err, message=%s \n", err)
        return
    }
    // 最后关闭生产者
    defer p.Close()
    // 主题名称
    topic := "topic1"
    // 消息
    srcValue := "sync: this is a message. index=%d"
    // 循环发消息
    for i := 0; i < 10; i++ {
        // 格式化消息
        value := fmt.Sprintf(srcValue, i)
        // 创建消息
        msg := &sarama.ProducerMessage{
            Topic: topic,
            Value: sarama.ByteEncoder(value),
        }
        // 发送消息
        part, offset, err := p.SendMessage(msg)
        if err != nil {
            log.Printf("send message(%s) err=%s \n", value, err)
        } else {
            fmt.Fprintf(os.Stdout, value+"发送成功,partition=%d, offset=%d \n", part, offset)
        }
        // 每隔两秒发送一个消息
        time.Sleep(2 * time.Second)
    }
}

// 异步消息
func aSyncProducer() {

    config := sarama.NewConfig()
    //等待服务器所有副本都保存成功后的响应
    config.Producer.RequiredAcks = sarama.WaitForAll
    //随机向partition发送消息
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    //是否等待成功和失败后的响应,只有上面的RequireAcks设置不是NoReponse这里才有用.
    config.Producer.Return.Successes = true
    config.Producer.Return.Errors = true
    //设置使用的kafka版本,如果低于V0_10_0_0版本,消息中的timestrap没有作用.需要消费和生产同时配置
    //注意,版本设置不对的话,kafka会返回很奇怪的错误,并且无法成功发送消息
    config.Version = sarama.V0_10_0_1

    fmt.Println("start make producer")
    //使用配置,新建一个异步生产者
    producer, e := sarama.NewAsyncProducer([]string{"192.168.18.128:9092"}, config)
    if e != nil {
        fmt.Println(e)
        return
    }
    defer producer.AsyncClose()

    //循环判断哪个通道发送过来数据.
    fmt.Println("start goroutine")
    go func(p sarama.AsyncProducer) {
        for {
            select {
            case <-p.Successes():
                //fmt.Println("offset: ", suc.Offset, "timestamp: ", suc.Timestamp.String(), "partitions: ", suc.Partition)
            case fail := <-p.Errors():
                fmt.Println("err: ", fail.Err)
            }
        }
    }(producer)

    var value string
    for i := 0; ; i++ {
        time.Sleep(500 * time.Millisecond)
        time11 := time.Now()
        value = "this is a message 0606 " + time11.Format("15:04:05")

        // 发送的消息,主题。
        // 注意:这里的msg必须得是新构建的变量,不然你会发现发送过去的消息内容都是一样的,因为批次发送消息的关系。
        msg := &sarama.ProducerMessage{
            Topic: "topic2",
        }

        //将字符串转化为字节数组
        msg.Value = sarama.ByteEncoder(value)
        //fmt.Println(value)

        //使用通道发送
        producer.Input() <- msg
    }
}

使用golang创建消息消费者

代码语言:javascript
复制
package main

import (
    "fmt"
    "time"

    "github.com/Shopify/sarama"
    cluster "github.com/bsm/sarama-cluster"
)

var (
    kafkaConsumer *cluster.Consumer
    kafkaBrokers  = []string{"192.168.18.128:9092"}
    kafkaTopic    = "topic1"
    groupId       = "test_1"
)

func init() {
    // 配置
    var err error
    config := cluster.NewConfig()
    config.Consumer.Return.Errors = true
    config.Group.Return.Notifications = true
    config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
    config.Consumer.Offsets.Initial = -2
    config.Consumer.Offsets.CommitInterval = 1 * time.Second
    config.Group.Return.Notifications = true
    // 创建消费者
    kafkaConsumer, err = cluster.NewConsumer(kafkaBrokers, groupId, []string{kafkaTopic}, config)
    if err != nil {
        panic(err.Error())
    }
    if kafkaConsumer == nil {
        panic(fmt.Sprintf("consumer is nil. kafka info -> {brokers:%v, topic: %v, group: %v}", kafkaBrokers, kafkaTopic, groupId))
    }
    fmt.Printf("kafka init success, consumer -> %v, topic -> %v, ", kafkaConsumer, kafkaTopic)
}

func main() {
    for {
        select {
        case msg, ok := <-kafkaConsumer.Messages():
            if ok {
                fmt.Printf("kafka 接收到的消息: %s \n", msg.Value)
                kafkaConsumer.MarkOffset(msg, "")
            } else {
                fmt.Printf("kafka 监听服务失败")
            }
        case err, ok := <-kafkaConsumer.Errors():
            if ok {
                fmt.Printf("consumer error: %v", err)
            }
        case ntf, ok := <-kafkaConsumer.Notifications():
            if ok {
                fmt.Printf("consumer notification: %v", ntf)
            }
        }
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-08-04 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 安装JDK1.8
  • 下载安装zookeeper
  • 下载安装kafka
  • golang中简单使用kafka
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档