为了演示多级缓存,这里先导入一个商品管理的案例,其中包含商品的CRUD功能。将来会给查询商品添加多级缓存。
后期做数据同步需要用到MySQL的主从功能,所以需要在虚拟机中,利用Docker来运行一个MySQL容器。
为了方便后期配置MySQL,我们先准备两个目录,用于挂载容器的数据和配置文件目录:
# 进入/docker_volume目录
cd /docker_volume
# 创建文件夹
mkdir mysql
# 进入mysql目录
cd mysql
进入mysql目录后,执行下面的Docker命令:
这里mysql容器版本需要自己根据自己的容器版本准备
docker run \
-p 3306:3306 \
--name mysql \
-v $PWD/conf:/etc/mysql/conf.d \
-v $PWD/logs:/logs \
-v $PWD/data:/var/lib/mysql \
-e MYSQL_ROOT_PASSWORD=123 \
--privileged \
-itd \
mysql:5.7.25
在/docker_volume/mysql/conf目录添加一个my.cnf文件,作为mysql的配置文件: ?
# 创建文件
touch /docker_volume/mysql/conf/my.cnf
?文件的内容如下:
[mysqld]
skip-name-resolve
character_set_server=utf8
datadir=/var/lib/mysql
server-id=1000
docker restart mysql
利用课前资料里面的sql文件,在idea里面连接mysql进行导入.
其中包含两张表:
tb_item:商品表,包含商品的基本信息
tb_item_stock:商品库存表,包含商品的库存信息
之所以将库存分离出来,是因为库存是更新比较频繁的信息,写操作较多。而其他信息修改的频率非常低。
使用给的资料里的工程进行导入.
?
?
?完整内容如下
#user nobody;
worker_processes 1;
events {
worker_connections 1024;
}
http {
include mime.types;
default_type application/octet-stream;
sendfile on;
#tcp_nopush on;
keepalive_timeout 65;
# nginx的业务集群,nginx本地缓存,redis缓存,tomcat查询
upstream nginx-cluster{
server 192.168.150.101:8081;
}
server {
listen 80;
server_name localhost;
location /api {
proxy_pass http://nginx-cluster;
}
location / {
root html;
index index.html index.htm;
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
}
}
分布式缓存和进程本地缓存的对比,进程缓存只能在本地,不能和别的tomcat共享。
示例
基于容量清理的是基于LRU策略,最近最少使用的。
/*
基于大小设置驱逐策略:
*/
@Test
void testEvictByNum() throws InterruptedException {
// 创建缓存对象
Cache<String, String> cache = Caffeine.newBuilder()
// 设置缓存大小上限为 1
.maximumSize(1)
.build();
// 存数据
cache.put("gf1", "柳岩");
cache.put("gf2", "范冰冰");
cache.put("gf3", "迪丽热巴");
// 延迟10ms,给清理线程一点时间
Thread.sleep(10L);
// 获取数据
System.out.println("gf1: " + cache.getIfPresent("gf1"));
System.out.println("gf2: " + cache.getIfPresent("gf2"));
System.out.println("gf3: " + cache.getIfPresent("gf3"));
}
/*
基于时间设置驱逐策略:
*/
@Test
void testEvictByTime() throws InterruptedException {
// 创建缓存对象
Cache<String, String> cache = Caffeine.newBuilder()
.expireAfterWrite(Duration.ofSeconds(1)) // 设置缓存有效期为 10 秒
.build();
// 存数据
cache.put("gf", "柳岩");
// 获取数据
System.out.println("gf: " + cache.getIfPresent("gf"));
// 休眠一会儿
Thread.sleep(1200L);
System.out.println("gf: " + cache.getIfPresent("gf"));
}
?
?这里的本地缓存真实点的场景是存点什么,商品数据这样存那么多机器很容易就遇到不一致了.
准备两个配置类
@Configuration
public class CaffeineConfig {
@Bean
public Cache<Long, Item> itemCache(){
return Caffeine.newBuilder()
.initialCapacity(100)
.maximumSize(10_000)
.build();
}
@Bean
public Cache<Long, ItemStock> stockCache(){
return Caffeine.newBuilder()
.initialCapacity(100)
.maximumSize(10_000)
.build();
}
}
?改造业务代码,这里要注入两个bean和使用现成的api在查询数据库前先查询本地缓存。
@Autowired
private Cache<Long,Item> itemCache;
@Autowired
private Cache<Long,ItemStock> stockCache;
@GetMapping("/{id}")
public Item findById(@PathVariable("id") Long id){
return itemCache.get(id,key->itemService.query() //这里key就是id,因为lamda表达式需要重新命名,不然会冲突。
.ne("status",3).eq("id",key)
.one()
);
}
@GetMapping("/stock/{id}")
public ItemStock findStockById(@PathVariable("id") Long id){
return stockCache.get(id,key->stockService.getById(key));
}
测试
第一次查询可以看见有sql语句查询了数据库
第二次查询就没有查询数据库了?,控制台一篇空白
要配置查询nginx缓存需要使用lua语言。
这玩意可以写外挂脚本,再配合修改器使用。并且redis里面也是支持lua的。
在ubuntu里面需要先安装Lua环境。?
sudo apt install Lua5.1
拉取镜像
docker pull openresty/openresty
启动
docker run --name openresty -p 80:80 -d openresty/openresty
复制配置文件
1.创建宿主机目录
mkdir /usr/local/openresty
cd /usr/local/openresty
# 存放nginx的配置文件
mkdir conf
# 存放lua脚本
mkdir lua
2、拷贝容器中nginx配置文件到宿主机目录
docker cp openresty:/usr/local/openresty/nginx/conf/nginx.conf /usr/local/openresty/conf/
# 拷贝lua库
docker cp openresty:/usr/local/openresty/lualib /usr/local/openresty/
删除容器,启动新容器
### 删除 openresty 容器
docker rm -f openresty
### 配置启动 openresty,配置自动启动
docker run -p 80:80 -p 8081:8081 \
--name openresty --restart always \
-v /usr/local/openresty/conf/nginx.conf:/usr/local/openresty/nginx/conf/nginx.conf \
-v /etc/localtime:/etc/localtime \
openresty/openresty
# 或者修改启动端口,去掉自动启动,增加lua脚本映射目录
docker run --name openresty \
-v /usr/local/openresty/conf/nginx.conf:/usr/local/openresty/nginx/conf/nginx.conf \
-v /usr/local/openresty/lua/:/usr/local/openresty/nginx/lua \
-v /usr/local/openresty/lualib/:/usr/local/openresty/lualib \
-p 80:80 -p 8081:8081 -d openresty/openresty
?然后访问虚拟机的ip可以得到如下页面
nginx的默认配置文件注释太多,影响后续我们的编辑,这里将nginx.conf中的注释部分删除,保留有效部分。
修改`/usr/local/openresty/conf/nginx.conf`文件,内容如下:
#user nobody;
worker_processes 1;
error_log logs/error.log;
events {
worker_connections 1024;
}
http {
include mime.types;
default_type application/octet-stream;
sendfile on;
keepalive_timeout 65;
server {
listen 8081;
server_name localhost;
location / {
root html;
index index.html index.htm;
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
}
}
上面已经配置好了?
#lua 模块
lua_package_path "/usr/local/openresty/lualib/?.lua;;";
#c模块
lua_package_cpath "/usr/local/openresty/lualib/?.so;;";
location /api/item {
# 默认的响应类型
default_type application/json;
# 响应结果有lua/item.lua文件来决定
content_by_lua_file lua/item.lua;
}
?上面创建的时候已经创建过了,所以这里不用再创建了。
?测试成功
最终的配置文件
#user nobody;
worker_processes 1;
error_log logs/error.log;
events {
worker_connections 1024;
}
http {
include mime.types;
default_type application/octet-stream;
sendfile on;
keepalive_timeout 65;
underscores_in_headers on;#表示如果header name中包含下划线,则不忽略
#lua 模块
lua_package_path "/usr/local/openresty/lualib/?.lua;;";
#c模块
lua_package_cpath "/usr/local/openresty/lualib/?.so;;";
server {
listen 8081;
server_name localhost;
location /api/item {
#默认的响应类型
default_type application/json;
#响应结果由lua/item.lua文件决定
content_by_lua_file lua/item.lua;
}
location / {
root html;
index index.html index.htm;
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
}
}
?在配置文件里面修改如下
location ~ /api/item/(\d+)
然后修改lua文件
-- 获取路径参数
local id = ngx.var[1]
-- 返回结果
ngx.say('{"id":'..id..',"name":"SALSA AIR","title":"RIMOWA 29寸托运箱拉杆箱 SALSA AIR垃圾蛇 820.70.36.4","price":17900,"image":"https://m.360buyimg.com/mobilecms/s720x720_jfs/t6934/364/1195375010/84676/e9f2c55f/597ece38N0ddcbc77.jpg!q70.jpg.webp","category":"拉杆箱","brand":"RIMOWA","spec":"","status":1,"createTime":"2019-04-30T16:00:00.000+00:00","updateTime":"2019-04-30T16:00:00.000+00:00","stock":2999,"sold":31290}')
重新加载配置文件后?
测试成功,传过去的参数成功传递回来。
这里缓存的数据都要先查询tomcat获取,然后才能保存在缓存当中。这里openResty和tomcat不在同一个地址,windows电脑地址只要把虚拟机地址的最后一位改成1就一定是windows电脑的地址.
?这里内部发送的请求会被nginx自己捕获,然后要让nginx再次反向代理到tomcat所在ip和端口.
?????????
前面已经封装好了一个查询工具类。
这里要修改item.lua将请求转到common.lua.
-- 导入common函数库
local common = require('common')
local read_http = common.read_http
-- 获取路径参数
local id = ngx.var[1]
-- 查询商品信息
local itemJSON = read_http("/item/"..id,nil)
--查询库存信息
local stockJSON = read_http("/item/stock/"..id,nil)
-- 返回结果
ngx.say(itemJSON)
?tmd,终于成功了.
虽然现在数据不全,接下来修改item.lua进行数据的拼接
-- 导入common函数库
local common = require('common')
local read_http = common.read_http
--导入cjson库
local cjson= require('cjson')
-- 获取路径参数
local id = ngx.var[1]
-- 查询商品信息
local itemJSON = read_http("/item/"..id,nil)
--查询库存信息
local stockJSON = read_http("/item/stock/"..id,nil)
--JSON转化为lua的table
local item = cjson.decode(itemJSON)
local stock = cjson.decode(stockJSON)
--组合数据
item.stock = stock.stock
item.sold = stock.sold
-- 把item序列化为json返回结果
ngx.say(cjson.encode(item))
然后现在库存也可以正常显示了.?
这里会有个问题,假如一个数据保存在8081的缓存里了,但是下一次访问到8082时就无法命中缓存。所以这里需要让同一个id每次都指向同一台tomcat。需要修改nginx的负载均衡算法。
#user nobody;
worker_processes 1;
error_log logs/error.log;
events {
worker_connections 1024;
}
http {
include mime.types;
default_type application/octet-stream;
sendfile on;
keepalive_timeout 65;
#lua 模块
lua_package_path "/usr/local/openresty/lualib/?.lua;;";
#c模块
lua_package_cpath "/usr/local/openresty/lualib/?.so;;";
upstream tomcat-cluster{
hash $request_uri;
server 192.168.241.1:8081;
server 192.168.241.1:8082;
}
server {
listen 8081;
server_name localhost;
location /item {
proxy_pass http://tomcat-cluster;
}
location ~ /api/item/(\d+) {
#默认的响应类型
default_type application/json;
#响应结果由lua/item.lua文件决定
content_by_lua_file lua/item.lua;
}
location / {
root html;
index index.html index.htm;
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
}
}
然后启动两台tomcat机器。
成功实现根据哈希值进行负债均很。
?
@Component
public class RedisHandler implements InitializingBean {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private IItemService itemService;
@Autowired
private IItemStockService stockService;
private static final ObjectMapper MAPPER=new ObjectMapper();
@Override
public void afterPropertiesSet() throws Exception {
//初始化缓存
//1.查询商品信息
List<Item> list = itemService.list();
//2.放入缓存
for(Item item:list){
//2.1item序列化为JSON
String json = MAPPER.writeValueAsString(item);
//2.2存入redis
redisTemplate.opsForValue().set("item:id:"+item.getId(),json);
}
//3.查询商品库存信息
List<ItemStock> stockList = stockService.list();
//4.放入缓存
for(ItemStock itemStock:stockList){
//2.1item序列化为JSON
String json = MAPPER.writeValueAsString(itemStock);
//2.2存入redis
redisTemplate.opsForValue().set("item:stock:id:"+itemStock.getId(),json);
}
}
}
?成功实现缓存预热
?最终common.lua变成如下,
有密码的要在获取一个连接成功之后确认密码。
-- 导入redis
local redis = require('resty.redis')
-- 初始化redis
local red = redis:new()
red:set_timeouts(1000,1000,1000)
-- 关闭redis连接的工具方法,其实是放入连接池
local function close_redis(red)
local pool_max_idle_time = 10000 -- 连接的空闲时间,单位是毫秒
local pool_size = 100 --连接池大小
local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
if not ok then
ngx.log(ngx.ERR, "放入redis连接池失败: ", err)
end
end
-- 查询redis的方法 ip和port是redis地址,key是查询的key
local function read_redis(ip, port, key)
-- 获取一个连接
local ok, err = red:connect(ip, port)
if not ok then
ngx.log(ngx.ERR, "连接redis失败 : ", err)
return nil
end
red:auth(password)
-- 查询redis
local resp, err = red:get(key)
-- 查询失败处理
if not resp then
ngx.log(ngx.ERR, "查询Redis失败: ", err, ", key = " , key)
end
--得到的数据为空处理
if resp == ngx.null then
resp = nil
ngx.log(ngx.ERR, "查询Redis数据为空, key = ", key)
end
close_redis(red)
return resp
end
-- 封装函数,发送http请求,并解析响应
local function read_http(path, params)
local resp = ngx.location.capture(path, {
method = ngx.HTTP_GET,
args = params,
})
if not resp then
-- 记录错误信息,返回404
ngx.log(ngx.ERR, "http查询失败, path: ", path, ", args: ", args)
ngx.exit(404)
end
return resp.body
end
-- 将方法导出
local _M = {
read_http = read_http,
read_redis = read_redis
}
return _M
-- 导入common函数库
local common = require('common')
local read_http = common.read_http
local read_redis = common.read_redis
--导入cjson库
local cjson= require('cjson')
-- 封装查询函数
function read_data(key,path,params)
--查询redis
local resp = read_redis("8.134.198.34",6379,key)
--判断查询结果
if not resp then
ngx.log("redis查询失败,尝试去查询http,key:",key)
--redis 查询失败,去查询http
resp = read_http(path,params)
end
return resp
end
-- 获取路径参数
local id = ngx.var[1]
-- 查询商品信息
local itemJSON = read_data("item:id:"..id,"/item/"..id,nil)
--查询库存信息
local stockJSON = read_data("item:stock:id:"..id,"/item/stock/"..id,nil)
--JSON转化为lua的table
local item = cjson.decode(itemJSON)
local stock = cjson.decode(stockJSON)
--组合数据
item.stock = stock.stock
item.sold = stock.sold
-- 把item序列化为json返回结果
ngx.say(cjson.encode(item))
成功实现关了后端后从自从redis缓存查数据
?
# 共享字典,也就是本地缓存,名称叫做:item_cache,大小150m
lua_shared_dict item_cache 150m;?
?
?
成功item.lua代码
-- 导入common函数库
local common = require('common')
local read_http = common.read_http
local read_redis = common.read_redis
--导入cjson库
local cjson= require('cjson')
--导入共享词典,本地缓存
local item_cache = ngx.shared.item_cache
-- 封装查询函数
function read_data(key,path,params)
-- 查询本地缓存
local val = item_cache:get(key)
if not val then
ngx.log(ngx.ERR,"本地缓存查询失败,尝试去查询redis,key:",key)
--查询redis
val = read_redis("127.0.0.1",6379,key)
--判断查询结果
if not val then
ngx.log(ngx.ERR,"redis查询失败,尝试去查询http,key:",key)
--redis 查询失败,去查询http
val = read_http(path,params)
end
end
-- 查询成功,把数据写入本地缓存
item_cache:set(key,val,expire)
-- 返回数据
return val
end
-- 获取路径参数
local id = ngx.var[1]
-- 查询商品信息
local itemJSON = read_data("item:id:"..id,1800,"/item/"..id,nil)
--查询库存信息
local stockJSON = read_data("item:stock:id:"..id,60,"/item/stock/"..id,nil)
--JSON转化为lua的table
local item = cjson.decode(itemJSON)
local stock = cjson.decode(stockJSON)
--组合数据
item.stock = stock.stock
item.sold = stock.sold
-- 把item序列化为json返回结果
ngx.say(cjson.encode(item))
?
Canal是基于MySQL的主从同步功能,因此必须先开启MySQL的主从功能才可以。
打开mysql容器挂载的日志文件my.cnf,我的在/docker_volume/mysql/conf
目录:
添加如下内容:
log-bin=/var/lib/mysql/mysql-bin
binlog-do-db=heima
配置解读:
log-bin=/var/lib/mysql/mysql-bin
:设置binary log文件的存放地址和文件名,叫做mysql-bin
binlog-do-db=heima
:指定对哪个database记录binary log events,这里记录heima这个库
最终文件
[mysqld]
skip-name-resolve
character_set_server=utf8
datadir=/var/lib/mysql
bind-address = 0.0.0.0
server-id=1000
log-bin=/var/lib/mysql/mysql-bin
binlog-do-db=heima
接下来添加一个仅用于数据同步的账户,出于安全考虑,这里仅提供对heima这个库的操作权限。
create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%' identified by 'canal';
FLUSH PRIVILEGES;
重启之后测试设置是否成功:在mysql控制台,或者Navicat中,输入命令:
show master status;
我们需要创建一个网络,将MySQL、Canal、MQ放到同一个Docker网络中:
docker network create heima
让mysql加入这个网络: ?
docker network connect heima mysql
拉取Canal镜像
docker pull canal/canal-server:v1.1.5
创建容器
docker run -p 11111:11111 --name canal \
-e canal.destinations=heima \
-e canal.instance.master.address=mysql:3306 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=canal \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false \
-e canal.instance.filter.regex=heima\\..* \
--network heima \
-d canal/canal-server:v1.1.5
?配置说明
-p 11111:11111
:这是canal的默认监听端口
-e canal.instance.master.address=mysql:3306
:数据库地址和端口,如果不知道mysql容器地址,可以通过docker inspect 容器id
来查看
-e canal.instance.dbUsername=canal
:数据库用户名
-e canal.instance.dbPassword=canal
:数据库密码
-e canal.instance.filter.regex=
:要监听的表名称,上面是监听了heima库下的所有表。
表名称监听支持的语法:
mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
常见例子:
1. 所有表:.* or .*\\..*
2. canal schema下所有表: canal\\..*
3. canal下的以canal打头的表:canal\\.canal.*
4. canal schema下的一张表:canal.test1
5. 多个规则组合使用然后以逗号隔开:canal\\..*,mysql.test1,mysql.test2
使用docker logs -f canal查看运行日志。?
查看canal运行日志
docker exec -it canal bash
tail -f canal-server/logs/canal/canal.log
tail -f canal-server/logs/heima/heima.log
?
?
?
@Data
@TableName("tb_item")
public class Item {
@TableId(type = IdType.AUTO)
@Id
private Long id;//商品id
private String name;//商品名称
private String title;//商品标题
private Long price;//价格(分)
private String image;//商品图片
private String category;//分类名称
private String brand;//品牌名称
private String spec;//规格
private Integer status;//商品状态 1-正常,2-下架
private Date createTime;//创建时间
private Date updateTime;//更新时间
@TableField(exist = false)
@Transient
private Integer stock;
@TableField(exist = false)
@Transient
private Integer sold;
}
?在redisHandler中增加两个方法
public void saveItem(Item item) {
try {
String json = MAPPER.writeValueAsString(item);
redisTemplate.opsForValue().set("item:id:"+item.getId(),json);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
public void deleteItemById(Long id){
redisTemplate.delete("item:id:"+id);
}
@CanalTable("tb_item")
@Component
public class ItemHandler implements EntryHandler<Item> {
@Autowired
private RedisHandler redisHandler;
@Autowired
private Cache<Long,Item> itemCache;
@Override
public void insert(Item item) {
//写数据到JVM缓存
itemCache.put(item.getId(),item);
//写数据到redis
redisHandler.saveItem(item);
}
@Override
public void update(Item before, Item after) {
//写数据到JVM缓存
itemCache.put(after.getId(),after);
//写数据到redis
redisHandler.saveItem(after);
}
@Override
public void delete(Item item) {
//删除数据到JVM缓存
itemCache.invalidate(item.getId());
//删除数据到redis
redisHandler.deleteItemById(item.getId());
}
}
用已经准备好的静态资源页面
修改之后可以看见控制台输出
?到redis里面也可以看见修改后的数据