Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Kafka流:由于在恢复过程中更改日志状态而导致重新平衡失败

Kafka流:由于在恢复过程中更改日志状态而导致重新平衡失败
EN

Stack Overflow用户
提问于 2017-08-29 22:52:07
回答 1查看 2.2K关注 0票数 1

需要一些帮助,找出我收到的一个卡夫卡流消费者的例外。

我用低级别处理器API实现了Kafka流。对于从Kafka接收到的每一次更新,都会将其合并并更新到keystore,从而维护状态。最初,我们只运行了一个消费者,过了一段时间,我们尝试提出第二个消费者。但第二个消费者在再平衡期间抛出了一个例外,称其未能实现再平衡。这是因为更改日志的状态发生了更改(下面的异常共享)。我假设,当重新平衡发生时,第一个使用者收到了一些更新,因此更新被推送到相应的更改日志中。请帮帮忙。还共享相同的示例代码。我使用的是Kafka 2_11 0.10.2.1,主题有72个分区

异常

代码语言:javascript
运行
AI代码解释
复制
    Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:598)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
    Caused by: java.lang.IllegalStateException: task [0_60] Log end offset of Kafka-xxxxxxxxxxxxxxxx-InfoStore-changelog-60 should not change while restoring: old end offset 80638, current offset 80640
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:252)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
    at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:160)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:40)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueStore.init(ChangeLoggingKeyValueStore.java:56)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
    at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)

代码片段

代码语言:javascript
运行
AI代码解释
复制
    public class InfoProcessor extends AbstractProcessor<Key, Update> {
private static Logger logger = Logger.getLogger(InfoProcessor.class);
private ProcessorContext context;
private KeyValueStore<Key, Info> infoStore;
private int visitProcessorInstanceId;

@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
    this.context = context;
    this.context.schedule(Constants.BATCH_DURATION_SECONDS * 1000);
    infoStore = (KeyValueStore<Key, Info>) context.getStateStore("InfoStore");
}

@Override
public void process(Key key, Update update) {
    try {
        if (key != null && update != null) {
            Info info = infoStore.get(key);
            // merge logic
            infoStore.put(key, info);
        }

    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    } finally {
    }
    context.commit();
}

@Override
public void punctuate(long timestamp) {
    try {
        KeyValueIterator<Key, Info> iter = this.infoStore.all();
        while (iter.hasNext()) {
            // processing logic

        }
        iter.close();
        context.commit();
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
}

}

谢谢。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-08-30 13:23:54

你的观察--推理--是正确的。如果由于状态迁移而导致的再平衡需要很长时间而另一次再平衡发生,则可能会发生这种情况:

  1. 第一个实例正在运行。
  2. 第二个实例启动,触发重新平衡的。
    • 第二个实例重新创建状态。

  1. 另一种再平衡发生了(不确定在您的情况下如何触发)
    • 第二个实例仍然在重新创建状态,并且没有重新加入组(因此它退出组)。

  1. 第一个实例得到迁移回来的状态(它仍然有状态的完整副本,因此没有什么可重新创建的-第二个实例还没有开始处理任何事情),并继续写到changelog主题。
  2. 第二个实例以异常作为描述而死亡。

你能证实一下吗?如果是,你需要避免第二次再平衡,只要国家娱乐正在运行。

顺便说一句:这种行为已经在trunk中得到了改进,并将在即将发布的0.11.0.1版本中得到修正。您可以将您的Kafka应用程序更新为0.11.0.1,而无需升级代理。0.11.0.1应该会在接下来的几周内发布。

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45953816

复制
相关文章
最新更新 | Kafka - 2.6.0版本发布新特性说明
以下是Kafka 2.6.0版本中解决JIRA问题的摘要,有关该版本的完整文档,入门指南以及关于该项目的信息,请参考Kafka官方文档。
大数据真好玩
2020/08/28
5K0
最新更新 | Kafka - 2.6.0版本发布新特性说明
由MasterProcWals状态日志过多导致的HBase Master重启失败问题
本文主要讲述如何解决由MasterProcWals状态日志过多导致的HBase Master重启失败问题。
Fayson
2018/09/29
7K0
kafka中文文档
之前的版本:0.7.x,0.8.0,0.8.1.X,0.8.2.X,0.9.0.X,0.10.0.X。
gemron的空间
2019/11/04
15.4K0
kafka中文文档
一文读懂Kafka Connect核心概念
Kafka Connect 是一种用于在 Apache Kafka 和其他系统之间可扩展且可靠地流式传输数据的工具。 它使快速定义将大量数据移入和移出 Kafka 的连接器变得简单。 Kafka Connect 可以摄取整个数据库或从所有应用程序服务器收集指标到 Kafka 主题中,使数据可用于低延迟的流处理。 导出作业可以将数据从 Kafka 主题传送到二级存储和查询系统或批处理系统进行离线分析。
从大数据到人工智能
2022/01/15
2K0
一文读懂Kafka Connect核心概念
由于ActionList导致的数据保存失败的问题;「建议收藏」
由于本人喜欢用,用来与一些 buttion按钮绑定。当绑定后,你在双击绑定POST功能的 button按钮写入相关的操作后并且用代码实现POST的功能。因为主要是想用 actionlist 来自动控制按钮是否生效的功能,但是又不想用 actionlist 数据操作的相关功能。因为很多时候,在POST前都要处理一些相关的事件;
全栈程序员站长
2022/11/01
3950
Kafka Streams 核心讲解
•Kafka Stream 提供了一个非常简单而轻量的 Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署•除了 Kafka 外,无任何外部依赖•充分利用 Kafka 分区机制实现水平扩展和顺序性保证•通过可容错的 state store 实现高效的状态操作(如 windowed join 和aggregation)•支持正好一次处理语义•提供记录级的处理能力,从而实现毫秒级的低延迟•支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)•同时提供底层的处理原语 Processor(类似于 Storm 的 spout 和 bolt),以及高层抽象的DSL(类似于 Spark 的 map/group/reduce)
java达人
2021/06/21
2.7K0
Kafka Streams 核心讲解
SAP QM 由于存在未清TO单导致QA11失败
业务人员使用事务代码QA11对于某个物料批次执行使用决策,系统报错:No posting possible due to open transfer orders for storage unit CN4 150382461101454421
SAP虾客
2021/06/17
3820
​kafka概述 01 0.10之后的kafka版本有哪些有意思的feature?【kafka技术图谱 1/50】
# **kafka release reviews: what happen from kafka 0.10 to 2.6*
大数据事务所-大菜菜
2021/09/09
1K0
​kafka概述 01 0.10之后的kafka版本有哪些有意思的feature?【kafka技术图谱 1/50】
图文详解:Kafka到底有哪些秘密让我对它情有独钟呢?
最近很少发文,一是开始总结了一下自己做了两个多月的公号了,都收获了什么,学到了什么。
浅羽技术
2020/12/17
4830
图文详解:Kafka到底有哪些秘密让我对它情有独钟呢?
Structured Streaming | Apache Spark中处理实时数据的声明式API
随着实时数据的日渐普及,企业需要流式计算系统满足可扩展、易用以及易整合进业务系统。Structured Streaming是一个高度抽象的API基于Spark Streaming的经验。Structured Streaming在两点上不同于其他的Streaming API比如Google DataFlow。 第一,不同于要求用户构造物理执行计划的API,Structured Streaming是一个基于静态关系查询(使用SQL或DataFrames表示)的完全自动递增的声明性API。 第二,Structured Streaming旨在支持端到端实时的应用,将流处理与批处理以及交互式分析结合起来。 我们发现,在实践中这种结合通常是关键的挑战。Structured Streaming的性能是Apache Flink的2倍,是Apacha Kafka 的90倍,这源于它使用的是Spark SQL的代码生成引擎。它也提供了丰富的操作特性,如回滚、代码更新、混合流\批处理执行。 我们通过实际数据库上百个生产部署的案例来描述系统的设计和使用,其中最大的每个月处理超过1PB的数据。
王知无-import_bigdata
2020/01/14
2K0
Structured Streaming | Apache Spark中处理实时数据的声明式API
Apache Kafka 3.2.0 重磅发布!
3.2.0 版本包含许多新功能和改进。本文将重点介绍一些最突出的新功能。有关更改的完整列表,请务必查看发行说明。您还可以观看发布视频,了解 Apache Kafka 3.2.0 中的新功能摘要。
大数据真好玩
2022/06/17
2.1K0
MySQL存储过程中包含HINT导致升级失败纪实
作为万里数据库的战略合作伙伴,某运营商一直密切关注着国产数据库的发展。其系统中一套基于MySQL8.0.11版本的核心报表平台,近期由于存在安全扫描的漏洞,需要尽快将其升级到MySQL8.0.25及以上版本。
GreatSQL社区
2023/02/24
1K0
[KDD | 论文简读] 避免由于节点嵌入中的相似性假设而导致的偏差
Avoiding Biases due to Similarity Assumptions in Node Embeddings
智能生信
2022/12/29
6070
[KDD | 论文简读] 避免由于节点嵌入中的相似性假设而导致的偏差
事件驱动的基于微服务的系统的架构注意事项
今天的 IT 系统正在生成、收集和处理比以往更多的数据。而且,他们正在处理高度复杂的流程(正在自动化)以及跨越典型组织边界的系统和设备之间的集成。同时,预计 IT 系统的开发速度更快、成本更低,同时还具有高可用性、可扩展性和弹性。 为了实现这些目标,开发人员正在采用架构风格和编程范式,例如微服务、事件驱动架构、DevOps 等。正在构建新的工具和框架来帮助开发人员实现这些期望。 开发人员正在结合事件驱动架构 (EDA) 和微服务架构风格来构建具有极强可扩展性、可用、容错、并发且易于开发和维护的系统。 在本文
IT大咖说
2022/03/04
1.5K0
Kafka异地双活深度讲解 - Mirrormaker V2
总结:Apache Kafka Mirrormaker V1的解决方案在提供企业管理的灾难恢复方面存在局限性。MM V2(KIP-382)针对MM V1 进行了扩展,并修复了MM V1的局限性,使其能够动态修改配置,并且能够将Topic在群集之间保持同步,同时尽可能地降低触发Rebalance的情况以提高性能。此外,Active-Active群集和Disaster Recover在MM V2中已经属于开箱即用(Out-of-the-box)功能。
Fayson
2019/10/31
9.8K0
Kafka异地双活深度讲解 - Mirrormaker V2
kafka是什么牌子_kafka为什么叫kafka
Apache Kafka 是一款开源的消息系统。可以在系统中起到“肖峰填谷”的作用,也可以用于异构、分布式系统中海量数据的异步化处理。 系统包括四个主要API:
全栈程序员站长
2022/11/04
9810
Kafka详细设计及其生态系统
Kafka生态-Kafka Core,Kafka Streams,Kafka Connect,Kafka REST Proxy和Schema Registry Kafak的核心主要有Broker,Topic,日志,分区和集群。该核心还包括相关的工具,如MirrorMaker。 Kafka生态系统由Kafka Core,Kafka Streams,Kafka Connect,Kafka REST Proxy和Schema Registry组成。Kafka生态系统的大多数附件来自Confluent,而不是Apa
用户1263954
2018/01/30
2.2K0
Kafka详细设计及其生态系统
大数据开发:Kafka日志加载与恢复
之前我们已经对Kafka的日志结构做了基本的讲解,相信大家也都有了一定的了解了。今天我们接着来讲kafka日志管理的部分,Kafka日志加载与恢复。
成都加米谷大数据
2021/06/02
1.2K0
大数据开发:Kafka日志加载与恢复
点击加载更多

相似问题

kafka由于: SSL握手失败而导致身份验证失败

511

Kafka流- Kafka 1.0.0中的重新平衡异常

15

是否可以在重新启动后恢复Kafka流状态存储而不使用changelog主题?

12

基于异常的Kafka流状态恢复

13

由于证书过期而导致SSLError在MacOS上失败--如何恢复?

20
添加站长 进交流群

领取专属 10元无门槛券

AI混元助手 在线答疑

扫码加入开发者社群
关注 腾讯云开发者公众号

洞察 腾讯核心技术

剖析业界实践案例

扫码关注腾讯云开发者公众号
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档