微服务先行者Martin Fowler与James Lewis在文章microservices中指出了微服务的九大特征,其中一个便是容错性设计(Design for failure)。正如文章中提到的,微服务相对于单体服务而言,不同服务之间的通信是经过网络完成的,上下游服务间调用时,下游服务可能随时处于不可用状态(比如崩溃,达到服务最大处理能力等等原因)。
由此会引发一个问题,一个服务点的错误经过层层传递,最终会波及到调用链上的所有服务,这便是雪崩效应,因此如何防止雪崩效应便是微服务架构容错性设计原则的具体实践,否则服务化程度越高,整个系统反而越不稳定。
在实践中有很多容错方案,诸如故障转移、快速失败(服务熔断)、安全失败、沉默失败、故障恢复、负载均衡、重试、限流、服务降级、舱壁隔离等等。这些方案分别从事前(负载均衡、限流、舱壁隔离、服务降级),事中(故障转移、快速失败、安全失败、沉默失败、重试),事后(故障恢复)三个节点提高整个系统的稳定性。
PS:服务降级归到事前,主要是因为服务降级大多数情况下不是在出现错误后才被执行的,在许多场景中,所说的服务降级更多的是指需要主动使服务进入降级逻辑的情况,比如电商预见双11流量高峰、游戏停服更新等。
服务熔断策略方案来源于生活中的电路保险丝,电路保险丝遵循一家一个的原则,当该家庭电流增大到一定数值时,其自身熔断而切断电路,保护电视机、冰箱等电器,并不会影响其他家庭的用电。
同理,可推理到微服务之间的网络调用。
如图,当服务C出现异常构,服务B很快会检测到服务C不可用(服务C接口超时或错误等指标满足不可用判定条件),此时服务B不在将请求转发到服务C,而是快速返回错误信息(快速失败)。在一段时间内的后续请求就一直返回失败,稍后当检测到服务C接口调用响应正常后,就会恢复到正常状态。
断路器模式是实现熔断策略的具体方案,其本质是接管微服务之间的远程调用请求,断路器会持续监控并统计被调用服务接口返回成功、失败、超时、拒绝等各种结果的指标,当某一个指标满足预设阈值时,断路器就会进入开启状态,后续相应的远程调用请求就会快速返回错误信息,而不会真的对被调用服务发起请求。若干时间后断路器会进入半打开状态,此时断路器会放行一次请求,如果请求正常,则断路器进入关闭状态,否则转入开启状态。
从上面描述来看,断路器是一种有限状态机:
断路器进入半打开状态在实现时并不需要计时器,而是收到请求时检测下是否满足半打开状态(一般是将断路器开启时间与当前时间做比较),是的话就放行该次请求,否则快速返回错误信息。
断路器工作时序图如下:
hystrix-go是作者从JAVA Netflix的子项目Hystrix翻译过来的,很经典的断路器项目。
hystrix-go 调用接口有两个:
func Do(name string, run runFunc, fallback fallbackFunc)
func Go(name string, run runFunc, fallback fallbackFunc)
hystrix-go配置项:
// CommandConfig is used to tune circuit settings at runtime
type CommandConfig struct {
Timeout int `json:"timeout"`
MaxConcurrentRequests int `json:"max_concurrent_requests"`
RequestVolumeThreshold int `json:"request_volume_threshold"`
SleepWindow int `json:"sleep_window"`
ErrorPercentThreshold int `json:"error_percent_threshold"`
}
直接上代码。
import (
"errors"
"fmt"
"github.com/afex/hystrix-go/hystrix"
"time"
)
var (
global error
times int
)
//模拟远程请求
func mockHttp() error {
times++
fmt.Println(times)
if global != nil {
return nil
}
time.Sleep(2 * time.Second)
return errors.New("业务出错")
}
const breakFlag = "testBreaker"
func main() {
hystrix.ConfigureCommand(breakFlag, hystrix.CommandConfig{
Timeout: 1000,
MaxConcurrentRequests: 50,
ErrorPercentThreshold: 25,
RequestVolumeThreshold: 4,
SleepWindow: 1000,
})
//hystrix.SetLogger() //打印断流器内部日志
for i := 0; i < 10; i++ {
time.Sleep(time.Millisecond * 400) //给熔断器重试服务时机
_ = hystrix.Do(breakFlag, func() error {
return mockHttp()
}, func(err error) error { //不发生错误不会进入该逻辑的
if err != nil {
fmt.Printf("times:%d,断路器检测到错误:%s\n", times, err.Error())
} else {
fmt.Printf("times:%d,断路器恢复正常", times)
}
global = err
return nil
})
}
fmt.Println("times:", times)
}
输出如下:
1
times:1,断路器检测到错误:hystrix: timeout
2
3
4
times:4,断路器检测到错误:hystrix: circuit open
times:4,断路器检测到错误:hystrix: circuit open
times:4,断路器检测到错误:hystrix: circuit open
5
6
7
times: 7
分析:
可以看到真正发出的请求是7次,3次是被快速失败了
Do和Go两个API最终都会进入GoC函数
func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) chan error {
cmd := &command{
run: run,
fallback: fallback,
start: time.Now(),
errChan: make(chan error, 1),
finished: make(chan bool, 1),
}
circuit, _, err := GetCircuit(name)//获取指标统计器
if err != nil {
cmd.errChan <- err
return cmd.errChan
}
cmd.circuit = circuit
ticketCond := sync.NewCond(cmd)
ticketChecked := false
returnTicket := func() {
cmd.Lock()
// Avoid releasing before a ticket is acquired.
for !ticketChecked {
ticketCond.Wait()
}
cmd.circuit.executorPool.Return(cmd.ticket)//执行完之后归还请求令牌
cmd.Unlock()
}
// Shared by the following two goroutines. It ensures only the faster
// goroutine runs errWithFallback() and reportAllEvent().
returnOnce := &sync.Once{}
reportAllEvent := func() {
err := cmd.circuit.ReportEvent(cmd.events, cmd.start, cmd.runDuration)//上报此次请求时正常还是异常,便于后续进行指标统计
if err != nil {
log.Printf(err.Error())
}
}
go func() {
defer func() { cmd.finished <- true }()
if !cmd.circuit.AllowRequest() {//统计指标,决定开启、半开启、关闭三个状态的流转
cmd.Lock()
// It's safe for another goroutine to go ahead releasing a nil ticket.
ticketChecked = true
ticketCond.Signal()
cmd.Unlock()
returnOnce.Do(func() {
returnTicket()
cmd.errorWithFallback(ctx, ErrCircuitOpen)//上报断路器处于开启状态的错误,不过该错误不会被纳入接口错误指标
reportAllEvent()
})
return
}
cmd.Lock()
select {
case cmd.ticket = <-circuit.executorPool.Tickets://获取一个请求令牌
ticketChecked = true
ticketCond.Signal()
cmd.Unlock()
default: //没有令牌,就表示请求达到并发限制MaxConcurrentRequests配置的值,上报ErrMaxConcurrency错误
ticketChecked = true
ticketCond.Signal()
cmd.Unlock()
returnOnce.Do(func() {
returnTicket()
cmd.errorWithFallback(ctx, ErrMaxConcurrency)
reportAllEvent()
})
return
}
runStart := time.Now()
runErr := run(ctx) //没有达到限流就发起请求
returnOnce.Do(func() {
defer reportAllEvent()
cmd.runDuration = time.Since(runStart)
returnTicket()
if runErr != nil {
cmd.errorWithFallback(ctx, runErr) //出错就上报业务接口的错误
return
}
cmd.reportEvent("success")//表示请求成功
})
}()
go func() {
timer := time.NewTimer(getSettings(name).Timeout)//根据Timeout配置起一个定时器
defer timer.Stop()
select {
case <-cmd.finished: //请求执行完毕
// returnOnce has been executed in another goroutine
case <-ctx.Done(): //收集context上下文错误
returnOnce.Do(func() {
returnTicket()
cmd.errorWithFallback(ctx, ctx.Err())
reportAllEvent()
})
return
case <-timer.C: //标识服务接口超时,上报ErrTimeout错误
returnOnce.Do(func() {
returnTicket()
cmd.errorWithFallback(ctx, ErrTimeout)
reportAllEvent()
})
return
}
}()
return cmd.errChan
}
进入开启状态
func (circuit *CircuitBreaker) AllowRequest() bool {
return !circuit.IsOpen() || circuit.allowSingleTest()
}
//判断断路器处于关闭状态还是开启状态
func (circuit *CircuitBreaker) IsOpen() bool {
circuit.mutex.RLock()
o := circuit.forceOpen || circuit.open
circuit.mutex.RUnlock()
if o {
return true
}
if uint64(circuit.metrics.Requests().Sum(time.Now())) < getSettings(circuit.Name).RequestVolumeThreshold {
return false
}
if !circuit.metrics.IsHealthy(time.Now()) {//计算10s内错误请求百分比
// too many failures, open the circuit
circuit.setOpen() //断路器状态为开启状态
return true
}
return false
}
//circuit.metrics.Requests().Sum方法,这里可以看到统计指标的窗口是10s
func (r *Number) Sum(now time.Time) float64 {
sum := float64(0)
r.Mutex.RLock()
defer r.Mutex.RUnlock()
for timestamp, bucket := range r.Buckets {
// TODO: configurable rolling window
if timestamp >= now.Unix()-10 {
sum += bucket.Value
}
}
return sum
}
断路器半开启状态判断
func (circuit *CircuitBreaker) allowSingleTest() bool {
circuit.mutex.RLock()
defer circuit.mutex.RUnlock()
now := time.Now().UnixNano()
openedOrLastTestedTime := atomic.LoadInt64(&circuit.openedOrLastTestedTime)
//如果断路器处于开启状态,且当前时间>断路器开启时间+SleepWindow配置,精确到纳秒,则进入半开启状态
if circuit.open && now > openedOrLastTestedTime+getSettings(circuit.Name).SleepWindow.Nanoseconds() {
swapped := atomic.CompareAndSwapInt64(&circuit.openedOrLastTestedTime, openedOrLastTestedTime, now)
if swapped {
log.Printf("hystrix-go: allowing single test to possibly close circuit %v", circuit.Name)
}
return swapped
}
return false
}
恢复为关闭状态
func (circuit *CircuitBreaker) ReportEvent(eventTypes []string, start time.Time, runDuration time.Duration) error {
if len(eventTypes) == 0 {
return fmt.Errorf("no event types sent for metrics")
}
circuit.mutex.RLock()
o := circuit.open
circuit.mutex.RUnlock()
if eventTypes[0] == "success" && o {//此次请求成功,且断路器处于开启状态,则将断路器转为关闭状态
circuit.setClose()
}
//省略代码...
return nil
}