当前位置: 首页 > news >正文

Nacos源码—8.Nacos升级gRPC分析五

大纲

7.服务端对服务实例进行健康检查

8.服务下线如何注销注册表和客户端等信息

9.事件驱动架构源码分析

7.服务端对服务实例进行健康检查

(1)服务端对服务实例进行健康检查的设计逻辑

(2)服务端对服务实例进行健康检查的源码

(3)服务端检查服务实例不健康后的注销处理

(1)服务端对服务实例进行健康检查的设计逻辑

一.首先会获取所有客户端的Connection连接对象

Connection连接对象里有个属性叫lastActiveTime,表示的是最后存活时间。

二.然后判断当前时间-最后存活时间是否大于20s

如果大于,则把该Connection连接对象的connectionId放入到一个集合里。这个集合是一个名为outDatedConnections的待移除集合Set,此时该Connection连接对象并不会马上删除。

三.当判断完全部的Connection连接对象后会遍历outDatedConnections集合

向遍历到的Connection连接对象发起一次请求,确认是否真的下线。如果响应成功,则往successConnections集合中添加connectionId,并且刷新Connection连接对象的lastActiveTime属性。这个机制有一个专业的名称叫做:探活机制。

四.遍历待移除集合进行注销并且在注销之前先判断一下是否探活成功

也就是connectionId存在于待移除集合outDatedConnections中,但是不存在于探活成功集合successConnections中,那么这个connectionId对应的客户端就会被注销掉。

(2)服务端对服务实例进行健康检查的源码

对服务实例进行健康检查的源码入口是ConnectionManager的start()方法。

@Service
public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent> {Map<String, Connection> connections = new ConcurrentHashMap<>();...//Start Task:Expel the connection which active Time expire.@PostConstructpublic void start() {//Start UnHealthy Connection Expel Task.RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(new Runnable() {@Overridepublic void run() {...//一.首先获取所有的连接Set<Map.Entry<String, Connection>> entries = connections.entrySet();...//二.然后判断客户端是否超过20s没有发来心跳信息了,如果是则会将clientId加入outDatedConnections集合中Set<String> outDatedConnections = new HashSet<>();long now = System.currentTimeMillis();for (Map.Entry<String, Connection> entry : entries) {Connection client = entry.getValue();String clientIp = client.getMetaInfo().getClientIp();AtomicInteger integer = expelForIp.get(clientIp);if (integer != null && integer.intValue() > 0) {integer.decrementAndGet();expelClient.add(client.getMetaInfo().getConnectionId());expelCount--;} else if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) {//判断心跳时间//添加到待移除列表outDatedConnections.add(client.getMetaInfo().getConnectionId());}}...//client active detection.//三.初次检测完超过20s的Connection连接对象后,并不会立马进行删除,而是进行探活,服务端主动请求客户端,来确认是否真的下线Loggers.REMOTE_DIGEST.info("Out dated connection ,size={}", outDatedConnections.size());if (CollectionUtils.isNotEmpty(outDatedConnections)) {Set<String> successConnections = new HashSet<>();final CountDownLatch latch = new CountDownLatch(outDatedConnections.size());//遍历超过20s没有心跳的客户端clientIdfor (String outDateConnectionId : outDatedConnections) {try {Connection connection = getConnection(outDateConnectionId);if (connection != null) {ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest();//调用GrpcConnection.asyncRequest()方法异步发送请求connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {@Overridepublic Executor getExecutor() {return null;}@Overridepublic long getTimeout() {return 1000L;}@Overridepublic void onResponse(Response response) {latch.countDown();if (response != null && response.isSuccess()) {//响应成功刷新心跳时间connection.freshActiveTime();//并且加入到探活成功的集合列表中successConnections.add(outDateConnectionId);}}@Overridepublic void onException(Throwable e) {latch.countDown();}});Loggers.REMOTE_DIGEST.info("[{}]send connection active request ", outDateConnectionId);} else {latch.countDown();}                            } catch (ConnectionAlreadyClosedException e) {latch.countDown();} catch (Exception e) {Loggers.REMOTE_DIGEST.error("[{}]Error occurs when check client active detection ,error={}", outDateConnectionId, e);latch.countDown();}}latch.await(3000L, TimeUnit.MILLISECONDS);Loggers.REMOTE_DIGEST.info("Out dated connection check successCount={}", successConnections.size());//经过探活还是不成功的Connection连接对象,就准备进行移除了//遍历20s没有心跳的客户端,准备移除客户端信息for (String outDateConnectionId : outDatedConnections) {//判断探活是否成功,如果成功了则不需要移除if (!successConnections.contains(outDateConnectionId)) {Loggers.REMOTE_DIGEST.info("[{}]Unregister Out dated connection....", outDateConnectionId);//执行客户端注销逻辑unregister(outDateConnectionId);}}}...}}, 1000L, 3000L, TimeUnit.MILLISECONDS);}...
}

(3)服务端检查服务实例不健康后的注销处理

进行注销处理的方法是ConnectionManager的unregister()方法。该方法主要会移除Connection连接对象 + 清除一些数据,以及发布一个ClientDisconnectEvent客户端注销事件。

@Service
public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent> {private Map<String, AtomicInteger> connectionForClientIp = new ConcurrentHashMap<String, AtomicInteger>(16);Map<String, Connection> connections = new ConcurrentHashMap<>();@Autowiredprivate ClientConnectionEventListenerRegistry clientConnectionEventListenerRegistry;...//unregister a connection .public synchronized void unregister(String connectionId) {//移除客户端信息Connection remove = this.connections.remove(connectionId);if (remove != null) {String clientIp = remove.getMetaInfo().clientIp;AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);if (atomicInteger != null) {int count = atomicInteger.decrementAndGet();if (count <= 0) {connectionForClientIp.remove(clientIp);}}remove.close();Loggers.REMOTE_DIGEST.info("[{}]Connection unregistered successfully. ", connectionId);//通知客户端注销连接clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);}}...
}@Service
public class ClientConnectionEventListenerRegistry {...//notify where a new client disconnected.public void notifyClientDisConnected(final Connection connection) {for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) {try {//调用ConnectionBasedClientManager.clientDisConnected()方法clientConnectionEventListener.clientDisConnected(connection);} catch (Throwable throwable) {Loggers.REMOTE.info("[NotifyClientDisConnected] failed for listener {}", clientConnectionEventListener.getName(), throwable);}}}...
}@Component("connectionBasedClientManager")
public class ConnectionBasedClientManager extends ClientConnectionEventListener implements ClientManager {private final ConcurrentMap<String, ConnectionBasedClient> clients = new ConcurrentHashMap<>();...@Overridepublic boolean clientDisconnected(String clientId) {Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);ConnectionBasedClient client = clients.remove(clientId);if (null == client) {return true;}client.release();//最后发布客户端注销事件NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));return true;}...
}

ClientDisconnectEvent客户端注销事件会被两个监听响应:一是ClientServiceIndexesManager的onEvent()方法用来移除注册表 + 订阅表信息,二是DistroClientDataProcessor的onEvent()方法用来同步服务实例被注销后的数据。

@Component
public class ClientServiceIndexesManager extends SmartSubscriber {//注册表(服务提供者),一个Service服务对象,对应多个服务实例的clientIdprivate final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();//订阅者列表(服务消费者),一个Service服务对象,对应多个订阅者的clientIdprivate final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();...@Overridepublic void onEvent(Event event) {if (event instanceof ClientEvent.ClientDisconnectEvent) {handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);} else if (event instanceof ClientOperationEvent) {handleClientOperation((ClientOperationEvent) event);}}private void handleClientDisconnect(ClientEvent.ClientDisconnectEvent event) {Client client = event.getClient();for (Service each : client.getAllSubscribeService()) {//移除订阅者列表的元素removeSubscriberIndexes(each, client.getClientId());}for (Service each : client.getAllPublishedService()) {//移除注册表的元素removePublisherIndexes(each, client.getClientId());}}private void removePublisherIndexes(Service service, String clientId) {if (!publisherIndexes.containsKey(service)) {return;}publisherIndexes.get(service).remove(clientId);NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));}private void removeSubscriberIndexes(Service service, String clientId) {if (!subscriberIndexes.containsKey(service)) {return;}subscriberIndexes.get(service).remove(clientId);if (subscriberIndexes.get(service).isEmpty()) {subscriberIndexes.remove(service);}}...
}public class DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor {...@Overridepublic void onEvent(Event event) {...if (event instanceof ClientEvent.ClientVerifyFailedEvent) {syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);} else {syncToAllServer((ClientEvent) event);}}private void syncToAllServer(ClientEvent event) {Client client = event.getClient();//Only ephemeral data sync by Distro, persist client should sync by raft.//临时实例使用Distro协议,持久化实例使用Raft协议//ClientManager.isResponsibleClient()方法,判断只有该client的责任节点才能进行集群数据同步if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {return;}if (event instanceof ClientEvent.ClientDisconnectEvent) {//如果event是客户端注销实例时需要进行集群节点同步的事件DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.DELETE);} else if (event instanceof ClientEvent.ClientChangedEvent) {//如果event是客户端注册实例时需要进行集群节点同步的事件DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.CHANGE);}}...
}

(4)总结

在Nacos 1.4.1版本中的服务健康检查:是15s没心跳则把健康状态修改为不健康,30s没心跳则把实例对象移除。

在Nacos 2.1.0版本中的服务健康检查:是20s没心跳则把客户端放入一个过期集合,此时并不移除客户端连接。由于引入了gRPC长连接,所以可以新增探活机制检查过期集合中的连接。服务端发送探活请求给客户端时的代价并不大,可确保客户端下线。

8.服务下线如何注销注册表和客户端等信息

(1)客户端发出服务下线请求的源码

(2)服务端处理服务下线请求的源码

(1)客户端发出服务下线请求的源码

Nacos客户端发起服务下线的入口在AbstractAutoServiceRegistration这个类之中,而AbstractAutoServiceRegistration是nacos-discovery中的类。

由于AbstractAutoServiceRegistration的destroy()方法被@PreDestroy修饰,所以当容器关闭时,会调用AbstractAutoServiceRegistration的destroy()方法。该方法最后会触发调用NacosNamingService的deregisterInstance()方法,然后调用NamingClientProxyDelegate的deregisterService()方法,接着调用NamingGrpcClientProxy的deregisterService()方法。和客户端发起服务注册一样,首先会创建请求参数对象,然后通过NamingGrpcClientProxy的requestToServer()方法发起请求,也就是调用RpcClient的request()方法发起gRPC请求进行服务下线。

public abstract class AbstractAutoServiceRegistration<R extends Registration> implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> {...@PreDestroypublic void destroy() {stop();}public void stop() {if (this.getRunning().compareAndSet(true, false) && isEnabled()) {deregister();if (shouldRegisterManagement()) {deregisterManagement();}this.serviceRegistry.close();}}protected void deregister() {this.serviceRegistry.deregister(getRegistration());}...
}public class NacosServiceRegistry implements ServiceRegistry<Registration> {...@Overridepublic void deregister(Registration registration) {log.info("De-registering from Nacos Server now...");if (StringUtils.isEmpty(registration.getServiceId())) {log.warn("No dom to de-register for nacos client...");return;}NamingService namingService = namingService();String serviceId = registration.getServiceId();String group = nacosDiscoveryProperties.getGroup();try {//调用NacosNamingService.deregisterInstance()方法namingService.deregisterInstance(serviceId, group, registration.getHost(), registration.getPort(), nacosDiscoveryProperties.getClusterName());} catch (Exception e) {log.error("ERR_NACOS_DEREGISTER, de-register failed...{},", registration.toString(), e);}log.info("De-registration finished.");}...
}@SuppressWarnings("PMD.ServiceOrDaoClassShouldEndWithImplRule")
public class NacosNamingService implements NamingService {private NamingClientProxy clientProxy;...public NacosNamingService(Properties properties) throws NacosException {init(properties);}private void init(Properties properties) throws NacosException {...this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);}@Overridepublic void deregisterInstance(String serviceName, String groupName, String ip, int port, String clusterName) throws NacosException {Instance instance = new Instance();instance.setIp(ip);instance.setPort(port);instance.setClusterName(clusterName);deregisterInstance(serviceName, groupName, instance);}@Overridepublic void deregisterInstance(String serviceName, String groupName, Instance instance) throws NacosException {//调用NamingClientProxyDelegate.deregisterService()方法clientProxy.deregisterService(serviceName, groupName, instance);}...
}public class NamingClientProxyDelegate implements NamingClientProxy {private final NamingHttpClientProxy httpClientProxy;private final NamingGrpcClientProxy grpcClientProxy;...public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, Properties properties, InstancesChangeNotifier changeNotifier) throws NacosException {...this.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties, serviceInfoHolder);this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties, serviceInfoHolder);}@Overridepublic void deregisterService(String serviceName, String groupName, Instance instance) throws NacosException {getExecuteClientProxy(instance).deregisterService(serviceName, groupName, instance);}private NamingClientProxy getExecuteClientProxy(Instance instance) {return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;}...
}public class NamingGrpcClientProxy extends AbstractNamingClientProxy {private final NamingGrpcRedoService redoService;...@Overridepublic void deregisterService(String serviceName, String groupName, Instance instance) throws NacosException {NAMING_LOGGER.info("[DEREGISTER-SERVICE] {} deregistering service {} with instance: {}", namespaceId, serviceName, instance);redoService.instanceDeregister(serviceName, groupName);doDeregisterService(serviceName, groupName, instance);}//Execute deregister operation.public void doDeregisterService(String serviceName, String groupName, Instance instance) throws NacosException {//创建请求参数对象  InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName, NamingRemoteConstants.DE_REGISTER_INSTANCE, instance);//向服务端发起请求requestToServer(request, Response.class);redoService.removeInstanceForRedo(serviceName, groupName);}private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass) throws NacosException {try {request.putAllHeader(getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));//实际会调用RpcClient.request()方法发起gRPC请求Response response = requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {throw new NacosException(response.getErrorCode(), response.getMessage());}if (responseClass.isAssignableFrom(response.getClass())) {return (T) response;}NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'", response.getClass().getName(), responseClass.getName());} catch (Exception e) {throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);}throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");}...
}

(2)服务端处理服务下线请求的源码

服务注册时请求参数InstanceRequest的类型是REGISTER_INSTANCE,服务下线时请求参数InstanceRequest的类型是DE_REGISTER_INSTANCE。

处理服务注册和服务下线的入口是InstanceRequestHandler的handle()方法,这个方法会触发调用InstanceRequestHandler的deregisterInstance()方法,也就是调用EphemeralClientOperationServiceImpl的deregisterInstance()方法。

在EphemeralClientOperationServiceImpl的deregisterInstance()方法中,会在移除Client对象中的instance信息时,发布ClientChangedEvent事件,然后接着发布客户端注销服务实例的事件ClientDeregisterServiceEvent。

其中ClientChangedEvent事件是用来同步数据给集群节点的,ClientDeregisterServiceEvent事件是用来移除注册表 + 订阅表的服务实例。移除注册表 + 订阅表的服务实例时,还会发布ServiceChangeEvent事件,ServiceChangeEvent事件是用来通知订阅了该服务的Nacos客户端的。同理,服务注册时其实也会发布类似的三个事件。

@Component
public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {private final EphemeralClientOperationServiceImpl clientOperationService;public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {this.clientOperationService = clientOperationService;}@Override@Secured(action = ActionTypes.WRITE)public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {//根据请求信息创建一个Service对象,里面包含了:命名空间、分组名、服务名Service service = Service.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);switch (request.getType()) {case NamingRemoteConstants.REGISTER_INSTANCE://注册实例return registerInstance(service, request, meta);case NamingRemoteConstants.DE_REGISTER_INSTANCE://注销实例return deregisterInstance(service, request, meta);default:throw new NacosException(NacosException.INVALID_PARAM, String.format("Unsupported request type %s", request.getType()));}}private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {//调用EphemeralClientOperationServiceImpl的注册方法registerInstance(),这里需要注意如下参数;//参数service:根据请求信息创建的一个Service对象,里面有命名空间、分组名、服务名//参数request.getInstance():这个参数就对应了客户端的实例对象,里面包含IP、端口等信息//参数meta.getConnectionId():这个参数很关键,它是连接IDclientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);}private InstanceResponse deregisterInstance(Service service, InstanceRequest request, RequestMeta meta) {//调用EphemeralClientOperationServiceImpl的注销方法deregisterInstance()clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId());return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE);}
}@Component("ephemeralClientOperationService")
public class EphemeralClientOperationServiceImpl implements ClientOperationService {private final ClientManager clientManager;...@Overridepublic void deregisterInstance(Service service, Instance instance, String clientId) {if (!ServiceManager.getInstance().containSingleton(service)) {Loggers.SRV_LOG.warn("remove instance from non-exist service: {}", service);return;}//从ServiceManager中根据由请求信息创建的Service对象获取一个已注册的Service对象Service singleton = ServiceManager.getInstance().getSingleton(service);//从ClientManagerDelegate中根据请求参数中的connectionId获取一个Client对象,即IpPortBasedClient对象Client client = clientManager.getClient(clientId);if (!clientIsLegal(client, clientId)) {return;}//调用AbstractClient.removeServiceInstance()方法//移除Client对象中的instance信息并发布ClientChangedEvent事件来同步集群节点InstancePublishInfo removedInstance = client.removeServiceInstance(singleton);client.setLastUpdatedTime();if (null != removedInstance) {//发布客户端注销服务实例的事件NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(singleton, clientId));NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, removedInstance.getMetadataId(), true));}}...
}public abstract class AbstractClient implements Client {//publishers其实就是记录该客户端提供的服务和服务实例,一个客户端可提供多个服务//存储客户端发送过来的请求中的Instance信息,当然这些信息已封装为InstancePublishInfo对象//key为已注册的Service,value是根据请求中的instance实例信息封装的InstancePublishInfo对象protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);...@Overridepublic InstancePublishInfo removeServiceInstance(Service service) {InstancePublishInfo result = publishers.remove(service);if (null != result) {MetricsMonitor.decrementInstanceCount();//发布客户端改变事件,用于处理集群间的数据同步NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));}Loggers.SRV_LOG.info("Client remove for service {}, {}", service, getClientId());return result;}...
}

http://www.xdnf.cn/news/414487.html

相关文章:

  • 【K8S学习之生命周期钩子】详细了解 postStart 和 preStop 生命周期钩子
  • 【日撸 Java 三百行】Day 13(链表)
  • 【AIGC梦幻婚纱美学】:白纱与花卉的浪漫算法融合
  • 2025-5-12 底部埋伏记录
  • Matlab 基于GUI的图像去雾技术GlobalHisteq、LocalHisteq和Retinex
  • 基于世界土壤数据库(HWSD)的中国土壤数据集(v1.1)(2009)
  • 大核极坐标码
  • vulhub-Stapler
  • 耀圣-高温釜进料口气动耐磨切断球阀:高粘度、高腐蚀颗粒介质的终极进料解决方案
  • DeepSeek | AI需求分析
  • 手机电池健康提示怎么看?
  • 封装echarts的柱状图+折线图+堆积图
  • 使用 Watt toolkit 加速 git clone
  • 栈和队列复习(C语言版)
  • 判断一个数组有没有重复值
  • k8s监控方案实践(三):部署与配置Grafana可视化平台
  • 【Redis】键值对数据库实现
  • Tenacity 高级使用指南:Python 重试机制的终极解决方案
  • 使用ACE-Step在本地生成AI音乐
  • 基于大模型预测的多发性硬化综合诊疗方案研究报告大纲
  • 棉花杂草检测数据集VOC+YOLO格式4279张2类别
  • 时空注意力机制深度解析:理论、技术与应用全景
  • 【笔试训练】给一个数组构建二叉树|从前序遍历与中序遍历构建二叉树|二叉树中的最大路径和
  • Windows远程桌面实现之十七:基于浏览器的文件和目录传输(二)
  • C++舆情监控爬虫程序实现
  • [特殊字符] 本地部署DeepSeek大模型:安全加固与企业级集成方案
  • 利用SSRF击穿内网!kali靶机实验
  • 嵌入式gcc编译生产的.d 和 .o文件是什么文件?
  • dotnet-hosting-2.2.8-win安装步骤指南
  • 【操作系统】零拷贝技术