在Pinterest,我们每天都要进行数千个实验。我们主要依靠日常实验指标来评估实验效果。日常实验管道运行一次可能会花费10多个小时,有时还会超时,因此想要验证实验设置、触发的正确性以及预期的实验性能时就没那么方便了。当代码中存在一些错误时这个问题尤为突出。有时可能要花几天时间才能发现错误,这对用户体验和重要指标造成了更大的损害。我们在Pinterest开发了一个近实时实验平台,以提供更具时效性的实验指标,从而帮助我们尽快发现这些问题。
可能出现的问题有:
图1-带有置信区间的实时实验指标
上图的面板显示了所选事件的实验组和对照组的流量(也就是动作数)和倾向(也就是unique user的数量)。自实验开始以来,这些计数已经累计了3天时间。如果在3天后发生了re-ramp(分配给实验组和对照组的用户数量增加),则计数会归零0并重新开始累计3天时间。
为了确保实验组与对照组之间的对比在统计上是有效的,我们做了一些统计检验。由于指标是实时交付的,因此每次按顺序收到新记录时,我们都必须进行这些检验。这需要与传统的固定视野检验不一样的方法,否则会带来较高的假正率。我们考虑过几种顺序测试方法,包括赌徒破产、贝叶斯A/B检验和Alpha消耗函数方法。为了保证数值稳定性,我们从t检验+ Boferroni校正(将我们的案例作为多次检验进行处理)开始,并为我们的初始实现预先确定了检验次数。
图2-实时实验管道的高阶设计
实时实验管道包括下列主要组件:
为了过滤Activation记录,此作业使用Flink的广播状态模式。每10秒检查一次“最近ramp的实验组”作业所发布的CSV的更改情况,并将其发布到一个KeyedBroadcastProcessFunction的所有分区上,该函数也消费Activation。
KeyedBroadcastProcessFunction将广播的CSV与Activation流结合在一起,就可以过滤掉那些最近3天内未ramp-up实验的Activation记录。此外,“group-ramp-up-time”已添加到Activation记录中,并插入“filtered_experiment_activations”kafka主题中。
图3-Scala对象被插入中间层Kafka主题中
图4-实时实验累积作业图
上面是实时累积(Aggregation)Flink作业的高阶概览。这里简单提及了一些operator,后文中还将详细介绍另一些operator。Source operator从Kafka读取数据,而sink使用一个REST接口写入我们的内部Analytics Store上。
删除重复事件→这里用一个KeyedProcessFunction实现,由(event.user_id,event.event_type,event.timestamp)作为key。这里的思想是,如果来自同一用户的相同事件类型的事件具有相同的时间戳,则它们是重复事件。第一个这样的事件被发送到下游,但也会缓存进状态持续5分钟时间。任何后续事件都将被丢弃。5分钟后,一个计时器会启动并清除状态。这里的假定是所有重复事件之间的间隔都在5分钟之内。
查找首次触发时间→这里是一个Flink KeyedProcessFunction,由(experiment_hash,experiment_group,user_id)作为key。这里的假设是,为一个用户收到的第一个实验Activation记录也是具有第一个触发时间的Activation。一个实验ramp-up以后,收到的第一个Activation将发送至下游,并保存为状态并持续3天时间(我们累积了实验组ramp-up以来为期3天的计数)。经过3天的ramp时间后,一个计时器将清除状态。
15分钟的processing时间tumbling窗口→事件进入并向下游发送结果时,Numerator Computer和Denominator computer都将累积计数。这意味着数百万条记录,但是我们不需要如此频繁地将结果发送到Analytics Store上。我们可以在processing时间内运行一个持续15分钟的Flink tumbling窗口,这样效率更高。对于Numerator Computer来说,这个窗口由(“experiment_hash”,“experiment_group”,“event_type”,“timestamp”)作为key。当窗口在15分钟后触发时,将获取带有max_users的记录并将其发送到下游的Analytics Store sink。
图5-通过用户ID连接Activation流与事件流
我们使用Flink的IntervalJoin operator实现流到流的连接。IntervalJoin会在接下来的3天内缓冲每位用户的单个Activation记录,并且所有匹配事件都将与Activation记录中的其他实验元数据一起发送到下游。
这种方法的局限性:
我们研究了Flink的IntervalJoin源代码。它会在“左侧缓冲区”中缓冲Activation 3天时间,但事件将被立即删除。目前似乎无法通过配置更改此行为。我们正在研究使用Flink的协同处理函数来实现这个Activation到事件的连接,该函数是用于流到流连接的更通用的函数。我们可以将事件缓冲X分钟,这样即使Activation流延迟了X分钟,管道也可以处理延迟而不会出现计数不足。这将帮助我们避免同一用户的两次连接,并能形成更加动态的管道,其可以立即感知到实验组的re-ramp,并支持更多动态行为,例如在组re-ramp时自动扩展累积的覆盖范围 。
图6-Join Results Deduplicator
Join Results Deduplicator是一个Flink KeyedProcessFunction,它由experiment_hash,experiment_group,event_type,user_id作为key。这个operator的主要目的是在向下游发送记录时插入“user_first_time_seen”标志——下游Numerator Computer使用这个标志来计算倾向编号(# unique users),而无需使用设置的数据结构。
这个operator将状态存储到last-ramp-time+ 3天,之后状态将被清除。
图7-Numerator Computer
Numerator Computer是一个KeyedProcessFunction,由experiment_hash,experiment_group,event_type作为key。它会在最后2小时内一直滚动15分钟的存储桶(bucket),每当有新记录进入时都会更新这些桶。对于流量来说,每个动作都很重要;因此对于每个事件,动作计数都会增加。对于倾向数字(unique user)——它取决于"first_time_seen”标志(仅在为true时递增)。
随着时间的流逝,存储桶会滚动/旋转。每次新事件进入时,存储桶数据都会向下游刷新到15分钟的tumbling窗口中。
它有一个时间为3天的计时器(从ramp-up时间→3天),可在触发后清除所有状态,这样就能在ramp-up3天后重置/清除计数,完成归零。
为了使我们的流管道具有容错能力,Flink的增量检查点和RocksDB状态后端被用来保存应用程序检查点。我们面临的一项有趣挑战是检查点失败。问题似乎在于检查点流程需要花费很长时间,并且最终会超时。我们还注意到,在发生检查点故障时通常也会有很高的背压。
图8-Flink UI中显示的检查点故障
在仔细检查了检查点故障的内部机制之后,我们发现超时是由于某些子任务未将确认发送给检查点协调器而导致的,整个检查点流程都卡住了,如下所示。
图9-子任务未发送确认
然后我们针对导致失败的根本原因应用了一些调试步骤:
原来子任务运行很正常,只是抽不出空来处理消息。结果,这个特定的子任务具有很高的背压,从而阻止了barrier通过。没有barrier的收据,检查点流程将无法进行。
在进一步检查所有子任务的Flink指标之后,我们发现其中一个子任务产生的消息数量比其对等任务多100倍。由于消息是通过user_id在子任务之间分区的,这表明有些用户产生的消息比其他用户多得多,这就意味着那是垃圾消息。临时查询我们的spam_adjusted数据集后也确认了这一结果。
图10-不同子任务的消息数
为了缓解该问题,我们在“过滤器事件作业”中应用了一个上限规则:对于一个小时内的用户,如果我们看到的消息多于X条,则仅发送前X条消息。应用上限规则后,检查点就不再出现故障了。
数据准确性对于实验指标的计算而言更为重要。为了确保我们的实时实验流程按预期运行,并始终提供准确的指标,我们启动了一个单独的每日工作流,其执行与流作业相同的计算,但使用的是临时方式。如果流作业结果违反以下任一条件,则会提醒开发人员:
通过查询实验元数据,我们分别在3种情况下对实验进行了验证:
这一流程如下所示:
图11-验证流程
在这一部分中,我们提供了一些基本统计信息,展示实时实验管道的规模:
Kafka主题名称 | 消息数/每秒 | MB/每秒 |
---|---|---|
experiment_activation | 2,513,006.863 | 1,873.295 |
event | 127,347.091 | 64.704 |
filted_experiment_activation | 876,906.711 | 88.237 |
filtered_backend_event | 9,478.253 | 0.768 |
实时实验分析是Pinterest在生产环境中的第一个基于Flink的应用程序。非常感谢我们的大数据平台团队(特别感谢Steven Bairos-Novak、Jooseong Kim和Ang Zhang)构建了Flink平台并将其作为服务提供出来。同时还要感谢Analytics Platform团队(Bo Sun)出色的可视化效果,Logging Platform团队提供实时数据提取,以及Data Science团队(Brian Karfunkel)提供的统计咨询!
原文链接:
https://www.ververica.com/blog/real-time-experiment-analytics-at-pinterest-using-apache-flink
领取专属 10元无门槛券
私享最新 技术干货