优点: (1)解耦 (2)冗余(备份) (3)扩展性 (4)灵活性、峰值处理能力 (5)可恢复性(冗余) (6)顺序保证(队列) (7)缓冲(冗余) (8)异步通信(宕机)
{kafka-version}.tgz。 Kafka对消息保存时根据Topic(保存)进行分类,发送消息者称为Producer(入口),消息接收者称为Consumer(出口),此外Kafka集群有多个Kafka实例组成,每个实例(Server)称为Broker。一个分区(Partition)维护一个偏移量 (offset)。 Kafka集群和Consumer都依赖zookeeper集群保存一些meta信息。
Leader为主,Follower为备,Kafka中的Follower不处理任何请求。 消费者组的不同消费者不能同时消费同一个分区的数据。
# 在解压的kafka目录下(推荐),创建一个存放日志(也存放数据)的信息
mkdir logs
# 修改server.properties
# 每一个实例的唯一辨识(int)
broker.id=0
# 是否可以删除topic
delete.topic.enable=true
# 设置日志打印的位置为创建的日志目录
log.dirs=/opt/kafka/logs
# 缓存数据的时间为7天、大小为1G
log.retention.hours=168
log.segment.bytes=1073741824
# 修改zk集群
zookeeper.connect=${id}:${port},${id}:${port},${id}:${port}
# 集群分发,修改broker.id
xsync kafka/
# bin/下的命令
kafka-server-start.sh
kafka-server-stop.sh
# 对topic的操作
kafka-topics.sh
# 测试时,控制台的消费
kafka-console-consumer.sh
kafka-console-producer.sh
--topic # 定义topic名
--replication-factor # 定义副本数
--partitions # 定义分区数
--daemon # 后台启动
# kafka依赖zk,先启动zk,并确定zk状态为leader/foller
zkstart.sh
zk/bin/zkServer.sh status
# 使用kafka自带的zk,-daemon表示后台启动
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
# 指定配置文件启动kafka,阻塞进程
bin/kafka-server-start.sh config/server.properties
# 查看进程
jps -l
# 创建分区数为x(kafka/logs下的一个目录),副本数为x(不能超过集群的节点数),名称为x的topic
bin/kafka-topics.sh --create --zookeeper ${id}:${port} --partitions ${number} --replication-factor ${number} --topic ${name}
# 查看topic数量,显示为topic name
bin/kafka-topics.sh --list --zookeeper ${id}:${port}
# 删除topic,能创建同名的即删除成功
bin/kafka-topics.sh --zookeeper ${id}:${port} --delete --topic ${name}
# 发送消息,连接kafka集群(kafka默认9092),写进topic
bin/kafka-console-producer.sh --broker-list ${id}:${port} --topic ${name}
# 消费消息,连接zk集群,读取topic,不加--from-beginning表示只获取最新的
bin/kafka-console-consumer.sh --zookeeper ${id}:${port} --from-beginning --topic ${name}
# 新版本的kafka中consumer的offset存储在本地,提升效率,不交由zk保存,会报警告!bootstrap(附属于)本地kafka集群,名为__consumer_offset的topic中。
bin/kafka-console-consumer.sh --bootstrap-server ${id}:${port} --from-beginning --topic ${name}
# 查看topic的详情,Isr为选举(其中某个与宕机的Leader节点数据最相近,作为新的Leader),ReplicationFactor为副本,值为broker.id
bin/kafka-topics.sh --zookeeper ${id}:${port} --describe-topic ${name}
三大流程:生产、存储、消费
Kafka Cluster:Broker1、Broker2、Broker3 Producer创建Topic,指定分区数和副本数
# 分区的好处:不同分区放在不同节点,实现了负载;消费者组只能消费不同分区数据,提高了并发度(topic的分区数应与一个消费者组中的消费者个数相同)
Broker1:Topic A-Partition 0-Leader、Topic A-Partition 1-Follower
Replication A/0 Replication A/1
Broker2:Topic A-Partition 0-Follower、Topic A-Partition 1-Leader
Consumer Group:Consumer A、Consumer B
Producer采用push模式将信息发布到Broker,每条消息都被append到patition中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障kafka吞吐量)。
消息都被发送到一个topic,本质就是一个目录,而topic又由一些partition logs(分区日志,offset都从0开始,有序唯一,并不断追加)组成。
分区的原因:
(1)方便在集群中扩展,每个partition可以通过调整以适应它所在的及其,而每个topic又可以由多个partition组成,因此整个集群就可以适应任意大小的数据;
(2)可以提高并发,同一个消费者组不能读取同一个分区的数据,因此可以以partition为单位读写。
分区的原则:
(1)指定了partition,则直接使用;
(2)未指定partition但指定key,通过对key的value进行hash出一个partition;
(3)partition和key都未指定,使用轮询选出一个partition。
同一个partition可能有多个replication(server.properties配置中default.replication.factor=N),没有replication,一旦broker宕机,其上所有的数据都不可被消费,同时producer也不能将数据存于其上的partition。 引入replication,在需要时在其中选举出一个leader,producer和consumer只与这个leader交互,其他的作为follower从leader中复制数据。
ACK机制:0/1/all,1表示leader,all表示leader和follower均写入信息再继续接收。图为ACK为all的机制,防止数据丢失。
(1)存储方式 物理上把topic分为一个或多个partition,每个partition物理上对应一个文件夹,存储该partition的所有消息和索引文件。 实际存储数据的文件为logs/xxxx.log文件,存在序列化。
(2)存储策略 无论消息是否被消费,kafka都会保留所有的消息,删除方式有两种: 基于时间(log.retention.hours=168)和大小(log.retention.bytes=1073741824)。
kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,删除过期文件与提高kafka性能无关。
(3)zk存储机制
Consumer存储的是偏移量(低版本kafka),Producer不在zk注册,Brokers也存储在zk。
## 3. 消费过程 Kafka提供了两套consumer API:高级Comsumer API和低级Consumer API。
(1)高级API 不能管理offset,书写简单,系统通过zk自行管理; 不能管理分区、副本等,系统自动管理(默认1分钟更新zk中保存的offset )。 可以使用group来区分对同一个topic的不同程序访问分离开俩。
(2)低级API 能够开发者控制offset,随机读取; 书写复杂,需要自行控制offset,连接分区,找到leader等。
(3)消费者组
consumer.properties中group.id=group0
,设置消费者组名。启动消费者时,需要添加命令--consumer.config config/consumer.properties
。
Maven依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>${kafka.version}</version>
</dependency>
ProducerConfig类下包含所有的配置参数,以及doc参考文档。
1.高级API:带或不带回调函数的生产者
public class Producer {
public static void main(String[] args) {
// 设置配置文件
Properties props = new Properties();
// Kafka集群
props.put("bootstrap.servers", "localhost:9092");
// 应答级别,all可以写出-1
props.put("acks", "all");
// 重试次数
props.put("retries", 0);
// 批量大小和提交延迟,决定发送时刻
props.put("batch.size", 16384);
props.put("linger.ms", 1);
// 缓存
props.put("buffer.memory", 33554432);
// KV序列化类
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 发送数据,第二个参数可以加上回调函数,重写onCompletion(RecordMetadata, exception)函数
producer.send(new ProducerRecord<>("first", String.valueOf(123)), (metadata, exception) -> {
if (exception == null) {
System.out.println(metadata.partition() + "-" + metadata.offset());
} else {
System.out.println("ERROR:" + exception);
}
});
producer.close();
}
}
// 2.低级API:自定义分区的生产者
// 实现Partition类,重写方法partition、close、configure,配置文件需要匹配生产者
props.put("partitioner.class", "${全类名}");
注意:分区中所有偏移数据消费掉,再消费下一个分区,可能会出现消费数据的顺序和生产的顺序不同。
// 1.高级API
public class Consumer {
public static void main(String[] args) {
// 配置信息
Properties props = new Properties();
// kafka集群
props.put("bootstrap.servers", "localhost:9092");
// 消费者组id
props.put("group.id", "test");
// 设置自动提交offset
props.put("enable.auto.commit", "true");
// 提交延迟
props.put("auto.commit.interval.ms", "1000");
// KV的反序列化
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 创建消费者对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 指定Topic
consumer.subscribe(Arrays.asList("first", "second"));
// 获取数据结束,JVM自动退出
while (true) {
// 获取数据,参数为获取延迟
ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
// 打印数据
for (ConsumerRecord<String, String> record : consumerRecords) {
System.out.println(record.topic() + "-" + record.partition() + "-" + record.value());
}
}
}
}
// 2.低级API:读取指定topic、partition(找leader)、offset的数据
/**
* 主要步骤:
* (1*)findLeader(),根据指定的分区从主题元数据中找到主副本;
* (2)getLastOffset(),获取分区最新的消费进度;
* (3*)run(),从主副本拉取分区的消息;
* (4)findNewLeader(),识别主副本的变化,重试。
**/
// 找分区leader(元数据信息)
private BrokerEndPoint findLeader(List<String> brokers, int port, String topic, int partition) {
for (String broker : brokers) {
// 创建获取分区Leader的消费者对象,链接到具体某一个节点
SimpleConsumer getLeader = new SimpleConsumer(broker, port, 1000,
1024*4, "getLeader");
// 创建一个主题元数据的信息请求
TopicMetadataRequest topicMetadataRequest =
new TopicMetadataRequest(Collections.singletonList(topic));
// 获取返回值
TopicMetadataResponse topicMetadataResponse = getLeader.send(topicMetadataRequest);
// 解析元数据返回值
List<TopicMetadata> topicsMetadata = topicMetadataResponse.topicsMetadata();
for (TopicMetadata topicMetadata : topicsMetadata) {
List<PartitionMetadata> partitionsMetadata = topicMetadata.partitionsMetadata();
for (PartitionMetadata partitionMetadata : partitionsMetadata) {
if (partition == partitionMetadata.partitionId()) {
return partitionMetadata.leader();
}
}
}
}
return null;
}
// 获取数据
private void run(List<String> brokers, int port, String topic, int partition, long offset) {
// 获取分区Leader
BrokerEndPoint leader = findLeader(brokers, port, topic, partition);
if (leader == null) {
return;
}
String leaderHost = leader.host();
// 创建获取数据的消费者
SimpleConsumer simpleConsumer = new SimpleConsumer(leaderHost, port, 1000,
1024 * 4, "getData");
// 创建获取数据的对象(可以获取多个数据.addFetch())
FetchRequest fetchRequest =
new FetchRequestBuilder().addFetch(topic, partition, offset, 1024 * 4).build();
// 获取返回值
FetchResponse fetchResponse = simpleConsumer.fetch(fetchRequest);
// 解析返回值,创建获取数据的对象时可以多次.addFetch
ByteBufferMessageSet messageAndOffsets = fetchResponse.messageSet(topic, partition);
for (MessageAndOffset messageAndOffset : messageAndOffsets) {
// offset可以自行保存
long offset1 = messageAndOffset.offset();
ByteBuffer payload = messageAndOffset.message().payload();
// 自行反序列化
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
System.out.println(offset1 + "-" + new String(bytes));
}
}