SDK 数据写入

最近更新时间:2024-11-13 16:24:32

我的收藏

应用场景

DLC 支持通过 java sdk 将源数据库的增量变更同步到 DLC 原生表,完成源数据入湖。

前置条件

正确开通 DLC,已完成用户权限配置,开通托管存储。
正确创建 DLC 数据库。
正确配置 DLC 数据库数据优化,详细配置请参见开启数据优化

操作步骤

步骤1:下载依赖 Jar

请手动下载依赖 SDK jar 包 flink-sql-connect-kafka-1.15.4.jar。

步骤2:编写 java 代码

package org.example;// 建表 create table ingest_stream(id int,name string); public class RealtimeStreamDemo { public static void main(String[] args) throws Exception { if (args.length != 5) { System.out.println("input arguments: jdbcUrl, username, password"); System.exit(1); } String jdbcUrl = args[0]; String username = args[1]; String password = args[2]; String schema = args[3]; String table = args[4]; ClickZettaClient client = ClickZettaClient.newBuilder().url(jdbcUrl).username(username).password(password).build(); Options options = Options.builder().build(); RowStream stream = client.newRealtimeStreamBuilder() .operate(RowStream.RealTimeOperate.APPEND_ONLY) .options(options) .schema(schema) .table(table) .build(); for (int t = 0; t < 1000; t++) { Row row = stream.createRow(Stream.Operator.INSERT); row.setValue("id",t); row.setValue("name", String.valueOf(t)); stream.apply(row); } // 只有调用 flush 之后数据才可见 ((RealtimeStream) stream).flush(); // 必须调用 stream close接口,close 时会隐含执行 flush stream.close(); client.close(); } }
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>com.gotocompany</groupId> <artifactId>depot</artifactId> <version>0.9.1</version> </dependency> <dependency> <groupId>org.aeonbits.owner</groupId> <artifactId>owner</artifactId> <version>1.0.9</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> </project>

步骤3:DLC 新建目标表

新建目标表详情可参见 DLC 原生表操作配置

步骤4:执行 SDK 程序

登录 DLC 控制台,单击数据探索,查询目标表数据。