Loading [MathJax]/jax/output/CommonHTML/config.js
社区首页 >问答首页 >如何将spark结构化流数据重置为最后一个可用偏移量

如何将spark结构化流数据重置为最后一个可用偏移量
EN

Stack Overflow用户
提问于 2020-04-18 16:21:30
回答 1查看 291关注 0票数 0

我正在使用Kafka运行一个结构化的流应用程序。我发现如果由于某种原因系统停机了几天...检查点变得陈旧,并且在Kafka中找不到与检查点对应的偏移量。我如何让Spark结构化流媒体应用选择最后一个可用的偏移量,并从那里开始。我尝试将偏移量重置设置为较早/最新,但系统崩溃,出现以下错误:

代码语言:javascript
代码运行次数:0
复制
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {MyTopic-574=6559828}
at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:970)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:490)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1259)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer.fetchData(KafkaDataConsumer.scala:470)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$fetchRecord(KafkaDataConsumer.scala:361)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:251)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:234)
at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:209)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:234)
at org.apache.spark.sql.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:64)
at org.apache.spark.sql.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:500)
at org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.next(KafkaMicroBatchReader.scala:357)
at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:49)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
EN

回答 1

Stack Overflow用户

发布于 2020-04-18 16:28:59

如果系统停机了几天,则意味着某些日志可能已被压缩。更准确地说,您的应用程序尝试从主题MyTopic中的第574个分区读取偏移量6559828

为了找到每个分区最早的可用偏移量,您可以简单地运行以下命令:

代码语言:javascript
代码运行次数:0
复制
bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
    --broker-list localhost:9092 \
    --topic MyTopic \
    --time -2
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61292447

复制
相关文章
double类型转换成int类型
这个是官方源码的构造方法,我们可以看到还可以尝试转换为其他类型,比如转换为short类型。
全栈程序员站长
2022/07/22
2.1K0
double类型转换成int类型
char转换成string java_int类型转换成char类型
原文地址:http://www.only-demo.com/java/20190220/277.html
全栈程序员站长
2022/11/09
8490
java中将Object类型转换成String类型[通俗易懂]
从下图(a图)的String类的valueOf(Object)的源码可以看到,当传入的值为null的时候返回的是“null”字符串,而不是null,所以在这里如果想判断这个string的值不为空的时候,应该用字段串相关判断不为空的方法例如用equals方法。示例如下(b图)
全栈程序员站长
2022/08/30
1.3K0
java中将Object类型转换成String类型[通俗易懂]
把字符串转换成float类型_c++如何将string类型转换成int类型
在学习数据结构经常碰到各种数据间转换的例程,就想c的库里面有没有现有的函数,用的时候直接调用就好,一查果然有:整型转字符串itoa(); 字符串转整型atoi();用的时候需要添加头文件#include<stdlib.h>。
全栈程序员站长
2022/09/27
1.1K0
string类型如何转换成date类型(数据类型自动转换)
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/129263.html原文链接:https://javaforall.cn
全栈程序员站长
2022/07/29
1.1K0
java string类型转换成int类型(string怎么强转int)
需要明确的是String是引用类型,int是基本类型,所以两者的转换并不是基本类型间的转换,这也是该问题提出的意义所在,SUN公司提供了相应的类库供编程人员直接使用。
全栈程序员站长
2022/07/30
1.9K0
如何将int类型转换为char_int类型转换成char类型
近来面试遇到一个问题,通过控制台输入一个12位的数字,然后进行一些计算,然后被困在如何把char类型的数字转换成int类型。通过搜索,找到两个解决办法。
全栈程序员站长
2022/11/09
2.8K0
golang如何创建动态的struct类型以及如何转换成slice类型
最近研究了一下reflect包,感觉这个包的功能很强大,顺便研究了一下如何在函数中动态创建struct{},平常我们都是用如下方式定义struct类型。
公众号-利志分享
2022/04/25
3.5K0
char类型转换成int_char 转int
程序真的是极简啊,用联合类型有时候用来转换数据类型是非常方便实用的,其中的原理就是因为联合类型共用一个内存空间咯,但是要注意它是小端模式存储的,低字节对应低地址。
全栈程序员站长
2022/09/27
3.3K0
char类型转换成int_char 转int
python将字符串类型list转换成list
python读取了一个list是字符串形式的'[11.23,23.34]',想转换成list类型:
机器学习和大数据挖掘
2019/07/01
5.1K0
js中将日期类型转换成字符串型格式
<script language="javascript"> /////////////////////////////////e/////////////////////// // 取得当前日期,格式yyyy-mm-dd //////////////////////////////////////////////////////// function GetCurrentDate() {     var Year=0;     var Month=0;     var Day=0;     var Cur
源哥
2018/08/28
3.3K0
java long转换成string类型_java中Long类型转换为String类型的两种方法及区别[通俗易懂]
包装类型:Byte,Integer,Short,Long,Boolean,Character,Float,Double等。
全栈程序员站长
2022/07/28
1.8K0
C#如何把List of Object转换成List of T具体类型
上周码程序的时候碰到个问题,因为设计上的约束,一个方法接受的参数只能为List<object>类型,然而该方法需要处理的真实数据则是确定的List<Currency>。然而C#不允许显示的直接转换类型,并且两个方向上都不可以操作。这个问题让我爆了一会儿,最后在MSDN上找到了一个OfType<T>的拓展方法可以完成这件事。
跟着阿笨一起玩NET
2018/09/20
3.8K0
以12c Identity类型示范自我探索式学习方法
作者简介: Oracle ACE总监,ACOUG联合创始人,云和恩墨的联合创始人,致力于通过不断的技术探索,帮助中国用户理解和接触新技术,推广数据库技术应用;热切关注Oracle技术和其他相关技术。
数据和云
2018/03/06
1.1K0
以12c Identity类型示范自我探索式学习方法
python 将json类型的字符串转换成字典 使用demo
我们从网页上抓取的很多数据都是json格式,保存下来也就是字符串格式,我们这时候如果使用字符串拼接或者正则表达式在json字符串中寻找信息是比较麻烦的。把json字符串转换成python中的字典,然后再使用字典查找。
用户7886150
2021/01/14
2.5K0
自我学习规划
  从2018年的实习开始算起,工作时间也接近两年,在两年的工作中,由于工作时间较长(基本996),所以一直以来留给自我学习及提升的时间并不多,在相当多零碎的时间里,也只学习了一些零散的东西,对技术的提升并不明显。   日常的工作基于C语言开发,开发内容与业务逻辑耦合性大,时常有种感觉,除了C写得熟练些外,离开这个业务背景,现在的开发工作貌似对今后的发展并无多大裨益。我认为关键问题在于,基础的数据结构与算法缺乏整理,相关能力难以提升,同时对于开发来说,与业务逻辑耦合过大,无相关领域业务涉及,会使得自己的技术之路越走越窄,从长远发展来看,作为一名算法开发人员,这样确实不够合格。   处于现在的窘境,就打算开始系统地整理下现在所涉及到的相关理论知识,提炼并总结经验,同时学习新的技术。为了监督自己,开始撰写博客,一方面是时刻提醒自己,清楚自己的计划,另一方面也是为了把总结的知识与经验都记录下来,防止自己在同一个坑上跌倒多次。   目前粗糙地计划了一些,包括自己之前在做的零碎事情整理,现在工作相关的经验总结,后面要学习的新技术相关,大致记录如下:
全栈程序员站长
2022/09/20
1430
自我学习规划
Eureka自我保护
为了防止EurekaClient可以正常运行,但是 与 EurekaServer网络不通情况下,EurekaServer不会立刻将EurekaClient服务剔除
一个风轻云淡
2022/11/13
3090
Eureka自我保护
EasyNVR硬件盒子自我维护----进入系统进行自我维护
该方案在之前的博文中有过说明,本篇博文将具体介绍一下,0基础如何在拿到盒子的时候实现设备的现场接入和自我维护。
EasyNVR
2020/04/23
6890
EasyNVR硬件盒子自我维护----进入系统进行自我维护
MySQL自我保护参数
上文(MySQL自我保护工具--pt-kill ) 提到用pt-kill工具来kill相关的会话,来达到保护数据库的目的,本文再通过修改数据库参数的方式达到阻断长时间运行的SQL的目的。
俊才
2021/12/13
1.1K0
点击加载更多

相似问题

__weak类型(自我) weakSelf =自我或__weak MyObject *弱自=自我?

36

Scala:非法继承;自我类型Y不符合X的自我类型

10

自我提升数据类型

22

使用,不使用‘自我’类型

36

类型提示子类返回自我

24
添加站长 进交流群

领取专属 10元无门槛券

AI混元助手 在线答疑

扫码加入开发者社群
关注 腾讯云开发者公众号

洞察 腾讯核心技术

剖析业界实践案例

扫码关注腾讯云开发者公众号
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文