前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用PeerDB实现Postgres到Elasticsearch的实时同步与复制

使用PeerDB实现Postgres到Elasticsearch的实时同步与复制

原创
作者头像
点火三周
发布2024-05-11 09:55:53
1940
发布2024-05-11 09:55:53
举报
文章被收录于专栏:Elastic Stack专栏Elastic Stack专栏

PeerDB 团队最近完成了针对 Elasticsearch 的数据集成目标连接器的初步开发,并已进入测试阶段。 EElasticsearch 是一个广泛使用的搜索和分析引擎,它建立在分布式多用户能力的文档数据库之上。在多个行业的数据架构案例中都有 Elasticsearch 的广泛应用。

本文解释了一些通过 Postgres 到 Elasticsearch 的实时同步用例,然后通过一个快速演示展示了使用 PeerDB 进行 Postgres 到 Elasticsearch 复制的高性能和低延迟。最后,我们对连接器的架构进行了高级概述。

Postgres到Elasticsearch复制的使用案例

通过CDC或查询复制从Postgres到Elasticsearch复制的一些常见用例包括:

  1. 大容量数据的高效搜索:Elasticsearch的主要用途是作为一个搜索引擎,即使在海量的数据上也能高效运行。从全文和加权搜索,甚至到使用内置的NLP模型进行复杂的语义搜索,Elasticsearch都非常灵活且可调整。它常用于摄取和索引大量的日志,甚至作为搜索大型网站和内部知识库的支持引擎。
  2. 将数据从规范化转换为文档化:数据模型通常以高度规范化的形式存储在Postgres中,这对于事务完整性非常好,但对于可能需要使用联接或CTE的复杂查询来说就不利了。作为一个文档数据库,Elasticsearch更喜欢以非规范化的形式存储数据。使用PeerDB的查询复制功能,你可以定期将你的数据转换成非规范化的形式,这使得它更适合下游消费者查询。一些处理也可以使用Elasticsearch的摄取管道进行。

使用PeerDB从Postgres到Elasticsearch的低延迟复制

在这一部分,我将通过一个快速演示,介绍如何在变更数据捕获(CDC)模式下,使用 PeerDB 进行 Postgres 到 Elasticsearch 的复制。使用 PeerDB 从 Postgres 到 Elasticsearch 的复制有一些好处,主要的优点是快速的初始加载,和通过不断读取插槽来实现的亚分钟延迟,PeerDB 能够提供这些,因为它专注于 Postgres 的复制。

Postgres设置

你可以在云上或者在本地使用任何Postgres数据库。为了简单起见,我在这个演示中使用了一个在 Docker 容器中本地运行的 Postgres 集群。我们创建了一个名为 oss1 的表,使用一个多值插入语句每秒连续插入1000行。

代码语言:javascript
复制
postgres=# CREATE TABLE oss1 (
           id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
           c1 INT,
           c2 INT,
           t TEXT,
           updated_at TIMESTAMP WITH TIME ZONE DEFAULT now()
         );
CREATE TABLE
postgres=# INSERT INTO oss1 (c1, c2, t)
SELECT
    generate_series AS c1,
    generate_series * 2 AS c2,
    'text_' || generate_series AS t
FROM 
    generate_series(1, 1000); 
# 每秒运行一次INSERT    
postgres=# \watch 1
INSERT 0 1000
INSERT 0 1000
INSERT 0 1000

Elasticsearch设置

你可以在本地或者云虚拟机上使用它的 Docker compose设置来设置一个 Elasticsearch 实例。或者你也可以使用腾讯云 ES 或者 Elasticsearch Cloud。在这个演示中,我使用了本地运行的 Docker compose 设置。

PeerDB设置

你可以使用 PeerDB开源版 或者 PeerDB云版 来部署一个PeerDB实例。在这个演示中,我通过Docker compose在本地部署了PeerDB开源版。

创建对等体和镜像以进行Postgres到Elasticsearch的复制

在 PeerDB 世界中,对等体指的是源数据存储或目标数据存储。你可以使用 PeerDB 的用户界面来创建PostgresElasticsearch对等体。然后在源对等体和目标对等体之间创建一个镜像进行数据复制。你可以使用 PeerDB 的用户界面来创建一个从 Postgres 到 Elasticsearch 复制数据的 MIRROR。

我创建了一个基于变更数据捕获(CDC)的 MIRROR,它使用 Postgres 的预写日志(WAL)逻辑解码来复制数据。它包括两个步骤:

  1. 初始加载:首先对 Postgres 中现有的数据进行完全一致的快照,并将其复制到 Elasticsearch;通过 PeerDB 的并行快照,你可以期望显著地加快初始加载速度。我们已经看到在几个小时内移动了几个太字节的数据,而不是几天。
  2. 变更数据捕获(CDC):一旦初始加载完成,PeerDB 将不断从逻辑复制插槽中读取 Postgres 中的变化,并将这些变化复制到 Elasticsearch。由于我们的流式架构,你可以期望对于持续运行的镜像到 Elasticsearch 的数据延迟在几秒钟的范围内。

初始加载应该很快就能完成,而且应该能在创建的 Elasticsearch 索引中看到行。在进入连续的 CDC 模式后,新的行应该会随着它们被插入而显示出来。下面附上了一个显示 Postgres 到 Elasticsearch CDC 镜像的快速视频

架构和设计选择

我们之前已经详细讨论过 PeerDB 的流式架构,但总的来说,PeerDB 利用 Go 的 goroutines 和通道,通过逻辑复制有效地从 PostgreSQL 读取数据,然后通过 Bulk API 批量将数据推送到 Elasticsearch。这种方法通过启用并行处理来提高执行时间。

我们的数据仓库连接器在将数据推送到最终表之前,先将数据存储在一个暂存表中,这是出于成本和性能的考虑。由于 Elasticsearch 的架构和查询语言,我们也能够避免这个中间步骤,直接将处理过的记录流发送到 Elasticsearch 索引,通过批量 API

在Elasticsearch中处理更新和删除

PeerDB 支持使用 Elasticsearch 作为 CDC 和查询复制的目标。在大多数情况下,我们推荐使用 CDC,因为它的使用更简单,可靠性更高,而且能够将 DELETE 复制到 Elasticsearch。然而,这限制了在加载到 Elasticsearch 之前可以进行的转换的范围。

为了在 Elasticsearch 侧支持去重,我们需要一个对每个文档保持一致的唯一 ID,这样我们就可以根据源更新或删除它。对于主键中只有一列的表,可以使用该列的值。对于主键中有多列的表,我们选择将列的值一起哈希,从而得到一个小的唯一标识符,无论行的宽度如何。

代码语言:javascript
复制
// 简化的 Go 代码
func primaryKeyColsHash(record []any, colIndices []int) string {
    hasher := sha256.New()

    for _, colIndex := range colIndices {
        // 将值写入哈希器
        _, _ = fmt.Fprint(hasher, record[colIndex])
    }
    hashBytes := hasher.Sum(nil)
    return base64.RawURLEncoding.EncodeToString(hashBytes)
}

代码语言:javascript
复制
# PeerDB上传到Elasticsearch的样本文档。
# 注意 _id 字段是主键列 id 和 c1 的(base64 编码的)哈希值。
{
  "_index": "public.oss2",
  "_id": "SAgdSqEaQyGYWxOo8Dj2s0DbXsQXLTC_CWlds8-c4kY",
  "_version": 1,
  "_seq_no": 0,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "c1": 434017,
    "c2": 922856,
    "id": 8,
    "t": "pgbenchinsertc4b998821cc6b161e65489b3",
    "updated_at": "2024-05-08T18:33:39.031107Z"
  }
}

查询复制可以以追加模式进行,其中任何变化都会在 Elasticsearch 中创建一个新文档,或者以 upsert 模式进行,其中一些列被指定为键列,这些列在类似于 CDC 的方式中进行去重。

数据类型的动态映射

默认情况下,PeerDB 目前使用 Elasticsearch 的动态映射来自动根据索引中的文档内容推断出数据类型映射。在实践中,数字类型被映射为 longfloat,时间戳类型被映射为 date,大多数其他类型被映射为 text。更详细的映射可以在这里找到。这对许多用例都有效。如果需要,用户可以在手动创建索引时提供显式映射,PeerDB 将向此索引加载文档。

结论

Elasticsearch 连接器处于测试阶段 -- 我们已经有客户使用 PeerDB 将数十亿行从 Postgres 移动到 Elasticsearch。如果你是一个 Elasticsearch 用户,并希望使用 PeerDB 将数据从 Postgres 复制到 Elasticsearch,请试试 PeerDB!我们很乐意帮助你或者得到你的反馈:

  1. 免费试用 PeerDB 云。
  2. 访问 PeerDB 的 GitHub 仓库以开始使用。
  3. 加入我们的 Slack 并打个招呼!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Postgres到Elasticsearch复制的使用案例
  • 使用PeerDB从Postgres到Elasticsearch的低延迟复制
    • Postgres设置
      • Elasticsearch设置
        • PeerDB设置
          • 创建对等体和镜像以进行Postgres到Elasticsearch的复制
          • 架构和设计选择
            • 在Elasticsearch中处理更新和删除
              • 数据类型的动态映射
              • 结论
              相关产品与服务
              Elasticsearch Service
              腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档