Kafka 特性
高吞吐量、低延迟每个topic可以分多个partition, consumer group 对partition进行consume操作。
可扩展性:kafka集群支持热扩展。
持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)。
高并发:支持数千个客户端同时读写。
KafKa 中文文档
http://kafka.apachecn.org/documentation.html#gettingStarted
Kafka流程
Kafka中发布订阅的对象是topic。我们可以为每类数据创建一个topic,把向topic发布消息的客户端称作producer,从topic订阅消息的客户端称作consumer。Producers和consumers可以同时从多个topic读写数据。一个kafka集群由一个或多个broker服务器组成,它负责持久化和备份具体的kafka消息。
Producers往Brokers里面的指定Topic中写消息,Consumers从Brokers里面拉去指定Topic的消息,然后进行业务处理。
一个topic实际是由多个partition组成的,遇到瓶颈时,可以通过增加partition的数量来进行横向扩容。单个parition内是保证消息有序。
正常的topic相当于一个MQ的队列,发布者发送message必须指定topic,然后Kafka会根据接收到的message进行load balance,均匀的分布到topic的不同的partition上,一个消费者组要全部消费这个topic上的所有partition,所以一个消费者组如果多个消费者,那么这里面的消费者是不能消费到全部消息的。
订阅topic是以一个消费组来订阅的,一个消费组里面可以有多个消费者。同一个消费组中的两个消费者,不会同时消费一个partition。换句话来说,就是一个partition,只能被消费组里的一个消费者消费,但是可以同时被多个消费组消费。因此,如果消费组内的消费者如果比partition多的话,那么就会有个别消费者一直空闲。
KafKa 名词概念
Broker
Kafka集群包含一个或多个服务器,一个kafka服务器被称为一个broker。
Topic
Kafka集群的消息主题,这个主题被称为Topic,一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处。
Partition
分区或分组。分组是Kafka提升吞吐量的一个关键设计。这样可以让消费者多线程并行接收消息。创建Topic时可指定Parition数量。一个Topic可以分为多个Partition,也可以只有一个Partition。每一个Partition是一个有序的,不可变的消息序列。每一个消息在各自的Partition中有唯一的ID。这些ID是有序的。称之为offset,offset在不同的Partition中是可以重复的,但是在一个Partition中是不可能重复的。
如果是多个Partition,生产者在把消息放到Topic中时,可以决定放到哪一个Patition。这个可以使用简单的轮训方法,也可以使用一些Hash算法。一个Topic的多个Partition可以分布式部署在不同的Server上,一个Kafka集群。配置项为:num.partitions,默认是1。每一个Partition也可以在Broker上复制多分,用来做容错。
topic与partition分区关系
在 kafka 中, topic 是一个存储消息的逻辑概念,可以认为是一个消息集合。每条消息发送到 kafka 集群的消息都有一个topic。物理上来说,不同的 topic 的消息是分开存储的,每个 topic 可以有多个生产者向它发送消息,也可以有多个消费者去消费其中的消息;partition分区是topic的进一步拆分,每个topic可以拆分为多个partition分区,类似于数据库中表的水平拆分,每条消息都会分到某一个分区当中,分区内部会给消息分配一个offset来表示消息的顺序所在。
多个生产者可以向topic发送消息,消息可以按照一定规则均匀分布在各个partition里面,由于各个partition物理上也是隔离存储的,这点就类似于数据库对于表做水平拆分来提高性能。
kafka集群中分区如何分配到broker上
一个topic可以建立多个分区,当然在单机环境下分区都在一个broker上面。在集群环境下,分区是按照什么规则分布到集群中各台broker上面?使用的是最简单的取模算法。
例如:topic:message_topic有 p0,p1,p2,p3四个分区,有三台broker b1,b2,b3。那么分区分配到broker上面的策略就是
b1:p0,p3
b2:p1
b3:p2
简单来说就是topic排序后所在序号对broker 的size取模,结果就是所在broker。
kafka消息如何确定发送到哪个分区
kafka的消息内容包含了key-value键值对,key的作用之一就是确定消息的分区所在。默认情况下, kafka 采用的是 hash 取模的分区算法。如果Key 为 null,则会随机分配一个分区。这个随机是在这个参数”metadata.max.age.ms”的时间范围内随机选择一个。对于这个时间段内,如果 key 为 null,则只会发送到唯一的分区。简单来说就是:hash(key) % partitions.size。
Producer & Consumer
生产者和消费者。一般消息系统都有生产者和消费者的概念。生产者产生消息,即把消息放入Topic中,而消费者则从Topic中获取消息处理。一个生产者可以向多个Topic发送消息,一个消费者也可以同时从几个Topic接收消息。同样的,一个Topic也可以被多个消费者来接收消息。
Consumer Group
顾名思义,定义了一组消费者。一般来说消息中间件都有两种模式:队列模式和发布订阅模式。队列模式及每一个消息都会给其中一个消费者,而发布订阅模式则是每个消息都广播给所有的消费者。Kafka就是使用了Consumer Group来实现了这两种模式。
如果所有的消费者都是同一个Consumer Group的话,就是队列模式,每个消息都会负载均衡的分配到所有的消费者。
如果所有的消息者都在不同的Consumer Group的话,就是发布订阅模式,每个消费者都会得到这个消息。下图是一个Topic,配置了4个Patition,分布在2个Broker上。由于有2个Consumer Group,Group A和Group B都可以得到P0-P3的所有消息,是一个订阅发布模式。两个Group中的Consumer则负载均衡的接收了这个Topic的消息。如果Group中的Consumer的总线程数量超过了Partition的数量,则会出现空闲状态。
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)
Zookeeper
Kafka的运行依赖于Zookeeper(kafka集成zookeeper,单机部署kafka可以使用集成的zookeeper,集群部署则新建zookeeper集群)。Topic、Consumer、Patition、Broker等注册信息都存储在ZooKeeper中。
Zookeeper作用
zookeeper是为了解决分布式一致性问题的工具。
kafka 安装后默认集成kafka自带的zk,至于kafka为什么使用zk,首先要知道zk的作用, 作为去中心化的集群模式。需要消费者知道现在那些生产者(对于消费者而言,kafka就是生产者)是可用的。如果没了zk消费者如何知道呢?如果每次消费者在消费之前都去尝试连接生产者测试下是否连接成功,效率呢?所以kafka需要zk,在kafka的设计中就依赖了zk了。安装kafka之前需要先安装zookeeper集群,虽然kafka有自带的zk集群,但是建议还是使用单独的zk集群。
使用Zookeeper具体原因
kafka使用zookeeper来实现动态的集群扩展,不需要更改客户端(producer和consumer)的配置。broker会在zookeeper注册并保持相关的元数据(topic,partition信息等)更新。而客户端会在zookeeper上注册相关的watcher。一旦zookeeper发生变化,客户端能及时感知并作出相应调整。这样就保证了添加或去除broker时,各broker间仍能自动实现负载均衡。这里的客户端指的是Kafka的消息生产端(Producer)和消息消费端(Consumer)·
Kafka使用zk的分布式协调服务,将生产者,消费者,消息储存(broker,用于存储信息,消息读写等)结合在一起。
同时借助zk,kafka能够将生产者,消费者和broker在内的所有组件在无状态的条件下建立起生产者和消费者的订阅关系,实现生产者的负载均衡。
1. broker 在zk中注册
kafka的每个broker(相当于一个节点,相当于一个机器)在启动时,都会在zk中注册,告诉zk其broker id,在整个的集群中,broker.id/brokers/ids,当节点失效时,zk就会删除该节点,就很方便的监控整个集群broker的变化,及时调整负载均衡。
2. topic 在zk中注册
在kafka中可以定义很多个topic,每个topic又被分为很多个分区。一般情况下,每个分区独立在存在一个broker上,所有的这些topic和broker的对应关系都有zk进行维护
3. consumer(消费者)在zk中注册
Zookeeper作用:管理broker、consumer。
分布式
每个分区在Kafka集群的若干服务中都有副本,这样这些持有副本的服务可以共同处理数据和请求,副本数量是可以配置的。副本使Kafka具备了容错能力。每个分区都由一个服务器作为“leader”,零或若干服务器作为“followers”,leader负责处理消息的读和写,followers则去复制leader.如果leader down了,followers中的一台则会自动成为leader。集群中的每个服务都会同时扮演两个角色:作为它所持有的一部分分区的leader,同时作为其他分区的followers,这样集群就会据有较好的负载均衡。
消息的持久化
Kafka可以通过配置时间和大小来持久化所有的消息,不管是否被消费(消费者收掉)。举例来说,如果消息保留被配置为1天,那么,消息就会在磁盘保留一天的时间,也就是说,一天以内,任意消费这个消息。一天以后,这个消息就会被删除。保留多少时间就取决于业务和磁盘的大小。
Kafka主要有两种方式:时间和大小。在Broker中的配置参数为:
log.retention.bytes:最多保留的文件字节大小。默认-1。
log.retention.hours:最多保留的时间,小时。优先级最低。默认168。
log.retention.minutes:最多保留的时间,分钟。如果为空,则看log.retention.hours。默认null。
log.retention.ms:最多保留的时间,毫秒。如果为空,则看log.retention.minutes。默认null。
KafKa 单机部署
openjdk 安装
yum install java-1.8.0-openjdk java-1.8.0-openjdk-devel
KafKa下载解压
#创建kafka安装目录mkdir -p /data/kafka#创建kafka日志目录mkdir -p /data/kafka/kafkalogs#download kafka软件包wget http://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz #解压kafkatar -xzvf kafka_2.12-2.1.0.tgz -C /data/kafka#Portzookeeper 2181kafka 9092
kafka 配置文件详解
egrep -v "*#|^$" /data/kafka/kafka_2.12-2.5.0/config/server.properties broker.id=1 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样listeners=PLAINTEXT://192.168.99.221:9092num.network.threads=3 #borker进行网络处理的线程数num.io.threads=8 #borker进行I/O处理的线程数socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘socket.request.max.bytes=104857600 #向kafka请求消息或者向kafka发送消息的请求的最大数,这个值不能超过java的堆栈大小log.dirs=/data/kafka/kafkalogs/num.partitions=1 #默认的分区数,一个topic默认1个分区数num.recovery.threads.per.data.dir=1offsets.topic.replication.factor=1transaction.state.log.replication.factor=1transaction.state.log.min.isr=1log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天log.segment.bytes=1073741824 #因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有删除zookeeper.connect=127.0.0.1:2181 #设置zookeeper的连接端口zookeeper.connection.timeout.ms=18000group.initial.rebalance.delay.ms=0
KafKa 默认使用内存为1G,通过修改"kafka-server-start.sh"实现
vim /data/kafka/kafka_2.12-2.5.0/bin/kafka-server-start.shexport KAFKA_HEAP_OPTS="-Xmx4G -Xms4G"
服务启动
手动启动 zookeeper kafka
/data/kafka/kafka_2.12-2.5.0/bin/./zookeeper-server-start.sh ../config/zookeeper.properties & #启动kafka自带的zookeeper(若不用自带zk可不执行此句)./kafka-server-start.sh ../config/server.properties &
zookeeper 系统命令启动
#cd /usr/lib/systemd/system# vim zookeeper.service [Unit]Description=zookeeper server daemonAfter=zookeeper.target
[Service]Type=forkingExecStart=/data/kafka/kafka_2.12-2.5.0/bin/zookeeper-server-start.sh -daemon /data/kafka/kafka_2.12-2.5.0/config/zookeeper.properties ExecReload=/data/kafka/kafka_2.12-2.5.0/bin/zookeeper-server-stop.sh && sleep 2 && /data/kafka/kafka_2.12-2.5.0/bin/zookeeper-server-start.sh -daemon /data/kafka/kafka_2.12-2.5.0/config/zookeeper.propertiesExecStop=/data/kafka/kafka_2.12-2.5.0/bin/zookeeper-server-stop.shRestart=always
[Install]WantedBy=multi-user.target
启动并开机启动
# systemctl start zookeeper # systemctl enable zookeeper
kafka 系统命令启动
#cd /usr/lib/systemd/system#vim kafka.service [Unit]Description=kafka server daemonAfter=kafka.target
[Service]Type=forkingExecStart=/data/kafka/kafka_2.12-2.5.0/bin/kafka-server-start.sh -daemon /data/kafka/kafka_2.12-2.5.0/config/server.propertiesExecReload=/data/kafka/kafka_2.12-2.5.0/bin/kafka-server-stop.sh && sleep 2 && /data/kafka/kafka_2.12-2.5.0/bin/kafka-server-start.sh -daemon /data/kafka/kafka_2.12-2.5.0/config/server.propertiesExecStop=/data/kafka/kafka_2.12-2.5.0/bin/kafka-server-stop.shRestart=always
[Install]WantedBy=multi-user.target
启动并开机启动
# systemctl start kafka# systemctl enable kafka
KafKa 操作命令
#创建topic./bin/kafka-topics.sh --create --zookeeper 192.168.99.221:2181 --replication-factor 1 --partitions 1 --topic message_topic##查看topic 信息./bin/kafka-topics.sh --describe --zookeeper 192.168.99.221:2181 --topic message_topic#生产者发送消息./bin/kafka-console-producer.sh --broker-list 192.168.99.221:9092 --topic message_topic#消费者消费消息() ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.99.221:9092 --topic message_topic --from-beginning#查看topic./bin/kafka-topics.sh --list --zookeeper 192.168.99.221:2181./bin/kafka-topics.sh --list --bootstrap-server 192.168.99.221:9092 #删除topic./bin/kafka-topics.sh --delete --topic message_topic --zookeeper 192.168.99.221:2181#查看每分区consumer_offsets(可以连接到的消费主机) ./bin/kafka-topics.sh --describe --zookeeper 192.168.99.221:2181 --topic __cons-umer_offse ts
KafKa验证测试
创建 topic message_topic
./bin/kafka-topics.sh --create --zookeeper 192.168.99.221:2181 --replication-factor 1 --partitions 1 --topic message_topicWARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.Created topic message_topic.
查看 topic
./bin/kafka-topics.sh --list --zookeeper 192.168.99.221:2181__consumer_offsetsmessage_topic
生产消息
./bin/kafka-console-producer.sh --broker-list 192.168.99.221:9092 --topic message_topic>china
消费消息(克隆一个终端进行数据消费)
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.99.221:9092 --topic message_topic --from-beginningchina
删除 topic message_topic
./bin/kafka-topics.sh --delete --topic message_topic --zookeeper 192.168.99.221:2181Topic message_topic is marked for deletion.Note: This will have no impact if delete.topic.enable is not set to true
KafKa 集群部署
集群主机清单
Hostname | Server IP | Software Version | Role |
---|---|---|---|
kafka-node1 | 192.168.99.233 | kafka_2.12-2.5.0zookeeper-3.5.7openjdk version "1.8.0_242" | kafka/zookeeper |
kafka-node2 | 192.168.99.232 | kafka_2.12-2.5.0zookeeper-3.5.7openjdk version "1.8.0_242" | kafka/zookeeper |
kafka-node3 | 192.168.99.221 | kafka_2.12-2.5.0zookeeper-3.5.7openjdk version "1.8.0_242" | kafka/zookeeper |
系统环境配置
selinux 配置
#配置文件永久关闭selinuxsed -i '/SELINUX/s/enforcing/disabled/' /etc/selinux/config#重启系统systemctl reboot#临时关闭selinuxsetenforce 0#check selinux status getenforce 或 sestatus
防火墙配置
firewall-cmd --add-port=2821/tcp --add-port=9092/tcp --permanentfirewall-cmd --add-port=2888/tcp --add-port=3888/tcp --permanentfirewall-cmd --add-service=ntp --permanentfirewall-cmd --reloadfirewall-cmd --list-all#Centos 7 最小化安装后firewalld配置命令无法补全,由于系统缺少了bash-completion,用yum安装,需要重启系统。yum install bash-completion -y
hosts 配置
vim /etc/hosts192.168.99.232 kafka-node1192.168.99.233 kafka-node2192.168.99.221 kafka-node3
ntp server 配置
#修改系统时区Asia/Shanghaicp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime#查看当前时区timedatectlNTP SERVER 搭建(内网NTP服务器与同时与公网进行同步)#安装ntp服务yum install ntp--------------------------------------------------------[root@kafka-node1 ~]# egrep -v "*#|^$" /etc/ntp.conf driftfile /var/lib/ntp/driftrestrict default nomodify notrap nopeer noqueryrestrict 127.0.0.1 restrict ::1restrict default nomodifyserver ntp1.aliyun.comrestrict ntp1.aliyun.com nomodify notrap noqueryserver 127.127.1.0 fudge 127.127.1.0 stratum 10includefile /etc/ntp/crypto/pwkeys /etc/ntp/keysdisable monitor--------------------------------------------------------#启动ntp服务systemctl restart ntpd && systemctl enable ntpd#查看ntp服务状态netstat -auntlp |grep ntp 或 systemctl status ntpd#查看网络中的NTP服务器,同时显示客户端和每个服务器的关系ntpq –p[root@kafka-node1 zookeeper]# ntpq -p remote refid st t when poll reach delay offset jitter==============================================================================*120.25.115.20 10.137.53.7 2 u 36 128 377 76.219 -1.728 1.000 LOCAL(0) .LOCL. 10 l 1585 64 0 0.000 0.000 0.000#查看时间同步状态ntpstat [root@kafka-node1 zookeeper]# ntpstatsynchronised to NTP server (120.25.115.20) at stratum 3 time correct to within 47 ms polling server every 128 s
ntp client 配置
[root@kafka-node2 ~]# yum install ntpdate[root@kafka-node2 ~]#ntpdate 192.168.99.23219 Apr 09:47:17 ntpdate[16532]: step time server 192.168.99.232 offset -28757.681820 sec[root@kafka-node3 ~]# yum install ntpdate[root@kafka-node3 ~]# ntpdate 192.168.99.23219 Apr 09:47:17 ntpdate[16532]: step time server 192.168.99.232 offset -28757.681820 sec#配置计划任务crontab -l*/10 * * * * /usr/sbin/ntpdate 192.168.99.232; /sbin/hwclock -w#重启计划任务systemctl restart crond#查看计划任务执行状态tail -n 10 /var/spool/mail/root
openjdk 安装
[root@kafka-node1 ~]# yum install java-1.8.0-openjdk java-1.8.0-openjdk-devel -y[root@kafka-node2 ~]# yum install java-1.8.0-openjdk java-1.8.0-openjdk-devel -y[root@kafka-node3 ~]# yum install java-1.8.0-openjdk java-1.8.0-openjdk-devel -y
Zookeeper Config
zookeeper 下载解压
#download zookeeper软件包wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.5.7/apache-zookeeper-3.5.7-bin.tar.gz #创建zookeeper安装目录 mkdir -p /data/zookeeper#解压zookeeper到/data/zookeeper/tar -zvxf apache-zookeeper-3.5.7.tar.gz -C /data/zookeeper/#切换目录路径cd /data/zookeeper/#重命名apache-zookeeper-3.5.7目录mv apache-zookeeper-3.5.7/ zookeeper-3.5.7创建zookeeper保存数据的目录mkdir -p /data/zookeeper/zookeeper-3.5.7/data创建zookeeper日志目录mkdir -p /data/zookeeper/zookeeper-3.5.7/datalog
配置环境变量
#编辑/etc/profilevim /etc/profileexport ZK_HOME=/data/zookeeper/zookeeper-3.5.7export PATH=$PATH:$ZK_HOME/bin#刷新环境变量source /etc/profile
zookeeper 配置文件
#切换zookeeper文件目录cd /data/zookeeper/zookeeper-3.5.7/confcp -f zoo_sample.cfg zoo.cfg#zookeeper配置文件[root@kafka-node1 data]# egrep -v "*#|^$" /data/zookeeper/zookeeper-3.5.7/conf/zoo.cfg tickTime=2000initLimit=10syncLimit=5dataDir=/data/zookeeper/zookeeper-3.5.7/data/datalogDir=/data/zookeeper/zookeeper-3.5.7/datalog/clientPort=2181server.1=192.168.99.232:2888:3888server.2=192.168.99.233:2888:3888server.3=192.168.99.221:2888:3888
zookeeper 端口号
2181 对客户端提供服务的端口2888 zookeeper 集群master和slave之间的通信端口3888 zookeeper 集群leader选举的端口,集群刚启动的时候选举或者leader挂掉之后进行新的选举。
kafka-node2和kafka-node3 配置如下
kafka-node2#创建zookeeper安装目录 mkdir -p /data/zookeeper #切换zookeepe目录cd /data/zookeeper#拷贝zookeeper文件目录 scp -r root@kafka-node1:/data/zookeeper/zookeeper-3.5.7 ./ --------------------------------------------------------------kafka-node3#创建zookeeper安装目录 mkdir -p /data/zookeeper#切换zookeepe目录 /data/zookeeper#拷贝zookeeper文件目录 scp -r root@kafka-node1:/data/zookeeper/zookeeper-3.5.7 ./
添加各个主机zk集群的竞选ID
kafka-node1echo "1" >/data/zookeeper/zookeeper-3.5.7/data/myidkafka-node2echo "2" >/data/zookeeper/zookeeper-3.5.7/data/myidkafka-node3echo "3" >/data/zookeeper/zookeeper-3.5.7/data/myid
启动停止zookeeper命令
#启动zkServer.sh start #停止 zkServer.sh stop #查看状态zkServer.sh status
zookeeper 系统命令启动
[root@kafka-node1 system]# cat zookeeper.service [Unit]Description=zookeeper server daemonAfter=zookeeper.target
[Service]Type=forkingExecStart=/data/zookeeper/zookeeper-3.5.7/bin/zkServer.sh startExecReload=/data/zookeeper/zookeeper-3.5.7/bin/zkServer.sh stop && sleep 2 && /data/zookeeper/zookeeper-3.5.7/bin/zkServer.sh startExecStop=/data/zookeeper/zookeeper-3.5.7/bin/zkServer.sh stopRestart=always
[Install]WantedBy=multi-user.target
启动并开机启动
# systemctl start zookeeper# systemctl enable zookeeper
KafKa Config
KafKa 下载解压
#创建kafka安装目录mkdir -p /data/kafka#创建kafka日志目录mkdir -p /data/kafka/kafkalogs#download kafka软件包wget http://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz #解压kafkatar -xzvf kafka_2.12-2.1.0.tgz -C /data/kafka
kafka 配置文件
[root@kafka-node1 kafka]# egrep -v "*#|^$" /data/kafka/kafka_2.12-2.5.0/config/server.properties broker.id=1listeners=PLAINTEXT:192.168.99.232:9092num.network.threads=3num.io.threads=8socket.send.buffer.bytes=102400socket.receive.buffer.bytes=102400socket.request.max.bytes=104857600log.dirs=/data/kafka/kafkalogsnum.partitions=1num.recovery.threads.per.data.dir=1offsets.topic.replication.factor=1transaction.state.log.replication.factor=1transaction.state.log.min.isr=1log.retention.hours=168log.segment.bytes=1073741824log.retention.check.interval.ms=300000zookeeper.connect=192.168.99.232:2181,192.168.99.233:2181,192.168.99.221:2181zookeeper.connection.timeout.ms=18000group.initial.rebalance.delay.ms=0
注意:另外两台kafka配置除了broker.id和listeners需要修改其他配置相同
#kafka-node2broker.id=2listeners=PLAINTEXT:192.168.99.233:9092#kafka-node3broker.id=3listeners=PLAINTEXT:192.168.99.221:9092
kafka 系统命令启动
#cd /usr/lib/systemd/system#vim kafka.service [Unit]Description=kafka server daemonAfter=kafka.target
[Service]Type=forkingExecStart=/data/kafka/kafka_2.12-2.5.0/bin/kafka-server-start.sh -daemon /data/kafka/kafka_2.12-2.5.0/config/server.propertiesExecReload=/data/kafka/kafka_2.12-2.5.0/bin/kafka-server-stop.sh && sleep 2 && /data/kafka/kafka_2.12-2.5.0/bin/kafka-server-start.sh -daemon /data/kafka/kafka_2.12-2.5.0/config/server.propertiesExecStop=/data/kafka/kafka_2.12-2.5.0/bin/kafka-server-stop.shRestart=always
[Install]WantedBy=multi-user.target
启动并开机启动
# systemctl start kafka# systemctl enable kafka
KafKa 集群验证测试
创建 topic songhp
[root@kafka-node1 kafka_2.12-2.5.0]# ./bin/kafka-topics.sh --create --zookeeper 192.168.99.232:2181,192.168.99.233:2181,192.168.99.221:2181 --replication-factor 3 --partitions 3 --topic songhpCreated topic songhp.
查看 topic
[root@kafka-node1 kafka_2.12-2.5.0]# ./bin/kafka-topics.sh --list --zookeeper 192.168.99.232:2181,192.168.99.233:2181,192.168.99.221:2181 __consumer_offsetssonghp
#查看描述 topic
[root@kafka-node1 kafka_2.12-2.5.0]# ./bin/kafka-topics.sh --describe --zookeeper 192.168.99.232:2181,192.168.99.233:2181,192.168.99.221:2181 --topic songhpTopic: songhp PartitionCount: 3 ReplicationFactor: 3 Configs: Topic: songhp Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2 Topic: songhp Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3 Topic: songhp Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
kafka-node1上生产消息 "china"
[root@kafka-node1 kafka_2.12-2.5.0]#./bin/kafka-console-producer.sh --broker-list 192.168.99.233:9092 --topic songhp>china
kafka-node2 上消费消息
[root@kafka-node2 kafka_2.12-2.5.0]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.99.233:9092,192.168.99.232:9092,192.168.99.221:9092 --topic songhp --from-beginningchina
删除 topic songhp
[root@kafka-node1 kafka_2.12-2.5.0]#./bin/kafka-topics.sh --delete --topic songhp --zookeeper 192.168.99.232:2181,192.168.99.233:2181,192.168.99.221:2181
查看 consumer-groups
/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.99.221:9092 --list./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.99.221:9092 --describe --group logstash