腾讯云 Mongo 数据源与 Mongo 数据源配置方式相同,此处以 Mongo 数据源为例进行讲解。
支持版本
支持 MongoDB 3.6、4.x、5.x、6.x 版本。
使用限制
1. 增量同步时,where 条件语法:
create_time>= '${yyyy-MM-dd-1d HH:mm:ss}' and create_time < '${yyyy-MM-dd HH:mm:ss}'
Bound 目前支持整型和日期函数配置。日期函数配置使用方法:
// 转化为13位时间戳(毫秒)TimestampMillis('yyyy-MM-ddTHH:mm:00+0800')TimestampMillis('2023-07-10T00:00:00+0800')TimestampMillis('2023-07-10 00:00:00')TimestampMillis('2023-07-10')// 转化为10位时间戳(秒)TimestampSeconds('yyyy-MM-ddTHH:mm:00+0800')TimestampSeconds('2023-07-10T00:00:00+0800')TimestampSeconds('2023-07-10 00:00:00')TimestampSeconds('2023-07-10')
2. 支持使用 MongoDB 数据库对应账号进行连接,如果您使用的是云数据库 MongoDB 版,默认会有一个 admin 账号。出于安全策略的考虑,在添加使用MongoDB 数据源时,请避免使用 admin 作为访问账号。
3. 如果 MongoDB 为分片集群,则在配置数据源时,需要配置 mongos 地址,避免配置 mongod/shard 节点地址。否则同步任务在抽取 MongoDB 中数据时,可能会导致只查询到指定 shard 的数据,而非预期的全集。关于 mongos、mongod,详情请参见 mongos shards。
4. 在并发大于1的情况下,同步任务配置的集合中所有 _id 字段类型必须一致(例如,_id 字段都为 string 类型或者 ObjectId 类型),否则会出现部分数据无法同步的问题。
说明:
并发大于1时,任务拆分会使用 _id 字段进行划分,因而在此场景下 _id 字段不支持混合类型。如果 _id 有多种字段类型,您可以使用单并发的形式进行数据同步,且不配置 splitFactor 或 splitFactor 配置为1。
Mongo 离线单表读取节点配置
参数 | 说明 |
数据来源 | 可用的 Mongo 数据源。 |
库 | 支持选择、或者手动输入需读取的库名称 默认将数据源绑定的数据库作为默认库,其他数据库需手动输入库名称。 当数据源网络不联通导致无法直接拉取库信息时,可手动输入数据库名称。在数据集成网络连通的情况下,仍可进行数据同步。 |
集合 | 支持选择、或者手动输入需读取的集合。 |
分割符 | 当字段为数组类型时,使用分割符拼接数组内容。 |
筛选条件(选填) | 根据数据类型填写对应筛选语句,该语句会作为将要同步数据的筛选条件。 |
高级设置(选填) | 可根据业务需求配置参数。 |
Mongo 离线单表写入节点配置
参数 | 说明 |
数据去向 | 选择当前项目中可用的 Mongodb 数据源。 |
库 | 支持选择、或者手动输入需读取的库名称 默认将数据源绑定的数据库作为默认库,其他数据库需手动输入库名称。 当数据源网络不联通导致无法直接拉取库信息时,可手动输入数据库名称。在数据集成网络连通的情况下,仍可进行数据同步。 |
集合 | 支持选择、或者手动输入需读取的集合。 |
写入模式 | 表示写入数据时,是否针对相同的主键覆盖写入。默认使用 _id 每行记录的业务主键 |
前置语句 | 选填,表示数据同步写入 MongoDB 前的前置操作,请确保前置语句符合 JSON 语句要求,如{"type": "remove"} |
高级设置(选填) | 可根据业务需求配置参数。 |
数据类型转换支持
读取
Mongo 读取支持的数据类型及转换对应关系如下(在处理 Mongo 时,会先将 Mongo 数据源的数据类型和数据处理引擎的数据类型做映射):
Mongo 数据类型 | 内部类型 |
int32,int64,bigint | Long |
double,decimal | Double |
string, array,map,list | String |
date,datetime | Date |
boolean | Boolean |
bytes | Bytes |
写入
Mongo 写入支持的数据类型及转换对应关系如下:
内部类型 | Mongo 数据类型 |
Long | int, Long |
Double | double,decimal |
String | string, array |
Date | date,datetime |
Boolean | boolean |
Bytes | bytes |
Mongo 脚本 Demo
如果您配置离线任务时,使用脚本模式的方式进行配置,您需要在任务脚本中,按照脚本的统一格式要求编写脚本中的 reader 参数和 writer 参数。
"job": {"content": [{"reader": {"parameter": {"userPassword": "******","address": [ //源数据源地址"ip:27017"],"query": "{\\"create_date\\":{\\"$gt\\":ISODate(\\"${yyyy-MM-dd-1d}T00:00:00+0800\\"),\\"$lt\\":ISODate(\\"${yyyy-MM-dd}T00:00:00+0800\\")}}", //筛选条件"dbName": "database", //源库"column": [{"name": "id","type": "string"},{"name": "name","type": "string"}],"authDb": "admin","userName": "mongouser","splitter": "-", //分割符,当字段为数组类型时,使用分割符拼接数组内容"collectionName": "source_collection" //源集合},"name": "mongodbreader"},"transformer": [],"writer": {"parameter": {"userPassword": "******","address": [ //支持多ip"ip:45400","ip:41748","ip:41748"],"dbName": "database", //目标库"column": [{"name": "id","type": "string"},{"name": "name","type": "string"}],"authDb": "admin","userName": "mongouser","collectionName": "sink_collections", //目标集合"preSql": { //前置sql"type": "remove"}},"name": "mongodbwriter"}}],"setting": {"errorLimit": { //脏数据阈值"record": 0},"speed": {"byte": -1, //不限制同步速度,正整数表示设置最大传输速度 byte/s"channel": 1 //并发数量}}}