当前位置: 首页 > news >正文

限流系列:sentinel

目录

滑动窗口算法

Sentinel

数据模型

示例

大致流程

​​​​​​​entry

​​​​​​​entryWithPriority

​​​​​​​FlowSlot.entry

​​​​​​​checkFlow

​​​​​​​canPass

​​​​​​​avgUsedTokens

​​​​​​​passQps

​​​​​​​pass

​​​​​​​currentWindow

calculateTimeIdx

​​​​​​​calculateWindowStart

​​​​​​​values


滑动窗口算法

       滑动窗口算法是将时间周期分为n个小周期,分别记录每个小周期内的访问次数,并且根据时间滑动删除过期的小周期。

       如下图,假设时间周期为1min,将1min再分割成2个小周期,统计每个小周期的访问数量,则可以看到,第一个时间周期内访问数量为75,第二个时间周期内访问数量为100,超过100的数量被限流掉了。


       由此可见,当滑动窗口格子划分得越多,那么滑动窗口的滚动就越平滑,限流的统计就越精确。可以很好的解决固定窗口的流动问题。

Sentinel

       滑动窗口算法也是Sentinel的默认算法。

​​​​​​​数据模型

英文名称

中文名称

备注

array

窗口数组

windowLengthInMs

单个窗口时间长度

sampleCount

总窗口数量

intervalInMs

时间窗口总长度

sampleCount * windowLengthInMs

示例

public static void main(String[] args) throws Exception {
    //加载流控规则
    initFlowRules();
    for (int i = 0; i < 5; i++) {
        Thread.sleep(200);
        Entry entry = null;
        try {
            entry = SphU.entry("sayHello");
            //被保护的逻辑
            log.info("访问sayHello资源");
        } catch (Exception ex) {
            log.info("被流量控制了,可以进行降级处理");
        } finally {
            if (entry != null) {
                entry.exit();
            }
        }
    }
}
private static void initFlowRules() {
    List<FlowRule> rules = new ArrayList<>();
    // 创建一个流控规则
    FlowRule rule = new FlowRule();
    // 对sayHello这个资源限流
    rule.setResource("sayHello");
    // 基于qps限流
    rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
    // qps最大为2,超过2就要被限流
    rule.setCount(2);
    rules.add(rule);
    // 设置规则
    FlowRuleManager.loadRules(rules);
}

大致流程

    点击示例里的entry()方法,如下所示:

​​​​​​​entry

public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
    return entryWithPriority(resourceWrapper, count, false, args);
}

        点击entryWithPriority()方法,如下所示:

​​​​​​​entryWithPriority

private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException {
    .. ...
    Entry e = new CtEntry(resourceWrapper, chain, context);
    try {
        chain.entry(context, resourceWrapper, null, count, prioritized, args);
    } catch (BlockException e1) {
        e.exit(count, args);
        throw e1;
    } catch (Throwable e1) {
        // This should not happen, unless there are errors existing in Sentinel internal.
        RecordLog.info("Sentinel unexpected exception", e1);
    }
    return e;
}

       点击chain.entry()方法,因为我们这次探究的是限流算法,所以选择FlowSlot类,如下所示:

​​​​​​​FlowSlot.entry

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
    checkFlow(resourceWrapper, context, node, count, prioritized);
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

        点击checkFlow()方法,如下所示:

​​​​​​​checkFlow

public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {

        if (ruleProvider == null || resource == null) {

            return;

        }

        Collection<FlowRule> rules = ruleProvider.apply(resource.getName());

        if (rules != null) {

            for (FlowRule rule : rules) {

                if (!canPassCheck(rule, context, node, count, prioritized)) {

                    throw new FlowException(rule.getLimitApp(), rule);

                }

            }

        }

}

​​​​​​​canPass

public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    int curCount = avgUsedTokens(node);
    if (curCount + acquireCount > count) {
        ... ...
        return false;
    }
    return true;
}

       在这里,获取已经使用的token数量,加上待申请的数量,如果超过流控规则里设置的最大值,则返回false。

       点击avgUsedTokens()方法,如下所示:

​​​​​​​avgUsedTokens

private int avgUsedTokens(Node node) {

        if (node == null) {

            return DEFAULT_AVG_USED_TOKENS;

        }

        return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());

}

       在这里,通过调用node.passQps()获取已经使用的token数量。

       点击passQps(),如下所示:

​​​​​​​passQps

public double passQps() {
    return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
}

       在这里,rollingCounterInSecond对象保存了时间窗口对象的数组。pass()方法可以获取每个有效的时间窗口对象里已经使用的令牌数量,getWindowIntervalInSec()方法是时间窗口总长度,以秒为单位。两者相除就可以已使用的QPS。

       点击pass()方法,如下所示:

​​​​​​​pass

public long pass() {

        data.currentWindow();

        long pass = 0;

        List<MetricBucket> list = data.values();

        for (MetricBucket window : list) {

            pass += window.pass();

        }

        return pass;

    }

       在这里,会调用currentWindow()刷新当前窗口信息,然后累加每个窗口的计数值作为当前计数周期的计数值。

       点击currentWindow()方法,如下所示:

​​​​​​​currentWindow

public WindowWrap<T> currentWindow(long timeMillis) {

    int idx = calculateTimeIdx(timeMillis);

    long windowStart = calculateWindowStart(timeMillis);

    while (true) {

        WindowWrap<T> old = array.get(idx);

        if (old == null) {

            // 创建新窗口

            WindowWrap<T> window = new WindowWrap<>(windowLengthInMs, windowStart, newEmptyBucket());

            if (array.compareAndSet(idx, null, window)) {

                return window;

            }

        } else if (windowStart == old.windowStart()) {

            // 命中当前有效窗口

            return old;

        } else if (windowStart > old.windowStart()) {

            // 窗口已过期,重置并复用

            if (updateLock.tryLock()) {

                try {

                    return resetWindowTo(old, windowStart);

                } finally {

                    updateLock.unlock();

                }

            }

        }

    }

}

在这里:

  1. 获取当前时间窗口的索引
  2. 获取当前窗口的起始时间
  3. 根据当前时间窗口的索引,获取时间窗口对象,如果时间窗口对象为空,则创建一个新时间窗口对象;如果已经存在时间窗口对象,则返回该对象;如果时间窗口对象已过期,则重置并复用。

calculateTimeIdx

public int calculateTimeIdx(long timeMillis) {

    long timeId = timeMillis / windowLengthInMs;

    return (int)(timeId % array.length());

}

       在这里,根据当前时间戳计算对应窗口索引。

​​​​​​​calculateWindowStart

protected long calculateWindowStart(long timeMillis) {

    return timeMillis - timeMillis % windowLengthInMs;

}

       在这里,计算当前窗口的起始时间(对齐到窗口边界)。

       点击步骤pass的values()方法,如下所示:

​​​​​​​values

public List<T> values() {
    return values(TimeUtil.currentTimeMillis());
}
public List<T> values(long timeMillis) {
    if (timeMillis < 0) {
        return new ArrayList<T>();
    }
    int size = array.length();
    List<T> result = new ArrayList<T>(size);
    for (int i = 0; i < size; i++) {
        WindowWrap<T> windowWrap = array.get(i);
        if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
            continue;
        }
        result.add(windowWrap.value());
    }
    return result;
}

        在这里,循环时间窗口数组,忽略已经失效的时间窗口对象,将有效的时间窗口对象保存在一个列表对象里,并作为方法返回值进行返回。

​​​​​​​isWindowDeprecated

public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
    return time - windowWrap.windowStart() > intervalInMs;
}

        在这里,将当前时间减去指定时间窗口对象的起始时间,如果结果大于计数周期时长,则表明指定的时间窗口对象已经失效。

http://www.xdnf.cn/news/656191.html

相关文章:

  • 哈希表基础知识
  • 选择SEO公司时需要注意哪些关键指标?
  • 多模态大语言模型arxiv论文略读(九十二)
  • 2025.05.26【Wordcloud】词云图绘制技巧
  • pkg-config的功能与作用说明
  • jeecg-boot vue点击左侧菜单跳转无菜单栏的全屏页面
  • PostgreSQL日志管理完整方案(AI)
  • 学习心得(14--16)
  • 使用 Vuex 实现用户注册与登录功能
  • HTML流星雨
  • 充电枪IEC62196/EN 62196测试内容
  • 【PC网上邻居--1】基于Samba协议的局域网文件共享系统设计与实现
  • 行为型:责任链模式
  • 【DCCN】多模态情感分析解耦交叉属性关联网络
  • java虚拟机
  • 第11章 标准化和软件知识产权基础知识,多媒体、图像相关
  • 认识微服务
  • 区块链与Web3:如何有效保障个人数据安全
  • 扩容的未来:Web3 并行计算赛道全景图谱
  • moviepy视频添加中文水印
  • 网络原理 | TCP与UDP协议的区别以及回显服务器的实现
  • 热门大型语言模型(LLM)应用开发框架
  • yolov8分割任务的推理和后处理解析
  • HMI仿真报错
  • 【MATLAB例程】声纳信号处理与目标测距的程序|信号频率、信噪比、采样率和声速均可自行调整|附代码下载链接
  • 【Bug】--node命令加载失败
  • 安卓端智能耗材柜系统可行性方案(基于uniapp + Vue3)
  • 数据的六个特性以及由于独特性产生的一些有趣的想法
  • 【C/C++】基于 Docker 容器运行的 Kafka + C++ 练手项目
  • Vue条件渲染