多朋友对于消息不丢和重复消费问题,一直都是在背。下面来看看我的实际项目中是如何做到项目落地的。
常规回答如下:
消息丢失分为:
生产者发送到消息队列时失败
由于消息队列宕机之类的事情,导致消息丢失
消费者处理消息时,把消息给搞丢了
解决消息丢失,通常有如何步骤:
消息确认机制
消息的持久化
消息ACK
这么回答基本上就表明你是背的八股文,因为市面上就是这样的八股文。
我在充电桩项目中,就有对此进行实战。
下面,我来说说核心实现步骤:
1、对消息进行持久化
@Bean
public Queue queueMessageUserPoint() {
return new Queue(RabbitMQConstantEnum.USER_INVITED_REGISTRY.getQueue(),true);
}
@Bean
TopicExchange exchangeUserPoint() {
return new TopicExchange(RabbitMQConstantEnum.USER_INVITED_REGISTRY.getExchange(), true, false);
}
2、创建本地消息记录表
CREATE TABLE `retry_message` (
`id` bigint NOT NULL AUTO_INCREMENT,
`type` int DEFAULT NULL,
`content` text,
`retried_times` int DEFAULT NULL,
`retry` int NOT NULL DEFAULT '1',
`create_time` datetime DEFAULT NULL,
`status` int DEFAULT '0',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=20 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
3、消息发送失败后记录到本地表
@Slf4j
@Component
publicclass InvitedRegistryProducer {
@Resource
private RetryMessageMapper retryMessageMapper;
public void send(InvitedRegistryMessage invitedRegistryMessage) {
String content = JSON.toJSONString(invitedRegistryMessage);
RabbitTemplate rabbitTemplate = ApplicationContextFactory.getBean(RabbitTemplate.class);
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("UserPointConfirm ConfirmCallback 关联数据:{},投递成功,确认情况:{}", correlationData, ack);
} else {
RetryMessage retryMessage = new RetryMessage();
retryMessage.setContent(content);
retryMessage.setRetry(5);
retryMessage.setCreateTime(new Date());
retryMessage.setStatus(0);
retryMessage.setRetriedTimes(0);
retryMessage.setType(0);
retryMessageMapper.insert(retryMessage);
log.info("UserPointConfirm ConfirmCallback 关联数据:{},投递失败,确认情况:{},原因:{}", correlationData, ack, cause);
}
});
rabbitTemplate.setReturnCallback((msg, replyCode, replyText, exchange, routingKey) -> {
log.info("UserPointConfirm ReturnsCallback 消息:{},回应码:{},回应信息:{},交换机:{},路由键:{}"
, msg, replyCode
, replyText, exchange
, routingKey);
});
rabbitTemplate.convertAndSend(RabbitMQConstantEnum.USER_INVITED_REGISTRY.getExchange()+"1"
, RabbitMQConstantEnum.USER_INVITED_REGISTRY.getRoutingKey(), content, message1 -> {
message1.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 设置消息持久化
return message1;
}, correlationId);
}
}
4、失败消息重试
@Component
publicclass InvitedRegistryRetryMessageJob {
@Resource
private RetryMessageMapper retryMessageMapper;
@Resource
private InvitedRegistryProducer invitedRegistryIncomeProducer;
@XxlJob("invitedRegistryIncomeRetryMessageJob")
public void process() {
log.info("开始执行 Invited user registry income retry messageJob 定时任务");
XxlJobHelper.log("start userPointRetryMessageJob job");
int countRetryMessage = retryMessageMapper.countRetryMessage(0, 0);
if (countRetryMessage == 0) {
log.info(" 执行结束 Invited user registry income retry messageJob 没有消息需要重发");
}
List<RetryMessage> retryMessages = retryMessageMapper.selectRetryMessage(0, 0);
for (RetryMessage retryMessage : retryMessages) {
invitedRegistryIncomeProducer.sendMessage(retryMessage);
}
}
}
5、消费者实现
@RabbitListener(queues = RabbitMqQueueConstant.USER_INVITED_REGISTRY_QUEUE)
@Component
@Slf4j
publicclass UserInvitedRegistryConsumer {
@Resource
private UserInvitedRegistryService userInvitedRegistryService;
@RabbitHandler
public void process(Object data, Channel channel, Message message) throws IOException {
try {
log.info("消费者接受到的消息是:{},消息体为:{}", data, message);
InvitedRegistryMessage invitedRegistryMessage = JSON.parseObject(new String(message.getBody()), InvitedRegistryMessage.class);
userInvitedRegistryService.invitedRegistry(invitedRegistryMessage);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception exception) {
@Override
public void invitedRegistry(InvitedRegistryMessage invitedRegistryMessage) {
GlobalProperty chargeGlobalProperty = globalPropertyMapper.selectByPrimaryKey(1);
Integer busyDefault = chargeGlobalProperty.getBusiDefault();
Long userId = invitedRegistryMessage.getUserId();
RLock lock = redissonClient.getLock(RedisConstantPre.USER_INFO_ID_LOCK_PRE + userId);
lock.lock();
try {
int count = chargeInviteRecordMapper.countByReqId(invitedRegistryMessage.getReqId());
if (count == 0) {
log.info("【重复消费】,请求参数={}",invitedRegistryMessage);
return;
}
//邀请人余额 增加
ChargeUser chargeUser = chargeUserMapper.selectByPrimaryKey(userId);
//新余额=当前余额+邀请收益
chargeUser.setBalance(chargeUser.getBalance().add(invitedRegistryMessage.getAmount()));
chargeUserMapper.updateByPrimaryKey(chargeUser);
//邀请记录
ChargeInviteRecord chargeInviteRecord = new ChargeInviteRecord();
Date date = new Date();
chargeInviteRecord.setInvitedTime(date);
chargeInviteRecord.setInvitedUserId(invitedRegistryMessage.getNewUserId());
chargeInviteRecord.setInviteUserId(invitedRegistryMessage.getUserId());
chargeInviteRecordMapper.insert(chargeInviteRecord);
//邀请收益记录
ChargeUserIncome chargeUserIncome = new ChargeUserIncome();
chargeUserIncome.setIncome(new BigDecimal(busyDefault));
chargeUserIncome.setUserId(invitedRegistryMessage.getUserId());
chargeUserIncome.setCreateTime(date);
chargeUserIncomeMapper.insert(chargeUserIncome);
} finally {
lock.unlock();
}
}
流程图
下面来画张图给大家梳理一下:
以上就是如何解决消息不丢失的问题,其实细心的朋友可以看到重复消费问题也解决了。
以上代码都在我的充电桩项目中,想想设计文档可以宠我的知识星球中获取。
https://www.woaijava.cc/
领取专属 10元无门槛券
私享最新 技术干货