在这里,DataStream将键值对作为对象返回,我需要直接使用键值,而不是对象,因为我需要根据键对值进行分组。
DataStream<ObjectNode> stream = env
.addSource(new FlinkKafkaConsumer<>("test5", new JSONKeyValueDeserializationSchema (false), properties));
// stream.keyBy("record1").print();当我给stream.keyby(“record1”).print()时,它显示
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (GenericType<org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode>) cannot be used as key.
at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:330)
at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:337)
at ReadFromKafka.main(ReadFromKafka.java:27)发布于 2020-01-24 21:37:37
David Anderson的回答是正确的,作为补充,我可以补充说,您可以简单地创建将密钥提取为String的KeySelector。它可能看起来像这样:
public class JsonKeySelector implements KeySelector<ObjectNode, String> {
@Override
public String getKey(ObjectNode jsonNodes) throws Exception {
return jsonNodes.get("key").asText();
}
}这显然假设密钥应该是String。
发布于 2020-01-24 18:33:41
有几种方法可以在Flink keyBy中指定键选择器。例如,如果您有一个类型为Event的POJO,并且在名为"id“的字段中包含字符串键,则可以使用以下任意一种方法:
stream.keyBy("id")stream.keyBy(event -> event.id)stream.keyBy(
new KeySelector<Event, String>() {
@Override
public String getKey(Event event) throws Exception {
return event.id;
}
}
)只要你能以确定性的方式从对象计算出关键字,你就能做到这一点。
https://stackoverflow.com/questions/59894173
复制相似问题