Golang 并发:如何优雅实现并发 goroutine 若干细节

Golang errorgroup 应用(续)

Posted by pandaychen on March 25, 2023

0x00 前言

0x01 SizedWaitGroup机制


import (


func main() {

        // Typical use-case:
        // 50 queries must be executed as quick as possible
        // but without overloading the database, so only
        // 8 routines should be started concurrently.
        swg := sizedwaitgroup.New(8)  //仅仅限制8个并发
        for i := 0; i < 50; i++ {
                go func(i int) {
                        defer swg.Done()


func query(i int) {
        ms := i + 500 + rand.Intn(500)
        time.Sleep(time.Duration(ms) * time.Millisecond)


// SizedWaitGroup has the same role and close to the
// same API as the Golang sync.WaitGroup but adds a limit of
// the amount of goroutines started concurrently.
type SizedWaitGroup struct {
	Size int

	current chan struct{}
	wg      sync.WaitGroup

// New creates a SizedWaitGroup.
// The limit parameter is the maximum amount of
// goroutines which can be started concurrently.
func New(limit int) SizedWaitGroup {
	size := math.MaxInt32 // 2^31 - 1
	if limit > 0 {
		size = limit
	return SizedWaitGroup{
		Size: size,

		current: make(chan struct{}, size),
		wg:      sync.WaitGroup{},

// Add increments the internal WaitGroup counter.
// It can be blocking if the limit of spawned goroutines
// has been reached. It will stop blocking when Done has
// been called.
// See sync.WaitGroup documentation for more information.
func (s *SizedWaitGroup) Add() {

// AddWithContext increments the internal WaitGroup counter.
// It can be blocking if the limit of spawned goroutines
// has been reached. It will stop blocking when Done has
// been called, or when the context is canceled. Returns nil on
// success or an error if the context is canceled before the lock
// is acquired.
// See sync.WaitGroup documentation for more information.
func (s *SizedWaitGroup) AddWithContext(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case s.current <- struct{}{}:
	return nil

// Done decrements the SizedWaitGroup counter.
// See sync.WaitGroup documentation for more information.
func (s *SizedWaitGroup) Done() {

0x02 errgroup


func main() {
        wg := sync.WaitGroup{}
        for i := 0; i < 10; i++ {
                go func() {
                        defer wg.Done()

上面的例子中,goroutine 是通过 go func(){} 直接启动的,并没有返回值,因此很难知道某个 goroutine 具体报了什么错误;对于需要接收返回值并做进一步处理的需求,这种写法也就无法满足了。那么该如何做呢?

前文 Kratos 源码分析:Errgroup 机制 介绍了应对这种问题的一种处理方式,那就是 errgroup 机制,这里先简单回顾一下。

0x03 参考