前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >编码方式实现Split Distinct Aggregation功能

编码方式实现Split Distinct Aggregation功能

作者头像
Flink实战剖析
发布2022-04-18 13:38:58
4450
发布2022-04-18 13:38:58
举报
文章被收录于专栏:Flink实战剖析Flink实战剖析
前言

去重指标作为业务分析里面的一个重要指标,不管是在OLAP存储引擎还是计算引擎都对其实现做了大量工作,在面对不同的数据量、指标精确性要求,都有不同的实现方式,但是总体都逃脱不了硬算、两阶段方式、bitmap、hll等这些实现。本文将分析Split Distinct Aggregation实现原理与使用代码方式实现其功能。

Split Distinct Aggregation

如果要使用Sql去实现一个去重功能,通常会这样实现:

代码语言:javascript
复制
SELECT day, COUNT(DISTINCT user_id) FROM T  GROUP BY day --sql1

或者

代码语言:javascript
复制
select day,count(*) from(
     select distinct user_id,day from T ) a
group by day     --sql2

在之前的去重系列中SQL方式去重中也对这两种实现方式进行了分析,但是这两种方式都未解决计算热点问题,例如当某一个day 对应的devId 特别大的情况下,那么计算压力都会到该day所在的task,使这个task成为任务的性能瓶颈。

Split Distinct Aggregation是从Flink-1.9版本开始提供的一个对去重的优化功能,该功能必须在Blink planner下并且配置:

代码语言:javascript
复制
val tEnv: TableEnvironment = ...
tEnv.getConfig.getConfiguration .setString("table.optimizer.distinct-agg.split.enabled", "true")

那么sql1 在其内部会转换为

代码语言:javascript
复制
SELECT day, SUM(cnt)
FROM (
    SELECT day, COUNT(DISTINCT user_id) as cnt
    FROM T
    GROUP BY day, MOD(HASH_CODE(user_id), 1024)
)
GROUP BY day

MOD(HASH_CODE(user_id), 1024) 表示对取user_id的hashCode然后对1024取余,也就是将user_id划分到1024个桶里面去,那么里层通过对day与桶编号进行去重(cnt)外层只需要对cnt执行sum操作即可,因为分桶操作限制了相同的user_id 一定会在相同的桶里面,执行效果图如下:

我们也通过tabEnv.explain方式打印执行计划验证一下是否是真的这样执行:

代码语言:javascript
复制
Stage 5 : Operator                                          
content : Calc(select=[status, devId, (HASH_CODE(devId) MOD 1024) AS $f2])

Stage 7 : Operator                                        
content : GroupAggregate(groupBy=[status, $f2], partialFinalType=[PARTIAL], select=[status, $f2, COUNT(DISTINCT devId) AS $f2_0])
ship_strategy : HASH

Stage 9 : Operator                                         
content : GroupAggregate(groupBy=[status], partialFinalType=[FINAL], select=[status, $SUM0_RETRACT($f2_0) AS $f1])
ship_strategy : HAS

Stage 5 中执行分桶操作,Stage 7分桶之后去重操作,Stage 9 最终的sum操作。

使用代码方式实现

在去重系列中实现了使用MapState去重方式,仍然在此基础上来完成Split Distinct Aggregation功能,其业务场景是实时计算广告位访客数,流量数据id(广告位ID)、devId(访问ID)、time(访问时间),实现思路:

•首先通过对id、设备id分桶编号、小时级别时间分组,使用一个ProcessFunction计算分桶后的去重数(与MapState方式相同)•然后通过对id、小时级别时间分组,使用另一个ProcessFunction做sum操作,但是这里面需要注意的一个问题是对于相同id与时间其数据可能会来源于上游不同的task,而上游的每个task的数据都会以全量一直往下发送,如果直接做累加操作会导致重复计算,因此得实现一个类似于sql中retract撤回机制(可参考Flink SQL中可撤回机制解密),也就是上一个ProcessFunction每发送一条数据都需要先将之前的数据发送一份表示其为撤回。

接下来看具体的代码实现,数据结构:

代码语言:javascript
复制
--流量数据
case class AdData(id:Int,devId:String,time:Long)  
--第一次keyBy数据
case class AdKey1(id:Int,time:Long,bucketCode:Int) 
--第二次keyBy数据  
case class AdKey2(id:Int,time:Long)

去重实现Distinct1ProcessFunction:

代码语言:javascript
复制
class Distinct1ProcessFunction extends KeyedProcessFunction[AdKey1, AdData, Tuple2[Boolean, Tuple3[Int, Long, Long]]] {
  var devIdState: MapState[String, Int] = _
  var devIdStateDesc: MapStateDescriptor[String, Int] = _
  var countState: ValueState[Long] = _
  var countStateDesc: ValueStateDescriptor[Long] = _

  override def open(parameters: Configuration): Unit = {

    devIdStateDesc = new MapStateDescriptor[String, Int]("devIdState", TypeInformation.of(classOf[String]), TypeInformation.of(classOf[Int]))
    devIdState = getRuntimeContext.getMapState(devIdStateDesc)
    countStateDesc = new ValueStateDescriptor[Long]("countState", TypeInformation.of(classOf[Long]))
    countState = getRuntimeContext.getState(countStateDesc)
  }
  override def processElement(value: AdData, ctx: KeyedProcessFunction[AdKey1, AdData, Tuple2[Boolean, Tuple3[Int, Long, Long]]]#Context, out: Collector[Tuple2[Boolean, Tuple3[Int, Long, Long]]]): Unit = {

    val devId = value.devId
    devIdState.get(devId) match {
      case 1 => {
        //表示已经存在
      }
      case _ => {
        //表示不存在
        devIdState.put(devId, 1)
        val c = countState.value()
        val currV = c + 1
        countState.update(currV)

        if (currV > 1) {
          --认为大于1的需要执行撤回
          out.collect(Tuple2.apply(false, Tuple3.apply(ctx.getCurrentKey.id, ctx.getCurrentKey.time, c)))
          out.collect(Tuple2.apply(true, Tuple3.apply(ctx.getCurrentKey.id, ctx.getCurrentKey.time, currV)))
        } else {
          out.collect(Tuple2.apply(true, Tuple3.apply(ctx.getCurrentKey.id, ctx.getCurrentKey.time, currV)))
        }
      }
    }
  }
}

撤回实现同样使用boolean标识,false表示为撤回数据,true表示正常insert的数据。

聚合实现Distinct2ProcessFunction:

代码语言:javascript
复制
class Distinct2ProcessFunction extends KeyedProcessFunction[Tuple2[Int, Long], Tuple2[Boolean, Tuple3[Int, Long, Long]], Void] {
  var cntState: ValueState[Long] = _
  var cntStateDesc: ValueStateDescriptor[Long] = _

  override def open(parameters: Configuration): Unit = {
    cntStateDesc = new ValueStateDescriptor[Long]("distinctValue", TypeInformation.of(classOf[Long]))
    cntState = getRuntimeContext.getState(cntStateDesc)
  }

  override def processElement(value: (Boolean, (Int, Long, Long)), ctx: KeyedProcessFunction[(Int, Long), (Boolean, (Int, Long, Long)), Void]#Context, out: Collector[Void]): Unit = {
    val currV = cntState.value()
    value._1 match {
      case true => {
        cntState.update(currV + value._2._3
        println(ctx.getCurrentKey + ":" + cntState.value())
      }
      case false => {
        --撤回操作
        cntState.update(currV - value._2._3)
        println(ctx.getCurrentKey + ":" + cntState.value())
      }
    }
  }
}

重点在于如果收到编码为false 的数据,那么需要从当前计数里面减掉撤回的计数值。

主流程:

代码语言:javascript
复制
val env = StreamExecutionEnvironment.getExecutionEnvironment

val kafkaConfig = new Properties()
kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "test1")
val consumer = new FlinkKafkaConsumer[String]("topic1", new SimpleStringSchema, kafkaConfig)
val ds = env.addSource(consumer)
            .map(x => {
                val s = x.split(",")
                AdData(s(0).toInt, s(1), s(2).toLong)
          }).keyBy(x => {
               val endTime = TimeWindow.getWindowStartWithOffset(x.time, 0,
               Time.hours(1).toMilliseconds) + Time.hours(1).toMilliseconds
               AdKey1(x.id, endTime, x.devId.hashCode % 3)
          }).process(new Distinct1ProcessFunction)
            .keyBy(x => {
                Tuple2.apply(x._2._1, x._2._2)
          }).process(new Distinct2ProcessFunction)
env.execute()
总结

Split Distinct Aggregation是去重计算在数据倾斜的情况下的优化的一种思路,类似于两阶段聚合,第一阶段执行打散操作,第二阶段执行累加操作,这是一种通用的优化思路,而对于使用代码方式实现其重点在于第一阶段到第二阶段的撤回思路避免数据的重复计算

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-02-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Flink实战剖析 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • Split Distinct Aggregation
  • 使用代码方式实现
    • 总结
    相关产品与服务
    流计算 Oceanus
    流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档