通过本文你可以 get 到:
根据微博目前站内词条消费情况,计算 top 50 消费热度词条,每分钟更新一次,并且按照列表展现给用户。
这类指标可以统一划分到 topN 类别的指标中。即输入是具体词条消费日志,输出是词条消费排行榜。
预期效果如下。
1
Flink DataStream api 实时计算topN热榜[1]
Flink SQL TopN语句[2]
Flink SQL 功能解密系列 —— 流式 TopN 挑战与实现[3]
我们需要制定自己的 flink sql 解决方案,以实现上述需求。这也是本节重点要讲述的内容,即在「数据建设篇-具体实现方案详述」详细展开。
首先,我们最初的方案是如下图所示,单机房的服务端,但是很明显基本没有高可用保障。我们本文主要介绍 flink sql 方案,所以下文先介绍 flink sql,后文 6.6 介绍各种高可用、高性能优化及保障。
2
从本节开始,正式介绍 flink sql 相关的方案设计。
我们会从以下三个角度去介绍:
数据源即安装在各位的手机微博客户端上报的用户消费明细日志,即用户消费一次某个词条,就会上报一条对应的日志。
字段名 | 备注 |
---|---|
user_id | 消费词条的用户 |
热搜词条_name | 消费词条名称 |
timestamp | 消费词条时间戳 |
... | ... |
最开始设计的 schema 如下:
字段名 | 字段类型 | 备注 |
---|---|---|
timestamp | bigint | 当前分钟词条时间戳 |
热搜词条_name | string | 词条名 |
rn | bigint | 排名 1 - 50 |
但是排名展示时,需要将这一分钟的前 50 名的数据全部查询到展示。而 flink 任务输出排名数据到外部存储时,保障前 50 名的词条数据事务性的输出(要么同时输出到数据服务中,要么一条也不输出)是一件比较复杂事情。所以我们索性将前 50 名的数据全部收集到同一条数据当中,时间戳最新的一条数据就是最新的结果数据。
重新设计的 schema 如下:
字段名 | 字段类型 | 备注 |
---|---|---|
timestamp | bigint | 当前分钟词条时间戳 |
热搜榜单 | string | 热搜榜单,schema 如 {"排名第一的词条1" : "排名第一的词条消费量", "排名第二的词条1" : "排名第二的词条消费量", "排名第三的词条1" : "排名第三的词条消费量"...} 前 50 名 |
INSERT INTO
target_db.target_table
SELECT
max(timestamp) AS timestamp,
热搜_top50_json(热搜词条_name, cnt) AS data -- 外层 udaf 将所有数据进行 merge
FROM
(
SELECT
热搜词条_name,
cnt,
timestamp,
row_number() over(
PARTITION by
热搜词条_name
ORDER BY
cnt ASC
) AS rn -- 内层 rownum 进行排名
FROM
(
SELECT
热搜词条_name,
count(1) AS cnt,
max(timestamp) AS timestamp
FROM
source_db.source_table
GROUP BY
热搜词条_name
-- 如果有热点词条导致数据倾斜,可以加一层打散层
)
)
WHERE
rn <= 100
GROUP BY
0;
topN 设计伪代码如下:
public class 热搜_top50_json extends AggregateFunction<Map<String, Long>, TopN<Pair<String, Long>>> {
@Override
public TopN<Pair<String, Long>> createAccumulator() {
// 创建 acc -> 最小堆实现的 Top 50
}
@Override
public String getValue(TopN<Pair<String, Long>> acc) {
// 1.将最小堆 acc 中列表数据拿到
// 2.然后将列表按照从大到小进行排序
// 3.产出结果数据
}
public void accumulate(TopN<Pair<String, Long>> acc, String 词条名称, long cnt) {
// 1.获取到当前最小堆中的最小值
// 如果当前词条的消费量 cnt 小于最小堆的堆顶
// 则直接进行过滤
// 2.如果最小堆中不存在当前词条
// 则直接将当前词条放入最小堆中
// 3.如果最小堆中已经存在当前词条存在
// 那么将最小堆中这个词条的消费 cnt 与
// 当前词条的 cnt 作比较,将大的那个放入最小堆中
}
public void retract(TopN<Pair<String, Long>> acc, String id, long cnt) {
// 不需要实现 retract 方法
// 由于 topn 具有特殊性:即我们只取每一个词条的最大值
// 进行排名,所以可以不需要实现 retract 方法
// 比较排名都在 accumulate 方法中已经实现完成
}
}
❝Notes:
❞
由于上述 sql 是在无限流上的操作,所以上游数据每更新一次都会向下游发送一次 retract 消息以及最新的数据的消息进行计算。
那么就会存在这样一个问题,即 source qps 为 x 时,任务内的吞吐就为 x * n 倍,sink qps 也为 x,这会导致性能大幅下降的同时也会导致输出结果数据量非常大。
而我们只需要每分钟更新一次结果即可,所以可以使用 flink sql 自带的 minibatch 参数来控制输出结果的频次。
minibatch 具体参考可参考下面两篇文章:
table.exec.mini-batch.enabled : true
-- minibatch 是下面两个任意一个符合条件就会起触发计算
-- 60s 一次
table.exec.mini-batch.allow-latency : 60 s
-- 数量达到 10000000000 触发一次
-- 设置为 10000000000 是为了让上面的 allow-latency 触发,每 60s 输出一次来满足我们的需求
table.exec.mini-batch.size : 10000000000
状态过期,如果不设置的话,词条状态会越来越大,对非高热词条进行清除。
http://apache-flink.147419.n8.nabble.com/Flink-sql-state-ttl-td10158.html
-- 设置 1 天的 ttl,如果一天过后
-- 这个词条还没有更新,则直接删除
table.exec.state.ttl : 86400 s
INSERT INTO target_db.target_table
SELECT
max(timestamp) AS timestamp,
-- udf 计算每一个分桶的前 100 名列表
热搜_top50_json(cast(热搜词条_name AS string), cnt) AS bucket_top100
FROM
(
SELECT
热搜词条_name AS 热搜词条_name,
count(1) AS cnt,
max(timestamp) AS timestamp
FROM
source_db.source_table
GROUP BY
热搜词条_name
-- 如果有热点词条导致数据倾斜
-- 可以加一层打散层
)
GROUP BY
0
-- 由于这里是 group by 0
-- 所以可能会到导致热点,所以如果需要也可以加一层打散层
-- 在内部先算 top50,在外层将内部分桶的 top50 榜单进行 merge
此 udf 与 方案1 的 udf(见 6.5.1.2.udf) 完全相同。
参数同 6.5.1.3 flink-conf.yaml 参数配置
异地双链路热备如下图:
2
可能会发现图中有异地机房,但是我们目前只画出了 A 地区机房的数据链路,B 地区机房还没有画全,接着我们一步一步将这个图进行补全。
❝「Notes:」 「异地双机房只是双链路的热备的一种案例。如果有同城双机房、双集群也可进行同样的服务部署。」 「为什么说异地机房的保障能力 > 同城异地机房 > 同城同机房双集群容灾能力?」 「同城同机房:只要这个机房挂了,即使你有两套链路也没救。」「同城异地机房:很小几率情况会同城异地两个机房都挂了。。」「异地机房:几乎不可能同时异地两个机房都被炸了。。。」 ❞
正常情况下如图所示:
2
当发生 A 地机房 webserver 宕机时,客户端自动切换上报日志至 B 地机房 webserver。如下图所示:
2
kafka 也相同。如下图所示:
2
flink 任务以 A 地机房做主链路,B 地机房启动相同的任务做热备双跑链路。
当 A 地机房 flink 任务宕机且无法恢复时,则 B 地机房的任务做热备替换。
正常情况下如图所示:
2
当 A 地机房 flink 任务宕机且无法恢复时,热备链路 flink 任务就可以顶上。如下图所示:
2
正常情况如下:
2
当 A 地 OLAP 或者 KV 存储挂了之后,webserver 可以自动切换至 B 地 OLAP 或者 KV 存储。如下图所示:
2
当 A 地 webserver 挂了之后,客户端可以自动拉取 B 地 webserver 数据,如下图所示:
2
根据我们上述设计的数据汇 schema 来看,最适合存储引擎就是 kv 引擎,因为前端只需要展示最新的排行榜数据即可。所以我们可以使用 redis 等 kv 存储引擎来存储最新的数据。
如果用户有需求需要记录上述数据的历史记录,我们也可以使用时序数据库或者 OLAP 引擎直接进行存储。
见下文。
数据质量保障篇楼主正在 gang...
{
"黄子韬 杨紫是我哥们": 1672825,
"延乔墓前的来信破防了": 1087416,
"孟子义 张翰同学站起来": 747703
// ...
}
1
我目前的一个想法就是将结果 schema 拍平。举例:
字段名 | 字段类型 | 备注 |
---|---|---|
timestamp | bigint | 当前分钟事件时间戳 |
热搜词条_1 | string | 第一名的热搜词条名称 |
热搜词条_2 | string | 第二名的热搜词条名称 |
热搜词条_3 | string | 第三名的热搜词条名称 |
热搜词条_4 | string | 第四名的热搜词条名称 |
热搜词条_5 | string | 第五名的热搜词条名称 |
... | ... | ... |
热搜词条_n | string | 第 n 名的词条名称 |
每一次输出都将目前每一个排名的数据产出。但是目前在 flink sql 的实现思路上不太明了。