Redis全称为 Remote Dictionary Server,表示远程字典服务器,是跨平台的非关系型数据库。Redis 是一个开源的使用键值对(Key-Value)存储数据库,也是一种NoSQL数据库。
NoSQL,指的是非关系型的数据库。NoSQL有时也称作Not Only SQL的缩写,是对不同于传统的关系型数据库的数据库管理系统的统称。
NoSQL用于超大规模数据的存储。(例如谷歌或Facebook每天为他们的用户收集万亿比特的数据)。这些类型的数据存储不需要固定的模式,无需多余操作就可以横向扩展。
map存储数据是在 JVM 的内存中,但 JVM 的内存是有限的 不能大量的存储数据
map不能进行横向和纵向扩展, 但Redis可以,Redis支持集群模式
Redis与map一样,也是基于内存的(所有数据都是放到内存的) -- 速度也快,读的速度是110000次/s,写的速度是81000次/s 。
Redis支持多种持久化模式(把内存的数据 可以直接保存到硬盘)
Redis支持多种数据类型,字符串(String)、哈希(Hash)、列表(list)、集合(set)和有序集合(sorted set)
Redis支持多种数据的淘汰策略(LRU算法)
可以用于存储用户的登陆信息和认证信息(单点登录 SSO,在分布式系统中的任一一台服务器登录,访问其他的服务器均不需要再进行登录)
Redis中最大的功能可能就是做系统数据的缓存(这是这个时代的一个技术特点)
可以用来做秒杀系统
用来存储一些可以容忍丢失的数据 因为Redis本身是基于内存的,怎么都有可能存在数据丢失的风险,所以Redis的适用场景,一定是对数据要求 不严格的地方,比如:评论数、点赞数、最热商品
接口的防刷和限流
Redis 的客户端和服务端之间采取了一种名为 Redis序列化的协议(REdis Serialization Protocol,简称RESP),是基于 TCP 的应用层协议 ,RESP 底层采用的是 TCP 的连接方式,通过 TCP 进行数据传输,然后根据解析规则解析相应信息。
在RESP协议中,数据的类型取决于第一个字节:
+开始表示单行字符串
-开始表示错误类型
:开始表示整数
$开始表示多行字符串
*开始表示数组
在RESP协议中,构成协议的每一部分必须使用\r\n作为结束符
示例:
SET key value
*3\r\n #3表示这个命令由3部分组成
$3\r\n # 第一部分的长度是3
SET\r\n # 第一部分的内容
$3\r\n # 第二部分的长度是3
key\r\n # 第二部分的内容
$5\r\n # 第三部分的长度是5
value\r\n # 第三部分的内容
安装redis 及 redis 图形化界面
因为redis存储数据采用的是key-value进行存储,不管value是什么类型,针对的key操作都属于通用操作
del key #删除键
exists key #检测键是否存在,1-表示存在,0-表示不存在
expire key seconds #为键设置过期时间 单位是秒
pexpire key milliSeconds #为键设置过期时间 单位是毫秒
keys pattern #使用正则表达式查找键,尽量不要使用这个名来来查找,这个名来查找时可能会造成服务器卡顿
persist key #持久化键,过期时间就相当于没有了
ttl key #获取键的剩余过期时间 单位是秒
pttl key #获取键的剩余过期时间 单位是毫秒
type key #获取键的存储的值的类型
select 0~15 #选择操作的库
move key db #移动键到另外一个库中
flushdb #清空当前所在的数据库
flushall #清空全部数据库
dbsize #查看当前数据库中有多少个键
lastsave #查看最后一次操作的时间
monitor #实时监控Redis服务接收到的命令
set key value #设置键的值,如果存在就是修改,不存在就是增加
get key #获取键的值
mset key value[key value ...] #批量设置键的值
mget key [key ...] #批量获取键的值
setex key seconds value #设置键的值,同时设置键的过期时间
setnx key value #当键不存在时才设置键的值
incr key #将键存储的值增加1,只有存储数字的时候有效
incrby key increment #将键存储的值增加给定的增量,只有存储数字的时候有效
decr key #将键存储的值减去1,只有存储数字的时候有效
decrby key decrement #将键存储的值减少给定减量,只有存储数字的时候有效
append key value #当键存在且存储的值是一个字符串时,将值追加到存储的字符串的末尾
hset key field value #设置键存储的字段和值
hget key field #获取键存储的字段值
hmset key field value[field value ...] #批量设置键的存储的字段和值
hmget key [field ...] #批量获取键存储的字段值
hincrby key field increment #键存储的字段值自增1
hsetnx key field value #不存在字段就添加
hexists key field #检查键存储的字段是否存在
hdel key field [field ...] #批量删除键中存储的字段
hgetall key #获取当前键存储的字段和值
hkeys key #获取当前键存储的所有字段
hvals key #获取当前键存储的所有值
hlen key #获取当前键存储的字段数量
#存储数据(从左侧插入数据,从右侧插入数据)
lpush key value [value ...]
rpush key value [value ...]
lpushx key value #将一个值插入到已存在的列表头部
rpushx key value #为已存在的列表添加值
lset key index value #通过索引设置列表元素的值
#弹栈方式获取数据(左侧弹出数据,从右侧弹出数据)
lpop key
rpop key
lrange key start stop #获取列表指定范围内的元素 -1表示末尾
lindex key index #获取指定索引位置的数据
llen key #获取列表的长度
#删除列表中的数据(他是删除当前列表中的count个value值,count > 0从左侧向右侧删除,count < 0从右侧向左侧删除,count == 0删除列表中全部的value)
lrem key count value
#对一个列表进行修剪(trim),就是说,让列表只保留指定区间内的元素,不在指定区间之内的元素都将被删除。
ltrim key start stop
#将一个列表中最后的一个数据,插入到另外一个列表的头部位置
rpoplpush list1 list2
#存储数据
sadd key member [member ...]
#获取数据(获取全部数据)
smembers key
#随机获取一个数据(获取的同时,移除数据,count默认为1,代表弹出数据的数量)
spop key [count]
#交集(取多个set集合交集)
sinter set1 set2 ...
#并集(获取全部集合中的数据)
sunion set1 set2 ...
#差集(获取多个集合中不一样的数据)
sdiff set1 set2 ...
#删除数据
srem key member [member ...]
#查看当前的set集合中是否包含这个值
sismember key member
#添加数据(score必须是数值。member不允许重复的。)
zadd key score member [score member ...]
#修改member的分数(如果member是存在于key中的,正常增加分数,如果memeber不存在,这个命令就相当于zadd)
zincrby key increment member
#查看指定的member的分数
zscore key member
#获取zset中数据的数量
zcard key
#根据score的范围查询member数量
zcount key min max
#删除zset中的成员
zrem key member [member...]
#根据分数从小到大排序,获取指定范围内的数据(withscores如果添加这个参数,那么会返回member对应的分数)
zrange key start stop [withscores]
#根据分数从大到小排序,获取指定范围内的数据(withscores如果添加这个参数,那么会返回member对应的分数)
zrevrange key start stop [withscores]
#根据分数的返回去获取member(withscores代表同时返回score,添加limit,就和MySQL中一样,如果不希望等于min或者max的值被查询出来可以采用 ‘(分数’ 相当于 < 但是不等于的方式,最大值和最小值使用+inf和-inf来标识)
zrangebyscore key min max [withscores] [limit offset count]
#根据分数的返回去获取member(withscores代表同时返回score,添加limit,就和MySQL中一样)
zrevrangebyscore key max min [withscores] [limit offset count]
思考:在添加一个键的时候,需要判断该键是否存在,同时还需要设置该键的过期时间?
按照常规思维来说可以使用两步完成,setnx 命令进行判断 setex命令设置过期时间。但是这样做会存在一个问题,在极端情况下,Redis 执行完 setnx 命令后宕机了或者断电了,那么 setex 命令就没有得到执行,这个添加进去的键就没有过期时间。因此,在做这种操作的需要考虑的是操作的原子性。
<dependencies>
<!-- jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.20</version>
</dependency>
</dependencies>
public class Demo1 {
@Test
public void set(){
//1. 连接Redis
Jedis jedis = new Jedis("121.199.174.183",6379);
//2. 操作Redis - 因为Redis的命令是什么,Jedis的方法就是什么
jedis.set("name","李四");
//3. 释放资源
jedis.close();
}
@Test
public void get(){
//1. 连接Redis
Jedis jedis = new Jedis("121.199.174.183",6379);
//2. 操作Redis - 因为Redis的命令是什么,Jedis的方法就是什么
String value = jedis.get("name");
System.out.println(value);
//3. 释放资源
jedis.close();
}
}
SCAN操作可以根据提供的匹配方式和扫描数量来进行扫描,但是每次扫描的结果不一定与扫描数量匹配,只是返回一个在扫描数量范围左右的结果。可能比扫描数量多,也可能比扫描的数量少。
@Test
public void scan(){
Jedis jedis = new Jedis("121.199.174.183", 6379);
jedis.select(0);
jedis.flushDB();
jedis.mset("name", "张三", "age", "20", "address", "四川成都", "phone", "13612345678", "class", "计科1班");
//scan就是扫描 扫描需要配置扫描的条件 这个条件就是ScanParams
ScanParams params = new ScanParams().match("*a*").count(3);
//这个静态变量表示的就是光标开始的位置,默认值是0
String cursor = ScanParams.SCAN_POINTER_START;
do {
//扫描后会得到扫描的结果ScanResult
ScanResult<String> scanResult = jedis.scan(cursor, params);
List<String> result = scanResult.getResult();
result.forEach(System.out::println);
System.out.println("===================");
cursor = scanResult.getStringCursor();
//当光标位置再次归0 表示扫描完成
} while (!ScanParams.SCAN_POINTER_START.equals(cursor));
jedis.close();
}
@Test
public void hscan(){
Jedis jedis = new Jedis("121.199.174.183", 6379);
jedis.select(1);
jedis.flushDB();
Map<String, String> map = new HashMap<>();
map.put("name", "张三");
map.put("age", "20");
map.put("address", "四川成都");
map.put("phone", "13612345678");
map.put("class", "计科1班");
jedis.hmset("user", map);
ScanParams params = new ScanParams().match("a*").count(3);
String cursor = ScanParams.SCAN_POINTER_START;
do {
ScanResult<Map.Entry<String, String>> scanResult = jedis.hscan("user", cursor, params);
List<Map.Entry<String, String>> result = scanResult.getResult();
result.forEach(System.out::println);
System.out.println("===================");
cursor = scanResult.getStringCursor();
} while (!ScanParams.SCAN_POINTER_START.equals(cursor));
jedis.close();
}
@Test
public void sscan(){
Jedis jedis = new Jedis("121.199.174.183", 6379);
jedis.select(2);
jedis.flushDB();
jedis.sadd("infos", "bac","abc","cab","cba","acb");
ScanParams params = new ScanParams().match("a*").count(3);
String cursor = ScanParams.SCAN_POINTER_START;
do{
ScanResult<String> infos = jedis.sscan("infos", cursor, params);
List<String> result = infos.getResult();
result.forEach(System.out::println);
System.out.println("=====================");
cursor = infos.getStringCursor();
} while (!ScanParams.SCAN_POINTER_START.equals(cursor));
jedis.close();
}
@Test
public void zscan(){
Jedis jedis = new Jedis("121.199.174.183", 6379);
jedis.select(3);
jedis.flushDB();
Random r = new Random();
List<String> members = Arrays.asList("bac","abc","cab","cba","acb");
Map<String, Double> membersAndScores = new HashMap<>();
members.forEach(str->membersAndScores.put(str, (double)r.nextInt(100)));
jedis.zadd("infos", membersAndScores, ZAddParams.zAddParams().nx());
ScanParams params = new ScanParams().match("*a*").count(3);
String cursor = ScanParams.SCAN_POINTER_START;
do{
ScanResult<Tuple> infos = jedis.zscan("infos", cursor, params);
List<Tuple> result = infos.getResult();
result.forEach(t-> System.out.println(t.getElement() + "=>" + t.getScore()));
System.out.println("===============");
cursor = infos.getStringCursor();
}while (!ScanParams.SCAN_POINTER_START.equals(cursor));
jedis.close();
}
String: 点赞数、评论数、粉丝数、查询缓存等
Jedis jedis = new Jedis("127.0.0.1", 6379);
jedis.incr("comments");//将评论数增加1
Hash:购物车、抢购、限购、限量发放优惠券、激活码等
Jedis jedis = new Jedis("127.0.0.1", 6379);
//用户添加购物车时,先判断有无该商品的记录
if(jedis.hexists("zhangsan", "goods0001")){
//有记录就在原来的数量上增加
jedis.hincrBy("zhangsan", "goods0001", 3);
} else {
//没有记录就执行新增
jedis.hset("zhangsan", "goods001", "3");
}
Jedis jedis = new Jedis("127.0.0.1", 6379);
//参与抢购的商品编号和抢购的数量
Map<String,String> goods = new HashMap<>();
goods.put("goods0001", "100");
goods.put("goods0002", "100");
goods.put("goods0003", "100");
goods.put("goods0004", "100");
//p0001商家参与抢购的商品信息
jedis.hmset("p0001", goods);
//当用户抢购一件商品时,减少售卖数量
jedis.hincrBy("p0001", "goods0001", -1);
List: 消息队列、最新评论、最近回复等
//消息队列 可以简单的理解为存放消息的队列,既然是队列,就具有先进先出的特性
//使用redis来模拟这个特性就是消息队列
Jedis jedis = new Jedis("127.0.0.1", 6379);
//用户发出的第一条消息
jedis.lpush("message_queue", "第一条消息");
//用户发出的第二条消息
jedis.lpush("message_queue", "第二条消息");
//用户发出的第三条消息
jedis.lpush("message_queue", "第三条消息");
//服务器端只需要依次取就可以了
while (true){
//如果message_queue不存在,那么就阻塞等待1秒,如果还不存在,就抛出异常
List<String> messages = jedis.brpop(1000,"message_queue");
messages.forEach(System.out::println);
}
//消息队列 可以简单的理解为存放消息的队列,既然是队列,就具有先进先出的特性
//使用redis来模拟这个特性就是消息队列
Jedis jedis = new Jedis("127.0.0.1", 6379);
jedis.lpush("comments", "第一条评论");
jedis.lpush("comments", "第二条评论");
jedis.lpush("comments", "第三条评论");
jedis.lpush("comments", "第四条评论");
jedis.lpush("comments", "第五条评论");
jedis.lpush("comments", "第六条评论");
//展示评论列表,评论展示都是最后的评论在最前,取前5条展示
List<String> comments = jedis.lrange("comments", 0, 4);
comments.forEach(System.out::println);
Set:交集、差集、并集、黑白名单等。如好友、关注、粉丝、感兴趣的人集合
Jedis jedis = new Jedis("127.0.0.1", 6379);
//无效的IP地址列表,只要在集合中,就说明这些IP存在非法操作,也就是所谓的黑名单
jedis.sadd("invalidIpAddress", "10.121.11.76","10.121.11.85","10.121.11.32","10.121.11.21","10.121.11.54");
String currentIp = "10.121.11.85";
if(jedis.sismember("invalidIpAddress", currentIp)){
System.out.println("黑名单用户");
}
Jedis jedis = new Jedis("127.0.0.1", 6379);
//zhangsan和lisi好友列表
jedis.sadd("zhangsan", "lisi", "longhua", "ligang");
jedis.sadd("jinfeng", "longqiang", "ligang", "qiqi");
//求张三和李四两人共同的好友
Set<String> friends = jedis.sinter("zhangsan", "jinfeng");
System.out.println("============两人共同的好友==============");
friends.forEach(System.out::println);
Set<String> allFriends = jedis.sunion("zhangsan", "jinfeng");
System.out.println("============两人所有的好友==============");
allFriends.forEach(System.out::println);
Zset:延迟队列、排行榜、限流等
//所谓的延迟队列,就是指队列中的消息需要在给定的时间点执行,而这个时间点一定是在将来
Jedis jedis = new Jedis("127.0.0.1", 6379);
long time = System.currentTimeMillis();
//消息制作:消息的分数是一个时间戳,这样,根据分数来查就是根据时间戳查
jedis.zadd("delay_queue", time + 500, "500毫秒后消费的消息");
jedis.zadd("delay_queue", time + 1000, "1000毫秒后消费的消息");
jedis.zadd("delay_queue", time + 1500, "1500毫秒后消费的消息");
while (true){
long date = System.currentTimeMillis();
System.out.println("当前消费时间:" + date);
//按分数来查询,这里的分数用的是当前时间
Set<Tuple> messages = jedis.zrangeByScoreWithScores("delay_queue", 0, date);
messages.forEach(t-> {
System.out.println(t.getElement() + " => " + (long)t.getScore());
jedis.zrem("delay_queue", t.getElement());
});
Thread.sleep(100);
}
Jedis jedis = new Jedis("127.0.0.1", 6379);
Random r = new Random();
String[] names = {"AA", "BB","CC","DD","EE", "FF","GG"};
for(String name: names){
int score = r.nextInt(100000) + 10000;
System.out.println(name + " => " + score);
jedis.zadd("attack", score, name);
}
System.out.println("攻击排行榜");
Set<Tuple> attacks = jedis.zrevrangeWithScores("attack", 0, 4);
attacks.forEach(t -> System.out.println(t.getElement() + " => " + t.getScore()));
//假设每个用户5秒内访问了10次,则将被限流,后续请求将不再被处理
Jedis jedis = new Jedis("127.0.0.1", 6379);
Random r = new Random();
String[] names = {"AA", "BB","CC","DD","EE", "FF","GG"};
long currentTime = System.currentTimeMillis();
for(String name: names){
//使用随机数模拟访问次数
int times = r.nextInt(20);
System.out.println(name + " => 访问 " + times + "次");
for(int i=0; i<times; i++){
//这里考虑sorted set的原因是 key只存在一个,但可以有多个得分,这里使用的是时间来作为得分,那么统计的时候就
//只需要统计5秒内的成员数量,这样就能得到5秒内的访问次数。虽然使用hash也能得到这样的访问次数,但是hash在移出
//过期的记录时没有规则,sorted set可以根据得分范围来进行移出,也就是可以通过时间范围来进行移出,这样可以避免
//记录无限增长的情况
jedis.zadd(name, currentTime- r.nextInt(5000), name + System.nanoTime());
}
jedis.zremrangeByScore(name, 0, currentTime - 5000);//移出前5秒的访问记录
}
System.out.println("=========================================");
for(String name: names){
long time = System.currentTimeMillis();
Long count = jedis.zreverserangeByScore(name, time - 5000, time);//统计5秒内访问次数
if(count >= 10){
System.out.println(name + " => 被限流了");
} else {
System.out.println(name + " => 当前访问次数为" + count + ",可以继续访问");
}
}
说到连接池,大家自然而然会想到我们之前学习过的数据源连接池,其作用就是为了避免频繁的创建和关闭连接而带来的性能开销。使用Redis连接池操作也是一样的道理。
@Test
public void jedisPoolTest(){
//JedisPool pool = new JedisPool("121.199.174.183", 6379);
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setMaxTotal(100); //连接池中最大活跃数
poolConfig.setMaxIdle(10); //最大空闲数
poolConfig.setMinIdle(5); //最小空闲数
poolConfig.setMaxWaitMillis(2000);//连接池空了后,2秒内未获取jedis就超时
JedisPool pool = new JedisPool(poolConfig,"121.199.174.183", 6379);
Jedis jedis = pool.getResource(); //获取资源,JedisPool池中的资源就是Jedis
jedis.set("test", "连接池");
jedis.close();
pool.close();
}
在操作Redis的时候,执行一个命令需要先发送请求到Redis服务器,这个过程需要经历网络的延迟,Redis 还需要给客户端一个响应。如果需要一次性执行多个命令,这种方式的效率就很低下了。为了解决这个问题,可以通过Redis的管道,先将命令放到客户端的一个Pipeline中,之后一次性的将全部命令都发送到Redis服务,Redis服务一次性的将全部的返回结果响应给客户端。
@Test
public void jedisPipeLineTest(){
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setMaxTotal(100); //连接池中最大活跃数
poolConfig.setMaxIdle(10); //最大空闲数
poolConfig.setMinIdle(5); //最小空闲数
poolConfig.setMaxWaitMillis(2000);//连接池空了后,2秒内未获取jedis就超时
JedisPool pool = new JedisPool(poolConfig,"121.199.174.183", 6379);
Jedis jedis = pool.getResource(); //获取资源,JedisPool池中的资源就是Jedis
Pipeline pipeline = jedis.pipelined();
pipeline.set("name","张三");
pipeline.set("sex","男");
pipeline.set("age","20");
List<Object> results = pipeline.syncAndReturnAll();
results.forEach(System.out::println);
jedis.close();
pool.close();
}
Redis 事务的实现需要使用到watch监听机制,在开启事务之前,先使用watch监听器机制将涉及到的相关键进行监听。在开启事务后,如果有其他操作对事务涉及到的键进行了修改,则事务自动取消。如果事务执行完成或者事务被取消,则watch监听自动消除。
开启事务命令:multi
输入要执行的命令:被放入到一个队列中
执行事务命令:exec
取消事务命令:discard
Redis事务的原理: 在开启事务后,会执行一系列的命令,但这些命令并非真正执行,而是将这些命令放在一个队列中。如果执行事务,那么这个队列中的命令全部执行,如果取消了事务,这个队列中的命令全部作废
@Test
public void jedisTransactionTest() throws InterruptedException {
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxTotal(100);
config.setMaxIdle(10);
config.setMinIdle(5);
config.setMaxWaitMillis(2000);
JedisPool pool = new JedisPool(config, "121.199.174.183", 6379);
Jedis jedis = pool.getResource();
jedis.set("name", "张三");
jedis.set("age", "20");
jedis.watch("name"); //监听 键name 一旦name键被其他的操作修改,该键上的事务自动取消
Thread.sleep(10000L);//睡眠过程中可以修改name和age的值,这样可以使得watch监听自动消除,事务从而取消
Transaction transaction = jedis.multi();//开启事务
transaction.set("name", "李四");
transaction.set("age", "25");
List<Object> objects = transaction.exec();//执行事务
objects.forEach(System.out::println);
jedis.close();
pool.close();
}
在安装目录下找到 redis.windows.conf
文件,这个文件就是 Redis 的配置文件。可以在该配置文件中配置 Redis 密码。
requirepass 密码
在 windows 下,密码生效首先需要关闭 Redis 服务, 然后在命令行中进入 Redis 安装目录,然后执行命令
redis-service.exe redis.windows.conf
redis-cli 连接 Redis: 首先需要执行命令
auth 密码
Jedis 连接 Redis:
Jedis jedis = new Jedis("121.199.174.183", 6379);
jedis.auth(密码);
JedisPool 连接 Redis:
JedisPool pool = new JedisPool(config, "121.199.174.183", 6379, 2000, 密码);
Redis 提供了三种持久化机制: RDB 、 AOF(Append Only File)和混合持久化。其中 RDB 是 Redis 的默认持久化机制。 AOF 默认情况下是关闭的。
RDB
RDB持久化文件存储的是一个二进制的文件,速度比较快,传输也很方便。RDB持久化是通过快照(snapshotting)完成的,当符合一定条件时Redis会自动将内存中的所有数据进行快照并存储在硬盘上。进行快照的条件可以由用户在配置文件中自定义,由两个参数构成:时间(秒)和改动的键的个数。当在指定的时间内被更改的键的个数大于指定的数值时就会进行快照。RDB是redis默认采用的持久化方式,在配置文件中已经预置了3个条件:
save 900 1
save 300 10
save 60 10000
save 900 1
表示在900秒内,有1个key改变了,就执行RDB持久化。
save 300 10
表示在300秒内,有10个key改变了,就执行RDB持久化。
save 60 10000
表示在60秒内,有10000个key改变了,就执行RDB持久化。
由此可以看出,RDB 是在一个时间范围内触发事件才进行持久化,如果在未触发事件的情况下, Redis 宕机或者断电了,将会造成数据丢失。因此无法保证数据的绝对安全
RDB模式如何进行数据同步呢?
RDB模式下,首先清除原来rdb文件中的所有内容,清空内容之后,再将内存所有的数据同步到rdb文件中, 但这可能造成所有的数据都丢失
RDB模式缺点很明显,为什么还要使用RDB呢?
因为在生产环境中,Redis服务器是不可能一直运行,那么就可能出现宕机、重启等情况。这时就需要做数据同步,数据同步时,主要考虑的点是恢复速度。而 RDB 保存数据采用的二进制,在数据恢复的时候,识别速度快,这也是为什么要使用RDB的原因。
AOF
AOF持久化文件存储的是一个文本文件,速度相对 RDB 较慢,到了后期文件会比较大,传输较为困难。开启AOF持久化后每执行一条会更改Redis中的数据的命令,Redis就会将该命令写入硬盘中的AOF文件。在 Redis 中有 AOF 的配置项:
appendonly yes # 开启AOF持久化
appendfilename "appendonly.aof" # AOF持久化文件
#appendfsync always
appendfsync everysec
#appendfsync no
appendfsync always
表示每执行一个写操作,立即持久化到AOF文件中,性能比较低。 appendfsync everysec
表示每秒执行一次持久化,在开发时,综合考虑使用这种方案。 appendfsync no
表示不同步,数据只保存在内存中,操作系统需要时刷新数据即可。
aof-use-rdb-preamble no # 关闭混合持久化
AOF 保存文件时,如果执行相同的命令,AOF文件会记录多次,并没有优化,如果需要优化,则需要手动使用bgrewriteaof
命令来优化。但手动重写很显然不能满足生产环境的需要,因此,redis提供了如下配置来实现:
auto-aof-rewrite-percentage 100 # 触发的条件:下面设置的文件大小增加100%,也就是1倍,才会重写
auto-aof-rewrite-min-size 64mb # aof文件大小达到64M才可能重写记录的命令
由此可以看出,AOF 相对于 RDB 来说更为安全。因此,官方推荐同时开启 RDB 和 AOF。
思考:Redis 同时开启 RDB 和 AOF, 它们之间如何协作的?
aof-use-rdb-preamble yes # 开启混合持久化,需要开启aof才有效
混合持久化实际上就是每一次aof被优化的时候,都会将原来的 aof 变成 rdb,再向aof中写内容的时候 依然是 aof,这样在同一个持久化文件中 既有 aof,又有 rdb,这就是混合持久化
Redis服务器的内存是有限的,硬件一旦确定了,内存大小也就确定了。当不停的向内存中写数据时,就会出现内存写满的情况,此时,就需要使用到Redis的数据淘汰策略了。Redis中设计了多种数据的淘汰策略,详情如下:
volatile主要针对的是设置了过期时间的key,如果需要淘汰redis中的数据,那么这些设置了过期时间的key优先被淘汰,如果空间依然不足,那么才会在没有设置过期时间的key进行淘汰。
allkeys主要针对的是所有的键,不管有没有设置过期时间
LFU:根据访问频率来决定淘汰哪一个key,访问频率指的是在一定时间范围内的访问次数。
# 优先淘汰掉设置了过期时间的key,然后才淘汰掉使用的比较少的key,假设key没有设置过期时间,那么不会优先淘
# 汰,这种模式也是在开发中使用的比较多的一种缓存策略模式
#volatile-lru -> Evict using approximated LRU among the keys with an expire set.
# 对所有key通用,优先删除最近最少使用的Key
# allkeys-lru -> Evict any key using approximated LRU.
# Redis中存储的每一个key都有一个内部时钟,当key使用频率高时,内部时钟会递增,当key使用频率低时,内部时钟会
# 递减,该策略是淘汰设置了过期时间且内部时钟最小的key
# volatile-lfu -> Evict using approximated LFU among the keys with an expire set.
# Redis中存储的每一个key都有一个内部时钟,当key使用频率高时,内部时钟会递增,当key使用频率低时,内部时钟会
# 递减,该策略是淘汰所有key中内部时钟最小的key
# allkeys-lfu -> Evict any key using approximated LFU.
# 随机淘汰具有过期时间的key
# volatile-random -> Remove a random key among the ones with an expire set.
# 淘汰的是随机的key
# allkeys-random -> Remove a random key, any key.
# 根据key剩余的过期时间(ttl值)来进行数据淘汰, ttl值越小,越先被淘汰
# volatile-ttl -> Remove the key with the nearest expire time (minor TTL)
# 只要缓存满了,就不继续服务器里面的写请求,读请求是可以完成的,这种模式缓存里面的所有数据都不会丢失,但会导致
# 参与Redis的业务会失败
# noeviction -> Don't evict anything, just return an error on write operations.
maxmemory-policy volatile-lru #这个就是配置缓存的淘汰策略的
maxmemory <bytes> #这个是配置Redis的缓存的大小