0x00 前言

上一篇博客 理解 Kratos 的数据统计类型 Metrics(一) 中,分析了 Kratos 的滑动窗口类 - RollingPolicy 的实现。这篇文章进一步看下基于 RollingPolicy 进一步封装的应用结构:RollingCounterRollingGauge。这两个结构在 Kratos 项目中应用较为广泛。

0x01 RollingCounter

RollingCounter 就是基于滑动窗口的计数器。常用于 RPC、HTTP 调用成功率统计,在 WRR-LB 算法实现 中,就应用了 RollingCounter

type subConn struct {
	conn balancer.SubConn
	addr resolver.Address
	meta wmeta.MD

	err     metric.RollingCounter		//rpc 调用的错误率统计
	latency metric.RollingGauge			//rpc 调用的时延统计
	si      serverInfo
	// effective weight
	ewt int64
	// current weight
	cwt int64
	// last score
	score float64

rollingCounterRollingPolicy 的上一层封装:

type rollingCounter struct {
	policy *RollingPolicy

type RollingCounter interface {
	Metric			// with prometheus
	Timespan() int
	// Reduce applies the reduction function to all buckets within the window.
	Reduce(func(Iterator) float64) float64

RollingCounterAdd 方法,限制了计数器累加值不能为负数:

func (r *rollingCounter) Add(val int64) {
    // 计数器不能为负数
	if val < 0 {
		panic(fmt.Errorf("stat/metric: cannot decrease in value. val: %d", val))

r.policy.Add,回想上一篇博客分析 Window 的 Add 方法:向指定的偏移 offset 的 0 号位置累加值。图示如下:

// Add adds the given value to the latest point within bucket.
func (r *RollingPolicy) Add(val float64) {
	r.add(r.window.Add, val)


接开头,在 WRR-LB 中,初始化的代码如下:

err: metric.NewRollingCounter(metric.RollingCounterOpts{
	Size:           10,			//window 大小为 10
	BucketDuration: time.Millisecond * 100,		// 每个窗口代表 100ms

对 RPC 调用结果,计数器 累加值的代码 如下,判断 RPC 请求的返回错误代码命中则执行 conn.err.Add(1),反之则 conn.err.Add(0),当然无论调用成功还是失败,count 的值都是要 +1 的。

ev := int64(0) // error value ,if error set 1
if di.Err != nil {
	if st, ok := status.FromError(di.Err); ok {
		// only counter the local grpc error, ignore any business error
		if st.Code() != codes.Unknown && st.Code() != codes.OK {
			ev = 1

最后,再看下 subConn.err 提供的 Reduce() 归约方法,它有两个返回值,err 代表滑动窗口中错误的总计数,req 代表滑动窗口中请求的总计数。那么基于滑动窗口计算的错误率就是:

$cs = 1- \frac{float64(errc)}{float64(req)}$

errc, req := conn.errSummary()
cs := 1 - (float64(errc) / float64(req))

func (c *subConn) errSummary() (err int64, req int64) {
	c.err.Reduce(func(iterator metric.Iterator) float64 {
		for iterator.Next() {
			bucket := iterator.Bucket()
			req += bucket.Count			// 累加 bucket 的 Count 值
			for _, p := range bucket.Points {
				err += int64(p)			// 累加 bucket 的 Err 值
		return 0

0x02 RollingGauge

RollingGauge 是基于滑动窗口的瞬时数据统计器。它和 RollingCounter 的最大不同是向 Bucket 的 Points 数组,追加数据(Points 数组的每个 index 代表了一种维度的 gauge)。RollingGauge 的结构如下:

type rollingGauge struct {
	policy *RollingPolicy

// RollingGauge represents a ring window based on time duration.
// e.g. [[1, 2], [1, 2, 3], [1,2, 3, 4]]
type RollingGauge interface {
	// Reduce applies the reduction function to all buckets within the window.
	Reduce(func(Iterator) float64) float64

RollingGauge 调用了 Window 的 Append 方法来完成存值:

func (r *rollingGauge) Add(val int64) {
// Append appends the given points to the window.
func (r *RollingPolicy) Append(val float64) {
	r.add(r.window.Append, val)


此外,在前一节提到的 subConn 结构中,计算 RPC 调用平均时延就使用了 RollingGauge 结构,使用方式和 RollingCounter 差不多:

// 基于滑动窗口计算平均 latency
lagv, lagc := conn.latencySummary()

func (c *subConn) latencySummary() (latency float64, count int64) {
	c.latency.Reduce(func(iterator metric.Iterator) float64 {
		for iterator.Next() {
			bucket := iterator.Bucket()
			count += bucket.Count		// 累加 RPC 请求的总数
			for _, p := range bucket.Points {
				latency += p			// 累加所有 RPC 的调用时延
		return 0

	// 计算平均时延
	return latency / float64(count), count

0x03 总结

本文分析了 Kratos 基于滑动窗口实现的统计数据结构:RollingCounterRollingGauge

