之前讲过MySQL连接协议, 也讲过主从连接协议, 并附有相关Python测试代码. 但在做主从连接的时候, GTID还是直接借用现成的值, 也就是没有做解析. 在我们解析了binlog之后, GTID信息就不在话下了. 格式就是PRE_GTID, 我这里就不再介绍了, 有兴趣的可以自己去看 https://www.modb.pro/db/1781217154309378048
每次重写之前的功能, 都想着写得简单方便实用点. 于是就有了 gtid += struct.pack('<QQ',*[int(_x) for _x in y.split("-")])
这种看起来不友好, 但又简单实用的写法了. 尽是些花里胡哨的 -_-
class GTID(object):
    """Convert a GTID set between its text form and a packed binary form.

    Text form: ``uuid:start-stop[:start-stop...][,uuid:...]``.
    Binary form (every integer is a little-endian unsigned 64-bit value):
    the number of SIDs, then for each SID its 16 raw UUID bytes, the number
    of intervals, and one ``(start, stop)`` pair per interval.

    Interval bounds are written and read verbatim; no +1/-1 adjustment is
    applied in either direction.
    """

    def __init__(self,data):
        """Store *data* as binary input (``bytes``) or as text input (anything else)."""
        if isinstance(data,bytes):
            self.bdata = data
        else:
            self.data = data
        self.offset = 0

    def read(self,n):
        """Return the next *n* bytes of the binary input and advance the cursor."""
        chunk = self.bdata[self.offset:self.offset+n]
        self.offset += n
        return chunk

    def encode(self,tdata=None):
        """Pack a textual GTID set into its binary form.

        Args:
            tdata: GTID text to encode; defaults to the text given to the
                constructor.

        Returns:
            bytes: the packed GTID set. An empty set (``""``) encodes to a
            single zero SID count.

        Raises:
            struct.error: an interval is not of the form ``start-stop``.
            binascii.Error: a UUID is not valid hexadecimal.
        """
        text = self.data if tdata is None else tdata
        # Newlines, surrounding blanks and empty entries (empty input, stray
        # or trailing commas) carry no information; dropping them keeps the
        # SID count equal to the number of UUIDs actually written.
        entries = [item.strip() for item in text.replace("\n","").split(",")]
        entries = [item for item in entries if item]
        parts = [struct.pack("<Q",len(entries))]
        for entry in entries:
            sid, *intervals = entry.split(":")
            parts.append(binascii.unhexlify(sid.strip().replace('-', '')))
            parts.append(struct.pack('<Q',len(intervals)))
            for interval in intervals:
                parts.append(struct.pack('<QQ',*[int(bound) for bound in interval.split("-")]))
        return b"".join(parts)

    def decode(self,):
        """Unpack the binary input into its textual GTID set.

        Parsing always restarts from the beginning of the buffer and stops
        early if the buffer ends exactly where the next UUID is expected.

        Returns:
            str: comma-separated ``uuid:start-stop[:start-stop...]`` entries.
        """
        # Layout reference: https://github.com/ddcw/pymysqlbinlog : pymysqlbinlog/gtid_event.py
        self.offset = 0
        sid_count = struct.unpack('<Q',self.read(8))[0]
        gtid_list = []
        for _ in range(sid_count):
            raw_sid = self.read(16)
            if raw_sid == b'':
                break
            gtid_info = str(uuid.UUID(bytes=raw_sid))
            interval_count = struct.unpack('<Q',self.read(8))[0]
            for _ in range(interval_count):
                start = struct.unpack('<Q',self.read(8))[0]
                stop = struct.unpack('<Q',self.read(8))[0]
                gtid_info += f":{start}-{stop}"
            gtid_list.append(gtid_info)
        return ",".join(gtid_list)
基本上只是对上一次的脚本做补充, 用法没变, 我这里就直接演示了. 直接展示又好像少了点什么, 所以我就和 pymysqlbinlog 结合起来了, 只打印DML语句信息. (基本上只是多个导包而已).
# Demo: stream binlog events from a server using GTID auto-positioning.
import t20240508

# Per the original author: the transaction carrying the end value (5) is
# included in the stream, because the server applies a -1 to the end value
# (the binlog parsing code does the same -1).
requested_gtid = "6d650f1f-ba4e-11ed-99ab-000c2980c11e:1-5"

replica = t20240508.repl()
replica.server_id = 123456
replica.auto_position = True
replica.connect()
replica.request_dump(requested_gtid)
replica.parse_event()
我这里就只给t20240508.py的源码(大部分没变), 其它的之前都发过.
import testpymysql
import struct
import uuid
import binascii
from pymysqlbinlog.pymysqlbinlog import mysqlbinlog
from pymysqlbinlog.row_event import *
def _read_lenenc(bdata,i):
    """Read a length-prefixed field from *bdata* starting at offset *i*.

    The single byte at *i* is turned into a length via ``btoint`` (defined
    outside this file; presumably a bytes-to-int conversion - confirm), and
    that many bytes following it form the payload.

    Returns:
        tuple: ``(payload, next_offset)`` where ``next_offset`` points just
        past the payload.
    """
    payload_start = i + 1
    size = btoint(bdata[i:payload_start])
    payload_end = payload_start + size
    return bdata[payload_start:payload_end],payload_end
def event_header(bdata):
    """Parse the 19-byte binlog event header at the start of *bdata*.

    Fields are little-endian: timestamp (4 bytes), event type (1), server id
    (4), event size (4), log position (4) and flags (2). Flags is a bit
    field, so it is unpacked as an unsigned 16-bit value; a signed read would
    turn a set high bit into a negative number.

    Returns:
        dict: keys ``timestamp``, ``event_type``, ``server_id``,
        ``event_size``, ``log_pos`` and ``flags``.

    Raises:
        struct.error: fewer than 19 bytes are available.
    """
    timestamp, event_type, server_id, event_size, log_pos, flags = struct.unpack("<LBLLLH",bdata[0:19])
    return {
        "timestamp":timestamp,
        'event_type':event_type,
        'server_id':server_id,
        'event_size':event_size,
        'log_pos':log_pos,
        'flags':flags,
    }
class GTID(object):
    """Convert a GTID set between its text form and a packed binary form.

    Text form: ``uuid:start-stop[:start-stop...][,uuid:...]``.
    Binary form (every integer is a little-endian unsigned 64-bit value):
    the number of SIDs, then for each SID its 16 raw UUID bytes, the number
    of intervals, and one ``(start, stop)`` pair per interval.

    Interval bounds are written and read verbatim; no +1/-1 adjustment is
    applied in either direction.
    """

    def __init__(self,data):
        """Store *data* as binary input (``bytes``) or as text input (anything else)."""
        if isinstance(data,bytes):
            self.bdata = data
        else:
            self.data = data
        self.offset = 0

    def read(self,n):
        """Return the next *n* bytes of the binary input and advance the cursor."""
        chunk = self.bdata[self.offset:self.offset+n]
        self.offset += n
        return chunk

    def encode(self,tdata=None):
        """Pack a textual GTID set into its binary form.

        Args:
            tdata: GTID text to encode; defaults to the text given to the
                constructor.

        Returns:
            bytes: the packed GTID set. An empty set (``""``) encodes to a
            single zero SID count.

        Raises:
            struct.error: an interval is not of the form ``start-stop``.
            binascii.Error: a UUID is not valid hexadecimal.
        """
        text = self.data if tdata is None else tdata
        # Newlines, surrounding blanks and empty entries (empty input, stray
        # or trailing commas) carry no information; dropping them keeps the
        # SID count equal to the number of UUIDs actually written.
        entries = [item.strip() for item in text.replace("\n","").split(",")]
        entries = [item for item in entries if item]
        parts = [struct.pack("<Q",len(entries))]
        for entry in entries:
            sid, *intervals = entry.split(":")
            parts.append(binascii.unhexlify(sid.strip().replace('-', '')))
            parts.append(struct.pack('<Q',len(intervals)))
            for interval in intervals:
                parts.append(struct.pack('<QQ',*[int(bound) for bound in interval.split("-")]))
        return b"".join(parts)

    def decode(self,):
        """Unpack the binary input into its textual GTID set.

        Parsing always restarts from the beginning of the buffer and stops
        early if the buffer ends exactly where the next UUID is expected.

        Returns:
            str: comma-separated ``uuid:start-stop[:start-stop...]`` entries.
        """
        # Layout reference: https://github.com/ddcw/pymysqlbinlog : pymysqlbinlog/gtid_event.py
        self.offset = 0
        sid_count = struct.unpack('<Q',self.read(8))[0]
        gtid_list = []
        for _ in range(sid_count):
            raw_sid = self.read(16)
            if raw_sid == b'':
                break
            gtid_info = str(uuid.UUID(bytes=raw_sid))
            interval_count = struct.unpack('<Q',self.read(8))[0]
            for _ in range(interval_count):
                start = struct.unpack('<Q',self.read(8))[0]
                stop = struct.unpack('<Q',self.read(8))[0]
                gtid_info += f":{start}-{stop}"
            gtid_list.append(gtid_info)
        return ",".join(gtid_list)
class repl(testpymysql.mysql,mysqlbinlog):
    """Minimal replica-side client.

    Registers with the server, requests a binlog dump (by file/position or
    by GTID set) and prints the SQL of row events that directly follow a
    table-map event. Packet I/O (``read_pack`` / ``write_pack``) is not
    defined here; presumably it comes from ``testpymysql.mysql`` - confirm.
    """

    def __init__(self,*args,**kwargs):
        """Initialise both base classes and the replication defaults.

        ``*args`` / ``**kwargs`` are accepted but not forwarded to either
        base initialiser.
        """
        self.filename = "None"
        testpymysql.mysql.__init__(self,)
        mysqlbinlog.__init__(self,filename="xx")

        # Identity reported to the server when registering.
        self.server_id = 12345678  # must fit in a uint32
        self.lport = 6666

        # Dump request settings.
        self.auto_position = False  # True: request the dump by GTID set
        self.master_heartbeat_period = 30*1000000000  # heartbeat period, in nanoseconds
        # The file name could be looked up with SHOW MASTER STATUS; it is
        # hard-coded here instead.
        self.log_file = 'm3308.001037'
        self.log_pos = 4
        # Original note: the GTID bytes used to be taken ready-made from the
        # author's environment; for computing them see pymysqlreplication's
        # gtid.py, i.e. gtid.GtidSet(GTID_STR).encode().

        # NOTE(review): presumably read by the mysqlbinlog base class - confirm.
        self.ROLLBACK = False
        self.checksum = None

    # Server-side counterpart: sql/rpl_slave.cc register_slave_on_master
    def register_slave(self):
        """Send COM_REGISTER_SLAVE and report whether the reply starts with 0x00.

        Payload layout:
            command             1 byte  (COM_REGISTER_SLAVE: 21)
            server_id           4 bytes
            host/user/password  one zero byte each
            port                2 bytes
            rpl_recovery_rank   4 bytes (0)
            master_id           4 bytes (0)
        """
        payload = struct.pack('<BLBBBHLL',21,self.server_id,0,0,0,self.lport,0,0)
        self._next_seq_id = 0  # the sequence id restarts for every command
        self.write_pack(payload)
        reply = self.read_pack()
        return reply[0:1] == b'\x00'

    # Server-side counterpart: sql/rpl_slave.cc request_dump
    def request_dump(self,gtid):
        """Set the heartbeat period, register, and send the binlog dump command.

        Args:
            gtid: textual GTID set; only used when ``auto_position`` is true,
                in which case its encoded form is also kept in ``self.bgtid``.
        """
        self.query(f'SET @master_heartbeat_period = {self.master_heartbeat_period}')
        # master_binlog_checksum and slave_uuid are deliberately not set.
        if not self.auto_position:
            # COM_BINLOG_DUMP (18):
            #   command 1 | binlog position 4 | flags 2 | server_id 4 | file name
            self.register_slave()
            payload = struct.pack('<BLHL',18,self.log_pos,0,self.server_id) + self.log_file.encode()
        else:
            # COM_BINLOG_DUMP_GTID (30):
            #   command 1 | flags 2 | server_id 4 | name size 4 | name |
            #   position 8 | gtid size 4 | gtid bytes
            # The name is sent as four zero bytes (size 4, value 0).
            self.bgtid = GTID(gtid).encode()
            payload = struct.pack('<BHLLLQL',30,0,self.server_id,4,0,self.log_pos,len(self.bgtid)) + self.bgtid
            self.register_slave()
        self._next_seq_id = 0
        self.write_pack(payload)

    def read_event(self,):
        """Read one packet; return it without its first byte if it looks like an event.

        A packet qualifies when it starts with 0x00 and is longer than 20
        bytes; otherwise ``None`` is returned.
        """
        packet = self.read_pack()
        if packet[0:1] != b'\x00' or len(packet) <= 20:
            return None
        return packet[1:]

    def event(self):
        """Yield the parsed header of every qualifying packet, indefinitely."""
        while True:
            packet = self.read_pack()
            if packet[0:1] != b'\x00' or len(packet) <= 20:
                continue
            yield event_header(packet[1:20])

    def parse_event(self):
        """Loop forever, printing the SQL of the row event after each table-map event.

        Only event type 19 triggers work; the very next packet is then read
        and treated as the matching row event.
        """
        while True:
            packet = self.read_pack()
            if packet[0:1] != b'\x00' or len(packet) <= 20:
                continue
            body = packet[1:]
            type_code = struct.unpack('>B',body[4:5])[0]
            if type_code != 19:
                continue
            table_map = tablemap_event(bdata=body[19:],event_header=body[:19])
            table_map.init()
            row_packet = self.read_pack()[1:]
            rows = row_event(bdata=row_packet[19:],tablemap=table_map,event_header=row_packet[:19])
            rows.init()
            print(rows.read_sql())

    def query(self,sql):
        """Send *sql* as command byte 3 followed by the statement; the reply is not read here."""
        self._next_seq_id = 0
        command = struct.pack('<B',3)
        self.write_pack(command + sql.encode())

    def close(self):
        """Send command byte 1 to the server."""
        # NOTE(review): unlike the other commands, the sequence id is not
        # reset before this packet is written.
        quit_command = struct.pack('<B',1)
        self.write_pack(quit_command)
写本文的时候发生的事情, 记录一下.
之前环境迁移, 今天验证的时候,发现个别文件系统的部分文件目录丢失了, 在该文件系统的lost+found目录找到了相关文件和目录, 但未发现删除之类的信息, 虽然恢复起来很麻烦. 最终还是恢复成功了. 迁移前一定要做好备份
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。