Seatunnel
是一个非常易用,高性能、支持实时流式
和离线批处理
的海量数据处理产品,架构于Apache Spark
和 Apache Flink
之上,开源项目地址:https://github.com/apache/incubator-seatunnel
Seatunnel原名为Waterdrop,在更名之后正式孵化为Apache项目,同时对于两个名字也对应了不同的版本,Waterdrop指1.x版本,Seatunnel指2.x版本,对于1.x和2.x有以下区别:
关键功能 | 1.x | 2.x |
---|---|---|
支持spark | yes | yes |
支持flink | no | yes |
主要开发语言 | scala | java |
主要构建工具 | sbt | maven |
Apache Spark和Apache Flink对于分布式数据处理和流式数据处理来说是一个伟大的进步,但较高的使用门槛让数据处理人员需要学习spark和flink复杂的运行机制和api才能够使用的更加顺畅,为降低数据处理门槛,且让spark和flink变得更加易用,减少学习成本,加快分布式数据处理在生产环境的落地,Seatunnel应运而生。
关键功能 | Seatunnel | FlinkX | StreamX | DataX |
---|---|---|---|---|
spark是否支持 | yes | no | yes | no |
flink是否支持 | yes,高版本兼容性不好 | yes,高版本兼容性不好 | yes,高版本兼容性好 | no |
部署难度 | 轻松 | 中等 | 较难 | 容易 |
主要功能对比 | etl、数据同步 | 数据同步 | flink任务可视化部署 | 数据同步 |
输入
-> 转换
-> 输出
,对于更复杂的数据处理,实质上也是这几种行为的组合:
public interface Plugin<T> extends Serializable {
// 配置文件的key
String RESULT_TABLE_NAME = "result_table_name";
String SOURCE_TABLE_NAME = "source_table_name";
// 设置每个插件的config
void setConfig(Config config);
// 获取插件的配置
Config getConfig();
// 对于config的校验
CheckResult checkConfig();
// 插件前准备
void prepare(T prepareEnv);
}
trait BaseSparkSource[Data] extends BaseSource[SparkEnvironment] {
protected var config: Config = ConfigFactory.empty()
override def setConfig(config: Config): Unit = this.config = config
override def getConfig: Config = config
def getData(env: SparkEnvironment): Data;
}
trait BaseSparkTransform extends BaseTransform[SparkEnvironment] {
protected var config: Config = ConfigFactory.empty()
override def setConfig(config: Config): Unit = this.config = config
override def getConfig: Config = config
def process(data: Dataset[Row], env: SparkEnvironment): Dataset[Row];
}
trait BaseSparkSink[OUT] extends BaseSink[SparkEnvironment] {
protected var config: Config = ConfigFactory.empty()
override def setConfig(config: Config): Unit = this.config = config
override def getConfig: Config = config
def output(data: Dataset[Row], env: SparkEnvironment): OUT;
}
trait SparkStreamingSource[T] extends BaseSparkSource[DStream[T]] {
def beforeOutput(): Unit = {}
def afterOutput(): Unit = {}
def rdd2dataset(sparkSession: SparkSession, rdd: RDD[T]): Dataset[Row]
def start(env: SparkEnvironment, handler: Dataset[Row] => Unit): Unit = {
getData(env).foreachRDD(rdd => {
val dataset = rdd2dataset(env.getSparkSession, rdd)
handler(dataset)
})
}
}
public interface FlinkBatchSource<T> extends BaseFlinkSource {
DataSet<T> getData(FlinkEnvironment env);
}
public interface FlinkBatchTransform<IN, OUT> extends BaseFlinkTransform {
DataSet<OUT> processBatch(FlinkEnvironment env, DataSet<IN> data);
}
public interface FlinkBatchSink<IN, OUT> extends BaseFlinkSink {
DataSink<OUT> outputBatch(FlinkEnvironment env, DataSet<IN> inDataSet);
}
public interface FlinkStreamSource<T> extends BaseFlinkSource {
DataStream<T> getData(FlinkEnvironment env);
}
public interface FlinkStreamTransform<IN, OUT> extends BaseFlinkTransform {
DataStream<OUT> processStream(FlinkEnvironment env, DataStream<IN> dataStream);
}
public interface FlinkStreamSink<IN, OUT> extends BaseFlinkSink {
DataStreamSink<OUT> outputStream(FlinkEnvironment env, DataStream<IN> dataStream);
}
SPI全称Service Provider Interface,是Java提供的一套用来被第三方实现或者扩展的接口,它可以用来启用框架扩展和替换组件,SPI的作用就是为这些被扩展的API寻找服务实现
API-(Application Programming Interface
)大多数情况下,都是实现方
制定接口并完成对接口的实现,调用方
仅仅依赖接口调用,且无权选择不同实现。从使用人员上来说,API 直接被应用开发人员使用,SPI-(Service Provider Interface
)是调用方
来制定接口规范,提供给外部来实现调用方
选择自己需要的外部实现。从使用人员上来说,SPI 被框架扩展人员使用
package com.tyrantlucifer;
public interface Animal {
void shut();
}
package com.tyrantlucifer;
import java.util.ServiceLoader;
public class Main {
public static void main(String[] args) {
ServiceLoader<Animal> services = ServiceLoader.load(Animal.class);
for (Animal service : services) {
service.shut();
}
}
}
package com.tyrantlucifer;
public class Cat implements Animal {
public void shut() {
System.out.println("cat shut miao miao!!!");
}
}
package com.tyrantlucifer;
public class Dog implements Animal{
public void shut() {
System.out.println("dog shut wang wang!!!");
}
}
注册spi,需要在resources/META-INF/services下新建以接口全类名的文件,比如我们这次的接口com.tyrantlucifer.Animal
,那么就新建一个com.tyrantlucifer.Animal
文件,并在文件中添加自己的实现类:
com.tyrantlucifer.Cat
com.tyrantlucifer.Dog
spark {
spark.streaming.batchDuration = 5
spark.app.name = "seatunnel"
spark.ui.port = 13000
}
input {
socketStream {}
}
filter {
split {
fields = ["msg", "name"]
delimiter = ","
}
}
output {
stdout {}
}
env {
execution.parallelism = 1
}
source {
SocketStream{
result_table_name = "fake"
field_name = "info"
}
}
transform {
Split{
separator = "#"
fields = ["name","age"]
}
sql {
sql = "select * from (select info,split(info) as info_row from fake) t1"
}
}
sink {
ConsoleSink {}
}
class MyStdout extends BaseOutput {
var config: Config = ConfigFactory.empty()
/**
* Set Config.
* */
override def setConfig(config: Config): Unit = {
this.config = config
}
/**
* Get Config.
* */
override def getConfig(): Config = {
this.config
}
override def checkConfig(): (Boolean, String) = {
if (!config.hasPath("limit") || (config.hasPath("limit") && config.getInt("limit") >= -1)) {
(true, "")
} else {
(false, "please specify [limit] as Number[-1, " + Int.MaxValue + "]")
}
}
override def prepare(spark: SparkSession): Unit = {
super.prepare(spark)
val defaultConfig = ConfigFactory.parseMap(
Map(
"limit" -> 100,
"format" -> "plain" // plain | json | schema
)
)
config = config.withFallback(defaultConfig)
}
override def process(df: Dataset[Row]): Unit = {
val limit = config.getInt("limit")
var format = config.getString("format")
if (config.hasPath("serializer")) {
format = config.getString("serializer")
}
format match {
case "plain" => {
if (limit == -1) {
df.show(Int.MaxValue, false)
} else if (limit > 0) {
df.show(limit, false)
}
}
case "json" => {
if (limit == -1) {
df.toJSON.take(Int.MaxValue).foreach(s => println(s))
} else if (limit > 0) {
df.toJSON.take(limit).foreach(s => println(s))
}
}
case "schema" => {
df.printSchema()
}
}
}
}
本文分享自 Tyrant Lucifer 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!