Kratos 源码分析:理解 Kratos 的数据统计类型 Metrics(一)

分析 Kratos 框架中的 Metrics:滑动窗口

Posted by pandaychen on April 12, 2020

0x00 前言

这篇文章,分析下 Kratos 项目中的核心统计结构 Metrics 及滑动窗口 Window 的实现。 Metrics 和 Window 的用途如下:

  • Metrics:在数据库操作指标量化(如延迟,错误,调用次数等)、接口指标量化使用较为广泛
  • Window:在限流器、熔断器、负载均衡算法判定等需要计算比率的地方使用较多,主要解决单个指标计算的平滑性,Window 的实例化结构就是 RollingXX 系列,如 RollingCounter、RollingGauge 等

0x01 滑动窗口的结构

和 Hystrix-Go 实现不同,Kratos 中的滑动窗口是通过单链表结构实现的。如下图所示: image

下面看看 Window 的数据结构及封装的主要操作方法:

Window 中的单元节点:Bucket

Bucket 提供了 Append 方法,用于向 Points 中添加数据,Points 是 float64 类型的 slice,主要存放单个指标的值,如延迟,错误次数等等

type Bucket struct {
	Points []float64 // 单个节点中的统计数据(数组)
	Count  int64     // 总数
	next   *Bucket   // 链表实现
}

Bucket 提供了两种数值添加的接口:AppendAdd,这两个方法会被上层 Window 的方法调用:

  • Append 是在 Bucket 的 Points 数组中直接追加数据:
// Append appends the given value to the bucket.
func (b *Bucket) Append(val float64) {
	b.Points = append(b.Points, val)
	b.Count++
}
  • Add 是在 Bucket 的 Points 数组中的指定 index 位置累加值:
// Add adds the given value to the point.
// 给 []float64 指定位置累加值
func (b *Bucket) Add(offset int, val float64) {
	b.Points[offset] += val
	b.Count++
}

Bucket 的遍历方式:

// Next returns the next bucket.
// 遍历链表用(返回当前 node 的下一个 node)
func (b *Bucket) Next() *Bucket {
	return b.next
}

Window 结构及方法

Window 的结构,就是 Bucket 组成的 slice:

type Window struct {
	window []Bucket // 滑动窗口实现
	size   int
}

Window 的初始化逻辑,这里有个细节是将 Bucket 初始化为环形数组(RingQueue): image

// NewWindow creates a new Window based on WindowOpts.
// 初始化滑动窗口
func NewWindow(opts WindowOpts) *Window {
	buckets := make([]Bucket, opts.Size)
	for offset := range buckets {
		// 初始化每个 bucket
		buckets[offset] = Bucket{Points: make([]float64, 0)}
		nextOffset := offset + 1
		if nextOffset == opts.Size {
			// 构建一个 queue(环)
			nextOffset = 0
		}

		// 初始化为一个链表(首尾相接)
		buckets[offset].next = &buckets[nextOffset]
	}
	return &Window{window: buckets, size: opts.Size}
}

同样的,Window 的数值添加方法也只是对 Bucket 提供接口的进一步封装:

  • Window 的 Append 方法:向指定的偏移 offset(位于 offset 位置的 Bucket)添加值
// Append appends the given value to the bucket where index equals the given offset.
func (w *Window) Append(offset int, val float64) {
	// 调用了 Bucket 的 Append 方法
	w.window[offset].Append(val)
}
  • Window 的 Add 方法:向指定的偏移 offset 的 0 号位置累加值(和 Append 有稍许不同)
// Add adds the given value to the latest point within bucket where index equals the given offset.
func (w *Window) Add(offset int, val float64) {
	if w.window[offset].Count == 0 {
		// 如果 bucket 是空的(没有统计值),直接 Append
		w.window[offset].Append(val)
		return
	}
	// bucket 非空,在 Point[] 的 0 号位置累加值
	w.window[offset].Add(0, val)
}
  • Window 的迭代器生成
// 需要提供传入的 offset 和 count
func (w *Window) Iterator(offset int, count int) Iterator {
	// 构建 window 的 Iterator(迭代器),方便统计和遍历
	return Iterator{
		count: count,
		cur:   &w.window[offset],
	}
}

0x02 window 的迭代(遍历)器实现

Window 提供了 iterator 的封装,用于滑动窗口的遍历。遍历的目的是为了对窗口的数据做提取和计算;比如,计算截至当前时间滑动窗口的请求失败率,就需要遍历从窗口 start 位置到目前时间的所有 Bucket 的 $\frac{错误总数}{请求总数}$。

遍历器的结构如下:

// Iterator iterates the buckets within the window.
type Iterator struct {
	count         int // 遍历完成的条件(i.count != i.iteratedCount)
	iteratedCount int
	cur           *Bucket // 当前迭代器的位置
}

下面 Next 方法,定义了遍历器的退出条件:遍历完 count 个 Bucket(窗口)后完成,其中 iteratedCount 是计数器

// 这里很重要,Iter 的迭代规则是,移动 count 的次数 == 当前移动次数
// Next returns true util all of the buckets has been iterated.
func (i *Iterator) Next() bool {
	return i.count != i.iteratedCount
}

Bucket 方法,获取当前的 Bucket,并且把指针指向下一个 Bucket

// Bucket gets current bucket.
func (i *Iterator) Bucket() Bucket {
	if !(i.Next()) {
		panic(fmt.Errorf("stat/metric: iteration out of range iteratedCount: %d count: %d", i.iteratedCount, i.count))
	}
	bucket := *i.cur
	i.iteratedCount++	// 累加计数器
	i.cur = i.cur.Next()
	return bucket
}

0x03 滑动窗口 window + 时间跨度

在项目中,如何在滑动窗口中加入时间跨度,用来实现滑动窗口结构的实例化?答案就是 Rolling 结构。如下图,一个 Bucket 代表 500ms,一个滑动窗口占据 2 个 Bucket。这是一个非常精妙的数据结构,表现在以下几点:

  1. 采集到的指标的时效性非常强,系统只需要采集最近一小段时间内关注的 Metrics 即可,对于较老的数据,会自动丢弃
  2. 如下图展示的的滑动窗口。整个滑动窗口用来保存最近 1s 的采样数据,每个小的桶用来保存 500ms 的采样数据。当时间流动之后,过期的桶会自动被新桶的数据覆盖掉,在图中,在 1000-1500ms 时,Bucket-1 的数据因为过期而被丢弃,之后 Bucket-3 的数据填到了窗口的头部。(由于在实现上滑动窗口被构造为一个环,所以 Bucket-3 的位置实际上在第 0 号位,即 Bucket-1 的位置)
  3. 由于滑动窗口以时间为 key,外部接口调用 Add 方法添加新的指标时,会根据时间跨度将不同(相近)时间戳的指标汇总到一个 “窗口” 中,从而可以使得统计结果更加趋于平滑,不会受到单次统计波动的影响

image

0x04 滑动窗口的实例化 Rolling

Rolling_policy 中,封装了滑动窗口,加入了互斥锁、单位时间跨度(单个桶)、最后一次更新时间等,使其成为外部可调用的结构体,如下:

type RollingPolicy struct {
	mu     sync.RWMutex		//(子协程)并发的 add 操作必须加锁!
	size   int			// 滑动窗口的 size
	window *Window		// 滑动窗口
	offset int

	bucketDuration time.Duration		// 一个桶代表多长时间
	lastAppendTime time.Time			// 滑动窗口的 START 位置
}

通过,time.Now()- 当前时间、bucketDuration 以及 lastAppendTime 这几项时间因素的关联,是的 RollingPolicy.window 具有了基于时间的滑动窗口的概念。

RollingPolicy 的方法封装

RollingPolicy 中有 3 个核心方法,分别为 timespanaddReduce

下面 timespan() 方法就是计算:当前调用此方法的时刻,距离上一次写入(lastAppendTime)滑过了几个 Bucket

func (r *RollingPolicy) timespan() int {
	v := int(time.Since(r.lastAppendTime) / r.bucketDuration)
	if v > -1 { // maybe time backwards
		return v
	}
	// 时间调整了
	return r.size
}

RollingPolicy 的添加数据方法,分别调用了 window 的 AppendAdd 方法:

// Append appends the given points to the window.
func (r *RollingPolicy) Append(val float64) {
	r.add(r.window.Append, val)
}

// Add adds the given value to the latest point within bucket.
func (r *RollingPolicy) Add(val float64) {
	r.add(r.window.Add, val)
}

add 方法:通过计算出当前时间 time.Now()lastAppendTime 的跨度差,在环形滑动窗口中获取正确的位置,然后调用传入的 f 进行插入操作。请注意,跨度超过了 Window 的 End 位置需要从 Start 位置重新计算。

func (r *RollingPolicy) add(f func(offset int, val float64), val float64) {
	r.mu.Lock()
	// 计算时间跨度(跨过了几个 bucket)
	timespan := r.timespan()
	if timespan > 0 {
		// 当 timespan>0 时,表示有跨度
		// 更新当前 append 时间
		r.lastAppendTime = r.lastAppendTime.Add(time.Duration(timespan * int(r.bucketDuration)))
		offset := r.offset
		// reset the expired buckets
		s := offset + 1 //s 指向下一个位置
		if timespan > r.size {
			// 如果跨度超过了 window 的大小,timespan 最大为 window 的 size
			timespan = r.size
		}
		e, e1 := s+timespan, 0 // e: reset offset must start from offset+1
		if e > r.size {
			e1 = e - r.size
			e = r.size
		}
		for i := s; i < e; i++ {
			// 清理 offset1---> s+timespan 的之间的 bucket
			r.window.ResetBucket(i)
			offset = i
		}
		for i := 0; i < e1; i++ {
			// 如果超过一个跨度,那么说明时间跨度在两个区间上
			r.window.ResetBucket(i)
			offset = i
		}
		r.offset = offset
	}
	// 添加到 offset 位置
	//(当 timespan==0 时,说明,当前时间未出现 span,直接操作 r.offset 位置即可)
	f(r.offset, val)
	r.mu.Unlock()
}

Reduce 方法,非常有意思,它的传入参数是 reduce.go 中定义的求值操作,如 SumAvgMinMaxCouter。 该方法的作用是,在 timespan 这个区间进行遍历,计算遍历的起始位置 offset 和长度 count,会调用上面这个几个方法之一(对遍历的这些 Bucket)进行计算,最终得到 val。比如,求当前时间 time.Now()lastAppendTime 之间,滑动窗口的 Sum 累加值。

// Reduce applies the reduction function to all buckets within the window.
func (r *RollingPolicy) Reduce(f func(Iterator) float64) (val float64) {
	r.mu.RLock()
	timespan := r.timespan()
	if count := r.size - timespan; count > 0 {
		offset := r.offset + timespan + 1
		if offset >= r.size {
			offset = offset - r.size
		}
		// 计算得到遍历的开始位置 offset 和遍历长度 count
		//f 的参数,就是 Iterator 的结果
		val = f(r.window.Iterator(offset, count))
	}
	r.mu.RUnlock()
	return val
}

图解 Reduce 方法

1、初始化状态 image

2、计算需要遍历的 Bucket 个数,如图(所有深绿色的部分,滑动窗口中的旧数据,都是需要遍历累计计算值的) $count = r.size - r.timespan()$

image

3、从 r.offset + timespan + 1 开始,遍历滑动窗口,遍历的个数是 count,这里没考虑 offset 超出 r.size 的情况(意为 span 的位置在 r.offset 之前)

image

4、最后,调用传入的 Iterator 计算值

0x05 总结

本文分析了 Kratos 框架中的 Metrics 基础封装、滑动窗口的实现 Window、实例化 RollingPolicy 等,下一篇文章来分析下,更高级的结构 RollingCounter、RollingGauage 等。

0x06 参考

转载请注明出处,本文采用 CC4.0 协议授权