我正在尝试向Kubernates中部署的ververica平台提交作业,但我收到了下面的消息,我向Flink standalone提交了相同的代码,它的工作正常!!我使用的是Flink 1.10.1,代码是Scala 2.12。 Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not find a suitable table factory for 'org.apache.flink.table.factorie
我有一个在flink流中序列化的参数params, class P extend Serializable {...}
val params = new P(...)
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.addSource(new MySource(params))
.map(new MyMap(params))
.addSink(new MySink(params))
env.setParallelism(1)
env.execute("My Job") 但是驱动节点
我在一台机器上启动flink (bin/ start -cluster.sh),并通过flink web UI提交作业。如果作业有问题,比如接收器mysql表不存在或keyby字段错误,不仅这个作业失败,我不得不取消失败的任务,但在取消后,任务管理器似乎被“杀死”,它在flink web ui中消失。
有没有容错的解决方案(taskmanager会被失败的作业杀死)?唯一的方法是在纱线上运行flink?
我正在使用flink从Azure数据湖中读取数据。但flink无法找到Azure数据湖文件系统。我已经实现了spark来读取Azure Data Lake文件系统文件。
因此,我尝试从flink中触发火花,从Azure数据湖中读取数据,并执行一些逻辑操作,然后将列表返回给flink。所以flink可以使用这些数据?
是否可以从flink触发spark作业?或者如何配置flink来理解Azure数据湖文件系统。有人能在这方面给我指点一下吗?
使用flink版本1.13.1
我写了一个自定义的度量报告,但在我的flink中似乎不起作用。启动flink时,JobManager显示警告日志,如下所示:
2021-08-25 14:54:06,243 WARN org.apache.flink.runtime.metrics.ReporterSetup [] - The reporter factory (org.apache.flink.metrics.kafka.KafkaReporterFactory) could not be found for reporter kafka. Available f
我正在测试Flink 1.3.0中的弹性特性。我有一个具有检查点启用和固定延迟重启策略的作业.当我杀死一个TaskManager JVM时,一段时间后,作业将在剩余的节点上正确地重新启动。但是,当我添加一个新节点时,作业不会自动重新启动以利用它。
我试着使用bin/flink stop <jobId>,但它总是给我java.lang.IllegalStateException: Job with ID <jobId> is not stoppable.
如何重新启动作业以利用附加节点?
我试图通过提交给Flink中的作业管理器来运行java中的一个基本程序。我有一个来自open CV的本地库。当我尝试提交作业时,我得到"java.lang.UnsatisfiedLinkError: no opencv_java310 in java.library.path",但是当我通过设置flink执行环境在eclipse上运行它时,我得到了正确的结果。我遵循了apache flink支持网站上的一些解决方案:,并相应地修改了我的conf.yaml文件(通过指向env.java.opts:-Djava.library.path="/path of Open CV
使用flink 0.10.1在本地,由于以下错误,我无法与作业管理器连接:
Association with remote system [akka.tcp://flink@127.0.0.1:49789] has failed, address is now gated for [5000] ms. Reason is: [scala.Option; local class incompatible: stream classdesc serialVersionUID = -2062608324514658839, local class serialVersionUID = -114
让我把它说出来,我是一个非常初学者的Flink,并试图抓住尽可能多的概念。
比方说,我有一个flink集群,其中包含10个任务管理器。我每个人都有一个flink作业在运行。作业也使用广播状态。该广播状态是通过每10分钟读取5个S3文件,进行一些处理,并创建播放的int to list of strings地图来创建的。
问题:在哪里读取文件,是否在JobManager读取和处理文件,并将处理过的内容发送给任务管理人员。
或
是任务管理人员负责所有的读取和处理。如果是这种情况,那么flink如何确保如果任务管理器无法从S3读取,那么所有任务管理器的广播状态都是相同的。
编辑
因此,任务管理器读取