asynq 是 Go 实现的一种简单、可靠、高效的分布式任务队列
介绍
- Asynq 是一个 Go 库,用于对任务进行排队并与工作线程异步处理它们
- Asynq 工作原理
- 客户端将任务放入队列
- 服务器从队列中取出任务并为每个任务启动一个工作协程
- 任务由多个 worker 同时处理
- 任务队列是一种在多台机器上分配工作的机制。一个系统可以由多个工作服务器和代理组成,从而实现高可用性和横向扩展
- 特性包括
- 保证至少执行一次任务
- 任务调度
- 重试失败的任务
- 在工作线程崩溃时自动恢复任务
- 加权优先级队列
- 严格优先级队列
- 由于 Redis 中的写操作快速,添加任务的延迟低
- 使用唯一选项对任务进行去重
- 允许为每个任务设置超时和截止时间
- 允许聚合一组任务以批量执行多个连续操作
- 灵活的处理程序接口,支持中间件
- 允许暂停队列以停止从队列中处理任务
- 周期性任务
- 支持 Redis Cluster 以自动分片和实现高可用性
- 支持 Redis Sentinel 实现高可用性
- 与 Prometheus 集成,以收集和可视化队列指标
- Web 界面,用于检查和远程控制队列和任务
- 命令行界面,用于检查和远程控制队列和任务
使用
server
package main
import (
"log"
"your/app/package/tasks"
"github.com/hibiken/asynq"
)
const redisAddr = "127.0.0.1:6379"
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: redisAddr},
asynq.Config{
// Specify how many concurrent workers to use
Concurrency: 10,
// Optionally specify multiple queues with different priority.
Queues: map[string]int{
"critical": 6,
"default": 3,
"low": 1,
},
// See the godoc for other configuration options
},
)
// mux maps a type to a handler
mux := asynq.NewServeMux()
mux.HandleFunc(tasks.TypeEmailDelivery, tasks.HandleEmailDeliveryTask)
mux.Handle(tasks.TypeImageResize, tasks.NewImageProcessor())
// ...register other handlers...
if err := srv.Run(mux); err != nil {
log.Fatalf("could not run server: %v", err)
}
}
client
package main
import (
"log"
"time"
"your/app/package/tasks"
"github.com/hibiken/asynq"
)
const redisAddr = "127.0.0.1:6379"
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
defer client.Close()
// ------------------------------------------------------
// Example 1: Enqueue task to be processed immediately.
// Use (*Client).Enqueue method.
// ------------------------------------------------------
task, err := tasks.NewEmailDeliveryTask(42, "some:template:id")
if err != nil {
log.Fatalf("could not create task: %v", err)
}
info, err := client.Enqueue(task)
if err != nil {
log.Fatalf("could not enqueue task: %v", err)
}
log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
// ------------------------------------------------------------
// Example 2: Schedule task to be processed in the future.
// Use ProcessIn or ProcessAt option.
// ------------------------------------------------------------
info, err = client.Enqueue(task, asynq.ProcessIn(24*time.Hour))
if err != nil {
log.Fatalf("could not schedule task: %v", err)
}
log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
// ----------------------------------------------------------------------------
// Example 3: Set other options to tune task processing behavior.
// Options include MaxRetry, Queue, Timeout, Deadline, Unique etc.
// ----------------------------------------------------------------------------
task, err = tasks.NewImageResizeTask("https://example.com/myassets/image.jpg")
if err != nil {
log.Fatalf("could not create task: %v", err)
}
info, err = client.Enqueue(task, asynq.MaxRetry(10), asynq.Timeout(3*time.Minute))
if err != nil {
log.Fatalf("could not enqueue task: %v", err)
}
log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
}
task
package tasks
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/hibiken/asynq"
)
// A list of task types.
const (
TypeEmailDelivery = "email:deliver"
TypeImageResize = "image:resize"
)
type EmailDeliveryPayload struct {
UserID int
TemplateID string
}
type ImageResizePayload struct {
SourceURL string
}
//----------------------------------------------
// Write a function NewXXXTask to create a task.
// A task consists of a type and a payload.
//----------------------------------------------
func NewEmailDeliveryTask(userID int, tmplID string) (*asynq.Task, error) {
payload, err := json.Marshal(EmailDeliveryPayload{UserID: userID, TemplateID: tmplID})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeEmailDelivery, payload), nil
}
func NewImageResizeTask(src string) (*asynq.Task, error) {
payload, err := json.Marshal(ImageResizePayload{SourceURL: src})
if err != nil {
return nil, err
}
// task options can be passed to NewTask, which can be overridden at enqueue time.
return asynq.NewTask(TypeImageResize, payload, asynq.MaxRetry(5), asynq.Timeout(20*time.Minute)), nil
}
//---------------------------------------------------------------
// Write a function HandleXXXTask to handle the input task.
// Note that it satisfies the asynq.HandlerFunc interface.
//
// Handler doesn't need to be a function. You can define a type
// that satisfies asynq.Handler interface. See examples below.
//---------------------------------------------------------------
func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
var p EmailDeliveryPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
}
log.Printf("Sending Email to User: user_id=%d, template_id=%s", p.UserID, p.TemplateID)
// Email delivery code ...
return nil
}
// ImageProcessor implements asynq.Handler interface.
type ImageProcessor struct {
// ... fields for struct
}
func (processor *ImageProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
var p ImageResizePayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
}
log.Printf("Resizing image: src=%s", p.SourceURL)
// Image resizing code ...
return nil
}
func NewImageProcessor() *ImageProcessor {
return &ImageProcessor{}
}
- Task Lifecycle:
- Scheduled : task is waiting to be processed in the future (Only applies to tasks with ProcessAt or ProcessIn option).
- Pending : task is ready to be processed and will be picked up by a free worker.
- Active : task is being processed by a worker (i.e. handler is invoked with the task).
- Retry : worker failed to process the task and the task is waiting to be retried in the future.
- Archived : task reached its max retry and stored in an archive for manual inspection.
- Completed: task was successfully processed and retained until retention TTL expires (Only applies to tasks with Retention option).
- Task Timeout and Cancelation
- Task Retention and Result
其他
- Inspector 是一个客户端接口,用于检查和改变队列和任务的状态
- scheduler 定时任务、周期性任务,类似于 crontab 的作用
- ErrorHandler 用来捕获错误事件,解决 task lease expired 错误,参考
- RetryDelayFunc 根据不同的 typename 设定 retry 时间
- SkipRetry
监控
Web UI
docker run --rm \
--name asynqmon \
-p 8080:8080 \
hibiken/asynqmon
使用 lib 库集成到项目中
Client
go install github.com/hibiken/asynq/tools/asynq@latest
asynq --help
asynq ...