使用docker-compose
搭建kafka
集群,解析一些参数含义及列出搭建过程的一些坑。
直接上yml文件
version: '3.3'
networks:
kafka:
services:
zookeeper:
image: zookeeper
container_name: zookeeper-kafka
ports:
- 2181:2181
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
volumes:
- /usr/local/etc/kafka/data/zookeeper/data:/data
- /usr/local/etc/kafka/data/zookeeper/datalog:/datalog
networks:
- kafka
restart: unless-stopped
kafka1:
image: wurstmeister/kafka
container_name: kafka1
depends_on:
- zookeeper
ports:
- 9091:9091
environment:
KAFKA_BROKER_ID: 0
KAFKA_NUM_PARTITIONS: 3
KAFKA_DEFAULT_REPLICATION_FACTOR: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181/kafka
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9091
KAFKA_LOG_DIRS: /data/kafka-log
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9091
BOOTSTRAP_SERVERS: kafka1:9091,kafka2:9092,kafka3:9093
volumes:
- /usr/local/etc/kafka/data/kafka1:/kafka
- /var/run/docker.sock:/var/run/docker.sock
networks:
- kafka
restart: unless-stopped
kafka2:
image: wurstmeister/kafka
container_name: kafka2
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_NUM_PARTITIONS: 3
KAFKA_DEFAULT_REPLICATION_FACTOR: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181/kafka
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_LOG_DIRS: /data/kafka-log
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092
BOOTSTRAP_SERVERS: kafka1:9091,kafka2:9092,kafka3:9093
volumes:
- /usr/local/etc/kafka/data/kafka2:/kafka
- /var/run/docker.sock:/var/run/docker.sock
networks:
- kafka
restart: unless-stopped
kafka3:
image: wurstmeister/kafka
container_name: kafka3
depends_on:
- zookeeper
ports:
- 9093:9093
environment:
KAFKA_BROKER_ID: 2
KAFKA_NUM_PARTITIONS: 3
KAFKA_DEFAULT_REPLICATION_FACTOR: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181/kafka
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093
KAFKA_LOG_DIRS: /data/kafka-log
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9093
BOOTSTRAP_SERVERS: kafka1:9091,kafka2:9092,kafka3:9093
volumes:
- /usr/local/etc/kafka/data/kafka3:/kafka
- /var/run/docker.sock:/var/run/docker.sock
networks:
- kafka
restart: unless-stopped
kafka_manager:
image: hlebalbau/kafka-manager:latest
container_name: kafka_manager
ports:
- 9000:9000
environment:
ZK_HOSTS: zookeeper:2181
APPLICATION_SECRET: doper
networks:
- kafka
depends_on:
- zookeeper
- kafka1
- kafka2
- kafka3
这里zookeeper
使用的是zookeeper
官方提供的镜像,与大多数教程中的wurstmeister/zookeeper
是一样的,在使用时可以挂载卷实现数据持久化,具体挂载的目录在官方的README
中有,如下:
kafka
的参数配置在镜像README
中也有说明,简单来说就是先从kafka
官方文档中找到要配置的参数,然后在docker-compose.yml
文件中配置时只需要将配置项的名字全部换成大写,然后.
换成_
,再加上KAFKA_
前缀几个。举个例子,修改broker.id
,则需要在配置文件中设置KAFKA_BROKER_ID=XXX
即可。
KAFKA_BROKER_ID
: 配置broker.id
,每个broker
的id
必须唯一KAFKA_NUM_PARTITIONS
: 设置topic
的分区数KAFKA_DEFAULT_REPLICATION_FACTOR
: topic
的复制系数,也就是消息副本数KAFKA_ZOOKEEPER_CONNECT
: zookeeper
的连接地址,这里注意的是加了/kafka
后缀,是为了方便管理,把集群所有的信息都放入zookeeper
的/kafka
目录下。KAFKA_LISTENERS
: 监听器,指定以什么协议及哪个主机名和端口来访问kafka
服务。这里设置了监听所有网卡,listeners
解决的是kafka
监听来自于哪个网卡的请求。KAFKA_ADVERTISED_LISTENERS
: broker
真正要注册进zookeeper
的监听信息,及broker
对外开放的端口。KAFKA_LOG_DIRS
: 日志位置BOOTSTRAP_SERVERS
: kafka
集群各个服务器的地址信息listeners
和advertised.listeners
的区别
reference
在部署时对内外网暴露端口需要做区分时使用,原文:
listeners: INSIDE://172.17.0.10:9092,OUTSIDE://172.17.0.10:9094 advertised_listeners: INSIDE://172.17.0.10:9092,OUTSIDE://<公网 ip>:端口 kafka_listener_security_protocol_map: "INSIDE:SASL_PLAINTEXT,OUTSIDE:SASL_PLAINTEXT" kafka_inter_broker_listener_name: "INSIDE"
advertised_listeners
监听器会注册在zookeeper
中; 当我们对172.17.0.10:9092
请求建立连接,kafka
服务器会通过zookeeper
中注册的监听器,找到INSIDE
监听器,然后通过listeners
中找到对应的 通讯ip
和 端口; 同理,当我们对<公网 ip>:端口
请求建立连接,kafka
服务器会通过zookeeper
中注册的监听器,找到OUTSIDE
监听器,然后通过listeners
中找到对应的 通讯ip
和 端口172.17.0.10:9094
; 总结:advertised_listeners
是对外暴露的服务端口,真正建立连接用的是listeners
。
即broker
会把advertised_listeners
中的信息注册进zookeeper
,而客户端建立连接时会去找zookeeper
中注册的advertised_listeners
,然后根据找到的对应监视器去listeners
中找对应的ip
和端口进行访问。
因此这里可以有内外网分流的应用,具体见reference
volumne
挂载volumes:
- /usr/local/etc/kafka/data/kafka3:/kafka # 挂载kafka使用过程中产生的数据
- /var/run/docker.sock:/var/run/docker.sock # 挂载宿主机的docker.sock
在kafka-docker
中给出的docker-compose.yml
例子中就有挂载docker.sock
了
这里为什么需要挂载docker.sock
?
reference
: https://blog.csdn.net/boling_cavalry/article/details/92846483
首先需要明白docker.sock
的作用
实际上docker
是由client
和server
组成,通过docker --version
可以查看:
⚡ root@doper /home/doper docker version
Client:
Version: 20.10.15
API version: 1.41
Go version: go1.18.1
Git commit: fd82621d35
Built: Thu May 5 23:16:45 2022
OS/Arch: linux/amd64
Context: default
Experimental: true
Server:
Engine:
Version: 20.10.15
API version: 1.41 (minimum version 1.12)
Go version: go1.18.1
Git commit: 4433bf67ba
Built: Thu May 5 23:16:31 2022
OS/Arch: linux/amd64
Experimental: false
containerd:
Version: v1.6.4
GitCommit: 212e8b6fa2f44b9c21b2798135fc6fb7c53efc16.m
runc:
Version: 1.1.2
GitCommit:
docker-init:
Version: 0.19.0
GitCommit: de40ad0
我们发送的docker
命令其实都是客户端发送请求到docker daemon
服务,docker daemon
再返回相关的命令结果即可。
而docker daemon
默认监听的就是/var/run/docker.sock
,daemon
通过这个socket
和其他进程通信,所以客户端只要将消息发送到这个socket
即可实现与daemon
通信。具体可见api
文档
注意,这里和
reference
不同的是因为curl
版本高于7.50
,这个版本的curl
在使用时需要提供的URL
必须包括hostname
. 具体见-> Can cURL send requests to sockets?
eg:
⚡ root@doper /home/doper curl --unix-socket /var/run/docker.sock http://localhost/containers/json
[{"Id":"f2cce65f7b4752396843.... # 一大串的json信息
那么在kafka
容器中就可以通过docker
命令来得到相关的容器信息,在官方github
中的start-kafka.sh
脚本中也确实看到了其使用了docker port
命令
有了socket
,那在容器中有docker
的client
可供使用吗?答案是有的,在Dockerfile
其已经安装了docker
用来管理kafka
集群的,在容器启动后可以通过localhost:9000
访问可视化界面。
注意创建时由于上面docker-compose.yml
中已经设置将所有的kafka
集群信息放在/kafka
目录下,所以这里也要加上/kafka
后缀
但这里有一个坑,就是在创建集群后会遇到如下错误
Yikes! KeeperErrorCode = Unimplemented for /kafka-manager/mutex Try again.
解决方法:
reference
: https://github.com/yahoo/CMAK/issues/731
➜ docker exec -it zookeeper bash
root@98747a9eac65:/zookeeper-3.4.14# ./bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 2] ls /kafka-manager
[configs, deleteClusters, clusters]
[zk: localhost:2181(CONNECTED) 3] create /kafka-manager/mutex ""
Created /kafka-manager/mutex
[zk: localhost:2181(CONNECTED) 5] create /kafka-manager/mutex/locks ""
Created /kafka-manager/mutex/locks
[zk: localhost:2181(CONNECTED) 6] create /kafka-manager/mutex/leases ""
Created /kafka-manager/mutex/leases