当前位置: 首页 > ds >正文

每秒扛住10万请求?RedissonRateLimiter 分布式限流器详解

目录

  • Java源码分析
  • Lua脚本分析
    • 创建限流器
    • 尝试获取令牌
  • 案例实战
    • 第一次请求进入
    • 第二次请求进入


种一棵树最好的时间是10年前,其次就是现在,加油!
                                                                                   --by蜡笔小柯南

RedissonRateLimiter作为方便好用的限流工具,在某些场景下,极简了我们的开发,通过简单几行代码,就能搞定限流。那么,如何好用的限流器,底层是如何实现的呢?接下来,让我们一起去探索!

Java源码分析

这是 RedissonRateLimiter 类下的 tryAcquireAsync 方法,限流的主要逻辑在这里面

private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {byte[] random = getServiceManager().generateIdArray();return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,"local rate = redis.call('hget', KEYS[1], 'rate');"+ "local interval = redis.call('hget', KEYS[1], 'interval');"+ "local type = redis.call('hget', KEYS[1], 'type');"+ "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')"+ "local valueName = KEYS[2];"+ "local permitsName = KEYS[4];"+ "if type == '1' then "+ "valueName = KEYS[3];"+ "permitsName = KEYS[5];"+ "end;"+ "assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); "+ "local currentValue = redis.call('get', valueName); "+ "local res;"+ "if currentValue ~= false then "+ "local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "+ "local released = 0; "+ "for i, v in ipairs(expiredValues) do "+ "local random, permits = struct.unpack('Bc0I', v);"+ "released = released + permits;"+ "end; "+ "if released > 0 then "+ "redis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "+ "if tonumber(currentValue) + released > tonumber(rate) then "+ "currentValue = tonumber(rate) - redis.call('zcard', permitsName); "+ "else "+ "currentValue = tonumber(currentValue) + released; "+ "end; "+ "redis.call('set', valueName, currentValue);"+ "end;"+ "if tonumber(currentValue) < tonumber(ARGV[1]) then "+ "local firstValue = redis.call('zrange', permitsName, 0, 0, 'withscores'); "+ "res = 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]));"+ "else "+ "redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); "+ "redis.call('decrby', valueName, ARGV[1]); "+ "res = nil; "+ "end; "+ "else "+ "redis.call('set', valueName, rate); "+ "redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); "+ "redis.call('decrby', valueName, ARGV[1]); "+ "res = nil; "+ "end;"+ "local ttl = redis.call('pttl', KEYS[1]); "+ "if ttl > 0 then "+ "redis.call('pexpire', valueName, ttl); "+ "redis.call('pexpire', permitsName, ttl); "+ "end; "+ "return res;",Arrays.asList(getRawName(), getValueName(), getClientValueName(), getPermitsName(), getClientPermitsName()),value, System.currentTimeMillis(), random);}

可以看到,使用了大量的 lua 脚本,其中,KEYS[1]、ARGV[1]分别表示取的key的第1个值,以及参数的第1个值,中括号里面的数字,表示取的是第几个值。

我们看一下 evalWriteAsync 方法的定义,如下:

<T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);

最后两个参数,一个是 List<Object> keys,一个是 Object... params。KEYS[1]就是从 keys 中取第1个值,ARGV[1]就是从 params 中取第1个值。

Lua脚本分析

创建限流器

使用 Hash 结构来保存

保存和获取的命令分别为:

  • hset key field value
  • hget key field
redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]);
redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]);
return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]);

创建限流器,向redis中保存了 rateintervaltype 信息

尝试获取令牌

通过 Lua 脚本实现,我把 Lua 脚本拷贝出来,写了注释

-- 速率,即规定的时间内,能够通过多少个
local rate = redis.call('hget', KEYS[1], 'rate');
-- 时间间隔,即规定的时间,如60秒
local interval = redis.call('hget', KEYS[1], 'interval');
-- 类型,用于区分单机限流还是集群限流,默认type=0,所以这里不用关注
local type = redis.call('hget', KEYS[1], 'type');
-- 校验,rate!=false && interval!=false && type!=false,其中任何一个不满足则抛出异常:RateLimiter is not initialized
assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')-- {keyName}:value,大括号内是key的名称,后面+冒号+value共同组成valueName
local valueName = KEYS[2];-- {keyName}:permits,大括号内是key的名称,后面+冒号+ permits共同组成permitsName
local permitsName = KEYS[4];-- type即为第6行获取到的type的值,当type=1时才会走下面的逻辑,由于type默认为0,所以这里不用关注
if type == '1' then valueName = KEYS[3];permitsName = KEYS[5];
end;-- ARGV[1]是请求的令牌数量,rate必须要比请求的大
-- 如:60秒允许1个,rate=1,如果请求的令牌数是2,即rate<ARGV[1],则抛出异常,异常信息为:Requested permits amount could not exceed defined rate
assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); -- get valueName --> get {keyName}:value,第一次执行redis中什么都还没有,所以currentValue为null
-- 第二次执行,currentValue有值
local currentValue = redis.call('get', valueName); 
local res;-- 第一次:currentValue为null,进入else分支
-- 第二次,currentValue不为null,进入下面的if分支
if currentValue ~= false then -- 从zset中取数据,{keyName}:permits为key,取的区间是 0 ~ (当前时间戳 - 时间间隔)-- 如:第一次请求的时间是8:00:00,第二次请求的时间是8:00:30,当前时间戳就是8点30秒,时间间隔是60秒-- 8:00:30减去60秒,7:59:30,则取的数据就是0 ~ 7:59:30 这区间的数据-- 由于第一次的请求时间是8:00:00,所以取出的expiredValues值为空,也就是过期的数据local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); local released = 0; -- 遍历expiredValues,如果expiredValues有值,released=之前所有请求的令牌数之和for i, v in ipairs(expiredValues) do local random, permits = struct.unpack('Bc0I', v);-- 表示释放或者回收已过期的令牌,供下次请求使用released = released + permits;end; -- released大于0,说明有过期的数据,如果没有,不会进入此if分支if released > 0 then -- {keyName}:permits为key,区间是0 ~ (当前时间戳 - 时间间隔),将所有过期的数据从zset中移除redis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); -- 如果当前令牌数 + 释放(回收)的令牌数 > rateif tonumber(currentValue) + released > tonumber(rate) then -- 当前的令牌数 = rate - zset中的元素个数currentValue = tonumber(rate) - redis.call('zcard', permitsName); else -- 当前的令牌数 = 当前的令牌数 + 释放(回收)的令牌数currentValue = tonumber(currentValue) + released; end; -- 将更新后的当前令牌数的值,重新保存到{keyName}:value中,表示当前可用的令牌总数redis.call('set', valueName, currentValue);end;-- 如果当前的令牌数小于请求的令牌数if tonumber(currentValue) < tonumber(ARGV[1]) then -- 获取zset中最早的一次请求,返回的firstValue中包含此次请求的令牌数和请求时间local firstValue = redis.call('zrange', permitsName, 0, 0, 'withscores'); -- ARGV[2]:当前时间戳 firstValue[2]:最早请求的时间戳,这里不同的版本源码有区别-- 最终表示的是,距离下次令牌产生还需要多长时间res = 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]));-- 表示当前的令牌数>=请求的令牌数else -- 更新zset,当前请求的令牌数以及当前时间戳redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); -- 对{keyName}:value做自减,总的令牌数 - 请求的令牌数redis.call('decrby', valueName, ARGV[1]); res = nil; end; else -- set一个key-value数据,记录当前限流器的令牌数量-- set {keyName}:value rateredis.call('set', valueName, rate); -- 保存一个zset数据,{keyName}:permits为key,score是当前时间戳,member是经过格式化的,在redis可视化工具中-- 显示的是编码。实际上记录的就是这次请求的令牌数。所以,这个zset的数据,保存的就是请求的时间戳和请求的令牌数量redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); -- 对{keyName}:value做自减,总的令牌数 - 请求的令牌数redis.call('decrby', valueName, ARGV[1]); res = nil; 
end;-- 获取key的过期时间,如果设置了过期时间,则进行续期
local ttl = redis.call('pttl', KEYS[1]); 
if ttl > 0 then redis.call('pexpire', valueName, ttl); redis.call('pexpire', permitsName, ttl); 
end; 
return res;

redisson使用了zset 数据结构来保存请求的信息,这样可以通过比较 score 分数,也就是请求的时间戳,来判断令牌需不需要生产。如果当前请求距离上一次请求超过了一个令牌生产周期,则说明令牌需要生产,之前用掉了多少个,就生产多少个,而之前用掉了多少个令牌的信息也在 zset 中保存了;如果没有超过一个令牌周期,则判断剩余可用的令牌数是否满足请求的令牌数,如果满足,则通过,如果不满足,则拒绝。

案例实战

比如:我们设置,60秒内只允许1个请求通过,也就是说,每60秒,才会生产1个令牌,每次只允许请求1个令牌

这里我们使用发短信场景,60秒内,只允许发送1条短信

key为:telephone:limit:13612345678
rate为:1
interval为:60秒

创建完成后,redis中保存的数据结构是这样的,我们传入的是60秒,在实际保存时会转换为毫秒,所以interval的值是60000

在这里插入图片描述

第一次请求进入

分析源码可得,Lua 脚本中,KEYS[1]的值就是key的值,即KEYS[1] = telephone:limit:13612345678

local rate = redis.call('hget', KEYS[1], 'rate');
local interval = redis.call('hget', KEYS[1], 'interval');
local type = redis.call('hget', KEYS[1], 'type');

Lua 脚本中,这3条语句,转换为redis的实际命令就是:

  • hget telephone:limit:13612345678 rate
  • hget telephone:limit:13612345678 interval
  • hget telephone:limit:13612345678 type

获取到的值就是redis中hash结构保存的信息,得到:

  • rate:1
  • interval:60000
  • type:0

下面,继续执行这条语句:

assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')

这条语句对上面获取到的3个字段做了校验,rate、interval、type 必须同时不为空,否则,抛出异常,异常信息为:RateLimiter is not initialized

下面,继续执行以下语句:

local valueName = KEYS[2];
local permitsName = KEYS[4];
if type == '1' thenvalueName = KEYS[3];permitsName = KEYS[5];
end;

经源码分析可得
KEYS[2] = {telephone:limit:13612345678}:value
KEYS[4] = {telephone:limit:13612345678}:permits

即:
valueName = {telephone:limit:13612345678}:value
permitsName = {telephone:limit:13612345678}:permits

由于 type = 0 ,所以这里的if分支不进入。继续执行后面语句

assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'

这条语句是校验:请求的令牌数不能超过设置的 rate 数,如:我们 rate 设为了1,请求的令牌数是2,则抛出异常,异常信息为:Requested permits amount could not exceed defined rate

继续执行后续语句,

local currentValue = redis.call('get', valueName); 
local res;
if currentValue ~= false then

currentValue 实际执行的redis命令是:

  • get {telephone:limit:13612345678}:value

由于是第一次请求,此时redis中除了保存的 hash 结构的数据外,没有其他任何数据,所以 currentValue 获取到的就是null,再判断 currentValue,由于它为null,if 分支进不去,直接到else分支

else redis.call('set', valueName, rate); redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); redis.call('decrby', valueName, ARGV[1]); res = nil; 
end;

set valueName rate 转换为redis命令就是:

set {telephone:limit:13612345678}:value 1

此时,currentValue才保存到redis中

再向redis中保存一个 zset ,key是permitsName,即{telephone:limit:13612345678}:permits,redis中 zadd 的命令格式是:
zadd key socre member ,最终,score就是当前的时间戳,member就是当前请求的令牌数

在这里插入图片描述

最后,使用 decrby 命令对 valueName 进行自减
decrby {telephone:limit:13612345678}:value 1

自减1后,此时,redis中的值为0

在这里插入图片描述

继续执行后续语句:

local ttl = redis.call('pttl', KEYS[1]);
if ttl > 0 thenredis.call('pexpire', valueName, ttl);redis.call('pexpire', permitsName, ttl);
end
return res;

获取过期时间,默认我们没有手动设置过期时间,所以获取到的 ttl = -1 ,不满足 if 条件,if 分支不执行。后续我们直接略过此段代码。

至此,第一次请求完成,向redis中新增的key有:

  • {telephone:limit:13612345678}:value
    • String 结构,值为0
  • {telephone:limit:13612345678}:permits
    • hash 结构,member为请求的令牌数,score为这次请求的时间戳

第二次请求进入

当执行语句到local currentValue = redis.call('get', valueName);时,currentValue有值,为0,就会进入if currentValue ~= false分支

后续执行的语句为:

if currentValue ~= false thenlocal expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval);local released = 0;for i, v in ipairs(expiredValues) dolocal random, permits = struct.unpack('Bc0I', v);released = released + permits;
end;

redis.call 实际执行的redis命令是:zrangebyscore {telephone:limit:13612345678}:permits 0 当前时间-interval,从zset中取数据,key是{telephone:limit:13612345678}:permits,范围是 0 ~ (当前时间戳 - interval)。

例如:第一次请求的时间是8:00:00,请求的令牌数量是1个,那么zset中就会记录第一次请求的时间和请求的令牌数量;第二次的请求时间是8:00:30,interval是60秒,第二次请求的时间减去interval 就是7:59:30,获取的zset数据范围是 0 ~ 7:59:30,因为第一次请求是8:00:00,所以这次获取不到数据,expiredValues就是空,released = 0,for循环不会进入,此语句块执行结束。

因为 released = 0,所以 if released > 0 的分支不会进入,直接走到后面的语句。

if tonumber(currentValue) < tonumber(ARGV[1]) thenlocal firstValue = redis.call('zrange', permitsName, 0, 0, 'withscores');res = 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]));
elseredis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1]));redis.call('decrby', valueName, ARGV[1]);res = nil;
end;

currentValue是当前的令牌数,ARGV[1] 是请求的令牌数,如果当前的令牌数 < 请求的令牌数,则获取zset中按score排序后,score最小的那一条数据,即时间最早的那一条数据,转换为redis命令是:zrange {telephone:limit:13612345678}:permits 0 0 withscores,如果firstValue有值,说明这次请求应该被拒绝,没有可用的令牌提供给这次请求,返回距离下次令牌产生还需要多长时间。

如:60秒允许1个请求通过,9:00:00第一个请求,请求1个令牌,成功通过,令牌分配给第一个请求,此时当前可用令牌为0,9:00:05第二个请求,请求1个令牌,当前已没有空闲可用的令牌,得需要下一个时间周期后才会产生新的令牌,也就是60秒后,第二个请求只比第一个请求晚了5秒,小于令牌产生的周期,所以,第二个请求则不通过,返回距离下次令牌产生还需要多长时间。

下面是else分支执行的语句:

elseredis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); redis.call('decrby', valueName, ARGV[1]); res = nil; 
end; 

如果当前的令牌数 >= 请求的令牌数,走入else分支,这里就比较简单,向zset中保存数据,对valueName做自减1,结束。

下面讨论released > 0 的情况:

if currentValue ~= false thenlocal expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval);local released = 0;for i, v in ipairs(expiredValues) dolocal random, permits = struct.unpack('Bc0I', v);released = released + permits;
end;
if released > 0 thenredis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval);if tonumber(currentValue) + released > tonumber(rate) thencurrentValue = tonumber(rate) - redis.call('zcard', permitsName);elsecurrentValue = tonumber(currentValue) + released;endredis.call('set', valueName, currentValue);
end

released > 0 说明 firstValue有值,进入for循环后,给released进行了赋值。

例如:第一次请求的时间是8:00:00,请求的令牌数量是1个,那么zset中就会记录第一次请求的时间和请求的令牌数量;第二次的请求时间是8:01:01,interval是60秒,第二次请求的时间减去interval 就是8:00:01,获取的zset数据范围是 0 ~ 8:00:01,因为第一次请求是8:00:00,所以能获取到数据,expiredValues中记录了请求的令牌数以及时间,就是已过期的数据。

进入for循环,permits就是需要释放的令牌数,因为第一次请求的令牌数是1,所以permits = 1,released = released + permits,released = 0 + 1 = 1,released > 0,进入 if 分支。

使用 zremrangebyscore 移除过期的数据,移除的就是expiredValues,即数据范围是0 ~ 8:00:01的数据。下面又做了一层校验,如果 当前令牌数 + 释放的令牌数 > rate,则 当前令牌数 = rate - zset中的元素个数,保证总数不能超过设置的rate;否则,进入else分支,将释放的令牌数加到当前令牌数中,对currentValue进行赋值,更新当前令牌数的值。


完结散花!这一套组合拳分析下来,你学废了吗?

如果你有任何疑问或经验分享,可以在评论区留言哦~~

不管在任何时候,我希望你永远不要害怕挑战,不要畏惧失败。每一个错误都是向成功迈出的一步,每一个挑战都是成长的机会,因为每一次的努力,都会使我们离梦想更近一点。只要你行动起来,任何时候都不算晚。最后,把座右铭送给大家:种一棵树最好的时间是10年前,其次就是现在,加油!共勉 💪。
http://www.xdnf.cn/news/19931.html

相关文章:

  • (五)Python控制结构(循环结构)
  • C++学习——继承
  • 刷题日记0902
  • 一年级这样排座位,家长超满意!
  • 互联网大厂求职面试记:谢飞机的搞笑答辩
  • 零知开源——STM32红外通信YS-IRTM红外编解码器集成灯控与显示系统
  • 科学研究系统性思维的方法体系:数据分析方法
  • 海康摄像头开发---标准配置结构体(NET_DVR_STD_CONFIG)
  • AI任务相关解决方案13-AI智能体架构方案(意图识别+多任务规划+MCP+RAG)与关键技术深度解析研究报告,以及实现代码
  • CentOS 7/8 单用户模式重置 root 密码完整流程
  • Shell 秘典(卷七)—— 流刃裁文秘术・sed 玄章精解
  • Shell文本处理四剑客
  • 在 Qt 的 .pro 文件中设置警告级别和 C++11 标准
  • 宋红康 JVM 笔记 Day10|对象实例化
  • 2025年经济学专业人士证书选择与分析
  • 科技信息差(9.2)
  • =Windows下VSCode配置SSH密钥远程登录
  • BWN-4000指纹采集器技术规格书
  • 四端子电阻有哪些好处?-华年商城
  • WPF依赖属性和依赖属性的包装器:
  • 鸿蒙权限崩溃?一招解决闪退难题
  • 单调栈与单调队列
  • ThinkPHP的log
  • C++二维数组的前缀和
  • 小杰机械视觉(finally)——题库上——gray、thresh、erode、dilate、HSV、开运算、ROI切割、矫正。
  • 博主必备神器~
  • HBase Region
  • 【代码解读】Deepseek_vl2中具体代码调用
  • 一款高效、强大的子域名爬取工具,帮助安全研究者和渗透测试人员快速收集目标域名的子域名信息
  • 服务器数据恢复—OceanStor存储数据丢失原来这样恢复