状态机设计组件:seata-saga-statemachine-designer 状态机在线画图工具:saga_designer
github上Seata-sample有完整的示例代码,Seata Saga模式中有此示例的完整介绍和分析。这里仅摘取部分和介绍原理有关的代码进行分析。
mysql示例:
CREATE TABLE IF NOT EXISTS `seata_state_machine_def`
(
`id` VARCHAR(32) NOT NULL COMMENT 'id',
`name` VARCHAR(128) NOT NULL COMMENT 'name',
`tenant_id` VARCHAR(32) NOT NULL COMMENT 'tenant id',
`app_name` VARCHAR(32) NOT NULL COMMENT 'application name',
`type` VARCHAR(20) COMMENT 'state language type',
`comment_` VARCHAR(255) COMMENT 'comment',
`ver` VARCHAR(16) NOT NULL COMMENT 'version',
`gmt_create` DATETIME(3) NOT NULL COMMENT 'create time',
`status` VARCHAR(2) NOT NULL COMMENT 'status(AC:active|IN:inactive)',
`content` TEXT COMMENT 'content',
`recover_strategy` VARCHAR(16) COMMENT 'transaction recover strategy(compensate|retry)',
PRIMARY KEY (`id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
CREATE TABLE IF NOT EXISTS `seata_state_machine_inst`
(
`id` VARCHAR(128) NOT NULL COMMENT 'id',
`machine_id` VARCHAR(32) NOT NULL COMMENT 'state machine definition id',
`tenant_id` VARCHAR(32) NOT NULL COMMENT 'tenant id',
`parent_id` VARCHAR(128) COMMENT 'parent id',
`gmt_started` DATETIME(3) NOT NULL COMMENT 'start time',
`business_key` VARCHAR(48) COMMENT 'business key',
`start_params` TEXT COMMENT 'start parameters',
`gmt_end` DATETIME(3) COMMENT 'end time',
`excep` BLOB COMMENT 'exception',
`end_params` TEXT COMMENT 'end parameters',
`status` VARCHAR(2) COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
`compensation_status` VARCHAR(2) COMMENT 'compensation status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
`is_running` TINYINT(1) COMMENT 'is running(0 no|1 yes)',
`gmt_updated` DATETIME(3) NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `unikey_buz_tenant` (`business_key`, `tenant_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
CREATE TABLE IF NOT EXISTS `seata_state_inst`
(
`id` VARCHAR(48) NOT NULL COMMENT 'id',
`machine_inst_id` VARCHAR(128) NOT NULL COMMENT 'state machine instance id',
`name` VARCHAR(128) NOT NULL COMMENT 'state name',
`type` VARCHAR(20) COMMENT 'state type',
`service_name` VARCHAR(128) COMMENT 'service name',
`service_method` VARCHAR(128) COMMENT 'method name',
`service_type` VARCHAR(16) COMMENT 'service type',
`business_key` VARCHAR(48) COMMENT 'business key',
`state_id_compensated_for` VARCHAR(50) COMMENT 'state compensated for',
`state_id_retried_for` VARCHAR(50) COMMENT 'state retried for',
`gmt_started` DATETIME(3) NOT NULL COMMENT 'start time',
`is_for_update` TINYINT(1) COMMENT 'is service for update',
`input_params` TEXT COMMENT 'input parameters',
`output_params` TEXT COMMENT 'output parameters',
`status` VARCHAR(2) NOT NULL COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
`excep` BLOB COMMENT 'exception',
`gmt_updated` DATETIME(3) COMMENT 'update time',
`gmt_end` DATETIME(3) COMMENT 'end time',
PRIMARY KEY (`id`, `machine_inst_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8;
<bean id="dataSource" class="org.h2.jdbcx.JdbcConnectionPool" destroy-method="dispose">
<constructor-arg>
<bean class="org.h2.jdbcx.JdbcDataSource">
<property name="URL" value="jdbc:h2:mem:seata_saga" />
<property name="user" value="sa" />
<property name="password" value="sa" />
</bean>
</constructor-arg>
</bean>
<jdbc:initialize-database data-source="dataSource">
<jdbc:script location="classpath:sql/h2_init.sql" />
</jdbc:initialize-database>
<bean id="stateMachineEngine" class="io.seata.saga.engine.impl.ProcessCtrlStateMachineEngine">
<property name="stateMachineConfig" ref="dbStateMachineConfig"></property>
</bean>
<bean id="dbStateMachineConfig" class="io.seata.saga.engine.config.DbStateMachineConfig">
<property name="dataSource" ref="dataSource"></property>
<property name="resources" value="statelang/*.json"></property>
<property name="enableAsync" value="true"></property>
<property name="threadPoolExecutor" ref="threadExecutor"></property>
<property name="applicationId" value="saga_sample"></property>
<property name="txServiceGroup" value="my_test_tx_group"></property>
</bean>
<bean id="threadExecutor"
class="org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean">
<property name="threadNamePrefix" value="SAGA_ASYNC_EXE_" />
<property name="corePoolSize" value="1" />
<property name="maxPoolSize" value="20" />
</bean>
<bean class="io.seata.saga.rm.StateMachineEngineHolder">
<property name="stateMachineEngine" ref="stateMachineEngine"/>
</bean>
{
"Name": "reduceInventoryAndBalance",
"Comment": "reduce inventory then reduce balance in a transaction",
"StartState": "ReduceInventory",
"Version": "0.0.1",
"States": {
"ReduceInventory": {
"Type": "ServiceTask",
"ServiceName": "inventoryAction",
"ServiceMethod": "reduce",
"CompensateState": "CompensateReduceInventory",
"Next": "ChoiceState",
"Input": [
"$.[businessKey]",
"$.[count]"
],
"Output": {
"reduceInventoryResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
}
},
"ChoiceState":{
"Type": "Choice",
"Choices":[
{
"Expression":"[reduceInventoryResult] == true",
"Next":"ReduceBalance"
}
],
"Default":"Fail"
},
"ReduceBalance": {
"Type": "ServiceTask",
"ServiceName": "balanceAction",
"ServiceMethod": "reduce",
"CompensateState": "CompensateReduceBalance",
"Input": [
"$.[businessKey]",
"$.[amount]",
{
"throwException" : "$.[mockReduceBalanceFail]"
}
],
"Output": {
"compensateReduceBalanceResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
},
"Catch": [
{
"Exceptions": [
"java.lang.Throwable"
],
"Next": "CompensationTrigger"
}
],
"Next": "Succeed"
},
"CompensateReduceInventory": {
"Type": "ServiceTask",
"ServiceName": "inventoryAction",
"ServiceMethod": "compensateReduce",
"Input": [
"$.[businessKey]"
]
},
"CompensateReduceBalance": {
"Type": "ServiceTask",
"ServiceName": "balanceAction",
"ServiceMethod": "compensateReduce",
"Input": [
"$.[businessKey]"
]
},
"CompensationTrigger": {
"Type": "CompensationTrigger",
"Next": "Fail"
},
"Succeed": {
"Type":"Succeed"
},
"Fail": {
"Type":"Fail",
"ErrorCode": "PURCHASE_FAILED",
"Message": "purchase failed"
}
}
}
创建StateMachineEngine
StateMachineEngine stateMachineEngine = (StateMachineEngine) applicationContext.getBean("stateMachineEngine");
执行-同步
StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("reduceInventoryAndBalance", null, businessKey, startParams);
执行-异步
StateMachineInstance inst = stateMachineEngine.startWithBusinessKeyAsync("reduceInventoryAndBalance", null, businessKey, startParams, CALL_BACK);
Saga模式是一种长事务解决方案,在Saga模式中,业务流程中每个参与者都提交本地事务,当出现某一个参与者失败则补偿前面已经成功的参与者。
注:此图来自seata官网的博客。
状态机引擎的设计主要分成三层, 上层依赖下层,从下往上分别是:
注:此图来自seata官网的博客。
注:此图来自seata官网的博客。 Saga模式下,事务会根据json配置的state来执行,如果前一个state的正向服务执行成功,那么就路由到下一个state并执行下一个state的正向服务,如果执行失败,那么基于CompensateState属性执行补偿服务。
从代码的角度来看,Saga执行过程如下:
从时序图上可以看到,Saga模式和AT、TCC模式有较大的差异:
Saga的分布式事务由StateMachineEngine开启。目前支持同步、异步方式,示例代码如下:
StateMachineInstance startWithBusinessKey(String stateMachineName, String tenantId, String businessKey, Map<String, Object> startParams) ;
StateMachineInstance startAsync(String stateMachineName, String tenantId, Map<String, Object> startParams, AsyncCallback callback) throws EngineExecutionException;
StateMachineEngine开启事务,主要包括以下几件事情:
private StateMachineInstance startInternal(String stateMachineName, String tenantId, String businessKey, Map<String, Object> startParams, boolean async, AsyncCallback callback) throws EngineExecutionException {
if (async && !stateMachineConfig.isEnableAsync()) {
throw new EngineExecutionException(
"Asynchronous start is disabled. please set StateMachineConfig.enableAsync=true first.",
FrameworkErrorCode.AsynchronousStartDisabled);
}
if (StringUtils.isEmpty(tenantId)) {
tenantId = stateMachineConfig.getDefaultTenantId();
}
StateMachineInstance instance = createMachineInstance(stateMachineName, tenantId, businessKey, startParams);
ProcessContextBuilder contextBuilder = ProcessContextBuilder.create().withProcessType(ProcessType.STATE_LANG)
.withOperationName(DomainConstants.OPERATION_NAME_START).withAsyncCallback(callback).withInstruction(
new StateInstruction(stateMachineName, tenantId)).withStateMachineInstance(instance)
.withStateMachineConfig(getStateMachineConfig()).withStateMachineEngine(this);
Map<String, Object> contextVariables;
if (startParams != null) {
contextVariables = new ConcurrentHashMap<>(startParams.size());
nullSafeCopy(startParams, contextVariables);
} else {
contextVariables = new ConcurrentHashMap<>();
}
instance.setContext(contextVariables);
contextBuilder.withStateMachineContextVariables(contextVariables);
contextBuilder.withIsAsyncExecution(async);
ProcessContext processContext = contextBuilder.build();
if (instance.getStateMachine().isPersist() && stateMachineConfig.getStateLogStore() != null) {
stateMachineConfig.getStateLogStore().recordStateMachineStarted(instance, processContext);
}
if (StringUtils.isEmpty(instance.getId())) {
instance.setId(
stateMachineConfig.getSeqGenerator().generate(DomainConstants.SEQ_ENTITY_STATE_MACHINE_INST));
}
if (async) {
stateMachineConfig.getAsyncProcessCtrlEventPublisher().publish(processContext);
} else {
stateMachineConfig.getProcessCtrlEventPublisher().publish(processContext);
}
return instance;
}
状态机的流程处理主要包括两个部分:执行state、路由到下一个state。在ProcessController中定义了状态机的执行流程:
public void process(ProcessContext context) throws FrameworkException {
try {
businessProcessor.process(context);
businessProcessor.route(context);
} catch (FrameworkException fex) {
throw fex;
} catch (Exception ex) {
LOGGER.error("Unknown exception occurred, context = {}", context, ex);
throw new FrameworkException(ex, "Unknown exception occurred", FrameworkErrorCode.UnknownAppError);
}
}
StateMachineProcessHandler对状态机的每一种type都配置了handler,如下图所示:
public void initDefaultHandlers() {
if (stateHandlers.isEmpty()) {
stateHandlers.put(DomainConstants.STATE_TYPE_SERVICE_TASK, new ServiceTaskStateHandler());
stateHandlers.put(DomainConstants.STATE_TYPE_SCRIPT_TASK, new ScriptTaskStateHandler());
stateHandlers.put(DomainConstants.STATE_TYPE_SUB_MACHINE_COMPENSATION, new ServiceTaskStateHandler());
stateHandlers.put(DomainConstants.STATE_TYPE_SUB_STATE_MACHINE, new SubStateMachineHandler());
stateHandlers.put(DomainConstants.STATE_TYPE_CHOICE, new ChoiceStateHandler());
stateHandlers.put(DomainConstants.STATE_TYPE_SUCCEED, new SucceedEndStateHandler());
stateHandlers.put(DomainConstants.STATE_TYPE_FAIL, new FailEndStateHandler());
stateHandlers.put(DomainConstants.STATE_TYPE_COMPENSATION_TRIGGER, new CompensationTriggerStateHandler());
}
}
结合前面的示例数据,分析状态机是如何找到合适的State处理器的:
"States": {
"ReduceInventory": {
"Type": "ServiceTask", -> 使用ServiceTaskStateHandler处理
"ServiceName": "inventoryAction", -> 对应到bean的名字
"ServiceMethod": "reduce", ->对应到bean中的具体方法
"CompensateState": "CompensateReduceInventory", -> 补偿的state
"Next": "ChoiceState", ->下一个state
"Input": [
"$.[businessKey]",
"$.[count]"
],
"Output": {
"reduceInventoryResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
}
}
}
public void process(ProcessContext context) throws FrameworkException {
StateInstruction instruction = context.getInstruction(StateInstruction.class);
State state = instruction.getState(context);
String stateType = state.getType();
StateHandler stateHandler = stateHandlers.get(stateType);
List<StateHandlerInterceptor> interceptors = null;
if (stateHandler instanceof InterceptableStateHandler) {
interceptors = ((InterceptableStateHandler)stateHandler).getInterceptors();
}
List<StateHandlerInterceptor> executedInterceptors = null;
Exception exception = null;
try {
if (CollectionUtils.isNotEmpty(interceptors)) {
executedInterceptors = new ArrayList<>(interceptors.size());
for (StateHandlerInterceptor interceptor : interceptors) {
executedInterceptors.add(interceptor);
interceptor.preProcess(context);
}
}
stateHandler.process(context);
} catch (Exception e) {
exception = e;
throw e;
} finally {
if (CollectionUtils.isNotEmpty(executedInterceptors)) {
for (int i = executedInterceptors.size() - 1; i >= 0; i--) {
StateHandlerInterceptor interceptor = executedInterceptors.get(i);
interceptor.postProcess(context, exception);
}
}
}
}
主要包括以下内容:
public void recordStateStarted(StateInstance stateInstance, ProcessContext context) {
if (stateInstance != null) {
boolean isUpdateMode = isUpdateMode(stateInstance, context);
// if this state is for retry, do not register branch
if (StringUtils.hasLength(stateInstance.getStateIdRetriedFor())) {
if (isUpdateMode) {
stateInstance.setId(stateInstance.getStateIdRetriedFor());
} else {
// generate id by default
stateInstance.setId(generateRetryStateInstanceId(stateInstance));
}
}
// if this state is for compensation, do not register branch
else if (StringUtils.hasLength(stateInstance.getStateIdCompensatedFor())) {
stateInstance.setId(generateCompensateStateInstanceId(stateInstance, isUpdateMode));
} else {
branchRegister(stateInstance, context);
}
if (StringUtils.isEmpty(stateInstance.getId()) && seqGenerator != null) {
stateInstance.setId(seqGenerator.generate(DomainConstants.SEQ_ENTITY_STATE_INST));
}
stateInstance.setSerializedInputParams(paramsSerializer.serialize(stateInstance.getInputParams()));
if (!isUpdateMode) {
executeUpdate(stateLogStoreSqls.getRecordStateStartedSql(dbType),
STATE_INSTANCE_TO_STATEMENT_FOR_INSERT, stateInstance);
} else {
// if this retry/compensate state do not need persist, just update last inst
executeUpdate(stateLogStoreSqls.getUpdateStateExecutionStatusSql(dbType),
stateInstance.getStatus().name(), new Timestamp(System.currentTimeMillis()),
stateInstance.getMachineInstanceId(), stateInstance.getId());
}
}
}
当type=ServiceTask时,将会由ServiceTaskStateHandler处理,具体逻辑如下:
public void process(ProcessContext context) throws EngineExecutionException {
StateInstruction instruction = context.getInstruction(StateInstruction.class);
ServiceTaskStateImpl state = (ServiceTaskStateImpl) instruction.getState(context);
String serviceName = state.getServiceName();
String methodName = state.getServiceMethod();
StateInstance stateInstance = (StateInstance) context.getVariable(DomainConstants.VAR_NAME_STATE_INST);
Object result;
try {
List<Object> input = (List<Object>) context.getVariable(DomainConstants.VAR_NAME_INPUT_PARAMS);
//Set the current task execution status to RU (Running)
stateInstance.setStatus(ExecutionStatus.RU);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(">>>>>>>>>>>>>>>>>>>>>> Start to execute State[{}], ServiceName[{}], Method[{}], Input:{}",
state.getName(), serviceName, methodName, input);
}
if (state instanceof CompensateSubStateMachineState) {
//If it is the compensation of the substate machine,
// directly call the state machine's compensate method
result = compensateSubStateMachine(context, state, input, stateInstance,
(StateMachineEngine) context.getVariable(DomainConstants.VAR_NAME_STATEMACHINE_ENGINE));
} else {
StateMachineConfig stateMachineConfig = (StateMachineConfig) context.getVariable(
DomainConstants.VAR_NAME_STATEMACHINE_CONFIG);
ServiceInvoker serviceInvoker = stateMachineConfig.getServiceInvokerManager().getServiceInvoker(
state.getServiceType());
if (serviceInvoker == null) {
throw new EngineExecutionException("No such ServiceInvoker[" + state.getServiceType() + "]",
FrameworkErrorCode.ObjectNotExists);
}
if (serviceInvoker instanceof ApplicationContextAware) {
((ApplicationContextAware) serviceInvoker).setApplicationContext(
stateMachineConfig.getApplicationContext());
}
result = serviceInvoker.invoke(state, input.toArray());
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("<<<<<<<<<<<<<<<<<<<<<< State[{}], ServiceName[{}], Method[{}] Execute finish. result: {}",
state.getName(), serviceName, methodName, result);
}
if (result != null) {
stateInstance.setOutputParams(result);
((HierarchicalProcessContext) context).setVariableLocally(DomainConstants.VAR_NAME_OUTPUT_PARAMS,
result);
}
} catch (Throwable e) {
LOGGER.error("<<<<<<<<<<<<<<<<<<<<<< State[{}], ServiceName[{}], Method[{}] Execute failed.",
state.getName(), serviceName, methodName, e);
((HierarchicalProcessContext) context).setVariableLocally(DomainConstants.VAR_NAME_CURRENT_EXCEPTION, e);
EngineUtils.handleException(context, state, e);
}
}
执行完业务代码以后,会进入拦截器后置处理流程,主要包括以下内容:
路由的具体过程如下:
StateMachineProcessRouter代码:
public Instruction route(ProcessContext context) throws FrameworkException {
StateInstruction stateInstruction = context.getInstruction(StateInstruction.class);
State state;
if (stateInstruction.getTemporaryState() != null) {
state = stateInstruction.getTemporaryState();
stateInstruction.setTemporaryState(null);
} else {
StateMachineConfig stateMachineConfig = (StateMachineConfig)context.getVariable(
DomainConstants.VAR_NAME_STATEMACHINE_CONFIG);
StateMachine stateMachine = stateMachineConfig.getStateMachineRepository().getStateMachine(
stateInstruction.getStateMachineName(), stateInstruction.getTenantId());
state = stateMachine.getStates().get(stateInstruction.getStateName());
}
String stateType = state.getType();
StateRouter router = stateRouters.get(stateType);
Instruction instruction = null;
List<StateRouterInterceptor> interceptors = null;
if (router instanceof InterceptableStateRouter) {
interceptors = ((InterceptableStateRouter)router).getInterceptors();
}
List<StateRouterInterceptor> executedInterceptors = null;
Exception exception = null;
try {
if (CollectionUtils.isNotEmpty(interceptors)) {
executedInterceptors = new ArrayList<>(interceptors.size());
for (StateRouterInterceptor interceptor : interceptors) {
executedInterceptors.add(interceptor);
interceptor.preRoute(context, state);
}
}
instruction = router.route(context, state);
} catch (Exception e) {
exception = e;
throw e;
} finally {
if (CollectionUtils.isNotEmpty(executedInterceptors)) {
for (int i = executedInterceptors.size() - 1; i >= 0; i--) {
StateRouterInterceptor interceptor = executedInterceptors.get(i);
interceptor.postRoute(context, state, instruction, exception);
}
}
//if 'Succeed' or 'Fail' State did not configured, we must end the state machine
if (instruction == null && !stateInstruction.isEnd()) {
EngineUtils.endStateMachine(context);
}
}
return instruction;
}
TaskStateRouter代码:
public Instruction route(ProcessContext context, State state) throws EngineExecutionException {
StateInstruction stateInstruction = context.getInstruction(StateInstruction.class);
if (stateInstruction.isEnd()) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info(
"StateInstruction is ended, Stop the StateMachine executing. StateMachine[{}] Current State[{}]",
stateInstruction.getStateMachineName(), state.getName());
}
return null;
}
//The current CompensationTriggerState can mark the compensation process is started and perform compensation
// route processing.
State compensationTriggerState = (State)context.getVariable(
DomainConstants.VAR_NAME_CURRENT_COMPEN_TRIGGER_STATE);
if (compensationTriggerState != null) {
return compensateRoute(context, compensationTriggerState);
}
//There is an exception route, indicating that an exception is thrown, and the exception route is prioritized.
String next = (String)context.getVariable(DomainConstants.VAR_NAME_CURRENT_EXCEPTION_ROUTE);
if (StringUtils.hasLength(next)) {
context.removeVariable(DomainConstants.VAR_NAME_CURRENT_EXCEPTION_ROUTE);
} else {
next = state.getNext();
}
//If next is empty, the state selected by the Choice state was taken.
if (!StringUtils.hasLength(next) && context.hasVariable(DomainConstants.VAR_NAME_CURRENT_CHOICE)) {
next = (String)context.getVariable(DomainConstants.VAR_NAME_CURRENT_CHOICE);
context.removeVariable(DomainConstants.VAR_NAME_CURRENT_CHOICE);
}
if (!StringUtils.hasLength(next)) {
return null;
}
StateMachine stateMachine = state.getStateMachine();
State nextState = stateMachine.getState(next);
if (nextState == null) {
throw new EngineExecutionException("Next state[" + next + "] is not exits",
FrameworkErrorCode.ObjectNotExists);
}
stateInstruction.setStateName(next);
return stateInstruction;
}
在StateMachineProcessRouter#route方法中,我们可以看到,当没有下一个state时,将会通过以下代码结束事务,
if (CollectionUtils.isNotEmpty(executedInterceptors)) {
for (int i = executedInterceptors.size() - 1; i >= 0; i--) {
StateRouterInterceptor interceptor = executedInterceptors.get(i);
interceptor.postRoute(context, state, instruction, exception);
}
}
//if 'Succeed' or 'Fail' State did not configured, we must end the state machine
if (instruction == null && !stateInstruction.isEnd()) {
EngineUtils.endStateMachine(context);
}
结束事务的具体过程:
Saga模式下TC中执行的内容和AT模式非常相似,不过在TC收到Global Commit/Rollback时,TC仅修改全局事务状态,而不会立即进行回滚操作。具体是通过DefaultCoordinator中retryRollbacking、retryCommitting定时任务完成。
Saga中分支事务参与这不管理分支事务的状态,所有均在TM中基于state进行管理,所以TC通知Global Commit/Rollback时,TM会作为RM来完成state的状态管理(Commit/Rollback)。 Commit流程:
Rollback:
在SagaResourceManager中,有branchCommit、branchRollback的处理逻辑,如下:
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
try {
StateMachineInstance machineInstance = StateMachineEngineHolder.getStateMachineEngine().forward(xid, null);
if (ExecutionStatus.SU.equals(machineInstance.getStatus())
&& machineInstance.getCompensationStatus() == null) {
return BranchStatus.PhaseTwo_Committed;
} else if (ExecutionStatus.SU.equals(machineInstance.getCompensationStatus())) {
return BranchStatus.PhaseTwo_Rollbacked;
} else if (ExecutionStatus.FA.equals(machineInstance.getCompensationStatus()) || ExecutionStatus.UN.equals(
machineInstance.getCompensationStatus())) {
return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
} else if (ExecutionStatus.FA.equals(machineInstance.getStatus())
&& machineInstance.getCompensationStatus() == null) {
return BranchStatus.PhaseOne_Failed;
}
} catch (ForwardInvalidException e) {
LOGGER.error("StateMachine forward failed, xid: " + xid, e);
//if StateMachineInstanceNotExists stop retry
if (FrameworkErrorCode.StateMachineInstanceNotExists.equals(e.getErrcode())) {
return BranchStatus.PhaseTwo_Committed;
}
} catch (Exception e) {
LOGGER.error("StateMachine forward failed, xid: " + xid, e);
}
return BranchStatus.PhaseTwo_CommitFailed_Retryable;
}
@Override
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
try {
StateMachineInstance stateMachineInstance = StateMachineEngineHolder.getStateMachineEngine().reloadStateMachineInstance(xid);
if (stateMachineInstance == null) {
return BranchStatus.PhaseTwo_Rollbacked;
}
if (RecoverStrategy.Forward.equals(stateMachineInstance.getStateMachine().getRecoverStrategy())
&& (GlobalStatus.TimeoutRollbacking.name().equals(applicationData)
|| GlobalStatus.TimeoutRollbackRetrying.name().equals(applicationData))) {
LOGGER.warn("Retry by custom recover strategy [Forward] on timeout, SAGA global[{}]", xid);
return BranchStatus.PhaseTwo_CommitFailed_Retryable;
}
stateMachineInstance = StateMachineEngineHolder.getStateMachineEngine().compensate(xid,
null);
if (ExecutionStatus.SU.equals(stateMachineInstance.getCompensationStatus())) {
return BranchStatus.PhaseTwo_Rollbacked;
}
} catch (EngineExecutionException e) {
LOGGER.error("StateMachine compensate failed, xid: " + xid, e);
//if StateMachineInstanceNotExists stop retry
if (FrameworkErrorCode.StateMachineInstanceNotExists.equals(e.getErrcode())) {
return BranchStatus.PhaseTwo_Rollbacked;
}
} catch (Exception e) {
LOGGER.error("StateMachine compensate failed, xid: " + xid, e);
}
return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
}
分布式事务 Seata 及其三种模式详解 | Meetup#3 回顾
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/191742.html原文链接:https://javaforall.cn