本文档主要介绍如何实时迁移AWS DynamoDB数据到腾讯云TcaplusDB。TcaplusDB是腾讯推出的一款全托管NoSQL数据库服务,专为游戏设计,立志于打造面向全球的精品云存储产品,提供高性能、低成本、易扩展、稳定、安全的存储服务。TcaplusDB与DynamoDB类似,数据模型采用的是KV和文档两种类型,以表为组织管理单位。相对DynamoDB表的schema-free模式,TcaplusDB采用的是schema架构,即需要用户提前定义好表的schema,但与传统关系型表结构定义相比,TcaplusDB支持更丰富的数据结构,如支持多层嵌套,满足多样化的数据定义需求。
DynamoDB对于海外用户应用广泛,作为AWS全托管的一款NoSQL数据库服务,能够满足大部分业务场景需要。为适配海外用户使用腾讯云产品的需要,依托DynamoDB完善的数据流机制和Lambda机制,可以实现业务不停服、数据实时迁移至腾讯云TcaplusDB的目标。
本文假定用户对DynamoDB的数据结构、数据类型、运作模式比较了解。如若有疑问,可以查阅详尽的DynamoDB官方文档了解相关内容。
从DynamoDB迁移到TcaplusDB需要考虑shema的转换问题,分两个场景:
针对上述两个场景, 设计了相应的方案来解决schema迁移转换问题:
本文所涉及的数据迁移架构如下:
DynamoDB数据实时写入,通过添加Lambda触发器来捕获DynamoDB的数据变更事件(增,删,改),Lambda函数捕获到事件后对其进行解析,判断事件类型并生成对应的TcaplusDB数据记录,然后发送到腾讯云的Ckafka消息队列组件,最后通过添加一个腾讯云SCF函数来捕获Ckafka写入的数据并进行解析写入TcaplusDB。
DynamoDB数据流机制可以实现数据变更的动态捕获,支持的变更操作如下:
操作类型 | DynamoDB | TcaplusDB | 说明 |
---|---|---|---|
插入 | INSERT | 对应Tcaplus.AddRecord | 插入一条记录,如果记录存在则报错 |
更新 | MODIFY | 对应Tcaplus.SetRecord/Tcaplus.FieldSetRecord | 更新一条记录(非主键字段),如果记录不存在则插入,存在则更新记录, 如果更新是部分字段则在TcaplusDB中对应的是FieldSetRecord接口 |
删除 | REMOVE | 对应Tcaplus.DeleteRecord | 删除一条记录 |
从上面架构来看,涉及AWS的部分主要是DynamoDB和Lambda, 涉及腾讯云的部分主要是Ckafka、SCF和TcaplusDB。 具体计费方式可查看对应产品的官网文档。
DynamoDB的数据模型与TcaplusDB存在一些差异,下面从三个维度展开介绍。
Aamazon DynamoDB | TencentCloud TcaplusDB | 备注 |
---|---|---|
Number | 取决于TcaplusDB的整形取值范围,如uint8-64,int8-64, float,double | |
String | string | |
Boolean | bool | |
Null | 无显示的null值 | 如果值未传入,TcaplusDB会隐式把字段值赋予相应数据类型的默认值,如0,'' |
Binary | bytes | |
Sets | array | TcaplusDB 有repeated关键字表示数组类型,数组元素类型可以是字符串、数值、字节、结构体类型 |
Map | struct | 如果Map结构属性一致,直接定义成TcaplusDB的struct |
DynamoDB主键由两部分组成: partition key
和sort key
, 前者用于分区,后者用于排序,支持只有partition key也支持两者的组合。DynamoDB主键可以无缝对应TcaplusDB的主键,在TcaplusDB中,支持最多四个字段作为联合主键,同时以主键的第一字段作为分区键。
DynamoDB的索引结构和TcaplusDB的有所区别,本文测试的TcaplusDB暂时不同步DynamoDB的全局索引和本地索引数据。后续待TcaplusDB分布式索引功能上线后再补充这一块的内容。
表信息项 | DynamoDB示例表 | TcaplusDB示例表 |
---|---|---|
表名 | migrate_test | migrate_test |
主键 | "player_id":String,"player_time":Number | "player_id":string, "player_time":int64 |
Key类型 | "player_id": hash_key, "player_time": sort_key | n/a |
非主键字段 | "player_email":String, "game_server_id": Number, "is_online": Boolean, "pay": Map, "pay.pay_id": Number, "pay.amount":Number, "pay.method":Number | "player_email":string, "game_server_id": int32, "is_online": bool, "pay": struct, "pay.pay_id": int64, "pay.amount":uint64, "pay.method":int64 |
假定用户已经有AWS账户并设置好Credentials,如果未设置请参考官方文档说明。
在DynamoDB控制台选定一个地域,如新加坡创建示例表migrate_test, 创建好后启动stream流,具体如下截图所示:
触发器创建的前提是需要提前创建Lambda函数,同时还涉及Lambda权限的配置,关于Lambda权限配置涉及AWS角色策略的创建。具体涉及步骤如下:
选择角色所属产品,这里选择Lambda产品,如下所示:
设置角色策略,关于Lambda需要的策略涉及两个:AWSLambdaDynamoDBExecutionRole
和AWSLambdaInvocation-DynamoDB
。具体如下:
创建一个基于Python3.6语言环境的函数,同时指定上一步为Lambda创建的角色,如下所示:
同时配置Lambda函数的环境变量,用于连接Ckafka需要,如下所示:
BROKER_LIST: 连接Ckafka集群的公网域名:端口
CKAFKA_INSTANCE_NAME: Ckafka实例ID
DYNAMODB_TABLE_NAME: DynamoDB表名
SASL_PASSWORD: SASL安全认证密码
SASL_USER: SASL安全认证用户
TOPIC_NAME: Ckafka topic名称
在创建触发器页面可以自动识别上述所创建的Lambda函数,同时设置批处理大小(注意:这个值设置不宜过大,和网络传输效率有关,过大容易丢包),并启动触发器。
假定用户已经有腾讯云账户并设置好Credentials, 如果未设置请参考官方文档说明。
进入TcaplusDB控制台, 选择地域新加坡,和DynamoDB保持一致,避免数据跨地域同步。创建表过程请参考官方文档说明, 创建表之前需要先创建TcaplusDB集群和表格组。
proto
(Google Protobuf)。同时VPC网络选择Default-VPC
, 子网选择Default-Subnet
,如下所示:创建好后如下所示:
syntax = "proto3"; // Specify the version of the protocol buffers language
import "tcaplusservice.optionv1.proto"; // Use the public definitions of TcaplusDB by importing them.
message migrate_test { // Define a TcaplusDB table with message
// Specify the primary keys with the option tcaplusservice.tcaplus_primary_key
// The primary key of a TcaplusDB table has a limit of 4 fields
option(tcaplusservice.tcaplus_primary_key) = "player_id, player_time";
// Specify the primary key indexes with the option tcaplusservice.tcaplus_index
option(tcaplusservice.tcaplus_index) = "index_1(player_id)";
// Value Types supported by TcaplusDB
// int32, int64, uint32, uint64, sint32, sint64, bool, fixed64, sfixed64, double, fixed32, sfixed32, float, string, bytes
// Nested Types Message
// primary key fields
string player_id = 1;
int64 player_time = 2;
string player_email = 3;
// Ordinary fields
int32 game_server_id = 4;
bool is_online = 5;
payment pay =6;
}
message payment {
int64 pay_id = 1;
uint64 amount = 2;
int64 method = 3;
}
创建表过程如下:
进入Ckafka控制台, 在新加坡地域创建一个入门型实例即可,其它都选默认,创建实例如下所示:
创建好后,需要操作以下几步以满足公网访问需要:
接入方式
栏选择添加路由策略
,路由类型选择公网域名接入
, 接入方式选择SASL_PLAINTEXT
。具体如下:进入SCF控制台, 在新加坡地域创建一个基于Python3.6的SCF函数。如下所示:
Default-VPC
, 子网选择Default-Subnet
即可。触发管理
页面新增加一个关于Ckafka的触发器,如下所示:SCF会自动识别同地域所创建的Ckafka实例和实例对应的topic,直接选择即可。
这里的代码指Lambda和SCF的代码,为保证统一,用的同一个代码包(dynamodb_migrate_tcaplusdb.zip),代码目录结构如下:
.
├── kafka
├── kafka_python-2.0.1.dist-info
├── config.py
├── config.pyc
├── dynamodb_migrate_tcaplusdb.zip
├── index.py
├── lambda_function.py
├── setup.cfg
└── tcaplusdb
├── __init__.py
├── http_client.py
├── log.py
├── tcaplusdb_rest_client.py
└── tcaplusdb_rest_exception.py
整体代码下载地址: dynamodb_migrate_tcaplusdb.zip。
Lambda为将数据发送至Ckafka,需要依赖kafka-python的包,直接用如下命令安装:
mkdir dynamodb-tcaplus-migration-realtime
cd dynamodb-tcaplus-migration-realtime
pip install kafka-python==1.0.0 -t .
TcaplusDB连接需要依赖Python RESTful SDK API。 参考代码包中的tcaplusdb目录。
连接配置依赖,需要配置Lambda和SCF的环境变量信息,参考代码包中的配置文件config.py
,具体内容如下:
#AWS Lambda Function Properties
#替换已申请的Ckafka接入的公网域名
BROKER_LIST="ckafka-instance.ap-use.ckafka.tencentcloudmq.com:6000"
#替换Ckafka Topic名称
TOPIC_NAME="test"
#替换Ckafka实例名
CKAFKA_INSTANCE_NAME="ckafka-instance"
#替换用户名
SASL_USERNAME="xxx"
#替换用户密码
SASL_PASSWORD="xxx"
#Dynamodb主键字段
DYNAMODB_PRIMARY_KEY = [{"NAME":"player_id","TYPE": "S"},{"NAME":"player_time","TYPE": "N"}]
#Dynamodb表名
DYNAMODB_TABLE_NAME="migrate_test"
#TencentCloud SCF Properties
#TcaplusDB集群连接地址,IPV4
ENDPOINT ="http://xxx.xxx.xxx.xxx"
#TcaplusDB集群接入ID
ACCESS_ID=28
#TcaplusDB集群连接密码
ACCESS_PASSWORD="test"
#TcaplusDB表所在表格组ID
TABLEGROUP_ID=1
#TcaplusDB表名
TCAPLUSDB_TABLE_NAME="migrate_test"
Lambda代码主要处理捕获DynamoDB数据变更事件,针对不同的事件类型,封装数据成TcaplusDB的记录格式,并写入Ckafka。参考代码包中的lambda_function.py
。
主入口为index.main_handle函数,处理从Ckafka消费数据并解析保存到TcaplusDB。参考代码包中的index.py
。
在上述工作都准备OK后,可以开始在DynamoDB插入数据进行演示。这里使用boto3工具进行数据操作,下面只模拟插入数据操作,其他类似,大家可自行在DynamoDB去更新和删除数据,看后端的数据流情况。
代码如下:
import boto3
table = boto3.resource('dynamodb').Table('migrate_test')
playerId="128"
playerTime=1591372738
response = table.put_item(
Item={
'player_id': playerId,
'player_time': playerTime,
'pay': {
'amount': 100,
'pay_id': 102,
'method': 3
},
'player_email':'wql@123.com',
'is_online': True,
'game_server_id': 112
}
)
print(response)
执行上述代码后,会在migrate_test
插入一条数据,可以从DynamoDB控制台查看数据是否插入成功,如果插入有问题,请检查AWS Credentials设置、表是否创建OK。
然后观察Lambda函数捕获情况,Lambda目前已同CloudWatch打通,即所捕获的所有事件都会有CloudWatch日志体现。具体如下所示:
从上述图,我们可以看到,捕获了一个INSERT
事件,事件中的Record数据和上述我们插入的保持一致。接下来,看下Ckafka的接收情况:
从上图可以看出,Ckafka也收到了来自Lambda函数所发送的数据,并发送一个Post类型的Http RESTful请求给TcaplusDB,插入一条数据并返回插入成功Response。为检查TcaplusDB数据是否有真的插入,这里用到一个tcaplus_client
工具,下载地址: tcaplus_client。tcaplus_client
工具使用说明请参考文档: Tcaplus_client使用说明.pdf 。注意:工具需要与TcaplusDB表所在的VPC保持一致,即用户需要申请一台CVM机器来执行tcaplus_client命令,CVM VPC需要与TcaplusDB集群保持一致。具体查看数据命令如下:
从上面截图可以看出,TcaplusDB也实时写入了数据,说明整个流程跑通。
本文介绍了如何实时增量迁移DynamoDB数据到TcaplusDB,下一阶段计划介绍如何全量离线迁移DynamoDB数据到TcaplusDB。对于TcaplusDB来说,未来也会走社区路线,把腾讯NoSQL在游戏行业的应用经验赋能给业界。如若有任何疑问,欢迎咨询: ballenwen@tencent.com
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。