Go asynq 异步任务队列

发布时间: 更新时间: 总字数:1237 阅读时间:3m 作者: IP上海 分享 网址

asynq 是 Go 实现的一种简单、可靠、高效的分布式任务队列

介绍

asynq arch
  • Asynq 是一个 Go 库,用于对任务进行排队并与工作线程异步处理它们
    • 它使用 Redis 作为队列
  • Asynq 工作原理
    • 客户端将任务放入队列
    • 服务器从队列中取出任务并为每个任务启动一个工作协程
    • 任务由多个 worker 同时处理
    • 任务队列是一种在多台机器上分配工作的机制。一个系统可以由多个工作服务器和代理组成,从而实现高可用性和横向扩展
  • 特性包括
    • 保证至少执行一次任务
    • 任务调度
    • 重试失败的任务
    • 在工作线程崩溃时自动恢复任务
    • 加权优先级队列
    • 严格优先级队列
    • 由于 Redis 中的写操作快速,添加任务的延迟低
    • 使用唯一选项对任务进行去重
    • 允许为每个任务设置超时和截止时间
    • 允许聚合一组任务以批量执行多个连续操作
    • 灵活的处理程序接口,支持中间件
    • 允许暂停队列以停止从队列中处理任务
    • 周期性任务
    • 支持 Redis Cluster 以自动分片和实现高可用性
    • 支持 Redis Sentinel 实现高可用性
    • 与 Prometheus 集成,以收集和可视化队列指标
    • Web 界面,用于检查和远程控制队列和任务
    • 命令行界面,用于检查和远程控制队列和任务

使用

  • 支持中间件

server

package main

import (
	"log"

	"your/app/package/tasks"

	"github.com/hibiken/asynq"
)

const redisAddr = "127.0.0.1:6379"

func main() {
	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: redisAddr},
		asynq.Config{
			// Specify how many concurrent workers to use
			Concurrency: 10,
			// Optionally specify multiple queues with different priority.
			Queues: map[string]int{
				"critical": 6,
				"default":  3,
				"low":      1,
			},
			// See the godoc for other configuration options
		},
	)

	// mux maps a type to a handler
	mux := asynq.NewServeMux()
	mux.HandleFunc(tasks.TypeEmailDelivery, tasks.HandleEmailDeliveryTask)
	mux.Handle(tasks.TypeImageResize, tasks.NewImageProcessor())
	// ...register other handlers...

	if err := srv.Run(mux); err != nil {
		log.Fatalf("could not run server: %v", err)
	}
}

client

package main

import (
	"log"
	"time"

	"your/app/package/tasks"

	"github.com/hibiken/asynq"
)

const redisAddr = "127.0.0.1:6379"

func main() {
	client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
	defer client.Close()

	// ------------------------------------------------------
	// Example 1: Enqueue task to be processed immediately.
	//            Use (*Client).Enqueue method.
	// ------------------------------------------------------

	task, err := tasks.NewEmailDeliveryTask(42, "some:template:id")
	if err != nil {
		log.Fatalf("could not create task: %v", err)
	}
	info, err := client.Enqueue(task)
	if err != nil {
		log.Fatalf("could not enqueue task: %v", err)
	}
	log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)

	// ------------------------------------------------------------
	// Example 2: Schedule task to be processed in the future.
	//            Use ProcessIn or ProcessAt option.
	// ------------------------------------------------------------

	info, err = client.Enqueue(task, asynq.ProcessIn(24*time.Hour))
	if err != nil {
		log.Fatalf("could not schedule task: %v", err)
	}
	log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)

	// ----------------------------------------------------------------------------
	// Example 3: Set other options to tune task processing behavior.
	//            Options include MaxRetry, Queue, Timeout, Deadline, Unique etc.
	// ----------------------------------------------------------------------------

	task, err = tasks.NewImageResizeTask("https://example.com/myassets/image.jpg")
	if err != nil {
		log.Fatalf("could not create task: %v", err)
	}
	info, err = client.Enqueue(task, asynq.MaxRetry(10), asynq.Timeout(3*time.Minute))
	if err != nil {
		log.Fatalf("could not enqueue task: %v", err)
	}
	log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
}

task

package tasks

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/hibiken/asynq"
)

// A list of task types.
const (
	TypeEmailDelivery = "email:deliver"
	TypeImageResize   = "image:resize"
)

type EmailDeliveryPayload struct {
	UserID     int
	TemplateID string
}

type ImageResizePayload struct {
	SourceURL string
}

//----------------------------------------------
// Write a function NewXXXTask to create a task.
// A task consists of a type and a payload.
//----------------------------------------------

func NewEmailDeliveryTask(userID int, tmplID string) (*asynq.Task, error) {
	payload, err := json.Marshal(EmailDeliveryPayload{UserID: userID, TemplateID: tmplID})
	if err != nil {
		return nil, err
	}
	return asynq.NewTask(TypeEmailDelivery, payload), nil
}

func NewImageResizeTask(src string) (*asynq.Task, error) {
	payload, err := json.Marshal(ImageResizePayload{SourceURL: src})
	if err != nil {
		return nil, err
	}
	// task options can be passed to NewTask, which can be overridden at enqueue time.
	return asynq.NewTask(TypeImageResize, payload, asynq.MaxRetry(5), asynq.Timeout(20*time.Minute)), nil
}

//---------------------------------------------------------------
// Write a function HandleXXXTask to handle the input task.
// Note that it satisfies the asynq.HandlerFunc interface.
//
// Handler doesn't need to be a function. You can define a type
// that satisfies asynq.Handler interface. See examples below.
//---------------------------------------------------------------

func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
	var p EmailDeliveryPayload
	if err := json.Unmarshal(t.Payload(), &p); err != nil {
		return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
	}
	log.Printf("Sending Email to User: user_id=%d, template_id=%s", p.UserID, p.TemplateID)
	// Email delivery code ...
	return nil
}

// ImageProcessor implements asynq.Handler interface.
type ImageProcessor struct {
	// ... fields for struct
}

func (processor *ImageProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
	var p ImageResizePayload
	if err := json.Unmarshal(t.Payload(), &p); err != nil {
		return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
	}
	log.Printf("Resizing image: src=%s", p.SourceURL)
	// Image resizing code ...
	return nil
}

func NewImageProcessor() *ImageProcessor {
	return &ImageProcessor{}
}
  • Task Lifecycle
    • Scheduled : task is waiting to be processed in the future (Only applies to tasks with ProcessAt or ProcessIn option).
    • Pending : task is ready to be processed and will be picked up by a free worker.
    • Active : task is being processed by a worker (i.e. handler is invoked with the task).
    • Retry : worker failed to process the task and the task is waiting to be retried in the future.
    • Archived : task reached its max retry and stored in an archive for manual inspection.
    • Completed: task was successfully processed and retained until retention TTL expires (Only applies to tasks with Retention option).
  • Task Timeout and Cancelation
  • Task Retention and Result

其他

  • Inspector 是一个客户端接口,用于检查和改变队列和任务的状态
  • scheduler 定时任务、周期性任务,类似于 crontab 的作用
  • ErrorHandler 用来捕获错误事件,解决 task lease expired 错误,参考
  • RetryDelayFunc 根据不同的 typename 设定 retry 时间
  • SkipRetry

监控

Web UI

docker run --rm \
    --name asynqmon \
    -p 8080:8080 \
    hibiken/asynqmon

使用 lib 库集成到项目中

Client

go install github.com/hibiken/asynq/tools/asynq@latest

asynq --help

asynq ...

参考

  1. https://github.com/hibiken/asynq
Home Archives Categories Tags Statistics
本文总阅读量 次 本站总访问量 次 本站总访客数