0x00 前言
0x01 SizedWaitGroup机制
SizedWaitGroup
在sync.WaitGroup
基础上增加了并发启动的goroutines的数量限制特性,即SizedWaitGroup
增加了限制同时启动例程的最大数量的功能,使用方法如下:
import (
"fmt"
"math/rand"
"time"
"github.com/remeh/sizedwaitgroup"
)
func main() {
rand.Seed(time.Now().UnixNano())
// Typical use-case:
// 50 queries must be executed as quick as possible
// but without overloading the database, so only
// 8 routines should be started concurrently.
swg := sizedwaitgroup.New(8) //仅仅限制8个并发
for i := 0; i < 50; i++ {
swg.Add()
go func(i int) {
defer swg.Done()
query(i)
}(i)
}
swg.Wait()
}
func query(i int) {
fmt.Println(i)
ms := i + 500 + rand.Intn(500)
time.Sleep(time.Duration(ms) * time.Millisecond)
}
实现分析
// SizedWaitGroup has the same role and close to the
// same API as the Golang sync.WaitGroup but adds a limit of
// the amount of goroutines started concurrently.
type SizedWaitGroup struct {
Size int
current chan struct{}
wg sync.WaitGroup
}
// New creates a SizedWaitGroup.
// The limit parameter is the maximum amount of
// goroutines which can be started concurrently.
func New(limit int) SizedWaitGroup {
size := math.MaxInt32 // 2^31 - 1
if limit > 0 {
size = limit
}
return SizedWaitGroup{
Size: size,
current: make(chan struct{}, size),
wg: sync.WaitGroup{},
}
}
// Add increments the internal WaitGroup counter.
// It can be blocking if the limit of spawned goroutines
// has been reached. It will stop blocking when Done is
// been called.
//
// See sync.WaitGroup documentation for more information.
func (s *SizedWaitGroup) Add() {
s.AddWithContext(context.Background())
}
// AddWithContext increments the internal WaitGroup counter.
// It can be blocking if the limit of spawned goroutines
// has been reached. It will stop blocking when Done is
// been called, or when the context is canceled. Returns nil on
// success or an error if the context is canceled before the lock
// is acquired.
//
// See sync.WaitGroup documentation for more information.
func (s *SizedWaitGroup) AddWithContext(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case s.current <- struct{}{}:
break
}
s.wg.Add(1)
return nil
}
// Done decrements the SizedWaitGroup counter.
// See sync.WaitGroup documentation for more information.
func (s *SizedWaitGroup) Done() {
<-s.current
s.wg.Done()
}
0x02 errgroup
先看下经典的例子:
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
}()
}
wg.Wait()
}
上面例子中,想要知道某个 goroutine 报什么错误的时候发现很难,因为是直接 go func(){}
出去的,并没有返回值,因此对需要接受返回值做进一步处理的需求就无法满足了,那如何做呢?
前文 Kratos 源码分析:Errgroup 机制 介绍了应对这种方法的处理一种方式,那就是 errgroup
机制,先简单回顾下。
0x03 参考
FEATURED TAGS
Latex
gRPC
负载均衡
OpenSSH
Authentication
Consul
Etcd
Kubernetes
性能优化
Python
分布式锁
WebConsole
后台开发
Golang
OpenSource
Nginx
Vault
网络安全
Perl
分布式理论
Raft
正则表达式
Redis
分布式
限流
go-redis
微服务
反向代理
ReverseProxy
Cache
缓存
连接池
OpenTracing
GOMAXPROCS
GoMicro
微服务框架
日志
Pool
Kratos
Hystrix
熔断
并发
Pipeline
证书
Prometheus
Metrics
PromQL
Breaker
定时器
Timer
Timeout
Kafka
Xorm
MySQL
Fasthttp
bytebufferpool
任务队列
队列
异步队列
GOIM
Pprof
errgroup
consistent-hash
Zinx
网络框架
设计模式
HTTP
Gateway
Queue
Docker
网关
Statefulset
NFS
Machinery
Teleport
Zero Trust
Oxy
存储
Confd
热更新
OAuth
SAML
OpenID
Openssl
AES
微服务网关
IM
KMS
安全
数据结构
hashtable
Sort
Asynq
基数树
Radix
Crontab
热重启
系统编程
sarama
Go-Zero
RDP
VNC
协程池
UDP
hashmap
网络编程
自适应技术
环形队列
Ring Buffer
Circular Buffer
InnoDB
timewheel
GroupCache
Jaeger
GOSSIP
CAP
Bash
websocket
事务
GC
TLS
singleflight
闭包
Helm
network
iptables
MITM
HTTPS
Tap
Tun
路由
wireguard
gvisor
Git
NAT
协议栈
Envoy
FRP
DPI
gopacket
DNS
eBPF
GoZero
Gost
gopsutil