前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >浅谈kafka 一

浅谈kafka 一

作者头像
云原生
发布2022-05-16 14:47:44
2890
发布2022-05-16 14:47:44
举报
文章被收录于专栏:云原生实践

Kafka官方给出的定义是:Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. (Apache Kafka 是一个开源分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序。)

大数据领域中我们常用kakfa来构建流处理数据管道,与Spark或者Flink对接。

搭建Kafka集群,我们选用的kafka版本是kafka_2.12-2.4.1,Zookeeper版本为3.6.3。

zookeeper配置

代码语言:javascript
复制
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/tmp/zookeeper/data
dataLogDir=/tmp/zookeeper/log
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
server.0=10.60.**.**:2888:3888
server.1=10.60.**.**:2888:3888
server.2=10.60.**.**:2888:3888

默认情况下,Linux系统中没有/tmp/zookeeper/data和/tmp/zookeeper/log这两个目录,所以接下来还要创建这两个目录。

第四步,在${dataDir}目录(也就是/tmp/zookeeper/data)下创建一个myid文件,并写入一个数值,比如0。myid文件里存放的是服务器的编号。

启动

代码语言:javascript
复制
./zkServer.sh start

查看zookeeper服务状态

代码语言:javascript
复制
./zkServer.sh status

三个节点成功部署的话,jps后会有一个Leader和两个follower。

Kafka配置

修改 server.properties

代码语言:javascript
复制
# 指定broker的id
broker.id=0
# 指定Kafka数据的位置
log.dirs=/usr/local/kafka/data
# 配置zk的三个节点
zookeeper.connect=10.60.**.**:2181,10.60.**.**:2181,10.60.**.**:2181

将安装好的kafka复制到另外两台服务器,修改另外两个节点的broker.id分别为1和2

配置KAFKA_HOME环境变量

代码语言:javascript
复制
export KAFKA_HOME=/usr/local/kafka
export PATH=:$PATH:${KAFKA_HOME}

启动服务器

代码语言:javascript
复制
nohup bin/kafka-server-start.sh config/server.properties &

由于集群的服务器可能很多,手动启动比较麻烦,可以写一个一键启动和关闭的shell脚本:

在kafkalist文件中写入需要启动的服务器ip

代码语言:javascript
复制
cat /root/myshell/kafkalist | while read line
do
{
 echo $line
 ssh $line "source /etc/profile;export JMX_PORT=9988;nohup ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties >/dev/nul* 2>&1 & "
}&
wait
done

示例demo

我们创建一个topic,用java代码创建一个kafka producer向topic中发送数据。

代码语言:javascript
复制
./bin/kafka-topics.sh --zookeeper 10.60.**.**:2181 --create --topic topic-demo --replication-factor 3 --partitions 4

查看该topic

代码语言:javascript
复制
./bin/kafka-topics.sh --zookeeper 10.60.**.**:2181 --describe --topic topic-demo

Java Producer:

代码语言:javascript
复制
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 *
 * 1. 创建用于连接Kafka的Properties配置
 * 2. 创建一个生产者对象KafkaProducer
 * 3. 调用send发送1-100消息到指定Topic test,并获取返回值Future,该对象封装了返回值
 * 4. 再调用一个Future.get()方法等待响应
 * 5. 关闭生产者
 */
public class KafkaProducerTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. 创建用于连接Kafka的Properties配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "117.50.**.**:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. 创建一个生产者对象KafkaProducer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        // 3. 发送1-100的消息到指定的topic中
        for(int i = 0; i < 10000000; ++i) {
            // 一、使用同步等待的方式发送消息
            // // 构建一条消息,直接new ProducerRecord
            // ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", null, i + "");
            // Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
            // // 调用Future的get方法等待响应
            // future.get();
            // System.out.println("第" + i + "条消息写入成功!");

            // 二、使用异步回调的方式发送消息
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic-demo", null, i + "");
            kafkaProducer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    // 1. 判断发送消息是否成功
                    if(exception == null) {
                        // 发送成功
                        // 主题
                        String topic = metadata.topic();
                        // 分区id
                        int partition = metadata.partition();
                        // 偏移量
                        long offset = metadata.offset();
                        System.out.println("topic:" + topic + " 分区id:" + partition + " 偏移量:" + offset);
                    }
                    else {
                        // 发送出现错误
                        System.out.println("生产消息出现异常!");
                        // 打印异常消息
                        System.out.println(exception.getMessage());
                        // 打印调用栈
                        System.out.println(exception.getStackTrace());
                    }
                }
            });
        }

        // 4.关闭生产者
        kafkaProducer.close();
    }
}

消费数据

代码语言:javascript
复制
[root@master kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic topic-demo --from-beginning
441665
441666
441667
441668
441669
441670
441671
441672
441673
441674
441675
441676
441677
441678
441679
441680
441681
441682
441683

可以看到数据已经写入。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-04-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 云原生 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云服务器
云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档