在分布式系统架构设计中,如何保证数据的一致性是一个非常重要的问题。而分布式事务处理就是解决这个问题的常见手段之一。本篇将介绍常见的分布式事务处理手段,并结合生产实践案例进行详细阐述。
两阶段提交(Two-Phase Commit)是一种经典的分布式事务处理方法,它通过一个事务协调器(Transaction Coordinator)来协调所有参与者(Participant)的事务操作。具体流程如下:
该方法的优点是比较直接,实现简单;缺点是可能存在单点故障。下面以一个在线购物场景为例,介绍如何使用两阶段提交实现分布式事务处理。
假设用户在购买商品时需要扣减库存和账户余额,场景如下:
下面是使用两阶段提交实现分布式事务处理的步骤:
在Java的Spring Boot框架中,可以使用JTA来模拟实现两阶段提交协议。具体操作方法如下:
<dependency>
<groupId>com.atomikos</groupId>
<artifactId>atomikos-jta</artifactId>
<version>4.0.6</version>
</dependency>
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/db_example
spring.datasource.username=username
spring.datasource.password=password
spring.datasource.jndi-name=jdbc/MyDataSource
spring.jta.atomikos.connectionfactory.bean-name=myXAConnectionFactory
spring.jta.atomikos.datasource.max-idle=3
spring.jta.atomikos.datasource.min-idle=1
spring.jta.atomikos.datasource.max-active=5
spring.jta.atomikos.datasource.pool-size=5
spring.jta.atomikos.datasource.test-query=SELECT 1
@Service
public class OrderService {
@Autowired
private StockService stockService;
@Autowired
private AccountService accountService;
@Transactional
public void deductStockAndAccount(String orderId, String userId, double totalPrice) {
// 扣减库存
stockService.deductStock(orderId, totalPrice);
// 扣减账户余额
accountService.deductAccount(userId, totalPrice);
}
}
@Service
public class AccountService {
@Autowired
private JdbcTemplate jdbcTemplate;
@Transactional
public void deductAccount(String userId, double totalPrice) throws Exception {
int result = jdbcTemplate.update("UPDATE account SET balance=balance-? WHERE user_id=?", totalPrice, userId);
if (result == 0) {
throw new Exception("余额不足,扣减失败");
}
}
}
@Service
public class StockService {
@Autowired
private JdbcTemplate jdbcTemplate;
@Transactional
public void deductStock(String orderId, double totalPrice) throws Exception {
int result = jdbcTemplate.update("UPDATE stock SET count=count-? WHERE order_id=?", totalPrice, orderId);
if (result == 0) {
throw new Exception("库存不足,扣减失败");
}
}
}
补偿事务(Compensating Transaction)是另一种常用的分布式事务处理方法。它假定在执行业务操作时,如果其中一个分支出问题,后续分支可以对之前分支已经执行过的操作进行回滚或者撤销,以此来解决分布式事务中的部分失败场景。具体流程如下:
这种方法的优点是相比于两阶段提交,能够更好地适应分布式系统中的不确定性和故障;缺点是实现略微复杂。下面以一个转账场景为例,介绍如何使用补偿事务实现分布式事务处理。
假设用户在进行账户之间的转账时需要拆分为两个服务:转出服务和转入服务,场景如下:
以上三个操作需要保证原子性,即要么全部执行成功,要么全部回滚。这时可以使用补偿事务来解决该问题:
针对这个场景,可以使用补偿事务来实现分布式事务处理。下面是步骤:
下面是使用Java的Spring Boot框架+JTA模拟实现补偿事务的过程:
<dependency>
<groupId>com.atomikos</groupId>
<artifactId>atomikos-jta</artifactId>
<version>4.0.6</version>
</dependency>
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/db_example
spring.datasource.username=username
spring.datasource.password=password
spring.datasource.jndi-name=jdbc/MyDataSource
spring.jta.atomikos.connectionfactory.bean-name=myXAConnectionFactory
spring.jta.atomikos.datasource.max-idle=3
spring.jta.atomikos.datasource.min-idle=1
spring.jta.atomikos.datasource.max-active=5
spring.jta.atomikos.datasource.pool-size=5
spring.jta.atomikos.datasource.test-query=SELECT 1
@Service
public class TransferOutService {
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private TransferInService transferInService;
@Transactional
public void transferOut(String fromUser, String toUser, double amount) throws Exception {
int result = jdbcTemplate.update("UPDATE account SET balance=balance-? WHERE username=?", amount, fromUser);
if (result == 0) {
throw new Exception("余额不足,扣减失败");
}
// 发起转账请求
transferInService.transferIn(fromUser, toUser, amount);
}
@Transactional(rollbackFor = Exception.class)
public void compensateTransferOut(String fromUser, double amount) {
jdbcTemplate.update("UPDATE account SET balance=balance+? WHERE username=?", amount, fromUser);
}
}
@Service
public class TransferInService {
@Autowired
private JdbcTemplate jdbcTemplate;
@Transactional
public void transferIn(String fromUser, String toUser, double amount) throws Exception {
int result = jdbcTemplate.update("UPDATE account SET balance=balance+? WHERE username=?", amount, toUser);
if (result == 0) {
throw new Exception("账户不存在,转入失败");
}
// 向转出服务发送“同意提交”响应
transferOutClient.agreeTransfer();
}
@Transactional(rollbackFor = Exception.class)
public void compensateTransferIn(String toUser, double amount) {
jdbcTemplate.update("UPDATE account SET balance=balance-? WHERE username=?", amount, toUser);
}
}
消息队列(Message Queue)是一种常用的异步通信方式,在分布式系统中也可以用来解决部分分布式事务处理问题。具体流程如下:
这种方式可以避免分布式事务中的资源锁争用和阻塞等问题,提高系统的并发性和扩展性。下面以一个秒杀场景为例,介绍如何使用消息队列实现分布式事务处理。
假设在商家进行秒杀活动时需要保证商品库存数量和订单数量的一致性,场景如下:
以上操作需要保证原子性,即要么全部执行成功,要么全部回滚。这时可以使用消息队列来解决该问题:
要基于maven的Spring Boot程序设计使用RabbitMQ模拟实现分布式事务处理,需要按如下步骤进行:
<!-- RabbitMQ依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
@Service
public class SecKillService {
@Autowired
private AmqpTemplate amqpTemplate;
@Transactional
public void doSecKill(String itemId, String userId) throws Exception {
// 减少商品数量
int result = jdbcTemplate.update("UPDATE item SET stock=stock-1 WHERE id=?", itemId);
if (result == 0) {
throw new Exception("库存不足,秒杀失败");
}
// 发送创建订单消息
SeckillOrder order = new SeckillOrder();
order.setItemId(itemId);
order.setUserId(userId);
amqpTemplate.convertAndSend("create-order-exchange", "create-order-key", JsonUtils.toJson(order));
}
}
@Service
public class OrderService {
@Autowired
private AmqpTemplate amqpTemplate;
@Transactional
@RabbitListener(queues = "create-order-queue")
public void createOrder(String message) throws Exception {
SeckillOrder order = JsonUtils.fromJson(message, SeckillOrder.class);
// 执行本地事务
int result = jdbcTemplate.update("INSERT INTO order (item_id, user_id) VALUES (?, ?)", order.getItemId(), order.getUserId());
if (result == 0) {
throw new Exception("创建订单失败");
}
// 发送响应消息
amqpTemplate.convertAndSend("create-order-exchange", "order-created-key", message);
}
@RabbitListener(queues = "rollback-order-queue")
public void rollbackOrder(String message) {
// 执行本地事务回滚
SeckillOrder order = JsonUtils.fromJson(message, SeckillOrder.class);
jdbcTemplate.update("UPDATE item SET stock=stock+1 WHERE id=?", order.getItemId());
}
}
@SpringBootApplication
@EnableTransactionManagement
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public DirectExchange createOrderExchange() {
return new DirectExchange("create-order-exchange");
}
@Bean
public Queue createOrderQueue() {
return new Queue("create-order-queue");
}
@Bean
public Binding createOrderBinding() {
return BindingBuilder.bind(createOrderQueue()).to(createOrderExchange()).with("create-order-key");
}
@Bean
public Queue orderCreatedQueue() {
return new Queue("order-created-queue");
}
@Bean
public Binding orderCreatedBinding() {
return BindingBuilder.bind(orderCreatedQueue()).to(createOrderExchange()).with("order-created-key");
}
@Bean
public Queue rollbackOrderQueue() {
return new Queue("rollback-order-queue");
}
@Bean
public Binding rollbackOrderBinding() {
return BindingBuilder.bind(rollbackOrderQueue()).to(createOrderExchange()).with("rollback-order-key");
}
}
可靠消息最终一致性(Reliable Message-Based Consistency)是一种通过消息中间件来确保分布式操作的可靠性和一致性的方法。具体流程如下:
这种方式适用于数据一致性要求不高的场景,可以提高系统的并发性和扩展性。下面以一个用户注册送积分场景为例,介绍如何使用可靠消息最终一致性实现分布式事务处理。
假设在用户注册成功后需要给用户送积分,场景如下:
以上操作需要保证原子性,即要么全部执行成功,要么全部回滚。这时可以使用可靠消息最终一致性来解决该问题:
本篇介绍了常见的分布式事务处理手段,并结合生产实践案例进行详细阐述。实际生产需要根据具体业务场景来选择适合的分布式事务处理方法,以保证系统的可靠性和一致性。