本文将详细介绍如何在 Spring Boot 项目中整合 Apache Kafka,包括 Kafka 的配置、消息的同步和异步发送。
在开始之前,请确保你已经安装并配置好 Kafka 集群。如果还没有,请参考 Kafka 官方文档进行安装和配置。
访问 Spring Initializr,选择以下配置:
点击 “Generate” 按钮,下载生成的项目,并解压到本地。
在 pom.xml
文件中添加 Kafka 依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
在 src/main/resources
目录下创建 application.yml
文件,并添加以下配置:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: my-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
配置说明:
bootstrap-servers
: Kafka broker 的地址列表。consumer
: 消费者配置,包括消费者组 ID、偏移量重置策略、键和值的反序列化器。producer
: 生产者配置,包括键和值的序列化器。在 src/main/java/com/example/demo
目录下创建 KafkaProducerConfig.java
文件,并添加以下代码:
package com.example.demo;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
在 src/main/java/com/example/demo
目录下创建 KafkaProducerService.java
文件,并添加以下代码:
package com.example.demo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
@Service
public class KafkaProducerService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private static final String TOPIC = "my-topic";
// 同步发送消息
public void sendMessageSync(String message) {
try {
kafkaTemplate.send(TOPIC, message).get();
System.out.println("同步消息发送成功: " + message);
} catch (Exception e) {
e.printStackTrace();
System.out.println("同步消息发送失败: " + message);
}
}
// 异步发送消息
public void sendMessageAsync(String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
System.out.println("异步消息发送成功: " + message);
}
@Override
public void onFailure(Throwable ex) {
ex.printStackTrace();
System.out.println("异步消息发送失败: " + message);
}
});
}
}
在 src/main/java/com/example/demo
目录下创建 DemoApplication.java
文件,并添加以下代码:
package com.example.demo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class DemoApplication implements CommandLineRunner {
@Autowired
private KafkaProducerService kafkaProducerService;
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
kafkaProducerService.sendMessageSync("Hello, Kafka (Sync)!");
kafkaProducerService.sendMessageAsync("Hello, Kafka (Async)!");
}
}
运行 DemoApplication
类,将看到控制台输出如下消息:
同步消息发送成功: Hello, Kafka (Sync)!
异步消息发送成功: Hello, Kafka (Async)!
如果 Kafka 生产者发送消息失败,将看到错误信息。
本文详细介绍了如何在 Spring Boot 项目中整合 Apache Kafka,包括 Kafka 的配置、消息的同步和异步发送。通过理解和实践这些内容,可以帮助你更好地掌握 Spring Boot 与 Kafka 的整合与应用。希望本文对你有所帮助,如有任何疑问或建议,欢迎留言讨论。