在分布式系统中由于相关联的多个服务所在的数据库互相隔离,数据库无法使用本地事务来保证数据的一致性,因此需要使用分布式事务来保证数据的一致性
比如用户支付订单后,需要更改订单状态,还需要涉及其他服务的其他操作如:物流出货、积分变更、清空购物车等
由于它们数据所存储的数据库会互相隔离,当订单状态修改成功/失败时,其他服务对应的数据也需要修改成功/失败,否则就会出现数据不一致的情况
解决分布式事务常用的一种方案是使用MQ做补偿以此来达到数据的最终一致性,而RocketMQ提供的事务消息能够简单、有效的解决分布式事务满足数据最终一致性
在上面支付订单的案例中,主分支只需要修改订单状态,其他分支(出货、积分变更、清空购物车)都可以发送事务消息来达到数据最终一致性
本篇文章通过分析源码来描述事务消息的原理以及使用方法,并总结使用时需要注意的地方,思维导图如下:
往期回顾:
RocketMQ(六):Consumer Rebalanc原理(运行流程、触发时机、导致的问题)
RocketMQ(三):面对高并发请求,如何高效持久化消息?(核心存储文件、持久化核心原理、源码解析)
RocketMQ(二):揭秘发送消息核心原理(源码与设计思想解析)
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
事务消息拥有“半事务”的状态,在这种状态下即时消息到达broker也不能进行消费,直到主分支本地事务提交,事务消息才能被下游服务进行消费
使用事务消息的流程如下:
发送事务消息的生产者为TransactionMQProducer,TransactionMQProducer的使用与默认类似,只不过需要设置事务监听器TransactionListener
事务监听器接口需要实现executeLocalTransaction用于执行本地事务和checkLocalTransaction用于broker回查本地事务状态
public interface TransactionListener {
//执行本地事务
LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
//回查事务状态
LocalTransactionState checkLocalTransaction(final MessageExt msg);
}
它们的结果LocalTransactionState有三个状态:COMMIT_MESSAGE 成功、ROLLBACK_MESSAGE 失败、UNKNOW 未知
当为未知状态时,后续还会触发回查,直到超过次数或者返回成功/失败
调用 sendMessageInTransaction
发送事务消息,其中参数arg用于扩展,执行本地事务时会携带使用
public TransactionSendResult sendMessageInTransaction(final Message msg,final Object arg)
根据我们的情况写出TransactionListener的模拟代码
public class OrderPayTransactionListener implements TransactionListener {
//执行本地事务 其中参数arg传递的为订单ID
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object orderId) {
try {
//修改订单状态为已支付
if (updatePayStatus((Long) orderId)) {
return LocalTransactionState.COMMIT_MESSAGE;
}
} catch (Exception e) {
//log
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.ROLLBACK_MESSAGE;
}
//回查状态
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Long orderId = Long.valueOf(msg.getBuyerId());
//查询订单状态是否为已支付
try {
if (isPayed(orderId)) {
return LocalTransactionState.COMMIT_MESSAGE;
}
} catch (Exception e) {
//log
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
执行本地事务时如果成功修改订单状态就返回commit,回查状态时判断订单状态是否为已支付
前文分析过通用的发送消息流程,而 sendMessageInTransaction
发送消息调用通用的发送消息流程外,还会在期间多做一些处理:
sendDefaultImpl
****(校验参数、获取路由信息、选择队列、封装消息、netty rpc调用,期间检查超时、超时情况)executeLocalTransaction
****endTransactionOneway
****(有回查机制无需考虑失败)public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
//检查事务监听器
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
//清除延迟等级 使用事务消息就不能使用延迟消息
// ignore DelayTimeLevel parameter
if (msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}
//检查消息
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
//标记事务消息为半事务状态
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
//存储生产者组
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
//通用的发送消息流程
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
//成功执行本地事务
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
//刷盘超时 或 从节点不可用 相当于失败
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
try {
//通知broker本地事务状态
this.endTransaction(msg, sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
//返回
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}
在发送的流程中主要会在发送前做一些准备如标记半事务状态,然后进行同步发送,如果发送成功则会执行本地事务,最后单向通知broker本地事务的状态
之前的文章也说过消息到达后,broker存储消息的原理(先写CommitLog、再写其他文件)
事务消息在消息进行存储前,会使用桥接器TransactionalMessageBridge调用 parseHalfMessageInner
,将消息topic改为半事务topic并存储原始topic、队列ID(方便后续重新投入真正的topic)
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
//存储真正的topic和队列ID
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
//设置本次要投入的topic为半事务Topic RMQ_SYS_TRANS_HALF_TOPIC
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
这样半事务状态的事务消息就会被投入半事务topic的队列中,这样就能达到消费者无法消费半事务消息(因为它们没被投入真实的队列中)
生产者发送完消息,无论成功还是失败都会通知broker本地事务状态
broker使用EndTransactionProcessor处理**END_TRANSACTION
**的请求,其核心逻辑就是根据本地事务状态进行处理:
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
RemotingCommandException {
//构建通用响应
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
//解析
final EndTransactionRequestHeader requestHeader =
(EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
//从节点直接响应失败
if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
return response;
}
//...
OperationResult result = new OperationResult();
//成功的情况
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
//调用 getHalfMessageByOffset 根据commitLog偏移量获取半事务消息
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
//找到半事务消息
if (result.getResponseCode() == ResponseCode.SUCCESS) {
//检查数据
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
//检查成功
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
//清理半事务标识
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
//重新将消息投入真实topic、队列中
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
//重投成功 删除事务消息
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
//失败情况 也是调用 getHalfMessageByOffset 根据commitLog偏移量获取半事务消息
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
//找到消息检查完就删除事务消息
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
response.setCode(result.getResponseCode());
response.setRemark(result.getResponseRemark());
return response;
}
成功或失败(commit/rollback)的情况都会删除半消息,成功的情况会将消息投入原始队列中,后续进行消费
而还要一种无法确定是成功还是失败的情况,需要broker进行回查
负责回查的组件是TransactionalMessageCheckService:定期对半事务消息进行检查是否需要回查(在broker启动初始化时进行初始化)
其检查回查会调用this.brokerController.getTransactionalMessageService().check
它会遍历事务topic RMQ_SYS_TRANS_HALF_TOPIC
下的所有队列,循环取出半事务消息进行判断是否需要进行回查
由于代码较多,这里总结性贴出关键代码:
getHalfMsg
resolveDiscardMsg
putBackHalfMsgQueue
**resolveHalfMsg
**public void check(long transactionTimeout, int transactionCheckMax,AbstractTransactionalMessageCheckListener listener) {
//遍历事务topic下的所有队列,循环取出半事务消息进行判断是否需要进行回查
String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
for (MessageQueue messageQueue : msgQueues) {
while (true) {
//超出边界会退出 代码略
//获取半事务消息 这里的参数i是半事务消息偏移量
GetResult getResult = getHalfMsg(messageQueue, i);
MessageExt msgExt = getResult.getMsg();
//needDiscard 超过最大检查次数 15次
//needSkip 超过最大存储时间 72h
if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
//丢弃半事务消息
listener.resolveDiscardMsg(msgExt);
//..
continue;
}
//...
//超过6s
if (isNeedCheck) {
//将消息重投入半消息队列
if (!putBackHalfMsgQueue(msgExt, i)) {
continue;
}
//向生产者发送回查的请求 CHECK_TRANSACTION_STATE
listener.resolveHalfMsg(msgExt);
}
}
}
}
请求回查并不会返回结果,生产者处理查询到事务状态后,再向broker发送单向的本地事务状态通知请求(endTransactionOneway)
ClientRemotingProcessor 处理broker发送的回查请求CHECK_TRANSACTION_STATE
ClientRemotingProcessor 调用 checkTransactionState
进行处理:
transactionListener.checkLocalTransaction
endTransactionOneway
** 对broker进行通知本地事务状态结果涉及多服务的分布式事务,不追求强一致性的情况下,可考虑使用事务消息+重试的方式尽力达到最终一致性
使用时需要定义事务监听器执行本地事务和回查本地事务状态的方法,注意可能消费失败,重试多次后需要记录并特殊处理避免最终数据不一致
使用事务消息时无法设置延迟级别,发送前会将延迟级别清除
发送事务消息采用同步发送,在发送前会标记为半(事务)消息状态,在发送成功后会调用事务监听器执行本地事务,最后单向通知broker本地事务的状态
broker存储半(事务)消息前会更改它的topic、queueId,将其持久化到事务(半消息)topic中,以此来达到暂时不可以被消费的目的
broker接收本地事务状态通知时,如果是commit状态则将半(事务)消息重投入原始topic、队列中,以此来达到可以进行消费的目的,并且删除半(事务)消息,rollback状态也会删除半(事务)消息,只有未知状态的情况下不删除,等待后续触发回查机制
broker使用组件定期遍历事务(半消息)topic下的所有队列检查是否需要进行回查,遍历队列时循环取出半(事务)消息,如果超过检查最大次数(15)或超时(72h),则会丢弃消息;否则会将半(事务)消息放回队列,当事务消息超过6s时会触发回查机制,向produce发送检查事务状态的请求
produce收到回查请求后,调用事务监听器的检查事务状态方法,并又调用单向通知broker本地事务状态
😁我是菜菜,热爱技术交流、分享与写作,喜欢图文并茂、通俗易懂的输出知识
📚在我的博客中,你可以找到Java技术栈的各个专栏:Java并发编程与JVM原理、Spring和MyBatis等常用框架及Tomcat服务器的源码解析,以及MySQL、Redis数据库的进阶知识,同时还提供关于消息中间件和Netty等主题的系列文章,都以通俗易懂的方式探讨这些复杂的技术点
🏆除此之外,我还是掘金优秀创作者、腾讯云年度影响力作者、华为云年度十佳博主....
👫我对技术交流、知识分享以及写作充满热情,如果你愿意,欢迎加我一起交流(vx:CaiCaiJava666),也可以持续关注我的公众号:菜菜的后端私房菜,我会分享更多技术干货,期待与更多志同道合的朋友携手并进,一同在这条充满挑战与惊喜的技术之旅中不断前行
🤝如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~
📖本篇文章被收入专栏 消息中间件,感兴趣的朋友可以持续关注~
📝本篇文章、笔记以及案例被收入 Gitee-CaiCaiJava、 Github-CaiCaiJava,除此之外还有更多Java进阶相关知识,感兴趣的朋友可以star持续关注~
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。