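Chapter 5 Flume Advanced: Custom MySQLSource 5.1 About custom Sources A Source is the component that receives data into a Flume Agent. ... For example, to monitor MySQL in real time and ship its data to HDFS or another storage framework, we need to implement a MySQLSource ourselves. ... Flume also provides an interface for custom sources; see the official guide: https://flume.apache.org/FlumeDeveloperGuide.html#source 5.2 Components of the custom MySQLSource ... 5.3 Steps for the custom MySQLSource According to the official guide, a custom MySqlSource must extend the AbstractSource class and implement the Configurable and PollableSource interfaces. ... database CREATE DATABASE mysqlsource; In the mysqlsource database, create the data table Student and the metadata table Flume_meta: CREATE TABLE `student` (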
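This article explains how to implement a custom MySQLSource. 1. ... Components of the custom MySQLSource 3. ... Steps for the custom MySQLSource According to the official guide, a custom mysqlsource must extend the AbstractSource class and implement the Configurable and PollableSource interfaces. ... Create the mysqlsource database // log in to MySQL [bigdata@hadoop002 flume]$ mysql -uroot -p000000 mysql> CREATE DATABASE mysqlsource...; mysql> use mysqlsource; 2.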
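The Flume excerpts describe a source built on AbstractSource, Configurable and PollableSource, plus a metadata table (Flume_meta) that records how far the source has read. The dependency-free Java sketch below imitates that polling contract under stated assumptions: an in-memory list stands in for the `student` table, a field stands in for the offset kept in Flume_meta, and `process()` returns READY after emitting rows or BACKOFF when nothing is new. `PollStatus` and `OffsetPollingSource` are hypothetical names; a real Flume source would hand events to `getChannelProcessor().processEventBatch(...)` and read and update the offset over JDBC.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for Flume's PollableSource.Status (READY / BACKOFF).
enum PollStatus { READY, BACKOFF }

// Simulation of an offset-tracking MySQL source without Flume or a database.
class OffsetPollingSource {
    private final List<String> table;  // stand-in for the student table, ordered by id
    private final int batchSize;       // max rows emitted per poll
    private int offset = 0;            // stand-in for the offset stored in Flume_meta
    private final List<List<String>> channel = new ArrayList<>(); // stand-in for the channel

    OffsetPollingSource(List<String> table, int batchSize) {
        this.table = table;
        this.batchSize = batchSize;
    }

    // Read rows after the saved offset, hand them over as one batch,
    // then persist the new offset.
    PollStatus process() {
        if (offset >= table.size()) {
            return PollStatus.BACKOFF;  // nothing new: let the runner back off
        }
        int end = Math.min(offset + batchSize, table.size());
        channel.add(new ArrayList<>(table.subList(offset, end))); // real code: processEventBatch(events)
        offset = end;                   // real code: UPDATE the offset row in Flume_meta
        return PollStatus.READY;
    }

    int offset() { return offset; }

    List<List<String>> channel() { return Collections.unmodifiableList(channel); }
}
```

Updating the offset only after the batch was handed to the channel means a crash between the two steps re-reads rows instead of losing them, i.e. at-least-once delivery.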
Step 3: write the Flink code, as follows: package test; import com.ververica.cdc.connectors.mysql.source.MySqlSource...debeziumProperties = new Properties(); debeziumProperties.put("decimal.handling.mode", "String"); MySqlSource... mySqlSource = MySqlSource....env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-ck"); env .fromSource(mySqlSource...1 for sink to keep message ordering env.execute("Print MySQL Snapshot + Binlog"); } } MySqlSource
3.1.2 Write the code import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource...DebeziumSourceFunction mysqlSource = MySQLSource....Custom deserializer import com.alibaba.fastjson.JSONObject; import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource...env.setParallelism(1); //2. Create the Flink-MySQL-CDC Source DebeziumSourceFunction mysqlSource...= MySQLSource.
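The custom-deserializer excerpt uses fastjson to turn each change event into JSON. As a dependency-free sketch of just the output step, the class below flattens one event (database, table, before image, after image, Debezium op code) into a JSON string. The field names `database`, `tableName`, `before`, `after`, `type` and the op-to-type mapping are assumptions for illustration, not the article's confirmed format; a real deserializer would pull these values out of the Kafka Connect `SourceRecord`.

```java
import java.util.Map;

// Hypothetical output step of a custom Debezium deserializer, without fastjson.
final class ChangeEventJson {
    private ChangeEventJson() {}

    // Debezium op codes: c = create, u = update, d = delete, r = snapshot read.
    static String opType(String op) {
        switch (op) {
            case "c": return "insert";
            case "u": return "update";
            case "d": return "delete";
            case "r": return "read";
            default: throw new IllegalArgumentException("unknown op: " + op);
        }
    }

    static String toJson(String database, String table, Map<String, Object> before,
                         Map<String, Object> after, String op) {
        return "{\"database\":" + quote(database)
                + ",\"tableName\":" + quote(table)
                + ",\"before\":" + object(before)
                + ",\"after\":" + object(after)
                + ",\"type\":" + quote(opType(op)) + "}";
    }

    // Serialize a flat row; a missing image (e.g. "before" of an insert) becomes {}.
    private static String object(Map<String, Object> row) {
        StringBuilder sb = new StringBuilder("{");
        if (row != null) {
            boolean first = true;
            for (Map.Entry<String, Object> e : row.entrySet()) {
                if (!first) sb.append(',');
                first = false;
                sb.append(quote(e.getKey())).append(':').append(value(e.getValue()));
            }
        }
        return sb.append('}').toString();
    }

    private static String value(Object v) {
        if (v == null) return "null";
        if (v instanceof Number || v instanceof Boolean) return v.toString();
        return quote(v.toString());
    }

    // Minimal escaping: backslash, double quote and control characters.
    private static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') sb.append('\\').append(c);
            else if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
            else sb.append(c);
        }
        return sb.append('"').toString();
    }
}
```

Pass a `LinkedHashMap` for the row images if a stable column order in the output matters.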
Datastream RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(new HttpHost("es-ip", 9200, "http"))); MySqlSource... mySqlSource = MySqlSource....StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(3000); DataStream input = env.fromSource(mySqlSource
Code package com.wud.cdc2; import com.ververica.cdc.connectors.mysql.MySqlSource...access user name System.setProperty("HADOOP_USER_NAME","hdfs"); DebeziumSourceFunction mySqlSource...= MySqlSource....= MySqlSource....tableEnv = StreamTableEnvironment.create(env); String sourceSql = "CREATE TABLE IF NOT EXISTS mySqlSource
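The Table API excerpt builds a `CREATE TABLE IF NOT EXISTS mySqlSource` statement as a string. A minimal sketch of assembling such a DDL for the Flink CDC `mysql-cdc` connector follows; the option keys (`connector`, `hostname`, `port`, `username`, `password`, `database-name`, `table-name`) are the connector's documented ones, while the column list, host and credentials are placeholders the caller supplies. The resulting string would be passed to `tableEnv.executeSql(...)`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Assembles a Flink SQL DDL string for a mysql-cdc table (placeholder values).
final class MySqlCdcDdl {
    private MySqlCdcDdl() {}

    static String createTable(String flinkTable, String columns, String host, int port,
                              String user, String password, String database, String table) {
        Map<String, String> opts = new LinkedHashMap<>(); // keeps option order stable
        opts.put("connector", "mysql-cdc");
        opts.put("hostname", host);
        opts.put("port", String.valueOf(port));
        opts.put("username", user);
        opts.put("password", password);
        opts.put("database-name", database);
        opts.put("table-name", table);

        StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS ")
                .append(flinkTable).append(" (").append(columns).append(") WITH (");
        boolean first = true;
        for (Map.Entry<String, String> e : opts.entrySet()) {
            if (!first) sb.append(", ");
            first = false;
            // Single quotes inside values are doubled, as SQL string literals require.
            sb.append('\'').append(e.getKey()).append("' = '")
              .append(e.getValue().replace("'", "''")).append('\'');
        }
        return sb.append(')').toString();
    }
}
```

For the 2.x connector, declaring a primary key in the column list (for example `PRIMARY KEY (id) NOT ENFORCED`) is what the incremental snapshot reader expects.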
HADOOP_USER_NAME", "hadoop"); //4 获取mysql的数据源 DebeziumSourceFunction sourceFunction = MySqlSource...configuration property) to use a more specifc time zone value if you want to utilize time zone support.解决办法:MySqlSource
SQL Hints: SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */ ; Via the Stream API, set it when creating the source: MySQLSource.builder...com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema; import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource...public static void main(String[] args) throws Exception { SourceFunction sourceFunction = MySQLSource
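The `server-id` hint matters because every client reading the binlog must present a server ID that no other replica or CDC job is using. With the parallel 2.x MySQL CDC source, the option can also be a range such as `5400-5403`, which must contain at least as many IDs as the source parallelism. The helper below is a hypothetical sketch that derives such a value and embeds it in the same hint syntax shown above.

```java
// Hypothetical helper for the server-id option of a MySQL CDC source.
final class ServerIds {
    private ServerIds() {}

    // One id per parallel reader: a single id for parallelism 1, else "start-end".
    static String range(int start, int parallelism) {
        if (start <= 0 || parallelism <= 0) {
            throw new IllegalArgumentException("start and parallelism must be positive");
        }
        return parallelism == 1 ? String.valueOf(start)
                                : start + "-" + (start + parallelism - 1);
    }

    // Same hint syntax as in the text: /*+ OPTIONS('server-id'='...') */
    static String hintedSelect(String table, String serverId) {
        return "SELECT * FROM " + table + " /*+ OPTIONS('server-id'='" + serverId + "') */";
    }
}
```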
name, age>> DataStreamSource>> configDS = env.addSource(new MySQLSource...String, String, String, Integer, String, Integer>> out) throws Exception { //value is the MySQLSource... isRunning = false; } } /** * > */ public static class MySQLSource...MySource()); DataStream>> userInfoDS = env.addSource(new MySQLSource...* 2. User info stream (config/rule stream): > the user's details * > */ public static class MySQLSource
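These excerpts enrich an event stream with user details (name, age) that a custom MySQLSource reads from MySQL and broadcasts, producing a six-field output from a four-field event. The sketch below models that join without Flink, under assumptions: a map plays the role of broadcast state, each config update replaces the whole snapshot, and events for unknown users are dropped. `BroadcastJoinSketch`, `UserInfo` and the comma-joined output are hypothetical; in Flink this logic would live in a `BroadcastProcessFunction`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Dependency-free model of a broadcast-state join (hypothetical names).
final class BroadcastJoinSketch {
    static final class UserInfo {
        final String name;
        final int age;
        UserInfo(String name, int age) { this.name = name; this.age = age; }
    }

    private final Map<String, UserInfo> state = new HashMap<>(); // stand-in for broadcast state

    // Like processBroadcastElement: replace the snapshot read from MySQL.
    void updateConfig(Map<String, UserInfo> latest) {
        state.clear();
        state.putAll(latest);
    }

    // Like processElement: emit the event plus name and age, or nothing if unknown.
    List<String> enrich(String userId, String eventTime, String eventType, int productId) {
        List<String> out = new ArrayList<>();
        UserInfo info = state.get(userId);
        if (info != null) {
            out.add(String.join(",", userId, eventTime, eventType,
                    String.valueOf(productId), info.name, String.valueOf(info.age)));
        }
        return out;
    }
}
```

Replacing the whole snapshot keeps deleted users from lingering in state, at the cost of copying the full table on every refresh.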
Data cleaning /** * Custom MySQL data source */ public class MySQLSource extends RichParallelSourceFunction<Tuple2<String...}); DataStreamSource> mysqlData = env.addSource(new MySQLSource...{RichParallelSourceFunction, SourceFunction} class MySQLSource extends RichParallelSourceFunction[(String....filter(_._1 == "E") .map(x => (x._2,x._3,x._4.toLong)) val mysqlData = env.addSource(new MySQLSource...}); DataStreamSource> mysqlData = env.addSource(new MySQLSource
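The Scala cleaning step `.filter(_._1 == "E").map(x => (x._2,x._3,x._4.toLong))` keeps records whose first field is `"E"` and projects the remaining three fields, converting the last to a long. A Java-streams sketch of the same step follows; records are modeled as four-element `String` arrays because the snippet does not say what the fields mean, and `CleanStep` is a hypothetical name.

```java
import java.util.List;
import java.util.stream.Collectors;

// Java-streams equivalent of the Scala filter/map cleaning step.
final class CleanStep {
    static final class Out {
        final String f1;
        final String f2;
        final long f3;
        Out(String f1, String f2, long f3) { this.f1 = f1; this.f2 = f2; this.f3 = f3; }
        @Override public String toString() { return "(" + f1 + "," + f2 + "," + f3 + ")"; }
    }

    static List<Out> clean(List<String[]> records) {
        return records.stream()
                .filter(r -> "E".equals(r[0]))                         // _._1 == "E"
                .map(r -> new Out(r[1], r[2], Long.parseLong(r[3])))  // (x._2, x._3, x._4.toLong)
                .collect(Collectors.toList());
    }
}
```

Like Scala's `toLong`, `Long.parseLong` throws `NumberFormatException` on malformed input, so a production job may want to catch it and drop or side-output bad records.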
age>> DataStreamSource>> configSource = env.addSource(new MySQLSource...} } } //value is the MySQLSource...isRunning = false; } } /** * > */ public static class MySQLSource
deserializationSchema = new StringDebeziumDeserializationSchema(); final DebeziumSourceFunction sourceFunction = MySQLSource.builder
First create the Source and the Sink (the corresponding dependencies are listed at the end of the article) SourceFunction sourceFunction = MySQLSource....org.apache.flink.streaming.api.functions.source.SourceFunction; import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource...static void main(String[] args) throws Exception { SourceFunction sourceFunction = MySQLSource
} }); //TODO 4. Use FlinkCDC to consume the config table and turn it into a broadcast stream SourceFunction mySqlSource...= MySqlSource....initial() latest .build(); DataStream tbProcessDStream = env.addSource(mySqlSource
(MySqlSource.java:174) at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java
4. Create a new FlinkCDC DataStream project import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource; import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions...//2. Build the SourceFunction with FlinkCDC and read the data DebeziumSourceFunction sourceFunction = MySQLSource
StreamExecutionEnvironment.getExecutionEnvironment(); //2.Source DataStream studentDS = env.addSource(new MySQLSource...Integer id; private String name; private Integer age; } public static class MySQLSource
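The hand-written MySQLSource classes in these excerpts follow Flink's source contract: `run()` keeps emitting while a flag such as `isRunning` is true, and `cancel()` sets it to false. The dependency-free sketch below reproduces that loop; a `Supplier` stands in for the JDBC query a real source would prepare in `open()`, a `Consumer` stands in for `ctx.collect(...)`, and the periodic re-query is an assumption based on the examples. `PollingSourceSketch` is hypothetical, and `Student` mirrors only the id, name and age fields shown in the snippet.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Dependency-free model of the run()/cancel() loop of a custom source.
final class PollingSourceSketch<T> {
    private volatile boolean isRunning = true; // written by cancel(), read by run()
    private final Supplier<List<T>> fetcher;   // stand-in for the JDBC query
    private final long intervalMillis;

    PollingSourceSketch(Supplier<List<T>> fetcher, long intervalMillis) {
        this.fetcher = fetcher;
        this.intervalMillis = intervalMillis;
    }

    void run(Consumer<T> emitter) throws InterruptedException {
        while (isRunning) {
            for (T row : fetcher.get()) {
                emitter.accept(row);          // ctx.collect(row) in Flink
            }
            if (isRunning) {
                Thread.sleep(intervalMillis); // wait before re-querying
            }
        }
    }

    void cancel() { isRunning = false; }
}

// Hypothetical POJO with the Student fields from the snippet.
final class Student {
    final Integer id;
    final String name;
    final Integer age;
    Student(Integer id, String name, Integer age) { this.id = id; this.name = name; this.age = age; }
}
```

The flag is `volatile` because Flink calls `cancel()` from a different thread than the one executing `run()`; without it the loop might never observe the change.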