首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Pinterest是如何基于Flink做实时分析的?

在Pinterest,我们每天都要进行数千个实验。我们主要依靠日常实验指标来评估实验效果。日常实验管道运行一次可能会花费10多个小时,有时还会超时,因此想要验证实验设置、触发的正确性以及预期的实验性能时就没那么方便了。当代码中存在一些错误时这个问题尤为突出。有时可能要花几天时间才能发现错误,这对用户体验和重要指标造成了更大的损害。我们在Pinterest开发了一个近实时实验平台,以提供更具时效性的实验指标,从而帮助我们尽快发现这些问题。

可能出现的问题有:

  1. 实验导致impression的统计数据显著下降,因此需要尽快关闭实验。
  2. 与对照组相比,实验导致搜索的执行次数显著增加。

图1-带有置信区间的实时实验指标

上图的面板显示了所选事件的实验组和对照组的流量(也就是动作数)和倾向(也就是unique user的数量)。自实验开始以来,这些计数已经累计了3天时间。如果在3天后发生了re-ramp(分配给实验组和对照组的用户数量增加),则计数会归零0并重新开始累计3天时间。

为了确保实验组与对照组之间的对比在统计上是有效的,我们做了一些统计检验。由于指标是实时交付的,因此每次按顺序收到新记录时,我们都必须进行这些检验。这需要与传统的固定视野检验不一样的方法,否则会带来较高的假正率。我们考虑过几种顺序测试方法,包括赌徒破产贝叶斯A/B检验Alpha消耗函数方法。为了保证数值稳定性,我们从t检验+ Boferroni校正(将我们的案例作为多次检验进行处理)开始,并为我们的初始实现预先确定了检验次数。

高阶设计

图2-实时实验管道的高阶设计

实时实验管道包括下列主要组件:

  • 最近ramp的实验组作业→每5分钟将一个CSV文件发布到一个S3位置。这个CSV是过去3天中所分配用户有所增加的实验组的快照。通过查询托管实验元数据的内部Analytics(分析)应用程序的MySQL数据库,就能获得这一信息。
  • 筛选事件作业→我们分析了Pinterest上的数百种用户动作。这一作业仅保留最关key的业务事件,这些事件已插入“filtered_events”Kafka主题中。这些事件被剥离掉了不需要的字段,因此filtered_events主题相当轻巧。该作业运行在Flink processing时间内,并且通过Flink的增量检查点,每隔5秒将其进度保存到HDFS中。
  • 过滤实验Activation作业→每当一个用户被触发进入一个实验时,都会创建一个Activation(激活)记录。触发规则取决于实验逻辑,一名用户可以被触发进入一个实验数百次。我们只需要最近3天启动,或组分配增加的实验的Activation记录即可。

为了过滤Activation记录,此作业使用Flink的广播状态模式。每10秒检查一次“最近ramp的实验组”作业所发布的CSV的更改情况,并将其发布到一个KeyedBroadcastProcessFunction的所有分区上,该函数也消费Activation。

KeyedBroadcastProcessFunction将广播的CSV与Activation流结合在一起,就可以过滤掉那些最近3天内未ramp-up实验的Activation记录。此外,“group-ramp-up-time”已添加到Activation记录中,并插入“filtered_experiment_activations”kafka主题中。

图3-Scala对象被插入中间层Kafka主题中

图4-实时实验累积作业图

上面是实时累积(Aggregation)Flink作业的高阶概览。这里简单提及了一些operator,后文中还将详细介绍另一些operator。Source operator从Kafka读取数据,而sink使用一个REST接口写入我们的内部Analytics Store上。

删除重复事件→这里用一个KeyedProcessFunction实现,由(event.user_id,event.event_type,event.timestamp)作为key。这里的思想是,如果来自同一用户的相同事件类型的事件具有相同的时间戳,则它们是重复事件。第一个这样的事件被发送到下游,但也会缓存进状态持续5分钟时间。任何后续事件都将被丢弃。5分钟后,一个计时器会启动并清除状态。这里的假定是所有重复事件之间的间隔都在5分钟之内。

查找首次触发时间→这里是一个Flink KeyedProcessFunction,由(experiment_hash,experiment_group,user_id)作为key。这里的假设是,为一个用户收到的第一个实验Activation记录也是具有第一个触发时间的Activation。一个实验ramp-up以后,收到的第一个Activation将发送至下游,并保存为状态并持续3天时间(我们累积了实验组ramp-up以来为期3天的计数)。经过3天的ramp时间后,一个计时器将清除状态。

15分钟的processing时间tumbling窗口→事件进入并向下游发送结果时,Numerator Computer和Denominator computer都将累积计数。这意味着数百万条记录,但是我们不需要如此频繁地将结果发送到Analytics Store上。我们可以在processing时间内运行一个持续15分钟的Flink tumbling窗口,这样效率更高。对于Numerator Computer来说,这个窗口由(“experiment_hash”,“experiment_group”,“event_type”,“timestamp”)作为key。当窗口在15分钟后触发时,将获取带有max_users的记录并将其发送到下游的Analytics Store sink。

连接事件和Activation

图5-通过用户ID连接Activation流与事件流

我们使用Flink的IntervalJoin operator实现流到流的连接。IntervalJoin会在接下来的3天内缓冲每位用户的单个Activation记录,并且所有匹配事件都将与Activation记录中的其他实验元数据一起发送到下游。

这种方法的局限性:

  1. 对我们的需求而言,IntervalJoin operator有点不够灵活,因为它的间隔是固定的而不是动态的。比如说,用户可以在实验启动2天后加入进来,但IntervalJoin还是会为这名用户运行3天时间,也就是说我们停止累积数据后还会运行2天时间。如果3天后组很快re-ramp,则一位用户也可以有2个这样的连接。这种情况会在下游处理。
  2. 事件和Activation不同步:如果Activation作业失败并且Activation流被延迟,则可能会丢失一些数据,因为没有匹配Activation的事件还会继续流动。这将导致计数不足。

我们研究了Flink的IntervalJoin源代码。它会在“左侧缓冲区”中缓冲Activation 3天时间,但事件将被立即删除。目前似乎无法通过配置更改此行为。我们正在研究使用Flink的协同处理函数来实现这个Activation到事件的连接,该函数是用于流到流连接的更通用的函数。我们可以将事件缓冲X分钟,这样即使Activation流延迟了X分钟,管道也可以处理延迟而不会出现计数不足。这将帮助我们避免同一用户的两次连接,并能形成更加动态的管道,其可以立即感知到实验组的re-ramp,并支持更多动态行为,例如在组re-ramp时自动扩展累积的覆盖范围 。

Join Results Deduplicator

图6-Join Results Deduplicator

Join Results Deduplicator是一个Flink KeyedProcessFunction,它由experiment_hash,experiment_group,event_type,user_id作为key。这个operator的主要目的是在向下游发送记录时插入“user_first_time_seen”标志——下游Numerator Computer使用这个标志来计算倾向编号(# unique users),而无需使用设置的数据结构。

这个operator将状态存储到last-ramp-time+ 3天,之后状态将被清除。

Numerator Computer

图7-Numerator Computer

Numerator Computer是一个KeyedProcessFunction,由experiment_hash,experiment_group,event_type作为key。它会在最后2小时内一直滚动15分钟的存储桶(bucket),每当有新记录进入时都会更新这些桶。对于流量来说,每个动作都很重要;因此对于每个事件,动作计数都会增加。对于倾向数字(unique user)——它取决于"first_time_seen”标志(仅在为true时递增)。

随着时间的流逝,存储桶会滚动/旋转。每次新事件进入时,存储桶数据都会向下游刷新到15分钟的tumbling窗口中。

它有一个时间为3天的计时器(从ramp-up时间→3天),可在触发后清除所有状态,这样就能在ramp-up3天后重置/清除计数,完成归零。

垃圾消息与处理

为了使我们的流管道具有容错能力,Flink的增量检查点和RocksDB状态后端被用来保存应用程序检查点。我们面临的一项有趣挑战是检查点失败。问题似乎在于检查点流程需要花费很长时间,并且最终会超时。我们还注意到,在发生检查点故障时通常也会有很高的背压。

图8-Flink UI中显示的检查点故障

在仔细检查了检查点故障的内部机制之后,我们发现超时是由于某些子任务未将确认发送给检查点协调器而导致的,整个检查点流程都卡住了,如下所示。

图9-子任务未发送确认

然后我们针对导致失败的根本原因应用了一些调试步骤:

  1. 检查作业管理日志
  2. 检查在检查点期间卡住的子任务的任务管理器日志
  3. 使用Jstack详细查看子任务

原来子任务运行很正常,只是抽不出空来处理消息。结果,这个特定的子任务具有很高的背压,从而阻止了barrier通过。没有barrier的收据,检查点流程将无法进行。

在进一步检查所有子任务的Flink指标之后,我们发现其中一个子任务产生的消息数量比其对等任务多100倍。由于消息是通过user_id在子任务之间分区的,这表明有些用户产生的消息比其他用户多得多,这就意味着那是垃圾消息。临时查询我们的spam_adjusted数据集后也确认了这一结果。

图10-不同子任务的消息数

为了缓解该问题,我们在“过滤器事件作业”中应用了一个上限规则:对于一个小时内的用户,如果我们看到的消息多于X条,则仅发送前X条消息。应用上限规则后,检查点就不再出现故障了。

数据稳健性和验证

数据准确性对于实验指标的计算而言更为重要。为了确保我们的实时实验流程按预期运行,并始终提供准确的指标,我们启动了一个单独的每日工作流,其执行与流作业相同的计算,但使用的是临时方式。如果流作业结果违反以下任一条件,则会提醒开发人员:

  • 在同一累积期间(本例中为3天),计数不应减少
  • 如果在第一个累积期之后进行了re-ramp,则计数应从0开始再累积3天
  • 流结果与验证流结果之间的差异不应超过某个阈值(在我们的例子中为2%)。

通过查询实验元数据,我们分别在3种情况下对实验进行了验证:

  1. 单次ramp-up实验
  2. 在初始累积期间内进行多次ramp-up实验
  3. 在初始累积期后进行多次ramp-up实验

这一流程如下所示:

图11-验证流程

规模

在这一部分中,我们提供了一些基本统计信息,展示实时实验管道的规模:

  1. 输入主题流量(一天的平均值):

Kafka主题名称

消息数/每秒

MB/每秒

experiment_activation

2,513,006.863

1,873.295

event

127,347.091

64.704

filted_experiment_activation

876,906.711

88.237

filtered_backend_event

9,478.253

0.768

  1. 100G检查点
  2. 200~300个实验
  3. 8个master,50个worker,每个都是ec2 c5d.9xlarge
  4. 计算的并行度为256

未来计划

  1. 支持更多指标,例如PWT(pinner等待时间),这样如果实验导致Pinner的延迟异常增加,则可以尽快停止。
  2. 可能更新管道以使用Flink的协同处理功能代替“间隔连接”,使管道更具动态性和弹性,以应对事件流和Activation流之间的不同步问题。
  3. 分区:研究分区可以支持的分区类型,因为分区会导致状态增加。
  4. 通过电子邮件或Slack支持实时警报。

致谢

实时实验分析是Pinterest在生产环境中的第一个基于Flink的应用程序。非常感谢我们的大数据平台团队(特别感谢Steven Bairos-Novak、Jooseong Kim和Ang Zhang)构建了Flink平台并将其作为服务提供出来。同时还要感谢Analytics Platform团队(Bo Sun)出色的可视化效果,Logging Platform团队提供实时数据提取,以及Data Science团队(Brian Karfunkel)提供的统计咨询!

原文链接

https://www.ververica.com/blog/real-time-experiment-analytics-at-pinterest-using-apache-flink

  • 发表于:
  • 本文为 InfoQ 中文站特供稿件
  • 首发地址https://www.infoq.cn/article/d7iRao1DooFCHrGvCt9L
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券