有一个基本的例子,它对1个消费者来说就像一个护身符。它接收消息。但是添加一个额外的消费者将被忽略。
let kafka = require('kafka-node');
let client = new kafka.Client();
let producer = new kafka.Producer(client);
let consumer1 = new kafka.Consumer(client,[ {topic: 'topic1', partition: 0}]);
let consumer2 = new kafka.Consumer(clien
我使用Kafka Rest代理来生成和使用Azure APIM从Kafka主题消费,因此使用Rest代理的原因。我可以使用auto.offset.reset =最早的线性方式使用消息,所以每次调用Kafka Rest都会给我最新的消息,这是很棒的,
在以下情况下:
A1 is data requestor
B2 is API that fulfills A1's request using REST proxy for Kafka
C3 is Kafka Rest proxy server
如果A1向B2请求数据,而B2使用卡夫卡主题C3提供的消息给A1,但由于某些互联网问题,该消息
我有下面的卡夫卡设置
Number of producer : 1
Number of topics : 1
Number of partitions : 2
Number of consumers : 3 (with same group id)
Number of Kafka cluster : none(single Kafka server)
Zookeeper.session.timeout : 1000
Consumer Type : High Level Consumer
生产者生成没有任何特定分区逻辑的消息(默认分区逻辑)
我们有一个HDP集群,有3个kafka经纪人(来自hortonworks )。
我们希望运行kafka控制台使用者,以便从带有特定偏移量的主题中获得一条消息。
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper zoo01:2181 --topic lopet.lo.pm--partition 0 --offset 34537263 --max-messages 1
但我们得到了以下信息:
我们哪里错了?
Using the ConsoleConsumer with old consumer is d
我的python脚本的输入是使用者组的名称,输出应该是使用者组下的主题列表。
我的代码目前没有返回当前偏移量为-1的主题。
有没有更好的方法,可以使用Kafka-python获取消费者组下所有主题的列表。
我实际上是在寻找替换描述一个主题在一个组cmd使用Kafka工具。
from kafka import KafkaAdminClient
topics_in_groups = {};
client = KafkaAdminClient(bootstrap_server='localhost:9092')
for group in client.list_consumer_gr
我是Kafka的新手,正在运行一个简单的kafka消费者/生产者示例,就像和上给出的一样。当我从终端运行consumer时,consumer正在接收消息,但我无法使用Java代码进行侦听。我也在StackoverFlow (链接:,)上搜索了类似的问题,并尝试了这些解决方案,但似乎没有什么对我有效。Kafka版本: pom使用kafka_2.10-0.10.2.1和对应的maven依赖。
面向生产者和消费者的Java代码:
public class SimpleProducer {
public static void main(String[] args) throws Interrupte
我正在使用Spring和Kafka处理一个小批处理,该批处理从Kafka主题读取json数据,将其转换为sends对象,更改值并将其发送回Kafka主题。一切都很好,但我唯一的问题是,我的消费者总是在阅读这个话题的乞求。我需要它从最后一条未消耗的信息中读出来。我已经添加了这些属性:
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to earliest
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false
ConsumerConfig.GROUP_ID_CONFIG to a random value
但这似乎不起作
我有一个Ingres DB with History表,它记录数据库事件,如插入、更新和删除。我有一个生产者,这将是多线程。这个生产者将读取History表,以找到要选择的表和行,然后将该行添加到Kafka主题中。现在生产者需要确保将事件添加到Kafka主题中,其方式与History表登录的方式相同。因此,使用者读取它们的顺序与它在History表中记录的顺序相同,并在Postgrace上执行它。
我可以将这些数据生成多个生产者。示例
Producer1 has message 1 to 5
producer2 has message 6 to 10
producer3 has message
在启动应用程序时,在Kafka流上出现了一些奇怪的错误。
java.lang.IllegalArgumentException: Illegal base64 character 7b
at java.base/java.util.Base64$Decoder.decode0(Base64.java:743)
at java.base/java.util.Base64$Decoder.decode(Base64.java:535)
at java.base/java.util.Base64$Decoder.decode(Base64.java:558)
at o