数据库 MongoDB

最近更新时间:2026-04-23 16:42:42

我的收藏

介绍

Flink connector mongodb 目前支持通过 Flink 将数据批量写入到 mongodb 中,目前支持 append 流和 upsert 流。

版本说明

Flink 版本
说明
1.11
不支持
1.13
支持
1.14
不支持
1.16
支持,

DDL 定义

CREATE TABLE mongodb (
user_id STRING,
item_id INT,
category_id INT,
behavior VARCHAR
) WITH (
'connector' = 'mongodb', -- 固定值 'mongodb'
'database' = 'test', --数据库名
'collection' = 'table1',--数据集合
'uri' = 'mongodb://$username:$password@$IP:$PORT,$IP:$PORT,$IP:$PORT/test?authSource=admin', -- MongoDB连接串
'sink.buffer-flush.max-rows' = '1024', -- 每次批量写入的条数
'sink.buffer-flush.interval' = '1s' -- 每次批量写入的间隔
);

使用范围

Flink connector mongodb 目前仅支持 mongodb sink。支持将腾讯云数据库 MongoDB 作为结果表使用。 其中 1.16 支持 指定分片键写入

WITH 参数

参数
说明
是否必填
备注
connector
结果表类型
固定值 mongodb
database
数据库名称
-
collection
数据集合
-
uri
MongoDB 连接串
-
sink.buffer-flush.max-rows
每次批量写入的条数
默认1000
maxConnectionIdleTime
连接超时时长
默认值为60000,单位为毫秒
sink.buffer-flush.interval
每次批量写入的间隔
默认值为1,单位为秒
sink.max-retries
写入失败的最大重试次数
默认为3
sink.retry.interval
写入失败的重试间隔
默认值为1000,单位为 ms
sink.string-to-objectid.enable
是否设置 string 默认转换为 ObjectId
默认值为 true, 如果分区字段长度刚好等于 14 的时候, 需要设置 'sink.string-to-objectid.enable' = 'false'

代码示例

CREATE TABLE random_source (
user_id STRING,
item_id INT,
category_id INT,
behavior VARCHAR
) WITH (
'connector' = 'datagen',
'rows-per-second' = '100', -- 每秒产生的数据条数
'fields.user_id.kind' = 'sequence', -- 有界序列(结束后自动停止输出)
'fields.user_id.start' = '1', -- 序列的起始值
'fields.user_id.end' = '10000', -- 序列的终止值
'fields.item_id.kind' = 'random', -- 无界的随机数
'fields.item_id.min' = '1', -- 随机数的最小值
'fields.item_id.max' = '1000', -- 随机数的最大值
'fields.category_id.kind' = 'random', -- 无界的随机数
'fields.category_id.min' = '1', -- 随机数的最小值
'fields.category_id.max' = '1000', -- 随机数的最大值
'fields.behavior.length' = '5' -- 随机字符串的长度
);

CREATE TABLE mongodb (
user_id STRING,
item_id INT,
category_id INT,
behavior VARCHAR
) WITH (
'connector' = 'mongodb', -- 固定值 'mongodb'
'database' = 'test', --数据库名
'collection' = 'table1',--数据集合
'uri' = 'mongodb://$username:$password@$IP:$PORT,$IP:$PORT,$IP:$PORT/test?authSource=admin', -- MongoDB连接串
'sink.buffer-flush.max-rows' = '1024', -- 每次批量写入的条数
'sink.buffer-flush.interval' = '1s' -- 每次批量写入的间隔
);

insert into mongodb select * from random_source;

Upsert

MongoDB sink 支持 upsert,但是需要在创建 table 时指定 primary key。分为以下两种情况:
1. 只指定 _id 作为 key。
CREATE TABLE mongodb (
_id STRING,
item_id INT,
category_id INT,
behavior VARCHAR,
PRIMARY KEY (`_id`) NOT ENFORCED
) WITH (
...
);
2. 指定除 _id 以外的其他自定义字段作为 key。注意:此时会将自定义的字段转成 _id。
举例:字段 a 和字段 b 共同作为主键,那么 {a = 1, b = '2',c=3} 同步到 mongodb 端会变成{_id : {a:1, b:'2'}, a: 1, b: '2', c: 3}。
CREATE TABLE mongodb (
user_id STRING,
item_id INT,
category_id INT,
behavior VARCHAR,
PRIMARY KEY (`user_id`, `item_id`) NOT ENFORCED
) WITH (
...
);

注意事项

用户权限

MongoDB 的 User 必须拥有 database 的写权限。