提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加
基于Atomikos实现XA强一致性分布式事务
提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档
提示:这里可以添加本文要记录的大概内容:
前面介绍的Seata提供XA模式实现分布式事务是用来处理跨JVM进程的分布服务,也就是应用在微服务或者分布式项目中,而接下来介绍的基于Atomikos实现XA强一致性分布式事务是用来处理跨数据库的分布式事务,例如,订单微服务和库存微服务访问同一个数据库也会产生分布式事务,原因是:多个微服务访问同一个数据库,本质上也是通过不同的数据库会话来操作数据库,此时就会产生分布式事务。
无论您是分布式系统的开发者、架构师,还是对分布式事务感兴趣的技术人员,这篇博客都将为您提供有价值的信息和实用的指导。让我们一起探索 Atomikos 的世界,掌握实现 XA 强一致性分布式事务的技能!
提示:以下是本篇文章正文内容,下面案例可供参考
Atomikos(https://www.atomikos.com/),其实是一家公司的名字,提供了基于JTA规范的XA分布式事务TM的实现。其旗下最著名的产品就是事务管理器。
JTA(Java Transaction API)是 Java 平台上的一种事务处理 API,用于实现分布式事务。它提供了一组标准的接口和抽象,使得应用程序可以在多个数据库或其他资源上进行事务操作,并且保证这些操作的原子性、一致性、隔离性和持久性(ACID 属性)。
要想使用用 JTA 事务,那么就需要有一个实现 javax.sql.XADataSource 、 javax.sql.XAConnection 和 javax.sql.XAResource 接口的 JDBC 驱动程序。一个实现了这些接口的驱动程序将可以参与 JTA 事务。一个 XADataSource 对象就是一个 XAConnection 对象的工厂。XAConnection 是参与 JTA 事务的 JDBC 连接。
1.因为Atomikos是用来处理跨数据库事务的,所以要准备两个数据库,分别为tx-xa-01和tx-xa-02,分别在2个数据库中创建转出金额数据库。
DROP TABLE IF EXISTS `user_account`;
CREATE TABLE `user_account` (
`account_no` varchar(64) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT '账户编号',
`account_name` varchar(64) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '账户名称',
`account_balance` decimal(10, 2) NULL DEFAULT NULL COMMENT '账户余额',
PRIMARY KEY (`account_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
2.向tx-xa-01库中添加数据。
INSERT INTO `user_account` VALUES ('1001', '张三', 10000.00);
3.向tx-xa-02库中添加数据。
INSERT INTO `user_account` VALUES ('1002', '李四', 10000.00);
4.创建一个springboot项目,添加如下依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Atomikos依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
<!-- druid连接池依赖组件-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.22</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
5.编写配置文件
server:
port: 6003
spring:
autoconfigure:
#停用druid连接池的自动配置
exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure
datasource:
#选用druid的XADataSource数据源,因为这个数据源支持分布式事务管理
type: com.alibaba.druid.pool.xa.DruidXADataSource
#以下是自定义字段
dynamic:
primary: master
datasource:
master:
url: jdbc:mysql://192.168.66.102:3306/tx-xa-01?useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&autoReconnect=true
username: root
password: 123456
driver-class-name: com.mysql.jdbc.Driver
slave:
url: jdbc:mysql://192.168.66.102:3306/tx-xa-02?useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&autoReconnect=true
username: root
password: 123456
driver-class-name: com.mysql.jdbc.Driver
validation-query: SELCET 1
6.编写主启动类
@Slf4j
@SpringBootApplication
@EnableTransactionManagement(proxyTargetClass = true)
public class TxXaStarter {
public static void main(String[] args){
SpringApplication.run(TxXaStarter.class,args);
log.info("*************** TxXaStarter *********");
}
}
7.创建第一个数据源的配置类DBConfig1
@Data
@ConfigurationProperties(prefix = "spring.datasource.dynamic.datasource.master")
public class DBConfig1 {
private String url;
private String username;
private String password;
private String dataSourceClassName;
}
8.创建第二个数据源的配置类DBConfig2
@Data
@ConfigurationProperties(prefix = "spring.datasource.dynamic.datasource.slave")
public class DBConfig2 {
private String url;
private String username;
private String password;
private String dataSourceClassName;
}
9.创建持久层接口,在这需要创建两个Mapper接口,来操作不同数据库的表
public interface UserAccount1Mapper extends BaseMapper<UserAccount> {
}
public interface UserAccount2Mapper extends BaseMapper<UserAccount> {
}
10.创建MyBatisConfig1类,读取DBConfig1类中的信息,实现数据库连接池,最终通过Atomikos框架的数据库连接池连接数据库并操作。
@Configuration
@MapperScan(basePackages = "mapper1", sqlSessionTemplateRef = "masterSqlSessionTemplate")
public class MyBatisConfig1 {
@Bean(name = "masterDataSource")
public DataSource masterDataSource(DBConfig1 dbConfig1) {
AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean();
sourceBean.setUniqueResourceName("masterDataSource");
sourceBean.setXaDataSourceClassName(dbConfig1.getDataSourceClassName());
sourceBean.setTestQuery("select 1");
sourceBean.setBorrowConnectionTimeout(3);
MysqlXADataSource dataSource = new MysqlXADataSource();
dataSource.setUser(dbConfig1.getUsername());
dataSource.setPassword(dbConfig1.getPassword());
dataSource.setUrl(dbConfig1.getUrl());
sourceBean.setXaDataSource(dataSource);
return sourceBean;
}
@Bean(name = "masterSqlSessionFactory")
public SqlSessionFactory masterSqlSessionFactory(@Qualifier("masterDataSource") DataSource dataSource) throws Exception {
MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dataSource);
return sqlSessionFactoryBean.getObject();
}
@Bean(name = "masterSqlSessionTemplate")
public SqlSessionTemplate masterSqlSessionTemplate(@Qualifier("masterSqlSessionFactory") SqlSessionFactory sqlSessionFactory){
return new SqlSessionTemplate(sqlSessionFactory);
}
}
11.创建MyBatisConfig2类MyBatisConfig2类的作用与MyBatisConfig1类的作用相似,只不过MyBatisConfig2类读取的是DBConfig2类中的信息,封装的是整合了Atomikos框架的另一个数据源的数据库连接池,通过连接池连接数据库并操作。
@Configuration
@MapperScan(basePackages = "mapper2", sqlSessionTemplateRef = "slaveSqlSessionTemplate")
public class MyBatisConfig2 {
@Bean(name = "slaveDataSource")
public DataSource slaveDataSource(DBConfig2 dbConfig2) {
AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean();
sourceBean.setUniqueResourceName("slaveDataSource");
sourceBean.setXaDataSourceClassName(dbConfig2.getDataSourceClassName());
sourceBean.setTestQuery("select 1");
sourceBean.setBorrowConnectionTimeout(3);
MysqlXADataSource dataSource = new MysqlXADataSource();
dataSource.setUser(dbConfig2.getUsername());
dataSource.setPassword(dbConfig2.getPassword());
dataSource.setUrl(dbConfig2.getUrl());
sourceBean.setXaDataSource(dataSource);
return sourceBean;
}
@Bean(name = "slaveSqlSessionFactory")
public SqlSessionFactory slaveSqlSessionFactory(@Qualifier("slaveDataSource") DataSource dataSource) throws Exception {
MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dataSource);
return sqlSessionFactoryBean.getObject();
}
@Bean(name = "slaveSqlSessionTemplate")
public SqlSessionTemplate slaveSqlSessionTemplate(@Qualifier("slaveSqlSessionFactory") SqlSessionFactory sqlSessionFactory){
return new SqlSessionTemplate(sqlSessionFactory);
}
}
11.创建UserAccount类
@Data
@TableName("user_account")
@AllArgsConstructor
@NoArgsConstructor
public class UserAccount implements Serializable {
private static final long serialVersionUID = 6909533252826367496L;
/**
* 账户编号
*/
@TableId
private String accountNo;
/**
* 账户名称
*/
private String accountName;
/**
* 账户余额
*/
private BigDecimal accountBalance;
}
12.创建UserAccountService接口
public interface UserAccountService {
/**
* 跨库转账
* @param sourceAccountNo 转出账户
* @param targetSourceNo 转入账户
* @param bigDecimal 金额
*/
void transferAccounts(String sourceAccountNo, String targetSourceNo, BigDecimal transferAmount);
}
13.实现UserAccountService接口
/**
* <p>
* 服务实现类
* </p>
*
* @author itbaizhan
* @since 05-13
*/
@Service
public class UserAccountServiceImpl implements IUserAccountService {
@Autowired
private UserAccountMapper1 userAccountMapper1;
@Autowired
private UserAccountMapper2 userAccountMapper2;
/**
* 跨库转账
* @param sourceAccountNo 源账户
* @param targetSourceNo 目标账户
* @param bigDecimal 金额
*/
@Transactional
@Override
public void transofer(String sourceAccountNo, String targetSourceNo, BigDecimal bigDecimal) {
// 1. 查询原账户
UserAccount sourceUserAccount = userAccountMapper1.selectById(sourceAccountNo);
// 2. 查询目标账户
UserAccount targetUserAccount = userAccountMapper2.selectById(targetSourceNo);
// 3. 判断转入账户和转出账户是否为空
if (sourceAccountNo != null && targetUserAccount != null){
// 4. 判断转出账户是否余额不足
if (sourceUserAccount.getAccountBalance().compareTo(bigDecimal) < 0){
throw new RuntimeException("余额不足");
}
// 5.更新金额
sourceUserAccount.setAccountBalance(sourceUserAccount.getAccountBalance().subtract(bigDecimal));
// 6.张三账户减金额
userAccountMapper1.updateById(sourceUserAccount);
System.out.println(10/0);
// 7.更新金额
targetUserAccount.setAccountBalance(targetUserAccount.getAccountBalance().add(bigDecimal));
// 8.张三账户减金额
userAccountMapper2.updateById(targetUserAccount);
}
}
}
提示:这里对文章进行总结:
通过阅读这篇博客,读者可以了解到如何使用 Atomikos 实现 XA 强一致性分布式事务,以及在实践中需要注意的事项。希望这对你有所帮助!如果你有任何问题或需要进一步的帮助,请随时提问。