Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

项目实战:消息丢失与重复消费

多朋友对于消息不丢和重复消费问题,一直都是在背。下面来看看我的实际项目中是如何做到项目落地的。

常规回答如下:

消息丢失分为:

生产者发送到消息队列时失败

由于消息队列宕机之类的事情,导致消息丢失

消费者处理消息时,把消息给搞丢了

解决消息丢失,通常有如何步骤:

消息确认机制

消息的持久化

消息ACK

这么回答基本上就表明你是背的八股文,因为市面上就是这样的八股文。

我在充电桩项目中,就有对此进行实战。

下面,我来说说核心实现步骤:

1、对消息进行持久化

@Bean

public Queue queueMessageUserPoint() {

return new Queue(RabbitMQConstantEnum.USER_INVITED_REGISTRY.getQueue(),true);

}

@Bean

TopicExchange exchangeUserPoint() {

  return new TopicExchange(RabbitMQConstantEnum.USER_INVITED_REGISTRY.getExchange(), true, false);

}

2、创建本地消息记录表

CREATE TABLE `retry_message` (

`id` bigint NOT NULL AUTO_INCREMENT,

`type` int DEFAULT NULL,

`content` text,

`retried_times` int DEFAULT NULL,

`retry` int NOT NULL DEFAULT '1',

`create_time` datetime DEFAULT NULL,

`status` int DEFAULT '0',

PRIMARY KEY (`id`)

) ENGINE=InnoDB AUTO_INCREMENT=20 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

3、消息发送失败后记录到本地表

@Slf4j

@Component

publicclass InvitedRegistryProducer {

  @Resource

  private RetryMessageMapper retryMessageMapper;

  public void send(InvitedRegistryMessage invitedRegistryMessage) {

      String content = JSON.toJSONString(invitedRegistryMessage);

      RabbitTemplate rabbitTemplate = ApplicationContextFactory.getBean(RabbitTemplate.class);

      CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());

      rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {

          if (ack) {

              log.info("UserPointConfirm ConfirmCallback 关联数据:{},投递成功,确认情况:{}", correlationData, ack);

          } else {

              RetryMessage retryMessage = new RetryMessage();

              retryMessage.setContent(content);

              retryMessage.setRetry(5);

              retryMessage.setCreateTime(new Date());

              retryMessage.setStatus(0);

              retryMessage.setRetriedTimes(0);

              retryMessage.setType(0);

              retryMessageMapper.insert(retryMessage);

              log.info("UserPointConfirm ConfirmCallback 关联数据:{},投递失败,确认情况:{},原因:{}", correlationData, ack, cause);

          }

      });

      rabbitTemplate.setReturnCallback((msg, replyCode, replyText, exchange, routingKey) -> {

          log.info("UserPointConfirm ReturnsCallback 消息:{},回应码:{},回应信息:{},交换机:{},路由键:{}"

                  , msg, replyCode

                  , replyText, exchange

                  , routingKey);

      });

      rabbitTemplate.convertAndSend(RabbitMQConstantEnum.USER_INVITED_REGISTRY.getExchange()+"1"

              , RabbitMQConstantEnum.USER_INVITED_REGISTRY.getRoutingKey(), content, message1 -> {

                  message1.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 设置消息持久化

                  return message1;

              }, correlationId);

  }

}

4、失败消息重试

@Component

publicclass InvitedRegistryRetryMessageJob {

  @Resource

  private RetryMessageMapper retryMessageMapper;

  @Resource

  private InvitedRegistryProducer invitedRegistryIncomeProducer;

  @XxlJob("invitedRegistryIncomeRetryMessageJob")

  public void process() {

      log.info("开始执行 Invited user registry income retry messageJob 定时任务");

      XxlJobHelper.log("start userPointRetryMessageJob job");

      int countRetryMessage = retryMessageMapper.countRetryMessage(0, 0);

      if (countRetryMessage == 0) {

          log.info(" 执行结束  Invited user registry income retry messageJob 没有消息需要重发");

      }

      List<RetryMessage> retryMessages = retryMessageMapper.selectRetryMessage(0, 0);

      for (RetryMessage retryMessage : retryMessages) {

          invitedRegistryIncomeProducer.sendMessage(retryMessage);

      }

  }

}

5、消费者实现

@RabbitListener(queues = RabbitMqQueueConstant.USER_INVITED_REGISTRY_QUEUE)

@Component

@Slf4j

publicclass UserInvitedRegistryConsumer {

  @Resource

  private UserInvitedRegistryService userInvitedRegistryService;

  @RabbitHandler

  public void process(Object data, Channel channel, Message message) throws IOException {

      try {

          log.info("消费者接受到的消息是:{},消息体为:{}", data, message);

          InvitedRegistryMessage invitedRegistryMessage = JSON.parseObject(new String(message.getBody()), InvitedRegistryMessage.class);

          userInvitedRegistryService.invitedRegistry(invitedRegistryMessage);

          channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

      } catch (Exception exception) {

  @Override

  public void invitedRegistry(InvitedRegistryMessage invitedRegistryMessage) {

      GlobalProperty chargeGlobalProperty = globalPropertyMapper.selectByPrimaryKey(1);

      Integer busyDefault = chargeGlobalProperty.getBusiDefault();

      Long userId = invitedRegistryMessage.getUserId();

      RLock lock = redissonClient.getLock(RedisConstantPre.USER_INFO_ID_LOCK_PRE + userId);

      lock.lock();

      try {

          int count = chargeInviteRecordMapper.countByReqId(invitedRegistryMessage.getReqId());

          if (count == 0) {

              log.info("【重复消费】,请求参数={}",invitedRegistryMessage);

              return;

          }

          //邀请人余额 增加

          ChargeUser chargeUser = chargeUserMapper.selectByPrimaryKey(userId);

          //新余额=当前余额+邀请收益

          chargeUser.setBalance(chargeUser.getBalance().add(invitedRegistryMessage.getAmount()));

          chargeUserMapper.updateByPrimaryKey(chargeUser);

          //邀请记录

          ChargeInviteRecord chargeInviteRecord = new ChargeInviteRecord();

          Date date = new Date();

          chargeInviteRecord.setInvitedTime(date);

          chargeInviteRecord.setInvitedUserId(invitedRegistryMessage.getNewUserId());

          chargeInviteRecord.setInviteUserId(invitedRegistryMessage.getUserId());

          chargeInviteRecordMapper.insert(chargeInviteRecord);

          //邀请收益记录

          ChargeUserIncome chargeUserIncome = new ChargeUserIncome();

          chargeUserIncome.setIncome(new BigDecimal(busyDefault));

          chargeUserIncome.setUserId(invitedRegistryMessage.getUserId());

          chargeUserIncome.setCreateTime(date);

          chargeUserIncomeMapper.insert(chargeUserIncome);

      } finally {

          lock.unlock();

      }

  }

流程图

下面来画张图给大家梳理一下:

以上就是如何解决消息不丢失的问题,其实细心的朋友可以看到重复消费问题也解决了。

以上代码都在我的充电桩项目中,想想设计文档可以宠我的知识星球中获取。

https://www.woaijava.cc/

  • 发表于:
  • 原文链接https://page.om.qq.com/page/OWfGLid8P4WBmH7p745llK_g0
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券
首页
学习
活动
专区
圈层
工具