消费者(Java客户端)选择代理0作为组协调器,并开始正确地使用消息。但是,当作为组协调器的代理0关闭时,消费者不做任何事情,并停止使用poll()方法。仅当代理0启动并运行时,该进程才会恢复。如何处理Java客户端中组协调器更改的这种场景?当组协调器死亡时,我得到这个错误:
16/09/22 17:42:45 INFO in
在我的kafka streams应用程序中创建状态存储时,我得到了这个错误Failed to lock the state directory: /tmp/kafka-streams/string-monitor:222)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.jav
(NetworkClient.java:435)at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
at org.apache.kafka.clients.<
我们正在将kafka实现升级到.9,并使用新的使用者java来创建使用者。我正在为消费者使用下面的代码,我们使用对消费者设置主题,如A行和B行是对我们的服务的调用,它处理我们接收到的消息。:标记协调器2147483647死亡。(AbstractCoordinator.java:665) at org.apache.kafka.clients.consumer.internals.<
我几乎复制了其中的大部分(除了PageViewEventSource),并将名称重构到我的用例中。我还用示例中使用的键更新了我的application.properties。(AbstractCoordinator.java:622) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordina
我试着用卡夫卡-客户库(0.9.0.1)来测试生产者,消费者.代理(0.9.0.1)正在服务器上运行,我已经进行了测试KafkaProducer,没有问题。,但是当我检查Kafka核心源时,我重新定义了请求类型键'10‘定义为’Group协调员键‘。(ConsumerNetworkClient.java:163) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureC