分布式锁是一种用于协调多个进程或线程之间访问共享资源的机制,它可以避免多个进程或线程同时对共享资源进行修改而导致的数据不一致问题。在分布式系统中,由于数据的分散存储在不同的节点上,因此需要一种可靠的分布式锁机制。
分布式锁通常需要满足以下几个条件:
互斥性
:在任何时刻,只能有一个进程或线程获得锁。安全性
:一旦一个进程或线程获得锁,其他进程或线程无法修改该锁的状态,只有锁的持有者可以释放锁。高可用性
:分布式锁应该具有高可用性,即当某个节点或进程故障时,其他节点或进程可以接管该锁。性能
:分布式锁应该具有高性能,即在高并发的情况下,锁的获取和释放应该尽量快速例如一个秒杀活动中,商品1
、2
、3
的库存都是100件,同时有2人参与秒杀(贱笑了*-*);假设有2个进程/线程/协程同一时刻对秒杀库存进行读写,各自将库存数目按照订单减库存,那么库存的中商品的数目最终会是多少呢?
库存表
以下所有测试用例,均将数据还原至以上初始数据
mysql
redis
python3及peewee库
#! -*-conding=: UTF-8 -*-
# 2023/8/10 19:04
import random
import time
from datetime import datetime
import threading
from peewee import *
from playhouse.shortcuts import ReconnectMixin
from playhouse.pool import PooledMySQLDatabase
class ReconnectMySQLDatabase(ReconnectMixin, PooledMySQLDatabase):
pass
db = ReconnectMySQLDatabase("inventory", host="192.168.91.1", port=3306, user="root", password="root")
# 删除 - 物理删除和逻辑删除 - 物理删除 -假设你把某个用户数据 - 用户购买记录,用户的收藏记录,用户浏览记录啊
# 通过save方法做了修改如何确保只修改update_time值而不是修改add_time
class BaseModel(Model):
add_time = DateTimeField(default=datetime.now, verbose_name="添加时间")
is_deleted = BooleanField(default=False, verbose_name="是否删除")
update_time = DateTimeField(verbose_name="更新时间", default=datetime.now)
def save(self, *args, **kwargs):
# 判断这是一个新添加的数据还是更新的数据
if self._pk is not None:
# 这是一个新数据
self.update_time = datetime.now()
return super().save(*args, **kwargs)
@classmethod
def delete(cls, permanently=False): # permanently表示是否永久删除
if permanently:
return super().delete()
else:
return super().update(is_deleted=True)
def delete_instance(self, permanently=False, recursive=False, delete_nullable=False):
if permanently:
return self.delete(permanently).where(self._pk_expr()).execute()
else:
self.is_deleted = True
self.save()
@classmethod
def select(cls, *fields):
return super().select(*fields).where(cls.is_deleted == False)
class Meta:
database = db
class Inventory(BaseModel):
# 商品的库存表
# stock = PrimaryKeyField(Stock)
goods = IntegerField(verbose_name="商品id", unique=True)
stocks = IntegerField(verbose_name="库存数量", default=0)
version = IntegerField(verbose_name="版本号", default=0) # 用于分布式锁的乐观锁
def sell():
# 多线程下的并发带来的数据不一致的问题
goods_list = [(1, 10), (2, 20), (3, 30)]
with db.atomic() as txn:
# 超卖
for goods_id, num in goods_list:
# 查询库存
goods_inv = Inventory.get(Inventory.goods == goods_id)
print(f"商品{goods_id} 售出 {num}件")
time.sleep(random.randint(1, 3)) # 增加并发问题的拟态实现
if goods_inv.stocks < num:
print(f"商品:{goods_id} 库存不足")
txn.rollback()
break
else:
goods_inv.stocks -= num
goods_inv.save()
def create_data():
db.create_tables([Inventory])
for i in range(5):
goods_inv = Inventory(goods=i, stocks=100)
goods_inv.save()
if __name__ == "__main__":
# create_data()
t1 = threading.Thread(target=sell)
t2 = threading.Thread(target=sell)
t1.start()
t2.start()
t1.join()
t2.join()
输出结果为:
商品1 售出 10件
商品1 售出 10件
商品2 售出 20件
商品3 售出 30件
商品2 售出 20件
商品3 售出 30件
PooledMySQLDatabase
对象连接MySQL数据库,具体配置为连接到IP地址为 192.168.91.1
的MySQL服务器,使用用户名 root
和密码 root
连接到数据库 inventory
。BaseModel
,其中包含了添加时间、更新时间、是否删除等字段的定义。此模型有一些方法,如保存(save
)、删除(delete
)和查询(select
)等。BaseModel
定义了一个库存模型 Inventory
,其中包含商品id、库存数量、版本号等字段的定义。__name__ == "__main__"
的分支中运行。首先通过 create_data()
函数创建初始库存数据,然后使用两个线程并发运行 sell()
函数模拟售卖商品。但由于多线程并发问题,可能会导致库存不足和超卖等问题。问题:
应该在更新的时候根据当前的数据更新。
修改代码如下(修改了更新数据的逻辑
):
#! -*-conding=: UTF-8 -*-
# 2023/8/10 19:04
import random
import time
from datetime import datetime
import threading
from peewee import *
from playhouse.shortcuts import ReconnectMixin
from playhouse.pool import PooledMySQLDatabase
class ReconnectMySQLDatabase(ReconnectMixin, PooledMySQLDatabase):
pass
db = ReconnectMySQLDatabase("inventory", host="192.168.91.1", port=3306, user="root", password="root")
# 删除 - 物理删除和逻辑删除 - 物理删除 -假设你把某个用户数据 - 用户购买记录,用户的收藏记录,用户浏览记录啊
# 通过save方法做了修改如何确保只修改update_time值而不是修改add_time
class BaseModel(Model):
add_time = DateTimeField(default=datetime.now, verbose_name="添加时间")
is_deleted = BooleanField(default=False, verbose_name="是否删除")
update_time = DateTimeField(verbose_name="更新时间", default=datetime.now)
def save(self, *args, **kwargs):
# 判断这是一个新添加的数据还是更新的数据
if self._pk is not None:
# 这是一个新数据
self.update_time = datetime.now()
return super().save(*args, **kwargs)
@classmethod
def delete(cls, permanently=False): # permanently表示是否永久删除
if permanently:
return super().delete()
else:
return super().update(is_deleted=True)
def delete_instance(self, permanently=False, recursive=False, delete_nullable=False):
if permanently:
return self.delete(permanently).where(self._pk_expr()).execute()
else:
self.is_deleted = True
self.save()
@classmethod
def select(cls, *fields):
return super().select(*fields).where(cls.is_deleted == False)
class Meta:
database = db
class Inventory(BaseModel):
# 商品的库存表
# stock = PrimaryKeyField(Stock)
goods = IntegerField(verbose_name="商品id", unique=True)
stocks = IntegerField(verbose_name="库存数量", default=0)
version = IntegerField(verbose_name="版本号", default=0) # 用于分布式锁的乐观锁
def sell():
# 多线程下的并发带来的数据不一致的问题
goods_list = [(1, 99), (2, 20), (3, 30)]
with db.atomic() as txn:
# 超卖
for goods_id, num in goods_list:
# 查询库存
goods_inv = Inventory.get(Inventory.goods == goods_id)
time.sleep(random.randint(1, 3))
if goods_inv.stocks < num:
print(f"商品:{goods_id} 库存不足")
txn.rollback()
break
else:
# 让数据库根据自己当前的值更新数据
query = Inventory.update(stocks=Inventory.stocks - num).where(Inventory.goods == goods_id)
ok = query.execute()
print(f"商品{goods_id} 售出 {num}件")
if ok:
print("更新成功")
else:
print("更新失败")
def create_data():
db.create_tables([Inventory])
for i in range(5):
goods_inv = Inventory(goods=i, stocks=100)
goods_inv.save()
if __name__ == "__main__":
# create_data()
t1 = threading.Thread(target=sell)
t2 = threading.Thread(target=sell)
t1.start()
t2.start()
t1.join()
t2.join()
输出结果为:
商品1 售出 99件
商品1 售出 99件
更新成功
商品2 售出 20件
更新成功
商品3 售出 30件
更新成功
更新成功
商品2 售出 20件
更新成功
商品3 售出 30件
更新成功
咦,商品售出99件后为啥还能售出第二次99件?还是出现超卖现象了!!读→更新
这里不是原子的。
数据库里的数据页证明了超卖了:
这还是不能处理并发问题。
from datetime import datetime
import threading
import time
from random import randint
from peewee import *
from playhouse.shortcuts import ReconnectMixin
from playhouse.pool import PooledMySQLDatabase
class ReconnectMySQLDatabase(ReconnectMixin, PooledMySQLDatabase):
pass
db = ReconnectMySQLDatabase("inventory", host="192.168.91.1", port=3306, user="root", password="root")
# 删除 - 物理删除和逻辑删除 - 物理删除 -假设你把某个用户数据 - 用户购买记录,用户的收藏记录,用户浏览记录啊
# 通过save方法做了修改如何确保只修改update_time值而不是修改add_time
class BaseModel(Model):
add_time = DateTimeField(default=datetime.now, verbose_name="添加时间")
is_deleted = BooleanField(default=False, verbose_name="是否删除")
update_time = DateTimeField(verbose_name="更新时间", default=datetime.now)
def save(self, *args, **kwargs):
# 判断这是一个新添加的数据还是更新的数据
if self._pk is not None:
# 这是一个新数据
self.update_time = datetime.now()
return super().save(*args, **kwargs)
@classmethod
def delete(cls, permanently=False): # permanently表示是否永久删除
if permanently:
return super().delete()
else:
return super().update(is_deleted=True)
def delete_instance(self, permanently=False, recursive=False, delete_nullable=False):
if permanently:
return self.delete(permanently).where(self._pk_expr()).execute()
else:
self.is_deleted = True
self.save()
@classmethod
def select(cls, *fields):
return super().select(*fields).where(cls.is_deleted == False)
class Meta:
database = db
class Inventory(BaseModel):
# 商品的库存表
# stock = PrimaryKeyField(Stock)
goods = IntegerField(verbose_name="商品id", unique=True)
stocks = IntegerField(verbose_name="库存数量", default=0)
version = IntegerField(verbose_name="版本号", default=0) # 分布式锁的乐观锁
R = threading.Lock()
def sell():
# 多线程下的并发带来的数据不一致的问题
goods_list = [(1, 10), (2, 20), (3, 99)]
with db.atomic() as txn:
# 超卖
for goods_id, num in goods_list:
# 查询库存
with R:
goods_inv = Inventory.get(Inventory.goods == goods_id)
time.sleep(randint(1, 3))
if goods_inv.stocks < num:
print(f"商品:{goods_id} 库存不足")
txn.rollback()
break
else:
# 让数据库根据自己当前的值更新数据, 这个语句能不能处理并发的问题
query = Inventory.update(stocks=Inventory.stocks - num).where(Inventory.goods == goods_id)
ok = query.execute()
print(f"商品{goods_id} 售出 {num}件")
if ok:
print("更新成功")
else:
print("更新失败")
def create_data():
db.create_tables([Inventory])
for i in range(5):
goods_inv = Inventory(goods=i, stocks=100)
goods_inv.save()
if __name__ == "__main__":
# create_data()
t1 = threading.Thread(target=sell)
t2 = threading.Thread(target=sell)
t1.start()
t2.start()
t1.join()
t2.join()
输出结果为:
商品1 售出 10件
更新成功
商品2 售出 20件
更新成功
商品3 售出 99件
更新成功
商品1 售出 10件
更新成功
商品2 售出 20件
更新成功
商品:3 库存不足
单体服务中这样实现是可以的,但是在
微服务中,普通的锁机制失效。
悲观锁适用于对并发要求不高但需要确保操作的一致性的场景
悲观锁概念:顾名思义,就是对于数据的处理持悲观态度,总认为会发生并发冲突,获取和修改数据时,别人会修改数据;所以在整个数据处理过程中,需要将数据锁定
悲观锁的实现:通常依靠数据库提供的锁机制实现,比如mysql的排他锁,select … for update
来实现悲观锁;例如,商品秒杀过程中,库存数量的减少,避免出现超卖的情况
mysql请求一把锁for update
使用for update
的时候要注意:每个语句mysql都是默认提交的
需要关闭autocommit:set autocommit=0
;(注意这个只针对当前窗口有效,不是全局的);(查询select @@autocommit;
)
具体执行逻辑:select * from inventary where goods=1 for update;
释放锁:commit;
其实是行锁,只会锁住满足条件的数据,where goods=1
和where goods=2
这2个是不会触发锁的
如果条件部分没有索引goods,那么行锁会升级成表锁
锁只是锁住要更新的语句for update
,普通的查询不会锁住
如果没有满足条件,不会锁表
使用悲观锁来实现防止超卖的效果,可以使用数据库的行级锁来保证在读取库存时进行锁定,从而避免并发问题。以下是使用悲观锁来实现的示例代码:
#! -*-conding=: UTF-8 -*-
# 2023/8/11 15:07
from datetime import datetime
import threading
from peewee import *
# 定义数据库连接
db = MySQLDatabase("inventory", host="192.168.91.1", port=3306, user="root", password="root")
# 定义基础模型
class BaseModel(Model):
class Meta:
database = db
# 定义库存表
class Inventory(BaseModel):
goods = IntegerField(verbose_name="商品id", unique=True)
stocks = IntegerField(verbose_name="库存数量", default=0)
version = IntegerField(verbose_name="版本号", default=0)
# 初始化数据库连接
db.connect()
# 创建库存表
db.create_tables([Inventory], safe=True)
# 出售商品函数,使用悲观锁
def sell(order_list):
with db.atomic() as txn:
for goods_id, num in order_list:
try:
# 查询库存并锁定记录
goods_inv = Inventory.select().where(Inventory.goods == goods_id).for_update().get()
if goods_inv.stocks < num:
print(f"商品:{goods_id} 库存不足")
txn.rollback()
return False
# 更新库存和版本号
goods_inv.stocks -= num
goods_inv.version += 1
goods_inv.save()
print(f"商品{goods_id} 售出 {num}件")
except Inventory.DoesNotExist:
print(f"商品{goods_id} 不存在")
txn.rollback()
return False
print(f"订单成功,商品列表:{order_list}")
txn.commit()
return True
# 创建库存初始数据
def create_data():
with db.atomic():
for i in range(5):
Inventory.create(goods=i, stocks=100)
# 主程序
if __name__ == "__main__":
# create_data()
# 模拟一批订单并发出售商品
order_list = [(1, 10), (2, 20), (3, 30)]
current_stock = {}
for goods_id, _ in order_list:
stock = Inventory.get(Inventory.goods == goods_id).stocks
current_stock[goods_id] = stock
print(f"当前库存:{current_stock}")
num_threads = 2 # 可根据需要调整线程数
threads = []
for _ in range(num_threads):
t = threading.Thread(target=sell, args=(order_list,))
threads.append(t)
for t in threads:
t.start()
for t in threads:
t.join()
# 查询库存
final_stock = {}
for goods_id, _ in order_list:
stock = Inventory.get(Inventory.goods == goods_id).stocks
final_stock[goods_id] = stock
print(f"最终库存:{final_stock}")
输出结果为:
当前库存:{1: 100, 2: 100, 3: 100}
商品1 售出 10件
商品2 售出 20件
商品3 售出 30件
订单成功,商品列表:[(1, 10), (2, 20), (3, 30)]
商品1 售出 10件
商品2 售出 20件
商品3 售出 30件
订单成功,商品列表:[(1, 10), (2, 20), (3, 30)]
最终库存:{1: 80, 2: 60, 3: 40}
在这个示例代码中,我们使用了for_update()
方法来锁定库存记录,确保在查询库存时不会被其他线程修改。在处理订单时,我们检查库存并更新库存和版本号。使用悲观锁的方式可以保证在处理订单过程中库存的一致性,避免超卖。
乐观锁适用于需要更高并发性能但可能需要做更多的
冲突检测
和重试
。
import threading
import time
from peewee import *
# 定义数据库连接
db = MySQLDatabase("inventory", host="192.168.91.1", port=3306, user="root", password="root")
# 定义基础模型
class BaseModel(Model):
class Meta:
database = db
# 定义库存表
class Inventory(BaseModel):
goods = IntegerField(verbose_name="商品id", unique=True)
stocks = IntegerField(verbose_name="库存数量", default=0)
version = IntegerField(verbose_name="版本号", default=0)
# 初始化数据库连接
db.connect()
# 创建库存表
db.create_tables([Inventory], safe=True)
# 出售商品函数,使用乐观锁
def sell2(goods_list, i):
# 演示基于数据库的乐观锁机制
max_retry = 3
retry = 0
stock = True
while retry < max_retry:
all_updated = True
with db.atomic() as txn:
for goods_id, num in goods_list:
try:
goods_inv = Inventory.select().where(Inventory.goods == goods_id).get()
print(f"线程: {i} : 商品{goods_id} 当前库存:{goods_inv.stocks}, 版本: {goods_inv.version}")
if goods_inv.stocks < num:
print(f"商品{goods_id} 库存不足")
all_updated = False
stock = False
break
time.sleep(1) # 模拟多个线程都读到数据,但是未更新时的状态:竞态
query = Inventory.update(stocks=Inventory.stocks - num, version=Inventory.version + 1).where(
Inventory.goods == goods_id, Inventory.version == goods_inv.version)
rows_updated = query.execute()
if rows_updated != 1:
print(f"线程{i} : 商品{goods_id} 更新失败,可能被其他线程修改,正在重试...")
all_updated = False
break
except Inventory.DoesNotExist:
print(f"商品{goods_id} 不存在")
all_updated = False
break
if all_updated:
print(f"线程{i} : 所有商品更新成功")
txn.commit()
break
else:
retry += 1
txn.rollback()
if not stock:
break
# 创建库存初始数据
def create_data():
with db.atomic():
for i in range(5):
Inventory.create(goods=i, stocks=100)
# 主程序
if __name__ == "__main__":
# create_data()
# 模拟一批订单并发出售商品
order_list = [(1, 50), (2, 20), (3, 30)]
current_stock = {}
for goods_id, _ in order_list:
stock = Inventory.get(Inventory.goods == goods_id).stocks
current_stock[goods_id] = stock
print(f"当前库存:{current_stock}")
num_threads = 2 # 可根据需要调整线程数
threads = []
for i in range(num_threads):
t = threading.Thread(target=sell2, args=(order_list, i))
threads.append(t)
for t in threads:
t.start()
for t in threads:
t.join()
# 查询库存
final_stock = {}
for goods_id, _ in order_list:
stock = Inventory.get(Inventory.goods == goods_id).stocks
final_stock[goods_id] = stock
print(f"最终库存:{final_stock}")
输出结果为:
当前库存:{1: 100, 2: 100, 3: 100}
线程: 0 : 商品1 当前库存:100, 版本: 14
线程: 1 : 商品1 当前库存:100, 版本: 14
线程: 0 : 商品2 当前库存:100, 版本: 14
线程: 0 : 商品3 当前库存:100, 版本: 14
线程0 : 所有商品更新成功
线程1 : 商品1 更新失败,可能被其他线程修改,正在重试...
线程: 1 : 商品1 当前库存:50, 版本: 15
线程: 1 : 商品2 当前库存:80, 版本: 15
线程: 1 : 商品3 当前库存:70, 版本: 15
线程1 : 所有商品更新成功
最终库存:{1: 0, 2: 60, 3: 40}
在这个示例代码中,sell
函数来接受一个订单列表order_list
,每个订单包含商品ID和数量。在处理订单时,我们先检查所有商品的库存是否充足,如果充足则同时更新所有商品的库存和版本号,如果库存不足则回滚。这样可以保证订单中的所有商品在处理过程中要么全部更新成功,要么全部回滚。这种方式可以保证同一个订单的商品购买数量的一致性,并且避免超卖。
需要注意的是,由于多线程并发执行,数据库连接和操作的线程安全性需要保证。此外,乐观锁的方式在并发高的情况下可能会导致较多的重试,因此需要合理设计并发情况下的策略,确保库存更新的正确性。
在实际应用中,还需要考虑异常处理、数据库连接池的管理、线程数的设置、数据库索引的优化等因素,以确保系统的性能和稳定性。
Redis 分布式锁中需要用到的命令:
SET key value [EX seconds | PX milliseconds]
:设置带过期时间的key-value
。EXPIRE key seconds
:给指定的key
设置过期时间。GET key
:获取给定的key
的value。
DEL key
:删除给定的key。
SETNX key value
:如果key
不存在,则设置key-value
,反正设置失败。这里用SET
和GET
一起使用代替。使用Redis实现分布式锁来防止超卖需要借助Redis的原子性操作。我们可以使用Redis的SETNX
命令(或者Redis的分布式锁实现方式,如RedLock
等)来实现一个分布式锁,确保在某个时刻只有一个线程能够进行库存的检查和更新。下面是一个使用Redis分布式锁来防止超卖的示例代码,前提是你需要在Python环境中安装redis-py
库。
在Redis
中,一个相同的key
代表一把锁。是否拥有这把锁,需要判断key
和value
是否是自己设置的,同时还要判断锁是否已经过期。
以下是某个实例加锁的步骤:
GET
命令获取key
,如果获取不到key
,说明还没有加锁;SET
命令设置key
,同时设置锁的过期时间,加锁成功。返回;key
,并且value
是自己设置的,证明该实例已经加锁成功,此时需要使用EXPIRE
命令为锁添加过期时间,因为这次可能是重试,前一次已经加锁成功。返回;key
,但是value
不属于自己设置的,证明已经被其他实例抢到了锁,加锁失败。以上 1~4 步骤需要原子性操作,可以通过lua
脚本进行封装。
key
,使用DEL
命令进行删除。删除key
前,需要判断key
对应的value
是否为自己设置的value
,如果不是,证明锁已经被其他实例获取。判断和删除都也需要是原子操作。key
)设置了过期,如果锁没有被续期(增加过期时间),就会被 Redis 删除。需要注意的是,使用 Redis 实现分布式锁需要考虑一些问题,例如Redis
实例的可用性、网络延迟、锁的持有者异常退出等,需要进行合理的设计和实现。另外,为了保证锁的正确性和可靠性,可以采用一些常用的技术手段,例如设置合适的超时时间、使用RedLock 算法
、采用Lua
脚本等
# ! -*-conding=: UTF-8 -*-
# 2023/8/11 19:47
import redis
import time
import threading
from random import randint
from datetime import datetime
from peewee import *
from playhouse.shortcuts import ReconnectMixin
from playhouse.pool import PooledMySQLDatabase
class ReconnectMySQLDatabase(ReconnectMixin, PooledMySQLDatabase):
pass
host = "192.168.91.1"
db = ReconnectMySQLDatabase("inventory", host=host, port=3306, user="root", password="root")
# 这个BaseModel可以忽略 只不过是重写了peewee中的cudr的一些操作 继承就行了
class BaseModel(Model):
add_time = DateTimeField(default=datetime.now, verbose_name="添加时间")
is_deleted = BooleanField(default=False, verbose_name="是否删除")
update_time = DateTimeField(verbose_name="更新时间", default=datetime.now)
def save(self, *args, **kwargs):
# 判断这是一个新添加的数据还是更新的数据
if self._pk is not None:
# 这是一个新数据
self.update_time = datetime.now()
return super().save(*args, **kwargs)
@classmethod
def delete(cls, permanently=False): # permanently表示是否永久删除
if permanently:
return super().delete()
else:
return super().update(is_deleted=True)
def delete_instance(self, permanently=False, recursive=False, delete_nullable=False):
if permanently:
return self.delete(permanently).where(self._pk_expr()).execute()
else:
self.is_deleted = True
self.save()
@classmethod
def select(cls, *fields):
return super().select(*fields).where(cls.is_deleted == False)
class Meta:
database = db
class Inventory(BaseModel):
# 商品的库存表
# stock = PrimaryKeyField(Stock)
goods = IntegerField(verbose_name="商品id", unique=True)
stocks = IntegerField(verbose_name="库存数量", default=0)
version = IntegerField(verbose_name="版本号", default=0) # 分布式锁的乐观锁 这里没用到
# 写一个redis分布式锁
class Lock:
# 初始化
def __init__(self, name):
self.redis_client = redis.Redis(host=host, port=6379)
self.name = name
# 上锁
def acquire(self):
if not self.redis_client.get(self.name):
self.redis_client.set(self.name, 1)
return True
else:
while True:
import time
time.sleep(1)
if self.redis_client.get(self.name):
return True
# 释放锁
def release(self):
self.redis_client.delete(self.name)
def sell2():
# 多线程下的并发带来的数据不一致的问题
# 顾客(goods_list)商品id为1的买10件以此类推
goods_list = [(1, 10), (2, 20), (3, 30)]
# 事务
with db.atomic() as txn:
# 超卖
for goods_id, num in goods_list:
# 获取锁
lock = Lock(f"lock:goods_{goods_id}")
# 上锁
lock.acquire()
# 查询库存
goods_inv = Inventory.get(Inventory.goods == goods_id)
print(f"商品{goods_id} 售出 {num}件")
time.sleep(randint(1, 3))
if goods_inv.stocks < num:
print(f"商品:{goods_id} 库存不足")
txn.rollback()
lock.release() # 释放锁
break
else:
# mysql中有修改的情况下另一个修改将无法进行 这是mysql的原子性
query = Inventory.update(stocks=Inventory.stocks - num).where(Inventory.goods == goods_id)
ok = query.execute()
if ok:
print("更新成功")
else:
print("更新失败")
lock.release() # 释放锁
if __name__ == '__main__':
# 开两个线程
t1 = threading.Thread(target=sell2)
t2 = threading.Thread(target=sell2)
t1.start()
t2.start()
t1.join()
t2.join()
输出结果为:
商品1 售出 10件
商品1 售出 10件
更新成功
商品2 售出 20件
更新成功
商品3 售出 30件
更新成功
更新成功
商品2 售出 20件
更新成功
商品3 售出 30件
更新成功
当并发非常高的时候还是会出现超卖的情况,问题出在以下代码中,
这里的get和set不是原子性的
if self.redis_client.get(self.name):
self.redis_client.set(self.name, 1)
return True
setnx版
#! -*-conding=: UTF-8 -*-
# 2023/8/11 19:47
import redis
import time
import threading
from random import randint
from datetime import datetime
from peewee import *
from playhouse.shortcuts import ReconnectMixin
from playhouse.pool import PooledMySQLDatabase
class ReconnectMySQLDatabase(ReconnectMixin, PooledMySQLDatabase):
pass
host = "192.168.91.1"
db = ReconnectMySQLDatabase("inventory", host=host, port=3306, user="root", password="root")
# 这个BaseModel可以忽略 只不过是重写了peewee中的cudr的一些操作 继承就行了
class BaseModel(Model):
add_time = DateTimeField(default=datetime.now, verbose_name="添加时间")
is_deleted = BooleanField(default=False, verbose_name="是否删除")
update_time = DateTimeField(verbose_name="更新时间", default=datetime.now)
def save(self, *args, **kwargs):
# 判断这是一个新添加的数据还是更新的数据
if self._pk is not None:
# 这是一个新数据
self.update_time = datetime.now()
return super().save(*args, **kwargs)
@classmethod
def delete(cls, permanently=False): # permanently表示是否永久删除
if permanently:
return super().delete()
else:
return super().update(is_deleted=True)
def delete_instance(self, permanently=False, recursive=False, delete_nullable=False):
if permanently:
return self.delete(permanently).where(self._pk_expr()).execute()
else:
self.is_deleted = True
self.save()
@classmethod
def select(cls, *fields):
return super().select(*fields).where(cls.is_deleted == False)
class Meta:
database = db
class Inventory(BaseModel):
# 商品的库存表
# stock = PrimaryKeyField(Stock)
goods = IntegerField(verbose_name="商品id", unique=True)
stocks = IntegerField(verbose_name="库存数量", default=0)
version = IntegerField(verbose_name="版本号", default=0) # 分布式锁的乐观锁 这里没用到
# 写一个redis分布式锁
class Lock:
# 初始化
def __init__(self, name):
self.redis_client = redis.Redis(host=host, port=6379)
self.name = name
# 上锁
def acquire(self):
if self.redis_client.setnx(self.name, 1): # 如果不存在设置并且返回1,否则返回0,这是原子操作
return True
else:
while True:
import time
time.sleep(1)
if self.redis_client.setnx(self.name, 1):
return True
# 释放锁
def release(self):
self.redis_client.delete(self.name)
def sell2():
# 多线程下的并发带来的数据不一致的问题
# 顾客(goods_list)商品id为1的买10件以此类推
goods_list = [(1, 10), (2, 20), (3, 30)]
# 事务
with db.atomic() as txn:
# 超卖
for goods_id, num in goods_list:
# 获取锁
lock = Lock(f"lock:goods_{goods_id}")
# 上锁
lock.acquire()
# 查询库存
goods_inv = Inventory.get(Inventory.goods == goods_id)
print(f"商品{goods_id} 售出 {num}件")
time.sleep(randint(1, 3))
if goods_inv.stocks < num:
print(f"商品:{goods_id} 库存不足")
txn.rollback()
lock.release() # 释放锁
break
else:
# 让数据库根据自己当前的值更新数据, 这个语句能不能处理并发的问题
# mysql中有修改的情况下另一个修改将无法进行 这是mysql的原子性
query = Inventory.update(stocks=Inventory.stocks - num).where(Inventory.goods == goods_id)
ok = query.execute()
if ok:
print("更新成功")
else:
print("更新失败")
lock.release() # 释放锁
if __name__ == '__main__':
# 开两个线程
t1 = threading.Thread(target=sell2)
t2 = threading.Thread(target=sell2)
t1.start()
t2.start()
t1.join()
t2.join()
输出结果为:
商品1 售出 10件
更新成功
商品2 售出 20件
商品1 售出 10件
更新成功
商品3 售出 30件
更新成功
更新成功
商品2 售出 20件
更新成功
商品3 售出 30件
更新成功
分布式锁需要解决的问题:
互斥性
:任意时刻只能有一个客户端拥有锁,不能同时多个客户端获取
安全性
:锁只能被持有该锁的用户删除,而不能被其他用户删除
死锁
:获取锁的客户端因为某些原因而宕机,而未能释放锁,其他客户端无法获取此锁,需要有机制来避免该类问题的发生
容错
:当部分节点宕机,客户端仍能获取锁或者释放锁
如何解决上述问题的发生 - 设置过期时间
过期设置会产生新的问题:
不安全
另一个线程进来以后会将当前的key给删除掉,另一个线程删除掉了本该属于我设置的值
如果当前的线程没有执行完,那我的这个线程还应该在适当的时候去续租,将过期时间重新设置
set版本
增加过期时间及值设置为随机字符串(只能删除自己设置的锁)
#! -*-conding=: UTF-8 -*-
# 2023/8/11 19:47
import redis
import time
import threading
from random import randint
from datetime import datetime
from peewee import *
from playhouse.shortcuts import ReconnectMixin
from playhouse.pool import PooledMySQLDatabase
class ReconnectMySQLDatabase(ReconnectMixin, PooledMySQLDatabase):
pass
host = "192.168.91.1"
db = ReconnectMySQLDatabase("inventory", host=host, port=3306, user="root", password="root")
# 这个BaseModel可以忽略 只不过是重写了peewee中的cudr的一些操作 继承就行了
class BaseModel(Model):
add_time = DateTimeField(default=datetime.now, verbose_name="添加时间")
is_deleted = BooleanField(default=False, verbose_name="是否删除")
update_time = DateTimeField(verbose_name="更新时间", default=datetime.now)
def save(self, *args, **kwargs):
# 判断这是一个新添加的数据还是更新的数据
if self._pk is not None:
# 这是一个新数据
self.update_time = datetime.now()
return super().save(*args, **kwargs)
@classmethod
def delete(cls, permanently=False): # permanently表示是否永久删除
if permanently:
return super().delete()
else:
return super().update(is_deleted=True)
def delete_instance(self, permanently=False, recursive=False, delete_nullable=False):
if permanently:
return self.delete(permanently).where(self._pk_expr()).execute()
else:
self.is_deleted = True
self.save()
@classmethod
def select(cls, *fields):
return super().select(*fields).where(cls.is_deleted == False)
class Meta:
database = db
class Inventory(BaseModel):
# 商品的库存表
# stock = PrimaryKeyField(Stock)
goods = IntegerField(verbose_name="商品id", unique=True)
stocks = IntegerField(verbose_name="库存数量", default=0)
version = IntegerField(verbose_name="版本号", default=0) # 分布式锁的乐观锁 这里没用到
# 写一个redis分布式锁
class Lock:
# 初始化
def __init__(self, name, lock_id):
self.lock_id = lock_id
self.redis_client = redis.Redis(host=host, port=6379)
self.name = name
# 上锁
def acquire(self):
if self.redis_client.set(self.name, self.lock_id, nx=True, ex=15):
# 启动一个线程去定时的刷新这个过期时间,这个操作最好也是使用lua脚本
return True
else:
while True:
import time
time.sleep(1)
if self.redis_client.set(self.name, self.lock_id, nx=True, ex=15):
return True
# 释放锁
def release(self):
lock_id = self.redis_client.get(self.name)
if lock_id == self.lock_id:
self.redis_client.delete(self.name)
else:
print("不能删除不属于自己的锁")
def sell2():
# 多线程下的并发带来的数据不一致的问题
# 顾客(goods_list)商品id为1的买10件以此类推
goods_list = [(1, 10), (2, 20), (3, 30)]
# 事务
import uuid
lock_uuid = str(uuid.uuid4())
with db.atomic() as txn:
# 超卖
for goods_id, num in goods_list:
# 获取锁
lock = Lock(f"lock:goods_{goods_id}", lock_uuid)
# 上锁
lock.acquire()
# 查询库存
goods_inv = Inventory.get(Inventory.goods == goods_id)
print(f"商品{goods_id} 售出 {num}件")
time.sleep(randint(1, 3))
if goods_inv.stocks < num:
print(f"商品:{goods_id} 库存不足")
txn.rollback()
lock.release() # 释放锁
break
else:
# 让数据库根据自己当前的值更新数据, 这个语句能不能处理并发的问题
# mysql中有修改的情况下另一个修改将无法进行 这是mysql的原子性
query = Inventory.update(stocks=Inventory.stocks - num).where(Inventory.goods == goods_id)
ok = query.execute()
if ok:
print("更新成功")
else:
print("更新失败")
lock.release() # 释放锁
if __name__ == '__main__':
# 开两个线程
t1 = threading.Thread(target=sell2)
t2 = threading.Thread(target=sell2)
t1.start()
t2.start()
t1.join()
t2.join()
输出结果为:
商品1 售出 10件
更新成功
不能删除不属于自己的锁
商品2 售出 20件
更新成功
不能删除不属于自己的锁
商品3 售出 30件
更新成功
不能删除不属于自己的锁
商品1 售出 10件
更新成功
不能删除不属于自己的锁
商品2 售出 20件
更新成功
不能删除不属于自己的锁
商品3 售出 30件
更新成功
不能删除不属于自己的锁
释放锁的代码仍然可能存在问题,不是原子操作,可以使用lua脚本继续封装
#! -*-conding=: UTF-8 -*-
# 2023/8/11 19:47
import redis
import time
import threading
from random import randint
from datetime import datetime
from peewee import *
from playhouse.shortcuts import ReconnectMixin
from playhouse.pool import PooledMySQLDatabase
class ReconnectMySQLDatabase(ReconnectMixin, PooledMySQLDatabase):
pass
host = "192.168.91.1"
db = ReconnectMySQLDatabase("inventory", host=host, port=3306, user="root", password="root")
# 这个BaseModel可以忽略 只不过是重写了peewee中的cudr的一些操作 继承就行了
class BaseModel(Model):
add_time = DateTimeField(default=datetime.now, verbose_name="添加时间")
is_deleted = BooleanField(default=False, verbose_name="是否删除")
update_time = DateTimeField(verbose_name="更新时间", default=datetime.now)
def save(self, *args, **kwargs):
# 判断这是一个新添加的数据还是更新的数据
if self._pk is not None:
# 这是一个新数据
self.update_time = datetime.now()
return super().save(*args, **kwargs)
@classmethod
def delete(cls, permanently=False): # permanently表示是否永久删除
if permanently:
return super().delete()
else:
return super().update(is_deleted=True)
def delete_instance(self, permanently=False, recursive=False, delete_nullable=False):
if permanently:
return self.delete(permanently).where(self._pk_expr()).execute()
else:
self.is_deleted = True
self.save()
@classmethod
def select(cls, *fields):
return super().select(*fields).where(cls.is_deleted == False)
class Meta:
database = db
class Inventory(BaseModel):
# 商品的库存表
# stock = PrimaryKeyField(Stock)
goods = IntegerField(verbose_name="商品id", unique=True)
stocks = IntegerField(verbose_name="库存数量", default=0)
version = IntegerField(verbose_name="版本号", default=0) # 分布式锁的乐观锁 这里没用到
# 写一个redis分布式锁
class Lock:
# 初始化
def __init__(self, name, lock_id):
self.lock_id = lock_id
self.redis_client = redis.Redis(host=host, port=6379)
self.name = name
# 上锁
def acquire(self):
if self.redis_client.set(self.name, self.lock_id, nx=True, ex=15):
# 启动一个线程去定时的刷新这个过期时间,这个操作最好也是使用lua脚本
return True
else:
while True:
import time
time.sleep(1)
if self.redis_client.set(self.name, self.lock_id, nx=True, ex=15):
return True
# 释放锁
def release(self):
# lock_id = self.redis_client.get(self.name)
# if lock_id == self.lock_id:
# self.redis_client.delete(self.name)
# else:
# print("不能删除不属于自己的锁")
unlock_script = """
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
"""
unlock = self.redis_client.register_script(unlock_script)
result = unlock(keys=[self.name], args=[self.lock_id])
if result:
return True
else:
print("不能删除不属于自己的锁")
return False
def sell2():
# 多线程下的并发带来的数据不一致的问题
# 顾客(goods_list)商品id为1的买10件以此类推
goods_list = [(1, 10), (2, 20), (3, 30)]
# 事务
import uuid
lock_uuid = str(uuid.uuid4())
with db.atomic() as txn:
# 超卖
for goods_id, num in goods_list:
# 获取锁
lock = Lock(f"lock:goods_{goods_id}", lock_uuid)
# 上锁
lock.acquire()
# 查询库存
goods_inv = Inventory.get(Inventory.goods == goods_id)
print(f"商品{goods_id} 售出 {num}件")
time.sleep(randint(1, 3))
if goods_inv.stocks < num:
print(f"商品:{goods_id} 库存不足")
txn.rollback()
lock.release() # 释放锁
break
else:
# 让数据库根据自己当前的值更新数据, 这个语句能不能处理并发的问题
# mysql中有修改的情况下另一个修改将无法进行 这是mysql的原子性
query = Inventory.update(stocks=Inventory.stocks - num).where(Inventory.goods == goods_id)
ok = query.execute()
if ok:
print("更新成功")
else:
print("更新失败")
lock.release() # 释放锁
if __name__ == '__main__':
# 开两个线程
t1 = threading.Thread(target=sell2)
t2 = threading.Thread(target=sell2)
t1.start()
t2.start()
t1.join()
t2.join()
输出结果为:
商品1 售出 10件
更新成功
商品1 售出 10件
商品2 售出 20件
更新成功
商品3 售出 30件
更新成功
更新成功
商品2 售出 20件
更新成功
商品3 售出 30件
更新成功
Redis官方库也为我们实现了分布式锁,根据需要使用即可
import threading
import redis
import time
from peewee import *
# 定义数据库连接
db = MySQLDatabase("inventory", host="192.168.91.1", port=3306, user="root", password="root")
# 定义基础模型
class BaseModel(Model):
class Meta:
database = db
# 定义库存表
class Inventory(BaseModel):
goods = IntegerField(verbose_name="商品id", unique=True)
stocks = IntegerField(verbose_name="库存数量", default=0)
version = IntegerField(verbose_name="版本号", default=0)
# 初始化数据库连接
db.connect()
# 创建库存表
db.create_tables([Inventory], safe=True)
# 创建Redis连接
redis_client = redis.StrictRedis(host='192.168.91.1', port=6379, db=0, decode_responses=True)
# 出售商品函数,使用Redis锁
def sell(order_list):
with db.atomic() as txn:
# 检查所有商品库存是否充足
for goods_id, num in order_list:
try:
# 查询库存,同时尝试获取Redis锁
with redis_client.lock(f'inventory_lock:{goods_id}', blocking_timeout=10):
goods_inv = Inventory.select().where(Inventory.goods == goods_id).get()
if goods_inv.stocks < num:
print(f"商品:{goods_id} 库存不足")
txn.rollback()
return False
except Inventory.DoesNotExist:
print(f"商品{goods_id} 不存在")
txn.rollback()
return False
# 更新所有商品库存和版本号
for goods_id, num in order_list:
query = Inventory.update(stocks=Inventory.stocks - num, version=Inventory.version + 1).where(
Inventory.goods == goods_id)
rows_updated = query.execute()
if rows_updated != 1:
print(f"商品{goods_id} 更新失败")
txn.rollback()
return False
print(f"订单成功,商品列表:{order_list}")
txn.commit()
return True
# 创建库存初始数据
def create_data():
with db.atomic():
for i in range(5):
Inventory.create(goods=i, stocks=100)
# 主程序
if __name__ == "__main__":
# create_data()
# 模拟一批订单并发出售商品
order_list = [(1, 50), (2, 20), (3, 30)]
current_stock = {}
for goods_id, _ in order_list:
stock = Inventory.get(Inventory.goods == goods_id).stocks
current_stock[goods_id] = stock
print(f"当前库存:{current_stock}")
num_threads = 2 # 可根据需要调整线程数
threads = []
for _ in range(num_threads):
t = threading.Thread(target=sell, args=(order_list,))
threads.append(t)
for t in threads:
t.start()
for t in threads:
t.join()
# 查询库存
final_stock = {}
for goods_id, _ in order_list:
stock = Inventory.get(Inventory.goods == goods_id).stocks
final_stock[goods_id] = stock
print(f"最终库存:{final_stock}")
输出结果为:
当前库存:{1: 100, 2: 100, 3: 100}
订单成功,商品列表:[(1, 50), (2, 20), (3, 30)]
订单成功,商品列表:[(1, 50), (2, 20), (3, 30)]
最终库存:{1: 0, 2: 60, 3: 40}
在这个示例中,我们使用了Redis的redis-py
库来实现分布式锁。在每次查询库存之前,我们尝试获取一个以商品ID命名的Redis锁。只有成功获取锁的线程才能继续执行查询和库存更新操作,其他线程会等待锁被释放。这样可以确保在并发情况下,每个商品的库存查询和更新都是串行的,从而避免超卖的问题。
这个实现使用了Redis分布式锁来防止并发更新库存,但仍然可能面临一些问题:
锁的过期时间应该设置多长?
更严重的是,不管你设置多长,极端情况下,都会出现业务执行时间超过过期时间。
我们可以考虑在锁还没有过期的时候,再一次延长过期时间,那么:
续约其实就是对Redis
的key
延长过期时间,需要注意的时,续期也要判断锁是不是自己的,因为锁可能已经过期被其他实例获取了。
使用py-redis-lock
从图上看出作者和其它大多数用Redis实现分布式锁的思路类似(SET NX
),但是他对每个锁多用了一个list
类型键来做信号控制,如果客户端第一次尝试获取锁失败,可以选择在signal列表上阻塞一个timeout时间用来接收锁被释放的通知,Redis列表的这个特性保证了每次只有一个客户端接收到了锁释放的通知。而获取到锁的客户端在使用完后会在对应的信号列表上推送一个通知。另外,作者对锁超时还增加了一个刷新的功能来延长(Extend)对锁的占用,可以保证在持有锁的客户端上完成所有操作后才释放锁。个人认为这种设计的优点和需要注意的点如下:
优点
:
list signal
来让客户端决定是否要block自己;
Warning
源码如下
import threading
import weakref
from base64 import b64encode
from logging import getLogger
from os import urandom
from redis import StrictRedis
__version__ = '3.6.0'
logger = getLogger(__name__)
text_type = str
binary_type = bytes
# Check if the id match. If not, return an error code.
UNLOCK_SCRIPT = b"""
if redis.call("get", KEYS[1]) ~= ARGV[1] then
return 1
else
redis.call("del", KEYS[2])
redis.call("lpush", KEYS[2], 1)
redis.call("pexpire", KEYS[2], ARGV[2])
redis.call("del", KEYS[1])
return 0
end
"""
# Covers both cases when key doesn't exist and doesn't equal to lock's id
EXTEND_SCRIPT = b"""
if redis.call("get", KEYS[1]) ~= ARGV[1] then
return 1
elseif redis.call("ttl", KEYS[1]) < 0 then
return 2
else
redis.call("expire", KEYS[1], ARGV[2])
return 0
end
"""
RESET_SCRIPT = b"""
redis.call('del', KEYS[2])
redis.call('lpush', KEYS[2], 1)
redis.call('pexpire', KEYS[2], ARGV[2])
return redis.call('del', KEYS[1])
"""
RESET_ALL_SCRIPT = b"""
local locks = redis.call('keys', 'lock:*')
local signal
for _, lock in pairs(locks) do
signal = 'lock-signal:' .. string.sub(lock, 6)
redis.call('del', signal)
redis.call('lpush', signal, 1)
redis.call('expire', signal, 1)
redis.call('del', lock)
end
return #locks
"""
class AlreadyAcquired(RuntimeError):
pass
class NotAcquired(RuntimeError):
pass
class AlreadyStarted(RuntimeError):
pass
class TimeoutNotUsable(RuntimeError):
pass
class InvalidTimeout(RuntimeError):
pass
class TimeoutTooLarge(RuntimeError):
pass
class NotExpirable(RuntimeError):
pass
class Lock(object):
"""
A Lock context manager implemented via redis SETNX/BLPOP.
"""
unlock_script = None
extend_script = None
reset_script = None
reset_all_script = None
def __init__(self, redis_client, name, expire=None, id=None, auto_renewal=False, strict=True, signal_expire=1000):
"""
:param redis_client:
An instance of :class:`~StrictRedis`.
:param name:
The name (redis key) the lock should have.
:param expire:
The lock expiry time in seconds. If left at the default (None)
the lock will not expire.
:param id:
The ID (redis value) the lock should have. A random value is
generated when left at the default.
Note that if you specify this then the lock is marked as "held". Acquires
won't be possible.
:param auto_renewal:
If set to ``True``, Lock will automatically renew the lock so that it
doesn't expire for as long as the lock is held (acquire() called
or running in a context manager).
Implementation note: Renewal will happen using a daemon thread with
an interval of ``expire*2/3``. If wishing to use a different renewal
time, subclass Lock, call ``super().__init__()`` then set
``self._lock_renewal_interval`` to your desired interval.
:param strict:
If set ``True`` then the ``redis_client`` needs to be an instance of ``redis.StrictRedis``.
:param signal_expire:
Advanced option to override signal list expiration in milliseconds. Increase it for very slow clients. Default: ``1000``.
"""
if strict and not isinstance(redis_client, StrictRedis):
raise ValueError("redis_client must be instance of StrictRedis. "
"Use strict=False if you know what you're doing.")
if auto_renewal and expire is None:
raise ValueError("Expire may not be None when auto_renewal is set")
self._client = redis_client
if expire:
expire = int(expire)
if expire < 0:
raise ValueError("A negative expire is not acceptable.")
else:
expire = None
self._expire = expire
self._signal_expire = signal_expire
if id is None:
self._id = b64encode(urandom(18)).decode('ascii')
elif isinstance(id, binary_type):
try:
self._id = id.decode('ascii')
except UnicodeDecodeError:
self._id = b64encode(id).decode('ascii')
elif isinstance(id, text_type):
self._id = id
else:
raise TypeError("Incorrect type for `id`. Must be bytes/str not %s." % type(id))
self._name = 'lock:' + name
self._signal = 'lock-signal:' + name
self._lock_renewal_interval = (float(expire) * 2 / 3
if auto_renewal
else None)
self._lock_renewal_thread = None
self.register_scripts(redis_client)
@classmethod
def register_scripts(cls, redis_client):
global reset_all_script
if reset_all_script is None:
reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
cls.unlock_script = redis_client.register_script(UNLOCK_SCRIPT)
cls.extend_script = redis_client.register_script(EXTEND_SCRIPT)
cls.reset_script = redis_client.register_script(RESET_SCRIPT)
cls.reset_all_script = redis_client.register_script(RESET_ALL_SCRIPT)
@property
def _held(self):
return self.id == self.get_owner_id()
def reset(self):
"""
Forcibly deletes the lock. Use this with care.
"""
self.reset_script(client=self._client, keys=(self._name, self._signal), args=(self.id, self._signal_expire))
@property
def id(self):
return self._id
def get_owner_id(self):
owner_id = self._client.get(self._name)
if isinstance(owner_id, binary_type):
owner_id = owner_id.decode('ascii', 'replace')
return owner_id
def acquire(self, blocking=True, timeout=None):
"""
:param blocking:
Boolean value specifying whether lock should be blocking or not.
:param timeout:
An integer value specifying the maximum number of seconds to block.
"""
logger.debug("Getting %r ...", self._name)
if self._held:
raise AlreadyAcquired("Already acquired from this Lock instance.")
if not blocking and timeout is not None:
raise TimeoutNotUsable("Timeout cannot be used if blocking=False")
if timeout:
timeout = int(timeout)
if timeout < 0:
raise InvalidTimeout("Timeout (%d) cannot be less than or equal to 0" % timeout)
if self._expire and not self._lock_renewal_interval and timeout > self._expire:
raise TimeoutTooLarge("Timeout (%d) cannot be greater than expire (%d)" % (timeout, self._expire))
busy = True
blpop_timeout = timeout or self._expire or 0
timed_out = False
while busy:
busy = not self._client.set(self._name, self._id, nx=True, ex=self._expire)
if busy:
if timed_out:
return False
elif blocking:
timed_out = not self._client.blpop(self._signal, blpop_timeout) and timeout
else:
logger.debug("Failed to get %r.", self._name)
return False
# 是否应该取刷新过期时间,不是一定要这样做, 这是有风险, 如果当前的进程没有挂,但是一直阻塞,退不出来,就会永远续租
logger.debug("Got lock for %r.", self._name)
if self._lock_renewal_interval is not None:
self._start_lock_renewer()
return True
def extend(self, expire=None):
"""Extends expiration time of the lock.
:param expire:
New expiration time. If ``None`` - `expire` provided during
lock initialization will be taken.
"""
if expire:
expire = int(expire)
if expire < 0:
raise ValueError("A negative expire is not acceptable.")
elif self._expire is not None:
expire = self._expire
else:
raise TypeError(
"To extend a lock 'expire' must be provided as an "
"argument to extend() method or at initialization time."
)
error = self.extend_script(client=self._client, keys=(self._name, self._signal), args=(self._id, expire))
if error == 1:
raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
elif error == 2:
raise NotExpirable("Lock %s has no assigned expiration time" % self._name)
elif error:
raise RuntimeError("Unsupported error code %s from EXTEND script" % error)
@staticmethod
def _lock_renewer(lockref, interval, stop):
"""
Renew the lock key in redis every `interval` seconds for as long
as `self._lock_renewal_thread.should_exit` is False.
"""
log = getLogger("%s.lock_refresher" % __name__)
while not stop.wait(timeout=interval):
log.debug("Refreshing lock")
lock = lockref()
if lock is None:
log.debug("The lock no longer exists, "
"stopping lock refreshing")
break
lock.extend(expire=lock._expire)
del lock
log.debug("Exit requested, stopping lock refreshing")
def _start_lock_renewer(self):
"""
Starts the lock refresher thread.
"""
if self._lock_renewal_thread is not None:
raise AlreadyStarted("Lock refresh thread already started")
logger.debug(
"Starting thread to refresh lock every %s seconds",
self._lock_renewal_interval
)
self._lock_renewal_stop = threading.Event()
self._lock_renewal_thread = threading.Thread(
group=None,
target=self._lock_renewer,
kwargs={'lockref': weakref.ref(self),
'interval': self._lock_renewal_interval,
'stop': self._lock_renewal_stop}
)
self._lock_renewal_thread.setDaemon(True)
self._lock_renewal_thread.start()
def _stop_lock_renewer(self):
"""
Stop the lock renewer.
This signals the renewal thread and waits for its exit.
"""
if self._lock_renewal_thread is None or not self._lock_renewal_thread.is_alive():
return
logger.debug("Signalling the lock refresher to stop")
self._lock_renewal_stop.set()
self._lock_renewal_thread.join()
self._lock_renewal_thread = None
logger.debug("Lock refresher has stopped")
def __enter__(self): # 用来使用with语句
acquired = self.acquire(blocking=True)
assert acquired, "Lock wasn't acquired, but blocking=True"
return self
def __exit__(self, exc_type=None, exc_value=None, traceback=None):
self.release()
def release(self):
"""Releases the lock, that was acquired with the same object.
.. note::
If you want to release a lock that you acquired in a different place you have two choices:
* Use ``Lock("name", id=id_from_other_place).release()``
* Use ``Lock("name").reset()``
"""
if self._lock_renewal_thread is not None:
self._stop_lock_renewer()
logger.debug("Releasing %r.", self._name)
error = self.unlock_script(client=self._client, keys=(self._name, self._signal),
args=(self._id, self._signal_expire))
if error == 1:
raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)
elif error:
raise RuntimeError("Unsupported error code %s from EXTEND script." % error)
def locked(self):
"""
Return true if the lock is acquired.
Checks that lock with same name already exists. This method returns true, even if
lock have another id.
"""
return self._client.exists(self._name) == 1
reset_all_script = None
def reset_all(redis_client):
"""
Forcibly deletes all locks if its remains (like a crash reason). Use this with care.
:param redis_client:
An instance of :class:`~StrictRedis`.
"""
Lock.register_scripts(redis_client)
reset_all_script(client=redis_client) # noqa
作者定义了UNLOCK
, EXTEND
, RESET
… 等原子操作的Lua脚本。
如果指定了锁自动刷新,那刷新间隔会设定在超时的2/3时间。
这个库提供的分布式锁很灵活,是否需要超时?是否需要自动刷新?是否要阻塞?都是可选的。没有最好的算法,只有最合适的算法,用户应该根据自己是场景谨慎选择。
使用py-redis-lock
的锁实现
import threading
import redis
from datetime import datetime
from peewee import *
from playhouse.shortcuts import ReconnectMixin
from playhouse.pool import PooledMySQLDatabase
from py_redis_lock import Lock as PyLock
class ReconnectMySQLDatabase(ReconnectMixin, PooledMySQLDatabase):
pass
db = ReconnectMySQLDatabase("inventory", host="192.168.91.1", port=3306, user="root", password="root")
# 删除 - 物理删除和逻辑删除 - 物理删除 -假设你把某个用户数据 - 用户购买记录,用户的收藏记录,用户浏览记录啊
# 通过save方法做了修改如何确保只修改update_time值而不是修改add_time
class BaseModel(Model):
add_time = DateTimeField(default=datetime.now, verbose_name="添加时间")
is_deleted = BooleanField(default=False, verbose_name="是否删除")
update_time = DateTimeField(verbose_name="更新时间", default=datetime.now)
def save(self, *args, **kwargs):
# 判断这是一个新添加的数据还是更新的数据
if self._pk is not None:
# 这是一个新数据
self.update_time = datetime.now()
return super().save(*args, **kwargs)
@classmethod
def delete(cls, permanently=False): # permanently表示是否永久删除
if permanently:
return super().delete()
else:
return super().update(is_deleted=True)
def delete_instance(self, permanently=False, recursive=False, delete_nullable=False):
if permanently:
return self.delete(permanently).where(self._pk_expr()).execute()
else:
self.is_deleted = True
self.save()
@classmethod
def select(cls, *fields):
return super().select(*fields).where(cls.is_deleted == False)
class Meta:
database = db
class Inventory(BaseModel):
# 商品的库存表
# stock = PrimaryKeyField(Stock)
goods = IntegerField(verbose_name="商品id", unique=True)
stocks = IntegerField(verbose_name="库存数量", default=0)
version = IntegerField(verbose_name="版本号", default=0) # 分布式锁的乐观锁
def sell():
# 多线程下的并发带来的数据不一致的问题
goods_list = [(1, 10), (2, 20), (3, 30)]
with db.atomic() as txn:
# 超卖
# 续租过期时间 - 看门狗 - java中有一个redisson
# 如何防止我设置的值被其他的线程给删除掉
for goods_id, num in goods_list:
# 查询库存
redis_client = redis.Redis(host="192.168.91.1")
lock = PyLock(redis_client, f"lock:goods_{goods_id}", auto_renewal=True, expire=15)
lock.acquire()
goods_inv = Inventory.get(Inventory.goods == goods_id)
import time
time.sleep(20)
if goods_inv.stocks < num:
print(f"商品:{goods_id} 库存不足")
txn.rollback()
break
else:
# 让数据库根据自己当前的值更新数据, 这个语句能不能处理并发的问题
query = Inventory.update(stocks=Inventory.stocks - num).where(Inventory.goods == goods_id)
ok = query.execute()
if ok:
print("更新成功")
else:
print("更新失败")
lock.release()
if __name__ == "__main__":
t1 = threading.Thread(target=sell)
t2 = threading.Thread(target=sell)
t1.start()
t2.start()
t1.join()
t2.join()
前面讨论的都是单点的Redis
,在集群部署的时候,需要额外考虑一个问题:主从切换。
一切顺利的情况:
主从切换的异常情况:
Redlock 的思路:不再部署单一主从集群,而是多个主节点(没有从节点) 。
比如说我们部署五个主节点,那么加锁过程是类似的,只是要在五个主节点上都加上锁,如果多数(这里是三个)都成功了,那么就认为加锁成功。
示例:
import redis
import time
class Redlock(object):
def __init__(self, connection_list, retry_times=3, retry_delay=200):
self.servers = []
for connection in connection_list:
self.servers.append(redis.StrictRedis(host=connection["host"], port=connection["port"], db=connection["db"]))
self.quorum = len(self.servers) // 2 + 1
self.retry_times = retry_times
self.retry_delay = retry_delay
def lock(self, resource, ttl):
retry = self.retry_times
while retry > 0:
n = 0
start_time = time.time() * 1000
for server in self.servers:
if server.set(resource, 1, nx=True, px=ttl):
n += 1
elapsed_time = time.time() * 1000 - start_time
validity = ttl - elapsed_time - 2
if n >= self.quorum and validity > 0:
return validity
else:
for server in self.servers:
server.delete(resource)
retry -= 1
time.sleep(self.retry_delay / 1000)
return False
def unlock(self, resource):
for server in self.servers:
server.delete(resource)
# Example usage:
redlock = Redlock([{"host": "localhost", "port": 6379, "db": 0}], retry_times=3, retry_delay=200)
validity = redlock.lock("my_resource", 10000)
if validity:
print("Lock acquired")
# Do something here...
redlock.unlock("my_resource")
else:
print("Failed to acquire lock")
Redlock算法的基本思想是,使用多个Redis实例来协调锁,以确保在任何情况下都不会出现死锁或竞争条件。Redlock算法的实现比较复杂,需要考虑多个因素,例如时钟漂移、网络延迟等。
感兴趣的也可以参考第三方库https://github.com/SPSCommerce/redlock-py/blob/master/redlock/init.py的实现
优点
缺点
Redis分布式锁
是实现分布式锁的一种常用方式,以下是一些可以优化Redis分布式锁的方法:
https://readthedocs.org/projects/python-redis-lock/downloads/pdf/latest/
https://redis.io/docs/manual/patterns/distributed-locks/
https://python-redis-lock.readthedocs.io/en/latest/
https://juejin.cn/post/7263878008615682104?searchId=202308141723011AA4D66584A48C0F786A
https://levelup.gitconnected.com/implementing-redlock-on-redis-for-distributed-locks-a3cfe60d4ea4