参考
https://kafka.apache.org/22/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
https://juejin.cn/post/7210225864355659835
https://thepracticaldeveloper.com/spring-boot-kafka-config/
https://reflectoring.io/spring-boot-kafka/
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
spring:
  kafka:
    producer:
      bootstrap-servers: 127.0.0.1:9092
@Slf4j
@RestController
public class TopicCreateController {
@Autowired
private KafkaProperties properties;
@GetMapping("/create/{topicName}")
public String createTopic(@PathVariable String topicName) {
AdminClient client = AdminClient.create(properties.buildAdminProperties());
if (client != null) {
try {
Collection<NewTopic> newTopics = new ArrayList<>(1);
newTopics.add(new NewTopic(topicName, 1, (short) 1));
client.createTopics(newTopics);
} catch (Throwable e) {
e.printStackTrace();
} finally {
client.close();
}
}
return topicName;
}
@GetMapping("/test")
public String createTopic() {
return "success";
}
}
发送消息后不需要逻辑程序关心是否发送成功。
即在 send() 方法后再调用 get() 方法,会同步地等待结果返回,根据结果可以判断是否发送成功。
@Slf4j
@RestController
public class ProducerController {
private static final String topic = "hello";
@Autowired
private KafkaTemplate<Object, Object> kafkaTemplate;
/**
 * Sends a message and blocks until the send future completes.
 *
 * @param msg message payload taken from the request path
 * @return confirmation text echoing the payload
 * @throws RuntimeException if the send fails or the waiting thread is interrupted
 */
@GetMapping("/produce/{msg}")
public String produce(@PathVariable String msg) {
    try {
        // get() turns the asynchronous send into a synchronous one.
        SendResult<Object, Object> result = kafkaTemplate.send(topic, msg).get();
        System.out.println(result);
    } catch (InterruptedException e) {
        // Restore the interrupt flag before wrapping, otherwise the interruption is lost.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    } catch (ExecutionException e) {
        throw new RuntimeException(e);
    }
    System.out.println("send"+ msg);
    return "send: "+ msg;
}
/**
 * Sends a message without waiting for the send to complete.
 *
 * <p>The outcome is reported from callbacks registered on the returned future,
 * so the HTTP response no longer blocks on {@code future.get()}.
 *
 * @param msg message payload taken from the request path
 * @return confirmation text echoing the payload
 */
@GetMapping("/produceAsync/{msg}")
public String hello2(@PathVariable String msg) {
    ListenableFuture<SendResult<Object, Object>> future = kafkaTemplate.send(topic, msg);
    // Register callbacks instead of calling get(), which would make this endpoint synchronous.
    future.addCallback(
            result -> System.out.println("success >>> " + result.getRecordMetadata().topic()
                    + ",offset" + result.getRecordMetadata().offset()),
            ex -> log.error("async send failed: {}", msg, ex));
    System.out.println("async send: " + msg);
    return "async send: " + msg;
}
// 输入代码内容
/**
 * Listener registered with id "helloGroup" on topic "hello"; prints each
 * received payload to standard output.
 *
 * @param msg the received message payload
 */
@KafkaListener(id = "helloGroup", topics = "hello")
public void hello(String msg) {
    final String payload = msg;
    System.out.println(payload);
}
// Producer configuration shared by every producer created in this class.
private static Properties props = new Properties();

static {
    // Broker address used for the initial connection.
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("acks", "all");
    // Keys and values are both sent as strings.
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
}
/**
 * Sends one message to the given topic and reports the outcome on standard output.
 *
 * <p>The producer is created for this call and closed before the method returns.
 * Per the Kafka producer documentation, {@code close()} waits for previously
 * sent requests to complete, so the callback below still runs.
 *
 * @param topic destination topic
 * @param msg   message value; the record is sent without a key
 */
public void ProduceMsg(String topic, String msg) {
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
    // try-with-resources releases the producer that the original never closed.
    try (Producer<String, String> producer = new KafkaProducer<>(props)) {
        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                System.out.println("消息发送成功");
            } else {
                System.out.println("消息发送失败");
                // Surface the cause; the original discarded it.
                exception.printStackTrace();
            }
        });
    }
}
// Consumer configuration used when the consumer is constructed.
private static Properties props = new Properties();

static {
    // Broker address used for the initial connection.
    props.setProperty("bootstrap.servers", "localhost:9092");
    // Consumer group this consumer joins.
    props.setProperty("group.id","tpd-loggers");
    // Keys and values are both read as strings.
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
}
/** Underlying Kafka consumer, assigned once in the constructor. */
final KafkaConsumer<String, String> consumer;

/** Loop flag for the polling loop; volatile because it is cleared by {@code close()}. */
private volatile boolean isRunning = true;

/**
 * Builds the consumer from the static configuration and subscribes it to one topic.
 *
 * @param topicName topic to subscribe to
 */
public AutoCommitConsumer(String topicName) {
    this.consumer = new KafkaConsumer<>(props);
    this.consumer.subscribe(Arrays.asList(topicName));
}
/**
 * Polls the subscribed topic and prints every received record until the
 * running flag is cleared or an exception ends the loop.
 *
 * <p>{@code close()} is always called on exit.
 */
public void printReceiveMsg() {
    try {
        while (isRunning) {
            // Wait at most one second per poll so a cleared isRunning flag is noticed promptly.
            // The original used Duration.ofSeconds(1000), i.e. a wait of up to ~16.7 minutes.
            // poll() already blocks while idle, so the extra Thread.sleep is dropped.
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println("TopicName: " + consumerRecord.topic() + " Partition:" +
                        consumerRecord.partition() + " Offset:" + consumerRecord.offset() +
                        " Msg:" + consumerRecord.value());
            }
        }
    } catch (Exception e) {
        System.out.println(e.getMessage());
    } finally {
        close();
    }
}
/**
 * Clears the running flag and closes the underlying consumer.
 *
 * <p>NOTE(review): the Kafka documentation describes KafkaConsumer as not safe
 * for multi-threaded access. Confirm this is only invoked from the polling
 * thread, or switch to {@code wakeup()} for cross-thread shutdown.
 */
public void close() {
    isRunning = false;
    if (consumer == null) {
        return;
    }
    consumer.close();
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有