0x00 前言
gobreaker 实现了 Circuit Breaker pattern 模式的熔断机制。本篇文章简单分析下其实现。
Circuit Breaker 回顾
回顾下 Circuit Breaker 的状态机模型:即 3
种状态,4
种状态(变化)迁移,如下图:
Circuit Breaker 状态如下
- 熔断器关闭状态(StateClosed), 服务正常访问,不会影响用户请求
- 熔断器开启状态(StateOpen),服务异常,内部服务不可用
- 熔断器半开状态(StateHalfOpen),部分请求限流访问(恢复阶段,放行零星请求)
Circuit Breaker 的迁移流程如下:
- 在熔断器关闭状态下,当失败后并满足熔断预设阈值后,将直接转移为熔断器开启状态
- 在熔断器开启状态下,如果过了规定的时间,将进入半开启状态,验证目前服务是否可用
- 在熔断器半开启状态下,如果出现失败,则再次进入熔断器开启状态
- 在熔断器半开启后,所有请求(有限额)都是成功的,则熔断器关闭。关闭后所有请求将正常访问
状态迁移的条件判定
这里再简单列举下状态迁移时的边缘判定条件,需要包含如下几个维度的数据:
- (Breaker 迁移周期内)总请求数:
total_requests
- 失败请求数:
failed_requests
- 成功请求数:
succ_requests
-
熔断器打开到半打开状态的定时器:
timer1
-
熔断器关闭状态下,有请求到来
total_requests++
- 调用成功:
succ_requests++
- 调用失败:
failed_requests++
,若失败数(失败率)触发了预设规则(如:最近的故障数在给定的时间段内超过了指定的阈值),则熔断器将迁移到打开状态。同时,Breaker 启动一个超时计时器,当该计时器到期时,熔断器将进入半打开状态 - 注意:当发生了熔断器状态迁移时,需要清空当前(周期)的计数器
- 熔断器打开状态:来自应用程序的调用立即失败,不统计
- 熔断器半开状态:允许有限数量的请求通过并调用操作,同时统计结果
total_requests++
- 调用成功,
succ_requests++
,若成功数(成功率)超过了预设规则,则判断先前的故障已恢复,并且熔断器切换到关闭状态(同时重置计数器) - 调用失败,则熔断器将认为故障仍然存在,将熔断器恢复为打开状态,重新设置超时计时器
timer1
,以使系统有更多时间从故障中恢复
0x01 gobreaker 的实现 - 基础结构
完整的代码分析 在此。按照 熔断器的转换流程 来分析 gobreaker 实现。
熔断器状态
const (
StateClosed State = iota // 熔断器关闭 0
StateHalfOpen // 半开放
StateOpen // 打开
)
熔断器状态统计
如何判定是否要进行状态迁移呢?需要有个全局的统计结构 Counts
:
// Counts holds the numbers of requests and their successes/failures.
// CircuitBreaker clears the internal Counts either
// on the change of the state or at the closed-state intervals.
// Counts ignores the results of the requests sent before clearing.
type Counts struct {
Requests uint32 // 请求次数
TotalSuccesses uint32 // 总共成功次数
TotalFailures uint32 // 总共失败次数
ConsecutiveSuccesses uint32 // 连续成功次数
ConsecutiveFailures uint32 // 连续失败次数
}
func (c *Counts) onRequest() {
c.Requests++
}
func (c *Counts) onSuccess() {
c.TotalSuccesses++
c.ConsecutiveSuccesses++
c.ConsecutiveFailures = 0
}
func (c *Counts) onFailure() {
c.TotalFailures++
c.ConsecutiveFailures++
c.ConsecutiveSuccesses = 0
}
func (c *Counts) clear() {
c.Requests = 0
c.TotalSuccesses = 0
c.TotalFailures = 0
c.ConsecutiveSuccesses = 0
c.ConsecutiveFailures = 0
}
注意,Counts
的生效周期仅限于一个 Generation
周期内,有点类似于限流器的固定窗口机制。
统计周期 generation
//toNewGeneration: 生成新的 generation。 主要是清空 counts 和设置 expiry(过期时间)
//1. 当状态为 Closed 时 expiry 为 Closed 的过期时间(当前时间 + interval)
//2. 当状态为 Open 时 expiry 为 Open 的过期时间(当前时间 + timeout)
func (cb *CircuitBreaker) toNewGeneration(now time.Time) {
cb.generation++
// 清空单个周期内的计数结构
cb.counts.clear()
var zero time.Time
switch cb.state {
// 当熔断器在 CLOSE 状态下
case StateClosed:
if cb.interval == 0 {
//defaultInterval
cb.expiry = zero
} else {
//
cb.expiry = now.Add(cb.interval)
}
case StateOpen:
cb.expiry = now.Add(cb.timeout)
default: // StateHalfOpen
cb.expiry = zero
}
}
0x02 gobreaker 的代码分析
熔断器外部接口
熔断器的执行 Execute
方法包含三个部分,参数 req
为封装的业务方法。该函数分为三步:
beforeRequest
:请求之前的判定(如:如果熔断器处于打开状态就直接返回)- 执行请求:服务的请求执行
afterRequest
:请求后的状态和计数的更新
// 核心执行函数 Execute: 该函数分为三步 beforeRequest、 执行请求、 afterRequest
func (cb *CircuitBreaker) Execute(req func() (interface{}, error)) (interface{}, error) {
generation, err := cb.beforeRequest()
if err != nil {
return nil, err
}
defer func() {
e := recover()
if e != nil {
cb.afterRequest(generation, false)
panic(e) //if panic,继续 panic 给上层调用者去 recover,有趣
}
}()
// 执行真正的用户调用
result, err := req()
// 调用后更新熔断器状态
cb.afterRequest(generation, cb.isSuccessful(err))
return result, err
}
步骤 1: beforeRequest
beforeRequest
的作用是判断是否放行请求,计数或达到切换新条件刚切换,主要功能都在 currentState
方法中;
- 判断是否
Closed
,如是,放行所有请求;并且判断时间是否达到Interval
周期,从而清空计数,调用toNewGeneration
进入新周期(清空计数); - 如果是
Open
状态,返回ErrOpenState
,—不放行所有请求;同样判断周期时间,到达则同样调用toNewGeneration
进入新周期 - 如果是
HalfOpen
状态,则判断是否已放行MaxRequests
个请求,如未达到则放行;否则返回:ErrTooManyRequests
,此函数一旦放行请求,就调用conut.onRequest
对请求计数加1
,进入请求逻辑
/*
beforeRequest 函数的核心功能:判断是否放行请求,计数或达到切换新条件刚切换。
1. 判断是否 Closed,如是,放行所有请求。
-- 并且判断时间是否达到 Interval 周期,从而清空计数,进入新周期,调用 toNewGeneration()
2. 如果是 Open 状态,返回 ErrOpenState,不放行所有请求。
-- 同样判断周期时间,到达则 同样调用 toNewGeneration(),清空计数
3. 如果是 half-open 状态,则判断是否已放行 MaxRequests 个请求,如未达到刚放行;否则返回: ErrTooManyRequests。
4. 此函数一旦放行请求,就会对请求计数加 1(conut.onRequest()),请求后到另一个关键函数 : afterRequest()。
*/
func (cb *CircuitBreaker) beforeRequest() (uint64, error) {
cb.mutex.Lock()
defer cb.mutex.Unlock()
now := time.Now()
// 获取当前熔断器的状态和 generation
state, generation := cb.currentState(now)
if state == StateOpen {
// 如果熔断器处于打开状态,禁止请求,直接返回错误
return generation, ErrOpenState
} else if state == StateHalfOpen && cb.counts.Requests >= cb.maxRequests {
//half-open 状态 && 请求超量,也拒绝请求
return generation, ErrTooManyRequests
}
// 其他情况,放行请求,走到 afterRequest 逻辑
cb.counts.onRequest()
return generation, nil
}
步骤 2:req
req
方法: 获取请求结果,注意这里做了defer
,防止在req()
执行中异常退出。
// 核心执行函数 Execute: 该函数分为三步 beforeRequest、 执行请求、 afterRequest
func (cb *CircuitBreaker) Execute(req func() (interface{}, error)) (interface{}, error) {
//...
defer func() {
e := recover()
if e != nil {
cb.afterRequest(generation, false)
panic(e) //if panic,继续 panic 给上层调用者去 recover,有趣
}
}()
// 执行真正的用户调用
result, err := req()
//...
}
步骤 3:afterRequest
afterRequest
的作用是上一步业务请求的结果进行对成功 / 失败进行计数,达到条件则切换状态;注意参数 cb.afterRequest(generation, cb.isSuccessful(err))
中 isSuccessful
,默认只是判断是否为 nil
,在实际中这样是有问题的,我们需要指定进行熔断失败计数的错误类型,比如服务调用超时,服务不可达等,其他业务逻辑错误不应该作为熔断器失败的计数统计条件;
afterRequest
的逻辑是
- 调用公共函数
currentState(now)
,先判断是否进入一个新的计数时间周期Interval
, 是则重置计数,改变熔断器状态,并返回新周期的状态 - 熔断状态机计数统计更新
/*
函数核心内容很简单,就对成功 / 失败进行计数,达到条件则切换状态。
与 beforeRequest 一样,会调用公共函数 currentState(now)
currentState(now) 先判断是否进入一个先的计数时间周期 (Interval), 是则重置计数,改变熔断器状态,并返回新一代。
如果 request 耗时大于 Interval, 几本每次都会进入新的计数周期,熔断器就没什么意义了
*/
func (cb *CircuitBreaker) afterRequest(before uint64, success bool) {
cb.mutex.Lock()
defer cb.mutex.Unlock()
now := time.Now()
state, generation := cb.currentState(now)
if generation != before {
// 说明,在 currentState 已经更新了代数,直接返回吧
return
}
// 否则,说明还在同一代中,根据 err(是否为 nil,这里比较简单)更新计数
if success {
// 更新 succ 技数
cb.onSuccess(state, now)
} else {
// 更新错误计数
cb.onFailure(state, now)
}
}
当前状态的判定及迁移
currentState
方法承担了状态机的迁移工作。下一(阶段)状态的计算,是依据当前状态来的:
- 如果当前状态为关闭
StateClosed
,则通过周期判断toNewGeneration
是否进入下一个新周期(重置Couters
) - 如果当前状态为已开启
StateOpen
,则判断是否已经超时,超时就可以变更状态到半开;如果当前状态为关闭状态,则通过周期判断是否进入下一个周期 - 如果当前状态为半开启
StateHalfOpen
,则直接返回(后续根据结果计算再决定是否进行状态变迁)
func (cb *CircuitBreaker) currentState(now time.Time) (State, uint64) {
switch cb.state {
case StateClosed:
if !cb.expiry.IsZero() && cb.expiry.Before(now) { // 是否需要进入下一个计数周期
cb.toNewGeneration(now)
}
case StateOpen:
if cb.expiry.Before(now) {
// 熔断器由开启变更为半开
cb.setState(StateHalfOpen, now)
}
}
return cb.state, cb.generation
}
迁移状态 setState
setState
方法是用来执行熔断状态机迁移的,每当设置新状态时,需要重置当前的 Generation
:
// 设置当前熔断器状态
func (cb *CircuitBreaker) setState(state State, now time.Time) {
if cb.state == state {
// 无需设置
return
}
prev := cb.state
cb.state = state
// 每当设置新状态时,需要重置当前的 generation
cb.toNewGeneration(now)
// 如果用户设置了状态变迁回调,那么就调用
if cb.onStateChange != nil {
cb.onStateChange(cb.name, prev, state)
}
}
在何处迁移状态
所以,参照前文 微服务基础之熔断保护(Breaker) 熔断状态机迁移的状态机,有下面几处调用了状态迁移方法:
1、动作 B
:当熔断器从关闭状态到打开状态时,由每次熔断器调用 Execute
中的 afterRequest
方法来检查并设置
func (cb *CircuitBreaker) afterRequest(before uint64, success bool) {
cb.mutex.Lock()
defer cb.mutex.Unlock()
now := time.Now()
state, generation := cb.currentState(now)
if generation != before {
// 说明,在 currentState 已经更新了代数,直接返回吧
return
}
// 否则,说明还在同一代中,根据 err(是否为 nil,这里比较简单)更新计数
if success {
// 更新 succ
cb.onSuccess(state, now)
} else {
cb.onFailure(state, now)
}
}
// 调用失败情况下的处理
func (cb *CircuitBreaker) onFailure(state State, now time.Time) {
switch state {
case StateClosed:
cb.counts.onFailure() // 失败计数 ++
if cb.readyToTrip(cb.counts) {
// 调用触发熔断器由关闭 => 打开的判断方法(可由用户传入,默认方法 defaultReadyToTrip 是连续的错误次数 > 5)
// 设置熔断器为打开状态
cb.setState(StateOpen, now)
}
case StateHalfOpen:
// 在 half-open 情况下,如果仍然调用失败,那么继续把熔断器设置为打开状态
cb.setState(StateOpen, now)
}
}
gobreaker 的默认逻辑是,在 StateClosed
关闭状态下, 当连续失败次数 >5
次时, 则切换到 StateOpen
打开状态,用户可以指定回调函数
2、动作 D
,熔断器从打开到半打开状态的迁移,在每个 generation
中都会触发的 currentState
方法中完成(基于时间跨度参数)
//currentState: 获取当前状态
//1、当 Closed 时且 expiry 过期,调用 toNewGeneration 生成新的 generation
//2、当 Open 时且 expiry 过期,设为 halfOpen
func (cb *CircuitBreaker) currentState(now time.Time) (State, uint64) {
switch cb.state {
// 熔断器关闭时
case StateClosed:
if !cb.expiry.IsZero() /*cb.expiry 非 0 值 */ && cb.expiry.Before(now) /*cb.expiry 比 now 早,说明 cb.expiry 过期 */ {
// 需要重新生成一个周期
cb.toNewGeneration(now)
}
// 否则不需要
case StateOpen:
// 熔断器打开时
if cb.expiry.Before(now) {
// 如果打开时,cb.expiry 过期,那么熔断器需要进入 half-open 状态
// 注意:在此来完成从熔断器打开 => 熔断器半打开的触发逻辑!!!!!
cb.setState(StateHalfOpen, now)
}
}
return cb.state, cb.generation
}
在 StateOpen
状态,受参数 Settings.Timeout
控制,变迁到 StateHalfOpen
半开放状态
3、动作 F
,在 StateHalfOpen
半开放状态时,如果进行请求(探测)仍然失败,则继续切换到 StateOpen
熔断打开状态。逻辑在前面的 onFailure
方法中
4、动作 G
,在 StateHalfOpen
半开放状态时,如果(当前这代 counts 中)连续 succ 的数目 cb.counts.ConsecutiveSuccesses
超过 Settings.MaxRequests
,那么则重置当前熔断器的状态为 closed
func (cb *CircuitBreaker) onSuccess(state State, now time.Time) {
switch state {
case StateClosed:
cb.counts.onSuccess()
case StateHalfOpen:
// 在 half-open 状态下,如果(当前这代 counts 中)连续 succ 的数目超过 maxRequests,那么则重置当前熔断器的状态为 closed(关闭)
cb.counts.onSuccess()
if cb.counts.ConsecutiveSuccesses >= cb.maxRequests {
cb.setState(StateClosed, now)
}
// 这里不可能出现 stateOpen 状态
}
}
0x03 总结
gobreaker 的实现并不难理解,它使用了一个 Generation
计数周期的概念,每一个时间周期 Interval
的计数 count
状态称为一个 Generation
。在 beforeRequest
/afterRequest
的两个方法中,实现了两个状态自动切换的机制:
- 在同一个
Generation
周期内,计数满足状态切换条件,即自动切换 - 超过一个
Generation
时间周期的也会自动切换 gobreaker 并没有使用定时器,只在请求调用时,去检测当时状态与时间间隔来切换,这也是比较典型的技巧。此外,在beforeRequest
/afterRequest
中都强制加了互斥锁来确保并发安全
gobreaker 的缺点如下:
- 固定的时间窗口(相较于滑动窗口精准度欠缺),如果业务请求
Request
耗时大于Interval
, 那么每次都会进入新的计数周期,熔断器就没什么意义了(即如果用户请求时间过长,请求前后状态已改变,则对此次请求不进行计数) - 在半开放状态,没有提供回调方法来自定义状态变迁
- gobreaker 的各类参数都需要用户自行传入,配置的准确性还需要多次测试才能得出