在分布式系统中,延迟队列是一种常见的需求,例如订单超时取消、任务定时执行等。Redis 作为高性能的内存数据库,提供了多种实现延迟队列的方案。本文将介绍几种不同的 Redis 方案,并分析其优缺点及适用场景。
Sorted Set
利用 Redis 的 Sorted Set 数据结构,将消息 ID 作为成员,到期时间戳作为分数。通过 ZRANGEBYSCORE 命令获取已到期的消息。
const (queueName = "delay_queue"
)type ZSetDelayQueue struct {rds *redis.ClientscanInterval time.Duration
}// PushToDelayQueue :将任务推到延迟队列中,任务会在指定的延迟时间后可用。
// val: 需要推送的值, score: 过期时间点
func (d *ZSetDelayQueue) PushToDelayQueue(ctx context.Context, val string, score float64) error {_, err := d.rds.ZAdd(ctx, queueName, redis.Z{Score: score, Member: val}).Result()if err != nil {return err}return nil
}// PopFromDelayQueue :从延迟队列中取出过期的任务
func (d *ZSetDelayQueue) PopFromDelayQueue(ctx context.Context) ([]string, error) {nowStr := strconv.FormatInt(time.Now().Unix(), 10)var vals []string// 使用Redis事务保证原子性err := d.rds.Watch(ctx, func(tx *redis.Tx) error {// 获取过期的任务var err errorvals, err = d.rds.ZRangeByScore(ctx, queueName, &redis.ZRangeBy{Min: "0",Max: nowStr,}).Result()if err != nil {return err}// 删除过期的任务if len(vals) == 0 {return nil}_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {pipe.ZRemRangeByScore(ctx, queueName, "0", nowStr)return nil})return nil}, queueName)if err != nil {return nil, err}return vals, nil
}
✅ 优点
- 实现简单,易于理解
- 支持任意精度的延迟时间
- 可以按时间顺序处理任务
- 支持快速查找和删除特定任务
❌缺点
- 需要轮询检查到期任务
- 大量任务时性能可能下降
- 没有内置的消费者竞争机制
适用场景
- 任务量中等的系统
- 对延迟精度要求不是特别高的场景
- 需要支持任务优先级调整的场景
Key Expiration + Pub/Sub
利用 Redis 的键过期通知功能,为每个延迟任务设置一个过期键,当键过期时通过 Pub/Sub 机制通知消费者。
type KeyExpDelayQueue struct {rdb *redis.Client
}var (expireKeyPrefix = "expire_key"
)func (q *KeyExpDelayQueue) AddTask(ctx context.Context, val string, delay time.Duration) error {// 设置过期键expireKey := expireKeyPrefix + val// 设置过期时间return q.rdb.Set(ctx, expireKey, val, delay).Err()
}func (q *KeyExpDelayQueue) StartConsume(ctx context.Context, handler func(val string) error) error {// 确保开启了键空间通知q.rdb.ConfigSet(ctx, "notify-keyspace-events", "Ex")// 订阅过期事件pubsub := q.rdb.Subscribe(ctx, "__keyevent@0__:expired")defer pubsub.Close()// 监听处理过期事件ch := pubsub.Channel()for msg := range ch {// 检查key是否为关注的过期键if msg.Payload[:len(expireKeyPrefix)] != expireKeyPrefix {continue}// 处理过期事件val := msg.Payload[len(expireKeyPrefix):]if err := handler(val); err == nil {q.rdb.Del(ctx, msg.Payload)}}return nil
}
✅优点
- 不需要轮询,事件驱动模式
- 实现简单,占用资源少
- 任务触发及时
❌缺点
- 需要开启 Redis 键空间通知功能
- 可靠性较低,如果消费者断开连接可能丢失通知
- 无法查看待处理的任务列表
- 无法实现任务优先级
适用场景
- 对可靠性要求不高的场景
- 任务量较小的系统
- 需要实时触发的场景
Sorted Set方案进阶版
为了解耦任务发现和任务处理、支持多消费者模型和可靠性保障,在原sorted set方案基础上引入新结构和处理机制
- Stream+Consumer Group:
- List+Block:
Stream+Consumer Group
使用 Redis Stream 作为消息队列,结合有序集合存储延迟信息。定时将到期消息从有序集合移动到 Stream 中,由消费者组处理
Stream+Consumer Group方案支持多消费者组,那么多个消费者就可以进行并行处理,提升处理效率。并且Stream还提供了消息确认机制,确保任务能被处理
type streamDelayQueue struct {rdb *redis.ClientdelayKey, streamKey, groupName string
}var (moveTaskScript = `local tasks = redis.call('ZRANGEBYSCORE', KEYS[1], '0', ARGV[1])local count = 0for _, task in ipairs(tasks) doredis.call('XADD', KEYS[2], '*', 'task', task)redis.call('ZREM', KEYS[1], task)count = count + 1endreturn count
`
)// AddTask 添加任务到延迟队列
func (q *streamDelayQueue) AddTask(ctx context.Context, taskID string, delay time.Duration) error {// 1. 计算过期时间expireAt := time.Now().Add(delay).Unix()// 2. 将任务推到有序集合return q.rdb.ZAdd(ctx, q.delayKey, redis.Z{Score: float64(expireAt), Member: taskID}).Err()
}// MoveReadyTasks 将准备好的任务移动到任务队列
func (q *streamDelayQueue) MoveReadyTasks(ctx context.Context) (int64, error) {return q.rdb.Eval(ctx, moveTaskScript, []string{q.delayKey, q.streamKey}, time.Now().Unix()).Int64()
}// ConsumeTasks 消费任务
func (q *streamDelayQueue) ConsumeTasks(ctx context.Context, consumerName string, count int64) ([]redis.XStream, error) {return q.rdb.XReadGroup(ctx, &redis.XReadGroupArgs{Group: q.groupName,Consumer: consumerName,Streams: []string{q.streamKey, ">"},Count: count,Block: 0,}).Result()
}// AckTask 确认任务
func (q *streamDelayQueue) AckTask(ctx context.Context, taskID string) error {return q.rdb.XAck(ctx, q.streamKey, q.groupName, taskID).Err()
}
List+Block
使用 Redis List 作为队列,结合定时任务将到期的延迟任务添加到队列中,消费者使用阻塞操作等待任务。
本方案则支持多消费者进行竞争
type listBlockDelayQueue struct {rdb *redis.ClientdelayKey, listKey string
}// AddTask 添加任务到延迟队列
func (q *listBlockDelayQueue) AddTask(ctx context.Context, taskID string, delay time.Duration) error {// 计算执行时间execTime := time.Now().Add(delay).Unix()// 添加到有序集合return q.rdb.ZAdd(ctx, q.delayKey, redis.Z{Score: float64(execTime),Member: taskID,}).Err()
}// MoveReadyTasks 将到期任务移动到List
func (q *listBlockDelayQueue) MoveReadyTasks(ctx context.Context) (int64, error) {now := time.Now().Unix()script := `local tasks = redis.call('ZRANGEBYSCORE', KEYS[1], '0', ARGV[1])local count = 0for i, task in ipairs(tasks) doredis.call('LPUSH', KEYS[2], task)redis.call('ZREM', KEYS[1], task)count = count + 1endreturn count`result, err := q.rdb.Eval(ctx, script, []string{q.delayKey, q.listKey}, now).Int64()return result, err
}// ConsumeTask 消费任务(堵塞操作)
func (q *listBlockDelayQueue) ConsumeTask(ctx context.Context, timeout time.Duration) (string, error) {result, err := q.rdb.BRPop(ctx, timeout, q.listKey).Result()if err != nil {return "", err}return result[1], nil
}