项目实战
项目介绍
这篇将介绍具体怎么使用Kafka,大前提是由于项目中使用的是SpringBoot,本文将介绍的是Kafka与SpringBoot的整合使用(本文的项目代码将在GitHub开放出来,地址是:https://github.com/zfrHJ/kafka)。环境是:
SpringBoot 2.0 以上。
JDK 8 以上。
编译器是IDEA。
项目相关代码
《Jar包》
本项目采用IDEA快速生成,同时利用Maven,下面是Kafka所需要的Jar包依赖:
org.springframework.boot
spring-boot-starter
org.springframework.kafka
spring-kafka
《yml文件》
本项目将采用yml后缀的配置文件格式,如下:
spring:
application:
name: migrate
kafka:
#服务器地址
bootstrap-servers: 192.168.X.X:9092
producer:
retries: 0
batch-size: 26384
buffer-memory: 53554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: test-consumer-group
auto-commit-interval: 1000
auto-offset-reset: latest
enable-auto-commit: false
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 150
《代码》
消费者:
@Component
public class KafkaConsumer {
/**
* 案例
*
* @param record
*/
@KafkaListener(topics = {"testSend"})
public void listen(ConsumerRecord record) {
Optional kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println("----------------- record =" + record);
System.out.println("------------------ message =" + message);
}
}
}
生成者:
@Component
public class KafkaSender {
@Resource
private KafkaTemplate kafkaTemplate;
private Gson gson = new GsonBuilder().create();
/**
* 发送消息方法
*/
public void send() {
Mesage message = new Mesage();
message.setId(String.valueOf(System.currentTimeMillis()));
message.setMsg(UUID.randomUUID().toString());
message.setSendTime(new Date());
System.out.println("+++++++++++++++++++++ message = "+ gson.toJson(message));
kafkaTemplate.send("testSend",gson.toJson(message));
System.out.println(("发送成功 message = "+ gson.toJson(message)));
}
}
《启动成功标识》
Kafka优化
补充知识点
1.清理消息命令
在测试阶段中,会产生很多垃圾数据,我们应该怎么清理呢?下面是清理消费的命令:
./kafka-topics.sh --delete --zookeeper 192.168.X.X:2181 --topic testSend
2.更改分区的命令
上面优化提到了怎么提高分区数量,下面是更改分区数量的命令:
./kafka-topics.sh --alter --partitions 3 --zookeeper 192.168.X.X:2181 --topic testSend
领取专属 10元无门槛券
私享最新 技术干货