每秒扛住10万请求?RedissonRateLimiter 分布式限流器详解
目录
- Java源码分析
- Lua脚本分析
- 创建限流器
- 尝试获取令牌
- 案例实战
- 第一次请求进入
- 第二次请求进入
种一棵树最好的时间是10年前,其次就是现在,加油!
--by蜡笔小柯南
RedissonRateLimiter作为方便好用的限流工具,在某些场景下,极简了我们的开发,通过简单几行代码,就能搞定限流。那么,如何好用的限流器,底层是如何实现的呢?接下来,让我们一起去探索!
Java源码分析
这是 RedissonRateLimiter
类下的 tryAcquireAsync
方法,限流的主要逻辑在这里面
private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {byte[] random = getServiceManager().generateIdArray();return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,"local rate = redis.call('hget', KEYS[1], 'rate');"+ "local interval = redis.call('hget', KEYS[1], 'interval');"+ "local type = redis.call('hget', KEYS[1], 'type');"+ "assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')"+ "local valueName = KEYS[2];"+ "local permitsName = KEYS[4];"+ "if type == '1' then "+ "valueName = KEYS[3];"+ "permitsName = KEYS[5];"+ "end;"+ "assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); "+ "local currentValue = redis.call('get', valueName); "+ "local res;"+ "if currentValue ~= false then "+ "local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "+ "local released = 0; "+ "for i, v in ipairs(expiredValues) do "+ "local random, permits = struct.unpack('Bc0I', v);"+ "released = released + permits;"+ "end; "+ "if released > 0 then "+ "redis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); "+ "if tonumber(currentValue) + released > tonumber(rate) then "+ "currentValue = tonumber(rate) - redis.call('zcard', permitsName); "+ "else "+ "currentValue = tonumber(currentValue) + released; "+ "end; "+ "redis.call('set', valueName, currentValue);"+ "end;"+ "if tonumber(currentValue) < tonumber(ARGV[1]) then "+ "local firstValue = redis.call('zrange', permitsName, 0, 0, 'withscores'); "+ "res = 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]));"+ "else "+ "redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); "+ "redis.call('decrby', valueName, ARGV[1]); "+ "res = nil; "+ "end; "+ "else "+ "redis.call('set', valueName, rate); "+ "redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); "+ "redis.call('decrby', valueName, ARGV[1]); "+ "res = nil; "+ "end;"+ "local ttl = redis.call('pttl', KEYS[1]); "+ "if ttl > 0 then "+ "redis.call('pexpire', valueName, ttl); "+ "redis.call('pexpire', permitsName, ttl); "+ "end; "+ "return res;",Arrays.asList(getRawName(), getValueName(), getClientValueName(), getPermitsName(), getClientPermitsName()),value, System.currentTimeMillis(), random);}
可以看到,使用了大量的 lua
脚本,其中,KEYS[1]、ARGV[1]分别表示取的key的第1个值,以及参数的第1个值,中括号里面的数字,表示取的是第几个值。
我们看一下 evalWriteAsync
方法的定义,如下:
<T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);
最后两个参数,一个是 List<Object> keys
,一个是 Object... params
。KEYS[1]就是从 keys
中取第1个值,ARGV[1]就是从 params
中取第1个值。
Lua脚本分析
创建限流器
使用 Hash
结构来保存
保存和获取的命令分别为:
- hset key field value
- hget key field
redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]);
redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]);
return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]);
创建限流器,向redis中保存了 rate
、interval
、type
信息
尝试获取令牌
通过 Lua
脚本实现,我把 Lua
脚本拷贝出来,写了注释
-- 速率,即规定的时间内,能够通过多少个
local rate = redis.call('hget', KEYS[1], 'rate');
-- 时间间隔,即规定的时间,如60秒
local interval = redis.call('hget', KEYS[1], 'interval');
-- 类型,用于区分单机限流还是集群限流,默认type=0,所以这里不用关注
local type = redis.call('hget', KEYS[1], 'type');
-- 校验,rate!=false && interval!=false && type!=false,其中任何一个不满足则抛出异常:RateLimiter is not initialized
assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')-- {keyName}:value,大括号内是key的名称,后面+冒号+value共同组成valueName
local valueName = KEYS[2];-- {keyName}:permits,大括号内是key的名称,后面+冒号+ permits共同组成permitsName
local permitsName = KEYS[4];-- type即为第6行获取到的type的值,当type=1时才会走下面的逻辑,由于type默认为0,所以这里不用关注
if type == '1' then valueName = KEYS[3];permitsName = KEYS[5];
end;-- ARGV[1]是请求的令牌数量,rate必须要比请求的大
-- 如:60秒允许1个,rate=1,如果请求的令牌数是2,即rate<ARGV[1],则抛出异常,异常信息为:Requested permits amount could not exceed defined rate
assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'); -- get valueName --> get {keyName}:value,第一次执行redis中什么都还没有,所以currentValue为null
-- 第二次执行,currentValue有值
local currentValue = redis.call('get', valueName);
local res;-- 第一次:currentValue为null,进入else分支
-- 第二次,currentValue不为null,进入下面的if分支
if currentValue ~= false then -- 从zset中取数据,{keyName}:permits为key,取的区间是 0 ~ (当前时间戳 - 时间间隔)-- 如:第一次请求的时间是8:00:00,第二次请求的时间是8:00:30,当前时间戳就是8点30秒,时间间隔是60秒-- 8:00:30减去60秒,7:59:30,则取的数据就是0 ~ 7:59:30 这区间的数据-- 由于第一次的请求时间是8:00:00,所以取出的expiredValues值为空,也就是过期的数据local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); local released = 0; -- 遍历expiredValues,如果expiredValues有值,released=之前所有请求的令牌数之和for i, v in ipairs(expiredValues) do local random, permits = struct.unpack('Bc0I', v);-- 表示释放或者回收已过期的令牌,供下次请求使用released = released + permits;end; -- released大于0,说明有过期的数据,如果没有,不会进入此if分支if released > 0 then -- {keyName}:permits为key,区间是0 ~ (当前时间戳 - 时间间隔),将所有过期的数据从zset中移除redis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); -- 如果当前令牌数 + 释放(回收)的令牌数 > rateif tonumber(currentValue) + released > tonumber(rate) then -- 当前的令牌数 = rate - zset中的元素个数currentValue = tonumber(rate) - redis.call('zcard', permitsName); else -- 当前的令牌数 = 当前的令牌数 + 释放(回收)的令牌数currentValue = tonumber(currentValue) + released; end; -- 将更新后的当前令牌数的值,重新保存到{keyName}:value中,表示当前可用的令牌总数redis.call('set', valueName, currentValue);end;-- 如果当前的令牌数小于请求的令牌数if tonumber(currentValue) < tonumber(ARGV[1]) then -- 获取zset中最早的一次请求,返回的firstValue中包含此次请求的令牌数和请求时间local firstValue = redis.call('zrange', permitsName, 0, 0, 'withscores'); -- ARGV[2]:当前时间戳 firstValue[2]:最早请求的时间戳,这里不同的版本源码有区别-- 最终表示的是,距离下次令牌产生还需要多长时间res = 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]));-- 表示当前的令牌数>=请求的令牌数else -- 更新zset,当前请求的令牌数以及当前时间戳redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); -- 对{keyName}:value做自减,总的令牌数 - 请求的令牌数redis.call('decrby', valueName, ARGV[1]); res = nil; end; else -- set一个key-value数据,记录当前限流器的令牌数量-- set {keyName}:value rateredis.call('set', valueName, rate); -- 保存一个zset数据,{keyName}:permits为key,score是当前时间戳,member是经过格式化的,在redis可视化工具中-- 显示的是编码。实际上记录的就是这次请求的令牌数。所以,这个zset的数据,保存的就是请求的时间戳和请求的令牌数量redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); -- 对{keyName}:value做自减,总的令牌数 - 请求的令牌数redis.call('decrby', valueName, ARGV[1]); res = nil;
end;-- 获取key的过期时间,如果设置了过期时间,则进行续期
local ttl = redis.call('pttl', KEYS[1]);
if ttl > 0 then redis.call('pexpire', valueName, ttl); redis.call('pexpire', permitsName, ttl);
end;
return res;
redisson使用了zset
数据结构来保存请求的信息,这样可以通过比较 score
分数,也就是请求的时间戳,来判断令牌需不需要生产。如果当前请求距离上一次请求超过了一个令牌生产周期,则说明令牌需要生产,之前用掉了多少个,就生产多少个,而之前用掉了多少个令牌的信息也在 zset
中保存了;如果没有超过一个令牌周期,则判断剩余可用的令牌数是否满足请求的令牌数,如果满足,则通过,如果不满足,则拒绝。
案例实战
比如:我们设置,60秒内只允许1个请求通过,也就是说,每60秒,才会生产1个令牌,每次只允许请求1个令牌
这里我们使用发短信场景,60秒内,只允许发送1条短信
key为:telephone:limit:13612345678
rate为:1
interval为:60秒
创建完成后,redis中保存的数据结构是这样的,我们传入的是60秒,在实际保存时会转换为毫秒,所以interval的值是60000
第一次请求进入
分析源码可得,Lua
脚本中,KEYS[1]的值就是key的值,即KEYS[1] = telephone:limit:13612345678
local rate = redis.call('hget', KEYS[1], 'rate');
local interval = redis.call('hget', KEYS[1], 'interval');
local type = redis.call('hget', KEYS[1], 'type');
Lua
脚本中,这3条语句,转换为redis的实际命令就是:
- hget telephone:limit:13612345678 rate
- hget telephone:limit:13612345678 interval
- hget telephone:limit:13612345678 type
获取到的值就是redis中hash
结构保存的信息,得到:
- rate:1
- interval:60000
- type:0
下面,继续执行这条语句:
assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')
这条语句对上面获取到的3个字段做了校验,rate、interval、type 必须同时不为空,否则,抛出异常,异常信息为:RateLimiter is not initialized
下面,继续执行以下语句:
local valueName = KEYS[2];
local permitsName = KEYS[4];
if type == '1' thenvalueName = KEYS[3];permitsName = KEYS[5];
end;
经源码分析可得
KEYS[2] = {telephone:limit:13612345678}:value
KEYS[4] = {telephone:limit:13612345678}:permits
即:
valueName = {telephone:limit:13612345678}:value
permitsName = {telephone:limit:13612345678}:permits
由于 type = 0
,所以这里的if分支不进入。继续执行后面语句
assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate'
这条语句是校验:请求的令牌数不能超过设置的 rate
数,如:我们 rate
设为了1,请求的令牌数是2,则抛出异常,异常信息为:Requested permits amount could not exceed defined rate
继续执行后续语句,
local currentValue = redis.call('get', valueName);
local res;
if currentValue ~= false then
currentValue
实际执行的redis命令是:
- get {telephone:limit:13612345678}:value
由于是第一次请求,此时redis中除了保存的 hash
结构的数据外,没有其他任何数据,所以 currentValue
获取到的就是null,再判断 currentValue,由于它为null,if 分支进不去,直接到else分支
else redis.call('set', valueName, rate); redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); redis.call('decrby', valueName, ARGV[1]); res = nil;
end;
set valueName rate 转换为redis命令就是:
set {telephone:limit:13612345678}:value 1
此时,currentValue才保存到redis中
再向redis中保存一个 zset
,key是permitsName,即{telephone:limit:13612345678}:permits,redis中 zadd
的命令格式是:
zadd key socre member
,最终,score就是当前的时间戳,member就是当前请求的令牌数
最后,使用 decrby
命令对 valueName 进行自减
decrby {telephone:limit:13612345678}:value 1
自减1后,此时,redis中的值为0
继续执行后续语句:
local ttl = redis.call('pttl', KEYS[1]);
if ttl > 0 thenredis.call('pexpire', valueName, ttl);redis.call('pexpire', permitsName, ttl);
end
return res;
获取过期时间,默认我们没有手动设置过期时间,所以获取到的 ttl = -1
,不满足 if 条件,if 分支不执行。后续我们直接略过此段代码。
至此,第一次请求完成,向redis中新增的key有:
- {telephone:limit:13612345678}:value
String
结构,值为0
- {telephone:limit:13612345678}:permits
hash
结构,member为请求的令牌数,score为这次请求的时间戳
第二次请求进入
当执行语句到local currentValue = redis.call('get', valueName);
时,currentValue有值,为0,就会进入if currentValue ~= false
分支
后续执行的语句为:
if currentValue ~= false thenlocal expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval);local released = 0;for i, v in ipairs(expiredValues) dolocal random, permits = struct.unpack('Bc0I', v);released = released + permits;
end;
redis.call
实际执行的redis命令是:zrangebyscore {telephone:limit:13612345678}:permits 0 当前时间-interval
,从zset中取数据,key是{telephone:limit:13612345678}:permits,范围是 0 ~ (当前时间戳 - interval)。
例如:第一次请求的时间是8:00:00,请求的令牌数量是1个,那么zset
中就会记录第一次请求的时间和请求的令牌数量;第二次的请求时间是8:00:30,interval是60秒,第二次请求的时间减去interval
就是7:59:30,获取的zset
数据范围是 0 ~ 7:59:30,因为第一次请求是8:00:00,所以这次获取不到数据,expiredValues
就是空,released = 0,for循环不会进入,此语句块执行结束。
因为 released = 0
,所以 if released > 0 的分支不会进入,直接走到后面的语句。
if tonumber(currentValue) < tonumber(ARGV[1]) thenlocal firstValue = redis.call('zrange', permitsName, 0, 0, 'withscores');res = 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]));
elseredis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1]));redis.call('decrby', valueName, ARGV[1]);res = nil;
end;
currentValue是当前的令牌数,ARGV[1] 是请求的令牌数,如果当前的令牌数 < 请求的令牌数,则获取zset
中按score排序后,score最小的那一条数据,即时间最早的那一条数据,转换为redis命令是:zrange {telephone:limit:13612345678}:permits 0 0 withscores
,如果firstValue有值,说明这次请求应该被拒绝,没有可用的令牌提供给这次请求,返回距离下次令牌产生还需要多长时间。
如:60秒允许1个请求通过,9:00:00第一个请求,请求1个令牌,成功通过,令牌分配给第一个请求,此时当前可用令牌为0,9:00:05第二个请求,请求1个令牌,当前已没有空闲可用的令牌,得需要下一个时间周期后才会产生新的令牌,也就是60秒后,第二个请求只比第一个请求晚了5秒,小于令牌产生的周期,所以,第二个请求则不通过,返回距离下次令牌产生还需要多长时间。
下面是else分支执行的语句:
elseredis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])); redis.call('decrby', valueName, ARGV[1]); res = nil;
end;
如果当前的令牌数 >= 请求的令牌数,走入else分支,这里就比较简单,向zset
中保存数据,对valueName做自减1,结束。
下面讨论released > 0 的情况:
if currentValue ~= false thenlocal expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval);local released = 0;for i, v in ipairs(expiredValues) dolocal random, permits = struct.unpack('Bc0I', v);released = released + permits;
end;
if released > 0 thenredis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval);if tonumber(currentValue) + released > tonumber(rate) thencurrentValue = tonumber(rate) - redis.call('zcard', permitsName);elsecurrentValue = tonumber(currentValue) + released;endredis.call('set', valueName, currentValue);
end
released > 0 说明 firstValue有值,进入for循环后,给released进行了赋值。
例如:第一次请求的时间是8:00:00,请求的令牌数量是1个,那么zset
中就会记录第一次请求的时间和请求的令牌数量;第二次的请求时间是8:01:01,interval是60秒,第二次请求的时间减去interval
就是8:00:01,获取的zset
数据范围是 0 ~ 8:00:01,因为第一次请求是8:00:00,所以能获取到数据,expiredValues
中记录了请求的令牌数以及时间,就是已过期的数据。
进入for循环,permits就是需要释放的令牌数,因为第一次请求的令牌数是1,所以permits = 1,released = released + permits,released = 0 + 1 = 1,released > 0,进入 if 分支。
使用 zremrangebyscore
移除过期的数据,移除的就是expiredValues,即数据范围是0 ~ 8:00:01的数据。下面又做了一层校验,如果 当前令牌数 + 释放的令牌数 > rate,则 当前令牌数 = rate - zset中的元素个数,保证总数不能超过设置的rate;否则,进入else分支,将释放的令牌数加到当前令牌数中,对currentValue进行赋值,更新当前令牌数的值。
完结散花!这一套组合拳分析下来,你学废了吗?
如果你有任何疑问或经验分享,可以在评论区留言哦~~
不管在任何时候,我希望你永远不要害怕挑战,不要畏惧失败。每一个错误都是向成功迈出的一步,每一个挑战都是成长的机会,因为每一次的努力,都会使我们离梦想更近一点。只要你行动起来,任何时候都不算晚。最后,把座右铭送给大家:种一棵树最好的时间是10年前,其次就是现在,加油!共勉 💪。