??经过上个章节的学习,我们已经实现了一致性哈希算法,这个算法保证我们可以在节点发生变动时,最少的key请求受到影响,并返回这个节点的名称;这很大程度上避免了哈希雪崩和哈希穿透的问题。这个章节我们要基于此实现完整的服务器端在处理客户端请求时,内部如何进行选择节点,并从此节点中找到key-value。
前文链接
手撕分布式缓存之一 | 定义缓存结构体与实现底层功能函数
手撕分布式缓存之二 | 互斥锁的优化
手撕分布式缓存之三 | HTTP Server搭建
手撕分布式缓存之四 | 多节点的调取策略
由于战线拉的太长了,导致后面几个章节有点失去了热情,因此就不复现代码了,采用人工理解+AI注释的方式记录
??当我们有多个缓存节点时,请求key发送时应该发送给谁,例如Redis这种分布式缓存采用的方法均是:客户端发送请求时是随机发送的;接收的服务端也不一定就存有这个key-value,但他会先检查本地的缓存是否存有这个key-value,如果没有,服务器端会通过一致性hash算法计算应该去哪个节点上去查询,并去调用对应服务器端的查询接口,获取到数据后统一返回给客户端,不再让客户端去调取。
??当新的节点加入后,需要广播通知其他节点自己的存在,如果是非主从复制节点关系的情况下,由于一致性hash算法,原本需要查看节点B才可以获取到的key-value现在需要通过新增的节点A去获取,如果当时节点B的压力过大,那么可以在请求查询B查询不到时,通过节点B获取的key-value逐步的存储在节点A,以实现符合一致性hash的预期;如果当时节点B的压力并不大,那么可以直接通过计算,查询出节点B的哪些key现在会被定位到节点A,由节点B主动的将数据同步给节点A。如果是删除节点也是同理。
package geecache
import (
"errors"
"fmt"
"net/http"
"strings"
"github.com/gorilla/mux"
)
// httpGetter 结构体实现了 PeerGetter 接口,并使用 HTTP GET 方法从指定 URL 检索数据。
//如果没有错误,则返回字节数组;否则返回错误。
type httpGetter struct {
baseURL string // baseURL 是该对等机的基本 URL(协议 + IP地址 + 端口)
}
func (h *httpGetter) Get(key string) ([]byte, error) {
resp, err := http.Get(h.baseURL + "/cache/" + key) // 向给定的 URL 发出 GET 请求
if err != nil {
return nil, fmt.Errorf("HTTPPool: Error fetching %s from peer: %v", key, err)
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusNotFound {
return nil, errors.New("HTTPPool: Key not found")
} else if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("HTTPPool: Peer returned HTTP status code %d", resp.StatusCode)
}
body, err := io.ReadAll(resp.Body) // 从响应中读取数据
if err != nil {
return nil, fmt.Errorf("HTTPPool: Error reading response body: %v", err)
}
return body, nil
}
// 这是一个简单的 Getter,用于检索来自对等机(通过 HTTP)的键值。如果未找到该键或发生错误,则返回错误。
// func (h *httpGetter) Get(key string) ([]byte, error) {
// 向给定 URL 发出 GET 请求 resp, err := http.Get(h.baseURL + "/cache/" + key)
三者的直接原因均是缓存中的key失效,请求只能通过查询DB才能够获取key-value,但是它们出现的场景和解决方式不同,这才是我们区分三者的方式。
??缓存击穿的本质问题是:当存在一个时刻存在key失效后,有大量的key请求同时发送,且都落在了DB上。 针对这一问题,我们无法限制客户端同时发送大量key这一问题,但是我们可以限制当请求没有在缓存阶段找到key-value时,只有一个请求可以落到DB上。例如:当有大量的请求进行访问,我们可以通过互斥锁的方式进行限制,比如我们先判断如果缓存中存在key-value,那么可以直接返回结果,也不会造成缓存击穿的影响;但如果在缓存中找不到对应的key-value,那么我们可以允许第一个请求此不存在的key进行接下来的操作(DB操作),其他的请求则需要进行等待,等到唯一的一个请求处理结束之后,该对应key的互斥锁会打开,之后等待的请求直接返回结果即可。也就是说我们可以声明一个map,这个map的key是缓存中的key,map的value是一个对象,这个对象不仅可以表示当前的key是否已经有请求进行访问(是不是已经被锁定),也可以存储获取此key的第一个请求获取到的value值或异常信息。
type call struct {
wg sync.WaitGroup // WaitGroup用于同步等待所有goroutine完成任务后再返回结果
val interface{} // 保存函数fn()的返回值
err error // 保存函数fn()的错误信息
}
type Group struct {
mu sync.Mutex // 互斥锁,保证map操作的原子性
m map[string]*call
}
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) { // 传入一个字符串类型的key和接收两个参数的函数指针fn,返回一个interface{}类型的值和error类型的错误信息
g.mu.Lock()
if g.m == nil { // 如果map为空则初始化map
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok { // 判断是否已经有正在运行或者等待中的goroutine处理该任务
g.mu.Unlock() // 解锁以防止死锁
c.wg.Wait() // 等待任务完成后获取结果
return c.val, c.err // 返回结果
}
c := new(call)
c.wg.Add(1) // WaitGroup加1,表示新增一个需要等待的goroutine
g.m[key] = c // 将当前任务添加到map中,用于标记正在执行或者等待中的任务
g.mu.Unlock()
go func() {
defer c.wg.Done() // 函数执行完成后,WaitGroup减1
c.val, c.err = fn() // 执行传入的fn函数并保存值和错误信息
}()
g.mu.Lock()
delete(g.m, key) // 从map中删除已经完成的任务
g.mu.Unlock()
return c.val, c.err // 返回结果
}
简单看了下介绍,个人理解Protobuf是非常值得学习的一门技术,对于服务性能的优化有很大的作用,准备深入了解一下,然后再完善这一部分,感兴趣的同学可以留个眼,更新之后一一通知。