我已经创建了一个Java apache光束流管道,我计划在google dataflow上运行它。它接收与以下内容类似的元素:
ipAddress, serviceUsed, errorOrSuccess, time, parameter, etc.
例如
'237.98.58.248', 'service1', 'error', '12345', 'randomParameter', etc
我目前根据事件时间将这些数据放入固定的窗口中。我想使用我的管道来计算每个窗口接收的每个ip地址的错误数和成功数,然后丰富原始数据。
我希望调整每个原始元素,以输出类似于以下内容的最终元素:
totalErrorsInThisWindow, totalSuccessInThisWindow, ipAddress, serviceUsed, errorOrSuccess, time, parameter, etc.
例如
'237.98.58.248', 'service1', 'error', '12345', 'randomParameter', etc
'149.142.114.250', 'service2', 'success', '12346', 'randomParameter', etc
'237.98.58.248', 'service3', 'error', '12344', 'randomParameter', etc
...
变成类似这样的东西
'100', '1000', '237.98.58.248', 'service1', 'error', '12345', 'randomParameter', etc
'11', '34', '149.142.114.250', 'service2', 'success', '12346', 'randomParameter', etc
'100', '1000', '237.98.58.248', 'service3', 'error', '12344', 'randomParameter', etc
...
对如何做到这一点有什么建议吗?
我知道几种方法来计算每个客户端、每个窗口的totalErrorsInThisWindow
和totalSuccessInThisWindow
-一种方法是删除除ipAddress
和errorOrSuccess
之外的所有列,然后执行apply(Count.<String>perElement());
。然而,我正在努力丰富原始数据。第一个想法是使用侧输入,但我认为使用不断变化的侧输入不会很好地工作。
另一种选择是为成功和失败维护一个基于键的状态变量,我可以在处理每个元素时递增状态变量,并使用它来丰富相同DoFn中的数据。但是,我遇到的问题是,只有在窗口中为每个键处理的最后一个元素才会有正确的成功和失败值。
下面是一个我可以用state做什么与我想要用state做什么的例子:
输入:
'a'
'b'
'a'
'a'
我可以使用state获得的输出:
'a':1
'b':1
'a':2
'a':3
我希望使用state获得的输出:
'a':3
'b':1
'a':3
'a':3
我希望我的问题是明确的,我希望我目前的方法和挑战也是明确的。任何建议都将不胜感激。
发布于 2019-07-24 08:33:50
请看一下GroupByKey和组合器,以及how to use it with windowing。
我认为像这样的东西会工作得很好。您可以按IP分组,应用窗口并统计错误和成功次数。
PCollection<MyRecord> records = <Read from your source>
PCollection<KV<string, MyRecord>> withIP = records.apply(ParDo.of(
new DoFn<MyRecord, KV<KV<string, MyRecord>>>() {
// Implement processElement and call outputWithTimestamp
}
));
PCollection<MyRecord> windowed = withIP.apply(
Window.<MyRecord>into(FixedWindows.of(Duration.standardSeconds(60))));
PCollection<KV<String, Iterable<MyRecords>>> grouped =
windowed.apply(GroupByKey.<String, MyRecords>create());
PCollection<KV<String, MyErrorStats>> errorsPerIP =
playerAccuracy.apply(Combine.<String, MyRecord, MyErrorStats>perKey(
new MyErrorStatsCombiner())));
public static class MyErrorStatsCombiner implements
SerializableFunction<Iterable<MyRecords>, MyErrorStats> {
@Override
public Integer apply(Iterable<MyRecords> record) {
MyErrorStats stats = new MyErrorStats();
for (int item : record) {
stats.errorsInThisWindow += item.errorsInThisWindow;
stats.successInThisWindow += item.successInThisWindow;
}
return stats;
}
}
至于在记录中保留其他元数据字段,您可以决定如何在MyErrorStatsCombiner中聚合/保留这些字段。
我不清楚你是真的想按IP分组,还是按多个不同的元数据域分组。如果您想要按多个元数据字段进行分组并获取所有这些字段的计数,那么。这可能是一个有用的参考。GroupBy using multiple data properties。你可以先按IP分组,不管是成功还是失败。但是,我认为您不能在同一记录中获得所需的总错误和成功次数的输出。例如,您可以使用bigquery查询轻松完成最后一部分。
https://stackoverflow.com/questions/57096322
复制相似问题