首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >问答首页 >Apache beam/Google数据流通过下游聚合丰富了上游记录

Apache beam/Google数据流通过下游聚合丰富了上游记录
EN

Stack Overflow用户
提问于 2019-07-18 22:00:22
回答 1查看 173关注 0票数 1

我已经创建了一个Java apache光束流管道,我计划在google dataflow上运行它。它接收与以下内容类似的元素:

代码语言:javascript
代码运行次数:0
运行
复制
ipAddress, serviceUsed, errorOrSuccess, time, parameter, etc.

例如

代码语言:javascript
代码运行次数:0
运行
复制
'237.98.58.248', 'service1', 'error', '12345', 'randomParameter', etc

我目前根据事件时间将这些数据放入固定的窗口中。我想使用我的管道来计算每个窗口接收的每个ip地址的错误数和成功数,然后丰富原始数据。

我希望调整每个原始元素,以输出类似于以下内容的最终元素:

代码语言:javascript
代码运行次数:0
运行
复制
totalErrorsInThisWindow, totalSuccessInThisWindow, ipAddress, serviceUsed, errorOrSuccess, time, parameter, etc.

例如

代码语言:javascript
代码运行次数:0
运行
复制
'237.98.58.248', 'service1', 'error', '12345', 'randomParameter', etc
'149.142.114.250', 'service2', 'success', '12346', 'randomParameter', etc
'237.98.58.248', 'service3', 'error', '12344', 'randomParameter', etc
...

变成类似这样的东西

代码语言:javascript
代码运行次数:0
运行
复制
'100', '1000', '237.98.58.248', 'service1', 'error', '12345', 'randomParameter', etc
'11', '34', '149.142.114.250', 'service2', 'success', '12346', 'randomParameter', etc
'100', '1000', '237.98.58.248', 'service3', 'error', '12344', 'randomParameter', etc
...

对如何做到这一点有什么建议吗?

我知道几种方法来计算每个客户端、每个窗口的totalErrorsInThisWindowtotalSuccessInThisWindow -一种方法是删除除ipAddresserrorOrSuccess之外的所有列,然后执行apply(Count.<String>perElement());。然而,我正在努力丰富原始数据。第一个想法是使用侧输入,但我认为使用不断变化的侧输入不会很好地工作。

另一种选择是为成功和失败维护一个基于键的状态变量,我可以在处理每个元素时递增状态变量,并使用它来丰富相同DoFn中的数据。但是,我遇到的问题是,只有在窗口中为每个键处理的最后一个元素才会有正确的成功和失败值。

下面是一个我可以用state做什么与我想要用state做什么的例子:

输入:

代码语言:javascript
代码运行次数:0
运行
复制
'a'
'b'
'a'
'a'

我可以使用state获得的输出:

代码语言:javascript
代码运行次数:0
运行
复制
'a':1
'b':1
'a':2
'a':3

我希望使用state获得的输出:

代码语言:javascript
代码运行次数:0
运行
复制
'a':3
'b':1
'a':3
'a':3

我希望我的问题是明确的,我希望我目前的方法和挑战也是明确的。任何建议都将不胜感激。

EN

回答 1

Stack Overflow用户

发布于 2019-07-24 08:33:50

请看一下GroupByKey和组合器,以及how to use it with windowing

我认为像这样的东西会工作得很好。您可以按IP分组,应用窗口并统计错误和成功次数。

代码语言:javascript
代码运行次数:0
运行
复制
PCollection<MyRecord> records = <Read from your source>

PCollection<KV<string, MyRecord>> withIP = records.apply(ParDo.of(
    new DoFn<MyRecord, KV<KV<string, MyRecord>>>() {
      // Implement processElement and call outputWithTimestamp
    }
));


PCollection<MyRecord> windowed = withIP.apply(
    Window.<MyRecord>into(FixedWindows.of(Duration.standardSeconds(60))));

PCollection<KV<String, Iterable<MyRecords>>> grouped = 
    windowed.apply(GroupByKey.<String, MyRecords>create());


PCollection<KV<String, MyErrorStats>> errorsPerIP =
  playerAccuracy.apply(Combine.<String, MyRecord, MyErrorStats>perKey(
    new MyErrorStatsCombiner())));


public static class MyErrorStatsCombiner implements     
SerializableFunction<Iterable<MyRecords>, MyErrorStats> {
  @Override
  public Integer apply(Iterable<MyRecords> record) {
    MyErrorStats stats = new MyErrorStats();
    for (int item : record) {
      stats.errorsInThisWindow += item.errorsInThisWindow;
      stats.successInThisWindow += item.successInThisWindow;
    }
    return stats;
  }
}

至于在记录中保留其他元数据字段,您可以决定如何在MyErrorStatsCombiner中聚合/保留这些字段。

我不清楚你是真的想按IP分组,还是按多个不同的元数据域分组。如果您想要按多个元数据字段进行分组并获取所有这些字段的计数,那么。这可能是一个有用的参考。GroupBy using multiple data properties。你可以先按IP分组,不管是成功还是失败。但是,我认为您不能在同一记录中获得所需的总错误和成功次数的输出。例如,您可以使用bigquery查询轻松完成最后一部分。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57096322

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档