Author: 孙小波
Kafka does not ship with a SAP HANA connector. The open-source GitHub project Kafka Connectors for SAP provides connectors between Kafka and SAP, and can pull SAP HANA data into Kafka on a schedule, either in full or incrementally. For details, see GitHub: https://github.com/SAP/kafka-connect-sap/tree/master
Kafka version: 2.5.0.7.1.7.2013-1 (Cloudera); SAP HANA version: HDB (ver. 2.00.048.04.1612945474). Two environments are tested below, non-Kerberos and Kerberos, each with both full and incremental pulls.
Two JAR files are required: the SAP HANA JDBC driver and the packaged kafka-connect-sap project. Check out the matching version of kafka-connect-sap and build it, or download a prebuilt JAR from the GitHub releases page: https://github.com/SAP/kafka-connect-sap/releases. For example, building from source:
cd Downloads/kafka-connect-sap-master-2.8.1
sudo /Applications/IntelliJ\ IDEA.app/Contents/plugins/maven/lib/maven3/bin/mvn clean install -DskipTests -e
ll modules/scala_2.12/target
Pick a Kafka client node and place kafka-connector-hana_2.12-0.9.5-SNAPSHOT.jar under /var/lib/kafka/ or /opt/cloudera/parcels/CDH/lib/kafka/libs/ on that node; place the SAP HANA driver ngdbc-2.12.9.jar under /opt/cloudera/parcels/CDH/lib/kafka/libs/.
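For example (a minimal sketch, assuming both JARs sit in the current directory and the shell user can write to these paths):
cp kafka-connector-hana_2.12-0.9.5-SNAPSHOT.jar /var/lib/kafka/
cp ngdbc-2.12.9.jar /opt/cloudera/parcels/CDH/lib/kafka/libs/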
For the meaning of the Kafka Connect Standalone configuration options, see: https://kafka.apache.org/25/documentation.html#connect_configuring. For the Kafka Connectors for SAP configuration options, see: https://github.com/SAP/kafka-connect-sap#configuration
worker-nokrb.properties: startup configuration for Kafka Connect Standalone mode in a non-Kerberos environment (the file name itself is arbitrary).
bootstrap.servers=hqcncdptst03l:9192
config.storage.replication.factor=1
config.storage.topic=connect-configs
connect.prometheus.metrics.port=28186
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
metrics.jetty.server.port=28084
offset.flush.interval.ms=60000
offset.storage.replication.factor=1
offset.storage.topic=connect-offsets
plugin.path=/var/lib/kafka
rest.extension.classes=com.cloudera.dim.kafka.metrics.JmxJsonMetricsRestExtension
rest.port=28183
ssl.client.auth=none
status.storage.replication.factor=1
status.storage.topic=connect-status
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
offset.storage.file.filename=/opt/hana_test/hana_offset.txt
worker.properties: startup configuration for Kafka Connect Standalone mode in a Kerberos environment (the file name itself is arbitrary).
bootstrap.servers=hqcncdptst01l:9092,hqcncdptst02l:9092,hqcncdptst03l:9092
config.storage.replication.factor=1
config.storage.topic=connect-configs
connect.prometheus.metrics.port=28096
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
metrics.jetty.server.port=28094
offset.flush.interval.ms=60000
offset.storage.replication.factor=1
offset.storage.topic=connect-offsets
plugin.path=/var/lib/kafka
rest.extension.classes=com.cloudera.dim.kafka.metrics.JmxJsonMetricsRestExtension
rest.port=28093
status.storage.replication.factor=1
status.storage.topic=connect-status
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
producer.acks = 1
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
useKeyTab=true \
storeKey=true \
keyTab="/opt/kafka-jaas/hive.keytab" \
principal="hive@HADOOP.COM";
sasl.mechanism = GSSAPI
sasl.kerberos.service.name = kafka
security.protocol = SASL_PLAINTEXT
producer.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
useKeyTab=true \
storeKey=true \
keyTab="/opt/kafka-jaas/hive.keytab" \
principal="hive@HADOOP.COM";
producer.sasl.mechanism = GSSAPI
producer.sasl.kerberos.service.name = kafka
producer.security.protocol = SASL_PLAINTEXT
consumer.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
useKeyTab=true \
storeKey=true \
keyTab="/opt/kafka-jaas/hive.keytab" \
principal="hive@HADOOP.COM";
consumer.sasl.mechanism = GSSAPI
consumer.sasl.kerberos.service.name = kafka
consumer.security.protocol = SASL_PLAINTEXT
offset.storage.file.filename = /opt/hana_test/hana_offset.txt
hana_source_full.properties: connector configuration for pulling the full HANA table (the file name itself is arbitrary). Note that the connector's per-topic options, such as table.name and poll.interval.ms, are prefixed with the topic name (here hana_test).
name=hana_test-source
connector.class=com.sap.kafka.connect.source.hana.HANASourceConnector
tasks.max=1
topics=hana_test
connection.url=jdbc:sap://10.xxx.xxx.xxx:33015?encrypt=true&validateCertificate=false
connection.user=username
connection.password=xxxx
hana_test.table.name="BI_CONNECT"."MAT_SD_TEST_KAFKA"
hana_test.poll.interval.ms=60000
hana_source_incr.properties: connector configuration for incrementally pulling HANA data (the file name itself is arbitrary).
name=hana_test-incr-source
connector.class=com.sap.kafka.connect.source.hana.HANASourceConnector
tasks.max=1
mode=incrementing
topics=hana_incr_test
connection.url=jdbc:sap://10.xxx.xxx.xxx:33015?encrypt=true&validateCertificate=false
connection.user=username
connection.password=xxxxx
hana_incr_test.table.name="BI_CONNECT"."MAT_SD_TEST_KAFKA"
hana_incr_test.poll.interval.ms=30000
hana_incr_test.incrementing.column.name=GROUP_ID
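Conceptually, with mode=incrementing each poll selects only rows whose incrementing column value is greater than the largest value the connector has already recorded in its offset file. A sketch of the effective query (illustrative only, not the connector's literal SQL):
SELECT * FROM "BI_CONNECT"."MAT_SD_TEST_KAFKA" WHERE "GROUP_ID" > <last recorded offset> ORDER BY "GROUP_ID" ASC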
Non-Kerberos environment, full pull:
1. Start the Kafka connect-standalone job:
/opt/cloudera/parcels/CDH/lib/kafka/bin/connect-standalone.sh worker-nokrb.properties hana_source_full.properties
The full contents of "BI_CONNECT"."MAT_SD_TEST_KAFKA" are pulled every 60 s and sent to the Kafka topic hana_test.
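Once the worker is up, the Kafka Connect REST interface (rest.port=28183 in worker-nokrb.properties) can be used to confirm that the connector is running, for example:
curl http://$(hostname):28183/connectors
curl http://$(hostname):28183/connectors/hana_test-source/status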
2. Check the data in SAP HANA
3. Start a Kafka console consumer to check the data pulled into the topic:
kafka-console-consumer --topic hana_test --from-beginning --bootstrap-server $(hostname):9192
Non-Kerberos environment, incremental pull:
1. Start the Kafka connect-standalone job:
/opt/cloudera/parcels/CDH/lib/kafka/bin/connect-standalone.sh worker-nokrb.properties hana_source_incr.properties
2. Insert one row into HANA
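For example (an illustrative row; the actual values used in this test are not shown in the original):
INSERT INTO "BI_CONNECT"."MAT_SD_TEST_KAFKA" VALUES (1009,'xxx','message_9');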
3. Start a Kafka console consumer to check the data pulled into the topic:
kafka-console-consumer --topic hana_incr_test --from-beginning --bootstrap-server $(hostname):9192
Kerberos environment, full pull:
1. Start the Kafka connect-standalone job:
/opt/cloudera/parcels/CDH/lib/kafka/bin/connect-standalone.sh worker.properties hana_source_full.properties
The full contents of "BI_CONNECT"."MAT_SD_TEST_KAFKA" are pulled every 60 s and sent to the Kafka topic hana_test.
2. Check the data in SAP HANA
3. Start a Kafka console consumer to check the data pulled into the topic:
# Prepare a Kafka JAAS file and a client.properties file first
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka-jaas/jaas-hive.conf"
kafka-console-consumer --topic hana_test --from-beginning --bootstrap-server $(hostname):9092 --consumer.config client.properties
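The contents of jaas-hive.conf and client.properties are not shown above; a minimal sketch of each, reusing the keytab and principal from worker.properties:

jaas-hive.conf:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/opt/kafka-jaas/hive.keytab"
  principal="hive@HADOOP.COM";
};

client.properties:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka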
Kerberos environment, incremental pull:
1. Start the Kafka connect-standalone job:
/opt/cloudera/parcels/CDH/lib/kafka/bin/connect-standalone.sh worker.properties hana_source_incr.properties
2. Insert one row into HANA:
INSERT INTO "BI_CONNECT"."MAT_SD_TEST_KAFKA" VALUES (1010,'xxx','message_10');
3. Start a Kafka console consumer to check the data pulled into the topic:
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka-jaas/jaas-hive.conf"
kafka-console-consumer --topic hana_incr_test --from-beginning --bootstrap-server $(hostname):9092 --consumer.config client.properties
Supplementary tests of delete and update behavior in the Kerberos environment.
Full-pull mode:
1. Delete one row in HANA:
delete from "BI_CONNECT"."MAT_SD_TEST_KAFKA" where GROUP_ID = 1011
2. Check the Kafka consumer output of the next full pull:
{"schema":{"type":"struct","fields":[{"type":"int64","optional":true,"field":"GROUP_ID"},{"type":"string","optional":true,"field":"U_NAME"},{"type":"string","optional":true,"field":"U_MESS"}],"optional":false,"name":"bi_connectmat_sd_test_kafka"},"payload":{"GROUP_ID":1010,"U_NAME":"xxx","U_MESS":"message_10"}}
{"schema":{"type":"struct","fields":[{"type":"int64","optional":true,"field":"GROUP_ID"},{"type":"string","optional":true,"field":"U_NAME"},{"type":"string","optional":true,"field":"U_MESS"}],"optional":false,"name":"bi_connectmat_sd_test_kafka"},"payload":{"GROUP_ID":1012,"U_NAME":"xxx","U_MESS":"message_12_new"}}
The deleted row no longer appears in the results, which shows that deletes are not sent to Kafka.
Incremental pull mode:
1. Delete one row in HANA:
delete from "BI_CONNECT"."MAT_SD_TEST_KAFKA" where GROUP_ID = 1010
2. After the Kafka Connect Standalone job polls again, check the Kafka consumer: the deleted row is not sent to the Kafka topic.
3. Check the Kafka Connect Standalone job log:
23/08/24 10:02:56 INFO querier.IncrColTableQuerier: 1
23/08/24 10:02:56 INFO hana.HANASourceTask: Closing this query for IncrColTableQuerier{name='"BI_CONNECT"."MAT_SD_TEST_KAFKA"', topic='hana_incr_test'}
23/08/24 10:02:56 INFO hana.HANASourceTask: No updates for IncrColTableQuerier{name='"BI_CONNECT"."MAT_SD_TEST_KAFKA"', topic='hana_incr_test'}
23/08/24 10:02:56 INFO hana.HANASourceTask: Start polling records from HANA
23/08/24 10:02:56 INFO hana.HANASourceTask: Waiting 30000 ms to poll from IncrColTableQuerier{name='"BI_CONNECT"."MAT_SD_TEST_KAFKA"', topic='hana_incr_test'}
The log shows that no updates were detected.
Full-pull mode:
1. Update one row in HANA:
update "BI_CONNECT"."MAT_SD_TEST_KAFKA" set U_MESS = 'message_12_new' where GROUP_ID = 1012
2. After the Kafka Connect Standalone job polls again, check the Kafka consumer:
{"schema":{"type":"struct","fields":[{"type":"int64","optional":true,"field":"GROUP_ID"},{"type":"string","optional":true,"field":"U_NAME"},{"type":"string","optional":true,"field":"U_MESS"}],"optional":false,"name":"bi_connectmat_sd_test_kafka"},"payload":{"GROUP_ID":1012,"U_NAME":"xxx","U_MESS":"message_12_new"}}
After the HANA row is changed, the next full pull sends the updated data to Kafka.
Incremental pull mode:
1. Update two rows in HANA:
update "BI_CONNECT"."MAT_SD_TEST_KAFKA" set U_MESS = 'message_9_new' where GROUP_ID = 1009
update "BI_CONNECT"."MAT_SD_TEST_KAFKA" set GROUP_ID = 10088 where U_MESS = 'message_8'
The two statements update the U_MESS value in one row and the GROUP_ID value in another.
2. After the Kafka Connect Standalone job polls again, check the Kafka consumer:
{"schema":{"type":"struct","fields":[{"type":"int64","optional":true,"field":"GROUP_ID"},{"type":"string","optional":true,"field":"U_NAME"},{"type":"string","optional":true,"field":"U_MESS"}],"optional":false,"name":"bi_connectmat_sd_test_kafka"},"payload":{"GROUP_ID":10088,"U_NAME":"xxx","U_MESS":"message_8"}}
The results show that only the row whose GROUP_ID changed is pulled and sent to Kafka. This is because hana_source_incr.properties sets hana_incr_test.incrementing.column.name=GROUP_ID, so the connector detects increments through changes in GROUP_ID: it remembers the largest GROUP_ID it has seen and treats only rows with a greater value as new data. In other words, only changes that advance the column named by incrementing.column.name count as incremental data and are sent to Kafka.
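Applied to the numbers above (a sketch of the offset logic; the offset value 1012 is inferred from the earlier test data and is not shown in the original):
-- recorded offset: largest GROUP_ID seen so far = 1012
-- the U_MESS-only update leaves GROUP_ID = 1009: 1009 > 1012 is false, so the row is not pulled
-- the other row now has GROUP_ID = 10088: 10088 > 1012 is true, so the row is pulled
SELECT * FROM "BI_CONNECT"."MAT_SD_TEST_KAFKA" WHERE "GROUP_ID" > 1012 ORDER BY "GROUP_ID"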
1. In full-pull mode, the connector pulls the entire table at the configured interval and sends it to Kafka; the data in the topic always mirrors what the HANA query returns, and both changed and unchanged rows are sent in full every time.
2. In incremental pull mode, one column of the HANA table must be designated as the incrementing column. Whether or not that column is a primary key, the tests above support the same conclusions: newly inserted rows with larger incrementing values are pulled; deleted rows are not propagated; and updated rows are pulled only when the update increases the incrementing column value itself.
1. Modify the Kafka Connect Standalone startup script. The script points at a log4j configuration file that does not actually exist, which causes an error:
java.io.FileNotFoundException: /opt/cloudera/parcels/CDH/lib/kafka/bin/../config/connect-log4j.properties (No such file or directory)
This can be fixed by commenting out the following lines:
vim /opt/cloudera/parcels/CDH/lib/kafka/bin/connect-standalone.sh
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/connect-log4j.properties"
fi
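After the fix, the block looks like this:
#if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
#  export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/connect-log4j.properties"
#fi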
2. Kafka Connect Standalone reports some errors at startup, for example org.reflections.ReflectionsException: could not get type for name org.springframework.beans.factory.FactoryBean. These can be safely ignored.
3. The Kafka Connect Standalone configuration must specify the offset storage file path. Create an empty file there first, as shown after the property below.
offset.storage.file.filename = /opt/hana_test/hana_offset.txt
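For example:
mkdir -p /opt/hana_test
touch /opt/hana_test/hana_offset.txt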
4. In a Kerberos environment, the Kafka Connect Standalone configuration must specify the authentication settings and JAAS configuration separately for the consumer and the producer, using the consumer.xxx and producer.xxx prefixes:
producer.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
useKeyTab=true \
storeKey=true \
keyTab="/opt/kafka-jaas/hive.keytab" \
principal="hive@HADOOP.COM";
producer.sasl.mechanism = GSSAPI
producer.sasl.kerberos.service.name = kafka
producer.security.protocol = SASL_PLAINTEXT
consumer.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
useKeyTab=true \
storeKey=true \
keyTab="/opt/kafka-jaas/hive.keytab" \
principal="hive@HADOOP.COM";
consumer.sasl.mechanism = GSSAPI
consumer.sasl.kerberos.service.name = kafka
consumer.security.protocol = SASL_PLAINTEXT