FTP 数据源

最近更新时间:2024-09-29 17:41:33

我的收藏

使用限制

FTP Reader 实现了从远程 FTP 文件读取数据并转为数据同步协议的功能,远程 FTP 文件本身是无结构化数据存储。目前 FTP Reader 支持的功能如下所示:
支持
不支持
支持且仅支持读取 TXT 的文件,并要求 TXT 中的 schema 为一张二维表。
支持类 CSV 格式文件,自定义分隔符。
支持多种类型数据读取(使用 STRING 表示)、支持列裁剪和列常量。
支持递归读取、支持文件名过滤。
支持文本压缩,现有压缩格式为 gzip、bzip2、zip、lzo 和 lzo_deflate。
多个 File 可以支持并发读取。

单个 File 多线程并发读取,此处涉及到单个 File 内部切分算法。
单个 File 在压缩情况下,从技术上无法支持多线程并发读取。

FTP Writer 实现了从数据集成协议转为 FTP 文件功能,FTP 文件本身是无结构化数据存储。目前 FTP Writer 支持的功能如下:
支持
不支持
支持且仅支持写入文本类型(不支持 BLOB,如视频数据)的文件,且要求文本中 schema 为一张二维表。
支持类 CSV 和 TEXT 格式的文件,自定义分隔符。
写出时不支持文本压缩。
支持多线程写入,每个线程写入不同子文件。

单个文件不能支持并发写入。
FTP 本身不提供数据类型,FTP Writer 均将数据以 STRING类型写入 FTP 文件。


FTP 离线单表读取节点配置




参数
说明
数据来源
选择当前项目中可用的 FTP 数据源。
同步方式
FTP 支持两种同步方式:
数据同步:解析结构化数据内容,按字段关系进行数据内容映射与同步。
文件传输:不做内容解析传输整个文件,可应用于非结构化数据同步。
文件路径
远程 FTP 文件系统的路径和文件名信息,需要填写包含路径和文件后缀的完整文件路径和文件名。这里可以支持填写多个路径。
当指定单个远程 FTP 文件,FTP 暂时只能使用单线程进行数据抽取。后期会在非压缩文件情况下针对单个 File 进行多线程并发读取。
当指定多个远程 FTP 文件,FTP 支持使用多线程进行数据抽取。线程并发数通过通道数指定。
当指定通配符,FTP 尝试遍历出多个文件信息。例如,指定/代表读取/目录下所有的文件,指定 /bazhen/ 代表读取 bazhen 目录下所有的文件。FTP 目前仅支持星号(*)作为文件通配符,并支持使用调度参数配合调度,灵活配置文件名与文件路径。
文件类型
FTP 支持四种文件类型:txt 、orc 、parquet 、csv。
txt:表示 TextFile 文件格式。
orc:表示 ORCFile 文件格式。
parquet:表示普通 Parquet 文件格式。
csv:表示普通 HDFS 文件格式(逻辑二维表)。
字段分隔符
读取的字段分隔符,FTP 在读取数据时,需要指定字段分隔符,如果不指定会默认为(,),界面配置也会默认填写(,)。
编码
读取文件的编码配置。支持 utf8 和 gbk 两种编码。
空值转换
读取时,将指定字符串转为 null。
文本压缩类型
支持无压缩、zip、gzip、bzip2
跳过表头
否:读取时,不跳过表头。
是:读取时,跳过表头。
高级设置(选填)
可根据业务需求配置参数。
关于文件路径说明:
通常不建议您使用星号(*),易导致任务运行报 JVM 内存溢出的错误。
数据同步会将一个作业下同步的所有 Text File 视作同一张数据表。您必须自己保证所有的 File 能够适配同一套 Schema 信息。
您必须保证读取文件为类 CSV 格式,并且提供给数据同步系统权限可读。
如果 Path 指定的路径下没有符合匹配的文件抽取,同步任务将报错。

FTP 离线单表写入节点配置




参数
说明
数据去向
选择当前项目中可用的 FTP 数据源。
文件路径
文件系统的路径信息。路径支持使用‘*’作为通配符,指定通配符后将遍历多个文件信息。
文件名称
写入的文件名称,该文件名会添加随机的后缀作为实际写入名称。
写入模式
FTP 支持三种写入模式:
append:写入前不做任何处理,直接使用 filename 写入,保证文件名不冲突 。
nonConflict:文件名重复时报错 。
overwrite:写入前清理以文件名为前缀的所有文件。
字段分隔符
写入的字段分隔符。FTP 写入时的字段分隔符,需要您保证与创建的 FTP 表的字段分隔符一致,否则无法在 FTP 表中查到数据。可选:' \\t ' 、' \\u001 ' 、' | '、' 空格 ' 、 ' ;' ' , '。
编码
写入文件的编码配置。支持 utf8 和 gbk 两种编码。
空值转换
写入时,将 null 转为指定字符串。
是否包含表头
否:写入时,不跳过表头。
是:写入时,跳过表头。
高级设置(选填)
可根据业务需求配置参数。

数据类型转换支持

FTP 实现了读取和写入 FTP 双向通道的功能,远程 FTP 文件本身是无结构化数据存储,数据处理引擎在读取和写入时自动转换为 Bytes 类型。

常见问题

1. ftp 写入任务报错:请确认...拥有目录 ls 权限, errorMessage:connect timed out

问题原因
集成连接 FTP 暂仅支持被动模式,可能是 FTP 服务配置未开启被动模式。
客户端连接到 FTP 服务器的21端口,发送用户名和密码登录,登录成功后要 list 列表或者读取数据时,使用的是另外的端口(1024以上),可能是传输数据端口未开放。
解决方案:
确认 FTP 服务器配置:
1. 是否开启了被动模式
# 配置文件
pasv_enable=YES #开启被动模式
pasv_min_port=${number} # 被动模式最小端口
pasv_max_port=${number} # 被动模式最大端口
2. 服务端是否开启上述范围的端口。
在 pod 中安装 lftp 命令:
# 登录213 拷贝apk包到pod中:
cd /data/home/ryanrliao
kube ${资源组}
kubectl cp ftp/lftp-4.8.3-r2.apk -n ${资源组}/${pod名称}:/data/wedata/runner
# 进入pod后安装:
sudo su
apk add --allow-untrusted --no-network lftp-4.8.3-r2.apk
# 使用命令连接
lftp -u ${用户名},'${密码}' -p ${端口} ${ip}
# 使用主动模式
lftp -e "set ftp:passive-mode 0" -u ${用户名},'${密码}' -p ${端口} ${ip}
# 也可以用来连接sftp
lftp sftp://${用户名}:${密码}@${ip}:${端口}

FTP 脚本 Demo

如果您配置离线任务时,使用脚本模式的方式进行配置,您需要在任务脚本中,按照脚本的统一格式要求编写脚本中的 reader 参数writer 参数
"job": {
"syncType": "data", //同步方式,支持数据同步和文件传输
"content": [
{
"reader": {
"parameter": {
"nullFormat": "", //空值转换
"compress": "zip", //文件压缩类型
"column": [
"*"
],
"skipHeader": "false", //是否跳过表头
"fieldDelimiter": ",", //字段分割符
"encoding": "utf-8", //编码类型
"path": "/path/source", //来源文件路径
"protocol": "ftp",
"password": "******",
"port": 21,
"lineDelimiter": "\\n", //行分割符
"host": "ip",
"fileType": "text", //文件类型
"username": "ftpuser"
},
"name": "ftpreader"
},
"transformer": [],
"writer": {
"parameter": {
"fileName": "sinkFile", //目标文件名
"nullFormat": "null", //空值转换
"writeMode": "truncate", //写入模式
"suffix": "txt", //文件后缀
"encoding": "utf-8",
"fieldDelimiter": ",", //字段分割符
"path": "/path/sink", //目标文件路径
"protocol": "ftp",
"password": "******",
"port": 21,
"lineDelimiter": "\\n", //行分隔符
"host": "ip",
"fileType": "text", //文件类型
"username": "ftpuser"
},
"name": "ftpwriter"
}
}
],
"setting": {
"errorLimit": { //脏数据阈值
"record": 0
},
"speed": {
"byte": -1, //不限制同步速度,正整数表示设置最大传输速度 byte/s
"channel": 1 //并发数量
}
}
}