最近遇到一个关于MySQL单表过大的问题,该表存放的主要是日志文件,且其中有一个字段存放的数据过大,导致占用空间过大以及查询效率的降低,这种设计其实是不合理的。目前该表占用1.2T容量,数据量超过3亿条,而这个RDS数据库的容量总共就2T,且由于种种原因无法扩容,迫不得已急需给出解决方案。
根据上面的背景,可得出以下这些问题,也给出了解决方案:
问题 | 解决方法 | |
---|---|---|
1 | 某字段占用空间较大,在MySQL中为text类型,存储的是json格式的数据,该字段平均占用空间为5KB | 对字段进行压缩,把json格式压缩成字节序列,压缩后可节省5倍空间左右 |
2 | 单表数据量过大,而我们的业务是基本只取本年的数据,该表中很多不使用的数据导致查询效率降低 | 对该表按年份分表,本年的数据为热数据,之前的数据为冷数据 |
3 | RDS服务器容量不足且无法扩容 | 考虑到以后业务数据的增长,我们决定直接买另一台RDS服务器,把冷数据迁移到新RDS服务器 |
具体步骤:在原先的数据库批量压缩字段 —> 批量迁移数据到新数据库
由于数据库一直在业务上被使用着,无法停下来专门给我们做这些处理,那么为了降低对业务的影响,我们只能选择在节假日或者晚上凌晨时候操作,因此所有的脚本文件都需要提前写好,到时候直接做批量处理。
现在先给出压缩与解压代码如下,把json的数据压缩成字节格式,然后采用Base64编码格式存储:
func Compress(s string, jsonFlag bool) string {
// 开启 了 json 判断 并且 传入的 值 并非 是 有效的 json,直接返回原来的值
if jsonFlag && !json.Valid([]byte(s)) || len(s) == 0 {
return s
}
data := []byte(s)
// 压缩
data = zipBytes(data)
// 转base64 存储
return setBase64(data)
}
func UnCompress(s string, jsonFlag bool) string {
// 开启 了 json 验证 并且 是有效的 json ,则 直接返回 原来的值
if jsonFlag && json.Valid([]byte(s)) || len(s) == 0 {
return s
}
data, err := getBase64(s)
if err != nil {
log.Error("解析报错")
return ""
}
data = uZipBytes(data)
return string(data)
}
func setBase64(data []byte) string {
a := base64.StdEncoding.EncodeToString(data)
return a
}
func getBase64(data string) ([]byte, error) {
a, err := base64.StdEncoding.DecodeString(data)
return a, err
}
func zipBytes(data []byte) []byte {
var in bytes.Buffer
z := zlib.NewWriter(&in)
z.Write(data)
z.Close()
return in.Bytes()
}
//zip解压
func uZipBytes(data []byte) []byte {
var out bytes.Buffer
var in bytes.Buffer
in.Write(data)
r, _ := zlib.NewReader(&in)
r.Close()
io.Copy(&out, r)
return out.Bytes()
}
上面的代码是对单条数据的压缩,现在需要从数据库查出数据,然后批量的压缩,采用更新的操作,需要考虑如下问题:
① 每一批取出多少条数据
② 批量压缩采用goroutine并发压缩
③ 批量更新如何操作
由于数据超过3亿条,因此要保证查询效率,不然查询速度会非常慢。具体做法:
select id, detail from log_table where id > last_id limit 10000;
这里假设要压缩的字段名为detail,表名为log_table,这种方式不仅命中了索引,还避免了全表扫描上面查出了1万条数据,接着要做的就是批量压缩,如果采用for循环1个1个的压缩,那么效率必然不是最高的,可以利用go语言并发的优势,把1万条数据分成10组,每组1千条数据,让这10组数据同时进行压缩,代码如下:
func() {
......
var wg sync.WaitGroup
wg.Add(10)
flagMap := make(map[int64]int, 0)
for i := 0; i < 10; i++ {
go func(ii int) {
defer wg.Done()
defer func() { // 防止发生panic之后暂停
err := recover()
if err != nil {
log.Info("err:", err)
}
}()
rwLock.Lock()
for j := ii * 1000; j < (ii+1)*1000; j++ { // 给goroutine划分执行区间
flag := false
detailTemp := processLogData[j].Detail
detailTemp, flag = util.CompressForUpdate(detailTemp, true)
if flag { // 如果不是有效json,则跳过
log.Info("该条json压缩失败或为空,已跳过,id:", processLogData[j].Id)
flagMap[processLogData[j].Id] = 2
} else {
processLogData[j].Detail = detailTemp
}
}
rwLock.Unlock()
}(i)
}
wg.Wait() // 等待所有goroutine执行完毕
......
}
func CompressForUpdate(s string, jsonFlag bool) (string, bool) {
// 开启 了 json 判断 并且 传入的 值 并非 是 有效的 json,直接返回原来的值
if jsonFlag && !json.Valid([]byte(s)) || len(s) == 0 {
return s, true
}
data := []byte(s)
data = zipBytes(data)
return setBase64(data), false
}
上面需要注意在每组goroutine内需要采用读写锁锁住,防止并发安全问题,因为里面有一个用来判空的map,是否需要判空根据不同业务决定
如果一条一条更新速度是极慢的,所以不推荐这种方法,这里采用的是批量更新的方式,经过试验,更新数据库字段,一次更新1000条,更新十次,会比一次更新1万条速度快很多,所以下面函数的tempList切片放的数据量是1千条,需要循环该函数10次才是1万条
func batchUpdate(tableName, fieldName string, tempList []models.LogTable) string {
/*
例子:
UPDATE tableName
SET fieldName = CASE id
WHEN 1 THEN 'value'
WHEN 2 THEN 'value'
WHEN 3 THEN 'value'
END
WHERE id IN (1,2,3);
*/
var sqlStr string
sqlStr = "UPDATE " + tableName + " SET " + fieldName + " = CASE id"
for i := 0; i < len(tempList); i++ {
id := fmt.Sprintf("%d", tempList[i].Id)
sqlStr += " WHEN " + id + " THEN " + tempList[i].Detail
if i == len(tempList)-1 {
sqlStr += " END"
}
}
sqlStr += " WHERE id IN ("
for i := 0; i < len(tempList); i++ {
sqlStr += fmt.Sprintf("%d", tempList[i].Id)
if i != len(tempList)-1 {
sqlStr += ","
} else {
sqlStr += ");"
}
}
return sqlStr
}
当脚本写好后需要把程序放到服务器上跑,且在内网的环境下进行。经过实验,查询+压缩+更新 1万条数据共花费4s左右时间,那么3亿条数据需要花费大概33小时
迁移主要包括查询和插入两个步骤,查询和上面的查询方法一样;经过比较,批量插入的时候每500条插入一次速度最快