数据库 MongoDB

最近更新时间:2023-06-21 15:21:50

我的收藏

介绍

Flink connector mongodb 目前支持通过 Flink 将数据批量写入到 mongodb 中,目前仅支持 append 流。

版本说明

Flink 版本
说明
1.11
不支持
1.13
支持
1.14
不支持
1.16
支持

DDL定义

CREATE TABLE mongodb (
user_id INT,
item_id INT,
category_id INT,
behavior VARCHAR
) WITH (
'connector' = 'mongodb', -- 固定值 'mongodb'
'database' = 'test', --数据库名
'collection' = 'table1',--数据集合
'uri' = 'mongodb://$username:$password@$IP:$PORT,$IP:$PORT,$IP:$PORT/test?authSource=admin', -- MongoDB连接串
'batchSize' = '1024' -- 每次批量写入的条数
);

使用范围

Flink connector mongodb 目前仅支持 mongodb sink。支持将腾讯云数据库 MongoDB 作为结果表使用。

WITH参数

参数
说明
是否必填
备注
connector
结果表类型
固定值 mongodb
database
数据库名称
-
collection
数据集合
-
uri
MongoDB 连接串
-
batchSize
每次批量写入的条数
默认1024
maxConnectionIdleTime
连接超时时长
默认值为60000,单位为毫秒

代码示例

CREATE TABLE random_source (
user_id INT,
item_id INT,
category_id INT,
behavior VARCHAR
) WITH (
'connector' = 'datagen',
'rows-per-second' = '100', -- 每秒产生的数据条数
'fields.user_id.kind' = 'sequence', -- 有界序列(结束后自动停止输出)
'fields.user_id.start' = '1', -- 序列的起始值
'fields.user_id.end' = '10000', -- 序列的终止值
'fields.item_id.kind' = 'random', -- 无界的随机数
'fields.item_id.min' = '1', -- 随机数的最小值
'fields.item_id.max' = '1000', -- 随机数的最大值
'fields.category_id.kind' = 'random', -- 无界的随机数
'fields.category_id.min' = '1', -- 随机数的最小值
'fields.category_id.max' = '1000', -- 随机数的最大值
'fields.behavior.length' = '5' -- 随机字符串的长度
);

CREATE TABLE mongodb (
user_id INT,
item_id INT,
category_id INT,
behavior VARCHAR
) WITH (
'connector' = 'mongodb', -- 固定值 'mongodb'
'database' = 'test', --数据库名
'collection' = 'table1',--数据集合
'uri' = 'mongodb://$username:$password@$IP:$PORT,$IP:$PORT,$IP:$PORT/test?authSource=admin', -- MongoDB连接串
'batchSize' = '1024' -- 每次批量写入的条数
);

insert into mongodb select * from random_source;

注意事项

Upsert

MongoDB sink 暂不支持 upsert。

用户权限

MongoDB 的 User 必须拥有 database 的写权限。