1)Producer :消息生产者,就是向kafka broker发消息的客户端; 2)Consumer :消息消费者,向kafka broker取消息的客户端; 3)Consumer Group (CG):消费者组,由多个consumer组成。
消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个消费者消费;消费者组之间互不影响。
所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。 4)Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。 5)Topic :可以理解为一个队列,生产者和消费者面向的都是一个topic
; 6)Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列; 7)Replica:副本,为保证集群中的某个节点发生故障时,该节点上的partition数据不丢失,且kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower。 8)leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是leader。 9)follower:每个分区多个副本中的“从”,实时从leader中同步数据,保持和leader数据的同步。leader发生故障时,某个follower会成为新的follower。
shell
[root@hadoop102 kafka]$ /opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties
shell
[root@hadoop102 bin]$ kafka-topics.sh --list --zookeeper hadoop102:2181
shell
[root@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic first
选项说明:
--topic
定义topic名--replication-factor
定义副本数--partitions
定义分区数
shell
[root@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic first
需要server.properties中设置delete.topic.enable=true否则只是标记删除。
shell
[root@hadoop102 bin]$ kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
shell
[root@hadoop103 kafka]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first
--from-beginning:会把主题中以往所有的数据都读取出来。
shell
[root@hadoop104 kafka]$ kafka-topics.sh --describe --topic first --zookeeper hadoop102:2181
shell
[root@hadoop104 kafka]$ kafka-topics.sh --alter --topic first --zookeeper hadoop102:2181 --partitions 6
Kafka
中消息是以topic
进行分类的,生产者生产消息,消费者消费消息,都是面向topic
的。topic
是逻辑上的概念,而partition
是物理上的概念,每个partition
对应于一个log
文件,该log
文件中存储的就是producer
生产的数据。Producer
生产的数据会被不断追加到该log
文件末端,且每条数据都有自己的offset
。消费者组中的每个消费者,都会实时记录自己消费到了哪个offset
,以便出错恢复时,从上次的位置继续消费。
由于生产者生产的消息会不断追加到log文件末尾,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个
partition
分为多个segment
。每个segment
对应两个文件——“.index
”文件和“.log
”文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号
。例如,first这个topic有三个分区,则其对应的文件夹为first-0,first-1,first-2。
Topic的每个partition收到producer发送的数据后, 都需要向producer发送
ack
(acknowledgement
确认收到),如果producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。
Leader 维护了一个动态的
in-sync replica set (ISR)
,意为和 leader 保持同步的 follower 集合。当 ISR 中的 follower 完成数据的同步之后,leader 就会给 follower 发送 ack。如果follower长 时 间 未 向 leader 同 步 数 据 , 则 该 follower 将 被 踢 出 ISR , 该 时 间 阈 值 由replica.lag.time.max.ms
参数设定。Leader 发生故障之后,就会从 ISR 中选举新的 leader。
ack关系数据丢不丢失的问题,ISR关系数据一致性和存储问题
消费者采用
pull(拉)
模式从broker中读取数据。push(推)
模式难以适应消费速率不同的消费者! 如果kafka没有数据,消费者可能陷入到循环中,一直返回空数据。
下图是记录在zookeeper中的数据结构
为了实现跨分区跨会话的事务,需要引入一个全局唯一的 Transaction ID(客户端给予的),并将
Producer
获得的PID
和Transaction ID
绑定。这样当Producer重启后就可以通过正在进行的TransactionID 获得原来的 PID。 为了管理 Transaction,Kafka 引入了一个新的组件Transaction Coordinator
。Producer 就是通过和 Transaction Coordinator 交互获得 Transaction ID 对应的任务状态。TransactionCoordinator 还负责将事务所有写入 Kafka 的一个内部 Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。
由于
Consumer
可以通过offset
访问任意信息,而且不同的Segment File
生命周期不同,同一事务的消息可能会出现重启后被删除的情况。