最大努力通知型分布式事务

发布时间:2023年12月20日

系列文章目录

提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加
最大努力通知型分布式事务


提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档


前言

提示:这里可以添加本文要记录的大概内容:

在当今的分布式系统中,确保数据的一致性和可靠性是至关重要的。而传统的事务处理方式在分布式环境下往往面临着挑战,如网络延迟、节点故障等。为了解决这些问题,最大努力通知型分布式事务应运而生。
最大努力通知型分布式事务是一种柔性事务处理机制,它允许在某些情况下事务可能无法完全成功,但会尽最大努力确保最终的一致性。与传统的原子性事务不同,最大努力通知型分布式事务强调可用性和最终一致性,而不是严格的原子性。
在本博客中,我们将深入探讨最大努力通知型分布式事务的原理、实现以及其在实际应用中的优势。我们将介绍一些常见的最大努力通知型分布式事务解决方案,如基于消息队列的实现、基于定时任务的实现等。同时,我们也会探讨最大努力通知型分布式事务在高并发、高可用场景下的应用。


提示:以下是本篇文章正文内容,下面案例可供参考

一、什么是最大努力通知型分布式事务?

最大努力通知型分布式事务(Best-Effort Notification Distributed Transaction)是一种在分布式系统中处理事务的柔性事务模型。与传统的原子性事务不同,最大努力通知型分布式事务不保证事务的原子性,而是专注于尽最大努力确保事务的最终一致性。
在最大努力通知型分布式事务中,事务的参与者(通常是服务或数据库)会尽力完成事务的操作,但并不保证一定能够成功。如果事务执行过程中发生了故障或异常,参与者会尝试采取一些补偿措施来恢复到一致的状态。
最大努力通知型分布式事务的核心思想是通过多次尝试来达成最终一致性。参与者在执行事务操作后,会发送通知给其他相关方,告知事务的执行结果。如果通知发送失败,参与者会进行重试,直到通知成功或达到一定的重试次数。

最大努力通知型分布式事务一定要包含两个方面

  1. 有一定的消息重复通知机制:接收通知方有可能无法接收到通知,一定要具有重复通知的机制。
  2. 消息校对机制:在重复通知仍然没有通知到对方,可由接收通知方主动查询信息。

二、最大努力通知型分布式事务和可靠消息最终一致性分布式消息的区别

最大努力通知型分布式事务是一种事务处理模型,它的主要目标是尽最大努力确保事务的最终一致性。在最大努力通知型分布式事务中,事务的参与者(通常是服务或数据库)会尽力完成事务的操作,但并不保证一定能够成功。如果事务执行过程中发生了故障或异常,参与者会尝试采取一些补偿措施来恢复到一致的状态。最大努力通知型分布式事务就像是你让朋友帮你办事,但不能保证一定能办成。你的朋友会尽最大的努力去做,但如果出现问题,他可能会告诉你事情没办成,或者尝试其他方法来解决。
可靠消息最终一致性分布式消息是一种消息传递模型,它的主要目标是确保消息的可靠传递和最终一致性。在可靠消息最终一致性分布式消息中,消息发送方将消息发送到一个可靠的消息中间件,消息中间件负责将消息可靠地传递给接收方。如果消息传递过程中发生了故障或异常,消息中间件会尝试重新发送消息,直到消息被成功接收。可靠消息最终一致性分布式消息就像是你给朋友寄了一封信,你可以确定他最终会收到这封信,但不能保证他什么时候会收到。信件可能会在路上丢失或延误,但最终它会到达朋友手中。
最大努力通知型分布式事务和可靠消息最终一致性分布式消息的主要区别在于:

  1. 事务处理和消息传递:最大努力通知型分布式事务关注的是事务的处理和最终一致性,而可靠消息最终一致性分布式消息关注的是消息的可靠传递和最终一致性。
  2. 可靠性保证:最大努力通知型分布式事务并不保证事务的原子性,可能会存在数据不一致的情况;而可靠消息最终一致性分布式消息保证消息的可靠传递,通常会实现至少一次传递。
  3. 补偿措施:最大努力通知型分布式事务在事务执行失败时会尝试采取补偿措施来恢复到一致的状态;而可靠消息最终一致性分布式消息通常不需要补偿措施,因为消息中间件会负责确保消息的可靠传递
  4. 应用场景:最大努力通知型分布式事务适用于对事务的一致性要求不高,但需要尽可能保证最终一致性的场景。例如,电商系统中的订单状态更新,虽然不能保证每次更新都能成功,但可以通过多次重试来尽可能确保最终一致性。可靠消息最终一致性分布式消息适用于需要保证消息可靠传递和最终一致性的场景。例如,金融系统中的交易确认消息,必须确保消息被可靠地传递和处理,以保证交易的正确性和一致性。

三、最大努力通知型分布式事务的步骤

  1. 发起通知方将通知发给MQ。 使用普通消息机制将通知发给MQ。
  2. 接收通知方监听 MQ。
  3. 接收通知方接收消息,业务处理完成回应ack。
  4. 接收通知方若没有回应ack则MQ会重复通知。 MQ会按照间隔1min、5min、10min、30min、1h、2h、5h、10h的方式,逐步拉大通知间隔(如果MQ采用 rocketMq,在broker中可进行配置),直到达到通知要求的时间窗口上限。
  5. 接收通知方可通过消息校对接口来校对消息的一致性。

四、最大努力通知型分布式事务的项目实战

数据库设计

1.创建tx-notifymsg-account库

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for account_info
-- ----------------------------
DROP TABLE IF EXISTS `account_info`;
CREATE TABLE `account_info` (
 `id` int(11) NOT NULL COMMENT '主键id',
 `account_no` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '账户',
 `account_name` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '账户名',
 `account_balance` decimal(10, 2) NULL DEFAULT NULL COMMENT '账户余额',
 PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = DYNAMIC;
-- ----------------------------
-- Records of account_info
-- ----------------------------
-- ----------------------------
-- Table structure for pay_info
-- ----------------------------
DROP TABLE IF EXISTS `pay_info`;
CREATE TABLE `pay_info` (
 `tx_no` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '充值记录流水号',
 `account_no` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '账户',
 `pay_amount` decimal(10, 2) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '充值金额',
 `pay_result` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '充值结果',
 `pay_time` datetime(0) NOT NULL COMMENT '充值时间',
 PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = DYNAMIC;
-- ----------------------------
-- Records of pay_info
-- ----------------------------
SET FOREIGN_KEY_CHECKS = 1;

2.创建tx-notifymsg-payment库

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for pay_info
-- ----------------------------
DROP TABLE IF EXISTS `pay_info`;
CREATE TABLE `pay_info` (
 `tx_no` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '充值记录流水号',
 `account_no` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '账户',
 `pay_amount` decimal(10, 2) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '充值金额',
 `pay_result` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '充值结果',
 `pay_time` datetime(0) NOT NULL COMMENT '充值时间',
 PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = DYNAMIC;
-- ----------------------------
-- Records of pay_info
-- ----------------------------
SET FOREIGN_KEY_CHECKS = 1;

Rocketmq准备

1.安装NameServer,拉取镜像

docker pull rocketmqinc/rocketmq

2.创建数据存储目录

mkdir -p /docker/rocketmq/data/namesrv/logs /docker/rocketmq/data/namesrv/store

3.启动NameServer

docker run -d \
--restart=always \
--name rmqnamesrv  \
-p 9876:9876 \
-v /docker/rocketmq/data/namesrv/logs:/root/logs \
-v /docker/rocketmq/data/namesrv/store:/root/store \
-e "MAX_POSSIBLE_HEAP=100000000" \
rocketmqinc/rocketmq \
sh mqnamesrv

4.编辑border配置

vim /docker/rocketmq/conf/broker.conf
# 所属集群名称,如果节点较多可以配置多个
brokerClusterName = DefaultCluster 
#broker名称,master和slave使用相同的名称,表明他们的主从关系 
brokerName = broker-a 
#0表示Master,大于0表示不同的
slave brokerId = 0 
#表示几点做消息删除动作,默认是凌晨4点 
deleteWhen = 04 
#在磁盘上保留消息的时长,单位是小时 
fileReservedTime = 48 
#有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机 制;
brokerRole = ASYNC_MASTER 
#刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后 才返回成功状态,ASYNC_FLUSH不需要;
flushDiskType = ASYNC_FLUSH 
# 设置broker节点所在服务器的ip地址 
brokerIP1 = 192.168.66.100
#剩余磁盘比例 
diskMaxUsedSpaceRatio=99

5.启动broker

docker run -d --restart=always --name rmqbroker --link rmqnamesrv:namesrv -p 10911:10911 -p 10909:10909 --privileged=true -v /docker/rocketmq/data/broker/logs:/root/logs -v /docker/rocketmq/data/broker/store:/root/store -v /docker/rocketmq/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" rocketmqinc/rocketmq sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf

6.关闭防火墙(或者开放端口)

#关闭防火墙 
systemctl stop firewalld.service 
#禁止开机启动 
systemctl disable firewalld.service

7.部署RocketMQ的管理工具

#创建并启动容器 
docker run -d --restart=always --name rmqadmin -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.66.100:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 pangliang/rocketmq-console-ng

项目准备

1.创建Maven父项目,创建子工程,大家自行创建
2.引入依赖

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>com.baomidou</groupId>
      <artifactId>mybatis-plus-boot-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-spring-boot-starter</artifactId>
      <version>2.0.1</version>
    </dependency>
    <!--  引入nacos依赖 -->
    <dependency>
      <groupId>com.alibaba.cloud</groupId>
      <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    </dependency>
  </dependencies>

3.编写主启动类

@EnableDiscoveryClient
@MapperScan("com.itbaizhan.payment.mapper")
@SpringBootApplication
@Slf4j
public class PayMain7071 {
  public static void main(String[] args) {
    SpringApplication.run(PayMain7071.class,args);
    log.info("*********** 充值服务启动成功 *********");
   }
}

4.编写配置文件

server:
  port: 7071
spring:
  cloud:
   nacos:
    discovery:
     server-addr: 192.168.66.100:8848
  application:
   name: tx-notifymsg-pay
  datasource:
   url: jdbc:mysql://192.168.66.100:3306/tx-notifymsg-payment?useUnicode=true&characterEncoding=UTF-8&useOldAliasMetadataBehavior=true&autoReconnect=true&failOverReadOnly=false&useSSL=false
   username: root
   password: 123456
   driver-class-name: com.mysql.cj.jdbc.Driver




################ RocketMQ 配置 ##########
rocketmq:
  name-server: 192.168.66.100:9876
  producer:
   group: payment-group

5.充值微服务的业务逻辑层主要完成充值的业务逻辑处理,当充值成功时,会向RocketMQ发送充值结果信息,同时提供业务逻辑层查询充值结果信息的接口。编写充值接口。

public interface IPayInfoService extends IService<PayInfo> {
  /**
   * 保存充值信息
   */
  PayInfo savePayInfo(PayInfo payInfo);
  /**
   * 查询指定的充值信息
   */
  PayInfo getPayInfoByTxNo(String txNo);
}

重点一

6.充值接口实现

@Slf4j
@Service
public class PayInfoServiceImpl extends ServiceImpl<PayInfoMapper, PayInfo> implements IPayInfoService {
  @Resource
  private PayInfoMapper payInfoMapper;
  @Resource
  private RocketMQTemplate rocketMQTemplate;
  @Override
  public PayInfo savePayInfo(PayInfo payInfo) {
    payInfo.setTxNo(UUID.randomUUID().toString().replace("-",""));
    payInfo.setPayResult("success");
    payInfo.setPayTime(LocalDateTime.now());
    int count = payInfoMapper.insert(payInfo);
    //充值信息保存成功
    if(count > 0){
      log.info("充值微服务向账户微服务发送结果消息");
      //发送消息通知账户微服务
      rocketMQTemplate.convertAndSend("topic_nofitymsg", JSON.toJSONString(payInfo));
      return payInfo;
     }
    return null;
   }
  @Override
  public PayInfo getPayInfoByTxNo(String txNo) {
    return baseMapper.selectById(txNo);
   }
}

7.编写充值接口

@RestController
@RequestMapping("/payInfo")
public class PayInfoController {


  @Autowired
  private IPayInfoService payInfoService;


  /**
   * 充值
   * @param payInfo
   * @return
   */
  @GetMapping(value = "/pay_account")
  public PayInfo pay(PayInfo payInfo){
    //生成事务编号
    return payInfoService.savePayInfo(payInfo);
   }


  /**
   * 查询充值结果
   * @param txNo
   * @return
   */
  @GetMapping(value = "/query/payresult/{txNo}")
  public PayInfo payResult(@PathVariable("txNo") String txNo){
    return payInfoService.getPayInfoByTxNo(txNo);
   }
}

8.创建子工程account,引入依赖

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>com.baomidou</groupId>
      <artifactId>mybatis-plus-boot-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-spring-boot-starter</artifactId>
      <version>2.0.1</version>
    </dependency>
    <!--   引入Nacos注册中心   -->
    <dependency>
      <groupId>com.alibaba.cloud</groupId>
      <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    </dependency>
    <!--   引入OpenFeign -->
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-openfeign</artifactId>
    </dependency>
    <!--   引入负载均衡器-->
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-loadbalancer</artifactId>
    </dependency>
  </dependencies>



9.编写配置文件

server:
  port: 7070
spring:
  cloud:
   nacos:
    discovery:
     server-addr: 192.168.66.100:8848
  application:
   name: tx-notifymsg-account
  datasource:
   url: jdbc:mysql://192.168.66.100:3306/tx-notifymsg-account?useUnicode=true&characterEncoding=UTF-8&useOldAliasMetadataBehavior=true&autoReconnect=true&failOverReadOnly=false&useSSL=false
   username: root
   password: 123456
   driver-class-name: com.mysql.jdbc.Driver
################ RocketMQ 配置 ##########
rocketmq:
  name-server: 192.168.66.100:9876

重点二

10.RocketMQ消费充值信息

@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "consumer_group_account", topic = "topic_nofitymsg")
public class NotifyMsgAccountListener implements RocketMQListener<String> {
  @Autowired
  private IAccountInfoService accountInfoService;
  @Override
  public void onMessage(String message) {
    log.info("账户微服务收到RocketMQ的消息:{}", JSONObject.toJSONString(message));
    //如果是充值成功,则修改账户余额
    PayInfo payInfo = JSON.parseObject(message, PayInfo.class);
    if("success".equals(payInfo.getPayResult())){
      accountInfoService.updateAccountBalance(payInfo);
     }
    log.info("更新账户余额完毕:{}", JSONObject.toJSONString(payInfo));
   }
}

11.编写账户操作接口

 /**
   * 更新账户余额
   */
  void updateAccountBalance(PayInfo payInfo);

12.实现账户操作接口

@Slf4j
@Service
public class AccountInfoServiceImpl extends ServiceImpl<AccountInfoMapper, AccountInfo> implements IAccountInfoService {
  @Resource
  private AccountInfoMapper accountInfoMapper;
  @Resource
  private PayInfoMapper payInfoMapper;
  /**
   *
   * @param payInfo
   */
  @Transactional(rollbackFor = Exception.class)
  @Override
  public void updateAccountBalance(PayInfo payInfo) {
    if(payInfoMapper.selectById(payInfo.getTxNo()) != null){
      log.info("账户微服务已经处理过当前事务...");
      return;
     }
    LambdaUpdateWrapper<AccountInfo> lambdaUpdateWrapper = new LambdaUpdateWrapper<>();
    lambdaUpdateWrapper.eq(AccountInfo::getAccountNo,payInfo.getAccountNo());
    //更新账户余额
    List<AccountInfo> accountInfos = baseMapper.selectList(lambdaUpdateWrapper);
    if (accountInfos != null && !accountInfos.isEmpty()){
      AccountInfo accountInfo = accountInfos.get(0);
      accountInfo.setAccountBalance(accountInfo.getAccountBalance().add(payInfo.getPayAmount()));
      accountInfoMapper.updateById(accountInfo);
     }
    //保存充值记录
    payInfoMapper.insert(payInfo);
   }
}

13.主启动类加Feign注解

@EnableDiscoveryClient
@EnableFeignClients
@MapperScan("com.itbaizhan.account.mapper")
@SpringBootApplication
@Slf4j
public class AccountMain7070 {
  public static void main(String[] args) {
    SpringApplication.run(AccountMain7070.class,args);
    log.info("*********** AccountMain7070 启动成功 *********");
   }
}

14.编写远程调用接口

@Service
@FeignClient("tx-notifymsg-pay")
public interface IPayFeignService {
  @GetMapping(value = "/payInfo/query/payresult/{txNo}")
  PayInfo payResult(@PathVariable("txNo") String txNo);
}

15.编写查询账户接口

/**
   * 查询充值结果
   */
  PayInfo queryPayResult(String txNo);

16.实现查询账户信息

  /**
   * 查询结果
   * @param txNo
   * @return
   */
  @Override
  public PayInfo queryPayResult(String txNo) {
    try{
      return iPayFeignService.payResult(txNo);
     }catch (Exception e){
      log.error("查询充值结果异常:{}", e);
     }
    return null;
   }

17.编写查询充值结果接口

  /**
   * 主动查询充值结果
   * @param txNo
   * @return
   */
@GetMapping(value = "/query/payresult/{txNo}")
  public ResponseEntity result(@PathVariable("txNo") String txNo){
    return ResponseEntity.ok(accountInfoService.queryPayResult(txNo));
   }

总结

提示:这里对文章进行总结:

最大努力通知型分布式事务适用于一些对事务原子性要求不高,但对最终一致性有要求的场景。它提供了一种在分布式环境下处理事务的灵活方式,可以提高系统的可用性和容错能力。
需要注意的是,最大努力通知型分布式事务并不保证事务的原子性,可能会存在数据不一致的情况。在设计和实现最大努力通知型分布式事务时,需要考虑到可能的失败情况,并采取适当的补偿措施来确保最终一致性。
总的来说,最大努力通知型分布式事务是一种在分布式系统中处理事务的有效方式,它通过多次尝试来确保最终的一致性,提高了系统的可靠性和容错能力。在实际应用中,需要根据具体的业务需求和场景选择合适的事务处理方式。
希望以上博客总结对你有所帮助!如果你有任何问题或需要进一步了解最大努力通知型分布式事务,请随时提问。

文章来源:https://blog.csdn.net/liubopro666/article/details/135091956
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。