更多趣文请关注一臻数据
❝不知道你是否遇到过这样的场景: 产品经理急匆匆跑来说"Doris数据怎么还没实时同步?" 老板突然要求"把所有数据实时展示!" 半夜被数据延迟告警吵醒... 别担心,一起来跟随新晋数据工程师小张的起飞经历,领略Flink Doris Connector的神奇魅力!
小张是一名刚入职的数据工程师。
在一个繁忙的周一早晨,他收到了一个紧急任务:业务部门需要实时查看各个销售渠道的订单数据。这个场景让他想起了漫威电影中的蚁人,能在微观和宏观世界自如穿梭。
而在Doris生态中,恰巧有一位类似的英雄 - Flink Doris Connector,它好比数据世界的"蚁人",能够在数据的源头和目的地之间自由穿行。
让我们通过一张简易的战斗装备图来了解这位数据英雄:
"数据,启动!"小张轻声念道。
在这个数字世界中,Flink Doris Connector好比一位全能型超级英雄,装备着各种超能力装备:
小张打开了版本选择器,发现这位"蚁人"有多个形态可供选择:
每个版本都像是这位"蚁人"的不同等级形态,能够适应不同阶级的战场需求。
在数据实时同步的战场上,Flink Doris Connector是一位身手敏捷的特工。
它能够悄无声息地潜入数据源,精准捕获每一个数据变更,然后以闪电般的速度将数据安全送达目的地Doris。无论是日常的数据同步任务,还是紧急的数据迁移需求,它都能完美完成任务。
"这简直太酷了!"小张兴奋地说道。有了这样一位数据英雄的帮助,他对接下来的任务充满了信心。
正如蚁人在量子领域穿梭自如,Flink Doris Connector在数据世界中也展现出同样的灵活性。
"蚁人"不仅能处理实时数据流,还能执行批量数据处理,甚至能够在两种模式之间自由切换,就像在不同维度间穿行的超级英雄。
"小张,周一的销售数据大屏怎么样了?" 产品经理焦急地询问。
"放心吧,已经部署上线了!" 小张自信地说。
这一周,他通过Flink Doris Connector的几大"秘技",成功解决了实时数据同步的难题。让我们一起来看看这位"蚁人"英雄的实战绝技。
首先来看整体作战流程图:
"产品说要实时看到订单数据,这可怎么办?"小张回忆起接到任务时的紧张。直到他发现了Flink CDC这个神器:
只需几行SQL,就能实现MySQL到Doris的实时同步:
-- enable checkpoint
SET'execution.checkpointing.interval' = '10s';
CREATETABLE cdc_mysql_source (
idint
,nameVARCHAR
,PRIMARY KEY (id) NOTENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = 'password',
'database-name' = 'database',
'table-name' = 'table'
);
-- 支持同步 insert/update/delete 事件
CREATETABLE doris_sink (
idINT,
nameSTRING
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'database.table',
'username' = 'root',
'password' = '',
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true',
'sink.enable-delete' = 'true', -- 同步删除事件
'sink.label-prefix' = 'doris_label'
);
insertinto doris_sink selectid,namefrom cdc_mysql_source;
"酷!数据像闪电一样快速同步过来了!" 小张兴奋地说。
数据同步搞定了,但产品经理又提出新需求:"能不能把用户的城市信息也显示出来?"
这时,Lookup Join技能派上了用场:
通过Lookup Join,可以将实时流的数据实时关联Doris城市维度信息:
CREATE TABLE fact_table (
`id`BIGINT,
`name`STRING,
`city`STRING,
`process_time`as proctime()
) WITH (
'connector' = 'kafka',
...
);
createtable dim_city(
`city`STRING,
`level`INT ,
`province`STRING,
`country`STRING
) WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
'table.identifier' = 'dim.dim_city',
'username' = 'root',
'password' = ''
);
SELECT a.id, a.name, a.city, c.province, c.country,c.level
FROM fact_table a
LEFTJOIN dim_city FOR SYSTEM_TIME ASOF a.process_time AS c
ON a.city = c.city
"小张,老板说要把所有业务库的数据都全量+增量同步到Doris数仓,这个..."
"别担心,整库同步我也在行!" 小张笑着说。只见他快速敲下命令:
<FLINK_HOME>bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-24.0.1.jar \
mysql-sync-database \
--database test_db \
--mysql-conf hostname=127.0.0.1 \
--mysql-conf port=3306 \
--mysql-conf username=root \
--mysql-conf password=123456 \
--mysql-conf database-name=mysql_db \
--including-tables "tbl1|test.*" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password=123456 \
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label \
--table-conf replication_num=1
"看,整个数据库都在自动同步了,包括表结构、数据变更,都能自动处理!"
在经过一系列实战后,小张总结出了几点经验:
"滴滴滴..." 监控大屏上的数据开始实时滚动,产品经理露出了满意的笑容。
一个平静的下午,小张正在悠闲地喝着咖啡,突然...
"小张,救命!数据大屏卡住了!" 产品经理慌张地跑过来。
"别慌,这种情况我已经见过很多了。" 小张放下咖啡,打开了他的"蚁人装备优化手册"。
让我们看看小张是如何进行性能调优的:
"看,这是我的独门秘籍," 小张打开配置文件:
CREATE TABLE doris_sink (
id INT,
name STRING,
order_time TIMESTAMP
) WITH (
'connector' = 'doris',
'sink.enable.batch-mode' = 'true',
'sink.buffer-flush.max-rows' = '5000000',
'sink.buffer-flush.max-bytes' = '100MB',
'sink.buffer-flush.interval' = '10s',
'sink.properties.format' = 'json'
);
"不需要流模式ckp时,大规模数据通过攒批模式写入性能提升了N倍!" 小张自豪地说。
对于经常需要关联的维度表,小张使用了缓存神器:
CREATE TABLE dim_table (
id INT,
info STRING
) WITH (
'connector' = 'doris',
'lookup.cache.max-rows' = '100000',
'lookup.cache.ttl' = '60s',
'lookup.max-retries' = '3'
);
"缓存一开,查询速度嗖嗖的!但是要注意资源使用率,别负载了。"
突然,监控系统报警:
errCode = 2, detailMessage = Label [label_0_1] has already been used
"啊,这个我遇到过," 小张迅速打开他的问题速查手册翻找:
Label重复问题
# 异常日志:
errCode = 2, detailMessage =
Label [label_0_1] has already been used, relate to txn [19650]
-- 解决方案:更换label前缀
'sink.label-prefix' = 'timestamp_prefix'
事务超时问题
# 异常日志:
errCode = 2, detailMessage =
transaction [19650] not found
# FE配置调整
streaming_label_keep_max_second = 43200 # 12小时
并发写入限制
# 异常日志:
errCode = 2, detailMessage =
current running txns on db 10006 is 100, larger than limit 100
# 调整并发数
max_running_txn_num_per_db = 1000
...
小张的运维经验:
...
"搞定!" 小张轻松地靠在椅子上,"有了这些优化技巧,再大的数据量也不怕了。"
这时,办公室的同事们都围了过来:
"小张,你这招太厉害了!"
"能不能分享更多实战经验?"
"什么时候开个分享会呗?"
小张笑着说:"好好好...Doris生态的奥秘无穷无尽,我们一起探索,一起进步!"
看着数据大屏上流畅滚动的数据,小张知道,这只是Flink Doris Connector精彩故事的开始。接下来,还会有更多的挑战,更多的优化空间,等待着这位"蚁人"去探索和征服。
下期,我们将一起探讨其它更有趣有用有价值的内容,敬请期待!