A simple demo with Flink (Table SQL) + Kafka + MySQL
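First, the build.gradle. The Flink dependencies are collected in a list and toggled between implementation (for running locally) and compileOnly (for cluster submission, where the Flink runtime already ships them) via a prod system property; the shadow plugin then produces the fat jar.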
plugins {
id 'java'
id "com.github.johnrengelman.shadow" version "6.1.0"
}
group 'io.github.earthchen'
version '1.0-SNAPSHOT'
repositories {
mavenLocal()
maven { url "http://maven.aliyun.com/nexus/content/groups/public/" }
maven { url "http://maven.aliyun.com/nexus/content/repositories/jcenter" }
maven { url 'https://jitpack.io' }
mavenCentral()
}
// versions assumed to match the explicitly-versioned connector artifacts below
ext {
    flinkVersion = '1.12.0'
    scalaBinaryVersion = '2.11'
}
def conditionDependencies = [
"org.apache.flink:flink-table:${flinkVersion}",
"org.apache.flink:flink-table-api-java:${flinkVersion}",
"org.apache.flink:flink-table-api-java-bridge_${scalaBinaryVersion}:${flinkVersion}",
// "org.apache.flink:flink-streaming-scala_${scalaBinaryVersion}:${flinkVersion}",
"org.apache.flink:flink-table-planner-blink_${scalaBinaryVersion}:${flinkVersion}",
"org.apache.flink:flink-table-planner_${scalaBinaryVersion}:${flinkVersion}",
"org.apache.flink:flink-clients_${scalaBinaryVersion}:${flinkVersion}",
// "ch.qos.logback:logback-core:1.2.3",
// "ch.qos.logback:logback-classic:1.2.3"
]
// parse the flag explicitly: System.getProperty returns a String, and any
// non-empty string (including "false") is truthy in Groovy
def prod = Boolean.parseBoolean(System.getProperty("prod", "false"))
dependencies {
// https://mvnrepository.com/artifact/mysql/mysql-connector-java
implementation group: 'mysql', name: 'mysql-connector-java', version: '8.0.19'
// https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc
implementation group: 'org.apache.flink', name: 'flink-connector-jdbc_2.11', version: '1.12.0'
// https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka
implementation group: 'org.apache.flink', name: 'flink-sql-connector-kafka_2.11', version: '1.12.0'
// https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka
implementation group: 'org.apache.flink', name: 'flink-connector-kafka_2.11', version: '1.12.0'
// https://mvnrepository.com/artifact/org.apache.flink/flink-json
implementation group: 'org.apache.flink', name: 'flink-json', version: '1.12.0'
print("----当前prod=${prod}-------")
if (prod) {
compileOnly(conditionDependencies)
} else {
implementation(conditionDependencies)
}
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0'
testImplementation 'org.hamcrest:hamcrest-all:1.3'
}
test {
useJUnitPlatform()
}
jar {
manifest {
attributes "Main-Class": "io.github.earthchen.json.KafkaJsonMain"
}
}
configurations.compile.dependencies.remove dependencies.gradleApi()
shadowJar {
mergeServiceFiles()
}
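To build the jar for submission, something like gradle shadowJar -Dprod=true should do (the -D flag feeds the prod check above, so the Flink dependencies stay out of the fat jar).

Next, the Kafka source table, declared in Flink SQL: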
CREATE TABLE KafkaTable
(
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
)
WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = '127.0.0.1:31090',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
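And the Flink sink table, which writes to MySQL through the JDBC connector: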
CREATE TABLE kafka_sink_table
(
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
)
WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://127.0.0.1:3306/test',
'username' = 'root',
'password' = '123456',
'table-name' = 'kafka_sink_table'
);
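The physical table on the MySQL side (it carries an extra auto-increment primary key; Flink only writes the three payload columns):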
create table kafka_sink_table
(
id int auto_increment
primary key,
user_id bigint null,
item_id bigint null,
behavior varchar(256) null
);
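Finally, the job itself: it registers the source, a print sink for debugging, and the JDBC sink via executeSql, then pipes the three columns across with the Table API: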
package io.github.earthchen.json;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
/**
* @author earthchen
* @date 2021/5/26
**/
public class KafkaJsonMain {
public static void main(String[] args) throws Exception {
// default EnvironmentSettings: blink planner, streaming mode
EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
// ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.executeSql("CREATE TABLE kafka_source (\n" +
" `user_id` BIGINT,\n" +
" `item_id` BIGINT,\n" +
" `behavior` STRING\n" +
// " `ts` TIMESTAMP(3) METADATA FROM 'timestamp'\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'user_behavior1',\n" +
" 'properties.bootstrap.servers' = '127.0.0.1:9000'," +
"\n" +
" 'properties.group.id' = 'testGroup',\n" +
" 'scan.startup.mode' = 'latest-offset',\n" +
" 'json.fail-on-missing-field' = 'false'," +
" 'json.ignore-parse-errors' = 'true'," +
" 'format' = 'json'\n" +
")");
Table kafkaJsonSource = tEnv.from("kafka_source");
tEnv.executeSql("CREATE TABLE print_table WITH ('connector' = 'print')\n" +
"LIKE kafka_source (EXCLUDING ALL)");
tEnv.executeSql("CREATE TABLE kafka_sink_table\n" +
"(\n" +
" `user_id` BIGINT,\n" +
" `item_id` BIGINT,\n" +
" `behavior` STRING\n" +
")\n" +
"WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://127.0.0.1:3306/test',\n" +
" 'username'= 'root',\n" +
" 'password'= 'Root$%^7',\n" +
" 'table-name' = 'kafka_sink_table'\n" +
")");
// tEnv.executeSql("select * from kafka_source").print();
kafkaJsonSource.select($("user_id"),
$("item_id"),
$("behavior")).executeInsert("kafka_sink_table");
}
}
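The trailing select(...).executeInsert(...) call is equivalent to submitting a plain INSERT statement; a minimal sketch using the same table names:

tEnv.executeSql("INSERT INTO kafka_sink_table SELECT user_id, item_id, behavior FROM kafka_source");

With the job running, send a test message to the topic. Note that behavior in this sample is a nested JSON object while the table declares it as STRING; the json.ignore-parse-errors option set above keeps the job from failing if the format rejects it: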
{"user_id":111,"item_id":111,"behavior":{"aaa":"aaaaa","bbb":"aaaa222"}}