GitHub:https://github.com/seata/seatastars: 20.6k
最新版本: v1.6.1 Dec 22, 2022
官方文档:http://seata.io/zh-cn/index.html
官方仅仅支持同步调用。 官方在FAQ中表示对于异步框架需要自行支持。 具体的扩展思路看查阅下文中的事务传播章节。
需要理解XA规范, 或者2PC提交流程。(XA是一个规范,2PC是这个规范的具体体现,理念是一样的)
TC
TM
RM
TCC即Try-Confirm-Cancel。针对每个操作都要编排一个与其对应的确认和补偿(撤销操作)。
TCC模式不依赖于底层数据资源的事务支持,依靠开发者自己编排的Try-Confirm-Cancel逻辑来进行全局事务数据维护,Seata仅用来用来管理全局事务,统筹分支事务状态,并决策执行Confirm Or Cancel(针对已注册到TC的分支事务)。
TCC的优势在于不会对资源有任何的锁定和占用,性能较高。缺点是需要自行编排逻辑,需要对业务流转比较清楚,与业务的耦合性较高。 (Seata的TCC如果使用Fence还是会有一定的资源占用)
Seata通过方法级别的注解@TwoPhaseBusinessAction来指定 Try-Confirm-Cancel。
@TwoPhaseBusinessAction(name = “beanName”, commitMethod = “commit”, rollbackMethod = “rollback”, useTCCFence = true)
注解所在方法即为Try, commitMethod指定Confirm, rollbackMethod指定Cancel。 name用来指定beanName,即这些方法所在的bean。 如果希望通过Seata解决TCC的幂等,空回滚,悬挂等问题,就设置useTCCFence为true,同时需要建表tcc_fence_log。 useTCCFence默认为false,即开发者自己通过逻辑编排解决幂等,空回滚,悬挂这些TCC的问题。
接下来,看一下useTCCFence为true时Seata是如何解决这些问题的:
在 commit/cancel 阶段,因为 TC 没有收到分支事务的响应,需要进行重试,这就要分支事务支持幂等。
分支事务提交时,如果useTCCFence为true则会走到TCCFenceHandler 类中的 commitFence 逻辑,首先会判断 tcc_fence_log 表中是否已经有记录,如果有记录,则判断事务执行状态并返回。这样如果判断到事务的状态已经是 STATUS_COMMITTED,就不会再次提交,保证了幂等。如果 tcc_fence_log 表中没有记录,则插入一条记录,供后面重试时判断。
Rollback 的逻辑跟 commit 类似,逻辑在类 TCCFenceHandler 的 rollbackFence 方法。
对于分布式事务来说,在try阶段,一般我们会调用多个服务。但是在全局事务回滚时,并非一定是所有的分支事务都已进行了提交。若分支事务未提交就因为全局事务失败产生了回滚,就会出现空回滚的情况, 即回滚了一个未提交的分支事务。
如图,账户服务的分支事务执行失败导致分布式事务回滚,此时账户服务的Cannel依然会被执行,就发生了空回滚现象。
面对这种情况,Seata的处理策略是在try阶段往tcc_fence_log表里面插入一条数据,status字段是STATUS_TRIED,在Rollback阶段判断是否存在,如果不存在,则不执行回滚操作。
所谓的事务悬挂就是指因为某些原因分支事务在全局事务回滚之后提交。
如图,分支事务的try阶段因为某些原因(如网络阻塞)阻塞,阻塞过程中全局事务发生了回滚。回滚结束后,分支事务被收到并执行,此时就发生了事务悬挂。
对于事务悬挂,Seata的TCC模式的处理策略是在Rollback时,首先判断tcc_fence_log 中是否存在当前分支事务xid的记录,如果不存在则插入一条记录,状态是STATUS_SUSPENDED,并且不再执行回滚操作。而分支事务的try的第一步也是向tcc_fence_log 表插入xid的记录,这样若后面分支事务产生悬挂现象,也会因为tcc_fence_log 表中已有xid的记录而造成主键冲突,分支事务无法执行。从而避免了事务悬挂。 另外Rollback时对tcc_fence_log 的查询是select for update。 所以也就不会说出现并发的问题。
需要注意,TCC除了幂等,空回滚,事务悬挂问题之外。不合理的业务编排还可能导致脏写和脏读的问题,因为try执行完成后数据库事务已提交,此时数据对于其它事务已处于可见状态。如果在业务编排时未考虑此场景,就可能出现其它事务读取只进行了try而未Confirm的数据,造成脏写或脏读。 因为在业务编排时我们一定要注意,非特殊情况,TCC修改的数据要在confirm阶段后才应该对其它业务可见,在confirm之前数据应该处于数据库可见,而业务不可见的状态。
使用事务资源对XA协议的支持,以 XA 协议的机制来管理分支事务的一种 事务模式。
如图所示:
Seata XA 模式需要依赖于数据源的XA模式,因此数据源需要是支持XA 事务的数据库。
简单的说,XA模式在数据库事务启动时注册分支事务,数据库事务执行完成后并不提交,而是反馈给TC结果。TC根据分支事务的执行结果再通知分支事务(也就是数据库事务)提交或者回滚,之后数据库事务才算真正的结束。
可以看到XA模式在全局事务执行期间需要一直保持数据库事务为已执行未提交状态,需要长期占据RM已经数据库的本地锁即连接资源,并发度必然会受到影响。好处是不会出现各种事务隔离问题。
鉴于此,Seata提供了AT模式。
脱胎于XA规范。两阶段提交:
如图AT模式下,每一个分支事务操作时,都先将其操作的目标数据查出来。并记录到undolog表中(注意这个是自建的undolog表,而不是mysql的undolog日志)。并对目标数据加全局锁。如果二阶段全局事务提交,则解开全局锁。如果二阶段全局事务回滚,则从undolog表里面查出修改前的数据,再进行目标数据更新操作,以进行反向补偿。
这样做的好处是在一阶段就释放事务资源的资源,无需长时间占据事务资源。缺点是需要进行对事务资源进行额外的数据资源操作,需要额外的负载。
AT 模式通过全局锁来实现写隔离:
也就是说,AT模式在提交之前,都会对影响到的目标数据加一个全局锁,如果锁成功,则继续提交事务,如果获取锁失败,则需要等待全局锁。可以理解AT模式会对目标数据加一个分布式锁以此来保证同一数据同时只能有一个事务操作,从而解决了脏写问题。
如果还不理解写隔离,下面以两个全局事务 tx1 和 tx2,分别对 a 表的 m 字段进行更新操作,m 的初始值 1000为例:
tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的 全局锁 ,本地提交释放本地锁。 tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的 全局锁 ,tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待 全局锁 。
tx1 二阶段全局提交,释放 全局锁 。tx2 拿到 全局锁 提交本地事务。
如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。
此时,如果 tx2 仍在等待该数据的 全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。分支的回滚会一直重试,直到 tx2 的 全局锁 等锁超时,放弃 全局锁 并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。
因为整个过程 全局锁 在 tx1 结束前一直是被 tx1 持有的,所以不会发生 脏写 的问题。
如果数据库默认的事务隔离级别是读已提交(Read Committed)或以上, 那么Seata默认的分布式事务隔离级别是读未提交(Read Uncommitted) 。是无法避免脏读的。
因为AT模式在第一阶段是真真切切的提交了数据库事务的,而全局事务仅仅读是不加全局锁的,因此其它事务的读自然可以读到这个全局事务未提交,数据库事务已提交的数据。
若流程需要全局事务是读已提交(Read Committed), 则需要使用select for update。 Seata会对加了select for update的读操作加全局锁,直到获取成功才执行实际的查询。如图:
Seata没有使用默认的全局事务RC模式也是出于性能考虑。
可以看到上文提到的通过select for update实现读写隔离仅仅是在两个全局事务之间,如果一个流程是全局事务,一个流程非全局事务。Seata就无法实现全局事务的读写隔离了。毕竟AT模式的一阶段完成,数据库事务实际上是已经结束了,数据已经处于可见状态。
但是有的事务确实就是一个简单的事务,而全局事务相较于简单的数数据库事务要重的多。如果为了实现全局的读写隔离,就都加入全局事务,那性能必然会受更大的影响。因此Seata推出了@GlobalLock注解。 @GlobalLock简化了rpc过程,使其做到更高的性能。当然select for update依然还是需要的。
更多的关于Seata事务隔离的信息可以见官方文档: Seata事务隔离
需要注意的是,如果一个表存在全局事务和非全局事务混用(无论是逻辑更新还是说你直接用sql更新了数据),就可能导致脏事务。Seata在事务失败回滚时会确认undolog中的镜像数据是否和当前表中的一致,如果不一致,内部会抛出一个SQLUndoDirtyException异常(内部异常,不会抛到外部)并终止回滚流程(源码见io.seata.rm.datasource.DataSourceManager)。此时就形成了脏事务。
脏数据需手动处理,根据日志提示修正数据或者将对应undo删除(可自定义实现FailureHandler做邮件通知或其他)。
Seata还支持关闭回滚时undo镜像校验,当然该方案是很不推荐的。
事务悬挂定义在上文中的TCC模式中已经提过,这里不再赘述。
AT模式下产生事务悬挂的场景是:分支事务a注册TC后,a的本地事务提交前发生了全局事务回滚,此时就会导致全局事务回滚成功,而a资源被占用掉,产生了资源悬挂问题。
Seata AT模式的防悬挂措施是a回滚时发现回滚undo还未插入,则插入一条log_status=1的undo记录,a本地事务(业务写操作sql和对应undo为一个本地事务)提交时会因为undo表唯一索引冲突而提交失败。
必要配置 | #store.mode=db需要以下配置 | #store.mode=redis 需要以下配置 |
---|---|---|
registry.type–注册中心类型,默认file | store.db.driverClassName | store.redis.host |
config.type–配置中心类型,默认file | store.db.url | store.redis.port |
store.mode–事务会话信息存储方式 | store.db.user | store.redis.database |
store.db.password | store.redis.password |
事务信息存储模式
更多配置见seata参数配置
-h: 注册到注册中心的ip
-p: Server rpc 监听端口
-m: 全局事务会话信息存储模式,file、db、redis,优先读取启动参数 (Seata-Server 1.3及以上版本支持redis)
-n: Server node,多个Server时,需区分各自节点,用于生成不同区间的transactionId,以免冲突
-e: 多环境配置参考 http://seata.io/en-us/docs/ops/multi-configuration-isolation.html
注: 官方建议堆内存分配2G,堆外内存1G
三种集成方式, 分表对应不依赖于spring boot, 依赖于spring boot, 依赖于spring cloud。 根据自己的项目选择一个即可。
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>${seata.version}</version>
</dependency>
@Primary
@Bean("dataSource")
public DataSource dataSource(DataSource druidDataSource) {
// 以下两个二选一
//AT 代理
return new DataSourceProxy(druidDataSource);
//XA 代理
return new DataSourceProxyXA(druidDataSource)
}
@Bean
public GlobalTransactionScanner globalTransactionScanner() {
String applicationName = this.applicationContext.getEnvironment().getProperty("spring.application.name");
String txServiceGroup = this.seataProperties.getTxServiceGroup();
if (StringUtils.isEmpty(txServiceGroup)) {
txServiceGroup = applicationName + "-fescar-service-group";
this.seataProperties.setTxServiceGroup(txServiceGroup);
}
return new GlobalTransactionScanner(applicationName, txServiceGroup);
}
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>${seata.version}</version>
</dependency>
@Primary
@Bean("dataSource")
public DataSource dataSource(DataSource druidDataSource) {
// 以下两个二选一
//AT 代理
return new DataSourceProxy(druidDataSource);
//XA 代理
return new DataSourceProxyXA(druidDataSource)
}
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.4.2</version>
</dependency
配置项 | 描述 | 备注 |
---|---|---|
registry.type | 注册中心 | |
config.type | 配置中心 | |
service.vgroupMapping.my_test_tx_group | 事务群组 | |
(相关概念见链接) | my_test_tx_group为分组,配置项值为TC集群名 | |
service.default.grouplist | TC服务列表 | 仅注册中心为file时使用 |
service.disableGlobalTransaction | 全局事务开关 | 默认false。false为开启,true为关闭 |
更多配置见seata参数配置
目前 seata-all 中已经支持:Apache Dubbo、Alibaba Dubbo、sofa-RPC、Motan、gRpc、httpClient,对于 Spring Cloud 的支持,需要引用 spring-cloud-alibaba-seata。如果符合以上条件,则这一步骤不需要。
如果想了解原理,或者是使用的其他自研框架、异步模型、消息消费事务模型,则需要进行这一步结合 API 自行支持。
对于异步模型的个人见解: 异步模型可以考虑使用CountdownLaunch,需要TC决定提交全局事务前通知TC还有新的分支事务。
在了解如果配置事务传播之前,要先明白Seata 的事务上下文。
Seata 的事务上下文由RootContext 来管理。
在开启一个Seate全局事务后,RootContext 会自动绑定该事务的XID,事务结束后(提交或回滚完成),RootContext 会自动解绑 XID。
// 绑定 XID
RootContext.bind(xid);
// 解绑 XID
String xid = RootContext.unbind();
可以通过RootContext获取当前全局事务的XID,或者判定当前是否在全局事务中。
// 获取 XID
String xid = RootContext.getXID();
// 当前是否在全局事务中
boolean inGlobalTransaction = RootContext.inGlobalTransaction();
通过简单的查看源码就可以看到,RootContext的实现是依赖于ThreadLocal的。
根据官方文档描述:Seata 全局事务的传播机制就是指事务上下文的传播,根本上,就是 XID 的应用运行时的传播方式。 -这一句很重要,理解这一句话做适配的事务传播开发才更得心应手。翻译过来就是所谓的事务传播就是通过RootContext.bind 将不同模块绑定同一个XID的过程。
默认的,RootContext 的实现是基于 ThreadLocal 的,即 XID 绑定在当前线程上下文中。所以服务内部的 XID 传播通常是天然的通过同一个线程的调用链路串连起来的。默认不做任何处理,事务的上下文就是传播下去的。相关代码可以看源码: io.seata.core.context.ThreadLocalContextCore
同时,可以通过RootContext来挂起事务。比如希望某一段流程运行在全局事务外。
// 挂起(暂停)
String xid = RootContext.unbind();
// TODO: 运行在全局事务外的业务逻辑
// 恢复全局事务上下文
RootContext.bind(xid);
通过上述基本原理,可以得出:
跨服务调用场景下的事务传播,本质上就是要把 XID 通过服务调用传递到服务提供方,并绑定到 RootContext 中去。
只要能做到这点,理论上 Seata 可以支持任意的微服务框架。
Dubbo对全局事务的支持
这里以内置的对Dubbo的事务传播支持机制为例,来说明如何实现一个RPC框架对全局事务传播行为的支持。
_org.apache.dubbo.rpc.Filter_是Dubbo提供的拦截器,其功能类似web的filter。这里通过他来完成XID的自动装配与绑定。
/**
* The type Transaction propagation filter.
*/
@Activate(group = { Constants.PROVIDER, Constants.CONSUMER }, order = 100)
public class TransactionPropagationFilter implements Filter {
private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationFilter.class);
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
String xid = RootContext.getXID(); // 获取当前事务 XID
String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID); // 获取 RPC 调用传递过来的 XID
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("xid in RootContext[" + xid + "] xid in RpcContext[" + rpcXid + "]");
}
boolean bind = false;
if (xid != null) { // Consumer:把 XID 置入 RPC 的 attachment 中
RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
} else {
if (rpcXid != null) { // Provider:把 RPC 调用传递来的 XID 绑定到当前运行时
RootContext.bind(rpcXid);
bind = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind[" + rpcXid + "] to RootContext");
}
}
}
try {
return invoker.invoke(invocation); // 业务方法的调用
} finally {
if (bind) { // Provider:调用完成后,对 XID 的清理
String unbindXid = RootContext.unbind();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("unbind[" + unbindXid + "] from RootContext");
}
if (!rpcXid.equalsIgnoreCase(unbindXid)) {
LOGGER.warn("xid in change during RPC from " + rpcXid + " to " + unbindXid);
if (unbindXid != null) { // 调用过程有新的事务上下文开启,则不能清除
RootContext.bind(unbindXid);
LOGGER.warn("bind [" + unbindXid + "] back to RootContext");
}
}
}
}
}
}
消费方和提供方都加入这个拦截器就可以进行Dubbo调用的全局事务自动传播了。
更多的事务传播示例可以查看seata-samples或者 seata-integration。
@GetMapping(value = "testCommit")
@GlobalTransactional
public Object testCommit(@RequestParam(name = "id",defaultValue = "1") Integer id,
@RequestParam(name = "sum", defaultValue = "1") Integer sum) {
Boolean ok = productService.reduceStock(id, sum);
if (ok) {
LocalDateTime now = LocalDateTime.now();
Orders orders = new Orders();
orders.setCreateTime(now);
orders.setProductId(id);
orders.setReplaceTime(now);
orders.setSum(sum);
orderService.save(orders);
return "ok";
} else {
return "fail";
}
}
@Bean
public AspectTransactionalInterceptor aspectTransactionalInterceptor () {
return new AspectTransactionalInterceptor();
}
@Bean
public Advisor txAdviceAdvisor(AspectTransactionalInterceptor aspectTransactionalInterceptor ) {
AspectJExpressionPointcut pointcut = new AspectJExpressionPointcut();
pointcut.setExpression("配置切点表达式使全局事务拦截器生效");
return new DefaultPointcutAdvisor(pointcut, aspectTransactionalInterceptor);
}
AT和XA模式开启全局事务即可,不需要其它额外的操作。
Tcc模式除了需要开启全局事务之外,还需要使用**@TwoPhaseBusinessAction**注解来定义try,commit,cancel。此注解定义在方法上。
// @LocalTCC标识这是一个本地的TCC,仅在TCC参与者是 本地bean时需要
@LocalTCC
public interface TccAction {
/**
* 定义两阶段提交 name = 该tcc的bean名称,全局唯一 commitMethod = commit 为二阶段确认方法 rollbackMethod = rollback 为二阶段取消方法
* useTCCFence=true 为开启防悬挂
* BusinessActionContextParameter注解 传递参数到二阶段中
*
* @param params -入参
* @return String
*/
@TwoPhaseBusinessAction(name = "beanName", commitMethod = "commit", rollbackMethod = "rollback", useTCCFence = true)
public void insert(@BusinessActionContextParameter(paramName = "params") Map<String, String> params) {
logger.info("此处可以预留资源,或者利用tcc的特点,与AT混用,二阶段时利用一阶段在此处存放的消息,通过二阶段发出,比如redis,mq等操作");
}
/**
* 确认方法、可以另命名,但要保证与commitMethod一致 context可以传递try方法的参数
*
* @param context 上下文
* @return boolean
*/
public void commit(BusinessActionContext context) {
logger.info("预留资源真正处理,或者发出mq消息和redis入库");
}
/**
* 二阶段取消方法
*
* @param context 上下文
* @return boolean
*/
public void rollback(BusinessActionContext context) {
logger.info("预留资源释放,或清除一阶段准备让二阶段提交时发出的消息缓存");
}
}
@LocalTCC
@LocalTCC注解标记这是一个本地的TCC。仅在TCC参与者是 本地bean(非远程RPC服务)时需要在 接口定义中添加 该 注解。如果这是一个远程RPC服务的定义接口,则不需要添加这个这个注解。目前TCC支持 Loacl, Dubbo, HSF, Sofa。
在AT模式,@Transactional 可与 DataSourceTransactionManager 和 JTATransactionManager 连用分别表示本地事务和XA分布式事务。@Transactional和@GlobalTransactional连用,@Transactional 只能位于标注在@GlobalTransactional的同一方法层次或者位于@GlobalTransaction 标注方法的内层。这里分布式事务的概念要大于本地事务,若将 @Transactional 标注在外层会导致分布式事务空提交,当@Transactional 对应的 connection 提交时会报全局事务正在提交或者全局事务的xid不存在。
和@Transactional一样,全局事务的回滚需要捕获到异常。因此如果要回滚,异常一定要抛到最外层。如果异常在调用的过程中被吞掉的话,即使失败也是不会回滚的。 如果不想在调用中抛大量的异常。可以通过返回错误码,然后最外层根据错误码抛异常即可。
参考文档:https://zhuanlan.zhihu.com/p/561308610
https://juejin.cn/post/6911183702790209549
http://seata.io/zh-cn/docs/overview/what-is-seata.html
GitHub:https://github.com/codingapi/tx-lcnstars: 4k
最新版本: 5.0.2.RELEASE Feb 23, 2019
社区比较不活跃