HDFS 数据源

最近更新时间:2024-09-29 17:41:33

我的收藏

支持版本

支持 HDFS 2.x、3.x 版本。

使用限制

离线读取

HDFS Reader 支持以下功能:
支持 TextFile、ORCFile、rcfile、sequence file、csv 和 parquet 格式的文件,且要求文件内容存放的是一张逻辑意义上的二维表。
支持多种类型数据读取(使用 String 表示),支持列裁剪,支持列常量。
支持递归读取、支持正则表达式 * 和 ?。
支持 ORCFile 数据压缩,目前支持 SNAPPY 和 ZLIB 两种压缩方式。
支持 SequenceFile 数据压缩,目前支持 lZO 压缩方式。
多个 File 可以支持并发读取。
CSV 类型支持压缩格式有 gzip、bz2、zip、lzo、lzo_deflate 和 snappy。
目前插件中 Hive 版本为2.3.7,Hadoop 版本为3.2.3。
说明:
HDFS Reader 暂不支持单个 File 多线程并发读取,此处涉及到单个 File 内部切分算法。

离线写入

使用 HDFS Writer 时,请注意以下事项:
1. 目前 HDFS Writer 仅支持 TextFile、ORCFile 和 ParquetFile 三种格式的文件,且文件内容存放的必须是一张逻辑意义上的二维表。
2. 由于 HDFS 是文件系统,不存在 schema 的概念,因此不支持对部分列写入。

HDFS 离线单表读取节点配置




参数
说明
数据来源
选择当前项目中可用的 HDFS 数据源。
文件路径
文件系统的路径信息。路径支持使用‘*’作为通配符,指定通配符后将遍历多个文件信息。例如,指定/代表读取/目录下所有的文件,指定 /bazhen/ 代表读取 bazhen 目录下所有的文件。HDFS 目前只支持 * 和 ? 作为文件通配符,语法类似于通常的 Linux 命令行文件通配符。
文件类型
HDFS 支持四种文件类型:txt 、orc 、parquet 、csv。
txt:表示 TextFile 文件格式。
orc:表示 ORCFile 文件格式。
parquet:表示普通 Parquet 文件格式。
csv:表示普通 HDFS 文件格式(逻辑二维表)。
压缩格式
当 fileType(文件类型)为 csv 下的文件压缩方式,目前仅支持:none、deflate、gzip、bzip2、lz4、snappy。
由于 snappy 目前没有统一的 stream format,数据集成目前仅支持最主流的 hadoop-snappy(hadoop 上的 snappy stream format)和 framing-snappy(google 建议的 snappy stream format)。
ORC 文件类型下无需填写。
字段分隔符
读取的字段分隔符,HDFS 在读取 TextFile 数据时,需要指定字段分隔符,如果不指定默认为逗号(,)。HDFS 在读取 ORC File 时,您无需指定字段分割符。
其他可用分隔符:' \\t ' 、' \\u001 ' 、' | '、' 空格 ' 、 ' ;' ' , '。
如果您想将每一行作为目的端的一列,分隔符请使用行内容不存在的字符。例如,不可见字符 \\u0001。
编码
读取文件的编码配置。支持 utf8 和 gbk 两种编码。
空值转换
读取时,将指定字符串转为 null。
支持下拉选择或手动输入,下拉选择支持:空字符串、空格、\\n、\\0、null

HDFS 离线单表写入节点配置




参数
说明
数据去向
选择当前项目中可用的 HDFS 数据源。
同步方式
HDFS 支持两种同步方式:
数据同步:解析结构化数据内容,按字段关系进行数据内容映射与同步。
文件传输:不做内容解析传输整个文件,可应用于非结构化数据同步。
文件路径
文件系统的路径信息。路径支持使用‘*’作为通配符,指定通配符后将遍历多个文件信息。
写入模式
HDFS 支持三种写入模式:
append:写入前不做任何处理,直接使用 filename 写入,保证文件名不冲突
nonConflict:文件名重复时报错
overwrite:写入前清理以文件名为前缀的所有文件,例如,"fileName": "abc",将清理对应目录所有 abc 开头的文件。
文件类型
HDFS 支持四种文件类型:txt 、orc 、parquet 、csv。
txt:表示 TextFile 文件格式。
orc:表示 ORCFile 文件格式。
parquet:表示普通 Parquet 文件格式。
csv:表示普通 HDFS 文件格式(逻辑二维表)。
压缩格式
当 fileType(文件类型)为 csv 下的文件压缩方式,目前仅支持:none、deflate、gzip、bzip2、lz4、snappy。
由于 snappy 目前没有统一的 stream format,数据集成目前仅支持最主流的 hadoop-snappy(hadoop 上的 snappy stream format)和 framing-snappy(google 建议的 snappy stream format)。
ORC 文件类型下无需填写。
字段分隔符
HDFS 写入时的字段分隔符,需要您保证与创建的 HDFS 表的字段分隔符一致,否则无法在 HDFS 表中查到数据。可选:'\\t ' 、 ' \\u001 ' 、' | '、' 空格 ' 、',' 、';'、'\\u005E\\u0001\\u005E'。
空字符串处理
不做处理:写入时,不处理空字符串。 处理为 null:写入时,将空字符串处理为 null。
高级设置(选填)
可根据业务需求配置参数。

数据类型转换支持

读取

HDFS 读取支持的数据类型及转换对应关系如下(在处理 HDFS 时,会先将 HDFS 数据源的数据类型和数据处理引擎的数据类型做映射):
HDFS(Hive 表) 数据类型
内部类型
TINYINT,SMALLINT,INT,BIGINT
Long
FLOAT,DOUBLE
Double
String,CHAR,VARCHAR,STRUCT,MAP,ARRAY,UNION,BINARY
String
BOOLEAN
Boolean
Date,TIMESTAMP
Date

写入

HDFS 读取支持的数据类型及转换对应关系如下:
内部类型
HDFS(Hive 表 )数据类型
Long
TINYINT,SMALLINT,INT,BIGINT
Double
FLOAT,DOUBLE
String
String,CHAR,VARCHAR,STRUCT,MAP,ARRAY,UNION,BINARY
Boolean
BOOLEAN
Date
Date,TIMESTAMP

HDFS 脚本 Demo

如果您配置离线任务时,使用脚本模式的方式进行配置,您需要在任务脚本中,按照脚本的统一格式要求编写脚本中的 reader 参数writer 参数
"job": {
"content": [
{
"reader": {
"parameter": {
"path": "/path/source.txt", //文件路径
"nullFormat": "null", //空值转换
"hadoopConfig": { //如果hadoop为高可用集群,需要配置NameNode高可用配置信息
"dfs.namenode.https-address.mycluster.nn2": "ip:4009",
"dfs.namenode.https-address.mycluster.nn1": "ip:4009",
"dfs.nameservices": "mycluster",
"dfs.ha.namenodes.mycluster": "nn1,nn2",
"dfs.namenode.servicerpc-address.mycluster.nn2": "ip:4010",
"dfs.namenode.servicerpc-address.mycluster.nn1": "ip:4010",
"fs.defaultFS": "hdfs://mycluster",
"dfs.namenode.rpc-address.mycluster.nn2": "ip:4007",
"dfs.namenode.rpc-address.mycluster.nn1": "ip:4007",
"dfs.namenode.http-address.mycluster.nn2": "ip:4008",
"dfs.namenode.http-address.mycluster.nn1": "ip:4008",
"dfs.client.failover.proxy.provider.mycluster": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
},
"column": [
{
"name": "null",
"index": 0,
"type": "string"
}
],
"defaultFS": "hdfs://ip:4007",
"skipHeader": "false", //是否跳过表头
"fieldDelimiter": ",", //字段分割符
"encoding": "utf-8", //编码方式
"fileType": "text" //文件类型
},
"name": "hdfsreader"
},
"transformer": [],
"writer": {
"parameter": {
"path": "/path/sink", //文件路径
"fileName": "sink_table", //文件名
"nullFormat": "null", //空值转换
"hadoopConfig": { //如果hadoop为高可用集群,需要配置NameNode高可用配置信息
"dfs.namenode.https-address.mycluster.nn2": "ip:4009",
"dfs.namenode.https-address.mycluster.nn1": "ip:4009",
"dfs.nameservices": "mycluster",
"dfs.ha.namenodes.mycluster": "nn1,nn2",
"dfs.namenode.servicerpc-address.mycluster.nn2": "ip:4010",
"dfs.namenode.servicerpc-address.mycluster.nn1": "ip:4010",
"fs.defaultFS": "hdfs://mycluster",
"dfs.namenode.rpc-address.mycluster.nn2": "ip:4007",
"dfs.namenode.rpc-address.mycluster.nn1": "ip:4007",
"dfs.namenode.http-address.mycluster.nn2": "ip:4008",
"dfs.namenode.http-address.mycluster.nn1": "ip:4008",
"dfs.client.failover.proxy.provider.mycluster": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
},
"column": [ //列名
{
"name": "1",
"type": "string"
}
],
"defaultFS": "hdfs://ip:4007",
"emptyAsNull": false, //空字符串是否处理,处理的话将空字符串处理为null
"writeMode": "append", //写入模式
"suffix": "txt", //文件后缀
"fieldDelimiter": ",", //字段分割符
"encoding": "utf-8", //编码方式
"fileType": "text" //文件类型
},
"name": "hdfswriter"
}
}
],
"setting": {
"errorLimit": {
"record": 0 //脏数据阈值
},
"speed": {
"byte": -1, //不限制同步速度,正整数表示设置最大传输速度 byte/s
"channel": 1 //并发数量
}
}
}