Golang 并发协程池实现分析(四)

分析 Jeffail/tunny 协程池实现

Posted by pandaychen on February 2, 2022

0x01 tunny 协程池使用


  • 全局池,初始化处理方法
  • 在逻辑较重的地方,使用pool.Process(input)处理并返回
  • 如果项目针对多个方法都需要使用协程池,那么有两个方法
    1. 每个方法都开启独立的协程池
    2. 仅开启1个协程池,根据传入参数做switch
import (


func main() {
	numCPUs := runtime.NumCPU()

	pool := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {
		var result []byte
		// TODO: Something CPU heavy with payload	
		return result
	defer pool.Close()

	http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) {
		input, err := ioutil.ReadAll(r.Body)
		if err != nil {
			http.Error(w, "Internal error", http.StatusInternalServerError)
		defer r.Body.Close()

		// Funnel this work into our pool. This call is synchronous and will
		// block until the job is completed.
		result := pool.Process(input)


	http.ListenAndServe(":8080", nil)


注意reqChan这个成员,初始化长度为1reqChan: make(chan workRequest),各个空闲的worker把自己的workRequest.jobChanworkRequest.retChan发送给PoolPool会把待处理的payload放入workRequest.jobChan,并且等待从workRequest.retChan获取执行结果,代码在此

// Pool is a struct that manages a collection of workers, each with their own
// goroutine. The Pool can initialize, expand, compress and close the workers,
// as well as processing jobs with the workers synchronously.
type Pool struct {
	queuedJobs int64

	ctor    func() Worker
	workers []*workerWrapper	//存储worker的地方,len()表示有多少个worker
	reqChan chan workRequest	//有点像令牌

	workerMut sync.Mutex


  • closureWorker:参数payload为参数,调用w.processor(payload)处理;对应初始化方法为NewFunc
  • callbackWorker:参数payload为func类型,调用f, ok := payload.(func());f()处理;对应初始化方法为NewCallback
type Worker interface {
	// Process will synchronously perform a job and return the result.
	Process(interface{}) interface{}

	// BlockUntilReady is called before each job is processed and must block the
	// calling goroutine until the Worker is ready to process the next job.

	// Interrupt is called when a job is cancelled. The worker is responsible
	// for unblocking the Process implementation.

	// Terminate is called when a Worker is removed from the processing pool
	// and is responsible for cleaning up any held resources.


// workerWrapper takes a Worker implementation and wraps it within a goroutine
// and channel arrangement. The workerWrapper is responsible for managing the
// lifetime of both the Worker and the goroutine.
type workerWrapper struct {
	worker        Worker
	interruptChan chan struct{}

	// reqChan is NOT owned by this type, it is used to send requests for work.
	reqChan chan<- workRequest

	// closeChan can be closed in order to cleanly shutdown this worker.
	closeChan chan struct{}

	// closedChan is closed by the run() goroutine when it exits.
	closedChan chan struct{}


0x02 分析

tunny的核心原理如下图,涵盖了tunny从资源池中获取goroutine并进行处理的逻辑: tunny-principle


  1. tunny将goroutine处理单元封装为workWrapper并且固定数量,由此可以对goroutine的数目进行限制
  2. workerWrapper.run()作为一个goroutine,内部负责具体事务的处理;当一个worker空闲时,会将一个workRequest(可看作是请求处理单元)放入reqChan,并阻塞等待使用方的调用,核心代码在此
  3. workRequest包含两个channel,其中jobChan用于传入调用方的输入,retChan用于给调用方返回执行结果
  4. 调用方会从pool的reqChan中获取一个workRequest请求处理单元,并在workRequest.jobChan中传参,这样workerWrapper.run()中就会继续进行work.process执行;处理结束之后将结果通过workRequest.retChan返回给调用方,然后继续通过reqChan <- workRequest阻塞等待下一个调用方的处理
  5. workerWrapper.run()中的work是一个需要用户实现的接口,必须实现Process(interface{}) interface{},即业务逻辑方法!

0x03 Pool创建


// New creates a new Pool of workers that starts with n workers. You must
// provide a constructor function that creates new Worker types and when you
// change the size of the pool the constructor will be called to create each new
// Worker.
func New(n int, ctor func() Worker) *Pool {
	p := &Pool{
		ctor:    ctor,
		reqChan: make(chan workRequest),

	return p

// NewFunc creates a new Pool of workers where each worker will process using
// the provided func.
func NewFunc(n int, f func(interface{}) interface{}) *Pool {
	return New(n, func() Worker {
		return &closureWorker{
			processor: f,

// NewCallback creates a new Pool of workers where workers cast the job payload
// into a func() and runs it, or returns ErrNotFunc if the cast failed.
func NewCallback(n int) *Pool {
	return New(n, func() Worker {
		return &callbackWorker{}

0x03 生产者


  • p.Process():会一直阻塞直到任务完成,即使当前没有空闲 worker 也会阻塞
  • p.ProcessTimed():带超时的Process(),支持传入一个超时时间,如果超过这个时间还没有空闲 worker,或者任务还没有处理完成,就会终止,并返回错误
  • p.ProcessCtx():当前 context 状态变为Done之后,任务也会停止执行。context 会由于超时、取消等原因切换为Done状态。

通常,在协程池中超时有 2 种情况:

  1. 等不到空闲的 worker:所有 worker 一直处理繁忙状态,正在处理的任务比较耗时,无法短时间内完成
  2. 任务本身比较耗时,必须加一个超时时间方式任务执行被长期阻塞
// Process will use the Pool to process a payload and synchronously return the
// result. Process can be called safely by any goroutines, but will panic if the
// Pool has been stopped.
func (p *Pool) Process(payload interface{}) interface{} {
	atomic.AddInt64(&p.queuedJobs, 1)

  //1. 获取一个执行令牌
	request, open := <-p.reqChan
	if !open {

  //2. 将运行参数通过channel传递给执行goroutine
	request.jobChan <- payload

  //3. 阻塞等待获取结果
	payload, open = <-request.retChan
	if !open {

  //4. 返回调用方结果
	atomic.AddInt64(&p.queuedJobs, -1)
	return payload

// ProcessTimed will use the Pool to process a payload and synchronously return
// the result. If the timeout occurs before the job has finished the worker will
// be interrupted and ErrJobTimedOut will be returned. ProcessTimed can be
// called safely by any goroutines.
func (p *Pool) ProcessTimed(
	payload interface{},
	timeout time.Duration,
) (interface{}, error) {
	atomic.AddInt64(&p.queuedJobs, 1)
	defer atomic.AddInt64(&p.queuedJobs, -1)

	tout := time.NewTimer(timeout)

	var request workRequest
	var open bool

	select {
	case request, open = <-p.reqChan:
		if !open {
			return nil, ErrPoolNotRunning
	case <-tout.C:
		return nil, ErrJobTimedOut

	select {
	case request.jobChan <- payload:
	case <-tout.C:
		return nil, ErrJobTimedOut

	select {
	case payload, open = <-request.retChan:
		if !open {
			return nil, ErrWorkerClosed
	case <-tout.C:
		return nil, ErrJobTimedOut

	return payload, nil

// ProcessCtx will use the Pool to process a payload and synchronously return
// the result. If the context cancels before the job has finished the worker will
// be interrupted and ErrJobTimedOut will be returned. ProcessCtx can be
// called safely by any goroutines.
func (p *Pool) ProcessCtx(ctx context.Context, payload interface{}) (interface{}, error) {
	atomic.AddInt64(&p.queuedJobs, 1)
	defer atomic.AddInt64(&p.queuedJobs, -1)

	var request workRequest
	var open bool

	select {
	case request, open = <-p.reqChan:
		if !open {
			return nil, ErrPoolNotRunning
	case <-ctx.Done():
		return nil, ctx.Err()

	select {
	case request.jobChan <- payload:
	case <-ctx.Done():
		return nil, ctx.Err()

	select {
	case payload, open = <-request.retChan:
		if !open {
			return nil, ErrWorkerClosed
	case <-ctx.Done():
		return nil, ctx.Err()

	return payload, nil



  1. 首先阻塞在逻辑request, open := <-p.reqChan上,当前空闲的worker会把自己的jobChanretChan放入到p.reqChan中,这样解除阻塞
  2. 生产者把用户数据payload放到Worker的jobChan
  3. 阻塞在payload, open = <-request.retChan上,等待Worker执行完成并获取结果,获取结果后解除阻塞
  4. Process运行完成
func (p *Pool) Process(payload interface{}) interface{} {
	atomic.AddInt64(&p.queuedJobs, 1)

	request, open := <-p.reqChan
	if !open {

	request.jobChan <- payload


	payload, open = <-request.retChan
	if !open {

	atomic.AddInt64(&p.queuedJobs, -1)
	return payload

0x04 任务传输


  1. 任务数据channel(Pool把要处理的数据或者func传递给Worker)
  2. 结果channel,Worker把结果异步传输给Pool

0x05 消费者:任务执行


  1. 当空闲时,就把自己的jobChanretChan告知Pool(当前可以处理任务)
  2. 处理完成后,结果发送到retChan,继续处理下一个任务
func (w *workerWrapper) run() {
	jobChan, retChan := make(chan interface{}), make(chan interface{})
	defer func() {

	for {
		// NOTE: Blocking here will prevent the worker from closing down.
		select {
		case w.reqChan <- workRequest{
			jobChan:       jobChan,
			retChan:       retChan,
			interruptFunc: w.interrupt,
			select {
			case payload := <-jobChan:
				result := w.worker.Process(payload)
				select {
				case retChan <- result:
				case <-w.interruptChan:
					w.interruptChan = make(chan struct{})
			case <-w.interruptChan:
				w.interruptChan = make(chan struct{})
		case <-w.closeChan:

0x06 动态调整大小


  1. 扩容:如果当前协程池大小lWorkers小于目标sizen,则扩容p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
  2. 缩容:反之,则停止待缩容的Worker:p.workers[i].stop()


// SetSize changes the total number of workers in the Pool. This can be called
// by any goroutine at any time unless the Pool has been stopped, in which case
// a panic will occur.
func (p *Pool) SetSize(n int) {
	defer p.workerMut.Unlock()

	lWorkers := len(p.workers)
	if lWorkers == n {

	// Add extra workers if N > len(workers)
	for i := lWorkers; i < n; i++ {
		p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))

	// Asynchronously stop all workers > N
	for i := n; i < lWorkers; i++ {

	// Synchronously wait for all workers > N to stop
	for i := n; i < lWorkers; i++ {
		p.workers[i] = nil

	// Remove stopped workers from slice
	p.workers = p.workers[:n]


  1. stop:向Worker发送退出信号,调用workerWrapper.stop()会关闭closeChan通道,会触发workerWrapper.run()中的for循环跳出,进而执行defer中的close(retChan)close(closedChan)代码
  2. join:阻塞等待每个Worker成功退出

这里需要关闭retChan channel是为了防止ProcessX方法在等待retChan数据,closedChanchannel关闭后,workerWrapper.join()方法就返回了,这里的动态扩缩容都是实时生效的

// Asynchronously stop all workers > N
	for i := n; i < lWorkers; i++ {

	// Synchronously wait for all workers > N to stop
	for i := n; i < lWorkers; i++ {
		p.workers[i] = nil

	// Remove stopped workers from slice
	p.workers = p.workers[:n]

func (w *workerWrapper) stop() {

func (w *workerWrapper) join() {

0x07 一些细节

0x08 总结


  • tunny仅支持同步的方式执行任务(适用于需要同步等待结果的场景),虽然任务在另一个 goroutine 执行,但是提交任务的 goroutine 必须等待结果返回或超时,不能做其他事情
  • tunny 为了支持超时和取消,设计了多个channel用于和执行任务的 goroutine 通信。一次任务执行的过程涉及多次通信,性能会有损失
  • 而ants完全是异步的任务执行流程,相比tunny性能是稍高一些的。但是也因为它的异步特性,导致没有任务超时、取消这些机制。而且如果需要收集结果,必须要自己编写额外的代码

