首先理解什么是本地事务?
说白了就是通过数据库可以控制事物,使用服务自己的数据库来控制事物
我们在java代码中会使用@Transactional注解进行标注
平常我们在程序中通过spring去控制事务是利用数据库本身的事务特性来实现的,因此叫数据库事务,
由于应用主要靠关系数据库来控制事务,此数据库只属于该应用,所以基于本应用自己的关系型数据库的事务又被称为本地事务。
本地事务具有ACID四大特性,
数据库事务在实现时会将一次事务涉及的所有操作全部纳入到一个不可分割的执行单元,
该执行单元中的所有操作 要么都成功,要么都失败,
只要其中任一操作执行失败,都将导致整个事务的回滚。
了解了本地事务之后再了解一下分布式事务
如下所示,内容管理数据库、Redis、Elasticsearch、Minio都是通过网络来交互的
在分布式系统下,通过多个服务(网络交互)来完成一件事,这个时候就存在分布式事务
现在的需求是课程发布操作后将数据写入数据库、redis、elasticsearch、MinIO四个地方,这四个地方已经不限制在一个数据库内,是由四个分散的服务去提供,与这四个服务去通信需要网络通信,而网络存在不可到达性,这种分布式系统环境下,通过与不同的服务进行网络通信去完成事务称之为分布式事务。
在分布式系统中分布式事务的场景很多:
例如用户注册送积分,银行转账,创建订单减库存,这些都是分布式事务
转账举例:
我们知道本地事务依赖数据库本身提供的事务特性来实现,因此以下逻辑可以控制本地事务:
begin transaction;
//1.本地数据库操作:张三减少金额
//2.本地数据库操作:李四增加金额
commit transation;
但是在分布式环境下,会变成下边这样:
说白了就是多个服务完成同一个事物
begin transaction;
//1.本地数据库操作:张三减少金额
//2.远程调用:让李四增加金额
commit transation;
当远程调用让李四增加金额成功了,由于网络问题远程调用并没有返回,此时本地事务提交失败就回滚了张三减少金额的操作,此时张三和李四的数据就不一致了
相当于测试张三金额没有少,但是李四的金额增多了
因此在分布式架构的基础上,传统数据库事务就无法使用了,张三和李四的账户不在一个数据库中甚至不在一个应用系统里,实现转账事务需要通过远程调用,由于网络问题就会导致分布式事务问题。
下边的场景都会产生分布式事务:
微服务架构下:
单服务多数据库:
多服务单数据库:
控制分布式事务首先需要理解CAP理论
什么是CAP理论?
CAP是 Consistency、Availability、Partition tolerance三个词语的缩写,分别表示一致性、可用性、分区容忍性
举例说明
用户发送请求的时候先经过网关,然后再由网关负载均衡请求用户服务结点1和用户服务结点2
一致性:用户不管访问哪一个结点拿到的数据都是最新的
比如查询小明的信息,不能出现在数据没有改变的情况下两次查询结果不一样
可用性:任何时候查询用户信息都可以查询到结果,但不保证查询到最新的数据
说白了可用性就是指不管什么时候访问都能得到数据
假如说一个结点挂掉了也没事,可以请求第二个结点。
分区容忍性:也叫分区容错性,当系统采用分布式架构时由于网络通信异常导致请求中断、消息丢失,但系统依然对外提供服务
不能由于网络故障而导致系统不能使用
分布式系统一定要满足分区容忍性,但是一致性和可用性就不一定有,而且一致性和可用性不能并存
满足P那么C和A不能同时满足
如果要满足C一致性,必须等待小明的信息同步完成系统才可用(否则会出现请求到结点2时查询不到数据,违反了一致性),在信息同步过程中系统是不可用的,所以满足C的同时无法满足A。
如果要满足A可用性,要时刻保证系统可用就不用等待信息同步完成,此时系统的一致性无法满足。
所以在分布式系统中进行分布式事务控制,要么保证CP、要么保证AP
举例说明一下
保证CP
要保证一致性,张三减少100那李四一定增加100
保证AP
保证可用性,张三减少100,那李四不会立即增加100,可能过段时间才会增加100
begin transaction;
//1.本地数据库操作:张三减少金额
//2.远程调用:让李四增加金额
commit transation;
如何控制分布式事务呢?
学习了CAP理论我们知道进行分布式事务控制要在C和A中作出取舍,保证一致性就不要保证可用性,保证可用性就不要保证一致,首先你确认是要CP还是AP,具体要根据应用场景进行判断。
跨行转账:一次转账请求要等待双方银行系统都完成整个事务才算完成,只要其中一个失败另一方执行回滚操作。
开户操作:在业务系统开户同时要在运营商开户,任何一方开户失败该用户都不可使用,所以要满足CP。
订单退款,今日退款成功,明日账户到账,只要用户可以接受在一定时间内到账即可。
注册送积分,注册成功积分在24分到账。
支付短信通信,支付成功发短信,短信发送可以有延迟,甚至没有发送成功。
在实际应用中符合AP的场景较多,其实虽然AP舍弃C一致性,实际上最终数据还是达到了一致,也就满足了最终一致性,所以业界定义了BASE理论
什么是BASE理论?
强调可用性
BASE 是 Basically Available(基本可用)、Soft state(软状态)和 Eventually consistent (最终一致性)三个短语的缩写。
基本可用:当系统无法满足全部可用时保证核心服务可用即可,比如一个外卖系统,每到中午12点左右系统并发量很高,此时要保证下单流程涉及的服务可用,其它服务暂时不可用。
软状态:是指可以存在中间状态,比如:打印自己的社保统计情况,该操作不会立即出现结果,而是提示你打印中,请在XXX时间后查收。虽然出现了中间状态,但最终状态是正确的。
最终一致性:退款操作后没有及时到账,经过一定的时间后账户到账,舍弃强一致性,满足最终一致性。
分布式事务控制有哪些常用的技术方案?
实现CP就是要实现强一致性:
使用Seata框架基于AT模式实现
使用Seata框架基于TCC模式实现。
实现AP则要保证最终数据一致性:
使用消息队列通知的方式去实现,通知失败自动重试,达到最大失败次数需要人工处理;
使用任务调度的方案,启动任务调度将课程信息由数据库同步到elasticsearch、MinIO、redis中。
正常的话流程:课程编辑 - 提交审核 - 审核通过 - 课程发布
课程发布后学习者在网站可以搜索到课程,然后查看课程的详细信息,进一步选课、支付、在线学习
课程编辑与发布的整体流程:
为了课程内容没有违规信息、课程内容安排合理,在课程发布之前运营方会进行课程审核,审核通过后课程方可发布。
课程编辑 - 提交审核 - 审核通过 - 课程发布
作为课程制作方即教学机构,在课程发布前通过课程预览功能可以看到课程发布后的效果,哪里的课程信息存在问题方便查看,及时修改。
下图是课程预览的效果图,也是课程正式发布后的课程详情界面
课程发布模块共包括三块功能:
1、课程预览
2、课程审核
3、课程发布
其中前两个功能已经做过了
如何去快速搜索课程?
打开课程详情页面仍然去查询数据库可行吗?
为了提高网站的速度需要将课程信息进行缓存,并且要将课程信息加入索引库方便搜索,下图显示了课程发布后课程信息的流转情况:
1、向内容管理数据库的课程发布表存储课程发布信息,更新课程基本信息表中发布状态为已发布。
2、向Redis存储课程缓存信息。
3、向Elasticsearch存储课程索引信息。
4、请求分布文件系统存储课程静态化页面(即html页面),实现快速浏览课程详情页面
课程发布表course_publish
之前我们介绍过课程预发布表,当课程审核通过后就会将课程预发布表的数据拷贝到课程发布表
课程预发布表存储的是课程的待审核信息
下面这篇文章中就有对课程预发布表的介绍,客户在提交审核后就会将课程的信息放入到课程预发布表中,当审核通过后就会把课程预发布表中的信息再存储课程发布表中
这个地方可以解释一下state_state字段
一个大任务可以有很多的阶段,比如向Redis、索引库、MinIO
假如说第一次写入Redis完成但是写入索引库与Minio没有完成,那下一次执行的时候只写入索引库和Minio就可以了,不需要写入Redis了
执行课程发布操作后要向数据库、redis、elasticsearch、MinIO写四份数据,这个场景用哪种方案?
满足CP
满足一致性
如果要满足CP就表示课程发布操作后向数据库、redis、elasticsearch、MinIO写四份数据,只要有一份写失败其它的全部回滚。
满足AP?
满足可用性
课程发布操作后,先更新数据库中的课程发布状态,更新后向redis、elasticsearch、MinIO写课程信息,只要在一定时间内最终向redis、elasticsearch、MinIO写数据成功即可。
我们选择满足AP,如下所示:
1、在内容管理服务的数据库中添加一个消息表,消息表和课程发布表在同一个数据库。
2、点击课程发布通过本地事务向课程发布表写入课程发布信息,同时向消息表写课程发布的消息。通过数据库进行控制,只要课程发布表插入成功,消息表也插入成功,消息表的数据就记录了某门课程发布的任务。
此时我们可以通过数据库事务对消息表和课程发布表进行控制,因为在一个数据库里面
也就是说课程发布表中有某个课的信息,那消息表中也会有此课的信息
3、启动任务调度系统定时调度内容管理服务去定时扫描消息表的记录
4、当扫描到课程发布的消息时即开始完成向redis、elasticsearch、MinIO同步数据的操作
5、同步数据的任务完成后删除消息表记录,并将删除的记录添加到消息历史表中
下图是课程发布操作的时序图
1、执行发布操作,内容管理服务存储课程发布表的同时向消息表添加一条“课程发布任务”。这里使用本地事务保证课程发布信息保存成功,同时消息表也保存成功。
2、任务调度服务定时调度内容管理服务扫描消息表,由于课程发布操作后向消息表插入一条课程发布任务,此时扫描到一条任务。
3、拿到任务开始执行任务,分别向redis、elasticsearch及文件系统存储数据。
4、任务完成后删除消息表记录。
我们点击“发布”按钮,就可以发布课程
课程发布表的数据来源于课程预发布表
@ApiOperation("课程发布")
@ResponseBody
@PostMapping("/coursepublish/{courseId}")
public void coursepublish(@PathVariable("courseId") Long courseId) {
Long companyId = 1232141425L;
coursePublishService.publish(companyId, courseId);
}
此时的代码中还没有写消息的模块,等待编写
/**
* 课程发布接口
*
* @param companyId 机构id
* @param courseId 课程id
*/
@Transactional
@Override
public void publish(Long companyId, Long courseId) {
//TODO 1.查询预发布表,课程如果没有审核通过不允许发布,通过了向课程发布表写数据
//1.1 查询预发布表
CoursePublishPre coursePublishPre = coursePublishPreMapper.selectById(courseId);
if (coursePublishPre == null) {
XueChengPlusException.cast("课程无审核记录,无法发布");
}
//预发布表的审核状态(审核通过202004)
String status = coursePublishPre.getStatus();
if (!"202004".equals(status)) {
//课程没有审核通过不允许发布
XueChengPlusException.cast("课程没有审核通过不允许发布");
}
//1.2 向课程发布表写入数据
CoursePublish coursePublish = new CoursePublish();
//课程预发布表和课程发布表ode数据结构是一个样子的
BeanUtils.copyProperties(coursePublishPre, coursePublish);
//首先查询课程发布表,看看此课程之前是是否发布过
CoursePublish coursePublishOld = coursePublishMapper.selectById(courseId);
if (coursePublishOld == null){
coursePublishMapper.insert(coursePublish);
}else {
coursePublishMapper.updateById(coursePublishOld);
}
//TODO 2.向消息表写入数据
//TODO 3.将预发布表的数据删除
}
使用消息包的SDK向消息表写入数据,完成2.5中的代码
课程发布操作执行后需要扫描消息表的记录,有关消息表处理的有哪些?
1、新增消息表
2、扫描消息表。
3、更新消息表。
4、删除消息表。
对消息表的增删改查
因为我们的消息不仅使用在课程发布上,还有会使用在其他的业务场景,所以我们需要把消息模块给抽离出来
所以在本项目中对消息的模块做了一个SDK的处理
解释
是做成通用的服务,还是做成通用的代码组件呢?
通用的服务是完成一个通用的独立功能,并提供独立的网络接口,比如:项目中的文件系统服务,提供文件的分布式存储服务。
代码组件也是完成一个通用的独立功能,通常会提供API的方式供外部系统使用,比如:fastjson、Apache commons工具包等。
如果将消息处理做成一个通用的服务,该服务需要连接多个数据库,因为它要扫描微服务数据库下的消息表,并且要提供与微服务通信的网络接口,单就针对当前需求而言开发成本有点高。
如果将消息处理做一个SDK工具包相比通用服务不仅可以解决将消息处理通用化的需求,还可以降低成本。
sdk需要提供执行任务的逻辑吗?
不会提供。SDK只会提供一些通用的方法然后被调用即可
如何保证任务的幂等性?
可以给任务加一个status字段,如果处理完标识“处理完”,如果正在处理标识“正在处理”,如果未处理标识“未处理”
幂等性:不管任务执行多少次,最终的结果都是一个样子的。说白了就是即是人物出现了重复处理,也要保证任务的处理结果是一个样子的
如何保证任务不重复执行?
采用和视频处理章节一致方案,除了保证任务的幂等性外,任务调度采用分片广播,根据分片参数去获取任务,另外阻塞调度策略为丢弃任务。
注意:这里是信息同步类任务,即使任务重复执行也没有关系,不再使用抢占任务的方式保证任务不重复执行。
给content-service模块的pom文件添加消息依赖
<dependency>
<groupId>com.xuecheng</groupId>
<artifactId>xuecheng-plus-message-sdk</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
引入消息SDK中的实例对象
@Autowired
MqMessageService mqMessageService;
//TODO 2.向消息表写入数据
this.saveCoursePublishMessage(courseId);
//TODO 3.将预发布表的数据删除
coursePublishPreMapper.deleteById(courseId);
/**
* 保存消息表记录
*
* @param courseId 课程id
*/
private void saveCoursePublishMessage(Long courseId) {
//我们可以任务约定一下课程发布的messageType是course_publish
//剩下三个参数是业务信息字段,如果不使用的话填空即可
MqMessage mqMessage = mqMessageService.addMessage("course_publish", courseId.toString(), null, null);
if (mqMessage == null) {
XueChengPlusException.cast(CommonError.UNKOWN_ERROR);
}
}
测试前记得启动Nginx
首先将course_publish_pre表中的status字段值设置为"202004",表示审核通过
对应在course_base表中的audit_status字段的值也改为“202004”,表示审核通过
直接前后端联调,点击“发布”
观察数据库mq_message表数据结果
所以说我们完成了下面标红的这一步