支持版本
PostgreSQL 使用的驱动版本为 postgresql42.2.26,目前支持 PostgreSQL9.6 及以上版本。
使用限制
说明:
支持读取视图表。
1. PostgreSQL 数据源支持 Password 认证方式(支持 SCRAM-SHA-256 认证方式),如果 PostgreSQL 数据库端更改了密码和密码认证方式,则需要更新数据源配置,并且重新测试连通性和手动运行相关任务验证。
2. 当 PostgreSQL 中表名称、字段名称是以数字开头,或者名称中包含大小写英文字母、中划线(-)时需要使用双引号("")进行转义,不进行转义会导致PostgreSQL 插件读取或写入 PostgreSQL 数据失败。但是在 PostgreSQL Reader 和 Writer 插件中,双引号("")为 JSON 关键字,因此,您需要使用反斜线(\\)再次对双引号("")进行转义。例如,表名称为 123Test,则转义后表名称为 \\"123Test\\"。
说明:
双引号("")中,前引号(")和后引号(")均需使用反斜线(\\)进行转义。
向导模式不支持转义,您需要转换为脚本模式进行转义。
使用脚本模式进行转义的代码示例如下:
"parameter": {"datasource": "abc","column": ["id","\\"123Test\\"", //添加转义符],"where": "","splitPk": "id","table": "public.wpw_test"}
PostgreSQL 离线单表读取节点配置
参数 | 说明 |
数据来源 | 可用的 PostgreSQL 数据源。 |
库 | 支持选择、或者手动输入需读取的库名称 默认将数据源绑定的数据库作为默认库,其他数据库需手动输入库名称。 当数据源网络不联通导致无法直接拉取库信息时,可手动输入数据库名称。在数据集成网络连通的情况下,仍可进行数据同步。 |
Schema | 支持选择、或者手动输入需读取的 Schema 名称。 |
表 | 支持选择、或者手动输入需读取的表名称 分表情况下,可在 PostgreSQL 源端支持选择或输入多个表名称,多个表需保证结构一致。 分表情况下,支持配置表序号区间。例如'table_[0-99]'表示读取'table_0'、'table_1'、'table_2'直到'table_99' ; 如果您的表数字后缀的长度一致,例如'table_000'、'table_001'、'table_002'直到'table_999',您可以配置为'"table": ["table_00[0-9]", "table_0[10-99]", "table_[100-999]"]' 。 当数据源网络不联通导致无法直接拉取表信息时,可手动输入表名称。在数据集成网络连通的情况下,仍可进行数据同步。 |
添加分库分表 | 适用于分库场景,单击后可配置多个数据源、库及表信息。 注意:
分库分表情况下选择的多个表对象需保证 Schema 信息一致(包括字段名称、字段类型)。数据字段模块内系统默认展示第一个数据源的第一张表的元数据字段信息,若多表间字段不一致可能会导致运行失败。 |
切割键 | 您可以将源数据表中某一列作为切分键,建议使用主键或有索引的列作为切分键,仅支持类型为整型的字段。读取数据时,根据配置的字段进行数据分片,实现并发读取,可以提升数据同步效率。 |
筛选条件(选填) | PostgreSQL 根据指定的 where 条件拼接 SQL,并根据该 SQL 进行数据抽取。例如测试时,可以将 where 条件指定实际业务场景,通常会选择当天的数据进行同步,将 where 条件指定为 id>2 and sex=1。 |
PostgreSQL 离线单表写入节点配置
参数 | 说明 |
数据去向 | 需要写入的 PostgreSQL 数据源。 |
库 | 支持选择、或者手动输入需写入的库名称 默认将数据源绑定的数据库作为默认库,其他数据库需手动输入库名称。 当数据源网络不联通导致无法直接拉取库信息时,可手动输入数据库名称。在数据集成网络连通的情况下,仍可进行数据同步。 |
Schema | 支持选择、或者手动输入需读取的 Schema 名称。 |
表 | 支持选择、或者手动输入需写入的表名称 当数据源网络不联通导致无法直接拉取表信息时,可手动输入表名称。在数据集成网络连通的情况下,仍可进行数据同步。 |
是否清空表 | 在写入该 PostgreSQL 数据表前可以手动选择是否清空该数据表。 |
写入模式 | PostgreSQL写入支持两种模式:
append:追加写入。
upsert:根据设置主键字段进行数据更新写入。 |
批量提交大小 | 一次性批量提交的记录数大小,该值可以极大减少数据同步系统与 PostgreSQL 的网络交互次数,并提升整体吞吐量。如果该值设置过大,会导致数据同步运行进程 OOM 异常。 |
前置 SQL(选填) | 执行同步任务之前执行的 SQL 语句,根据数据源类型对应的正确 SQL 语法填写 SQL,例如,执行前清空表中的旧数据(truncate table tablename)。 |
后置 SQL(选填) | 执行同步任务之后执行的 SQL 语句,根据数据源类型对应的正确 SQL 语法填写 SQL,例如,加上某一个时间戳alter table tablename add colname timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP。 |
数据类型转换支持
读取
PostgreSQL 读取支持的数据类型及转换对应关系如下(在处理 PostgreSQL 时,会先将 PostgreSQL 数据源的数据类型和数据处理引擎的数据类型做映射):
PostgreSQL 数据类型 | 内部类型 |
smallint(int2),integer(int4),bigint(int8), serial,bigserial | Long |
real(float4),double precision(float8), money, numeric | Double |
varchar(character varying), char,uuid,json,array | String |
date, time, timestamp | Date |
bool(boolean),bit | Boolean |
bytea | Bytes |
写入
PostgreSQL 写入支持的数据类型及转换对应关系如下:
内部类型 | PostgreSQL 数据类型 |
Long | smallint(int2),integer(int4),bigint(int8) |
Double | double precision,numeric, real |
String | varchar(character varying),char,bit,money |
Date | date, time, timestamp |
Boolean | bool(boolean) |
Bytes | bytea |
PostgreSQL 脚本 Demo
如果您配置离线任务时,使用脚本模式的方式进行配置,您需要在任务脚本中,按照脚本的统一格式要求编写脚本中的 reader 参数和 writer 参数。
"job": {"content": [{"reader": {"parameter": {"password": "******","column": [ //列名"id","name"],"connection": [{"jdbcUrl": ["jdbc:postgresql://ip:5432/database?reWriteBatchedInserts=true"],"table": [ //源表"schema.source_table"]}],"where": "id>10", //筛选条件"splitPk": "id", //切割键"username": "root"},"name": "postgresqlreader"},"transformer": [],"writer": {"parameter": {"postSql": [ //后置sql""],"password": "******","column": [ //列名"id","name"],"connection": [{"jdbcUrl": "jdbc:postgresql://ip:5432/database?reWriteBatchedInserts=true","table": [ //目标表"schema.sink_table"]}],"writeMode": "upsert", //写入模式"batchSize": 1024, //批量提交大小"username": "root","on": [ //唯一性索引"id"],"preSql": [ //前置sql""]},"name": "postgresqlwriter"}}],"setting": {"errorLimit": { //脏数据阈值"record": 0},"speed": {"byte": -1, //不限制同步速度,正整数表示设置最大传输速度 byte/s"channel": 1 //并发数量}}}