Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Flink Mysql CDC 统计处理

Flink Mysql CDC 统计处理

原创
作者头像
平常心
修改于 2021-08-16 10:12:20
修改于 2021-08-16 10:12:20
4.4K10
代码可运行
举报
文章被收录于专栏:个人总结系列个人总结系列
运行总次数:0
代码可运行

1.环境准备

1.1 mysql 开启binlog

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
log_bin=mysql-bin
binlog_format=ROW
expire_logs_days=30

1.2 flink的cdc依赖

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>1.3.0</version>
</dependency>

说明: 该依赖已经内置了debezium进行处理mysql 变更数据并发送了,所以我们不需要额外的方式,简化了异常 mysql → debezium → kafka的这种方式和数据流程。

2.代码开发

2.1 数据库和表准备

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
CREATE TABLE t_students (
    `id` BIGINT(11) UNSIGNED NOT NULL AUTO_INCREMENT,
    `name` VARCHAR(24)  DEFAULT NULL,
    `age` INT(4) DEFAULT NULL,
    `create_time` DATETIME NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    `update_time` DATETIME NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
    PRIMARY KEY (`id`) USING BTREE
)
;

2.2 flink代码编写

2.2.1 stream api方式

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(1);
 
final DebeziumDeserializationSchema deserializationSchema = new StringDebeziumDeserializationSchema();
 
final DebeziumSourceFunction sourceFunction = MySQLSource.builder()
         .hostname("127.0.0.1").port(3306)
         .databaseList("flink_cdc")
         .username("root")
         .password("123456")
         .deserializer(deserializationSchema)
         .build();
 
env.addSource(sourceFunction).setParallelism(1).print();

说明:这种不是很方便,数据解析也比较麻烦

2.2.2 table api 方式

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
final EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        TableEnvironment tabEnv = TableEnvironment.create(settings);
        String ddlMysql = "CREATE TABLE mysql_binlog (id INT NOT NULL, " +
                "name STRING, " +
                "age INT, " +
                "create_time STRING, " +
                "update_time STRING " +
                ") " +
                "WITH ('connector' = 'mysql-cdc', " +
                "'hostname' = '127.0.0.1', " +
                "'port' = '3306', " +
                "'username' = 'root', " +
                "'password' = '123456', " +
                "'database-name' = 'flink_cdc', " +
                "'table-name' = 't_students')\n"
                ;
        tabEnv.executeSql(ddlMysql);
 
//        String sink = "CREATE TABLE sink_table (id INT NOT NULL, " +
//                "name STRING, " +
//                "age INT, " +
//                "create_time STRING, " +
//                "update_time STRING " +
//                ") " +
//                "WITH ('connector' = 'print')"
//                ;
//        tabEnv.executeSql(sink);
//
//        String dml = "INSERT INTO sink_table SELECT id, name, age, create_time, update_time FROM mysql_binlog";
//
//        final TableResult result = tabEnv.executeSql(dml);
 
        final TableResult result = tabEnv.executeSql("select * from mysql_binlog");
        result.print();

说明:cdc 最开始发起人是 吴邪,所以是通过table api的方式处理的,目前已经代码实现了很多对应的逻辑处理,方便使用和统计。

3.效果展示

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
1 条评论
热度
最新
set global slave_net_timeout = 120;set global thread_pool_idle_timeout = 120;set global interactive_timeout=604800;set global wait_timeout=604822;
set global slave_net_timeout = 120;set global thread_pool_idle_timeout = 120;set global interactive_timeout=604800;set global wait_timeout=604822;
回复回复点赞举报
推荐阅读
编辑精选文章
换一批
Flink + Debezium CDC 实现原理及代码实战
Debezium 是一个分布式平台,它将现有的数据库转换为事件流,应用程序消费事件流,就可以知道数据库中的每一个行级更改,并立即做出响应。
kk大数据
2020/12/29
8.1K0
Flink + Debezium CDC 实现原理及代码实战
(上)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)
Apache Flink 提供了两种关系型 API 用于统一流和批处理,Table 和 SQL API。
公众号:大数据羊说
2022/04/04
3.4K0
(上)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)
如何利用 Flink CDC 实现数据增量备份到 Clickhouse
首先什么是CDC ?它是Change Data Capture的缩写,即变更数据捕捉的简称,使用CDC我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以包括INSERT,DELETE,UPDATE等操作。
麒思妙想
2021/07/19
4.7K0
Flink系列 - 实时数仓之FlinkCDC实现动态分流实战
自从Flink出了FlinkCDC之后,我们对数据库日志的采集就变得方便了许多了,除去了MaxWell、Cannel、OGG等第三方组件的繁琐配置,目前实现CDC有两种方式:HQL实现 和 DataStreamAPI实现(推荐)。
数据仓库践行者
2021/12/06
3K1
Flink系列 - 实时数仓之FlinkCDC实现动态分流实战
(中)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)
CREATE 语句用于向当前或指定的 Catalog 中注册库、表、视图或函数。注册后的库、表、视图和函数可以在 SQL 查询中使用。
公众号:大数据羊说
2022/04/04
6.3K0
(中)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)
Flink CDC 原理、实践和优化
CDC 是变更数据捕获(Change Data Capture)技术的缩写,它可以将源数据库(Source)的增量变动记录,同步到一个或多个数据目的(Sink)。在同步过程中,还可以对数据进行一定的处理,例如分组(GROUP BY)、多表的关联(JOIN)等。
KyleMeow
2021/03/15
26.1K0
Flink CDC 原理、实践和优化
Flink CDC 和 kafka 进行多源合并和下游同步更新
摘要:本文介绍了 Flink CDC 利用 Kafka 进行 CDC 多源合并和下游同步更新的实践分享。内容包括:
文末丶
2022/02/10
3.2K0
Flink CDC 和 kafka 进行多源合并和下游同步更新
Flink CDC 与Hudi整合
之前写过Flink CDC sink 到 Iceberg中,本篇主要实践如何CDC到hudi中.
awwewwbbb
2022/05/09
1.2K0
Flink CDC 与Hudi整合
Flink CDC 2.0 数据处理流程全面解析
8月份 FlinkCDC 发布2.0.0版本,相较于1.0版本,在全量读取阶段支持分布式读取、支持checkpoint,且在全量 + 增量读取的过程在不锁表的情况下保障数据一致性。
王知无-import_bigdata
2021/11/10
3.2K1
Flink CDC 2.0 数据处理流程全面解析
Flink CDC 原理及生产实践
MySQL CDC连接器允许从MySQL数据库读取快照数据和增量数据。本文档根据官网翻译了如何设置MySQL CDC连接器以对MySQL数据库运行SQL查询。
王知无-import_bigdata
2021/01/06
3.5K0
Flink CDC 原理及生产实践
实时数仓:基于 Flink CDC 实现 Oracle 数据实时更新到 Kudu
作者:于乐,腾讯 CSIG 工程师 解决方案描述 概述 Flink CDC 于 2021 年 11 月 15 日发布了最新版本 2.1,该版本通过引入内置 Debezium 组件,增加了对 Oracle 的支持。本方案主要对 flink-connector-oracle-cdc进行试用。首先在本地对 Oracle CDC 进行调试通过,然后结合腾讯云产品流计算 Oceanus、EMR(Kudu)实现了 Oracle-Oceanus-Kudu 一体化解决方案,其中并无复杂的业务逻辑实现(这里进行最简单的数据转
腾讯云大数据
2022/02/17
3.9K0
《一文读懂腾讯云Flink CDC 原理、实践和优化》
CDC 变更数据捕获技术可以将源数据库的增量变动记录,同步到一个或多个数据目的。本文基于腾讯云 Oceanus 提供的 Flink CDC 引擎,着重介绍 Flink 在变更数据捕获技术中的应用。 一、CDC 是什么? CDC 是变更数据捕获(Change Data Capture)技术的缩写,它可以将源数据库(Source)的增量变动记录,同步到一个或多个数据目的(Sink)。在同步过程中,还可以对数据进行一定的处理,例如分组(GROUP BY)、多表的关联(JOIN)等。 例如对于电商平台,用户的订单
腾讯产业互联网学堂1
2023/05/29
3.3K0
《一文读懂腾讯云Flink CDC 原理、实践和优化》
Flink CDC 2.2.1 + Flink 1.13 开发一个简单的CDC项目
本文将演示如何使用 Flink DataStream API 开发一个 Flink CDC 应用。
大数据学习指南
2022/05/26
6K0
Flink CDC 2.2.1 + Flink 1.13 开发一个简单的CDC项目
Flink CDC 2.0原理详解和生产实践
CDC 的全称是 Change Data Capture ,在广义的概念上,只要能捕获数据变更的技术,我们都可以称为 CDC 。通常我们说的 CDC 技术主要面向 数据库的变更,是一种用于捕获数据库中数据变更的技术。
王知无-import_bigdata
2022/04/13
4.2K0
Flink CDC 2.0原理详解和生产实践
Flink SQL 知其所以然(二十六):2w 字详述 Join 操作(大威天龙)
可以发现 Inner Interval Join 和其他三种 Outer Interval Join 的区别在于,Outer 在随着时间推移的过程中,如果有数据过期了之后,会根据是否是 Outer 将没有 Join 到的数据也给输出。
公众号:大数据羊说
2022/07/07
2.4K0
Flink SQL 知其所以然(二十六):2w 字详述 Join 操作(大威天龙)
【BUG】Flink CDC 2.0.0迷之异常!!!
一、场景还原 基于 Flink CDC 的 SQL Api 实现实时监听 MySQL 的 binlog 数据发送到 Kafka 二、框架版本 框架 版本 Flink 1.13.2 MySQL 5.7.25 connector-mysql-cdc 2.0.0 三、测试代码 public class CDCWithSqlTest { public static void main(String[] args) { StreamExecutionEnvironment env
857技术社区
2022/05/17
2.2K0
Apache Flink CDC简介与使用
Flink在1.11版本中新增了CDC的特性,简称 改变数据捕获。名称来看有点乱,我们先从之前的数据架构来看CDC的内容。
CainGao
2020/11/13
9.4K0
Apache Flink CDC简介与使用
Flink connecton for gbase8c
上一次发文,好像还是上一次,鸽了这么久,开始还是有一些心理负担的,但是时间长了,好像就坦然了一些,但问题终究还是要面对的,所以今天我来了。。。
麒思妙想
2022/11/11
4950
Flink connecton for gbase8c
Flink CDC我吃定了耶稣也留不住他!| Flink CDC线上问题小盘点
如果你对Flink CDC 还没有什么概念,可以参考这里:Flink CDC 原理及生产实践。
王知无-import_bigdata
2021/07/30
2.6K0
【Flink实时数仓】需求一:用户属性维表处理-Flink CDC 连接 MySQL 至 Hbase 实验及报错分析
这个代码是sql-client.sh中成功读取到MySQL插入的数据,此时在MySQL中对数据的操作即可通过FlinkCDC连接展示在此界面。
火之高兴
2024/07/25
2020
【Flink实时数仓】需求一:用户属性维表处理-Flink CDC 连接 MySQL 至 Hbase 实验及报错分析
推荐阅读
相关推荐
Flink + Debezium CDC 实现原理及代码实战
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验