11. 从零用Rust编写正反向代理, 实现TCP连接的健康检查

发布时间:2023年12月17日

wmproxy

wmproxy是由Rust编写,已实现http/https代理,socks5代理, 反向代理,静态文件服务器,内网穿透,配置热更新等, 后续将实现websocket代理等,同时会将实现过程分享出来, 感兴趣的可以一起造个轮子法

项目 ++wmproxy++

gite: https://gitee.com/tickbh/wmproxy

github: https://github.com/tickbh/wmproxy

健康检查的意义

健康检查维持着系统的稳定运行, 极大的加速着服务的响应时间, 并保证服务器不会把消息包转发到不能响应的服务器上, 从而使系统快速稳定的运转
在LINUX系统中,系统默认TCP建立连接超时时间为127秒。通常网络不可达或者网络连接被拒绝或者网络连接超时需要耗时的时长较长。此时会超成服务器的响应时间变长很多,而且重复发起不可达的连接尝试也在耗着大量的IO资源。
当健康检查介入后,如果短时间内多次建立连接失败,则暂时判定该地址不可达,状态设置为不可达。如果此时接收到该地址的请求时直接返回错误。大大提高了响应的时间。
所以健康检查是必不可少的存在。

如何实现

由于健康状态需要调用的地方可能在任意处需要发起连接的地方,如果通过参数透传也会涉及到多线程的数据共用,如Arc<Mutex<Data>>,取用的时候也是要通过锁共用,且编码的复杂度和理解成本急剧升高,所以此处健康检查选用的是多线程共用的静态处理变量。

Rust中的静态变量

在Rust中,全局变量可以分为两种:

  • 编译期初始化的全局变量
  • 运行期初始化的全局变量

编译期初始化的全局变量有:

const创建的常量,如 const MAX_ID:usize=usize::MAX/2;
static创建的静态变量,如 static mut REQUEST_RECV:usize=0;

运行期初始化的全局变量有lazy_static用于懒初始化。例如:

lazy_static! {
    static ref HEALTH_CHECK: RwLock<HealthCheck> = RwLock::new(HealthCheck::new(60, 3, 2));
}

此外还有

  • 实现你自己的运行时初始化:std::sync::Once + static mut T
  • 单线程运行时初始化的特殊情况:thread_local

我们此处维持一个HealthCheck的全局变量,因为程序是多线程,用thread_local,无法共用其它线程的检测,不条例预期,所以此处用读写锁来保证全局变量的正确性,读写锁的特点是允许存在多个读,但如果获取写必须保证唯一。

源码解析,暂时不做主动性的健康检查

接下来我们看HealthCheck的定义

pub struct HealthCheck {
    /// 健康检查的重置时间, 失败超过该时间会重新检查, 统一单位秒
    fail_timeout: usize,
    /// 最大失败次数, 一定时间内超过该次数认为不可访问
    max_fails: usize,
    /// 最小上线次数, 到达这个次数被认为存活
    min_rises: usize,
    /// 记录跟地址相关的信息
    health_map: HashMap<SocketAddr, HealthRecord>,
}

/// 每个SocketAddr的记录值
struct HealthRecord {
    /// 最后的记录时间
    last_record: Instant,
    /// 失败的恢复时间
    fail_timeout: Duration,
    /// 当前连续失败的次数
    fall_times: usize,
    /// 当前连续成功的次数
    rise_times: usize,
    /// 当前的状态
    failed: bool,
}

主要有最后记录时间,失败次数,成功次数,最大失败惩罚时间等元素组成

我们通过函数is_fall_down判定是否是异常状态,未检查前默认为正常状态,超出一定时间后,解除异常状态。

/// 检测状态是否能连接
pub fn is_fall_down(addr: &SocketAddr) -> bool {
    // 只读,获取读锁
    if let Ok(h) = HEALTH_CHECK.read() {
        if !h.health_map.contains_key(addr) {
            return false;
        }
        let value = h.health_map.get(&addr).unwrap();
        if Instant::now().duration_since(value.last_record) > value.fail_timeout {
            return false;
        }
        h.health_map[addr].failed
    } else {
        false
    }
}

如果连接TCP失败则调用add_fall_down将该地址失败连接次数+1,如果失败次数达到最大失败次数将状态置为不可用。

/// 失败时调用
pub fn add_fall_down(addr: SocketAddr) {
    // 需要写入,获取写入锁
    if let Ok(mut h) = HEALTH_CHECK.write() {
        if !h.health_map.contains_key(&addr) {
            let mut health = HealthRecord::new(h.fail_timeout);
            health.fall_times = 1;
            h.health_map.insert(addr, health);
        } else {
            let max_fails = h.max_fails;
            let value = h.health_map.get_mut(&addr).unwrap();
            // 超出最大的失败时长,重新计算状态
            if Instant::now().duration_since(value.last_record) > value.fail_timeout {
                value.clear_status();
            }
            value.last_record = Instant::now();
            value.fall_times += 1;
            value.rise_times = 0;

            if value.fall_times >= max_fails {
                value.failed = true;
            }
        }
    }
}

如果连接TCP成功则调用add_rise_up将该地址成功连接次数+1,如果成功次数达到最小次数将状态置为不可用。

/// 成功时调用
pub fn add_rise_up(addr: SocketAddr) {
    // 需要写入,获取写入锁
    if let Ok(mut h) = HEALTH_CHECK.write() {
        if !h.health_map.contains_key(&addr) {
            let mut health = HealthRecord::new(h.fail_timeout);
            health.rise_times = 1;
            h.health_map.insert(addr, health);
        } else {
            let min_rises = h.min_rises;
            let value = h.health_map.get_mut(&addr).unwrap();
            // 超出最大的失败时长,重新计算状态
            if Instant::now().duration_since(value.last_record) > value.fail_timeout {
                value.clear_status();
            }
            value.last_record = Instant::now();
            value.rise_times += 1;
            value.fall_times = 0;

            if value.rise_times >= min_rises {
                value.failed = false;
            }
        }
    }
}

接下来我们将TcpStream::connect函数统一替换成HealthCheck::connect外部修改几乎为0,可实现开启健康检查,后续还会有主动式的健康检查。

pub async fn connect<A>(addr: &A) -> io::Result<TcpStream>
    where
        A: ToSocketAddrs,
    {
        let addrs = addr.to_socket_addrs()?;
        let mut last_err = None;

        for addr in addrs {
            // 健康检查失败,直接返回错误
            if Self::is_fall_down(&addr) {
                last_err = Some(io::Error::new(io::ErrorKind::Other, "health check falldown"));
            } else {
                match TcpStream::connect(&addr).await {
                    Ok(stream) => 
                    {
                        Self::add_rise_up(addr);
                        return Ok(stream)
                    },
                    Err(e) => {
                        Self::add_fall_down(addr);
                        last_err = Some(e)
                    },
                }
            }
        }

        Err(last_err.unwrap_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "could not resolve to any address",
            )
        }))
    }
效果

在前三次请求的时候,将花费5秒左右才抛出拒绝链接的错误

connect server Err(Os { code: 10061, kind: ConnectionRefused, message: "由于目标计算机积极拒绝,无
法连接。" })

可以发现三次之后,将会快速的抛出错误,达成健康检查的目标

connect server Err(Custom { kind: Other, error: "health check falldown" })

此时被动式的健康检查已完成,后续按需要的话将按需看是否实现主动式的健康检查。

文章来源:https://blog.csdn.net/w273732573/article/details/134977327
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。