
Using Python + binlog to record forward and rollback SQL

Original post by 保持热爱奔赴山海 · Last modified 2024-08-23 09:22:17 · From the column "数据库相关" (database topics)

Background

If we capture the full binlog stream, it becomes much easier to trace back events and to audit changes.

If, while recording the binlog, we also generate the corresponding rollback SQL, then when a rollback is needed we no longer have to post-process the binlog with tools such as my2sql, which speeds up data flashback in those special scenarios.

This post grew out of that idea.

Architecture

Rough idea: use python-mysql-replication to consume MySQL's binlog, process the events, and write the results to Kafka; then use clickhouse_sinker to consume the Kafka data and store the final results in ClickHouse.

[Architecture diagram: MySQL binlog → python-mysql-replication script → Kafka → clickhouse_sinker → ClickHouse]

Note:

In the diagram above, clickhouse_sinker can also be replaced by a consumer you write yourself in Python (a minimal sketch follows below), and ClickHouse can be replaced by a database such as Databend or StarRocks.
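
For reference, here is a minimal sketch of such a hand-written consumer. It assumes the kafka-python and clickhouse-driver packages, the topic name binlog_audit used later in this post, and an existing ClickHouse table binlog_audit whose columns match the JSON fields produced by the collector script below; treat it as an illustration rather than production code.

# -*- coding: utf-8 -*-
# Sketch: hand-written consumer replacing clickhouse_sinker.
# Assumptions: kafka-python + clickhouse-driver, topic "binlog_audit",
# and an existing ClickHouse table "binlog_audit" whose columns match
# the JSON fields produced by the collector script in this post.
import json

from kafka import KafkaConsumer
from clickhouse_driver import Client

consumer = KafkaConsumer(
    "binlog_audit",
    bootstrap_servers="127.0.0.1:9092",
    group_id="py_binlog_audit_consumer",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    enable_auto_commit=True,
)

ch = Client(host="127.0.0.1", port=9000, database="default")

COLUMNS = [
    "ts1", "ts2", "event_type", "event_size", "schema_name",
    "table_name", "row_count", "cluster", "forward_sql", "rollback_sql",
]

batch = []
for message in consumer:
    row = message.value
    batch.append([row.get(col) for col in COLUMNS])
    # Flush in batches instead of one INSERT per message
    # (a real consumer would also flush on a timer).
    if len(batch) >= 1000:
        ch.execute(
            "INSERT INTO binlog_audit ({}) VALUES".format(", ".join(COLUMNS)),
            batch,
        )
        batch = []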

Code implementation

Note that the code below is only a bare-bones implementation and contains quite a few hard-coded values.

# -*- coding: utf-8 -*-

"""
Mapping between binlogevent.event_type numeric codes and event types:
WRITE_ROWS_EVENT_V2 = 30
UPDATE_ROWS_EVENT_V2 = 31
DELETE_ROWS_EVENT_V2 = 32
"""


import json
import datetime
from kafka import KafkaProducer
import mysql.connector
import logging
import sys

logging.basicConfig(
    level=logging.INFO,
    filename="binlog_audit.log",
    filemode="a",
    format="%(asctime)s - "
    "%(pathname)s[line:%(lineno)d] - "
    "%(levelname)s: %(message)s",
)

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)

MYSQL_SETTINGS = {"host": "127.0.0.1", "port": 3306, "user": "dts", "passwd": "123456"}

# Kafka producer configuration
bootstrap_servers = "127.0.0.1:9092"
producer_config = {
    "bootstrap_servers": bootstrap_servers,
    "batch_size": 16384,
    "linger_ms": 10,
    "acks": 1,
    "retries": 3,
}

producer = KafkaProducer(**producer_config)


def get_binlog_pos():
    # Fetch the latest binlog position via SHOW MASTER STATUS
    connection = mysql.connector.connect(
        host="127.0.0.1", user="dts", password="123456"
    )

    res = []

    mycursor = connection.cursor()
    mycursor.execute("SHOW MASTER STATUS")
    result = mycursor.fetchone()
    log_file = result[0]
    log_pos = result[1]
    res.append(log_file)
    res.append(log_pos)
    mycursor.close()
    # Close the connection each time, since this function is called for every row event
    connection.close()
    return res


def main():
    try:
        with open("binlog.pos", "r") as f:
            res = f.readlines()
            res1, res2 = res[0].split(",")
            logging.info(f"读取到的之前binlog pos文件信息, binlog文件名:{res1} ,位移: {res2}")

        stream = BinLogStreamReader(
            connection_settings=MYSQL_SETTINGS,
            ignore_decode_errors=True,
            resume_stream=True,
            server_id=311,
            only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
            blocking=True,
            log_file=res1,
            log_pos=int(res2),
        )

    # If reading the previous binlog.pos fails, start consuming from the latest position
    except Exception as e:
        print(str(e))
        logging.info("未找到之前的binlog pos文件信息,从最新的binlog pos开始读取")
        stream = BinLogStreamReader(
            connection_settings=MYSQL_SETTINGS,
            ignore_decode_errors=True,
            resume_stream=True,
            server_id=311,
            only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
            blocking=True,
        )

    for binlogevent in stream:
        ts1 = datetime.datetime.fromtimestamp(binlogevent.timestamp)
        ts2 = binlogevent.timestamp
        event_type_code = binlogevent.event_type  # this is a numeric code

        if int(event_type_code) == 30:
            event_type = "INSERT"
        elif int(event_type_code) == 31:
            event_type = "UPDATE"
        elif int(event_type_code) == 32:
            event_type = "DELETE"
        else:
            event_type = "不支持的event类型"

        event_size = binlogevent.event_size  # size of the binlog event
        schema = binlogevent.schema
        table = binlogevent.table
        # Only capture the schemas we care about, to reduce the amount of data processed
        if schema not in ["sbtest", "percona"]:
            continue
        # rows is the number of data rows in this binlog event (python-mysql-replication splits large transactions into multiple events, so this value is not exact)
        rows = len(binlogevent.rows)
        binlog_detail = binlogevent._RowsEvent__rows

        msg = dict()
        msg["ts1"] = str(ts1)
        msg["ts2"] = ts2
        msg["event_type"] = event_type
        msg["event_size"] = event_size
        msg["schema_name"] = schema
        msg["table_name"] = table
        msg["row_count"] = rows
        msg["cluster"] = "DBA-TEST"  # 这里加了一个字段,用于标识MySQL所属的集群

        for row in binlog_detail:
            if event_type == "INSERT":
                values = row["values"]
                # Forward SQL
                insert_sql = ""
                for ii in values.items():
                    kv = f"`{ii[0]}`='{ii[1]}'"
                    insert_sql = insert_sql + kv + ","
                forward_sql = (
                    f"/* PY-FORWARD */ INSERT INTO {schema}.{table} SET {insert_sql}".rstrip(
                        ","
                    )
                    + ";"
                )
                # print(f"正向SQL --- {forward_sql}")
                msg["forward_sql"] = forward_sql

                # Rollback SQL
                delete_sql = ""
                for ii in values.items():
                    kv = f"`{ii[0]}`='{ii[1]}'"
                    delete_sql = delete_sql + kv + " AND "
                rollback_sql = (
                    f"/* PY-ROLLBACK */ DELETE FROM {schema}.{table} WHERE {delete_sql}".rstrip(
                        " AND "
                    )
                    + ";"
                )
                # print(f"反向SQL --- {rollback_sql}")
                msg["rollback_sql"] = rollback_sql

            if event_type == "UPDATE":
                before_values = row["before_values"]
                after_values = row["after_values"]

                # Rollback SQL
                before_sql = ""
                for ii in before_values.items():
                    kv = f"`{ii[0]}`='{ii[1]}'"
                    before_sql = before_sql + kv + ","
                before_conditions = f"{before_sql}".rstrip(",")

                after_sql = ""
                for ii in after_values.items():
                    kv = f"`{ii[0]}`='{ii[1]}'"
                    after_sql = after_sql + kv + " AND "
                after_conditions = f"{after_sql}".rstrip(",")

                rollback_sql = (
                    f"/* PY-ROLLBACK */ UPDATE {schema}.{table} SET {before_conditions}"
                    + " WHERE "
                    + after_conditions.rstrip(" AND ")
                    + ";"
                )
                # print(rollback_sql)
                msg["rollback_sql"] = rollback_sql

                # Forward SQL (the original statement)
                before_sql = ""
                for ii in before_values.items():
                    kv = f"`{ii[0]}`='{ii[1]}'"
                    before_sql = before_sql + kv + " AND "
                before_conditions = f"{before_sql}".rstrip(",")

                after_sql = ""
                for ii in after_values.items():
                    kv = f"`{ii[0]}`='{ii[1]}'"
                    after_sql = after_sql + kv + ","
                after_conditions = f"{after_sql}".rstrip(",")

                forward_sql = (
                    f"/* PY-FORWARD */ UPDATE {schema}.{table} SET {after_conditions}"
                    + " WHERE "
                    + before_conditions.rstrip(" AND ")
                    + ";"
                )
                # print(forward_sql)
                msg["forward_sql"] = forward_sql

            if event_type == "DELETE":
                values = row["values"]

                # Forward SQL (the original statement)
                delete_sql = ""
                for ii in values.items():
                    kv = f"`{ii[0]}`='{ii[1]}'"
                    delete_sql = delete_sql + kv + " AND "
                forward_sql = (
                    f"/* PY-FORWARD */ DELETE FROM {schema}.{table} WHERE {delete_sql}".rstrip(
                        " AND "
                    )
                    + ";"
                )
                # print(forward_sql)
                msg["forward_sql"] = forward_sql

                # Rollback SQL
                insert_sql = ""
                for ii in values.items():
                    kv = f"`{ii[0]}`='{ii[1]}'"
                    insert_sql = insert_sql + kv + ","
                rollback_sql = (
                    f"/* PY-ROLLBACK */ INSERT INTO {schema}.{table} SET {insert_sql}".rstrip(
                        ","
                    )
                    + ";"
                )
                # print(rollback_sql)
                msg["rollback_sql"] = rollback_sql

            producer.send(
                "binlog_audit",
                value=json.dumps(msg).encode("utf-8"),
            )
            
            # Record the binlog position. Note that SHOW MASTER STATUS returns the
            # master's latest position, which may be ahead of the event just processed.
            start_pos = get_binlog_pos()
            with open("binlog.pos", "w+") as f:
                f.write(f"{start_pos[0]},{start_pos[1]}")

    stream.close()


if __name__ == "__main__":
    main()

Note: to keep the data ordered, the Kafka topic has to use a single partition, but that makes processing relatively slow.

A better way to optimize this:

Create the Kafka topic with multiple partitions, and at the same time modify the Python code above to use a custom partitioning strategy, for example distributing by table so that all binlog events of the same table go to the same partition; per-table ordering is then preserved. A minimal sketch of this idea is shown below.
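
The sketch below assumes the same kafka-python producer used above: the library's default partitioner hashes the message key, so sending each event with "schema.table" as the key keeps all events of one table in a single partition while the topic itself can have several partitions.

# -*- coding: utf-8 -*-
# Sketch: per-table ordering on a multi-partition topic.
# kafka-python's default partitioner hashes the message key, so using
# "schema.table" as the key routes all events of a table to the same partition.
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="127.0.0.1:9092",
    acks=1,
    retries=3,
)


def send_event(msg: dict) -> None:
    # msg is the dict built in main() above (schema_name, table_name, ...)
    key = f"{msg['schema_name']}.{msg['table_name']}".encode("utf-8")
    producer.send(
        "binlog_audit",
        key=key,
        value=json.dumps(msg).encode("utf-8"),
    )

In main() above, the existing producer.send(...) call would simply be replaced by send_event(msg).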

The fully optimized code will follow in a later post.

Deploying clickhouse_sinker

The version used here is clickhouse_sinker_3.1.8.

The content of binlog_audit.hjson is as follows:

{
    clickhouse: {
        hosts: [
            [
                127.0.0.1
            ]
        ]
        port: 9000
        db: default
        username: ""
        password: ""
        retryTimes: 0
    }
    kafka: {
        brokers: 127.0.0.1:9092
    }
    task: {
        name: binlog_audit
        topic: binlog_audit
        consumerGroup: clickhouse_sinker_binlog_audit
        earliest: false
        parser: json
        autoSchema: true
        tableName: binlog_audit
        excludeColumns: []
        bufferSize: 50000
        "dynamicSchema": {
            "enable": true,
            "maxDims": 1024,
            "whiteList": "^[0-9A-Za-z_]+$",
            "blackList": "@"
        },
        flushInterval: 1,
    }
    logLevel: info
}

Start it in the foreground:

./clickhouse_sinker --local-cfg-file binlog_audit.hjson

Binlog details recorded in ClickHouse
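
The records that end up in ClickHouse can be inspected directly from the binlog_audit table; a minimal sketch, assuming the clickhouse-driver package and the column names produced by the Python script above:

# -*- coding: utf-8 -*-
# Sketch: look at the most recent binlog records written by clickhouse_sinker.
from clickhouse_driver import Client

ch = Client(host="127.0.0.1", port=9000, database="default")

rows = ch.execute(
    "SELECT ts1, schema_name, table_name, event_type, forward_sql, rollback_sql "
    "FROM binlog_audit ORDER BY ts2 DESC LIMIT 10"
)
for row in rows:
    print(row)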

Original statement: this article was published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

If there is any infringement, please contact cloudcommunity@tencent.com for removal.

