大家好,我是小悟。
在电商系统中,超卖是指商品库存不足以满足所有购买请求时,系统仍然接受了超过库存数量的订单。这会导致:
方案 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
数据库悲观锁 | 并发量不高 | 实现简单,强一致性 | 性能瓶颈,死锁风险 |
数据库乐观锁 | 读多写少 | 性能较好 | 需要重试机制 |
Redis原子操作 | 高并发场景 | 高性能 | 数据一致性需注意 |
分布式锁 | 分布式系统 | 强一致性 | 实现复杂 |
消息队列 | 流量削峰 | 系统解耦 | 实时性较差 |
使用数据库的排他锁(SELECT … FOR UPDATE)锁定库存记录,确保同一时间只有一个事务可以修改库存。
@Service
@Transactional
public class PessimisticLockInventoryService {
@Autowired
private ProductRepository productRepository;
@Autowired
private OrderRepository orderRepository;
public boolean purchaseWithPessimisticLock(Long productId, Integer quantity) {
// 1. 查询商品并加锁
Product product = productRepository.findWithLock(productId);
if (product == null) {
throw new RuntimeException("商品不存在");
}
// 2. 检查库存
if (product.getStock() < quantity) {
throw new RuntimeException("库存不足");
}
// 3. 扣减库存
int updatedRows = productRepository.decreaseStock(productId, quantity);
if (updatedRows == 0) {
throw new RuntimeException("库存扣减失败");
}
// 4. 创建订单
Order order = new Order();
order.setProductId(productId);
order.setQuantity(quantity);
order.setStatus(1);
orderRepository.save(order);
return true;
}
}
// Repository 层实现
@Repository
public interface ProductRepository extends JpaRepository<Product, Long> {
@Query(value = "SELECT * FROM product WHERE id = ?1 FOR UPDATE", nativeQuery = true)
Product findWithLock(Long id);
@Modifying
@Query("UPDATE Product p SET p.stock = p.stock - ?2 WHERE p.id = ?1 AND p.stock >= ?2")
int decreaseStock(Long productId, Integer quantity);
}通过版本号机制,在更新时检查数据是否被其他事务修改过,如果版本号不匹配则更新失败。
@Service
public class OptimisticLockInventoryService {
@Autowired
private ProductRepository productRepository;
@Autowired
private OrderRepository orderRepository;
private static final int MAX_RETRY_TIMES = 3;
public boolean purchaseWithOptimisticLock(Long productId, Integer quantity) {
int retryTimes = 0;
while (retryTimes < MAX_RETRY_TIMES) {
// 1. 查询商品(不加锁)
Product product = productRepository.findById(productId)
.orElseThrow(() -> new RuntimeException("商品不存在"));
// 2. 检查库存
if (product.getStock() < quantity) {
throw new RuntimeException("库存不足");
}
// 3. 扣减库存(带版本号检查)
int updatedRows = productRepository.decreaseStockWithVersion(
productId, quantity, product.getVersion());
if (updatedRows > 0) {
// 4. 创建订单
Order order = new Order();
order.setProductId(productId);
order.setQuantity(quantity);
order.setStatus(1);
orderRepository.save(order);
return true;
}
retryTimes++;
// 重试前短暂休眠
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("线程中断", e);
}
}
throw new RuntimeException("并发更新失败,请重试");
}
}
// 实体类
@Entity
@Table(name = "product")
public class Product {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String name;
private Integer stock;
@Version
private Integer version;
// getter和setter方法
}
// Repository 层实现
@Repository
public interface ProductRepository extends JpaRepository<Product, Long> {
@Modifying
@Query("UPDATE Product p SET p.stock = p.stock - ?2, p.version = p.version + 1 " +
"WHERE p.id = ?1 AND p.stock >= ?2 AND p.version = ?3")
int decreaseStockWithVersion(Long productId, Integer quantity, Integer version);
}利用Redis的原子操作(如DECR、LUA脚本)来保证库存扣减的原子性。
@Service
public class RedisInventoryService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private ProductRepository productRepository;
@Autowired
private OrderRepository orderRepository;
private static final String STOCK_KEY_PREFIX = "product:stock:";
private static final String LUA_SCRIPT =
"if redis.call('exists', KEYS[1]) == 1 then " +
"local stock = tonumber(redis.call('get', KEYS[1])); " +
"local num = tonumber(ARGV[1]); " +
"if stock >= num then " +
"return redis.call('decrby', KEYS[1], num) " +
"else " +
"return -1 " +
"end " +
"else " +
"return -2 " +
"end";
public boolean purchaseWithRedis(Long productId, Integer quantity) {
String stockKey = STOCK_KEY_PREFIX + productId;
// 使用LUA脚本保证原子性
Long result = (Long) redisTemplate.execute(
new DefaultRedisScript<>(LUA_SCRIPT, Long.class),
Collections.singletonList(stockKey),
quantity);
if (result == null) {
throw new RuntimeException("Redis操作失败");
}
if (result == -1) {
throw new RuntimeException("库存不足");
}
if (result == -2) {
throw new RuntimeException("商品不存在");
}
// 异步更新数据库
asyncUpdateDatabase(productId, quantity);
return true;
}
@Async
public void asyncUpdateDatabase(Long productId, Integer quantity) {
try {
// 更新数据库库存
productRepository.decreaseStock(productId, quantity);
// 创建订单
Order order = new Order();
order.setProductId(productId);
order.setQuantity(quantity);
order.setStatus(1);
orderRepository.save(order);
} catch (Exception e) {
// 如果数据库更新失败,需要回滚Redis库存
rollbackRedisStock(productId, quantity);
throw new RuntimeException("数据库更新失败", e);
}
}
private void rollbackRedisStock(Long productId, Integer quantity) {
String stockKey = STOCK_KEY_PREFIX + productId;
redisTemplate.opsForValue().increment(stockKey, quantity);
}
/**
* 初始化Redis库存
*/
public void initRedisStock(Long productId, Integer stock) {
String stockKey = STOCK_KEY_PREFIX + productId;
redisTemplate.opsForValue().set(stockKey, stock);
}
}使用Redis或Zookeeper实现分布式锁,确保在分布式环境下同一时间只有一个服务实例可以操作库存。
@Service
public class DistributedLockInventoryService {
@Autowired
private RedissonClient redissonClient;
@Autowired
private ProductRepository productRepository;
@Autowired
private OrderRepository orderRepository;
private static final String LOCK_KEY_PREFIX = "inventory_lock:";
public boolean purchaseWithDistributedLock(Long productId, Integer quantity) {
String lockKey = LOCK_KEY_PREFIX + productId;
RLock lock = redissonClient.getLock(lockKey);
try {
// 尝试加锁,最多等待5秒,锁过期时间为10秒
boolean locked = lock.tryLock(5, 10, TimeUnit.SECONDS);
if (!locked) {
throw new RuntimeException("系统繁忙,请稍后重试");
}
// 查询商品库存
Product product = productRepository.findById(productId)
.orElseThrow(() -> new RuntimeException("商品不存在"));
// 检查库存
if (product.getStock() < quantity) {
throw new RuntimeException("库存不足");
}
// 扣减库存
int updatedRows = productRepository.decreaseStock(productId, quantity);
if (updatedRows == 0) {
throw new RuntimeException("库存扣减失败");
}
// 创建订单
Order order = new Order();
order.setProductId(productId);
order.setQuantity(quantity);
order.setStatus(1);
orderRepository.save(order);
return true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("线程中断", e);
} finally {
// 释放锁
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
}
// Redisson配置类
@Configuration
public class RedissonConfig {
@Value("${spring.redis.host:localhost}")
private String redisHost;
@Value("${spring.redis.port:6379}")
private String redisPort;
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://" + redisHost + ":" + redisPort)
.setDatabase(0);
return Redisson.create(config);
}
}通过消息队列进行流量削峰,将订单请求放入队列中顺序处理,避免瞬时高并发。
@Service
public class MQInventoryService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ProductRepository productRepository;
@Autowired
private OrderRepository orderRepository;
private static final String ORDER_QUEUE = "order.queue";
private static final String ORDER_EXCHANGE = "order.exchange";
private static final String ORDER_ROUTING_KEY = "order.create";
/**
* 接收订单请求,发送到消息队列
*/
public boolean submitOrder(Long productId, Integer quantity, Long userId) {
OrderRequest orderRequest = new OrderRequest();
orderRequest.setProductId(productId);
orderRequest.setQuantity(quantity);
orderRequest.setUserId(userId);
orderRequest.setRequestId(UUID.randomUUID().toString());
orderRequest.setCreateTime(System.currentTimeMillis());
// 发送消息到队列
rabbitTemplate.convertAndSend(ORDER_EXCHANGE, ORDER_ROUTING_KEY, orderRequest);
return true;
}
/**
* 消费订单消息,处理库存扣减
*/
@RabbitListener(queues = ORDER_QUEUE)
public void processOrder(OrderRequest orderRequest) {
Long productId = orderRequest.getProductId();
Integer quantity = orderRequest.getQuantity();
try {
// 查询商品库存
Product product = productRepository.findById(productId)
.orElseThrow(() -> new RuntimeException("商品不存在"));
// 检查库存
if (product.getStock() < quantity) {
// 库存不足,记录日志或发送通知
handleInsufficientStock(orderRequest);
return;
}
// 扣减库存
int updatedRows = productRepository.decreaseStock(productId, quantity);
if (updatedRows == 0) {
handleInsufficientStock(orderRequest);
return;
}
// 创建订单
Order order = new Order();
order.setProductId(productId);
order.setQuantity(quantity);
order.setUserId(orderRequest.getUserId());
order.setStatus(1);
orderRepository.save(order);
// 发送订单创建成功通知
sendOrderSuccessNotification(orderRequest);
} catch (Exception e) {
// 处理异常,可以重试或记录错误
handleProcessError(orderRequest, e);
}
}
private void handleInsufficientStock(OrderRequest orderRequest) {
// 记录库存不足日志
// 发送库存不足通知给用户
System.err.println("库存不足,订单请求: " + orderRequest);
}
private void sendOrderSuccessNotification(OrderRequest orderRequest) {
// 发送订单创建成功通知
System.out.println("订单创建成功: " + orderRequest);
}
private void handleProcessError(OrderRequest orderRequest, Exception e) {
// 处理异常,可以重试或记录错误
System.err.println("处理订单异常: " + orderRequest + ", 错误: " + e.getMessage());
}
}
// 订单请求DTO
@Data
public class OrderRequest {
private String requestId;
private Long productId;
private Integer quantity;
private Long userId;
private Long createTime;
}
// RabbitMQ配置类
@Configuration
public class RabbitMQConfig {
@Bean
public Queue orderQueue() {
return new Queue("order.queue", true);
}
@Bean
public DirectExchange orderExchange() {
return new DirectExchange("order.exchange");
}
@Bean
public Binding orderBinding(Queue orderQueue, DirectExchange orderExchange) {
return BindingBuilder.bind(orderQueue).to(orderExchange).with("order.create");
}
}
谢谢你看我的文章,既然看到这里了,如果觉得不错,随手点个赞、转发、在看三连吧,感谢感谢。那我们,下次再见。
您的一键三连,是我更新的最大动力,谢谢
山水有相逢,来日皆可期,谢谢阅读,我们再会
我手中的金箍棒,上能通天,下能探海
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。