Spring Cloud Ribbon 核心原理
Spring Cloud Ribbon 是一个客户端负载均衡框架,可以结合 Spring Cloud Eureka 使用。
主要特点
• 负载均衡: Ribbon 提供了多种内置的负载均衡策略(如轮询、随机等),也可以自定义这些策略,以满足不同的业务需求。
• 与 Eureka 集成: Ribbon 可以无缝地与 Spring Cloud Eureka 集成,自动从 Eureka Server 获取服务列表,并根据负载均衡策略选择合适的服务实例进行调用。
• 重试机制: 支持配置请求失败后的重试逻辑,提高系统可靠性。
简单使用示例
pom.xml 添加依赖:
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
</dependency>
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
在 application.yml 中配置 Eureka 客户端,注册到 Eureka Server:
eureka:client:serviceUrl:defaultZone: http://localhost:8761/eureka/
创建一个 RestTemplate Bean,并通过 @LoadBalanced 注解来启用 Ribbon 负载均衡:
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;@Configuration
publicclassConfig {@Bean@LoadBalancedpublic RestTemplate restTemplate() {returnnewRestTemplate();}
}
使用 RestTemplate 进行服务调用:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;@Service
publicclassGreetingService {privatefinal RestTemplate restTemplate;@AutowiredpublicGreetingService(RestTemplate restTemplate) {this.restTemplate = restTemplate;}public String getGreeting() {// 注意这里直接使用服务名return restTemplate.getForObject("http://greeting-service/api/greeting", String.class);}
}
核心原理
下文源码基于 spring-cloud-starter-netflix-ribbon : 2.1.6.RELEASE
核心组件
• ILoadBalancer:这是 Ribbon 中用于管理服务实例列表并根据某种策略选择实例的核心接口。它负责维护所有已知的服务实例,并提供方法来获取这些实例。
• IRule:定义了从可用的服务实例列表中选择一个实例的逻辑。Ribbon 提供了多种规则实现,如轮询(RoundRobinRule)、随机(RandomRule)等。
• IPing:用于检查服务实例是否存活。通过定期 ping 服务实例来确定它们的状态是 UP 还是 DOWN 。
• ServerListUpdater:负责更新服务实例列表。当使用与服务发现组件(如 Eureka )集成时,它会定期刷新服务器列表以反映最新的服务状态。
// BaseLoadBalancer 继承自 AbstractLoadBalancer。 AbstractLoadBalancer是实现了 ILoadBalancer 接口抽象基类publicclassBaseLoadBalancerextendsAbstractLoadBalancerimplementsPrimeConnections.PrimeConnectionListener, IClientConfigAware {// 默认负载均衡规则:轮询策略privatefinalstaticIRuleDEFAULT_RULE=newRoundRobinRule();// 负载均衡规则protectedIRulerule= DEFAULT_RULE;// 默认 Ping 策略:SerialPingStrategy,按顺序逐一 Ping 所有 ServerprivatefinalstaticSerialPingStrategyDEFAULT_PING_STRATEGY=newSerialPingStrategy();// Ping 策略protectedIPingStrategypingStrategy= DEFAULT_PING_STRATEGY;// IPing 接口:定义如何判断 Server 是否“存活”。比如 NIWSDiscoveryPing:结合 Eureka 注册中心判断是否在线protectedIPingping=null;// allServerList:所有已知的 Server 列表(包括 Up 和 Down)@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)protectedvolatile List<Server> allServerList = Collections.synchronizedList(newArrayList<Server>());// upServerList:当前健康的 Server 列表@Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)protectedvolatile List<Server> upServerList = Collections.synchronizedList(newArrayList<Server>());// (省略其他)...}// DynamicServerListLoadBalancer 继承自 BaseLoadBalancer// 扩展功能:支持动态更新服务列表,适用于服务注册中心场景(如 Eureka、Consul 等)// DynamicServerListLoadBalancer 的工作流程:// [定时任务/事件通知] -> ServerListUpdater // -> 触发updateAction.doUpdate() // -> updateListOfServers()// -> serverListImpl.getServerList() 获取原始服务列表// -> filter.getFilteredListOfServers(...) 过滤 // -> BaseLoadBalancer.setServers(List<Server>) 更新内部服务实例列表publicclassDynamicServerListLoadBalancer<T extendsServer> extendsBaseLoadBalancer {// 负责从服务注册中心拉取最新的服务实例列表。支持定期刷新机制,保证本地缓存与注册中心一致// ServerList 常见实现:DiscoveryEnabledNIWSServerList(结合 Eureka 使用)、ConfigurationBasedServerList(基于静态配置的服务列表)volatile ServerList<T> serverListImpl;// 服务列表过滤器,用于在从注册中心获取原始服务列表后,对其进行筛选,只保留符合要求的实例,例如:剔除不健康的节点、按 Zone 过滤等。// ServerListFilter 常见实现:ZoneAffinityServerListFilter(根据区域亲和性过滤)、NoOpServerListFilter(不过滤)volatile ServerListFilter<T> filter;// UpdateAction 接口定义了当需要更新服务列表时要执行的动作,由 ServerListUpdater 触发调用protectedfinal ServerListUpdater.UpdateActionupdateAction=newServerListUpdater.UpdateAction() {@OverridepublicvoiddoUpdate() {// 实际更新服务列表updateListOfServers();}};// 负责定时触发服务列表的更新// 主要实现:PollingServerListUpdater(默认实现,通过定时任务周期性地更新服务列表)protectedvolatile ServerListUpdater serverListUpdater;// (省略其他)...}publicinterfaceServerList<T extendsServer> {public List<T> getInitialListOfServers();/*** Return updated list of servers. This is called say every 30 secs* (configurable) by the Loadbalancer's Ping cycle**/public List<T> getUpdatedListOfServers();}
组件关系:
工作流程
• 初始化阶段:
• 拦截器注册:RestTemplate 上的 @LoadBalanced 注解会触发相应的自动配置逻辑,这些逻辑包括注册一个特殊的拦截器 —— LoadBalancerInterceptor,这个拦截器会在每次使用 RestTemplate 发起HTTP请求时被调用,实现与负载均衡器集成,使得我们可以直接使用服务名而不是具体的 URL 来发起 HTTP 请求。LoadBalancerInterceptor 内部使用了 LoadBalancerClient 接口的实现类(如 RibbonLoadBalancerClient)来解析服务名并选择合适的服务实例,在这个过程中,它利用了 ILoadBalancer 、IRule 等组件来进行负载均衡决策。
• 负载均衡组件初始化:创建 ILoadBalancer 实例,实例内部会初始化一个 ServerList,这通常是通过服务发现组件(如 Eureka )获取的一组服务实例。同时,依据配置或默认设置,初始化 IRule 和 IPing 对象。
• 服务调用阶段:
• 当通过 RestTemplate 发起一个 HTTP 请求,并且 URL 中的 host 部分是一个服务名而不是具体的 IP 地址或域名时,会触发 LoadBalancerInterceptor 拦截器。LoadBalancerInterceptor 会将服务名传递给 LoadBalancerClient,后者内部会通过 ILoadBalancer 获取该服务的所有可用实例列表(通过查询 Eureka 或其他服务发现机制获取)。ILoadBalancer 使用 IPing 来过滤掉不可用的服务实例,然后应用 IRule 负载均衡策略(默认是轮询)来从剩余的可用服务实例中选择一个实例。
• 选定实例后,最后会将原始请求的 URL 替换为所选实例的具体地址( IP + 端口),然后执行 HTTP 请求。
• 健康检查与更新:
• IPing会定期检查每个服务实例的健康状况,并将不健康的实例标记为不可用。
• ServerListUpdater也会定期更新服务实例列表,确保其反映最新的服务状态。
public classLoadBalancerInterceptorimplementsClientHttpRequestInterceptor {private LoadBalancerClient loadBalancer;// (省略其他) ...@Overridepublic ClientHttpResponse intercept(final HttpRequest request, final byte[] body,final ClientHttpRequestExecution execution)throws IOException {// 获取原始请求的 URIfinalURIoriginalUri= request.getURI();// 从 URI 中提取主机名部分作为服务名。在微服务架构中,这个“主机名”实际上是服务的逻辑名称,而非实际的物理地址。StringserviceName= originalUri.getHost();Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);// 通过 LoadBalancerClient 执行负载均衡逻辑returnthis.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));}
}publicclassRibbonLoadBalancerClientimplementsLoadBalancerClient {// (省略其他) .../*** 执行负载均衡策略, 然后发起请求*/@Overridepublic <T> T execute(String serviceId, LoadBalancerRequest<T> request)throws IOException {// 获取 ILoadBalancer 实例(比如:继承了DynamicServerListLoadBalancer 的 ZoneAwareLoadBalancer )// ILoadBalancer 对象包含了特定服务的所有可用实例及其健康状态等信息ILoadBalancerloadBalancer= getLoadBalancer(serviceId);// 从 loadBalancer 中选择一个服务实例。此方法内部会通过 ILoadBalancer 使用 IRule 策略来选择具体的 Server 实例(即服务实例)。// IRule 定义了不同的负载均衡算法,例如轮询(RoundRobinRule)、随机(RandomRule)等。默认情况下,使用的是轮询策略。Serverserver= getServer(loadBalancer);// 如果没有可用的服务实例,则抛出异常if (server == null) {thrownewIllegalStateException("No instances available for " + serviceId);}// 创建一个 RibbonServer 对象,封装了服务ID、选定的服务实例、安全标志以及元数据信息RibbonServerribbonServer=newRibbonServer(serviceId, server, isSecure(server,serviceId), serverIntrospector(serviceId).getMetadata(server));// 实际执行请求return execute(serviceId, ribbonServer, request);}/*** 对服务实例发起请求* @param serviceId 服务名,如 greeting-service* @param serviceInstance 中的服务实例,通常为 RibbonServer 类型* @param request 封装了实际请求逻辑的函数式接口*/public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request)throws IOException {Serverserver=null;// 获取 Server 实例,包含了服务实例的主机名、端口等信息if(serviceInstance instanceof RibbonServer) {server = ((RibbonServer)serviceInstance).getServer();}if (server == null) {thrownewIllegalStateException("No instances available for " + serviceId);}// 获取负载均衡上下文,该上下文对象封装了 Ribbon 的核心组件(如 ILoadBalancer, IClientConfig, ServerIntrospector 等),并在执行请求时提供上下文支持。RibbonLoadBalancerContextcontext=this.clientFactory.getLoadBalancerContext(serviceId);// 创建统计记录器,用于记录请求的成功、失败、响应时间等统计信息。它会更新 LoadBalancerStats 中的数据,供后续的健康检查、负载均衡策略使用。RibbonStatsRecorderstatsRecorder=newRibbonStatsRecorder(context, server);try {// 实际执行 HTTP 请求的逻辑TreturnVal= request.apply(serviceInstance);statsRecorder.recordStats(returnVal);return returnVal;}// catch IOException and rethrow so RestTemplate behaves correctlycatch (IOException ex) {statsRecorder.recordStats(ex);throw ex;}catch (Exception ex) {statsRecorder.recordStats(ex);ReflectionUtils.rethrowRuntimeException(ex);}returnnull;}
}
流程总结: