我对kafka消费者有问题,它有时会抛出异常。
ERROR [*KafkaConsumerWorker] (Thread-125) [] Kafka Consumer thread 235604751 Exception while polling Kafka.: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another membe
我将Spring Kafka侦听器初始化为
@Bean
public Map<String, Object> consumerConfig() {
final HashMap<String, Object> result = new HashMap<>();
result.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
result.put(GROUP_ID_CONFIG, groupId);
result.put(KEY_DESERIALIZER_CLASS_CONFIG,
我使用手动kafka commit,将属性enable.auto.commit设置为false,同时初始化Kafka消费者,并在接收和处理消息后手动调用kafka commit。
然而,因为在我的消费者中处理消息是很耗时的,所以我得到了Exception with message "error": "Broker: Group rebalance in progress"
原因是重新平衡超时后提交被拒绝,并出现此错误。现在,恢复操作是退出并重新实例化进程,这将再次触发重新平衡和分区分配。另一种方法是捕获此异常,然后照常继续,只有在poll()调用被阻塞直到重新平
在创建多个使用者(使用Kafka0.9javaAPI)并启动每个线程之后,我将得到以下异常
Consumer has failed with exception: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
class com.messagehub.consumer.Consumer is shutting down.
org.apache.kafka.clients.consumer.CommitFailedExceptio
我使用的是KafkaConsumer 0.10Java api。我想从一个特定的分区和特定的偏移量消费。我抬头一看,发现有一个seek方法,但它抛出了一个异常。有没有人有类似的用例或解决方案?
代码:
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.seek(new TopicPartition("mytopic", 1), 4);
异常
java.lang.IllegalStateException: No current ass
有一个基本的例子,它对1个消费者来说就像一个护身符。它接收消息。但是添加一个额外的消费者将被忽略。
let kafka = require('kafka-node');
let client = new kafka.Client();
let producer = new kafka.Producer(client);
let consumer1 = new kafka.Consumer(client,[ {topic: 'topic1', partition: 0}]);
let consumer2 = new kafka.Consumer(clien
我在使用kafka的小批次和使用commitAsync时遇到了这个异常
couldn't ack 17 messages
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.DisconnectException
看
我在spring boot应用程序的消费者接收器上使用@SendTo注释,没有参数。收到的消息包含头部"Kafka_replyTopic",但@SendTo无法读取,kafka在发送回复时抛出异常: Listener failed; nested exception is java.lang.IllegalStateException: With no topic header, a defaultTopic is required
我有一个关于管理多个CG的问题,创建了三个消费者组,每个CG都有自己的kafka服务、组Id和主题。
现在我收到了预期的消息,但是,我想知道是否有可能创建下一个场景:
创建三个消费者组,但只接收来自一个用户的消息,暂时暂停/暂停其他用户组,如果他的kafka服务将下降,则使用下一个消费者组的消息,与第三个用户组相同。
下面是我的代码示例:
function createConsumerGroup(topics){
const ConsumerGroup = kafka.ConsumerGroup;
//CREATE CONSUMER GROUPS FOR EVERY SER
我们有卡夫卡消费者(5的并发)与手动黑。使用下面的实现,有时无法完成异常提交,因为组已经重新平衡了.
在异常情况下,消息不被确认并再次被消耗。
关于配置变化的建议对消费者的性能有很大影响吗?
消费工厂
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
/*
* Reading of the variables from yml file
*/
@Bean
public ConsumerFactory<String, String> consumerFactory() {
M
我从日志中看到,665次使用了完全相同的消息。这一切为什么要发生?
我在日志里也看到了这个
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.
This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies
that