我有一台DataStreamGenericRecord val consumer = new FlinkKafkaConsumer[String]("input_csv_topic", new SimpleStringSchema(), properties)
val stream = senv.
addSource(consumer).
map(line => {
val arr = line.split(",")
val schemaUrl = "" // avro schema li
我的客户类已经使用maven-avro plugin.When创建了,我尝试运行这个程序,我得到的是Exception in thread "main" java.lang.IllegalStateException: Expecting type to be a PojoTypeInfo错误。
[main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class com.example.Customer does not contain a setter for field first_name
[main
我已经编写了一个使用者来使用模式注册表读取Avro的通用记录。
FlinkKafkaConsumer010 kafkaConsumer010 = new FlinkKafkaConsumer010(KAFKA_TOPICS,
new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
properties);
反序列化类如下所示:
public class KafkaGenericAvroDeserializationSchema implements KeyedDe
我是flink和Kafka的新手。我正在尝试使用Confluent Schema注册表对avro数据进行反序列化。我已经在ec2机器上安装了flink和Kafka。此外,在运行代码之前已经创建了"test“主题。 代码路径:https://gist.github.com/mandar2174/5dc13350b296abf127b92d0697c320f2 作为实现的一部分,代码执行以下操作: 1) Create a flink DataStream object using a list of user element. (User class is avro generated
我真的很难让Flink与运行中的Kafka实例进行正确的通信,使用来自communicate的Avro模式(对于,键和值)。
经过一段时间的思考和重组我的计划,我能够推动我的实现到目前为止:
生产方法
public static FlinkKafkaProducer<Tuple2<GenericRecord,GenericRecord>> kafkaAvroGenericProducer() {
final Properties properties = new Properties();
properties.put(Pro
我已经编写了一个消费者,它读取卡夫卡主题,并使用StreamSink以拼图格式写入数据。但是我得到了以下错误
java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowS
作为PyFlink pipeline.classpath配置的一部分传递的类加载类与将它们放入$FLINK_HOME\lib目录之间有什么区别?
当我想使用flink-sql-connector-kafka-*.jar时,只要使用pipeline.classpath传递它就可以了,但是当我想使用一些具有一些外部依赖项的东西时,比如需要avro-*.jar jars的flink-avro-*.jar。它似乎加载了flink-avro-*.jar,但它似乎无法加载avro-*.jar并抛出:
java.lang.NoClassDefFoundError: Could not initialize
我有一个flink应用程序,它从kafka中读取并将其吸收到kafka。
当我从Intellij运行应用程序时,没有问题,但是当我将ClassCastException提交给flink集群时,它会给出shadowJar。我能在找出我在这里做错了什么方面得到一些帮助吗?
异常跟踪:
Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.kafka.clients.consumer.OffsetResetStrategy to field org.apache.flink.connector.ka
我将avro序列化数据发布到kafka主题,然后尝试通过SQL接口从主题创建Flink表。我能够创建主题,但在执行SQL SELECT语句后无法查看主题数据。但是,我能够使用简单的kafka用户反序列化和打印已发布的数据。在SQL上获取此错误:
Flink SQL> SELECT * FROM test_flink2;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ArrayIndexOutOfBoundsException: Index -3 out of bounds for length 2
表创建
Flink
我在网上到处查找,发现这是一个很常见的错误,但没有任何解决方案对我有帮助。
我在读卡夫卡的一个主题。到目前为止,我还没有遇到这样的问题,但是现在,当我在aws环境中运行flink集群而不是在我的IDE (intellij)上运行时,我得到了这个错误的。
NoClassDefFoundError: org/xerial/snappy/Snappy
at org.apache.avro.file.SnappyCodec.decompress(SnappyCodec.java:58)
at org.apache.avro.file.DataFileStream$DataBlock.decompres