事务(transaction)是传统数据库所具备的一项基本能力,其根本目的是为数据的可靠性与一致性提供保障。而在通常的实现中,事务包含了一个系列的数据库读写操作,这些操作要么全部完成,要么全部撤销。例如,在电子商城场景中,当顾客下单购买某件商品时,除了生成订单,还应该同时扣减商品的库存,这些操作应该被作为一个整体的执行单元进行处理,否则就会产生不一致的情况。
数据库事务需要包含 4 个基本特性,即常说的 ACID,具体如下:
在 MongoDB 中,对单个文档的操作是原子的。由于可以在单个文档结构中使用内嵌文档和数组来获得数据之间的关系,而不必跨多个文档和集合进行范式化,所以这种单文档原子性避免了许多实际场景中对多文档事务的需求。
对于那些需要对多个文档(在单个或多个集合中)进行原子性读写的场景,MongoDB 支持多文档事务。而使用分布式事务,事务可以跨多个操作、集合、数据库、文档和分片使用。
MongoDB 虽然已经在 4.2 开始全面支持了多文档事务,但并不代表大家应该毫无节制地使用它。相反,对事务的使用原则应该是:能不用尽量不用。 通过合理地设计文档模型,可以规避绝大部分使用事务的必要性。
使用事务的原则:
MongoDB 对事务支持:
事务属性 | 支持程度 |
---|---|
Atomocity 原子性 | 单表单文档:1.x 就支持 复制集多表多行:4.0 分片集群多表多行:4.2 |
Consistency 一致性 | writeConcern, readConcern (3.2) |
Isolation 隔离性 | readConcern (3.2) |
Durability 持久性 | Journal and Replication |
使用方法:
try (ClientSession clientSession = client.startSession()) {
clientSession.startTransaction();
collection.insertOne(clientSession, docOne);
collection.insertOne(clientSession, docTwo);
clientSession.commitTransaction();
}
writeConcern 决定一个写操作落到多少个节点上才算成功。MongoDB 支持客户端灵活配置写入策略(writeConcern),以满足不同场景的需求。
语法格式:
{ w: <value>, j: <boolean>, wtimeout: <number> }
{w: 0} 对客户端的写入不需要发送任何确认,适用于性能要求高,但不关注正确性的场景;
{w: 1} 默认的 writeConcern,数据写入到 Primary 就向客户端发送确认;
{w: “majority”} 数据写入到副本集大多数成员后向客户端发送确认,适用于对数据安全性要求比较高的场景,该选项会降低写入性能;
默认为{j: false}
,如果要求 Primary 写入持久化了才向客户端确认,则指定该选项为 true;
当指定{w: }
时,数据需要成功写入 number 个节点才算成功,如果写入过程中有节点故障,可能导致这个条件一直不能满足,从而一直不能向客户端发送确认结果,针对这种情况,客户端可设置 wtimeout 选项来指定超时时间,当写入过程持续超过该时间仍未结束,则认为写入失败;
测试:
包含延迟节点的 3 节点 pss 复制集。
db.user.insertOne({name:"李四"},{writeConcern:{w:"majority"}})
# 配置延迟节点
cfg = rs.conf()
cfg.members[2].priority = 0
cfg.members[2].hidden = true
cfg.members[2].secondaryDelaySecs = 60
rs.reconfig(cfg)
# 等待延迟节点写入数据后才会响应
db.user.insertOne({name:"王五"},{writeConcern:{w:3}})
# 超时写入失败
db.user.insertOne({name:"小明"},{writeConcern:{w:3,wtimeout:3000}})
注意事项:
{w: “majority”}
,普通数据可以应用 {w: 1}
以确保最佳性能;在读取数据的过程中我们需要关注以下两个问题:
第一个问题是是由 readPreference 来解决,第二个问题则是由 readConcern 来解决。
合理的 ReadPreference 可以极大地扩展复制集的读性能,降低访问延迟。
readPreference 场景举例:
primary/primaryPreferred
。因为此时从节点可能还没复制到新订单;secondary/secondaryPreferred
。查询历史订单对时效性通常没有太高要求;secondary
。报表对时效性要求不高,但资源需求大,可以在从节点单独处理,避免对线上用户造成影响;nearest
。每个地区的应用选择最近的节点读取数据;readPreference 配置:
通过 MongoDB 的连接串参数:
mongodb://host1:27107,host2:27107,host3:27017/?replicaSet=rs0&readPreference=secondary
通过 MongoDB 驱动程序 API:
MongoCollection.withReadPreference(ReadPreference readPref)
Mongo Shell:
db.collection.find().readPref( "secondary" )
从节点读测试:
{count:1}
,观察该条数据在各个节点均可见# mongosh --host rs0/localhost:28017
rs0:PRIMARY> db.user.insert({count:3},{writeConcern:{w:1}})
在 primary 节点中调用 readPref(“secondary”) 查询从节点用直连方式(mongosh localhost:28017)会查到数据,需要通过
mongosh --host rs0/localhost:28017
方式连接复制集。
参考: https://jira.mongodb.org/browse/SERVER-22289
db.fsyncLock()
来锁定写入(同步)# mongosh localhost:28018
rs0:SECONDARY> rs.secondaryOk()
rs0:SECONDARY> db.fsyncLock()
{count:2}
rs0:PRIMARY> db.user.insert({count:2},{writeConcern:{w:1}})
rs0:PRIMARY> db.user.find()
rs0:PRIMARY> db.user.find().readPref("secondary")
rs0:SECONDARY> db.user.find()
db.fsyncUnlock()
rs0:SECONDARY> db.fsyncUnlock()
rs0:PRIMARY> db.user.find().readPref("secondary")
扩展:Tag
readPreference 只能控制使用一类节点。Tag 则可以将节点选择控制到一个或几个节点。考虑以下场景:
可以使用 Tag 来达到这样的控制目的。
# 为复制集节点添加标签
2 conf = rs.conf()
3 conf.members[1].tags = { purpose: "online"}
4 conf.members[4].tags = { purpose: "analyse"}
5 rs.reconfig(conf)
6
7 # 查询
8 db.collection.find({}).readPref( "secondary", [ {purpose: "online"} ] )
注意事项:
如果报表使用的节点失效,即使不生成报表,通常也不希望将报表负载转移到其他节点上,此时只有一个节点有报表 Tag 是合理的选择;
如果线上节点失效,通常希望有替代节点,所以应该保持多个节点有同样的 Tag;
在 readPreference 选择了指定的节点后,readConcern 决定这个节点上的数据哪些是可读的,类似于关系数据库的隔离级别。可选值包括:
readConcern: local 和 available
在复制集中 local 和 available 是没有区别的,两者的区别主要体现在分片集上。
考虑以下场景:
所有对 chunk x 的读写操作仍然进入 shard1;
config 中记录的信息 chunk x 仍然属于 shard1;
local:只取应该由 shard2 负责的数据(不包括 x);
available:shard2 上有什么就读什么(包括 x);
注意事项:
readConcern: majority
只读取大多数据节点上都提交了的数据。考虑如下场景:
如果在各节点上应用 {readConcern: “majority”} 来读取数据:
考虑 t3 时刻的 Secondary1,此时:
如何实现?
节点上维护多个 x 版本(MVCC 机制),MongoDB 通过维护多个快照来链接不同的版本:
测试 readConcern: majority vs local
rs0:PRIMARY> db.user.insert({count:10},{writeConcern:{w:1}})
rs0:PRIMARY> db.user.find().readConcern("local")
rs0:PRIMARY> db.user.find().readConcern("majority")
主节点测试结果:
在某一个从节点上执行 db.fsyncUnlock(),从节点测试结果:
结论:
readConcern: majority 与脏读
MongoDB 中的回滚:
所以从分布式系统的角度来看,事务的提交被提升到了分布式集群的多个节点级别的“提交”,而不再是单个节点上的“提交”。
在可能发生回滚的前提下考虑脏读问题:
使用 {readConcern: “majority”} 可以有效避免脏读。
如何安全的读写分离
考虑如下场景:
思考:如何保证自己能够读到刚刚写入的数据?
下述方式有可能读不到刚写入的订单:
db.orders.insert({oid:101,sku:"kite",q:1})
db.orders.find({oid:101}).readPref("secondary")
使用 writeConcern+readConcern majority 来解决:
db.orders.insert({oid:101,sku:"kite",q:1},{writeConcern:{w:"majority"}})
db.orders.find({oid:101}).readPref("secondary").readConcern("majority")
readConcern: linearizable
只读取大多数节点确认过的数据。和 majority 最大差别是保证绝对的操作线性顺序
readConcern: snapshot
{readConcern: “snapshot”} 只在多文档事务中生效。将一个事务的 readConcern 设置为 snapshot,将保证在事务中的读:
因为所有的读都将使用同一个快照,直到事务提交为止该快照才被释放。
小结
db.tx.insertMany([{ x: 1 }, { x: 2 }])
var session = db.getMongo().startSession()
# 开启事务
session.startTransaction()
var coll = session.getDatabase("test").getCollection("tx")
# 事务内修改 {x:1, y:1}
coll.updateOne({x: 1}, {$set: {y: 1}})
# 事务内查询 {x:1}
coll.findOne({x: 1}) //{x:1, y:1}
# 事务外查询 {x:1}
db.tx.findOne({x: 1}) //{x:1}
# 提交事务
session.commitTransaction()
# 或者回滚事务
session.abortTransaction()
var session = db.getMongo().startSession()
session.startTransaction({ readConcern: {level: "snapshot"}, writeConcern: {w:
"majority"}})
var coll = session.getDatabase('test').getCollection("tx")
coll.findOne({x: 1})
db.tx.updateOne({x: 1}, {$set: {y: 2}})
db.tx.findOne({x: 1})
coll.findOne({x: 1}) # 事务外查询
session.abortTransaction()
在执行事务的过程中,如果操作太多,或者存在一些长时间的等待,则可能会产生如下异常:
原因在于,默认情况下 MongoDB 会为每个事务设置 1 分钟的超时时间,如果在该时间内没有提交,就会强制将其终止。该超时时间可以通过 transactionLifetimeLimitSecond 变量设定。
MongoDB 的事务错误处理机制不同于关系数据库:
开 3 个 mongo shell 均执行下述语句:
var session = db.getMongo().startSession()
session.startTransaction({ readConcern: {level: "majority"}, writeConcern: {w: "majority"}})
var coll = session.getDatabase('test').getCollection("tx")
窗口 1:正常结束
coll.updateOne({x: 1}, {$set: {y: 1}})
窗口 2:异常 – 解决方案:重启事务
coll.updateOne({x: 1}, {$set: {y: 2}})
窗口 3:事务外更新,需等待
db.tx.updateOne({x: 1}, {$set: {y: 3}})
/**
* 事务操作API
* https://docs.mongodb.com/upcoming/core/transactions/
*/
@Test
public void updateEmployeeInfo() {
// 连接复制集
MongoClient client = MongoClients.create("mongodb://firechou:firechou@192.168.65.174:28017,192.168.65.174:28018,192.168.65.174:28019/test?authSource=admin&replicaSet=rs0");
MongoCollection<Document> emp = client.getDatabase("test").getCollection("emp");
MongoCollection<Document> events = client.getDatabase("test").getCollection("events");
// 事务操作配置
TransactionOptions txnOptions = TransactionOptions.builder()
.readPreference(ReadPreference.primary())
.readConcern(ReadConcern.MAJORITY)
.writeConcern(WriteConcern.MAJORITY)
.build();
try (ClientSession clientSession = client.startSession()) {
// 开启事务
clientSession.startTransaction(txnOptions);
try {
emp.updateOne(clientSession,
Filters.eq("username", "张三"),
Updates.set("status", "inactive"));
int i=1/0;
events.insertOne(clientSession,
new Document("username", "张三").append("status", new Document("new", "inactive").append("old", "Active")));
// 提交事务
clientSession.commitTransaction();
}catch (Exception e){
e.printStackTrace();
// 回滚事务
clientSession.abortTransaction();
}
}
}
配置事务管理器:
@Bean
MongoTransactionManager transactionManager(MongoDatabaseFactory factory){
// 事务操作配置
TransactionOptions txnOptions = TransactionOptions.builder()
.readPreference(ReadPreference.primary())
.readConcern(ReadConcern.MAJORITY)
.writeConcern(WriteConcern.MAJORITY)
.build();
return new MongoTransactionManager(factory);
}
编程测试 service:
@Service
public class EmployeeService {
@Autowired
MongoTemplate mongoTemplate;
@Transactional
public void addEmployee(){
Employee employee = new Employee(100,"张三", 21, 15000.00, new Date());
Employee employee2 = new Employee(101,"赵六", 28, 10000.00, new Date());
mongoTemplate.save(employee);
// int i=1/0;
mongoTemplate.save(employee2);
}
}
测试:
@Autowired
EmployeeService employeeService;
@Test
public void test(){
employeeService.addEmployee();
}