【NFTurbo】基于Redisson滑动窗口实现验证码发送限流
【NFTurbo】基于Redisson滑动窗口实现验证码发送限流
- 1. 场景
- 2. 代码
- 3. 核心解释
- 4. Redisson分布式限流器RRateLimiter原理解析
- 4.1 转载原理解析
1. 场景
服务在登录和注册的时候需要短信验证,为了防止被灰黑产抓包盗刷,需要限制次数,这里我们在前端做了按钮的置灰,但是通过接口的方式还是可以盗刷,所以这里主要是靠后端来拦截的。
2. 代码
@DubboReference(version = "1.0.0")
private NoticeFacadeService noticeFacadeService;@GetMapping("/sendCaptcha")
public Result<Boolean> sendCaptcha(@IsMobile String telephone) {NoticeResponse noticeResponse = noticeFacadeService.generateAndSendSmsCaptcha(telephone);return Result.success(noticeResponse.getSuccess());
}
package cn.hollis.nft.turbo.api.notice.constant;/*** 通知相关的常量** @author Hollis*/
public class NoticeConstant {public static final String CAPTCHA_KEY_PREFIX = "nft:turbo:captcha:";
}========================================================================================================
package cn.hollis.nft.turbo.notice.facade;import cn.hollis.nft.turbo.api.notice.response.NoticeResponse;
import cn.hollis.nft.turbo.api.notice.service.NoticeFacadeService;
import cn.hollis.nft.turbo.base.exception.SystemException;
import cn.hollis.nft.turbo.limiter.SlidingWindowRateLimiter;
import cn.hollis.nft.turbo.notice.domain.constant.NoticeState;
import cn.hollis.nft.turbo.notice.domain.entity.Notice;
import cn.hollis.nft.turbo.notice.domain.service.NoticeService;
import cn.hollis.nft.turbo.rpc.facade.Facade;
import cn.hollis.nft.turbo.sms.SmsService;
import cn.hollis.nft.turbo.sms.response.SmsSendResponse;
import cn.hutool.core.util.RandomUtil;
import com.alibaba.fastjson.JSON;
import org.apache.dubbo.config.annotation.DubboService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;import java.util.Date;
import java.util.concurrent.TimeUnit;import static cn.hollis.nft.turbo.api.notice.constant.NoticeConstant.CAPTCHA_KEY_PREFIX;
import static cn.hollis.nft.turbo.base.exception.BizErrorCode.SEND_NOTICE_DUPLICATED;/*** @author Hollis*/
@DubboService(version = "1.0.0")
public class NoticeFacadeServiceImpl implements NoticeFacadeService {@Autowiredprivate SlidingWindowRateLimiter slidingWindowRateLimiter;@Autowiredprivate StringRedisTemplate redisTemplate;@Autowiredprivate NoticeService noticeService;@Autowiredprivate SmsService smsService;/*** 生成并发送短信验证码** @param telephone 接收验证码的手机号码* @return 验证码发送结果响应对象*/@Facade@Overridepublic NoticeResponse generateAndSendSmsCaptcha(String telephone) {// 使用滑动窗口限流器控制验证码发送频率,每个手机号60秒内只能发送1次Boolean access = slidingWindowRateLimiter.tryAcquire(telephone, 1, 60);if (!access) {throw new SystemException(SEND_NOTICE_DUPLICATED);}// 生成4位随机数字验证码String captcha = RandomUtil.randomNumbers(4);// 将验证码存储到Redis中,设置5分钟过期时间redisTemplate.opsForValue().set(CAPTCHA_KEY_PREFIX + telephone, captcha, 5, TimeUnit.MINUTES);Notice notice = noticeService.saveCaptcha(telephone, captcha);// 异步发送短信验证码Thread.ofVirtual().start(() -> {SmsSendResponse result = smsService.sendMsg(notice.getTargetAddress(), notice.getNoticeContent());if (result.getSuccess()) {notice.setState(NoticeState.SUCCESS);notice.setSendSuccessTime(new Date());noticeService.updateById(notice);} else {notice.setState(NoticeState.FAILED);notice.addExtendInfo("executeResult", JSON.toJSONString(result));noticeService.updateById(notice);}});return new NoticeResponse.Builder().setSuccess(true).build();}
}
package cn.hollis.nft.turbo.limiter; // 包名:用于组织当前限流器类所在的命名空间import org.redisson.api.RRateLimiter; // 引入 Redisson 提供的分布式限流器接口(令牌桶实现)
import org.redisson.api.RateIntervalUnit; // 引入时间单位枚举(秒、毫秒等)
import org.redisson.api.RateType; // 引入限流类型枚举(OVERALL 全局、PER_CLIENT 按客户端)
import org.redisson.api.RedissonClient; // 引入 Redisson 客户端,用于操作 Redis/*** 滑动窗口限流服务 // 类的文档注释:对外说明用途** @author Hollis*/
public class SlidingWindowRateLimiter implements RateLimiter { // 声明类:实现自定义的 RateLimiter 接口(对外暴露 tryAcquire)private RedissonClient redissonClient; // Redisson 客户端实例,用于获取/创建分布式限流器private static final String LIMIT_KEY_PREFIX = "nft:turbo:limit:"; // Redis 中 key 的统一前缀,区分业务与避免冲突public SlidingWindowRateLimiter(RedissonClient redissonClient) { // 构造方法:通过依赖注入传入 Redisson 客户端this.redissonClient = redissonClient; // 保存 Redisson 客户端到成员变量}@Overridepublic Boolean tryAcquire(String key, int limit, int windowSize) { // 对外方法:尝试申请一次“验证码发送”的名额RRateLimiter rRateLimiter = // 从 Redisson 获取一个分布式限流器实例(懒创建)redissonClient.getRateLimiter(LIMIT_KEY_PREFIX + key); // key 一般按维度区分(如手机号/邮箱),保证“每个 key 单独限流”if (!rRateLimiter.isExists()) { // 如果该限流器还未在 Redis 中初始化速率配置rRateLimiter.trySetRate( // 尝试设置限流速率(只在未设置过时生效)RateType.OVERALL, // 限流类型:OVERALL 表示对所有客户端全局共享该配额limit, // 在一个时间窗口内允许的最大请求数(如:3 次)windowSize, // 时间窗口大小(如:60 表示 60 秒)RateIntervalUnit.SECONDS // 时间窗口单位(此处为“秒”));}return rRateLimiter.tryAcquire(); // 立即尝试获取 1 个令牌;有令牌则返回 true,否则返回 false(被限流)}
}==================================================================================================
package cn.hollis.nft.turbo.limiter;/*** 限流服务** @author Hollis*/
public interface RateLimiter {/*** 判断一个key是否可以通过** @param key 限流的key* @param limit 限流的数量* @param windowSize 窗口大小,单位为秒* @return*/public Boolean tryAcquire(String key, int limit, int windowSize);
}
3. 核心解释
-
这个实现使用的是 Redisson 的 RRateLimiter(令牌桶算法)。名字叫“滑动窗口”,但底层不是 ZSet 的“严格滑动窗口”实现;令牌桶在效果上也能平滑限流,足以覆盖验证码发送的大多数需求。如果你需要严格的滑动窗口统计(按“过去 N 秒内真实次数”),可用 ZSet + 时间戳自己实现。
-
RateType.OVERALL:所有实例/客户端共享同一配额,适合分布式场景确保全局一致的限流。若要按“客户端实例”区分,可用 PER_CLIENT(但验证码一般用 OVERALL)。
-
isExists() 与 trySetRate(…) 的配合是并发安全的:即使多实例并发初始化,trySetRate 只有在未设置过时才会成功,已存在则返回 false,不会互相覆盖。实践中也可以直接调用 trySetRate,无需先判 isExists()。
-
速率一旦设置,后续 trySetRate 不会更新已有配置;若你想动态调整限流规则:
- 新版本 Redisson 支持 setRate(…)(可直接更新);
- 或者先 delete() 该 limiter key,再 trySetRate(…) 重新设置(注意瞬时并发窗口)。
-
tryAcquire() 是非阻塞、立即返回的获取 1 个令牌。若希望等待一小段时间再放弃,可使用带超时/异步的重载(如 tryAcquireAsync(permits, timeout, unit))。
-
Key 设计:“nft:turbo:limit:” + key 建议确保 key 维度明确(手机号/邮箱/IP 等);在 Redis Cluster 中如果你有跨键 Lua/事务需求,可考虑使用 哈希标签(如 “{nft:turbo:limit}:” + key)来控制槽位,但 RRateLimiter 本身只占用一个 Redis 键,通常不必强制同槽。
4. Redisson分布式限流器RRateLimiter原理解析
https://github.com/oneone1995/blog/issues/13
4.1 转载原理解析
redisson就不多做介绍了,它提供的分布式锁非常强大,一般公司都会选择它在生产环境中使用。但其提供的其他分布式工具就不是那么有名了,比如其提供的分布式限流器RRateLimiter网上几乎没有分析它的文章,本文也基于此目的记录一下学习RRateLimiter的心得。如有不对,请多指正。
简单使用
public class Main {public static void main(String[] args) throws InterruptedException {RRateLimiter rateLimiter = createLimiter();int allThreadNum = 20;CountDownLatch latch = new CountDownLatch(allThreadNum);long startTime = System.currentTimeMillis();for (int i = 0; i < allThreadNum; i++) {new Thread(() -> {rateLimiter.acquire(1);System.out.println(Thread.currentThread().getName());latch.countDown();}).start();}latch.await();System.out.println("Elapsed " + (System.currentTimeMillis() - startTime));}public static RRateLimiter createLimiter() {Config config = new Config();config.useSingleServer().setTimeout(1000000).setAddress("redis://127.0.0.1:6379");RedissonClient redisson = Redisson.create(config);RRateLimiter rateLimiter = redisson.getRateLimiter("myRateLimiter");// 初始化// 最大流速 = 每1秒钟产生1个令牌rateLimiter.trySetRate(RateType.OVERALL, 1, 1, RateIntervalUnit.SECONDS);return rateLimiter;}
}
源码分析
1.创建限流器源码
redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]);
redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]);
return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]);
2.获取令牌
通过Demo的代码示例点进去,最后可以看到执行lua脚本拿令牌的代码在org.redisson.RedissonRateLimiter#tryAcquireAsync(org.redisson.client.protocol.RedisCommand<T>, java.lang.Long)
这个方法里,我把它的lua脚本拷出来写了注释
-- 速率
local rate = redis.call("hget", KEYS[1], "rate")
-- 时间区间(ms)
local interval = redis.call("hget", KEYS[1], "interval")
local type = redis.call("hget", KEYS[1], "type")
assert(rate ~= false and interval ~= false and type ~= false, "RateLimiter is not initialized")-- {name}:value 分析后面的代码,这个key记录的是当前令牌桶中的令牌数
local valueName = KEYS[2]-- {name}:permits 这个key是一个zset,记录了请求的令牌数,score则为请求的时间戳
local permitsName = KEYS[4]-- 单机限流才会用到,集群模式不用关注
if type == "1" thenvalueName = KEYS[3]permitsName = KEYS[5]
end-- 原版本有bug(https://github.com/redisson/redisson/issues/3197),最新版将这行代码提前了
-- rate为1 arg1这里是 请求的令牌数量(默认是1)。rate必须比请求的令牌数大
assert(tonumber(rate) >= tonumber(ARGV[1]), "Requested permits amount could not exceed defined rate")-- 第一次执行这里应该是null,会进到else分支
-- 第二次执行到这里由于else分支中已经放了valueName的值进去,所以第二次会进if分支
local currentValue = redis.call("get", valueName)
if currentValue ~= false then-- 从第一次设的zset中取数据,范围是0 ~ (第二次请求时间戳 - 令牌生产的时间)-- 可以看到,如果第二次请求时间距离第一次请求时间很短(小于令牌产生的时间),那么这个差值将小于上一次请求的时间,取出来的将会是空列表。反之,能取出之前的请求信息-- 这里作者将这个取出来的数据命名为expiredValues,可认为指的是过期的数据local expiredValues = redis.call("zrangebyscore", permitsName, 0, tonumber(ARGV[2]) - interval)local released = 0-- lua迭代器,遍历expiredValues,如果有值,那么released等于之前所有请求的令牌数之和,表示应该释放多少令牌for i, v in ipairs(expiredValues) dolocal random, permits = struct.unpack("fI", v)released = released + permitsend-- 没有过期请求的话,released还是0,这个if不会进,有过期请求才会进if released > 0 then-- 移除zset中所有元素,重置周期redis.call("zrem", permitsName, unpack(expiredValues))currentValue = tonumber(currentValue) + releasedredis.call("set", valueName, currentValue)end-- 这里简单分析下上面这段代码:-- 1. 只有超过了1个令牌生产周期后的请求,expiredValues才会有值。-- 2. 以rate为3举例,如果之前发生了两个请求那么现在released为2,currentValue为1 + 2 = 3-- 以此可以看到,redisson的令牌桶放令牌操作是通过请求时间窗来做的,如果距离上一个请求的时间已经超过了一个令牌生产周期时间,那么令牌桶中的令牌应该得到重置,表示生产rate数量的令牌。-- 如果当前令牌数 < 请求的令牌数if tonumber(currentValue) < tonumber(ARGV[1]) then-- 从zset中找到距离当前时间最近的那个请求,也就是上一次放进去的请求信息local nearest = redis.call('zrangebyscore', permitsName, '(' .. (tonumber(ARGV[2]) - interval), tonumber(ARGV[2]), 'withscores', 'limit', 0, 1); local random, permits = struct.unpack("fI", nearest[1])-- 返回 上一次请求的时间戳 - (当前时间戳 - 令牌生成的时间间隔) 这个值表示还需要多久才能生产出足够的令牌return tonumber(nearest[2]) - (tonumber(ARGV[2]) - interval)else-- 如果当前令牌数 ≥ 请求的令牌数,表示令牌够多,更新zsetredis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1]))-- valueName存的是当前总令牌数,-1表示取走一个redis.call("decrby", valueName, ARGV[1])return nilend
else-- set一个key-value数据 记录当前限流器的令牌数redis.call("set", valueName, rate)-- 建了一个以当前限流器名称相关的zset,并存入 以score为当前时间戳,以lua格式化字符串{当前时间戳为种子的随机数、请求的令牌数}为value的值。-- struct.pack第一个参数表示格式字符串,f是浮点数、I是长整数。所以这个格式字符串表示的是把一个浮点数和长整数拼起来的结构体。我的理解就是往zset里记录了最后一次请求的时间戳和请求的令牌数redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1]))-- 从总共的令牌数 减去 请求的令牌数。redis.call("decrby", valueName, ARGV[1])return nil
end
总结一下,redisson用了zset来记录请求的信息,这样可以非常巧妙的通过比较score,也就是请求的时间戳,来判断当前请求距离上一个请求有没有超过一个令牌生产周期。如果超过了,则说明令牌桶中的令牌需要生产,之前用掉了多少个就生产多少个,而之前用掉了多少个令牌的信息也在zset中保存了。
然后比较当前令牌桶中令牌的数量,如果足够多就返回了,如果不够多则返回到下一个令牌生产还需要多少时间。这个返回值特别重要。
接下来就是回到java代码,各个API入口点进去,最后都会调到org.redisson.RedissonRateLimiter#tryAcquireAsync(long, org.redisson.misc.RPromise<java.lang.Boolean>, long)
这个方法。我也拷出来做了简单的注释。
private void tryAcquireAsync(long permits, RPromise<Boolean> promise, long timeoutInMillis) {long s = System.currentTimeMillis();RFuture<Long> future = tryAcquireAsync(RedisCommands.EVAL_LONG, permits);future.onComplete((delay, e) -> {if (e != null) {promise.tryFailure(e);return;}if (delay == null) {//delay就是lua返回的 还需要多久才会有令牌promise.trySuccess(true);return;}//没有手动设置超时时间的逻辑if (timeoutInMillis == -1) {//延迟delay时间后重新执行一次拿令牌的动作commandExecutor.getConnectionManager().getGroup().schedule(() -> {tryAcquireAsync(permits, promise, timeoutInMillis);}, delay, TimeUnit.MILLISECONDS);return;}//el 请求redis拿令牌的耗时long el = System.currentTimeMillis() - s;//如果设置了超时时间,那么应该减去拿令牌的耗时long remains = timeoutInMillis - el;if (remains <= 0) {//如果那令牌的时间比设置的超时时间还要大的话直接就false了promise.trySuccess(false);return;}//比如设置的的超时时间为1s,delay为1500ms,那么1s后告知失败if (remains < delay) {commandExecutor.getConnectionManager().getGroup().schedule(() -> {promise.trySuccess(false);}, remains, TimeUnit.MILLISECONDS);} else {long start = System.currentTimeMillis();commandExecutor.getConnectionManager().getGroup().schedule(() -> {//因为这里是异步的,所以真正再次拿令牌之前再检查一下过去了多久时间。如果过去的时间比设置的超时时间大的话,直接falselong elapsed = System.currentTimeMillis() - start;if (remains <= elapsed) {promise.trySuccess(false);return;}//再次拿令牌tryAcquireAsync(permits, promise, remains - elapsed);}, delay, TimeUnit.MILLISECONDS);}});
}
再次总结一下,Java客户端拿到redis返回的下一个令牌生产完成还需要多少时间,也就是delay字段。如果这个delay为null,则表示成功获得令牌,如果没拿到,则过delay时间后通过异步线程再次发起拿令牌的动作。这里也可以看到,redisson的RateLimiter是非公平的,多个线程同时拿不到令牌的话并不保证先请求的会先拿到令牌。