下面是一个完整的示例,它使用Spring Cloud Stream和Kafka来创建一个简单的消息处理器和发布器:
在pom.xml文件中添加以下依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
在application.properties文件中添加以下配置:
propertiesCopy codespring.cloud.stream.kafka.binder.brokers=localhost:9092
spring.cloud.stream.kafka.binder.zkNodes=localhost:2181
spring.cloud.stream.kafka.binder.configuration.acks=all
spring.cloud.stream.kafka.binder.configuration.retries=3
spring.cloud.stream.kafka.binder.configuration.batch.size=16384
spring.cloud.stream.kafka.binder.configuration.linger.ms=1
spring.cloud.stream.kafka.binder.configuration.buffer.memory=33554432
spring.cloud.stream.kafka.binder.configuration.compression.type=gzip
@EnableBinding(MyProcessor.class)
@SpringBootApplication
public class MyApplication {
public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args);
}
@Autowired
private MyProcessor processor;
@StreamListener(MyProcessor.INPUT)
public void handle(Message<String> message) {
System.out.println("Received message: " + message.getPayload());
}
public interface MyProcessor {
String INPUT = "myInput";
String OUTPUT = "myOutput";
@Input(INPUT)
SubscribableChannel input();
@Output(OUTPUT)
MessageChannel output();
}
}
在这个示例中,我们定义了一个名为MyProcessor的声明式接口,其中包含了一个名为myInput的输入通道和一个名为myOutput的输出通道。我们使用@EnableBinding注解告诉Spring Boot应用程序使用MyProcessor接口中定义的输入和输出通道。
然后,我们定义了一个@StreamListener注解的方法handle(),该方法处理从输入通道接收到的消息,并将其打印到控制台。
@Component
public class MyPublisher {
@Autowired
private MyProcessor processor;
public void publish(String message) {
processor.output().send(MessageBuilder.withPayload(message).build());
}
}
在这个示例中,我们创建了一个名为MyPublisher的组件,并在其中注入了MyProcessor接口。我们还定义了一个名为publish()的方法,该方法使用processor.output().send()方法将一个带有有效载荷的消息发送到名为myOutput的输出通道中。
@RestController
public class MyController {
@Autowired
private MyPublisher publisher;
@PostMapping("/publish")
public void publishMessage(@RequestBody String message) {
publisher.publish(message);
}
}
在这个示例中,我们创建了一个名为MyController的REST控制器,并在其中注入了MyPublisher组件。我们还定义了一个名为publishMessage()的POST请求处理程序,该处理程序将消息正文作为输入,并使用MyPublisher组件将其发送到名为myOutput的输出通道中。
现在我们可以启动应用程序并测试它了。我们可以使用任何HTTP客户端向/publish端点发送POST请求,并将消息正文作为输入。
例如,我们可以使用curl命令向端口8080发送一条消息:
curl -X POST -H "Content-Type: text/plain" -d "Hello, Kafka!" http://localhost:8080/publish
应用程序应该在控制台上输出以下内容:
Received message: Hello, Kafka!
这证明消息已成功从myOutput输出通道发送到myInput输入通道,并由handle()方法处理。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有