0x00 前言
go-workers 是 sidekiq 的 go 实现,异步队列框架。完全满足了基于 redis queue 的任务调度工作,同时支持了自定义 middleware 供接入者开发延伸需求,仅支持 redis,支持延时任务。作者给出的特点如下:
- reliable queueing for all queues using brpoplpush
 - handles retries
 - support custom middleware
 - customize concurrency per queue
 - responds to Unix signals to safely wait for jobs to finish before exiting.
 - provides stats on what jobs are currently running
 - well tested
 
实现一个异步任务队列,通常要考虑如下:
- Producer,负责把调用者的函数、参数等传入到 Broker 里
 - Consumer,负责从 Broker 里取出消息,并且消费,如果有持久化运行结果的需求,还需要进行持久化
 - 选择一个 Producer 和 Consumer 之间序列化和反序列化的协议,通常使用 JSON
 - Consumer 对消费数据的 Acknowledge 机制
 - 任务的重试机制、隔离机制(不同属性的任务)、定时任务 / 延时任务 / 普通任务等
 - 队列支持的一致性如何?(参考kafka)
 
0x01 系统架构介绍

整体分为下面模块:
- manager:负责 单个Queue-name 任务的处理,内含 
1个 Fetcher、N个 worker,并发度可配置 - schedule:负责延迟任务发送处理、重试任务处理
 - producer:任务注册,任务可以设置立即执行或者延迟执行(可以优化为通过 API 接口对外部暴露),项目中是以包方法提供给外部直接调用的方式
    
- 普通任务,写入 LIST:https://github.com/jrallison/go-workers/blob/master/enqueue.go#L80
 - 延时任务,写入 ZSET:https://github.com/jrallison/go-workers/blob/master/enqueue.go#L89
 
 - workers:将 schedule 与 manager 模块整合在一起,提供外部启停接口
 
运行流程
以项目提供的 示例代码 为例,从任务执行上看:
- 生产者:将任务放置于指定的 Queue(本质是 LIST,以 queueName 为唯一标识),任务可设置立即执行或延迟执行
 - 消费者:由 manager 模块启动多个消费者,消费 queueName 的任务,执行 job 函数
 
1、前置定义和配置部分 
定义中间件
type myMiddleware struct{}
func (r *myMiddleware) Call(queue string, message *workers.Msg, next func() bool) (acknowledge bool) {
  // do something before each message is processed
  fmt.Println("before message processing...")
  acknowledge = next()
  fmt.Println("after message processing...")
  // do something after each message is processed
  return
}
//加载中间件
workers.Middleware.Append(&myMiddleware{})
初始化包配置:
workers.Configure(map[string]string{
    // location of redis instance
    "server":  "localhost:6379",
    // instance of the database
    "database":  "0",
    // number of connections to keep open with redis
    "pool":    "30",
    // unique process id for this instance of workers (for proper recovery of inprogress jobs on crash)
    "process": "1",
  })
定义worker的处理方法:
func myJob(message *workers.Msg) {
  // do something with your message
  // message.Jid()
  // message.Args() is a wrapper around go-simplejson (http://godoc.org/github.com/bitly/go-simplejson)
}
2、调用package提供的方法,将queueName与处理方法进行绑定,同时设置并发数(可视为不同的业务间隔离,没有注册方法的queueName不应被执行)
// pull messages from "myqueue" with concurrency of 10
workers.Process("myqueue", myJob, 10)
// pull messages from "myqueue2" with concurrency of 20
workers.Process("myqueue2", myJob, 20)
3、任务入队:Enqueue
// Add a job to a queue
workers.Enqueue("myqueue3", "Add", []int{1, 2})
// Add a job to a queue with retry
workers.EnqueueWithOptions("myqueue3", "Add", []int{1, 2}, workers.EnqueueOptions{Retry: true})
若任务需要立即执行,则调用workers.Enqueue方法,将任务信息保存到 Redis 的 queueName LIST;若任务需要延迟执行,则需要先将任务保存到共用的Redis 延迟队列(ZSET)中,延时时间设置为score;此外,还可以通过workers.EnqueueWithOptions设置任务的属性

4、启动worker
// Blocks until process is told to exit via unix signal
workers.Run()
5、任务异步处理及获取结果
0x02 核心数据结构
1、msg:消息
type data struct {
	*simplejson.Json
}
type Msg struct {
	*data
	original string
}
2、任务(入队)
type EnqueueData struct {
	Queue      string      `json:"queue,omitempty"`
	Class      string      `json:"class"`
	Args       interface{} `json:"args"`
	Jid        string      `json:"jid"`
	EnqueuedAt float64     `json:"enqueued_at"`
	EnqueueOptions
}
0x03 实现细节
本小节着重分析下工作流转的核心过程,即manager/fetcher/worker/scheduler等模块的实现。
1、绑定关系(队列+处理方法)
示例中使用workers.Process方法注册,对每个注册的queue都开启独立的goroutine处理,调用manager的Start方法:
func Process(queue string, job jobFunc, concurrency int, mids ...Action) {
	access.Lock()
	defer access.Unlock()
	managers[queue] = newManager(queue, job, concurrency, mids...)
}
//全局Run方法,开启整个流程!
func Run() {
	Start()
	go handleSignals()
	waitForExit()
}
//对每个注册的queue都开启独立的goroutine处理,调用manager的Start方法
func startManagers() {
	for _, manager := range managers {
		manager.start()
	}
}
2、任务生产:enqueue
核心方法是EnqueueWithOptions,即调用RPUSH指令将任务入队,本方法用于外部接口调用;
func EnqueueWithOptions(queue, class string, args interface{}, opts EnqueueOptions) (string, error) {
	now := nowToSecondsWithNanoPrecision()
	data := EnqueueData{
		Queue:          queue,
		Class:          class,
		Args:           args,
		Jid:            generateJid(),
		EnqueuedAt:     now,
		EnqueueOptions: opts,		//入队参数
	}
	bytes, err := json.Marshal(data)
	if err != nil {
		return "", err
	}
	if now < opts.At {
		err := enqueueAt(data.At, bytes)
		return data.Jid, err
	}
	conn := Config.Pool.Get()
	defer conn.Close()
	_, err = conn.Do("sadd", Config.Namespace+"queues", queue)	//保存topic,不重复
	if err != nil {
		return "", err
	}
	queue = Config.Namespace + "queue:" + queue
	_, err = conn.Do("rpush", queue, bytes)		//使用RPUSH推送到redis
	if err != nil {
		return "", err
	}
	return data.Jid, nil
}
其中,任务额外参数包括重试次数,开关等:
type EnqueueOptions struct {
	RetryCount int     `json:"retry_count,omitempty"`
	Retry      bool    `json:"retry,omitempty"`
	At         float64 `json:"at,omitempty"`
}
3、manager 模块:
manager模块的核心逻辑如下:
- 每个queueName都会单独创建一个manager管理协程,负责任务获取(PULL)/调度/分发到worker/完成时acknowledge等处理
 - manager 模块启动时,需要检查是否有残留任务需要处理(即未被ack的任务,如任务处理时异常退出,导致任务未执行完毕,此类任务需重新执行)
 - manager会通过独立goroutine,不停的通过
fetcher.Fetch方法,将待执行任务从queueName LIST(原始工作任务队列)移动到inprogress LIST(正在执行队列),同时通过channel将任务异步发送给worker模块 
1、manager结构
一个manager对应于一个queue,即某个指定的任务队列,包含1个fetcher,若干个worker;manager中各个子模块之间均是通过channel通信的
type manager struct {
	queue       string
	fetch       Fetcher	//取任务
	job         jobFunc
	concurrency int		//worker的并发度
	workers     []*worker	//worker池
	workersM    *sync.Mutex
	confirm     chan *Msg
	stop        chan bool
	exit        chan bool
	mids        *Middlewares
	*sync.WaitGroup
}
从前文描述,manager包含了fetcher和worker两个功能,其主要功能如下:
func (m *manager) start() {
	m.Add(1)
	m.loadWorkers()		//按并发度启动workers
	go m.manage()		//Fetch任务并根据结果进行响应处理
}
//按照配置的并发度,启动worker
func (m *manager) loadWorkers() {
	m.workersM.Lock()
	for i := 0; i < m.concurrency; i++ {
		m.workers[i] = newWorker(m)
		m.workers[i].start()
	}
	m.workersM.Unlock()
}
上述代码中的启动worker,是将manager的manager.fetch.messages这个channel,传递给各个worker,用于任务异步发送的通道(各个worker通过manager.ready告知manager当前可以接收任务,是个简单的限制并发度的策略):
func (w *worker) start() {
	go w.work(w.manager.fetch.Messages())
}
func (f *fetch) Messages() chan *Msg {
	return f.messages
}
func (f *fetch) sendMessage(message string) {
	msg, err := NewMsg(message)
	if err != nil {
		Logger.Println("ERR: Couldn't create message from", message, ":", err)
		return
	}
	f.Messages() <- msg
}
fetcher的功能主要有下面两个:
- 独立调用
fetch.Fetch()方法,该方法的功能,每次启动前仅调用一次processOldMessages,从inprocessing队列中获取上一次未被ACK的任务,进行备份处理;然后不停的基于<-f.Ready();f.tryFetchMessage()等待获取任务 - 监听
worker模块任务成功执行结果(<-m.confirm),ack掉该任务 
func (m *manager) manage() {
	Logger.Println("processing queue", m.queueName(), "with", m.concurrency, "workers.")
	go m.fetch.Fetch()
	for {
		select {
		case message := <-m.confirm:
			m.fetch.Acknowledge(message)
		case <-m.stop:
			m.exit <- true
			break
		}
	}
}
func (f *fetch) Fetch() {
	f.processOldMessages()
	go func() {
		for {
			// f.Close() has been called
			if f.Closed() {
				break
			}
			<-f.Ready()		//
			f.tryFetchMessage()	//获取任务
		}
	}()
	for {
		select {
		case <-f.stop:
			// Stop the redis-polling goroutine
			close(f.closed)
			// Signal to Close() that the fetcher has stopped
			close(f.exit)
			break
		}
	}
}
注意,tryFetchMessage方法会不停的将任务task从原始任务队列转移到inprogressQueue,通过Redis的brpoplpush指令;同时将task放入异步channel:fetch.messages,发送给worker实时处理;worker处理的成功结果会通过channel:confirm,异步发送给manager模块,manager模块收到后,通过Acknowledge方法删除掉inprogressQueue的对应任务(本质是调用Redis的lrem指令),代码在此;如此就完成了一个任务处理完成到ACK的过程;
func (f *fetch) tryFetchMessage() {
	conn := Config.Pool.Get()
	defer conn.Close()
	message, err := redis.String(conn.Do("brpoplpush", f.queue, f.inprogressQueue(), 1))
	if err != nil {
		// If redis returns null, the queue is empty. Just ignore the error.
		if err.Error() != "redigo: nil returned" {
			Logger.Println("ERR: ", err)
			time.Sleep(1 * time.Second)
		}
	} else {
		f.sendMessage(message)
	}
}
func (f *fetch) Acknowledge(message *Msg) {
	conn := Config.Pool.Get()
	defer conn.Close()
	conn.Do("lrem", f.inprogressQueue(), -1, message.OriginalJson())
}
2、如何 ack 任务,收到 worker 的处理成功的信息后异步,删除 LIST 中的对应数据
func (f *fetch) Acknowledge(message *Msg) {
	conn := Config.Pool.Get()
	defer conn.Close()
	conn.Do("lrem", f.inprogressQueue(), -1, message.OriginalJson())
}
3、manager的quit方法
func (m *manager) quit() {
	Logger.Println("quitting queue", m.queueName(), "(waiting for", m.processing(), "/", len(m.workers), "workers).")
	m.prepare()
	m.workersM.Lock()
	for _, worker := range m.workers {
		worker.quit()
	}
	m.workersM.Unlock()
	m.stop <- true
	<-m.exit
	m.reset()
	m.Done()
}
4、worker 模块
worker模块的核心逻辑如下:
- 从与manager共享的任务channel中获取任务并执行
 - 通过middleware中间件形式将处理状态返回,见下面的
process方法 - 若任务处理成功,则通过channel通知manager模块,将任务从inprogress队列删除,即是acknowledge方式(可借鉴)
 - 若任务失败需要重试,则将重试信息放入retry队列(重试间隔时间成指数级递增)
 
下面列举下worker核心的方法:
1、获取任务
各个worker通过messages这个channel抢夺任务,执行任务完成通过w.manager.fetch.Ready() <- true告知manager自己已经空闲,可以继续处理工作;不过这种任务是属于抢夺式的,并不是基于某种策略的分发模式,容易造成某个协程饿死的情况
func (w *worker) work(messages chan *Msg) {
	for {
		select {
		case message := <-messages:
			atomic.StoreInt64(&w.startedAt, time.Now().UTC().Unix())
			w.currentMsg = message
			//worker调用process处理任务,成功的结果异步发送到ack队列中待确认
			if w.process(message) {
				w.manager.confirm <- message
			}
			atomic.StoreInt64(&w.startedAt, 0)
			w.currentMsg = nil
			// Attempt to tell fetcher we're finished.
			// Can be used when the fetcher has slept due
			// to detecting an empty queue to requery the
			// queue immediately if we finish work.
			select {
			//告知manager,当前worker已经处理完成,可以继续接收任务
			case w.manager.fetch.FinishedWork() <- true:
			default:
			}
		case w.manager.fetch.Ready() <- true:
			// Signaled to fetcher that we're
			// ready to accept a message
		case <-w.stop:
			w.exit <- true
			return
		}
	}
}
2、执行任务的 逻辑
包含了中间件的处理方式w.manager.mids.call
func (w *worker) process(message *Msg) (acknowledge bool) {
	acknowledge = true
	defer func() {
		//防止异常崩溃
		recover()
	}()
	//以中间件形式运行并返回
	return w.manager.mids.call(w.manager.queueName(), message, func() {
		w.manager.job(message)
	})
}
5、schedule 模块
scheduler模块主要负责对延迟任务和重试任务的处理,这里巧妙的利用了延时任务和重试任务本质上其实是相同的(都是等待一段时间后出发的特点)
- 延迟任务:ZSET 中 
score为任务执行时间戳,利用zrangebyscore指令,score为-inf -> now,获取到期的可执行任务,然后将任务从 ZSET 中删除,并放入对应 queueName 队列 - 重试任务:同上
 
注意,poll的循环中,每次仅从ZSET中取1个任务,从ZSET中删除后通过LPUSH指令写入普通队列(然后再写入inprocessingQueue队列)
func (s *scheduled) poll() {
	conn := Config.Pool.Get()
	//获取延时
	now := nowToSecondsWithNanoPrecision()
	for _, key := range s.keys {
		key = Config.Namespace + key
		for {
			//仅取一个任务
			messages, _ := redis.Strings(conn.Do("zrangebyscore", key, "-inf", now, "limit", 0, 1))
			if len(messages) == 0 {
				//本轮退出
				break
			}
			message, _ := NewMsg(messages[0])
			if removed, _ := redis.Bool(conn.Do("zrem", key, messages[0])); removed {
				queue, _ := message.Get("queue").String()
				queue = strings.TrimPrefix(queue, Config.Namespace)
				message.Set("enqueued_at", nowToSecondsWithNanoPrecision())
				//到期的任务,放入普通队列
				conn.Do("lpush", Config.Namespace+"queue:"+queue, message.ToJson())
			}
		}
	}
	conn.Close()
}
fetcher 模块
fetcher模块主要封装了大部分操作Redis LIST任务队列的方法,从设计上说,使得逻辑层与数据层分离,仅通过 fetcher 模块与 redis 交互,其他模块仅需要调用fetcher提供的方法即可:
Acknowledge方法:ack 后删除任务processOldMessages方法:获取inprocessingQueue队列中未被ack的方法Fetch方法:从Redis相关队列获取任务
func (f *fetch) Fetch() {
	f.processOldMessages()
	go func() {
		for {
			// f.Close() has been called
			if f.Closed() {
				break
			}
			<-f.Ready()
			f.tryFetchMessage()
		}
	}()
	for {
		select {
		case <-f.stop:
			// Stop the redis-polling goroutine
			close(f.closed)
			// Signal to Close() that the fetcher has stopped
			close(f.exit)
			break
		}
	}
}
func (f *fetch) tryFetchMessage() {
	conn := Config.Pool.Get()
	defer conn.Close()
	message, err := redis.String(conn.Do("brpoplpush", f.queue, f.inprogressQueue(), 1))
	if err != nil {
		// If redis returns null, the queue is empty. Just ignore the error.
		if err.Error() != "redigo: nil returned" {
			Logger.Println("ERR: ", err)
			time.Sleep(1 * time.Second)
		}
	} else {
		//发送任务到worker
		f.sendMessage(message)
	}
}
Acknowledge方法,从inprocessingQueue删除对应的任务,注意这里LREM的用法是从表尾开始向表头搜索,移除与 VALUE 相等的元素,数量为 COUNT 的绝对值(因为被优先处理的任务一般都在LIST的尾部)
func (f *fetch) Acknowledge(message *Msg) {
	conn := Config.Pool.Get()
	defer conn.Close()
	conn.Do("lrem", f.inprogressQueue(), -1, message.OriginalJson())
}
hooks实现
0x04 消费者一致性语义
参考另外一篇文章,可以简单的嵌套一下场景:
- schedule 模块从 ZSET 中检索可执行任务,然后先调用 
ZREM,再调用LPUSH将任务放入 queueName 中,如果在ZREM之后,LPUSH之前挂掉,则任务丢失且无感知,这里是 at most once模型 - manager 模块将任务下发后,worker 处理完毕,在 ack 之前进程挂掉,下次再次启动 manager 会进行任务重发。即ack之前的异常退出都被视作任务处理失败,触发重入,这里是 at least once模型(PS:在这种情况下,开发者需要自行实现消费者的幂等性)
 
0x05 总结
本项目基于 golang 实现了通用的任务系统(相对于 machinery 较为轻量级),将具体业务逻辑剥离,保留了整个最核心的任务拆分、任务调度、任务作业等功能,局限是仅支持 Redis 作为 Broker,此外,go-workers提供的方法均为包类型,直接调用即可。
从功能上说,该系统提供了业务隔离设计、延迟任务支持,失败重试能力以及ack任务的功能。此外,提供了middleware的中间件实现,将日志、状态统计、错误重试、用户自定义钩子等嵌入到任务处理流程中,可以借鉴
0x06 参考
- Sidekiq compatible background workers in golang
 - [12 台减至 3 台] 用 Golang 重写 Sidekiq 的 worker
 - golang任务作业系统实现分析
 
转载请注明出处,本文采用 CC4.0 协议授权