Golang 并发协程池实现分析(二)

分析 Fasthttp 的 goroutine pool

Posted by pandaychen on October 9, 2020

0x00 前言

    FastHTTP 开源项目给出了一个性能极佳协程池的实现。

标准库 net.httpfasthttp 最大的不同可能就是 server 在处理连接的时候使用了协程池。在并发量大的时候,goroutine 数量巨大,runtime 层的上下文切换成本对性能有影响。而 fasthttp 用协程池规避了这个问题。fasthttp 并不是 One request One goroutine,而是实现了一个非常 Nice 的 workerPool,尽可能复用 goroutine。

0x01 fastHTTP 协程池简介

阅读完整个 fasthttp 的实现代码,只能用精妙二字来形容,可以作为 golang channel 应用案例的典范。这里先给出我对 fasthttp 协程池的总结:

fasthttp 的 pool 实现是通过对加锁 slice 的增 / 减操作来进行并发控制的,slice 的最大长度即为并发值。 fasthttp 按需创建 goroutine,通过 workerChan 接收要处理的 net.Conn,处理完成将 workerChan 放回 slice,这样下一个请求又可以获取到… 继续循环处理

0x02 fasthttp VS net.http

下面是标准库的 http.Server 实现,对每个新客户端连接(请求)都开启一个单独的 goroutine 来处理:

net/http/server.go
func (srv *Server) Serve(l net.Listener) error {
    ......
    for {
      rw, e := l.Accept()
      ......
      //FastHTTP 在这步使用协程池
      go c.serve(ctx)
    }
}

FastHTTPfasthttp.ListenAndServe 则启用了 pool 机制:

github.com/valyala/fasthttp/server.go 1489
func (s *Server) Serve(ln net.Listener) error {
    // default concurrency set to 256*1024
    maxWorkersCount := s.getConcurrency()
    s.concurrencyCh = make(chan struct{}, maxWorkersCount)
    // 初始化 pool 结构
    wp := &workerPool{
        WorkerFunc:      s.serveConn,
        MaxWorkersCount: maxWorkersCount,
        LogAllErrors:    s.LogAllErrors,
        Logger:          s.logger(),
    }
    wp.Start()
    ......
    for {
      if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil {
          ......
      }
      // 对应 go 原生的 go c.serve(ctx)
      if !wp.Serve(c) {   //wp.Serve,使用 pool 的 Serve 方法来处理请求
          ......
      }
      ......
    }
}

     从上面两端代码可以明显看出,在 golang 原生 http.Server 包中,当接收到新请求就会启动一个协程处理,而 FastHTTP 则使用协程池处理。

接下来,我们来分析 fasthttp 中 pool 的具体实现。

0x03 核心结构

workerPool 是协程池的核心结构:

  • WorkerFuncs.serveConn 方法 ,即每个客户端连接 net.conn 的处理函数
  • MaxWorkersCount:worker 池的最大数量
  • workerChanPoolsync.Pool 对象池,用来复用堆上的结构,做优化所用
  • MaxIdleWorkerDuration:worker 的最大空闲时间
  • ready []*workerChan:核心结构,是可用的 worker 列表,所有 goroutine worker 是存放在 slice 中的。pool 的 worker 协程是通过 ready 数组来代表的(见下面的代码分析)

注意两点:

  1. ready:这个数组模拟一个类似栈的 FILO 队列,也就是说我们每次使用的 worker 都从 ready 数组的尾部开始取
  2. ready:数组的成员是指针
type workerPool struct {
    WorkerFunc func(c net.Conn) error
    MaxWorkersCount int
    LogAllErrors bool
    MaxIdleWorkerDuration time.Duration
    Logger Logger

    lock         sync.Mutex
    workersCount int
    mustStop     bool
    ready []*workerChan
    stopCh chan struct{}
    workerChanPool sync.Pool
}

workerChan 和上一篇文章 Golang 并发协程池实现分析(一)WorkerPool chan chan Job 的作用很类似,都是用作并发控制:

type workerChan struct {
	lastUseTime time.Time
	ch          chan net.Conn
}

0x04 代码分析

pool 初始化

Serve 方法,进入 accept 之前,完成对 pool 的初始化工作,主要是注册了对 net.Conn 的处理方法,s.serveConn

wp := &workerPool{
        WorkerFunc:      s.serveConn,
        MaxWorkersCount: maxWorkersCount,
        LogAllErrors:    s.LogAllErrors,
        Logger:          s.logger(),
}

pool 的启动

Serve 方法,Accept 之前,通过 wp.start 开启了一个 goroutine,定时清理 workerpool 中未使用时间超过 maxIdleWorkerDuration 的 goroutine(很长时间无请求,回收 goroutine):

func (wp *workerPool) Start() {
    wp.stopCh = make(chan struct{})
    stopCh := wp.stopCh
    go func() {
        var scratch []*workerChan
        for {
            wp.clean(&scratch)
            select {
            case <-stopCh:
                return
            default:
                time.Sleep(wp.getMaxIdleWorkerDuration())
            }
        }
    }()
}

Pool 的停止

当协程池停止运行时,回收资源,清空 ready 里所有 ch,并清空 ready。ch.ch <- nil 这会触发 workerFunc 的 worker 协程退出:

func (wp *workerPool) Stop() {
    close(wp.stopCh)
    wp.stopCh = nil
    wp.lock.Lock()
    ready := wp.ready
    for i, ch := range ready {
        ch.ch <- nil
        ready[i] = nil
    }
    wp.ready = ready[:0]
    wp.mustStop = true
    wp.lock.Unlock()
}

核心方法:Serve

Serve 方法 的逻辑是:

  1. 调用 wp.getCh() 拿到一个票据 ch,同时该票据也是 net.Conn 的传递通道
  2. 将新的客户端连接放入 ch,触发 worker 工作协程对新连接的处理流程
func (wp *workerPool) Serve(c net.Conn) bool {
	ch := wp.getCh()
	if ch == nil {
		return false
	}
	ch.ch <- c
	return true
}

核心方法:getCh

getCh 的实现可以理解为一个用来执行 workerFunc 的 goroutine 都绑定了一个 workerChan。把要处理的 conn 发到这个 workerChan,这个 goroutine 就开始执行。没有要执行的 conn 则 goroutine 阻塞,直到下次 workerChan 有连接发来。

 func (wp *workerPool) getCh() *workerChan {
    var ch *workerChan
    createWorker := false

    wp.lock.Lock()
    ready := wp.ready
    n := len(ready) - 1
    if n < 0 {
        if wp.workersCount < wp.MaxWorkersCount {
            createWorker = true
            wp.workersCount++
        }
    } else {
        ch = ready[n]
        ready[n] = nil
        // 这里说明 ready 尾部的 workerChan 要被取出和 goroutine 绑定并使用了
        wp.ready = ready[:n]  //create 0-->n-1 a new slice
    }
    wp.lock.Unlock()

    if ch == nil {
        // 如果没有可用的 worker,包含两种情况:
        //1. 未达 worker 总数,需要新建
        //2. 超过 worker 总数,这时候返回失败(nil)
        if !createWorker {
            return nil
        }

        // 从对象池获取 chan net.Conn 结构,用于存储客户端连接的引用
        vch := wp.workerChanPool.Get()
        if vch == nil {
            vch = &workerChan{
                // 注意 workerChan 中可以定义为是个通用结构,目前是 net.Conn
                //ch          chan net.Conn
                ch: make(chan net.Conn, workerChanCap),
            }
        }
        ch = vch.(*workerChan)  // 类型转换,注意这里 ch 是指针
        go func() {
            // 注意:这里是真正处理业务逻辑的部分!
            wp.workerFunc(ch)
            wp.workerChanPool.Put(vch)
        }()
    }
    return ch
}

这里有个细节:

func main(){
        var ch *int
        var ready []*int
        ready=make([]*int,10)
        var a = 1
        ready[1]= &a
        ch = ready[1]
        ready[1] = nil
        fmt.Printf("%v,%v\n",ch,ready[1])
}

注意到上面代码中的这段逻辑,当前的要执行任务数超过了协程池的并发数,直接返回失败了(不处理),如果要剥离fasthttp的pool实现,这里需要特殊处理下。比较合理的方式是,等待任务处理完成后继续调度

if ch == nil {
        // 如果没有可用的 worker,包含两种情况:
        //1. 未达 worker 总数,需要新建
        //2. 超过 worker 总数,这时候返回失败(nil)
        if !createWorker {
            //超过了并发总数
            return nil
        }
    //...
}

核心方法:workerFunc

上一步中,单独开启的 goroutine 中来完成两个事情:

  1. wp.workerFunc(ch):阻塞接收 ch *workerChan 中的连接 conn
  2. wp.workerChanPool.Put(vch)

worker 处理完一个连接 conn 后,通过 wp.release() 这个 conn 对应的票据到 ready 数组。即表示这时 worker 空闲,可以执行下一次请求。同时处理完 net.Conn 后要将 conn 置为 nil。在 workerFunc 的实现中,调用了业务注册的方法 wp.WorkerFunc(c),注意前面是大写,来完成真正的业务逻辑,该方法对应于前面的 s.serveConn 方法。

此外,正常情况下 worker 是不退出的,除非 wp.Stop,这样也实现 pool 的最最核心的 goroutine 复用能力。

func (wp *workerPool) workerFunc(ch *workerChan) {
    var c net.Conn
    var err error
    // for range 在一个 channel 上
    for c = range ch.ch {
        if c == nil {
            // 注意这里,和 clean 方法息息相关
            break
        }
        if err = wp.WorkerFunc(c); err != nil && err != errHijacked {
            // 处理错误
        }
        c = nil

        // 将 ch *workerChan 放回 ready 数组
        if !wp.release(ch) {
            break
        }
    }
    wp.lock.Lock()
    wp.workersCount--
    wp.lock.Unlock()
}

func (wp *workerPool) release(ch *workerChan) bool {
    ch.lastUseTime = CoarseTimeNow()
    wp.lock.Lock()
    if wp.mustStop {
        wp.lock.Unlock()
        return false
    }
    //
    wp.ready = append(wp.ready, ch)
    wp.lock.Unlock()
    return true
}

PS:注意 workerFunc 方法中,for c = range ch.ch 的循环遍历实现中,当检测到 c==nil 时退出整个循环,那么何时退出呢? 1、在请求处理正常结束时,注意下面的 c = nil,这样 wp.workersCount--,即工作协程数 -1

func (s *Server) Serve(ln net.Listener) error {
    // default concurrency set to 256*1024
    maxWorkersCount := s.getConcurrency()
    s.concurrencyCh = make(chan struct{}, maxWorkersCount)
    wp := &workerPool{
        WorkerFunc:      s.serveConn,
        MaxWorkersCount: maxWorkersCount,
        LogAllErrors:    s.LogAllErrors,
        Logger:          s.logger(),
    }
    wp.Start()
    for {
        if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil {
            wp.Stop()
            if err == io.EOF {
                return nil
            }
            return err
        }
        if !wp.Serve(c) {
            s.writeFastError(c, StatusServiceUnavailable,
            "The connection cannot be served for Server.Concurrency limit exceeded")
            c.Close()
            time.Sleep(100 * time.Millisecond)
        }
        c = nil
    }
}

2、在 release 方法中,将 ch *workerChan 放回 ready 数组,也就是说,fasthttp 中,每处理完一个连接,就把 workerChan 放回,下一个循环再来请求,继续从 ready 数组的当前最后的位置取。这样来看,ready 数组很像限流算法中的令牌桶,而 workerChan 是令牌,谁拿到谁就可以执行。

Start 方法的子任务:Clean

最后看下 start 中开启的 clean 定时任务。// wp.clean() 的操作是 查看最近使用的 workerChan, 如果他的最近使用间隔大于某个值,那么把这个 workerChan 清理了。

之所以清理过程只从前遍历清理前面部分,是因为 ready 是 FILO 先进后出的,所以 ready 中越往后的空闲时间最短。

func (wp *workerPool) clean(scratch *[]*workerChan) {
    maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration()
    currentTime := time.Now()
    wp.lock.Lock()
    ready := wp.ready
    n := len(ready)
    i := 0
    for i <n && currentTime.Sub(ready[i].lastUseTime) > maxIdleWorkerDuration {
        i++
    }
    *scratch = append((*scratch)[:0], ready[:i]...)
    if i > 0 {
        m := copy(ready, ready[i:])
        for i = m; i < n; i++ {
            ready[i] = nil
        }
        wp.ready = ready[:m]
    }
    wp.lock.Unlock()
    tmp := *scratch
    for i, ch := range tmp {
        ch.ch <- nil
        tmp[i] = nil
    }
}

0x05 再看 goroutine 复用

这里在问一个问题,本文介绍的复用 goroutine 在何处体现?让我们再回到 workerFunc 方法:

func (wp *workerPool) workerFunc(ch *workerChan) {
    var c net.Conn
    var err error
    // for range 在一个 channel 上
    for c = range ch.ch {
        if c == nil {
            // 注意这里,和 clean 方法息息相关
            break
        }
        if err = wp.WorkerFunc(c); err != nil && err != errHijacked {
            // 处理错误
        }
        c = nil

        // 将 ch *workerChan 放回 ready 数组
        if !wp.release(ch) {
            break
        }
    }
    wp.lock.Lock()
    wp.workersCount--
    wp.lock.Unlock()
}

让我们回顾下 workerFunc 之前的逻辑:当系统中运行的 goroutine 个数小于 maxWorkersCount 个时,按需创建(有新的请求且 ready 中没有可用令牌),此后每个 workerFunc 就在 for c = range ch.ch 中源源不断的处理新到来的请求(ch.ch 可以理解为 goroutine 暴露给外部的通信 channel)。这样避免了 goroutine 频繁的创建和销毁,从而达到了复用的目的。

0x06 总结

fasthttp 协程池是非常优秀的 golang GMP 模型的应用,在实际项目中极具借鉴意义。特别是在高并发的单一循环 Job 处理的场景中。另一个协程池开源项目 ants 也是基于 fasthttp 来实现的。

fasthttp

0x07 参考