这是先前反应性卡夫卡问题(Issue while sending the Flux of data to the reactive kafka)的后续问题。 我正在尝试使用反应式方法向kafka发送一些日志记录。下面是使用反应式kafka发送消息的反应式代码。 public class LogProducer {
private final KafkaSender<String, String> sender;
public LogProducer(String bootstrapServers) {
Map<String, Objec
我正在尝试将asyncio功能集成到我的kafka主题侦听器中,并且遇到了一些问题(在python中异步编程非常新)。
我创建了一个confluent-kafka consumer,用于收听一个主题。主题经常有消息,性能是最重要的(因此引入了异步io)。
main()函数如下所示:
def main(self):
while True:
try:
msg = consumer.poll(timeout=5.0)
if msg is None:
continue
as
我正在使用入站网关使用SOAP web服务,我们需要将消息放在kafka主题中,并使用spring集成的声明性方式向请求者返回同步确认。这是可能的吗?
public Acknowledgement process(@RequestPayload MessagePayload payload) {
// perform validation & logic
// need to send message to kafka topic using declarative way
// sending synchronous ack to request o
我有OSGI框架,在其中,我在一个包中接受REST调用,在rest调用中接收到的数据被发送给KAFKA brocker。还有另一个包将使用来自brocker的消息。
如果我在REST包之前初始化了KAFKA消费者包,那么REST bundleActivator就不会被调用,因为代码运行在KAFKA消费者代码的while循环中。如果在使用者包之前初始化REST包,则使用者包永远不会启动。
以下是KAFKA Bundle的激活者代码:
public class KafkaConsumerActivator implements BundleActivator {
private stat