首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Vite 从入门到精通,玩转新时代前端构建法则-完整分享

核心代码,注释必读

// download:3w 52xueit com

vue 2.x 响应式

Object.defineProperty

爱学it学无止境

Kafka 目前总共演进了 8 个大版本,分别是 0.7、0.8、0.9、0.10、0.11、1.0 和 2.0、3.0,

建议使用0.10.0.0之后的版本,因为这是里程碑式的大版本,该版本引入了 Kafka Streams。从这个版本起,Kafka 正式升级成分布式流处理平台,虽然此时的 Kafka Streams 还基本不能线上部署使用。0.10 大版本包含两个小版本:0.10.1 和 0.10.2,它们的主要功能变更都是在 Kafka Streams 组件上。自 0.10.2.1 版本起,新版本 Consumer API 算是比较稳定了。据我了解,目前公司现网环境使用的最低的版本也是这个0.10.2.1。

最后强烈建议,不论你用的是哪个版本,都得尽量保持服务器端版本和客户端版本一致,否则你将损失很多 Kafka 为你提供的性能优化收益。而且可能会出现很多莫名其妙的问题,比如kafka进程假死、连接不上broker等问题。

三、kafka生产者分区策略

使用过kafka的小伙伴都应该比较清楚,kafka下真实存储数据的地方是topic(主题)之下的partition(分区),而topic下的每条消息只会保存在某一个partition中,不会在多个分区中被保存多份。之所以topic之下还有partition,主要作用是为了提高kafka负载均衡的能力,提高系统的吞吐性。

标题中提到的分区策略就是决定生产者将消息发送到哪个分区的算法,那么kafka分区都有哪些策略呢?

主要有四个:

1、轮询策略,即按顺序分配,默认分区策略。举个例子,假设一个主题包含3个分区。第一条消息会被发送到分区0,第二条消息会被发送到分区1,第三条消息会被发送到分区2。接着,当生产第4条消息时,分配将重新开始,这条消息会被发送到分区0。以此类推。

2、随机策略,就是随意地将消息放置到任何一个分区,这个本质上和轮询差不多,也是为了将数据打散,使其均匀分布,但是打散效果比轮询差一点,好像新版本的kafka已经废弃了,改为默认是轮询分配了。

3、按key消息建保存策略。Kafka 允许为每条消息定义消息键,简称为 Key。这个 Key可以是某个业务的标识划分比如公司、部门、业务ID等等。只要消息定义了key,那么就可以保证同一个key的所有消息都进入相同的分区里面。如果指定了 Key,那么默认实现按消息键保存策略;如果没有指定 Key,则使用轮询策略。这个方式作用非常强大,当你需要实现消息的顺序消费的时候,就可以指定这个key。

举个实际的使用场景,我这里有一个业务,用户会有两种行为,新增和更改,这两种行为我这边都会生产kafka消息给下游消费,那么这种情况下能使用上面的轮询和随机策略吗?很明显不行,假如新增和更改只隔了很短的一个时间间隔,然后这两条消息被推送到不同的分区,那么就可能出现这样的情况:消费者先消费了更改的数据,然后再消费到新增的数据,这样数据就乱了啊。那这时候,按key分区的策略就派上用场了,我可以将用户ID设置成一个key,那么该用户的数据都会落到同一个分区,且有先后顺序了,这样就不会出问题了。

下面是使用sarama实现的一个demo:

package main

import (

"fmt"

"log"

"strconv"

"github.com/IBM/sarama"

)

func main() {

// 创建生产者配置

config := sarama.NewConfig()

config.Producer.Return.Successes = true

config.Version = sarama.V1_1_1_0 //kafka指定版本号,与broker保持一致

// 创建生产者

producer, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)

if err != nil {

log.Fatalf("Failed to create Kafka producer: %v", err)

}

defer producer.Close()

for i := 0; i < 100; i++ { //生产100条消息

// 创建消息并指定分区

message := &sarama.ProducerMessage{

Topic: "live-task-reward",

Key:   sarama.StringEncoder("jay"), //指定key,那么该key的100条消息都会落在同一个分区,落在哪个分区根据这个key计算出来

Value: sarama.StringEncoder("Hello, Kafka!" + strconv.Itoa(i)),

}

// 发送消息

partition, offset, err := producer.SendMessage(message)

if err != nil {

log.Fatalf("Failed to send message: %v", err)

}

fmt.Printf("Message sent to partition %d at offset %d\n", partition, offset)

}

}

4、指定分区。就是在生产消息的时候可以直接指定分区生产,使消息落入到具体的某个分区中。下面是使用sarama实现的一个demo:

package main

import (

"fmt"

"log"

"strconv"

"github.com/IBM/sarama"

)

func main() {

// 创建生产者配置

config := sarama.NewConfig()

config.Producer.Return.Successes = true

config.Producer.Partitioner = sarama.NewManualPartitioner //如果需要���定分区的时候,这个参数必须设置

config.Version = sarama.V1_1_1_0                          //kafka指定版本号,与broker保持一致

// 创建生产者

producer, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)

if err != nil {

log.Fatalf("Failed to create Kafka producer: %v", err)

}

defer producer.Close()

for i := 0; i < 100; i++ { //生产100条消息

// 创建消息并指定分区

message := &sarama.ProducerMessage{

Topic:     "live-task-reward",

Key:       sarama.StringEncoder("jay"), //即使这里指定了key,但kafka不会去计算该key。因为下面指定了分区1,那么所有数据都会落在分区1

Value:     sarama.StringEncoder("Hello, Kafka!" + strconv.Itoa(i)),

Partition: 1, // 指定分区为 1

}

// 发送消息

partition, offset, err := producer.SendMessage(message)

if err != nil {

log.Fatalf("Failed to send message: %v", err)

}

fmt.Printf("Message sent to partition %d at offset %d\n", partition, offset)

}

}

以上,分别讲了kafka的常用类库、版本和生产者分区策略的一些知识点和踩过的一些坑,这也都是用好kafka必须掌握的一些基础知识。好了,下篇我将会继续为大家讲解kafak的剩余知识。回见~

  • 发表于:
  • 原文链接https://page.om.qq.com/page/OF_rzGlx4kspHMU3NwXR5rDg0
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券