当executeLocalTransaction方法返回UNKNOW以后,RocketMQ会每隔一段时间调用一次checkLocalTransaction,这个方法的返回值决定着这个消息的最终归宿。...我们运行一下生产端的代码,看看是什么情况,日志如下: executeLocalTransaction termId=15 term:com.example.rocketmqdemo.entity.Term...@4a3509b0 sendResult:COMMIT_MESSAGE 时间:Wed Jun 17 08:56:49 CST 2020 我们看到,先执行的是executeLocalTransaction...那么为什么在执行executeLocalTransaction方法时,能够查询到termId=16的数据呢?还记得MySQL的事务隔离级别吗?忘了的赶快复习一下吧。...我们可以在executeLocalTransaction方法中,固定返回UNKNOW,数据插入数据库成功也好,失败也罢,我们都返回UNKNOW。
msg.TransactionId = transactionId } localTransactionState = tp.listener.ExecuteLocalTransaction...= primitive.CommitMessageState { rlog.Error("executeLocalTransaction but state unexpected...SendMessageInTransaction方法先执行tp.producer.SendSync(ctx, msg),然后根据rsp.Status来做不同处理;对于primitive.SendOK执行tp.listener.ExecuteLocalTransaction
public interface RocketMQLocalTransactionListener { // 用来执行本地事务 RocketMQLocalTransactionState executeLocalTransaction
只有发送 half 消息成功,并且发送状态为 SEND_OK ,才会执行 executeLocalTransaction ,向 t_order_transaction_log 表写入事务日志 那么即使...消息会被回滚,积分服务收不到消息 导致的问题就是:用户下单成功,但却没有增加积分 可见关注 half 消息发送结果的重要性 4、half 消息发送成功,且返回的是 SEND_OK 状态,但 executeLocalTransaction...只有 half 消息发送成功,且返回状态是 SEND_OK 才会执行 executeLocalTransaction 即使 Broker 回查事务状态,得到的结果始终是 UNKNOW ,最终...half 消息会被回滚,积分服务收不到消息 订单服务与积分服务都没有落库成功,也就说是没问题的 3、half 消息发送成功,且返回的状态是 SEND_OK ,但 executeLocalTransaction...订单数据回滚了,积分服务未收到消息,那么此种情况是没问题的 看起来挺顺眼,异常情况下也没什么问题 rocketmq-client 的 bug 需要弄清楚的问题有两个: 1、half 消息中置, executeLocalTransaction
这个接口有两个方法: 提交本地事务 executeLocalTransaction 检查本地事务状态 checkLocalTransaction 下面代码是发送事务消息的方法: //类 DefaultMQProducerImpl...= null) { //执行本地事务 localTransactionState = transactionListener.executeLocalTransaction(msg,...使用事务消息时自己定义 TransactionListener,实现执行本地事务 executeLocalTransaction 和检查本地事务状态 checkLocalTransaction 这两个方法
createTransactionListener() { return new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction...createTransactionListener() 在init()方法中调用,构造实现RocketMQ的TransactionListener接口的匿名类,该接口需要实现如下两个方法: executeLocalTransaction..."); // 开始执行本地事务 localTransactionState = transactionListener.executeLocalTransaction
hread.sleep(1000); } producer.shutdown(); } } (2) 实现TransactionListener接口 实现executeLocalTransaction...消息生产者需要在executeLocalTransaction中执行本地事务当事务半消息提交成功,执行完毕后需要返回事务状态码。...String, Integer> localTrans = new ConcurrentHashMap(); @Override public LocalTransactionState executeLocalTransaction
RocketMQ架构设计 如图所示: RocketMQ事务消息原理 依赖于TransactionListener接口 executeLocalTransaction方法会在发送消息后调用,用于执行本地事务...commit/rollback:producer再通过TransactionListener的executeLocalTransaction方法执行本地事务,当producer的localTransaction
this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg); } 复制代码 这里的transactionListener就是上面所说的消息回查的类,它提供了2个方法: executeLocalTransaction...//发送消息成功,执行本地事务 localTransactionState = transactionListener.executeLocalTransaction...//发送消息成功,执行本地事务 localTransactionState = transactionListener.executeLocalTransaction...break; default: break; } 事务半消息发送成功后,会调用transactionListener.executeLocalTransaction
第一个方法executeLocalTransaction 为执行本地事务;第二个方法checkLocalTransaction 为检查本地事务的执行状态,也就是回查动作。...我们需要实现RocketMQLocalTransactionListener接口,在executeLocalTransaction方法中执行本地事务,在执行checkLocalTransaction回查方法时告诉...rocketMqTransactionLogMapper; /** * 执行本地事务 */ @Override public RocketMQLocalTransactionState executeLocalTransaction
implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction...本地事务处理:实现 TransactionListener 接口的 executeLocalTransaction() 方法处理本地事务逻辑。
我们只需要编写对应的本地事务执行方法executeLocalTransaction和本地事务执行结果检查方法checkLocalTransaction,RocketMQ会自动调用本地事务执行。...参见官方示例:// TransactionProducer.java// 需要自定义一个TransactionListener用于执行事务executeLocalTransaction和事务执行结果回查...new ConcurrentHashMap(); /** * 执行本地事务 */ @Override public LocalTransactionState executeLocalTransaction...log.debug("Used new transaction API"); localTransactionState = transactionListener.executeLocalTransaction
producer.setExecutorService(executorService); 3.2.2 设置监听回查 设置监听事务的接口TransactionListener:当发送半消息成功时,使用executeLocalTransaction...new ConcurrentHashMap(); //半消息发送成功触发此方法来执行本地事务 @Override public LocalTransactionState executeLocalTransaction
TransactionListenerImpl implements TransactionListener { //执行本地事务 @Override public LocalTransactionState executeLocalTransaction...优惠券状态改为Used } public void cancel() { 优惠券状态恢复为Unused } } 如果执行到TransactionListenerImpl.executeLocalTransaction...,说明半消息已经发送成功了,也说明OrderService.makePayment方法的四个步骤都执行成功了,此时tcc也到了confirm阶段,所以在TransactionListenerImpl.executeLocalTransaction
String, Integer> localTrans = new ConcurrentHashMap(); @Override public LocalTransactionState executeLocalTransaction...executeLocalTransaction 方法是具体的业务逻辑处理(本地事务处理); checkLocalTransaction 方法是Broker回查本地事务状态接口。...null) { // 处理本地事务 localTransactionState = transactionListener.executeLocalTransaction...如果broker返回SEND_OK,调用TransactionListener的executeLocalTransaction方法执行本地事务,并返回本地事务执行结果。
send方法中,这里能获取到 @return 返回事务状态,COMMIT :提交 ROLLBACK :回滚 UNKNOW :回调 */ RocketMQLocalTransactionState executeLocalTransaction...accountInfoDao; //消息发送成功回调此方法,此方法执行本地事务 @Override @Transactional public RocketMQLocalTransactionState executeLocalTransaction...accountChangeEvent); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { log.error("executeLocalTransaction
来复用大部分发送消息相关的逻辑,这个类的代码量非常少只有100来行,下面是这个类的sendMessageTransaction方法 这里的transactionListener就是上面所说的消息回查的类,它提供了2个方法: executeLocalTransaction...执行本地事务 Producer 半事务消息发送成功后,会调用transactionListener.executeLocalTransaction方法执行本地事务。
第一个方法executeLocalTransaction 为执行本地事务; 第二个方法checkLocalTransaction 为检查本地事务的执行状态,也就是回查动作。...rocketMqTransactionLogMapper; /** * 执行本地事务 */ @Override public RocketMQLocalTransactionState executeLocalTransaction
localTrans = new ConcurrentHashMap(); @Override public LocalTransactionState executeLocalTransaction...transactionIndex.getAndIncrement(); System.out.println(Thread.currentThread().getName()+ "-executeLocalTransaction
领取专属 10元无门槛券
手把手带您无忧上云