GoWorkers 通用异步工作队列分析

分析一款基于 Golang 后台队列任务执行框架:jrallison/go-workers

Posted by pandaychen on June 30, 2021

0x00 前言

go-workers 是 sidekiq 的 go 实现,异步队列框架。完全满足了基于 redis queue 的任务调度工作,同时支持了自定义 middleware 供接入者开发延伸需求,仅支持 redis,支持延时任务。作者给出的特点如下:

  • reliable queueing for all queues using brpoplpush
  • handles retries
  • support custom middleware
  • customize concurrency per queue
  • responds to Unix signals to safely wait for jobs to finish before exiting.
  • provides stats on what jobs are currently running
  • well tested

实现一个异步任务队列,通常要考虑如下:

  • Producer,负责把调用者的函数、参数等传入到 Broker 里
  • Consumer,负责从 Broker 里取出消息,并且消费,如果有持久化运行结果的需求,还需要进行持久化
  • 选择一个 Producer 和 Consumer 之间序列化和反序列化的协议,通常使用 JSON
  • Consumer 对消费数据的 Acknowledge 机制
  • 任务的重试机制、隔离机制(不同属性的任务)、定时任务 / 延时任务 / 普通任务等
  • 队列支持的一致性如何?(参考kafka)

0x01 系统架构介绍

go-workers

整体分为下面模块:

  1. manager:负责 单个Queue-name 任务的处理,内含 1 个 Fetcher、N 个 worker,并发度可配置
    • worker:处理具体的任务
    • fetcher:与 redis 交互,拉取或删除任务
  2. schedule:负责延迟任务发送处理、重试任务处理
  3. producer:任务注册,任务可以设置立即执行或者延迟执行(可以优化为通过 API 接口对外部暴露),项目中是以包方法提供给外部直接调用的方式
    • 普通任务,写入 LIST:https://github.com/jrallison/go-workers/blob/master/enqueue.go#L80
    • 延时任务,写入 ZSET:https://github.com/jrallison/go-workers/blob/master/enqueue.go#L89
  4. workers:将 schedule 与 manager 模块整合在一起,提供外部启停接口

运行流程

以项目提供的 示例代码 为例,从任务执行上看:

  • 生产者:将任务放置于指定的 Queue(本质是 LIST,以 queueName 为唯一标识),任务可设置立即执行或延迟执行
  • 消费者:由 manager 模块启动多个消费者,消费 queueName 的任务,执行 job 函数

1、前置定义和配置部分
定义中间件

type myMiddleware struct{}

func (r *myMiddleware) Call(queue string, message *workers.Msg, next func() bool) (acknowledge bool) {
  // do something before each message is processed
  fmt.Println("before message processing...")
  acknowledge = next()
  fmt.Println("after message processing...")
  // do something after each message is processed
  return
}

//加载中间件
workers.Middleware.Append(&myMiddleware{})

初始化包配置:

workers.Configure(map[string]string{
    // location of redis instance
    "server":  "localhost:6379",
    // instance of the database
    "database":  "0",
    // number of connections to keep open with redis
    "pool":    "30",
    // unique process id for this instance of workers (for proper recovery of inprogress jobs on crash)
    "process": "1",
  })

定义worker的处理方法:

func myJob(message *workers.Msg) {
  // do something with your message
  // message.Jid()
  // message.Args() is a wrapper around go-simplejson (http://godoc.org/github.com/bitly/go-simplejson)
}

2、调用package提供的方法,将queueName与处理方法进行绑定,同时设置并发数(可视为不同的业务间隔离,没有注册方法的queueName不应被执行)

// pull messages from "myqueue" with concurrency of 10
workers.Process("myqueue", myJob, 10)

// pull messages from "myqueue2" with concurrency of 20
workers.Process("myqueue2", myJob, 20)

3、任务入队:Enqueue

// Add a job to a queue
workers.Enqueue("myqueue3", "Add", []int{1, 2})
// Add a job to a queue with retry
workers.EnqueueWithOptions("myqueue3", "Add", []int{1, 2}, workers.EnqueueOptions{Retry: true})

若任务需要立即执行,则调用workers.Enqueue方法,将任务信息保存到 Redis 的 queueName LIST;若任务需要延迟执行,则需要先将任务保存到共用的Redis 延迟队列(ZSET)中,延时时间设置为score;此外,还可以通过workers.EnqueueWithOptions设置任务的属性

delayqueue

4、启动worker

// Blocks until process is told to exit via unix signal
workers.Run()

5、任务异步处理及获取结果

0x02 核心数据结构

1、msg:消息

type data struct {
	*simplejson.Json
}

type Msg struct {
	*data
	original string
}

2、任务(入队)

type EnqueueData struct {
	Queue      string      `json:"queue,omitempty"`
	Class      string      `json:"class"`
	Args       interface{} `json:"args"`
	Jid        string      `json:"jid"`
	EnqueuedAt float64     `json:"enqueued_at"`
	EnqueueOptions
}

0x03 实现细节

本小节着重分析下工作流转的核心过程,即manager/fetcher/worker/scheduler等模块的实现。

1、绑定关系(队列+处理方法)

示例中使用workers.Process方法注册,对每个注册的queue都开启独立的goroutine处理,调用managerStart方法:

func Process(queue string, job jobFunc, concurrency int, mids ...Action) {
	access.Lock()
	defer access.Unlock()

	managers[queue] = newManager(queue, job, concurrency, mids...)
}


//全局Run方法,开启整个流程!
func Run() {
	Start()
	go handleSignals()
	waitForExit()
}


//对每个注册的queue都开启独立的goroutine处理,调用manager的Start方法
func startManagers() {
	for _, manager := range managers {
		manager.start()
	}
}

2、任务生产:enqueue

核心方法EnqueueWithOptions,即调用RPUSH指令将任务入队,本方法用于外部接口调用;

func EnqueueWithOptions(queue, class string, args interface{}, opts EnqueueOptions) (string, error) {
	now := nowToSecondsWithNanoPrecision()
	data := EnqueueData{
		Queue:          queue,
		Class:          class,
		Args:           args,
		Jid:            generateJid(),
		EnqueuedAt:     now,
		EnqueueOptions: opts,		//入队参数
	}

	bytes, err := json.Marshal(data)
	if err != nil {
		return "", err
	}

	if now < opts.At {
		err := enqueueAt(data.At, bytes)
		return data.Jid, err
	}

	conn := Config.Pool.Get()
	defer conn.Close()

	_, err = conn.Do("sadd", Config.Namespace+"queues", queue)	//保存topic,不重复
	if err != nil {
		return "", err
	}
	queue = Config.Namespace + "queue:" + queue
	_, err = conn.Do("rpush", queue, bytes)		//使用RPUSH推送到redis
	if err != nil {
		return "", err
	}

	return data.Jid, nil
}

其中,任务额外参数包括重试次数,开关等:

type EnqueueOptions struct {
	RetryCount int     `json:"retry_count,omitempty"`
	Retry      bool    `json:"retry,omitempty"`
	At         float64 `json:"at,omitempty"`
}

3、manager 模块:

manager模块的核心逻辑如下:

  • 每个queueName都会单独创建一个manager管理协程,负责任务获取(PULL)/调度/分发到worker/完成时acknowledge等处理
  • manager 模块启动时,需要检查是否有残留任务需要处理(即未被ack的任务,如任务处理时异常退出,导致任务未执行完毕,此类任务需重新执行)
  • manager会通过独立goroutine,不停的通过fetcher.Fetch方法,将待执行任务从queueName LIST(原始工作任务队列)移动到inprogress LIST(正在执行队列),同时通过channel将任务异步发送给worker模块

1、manager结构
一个manager对应于一个queue,即某个指定的任务队列,包含1个fetcher,若干个workermanager中各个子模块之间均是通过channel通信的

type manager struct {
	queue       string
	fetch       Fetcher	//取任务
	job         jobFunc
	concurrency int		//worker的并发度
	workers     []*worker	//worker池
	workersM    *sync.Mutex
	confirm     chan *Msg
	stop        chan bool
	exit        chan bool
	mids        *Middlewares
	*sync.WaitGroup
}

从前文描述,manager包含了fetcherworker两个功能,其主要功能如下:

func (m *manager) start() {
	m.Add(1)
	m.loadWorkers()		//按并发度启动workers
	go m.manage()		//Fetch任务并根据结果进行响应处理
}

//按照配置的并发度,启动worker
func (m *manager) loadWorkers() {
	m.workersM.Lock()
	for i := 0; i < m.concurrency; i++ {
		m.workers[i] = newWorker(m)
		m.workers[i].start()
	}
	m.workersM.Unlock()
}

上述代码中的启动worker,是将managermanager.fetch.messages这个channel,传递给各个worker,用于任务异步发送的通道(各个worker通过manager.ready告知manager当前可以接收任务,是个简单的限制并发度的策略):

func (w *worker) start() {
	go w.work(w.manager.fetch.Messages())
}

func (f *fetch) Messages() chan *Msg {
	return f.messages
}

func (f *fetch) sendMessage(message string) {
	msg, err := NewMsg(message)

	if err != nil {
		Logger.Println("ERR: Couldn't create message from", message, ":", err)
		return
	}

	f.Messages() <- msg
}

fetcher的功能主要有下面两个:

  1. 独立调用fetch.Fetch()方法,该方法的功能,每次启动前仅调用一次processOldMessages,从inprocessing队列中获取上一次未被ACK的任务,进行备份处理;然后不停的基于<-f.Ready();f.tryFetchMessage()等待获取任务
  2. 监听worker模块任务成功执行结果(<-m.confirm),ack掉该任务
func (m *manager) manage() {
	Logger.Println("processing queue", m.queueName(), "with", m.concurrency, "workers.")

	go m.fetch.Fetch()

	for {
		select {
		case message := <-m.confirm:
			m.fetch.Acknowledge(message)
		case <-m.stop:
			m.exit <- true
			break
		}
	}
}

func (f *fetch) Fetch() {
	f.processOldMessages()

	go func() {
		for {
			// f.Close() has been called
			if f.Closed() {
				break
			}
			<-f.Ready()		//
			f.tryFetchMessage()	//获取任务
		}
	}()

	for {
		select {
		case <-f.stop:
			// Stop the redis-polling goroutine
			close(f.closed)
			// Signal to Close() that the fetcher has stopped
			close(f.exit)
			break
		}
	}
}

注意,tryFetchMessage方法会不停的将任务task从原始任务队列转移到inprogressQueue,通过Redis的brpoplpush指令;同时将task放入异步channel:fetch.messages,发送给worker实时处理;worker处理的成功结果会通过channel:confirm异步发送给manager模块,manager模块收到后,通过Acknowledge方法删除掉inprogressQueue的对应任务(本质是调用Redis的lrem指令),代码在此;如此就完成了一个任务处理完成到ACK的过程

func (f *fetch) tryFetchMessage() {
	conn := Config.Pool.Get()
	defer conn.Close()

	message, err := redis.String(conn.Do("brpoplpush", f.queue, f.inprogressQueue(), 1))

	if err != nil {
		// If redis returns null, the queue is empty. Just ignore the error.
		if err.Error() != "redigo: nil returned" {
			Logger.Println("ERR: ", err)
			time.Sleep(1 * time.Second)
		}
	} else {
		f.sendMessage(message)
	}
}

func (f *fetch) Acknowledge(message *Msg) {
	conn := Config.Pool.Get()
	defer conn.Close()
	conn.Do("lrem", f.inprogressQueue(), -1, message.OriginalJson())
}

2、如何 ack 任务,收到 worker 的处理成功的信息后异步,删除 LIST 中的对应数据

func (f *fetch) Acknowledge(message *Msg) {
	conn := Config.Pool.Get()
	defer conn.Close()
	conn.Do("lrem", f.inprogressQueue(), -1, message.OriginalJson())
}

3、manager的quit方法

func (m *manager) quit() {
	Logger.Println("quitting queue", m.queueName(), "(waiting for", m.processing(), "/", len(m.workers), "workers).")
	m.prepare()

	m.workersM.Lock()
	for _, worker := range m.workers {
		worker.quit()
	}
	m.workersM.Unlock()

	m.stop <- true
	<-m.exit

	m.reset()

	m.Done()
}

4、worker 模块

worker模块的核心逻辑如下:

  1. 从与manager共享的任务channel中获取任务并执行
  2. 通过middleware中间件形式将处理状态返回,见下面的process方法
  3. 若任务处理成功,则通过channel通知manager模块,将任务从inprogress队列删除,即是acknowledge方式(可借鉴)
  4. 若任务失败需要重试,则将重试信息放入retry队列(重试间隔时间成指数级递增)

下面列举下worker核心的方法:

1、获取任务
各个worker通过messages这个channel抢夺任务,执行任务完成通过w.manager.fetch.Ready() <- true告知manager自己已经空闲,可以继续处理工作;不过这种任务是属于抢夺式的,并不是基于某种策略的分发模式,容易造成某个协程饿死的情况


func (w *worker) work(messages chan *Msg) {
	for {
		select {
		case message := <-messages:
			atomic.StoreInt64(&w.startedAt, time.Now().UTC().Unix())
			w.currentMsg = message

			//worker调用process处理任务,成功的结果异步发送到ack队列中待确认
			if w.process(message) {
				w.manager.confirm <- message
			}

			atomic.StoreInt64(&w.startedAt, 0)
			w.currentMsg = nil

			// Attempt to tell fetcher we're finished.
			// Can be used when the fetcher has slept due
			// to detecting an empty queue to requery the
			// queue immediately if we finish work.
			select {
			//告知manager,当前worker已经处理完成,可以继续接收任务
			case w.manager.fetch.FinishedWork() <- true:
			default:
			}
		case w.manager.fetch.Ready() <- true:
			// Signaled to fetcher that we're
			// ready to accept a message
		case <-w.stop:
			w.exit <- true
			return
		}
	}
}

2、执行任务的 逻辑
包含了中间件的处理方式w.manager.mids.call

func (w *worker) process(message *Msg) (acknowledge bool) {
	acknowledge = true

	defer func() {
		//防止异常崩溃
		recover()
	}()

	//以中间件形式运行并返回
	return w.manager.mids.call(w.manager.queueName(), message, func() {
		w.manager.job(message)
	})
}

5、schedule 模块

scheduler模块主要负责对延迟任务和重试任务的处理,这里巧妙的利用了延时任务和重试任务本质上其实是相同的(都是等待一段时间后出发的特点)

  • 延迟任务:ZSET 中 score 为任务执行时间戳,利用 zrangebyscore指令,score-inf -> now,获取到期的可执行任务,然后将任务从 ZSET 中删除,并放入对应 queueName 队列
  • 重试任务:同上

注意,poll的循环中,每次仅从ZSET中取1个任务,从ZSET中删除后通过LPUSH指令写入普通队列(然后再写入inprocessingQueue队列)

func (s *scheduled) poll() {
	conn := Config.Pool.Get()

	//获取延时
	now := nowToSecondsWithNanoPrecision()

	for _, key := range s.keys {
		key = Config.Namespace + key
		for {
			//仅取一个任务
			messages, _ := redis.Strings(conn.Do("zrangebyscore", key, "-inf", now, "limit", 0, 1))

			if len(messages) == 0 {
				//本轮退出
				break
			}

			message, _ := NewMsg(messages[0])

			if removed, _ := redis.Bool(conn.Do("zrem", key, messages[0])); removed {
				queue, _ := message.Get("queue").String()
				queue = strings.TrimPrefix(queue, Config.Namespace)
				message.Set("enqueued_at", nowToSecondsWithNanoPrecision())

				//到期的任务,放入普通队列
				conn.Do("lpush", Config.Namespace+"queue:"+queue, message.ToJson())
			}
		}
	}

	conn.Close()
}

fetcher 模块

fetcher模块主要封装了大部分操作Redis LIST任务队列的方法,从设计上说,使得逻辑层与数据层分离,仅通过 fetcher 模块与 redis 交互,其他模块仅需要调用fetcher提供的方法即可

  • Acknowledge方法:ack 后删除任务
  • processOldMessages方法:获取inprocessingQueue队列中未被ack的方法
  • Fetch方法:从Redis相关队列获取任务
func (f *fetch) Fetch() {
	f.processOldMessages()

	go func() {
		for {
			// f.Close() has been called
			if f.Closed() {
				break
			}
			<-f.Ready()
			f.tryFetchMessage()
		}
	}()

	for {
		select {
		case <-f.stop:
			// Stop the redis-polling goroutine
			close(f.closed)
			// Signal to Close() that the fetcher has stopped
			close(f.exit)
			break
		}
	}
}

func (f *fetch) tryFetchMessage() {
	conn := Config.Pool.Get()
	defer conn.Close()

	message, err := redis.String(conn.Do("brpoplpush", f.queue, f.inprogressQueue(), 1))

	if err != nil {
		// If redis returns null, the queue is empty. Just ignore the error.
		if err.Error() != "redigo: nil returned" {
			Logger.Println("ERR: ", err)
			time.Sleep(1 * time.Second)
		}
	} else {
		//发送任务到worker
		f.sendMessage(message)
	}
}

Acknowledge方法,从inprocessingQueue删除对应的任务,注意这里LREM的用法是从表尾开始向表头搜索,移除与 VALUE 相等的元素,数量为 COUNT 的绝对值(因为被优先处理的任务一般都在LIST的尾部)

func (f *fetch) Acknowledge(message *Msg) {
	conn := Config.Pool.Get()
	defer conn.Close()
	conn.Do("lrem", f.inprogressQueue(), -1, message.OriginalJson())
}

hooks实现

hook

0x04 消费者一致性语义

参考另外一篇文章,可以简单的嵌套一下场景:

  • schedule 模块从 ZSET 中检索可执行任务,然后先调用 ZREM,再调用 LPUSH 将任务放入 queueName 中,如果在 ZREM 之后,LPUSH 之前挂掉,则任务丢失且无感知,这里是 at most once模型
  • manager 模块将任务下发后,worker 处理完毕,在 ack 之前进程挂掉,下次再次启动 manager 会进行任务重发。即ack之前的异常退出都被视作任务处理失败,触发重入,这里是 at least once模型(PS:在这种情况下,开发者需要自行实现消费者的幂等性)

0x05 总结

本项目基于 golang 实现了通用的任务系统(相对于 machinery 较为轻量级),将具体业务逻辑剥离,保留了整个最核心的任务拆分、任务调度、任务作业等功能,局限是仅支持 Redis 作为 Broker,此外,go-workers提供的方法均为包类型,直接调用即可。

从功能上说,该系统提供了业务隔离设计、延迟任务支持,失败重试能力以及ack任务的功能。此外,提供了middleware的中间件实现,将日志、状态统计、错误重试、用户自定义钩子等嵌入到任务处理流程中,可以借鉴

0x06 参考

转载请注明出处,本文采用 CC4.0 协议授权