支持版本
支持 HDFS 2.x、3.x 版本。
使用限制
离线读取
HDFS Reader 支持以下功能:
支持 TextFile、ORCFile、rcfile、sequence file、csv 和 parquet 格式的文件,且要求文件内容存放的是一张逻辑意义上的二维表。
支持多种类型数据读取(使用 String 表示),支持列裁剪,支持列常量。
支持递归读取、支持正则表达式 * 和 ?。
支持 ORCFile 数据压缩,目前支持 SNAPPY 和 ZLIB 两种压缩方式。
支持 SequenceFile 数据压缩,目前支持 lZO 压缩方式。
多个 File 可以支持并发读取。
CSV 类型支持压缩格式有 gzip、bz2、zip、lzo、lzo_deflate 和 snappy。
目前插件中 Hive 版本为2.3.7,Hadoop 版本为3.2.3。
说明:
HDFS Reader 暂不支持单个 File 多线程并发读取,此处涉及到单个 File 内部切分算法。
离线写入
使用 HDFS Writer 时,请注意以下事项:
1. 目前 HDFS Writer 仅支持 TextFile、ORCFile 和 ParquetFile 三种格式的文件,且文件内容存放的必须是一张逻辑意义上的二维表。
2. 由于 HDFS 是文件系统,不存在 schema 的概念,因此不支持对部分列写入。
HDFS 离线单表读取节点配置
参数 | 说明 |
数据来源 | 选择当前项目中可用的 HDFS 数据源。 |
文件路径 | 文件系统的路径信息。路径支持使用‘*’作为通配符,指定通配符后将遍历多个文件信息。例如,指定/代表读取/目录下所有的文件,指定 /bazhen/ 代表读取 bazhen 目录下所有的文件。HDFS 目前只支持 * 和 ? 作为文件通配符,语法类似于通常的 Linux 命令行文件通配符。 |
文件类型 | HDFS 支持四种文件类型:txt 、orc 、parquet 、csv。 txt:表示 TextFile 文件格式。 orc:表示 ORCFile 文件格式。 parquet:表示普通 Parquet 文件格式。 csv:表示普通 HDFS 文件格式(逻辑二维表)。 |
压缩格式 | 当 fileType(文件类型)为 csv 下的文件压缩方式,目前仅支持:none、deflate、gzip、bzip2、lz4、snappy。 由于 snappy 目前没有统一的 stream format,数据集成目前仅支持最主流的 hadoop-snappy(hadoop 上的 snappy stream format)和 framing-snappy(google 建议的 snappy stream format)。 ORC 文件类型下无需填写。 |
字段分隔符 | 读取的字段分隔符,HDFS 在读取 TextFile 数据时,需要指定字段分隔符,如果不指定默认为逗号(,)。HDFS 在读取 ORC File 时,您无需指定字段分割符。 其他可用分隔符:' \\t ' 、' \\u001 ' 、' | '、' 空格 ' 、 ' ;' ' , '。 如果您想将每一行作为目的端的一列,分隔符请使用行内容不存在的字符。例如,不可见字符 \\u0001。 |
编码 | 读取文件的编码配置。支持 utf8 和 gbk 两种编码。 |
空值转换 | 读取时,将指定字符串转为 null。 支持下拉选择或手动输入,下拉选择支持:空字符串、空格、\\n、\\0、null |
HDFS 离线单表写入节点配置
参数 | 说明 |
数据去向 | 选择当前项目中可用的 HDFS 数据源。 |
同步方式 | HDFS 支持两种同步方式: 数据同步:解析结构化数据内容,按字段关系进行数据内容映射与同步。 文件传输:不做内容解析传输整个文件,可应用于非结构化数据同步。 |
文件路径 | 文件系统的路径信息。路径支持使用‘*’作为通配符,指定通配符后将遍历多个文件信息。 |
写入模式 | HDFS 支持三种写入模式: append:写入前不做任何处理,直接使用 filename 写入,保证文件名不冲突 nonConflict:文件名重复时报错 overwrite:写入前清理以文件名为前缀的所有文件,例如,"fileName": "abc",将清理对应目录所有 abc 开头的文件。 |
文件类型 | HDFS 支持四种文件类型:txt 、orc 、parquet 、csv。 txt:表示 TextFile 文件格式。 orc:表示 ORCFile 文件格式。 parquet:表示普通 Parquet 文件格式。 csv:表示普通 HDFS 文件格式(逻辑二维表)。 |
压缩格式 | 当 fileType(文件类型)为 csv 下的文件压缩方式,目前仅支持:none、deflate、gzip、bzip2、lz4、snappy。 由于 snappy 目前没有统一的 stream format,数据集成目前仅支持最主流的 hadoop-snappy(hadoop 上的 snappy stream format)和 framing-snappy(google 建议的 snappy stream format)。 ORC 文件类型下无需填写。 |
字段分隔符 | HDFS 写入时的字段分隔符,需要您保证与创建的 HDFS 表的字段分隔符一致,否则无法在 HDFS 表中查到数据。可选:'\\t ' 、 ' \\u001 ' 、' | '、' 空格 ' 、',' 、';'、'\\u005E\\u0001\\u005E'。 |
空字符串处理 | 不做处理:写入时,不处理空字符串。
处理为 null:写入时,将空字符串处理为 null。 |
高级设置(选填) | 可根据业务需求配置参数。 |
数据类型转换支持
读取
HDFS 读取支持的数据类型及转换对应关系如下(在处理 HDFS 时,会先将 HDFS 数据源的数据类型和数据处理引擎的数据类型做映射):
HDFS(Hive 表) 数据类型 | 内部类型 |
TINYINT,SMALLINT,INT,BIGINT | Long |
FLOAT,DOUBLE | Double |
String,CHAR,VARCHAR,STRUCT,MAP,ARRAY,UNION,BINARY | String |
BOOLEAN | Boolean |
Date,TIMESTAMP | Date |
写入
HDFS 读取支持的数据类型及转换对应关系如下:
内部类型 | HDFS(Hive 表 )数据类型 |
Long | TINYINT,SMALLINT,INT,BIGINT |
Double | FLOAT,DOUBLE |
String | String,CHAR,VARCHAR,STRUCT,MAP,ARRAY,UNION,BINARY |
Boolean | BOOLEAN |
Date | Date,TIMESTAMP |
HDFS 脚本 Demo
如果您配置离线任务时,使用脚本模式的方式进行配置,您需要在任务脚本中,按照脚本的统一格式要求编写脚本中的 reader 参数和 writer 参数。
"job": {"content": [{"reader": {"parameter": {"path": "/path/source.txt", //文件路径"nullFormat": "null", //空值转换"hadoopConfig": { //如果hadoop为高可用集群,需要配置NameNode高可用配置信息"dfs.namenode.https-address.mycluster.nn2": "ip:4009","dfs.namenode.https-address.mycluster.nn1": "ip:4009","dfs.nameservices": "mycluster","dfs.ha.namenodes.mycluster": "nn1,nn2","dfs.namenode.servicerpc-address.mycluster.nn2": "ip:4010","dfs.namenode.servicerpc-address.mycluster.nn1": "ip:4010","fs.defaultFS": "hdfs://mycluster","dfs.namenode.rpc-address.mycluster.nn2": "ip:4007","dfs.namenode.rpc-address.mycluster.nn1": "ip:4007","dfs.namenode.http-address.mycluster.nn2": "ip:4008","dfs.namenode.http-address.mycluster.nn1": "ip:4008","dfs.client.failover.proxy.provider.mycluster": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",},"column": [{"name": "null","index": 0,"type": "string"}],"defaultFS": "hdfs://ip:4007","skipHeader": "false", //是否跳过表头"fieldDelimiter": ",", //字段分割符"encoding": "utf-8", //编码方式"fileType": "text" //文件类型},"name": "hdfsreader"},"transformer": [],"writer": {"parameter": {"path": "/path/sink", //文件路径"fileName": "sink_table", //文件名"nullFormat": "null", //空值转换"hadoopConfig": { //如果hadoop为高可用集群,需要配置NameNode高可用配置信息"dfs.namenode.https-address.mycluster.nn2": "ip:4009","dfs.namenode.https-address.mycluster.nn1": "ip:4009","dfs.nameservices": "mycluster","dfs.ha.namenodes.mycluster": "nn1,nn2","dfs.namenode.servicerpc-address.mycluster.nn2": "ip:4010","dfs.namenode.servicerpc-address.mycluster.nn1": "ip:4010","fs.defaultFS": "hdfs://mycluster","dfs.namenode.rpc-address.mycluster.nn2": "ip:4007","dfs.namenode.rpc-address.mycluster.nn1": "ip:4007","dfs.namenode.http-address.mycluster.nn2": "ip:4008","dfs.namenode.http-address.mycluster.nn1": "ip:4008","dfs.client.failover.proxy.provider.mycluster": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",},"column": [ //列名{"name": "1","type": "string"}],"defaultFS": "hdfs://ip:4007","emptyAsNull": false, //空字符串是否处理,处理的话将空字符串处理为null"writeMode": "append", //写入模式"suffix": "txt", //文件后缀"fieldDelimiter": ",", //字段分割符"encoding": "utf-8", //编码方式"fileType": "text" //文件类型},"name": "hdfswriter"}}],"setting": {"errorLimit": {"record": 0 //脏数据阈值},"speed": {"byte": -1, //不限制同步速度,正整数表示设置最大传输速度 byte/s"channel": 1 //并发数量}}}