Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >问答首页 >KeyedProcessFunction实现抛出空指针异常?

KeyedProcessFunction实现抛出空指针异常?
EN

Stack Overflow用户
提问于 2019-07-28 21:14:19
回答 1查看 822关注 0票数 1

第一个示例:来自"https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html“的代码

我正在尝试重写KeyedProcessFunction类的processElement()。ProcessElement有3个参数,其中一个参数是context对象。当我试图从上下文对象中检索时间戳时,它抛出空指针异常。

在第一个示例代码中抛出空指针异常的行是

current.lastModified = ctx.timestamp();

第二个示例:“使用Apache Flink进行流处理”一书中的示例6.5。

我在一个扩展KeyedProcessFunction类的类中声明了两个ValueState变量。当我试图检索在状态中更新的最后一个值时,它返回一个空值。

在第一个示例代码中抛出空指针异常的行是

Double prevTemp = lastTemp.value();if(prevTemp==0.0 || r.temperature < prevTemp) {}

第一个示例代码

代码语言:javascript
运行
AI代码解释
复制
public class KeyedProcessFunctionExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment=
                StreamExecutionEnvironment.getExecutionEnvironment();

        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<Tuple2<String, String>> stream =
                environment.socketTextStream("localhost",9090)
                        .map(new MapFunction<String, Tuple2<String, String>>() {
                            @Override
                            public Tuple2<String, String> map(String s) throws Exception {
                                String[] words= s.split(",");

                                return new Tuple2<>(words[0],words[1]);
                            }
                        });

        DataStream<Tuple2<String, Long>> result = stream
                .keyBy(0)
                .process(new CountWithTimeoutFunction());

        result.print();

        environment.execute("Keyed Process Function Example");

    }
    public static class CountWithTimeoutFunction extends KeyedProcessFunction<Tuple, Tuple2<String, String>, Tuple2<String, Long>> {
        private ValueState<CountWithTimestamp> state;

        @Override
        public void open(Configuration parameters) throws Exception {
            state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
        }

        @Override
        public void processElement(
                Tuple2<String, String> value,
                Context ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {

            // retrieve the current count
            CountWithTimestamp current = state.value();
            if (current == null) {
                current = new CountWithTimestamp();
                current.key = value.f0;
            }

            // update the state's count
            current.count++;

            // set the state's timestamp to the record's assigned event time timestamp
            current.lastModified = ctx.timestamp();

            // write the state back
            state.update(current);

            // schedule the next timer 60 seconds from the current event time
            ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
        }

        @Override
        public void onTimer(
                long timestamp,
                OnTimerContext ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {

            // get the state for the key that scheduled the timer
            CountWithTimestamp result = state.value();

            // check if this is an outdated timer or the latest timer
            if (timestamp == result.lastModified + 60000) {
                // emit the state on timeout
                out.collect(new Tuple2<String, Long>(result.key, result.count));
            }
        }
    }
}

class CountWithTimestamp {

    public String key;
    public long count;
    public long lastModified;
}

第二个例子

代码语言:javascript
运行
AI代码解释
复制
public class KeyedProcessFunctionTimerExample {
    public static void main(String[] args) throws Exception{
        // set up the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // use event time for the application
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        DataStream<String> sensorData=
                env.addSource(new SensorSource())
                .keyBy(r -> r.id)
                .process(new TempIncreaseAlertFunction());

        sensorData.print();
        env.execute("Keyed Process Function execution");
    }

    public static class TempIncreaseAlertFunction extends KeyedProcessFunction<String, SensorReading, String> {

        private ValueState<Double> lastTemp;
        private ValueState<Long> currentTimer;

        @Override
        public void open(Configuration parameters) throws Exception {
            lastTemp = getRuntimeContext().getState(new ValueStateDescriptor<Double>("lastTemp", Types.DOUBLE));
            currentTimer = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer", org.apache.flink.api.common.typeinfo.Types.LONG));
        }

        @Override
        public void processElement(
                SensorReading r,
                Context ctx,
                Collector<String> out) throws Exception {

            // get previous Temp
            Double prevTemp = lastTemp.value();

            // update last temp
            lastTemp.update(r.temperature);

            Long curTimerTimestamp = currentTimer.value();

            if(prevTemp==0.0 || r.temperature < prevTemp) {
                ctx.timerService().deleteProcessingTimeTimer(curTimerTimestamp);
                currentTimer.clear();
            }
            else if(r.temperature > prevTemp && curTimerTimestamp == 0) {
                Long timerTs = ctx.timerService().currentProcessingTime() + 1000;
                ctx.timerService().registerProcessingTimeTimer(timerTs);
                currentTimer.update(timerTs);

            }
        }

        @Override
        public void onTimer(
                long ts,
                OnTimerContext ctx,
                Collector<String> out) throws Exception {

            out.collect("Temperature of sensor ' " + ctx.getCurrentKey() + " ' monotonically increased for 1 second.");
            currentTimer.clear();

        }
    }

}

它不应该抛出空指针异常。您的帮助我们将不胜感激。谢谢!

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-07-29 09:05:39

在Flink中使用事件时间时,您必须安排事件具有时间戳,并使流具有水印。您可以通过实现时间戳提取器和水印生成器来实现这一点,如herehere所述。

另请参见the tutorial

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57245017

复制
相关文章
全局id如何生成?
我们需要一个单独的表给我们专门生成自增id,每次到这个专门生成id的表里插入一条数据拿回id,带着这个id去新增自己分表数据;
名字是乱打的
2021/12/24
1.5K0
全局id如何生成?
设计实践:AWS IoT解决方案
随着物联网设备的激增,企业需要一种解决方案来收集、存储和分析其设备的数据。Amazon Web Services提供了一些有用的工具,可为IoT设备设计强大的数据管道。
用户4122690
2020/03/29
1.4K0
设计实践:AWS IoT解决方案
[ 物联网篇 ] ESP32 AWS IoT and Amazon Alexa Development
Build the future of the connected home with AWS IoT and Amazon Alexa
程序手艺人
2020/05/04
7730
云中树莓派(5):利用 AWS IoT Greengrass 进行 IoT 边缘计算
云中树莓派(5):利用 AWS IoT Greengrass 进行 IoT 边缘计算
SammyLiu
2019/06/28
2.3K0
云中树莓派(5):利用 AWS IoT Greengrass 进行 IoT 边缘计算
Certification Vending Machine: 智能设备接入 AWS IoT 平台解决方案
AWS IoT 平台为了保证终端设备通信的安全性,终端设备与 AWS IoT 平台的 MQTT 通信使用基于证书的 TLS 1.2 双向认证体系。即 IoT 平台会验证当前设备使用的证书是否可信,同时,终端设备也会验证 IoT 平台使用的 CA 证书是否可信。
TASKCTL 任务调度平台
2021/04/02
2.1K0
Certification Vending Machine: 智能设备接入 AWS IoT 平台解决方案
openssl 如何生成自签证书
CSR 是 Certificate Signing Request 的缩写,即证书签名请求,这不是证书,只是包含申请证书的基本信息。生成证书时要把这个提交给权威的证书颁发机构,颁发机构审核通过之后,再根据这些申请信息生成相应的证书。
BUG弄潮儿
2022/06/30
1.1K0
国外物联网平台(1):亚马逊AWS IoT
设备影子服务使用MQTT话题,便于应用和设备之间的通信,下面是相关的MQTT QoS 1话题:
庞小明
2018/09/19
7.7K0
国外物联网平台(1):亚马逊AWS IoT
openssl生成cer证书_tls证书生成
wget http://www.openssl.org/source/openssl-1.0.0a.tar.gz
全栈程序员站长
2022/11/04
2.7K0
[ 物联网篇 ] aws-iot-device-sdk-embedded-C Demo 测试
aws-iot-devices-sdk 是亚马逊针对物联网推出的一套解决方案,这里主要记录如何初步跑起来。
程序手艺人
2019/02/20
2K0
常见的ID生成策略 – IdUtil – Hutool的ID生成工具
雪花算法是推特公司开源的工具:想了解前往本站:https://www.zanglikun.com/2941.html
收心
2023/03/06
9.7K0
js中生成唯一id的,动态id,随机生成
1.随机数长度控制,定义一个长度变量(length),生成可控长度的随机数: Math.random().toString(36).substr(3,length) 2.引入时间戳: Date.now().toString(36) 3.合在一起最终办法: genID(length){ return Number(Math.random().toString().substr(3,length) + Date.now()).toString(36); }
李维亮
2021/07/08
11.6K0
ID生成工具类
public static void main(String[] args) { IDUtil idUtil = new IDUtil("yyyyMMddHHmmssSSS");//yyyy-MM-dd HH:mm:ss:SSS System.out.println(idUtil.generatedNoByFormatDateAndRandom(5,"5")); } 时间格式在创建对象时传入。在集群中使用应传入机器码或唯一标识作为唯一ID的根据。 * * * 更新 添加了多个可选的构造方
派大星在吗
2021/12/17
1.9K0
云中树莓派(2):将传感器数据上传到 AWS IoT 并利用Kibana进行展示
DHT22 是一款温度与湿度传感器,它有3个针脚,左边的第一个引脚(#1)为3-5V电源,第二个引脚(#2)连接到数据输入引脚,最右边的引脚(#4)接地。
SammyLiu
2019/06/28
1.2K0
云中树莓派(2):将传感器数据上传到 AWS IoT 并利用Kibana进行展示
生成全局ID
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/wzy0623/article/details/53908632
用户1148526
2019/05/25
9260
如何生成全局的分布式ID
现在的系统中,很多系统都不是单体的了,都是以集群的方式部署的。系统也是分布式的了。我们很多场景都需要生成全局的ID。比如我们将数据库进行分库分表后,就需要全局的不重复的主键ID。比如在一些业务中,我们需要给用户生成不重复的编号(这里不是数据库的主键ID),如1000,1001,1002...。那么我们如何生成全局的ID呢?
Lvshen
2022/05/05
7020
如何生成全局的分布式ID
分布式 ID 生成器如何选择?
UUID(Universally Unique Identifier)的标准型式包含32个16进制数字,以“-”连接符分为五段,形式为8-4-4-4-12的36个字符。
IT技术小咖
2019/08/22
1.8K0
分布式 ID 生成器如何选择?
id 生成器
构成: 1/2/3-时间格式化-序号 1: auth 2: admin 3: api 例:
全栈程序员站长
2022/07/04
8120
id 生成器
ID生成策略——SnowFlake
某个项目采用了数据库(MySQL)自增ID作为主要业务数据的主键。数据库自增ID使用简单,自动编号,速度快,而且是增量增长,按顺序存放,对于检索非常有利。
普通程序员
2019/10/23
1.9K0
ID生成策略——SnowFlake
随机ID生成工具
程序员朱永胜
2023/09/26
2120
openssl 签发证书_keytool生成证书
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
全栈程序员站长
2022/10/02
9570

相似问题

AWS IoT证书文件

10

RaspberryPi AWS - IoT证书问题

116

AWS IoT证书突然失败

20

获取AWS Iot CA证书信息

11

从AWS IoT证书中分离东西

14
添加站长 进交流群

领取专属 10元无门槛券

AI混元助手 在线答疑

扫码加入开发者社群
关注 腾讯云开发者公众号

洞察 腾讯核心技术

剖析业界实践案例

扫码关注腾讯云开发者公众号
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档