Redis 令牌桶限流实现

发布时间: 更新时间: 总字数:1801 阅读时间:4m 作者: IP上海 分享 网址

使用 Redis 实现令牌桶限流是一个常见且高效的方法。它的核心思想是利用 Redis 的原子性操作来管理令牌的生产和消费,从而保证在高并发场景下的正确性。

核心思路

令牌桶算法的工作原理是:

  1. 令牌生产:以恒定速率向一个中添加令牌。
  2. 令牌桶容量:桶有固定的容量,当桶满时,新生成的令牌会被丢弃。
  3. 令牌消费:当一个请求到来时,它需要从桶中取出一个令牌。
  4. 限流:如果桶里没有令牌,请求就会被拒绝或阻塞,从而达到限流的目的。

将这个思路映射到 Redis,我们可以使用以下 Redis 数据结构和命令:

  • 键(Key):用于标识不同的限流对象,例如 rate_limiter:user_id:123
  • 值(Value):用来存储当前桶中剩余的令牌数。
  • INCR / DECR:用来原子性地增加或减少令牌数。
  • EXPIRE:设置键的过期时间,这在处理令牌桶的令牌生成桶容量时非常关键。我们可以通过设置一个键的过期时间,让 Redis 自动在一段时间后清空令牌,模拟令牌的生成桶容量的概念。

Python 示例

Python 中,我们可以使用 redis-py 库来与 Redis 进行交互。下面的示例展示了一个基于 Lua 脚本的实现,这是最推荐的方式,因为它能保证整个逻辑的原子性。

Lua 脚本的优势在于,它能在 Redis 服务器端执行多个命令,避免了客户端和服务端之间的多次网络往返,并且能确保操作的原子性。

1. 令牌桶限流的 Lua 脚本

-- KEYS[1] 是令牌桶的键名
-- ARGV[1] 是令牌桶的最大容量
-- ARGV[2] 是每秒生成的令牌数
-- ARGV[3] 是当前时间戳
-- ARGV[4] 是请求消耗的令牌数

local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])

-- 获取上一次更新时间戳
local last_time = redis.call('HGET', key, 'last_time')
-- https://stackoverflow.com/questions/21364251/how-to-do-null-nil-check-in-lua-with-redis-scripto-and-redis-db
-- redis-cli > EVAL "return redis.call('GET','hello')"
if last_time == false then
    -- 如果是第一次访问,初始化令牌桶
    last_time = now
    redis.call('HMSET', key, 'last_time', last_time, 'tokens', capacity - requested)
    redis.call('EXPIRE', key, capacity / rate + 1)
    return 1
end

-- 计算自上次更新以来生成的令牌数
local time_passed = now - tonumber(last_time)
local new_tokens = time_passed * rate

-- 获取当前令牌数
local current_tokens = tonumber(redis.call('HGET', key, 'tokens'))

-- 更新令牌数,并限制不超过容量
local new_token_count = math.min(capacity, current_tokens + new_tokens)

-- 如果剩余令牌数足够,则消费令牌
if new_token_count >= requested then
    local final_tokens = new_token_count - requested
    redis.call('HMSET', key, 'last_time', now, 'tokens', final_tokens)
    return 1
else
    -- 令牌不足,请求被拒绝
    redis.call('HMSET', key, 'last_time', now, 'tokens', new_token_count)
    return 0
end

2. Python 客户端代码

import redis
import time

# 初始化 Redis 连接
r = redis.Redis(host='localhost', port=6379, db=0)

# 加载 Lua 脚本
lua_script = r.script_load("""
-- 上述 Lua 脚本代码
""")

def rate_limit(user_id, capacity=10, rate=1, requested=1):
    """
    令牌桶限流函数
    :param user_id: 用户ID,用于构建 Redis key
    :param capacity: 令牌桶容量
    :param rate: 每秒生成的令牌数
    :param requested: 请求消耗的令牌数
    :return: True 表示通过,False 表示被限流
    """
    key = f"rate_limiter:{user_id}"
    now = int(time.time())

    # 调用 Lua 脚本,传入 KEYS 和 ARGV
    result = r.evalsha(
        lua_script,
        1,  # KEYS 的数量
        key,
        capacity,
        rate,
        now,
        requested
    )
    return result == 1

# 示例使用
if __name__ == "__main__":
    user_id = "user:101"

    print("--- 首次访问,通过 ---")
    print(f"请求结果: {rate_limit(user_id, capacity=10, rate=1)}")

    print("\n--- 连续快速访问,令牌消耗 ---")
    for i in range(15):
        time.sleep(0.1) # 模拟请求间隔
        if rate_limit(user_id, capacity=10, rate=1):
            print(f"第 {i+1} 次请求: 通过")
        else:
            print(f"第 {i+1} 次请求: **被限流**")

    print("\n--- 间隔一段时间后,令牌恢复 ---")
    time.sleep(5)
    print(f"等待5秒后再次请求: {rate_limit(user_id, capacity=10, rate=1)}")

Go 语言示例

Go 语言中,我们可以使用 go-redis/redis 库。同样,我们推荐使用 Lua 脚本来保证操作的原子性。

1. 令牌桶限流的 Lua 脚本

使用和 Python 示例中完全相同的 Lua 脚本

2. Go 客户端代码

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/go-redis/redis/v9"
)

// Lua 脚本内容
const luaScript = `
-- ... 省略上述 Lua 脚本内容 ...
`

// Redis 客户端
var rdb *redis.Client

func init() {
	rdb = redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "", // 密码
		DB:       0,  // 数据库
	})

	// 检查连接
	_, err := rdb.Ping(context.Background()).Result()
	if err != nil {
		log.Fatalf("无法连接到 Redis: %v", err)
	}
}

// RateLimit 令牌桶限流函数
func RateLimit(key string, capacity, rate, requested int) (bool, error) {
	ctx := context.Background()
	now := time.Now().Unix()

	// 使用 Lua 脚本执行限流逻辑
	result, err := rdb.Eval(ctx, luaScript,
		[]string{key},
		capacity,
		rate,
		now,
		requested,
	).Result()

	if err != nil && err != redis.Nil {
		return false, fmt.Errorf("执行 Redis Lua 脚本失败: %w", err)
	}

	return result.(int64) == 1, nil
}

func main() {
	userKey := "rate_limiter:user:102"
	capacity := 10
	rate := 1
	requested := 1

	fmt.Println("--- 首次访问,通过 ---")
	passed, err := RateLimit(userKey, capacity, rate, requested)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("请求结果: %v\n", passed)

	fmt.Println("\n--- 连续快速访问,令牌消耗 ---")
	for i := 0; i < 15; i++ {
		time.Sleep(100 * time.Millisecond) // 模拟请求间隔
		passed, err := RateLimit(userKey, capacity, rate, requested)
		if err != nil {
			log.Fatal(err)
		}
		if passed {
			fmt.Printf("第 %d 次请求: 通过\n", i+1)
		} else {
			fmt.Printf("第 %d 次请求: **被限流**\n", i+1)
		}
	}

	fmt.Println("\n--- 间隔一段时间后,令牌恢复 ---")
	time.Sleep(5 * time.Second)
	passed, err = RateLimit(userKey, capacity, rate, requested)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("等待5秒后再次请求: %v\n", passed)
}

总结

这两种语言的实现都遵循了相同的逻辑,即通过 Lua 脚本在 Redis 服务器端以原子方式处理限流逻辑。这种方法的好处是:

  • 原子性:Lua 脚本在 Redis 中是原子执行的,避免了多个客户端同时操作导致的数据不一致问题。
  • 性能:减少了客户端与 Redis 之间的网络往返,提高了执行效率。
  • 高并发安全:保证了在并发场景下限流判断的准确性。

这种基于 Redis 和 Lua 脚本的令牌桶实现,是生产环境中非常可靠和常用的限流方案。

本文总阅读量 次 本站总访问量 次 本站总访客数
Home Archives Categories Tags Statistics