Apache Flink is an open-source stream processing framework that supports both batch and stream processing. Flink SQL is the SQL interface Flink provides, allowing users to define and run data-processing jobs with SQL statements. In Flink SQL, the starting point of a data stream is called a "source", and the end point is called a "sink".
Source: where a job reads its input data from (for example, a Kafka topic).
Sink: where a job writes its output data to (for example, files on HDFS).
The following simple Flink SQL job shows how to define a source and a sink:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkSQLExample {
    public static void main(String[] args) throws Exception {
        // Create the execution environments. Note: TableEnvironment.create() does not
        // accept a StreamExecutionEnvironment; use StreamTableEnvironment.create(env).
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register a Kafka source table with an event-time watermark
        // (events may arrive up to 5 seconds late)
        tableEnv.executeSql("CREATE TABLE kafka_source (" +
                "  id INT, " +
                "  name STRING, " +
                "  event_time TIMESTAMP(3), " +
                "  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
                ") WITH (" +
                "  'connector' = 'kafka', " +
                "  'topic' = 'test_topic', " +
                "  'properties.bootstrap.servers' = 'localhost:9092', " +
                "  'format' = 'json'" +
                ")");

        // Register an HDFS sink table backed by the filesystem connector
        tableEnv.executeSql("CREATE TABLE hdfs_sink (" +
                "  id INT, " +
                "  name STRING, " +
                "  event_time TIMESTAMP(3)" +
                ") WITH (" +
                "  'connector' = 'filesystem', " +
                "  'path' = 'hdfs://localhost:9000/output/', " +
                "  'format' = 'csv'" +
                ")");

        // Run the query: executeSql() on an INSERT submits the job to the cluster
        tableEnv.executeSql("INSERT INTO hdfs_sink " +
                "SELECT id, name, event_time FROM kafka_source");
    }
}
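Since the source table declares a watermark, the same table can also feed event-time windowed queries, not just the pass-through INSERT above. A minimal sketch using Flink's windowing table-valued function (the `windowed_sink` table and the one-minute window size are assumptions for illustration, not part of the original example):

```sql
-- Count rows per one-minute tumbling event-time window over the
-- watermarked kafka_source table (window TVF syntax, Flink 1.13+).
-- windowed_sink is a hypothetical sink table with matching columns.
INSERT INTO windowed_sink
SELECT window_start, window_end, COUNT(*) AS cnt
FROM TABLE(
    TUMBLE(TABLE kafka_source, DESCRIPTOR(event_time), INTERVAL '1' MINUTE))
GROUP BY window_start, window_end;
```

The `WATERMARK` clause declared on `kafka_source` is what lets Flink decide when a window is complete and can be emitted.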
The example above shows the basic pattern for defining sources and sinks in a Flink SQL job.