概览
1.高并发秒杀问题及可能出现的bug
2.秒杀场景JVM级别锁和分布式锁
3.大厂分布式锁Redisson框架
4.从Redisson源码剖析lua解决锁原子性问题
5.从Redisson源码剖析经典锁续命问题
6.Redis主从架构锁失效如何解决
7.Redlock分布式锁高并发下可能存在的问题
8.双十一大促如何将分布式锁性能提升100倍
9.放置订单重复提交或支付分布式锁方案设计
10.防止取消订单误支付bug分布式锁方案设计
快速待见一个redis环境:Redis和Redis可视化管理工具的下载和安装_redisdesktopmanager下载-CSDN博客
新建一个springboot快速构建:pom.xml
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.9</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>18</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.16.3</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
它有哪些问题?
没有线程安全的保护措施,多个进程访问时,可能会导致超卖问题。
@Autowired
private Redisson redisson;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@RequestMapping("~/deduct_stock")
public String deductStock(){
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));//jedis.get("stock")
if(stock>0){
int realStock =stock-1;
stringRedisTemplate.opsForValue().set("stock",realStock+"");//jedis.set(key,value)
System.out.println("扣减成功,剩余库存:"+realStock);
}else{
System.out.println("扣减失败,库存不足");
}
return "end";
}
同步代码块,通过内置排序锁实现,多个进程访问时,排队进行
它有什么问题?
在单机模式下,能够保证线程安全,但是在分布式集群下,还是会线程不安全,导致超卖问题
@Autowired
private Redisson redisson;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@RequestMapping("~/deduct_stock")
public String deductStock(){
synchronized (this){
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));//jedis.get("stock")
if(stock>0){
int realStock =stock-1;
stringRedisTemplate.opsForValue().set("stock",realStock+"");//jedis.set(key,value)
System.out.println("扣减成功,剩余库存:"+realStock);
}else{
System.out.println("扣减失败,库存不足");
}
}
return "end";
}
使用分布式锁的原因是在集群结构上加锁,解决集群环境下多并发导致超卖问题。
SETNX命令:? setnx key value
- 将key的值设置为value,当且仅当key不存在
- 若key存在,则该命令无任何操作
- SETNX是Set if not exists的简写
- 可用版本>=1.0.0
使用该命令保证只有一个用户可以修改成功,另一个用户的操作不生效。
多线程并发在redis排队,单线程处理,拿到锁的处理,锁使用完,要删除,不然会造成死锁问题。
@Autowired
private Redisson redisson;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@RequestMapping("~/deduct_stock")
public String deductStock(){
String lockKey="product_101";
Boolean result=stringRedisTemplate.opsForValue().setIfAbsent(lockKey,"zhangsan");//jedis.setnx(k,v)
if(!result){
return "biz_code";
}
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));//jedis.get("stock")
if(stock>0){
int realStock =stock-1;
stringRedisTemplate.opsForValue().set("stock",realStock+"");//jedis.set(key,value)
System.out.println("扣减成功,剩余库存:"+realStock);
}else{
System.out.println("扣减失败,库存不足");
}
return "end";
}
改进一:当程序异常,未及时对持有的锁释放,也会导致死锁问题。
? ? ? ? ? ? ? ?解决:将锁删除放在finally里,保证一定会执行。
改进二:当程序挂掉,但是锁没有释放,finally也没有执行,还是会导致死锁问题。
? ? ? ? ? ? ? ?解决:设置超时时间,如果程序挂了,到时间自动释放锁,不影响后续操作。
@Autowired
private Redisson redisson;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@RequestMapping("~/deduct_stock")
public String deductStock(){
String lockKey="product_101";
Boolean result=stringRedisTemplate.opsForValue().setIfAbsent(lockKey,"zhangsan");//jedis.setnx(k,v)
//设置超时时间
stringRedisTemplate.expire(lockKey,10, TimeUnit.SECONDS);
if(!result){
return "biz_code";
}
try{
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));//jedis.get("stock")
if(stock>0){
int realStock =stock-1;
stringRedisTemplate.opsForValue().set("stock",realStock+"");//jedis.set(key,value)
System.out.println("扣减成功,剩余库存:"+realStock);
}else{
System.out.println("扣减失败,库存不足");
}
}finally {
stringRedisTemplate.delete(lockKey);
}
return "end";
}
改进三: 获取锁和超时时间分开写,可能会获取锁还没设置超时时间的时候挂掉,还是会导致前面那个问题。
? ? ? ? ? ? ? 解决:用redis内置方法,获取锁的同时设置超时时间,保证操作原子性。
String lockKey="product_101";
//Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey,"zhangsan");//jedis.setnx(k,v)
设置超时时间
//stringRedisTemplate.expire(lockKey,10, TimeUnit.SECONDS);
Boolean result=stringRedisTemplate.opsForValue().setIfAbsent(lockKey,"zhangsan",10, TimeUnit.SECONDS);//jedis.setnx(k,v)
问题一:高并发场景下存在,一个线程刚加锁,就被另一个线程解锁的问题,导致锁一直刚获取就失效。
问题的关键在:不能释放别人的锁。应确保谁加锁,谁释放。
?给锁设置线程ID,确保加锁和释放锁的是同一线程。
Rlock redissonLock=redisson.getLock(lockKey); redissonLock.lock();
@Autowired
private Redisson redisson;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@RequestMapping("~/deduct_stock")
public String deductStock(){
String lockKey="product_101";
// Boolean result=stringRedisTemplate.opsForValue().setIfAbsent(lockKey,"zhangsan");//jedis.setnx(k,v)
// //设置超时时间
// stringRedisTemplate.expire(lockKey,10, TimeUnit.SECONDS);
String clientId = UUID.randomUUID().toString();
Boolean result=stringRedisTemplate.opsForValue().setIfAbsent(lockKey,clientId,10, TimeUnit.SECONDS);//jedis.setnx(k,v)
if(!result){
return "biz_code";
}
Rlock redissonLock=redisson.getLock(lockKey);
redissonLock.lock();// setIfAbsent(lockKey,clientId,30,TimeUnit.SECONDS)
try{
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));//jedis.get("stock")
if(stock>0){
int realStock =stock-1;
stringRedisTemplate.opsForValue().set("stock",realStock+"");//jedis.set(key,value)
System.out.println("扣减成功,剩余库存:"+realStock);
}else{
System.out.println("扣减失败,库存不足");
}
}finally {
if(clientId.equals(stringRedisTemplate.opsForValue().get("stock")))
stringRedisTemplate.delete(lockKey);
}
return "end";
}
问题二:线程获取锁,但是执行过程还没结束,卡顿,但是锁过期了,导致需要重新弄加锁的问题。
解决:无论设置多大的超时时间都有一定的概率导致这个问题,所以解决该问题的核心点是:锁续命机制——线程执行结束前,不断给即将过期的锁增加超时时间,以延长锁的寿命。
finally { if(clientId.equals(stringRedisTemplate.opsForValue().get("stock"))) stringRedisTemplate.delete(lockKey); }
Redisson 和 Jedis 的简单比较_redisson代替jedis-CSDN博客
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.6.5</version>
</dependency>
@Bean
public Redisson redisson(){
//此为单机模式
Config config=new Config();
config.useSingleServer().setAddress("redis://localhost:6379").setDatabase(0);
//config.setLockWatchdogTimeout(10000);//设置分布式锁watch dog超时时间
return (Redisson) Redisson.create(config);
}
Controller
@Autowired
private Redisson redisson;
@Autowired
private Redisson redisson;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@RequestMapping("~/deduct_stock")
public String deductStock(){
String lockKey="product_101";
// Boolean result=stringRedisTemplate.opsForValue().setIfAbsent(lockKey,"zhangsan");//jedis.setnx(k,v)
// //设置超时时间
// stringRedisTemplate.expire(lockKey,10, TimeUnit.SECONDS);
String clientId = UUID.randomUUID().toString();
Boolean result=stringRedisTemplate.opsForValue().setIfAbsent(lockKey,clientId,10, TimeUnit.SECONDS);//jedis.setnx(k,v)
if(!result){
return "biz_code";
}
Rlock redissonLock=redisson.getLock(lockKey);
redissonLock.lock();// setIfAbsent(lockKey,clientId,30,TimeUnit.SECONDS)
try{
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));//jedis.get("stock")
if(stock>0){
int realStock =stock-1;
stringRedisTemplate.opsForValue().set("stock",realStock+"");//jedis.set(key,value)
System.out.println("扣减成功,剩余库存:"+realStock);
}else{
System.out.println("扣减失败,库存不足");
}
}finally {
redissonLock.unlock();
// if(clientId.equals(stringRedisTemplate.opsForValue().get("stock")))
// stringRedisTemplate.delete(lockKey);
}
return "end";
}
加锁的代码:
RLock redissonLock=redisson.getLock(lockKey);
redissonLock.lock();// setIfAbsent(lockKey,clientId,30,TimeUnit.SECONDS)
RedissonLock.class的lock()加锁:?
具体的加锁代码是由Lua脚本实现的。
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then" +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
Redis Lua脚本
Redis 2.6推出Redis Lua脚本,允许开发者使用Lua语言编写脚本传到Redis中执行。通过内置Lua解释器,可以使用EVAL命令对Lua脚本进行求值。
-------------------------------------------------------------------------------------------------------------------------
Redis Lua的优势:
减少网络开销:
????????将原先五次请求放入redis服务器上完成。
? ? ? ? 使用脚本,减少了网络往返时延,和管道类似。
原子操作:
????????Redis将整个脚本作为一个整体执行,不允许中间插入其他命令。管道不是原子的,但Redis的批量操作(类似mset)是原子的
替代Redis事务:
????????redis自带的事务功能很鸡肋,而redis的lua脚本几乎实现了常规事务功能。官网推荐redis事务可以使用Redis Lua脚本替代。
-------------------------------------------------------------------------------------------------------------------------
EVAL命令格式:
????????EVAL script numkeys key [key ...] arg [arg ...]
- script参数是一段Lua脚本程序,运行再redis内置的lua解释器里。被定义为一个Lua函数。
- numkeys参数用于指定键名参数个数[key...]
- 表示脚本中所用到的那些Redis键key。键名可以在Lua通过全局变量KEYS数组,用1为基址的形式访问:KEYS[1]
>eval "return {KEYS[1], KEYS[2], ARGV[1], ARGV[2]} 2 key1 key2 first second
)1 "key1"
)2 "key2"
)3 "first"
)4 "second"
return {KEYS[1], KEYS[2], ARGV[1], ARGV[2]}? ?被求值的Lua脚本,数字2指定了键名参数的数量
key1 key2是键名参数,分别使用KEYS[1], KEYS[2]访问
first second是附加参数, 分别用ARGV[1], ARGV[2]访问
-------------------------------------------------------------------------------------------------------------------------
在Lua脚本中,可以使用redis.call()函数执行redis命令
jedis.set("product_stoc_10016","15");//初始化商品10016的库存 String script="local count=redis.call('get',KEYS[1])"+ ????????"local a=tonumber(count)"+ ????????"local b=tonumber(ARGV[1])"+ ????????"if a>=b then "+ ? ? ? ? "redis.call('set',KEYS[1],a-b)"+ ????????"return 1"+ ????????"end"+ ? ? ? ? "return 0"; Object obj=jedis.eval(script,Arrays.asList(product_stock_10016),Arrays.asList("10")); System.out.println(obj);
不要再Lua脚本中出现死循环和耗时的运算,否则redis会阻塞,将不接受其他命令。
redis是单线程、单线程执行脚本。管道不会阻塞redis.
为什么可以保证原子性——因为这里直接使用redis的操作命令,redis操作是原子性的。
SETNX可以用作加锁原语(locking primitive)。
如:SETNX lock.foo <current Unix time+lock timeout +1>
返回1:客户端获取锁成功,可通过DEL lock.foo释放锁
返回0:获取失败,有其他客户端加锁
????????如果是非阻塞锁(nonblocking lock),则返回调用,或进入一个重试循环,直到获取锁或重试超时。【超时时间是为了解决死锁问题】
Redis在这里加入的锁是可重入锁、异步回调的。
这里断的刷新过期时间,多线程不会死锁吗?这里的锁是可重入锁,异步回调
不断刷新过期时间,过期时间不会无限长吗?
若快过期了但是还没有执行完,则进入锁续命,延续过期时间。
每十秒续命以一次?
RedissonLock.class
redis单线程处理,相互等待,对于性能还是有影响的
数据异步,所以主从切换,如果锁是数据,就会可能由数据丢失导致锁丢失。
场景:主节点数据还没有同步给从节点,主节点挂了,从节点成为新的主节点,但丢失了部分数据。
zookeeper (偏向CP):一致性,锁同步一半才算成功——ZAB崩溃恢复,重新选举的机制,确保数据不丢失
redis (偏向AP):即想要redis高性能,又不想丢锁——redlock:一半加锁即为成功——数据一致性,和zookeeper底层相同——>redlock机制
CAP原理:
【大数据专题】大数据理论基础01之分布式CPA原理深入理解_分布式cpa理论-CSDN博客
RAFT算法:
Redis使用红锁来解决这个问题:只有当集群中有一半的节点加锁成功,就认为加锁成功。
红锁实现简单,但存在一些问题:
除此之外,还可以通过持久化来防止数据丢失:
持久化重启,锁丢失,没有一般加锁成功和解——每条命令持久化,但是性能差
通过修改配置打开AOF功能:
# appendonly yes
从现在开始,每执行一个改变数据集的操作,就被追加到AOF文件末尾。
redis重启时,通过重新执行AOF来重建数据集
配置redis多久将数据fsync到磁盘:
appendfsync always:????????每次有新命令就追加AOP文件,慢,但安全
appendfsync everysec:????????每秒fsync一次追加,快,其丢只丢一秒的数据【推荐,兼顾速度和安全】
appendfsync no:? ? ? ? 从不fsync,将数据交给系统处理,更快,但不安全。
4-5种优化方案:
将库存数据分成10端,性能提升10倍
token只能针对同一个页面,多个页面没用来说,不能避免这个问题。可以考虑分布式锁尝试解决。
电商场景可能存在的问题
2021年关于Redis最新的50道面试题(含答案)_以下提供多种 redis 优化的做法,错误的是哪个选项-CSDN博客