Mongo/腾讯云 Mongo 数据源

最近更新时间:2024-09-29 17:41:33

我的收藏
腾讯云 Mongo 数据源与 Mongo 数据源配置方式相同,此处以 Mongo 数据源为例进行讲解。

支持版本

支持 MongoDB 3.6、4.x、5.x、6.x 版本。

使用限制

1. 增量同步时,where 条件语法: create_time>= '${yyyy-MM-dd-1d HH:mm:ss}' and create_time < '${yyyy-MM-dd HH:mm:ss}'
Bound 目前支持整型和日期函数配置。日期函数配置使用方法:
// 转化为13位时间戳(毫秒)
TimestampMillis('yyyy-MM-ddTHH:mm:00+0800')
TimestampMillis('2023-07-10T00:00:00+0800')
TimestampMillis('2023-07-10 00:00:00')
TimestampMillis('2023-07-10')

// 转化为10位时间戳(秒)
TimestampSeconds('yyyy-MM-ddTHH:mm:00+0800')
TimestampSeconds('2023-07-10T00:00:00+0800')
TimestampSeconds('2023-07-10 00:00:00')
TimestampSeconds('2023-07-10')
2. 支持使用 MongoDB 数据库对应账号进行连接,如果您使用的是云数据库 MongoDB 版,默认会有一个 admin 账号。出于安全策略的考虑,在添加使用MongoDB 数据源时,请避免使用 admin 作为访问账号。
3. 如果 MongoDB 为分片集群,则在配置数据源时,需要配置 mongos 地址,避免配置 mongod/shard 节点地址。否则同步任务在抽取 MongoDB 中数据时,可能会导致只查询到指定 shard 的数据,而非预期的全集。关于 mongos、mongod,详情请参见 mongos shards
4. 在并发大于1的情况下,同步任务配置的集合中所有 _id 字段类型必须一致(例如,_id 字段都为 string 类型或者 ObjectId 类型),否则会出现部分数据无法同步的问题。
说明:
并发大于1时,任务拆分会使用 _id 字段进行划分,因而在此场景下 _id 字段不支持混合类型。如果 _id 有多种字段类型,您可以使用单并发的形式进行数据同步,且不配置 splitFactor 或 splitFactor 配置为1。

Mongo 离线单表读取节点配置




参数
说明
数据来源
可用的 Mongo 数据源。
支持选择、或者手动输入需读取的库名称
默认将数据源绑定的数据库作为默认库,其他数据库需手动输入库名称。
当数据源网络不联通导致无法直接拉取库信息时,可手动输入数据库名称。在数据集成网络连通的情况下,仍可进行数据同步。
集合
支持选择、或者手动输入需读取的集合。
分割符
当字段为数组类型时,使用分割符拼接数组内容。
筛选条件(选填)
根据数据类型填写对应筛选语句,该语句会作为将要同步数据的筛选条件。
高级设置(选填)
可根据业务需求配置参数。

Mongo 离线单表写入节点配置




参数
说明
数据去向
选择当前项目中可用的 Mongodb 数据源。
支持选择、或者手动输入需读取的库名称
默认将数据源绑定的数据库作为默认库,其他数据库需手动输入库名称。
当数据源网络不联通导致无法直接拉取库信息时,可手动输入数据库名称。在数据集成网络连通的情况下,仍可进行数据同步。
集合
支持选择、或者手动输入需读取的集合。
写入模式
表示写入数据时,是否针对相同的主键覆盖写入。默认使用 _id 每行记录的业务主键
前置语句
选填,表示数据同步写入 MongoDB 前的前置操作,请确保前置语句符合 JSON 语句要求,如{"type": "remove"}
高级设置(选填)
可根据业务需求配置参数。

数据类型转换支持

读取

Mongo 读取支持的数据类型及转换对应关系如下(在处理 Mongo 时,会先将 Mongo 数据源的数据类型和数据处理引擎的数据类型做映射):
Mongo 数据类型
内部类型
int32,int64,bigint
Long
double,decimal
Double
string, array,map,list
String
date,datetime
Date
boolean
Boolean
bytes
Bytes

写入

Mongo 写入支持的数据类型及转换对应关系如下:
内部类型
Mongo 数据类型
Long
int, Long
Double
double,decimal
String
string, array
Date
date,datetime
Boolean
boolean
Bytes
bytes

Mongo 脚本 Demo

如果您配置离线任务时,使用脚本模式的方式进行配置,您需要在任务脚本中,按照脚本的统一格式要求编写脚本中的 reader 参数writer 参数
"job": {
"content": [
{
"reader": {
"parameter": {
"userPassword": "******",
"address": [ //源数据源地址
"ip:27017"
],
"query": "{\\"create_date\\":{\\"$gt\\":ISODate(\\"${yyyy-MM-dd-1d}T00:00:00+0800\\"),\\"$lt\\":ISODate(\\"${yyyy-MM-dd}T00:00:00+0800\\")}}", //筛选条件
"dbName": "database", //源库
"column": [
{
"name": "id",
"type": "string"
},
{
"name": "name",
"type": "string"
}
],
"authDb": "admin",
"userName": "mongouser",
"splitter": "-", //分割符,当字段为数组类型时,使用分割符拼接数组内容
"collectionName": "source_collection" //源集合
},
"name": "mongodbreader"
},
"transformer": [],
"writer": {
"parameter": {
"userPassword": "******",
"address": [ //支持多ip
"ip:45400",
"ip:41748",
"ip:41748"
],
"dbName": "database", //目标库
"column": [
{
"name": "id",
"type": "string"
},
{
"name": "name",
"type": "string"
}
],
"authDb": "admin",
"userName": "mongouser",
"collectionName": "sink_collections", //目标集合
"preSql": { //前置sql
"type": "remove"
}
},
"name": "mongodbwriter"
}
}
],
"setting": {
"errorLimit": { //脏数据阈值
"record": 0
},
"speed": {
"byte": -1, //不限制同步速度,正整数表示设置最大传输速度 byte/s
"channel": 1 //并发数量
}
}
}