Seata 是一款阿里开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案,github地址:https://github.com/seata/seata。
RM模块是seata中全局事务参与者,其核心逻辑有:
同TM类似,RM侧也是有一个GlobalTransactionScanner
类,来进行初始化的动作,GlobalTransactionScanner
实现了InitializingBean,其afterPropertiesSet方法中会执行netty客户端初始化工作,逻辑如下:
private void initClient() {
//初始化TM
TMClient.init(applicationId, txServiceGroup);
...
//初始化RM
RMClient.init(applicationId, txServiceGroup);
...
// 注册Spring shutdown的回调,用来释放资源
registerSpringShutdownHook();
}
RM对应的客户端为RMClient,其初始化代码如下:
public static void init(String applicationId, String transactionServiceGroup) {
// 设置资源管理和事务消息处理器
RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
rmNettyRemotingClient.init();
}
RMClient初始化首先注册各种类型的处理器(全局事务相关、分支事务相关、心跳相关),然后启动netty客户端:
private void registerProcessor() {
// 1.registry rm client handle branch commit processor
RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);
// 2.registry rm client handle branch commit processor
RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);
// 3.registry rm handler undo log processor 其目前已经不再起任何作用了,undolog的删除作为AT模式下的commit处理器是会自动处理
RmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());
super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);
// 4.registry TC response processor
ClientOnResponseProcessor onResponseProcessor =
new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);
super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);
// 5.registry heartbeat message processor
ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}
public void init() {
// 定时重连任务
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
clientChannelManager.reconnect(getTransactionServiceGroup());
}
}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
// 请求future定时器
super.init();
// 标准的netty client初始化
clientBootstrap.start();
}
启动netty完成之后,RM侧的业务SQL执行时,就会进行对应的分支事务的执行操作了,这块内容和TM原理解析中的SQL执行流程是一样的,这里就不在赘述。下面就重点关注下RM侧特有的一些消息处理器类,也就是在方法io.seata.core.rpc.netty.RmNettyRemotingClient#registerProcessor
中注册的各种处理器。
分支事务的提交和回滚分别对应处理器类RmBranchCommitProcessor
和RmBranchRollbackProcessor
,二者对应的处理逻辑如下:
事务提交在AT模式下最后会返回BranchStatus.PhaseTwo_Committed
,分支事务提交首先获取xid和resouceId,然后执行ResourceManager.branchCommit
进行提交操作。
protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
throws TransactionException {
String xid = request.getXid();
long branchId = request.getBranchId();
String resourceId = request.getResourceId();
String applicationData = request.getApplicationData();
BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
applicationData);
response.setXid(xid);
response.setBranchId(branchId);
response.setBranchStatus(status);
}
// 异步任务doBranchCommitSafely逻辑
scheduledExecutor.scheduleAtFixedRate(this::doBranchCommitSafely, 10, 1000, TimeUnit.MILLISECONDS);
注意,AT模式下的提交目前只是简单的往
asyncWorker
中提交一个异步任务,后续会进行undolog的清除操作,注意:RM侧的事务提交已经在阶段一就已经完成了,因此这里主要就是进行undolog的清除操作了。
相对于事务提交操作,事务回滚要做的事情会多一些,比如找到对应的undolog然后进行数据的恢复(这里会开启一个新的事务来完成恢复操作)。事务回滚核心逻辑是调用ResourceManager.branchRollback
来完成的,代码如下:
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
DataSourceProxy dataSourceProxy = get(resourceId);
try {
// 找到undolog,然后进行undo操作
UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
} catch (TransactionException te) {
if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
} else {
return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
}
}
return BranchStatus.PhaseTwo_Rollbacked;
}
对于MySQL来说,进行undo的类为MySQLUndoLogManager,其undo方法主要逻辑如下:
RM侧处理响应结果的逻辑大都较为简洁明了,这里就不再赘述了。