使用 Redis 实现令牌桶限流是一个常见且高效的方法。它的核心思想是利用 Redis 的原子性操作来管理令牌
的生产和消费,从而保证在高并发场景下的正确性。
核心思路
令牌桶算法的工作原理是:
- 令牌生产:以恒定速率向一个
桶
中添加令牌。
- 令牌桶容量:桶有固定的容量,当桶满时,新生成的令牌会被丢弃。
- 令牌消费:当一个请求到来时,它需要从桶中取出一个令牌。
- 限流:如果桶里没有令牌,请求就会被拒绝或阻塞,从而达到限流的目的。
将这个思路映射到 Redis,我们可以使用以下 Redis 数据结构和命令:
- 键(Key):用于标识不同的限流对象,例如
rate_limiter:user_id:123
。
- 值(Value):用来存储当前桶中剩余的令牌数。
INCR
/ DECR
:用来原子性地增加或减少令牌数。
EXPIRE
:设置键的过期时间,这在处理令牌桶的令牌生成和桶容量时非常关键。我们可以通过设置一个键的过期时间,让 Redis 自动在一段时间后清空令牌,模拟令牌的生成
和桶容量
的概念。
Python 示例
Python 中,我们可以使用 redis-py
库来与 Redis 进行交互。下面的示例展示了一个基于 Lua 脚本的实现,这是最推荐的方式,因为它能保证整个逻辑的原子性。
Lua 脚本的优势在于,它能在 Redis 服务器端执行多个命令,避免了客户端和服务端之间的多次网络往返,并且能确保操作的原子性。
1. 令牌桶限流的 Lua 脚本
-- KEYS[1] 是令牌桶的键名
-- ARGV[1] 是令牌桶的最大容量
-- ARGV[2] 是每秒生成的令牌数
-- ARGV[3] 是当前时间戳
-- ARGV[4] 是请求消耗的令牌数
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
-- 获取上一次更新时间戳
local last_time = redis.call('HGET', key, 'last_time')
-- https://stackoverflow.com/questions/21364251/how-to-do-null-nil-check-in-lua-with-redis-scripto-and-redis-db
-- redis-cli > EVAL "return redis.call('GET','hello')"
if last_time == false then
-- 如果是第一次访问,初始化令牌桶
last_time = now
redis.call('HMSET', key, 'last_time', last_time, 'tokens', capacity - requested)
redis.call('EXPIRE', key, capacity / rate + 1)
return 1
end
-- 计算自上次更新以来生成的令牌数
local time_passed = now - tonumber(last_time)
local new_tokens = time_passed * rate
-- 获取当前令牌数
local current_tokens = tonumber(redis.call('HGET', key, 'tokens'))
-- 更新令牌数,并限制不超过容量
local new_token_count = math.min(capacity, current_tokens + new_tokens)
-- 如果剩余令牌数足够,则消费令牌
if new_token_count >= requested then
local final_tokens = new_token_count - requested
redis.call('HMSET', key, 'last_time', now, 'tokens', final_tokens)
return 1
else
-- 令牌不足,请求被拒绝
redis.call('HMSET', key, 'last_time', now, 'tokens', new_token_count)
return 0
end
2. Python 客户端代码
import redis
import time
# 初始化 Redis 连接
r = redis.Redis(host='localhost', port=6379, db=0)
# 加载 Lua 脚本
lua_script = r.script_load("""
-- 上述 Lua 脚本代码
""")
def rate_limit(user_id, capacity=10, rate=1, requested=1):
"""
令牌桶限流函数
:param user_id: 用户ID,用于构建 Redis key
:param capacity: 令牌桶容量
:param rate: 每秒生成的令牌数
:param requested: 请求消耗的令牌数
:return: True 表示通过,False 表示被限流
"""
key = f"rate_limiter:{user_id}"
now = int(time.time())
# 调用 Lua 脚本,传入 KEYS 和 ARGV
result = r.evalsha(
lua_script,
1, # KEYS 的数量
key,
capacity,
rate,
now,
requested
)
return result == 1
# 示例使用
if __name__ == "__main__":
user_id = "user:101"
print("--- 首次访问,通过 ---")
print(f"请求结果: {rate_limit(user_id, capacity=10, rate=1)}")
print("\n--- 连续快速访问,令牌消耗 ---")
for i in range(15):
time.sleep(0.1) # 模拟请求间隔
if rate_limit(user_id, capacity=10, rate=1):
print(f"第 {i+1} 次请求: 通过")
else:
print(f"第 {i+1} 次请求: **被限流**")
print("\n--- 间隔一段时间后,令牌恢复 ---")
time.sleep(5)
print(f"等待5秒后再次请求: {rate_limit(user_id, capacity=10, rate=1)}")
Go 语言示例
Go 语言中,我们可以使用 go-redis/redis
库。同样,我们推荐使用 Lua 脚本来保证操作的原子性。
1. 令牌桶限流的 Lua 脚本
使用和 Python 示例中完全相同的 Lua 脚本。
2. Go 客户端代码
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/go-redis/redis/v9"
)
// Lua 脚本内容
const luaScript = `
-- ... 省略上述 Lua 脚本内容 ...
`
// Redis 客户端
var rdb *redis.Client
func init() {
rdb = redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // 密码
DB: 0, // 数据库
})
// 检查连接
_, err := rdb.Ping(context.Background()).Result()
if err != nil {
log.Fatalf("无法连接到 Redis: %v", err)
}
}
// RateLimit 令牌桶限流函数
func RateLimit(key string, capacity, rate, requested int) (bool, error) {
ctx := context.Background()
now := time.Now().Unix()
// 使用 Lua 脚本执行限流逻辑
result, err := rdb.Eval(ctx, luaScript,
[]string{key},
capacity,
rate,
now,
requested,
).Result()
if err != nil && err != redis.Nil {
return false, fmt.Errorf("执行 Redis Lua 脚本失败: %w", err)
}
return result.(int64) == 1, nil
}
func main() {
userKey := "rate_limiter:user:102"
capacity := 10
rate := 1
requested := 1
fmt.Println("--- 首次访问,通过 ---")
passed, err := RateLimit(userKey, capacity, rate, requested)
if err != nil {
log.Fatal(err)
}
fmt.Printf("请求结果: %v\n", passed)
fmt.Println("\n--- 连续快速访问,令牌消耗 ---")
for i := 0; i < 15; i++ {
time.Sleep(100 * time.Millisecond) // 模拟请求间隔
passed, err := RateLimit(userKey, capacity, rate, requested)
if err != nil {
log.Fatal(err)
}
if passed {
fmt.Printf("第 %d 次请求: 通过\n", i+1)
} else {
fmt.Printf("第 %d 次请求: **被限流**\n", i+1)
}
}
fmt.Println("\n--- 间隔一段时间后,令牌恢复 ---")
time.Sleep(5 * time.Second)
passed, err = RateLimit(userKey, capacity, rate, requested)
if err != nil {
log.Fatal(err)
}
fmt.Printf("等待5秒后再次请求: %v\n", passed)
}
总结
这两种语言的实现都遵循了相同的逻辑,即通过 Lua 脚本在 Redis 服务器端以原子方式处理限流逻辑。这种方法的好处是:
- 原子性:Lua 脚本在 Redis 中是原子执行的,避免了多个客户端同时操作导致的数据不一致问题。
- 性能:减少了客户端与 Redis 之间的网络往返,提高了执行效率。
- 高并发安全:保证了在并发场景下限流判断的准确性。
这种基于 Redis 和 Lua 脚本的令牌桶实现,是生产环境中非常可靠和常用的限流方案。