在微服务拆分过程里,会对数据库模块重新进行建模拆分,导致部分表和数据,出现物理隔离,导致跨库JOIN的SQL不可行,并在数据检索上也有性能损耗的风险。下面我们来一起探讨一下,具体的解决方案。
业界一般解决方案包括不限于下面几个
方案 | 实现手段 | 优点 | 缺点 |
---|---|---|---|
应用程序层面改造 | 通过子查询、UNION ALL或进行JOIN操作来实现类似的效果 | 灵活性高、易于实现:可以根据具体需求定制查询逻辑 | - 性能问题:复杂的JOIN操作和子查询可能导致性能下降,尤其是在大数据量和高并发情况下。 - 维护成本高:随着业务逻辑的变化,SQL语句可能需要频繁修改和维护。 - 可读性差:复杂的SQL语句可能难以理解和维护。 |
使用mysql的FEDERATED引擎的表 | 自带的联邦存储引擎 | - 数据集中管理:可以将多个数据库的数据集中到一个数据库中进行查询和管理。 - 减少网络开销:通过本地代理表访问远程数据,减少了网络传输的开销。 | - 性能问题:FEDERATED引擎的性能通常不如本地表,尤其是在跨网络访问时。 - 可靠性问题:依赖于远程数据库的可用性,如果远程数据库不可用,本地代理表也无法访问数据。 - 安全性问题:需要确保远程数据库的安全性,防止数据泄露。 |
使用BI工具离线分析 | - 强大的分析功能:BI工具通常提供丰富的数据分析和可视化功能。 - 易于使用:用户友好的界面,便于非技术人员进行数据分析。 | - 延时性问题可能比较严重 - 机器配置要求也高 | |
使用Mysql跨库的平替方案实现 | 如ES文本搜索引擎等 | - 高性能:Elasticsearch等文本搜索引擎通常具有较高的查询性能,适合大数据量的查询。 - 分布式架构:支持分布式部署,具有良好的扩展性和容错性。 - 丰富的功能:提供全文搜索、聚合分析等多种功能。 | - 学习曲线:需要学习和掌握新的技术和工具。 - 数据同步问题:需要确保Elasticsearch中的数据与MySQL中的数据保持同步。 - 成本问题:部署和维护Elasticsearch集群可能需要较高的成本。 |
基于上述的业务特点,我们选择了通过Mysql + DTS + Kafka + ES来解决微服务拆分导致的跨库联表检索问题。
Mysql有以下特点:
DTS提供了多种数据传输的解决方案,我们是基于DTS消息订阅服务,本质是DTS内置了一个Kafka,并将binlog数据源,丢到kafka终端。
DTS有以下特点:
Elasticsearch是一个强大的分布式搜索和分析引擎,它通过其灵活的数据模型和高级搜索功能,能够有效地解决跨表数据库查询的难题,ES具备以下的特点:
方案步骤如下:
我们用2张图对比,来说明ES宽表设计思路
分析:
DTS通过实时拉取源实例的Binlog增量日志,将增量数据解析成Kafka message,然后存储到内置Kafka Server。
操作类型 | 支持的 SQL 操作 |
---|---|
DML | INSERT、UPDATE、DELETE |
DDL | CREATE DATABASE、DROP DATABASE、CREATE TABLE、ALTER TABLE、DROP TABLE、RENAME TABLE |
我们主要是监听相关业务表的DML事件。
上面是DTS订阅了 某一数据库的某些表的binlog事件监听。这些被订阅表的以下变更,都会通过binlog,然后到DTS被暂存。
DTS内置支持 ProtoBuf、Avro 和 JSON 三种格式保存消息。
我们采用了ProtoBuf对消息进行序列化存储,使得消息存储更加灵活高效,空间占用更小,消费速度更快。
设置数据投递到内置 kafka 中 Topic 的分区数量,增加分区数量可提高数据写入和消费的速度。单分区可以保障消息的顺序,多分区无法保障消息顺序,如果您对消费到消息的顺序有严格要求,请选择分区数量为1。
当用户选择 Kafka 多分区时,可以通过设置分区策略(见下面配置),将业务相互关联的数据路由到同一个分区中,这样方便用户处理消费数据。
Topic 分区策略分为三种,将订阅数据生产到 Kafka 各分区:
将源库的订阅数据按照表名进行分区,设置后相同表名的数据会写入同一个 Kafka 分区中。
将源库的订阅数据按照表名+主键进行分区,设置后相同表名的同一个主键ID的数据,会写入同一个Kafka分区中。
自定义分区策略:先通过正则表达式对订阅数据中的库名和表名进行匹配,将匹配到的数据按照表名+表列值进行分区投递。
虽然业务表都归属于独立模块,但都冗余了一个关联主表的字段user_id,因此我们可以通过对user_id设置列分区策略,使得某一位用户的所有关联表数据,落到同一个分区,便于后续做聚合处理:
DTS通过实时拉取源实例的Binlog增量日志,将增量数据解析成Kafka message,然后存储到内置Kafka Server;因此我们可以通过Kafka Client来消费数据。
订阅的消息保存在内置Kafka中,默认保存时间为最近1天,单Topic的最大存储为500G。
例如:checkPoint事件是用来检测心跳发送接受的,可以忽略这类事件(messageType = CHECKPOINT)
Record 中的字段名称 | 说明 |
---|---|
id | 全局递增 ID。 |
version | 协议版本,当前版本为1。 |
messageType | 消息类型,枚举值:"INSERT","UPDATE","DELETE","DDL","BEGIN","COMMIT","HEARTBEAT","CHECKPOINT"。 |
fileName | 当前 record 所在的 binlog 文件名。 |
position | 当前 record 的在 binlog 中结束的偏移量,格式为 End_log_pos@binlog 文件编号。例如,当前 record 位于文件 mysql-bin.000004 中,结束偏移量为2196,则其值为"2196@4"。 |
safePosition | 当前事务在 binlog 中开始的偏移量,格式同上。 |
timestamp | 写入 binlog 的时间,unix 时间戳,秒级。binlog 记录的事务中对应 event header 里面的 timestamp,源端长事务操作可能会导致 timestamp 与 readerTimestamp 有时间差,这种属于正常情况。 |
gtid | 当前的 gtid,如:c7c98333-6006-11ed-bfc9-b8cef6e1a231:9。 |
transactionId | 事务 ID,只有 commit 事件才会生成事务 ID。 |
serverId | 源库 serverId,查看源库 server_id 参考 SHOW VARIABLES LIKE 'server_id'。 |
threadId | 提交当前事务的会话 ID,参考 SHOW processlist;。 |
sourceType | 源库的数据库类型,当前版本只有 MySQL。 |
sourceVersion | 源库版本,查看源库版本参考select version();。 |
schemaName | 库名。 |
tableName | 表名。 |
objectName | 格式为:库名.表名。 |
columns | 表中各列的定义。 |
oldColumns | DML 执行前该行的数据,如果是 insert 消息,该数组为 null。 |
newColumns | DML 执行后该行的数据,如果是 delete 消息,该数组为 null。 |
sql | DDL 的 SQL 语句。 |
executionTime | DDL 执行时长,单位为秒。 |
heartbeatTimestamp | 心跳消息的时间戳,秒级。只有 heartbeat 消息才有该字段。 |
syncedGtid | DTS 已解析 GTID 集合,格式形如:c7c98333-6006-11ed-bfc9-b8cef6e1a231:1-13。 |
fakeGtid | 是否为构造的假 GTID,如未开启 gtid_mode,则 DTS 会构造一个 GTID。 |
pkNames | 如果源库的表设有主键,则 DML 消息中会携带该参数,否则不会携带。 |
readerTimestamp | DTS 处理这条数据的时间,unix 时间戳,单位为毫秒数。 |
tags | QueryEvent 中的 status_vars,详细参考 QueryEvent。 |
total | 如果消息分片,记录分片总数。当前版本 (version = 1) 无意义,预留扩展。 |
index | 如果消息分片,记录当前分片的索引。当前版本 (version = 1) 无意义,预留扩展。 |
根据2.3.2所述,kafka的消息是按user_id来定制分区策略的,通过消息过滤后,一次批量取消息会拿到多条Record(包括了update/delete/insert),即消费者将同时拿到一个用户关联的多个表数据变更记录;一个消息包含多个DML事件(不同的表、不同的log数据)。
异常情况:默认kafka最大的消息是8MB,但还是可能出现超限情况,即,一条binlog可能拆分为多条Record数据,因此在应用层只能在本地内存里,对多条消息进行合并操作。
下面对正常业务流程和kafka消息拆包处理分步描述:
分析:
处理消息有以下需要注意的点:
消费者组有以下几个建议:
写入:
参考之前写过客户端限流文章Guava客户端限流源码分析
需求会在重构期间不断调整,这不又来一个需求了吗。
我们要对已有的宽表增加字段,这就涉及到2个方面:
我们通过接口灰度策略,实现ES检索接口的逐步灰度。
可以参考我的上一篇文章:基于SpringMVC的API灰度方案
业务大表变更,可能导致的大量binlog生产,经过级联扩散,暴露应用程序的消费能力的不足,导致es写入效率降低。【埋个坑,后续补充问题的排查方向】
1~3s里,数据同步并消费完整。
参考:https://www.elastic.co/guide/cn/elasticsearch/guide/current/optimistic-concurrency-control.html
全量导入阶段,DTS 写入目标库时,对目标库的主要影响在 CPU 和 IOPS。
如下以 MySQL 同步为例进行介绍。整体流程为,数据从源实例中导出并导入到目标实例中,关键步骤包括结构初始化、全量数据初始化及增量数据处理。
(1)结构初始化
结构初始化即在目标实例中创建与源实例相同的库表结构信息。同步任务配置时,用户可以选择是否同步库表结构,如果目标实例中已经创建了与源实例相同的结构信息,则不需要同步库表结构信息,只需要同步数据即可,否则需要同步库表结构信息。
(2)全量数据初始化
结构初始化完成后,DTS 会进行存量数据初始化,即将源实例中的全部存量数据导出并导入到目标实例中。
(3)增量数据处理
增量数据处理通过源实例 Binlog 持续获取增量数据,进行一系列过滤转换操作后,将增量数据持久化到中间存储。在全量数据导入完成后,开始在目标实例上持续回放中间存储上的增量变更数据,从而实现目标实例与源实例数据保持一致。
以上是我们的一次解决Es宽表解决跨库联表检索的设计方案总结,最后的业务难点和处理方法,后续有空我们继续聊!
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。