前言
一直用公司的kafka组件,对kafka原生实现比较模糊,作为一名优秀(渣渣)程序员,必须自己搭建kafka环境调试一把,Let`s go~
环境
:0.11.0.1
: 1.8.0_161
: 1.1.1.RELEASE
: 10.12.6
spring-kafka是什么呢?首先我们要知道,kafka服务端运行起来之后,消息的发送和消费都是从服务端(broker)获取,那么就涉及到业务代码和kafka服务端的连接(client)。kafka提供client来建立连接,但是提供的实现Api较为底层,而spring-kafka项目则是为了方便集成spring项目,而封装了client Api,方便java业务代码接入。
spring-kafka 主页地址: https://spring.io/projects/spring-kafka,其中,需要注意spring-kafka的版本和kafka-client版本的对应关系,官网有对照表,红框为本次实验所使用的版本。
名词解释
消息生产者:Producer,负责生产消息并发送到kafka服务器
消息消费者:Consumer,消息的使用方,负责消费kafka服务器上的消息
主题:topic,由使用方(用户)定义,并配置在kafka服务器,用来建立生产者和消费者之间的关系
消息分区:Partition,一个topic可以有多个分区,同一个topic的消息,生产者发送到kafka服务器后,会根据相应的负载均衡写入到某个partition分区中,写入时按顺序追加写入到尾部
Broker:即kafka服务器,用于存储消息。kafka消息以log方式存储,但是其中并不是字符串,而是标准格式的二进制文件,使用kafka提供的shell工具可以查看内容,使用 也可以直接查看log文件内容
offset:消息存储在kafka的Broker上,消费者拉取消息数据过程中需要知道消息在文件中的偏移量,偏移量即是offset
服务启动
启动步骤:
1 启动zookeeper
kafka的启动是依赖zk的,kafka启动时会首先连接上zk进行节点注册(该节点为临时节点,随着kafka的宕机或下线,节点会被删除),注册成功后,kafka服务器(Broker)会将自己的IP地址和端口信息写入到节点下。
同时同一个topic的消息会被分别存储在多个分区(Partition)并分布到多个Broker上,这些分区信息和Broker的对应关系也是由zk来维护的。
解压下载的kafka,进入到最上层目录,运行 启动单机zk
2 启动kafka
打开另一个终端Tab,运行
再打开一个终端Tab,使用jps查看,出现 和 即可。kafka默认端口是9092,zk默认端口是2181
建立springboot工程
常规springboot工程,版本为 1.5.9 ,工程目录如下:
maven依赖如下:
配置如下,简单整合如下配置就好:
消息实体:
生产者:
其中topic主题不需要提前在kafka上建立,没有该topic,kafka会自动建立
消费者:
其中 注解接收一组topic,可以订阅多个主题
触发controller:
运行工程:在浏览器中输入:
IDEA 输出:
可以看到消息成功发送并消费到了。
消息在哪里
到这里完成kafka消息的生产和消费,那么我们推送到kafka服务器的消息,到底在哪里呢?
上面提到过,kafka会持久化接收到的消息,以log的形式进行写入,其中log以标准二进制存储数据。存储文件的配置路径就在
文件中,其中属性: 指明了消息存储路径,打开该路径,可以看到一个文件夹
前面部分是topic名称,后面数字0表示分区,意为topic为 的消息,分布在分区0的消息存储在该文件夹中,打开文件夹看到: 文件,该文件即是消息存储的文件了。使用
可以看到:
注意:有些可能会遇到提示说 实际是因为没有权限,加上sudo即可
一些报错和解决
1、NoSuchMethodException
原因:应该是spring-kafka的版本用的太高了,用了2.x.x的版本,版本需要spring5的支持,但是springboot使用了1.5.x,集成的是spring4
解决方法:使用springboot版本 2.x.x
2、zk启动以后会有下面的Error信息:
原因:暂时不是很清楚,google了好像是zk的机制,第一次注册上去会提示说没有节点,但紧接着会创建节点(不是很确定)
解决办法:暂时忽略,后面找时间详细研究下,从结果看不影响使用kafka
3、发送消息会失败,大约1分钟后提示超时
原因:可能是没有连上kafka服务器
解决办法:这里只能说是有可能的解决方式。
1、 文件中指明本机地址,不要使用
2、打开kafka的配置文件
图中圈红的一行,放开注释,并且写上本机的ip地址。
小提示:关闭的时候,先关闭kafka,再关闭zk;否则先关掉这zk,kafka会一直报错,导致 失效....
领取专属 10元无门槛券
私享最新 技术干货