首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >将对象节点转换为json节点

将对象节点转换为json节点
EN

Stack Overflow用户
提问于 2020-01-24 18:03:18
回答 2查看 1K关注 0票数 0

在这里,DataStream将键值对作为对象返回,我需要直接使用键值,而不是对象,因为我需要根据键对值进行分组。

代码语言:javascript
运行
复制
DataStream<ObjectNode> stream = env
    .addSource(new FlinkKafkaConsumer<>("test5", new JSONKeyValueDeserializationSchema (false), properties));

// stream.keyBy("record1").print();

当我给stream.keyby(“record1”).print()时,它显示

代码语言:javascript
运行
复制
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (GenericType<org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode>) cannot be used as key.
    at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:330)
    at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:337)
    at ReadFromKafka.main(ReadFromKafka.java:27)
EN

回答 2

Stack Overflow用户

发布于 2020-01-24 21:37:37

David Anderson的回答是正确的,作为补充,我可以补充说,您可以简单地创建将密钥提取为StringKeySelector。它可能看起来像这样:

代码语言:javascript
运行
复制
public class JsonKeySelector implements KeySelector<ObjectNode, String> {
    @Override
    public String getKey(ObjectNode jsonNodes) throws Exception {
        return jsonNodes.get("key").asText();
    }
}

这显然假设密钥应该是String

票数 1
EN

Stack Overflow用户

发布于 2020-01-24 18:33:41

有几种方法可以在Flink keyBy中指定键选择器。例如,如果您有一个类型为Event的POJO,并且在名为"id“的字段中包含字符串键,则可以使用以下任意一种方法:

代码语言:javascript
运行
复制
stream.keyBy("id")
代码语言:javascript
运行
复制
stream.keyBy(event -> event.id)
代码语言:javascript
运行
复制
stream.keyBy(
    new KeySelector<Event, String>() {
        @Override
        public String getKey(Event event) throws Exception {
            return event.id;
        }
    }
)

只要你能以确定性的方式从对象计算出关键字,你就能做到这一点。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59894173

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档