Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >问答首页 >合并重新平衡的分区

合并重新平衡的分区
EN

Stack Overflow用户
提问于 2019-07-01 03:06:11
回答 1查看 36关注 0票数 0

作为流媒体应用程序的最后一步,我想对系统中的乱序事件进行排序。为此,我使用:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
events.keyBy((Event event) -> event.id)
                .process(new SortFunction())
                .print();

其中sort函数为:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public static class SortFunction extends KeyedProcessFunction<String, Event, Event> {
        private ValueState<PriorityQueue<Event>> queueState = null;

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<PriorityQueue<Event>> descriptor = new ValueStateDescriptor<>(
                    // state name
                    "sorted-events",
                    // type information of state
                    TypeInformation.of(new TypeHint<PriorityQueue<Event>>() {
                    }));
            queueState = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void processElement(Event event, Context context, Collector<Event> out) throws Exception {
            TimerService timerService = context.timerService();

            if (context.timestamp() > timerService.currentWatermark()) {
                PriorityQueue<Event> queue = queueState.value();
                if (queue == null) {
                    queue = new PriorityQueue<>(10);
                }
                queue.add(event);
                queueState.update(queue);
                timerService.registerEventTimeTimer(event.timestamp);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext context, Collector<Event> out) throws Exception {
            PriorityQueue<Event> queue = queueState.value();
            Long watermark = context.timerService().currentWatermark();
            Event head = queue.peek();
            while (head != null && head.timestamp <= watermark) {
                out.collect(head);
                queue.remove(head);
                head = queue.peek();
            }
        }
    }

我现在想做的是尝试将其并行化。我现在的想法是做以下事情:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    events.keyBy((Event event) -> event.id)
                    .rebalance()
                    .process(new SortFunction()).setParalelism(3)
                    .map(new KWayMerge()).setParalelism(1).
                    .print();

如果我的理解是正确的,那么在这种情况下应该发生什么,如果我错了,应该纠正我的错误,即给定的键(理想情况下是1/3)的每个事件的一部分将转到SortFunction的每个并行实例,在这种情况下,为了有一个完整的排序,我需要创建一个map,或者另一个processFunction,它从3个不同的实例中接收排序的事件,并将它们合并在一起。

如果是这样的话,有没有办法区分map接收到的事件的来源,以便我可以在map上执行3向合并?如果这是不可能的,我的下一个想法是将PriorityQueue替换为TreeMap,并将所有内容放入一个窗口中,这样一旦收到3个TreeMaps,合并就会在窗口的末尾发生。在选项a不可行的情况下,另一个选项是否有意义,或者有没有更好的解决方案来做这样的事情?

EN

回答 1

Stack Overflow用户

发布于 2019-07-01 08:56:13

首先,您应该意识到,如果且仅当您使用基于堆的状态后端时,在Flink ValueState中使用PriorityQueue或TreeMap是一个不错的主意。在RocksDB的情况下,这将表现得相当糟糕,因为PriorityQueues将在每次访问时反序列化,并在每次更新时重新序列化。一般来说,我们推荐基于MapState的排序,这就是在Flink的库中实现排序的方式。

这段代码将做什么

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
events.keyBy((Event event) -> event.id)
            .process(new SortFunction())

是按键独立地对流进行排序--输出将针对每个键进行排序,而不是全局排序。

另一方面,这一点

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
events.keyBy((Event event) -> event.id)
                .rebalance()
                .process(new SortFunction()).setParalelism(3)

将不起作用,因为重新平衡的结果不再是KeyedStream,而SortFunction依赖于键控状态。

此外,我不认为对1/3的流进行3种排序,然后合并结果会比单一的全局排序性能明显更好。如果需要进行全局排序,则可能需要考虑改用Table API。有关示例,请参阅the answer here

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56829601

复制
相关文章
python中的twilio入门
Twilio是一个提供通信API服务的平台,可以让开发者通过代码实现短信、电话和视频等功能。在Python中,Twilio提供了一个方便易用的库,使得使用Twilio API变得简单,本文将介绍如何在Python中入门使用Twilio。
大盘鸡拌面
2023/10/24
1.1K1
使用Twilio拨打电话或短信
一直以来很想使用twilio来做拨打电话的系统,但是早期的twilio对无代码基础要求太高一直无法实现。现在,有更好的方式去实现了。
繁华是客
2023/03/03
2.5K0
PaaS独角兽Twilio为何股票遭遇滑铁卢
T客汇官网:tikehui.com 撰稿 | 窦悦怡 2016年 11月3日,美国独角兽公司、云通讯服务提供商 Twilio 发布第三季度财报。 财报显示在非美国通用会计准则下,该公司营收为 7150
人称T客
2018/03/26
1.8K0
PaaS独角兽Twilio为何股票遭遇滑铁卢
Python免费发短信香不香 - twilio
每次工作不顺心,生活无聊的时候就会去找一个有趣的python模块,最近就遇到了twillio模块,十余行代码即可完成免费发送短信的功能。
叫我阿柒啊
2022/05/09
6.3K0
Python免费发短信香不香 - twilio
使用python实现往手机发短信(基于twilio)
Twilio是一个位于加利福尼亚的云通信 (PaaS) 公司。Twilio 允许开发者通过使用它提供的 API 进行编程来接电话,收发短信等。
周小董
2019/03/25
6.1K1
使用python实现往手机发短信(基于twilio)
关于WebRTC发展的担忧和思考
作者 | Tsahi Levent-Levi 翻译 | Alex 技术审校 | 刘连响
LiveVideoStack
2022/02/11
1.2K0
关于WebRTC发展的担忧和思考
5G与上云,让云通信一飞上天
Juniper Research今年7月发布的一项研究显示,2020年CPaaS(通信平台即服务)市场规模预计将达到70亿美元,到2025年将增长到250亿美元。
科技云报道
2022/04/16
1.7K0
5G与上云,让云通信一飞上天
员工被钓鱼,云通讯巨头Twilio客户数据遭泄露
据Bleeping Computer网站8月8日消息,云通讯巨头Twilio表示,有攻击者利用短信网络钓鱼攻击窃取了员工凭证,并潜入内部系统泄露了部分客户数据。 根据Twilio在上周末的公开披露,8月4日,Twilio首次注意到了这些旨在窃取员工凭证的复杂社会工程学攻击。这些攻击者冒充公司内部的IT部门人员,向公司员工发送短信,警告他们的系统密码已经过期,需要通过点击短信附带的URL进行修改。该URL带有“Twilio”、“Okta”和“SSO”等具有高仿真性的字段,受害员工一旦点击便会跳转到一个克隆的
FB客服
2023/03/30
1.3K0
员工被钓鱼,云通讯巨头Twilio客户数据遭泄露
使用Python的flask和Nose对Twilio应用进行单元测试
首先,我们将在安装了Twilio和Flask模块的Python环境中打开一个文本编辑器,并开发出一个简单的应用程序,该应用程序将使用动词和名词创建一个Twilio会议室。
用户7466307
2020/06/17
4.9K0
继Twilio后,Cloudflare员工也遭到了同样的钓鱼攻击
继作日FreeBuf报道了《员工被钓鱼,云通讯巨头Twilio客户数据遭泄露》后,8月9日,知名云服务提供商Cloudflare 也表示,一些公司员工的系统账户凭证也在一次网络钓鱼短信攻击中被盗,手法和上周 Twilio批露的遭遇如出一辙。 根据Cloudflare在官方博客发布的说明,大约在 Twilio 遭到攻击的同时, Cloudflare 的员工也遭到了具有非常相似特征的攻击 ,有至少 76 名员工的个人或工作手机号码收到了钓鱼短信,一些短信也发送给了员工的家人。虽还无法确定攻击者是以何种方式收集
FB客服
2023/03/30
6230
继Twilio后,Cloudflare员工也遭到了同样的钓鱼攻击
2023年WebRTC趋势:黄金时代不在
 点击上方“LiveVideoStack”关注我们 ▲扫描图中二维码或点击阅读原文▲ 了解音视频技术大会更多信息 编者按:随着疫情防控全面放开,混合办公成为主流的协作方式,WebRTC作为主流的RTC基础技术自然也受到影响。在2023年,WebRTC代表的RTC技术会有怎样的剧本?本文来自Tsahi Levent-Levi,LiveVideoStack已获转载授权。 原文 / https://bloggeek.me/webrtc-predictions-2023/ 文 / Tsahi Levent-Lev
LiveVideoStack
2023/02/23
1.7K0
2023年WebRTC趋势:黄金时代不在
曾攻击云通讯巨头Twilio的黑客,又连续攻击130多个组织
在8月初接连攻击云通讯巨头Twilio和云服务商Cloudflare后,攻击者逐渐浮出水面。网络安全公司Group-IB指出,该组织在数月内疯狂入侵了130多家机构,盗取了近1万名员工的凭证。 Group-IB将该攻击组织追踪为0ktapus,该组织主要攻击使用Okta单点登录服务的企业。 Group-IB在一名客户受到网络钓鱼攻击后开展调查,结果显示,自3月以来,其至少窃取了9931个用户证书,其中超过一半包含用于访问公司网络的多因素认证码。 Group-IB高级威胁情报分析师Roberto Marti
FB客服
2023/03/30
4360
曾攻击云通讯巨头Twilio的黑客,又连续攻击130多个组织
小程序也可以用来挖掘App流量吗?
小程序是一种不需要下载安装即可使用的应用,它实现了应用“触手可及”的梦想,用户扫一扫或者搜一下即可打开应用。也体现了“用完即走”的理念,用户不用关心是否安装太多应用的问题。
二山山记
2022/09/30
1.4K0
Python3利用Twilio(国际)以及腾讯云服务(国内)免费发送手机短信
    短信服务验证服务已经不是什么新鲜事了,但是免费的手机短信服务却不多见,本次利用Python3.0基于Twilio和腾讯云服务分别来体验一下国际短信和国内短信接口。
用户9127725
2022/08/08
5.1K0
Python3利用Twilio(国际)以及腾讯云服务(国内)免费发送手机短信
用Python每天自动给女朋友免费发短信
之前发过一篇文章,用 Python 制作的给父母天气预报提醒的小工具。这篇文章我同步到博客上之后,有读者在评论区留言,对于部分微信没有网页版接口,导致无法实现这个功能,这位读者建议,建议用发短信的方式,这样,就不会受限于微信的限制。
数据森麟
2019/11/11
18K2
用Python每天自动给女朋友免费发短信
Twilio推出Autopilot,可使会话电话,短信和语音机器人无缝协作
Twilio希望将AI与呼叫中心联系起来。在旧金山举行的年度Signal开发者大会期间,该公司采用了Autopilot,这是一种自然语言服务,使开发人员能够构建可无缝协作的会话电话,短信和语音机器人。Autopilot本周开始在Twilio控制台的公共测试版中提供。
AiTechYun
2018/11/05
1.3K0
Python15行代码实现免费发送手机短信,推送消息「建议收藏」
通过代码定时给手机推送短信,短信内容可以自定义文字,当然你也可以去别的网站爬取每日心灵鸡汤,天气预报或其它信息进行推送。
全栈程序员站长
2022/07/11
11.3K1
Python15行代码实现免费发送手机短信,推送消息「建议收藏」
教你怎么用Python每天自动给女朋友免费发短信
发送短信接口,我知道的常见的有两个平台,一个是 twilio,可以免费发短信 500 条,可发任意信息,一个是腾讯云,可以免费发短信 100 条,需要申请短信发送内容模板。
程序员小新
2022/01/07
5.4K0
Python发手机短信
当然了,天下没有免费的午餐,能用这个功能,肯定有人在为你付费,这是印尼的一个服务商在为你默默提供
py3study
2020/01/15
5.9K0
点击加载更多

相似问题

由于cors策略问题,Axios和Vue不会对我的服务器进行api调用

21

SockitIO不断被cors策略屏蔽

128

为什么XMLHttpRequest会被CORS策略屏蔽,而urllib正常工作?

10

有没有我可以屏蔽的ip列表?

14

GitHub API CORS策略

28
添加站长 进交流群

领取专属 10元无门槛券

AI混元助手 在线答疑

扫码加入开发者社群
关注 腾讯云开发者公众号

洞察 腾讯核心技术

剖析业界实践案例

扫码关注腾讯云开发者公众号
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文