Need some help figuring out an exception I am getting from a Kafka Streams consumer.
I have implemented Kafka Streams using the low-level Processor API. Every update received from Kafka is merged and written back into a key-value store, thereby maintaining state. Initially we ran only a single consumer; after a while we tried to bring up a second one, but it threw an exception during rebalancing saying it failed to rebalance because the state of the changelog had changed (exception shared below). My assumption is that while the rebalance was in progress, the first consumer received some updates, which were therefore pushed to the corresponding changelog topic. Please help. Sample code is also shared below. I am using Kafka 0.10.2.1 (Scala 2.11) and the topic has 72 partitions.
Exception:
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:598)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
Caused by: java.lang.IllegalStateException: task [0_60] Log end offset of Kafka-xxxxxxxxxxxxxxxx-InfoStore-changelog-60 should not change while restoring: old end offset 80638, current offset 80640
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:252)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:160)
at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:40)
at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueStore.init(ChangeLoggingKeyValueStore.java:56)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
Code snippet:
public class InfoProcessor extends AbstractProcessor<Key, Update> {

    private static final Logger logger = Logger.getLogger(InfoProcessor.class);

    private ProcessorContext context;
    private KeyValueStore<Key, Info> infoStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        // Schedule punctuate() to fire every BATCH_DURATION_SECONDS.
        this.context.schedule(Constants.BATCH_DURATION_SECONDS * 1000);
        infoStore = (KeyValueStore<Key, Info>) context.getStateStore("InfoStore");
    }

    @Override
    public void process(Key key, Update update) {
        try {
            if (key != null && update != null) {
                Info info = infoStore.get(key);
                // merge logic
                infoStore.put(key, info);
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            context.commit();
        }
    }

    @Override
    public void punctuate(long timestamp) {
        // try-with-resources ensures the iterator is closed even on error.
        try (KeyValueIterator<Key, Info> iter = this.infoStore.all()) {
            while (iter.hasNext()) {
                // processing logic
            }
            context.commit();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }
}
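For clarity, the read-merge-write pattern inside process() can be sketched independently of Kafka, with a plain map standing in for the KeyValueStore. The Info and Update types and the additive merge are simplified stand-ins for the elided merge logic in the question:

```java
import java.util.HashMap;
import java.util.Map;

public class MergeSketch {
    // Simplified stand-ins for the Key/Info/Update types in the question.
    static class Info { int count; }
    static class Update { final int delta; Update(int d) { delta = d; } }

    // Plain map in place of the Kafka Streams KeyValueStore.
    static final Map<String, Info> store = new HashMap<>();

    // Mirrors process(): read current state, merge the update, write back.
    static void process(String key, Update update) {
        if (key == null || update == null) {
            return;
        }
        Info info = store.getOrDefault(key, new Info());
        info.count += update.delta; // assumed merge logic
        store.put(key, info);
    }
}
```

In the real processor, each put() to the store is also appended to the store's changelog topic, which is why concurrent updates during restoration move the changelog's end offset.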
Thanks.
Posted on 2017-08-30 13:23:54
Your observation and reasoning are correct. This can happen if a rebalance takes a long time due to state recreation and another rebalance occurs in the meantime. Can you confirm this? If so, you need to avoid a second rebalance while state recreation is running.
Btw: this behavior has already been improved in trunk and will be fixed in the upcoming 0.11.0.1 release. You can update your Kafka Streams application to 0.11.0.1 without upgrading the brokers. 0.11.0.1 should be released within the next couple of weeks.
https://stackoverflow.com/questions/45953816