Kafka's official definition reads: "Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications."
In the big data world, Kafka is commonly used to build stream-processing data pipelines that feed Spark or Flink.
To build the Kafka cluster, we use Kafka kafka_2.12-2.4.1 and Zookeeper 3.6.3.
Zookeeper configuration
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/tmp/zookeeper/data
dataLogDir=/tmp/zookeeper/log
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
server.0=10.60.**.**:2888:3888
server.1=10.60.**.**:2888:3888
server.2=10.60.**.**:2888:3888
By default, the /tmp/zookeeper/data and /tmp/zookeeper/log directories do not exist on a Linux system, so the next step is to create them.
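Both can be created in one go on each node (paths matching the dataDir and dataLogDir configured above):
mkdir -p /tmp/zookeeper/data /tmp/zookeeper/log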
Step 4: under the ${dataDir} directory (i.e. /tmp/zookeeper/data), create a myid file and write a number into it, e.g. 0. The myid file holds this server's ID, which must match the N in this machine's server.N entry in zoo.cfg.
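For example, on the machine listed as server.0:
echo 0 > /tmp/zookeeper/data/myid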
Start Zookeeper
./zkServer.sh start
Check the Zookeeper service status
./zkServer.sh status
Once all three nodes are deployed successfully, zkServer.sh status will report Mode: leader on one node and Mode: follower on the other two (jps on each node shows a QuorumPeerMain process).
Kafka configuration
Edit server.properties
# The broker id; must be unique within the cluster
broker.id=0
# Where Kafka stores its data (log segments)
log.dirs=/usr/local/kafka/data
# The three Zookeeper nodes
zookeeper.connect=10.60.**.**:2181,10.60.**.**:2181,10.60.**.**:2181
Copy the installed Kafka directory to the other two servers, then change broker.id on those nodes to 1 and 2 respectively.
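A sketch of that copy step for the second node (IPs masked just as in the configs above; the third node is analogous with broker.id=2):
scp -r /usr/local/kafka root@10.60.**.**:/usr/local/
ssh root@10.60.**.** "sed -i 's/^broker.id=0$/broker.id=1/' /usr/local/kafka/config/server.properties"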
Configure the KAFKA_HOME environment variable
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:${KAFKA_HOME}/bin
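If these exports live in /etc/profile (which the start script below assumes when it sources that file), reload it so they take effect in the current shell:
source /etc/profile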
Start the broker
nohup bin/kafka-server-start.sh config/server.properties &
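Alternatively, kafka-server-start.sh accepts a -daemon flag that backgrounds the broker itself, so nohup is not needed:
bin/kafka-server-start.sh -daemon config/server.properties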
Since a cluster may have many servers, starting them by hand is tedious, so we can write shell scripts that start and stop everything with one command:
Write the IPs of the servers to start into the kafkalist file, one per line, then loop over them:
#!/bin/bash
# Start Kafka on every host listed (one IP per line) in /root/myshell/kafkalist
while read line
do
{
    echo "$line"
    # \${KAFKA_HOME} is escaped so it is resolved on the remote host after sourcing /etc/profile
    ssh "$line" "source /etc/profile; export JMX_PORT=9988; nohup \${KAFKA_HOME}/bin/kafka-server-start.sh \${KAFKA_HOME}/config/server.properties >/dev/null 2>&1 &"
} &
done < /root/myshell/kafkalist
wait
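A matching one-click stop script is almost identical; it relies on the kafka-server-stop.sh script that ships with Kafka:
#!/bin/bash
# Stop Kafka on every host listed in /root/myshell/kafkalist
while read line
do
{
    echo "$line"
    ssh "$line" "source /etc/profile; \${KAFKA_HOME}/bin/kafka-server-stop.sh"
} &
done < /root/myshell/kafkalist
wait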
Demo
Let's create a topic, then write a Java Kafka producer that sends data to it.
./bin/kafka-topics.sh --zookeeper 10.60.**.**:2181 --create --topic topic-demo --replication-factor 3 --partitions 4
Describe the topic
./bin/kafka-topics.sh --zookeeper 10.60.**.**:2181 --describe --topic topic-demo
Java Producer:
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
 * 1. Create the Properties used to connect to Kafka
 * 2. Create a KafkaProducer from that configuration
 * 3. Call send() to publish numbered messages to the topic; send() returns a Future wrapping the broker's response
 * 4. Optionally call Future.get() to block until the response arrives
 * 5. Close the producer
 */
public class KafkaProducerTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. Create the Properties used to connect to Kafka
        Properties props = new Properties();
        props.put("bootstrap.servers", "117.50.**.**:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 2. Create the producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        // 3. Send numbered messages to the topic
        for (int i = 0; i < 10000000; ++i) {
            // Option 1: synchronous send, blocking on the returned Future
            // (requires import java.util.concurrent.Future)
            // ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic-demo", null, i + "");
            // Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
            // future.get(); // wait for the broker's acknowledgement
            // System.out.println("Message " + i + " written successfully!");

            // Option 2: asynchronous send with a callback
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic-demo", null, i + "");
            kafkaProducer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    // Check whether the send succeeded
                    if (exception == null) {
                        // Success: print where the record landed
                        String topic = metadata.topic();
                        int partition = metadata.partition();
                        long offset = metadata.offset();
                        System.out.println("topic:" + topic + " partition:" + partition + " offset:" + offset);
                    } else {
                        // Failure: log the error and its stack trace
                        System.out.println("Exception while producing message!");
                        System.out.println(exception.getMessage());
                        exception.printStackTrace();
                    }
                }
            });
        }
        // 4. Close the producer (flushes buffered records first)
        kafkaProducer.close();
    }
}
Consume the data
[root@master kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic topic-demo --from-beginning
441665
441666
441667
441668
441669
441670
441671
441672
441673
441674
441675
441676
441677
441678
441679
441680
441681
441682
441683
We can see the data has been written successfully.
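For completeness, the same topic can also be consumed from Java. A minimal consumer sketch (the group id demo-group is an arbitrary choice; auto.offset.reset=earliest mimics --from-beginning for a new group):
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "117.50.**.**:9092");
        props.put("group.id", "demo-group"); // hypothetical group id
        props.put("auto.offset.reset", "earliest"); // start from the beginning for a new group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("topic-demo"));
        while (true) {
            // Poll for new records and print each one
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("partition:" + record.partition() + " offset:" + record.offset() + " value:" + record.value());
            }
        }
    }
}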