0x00 前言
go-workers 是 sidekiq 的 go 实现,异步队列框架。完全满足了基于 redis queue 的任务调度工作,同时支持了自定义 middleware 供接入者开发延伸需求,仅支持 redis,支持延时任务。作者给出的特点如下:
- reliable queueing for all queues using brpoplpush
- handles retries
- support custom middleware
- customize concurrency per queue
- responds to Unix signals to safely wait for jobs to finish before exiting.
- provides stats on what jobs are currently running
- well tested
实现一个异步任务队列,通常要考虑如下:
- Producer,负责把调用者的函数、参数等传入到 Broker 里
- Consumer,负责从 Broker 里取出消息,并且消费,如果有持久化运行结果的需求,还需要进行持久化
- 选择一个 Producer 和 Consumer 之间序列化和反序列化的协议,通常使用 JSON
- Consumer 对消费数据的 Acknowledge 机制
- 任务的重试机制、隔离机制(不同属性的任务)、定时任务 / 延时任务 / 普通任务等
- 队列支持的一致性如何?(参考kafka)
0x01 系统架构介绍
整体分为下面模块:
- manager:负责 单个Queue-name 任务的处理,内含
1
个 Fetcher、N
个 worker,并发度可配置 - schedule:负责延迟任务发送处理、重试任务处理
- producer:任务注册,任务可以设置立即执行或者延迟执行(可以优化为通过 API 接口对外部暴露),项目中是以包方法提供给外部直接调用的方式
- 普通任务,写入 LIST:https://github.com/jrallison/go-workers/blob/master/enqueue.go#L80
- 延时任务,写入 ZSET:https://github.com/jrallison/go-workers/blob/master/enqueue.go#L89
- workers:将 schedule 与 manager 模块整合在一起,提供外部启停接口
运行流程
以项目提供的 示例代码 为例,从任务执行上看:
- 生产者:将任务放置于指定的 Queue(本质是 LIST,以 queueName 为唯一标识),任务可设置立即执行或延迟执行
- 消费者:由 manager 模块启动多个消费者,消费 queueName 的任务,执行 job 函数
1、前置定义和配置部分
定义中间件
type myMiddleware struct{}
func (r *myMiddleware) Call(queue string, message *workers.Msg, next func() bool) (acknowledge bool) {
// do something before each message is processed
fmt.Println("before message processing...")
acknowledge = next()
fmt.Println("after message processing...")
// do something after each message is processed
return
}
//加载中间件
workers.Middleware.Append(&myMiddleware{})
初始化包配置:
workers.Configure(map[string]string{
// location of redis instance
"server": "localhost:6379",
// instance of the database
"database": "0",
// number of connections to keep open with redis
"pool": "30",
// unique process id for this instance of workers (for proper recovery of inprogress jobs on crash)
"process": "1",
})
定义worker的处理方法:
func myJob(message *workers.Msg) {
// do something with your message
// message.Jid()
// message.Args() is a wrapper around go-simplejson (http://godoc.org/github.com/bitly/go-simplejson)
}
2、调用package提供的方法,将queueName与处理方法进行绑定,同时设置并发数(可视为不同的业务间隔离,没有注册方法的queueName不应被执行)
// pull messages from "myqueue" with concurrency of 10
workers.Process("myqueue", myJob, 10)
// pull messages from "myqueue2" with concurrency of 20
workers.Process("myqueue2", myJob, 20)
3、任务入队:Enqueue
// Add a job to a queue
workers.Enqueue("myqueue3", "Add", []int{1, 2})
// Add a job to a queue with retry
workers.EnqueueWithOptions("myqueue3", "Add", []int{1, 2}, workers.EnqueueOptions{Retry: true})
若任务需要立即执行,则调用workers.Enqueue
方法,将任务信息保存到 Redis 的 queueName LIST;若任务需要延迟执行,则需要先将任务保存到共用的Redis 延迟队列(ZSET)中,延时时间设置为score;此外,还可以通过workers.EnqueueWithOptions
设置任务的属性
4、启动worker
// Blocks until process is told to exit via unix signal
workers.Run()
5、任务异步处理及获取结果
0x02 核心数据结构
1、msg:消息
type data struct {
*simplejson.Json
}
type Msg struct {
*data
original string
}
2、任务(入队)
type EnqueueData struct {
Queue string `json:"queue,omitempty"`
Class string `json:"class"`
Args interface{} `json:"args"`
Jid string `json:"jid"`
EnqueuedAt float64 `json:"enqueued_at"`
EnqueueOptions
}
0x03 实现细节
本小节着重分析下工作流转的核心过程,即manager/fetcher/worker/scheduler等模块的实现。
1、绑定关系(队列+处理方法)
示例中使用workers.Process
方法注册,对每个注册的queue
都开启独立的goroutine处理,调用manager
的Start
方法:
func Process(queue string, job jobFunc, concurrency int, mids ...Action) {
access.Lock()
defer access.Unlock()
managers[queue] = newManager(queue, job, concurrency, mids...)
}
//全局Run方法,开启整个流程!
func Run() {
Start()
go handleSignals()
waitForExit()
}
//对每个注册的queue都开启独立的goroutine处理,调用manager的Start方法
func startManagers() {
for _, manager := range managers {
manager.start()
}
}
2、任务生产:enqueue
核心方法是EnqueueWithOptions
,即调用RPUSH
指令将任务入队,本方法用于外部接口调用;
func EnqueueWithOptions(queue, class string, args interface{}, opts EnqueueOptions) (string, error) {
now := nowToSecondsWithNanoPrecision()
data := EnqueueData{
Queue: queue,
Class: class,
Args: args,
Jid: generateJid(),
EnqueuedAt: now,
EnqueueOptions: opts, //入队参数
}
bytes, err := json.Marshal(data)
if err != nil {
return "", err
}
if now < opts.At {
err := enqueueAt(data.At, bytes)
return data.Jid, err
}
conn := Config.Pool.Get()
defer conn.Close()
_, err = conn.Do("sadd", Config.Namespace+"queues", queue) //保存topic,不重复
if err != nil {
return "", err
}
queue = Config.Namespace + "queue:" + queue
_, err = conn.Do("rpush", queue, bytes) //使用RPUSH推送到redis
if err != nil {
return "", err
}
return data.Jid, nil
}
其中,任务额外参数包括重试次数,开关等:
type EnqueueOptions struct {
RetryCount int `json:"retry_count,omitempty"`
Retry bool `json:"retry,omitempty"`
At float64 `json:"at,omitempty"`
}
3、manager 模块:
manager模块的核心逻辑如下:
- 每个queueName都会单独创建一个manager管理协程,负责任务获取(PULL)/调度/分发到worker/完成时acknowledge等处理
- manager 模块启动时,需要检查是否有残留任务需要处理(即未被ack的任务,如任务处理时异常退出,导致任务未执行完毕,此类任务需重新执行)
- manager会通过独立goroutine,不停的通过
fetcher.Fetch
方法,将待执行任务从queueName LIST(原始工作任务队列)移动到inprogress LIST(正在执行队列),同时通过channel将任务异步发送给worker模块
1、manager
结构
一个manager
对应于一个queue
,即某个指定的任务队列,包含1
个fetcher,若干个worker
;manager
中各个子模块之间均是通过channel通信的
type manager struct {
queue string
fetch Fetcher //取任务
job jobFunc
concurrency int //worker的并发度
workers []*worker //worker池
workersM *sync.Mutex
confirm chan *Msg
stop chan bool
exit chan bool
mids *Middlewares
*sync.WaitGroup
}
从前文描述,manager
包含了fetcher
和worker
两个功能,其主要功能如下:
func (m *manager) start() {
m.Add(1)
m.loadWorkers() //按并发度启动workers
go m.manage() //Fetch任务并根据结果进行响应处理
}
//按照配置的并发度,启动worker
func (m *manager) loadWorkers() {
m.workersM.Lock()
for i := 0; i < m.concurrency; i++ {
m.workers[i] = newWorker(m)
m.workers[i].start()
}
m.workersM.Unlock()
}
上述代码中的启动worker,是将manager
的manager.fetch.messages
这个channel,传递给各个worker
,用于任务异步发送的通道(各个worker
通过manager.ready
告知manager
当前可以接收任务,是个简单的限制并发度的策略):
func (w *worker) start() {
go w.work(w.manager.fetch.Messages())
}
func (f *fetch) Messages() chan *Msg {
return f.messages
}
func (f *fetch) sendMessage(message string) {
msg, err := NewMsg(message)
if err != nil {
Logger.Println("ERR: Couldn't create message from", message, ":", err)
return
}
f.Messages() <- msg
}
fetcher
的功能主要有下面两个:
- 独立调用
fetch.Fetch()
方法,该方法的功能,每次启动前仅调用一次processOldMessages
,从inprocessing队列中获取上一次未被ACK的任务,进行备份处理;然后不停的基于<-f.Ready();f.tryFetchMessage()
等待获取任务 - 监听
worker
模块任务成功执行结果(<-m.confirm
),ack掉该任务
func (m *manager) manage() {
Logger.Println("processing queue", m.queueName(), "with", m.concurrency, "workers.")
go m.fetch.Fetch()
for {
select {
case message := <-m.confirm:
m.fetch.Acknowledge(message)
case <-m.stop:
m.exit <- true
break
}
}
}
func (f *fetch) Fetch() {
f.processOldMessages()
go func() {
for {
// f.Close() has been called
if f.Closed() {
break
}
<-f.Ready() //
f.tryFetchMessage() //获取任务
}
}()
for {
select {
case <-f.stop:
// Stop the redis-polling goroutine
close(f.closed)
// Signal to Close() that the fetcher has stopped
close(f.exit)
break
}
}
}
注意,tryFetchMessage
方法会不停的将任务task从原始任务队列转移到inprogressQueue
,通过Redis的brpoplpush
指令;同时将task放入异步channel:fetch.messages
,发送给worker实时处理;worker处理的成功结果会通过channel:confirm
,异步发送给manager模块,manager模块收到后,通过Acknowledge
方法删除掉inprogressQueue
的对应任务(本质是调用Redis的lrem
指令),代码在此;如此就完成了一个任务处理完成到ACK的过程;
func (f *fetch) tryFetchMessage() {
conn := Config.Pool.Get()
defer conn.Close()
message, err := redis.String(conn.Do("brpoplpush", f.queue, f.inprogressQueue(), 1))
if err != nil {
// If redis returns null, the queue is empty. Just ignore the error.
if err.Error() != "redigo: nil returned" {
Logger.Println("ERR: ", err)
time.Sleep(1 * time.Second)
}
} else {
f.sendMessage(message)
}
}
func (f *fetch) Acknowledge(message *Msg) {
conn := Config.Pool.Get()
defer conn.Close()
conn.Do("lrem", f.inprogressQueue(), -1, message.OriginalJson())
}
2、如何 ack 任务,收到 worker 的处理成功的信息后异步,删除 LIST 中的对应数据
func (f *fetch) Acknowledge(message *Msg) {
conn := Config.Pool.Get()
defer conn.Close()
conn.Do("lrem", f.inprogressQueue(), -1, message.OriginalJson())
}
3、manager的quit方法
func (m *manager) quit() {
Logger.Println("quitting queue", m.queueName(), "(waiting for", m.processing(), "/", len(m.workers), "workers).")
m.prepare()
m.workersM.Lock()
for _, worker := range m.workers {
worker.quit()
}
m.workersM.Unlock()
m.stop <- true
<-m.exit
m.reset()
m.Done()
}
4、worker 模块
worker模块的核心逻辑如下:
- 从与manager共享的任务channel中获取任务并执行
- 通过middleware中间件形式将处理状态返回,见下面的
process
方法 - 若任务处理成功,则通过channel通知manager模块,将任务从inprogress队列删除,即是acknowledge方式(可借鉴)
- 若任务失败需要重试,则将重试信息放入retry队列(重试间隔时间成指数级递增)
下面列举下worker核心的方法:
1、获取任务
各个worker
通过messages
这个channel抢夺任务,执行任务完成通过w.manager.fetch.Ready() <- true
告知manager
自己已经空闲,可以继续处理工作;不过这种任务是属于抢夺式的,并不是基于某种策略的分发模式,容易造成某个协程饿死的情况
func (w *worker) work(messages chan *Msg) {
for {
select {
case message := <-messages:
atomic.StoreInt64(&w.startedAt, time.Now().UTC().Unix())
w.currentMsg = message
//worker调用process处理任务,成功的结果异步发送到ack队列中待确认
if w.process(message) {
w.manager.confirm <- message
}
atomic.StoreInt64(&w.startedAt, 0)
w.currentMsg = nil
// Attempt to tell fetcher we're finished.
// Can be used when the fetcher has slept due
// to detecting an empty queue to requery the
// queue immediately if we finish work.
select {
//告知manager,当前worker已经处理完成,可以继续接收任务
case w.manager.fetch.FinishedWork() <- true:
default:
}
case w.manager.fetch.Ready() <- true:
// Signaled to fetcher that we're
// ready to accept a message
case <-w.stop:
w.exit <- true
return
}
}
}
2、执行任务的 逻辑
包含了中间件的处理方式w.manager.mids.call
func (w *worker) process(message *Msg) (acknowledge bool) {
acknowledge = true
defer func() {
//防止异常崩溃
recover()
}()
//以中间件形式运行并返回
return w.manager.mids.call(w.manager.queueName(), message, func() {
w.manager.job(message)
})
}
5、schedule 模块
scheduler模块主要负责对延迟任务和重试任务的处理,这里巧妙的利用了延时任务和重试任务本质上其实是相同的(都是等待一段时间后出发的特点)
- 延迟任务:ZSET 中
score
为任务执行时间戳,利用zrangebyscore
指令,score
为-inf -> now
,获取到期的可执行任务,然后将任务从 ZSET 中删除,并放入对应 queueName 队列 - 重试任务:同上
注意,poll
的循环中,每次仅从ZSET中取1
个任务,从ZSET中删除后通过LPUSH
指令写入普通队列(然后再写入inprocessingQueue
队列)
func (s *scheduled) poll() {
conn := Config.Pool.Get()
//获取延时
now := nowToSecondsWithNanoPrecision()
for _, key := range s.keys {
key = Config.Namespace + key
for {
//仅取一个任务
messages, _ := redis.Strings(conn.Do("zrangebyscore", key, "-inf", now, "limit", 0, 1))
if len(messages) == 0 {
//本轮退出
break
}
message, _ := NewMsg(messages[0])
if removed, _ := redis.Bool(conn.Do("zrem", key, messages[0])); removed {
queue, _ := message.Get("queue").String()
queue = strings.TrimPrefix(queue, Config.Namespace)
message.Set("enqueued_at", nowToSecondsWithNanoPrecision())
//到期的任务,放入普通队列
conn.Do("lpush", Config.Namespace+"queue:"+queue, message.ToJson())
}
}
}
conn.Close()
}
fetcher 模块
fetcher模块主要封装了大部分操作Redis LIST任务队列的方法,从设计上说,使得逻辑层与数据层分离,仅通过 fetcher 模块与 redis 交互,其他模块仅需要调用fetcher提供的方法即可:
Acknowledge
方法:ack 后删除任务processOldMessages
方法:获取inprocessingQueue
队列中未被ack的方法Fetch
方法:从Redis相关队列获取任务
func (f *fetch) Fetch() {
f.processOldMessages()
go func() {
for {
// f.Close() has been called
if f.Closed() {
break
}
<-f.Ready()
f.tryFetchMessage()
}
}()
for {
select {
case <-f.stop:
// Stop the redis-polling goroutine
close(f.closed)
// Signal to Close() that the fetcher has stopped
close(f.exit)
break
}
}
}
func (f *fetch) tryFetchMessage() {
conn := Config.Pool.Get()
defer conn.Close()
message, err := redis.String(conn.Do("brpoplpush", f.queue, f.inprogressQueue(), 1))
if err != nil {
// If redis returns null, the queue is empty. Just ignore the error.
if err.Error() != "redigo: nil returned" {
Logger.Println("ERR: ", err)
time.Sleep(1 * time.Second)
}
} else {
//发送任务到worker
f.sendMessage(message)
}
}
Acknowledge
方法,从inprocessingQueue
删除对应的任务,注意这里LREM
的用法是从表尾开始向表头搜索,移除与 VALUE
相等的元素,数量为 COUNT
的绝对值(因为被优先处理的任务一般都在LIST的尾部)
func (f *fetch) Acknowledge(message *Msg) {
conn := Config.Pool.Get()
defer conn.Close()
conn.Do("lrem", f.inprogressQueue(), -1, message.OriginalJson())
}
hooks实现
0x04 消费者一致性语义
参考另外一篇文章,可以简单的嵌套一下场景:
- schedule 模块从 ZSET 中检索可执行任务,然后先调用
ZREM
,再调用LPUSH
将任务放入 queueName 中,如果在ZREM
之后,LPUSH
之前挂掉,则任务丢失且无感知,这里是 at most once模型 - manager 模块将任务下发后,worker 处理完毕,在 ack 之前进程挂掉,下次再次启动 manager 会进行任务重发。即ack之前的异常退出都被视作任务处理失败,触发重入,这里是 at least once模型(PS:在这种情况下,开发者需要自行实现消费者的幂等性)
0x05 总结
本项目基于 golang 实现了通用的任务系统(相对于 machinery 较为轻量级),将具体业务逻辑剥离,保留了整个最核心的任务拆分、任务调度、任务作业等功能,局限是仅支持 Redis 作为 Broker,此外,go-workers提供的方法均为包类型,直接调用即可。
从功能上说,该系统提供了业务隔离设计、延迟任务支持,失败重试能力以及ack任务的功能。此外,提供了middleware的中间件实现,将日志、状态统计、错误重试、用户自定义钩子等嵌入到任务处理流程中,可以借鉴
0x06 参考
- Sidekiq compatible background workers in golang
- [12 台减至 3 台] 用 Golang 重写 Sidekiq 的 worker
- golang任务作业系统实现分析
转载请注明出处,本文采用 CC4.0 协议授权