使用限制
FTP Reader 实现了从远程 FTP 文件读取数据并转为数据同步协议的功能,远程 FTP 文件本身是无结构化数据存储。目前 FTP Reader 支持的功能如下所示:
支持 | 不支持 |
支持且仅支持读取 TXT 的文件,并要求 TXT 中的 schema 为一张二维表。 支持类 CSV 格式文件,自定义分隔符。 支持多种类型数据读取(使用 STRING 表示)、支持列裁剪和列常量。 支持递归读取、支持文件名过滤。 支持文本压缩,现有压缩格式为 gzip、bzip2、zip、lzo 和 lzo_deflate。 多个 File 可以支持并发读取。 | 单个 File 多线程并发读取,此处涉及到单个 File 内部切分算法。 单个 File 在压缩情况下,从技术上无法支持多线程并发读取。 |
FTP Writer 实现了从数据集成协议转为 FTP 文件功能,FTP 文件本身是无结构化数据存储。目前 FTP Writer 支持的功能如下:
支持 | 不支持 |
支持且仅支持写入文本类型(不支持 BLOB,如视频数据)的文件,且要求文本中 schema 为一张二维表。 支持类 CSV 和 TEXT 格式的文件,自定义分隔符。 写出时不支持文本压缩。 支持多线程写入,每个线程写入不同子文件。 | 单个文件不能支持并发写入。 FTP 本身不提供数据类型,FTP Writer 均将数据以 STRING类型写入 FTP 文件。 |
FTP 离线单表读取节点配置
参数 | 说明 |
数据来源 | 选择当前项目中可用的 FTP 数据源。 |
同步方式 | FTP 支持两种同步方式: 数据同步:解析结构化数据内容,按字段关系进行数据内容映射与同步。 文件传输:不做内容解析传输整个文件,可应用于非结构化数据同步。 |
文件路径 | 远程 FTP 文件系统的路径和文件名信息,需要填写包含路径和文件后缀的完整文件路径和文件名。这里可以支持填写多个路径。 当指定单个远程 FTP 文件,FTP 暂时只能使用单线程进行数据抽取。后期会在非压缩文件情况下针对单个 File 进行多线程并发读取。 当指定多个远程 FTP 文件,FTP 支持使用多线程进行数据抽取。线程并发数通过通道数指定。 当指定通配符,FTP 尝试遍历出多个文件信息。例如,指定/代表读取/目录下所有的文件,指定 /bazhen/ 代表读取 bazhen 目录下所有的文件。FTP 目前仅支持星号(*)作为文件通配符,并支持使用调度参数配合调度,灵活配置文件名与文件路径。 |
文件类型 | FTP 支持四种文件类型:txt 、orc 、parquet 、csv。 txt:表示 TextFile 文件格式。 orc:表示 ORCFile 文件格式。 parquet:表示普通 Parquet 文件格式。 csv:表示普通 HDFS 文件格式(逻辑二维表)。 |
字段分隔符 | 读取的字段分隔符,FTP 在读取数据时,需要指定字段分隔符,如果不指定会默认为(,),界面配置也会默认填写(,)。 |
编码 | 读取文件的编码配置。支持 utf8 和 gbk 两种编码。 |
空值转换 | 读取时,将指定字符串转为 null。 |
文本压缩类型 | 支持无压缩、zip、gzip、bzip2 |
跳过表头 | 否:读取时,不跳过表头。 是:读取时,跳过表头。 |
高级设置(选填) | 可根据业务需求配置参数。 |
关于文件路径说明:
通常不建议您使用星号(*),易导致任务运行报 JVM 内存溢出的错误。
数据同步会将一个作业下同步的所有 Text File 视作同一张数据表。您必须自己保证所有的 File 能够适配同一套 Schema 信息。
您必须保证读取文件为类 CSV 格式,并且提供给数据同步系统权限可读。
如果 Path 指定的路径下没有符合匹配的文件抽取,同步任务将报错。
FTP 离线单表写入节点配置
参数 | 说明 |
数据去向 | 选择当前项目中可用的 FTP 数据源。 |
文件路径 | 文件系统的路径信息。路径支持使用‘*’作为通配符,指定通配符后将遍历多个文件信息。 |
文件名称 | 写入的文件名称,该文件名会添加随机的后缀作为实际写入名称。 |
写入模式 | FTP 支持三种写入模式: append:写入前不做任何处理,直接使用 filename 写入,保证文件名不冲突 。 nonConflict:文件名重复时报错 。 overwrite:写入前清理以文件名为前缀的所有文件。 |
字段分隔符 | 写入的字段分隔符。FTP 写入时的字段分隔符,需要您保证与创建的 FTP 表的字段分隔符一致,否则无法在 FTP 表中查到数据。可选:' \\t ' 、' \\u001 ' 、' | '、' 空格 ' 、 ' ;' ' , '。 |
编码 | 写入文件的编码配置。支持 utf8 和 gbk 两种编码。 |
空值转换 | 写入时,将 null 转为指定字符串。 |
是否包含表头 | 否:写入时,不跳过表头。 是:写入时,跳过表头。 |
高级设置(选填) | 可根据业务需求配置参数。 |
数据类型转换支持
FTP 实现了读取和写入 FTP 双向通道的功能,远程 FTP 文件本身是无结构化数据存储,数据处理引擎在读取和写入时自动转换为 Bytes 类型。
常见问题
1. ftp 写入任务报错:请确认...拥有目录 ls 权限, errorMessage:connect timed out
问题原因:
集成连接 FTP 暂仅支持被动模式,可能是 FTP 服务配置未开启被动模式。
客户端连接到 FTP 服务器的21端口,发送用户名和密码登录,登录成功后要 list 列表或者读取数据时,使用的是另外的端口(1024以上),可能是传输数据端口未开放。
解决方案:
确认 FTP 服务器配置:
1. 是否开启了被动模式
# 配置文件pasv_enable=YES #开启被动模式pasv_min_port=${number} # 被动模式最小端口pasv_max_port=${number} # 被动模式最大端口
2. 服务端是否开启上述范围的端口。
在 pod 中安装 lftp 命令:
# 登录213 拷贝apk包到pod中:cd /data/home/ryanrliaokube ${资源组}kubectl cp ftp/lftp-4.8.3-r2.apk -n ${资源组}/${pod名称}:/data/wedata/runner# 进入pod后安装:sudo suapk add --allow-untrusted --no-network lftp-4.8.3-r2.apk# 使用命令连接lftp -u ${用户名},'${密码}' -p ${端口} ${ip}# 使用主动模式lftp -e "set ftp:passive-mode 0" -u ${用户名},'${密码}' -p ${端口} ${ip}# 也可以用来连接sftplftp sftp://${用户名}:${密码}@${ip}:${端口}
FTP 脚本 Demo
如果您配置离线任务时,使用脚本模式的方式进行配置,您需要在任务脚本中,按照脚本的统一格式要求编写脚本中的 reader 参数和 writer 参数。
"job": {"syncType": "data", //同步方式,支持数据同步和文件传输"content": [{"reader": {"parameter": {"nullFormat": "", //空值转换"compress": "zip", //文件压缩类型"column": ["*"],"skipHeader": "false", //是否跳过表头"fieldDelimiter": ",", //字段分割符"encoding": "utf-8", //编码类型"path": "/path/source", //来源文件路径"protocol": "ftp","password": "******","port": 21,"lineDelimiter": "\\n", //行分割符"host": "ip","fileType": "text", //文件类型"username": "ftpuser"},"name": "ftpreader"},"transformer": [],"writer": {"parameter": {"fileName": "sinkFile", //目标文件名"nullFormat": "null", //空值转换"writeMode": "truncate", //写入模式"suffix": "txt", //文件后缀"encoding": "utf-8","fieldDelimiter": ",", //字段分割符"path": "/path/sink", //目标文件路径"protocol": "ftp","password": "******","port": 21,"lineDelimiter": "\\n", //行分隔符"host": "ip","fileType": "text", //文件类型"username": "ftpuser"},"name": "ftpwriter"}}],"setting": {"errorLimit": { //脏数据阈值"record": 0},"speed": {"byte": -1, //不限制同步速度,正整数表示设置最大传输速度 byte/s"channel": 1 //并发数量}}}