基于Atomikos实现XA强一致性分布式事务

发布时间:2023年12月21日

系列文章目录

提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加
基于Atomikos实现XA强一致性分布式事务


提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档


前言

提示:这里可以添加本文要记录的大概内容:

前面介绍的Seata提供XA模式实现分布式事务是用来处理跨JVM进程的分布服务,也就是应用在微服务或者分布式项目中,而接下来介绍的基于Atomikos实现XA强一致性分布式事务是用来处理跨数据库的分布式事务,例如,订单微服务和库存微服务访问同一个数据库也会产生分布式事务,原因是:多个微服务访问同一个数据库,本质上也是通过不同的数据库会话来操作数据库,此时就会产生分布式事务。
无论您是分布式系统的开发者、架构师,还是对分布式事务感兴趣的技术人员,这篇博客都将为您提供有价值的信息和实用的指导。让我们一起探索 Atomikos 的世界,掌握实现 XA 强一致性分布式事务的技能!


提示:以下是本篇文章正文内容,下面案例可供参考

一、什么是Atomikos?

Atomikos(https://www.atomikos.com/),其实是一家公司的名字,提供了基于JTA规范的XA分布式事务TM的实现。其旗下最著名的产品就是事务管理器。

二、什么是JTA

JTA(Java Transaction API)是 Java 平台上的一种事务处理 API,用于实现分布式事务。它提供了一组标准的接口和抽象,使得应用程序可以在多个数据库或其他资源上进行事务操作,并且保证这些操作的原子性、一致性、隔离性和持久性(ACID 属性)。
要想使用用 JTA 事务,那么就需要有一个实现 javax.sql.XADataSource 、 javax.sql.XAConnection 和 javax.sql.XAResource 接口的 JDBC 驱动程序。一个实现了这些接口的驱动程序将可以参与 JTA 事务。一个 XADataSource 对象就是一个 XAConnection 对象的工厂。XAConnection 是参与 JTA 事务的 JDBC 连接。

三、基于Atomikos实现XA强一致性分布式事务实现步骤

1.因为Atomikos是用来处理跨数据库事务的,所以要准备两个数据库,分别为tx-xa-01和tx-xa-02,分别在2个数据库中创建转出金额数据库。

DROP TABLE IF EXISTS `user_account`;
CREATE TABLE `user_account` (
 `account_no` varchar(64) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '账户编号',
 `account_name` varchar(64) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '账户名称',
 `account_balance` decimal(10, 2) NULL DEFAULT NULL COMMENT '账户余额',
 PRIMARY KEY (`account_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;

2.向tx-xa-01库中添加数据。

INSERT INTO `user_account` VALUES ('1001', '张三', 10000.00);

3.向tx-xa-02库中添加数据。

INSERT INTO `user_account` VALUES ('1002', '李四', 10000.00);

4.创建一个springboot项目,添加如下依赖

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!--    Atomikos依赖-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-jta-atomikos</artifactId>
    </dependency>
    <!--    druid连接池依赖组件-->
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>druid-spring-boot-starter</artifactId>
      <version>1.1.22</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <dependency>
      <groupId>com.baomidou</groupId>
      <artifactId>mybatis-plus-boot-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
    </dependency>
  </dependencies>

5.编写配置文件

server:
  port: 6003
spring:
  autoconfigure:
  #停用druid连接池的自动配置
   exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure
  datasource:
  #选用druid的XADataSource数据源,因为这个数据源支持分布式事务管理
   type: com.alibaba.druid.pool.xa.DruidXADataSource
  #以下是自定义字段
   dynamic:
    primary: master
    datasource:
     master:
      url: jdbc:mysql://192.168.66.102:3306/tx-xa-01?useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&autoReconnect=true
      username: root
      password: 123456
      driver-class-name: com.mysql.jdbc.Driver
     slave:
      url: jdbc:mysql://192.168.66.102:3306/tx-xa-02?useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&autoReconnect=true
      username: root
      password: 123456
      driver-class-name: com.mysql.jdbc.Driver
     validation-query: SELCET 1



6.编写主启动类

@Slf4j
@SpringBootApplication
@EnableTransactionManagement(proxyTargetClass = true)
public class TxXaStarter {
  public static void main(String[] args){
    SpringApplication.run(TxXaStarter.class,args);
    log.info("*************** TxXaStarter *********");
   }
}

7.创建第一个数据源的配置类DBConfig1

@Data
@ConfigurationProperties(prefix = "spring.datasource.dynamic.datasource.master")
public class DBConfig1 {
  private String url;
  private String username;
  private String password;
  private String dataSourceClassName;
}

8.创建第二个数据源的配置类DBConfig2

@Data
@ConfigurationProperties(prefix = "spring.datasource.dynamic.datasource.slave")
public class DBConfig2 {
  private String url;
  private String username;
  private String password;
  private String dataSourceClassName;
}

9.创建持久层接口,在这需要创建两个Mapper接口,来操作不同数据库的表

public interface UserAccount1Mapper extends BaseMapper<UserAccount> {
}
public interface UserAccount2Mapper extends BaseMapper<UserAccount> {
}

10.创建MyBatisConfig1类,读取DBConfig1类中的信息,实现数据库连接池,最终通过Atomikos框架的数据库连接池连接数据库并操作。

@Configuration
@MapperScan(basePackages = "mapper1", sqlSessionTemplateRef = "masterSqlSessionTemplate")
public class MyBatisConfig1 {


  @Bean(name = "masterDataSource")
  public DataSource masterDataSource(DBConfig1 dbConfig1) {
    AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean();
    sourceBean.setUniqueResourceName("masterDataSource");
    sourceBean.setXaDataSourceClassName(dbConfig1.getDataSourceClassName());
    sourceBean.setTestQuery("select 1");
    sourceBean.setBorrowConnectionTimeout(3);
    MysqlXADataSource dataSource = new MysqlXADataSource();
    dataSource.setUser(dbConfig1.getUsername());
    dataSource.setPassword(dbConfig1.getPassword());
    dataSource.setUrl(dbConfig1.getUrl());
    sourceBean.setXaDataSource(dataSource);
    return sourceBean;
   }


  @Bean(name = "masterSqlSessionFactory")
  public SqlSessionFactory masterSqlSessionFactory(@Qualifier("masterDataSource") DataSource dataSource) throws Exception {
    MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
    sqlSessionFactoryBean.setDataSource(dataSource);
    return sqlSessionFactoryBean.getObject();
   }


  @Bean(name = "masterSqlSessionTemplate")
  public SqlSessionTemplate masterSqlSessionTemplate(@Qualifier("masterSqlSessionFactory") SqlSessionFactory sqlSessionFactory){
    return new SqlSessionTemplate(sqlSessionFactory);
   }
}

11.创建MyBatisConfig2类MyBatisConfig2类的作用与MyBatisConfig1类的作用相似,只不过MyBatisConfig2类读取的是DBConfig2类中的信息,封装的是整合了Atomikos框架的另一个数据源的数据库连接池,通过连接池连接数据库并操作。

@Configuration
@MapperScan(basePackages = "mapper2", sqlSessionTemplateRef = "slaveSqlSessionTemplate")
public class MyBatisConfig2 {
  @Bean(name = "slaveDataSource")
  public DataSource slaveDataSource(DBConfig2 dbConfig2) {
    AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean();
    sourceBean.setUniqueResourceName("slaveDataSource");
    sourceBean.setXaDataSourceClassName(dbConfig2.getDataSourceClassName());
    sourceBean.setTestQuery("select 1");
    sourceBean.setBorrowConnectionTimeout(3);
    MysqlXADataSource dataSource = new MysqlXADataSource();
    dataSource.setUser(dbConfig2.getUsername());
    dataSource.setPassword(dbConfig2.getPassword());
    dataSource.setUrl(dbConfig2.getUrl());
    sourceBean.setXaDataSource(dataSource);
    return sourceBean;
   }
  @Bean(name = "slaveSqlSessionFactory")
  public SqlSessionFactory slaveSqlSessionFactory(@Qualifier("slaveDataSource") DataSource dataSource) throws Exception {
    MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
    sqlSessionFactoryBean.setDataSource(dataSource);
    return sqlSessionFactoryBean.getObject();
   }
  @Bean(name = "slaveSqlSessionTemplate")
  public SqlSessionTemplate slaveSqlSessionTemplate(@Qualifier("slaveSqlSessionFactory") SqlSessionFactory sqlSessionFactory){
    return new SqlSessionTemplate(sqlSessionFactory);
   }
}

11.创建UserAccount类

@Data
@TableName("user_account")
@AllArgsConstructor
@NoArgsConstructor
public class UserAccount implements Serializable {
  private static final long serialVersionUID = 6909533252826367496L;


  /**
   * 账户编号
   */
  @TableId
  private String accountNo;


  /**
   * 账户名称
   */
  private String accountName;


  /**
   * 账户余额
   */
  private BigDecimal accountBalance;
}

12.创建UserAccountService接口

public interface UserAccountService {
   /**
   * 跨库转账
   * @param sourceAccountNo 转出账户
   * @param targetSourceNo 转入账户
   * @param bigDecimal 金额
   */
   void transferAccounts(String sourceAccountNo, String targetSourceNo, BigDecimal transferAmount);
}

13.实现UserAccountService接口

/**
 * <p>
 *  服务实现类
 * </p>
 *
 * @author itbaizhan
 * @since 05-13
 */
@Service
public class UserAccountServiceImpl implements IUserAccountService {
  @Autowired
  private UserAccountMapper1 userAccountMapper1;
  @Autowired
  private UserAccountMapper2 userAccountMapper2;
  /**
   * 跨库转账
   * @param sourceAccountNo 源账户
   * @param targetSourceNo 目标账户
   * @param bigDecimal 金额
   */
  @Transactional
  @Override
  public void transofer(String sourceAccountNo, String targetSourceNo, BigDecimal bigDecimal) {
    // 1. 查询原账户
    UserAccount sourceUserAccount = userAccountMapper1.selectById(sourceAccountNo);
    // 2. 查询目标账户
    UserAccount targetUserAccount = userAccountMapper2.selectById(targetSourceNo);
    // 3. 判断转入账户和转出账户是否为空
    if (sourceAccountNo != null && targetUserAccount != null){
      // 4. 判断转出账户是否余额不足
      if (sourceUserAccount.getAccountBalance().compareTo(bigDecimal) < 0){
        throw new RuntimeException("余额不足");
       }
      // 5.更新金额
      sourceUserAccount.setAccountBalance(sourceUserAccount.getAccountBalance().subtract(bigDecimal));
      // 6.张三账户减金额
      userAccountMapper1.updateById(sourceUserAccount);
      System.out.println(10/0);
      // 7.更新金额
      targetUserAccount.setAccountBalance(targetUserAccount.getAccountBalance().add(bigDecimal));
      // 8.张三账户减金额
      userAccountMapper2.updateById(targetUserAccount);


     }
   }
}




总结

提示:这里对文章进行总结:
通过阅读这篇博客,读者可以了解到如何使用 Atomikos 实现 XA 强一致性分布式事务,以及在实践中需要注意的事项。希望这对你有所帮助!如果你有任何问题或需要进一步的帮助,请随时提问。

文章来源:https://blog.csdn.net/liubopro666/article/details/135069924
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。