Kafka Connect是一个用于Kafka与外部系统之间高可靠的、可扩展的流数据传输工具。它使得我们能够简单快速的定义数据集合在Kafka与外部系统之间输入输出。Kafka Connect可以从数据库或者应用程序服务器中手机数据指标到Kafka的topic中,以便数据进行低延迟的数据处理。一个实现了导出功能的Connect可以将数据从Kafka中导出到外部存储系统、查询系统或者批处理系统进行离线分析。
Kafka Connect包括如下特性:
提供了一个通用的Connectors开发框架
支持分布式模式或者单机模式
支持REST接口
自动offset管理
分布式并且可扩展
支持流处理和批处理
Kafka Connect功能示意图
运行Kafka Connect
支持两种运行模式:standalone模式(单线程)和分布式
standalone模式
standalone模式中,所有的工作都在单个线程中完成。一般情况下这种模式适合于只有单节点工作的情况,但是这并不能做到Kafka Connect的高容错性,如果进程down掉则没有替代的进程来完成后续的工作。那么如何启动一个standalone进程呢:
> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
connect-standalone.properties中配置了worker的相关信息,包括连接参数、系列化格式、和提交offset的时间间隔等,第二个配置文件则指定了connect的相关信息,是source还是sink,输入的源或者输出的目标。
分布式模式
分布式模式和standalone模式有点区别,我们先来看看启动命令:
> bin/connect-distributed.sh config/connect-distributed.properties
这样我们相当于启动了一个Connect服务,Kafka Connect的本意就是打算以服务的方式运行,所以它提供了REST API来管理connectors,分布式模式启动之后,我们就可以通过REST服务进行job管理了。
现在我们来尝试创建一个connector:
POST方式发送到:http://192.168.0.181:8083/connectors
返回:
{
"name":"local-file-source",
"config": {
"connector.class":"FileStreamSource",
"file":"test.txt",
"tasks.max":"1",
"name":"local-file-source-name",
"topic":"connect-test"
},
"tasks": [
{
"connector":"local-file-source",
"task":
}
],
"type":"source"
}
GET方式获取connectors列表:
[
"local-file-source"
]
现在我们向文件中增加一行数据:
echo "nowcccsadfasdfasdfadvvc" >> test.txt
在connect-test消费端可以发现接收到了一条新纪录:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"nowcccsadfasdfasdfadvvc"}
Kafka Connect的内容就讲解到这里,实际上Connector是可以直接定制的,我们可以通过继承SourceConnector、SinkConnector、SourceTask和SinkTask来实现我们所需要的功能。
领取专属 10元无门槛券
私享最新 技术干货