作为流媒体应用程序的最后一步,我想对系统中的乱序事件进行排序。为此,我使用:
events.keyBy((Event event) -> event.id)
.process(new SortFunction())
.print();
其中sort
函数为:
public static class SortFunction extends KeyedProcessFunction<String, Event, Event> {
private ValueState<PriorityQueue<Event>> queueState = null;
@Override
public void open(Configuration config) {
ValueStateDescriptor<PriorityQueue<Event>> descriptor = new ValueStateDescriptor<>(
// state name
"sorted-events",
// type information of state
TypeInformation.of(new TypeHint<PriorityQueue<Event>>() {
}));
queueState = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(Event event, Context context, Collector<Event> out) throws Exception {
TimerService timerService = context.timerService();
if (context.timestamp() > timerService.currentWatermark()) {
PriorityQueue<Event> queue = queueState.value();
if (queue == null) {
queue = new PriorityQueue<>(10);
}
queue.add(event);
queueState.update(queue);
timerService.registerEventTimeTimer(event.timestamp);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext context, Collector<Event> out) throws Exception {
PriorityQueue<Event> queue = queueState.value();
Long watermark = context.timerService().currentWatermark();
Event head = queue.peek();
while (head != null && head.timestamp <= watermark) {
out.collect(head);
queue.remove(head);
head = queue.peek();
}
}
}
我现在想做的是尝试将其并行化。我现在的想法是做以下事情:
events.keyBy((Event event) -> event.id)
.rebalance()
.process(new SortFunction()).setParalelism(3)
.map(new KWayMerge()).setParalelism(1).
.print();
如果我的理解是正确的,那么在这种情况下应该发生什么,如果我错了,应该纠正我的错误,即给定的键(理想情况下是1/3)的每个事件的一部分将转到SortFunction
的每个并行实例,在这种情况下,为了有一个完整的排序,我需要创建一个map
,或者另一个processFunction
,它从3个不同的实例中接收排序的事件,并将它们合并在一起。
如果是这样的话,有没有办法区分map
接收到的事件的来源,以便我可以在map
上执行3向合并?如果这是不可能的,我的下一个想法是将PriorityQueue
替换为TreeMap
,并将所有内容放入一个窗口中,这样一旦收到3个TreeMaps
,合并就会在窗口的末尾发生。在选项a不可行的情况下,另一个选项是否有意义,或者有没有更好的解决方案来做这样的事情?
发布于 2019-07-01 08:56:13
首先,您应该意识到,如果且仅当您使用基于堆的状态后端时,在Flink ValueState中使用PriorityQueue或TreeMap是一个不错的主意。在RocksDB的情况下,这将表现得相当糟糕,因为PriorityQueues将在每次访问时反序列化,并在每次更新时重新序列化。一般来说,我们推荐基于MapState的排序,这就是在Flink的库中实现排序的方式。
这段代码将做什么
events.keyBy((Event event) -> event.id)
.process(new SortFunction())
是按键独立地对流进行排序--输出将针对每个键进行排序,而不是全局排序。
另一方面,这一点
events.keyBy((Event event) -> event.id)
.rebalance()
.process(new SortFunction()).setParalelism(3)
将不起作用,因为重新平衡的结果不再是KeyedStream,而SortFunction依赖于键控状态。
此外,我不认为对1/3的流进行3种排序,然后合并结果会比单一的全局排序性能明显更好。如果需要进行全局排序,则可能需要考虑改用Table API。有关示例,请参阅the answer here。
https://stackoverflow.com/questions/56829601
复制