首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >问答首页 >合并重新平衡的分区

合并重新平衡的分区
EN

Stack Overflow用户
提问于 2019-07-01 03:06:11
回答 1查看 36关注 0票数 0

作为流媒体应用程序的最后一步,我想对系统中的乱序事件进行排序。为此,我使用:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
events.keyBy((Event event) -> event.id)
                .process(new SortFunction())
                .print();

其中sort函数为:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public static class SortFunction extends KeyedProcessFunction<String, Event, Event> {
        private ValueState<PriorityQueue<Event>> queueState = null;

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<PriorityQueue<Event>> descriptor = new ValueStateDescriptor<>(
                    // state name
                    "sorted-events",
                    // type information of state
                    TypeInformation.of(new TypeHint<PriorityQueue<Event>>() {
                    }));
            queueState = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void processElement(Event event, Context context, Collector<Event> out) throws Exception {
            TimerService timerService = context.timerService();

            if (context.timestamp() > timerService.currentWatermark()) {
                PriorityQueue<Event> queue = queueState.value();
                if (queue == null) {
                    queue = new PriorityQueue<>(10);
                }
                queue.add(event);
                queueState.update(queue);
                timerService.registerEventTimeTimer(event.timestamp);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext context, Collector<Event> out) throws Exception {
            PriorityQueue<Event> queue = queueState.value();
            Long watermark = context.timerService().currentWatermark();
            Event head = queue.peek();
            while (head != null && head.timestamp <= watermark) {
                out.collect(head);
                queue.remove(head);
                head = queue.peek();
            }
        }
    }

我现在想做的是尝试将其并行化。我现在的想法是做以下事情:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    events.keyBy((Event event) -> event.id)
                    .rebalance()
                    .process(new SortFunction()).setParalelism(3)
                    .map(new KWayMerge()).setParalelism(1).
                    .print();

如果我的理解是正确的,那么在这种情况下应该发生什么,如果我错了,应该纠正我的错误,即给定的键(理想情况下是1/3)的每个事件的一部分将转到SortFunction的每个并行实例,在这种情况下,为了有一个完整的排序,我需要创建一个map,或者另一个processFunction,它从3个不同的实例中接收排序的事件,并将它们合并在一起。

如果是这样的话,有没有办法区分map接收到的事件的来源,以便我可以在map上执行3向合并?如果这是不可能的,我的下一个想法是将PriorityQueue替换为TreeMap,并将所有内容放入一个窗口中,这样一旦收到3个TreeMaps,合并就会在窗口的末尾发生。在选项a不可行的情况下,另一个选项是否有意义,或者有没有更好的解决方案来做这样的事情?

EN

回答 1

Stack Overflow用户

发布于 2019-07-01 08:56:13

首先,您应该意识到,如果且仅当您使用基于堆的状态后端时,在Flink ValueState中使用PriorityQueue或TreeMap是一个不错的主意。在RocksDB的情况下,这将表现得相当糟糕,因为PriorityQueues将在每次访问时反序列化,并在每次更新时重新序列化。一般来说,我们推荐基于MapState的排序,这就是在Flink的库中实现排序的方式。

这段代码将做什么

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
events.keyBy((Event event) -> event.id)
            .process(new SortFunction())

是按键独立地对流进行排序--输出将针对每个键进行排序,而不是全局排序。

另一方面,这一点

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
events.keyBy((Event event) -> event.id)
                .rebalance()
                .process(new SortFunction()).setParalelism(3)

将不起作用,因为重新平衡的结果不再是KeyedStream,而SortFunction依赖于键控状态。

此外,我不认为对1/3的流进行3种排序,然后合并结果会比单一的全局排序性能明显更好。如果需要进行全局排序,则可能需要考虑改用Table API。有关示例,请参阅the answer here

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56829601

复制
相关文章
【说站】NodeJS如何在文件中追加内容
2、appendFile()可以接收三个参数,第一个是路径,第二个是内容,第三个是回调函数,执行node learnNode.js即可。
很酷的站长
2022/11/24
2.9K0
nodejs中如何使用流数据读写文件
在nodejs中,可以使用fs模块的readFile方法、readFileSync方法、read方法和readSync方法读取一个文件的内容,还可以使用fs模块的writeFile方法、writeFileSync方法、write方法和writeSync方法向一个文件中写入内容。
前端_AWhile
2019/08/29
6.2K0
Nodejs中读取文件目录中的所有文件
关于Nodejs中的文件系统即File System可以参考官方Node.js v12.18.1的文档File system
ccf19881030
2020/06/28
14.8K0
nodejs与javascript中的aes加密
简介 1.aes加密简单来说,在密码学中又称Rijndael加密法,是美国联邦政府采用的一种区块加密标准。这个标准用来替代原先的DES,已经被多方分析且广为全世界所使用。高级加密标准已然成为对称密钥加密中最流行的算法之一。 2.AES的区块长度固定为128 比特,密钥长度则可以是128,192或256比特;而Rijndael使用的密钥和区块长度可以是32位的整数倍,以128位为下限,256比特为上限。包括AES-ECB,AES-CBC,AES-CTR,AES-OFB,AES-CFB。   具体的加密算法和模
磊哥
2018/05/08
3.9K0
【DB笔试面试511】如何在Oracle中写操作系统文件,如写日志?
可以利用UTL_FILE包,但是,在此之前,要注意设置好UTL_FILE_DIR初始化参数。
AiDBA宝典
2019/09/30
28.8K0
【DB笔试面试511】如何在Oracle中写操作系统文件,如写日志?
如何在nodejs中实现兄弟进程通信
在nodejs主进程中,开启一个额外的子进程A,进程A负责和线程池通信,完成cpu密集型的任务。通过nodejs主进程创建出来的多个nodejs工作进程可以把任务提交到进程A,然后拿到处理结果。
theanarkh
2020/10/10
1.4K0
如何在nodejs中实现兄弟进程通信
nodejs中追加内容到文件
最近在使用nodejs写日志记录的时候,发现一个问题:使用fs模块读写文件,调用writeFile(path,data)或者writeFileSync(path,data)时会将日志文件原来的内容给覆盖掉,显然这不是我所想要的结果,我想要的效果是在文件末尾追加,所以需要使用appendFile(path,data)或者appendFileSync(path,data)方法。以下内容转载自nodejs中追加内容到文件
ccf19881030
2020/10/16
4.8K0
nodejs中追加内容到文件
nodejs中的文件系统
nodejs使用了异步IO来提升服务端的处理效率。而IO中一个非常重要的方面就是文件IO。今天我们会详细介绍一下nodejs中的文件系统和IO操作。
用户2323866
2021/06/22
1.5K0
nodejs中的文件系统
nodejs使用了异步IO来提升服务端的处理效率。而IO中一个非常重要的方面就是文件IO。今天我们会详细介绍一下nodejs中的文件系统和IO操作。
程序那些事
2021/01/28
1.3K0
nodejs使用aes-128-ecb加密如何在c#中解密
却发现C#端怎么也解密不了,一直报错,改了一整天,后来终于发现,nodejs端加密用的key其实在使用之前已经使用md5加密了一次,而这个操作是默认的,暂时没发现有配置可以默认去掉,服务端如果需要使用这个key解密,则需要也同样使用MD5加密
frontoldman
2019/09/03
2.6K0
NodeJS使用formidable实现文件上传
  最近自学了一下NodeJS,然后做了一个小demo,实现歌曲的添加、修改、播放和删除的功能,其中自然要实现音乐和图片的上传功能。于是上网查找资料,找到了一个formidable插件,该插件可以很好的实现文件的上传功能。该小demo用到了MySQL数据库,所有的数据都存放到了数据库中。下面简单说一些如何使用。 1.创建app.js主文件 const express = require('express'); const router = require('./router'); const path =
用户1174387
2018/01/17
2.2K0
使用nodejs做文件下载中转
之前做了一个功能就是点击按钮实现文件下载,文件保存在了阿里云的OSS上,阿里的OSS和七牛的OSS其实个人感觉差不多,一般情况下,前端下载文件很多都是通过一个a标签来进行下载。但是对于OSS存储的文件比如图片点击后在浏览器直接打开了,即使是添加了download属性也无济于事,于是我就想到了使用nodejs来搭建一个中转站。
OECOM
2020/07/02
3.4K0
nodejs文件上传组件multer使用
多图上传,发送端: var express = require('express') var rp = require('request-promise') var fs = require("fs
用户1141560
2017/12/26
2.8K0
如何在JavaScript中使用for循环
循环允许我们通过循环数组或对象中的项并做一些事情,比如说打印它们,修改它们,或执行其他类型的任务或动作。JavaScript有各种各样的循环,for循环允许我们对一个集合(如数组)进行迭代。
chuckQu
2022/11/28
5.1K0
如何在JavaScript中使用for循环
NodeJS & Dapr Javascript SDK 官方使用指南
Dapr 是一个可移植的、事件驱动的运行时,它使任何开发人员能够轻松构建出弹性的、无状态和有状态的应用程序,并可运行在云平台或边缘计算中,它同时也支持多种编程语言和开发框架。Dapr 确保开发人员专注于编写业务逻辑,不必分神解决分布式系统难题,从而显著提高了生产力。Dapr 降低了构建微服务架构类现代云原生应用的门槛。
为少
2022/12/06
9130
NodeJS & Dapr Javascript SDK 官方使用指南
【译】如何在JavaScript中复制Object
不管在什么编程语言中,复制一个对象的值而不是它的引用都是一个十分常见的工作。复值对象的值和复制对象的引用的区别在与通过复制值可以得到两个有着相同值或数据,但是毫不相干的对象,复制引用意味着得到的两个对象在内存中指向相同的数据块。当objet A和object B都引用自相同的底层数据时,只要你操作object A,就会修改到object B。
腾讯IVWEB团队
2020/06/28
2.2K0
前端必读:如何在 JavaScript 中使用SpreadJS导入和导出 Excel 文件
JavaScript在前端领域占据着绝对的统治地位,目前更是从浏览器到服务端,移动端,嵌入式,几乎所有的所有的应用领域都可以使用它。技术圈有一句很经典的话“凡是能用JavaScript实现的东西,最后都会用JavaScript实现”。 Excel 电子表格自 1980 年代以来一直为各行业所广泛使用,至今已拥有超过3亿用户,大多数人都熟悉 Excel 电子表格体验。许多企业在其业务的各个环节中使用了 Excel 电子表格进行数据管理。
葡萄城控件
2022/10/04
4.1K0
前端必读:如何在 JavaScript 中使用SpreadJS导入和导出 Excel 文件
如何在JavaScript中处理大量数据
在几年之前,开发人员不会去考虑在服务端之外处理大量的数据。现在这种观念已经改变了,很多Ajax程序需要在客户端和服务器端传输大量的数据。此外,更新DOM节点的处理在浏览器端来看也是一个很耗时的工作。而且,需要对这些信息进行分析处理的时候也很可能导致程序无响应,浏览器抛出错误。 将需要大量处理数据的过程分割成很多小段,然后通过JavaScript的计时器来分别执行,就可以防止浏览器假死。先看看怎么开始: function ProcessArray(data,handler,callback){ Process
CSDN技术头条
2018/02/09
3K0
如何在 Chrome 中执行 JavaScript 代码
要在浏览器中执行 JavaScript 脚本,首先你的浏览器得支持。现在主流推荐 Chrome 浏览器,也可以使用基于 Chromium 的 Edge 浏览器。下面来介绍如何在 Chrome 中打开开发者工具,以及如何在开发者工具中运行调试 JavaScript 代码。
村雨遥
2022/03/14
5.9K0
如何在 Chrome 中执行 JavaScript 代码
点击加载更多

相似问题

Twilio:使用一个Twilio电话号码同时召开多个呼出会议。

23

Twilio记录会议

126

Twilio弹性SIP集群和Twilio SIP有什么区别?

12

Twilio电话会议

15

Twilio:从会议拨出

11
添加站长 进交流群

领取专属 10元无门槛券

AI混元助手 在线答疑

扫码加入开发者社群
关注 腾讯云开发者公众号

洞察 腾讯核心技术

剖析业界实践案例

扫码关注腾讯云开发者公众号
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文