概述
1.1、目标
实时数据同步主要实现从源数据库到目标数据库的实时数据同步。源数据主要支持mysql数据库,目标数据包括mysql数据库和hbase数据库。
下面是实时数据同步的数据流转图,mysql的增量订阅数据经过canal和kafka,数据最终实时流入hbase或mysql。
1.2、整体设计架构
实时数据同步基于数据库变更订阅中心,实现从源数据到目标数据的实时数据同步应用。
整体设计架构如下图所示:
1.3、概要设计
实时数据同步分两部分:生产端(productor)和消费端(consumer)
1.3.1、生产端(productor)
集成canal的consumer和kafka的productor。主要完成如下任务
1、监听canal producer 发送过来的
2、将数据进行格式化,
3、调用kafka producer,发送数据。
1.3.2、消费端(consumer)
主要集成kafka consumer和HBase,主要完成如下任务
1、监听producer发送过来的数据。
2、解析数据
3、数据写入到HBase
1.4、技术组件
1.4.1 canal
1.4.1.1 canal简介
canal 是阿里巴巴mysql数据库binlog的增量订阅&消费组件。
1.4.1.2 canal工作原理
canal模拟mysql slave的交互协议,伪装自己为mysql slave,mysql master发送dump协议
mysql master收到dump请求,开始推送binary log给slave(也就是canal)
canal解析binary log对象(原始为byte流)
1.4.1.3 canal工作流程
数据按照instance为单位进行划分,每个server上可以部署多个instance。但是同一个instance在整个集群中仅在一台上处于activity状态。其余均处于standby状态。也就是说instanceA两台server中均有部署,但是在这个集群中,仅有一个server上的instanceA处于activity状态。
为了保证数据的有序性,每个instance只能被一个client接收。而且数据称队列方式消费消费,有且仅能被消费一次。
1.4.2 Kafka
1.4.2.1 kafka介绍
kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。broker依赖zookeeper管理集群和存储一些meta信息
14.2.2工作流程
kafka 在server端将数据分topic进行管理,每个topic按照需求,可以分多个partition。
Kafka消息传输同时集成了队列和广播传输两种模式,
针对consumer端进行分group管理,每组会有多个consumer。
每个topic的消息可以被多个group同时消费,每个group的多个consumer正常情况下只能消费一次消息。
详细设计
2.1、生产端设计(producer)
由于canal服务端集成生产者,故程序直接调用canal consumer api 。收集canal producer发送的数据。
在producer对数据的处理比较少,设计时希望数据原汁原味的把数据发送出去。Consumer端可以根据不同的场景需求,对数据进行处理。
2.1.1、类关系图
类ClusterCanalClient
主类,程序的启动入口,继承AbstractCanalClient
接收启动参数。并组织调用其他类完成消费端工作。
类AbstractCanalClient
调用consumerConfig,设置consumer参数。启动consumer。
周期性获取消费数据,调用数据解析和格式化程序,处理数据。
调用kafka接口,将处理后的数据发送。
按照要求,解析并格式化数据。
类 ItheimaProductorConfig
读取并初始化配置信息
2.1.2、可靠性设计
canal设计规则。两台server。采用主从模式,有zookeeper管理。依据需求,合理配置instance个数。
每个instance最多只允许一个client接收数据。
每台consumer client可以接收两个instance。
每两个instance配置3台client。确保系统稳定性
2.1.3 数据处理规则
1、处理规则
Producer不在对数据进行任何形式的过滤。数据是否发送,发送那些数据,需要在canal instance的配置文件中配置。Producer只负责将接收到canal producer发送过来的数据进行解析和格式化。这样虽然会增加占用带宽和存储资源。但可以保证数据在不同的应用中使用。
2、发送时的数据格式
{
table:database.tableName,
binlog_id:””,
event_type:”insert/update/delete”
binlog_id:””,
exe_time:””,
cur_time:””,
cols:[{col:columnName,Val:value,type:columnType},{…},...]}
例如:同步jeehe_goods_info表中的数据如下,将得到如下消息
{“table”:”yzbmp.jehhe_goods_info”, “binlog”:”23455234234”,
“event_type”:”INSERT/UPDATE/DELETE”,
“exe_time”:””,
“cur_time”:””,
“cols”[
{“col”:“order_id”,”val”:”20014587”,”type”:”double”,”update”:”true/flase”,},
{“col”:“user_id”,”val”:”123456”,”type”:”varchar”}
]}
其中
binlog_id 记录获取的binlog ID,用于核对数据,
event_type 当前数据操作类型。插入/修改/删除
exe_time:binlog生成时间
cur_time:canal获取binlog时间
table标签值为当前行所在的表名(数据库.表名)
cols:将列作为数组传输。
col:列名称
val:当前列的值
type:当前列的数据类型,为当前数据库规定的类型,比如mysql的varchar。
* 1、log日志中记录下当前批次,数据获取获取时间和当前处理时间,用于统计数据延迟和数据处理情况。
2、记录下数据binlog信息,并在consumer端同样记录,用于核实数据丢失情况。
目前,consumer在解析数据时,首先查找table标签,发现table标签后,再做进一步解析,如果没有发现table标签,丢弃该条消息。
2.2、消费端设计(consumer)
此消费端主要是从kafka中获取数据,将该数据存入到hbase中
2.2.1、类关系图
类 ClusterKafkaClient
消费端启动类。调用kafka consumer启动程序
类KafkaConsumerController
消费端启动类,启动时负责初始化数据。
调用kafkaconsumer消费端,周期性(暂定30秒)接收producer发送的数据。
调用格式化,格式化数据。
调用Hbase控制类,实例化数据
类 HBaseController
调用HBaseConfig,
获取rowMeta数据,以row为单位,持久化数据。
鉴于线上采用HBase v1.0 版本,目前,程序主要使用V1.0 版本的API。
类YZHBaseTransferConfig
该类主要用于记录mysql数据同步至hbase时的对应关系:
创建对象时,连接一次数据库,并初始化数据。
依据数据库对应关系表,将数据实例化为两个对象,分别为SynColumn,SyTable。
同步时主要分为全表同步和部分同步。全表同步时,所有列都会同步至hbase中,部分同步时,只同步指定列
类YZHBaseTransfer
该类主要负责格式化数据。将kafka接收的消息按照同步对应关系要求。进行格式化,将数据存入rowMeta实例中。
类 SynColumn、 SynTable
数据库中数据同步至hbase时的字段对应关系,分别与yzdc_sync_table_mapping和yzdc_sync_column_mapping表相对应。
类 ColumnMeta、RowMeta
存入hbase数据库的数据对象
2.2.2、可靠性设计
在kafka中,由于每台partition需要对应一台 consumer client。目前环境做如下配置:
2台broker(server).
topic的partition设置为3。这样就可以设置3台 consumer client
2.2.3、数据准确性保证
由于kafka消息传播再多个partition之间是无序的。
Hbase写入时必须设置合适的key,在出现故障时可以将数据冗余覆盖
kafka的offset修改为手动提交,保证HBase写入后再关提交offset。
2.3、数据库设计
数据库主要表结构设计
2.3.1数据表对应关系表 yzdc_sync_table_mapping
字段名称 | 字段类型 | 注释 |
---|---|---|
tb_id | int | 主键,自增 |
orig_tb | VARCHAR | 原始表名称 |
dest_tb | VARCHAR | 目标表名称 |
key_col | VARCHAR | 作为rowkey的列 |
default_family | VARCHAR | 默认family |
syn_type | ENUM | 数据同步类型,分为all和part。all表示全表同步,part表示仅同步表的一部分 |
is_delete | TINYINT | 是否删除 |
update_time | long | 记录当前数据修改时间 |
字段名称 | 字段类型 | 注释 |
---|---|---|
col_id | int | 主键、自增 |
orig_clo | VARCHAR | 原始列名称 |
dest_qualifier | VARCHAR | 对应的目标qualifier |
dest_family | VARCHAR | 对应的目标family |
tb_id | int | 对应的表主键 |
update_time | long | 修改时间 |
is_delete | boolean | 是否删除 |
附录一、kafka数据无序性解决方案
为了解决数据负载均衡,通常情况下会为kafka的topic设置多个partition。便于多consumer接收数据,这样便会引起数据时序性问题。
例如:首先修改A为A1,修改结果发送至partition 1.
再次修改A1为A2,修改结果发送至partition 2
而客户端再接收数据时,针对不同的partition并没有时序性,很有可能会先接收partition 2 的数据,将结果存为A2,然后又接收到partition 1 数据,将结果再次修改为A1,这样的结果和实际结果不符。