基于Redis实现限流的几种方式
目录
基于Redis实现限流的几种方式
限流尽可能在满足需求的情况下越简单越好!
分布式限流是指在分布式系统中对请求进行限制,以防止系统过载或滥用资源。以下是常见的分布式限流策略及其实现方式:
1、基于 Redis 的固定窗口限流
原理 :
- 设定一个时间窗口(如 1 秒)
- 使用 Redis 维护一个计数器,存储当前窗口的请求数
- 当请求到来时,
INCR
计数器,如果超过阈值则拒绝 - 过期后自动删除键,进入下一个窗口
优缺点 : ✅ 简单易实现
❌ 在窗口交界处可能会出现短时间的突发流量(“临界突增”)
public class RedisRateLimiter {
private final StringRedisTemplate redisTemplate;
// 命令前缀
private final String key;
private final int rate;
private final int window;
public RedisRateLimiter(StringRedisTemplate redisTemplate, String key, int rate, int window) {
this.redisTemplate = redisTemplate;
this.key = key;
this.rate = rate;
this.window = window;
}
// 检查并获取令牌
public boolean acquire() {
String currentKey = key + "_" + (getCurrentSeconds() / window);
Long currentCount = redisTemplate.opsForValue().increment(currentKey);
redisTemplate.expire(currentKey, window, TimeUnit.SECONDS);
log.info("当前获取到的令牌数 key {} count {} result {} ",currentKey,currentCount,currentCount > rate);
if (currentCount > rate){
return false;
}
return true;
}
private long getCurrentSeconds() {
return System.currentTimeMillis()/1000;
}
public void acquireSleep() {
int count = 0;
while (!acquire()){
sleep(1);
count++;
}
}
private void sleep(int second) {
try {
TimeUnit.SECONDS.sleep(second);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public boolean acquireSleep(int waitSecond) {
int count = 0;
while (!acquire()){
if (count >= waitSecond){
return false;
}
sleep(1);
count++;
log.info("RedisRateLimiter[{}] try acquire sleep {}",key,count);
}
return true;
}
public static void main(String[] args) throws InterruptedException {
ch.qos.logback.classic.Logger logger=(ch.qos.logback.classic.Logger)LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
logger.setLevel(Level.OFF);
StringRedisTemplate stringRedisTemplate=getStringRedisTemplate();
RedisRateLimiter redisRateLimiter = new RedisRateLimiter(stringRedisTemplate,"request_interface",16,10);
// 模拟 50 个并发线程,每个线程尝试获取 10 次令牌
final int threadCount = 50;
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
executor.submit(() -> {
// 每个线程尝试多次调用限流方法
for (int j = 0; j < 10; j++) {
redisRateLimiter.acquireSleep();
System.out.println("当前线程:"+Thread.currentThread().getName()+",获取到令牌,时间"+ DateFormatUtils.format(new Date(),"yyyy-MM-dd HH:mm:ss"));
// 模拟每次请求间隔 100 毫秒
redisRateLimiter.milliseconds(100);
}
latch.countDown();
});
}
latch.await();
executor.shutdown();
}
private static StringRedisTemplate getStringRedisTemplate() {
// 1. 创建单机模式的配置
RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
redisStandaloneConfiguration.setHostName("127.0.0.1");
redisStandaloneConfiguration.setPort(6379);
// 2. 构造 LettuceConnectionFactory,并初始化
LettuceConnectionFactory factory = new LettuceConnectionFactory(redisStandaloneConfiguration);
factory.afterPropertiesSet(); // 初始化连接工厂
// 3. 创建 StringRedisTemplate 并设置连接工厂
StringRedisTemplate stringRedisTemplate = new StringRedisTemplate();
stringRedisTemplate.setConnectionFactory(factory);
stringRedisTemplate.afterPropertiesSet(); // 初始化模板
return stringRedisTemplate;
}
private void milliseconds(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
main方法中的计算结果可以看到在并发环境下严格的执行10s16次请求(也就是1分钟96次请求),这个就有个弊端,在并发环境下他们一拿到令牌同一秒就执行请求了。这个就是突发流量。
我的业务就是1分钟允许请求100次对方接口,像这种虽然严格按照1分钟不超过100次请求但是有突发流量对方还是返回了频率过高,可能对方计算频率方式不一样吧。所以这种方式不太可取。
当前线程:pool-1-thread-30,获取到令牌,时间2025-03-15 00:17:11
当前线程:pool-1-thread-6,获取到令牌,时间2025-03-15 00:17:11
当前线程:pool-1-thread-18,获取到令牌,时间2025-03-15 00:17:11
当前线程:pool-1-thread-35,获取到令牌,时间2025-03-15 00:17:11
当前线程:pool-1-thread-38,获取到令牌,时间2025-03-15 00:17:11
当前线程:pool-1-thread-37,获取到令牌,时间2025-03-15 00:17:11
当前线程:pool-1-thread-33,获取到令牌,时间2025-03-15 00:17:11
当前线程:pool-1-thread-44,获取到令牌,时间2025-03-15 00:17:11
当前线程:pool-1-thread-3,获取到令牌,时间2025-03-15 00:17:11
当前线程:pool-1-thread-45,获取到令牌,时间2025-03-15 00:17:11
当前线程:pool-1-thread-5,获取到令牌,时间2025-03-15 00:17:11
当前线程:pool-1-thread-20,获取到令牌,时间2025-03-15 00:17:11
当前线程:pool-1-thread-11,获取到令牌,时间2025-03-15 00:17:11
当前线程:pool-1-thread-43,获取到令牌,时间2025-03-15 00:17:11
当前线程:pool-1-thread-15,获取到令牌,时间2025-03-15 00:17:11
当前线程:pool-1-thread-29,获取到令牌,时间2025-03-15 00:17:11
当前线程:pool-1-thread-4,获取到令牌,时间2025-03-15 00:17:20
当前线程:pool-1-thread-16,获取到令牌,时间2025-03-15 00:17:20
当前线程:pool-1-thread-12,获取到令牌,时间2025-03-15 00:17:20
当前线程:pool-1-thread-24,获取到令牌,时间2025-03-15 00:17:20
当前线程:pool-1-thread-26,获取到令牌,时间2025-03-15 00:17:20
当前线程:pool-1-thread-8,获取到令牌,时间2025-03-15 00:17:20
当前线程:pool-1-thread-14,获取到令牌,时间2025-03-15 00:17:20
当前线程:pool-1-thread-49,获取到令牌,时间2025-03-15 00:17:20
当前线程:pool-1-thread-42,获取到令牌,时间2025-03-15 00:17:20
当前线程:pool-1-thread-21,获取到令牌,时间2025-03-15 00:17:20
当前线程:pool-1-thread-1,获取到令牌,时间2025-03-15 00:17:20
当前线程:pool-1-thread-10,获取到令牌,时间2025-03-15 00:17:20
当前线程:pool-1-thread-31,获取到令牌,时间2025-03-15 00:17:20
当前线程:pool-1-thread-50,获取到令牌,时间2025-03-15 00:17:20
当前线程:pool-1-thread-36,获取到令牌,时间2025-03-15 00:17:20
当前线程:pool-1-thread-48,获取到令牌,时间2025-03-15 00:17:20
当前线程:pool-1-thread-9,获取到令牌,时间2025-03-15 00:17:30
当前线程:pool-1-thread-19,获取到令牌,时间2025-03-15 00:17:30
当前线程:pool-1-thread-47,获取到令牌,时间2025-03-15 00:17:30
当前线程:pool-1-thread-2,获取到令牌,时间2025-03-15 00:17:30
当前线程:pool-1-thread-34,获取到令牌,时间2025-03-15 00:17:30
当前线程:pool-1-thread-46,获取到令牌,时间2025-03-15 00:17:30
当前线程:pool-1-thread-41,获取到令牌,时间2025-03-15 00:17:30
当前线程:pool-1-thread-22,获取到令牌,时间2025-03-15 00:17:30
当前线程:pool-1-thread-17,获取到令牌,时间2025-03-15 00:17:30
当前线程:pool-1-thread-27,获取到令牌,时间2025-03-15 00:17:30
当前线程:pool-1-thread-28,获取到令牌,时间2025-03-15 00:17:30
当前线程:pool-1-thread-32,获取到令牌,时间2025-03-15 00:17:30
当前线程:pool-1-thread-25,获取到令牌,时间2025-03-15 00:17:30
当前线程:pool-1-thread-13,获取到令牌,时间2025-03-15 00:17:30
当前线程:pool-1-thread-40,获取到令牌,时间2025-03-15 00:17:30
当前线程:pool-1-thread-23,获取到令牌,时间2025-03-15 00:17:30
当前线程:pool-1-thread-39,获取到令牌,时间2025-03-15 00:17:40
当前线程:pool-1-thread-7,获取到令牌,时间2025-03-15 00:17:40
当前线程:pool-1-thread-34,获取到令牌,时间2025-03-15 00:17:40
当前线程:pool-1-thread-13,获取到令牌,时间2025-03-15 00:17:40
当前线程:pool-1-thread-2,获取到令牌,时间2025-03-15 00:17:40
当前线程:pool-1-thread-22,获取到令牌,时间2025-03-15 00:17:40
当前线程:pool-1-thread-28,获取到令牌,时间2025-03-15 00:17:40
当前线程:pool-1-thread-46,获取到令牌,时间2025-03-15 00:17:40
当前线程:pool-1-thread-25,获取到令牌,时间2025-03-15 00:17:40
当前线程:pool-1-thread-19,获取到令牌,时间2025-03-15 00:17:40
当前线程:pool-1-thread-9,获取到令牌,时间2025-03-15 00:17:40
当前线程:pool-1-thread-32,获取到令牌,时间2025-03-15 00:17:40
当前线程:pool-1-thread-39,获取到令牌,时间2025-03-15 00:17:40
当前线程:pool-1-thread-17,获取到令牌,时间2025-03-15 00:17:40
当前线程:pool-1-thread-47,获取到令牌,时间2025-03-15 00:17:40
当前线程:pool-1-thread-41,获取到令牌,时间2025-03-15 00:17:40
2. 基于 Redis 的滑动窗口限流
原理 :
- 维护一个基于时间的列表(ZSET,有序集合)
- 每次请求时,记录当前时间戳到 ZSET
- 删除超出窗口时间范围的请求
- 统计 ZSET 中当前窗口内的请求数,超出阈值则拒绝
优缺点 : ✅ 解决了固定窗口的临界突增问题
❌ 存储和计算成本比固定窗口稍高
原理说明
- 利用 Redis 的有序集合(ZSet),以请求的时间戳作为 score,每个请求入队一个唯一的 member(例如时间戳+UUID)。
- 每次请求时,先移除时间窗口外的记录(score 小于当前时间减去窗口长度)。
- 统计当前窗口内的请求数量,若数量超过设定阈值,则拒绝请求。
@Slf4j
public class RedisSlidingWindowRateLimiter {
private final StringRedisTemplate redisTemplate;
private final String key;
private final int rate;
private final int window; // 窗口长度,单位秒
public RedisSlidingWindowRateLimiter(StringRedisTemplate redisTemplate, String key, int rate, int window) {
this.redisTemplate = redisTemplate;
this.key = key;
this.rate = rate;
// 限制窗口长度在 1 分钟以内
Assert.isTrue(window > 0 && window <= 60, "窗口只支持一分钟内");
this.window = window;
}
// 检查并获取令牌
public boolean acquire() {
long now = System.currentTimeMillis();
// 计算窗口起始时间(单位毫秒)
long windowStart = now - window * 1000;
// 移除过期的请求记录
redisTemplate.opsForZSet().removeRangeByScore(key, 0, windowStart);
// 添加当前请求记录,member 用当前时间戳加 UUID 保证唯一性,score 为当前时间
String member = now + "_" + UUID.randomUUID().toString();
redisTemplate.opsForZSet().add(key, member, now);
// 统计当前窗口内的请求数量
Long count = redisTemplate.opsForZSet().count(key, windowStart, now);
// 为了避免 key 永不过期,设置一个过期时间(窗口长度)
redisTemplate.expire(key, window, TimeUnit.SECONDS);
if (count != null && count > rate) {
return false;
}
return true;
}
// 采用轮询方式等待获取令牌
public void acquireSleep() {
int count = 0;
while (!acquire()){
ThreadUtil.sleep(1, TimeUnit.SECONDS);
count++;
log.info("RedisSlidingWindowRateLimiter[{}] try acquire sleep {}", key, count);
}
}
public boolean acquireSleep(int waitSecond) {
int count = 0;
while (!acquire()){
if (count >= waitSecond){
return false;
}
ThreadUtil.sleep(1, TimeUnit.SECONDS);
count++;
log.info("RedisSlidingWindowRateLimiter[{}] try acquire sleep {}", key, count);
}
return true;
}
}
代码说明
- 移除过期记录
:调用
removeRangeByScore
清理掉窗口外的请求数据。 - 添加当前请求 :将当前请求的时间戳与 UUID 组合后添加到 ZSet 中,score 为当前时间,确保在滑动窗口内计数。
- 统计计数
:通过
count
方法统计当前窗口内的请求数,如果超出限制则返回 false。
3. 基于 Redis 的令牌桶限流
原理 :
- 设定一个容量为
max_tokens
的令牌桶,初始装满 - 以固定速率向桶中添加令牌(如每秒 10 个)
- 每次请求需要消耗一个令牌,没有令牌时拒绝请求
- 通常使用 Redis 的
Lua
脚本实现原子操作
优缺点 : ✅ 更加平滑,支持突发流量
❌ 需要额外的定时任务或后台线程补充令牌
原理说明
- 令牌桶算法中,设定一个桶最大容量
capacity
,同时以一定速率refillRate
补充令牌。 - 每次请求需要消耗一个令牌,若当前桶内令牌不足,则拒绝请求。
- 为保证原子性,利用 Redis 的 Lua 脚本将令牌获取和补充过程封装为原子操作。
@Slf4j
public class RedisTokenBucketRateLimiter {
private final StringRedisTemplate redisTemplate;
private final String key;
// 桶的容量(最大令牌数)
private final int capacity;
// 令牌补充速率,单位:个/秒
private final double refillRate;
// Lua 脚本,用于原子化处理令牌桶逻辑
private static final String LUA_SCRIPT =
"local tokens_key = KEYS[1] .. ':tokens' \n" +
"local timestamp_key = KEYS[1] .. ':ts' \n" +
"local capacity = tonumber(ARGV[1]) \n" +
"local refill_rate = tonumber(ARGV[2]) \n" +
"local current_time = tonumber(ARGV[3]) \n" +
"local requested = tonumber(ARGV[4]) \n" +
"local tokens = tonumber(redis.call('get', tokens_key) or capacity) \n" +
"local last_refill = tonumber(redis.call('get', timestamp_key) or current_time) \n" +
"local delta = current_time - last_refill \n" +
"local tokens_to_add = delta * refill_rate \n" +
"tokens = math.min(capacity, tokens + tokens_to_add) \n" +
"if tokens < requested then \n" +
" return 0 \n" +
"else \n" +
" tokens = tokens - requested \n" +
" redis.call('set', tokens_key, tokens) \n" +
" redis.call('set', timestamp_key, current_time) \n" +
" return 1 \n" +
"end";
public RedisTokenBucketRateLimiter(StringRedisTemplate redisTemplate, String key, int capacity, double refillRate) {
this.redisTemplate = redisTemplate;
this.key = key;
this.capacity = capacity;
this.refillRate = refillRate;
}
// 检查并获取令牌
public boolean acquire() {
// 当前时间(单位秒)
long currentTime = System.currentTimeMillis() / 1000;
// 请求消耗 1 个令牌
Long result = redisTemplate.execute((RedisCallback<Long>) connection -> {
List<byte[]> keys = Collections.singletonList(key.getBytes());
List<byte[]> args = Arrays.asList(
String.valueOf(capacity).getBytes(),
String.valueOf(refillRate).getBytes(),
String.valueOf(currentTime).getBytes(),
"1".getBytes()
);
return connection.eval(LUA_SCRIPT.getBytes(), ReturnType.INTEGER, keys.size(), keys.toArray(new byte[0][]), args.toArray(new byte[0][]));
});
return result != null && result == 1;
}
}
代码说明
- Lua 脚本逻辑
:
- 获取当前桶中剩余令牌数和上次补充时间,若不存在则默认初始化为满桶状态。
- 根据当前时间与上次更新时间的差值计算应补充的令牌数,并更新桶内令牌。
- 判断是否有足够令牌供本次请求(默认请求 1 个令牌),若不足返回 0,否则扣减令牌并更新上次补充时间,返回 1。
- 原子执行
:通过 redisTemplate 的
eval
方法保证 Lua 脚本的原子性,避免并发问题。
4. 基于 Redis 的漏桶限流
原理 :
- 设定一个队列模拟漏桶
- 按固定速率从队列取出请求执行
- 请求过多时,超出队列长度的请求被丢弃
优缺点 : ✅ 输出速率稳定,不受突发流量影响
❌ 可能会丢弃部分流量
原理说明
- 漏桶算法中,将请求看作向桶中注入的“水”,桶以固定速率漏水(处理请求)。
- 当桶中水量超过预设容量时,则拒绝新请求。
- 同样利用 Lua 脚本保证原子操作。
@Slf4j
public class RedisLeakyBucketRateLimiter {
private final StringRedisTemplate redisTemplate;
private final String key;
// 桶的容量(允许的最大突发请求数)
private final int capacity;
// 漏水速率,单位:个/秒,表示每秒可处理的请求数
private final double leakRate;
// Lua 脚本,用于原子化处理漏桶逻辑
private static final String LUA_SCRIPT =
"local level_key = KEYS[1] .. ':level' \n" +
"local timestamp_key = KEYS[1] .. ':ts' \n" +
"local capacity = tonumber(ARGV[1]) \n" +
"local leak_rate = tonumber(ARGV[2]) \n" +
"local current_time = tonumber(ARGV[3]) \n" +
"local level = tonumber(redis.call('get', level_key) or '0') \n" +
"local last_time = tonumber(redis.call('get', timestamp_key) or current_time) \n" +
"local delta = current_time - last_time \n" +
"local leaked = delta * leak_rate \n" +
// 计算漏水后桶内水量,不能低于 0
"level = math.max(0, level - leaked) \n" +
"if level + 1 > capacity then \n" +
" return 0 \n" +
"else \n" +
" level = level + 1 \n" +
" redis.call('set', level_key, level) \n" +
" redis.call('set', timestamp_key, current_time) \n" +
" return 1 \n" +
"end";
public RedisLeakyBucketRateLimiter(StringRedisTemplate redisTemplate, String key, int capacity, double leakRate) {
this.redisTemplate = redisTemplate;
this.key = key;
this.capacity = capacity;
this.leakRate = leakRate;
}
// 检查并获取请求处理资格
public boolean acquire() {
// 当前时间(单位秒)
long currentTime = System.currentTimeMillis() / 1000;
Long result = redisTemplate.execute((RedisCallback<Long>) connection -> {
List<byte[]> keys = Collections.singletonList(key.getBytes());
List<byte[]> args = Arrays.asList(
String.valueOf(capacity).getBytes(),
String.valueOf(leakRate).getBytes(),
String.valueOf(currentTime).getBytes()
);
return connection.eval(LUA_SCRIPT.getBytes(), ReturnType.INTEGER, keys.size(), keys.toArray(new byte[0][]), args.toArray(new byte[0][]));
});
return result != null && result == 1;
}
}
代码说明
- Lua 脚本逻辑
:
- 从 Redis 中获取当前桶内水量(即请求数量)和上次更新的时间。
- 根据当前时间与上次更新时间的差值和设定的漏水速率计算“漏掉”的水量,并更新桶内水量(不能低于 0)。
- 判断加入当前请求后是否超过桶的容量,超过则返回 0(拒绝),否则将水量加 1 并更新记录,返回 1 表示允许。
- 原子执行
:同样通过
eval
方法保证操作原子性,避免并发修改问题。
总结
- 滑动窗口 :使用 Redis ZSet 记录请求时间戳,动态统计窗口内请求数,平滑控制突发流量。
- 令牌桶 :通过 Lua 脚本实现令牌的自动补充和扣减,支持一定的突发请求。
- 漏桶 :用固定漏水速率保证请求以均匀的速率被处理,避免瞬间大量请求。