开源限流组件分析(二):Golang-time/rate 限速算法实现分析

分析 Golang 标准库提供的令牌桶限流器

Posted by pandaychen on April 5, 2020

0x00 前言

这篇文章来分析下标准库 time/rate 的使用及实现细节,此库同样基于令牌桶(Token Bucket)实现了限流。

0x01 time/rate 的使用

创建限流器

使用 NewLimiter(r Limit, b int) 创建限速器,令牌桶容量为 b。初始化状态下桶是满的,即桶里装有 b 个令牌,以后再以每秒往里面填充 r 个令牌。有两种特例:

  1. 允许声明容量为 0 的限速器,此时将以拒绝所有事件操作
  2. 就是 r == Inf 时,此时 b 参数将被忽略(const Inf = Limit(math.MaxFloat64)),即令牌桶无限大

限流判定

time/rate 库提供了三类方法(其中 AllowNReserveNWaitN 允许消费 n 个令牌):

  • Wait/WaitN:当没有可用或足够的 Token 时,将阻塞等待 Token 或者超时取消(推荐实际程序中使用这个方法)
  • Allow/AllowN:当没有可用或足够的 Token 时,返回 false
  • Reserve/ReserveN 当没有可用或足够的 Token 时,返回 Reservation 对象,和要等待多久才能获得足够的 Token(给用户的控制权是最多的)

注意 Wait 方法中的阻塞等待,因为令牌桶的实现是基于时间戳的(等的越久 Token 越多),Wait 会返回阻塞等待的时间跨度,在此之后就可以拿到足够的令牌,配和 context.Context 使用效果极好。

使用例子

以上使用例子 见此

0x02 令牌桶的本质

从上一篇文章 JuJu-Ratelimit 限速算法实现分析 的总结,令牌桶的实现本质就是利用了 Token 数可以和时间跨度相互转化 的原理。需要有如下关键信息:

  • 生产 Token 令牌的速率:一秒钟可以产生多少 Token(生产一个 Token 需要多长时间单位),记为 $p$(1s
  • Token 令牌桶的大小 $Bucket_{size}$

基于上面这两个基础信息,容易得到:

  1. 生成 $N$ 个新的 Token 一共需要的时间单位:$\frac{N}{p}*1s$
  2. 给定一段时长 $Duration$,这段时间一共可以生成多少个 Token,$\frac{Duration}{1s}*p$

0x03 分析

同 juju-ratelimit 的实现一样,在 timer/rate 实现中, 并没有单独维护一个 Timer,而是采用了 lazyload 的方式,直到每次消费之前才根据时间差计算并更新 Token 数目,而且也不是用 BlockingQueue 来存放 Token,而是仅仅通过计数的方式来实现时间与令牌的换算。

注意上面这句话: 直到每次消费之时根据时间差(本次减去上次保存的时间戳位置)计算 Token 数目 ,所以令牌桶的实现是严格依赖于时间的准确性的。

基础结构

Limit 为令牌桶的定义,在单机限流实践中通常定义为一个全局对象:

type Limit float64

type Limiter struct {
	// 初始化 NewLimiter 传入的两个值:limit 和 burst
	limit Limit	// 每秒中生产 token 的个数
	burst int // 桶的总大小

	mu     sync.Mutex
	tokens float64 // 桶中目前剩余的 token 数目,可以为负数(负数表示有些原子请求了数目过大的令牌)
	// last is the last time the limiter's tokens field was updated
	last time.Time
	// lastEvent is the latest time of a rate-limited event (past or future)
	lastEvent time.Time
}

基础接口

1、durationFromTokens 方法,计算生成 N 个新的 Token 一共需要的时间,即上一节的 $\frac{N}{p}*1s$,注意这里转为 time.Nanosecond 为单位了,颗粒度极小。而 limit 本身就是 float64 类型

// durationFromTokens is a unit conversion function from the number of tokens to the duration
// of time it takes to accumulate them at a rate of limit tokens per second.
// 将 token 转化为所需等待时间
func (limit Limit) durationFromTokens(tokens float64) time.Duration {
	seconds := tokens / float64(limit)
	return time.Nanosecond * time.Duration(1e9*seconds)
}

2、tokensFromDuration 方法,用于计算给定一段时长 time.Duration,这段时长内一共可以生成多少个令牌 Token,即上一节的 $\frac{Duration}{1s}*p$。注意这里的除法可能导致的 精度丢失问题

// tokensFromDuration is a unit conversion function from a time duration to the number of tokens
// which could be accumulated during that duration at a rate of limit tokens per second.
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
	// Split the integer and fractional parts ourself to minimize rounding errors.
	// See golang.org/issues/34861.
	// 如果是用 d.Seconds() * float64(limit), 因为 d.Seconds 是 float64 的。因此会造成精度的损失。
	// time.Duration 是 int64 类型的,表示纳秒
	// time.Second
	sec := float64(d/time.Second) * float64(limit)
	nsec := float64(d%time.Second) * float64(limit)
	return sec + nsec/1e9
}

3、Every 方法,提供时间对令牌的转换接口

// Every converts a minimum time interval between events to a Limit.
// 可以将时间转化为速率
// 例如:每 5 秒一个,转化为速率就是 0.2 一秒
func Every(interval time.Duration) Limit {
	if interval <= 0 {
		return Inf
	}
	return 1 / Limit(interval.Seconds())
}

3、advance 方法:传入参数 now 为当前时间,该方法是获取到 now 为止,可用的 Token 的令牌个数(根据上面两个基础方法计算得到): 需要的关键参数:

  • 令牌桶结构中保存了 上一次原子操作成功获取令牌 操作的时间 lim.last
  • 传入参数为当前时间 now time.Time

上面两个时间跨度数据相减,就拿到了 两个相邻的原子操作之间,一共产生的令牌数目,再和令牌桶的 size 做比较(取最小值),就得到了最终可用的 Token 个数

// advance calculates and returns an updated state for lim resulting from the passage of time.
// lim is not changed.
// @param now
// @return newNow 似乎还是这个 now,没变
// @return newLast 如果 last > now, 则 last 为 now
// @return newTokens 当前桶中应有的数目
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
	// last 代表上一个取的时候的时间
	last := lim.last
	if now.Before(last) {
		last = now
	}

	// Avoid making delta overflow below when last is very old.
	// maxElapsed 表示:将 Token 桶填满需要多久
	// 为什么要拆分两步做,是为了防止后面的 delta 溢出
	// 因为默认情况下,last 为 0,此时 delta 算出来的,会非常大
	maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)

	// elapsed 表示从当前到上次一共过去了多久
	// 当然了,elapsed 不能大于将桶填满的时间
	elapsed := now.Sub(last)
	if elapsed > maxElapsed {
		elapsed = maxElapsed
	}

	// Calculate the new number of tokens, due to time that passed.
	// 计算下过去这段时间,一共产生了多少 token
	delta := lim.limit.tokensFromDuration(elapsed)

	// token 取 burst 最大值,因为显然 token 数不能大于桶容量
	tokens := lim.tokens + delta
	if burst := float64(lim.burst); tokens > burst {
		//burst为令牌桶的设计容量,tokens不能超过此值
		tokens = burst
	}

	return now, last, tokens
}

0x04 reserveN 方法

Token 的消费方式有如下 3 种。在内部实现,最终都调用了 reserveN 函数来生成和消费 Token:

  • Wait/WaitN
  • Allow/AllowN
  • Reserve/ReserveN
// reserveN is a helper method for AllowN, ReserveN, and WaitN.
// maxFutureReserve specifies the maximum reservation wait duration allowed.
// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
//
// @param now 当前消费的时间
// @param n 要消费的 token 数量
// @param maxFutureReserve 愿意等待的最长时间
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
	lim.mu.Lock()

	// 如果没有限制
	if lim.limit == Inf {
		lim.mu.Unlock()
		return Reservation{
			ok:        true,
			lim:       lim,
			tokens:    n,
			timeToAct: now,
		}
	}

	// 通过 advance 拿到 now 到 lim.last 之间跨度一共可用的 token 数(<= 令牌桶个数)
	now, last, tokens := lim.advance(now)

	// Calculate the remaining number of tokens resulting from the request.
	// 看下取完之后,桶还能剩能下多少 token
	tokens -= float64(n)

	// Calculate the wait duration
	// 如果 token < 0, 说明目前的 token 不够,需要等待一段时间
	var waitDuration time.Duration
	if tokens < 0 {
		// durationFromTokens:将 tokens 转为时间
		waitDuration = lim.limit.durationFromTokens(-tokens)
	}

	// Decide result
	// n<=lim.burst :申请的 token 是否超过了桶的大小
	// waitDuration <= maxFutureReserve:需要等待的时间是否小于用户期望的时间
	ok := n <= lim.burst && waitDuration <= maxFutureReserve

	// Prepare reservation
	r := Reservation{
		ok:    ok,
		lim:   lim,
		limit: lim.limit,
	}

	// timeToAct 表示当桶中满足 token 数目等于 n 的时间
	if ok {
		r.tokens = n	//n(用户传入)
		r.timeToAct = now.Add(waitDuration)
		//r.timeToAct 表示用于需要等待到这个时刻才能获得期望大小的 token 数目(当然 waitDuration 有可能为 0,就是立即满足)
	}

	// Update state
	// 更新桶里面的 token 数目
	if ok {
		lim.last = now
		lim.tokens = tokens
		lim.lastEvent = r.timeToAct
	} else {
		// 不满足,只更新 last,last 的规则在 advance 方法中
		lim.last = last
	}

	lim.mu.Unlock()
	// 将 Reservation 对象返回
	return r
}

0x05 (消费)接口分析

基于 reserveN 的实现,本小节看下对外接口对其的调用方式:

Allow 系列

AllowAllowN 的实现,最终调用了 reserveN(now, 1, 0).okreserveN(now, n, 0).ok,这两个方法只需要 ok 这个结果(即 Token 令牌拿到与否),比较直观:

// Allow is shorthand for AllowN(time.Now(), 1).
func (lim *Limiter) Allow() bool {
	// 传入 time.Now()
	return lim.AllowN(time.Now(), 1)
}

// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate limit.
// Otherwise use Reserve or Wait.
func (lim *Limiter) AllowN(now time.Time, n int) bool {
	return lim.reserveN(now, n, 0).ok
}

Wait 系列

Reserve 系列

0x06 Reservation 结构

// A Reservation holds information about events that are permitted by a Limiter to happen after a delay.
// A Reservation may be canceled, which may enable the Limiter to permit additional events.
type Reservation struct {
	ok        bool
	lim       *Limiter
	tokens    int
	timeToAct time.Time
	// This is the Limit at reservation time, it can change later.
	limit Limit
}

// OK returns whether the limiter can provide the requested number of tokens
// within the maximum wait time.  If OK is false, Delay returns InfDuration, and
// Cancel does nothing.
func (r *Reservation) OK() bool {
	return r.ok
}

// Delay is shorthand for DelayFrom(time.Now()).
func (r *Reservation) Delay() time.Duration {
	return r.DelayFrom(time.Now())
}

// InfDuration is the duration returned by Delay when a Reservation is not OK.
const InfDuration = time.Duration(1<<63 - 1)

// DelayFrom returns the duration for which the reservation holder must wait
// before taking the reserved action.  Zero duration means act immediately.
// InfDuration means the limiter cannot grant the tokens requested in this
// Reservation within the maximum wait time.
func (r *Reservation) DelayFrom(now time.Time) time.Duration {
	if !r.ok {
		return InfDuration
	}
	delay := r.timeToAct.Sub(now)
	if delay < 0 {
		return 0
	}
	return delay
}

// Cancel is shorthand for CancelAt(time.Now()).
func (r *Reservation) Cancel() {
	r.CancelAt(time.Now())
	return
}

// CancelAt indicates that the reservation holder will not perform the reserved action
// and reverses the effects of this Reservation on the rate limit as much as possible,
// considering that other reservations may have already been made.
func (r *Reservation) CancelAt(now time.Time) {
	if !r.ok {
		return
	}

	r.lim.mu.Lock()
	defer r.lim.mu.Unlock()

	if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) {
		return
	}

	// calculate tokens to restore
	// The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved
	// after r was obtained. These tokens should not be restored.
	// 为什么新分配的就不算呢?
	// 因为可以 cancel 表示该 Event 尚未发生,如果已经发生,则在前面的 if 分支就 return 了;
	// 那么后面继续申请的 Event.timeToAct 必定大于当前的 r.timeToAct,也是预支的;
	// 那么归还当前的 token 时,需要把已经预支的那部分除去,因为已经算是预消费了,不能再给后面申请的 Event 使用
	restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))

	// 当小于 0,表示已经都预支完了,不能归还了
	if restoreTokens <= 0 {
		return
	}
	// advance time to now
	now, _, tokens := r.lim.advance(now)
	// calculate new number of tokens
	tokens += restoreTokens
	if burst := float64(r.lim.burst); tokens > burst {
		tokens = burst
	}

	// update state
	r.lim.last = now // 这一点也很关键
	r.lim.tokens = tokens

	// 如果都相等,说明跟没消费一样。直接还原成上次的状态吧
	if r.timeToAct == r.lim.lastEvent {
		prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))
		if !prevEvent.Before(now) {
			r.lim.lastEvent = prevEvent
		}
	}

	return
}

0x07 细节:x/time/rate的Bug

bug复现

time/rate在并发goroutine运行下存在bug,参考关联issue,版本go version go1.17 linux/amd64,参考如下代码:

func main() {
        var succCount, failCount int64
        //limit := rate.Every(100 * time.Millisecond)
        //burst := 1
        limiter := rate.NewLimiter(10, 1)
        start := time.Now()
        for i := 0; i < 5000; i++ {
                go func() {
                        for {
                                if limiter.Allow() {
                                        atomic.AddInt64(&succCount, 1)
                                } else {
                                        atomic.AddInt64(&failCount, 1)
                                }
                        }
                }()
        }

        time.Sleep(2 * time.Second)
        elapsed := time.Since(start)
        fmt.Println("elapsed=", elapsed, "succCount=", atomic.LoadInt64(&succCount), "failCount=", atomic.LoadInt64(&failCount))
}

上述这段代码的输出如下,未达到限流目的:

[root@VM_120_245_centos ~]# ./rate 
elapsed= 2.009389403s succCount= 27905 failCount= 4738594

bug原因

回到上面的advance方法及其返回参数(获取nowlim.last之间的令牌数,不超过burst令牌桶的大小):

// advance calculates and returns an updated state for lim resulting from the passage of time.
// lim is not changed.
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
	last := lim.last
	if now.Before(last) {
		last = now
	}

	// Avoid making delta overflow below when last is very old.
	maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
	elapsed := now.Sub(last)
	if elapsed > maxElapsed {
		elapsed = maxElapsed
	}

	// Calculate the new number of tokens, due to time that passed.
	delta := lim.limit.tokensFromDuration(elapsed)
	tokens := lim.tokens + delta
	if burst := float64(lim.burst); tokens > burst {
		tokens = burst
	}

	//2023-05-12 18:47:59.119059031 +0800 CST m=+1.003917034 2023-05-12 18:47:58.115177368 +0800 CST m=+0.000035321 1
	return now, last, tokens
}
  • newNow:直接返回了未修改参数,貌似无意义
  • newLast:是返回上次tokens被更新的时间点,如果当前传入的时间点是在上次更新的时间点之前的话同样会返回当前传入的时间点;
  • tokens: 返回根据当前的时间点与上次更新的时间点之间的流逝时间转换成token数量,为float类型

再列举下上面的reserveN方法:

// Allow is shorthand for AllowN(time.Now(), 1).
func (lim *Limiter) Allow() bool {
	return lim.AllowN(time.Now(), 1)
}

// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate limit.
// Otherwise use Reserve or Wait.
func (lim *Limiter) AllowN(now time.Time, n int) bool {
	return lim.reserveN(now, n, 0/*参数为0*/).ok
}

// reserveN is a helper method for AllowN, ReserveN, and WaitN.
// maxFutureReserve specifies the maximum reservation wait duration allowed.
// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
	lim.mu.Lock()
    
    // 判断limit是否为无限大,直接返回ok
	if lim.limit == Inf {
		lim.mu.Unlock()
		return Reservation{
			ok:        true,
			lim:       lim,
			tokens:    n,
			timeToAct: now,
		}
	}
    // 通过advance函数获取到now这个时间点可以用的token数量
	now, last, tokens := lim.advance(now)
    
	// Calculate the remaining number of tokens resulting from the request.
	tokens -= float64(n)

	// Calculate the wait duration
	var waitDuration time.Duration
	if tokens < 0 {
		waitDuration = lim.limit.durationFromTokens(-tokens)
	}

	// Decide result
	ok := n <= lim.burst/*请求的不能超过桶大小*/ && waitDuration <= maxFutureReserve /*最终转换为时间进行比较*/

	// Prepare reservation
	r := Reservation{
		ok:    ok,
		lim:   lim,
		limit: lim.limit,
	}
	if ok {
		r.tokens = n
		r.timeToAct = now.Add(waitDuration)
	}
    // 更新状态这里,ok为true时就更新当前的时间点以及需要更新的字段,为false不需要更新
	// Update state
	if ok {
		lim.last = now
		lim.tokens = tokens
		lim.lastEvent = r.timeToAct
	} else {
		lim.last = last
	}

	lim.mu.Unlock()
	return r
}

Limiter.last结构的定义看,last是tokens字段更新的时间点,获取token成功时候更新,不成功的时候不应该更新,所以,修复此BUG需要把lim.last = last这段代码移除即可。

bug解决

新版本库已经修复了这个问题

0x08 参考

转载请注明出处,本文采用 CC4.0 协议授权