随着业务数据量的剧增,传统MySQL在数据存储上变得越来越吃力,NoSQL因其良好的性能、扩展性、稳定性逐渐成为业务选型的首要考虑。TcaplusDB是腾讯云推出的一款全托管NoSQL数据库服务,旨在为客户提供极致的数据据存储体验,详细信息请参考官方文档。本文主要介绍如何将MySQL数据迁移到TcaplusDB。
MySQL与TcaplusDB属于异构数据库,数据迁移之前需要考虑两者间数据的差异。
TcaplusDB | MySQL |
---|---|
集群 | 数据库 |
表格组 | N/A |
表 | 表 |
记录 | 行 |
字段 | 列 |
特性 | TcaplusDB | MySQL |
---|---|---|
数据模型 | Key-Value, JSON格式 | 结构化数据 |
数据类型 | 丰富 | 丰富 |
分片 | Yes | N/A |
扩展性 | 优 | 低 |
SQL | 类SQL | ANSI |
主键 | Yes | Yes |
外键 | N/A | Yes |
事务 | N/A | Yes |
二进制数据 | Yes | Yes |
周边生态 | 弱 | 强 |
TcaplusDB支持两种协议来表述数据Schema, 分别为TDR (Tencent Data Representation)和Google Protobuf, 不同协议的数据类型在定义时有所区别,但在底层是统一的。
数据类型 | TcaplusDB (TDR) | TcaplusDB (Protobuf) | MySQL |
---|---|---|---|
整形 | int64,uint64 | int64,uint64 | BIGINT |
整形 | int32,uint32 | int32,uint32 | INT,INTEGER |
整形 | int16,uint16 | N/A | SMALLINT |
整形 | int8,uint18 | N/A | TINYINT |
浮点型 | float, double | float,double | FLOAT, DOUBLE |
定点型 | N/A | N/A | DECIMAL |
位 | N/A | N/A | BIT |
字符型 | char | bytes | CHAR |
字符串 | string | string | VARCHAR, TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT |
二进制 | array char | bytes | TINYBLOB, BLOB, MEDIUMBLOB,LONGBLOB, VARBINARY, BINARY |
布尔型 | bool | bool | BOOLEAN |
数组列表 | array修饰 | repeated修饰 | N/A |
结构体 | struct | message | N/A |
枚举 | N/A | enum | ENUM |
共同体 | union | N/A | N/A |
TcaplusDB与MySQL属于异构数据库,数据迁移有一些限制。
序号 | 限制项 | 说明 |
---|---|---|
1 | 不支持外键迁移 | TcaplusDB没有外键,如果MySQL有定义外键迁移时外键对应列直接映射成TcaplusDB字段,不会维护原有外键关联表信息 |
2 | 不支持存储过程 | 如果MySQL定义有存储过程,迁移时将被忽略 |
3 | 数据类型转换 | 对于TcaplusDB不支持的数据类型,需要进行转换,如Decimal转成TcaplusDB长整形,日期类的转换成字符串类型等 |
4 | 迁移网络环境 | 本文只介绍同是腾讯云环境下MySQL迁移TcaplusDB场景,MySQL与TcaplusDB同属腾讯云一个地域 |
5 | 实时迁移删除操作限制 | 删除操作可能存在删空记录情况,需要避免后续离线迁移重新把待删除的记录写到表中,产生脏数据现象,具体做法是如果删除的是一条空记录把记录写到另一张待删除的表,待离线全量迁移完成后进行对账,把脏数据从业务表删除 |
6 | MySQL数据订阅 | 开启数据订阅功能,需要修改数据源MySQL实例的参数,涉及重启实例,会影响当前存在的连接,用户需要评估重启期间断连对业务的影响 |
对于线上业务迁移,涉及实时数据迁移和离线存量数据迁移。以游戏举例,如果业务同意停服迁移,则直接采用离线全量数据一次性迁移最好,低价低; 如果业务不同意停服迁移,则需要同时考虑实时和离线。
MySQL实时数据迁移通用的方案是基于binlog进行实时数据采集,捕获数据更新操作如insert、update、delete。腾讯云目前针对MySQL实例支持数据订阅功能可实时捕获MySQL数据变更,数据订阅相关文档可参考官网文档。
数据订阅功能支持将数据采集传输至Kafka,这里会介绍如何用腾讯云CKafka实现数据流传输,同时借助腾讯云SCF无服务函数来消费CKafka数据并写到TcaplusDB。
MySQL离线数据迁移涉及存量数据的搬迁,存量数据搬迁需要考虑如何避免线上业务影响,如在业务低峰期迁移、从备机拉数据。存量数据导出并导入异构平台方案这里介绍两种:
迁移场景 | 资源名称 | 资源说明 |
---|---|---|
实时迁移 | 腾讯云CVM实例 | 用于跑数据订阅程序 |
实时迁移 | 腾讯云CDB for MySQL | 用于数据源模拟 |
实时迁移 | 腾讯云DTS服务数据订阅 | 实时订阅MySQL数据 |
实时迁移 | 腾讯云CKafka | 消息队列,解藕订阅/消费过程 |
实时迁移 | 腾讯云SCF | 消费CKafka数据 |
实时迁移 | 腾讯云TcaplusDB | 数据存储平台 |
离线迁移 | 腾讯云COS | 数据文件存储 |
离线迁移 | 腾讯云CDB for MySQL | 用于中间临时数据存储 |
迁移场景 | 开发项 |
---|---|
实时迁移 | 数据订阅程序 |
实时迁移 | SCF消费订阅数据程序 |
离线迁移 | 批量导出MySQL数据和批量解析程序 |
所有资源统一申请到腾讯云上海地域。
表信息 | |||
---|---|---|---|
库表名 | 库: tcaplus | 表: test | |
序号 | 字段名 | 字段类型 | 字段说明 |
1 | player_id | bigint(20) | primary key |
2 | player_name | varchar(128) | |
3 | player_info | varbinary(2048) | JSON字符串二进制写入 |
player_info的JSON原始数据结构如下:
序号 | 字段名 | 字段类型 |
---|---|---|
1 | player_email | varchar(64) |
2 | player_phone | varchar(32) |
表信息 | ||||
---|---|---|---|---|
集群:tw_tcaplus | 表格组: tw_group_1 | 表名: test | ||
序号 | 字段名 | 字段类型 | 字段说明 | |
1 | player_id | int64 | primary key | |
2 | player_name | string | ||
3 | player_info | struct | 参考MySQL表player_info说明 |
tcaplus
, 表: test
,编码: utf8mb4
),申请指引参考官方文档,创建表DDL语句如下:CREATE TABLE `test` (
`player_id` bigint(20) NOT NULL,
`player_name` varchar(128) DEFAULT NULL,
`player_info` varbinary(2048) DEFAULT NULL,
PRIMARY KEY (`player_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
tw_cluster
, 表所在表格组名: tw_group_1
,表名: test
。TcaplusDB表定义Schema如下:syntax = "proto3"; // Specify the version of the protocol buffers language
import "tcaplusservice.optionv1.proto"; // Use the public definitions of TcaplusDB by importing them.
message test { // Define a TcaplusDB table with message
// Specify the primary keys with the option tcaplusservice.tcaplus_primary_key
// The primary key of a TcaplusDB table has a limit of 4 fields
option(tcaplusservice.tcaplus_primary_key) = "player_id";
// primary key fields
int64 player_id = 1;
//ordinary fields
string player_name = 2;
info player_info = 3;
}
message info {
string player_email = 1;
string player_phone = 2;
}
tw_migrate
。具体指引参考官方文档。utf8mb4
。%
,允许所有主机通过此账户来访问实例,如下所示:tw_migrate
,如下所示:注意要与MySQL实例同属一个网络
),在同步类型处只选择数据更新,库表任务处选择test表作为订阅的表。具体如下所示:数据订阅好后,需要把订阅的数据拉取到CKafka, 这里需要开发相应的程序,示例程序可参考官方文档。程序依赖的相关组件可从官方文档获取。
KafkaDemo.java
, 参考第6章节资源下载binlogsdk-2.8.2-jar-with-dependencies.jar
, 参考第6章节资源下载SLF4J.zip
, 参考第6章节资源下载kafka-clients-1.1.0.jar
, 参考第6章节资源下载json-simple-1.1.jar
, 参考第6章节资源下载yum install -y java-1.8.0-openjdk-devel
。//替换TOPIC为申请的topic名,这里为tw_migrate
final String TOPIC = "tw_migrate";
Properties props = new Properties();
//替换CKafka实例的连接信息,进Ckafka控制查看实例信息中的内网IP与端口
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.17.16.6:9092");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
final Producer<String, String> producer = new KafkaProducer<String, String>(props);
//替换SecretId为自己腾讯云账号下申请的密钥信息
context.setSecretId("xxx");
//替换SecretKey为自己腾讯云账号下申请的密钥信息
context.setSecretKey("xxx");
//替换Region信息
context.setRegion("ap-shanghai");
// 替换数据订阅连接信息,从详情页获取的ip,port
context.setServiceIp("172.17.16.2");
context.setServicePort(7507);
final DefaultSubscribeClient client = new DefaultSubscribeClient(context);
// 填写对应要同步的数据库和表名
//替换订阅的库名
final String targetDatabase = "tcaplus";
client.addClusterListener(listener);
// 替换订阅的通道名,订阅详情页获取的 dts-channel 配置信息,填写到此处
client.askForGUID("dts-channel-5ljfLZmhFvnTCQx9");
client.start();
上传上述下载的组件包到CVM实例,并执行如下编译命令:
javac -cp binlogsdk-2.8.2-jar-with-dependencies.jar:kafka-clients-1.1.0.jar:slf4j-api-1.7.25.jar:slf4j-log4j12-1.7.2.jar:json-simple-1.1.jar -encoding UTF-8 KafkaDemo.java
java -XX:-UseGCOverheadLimit -Xms2g -Xmx2g -classpath .:binlogsdk-2.6.0-release.jar:kafka-clients-1.1.0.jar:slf4j-api-1.7.25.jar:slf4j-log4j12-1.7.2.jar:json-simple-1.1.jar KafkaDemo
在MySQL实例中插入一条示例数据,验证数据是否能订阅成功。插入MySQL数据这里用Python3程序来模拟,代码如下:
import json
import MySQLdb
#替换DB连接信息,从已申请的MySQL实例中获取
db = MySQLdb.connect("172.17.16.17", "xxx", "xxx", "tcaplus", charset='utf8' )
cursor = db.cursor()
#模拟一个json结构数据转成字符串后以byte形式写入MySQL
info={"player_phone":"123456","player_email":"June@123.com"}
info_bin = json.dumps(info).encode('utf-8')
sql = "INSERT INTO test (player_id, player_name, player_info) VALUES (%s, %s, %s)"
val = (7, 'June', info_bin,)
cursor.execute(sql, val)
db.commit()
db.close()
打开另一个CVM终端,执行上述程序,并查看4.2.5.4启动程序的输出,如下所示:
从上图来看说明数据订阅程序已经捕获到MySQL的数据插入动作。
数据消费通过腾讯云SCF来实现。SCF支持创建CKafka触发器,借助触发器机制可实时捕获CKafka的数据流,只要有数据发布到Ckafka指定topic, 会触发SCF自动拉取Topic新进的数据。触发器如下所示:
SCF捕获到数据后,解析捕获的数据包并转换成TcaplusDB能识别的JSON记录格式,再通过调用TcaplusDB Python RESTful SDK接口把JSON记录写到TcaplusDB表。具体代码可在SCF控制台上传代码ZIP包即可, 下载地址 。代码关键逻辑:
通过SCF转换写入到TcaplusDB的数据,如下所示:
上面实现并验证了实时迁移数据流管道,通过数据订阅捕获MySQL增删改事件并实时通过订阅程序传输到Ckafka, 同时通过SCF触发器机制捕获CKafka的输入数据流并拉取解析最后写到TcaplusDB。针对删除操作,为避免空删场景,把删除时错误码为261(数据记录不存在)的单独处理,即把这部分数据重新写到新的一张待删除表,待全量迁移完成后再统一对账清理这部分脏数据。
离线迁移主要有两种方式: 一种是dump方式把表数据dump成SQL文件形式,文件内容为Insert格式,然后可以把SQL文件回写到另一临时MySQL实例产生Binlog走实时迁移方案; 另一种是select方式,从表中查数据出来以指定格式保存到文本文件, 如JSON格式行,通过腾讯云批量解析的方式写到TcaplusDB。
dump全表数据可以用如下命令:
#替换MySQL连接账户名和密码
mysqldump -h172.17.16.17 -u[db_user] -p[db_password] -P3306 -B tcaplus --tables test --skip-opt >test.sql
将上述导出test.sql文件重新Load到新的临时MySQL实例,产生Binlog后用实时迁移方案来进行数据采集,参考第4章节所述,这里不再展开。
Select方式可以选择数据输出格式如JSON,如果原表设计有时间字段可以将时间字段设置为索引,并按时间段进行数据导出避免一次导出全量数据。这里以导出全量数据举例,借助JSON_OBJECT函数可以导出TcaplusDB的JSON格式,如下命令所示:
#替换用户名和密码,行之间间隔符}}}{{{
mysql -h172.17.16.17 -u[db_user] -p[db_password] -P3306 -D tcaplus -Ne "select JSON_OBJECT('player_id',player_id,'player_name',player_name,'player_info',player_info) from test" >test.json
test.json示例文件格式如下:
{"player_id": 1, "player_info": "base64:type15:eyJwbGF5ZXJfZW1haWwiOiAiMUB0ZXN0LmNvbSIsICJwbGF5ZXJfcGhvbmUiOiAiMTIzNDU2NiJ9", "player_name": "ball"}
从上述示例格式来看,MySQL底层对Varbinary数据类型会自动转成base64编码,在解析时需要把这个base64进行解码转换成TcaplusDB的字符串格式。
在2.5.2章节介绍了离线数据迁移场景,如果业务表数据量很大,为加快导入TcaplusDB速度,可考虑批量解析,批量解析文件目前业界用得较多的方案是用Spark或Map/Reduce进行文件解析将解析后的数据写入到TcaplusDB,后续针对批量解析这块单独介绍,这里只简单介绍上述导出的JSON文件导入到TcaplusDB。
JSON文件解析采用Python进行,同时引入TcaplusDB Python RESTful SDK,SDK使用方法参考官方文档。示例代码如下:
import json
import base64
#引入TcaplusRestClient类
from tcaplusdb_client.tcaplusdb_rest_client import TcaplusRestClient
client = None
#替换TcaplusDB集群访问地址
endpoint = "http://172.17.16.15"
#替换TcaplusDB访问ID
access_id = 20
#替换TcaplusDB集群密码
access_password = "xxx"
#替换TcaplusDB表格组ID
tablegroup_id = 1
#替换TcaplusDB表名
table_name = "test"
def init():
#初始化TcaplusDB客户端
global client
client = TcaplusRestClient(endpoint, access_id, access_passwd)
client.SetTargetTable(table_group_id=tablegroup_id, table_name=table_name)
def write():
#从文件读数据并写入TcaplusDB
filename='test.json'
with open(filename,'r') as f:
lines = f.readlines()
for line in lines:
json_str = json.loads(line)
player_info = json_str['player_info'].replace("base64:type15:","")
#将base64编码的字段反解码成字符串并转换成JSON格式
json_str['player_info'] = json.loads(base64.b64decode(player_info))
stat, resp = client.AddRecord(json_str)
if __name__ == '__main__':
init()
write()
对于MySQL导出的数据文件可以放腾讯云COS存储,方便其它组件拉取数据进行处理。COS相关介绍可参考官方文档。这里介绍Python SDK操作方法,具体使用手册可参考官方文档。
pip install -U cos-python-sdk-v5
from qcloud_cos import CosConfig
from qcloud_cos import CosS3Client
#替换COS所在Region名
cosRegionName='ap-shanghai'
#替换腾讯云账户的密钥信息
SECRET_ID='xxx'
SECRET_KEY='xxx'
scheme='https'
config = CosConfig(Region=cosRegionName, SecretId=SECRET_ID, SecretKey=SECRET_KEY,Scheme=scheme)
client = CosS3Client(config)
# 创建bucket, 注意格式为migrate名-appid (appid是自己腾讯云账号下的appid)
response = client.create_bucket(
Bucket='migrate-1258272208'
)
#上传文件到COS
#替换bucket名,要上传的文件名及Key, PartSize指定分包大小(单位MB),MAXThread指定并发上传的线程数
response = client.upload_file(
Bucket='migrate-1258272208',
LocalFilePath='1.txt',
Key='1.txt',
PartSize=1,
MAXThread=10,
EnableMD5=False
)
#从COS下载文件
#替换Bucket名
response = client.get_object(
Bucket='migrate-1258272208',
Key='1.txt',
)
response['Body'].get_stream_to_file('output.txt')
迁移场景 | 依赖资源 | 资源下载地址 | 资源用途 |
---|---|---|---|
实时迁移 | tcaplus_tes.sql | 定义数据源表结构 | |
实时迁移 | test.proto | 定义TcaplusDB表结构 | |
实时迁移 | mysql_demo.py | 用于模拟写入MySQL数据,依赖mysqlclient库,参考网上资料安装 | |
实时迁移 | KafkaDemo.java | 数据订阅程序,从数据订阅管道拉取binlog捕获数据并解析写入到CKafka | |
实时迁移 | binlogsdk-2.8.2-jar-with-dependencies.jar | KafkaDemo依赖,binlog捕获SDK | |
实时迁移 | SLF4J.zip | KafkaDemo依赖,日志组件 | |
实时迁移 | kafka-clients-1.1.0.jar | KafkaDemo依赖,Kafka客户端 | |
实时迁移 | json-simple-1.1.jar | KafkaDemo依赖,json处理组件 | |
实时迁移 | scf_migrate_tcaplusdb.zip | SCF程序,从Ckafka摘取数据并写入TcaplusDB | |
离线迁移 | tcaplusdb-restapi-python-sdk-3.0.tgz | TcpalusDB Python RESTful SDK API, 基于包装好的RESTful 接口进行TcaplusDB数据操作 |
本文介绍了MySQL数据迁移TcaplusDB的两种方案: 实时和离线迁移。实时迁移采用订阅MySQL binlog的方式将数据订阅到CKafka, 通过SCF拉取CKafka数据进行实时写入到TcaplusDB。离线迁移可以采用Dump和Select两种方式进行数据导出并将导出文件解析写入到TcaplusDB。本文针对离线迁移数据解析未介绍批量解析方式,后续将独立介绍通过EMR方式进行批量解析,敬请期待!
本文系转载,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文系转载,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。