前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >seata RM源码分析

seata RM源码分析

作者头像
luoxn28
发布2021-02-26 15:13:01
6530
发布2021-02-26 15:13:01
举报
文章被收录于专栏:TopCoder

Seata 是一款阿里开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案,github地址:https://github.com/seata/seata。

RM模块是seata中全局事务参与者,其核心逻辑有:

  • 启动netty客户端:会启动RM客户端与TC通信
  • 数据源切面代理:SQL解析、分支事务注册/提交、undolog保存、分支事务状态上报
  • Rpc代理:在RPC流程中传递seata上下文(xid等,非本文分析重点)

同TM类似,RM侧也是有一个GlobalTransactionScanner类,来进行初始化的动作,GlobalTransactionScanner实现了InitializingBean,其afterPropertiesSet方法中会执行netty客户端初始化工作,逻辑如下:

代码语言:javascript
复制
private void initClient() {
    //初始化TM
    TMClient.init(applicationId, txServiceGroup);
    ...
    //初始化RM
    RMClient.init(applicationId, txServiceGroup);
    ... 
    // 注册Spring shutdown的回调,用来释放资源
    registerSpringShutdownHook();
 }

RM对应的客户端为RMClient,其初始化代码如下:

代码语言:javascript
复制
public static void init(String applicationId, String transactionServiceGroup) {
    // 设置资源管理和事务消息处理器
    RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
    rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
    rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
    rmNettyRemotingClient.init();
}

RMClient初始化首先注册各种类型的处理器(全局事务相关、分支事务相关、心跳相关),然后启动netty客户端:

代码语言:javascript
复制
private void registerProcessor() {
    // 1.registry rm client handle branch commit processor
    RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
    super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);
    // 2.registry rm client handle branch commit processor
    RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
    super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);
    // 3.registry rm handler undo log processor 其目前已经不再起任何作用了,undolog的删除作为AT模式下的commit处理器是会自动处理
    RmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);
    // 4.registry TC response processor
    ClientOnResponseProcessor onResponseProcessor =
        new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);
    // 5.registry heartbeat message processor
    ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}

public void init() {
    // 定时重连任务
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            clientChannelManager.reconnect(getTransactionServiceGroup());
        }
    }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
    // 请求future定时器
    super.init();
    // 标准的netty client初始化
    clientBootstrap.start();
}

启动netty完成之后,RM侧的业务SQL执行时,就会进行对应的分支事务的执行操作了,这块内容和TM原理解析中的SQL执行流程是一样的,这里就不在赘述。下面就重点关注下RM侧特有的一些消息处理器类,也就是在方法io.seata.core.rpc.netty.RmNettyRemotingClient#registerProcessor中注册的各种处理器。

分支事务提交回滚

分支事务的提交和回滚分别对应处理器类RmBranchCommitProcessorRmBranchRollbackProcessor,二者对应的处理逻辑如下:

事务提交在AT模式下最后会返回BranchStatus.PhaseTwo_Committed,分支事务提交首先获取xid和resouceId,然后执行ResourceManager.branchCommit进行提交操作。

代码语言:javascript
复制
protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
    throws TransactionException {
    String xid = request.getXid();
    long branchId = request.getBranchId();
    String resourceId = request.getResourceId();
    String applicationData = request.getApplicationData();

    BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
        applicationData);
    response.setXid(xid);
    response.setBranchId(branchId);
    response.setBranchStatus(status);
}

// 异步任务doBranchCommitSafely逻辑
scheduledExecutor.scheduleAtFixedRate(this::doBranchCommitSafely, 10, 1000, TimeUnit.MILLISECONDS);

注意,AT模式下的提交目前只是简单的往asyncWorker中提交一个异步任务,后续会进行undolog的清除操作,注意:RM侧的事务提交已经在阶段一就已经完成了,因此这里主要就是进行undolog的清除操作了。

相对于事务提交操作,事务回滚要做的事情会多一些,比如找到对应的undolog然后进行数据的恢复(这里会开启一个新的事务来完成恢复操作)。事务回滚核心逻辑是调用ResourceManager.branchRollback来完成的,代码如下:

代码语言:javascript
复制
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                   String applicationData) throws TransactionException {
    DataSourceProxy dataSourceProxy = get(resourceId);
    try {
        // 找到undolog,然后进行undo操作
     UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
    } catch (TransactionException te) {
        if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
            return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
        } else {
            return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        }
    }
    return BranchStatus.PhaseTwo_Rollbacked;
}

对于MySQL来说,进行undo的类为MySQLUndoLogManager,其undo方法主要逻辑如下:

  • 获取DB连接,根据xid和branchId获取对应的undolog
  • 首先判断undolog状态,然后将undolog反解析为sqlUndoLogs
  • 逆序排列,然后遍历执行每个undolog进行undo操作
  • 执行完毕后删除undolog,然后进行commit事务,最后返回

RM侧处理响应结果的逻辑大都较为简洁明了,这里就不再赘述了。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-01-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 TopCoder 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 分支事务提交回滚
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档