当前位置: 首页 > ops >正文

Spring Cloud Alibaba Sentinel 基本工作原理源码阅读

SpiLoader

SPI 是一种服务发现机制,它允许第三方为某个接口提供实现,而这些实现可以在运行时被发现和加载。简单来说,就是定义一个接口,然后允许别人提供这个接口的具体实现,而主程序不需要在编译时就知道这些实现类的具体细节,只在运行时去发现和加载它们。 Sentinel 并没有直接使用 JDK 内置的 ServiceLoader,而是自己实现了一套 SpiLoader。这主要是因为 Sentinel 的 SpiLoader 提供了更灵活的控制,例如:

排序 (@Spi 注解的 order 属性): 可以控制加载的实现类的优先级,从而决定它们在 Slot Chain 中的执行顺序。

单例 (@Spi 注解的 isSingleton 属性): 可以指定某个 SPI 实现是否是单例模式。

按需加载: 更好地控制加载时机。

控制台通讯

服务端与控制台初始化都是通过InitFunc来实现,InitFunc是一个接口,服务加载是通过SpiLoader来加载。默认懒加载在第一次调用时加载Env类静态块执行,如果配置了非懒加载在自动装配类SentinelWebAutoConfiguration的init方法也会启动时加载

public class Env {public static final Sph sph = new CtSph();static {// If init fails, the process will exit.InitExecutor.doInit();}}

这里doInit()方法会获取所有配置的InitFunc服务进行初始化。

            List<InitFunc> initFuncs = SpiLoader.of(InitFunc.class).loadInstanceListSorted();List<OrderWrapper> initList = new ArrayList<OrderWrapper>();for (InitFunc initFunc : initFuncs) {RecordLog.info("[InitExecutor] Found init func: {}", initFunc.getClass().getCanonicalName());insertSorted(initList, initFunc);}for (OrderWrapper w : initList) {w.func.init();//调用init方法初始化RecordLog.info("[InitExecutor] Executing {} with order {}",w.func.getClass().getCanonicalName(), w.order);}

在sentinel-transport-common-1.8.5.jar包的META-INF/services下就定义了两个和控制台通讯相关的InitFunc:CommandCenterInitFunc和HeartbeatSenderInitFunc。

心跳检测

HeartbeatSenderInitFunc主要建立应用控制台和应用程序直接的心跳检测。将客户端的消息通讯地址发送给控制台。这个在控制台的机器列表可以看到对应信息。

来看HeartbeatSenderInitFunc.init()方法:

    public void init() {//这里还是通过SpiLoader获取HeartbeatSender的服务实例HeartbeatSender的服务实例 sender = HeartbeatSenderProvider.getHeartbeatSender();if (sender == null) {RecordLog.warn("[HeartbeatSenderInitFunc] WARN: No HeartbeatSender loaded");return;}//初始化定时线程池ScheduledThreadPoolExecutorinitSchedulerIfNeeded();long interval = retrieveInterval(sender);setIntervalIfNotExists(interval);//启动定时发送心跳scheduleHeartbeatTask(sender, interval);}

启动定时方法:

    private void scheduleHeartbeatTask(/*@NonNull*/ final HeartbeatSender sender, /*@Valid*/ long interval) {pool.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {sender.sendHeartbeat();} catch (Throwable e) {RecordLog.warn("[HeartbeatSender] Send heartbeat error", e);}}}, 5000, interval, TimeUnit.MILLISECONDS);RecordLog.info("[HeartbeatSenderInit] HeartbeatSender started: "+ sender.getClass().getCanonicalName());}

这里看到每隔5秒发送一次心跳包。

HeartbeatSender的spi在sentinel-transport-simple-http-1.8.5.jar包中定义实现为SimpleHttpHeartbeatSender。来看下具体心跳包发送内容。

SimpleHttpHeartbeatSender.sendHeartbeat()主要代码如下

    public boolean sendHeartbeat() throws Exception {//获取控制台地址Endpoint addrInfo = getAvailableAddress();//构造请求对象SimpleHttpRequest request = new SimpleHttpRequest(addrInfo, TransportConfig.getHeartbeatApiPath());//设置心跳报文request.setParams(heartBeat.generateCurrentMessage());//发送心跳请求SimpleHttpResponse response = httpClient.post(request);     return false;}

这里heartBeat变量是HeartbeatMessage类型

HeartbeatMessage#generateCurrentMessage()

    public HeartbeatMessage() {message.put("hostname", HostNameUtil.getHostName());message.put("ip", TransportConfig.getHeartbeatClientIp());message.put("app", AppNameUtil.getAppName());// Put application type (since 1.6.0).message.put("app_type", String.valueOf(SentinelConfig.getAppType()));message.put("port", String.valueOf(TransportConfig.getPort()));}    
public Map<String, String> generateCurrentMessage() {// Version of Sentinel.message.put("v", Constants.SENTINEL_VERSION);// Actually timestamp.message.put("version", String.valueOf(TimeUtil.currentTimeMillis()));message.put("port", String.valueOf(TransportConfig.getPort()));return message;}

这里将本服务的ip和端口信息发送给console,这样控制台就可以与应用程序通过该端口进行通讯。

数据通讯

应用端通讯服务初始化是通过CommandCenterInitFunc。

CommandCenterInitFunc.init()方法调用CommandCenterProvider.getCommandCenter()获取CommandCenter。最后通过SpiLoader.of(CommandCenter.class).loadHighestPriorityInstance();还是SPI机制。这个默认配置在 sentinel-transport-simple-http-1.8.5.jar包中

默认的CommandCenter是SimpleHttpCommandCenter。

SimpleHttpCommandCenter的start()方法会启动一个ServerSocket来和sentinel的console来进行通讯。默认端口是8719,可通过csp.sentinel.api.port指定。这样console会不断从应用拉取流量控制数据,并且在console端配置新的流量规则可以推送到应用端。

有请求指令到来时会交给HttpEventTask来执行具体指令,不同的指令由不同的CommandHandler实现类来处理,像修改流量控制会使用ModifyRulesCommandHandler来处理。ModifyRulesCommandHandler.handler()方法根据不同的流控规则来更新内存控制规则。

ModifyRulesCommandHandler.handler()

    public CommandResponse<String> handle(CommandRequest request) {String type = request.getParam("type");// rule data in get parameterString data = request.getParam("data");...String result = "success";if (FLOW_RULE_TYPE.equalsIgnoreCase(type)) {List<FlowRule> flowRules = JSONArray.parseArray(data, FlowRule.class);//流量控制规则刷新FlowRuleManager.loadRules(flowRules);if (!writeToDataSource(getFlowDataSource(), flowRules)) {result = WRITE_DS_FAILURE_MSG;}return CommandResponse.ofSuccess(result);} else if (AUTHORITY_RULE_TYPE.equalsIgnoreCase(type)) {List<AuthorityRule> rules = JSONArray.parseArray(data, AuthorityRule.class);//黑白名单刷新AuthorityRuleManager.loadRules(rules);if (!writeToDataSource(getAuthorityDataSource(), rules)) {result = WRITE_DS_FAILURE_MSG;}return CommandResponse.ofSuccess(result);} else if (DEGRADE_RULE_TYPE.equalsIgnoreCase(type)) {List<DegradeRule> rules = JSONArray.parseArray(data, DegradeRule.class);//降级规则刷新DegradeRuleManager.loadRules(rules);if (!writeToDataSource(getDegradeDataSource(), rules)) {result = WRITE_DS_FAILURE_MSG;}return CommandResponse.ofSuccess(result);} else if (SYSTEM_RULE_TYPE.equalsIgnoreCase(type)) {List<SystemRule> rules = JSONArray.parseArray(data, SystemRule.class);//系统规则刷新SystemRuleManager.loadRules(rules);if (!writeToDataSource(getSystemSource(), rules)) {result = WRITE_DS_FAILURE_MSG;}return CommandResponse.ofSuccess(result);}return CommandResponse.ofFailure(new IllegalArgumentException("invalid type"));}

流量控制工作原理

在 Sentinel 里面,所有的资源都对应一个资源名称(resourceName),每次资源调用都会创建一个 Entry 对象。Entry 可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用 SphU API 显式创建。Entry 创建的时候,同时也会创建一系列功能插槽(slot chain)链表。

springboot自动装配通过SentinelWebAutoConfiguration类自动装配SentinelWebInterceptor。SentinelWebInterceptor实现了HandlerInterceptor接口,是一个拦截器,在请求到达controller之前通过preHandle()可以执行自定义操作。

AbstractSentinelInterceptor#preHandle()

    @Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)throws Exception {try {//请求资源名称String resourceName = getResourceName(request);if (StringUtil.isEmpty(resourceName)) {return true;}if (increaseReferece(request, this.baseWebMvcConfig.getRequestRefName(), 1) != 1) {return true;}// Parse the request origin using registered origin parser.String origin = parseOrigin(request);String contextName = getContextName(request);ContextUtil.enter(contextName, origin);//最重要的方法,内部调用不同的功能slot进行流量控制Entry entry = SphU.entry(resourceName, ResourceTypeConstants.COMMON_WEB, EntryType.IN);request.setAttribute(baseWebMvcConfig.getRequestAttributeName(), entry);return true;} catch (BlockException e) {//如果发生BlockException,则触发流量控制规则try {handleBlockException(request, response, e);} finally {ContextUtil.exit();}return false;}}

SphU.entry()

SphU的默认实例是CtSph,最后会进入其entryWithPriority()方法

    private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)throws BlockException {Context context = ContextUtil.getContext();if (context instanceof NullContext) {// The {@link NullContext} indicates that the amount of context has exceeded the threshold,// so here init the entry only. No rule checking will be done.return new CtEntry(resourceWrapper, null, context);}if (context == null) {// Using default context.context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);}// Global switch is close, no rule checking will do.if (!Constants.ON) {return new CtEntry(resourceWrapper, null, context);}//获取执行链ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);/** Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},* so no rule checking will be done.*/if (chain == null) {return new CtEntry(resourceWrapper, null, context);}Entry e = new CtEntry(resourceWrapper, chain, context);try {chain.entry(context, resourceWrapper, null, count, prioritized, args);} catch (BlockException e1) {e.exit(count, args);throw e1;} catch (Throwable e1) {// This should not happen, unless there are errors existing in Sentinel internal.RecordLog.info("Sentinel unexpected exception", e1);}return e;}

lookProcessChain()方法通过SlotChainProvider.newSlotChain()实例slot链,最后通过

SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault().build()

使用SPI机制从classpath中加载默认的slot链。

在sentinel-core-1.8.5.jar包中有对应的默认配置文件

META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot

默认配置内容

# Sentinel default ProcessorSlots
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
com.alibaba.csp.sentinel.slots.logger.LogSlot
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
com.alibaba.csp.sentinel.slots.system.SystemSlot
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot

每一个slot都有不同的作用,作为功能链表依次进行调用。

  • NodeSelectorSlot 负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;

  • ClusterBuilderSlot 则用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据;

  • StatisticSlot 则用于记录、统计不同纬度的 runtime 指标监控信息;

  • FlowSlot 则用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制;

  • AuthoritySlot 则根据配置的黑白名单和调用来源信息,来做黑白名单控制;

  • DegradeSlot 则通过统计信息以及预设的规则,来做熔断降级;

  • SystemSlot 则通过系统的状态,例如 load1 等,来控制总的入口流量;

http://www.xdnf.cn/news/16048.html

相关文章:

  • BQ4050RSMR DIOTEC高精度锂电池保护与电量监测芯片 集成保护+计量+通信
  • AWS Lambda IoT数据处理异常深度分析:从告警到根因的完整排查之路
  • 快手DHPS:国内首个实现基于RDMA 通信的可负载均衡高性能服务架构!
  • 设计汽车集群电源
  • 前端资源缓存优化案例:深入探讨 Nginx 配置中的 Cache-Control 头部叠加问题
  • 一次Oracle集群脑裂问题分析处理
  • 耐达讯自动化EtherCAT转RS232:示波器连接的“开挂秘籍”
  • pig cloud框架中引入websocket
  • Android Camera openCamera
  • Node.js 倒计时图片服务部署与 Nginx 反向代理实战总结
  • OneTwoVLA——基于π0实现类π0.5:一个模型中完成原来双系统下的慢思考、快执行,且具备自适应推理能力和自我纠错能力
  • Java 大视界 -- Java 大数据机器学习模型在电商产品销量预测与库存优化管理中的应用(359)
  • OpenCV 零基础到项目实战 | DAY 2:图像预处理全解析
  • 基于JSP的高校寝室综合管理系统/宿舍管理系统
  • 【JavaSE】正则表达式学习笔记
  • 【NLP舆情分析】基于python微博舆情分析可视化系统(flask+pandas+echarts) 视频教程 - 主页-最近七天微博发布量实现
  • PetaLinux 使用技巧与缓存配置
  • Docker 容器中的 HEAD 请求缺失 header?从 Content-MD5 缺失聊起
  • 亚马逊云科技 上海AI研究院 突然解散 | AI早报
  • MatchResult
  • docker-desktop启动失败
  • <PLC><汇川><算法>基于汇川PLC,实现给定数组的“子集求和”算法
  • 技能系统详解(4)——运动表现
  • Day 18:推断聚类后簇的类型
  • 17.VRRP技术
  • rabbitmq 03
  • HTTP 协议常见字段(请求头/响应头)
  • 按键精灵脚本:自动化利刃的双面性 - 从技术原理到深度实践与反思
  • 大型语言模型(Large Language Models,LLM)
  • 循环神经网络--NLP基础