1.文档编写目的
在前面的文章Fayson介绍了关于StreamSets的一些文章《如何在CDH中安装和使用StreamSets》、《如何使用StreamSets从MySQL增量更新数据到Hive》、《如何使用StreamSets实现MySQL中变化数据实时写入Kudu》、《如何使用StreamSets实现MySQL中变化数据实时写入HBase》、《如何使用StreamSets实时采集Kafka并入库Kudu》和《如何使用StreamSets实时采集Kafka数据并写入Hive表》,本篇文章Fayson主要介绍如何使用StreamSets实时采集Kafka中嵌套的JSON数据并将采集的数据写入Hive,StreamSets的流程处理如下:
1.测试环境准备
2.配置StreamSets
3.创建Pipline及测试
4.总结
1.RedHat7.3
2.CM和CDH版本为cdh5.13.3
3.Kafka2.2.0(0.10.0)
4.StreamSets3.3.0
1.集群已启用Sentry
2.测试环境准备
1.准备测试的JSON数据
{
"school": 1,
"address": 2,
"no": "page",
"class": 3,
"students": [{
"name": "page1",
"teacher": "larry",
"age": 40
}, {
"name": "page2",
"teacher": "larry",
"age": 50
}, {
"name": "page3",
"teacher": "larry",
"age": 51
}]
}
(可左右滑动)
2.为sdc用户授权
由于集群已启用Sentry,所以这里需要为sdc用户授权,否则sdc用户无法向Hive库中创建表及写入数据
3.创建StreamSets的Pipline
1.登录StreamSets,创建一个kafka2hive_json的Pipline
2.在Pipline流程中添加Kafka Consumer作为源并配置Kafka基础信息
配置Kafka相关信息,如Broker、ZK、Group、Topic及Kerberos信息
配置数据格式化方式,写入Kafka的数据为JSON格式,所以这里选择JSON
3.添加JavaScript Evaluator模块,主要用于处理嵌套的JSON数据
编写JSON数据解析代码,将嵌套JSON解析为多个Record,传输给HiveMetadata
解析脚本如下:
for(var i = 0; i < records.length; i++) {
try {
var students = records[i].value['students'];
log.error("---------++++++++------" + students.length);
for(var j = 0; j< students.length; j++) {
log.info("============" + students[0].name)
var newRecord = sdcFunctions.createRecord(true);
var studentMap = sdcFunctions.createMap(true);
studentMap.no = records[i].value['no'];
studentMap.school = records[i].value['school'];
studentMap.class = records[i].value['class'];
studentMap.address = records[i].value['address'];
studentMap.name = students[j].name;
studentMap.teacher = students[j].teacher;
studentMap.age = students[j].age;
newRecord.value = studentMap;
log.info("-------------" + newRecord.value['school'])
output.write(newRecord);
}
} catch (e) {
// Send record to error
error.write(records[i], e);
}
}
(可左右滑动)
4.添加Hive Metadata中间处理模块,选择对应的CDH版本
配置Hive的JDBC信息
配置Hive的表信息,指定表名和库名
指定数据格式,指定为Avro,选项中有parquet格式,但在后续处理中并不支持parquet格式
5.添加Hadoop FS处理模块,主要用于将HiveMetadata的数据写入HDFS
配置Hadoop FS,配置HDFS URL和是否启用Kerberos认证
配置Hadoop FS的Out Files
注意:勾选“Directory in Header”使HDFS写入数据时使用上一步中Hive Metadata模块传递的目录,“Idle Timeout”主要是用于指定Hadoop FS模块空闲多久则将数据刷到HDFS数据目录。
配置Late Records参数,使用默认参数即可
指定写入到HDFS的数据格式
6.添加Hive Metastore模块,该模块主要用于向Hive库中创建表
配置Hive信息,JDBC访问URL
Hive Metastore的高级配置
7.点击校验流程,如下图所示则说明流程正常
到此为止完成了Kafka数据到Hive的流程配置。
4.流程测试验证
1.启动kafka2hive_json的Pipline,启动成功如下图显示
2.使用Kafka的Producer脚本向kafka_hive_topic生产消息
kafka-console-producer \
--topic kafka_hive_topic \
--broker-list cdh02.fayson.com:9092,cdh03.fayson.com:9092,cdh04.fayson.com:9092
(可左右滑动)
3.在StreamSets中查看kafka2hive_json的pipline运行情况
4.使用sdc用户登录Hue查看ods_user表数据
将嵌套的JSON数据解析为3条数据插入到ods_user表中。
5.总结
1.在使用StreamSets的Kafka Consumer模块接入Kafka嵌套的JSON数据后,无法直接将数据入库到Hive,需要将嵌套的JSON数据解析,这里可以使用Evaluator模块,StreamSets支持多种语言的Evaluator(如:JavaScprit、Jython、Groovy、Expression及Spark)。
2.由于集群启用了Sentry,StreamSets默认使用sdc用户访问Hive,在想Hive库中创建表时需要为sdc用户授权,否则会报权限异常。
3.在配置Hive的JDBC是,我们需要在JDBC URL后指定user和password否则会报匿名用户无权限访问的问题,注意必须带上password。
4.HDFS模块在接收到HiveMetadata模块的数据后生成的为临时文件,不是立即将数据写入到HDFS,可以通过“Idle Timeout”参数来控制刷新数据到HDFS的频率。