前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >大数据实时数据同步方案

大数据实时数据同步方案

作者头像
用户7353950
发布2022-05-11 11:22:03
3.1K0
发布2022-05-11 11:22:03
举报
文章被收录于专栏:IT技术订阅

概述

1.1、目标

实时数据同步主要实现从源数据库到目标数据库的实时数据同步。源数据主要支持mysql数据库,目标数据包括mysql数据库和hbase数据库。

下面是实时数据同步的数据流转图,mysql的增量订阅数据经过canal和kafka,数据最终实时流入hbase或mysql。

1.2、整体设计架构

实时数据同步基于数据库变更订阅中心,实现从源数据到目标数据的实时数据同步应用。

整体设计架构如下图所示:

1.3、概要设计

实时数据同步分两部分:生产端(productor)和消费端(consumer)

1.3.1、生产端(productor)

集成canal的consumer和kafka的productor。主要完成如下任务

1、监听canal producer 发送过来的

2、将数据进行格式化,

3、调用kafka producer,发送数据。

1.3.2、消费端(consumer)

主要集成kafka consumer和HBase,主要完成如下任务

1、监听producer发送过来的数据。

2、解析数据

3、数据写入到HBase

1.4、技术组件

1.4.1 canal

1.4.1.1 canal简介

canal 是阿里巴巴mysql数据库binlog的增量订阅&消费组件。

1.4.1.2 canal工作原理

canal模拟mysql slave的交互协议,伪装自己为mysql slave,mysql master发送dump协议

mysql master收到dump请求,开始推送binary log给slave(也就是canal)

canal解析binary log对象(原始为byte流)

1.4.1.3 canal工作流程

数据按照instance为单位进行划分,每个server上可以部署多个instance。但是同一个instance在整个集群中仅在一台上处于activity状态。其余均处于standby状态。也就是说instanceA两台server中均有部署,但是在这个集群中,仅有一个server上的instanceA处于activity状态。

为了保证数据的有序性,每个instance只能被一个client接收。而且数据称队列方式消费消费,有且仅能被消费一次。

1.4.2 Kafka

1.4.2.1 kafka介绍

kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。broker依赖zookeeper管理集群和存储一些meta信息

14.2.2工作流程

kafka 在server端将数据分topic进行管理,每个topic按照需求,可以分多个partition。

Kafka消息传输同时集成了队列和广播传输两种模式,

针对consumer端进行分group管理,每组会有多个consumer。

每个topic的消息可以被多个group同时消费,每个group的多个consumer正常情况下只能消费一次消息。

详细设计

2.1、生产端设计(producer)

由于canal服务端集成生产者,故程序直接调用canal consumer api 。收集canal producer发送的数据。

在producer对数据的处理比较少,设计时希望数据原汁原味的把数据发送出去。Consumer端可以根据不同的场景需求,对数据进行处理。

2.1.1、类关系图

类ClusterCanalClient

主类,程序的启动入口,继承AbstractCanalClient

接收启动参数。并组织调用其他类完成消费端工作。

类AbstractCanalClient

调用consumerConfig,设置consumer参数。启动consumer。

周期性获取消费数据,调用数据解析和格式化程序,处理数据。

调用kafka接口,将处理后的数据发送。

按照要求,解析并格式化数据。

类 ItheimaProductorConfig

读取并初始化配置信息

2.1.2、可靠性设计

canal设计规则。两台server。采用主从模式,有zookeeper管理。依据需求,合理配置instance个数。

每个instance最多只允许一个client接收数据。

每台consumer client可以接收两个instance。

每两个instance配置3台client。确保系统稳定性

2.1.3 数据处理规则

1、处理规则

Producer不在对数据进行任何形式的过滤。数据是否发送,发送那些数据,需要在canal instance的配置文件中配置。Producer只负责将接收到canal producer发送过来的数据进行解析和格式化。这样虽然会增加占用带宽和存储资源。但可以保证数据在不同的应用中使用。

2、发送时的数据格式

{

table:database.tableName,

binlog_id:””,

event_type:”insert/update/delete”

binlog_id:””,

exe_time:””,

cur_time:””,

cols:[{col:columnName,Val:value,type:columnType},{…},...]}

例如:同步jeehe_goods_info表中的数据如下,将得到如下消息

{“table”:”yzbmp.jehhe_goods_info”, “binlog”:”23455234234”,

“event_type”:”INSERT/UPDATE/DELETE”,

“exe_time”:””,

“cur_time”:””,

“cols”[

{“col”:“order_id”,”val”:”20014587”,”type”:”double”,”update”:”true/flase”,},

{“col”:“user_id”,”val”:”123456”,”type”:”varchar”}

]}

其中

binlog_id 记录获取的binlog ID,用于核对数据,

event_type 当前数据操作类型。插入/修改/删除

exe_time:binlog生成时间

cur_time:canal获取binlog时间

table标签值为当前行所在的表名(数据库.表名)

cols:将列作为数组传输。

col:列名称

val:当前列的值

type:当前列的数据类型,为当前数据库规定的类型,比如mysql的varchar。

* 1、log日志中记录下当前批次,数据获取获取时间和当前处理时间,用于统计数据延迟和数据处理情况。

2、记录下数据binlog信息,并在consumer端同样记录,用于核实数据丢失情况。

目前,consumer在解析数据时,首先查找table标签,发现table标签后,再做进一步解析,如果没有发现table标签,丢弃该条消息。

2.2、消费端设计(consumer)

此消费端主要是从kafka中获取数据,将该数据存入到hbase中

2.2.1、类关系图

类 ClusterKafkaClient

消费端启动类。调用kafka consumer启动程序

类KafkaConsumerController

消费端启动类,启动时负责初始化数据。

调用kafkaconsumer消费端,周期性(暂定30秒)接收producer发送的数据。

调用格式化,格式化数据。

调用Hbase控制类,实例化数据

类 HBaseController

调用HBaseConfig,

获取rowMeta数据,以row为单位,持久化数据。

鉴于线上采用HBase v1.0 版本,目前,程序主要使用V1.0 版本的API。

类YZHBaseTransferConfig

该类主要用于记录mysql数据同步至hbase时的对应关系:

创建对象时,连接一次数据库,并初始化数据。

依据数据库对应关系表,将数据实例化为两个对象,分别为SynColumn,SyTable。

同步时主要分为全表同步和部分同步。全表同步时,所有列都会同步至hbase中,部分同步时,只同步指定列

类YZHBaseTransfer

该类主要负责格式化数据。将kafka接收的消息按照同步对应关系要求。进行格式化,将数据存入rowMeta实例中。

类 SynColumn、 SynTable

数据库中数据同步至hbase时的字段对应关系,分别与yzdc_sync_table_mapping和yzdc_sync_column_mapping表相对应。

类 ColumnMeta、RowMeta

存入hbase数据库的数据对象

2.2.2、可靠性设计

在kafka中,由于每台partition需要对应一台 consumer client。目前环境做如下配置:

2台broker(server).

topic的partition设置为3。这样就可以设置3台 consumer client

2.2.3、数据准确性保证

由于kafka消息传播再多个partition之间是无序的。

Hbase写入时必须设置合适的key,在出现故障时可以将数据冗余覆盖

kafka的offset修改为手动提交,保证HBase写入后再关提交offset。

2.3、数据库设计

数据库主要表结构设计

2.3.1数据表对应关系表 yzdc_sync_table_mapping

字段名称

字段类型

注释

tb_id

int

主键,自增

orig_tb

VARCHAR

原始表名称

dest_tb

VARCHAR

目标表名称

key_col

VARCHAR

作为rowkey的列

default_family

VARCHAR

默认family

syn_type

ENUM

数据同步类型,分为all和part。all表示全表同步,part表示仅同步表的一部分

is_delete

TINYINT

是否删除

update_time

long

记录当前数据修改时间

2.3.2数据列对应关系表yzdc_sync_column_mapping

字段名称

字段类型

注释

col_id

int

主键、自增

orig_clo

VARCHAR

原始列名称

dest_qualifier

VARCHAR

对应的目标qualifier

dest_family

VARCHAR

对应的目标family

tb_id

int

对应的表主键

update_time

long

修改时间

is_delete

boolean

是否删除

附录一、kafka数据无序性解决方案

为了解决数据负载均衡,通常情况下会为kafka的topic设置多个partition。便于多consumer接收数据,这样便会引起数据时序性问题。

例如:首先修改A为A1,修改结果发送至partition 1.

再次修改A1为A2,修改结果发送至partition 2

而客户端再接收数据时,针对不同的partition并没有时序性,很有可能会先接收partition 2 的数据,将结果存为A2,然后又接收到partition 1 数据,将结果再次修改为A1,这样的结果和实际结果不符。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-05-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 IT技术订阅 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 2.3.2数据列对应关系表yzdc_sync_column_mapping
相关产品与服务
云数据库 SQL Server
腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档