在上一篇文章Spring Boot自动装配原理以及实践我们完成了服务通用日志监控组件的开发,确保每个服务都可以基于一个注解实现业务功能的监控。
而本文我们尝试基于RocketMQ
实现下单的分布式的事务。可能会有读者会有疑问,之前我们不是基于Seata
完成了分布式事务,为什么我们还要用到RocketMQ
呢?
我们的再来回顾一下我们下单功能大抵是做以下三件事情:
我们将该场景放到高并发场景下,这个功能势必要考虑性能和可靠性问题,所以我们在业务需求清楚明了的情况下,就希望能有一种方式确保下单功能在高并发场景保证性能、可靠性。
而Seata
的AT
模式确实可以保证最终一致性,但由于需要用到undo_log
和lock_table
等涉及数据持久化以及锁相关的操作,可能存在一定的性能问题。而且Seata
一旦报错会直接回滚事务,不存在任何重试机制,对于我们这种付款下单的场景是非常不可取的。
而RocketMQ
实现分布式的方式是基于消息通信的,既确保了业务功能解耦保证了并发场景的性能,而且RocketMQ
还对消息消费可靠性做了许多不错的优化,例如:失败重试、死信队列等,所以我们还是尝试使用RocketMQ
来改良我们的下单分布式事务问题。
用户下单大抵需要在三个服务中完成:订单创建、钱包扣款、库存扣减等业务逻辑。这其中会跨域三个服务,分别是订单服务创建订单、账户服务扣款、商品服务扣减库存。
以我们业务为最终目标,RocketMQ
实现分布式事务的原理是基于2PC
的,流程大抵如下:
MQ
收到half
消息,并回复确认。(订单服务order-service)
得知我们发送的消息已被收到,订单服务则执行本地事务并提交事务,即将订单数据插入数据库中。(订单服务order-service)
完成本地事务的提交,告知MQ
将事务消息commit
,此时消费者就可以消费这条消息了,注意若生产者消费失败,则将消息rollback
,一切就当没有发生过。commit
则将消息持久化到commitLog
中,以便后续MQ
宕机或者服务宕机后依然可以继续消费这条没有被消费的消息。(非必要步骤)
若MQ长时间没有收到生产者的commit
或者rollback
的信号,则会主动找生产者索要当前消息状态。half
消息即半消息,它和普通消息一样,都是存储在MQ中,唯一区别就是这个消息不会立马被消费者消费到。只有生产者本地事务成功并发送commit通知后,这个消息才会被提交到topic队列
中后消费者拿到这个消息并进行消费。
基于MQ事务消息
的实现接口完成实现(具体后文会演示)。
先发送half消息
的原因是为了尽可能确保生产者和消息队列通信正常,只有通信正常了才能确保生产者本地事务提交后发送的commit通知可以消息队列收到通知,从而将消息提交到topic队列中让消费者消费,由此保证分布式事务的可靠性。
这也就意味着生产者没有收到确认的通知,随后消息队列就会因为长时间没有收到生产者commit或者rollback的通知而去回调生产者的接口询问事务提交结果。
MQ
没有收到生产者(订单服务)
的commit
或者rollback
信号我们如何回查?怎么提供回查的依据?常规的做法就是建立一张表记录日志,只要我们订单信息插入成功就需要日志一下这条数据,所以我们必须保证订单数据插入和日志插入表中的原子性,这一点我们基于spring
的事务注解即可实现。
首先将本地事务回滚,再向消息队列提交一个rollback
的请求,对应的half消息
就会回滚,而不会被消费者消费,保证最终一致性。
生产者和消息队列事务流程可以确保生产者和消息队列写操作的一致性,确保写操作都是成功或者失败。只有保证两者正常通信,才能确保消费者可以消费MQ中的消息从而完成数据最终一致性。
我们都知道消息队列只能保证消息可靠性,而无法保证分布式事务的强一致性,出现这种情况,消息队列会进行N次重试,如果还是失败,则可以到死信队列中查看失败消息,然后通过补偿机制实现分布式事务最终一致性。
在编写业务代码之前,我们必须完成一下RocketMQ
的部署,首先我们自然要下载一下RocketMQ
,下载地址如下,笔者下载的是rocketmq-all-4.8.0-bin-release
这个版本
https://rocketmq.apache.org/download/
完成完成后,我们将其解压到自定义的路径,并配置一个名为ROCKETMQ_HOME
的环境变量,以笔者为例,因为mq存放在D:\myinstall\rocketmq
,所以我们将这个路径配置到环境变量中。
完成环境变量配置后,我们到达mq
的bin
目录先键入这条命令,启动nameserver
start mqnamesrv.cmd
如果弹窗输出下面这条结果,则说明mq
的NameServer
启动成功。
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON
然后我们再键入下面这条命令启动broker
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
若弹窗输出下面所示的文字,则说明broker
启动成功,自此mq
就在windows
环境部署成功了。我们就可以开始编码工作了。
The broker[DESKTOP-BI4ATFQ, 192.168.237.1:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
完成RocketMQ
部署之后,我们就可以着手编码工作了,首先我们要在在三个服务中引入RocketMQ
的依赖,由于笔者的spring-boot
版本比较老,所以这里笔者为了统一管理在父pom
中指定了mq
较新的版本号:
<!--rocketmq-->
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
然后我们分别对order
、account
、product
三个服务中引入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
由于我们的分布式事务涉及3个服务,而且mq的消费模式采用的是发布订阅模式,所以我们的生产者(order-service)和消费者(account-serivce)都配置为cloud-group
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: cloud-group
之所以没有没将消费者2(product-service)也配置到cloud-group中的原因也很简单,同一个消息只能被同一个消费者组中的一个成员消费,假如我们的将product-service配置到同一个消费者组中就会出现一条消息只能被一个Java
服务消费。
对此我们实现思路有两种:
product-service
设置到别的消费者组中。考虑后续扩展笔者选择方案2,设置到别的组中。
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: cloud-group2
我们在上文进行需求梳理时有提到一个MQServer
没收到生产者本地事务执行状态的情况,所以我们在生产者在执行本地事务时,需要创建一张表记录生产者本地事务执行状态,建表SQL如下:
DROP TABLE IF EXISTS `rocketmq_transaction_log`;
CREATE TABLE `rocketmq_transaction_log` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`transaction_id` varchar(50) DEFAULT NULL,
`log` varchar(500) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
我们的订单服务需要做以下三件事:
所以我们需要定义一下消息格式,对象类中必须包含订单号、产品编码、用户编码、购买产品数量等信息。
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
public class OrderDto {
private static final long serialVersionUID = 1L;
//设置主键自增,避免插入时没必要的报错
@TableId(value = "ID", type = IdType.AUTO)
private Integer id;
/**
* 订单号
*/
private String orderNo;
/**
* 用户编码
*/
private String accountCode;
/**
* 产品编码
*/
private String productCode;
/**
* 产品扣减数量
*/
private Integer count;
/**
* 余额
*/
private BigDecimal amount;
/**
* 本次扣减金额
*/
private BigDecimal price;
}
然后我们就可以编写控制层的代码了,通过获取前端传输的参数调用orderService完成half消息发送。
@PostMapping("/order/createByMQ")
public ResultData<String> createByMQ(@RequestBody OrderDto orderDTO) {
log.info("基于mq完成用户下单流程,请求参数: " + JSON.toJSONString(orderDTO));
orderService.createByRocketMQ(orderDTO);
return ResultData.success("基于mq完成用户下单完成");
}
orderService
的实现逻辑很简单,定义好消息设置消息头内容和消息载体的对象,通过sendMessageInTransaction
方法完成半消息发送,需要了解一下消息的主题(topic)
为createByRocketMQ
,只有订阅这个主题的消费者才能消费这条消息。
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public void createByRocketMQ(OrderDto orderDto) {
//创建half消息,消息内容为,告知account服务要退款给用户
String transactionId = UUID.randomUUID().toString();
Message<OrderDto> message = MessageBuilder.withPayload(orderDto)
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
.setHeader("accountCode", orderDto.getAccountCode())
.setHeader("productCode", orderDto.getProductCode())
.setHeader("count", orderDto.getCount())
.setHeader("amount", orderDto.getPrice().multiply(new BigDecimal(orderDto.getCount())))
.build();
//发送half消息
rocketMQTemplate.sendMessageInTransaction("createByRocketMQ", message, orderDto);
}
完成half消息发送之后,我们就必须知晓消息发送结果才能确定是否执行本地事务并提交,所以我们的订单服务必须创建一个监听器了解half消息的发送情况,executeLocalTransaction
方法就是mq成功收到半消息后的回调函数,一旦我们得知消息成功发送之后,MQ就会执行这个方法,笔者通过这个方法获取消息头的参数创建订单对象,调用createOrderWithRocketMqLog
完成订单的创建的本地事务成功的日志记录。
@Slf4j
@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class OrderListener implements RocketMQLocalTransactionListener {
private final IOrderService orderService;
private final RocketmqTransactionLogMapper rocketMqTransactionLogMapper;
/**
* 监听到发送half消息,执行本地事务
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
log.info("order执行本地事务");
try {
MessageHeaders headers = message.getHeaders();
String amount = (String) headers.get("amount");
Order order = Order.builder()
.accountCode((String) headers.get("accountCode"))
.amount(new BigDecimal(amount) )
.productCode((String) headers.get("productCode"))
.count(Integer.valueOf(String.valueOf(headers.get("count"))))
.build();
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
orderService.createOrderWithRocketMqLog(order, transactionId);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("创建订单失败,失败原因: " + e.getMessage(), e);
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 本地事务的检查,检查本地事务是否成功
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
MessageHeaders headers = message.getHeaders();
//获取事务ID
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
log.info("检查本地事务,事务ID:{}", transactionId);
//根据事务id从日志表检索
QueryWrapper<RocketmqTransactionLog> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("transaction_id", transactionId);
RocketmqTransactionLog rocketmqTransactionLog = rocketMqTransactionLogMapper.selectOne(queryWrapper);
if (null != rocketmqTransactionLog) {
return RocketMQLocalTransactionState.COMMIT;
}
return RocketMQLocalTransactionState.ROLLBACK;
}
}
createOrderWithRocketMqLog
做了两件事,分别是插入订单信息和创建消息日志,这里笔者用到了事务注解确保了两个操作的原子性。
这样一来,MQserver
后续的回查逻辑完全可以基于RocketmqTransactionLog
进行判断,如果消息的事务id在表中存在,则说明生产者本地事务成功,反之就是失败。
@Transactional(rollbackFor = RuntimeException.class)
@Override
public void createOrderWithRocketMqLog(Order order, String transactionId) {
order.setOrderNo(UUID.randomUUID().toString());
orderMapper.insert(order);
RocketmqTransactionLog log = RocketmqTransactionLog.builder()
.transactionId(transactionId)
.log("执行创建订单操作")
.build();
rocketmqTransactionLogMapper.insert(log);
}
补充一下基于MP
生成的RocketmqTransactionLog
类代码
@TableName("rocketmq_transaction_log")
@ApiModel(value = "RocketmqTransactionLog对象", description = "")
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class RocketmqTransactionLog implements Serializable {
private static final long serialVersionUID = 1L;
@TableId(value = "ID", type = IdType.AUTO)
private Integer id;
private String transactionId;
private String log;
}
然后我们就可以实现用户服务和商品服务的监听事件了,一旦生产者提交事务消息之后,这几个消费者都会收到这个topic(主题)
的消息,进而完成当前服务的业务逻辑。
先来看看实现扣款的用户服务,我们的监听器继承了RocketMQListener
,基于@RocketMQMessageListener
注解设置它订阅的主题为createByRocketMQ
,一旦收到这个主题的消息时这个监听器就会执行onMessage
方法,我们的逻辑很简单,就是获取消息的内容完成扣款,唯一需要注意的就是线程安全问题。我们的压测的情况下,单用户可能会频繁创建订单,在并发期间同一个用户的扣款消息可能同时到达扣款服务中,这就导致单位时间内扣款服务从数据库中查询到相同的余额,执行相同的扣款逻辑,导致金额少扣了。
所以我们必须保证扣款操作互斥和原子化,考虑到笔者当前项目环境是单体,所以就用简单的synchronized
关键字解决问题。
@Slf4j
@Service
@RocketMQMessageListener(topic = "createByRocketMQ", consumerGroup = "cloud-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class SubtracAmountListener implements RocketMQListener<OrderDto> {
@Resource
private AccountMapper accountMapper;
//强制转为runTimeException
@SneakyThrows
@Override
public void onMessage(OrderDto orderDto) {
log.info("账户服务收到消息,开始消费");
QueryWrapper<Account> query = new QueryWrapper<>();
query.eq("account_code", orderDto.getAccountCode());
//解决单体服务下线程安全问题
synchronized (this){
Account account = accountMapper.selectOne(query);
BigDecimal subtract = account.getAmount().subtract(orderDto.getAmount());
if (subtract.compareTo(BigDecimal.ZERO)<0){
throw new Exception("用户余额不足");
}
account.setAmount(subtract);
log.info("更新账户服务,请求参数:{}", JSON.toJSONString(account));
accountMapper.updateById(account);
}
}
}
然后就说商品服务,逻辑也很简单,也同样要注意一下线程安全问题
@Slf4j
@Service
@RocketMQMessageListener(topic = "createByRocketMQ", consumerGroup = "cloud-group2")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ProductSubtractListener implements RocketMQListener<OrderDto> {
@Resource
private ProductMapper productMapper;
@Override
public void onMessage(OrderDto orderDto) {
log.info("产品服务收到消息,开始消费");
QueryWrapper<Product> queryWrapper=new QueryWrapper<>();
queryWrapper.eq("product_code",orderDto.getProductCode());
synchronized (this){
Product product = productMapper.selectOne(queryWrapper);
if (product.getCount()<orderDto.getCount()){
throw new RuntimeException("库存不足");
}
product.setCount(product.getCount()-orderDto.getCount());
log.info("更新产品库存信息,请求参数:{}", JSON.toJSONString(product));
productMapper.updateById(product);
}
}
}
完整编码工作后,自测是非常有必要的,我们日常完成开发任务后,都会结合需求场景以及功能编排一些自测用例查看最终结果是否与预期一致。
需要注意的是由于订单业务逻辑较为复杂,很多业务场景一篇博客是不可能全部覆盖,所以这里我们就测试一下基于RocketMQ
实现分布式事务常见的几个问题场景是否和预期一致。
在测试前我们必须做好前置准备工作,准备功能测试时涉及到的SQL语句,以本次用户购买产品的业务为例,涉及到订单表、用户账户信息表、产品表、以及生产者本地事务日志表。
SELECT * FROM t_order to2 ;
SELECT * from account a ;
SELECT * from product p ;
SELECT * FROM rocketmq_transaction_log rtl ;
在每次测试完成之后,我们希望数据能够还原,所以这里也需要准备一下每次测试结束后的更新语句,由于订单表和消息日志表都是主键自增,考虑到这两张表只涉及插入,所以笔者为了重置主键的值采取的是truncate语句。
truncate table t_order;
truncate rocketmq_transaction_log ;
UPDATE account set amount=10000 ;
UPDATE product set count=10000;
第一个用例是查看所有服务都正常的情况下,订单表是否有数据,用户表的用户是否会正常扣款,以及商品表库存是否会扣减。
测试前,我们先查看订单表,确认没有数据
查看我们的测试用户,钱包额度为10000
再查看库存表,可以看到数量为1000
确认完数据之后,我们就可以测试服务是否按照预期的方式执行,将所有服务启动
我们通过网关发起调用,请求地址如下:
http://localhost:8090/order/order/createByMQ
请求参数如下,从参数可以看出这个请求意为用户代码(accountCode)
为demoData这个用户希望购买1个(count)产品代码(productCode)
为P001
的产品,该产品当前售价(price)
为1元。
{
"accountCode": "demoData",
"productCode": "P001",
"count": 1,
"amount": 1,
"price": 1
}
调用完成后,查看订单表,订单数据生成无误:
查看用户服务是否完成用户扣款,扣款无误:
查看产品表,可以看到产品数量也准确扣减:
我们希望测试一下发送完half消息之后,执行本地事务完成,但是未提交commit请求时,MQServer是否会调用回查逻辑。
为了完成这一点我们必须按照以下两个步骤执行:
jps
定位到进程号,将其强制杀死。如下所示,我们的代码执行到了提交事务消息这一步:我们通过jps
定位并将其杀死
MQServer
会调用checkLocalTransaction
回查生产者本地事务的情况。我们放行这块代码让程序执行下去,最后再查看数据库中的数据结果是否符合预期。测试消费者执行报错后是否会进行重试,这一点就比较好测试了,我们在消费者监听器中插入随便插入一个报错查看其是否会不断重试。这里笔者就不多做演示,实验结果是会进行不断重试,当重试次数达到阈值时会将结果存到死信队列中。
由于MQ
是采用异步消费的形式解耦了服务间的业务,而我们的Seata
采用默认的AT模式
每次执行分布式事务时都会需要借助undo-log
、全局锁
等的方式保证最终一致性。所以理论上RocketMQ
的性能肯定是高于Seata
的,对此我们不妨使用Jmeter
进行压测来验证一下。
本次压测只用了10个并发,MQ和seata的压测结果如下,可以看到MQ无论从执行时间还是成功率都远远优秀于Seata的。
MQ的压测结果:
Seata的压测结果: