
Nacos Source Code 9: Analysis of Nacos's Upgrade to gRPC (Part 7)

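Outline

10. Analysis of gRPC client initialization

11. The gRPC client's heartbeat mechanism (health checks)

12. How the gRPC server handles a client's connection request

13. How the gRPC server maps each request type to its Handler class

14. A brief introduction to gRPC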

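10. Analysis of gRPC client initialization

(1) Source code of gRPC client proxy initialization

(2) Source code of gRPC client startup

(3) Source code of the gRPC client's connection request to the server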

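(1) Source code of gRPC client proxy initialization

When a Nacos client registers a service instance, it calls NacosNamingService's registerInstance() method, which in turn calls NamingClientProxyDelegate's registerService() method. That method checks whether the instance is ephemeral: an ephemeral instance is registered through the gRPC client proxy, and a non-ephemeral (persistent) instance through the HTTP client proxy.

The gRPC client proxy NamingGrpcClientProxy is created and initialized when NacosNamingService's init() method creates the client proxy, that is, when it runs the NamingClientProxyDelegate constructor.

While NamingGrpcClientProxy is being created, RpcClientFactory's createClient() method first creates an RpcClient (concretely a GrpcSdkClient, a subclass of GrpcClient), which is assigned to NamingGrpcClientProxy's rpcClient field. The constructor then calls NamingGrpcClientProxy's start() method to start the RPC client connection.

NamingGrpcClientProxy's start() method sets the server list factory and registers NamingGrpcRedoService as a connection listener. It then registers a NamingPushRequestHandler to handle requests pushed by the server, calls RpcClient's start() method to start the RPC client, and finally registers NamingGrpcClientProxy itself as a subscriber with the notification center (NotifyCenter).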
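The "first handler that answers wins" idea behind registerServerRequestHandler() can be sketched in plain Java. This is a simplified model, not Nacos code: SketchRequest, SketchHandler and HandlerChain are hypothetical names, and the sketch assumes the client tries registered handlers in order and uses the first non-null response. That assumption is consistent with the ClientDetectionRequest handler registered in RpcClient.start() later in this article, which returns null for every other request type.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for a server-pushed request: only its type name matters here.
interface SketchRequest {
    String type();
}

// Hypothetical stand-in for ServerRequestHandler: return null for "not my request".
interface SketchHandler {
    String requestReply(SketchRequest request);
}

class HandlerChain {
    // Copy-on-write so registration and dispatch can happen on different threads.
    private final List<SketchHandler> handlers = new CopyOnWriteArrayList<>();

    void register(SketchHandler handler) {
        handlers.add(handler);
    }

    // Try handlers in registration order; the first non-null reply wins.
    String handle(SketchRequest request) {
        for (SketchHandler handler : handlers) {
            String reply = handler.requestReply(request);
            if (reply != null) {
                return reply;
            }
        }
        return null; // no handler recognised the request
    }
}
```

Ordering matters in such a chain: a handler that answered every request would shadow all handlers registered after it.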

public class NacosNamingService implements NamingService {
    ...
    private NamingClientProxy clientProxy;

    private void init(Properties properties) throws NacosException {
        ...
        this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);
    }
    ...
    @Override
    public void registerInstance(String serviceName, Instance instance) throws NacosException {
        registerInstance(serviceName, Constants.DEFAULT_GROUP, instance);
    }

    @Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        NamingUtils.checkInstanceIsLegal(instance);
        //Call NamingClientProxy.registerService(), which is actually NamingClientProxyDelegate.registerService()
        clientProxy.registerService(serviceName, groupName, instance);
    }
    ...
}

//Client proxy
public class NamingClientProxyDelegate implements NamingClientProxy {
    private final NamingHttpClientProxy httpClientProxy;

    private final NamingGrpcClientProxy grpcClientProxy;

    public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, Properties properties, InstancesChangeNotifier changeNotifier) throws NacosException {
        ...
        //Initialize the HTTP client proxy
        this.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties, serviceInfoHolder);
        //Initialize the gRPC client proxy
        this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties, serviceInfoHolder);
    }
    ...
    @Override
    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
        getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
    }

    private NamingClientProxy getExecuteClientProxy(Instance instance) {
        return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
    }
    ...
}

//gRPC client proxy
public class NamingGrpcClientProxy extends AbstractNamingClientProxy {
    private final String namespaceId;

    private final String uuid;

    private final Long requestTimeout;

    private final RpcClient rpcClient;

    private final NamingGrpcRedoService redoService;

    //Initialize the gRPC client proxy
    public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory, Properties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException {
        super(securityProxy);
        this.namespaceId = namespaceId;
        this.uuid = UUID.randomUUID().toString();
        this.requestTimeout = Long.parseLong(properties.getProperty(CommonParams.NAMING_REQUEST_TIMEOUT, "-1"));
        Map<String, String> labels = new HashMap<String, String>();
        labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);
        labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING);
        //1.Create a GrpcSdkClient instance via RpcClientFactory.createClient() and assign it to the rpcClient field
        this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);
        this.redoService = new NamingGrpcRedoService(this);
        //2.Start the gRPC client proxy NamingGrpcClientProxy
        start(serverListFactory, serviceInfoHolder);
    }

    private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {
        rpcClient.serverListFactory(serverListFactory);
        //Register the connection listener
        rpcClient.registerConnectionListener(redoService);
        //1.Register a NamingPushRequestHandler to handle requests pushed by the server
        rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
        //2.Start the RPC client RpcClient
        rpcClient.start();
        //3.Register NamingGrpcClientProxy itself as a subscriber with the notification center
        NotifyCenter.registerSubscriber(this);
    }
    ...
    @Override
    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
        NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName, instance);
        redoService.cacheInstanceForRedo(serviceName, groupName, instance);
        //Register the service instance
        doRegisterService(serviceName, groupName, instance);
    }

    //Execute register operation.
    public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
        //Build the request object
        InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName, NamingRemoteConstants.REGISTER_INSTANCE, instance);
        //Send the request to the server
        requestToServer(request, Response.class);
        redoService.instanceRegistered(serviceName, groupName);
    }

    private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass) throws NacosException {
        try {
            request.putAllHeader(getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));
            //Ultimately calls RpcClient.request() to send the gRPC request
            Response response = requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);
            if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {
                throw new NacosException(response.getErrorCode(), response.getMessage());
            }
            if (responseClass.isAssignableFrom(response.getClass())) {
                return (T) response;
            }
            NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'", response.getClass().getName(), responseClass.getName());
        } catch (Exception e) {
            throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);
        }
        throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");
    }
    ...
}

public class RpcClientFactory {
    private static final Map<String, RpcClient> CLIENT_MAP = new ConcurrentHashMap<>();
    ...
    //create a rpc client.
    public static RpcClient createClient(String clientName, ConnectionType connectionType, Integer threadPoolCoreSize, Integer threadPoolMaxSize, Map<String, String> labels) {
        if (!ConnectionType.GRPC.equals(connectionType)) {
            throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType());
        }
        return CLIENT_MAP.computeIfAbsent(clientName, clientNameInner -> {
            LOGGER.info("[RpcClientFactory] create a new rpc client of " + clientName);
            try {
                //Create the GrpcClient object
                GrpcClient client = new GrpcSdkClient(clientNameInner);
                //Set the core and maximum thread pool sizes
                client.setThreadPoolCoreSize(threadPoolCoreSize);
                client.setThreadPoolMaxSize(threadPoolMaxSize);
                client.labels(labels);
                return client;
            } catch (Throwable throwable) {
                LOGGER.error("Error to init GrpcSdkClient for client name :" + clientName, throwable);
                throw throwable;
            }
        });
    }
    ...
}
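RpcClientFactory caches clients by name with ConcurrentHashMap.computeIfAbsent(). A minimal sketch of that caching behaviour, using a hypothetical FakeRpcClient in place of GrpcSdkClient and a hypothetical ClientRegistry in place of the factory:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for RpcClient; it only remembers its name.
class FakeRpcClient {
    final String name;

    FakeRpcClient(String name) {
        this.name = name;
    }
}

// Hypothetical model of RpcClientFactory's name-keyed cache.
class ClientRegistry {
    private static final Map<String, FakeRpcClient> CLIENT_MAP = new ConcurrentHashMap<>();
    static final AtomicInteger CREATED = new AtomicInteger();

    static FakeRpcClient createClient(String clientName) {
        // The mapping function runs only when no client is cached under clientName,
        // so concurrent callers using the same name share a single instance.
        return CLIENT_MAP.computeIfAbsent(clientName, nameInner -> {
            CREATED.incrementAndGet();
            return new FakeRpcClient(nameInner);
        });
    }
}
```

Since NamingGrpcClientProxy passes a fresh UUID.randomUUID() as the client name, each proxy instance ends up with its own RpcClient; the cache only deduplicates calls that reuse a name.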

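(2) Source code of gRPC client startup

NamingGrpcClientProxy's start() method starts the RPC client by calling RpcClient's start() method.

RpcClient's start() method first uses CAS to change the state of the RPC client, moving the RpcClient.rpcClientStatus field from INITIALIZED to STARTING. If the CAS fails (for example because the client has already been started), start() returns immediately, so the startup logic runs at most once.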
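The CAS guard at the top of RpcClient.start() is what makes repeated start() calls harmless. A minimal sketch with java.util.concurrent.atomic.AtomicReference; SketchStatus lists the state names that appear in the source excerpts (it is not the real RpcClientStatus enum), and StartGuard is a hypothetical class that models only the guard:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// State names taken from the source excerpts; not the real RpcClientStatus enum.
enum SketchStatus { WAIT_INIT, INITIALIZED, STARTING, RUNNING, UNHEALTHY, SHUTDOWN }

// Hypothetical class that models only the start() guard of RpcClient.
class StartGuard {
    final AtomicReference<SketchStatus> status;
    final AtomicInteger startBodyRuns = new AtomicInteger();

    StartGuard(SketchStatus initial) {
        this.status = new AtomicReference<>(initial);
    }

    // Only the caller that wins INITIALIZED -> STARTING runs the startup body;
    // every other caller (already started, or not yet initialized) returns false.
    boolean start() {
        if (!status.compareAndSet(SketchStatus.INITIALIZED, SketchStatus.STARTING)) {
            return false;
        }
        startBodyRuns.incrementAndGet(); // stands in for creating executors and connecting
        status.set(SketchStatus.RUNNING);
        return true;
    }
}
```

AtomicReference.compareAndSet() compares by identity, which is safe here because enum constants are singletons.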

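It then creates a scheduled thread pool with 2 core threads and submits two tasks: task one handles connection-established and connection-lost events, and task two handles reconnection and health checks.

Next it creates the Connection object by calling GrpcClient's connectToServer() method in a while loop to try to connect to the server. A failed attempt (connectToServer() throws or returns null) is logged and retried against the next server returned by nextRpcServer(). Because this connection is made synchronously inside start(), which blocks the caller, the number of attempts is capped at RETRY_TIMES, i.e. 3.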
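The bounded startup loop can be modelled independently of gRPC. In this sketch StartupConnector is hypothetical, servers come from a plain Iterator instead of nextRpcServer(), and only RuntimeException is caught where the source catches Throwable:

```java
import java.util.Iterator;
import java.util.function.Function;

// Hypothetical model of the bounded startup loop in RpcClient.start().
class StartupConnector {
    static final int RETRY_TIMES = 3;

    // connect returns null or throws on failure, as GrpcClient.connectToServer() may.
    static <S, C> C connectOnStartup(Iterator<S> servers, Function<S, C> connect) {
        C connection = null;
        int retriesLeft = RETRY_TIMES;
        while (retriesLeft > 0 && connection == null) {
            retriesLeft--;
            try {
                connection = connect.apply(servers.next()); // next server each round
            } catch (RuntimeException e) {
                // logged and swallowed in the real code; the loop simply tries again
            }
        }
        return connection; // null: the caller falls back to switchServerAsync()
    }
}
```

Capping the synchronous attempts keeps start() from blocking application startup indefinitely; anything beyond three attempts is left to the asynchronous reconnect path.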

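Once the client has connected to the server, the resulting Connection object is assigned to RpcClient.currentConnection, the RPC client status RpcClient.rpcClientStatus is set to RUNNING, and a CONNECTED event is put into eventLinkedBlockingQueue.

If all attempts fail, the client falls back to connecting asynchronously: it calls RpcClient's switchServerAsync() method, which puts a ReconnectContext object into RpcClient's reconnectionSignal queue. The elements of reconnectionSignal are processed by task two.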
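reconnectionSignal is an ArrayBlockingQueue with capacity 1, and switchServerAsync() uses offer(), which never blocks and returns false when the queue is full. A minimal sketch of the resulting effect (bursts of failures collapse into one pending reconnect); ReconnectSignals and its nested ReconnectContext are hypothetical simplifications, with the server reduced to a String:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class ReconnectSignals {
    // Hypothetical simplified ReconnectContext: only the two fields used in the article.
    static final class ReconnectContext {
        final String serverInfo;      // recommended server, may be null
        final boolean onRequestFail;

        ReconnectContext(String serverInfo, boolean onRequestFail) {
            this.serverInfo = serverInfo;
            this.onRequestFail = onRequestFail;
        }
    }

    // Capacity 1, as in RpcClient: at most one pending reconnect request.
    final BlockingQueue<ReconnectContext> reconnectionSignal = new ArrayBlockingQueue<>(1);

    // offer() returns false instead of blocking when a signal is already pending.
    boolean switchServerAsync(String recommendServer, boolean onRequestFail) {
        return reconnectionSignal.offer(new ReconnectContext(recommendServer, onRequestFail));
    }
}
```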

public abstract class RpcClient implements Closeable {
    protected volatile AtomicReference<RpcClientStatus> rpcClientStatus = new AtomicReference<>(RpcClientStatus.WAIT_INIT);

    protected ScheduledExecutorService clientEventExecutor;

    protected BlockingQueue<ConnectionEvent> eventLinkedBlockingQueue = new LinkedBlockingQueue<>();

    //When NamingGrpcClientProxy is initialized and calls RpcClient.start(), the return value of GrpcClient.connectToServer() is assigned to currentConnection
    protected volatile Connection currentConnection;

    private final BlockingQueue<ReconnectContext> reconnectionSignal = new ArrayBlockingQueue<>(1);
    ...
    public final void start() throws NacosException {
        //Use CAS to change the RpcClient status from INITIALIZED to STARTING
        boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);
        if (!success) {
            return;
        }
        //Create the scheduled thread pool executor and submit two tasks
        clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.remote.worker");
            t.setDaemon(true);
            return t;
        });
        //Task 1: handles connection-established and connection-lost events
        clientEventExecutor.submit(() -> {
            ...
        });
        //Task 2: handles reconnection and health checks
        clientEventExecutor.submit(() -> {
            ...
        });
        //The connection object
        Connection connectToServer = null;
        rpcClientStatus.set(RpcClientStatus.STARTING);
        //Number of attempts: RETRY_TIMES (3)
        int startUpRetryTimes = RETRY_TIMES;
        //Try to connect to the server in a while loop, at most 3 times
        while (startUpRetryTimes > 0 && connectToServer == null) {
            try {
                startUpRetryTimes--;
                //Get the server info
                ServerInfo serverInfo = nextRpcServer();
                LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Try to connect to server on start up, server: {}", name, serverInfo);
                //Call GrpcClient.connectToServer() to establish a long-lived connection with the server
                connectToServer = connectToServer(serverInfo);
            } catch (Throwable e) {
                LoggerUtils.printIfWarnEnabled(LOGGER, "[{}] Fail to connect to server on start up, error message = {}, start up retry times left: {}", name, e.getMessage(), startUpRetryTimes);
            }
        }
        //If the connection succeeded, connectToServer is not null
        if (connectToServer != null) {
            LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Success to connect to server [{}] on start up, connectionId = {}", name, connectToServer.serverInfo.getAddress(), connectToServer.getConnectionId());
            //Assign the connection; on the client side currentConnection is actually a GrpcConnection instance
            this.currentConnection = connectToServer;
            //Change the RpcClient status
            rpcClientStatus.set(RpcClientStatus.RUNNING);
            //Put a ConnectionEvent into the eventLinkedBlockingQueue
            eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
        } else {
            //Try to connect asynchronously
            switchServerAsync();
        }
        registerServerRequestHandler(new ConnectResetRequestHandler());
        //register client detection request.
        registerServerRequestHandler(request -> {
            if (request instanceof ClientDetectionRequest) {
                return new ClientDetectionResponse();
            }
            return null;
        });
    }

    protected ServerInfo nextRpcServer() {
        String serverAddress = getServerListFactory().genNextServer();
        //Get the server info
        return resolveServerInfo(serverAddress);
    }

    private ServerInfo resolveServerInfo(String serverAddress) {
        Matcher matcher = EXCLUDE_PROTOCOL_PATTERN.matcher(serverAddress);
        if (matcher.find()) {
            serverAddress = matcher.group(1);
        }
        String[] ipPortTuple = serverAddress.split(Constants.COLON, 2);
        int defaultPort = Integer.parseInt(System.getProperty("nacos.server.port", "8848"));
        String serverPort = CollectionUtils.getOrDefault(ipPortTuple, 1, Integer.toString(defaultPort));
        return new ServerInfo(ipPortTuple[0], NumberUtils.toInt(serverPort, defaultPort));
    }

    public void switchServerAsync() {
        //Asynchronous reconnection logic
        switchServerAsync(null, false);
    }

    protected void switchServerAsync(final ServerInfo recommendServerInfo, boolean onRequestFail) {
        //Put an object into the reconnectionSignal queue
        reconnectionSignal.offer(new ReconnectContext(recommendServerInfo, onRequestFail));
    }
    ...
}

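(3) Source code of the gRPC client's connection request to the server

The gRPC client connects to the server in GrpcClient's connectToServer() method. The method first computes the port used for network communication: the gRPC service occupies an additional port, obtained by adding an offset of 1000 (rpcPortOffset()) to the Nacos port, so the default 8848 becomes 9848.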
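The address handling in resolveServerInfo() plus the offset added in connectToServer() can be sketched as one helper. GrpcPortSketch is hypothetical; its PROTOCOL_PREFIX regex is an assumption that strips an optional http:// or https:// prefix, because the real EXCLUDE_PROTOCOL_PATTERN is not shown in the article, and the default port is hard-coded where the source reads the nacos.server.port system property:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper combining resolveServerInfo() and the port offset in connectToServer().
class GrpcPortSketch {
    // Assumption: strips an optional "http://" or "https://" prefix.
    private static final Pattern PROTOCOL_PREFIX = Pattern.compile("^(?:https?://)?(.*)$");
    static final int DEFAULT_PORT = 8848;     // the source reads System property "nacos.server.port"
    static final int RPC_PORT_OFFSET = 1000;  // the offset described in the article

    static int grpcPort(String serverAddress) {
        Matcher matcher = PROTOCOL_PREFIX.matcher(serverAddress);
        if (matcher.find()) {
            serverAddress = matcher.group(1);
        }
        String[] ipPort = serverAddress.split(":", 2);
        int port = DEFAULT_PORT;
        if (ipPort.length > 1) {
            try {
                port = Integer.parseInt(ipPort[1]);
            } catch (NumberFormatException e) {
                port = DEFAULT_PORT; // like NumberUtils.toInt(serverPort, defaultPort)
            }
        }
        return port + RPC_PORT_OFFSET;
    }
}
```

For example, a server configured as 10.0.0.1:8848 is reached over gRPC on port 9848, and one configured without a port falls back to 8848 and therefore also uses 9848.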

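Before connecting, it checks the server by sending a ServerCheckRequest and waiting up to 3 seconds for the reply; only if that check succeeds does it continue. It then opens the bidirectional stream (bindRequestStream()), calls GrpcConnection's sendRequest() method to send a ConnectionSetupRequest, and finally returns the GrpcConnection object.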
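serverCheck() turns the asynchronous gRPC reply into a bounded wait: it calls get(3000L, TimeUnit.MILLISECONDS) on the ListenableFuture and returns null on any exception, and connectToServer() gives up on the server when it sees null. A minimal sketch of that pattern over java.util.concurrent.Future; ServerCheckSketch is hypothetical and catches specific exceptions where the source catches Exception:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper modelling how serverCheck() waits for its reply:
// a bounded wait, with every failure collapsed into null.
class ServerCheckSketch {
    static <T> T awaitOrNull(Future<T> reply, long timeoutMillis) {
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            return null;
        } catch (ExecutionException | TimeoutException e) {
            return null; // the server failed or did not answer in time
        }
    }
}
```

In connectToServer() a null check result leads to shuntDownChannel() and a null return, which start() counts as one failed attempt.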

public abstract class GrpcClient extends RpcClient {
    ...
    @Override
    public Connection connectToServer(ServerInfo serverInfo) {
        try {
            if (grpcExecutor == null) {
                this.grpcExecutor = createGrpcExecutor(serverInfo.getServerIp());
            }
            //Get the port: the gRPC service occupies an extra port, which is Nacos's 8848 plus an offset of 1000, i.e. 9848
            int port = serverInfo.getServerPort() + rpcPortOffset();
            RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(serverInfo.getServerIp(), port);
            if (newChannelStubTemp != null) {
                //Check the server first; the RPC connection request is only sent if the check passes
                Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
                if (response == null || !(response instanceof ServerCheckResponse)) {
                    shuntDownChannel((ManagedChannel) newChannelStubTemp.getChannel());
                    return null;
                }
                BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc.newStub(newChannelStubTemp.getChannel());
                //Create the connection object
                GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
                grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId());
                //create stream request and bind connection event to this connection.
                StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);
                //stream observer to send response to server
                grpcConn.setPayloadStreamObserver(payloadStreamObserver);
                grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
                grpcConn.setChannel((ManagedChannel) newChannelStubTemp.getChannel());
                //send a setup request.
                ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
                conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
                conSetupRequest.setLabels(super.getLabels());
                conSetupRequest.setAbilities(super.clientAbilities);
                conSetupRequest.setTenant(super.getTenant());
                //Send the connection setup request
                grpcConn.sendRequest(conSetupRequest);
                //wait to register connection setup
                Thread.sleep(100L);
                return grpcConn;
            }
            return null;
        } catch (Exception e) {
            LOGGER.error("[{}]Fail to connect to server!,error={}", GrpcClient.this.getName(), e);
        }
        return null;
    }

    private Response serverCheck(String ip, int port, RequestGrpc.RequestFutureStub requestBlockingStub) {
        try {
            if (requestBlockingStub == null) {
                return null;
            }
            ServerCheckRequest serverCheckRequest = new ServerCheckRequest();
            Payload grpcRequest = GrpcUtils.convert(serverCheckRequest);
            //Send a check request to the server
            ListenableFuture<Payload> responseFuture = requestBlockingStub.request(grpcRequest);
            Payload response = responseFuture.get(3000L, TimeUnit.MILLISECONDS);
            //receive connection unregister response here,not check response is success.
            return (Response) GrpcUtils.parse(response);
        } catch (Exception e) {
            LoggerUtils.printIfErrorEnabled(LOGGER, "Server check fail, please check server {} ,port {} is available , error ={}", ip, port, e);
            return null;
        }
    }

    private StreamObserver<Payload> bindRequestStream(final BiRequestStreamGrpc.BiRequestStreamStub streamStub, final GrpcConnection grpcConn) {
        //Call BiRequestStreamStub.requestBiStream() to open the bidirectional stream to the server
        return streamStub.requestBiStream(new StreamObserver<Payload>() {
            @Override
            public void onNext(Payload payload) {
                LoggerUtils.printIfDebugEnabled(LOGGER, "[{}]Stream server request receive, original info: {}", grpcConn.getConnectionId(), payload.toString());
                try {
                    Object parseBody = GrpcUtils.parse(payload);
                    final Request request = (Request) parseBody;
                    if (request != null) {
                        try {
                            Response response = handleServerRequest(request);
                            if (response != null) {
                                response.setRequestId(request.getRequestId());
                                sendResponse(response);
                            } else {
                                LOGGER.warn("[{}]Fail to process server request, ackId->{}", grpcConn.getConnectionId(), request.getRequestId());
                            }
                        } catch (Exception e) {
                            LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Handle server request exception: {}", grpcConn.getConnectionId(), payload.toString(), e.getMessage());
                            Response errResponse = ErrorResponse.build(NacosException.CLIENT_ERROR, "Handle server request error");
                            errResponse.setRequestId(request.getRequestId());
                            sendResponse(errResponse);
                        }
                    }
                } catch (Exception e) {
                    LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Error to process server push response: {}", grpcConn.getConnectionId(), payload.getBody().getValue().toStringUtf8());
                }
            }

            @Override
            public void onError(Throwable throwable) {
                boolean isRunning = isRunning();
                boolean isAbandon = grpcConn.isAbandon();
                if (isRunning && !isAbandon) {
                    LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Request stream error, switch server,error={}", grpcConn.getConnectionId(), throwable);
                    if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                        switchServerAsync();
                    }
                } else {
                    LoggerUtils.printIfWarnEnabled(LOGGER, "[{}]Ignore error event,isRunning:{},isAbandon={}", grpcConn.getConnectionId(), isRunning, isAbandon);
                }
            }

            @Override
            public void onCompleted() {
                boolean isRunning = isRunning();
                boolean isAbandon = grpcConn.isAbandon();
                if (isRunning && !isAbandon) {
                    LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Request stream onCompleted, switch server", grpcConn.getConnectionId());
                    if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                        switchServerAsync();
                    }
                } else {
                    LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]Ignore complete event,isRunning:{},isAbandon={}", grpcConn.getConnectionId(), isRunning, isAbandon);
                }
            }
        });
    }
    ...
}

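(4) Summary

11. The gRPC client's heartbeat mechanism (health checks)

(1) Thread task one: notifications when a connection is established or lost

(2) Thread task two: reconnection and health checks

RpcClient's start() method calls GrpcClient's connectToServer() method to connect to the server, and whether or not the connection succeeds, it ends up adding an item to one of two blocking queues.

If the connection succeeds, a connection event is added to RpcClient's eventLinkedBlockingQueue. If it fails, a reconnection object is added to RpcClient's reconnectionSignal queue. The contents of these two blocking queues are processed by the two thread tasks started in RpcClient's start() method.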

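(1) Thread task one: notifications when a connection is established or lost

This task updates some state when a connection is established or lost. After it takes a connection event from the queue with eventLinkedBlockingQueue's take() method, it checks whether the event is a connect event or a disconnect event.

For a connect event, it calls RpcClient's notifyConnected() method, which notifies every registered ConnectionEventListener. One of them is the NamingGrpcRedoService registered in NamingGrpcClientProxy's start() method, whose connected field is set to true.

For a disconnect event, it calls RpcClient's notifyDisConnected() method, which sets NamingGrpcRedoService's connected field to false and marks all cached instance registrations and subscriptions as not registered, so that they will be redone.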
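Task one is a plain blocking-queue consumer that fans events out to listeners. The sketch below models one iteration of it with LinkedBlockingQueue.take(). SketchConnectionListener, ConnectionEventLoop and RedoFlagListener are hypothetical simplifications; in particular, the real NamingGrpcRedoService keeps redo data objects rather than a map of booleans.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical mirror of ConnectionEventListener.
interface SketchConnectionListener {
    void onConnected();

    void onDisConnect();
}

class ConnectionEventLoop {
    enum Event { CONNECTED, DISCONNECTED }

    final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
    final List<SketchConnectionListener> listeners = new CopyOnWriteArrayList<>();

    // One iteration of task one: block on take(), then notify every listener.
    boolean processOne() {
        Event event;
        try {
            event = events.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        for (SketchConnectionListener listener : listeners) {
            try {
                if (event == Event.CONNECTED) {
                    listener.onConnected();
                } else {
                    listener.onDisConnect();
                }
            } catch (RuntimeException e) {
                // the real code logs and moves on, so one bad listener cannot block the rest
            }
        }
        return true;
    }
}

// Hypothetical redo-style listener: tracks connectivity and marks cached registrations for redo.
class RedoFlagListener implements SketchConnectionListener {
    volatile boolean connected;
    final Map<String, Boolean> registered = new ConcurrentHashMap<>();

    @Override
    public void onConnected() {
        connected = true;
    }

    @Override
    public void onDisConnect() {
        connected = false;
        registered.replaceAll((service, wasRegistered) -> false); // everything must be redone
    }
}
```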

public abstract class RpcClient implements Closeable {
    protected volatile AtomicReference<RpcClientStatus> rpcClientStatus = new AtomicReference<>(RpcClientStatus.WAIT_INIT);

    protected ScheduledExecutorService clientEventExecutor;

    protected BlockingQueue<ConnectionEvent> eventLinkedBlockingQueue = new LinkedBlockingQueue<>();

    private final BlockingQueue<ReconnectContext> reconnectionSignal = new ArrayBlockingQueue<>(1);

    //listener called where connection's status changed.
    protected List<ConnectionEventListener> connectionEventListeners = new ArrayList<>();
    ...
    public final void start() throws NacosException {
        //Use CAS to change the RpcClient status from INITIALIZED to STARTING
        boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);
        if (!success) {
            return;
        }
        //Create the scheduled thread pool executor and submit two tasks
        clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.remote.worker");
            t.setDaemon(true);
            return t;
        });
        //Task 1: handles connection-established and connection-lost events
        clientEventExecutor.submit(() -> {
            while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {
                ConnectionEvent take;
                try {
                    take = eventLinkedBlockingQueue.take();
                    if (take.isConnected()) {
                        notifyConnected();
                    } else if (take.isDisConnected()) {
                        notifyDisConnected();
                    }
                } catch (Throwable e) {
                    // Do nothing
                }
            }
        });
        //Task 2: sends heartbeats (health checks) to the server or reconnects
        clientEventExecutor.submit(() -> {
            ...
        });
    }
    ...
    //Notify when client new connected.
    protected void notifyConnected() {
        if (connectionEventListeners.isEmpty()) {
            return;
        }
        LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Notify connected event to listeners.", name);
        for (ConnectionEventListener connectionEventListener : connectionEventListeners) {
            try {
                connectionEventListener.onConnected();
            } catch (Throwable throwable) {
                LoggerUtils.printIfErrorEnabled(LOGGER, "[{}] Notify connect listener error, listener = {}", name, connectionEventListener.getClass().getName());
            }
        }
    }

    //Notify when client disconnected.
    protected void notifyDisConnected() {
        if (connectionEventListeners.isEmpty()) {
            return;
        }
        LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Notify disconnected event to listeners", name);
        for (ConnectionEventListener connectionEventListener : connectionEventListeners) {
            try {
                connectionEventListener.onDisConnect();
            } catch (Throwable throwable) {
                LoggerUtils.printIfErrorEnabled(LOGGER, "[{}] Notify disconnect listener error, listener = {}", name, connectionEventListener.getClass().getName());
            }
        }
    }
    ...
    //Register connection handler. Will be notified when inner connection's state changed.
    //NamingGrpcClientProxy.start() registers the NamingGrpcRedoService object into connectionEventListeners
    public synchronized void registerConnectionListener(ConnectionEventListener connectionEventListener) {
        LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Registry connection listener to current client:{}", name, connectionEventListener.getClass().getName());
        this.connectionEventListeners.add(connectionEventListener);
    }
    ...
}

public class NamingGrpcRedoService implements ConnectionEventListener {
    private volatile boolean connected = false;
    ...
    @Override
    public void onConnected() {
        connected = true;
        LogUtils.NAMING_LOGGER.info("Grpc connection connect");
    }

    @Override
    public void onDisConnect() {
        connected = false;
        LogUtils.NAMING_LOGGER.warn("Grpc connection disconnect, mark to redo");
        synchronized (registeredInstances) {
            registeredInstances.values().forEach(instanceRedoData -> instanceRedoData.setRegistered(false));
        }
        synchronized (subscribes) {
            subscribes.values().forEach(subscriberRedoData -> subscriberRedoData.setRegistered(false));
        }
        LogUtils.NAMING_LOGGER.warn("mark to redo completed");
    }
    ...
}

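(2) Thread task two: reconnection and health checks

If RpcClient's start() method fails to connect through GrpcClient's connectToServer() method, it adds a reconnection object to the RpcClient.reconnectionSignal queue. This task takes reconnection objects from reconnectionSignal and reconnects.

Because reconnectionSignal only receives data when a reconnection is requested (for example when startup fails, or when the bidirectional stream reports an error or completes), getting no reconnection object from it means no reconnection has been requested, and the task treats the current connection as healthy for the moment.

Note: this task reads from the reconnectionSignal blocking queue with the queue's poll() method, with a timeout of keepAliveTime, not with take(). BlockingQueue's take() blocks indefinitely until data is available, whereas poll(timeout, unit) returns null if no data arrives within the given time. That timeout is what lets the same loop also drive the periodic health checks.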
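The difference between the two calls is easy to see in isolation. PollVsTake is a hypothetical helper modelling one wait of the task-two loop with BlockingQueue.poll(long, TimeUnit):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: one wait of the task-two loop.
class PollVsTake {
    // poll(timeout) returns the pending signal, or null once keepAliveMillis elapses.
    // A null result sends the loop to the health-check branch; take() would block
    // forever on an empty queue, and no health check would ever run.
    static <T> T waitForSignal(BlockingQueue<T> signals, long keepAliveMillis) {
        try {
            return signals.poll(keepAliveMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```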

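Case 1: the reconnection object obtained from reconnectionSignal is null

The task first checks whether the time since the last activity (lastActiveTimeStamp) has reached keepAliveTime (5s by default). If it has, it calls RpcClient.healthCheck() to send a health-check RPC. The health check is sent through currentConnection.request(), and the request type is HealthCheckRequest.

If the health check succeeds, the task only refreshes lastActiveTimeStamp. If it fails, the task stops if the client has been shut down; otherwise it uses CAS to set the client status to UNHEALTHY and then tries to re-establish the connection with the server.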
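The Case 1 decision can be written as a small pure function. In this sketch KeepAliveSketch, its Status and Action enums, and the injected healthCheck supplier are hypothetical; the current time is passed in so the logic is testable, and the source's "currentConnection == null" early exit is omitted:

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;

// Hypothetical model of task two's "no reconnect signal" branch.
class KeepAliveSketch {
    enum Status { RUNNING, UNHEALTHY, SHUTDOWN }

    enum Action { WAIT, REFRESHED, RECONNECT, STOP }

    final AtomicReference<Status> status = new AtomicReference<>(Status.RUNNING);
    final long keepAliveMillis;
    long lastActiveTimeStamp;

    KeepAliveSketch(long keepAliveMillis, long lastActiveTimeStamp) {
        this.keepAliveMillis = keepAliveMillis;
        this.lastActiveTimeStamp = lastActiveTimeStamp;
    }

    // Called when poll() returned null.
    Action onIdle(long now, BooleanSupplier healthCheck) {
        if (now - lastActiveTimeStamp < keepAliveMillis) {
            return Action.WAIT;          // recently active: no check needed yet
        }
        if (healthCheck.getAsBoolean()) {
            lastActiveTimeStamp = now;   // healthy: just refresh the timestamp
            return Action.REFRESHED;
        }
        Status current = status.get();
        if (current == Status.SHUTDOWN) {
            return Action.STOP;          // client closed: end the task
        }
        // If another thread changed the status in between, the CAS fails and this round is skipped.
        return status.compareAndSet(current, Status.UNHEALTHY) ? Action.RECONNECT : Action.WAIT;
    }
}
```

With keepAliveMillis = 5000, a failed check at least 5 seconds after the last activity yields RECONNECT and leaves the status UNHEALTHY, which corresponds to the fall-through into reconnect() in the source.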

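Case 2: the reconnection object obtained from reconnectionSignal is not null

The task calls RpcClient's reconnect() method, which tries to connect to the server through GrpcClient's connectToServer() method. Before that, if the ReconnectContext carries a recommended server that is no longer in the server list, the recommendation is discarded (serverInfo is set to null); if it is in the list, its port is taken from the list entry.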
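The recommended-server check in Case 2 can be sketched with plain strings. RecommendServerFilter is hypothetical and represents servers as "ip:port" strings instead of ServerInfo objects:

```java
import java.util.List;

// Hypothetical helper: servers are plain "ip:port" strings instead of ServerInfo objects.
class RecommendServerFilter {
    // Returns the list entry whose IP matches the recommended server (so the port
    // comes from the server list), or null when the recommendation must be ignored.
    static String filter(String recommended, List<String> serverList) {
        if (recommended == null) {
            return null; // nothing recommended, nothing to check
        }
        String recommendedIp = recommended.split(":", 2)[0];
        for (String server : serverList) {
            if (server.split(":", 2)[0].equals(recommendedIp)) {
                return server;
            }
        }
        return null;
    }
}
```

Matching on the IP only, as the source does, means a recommendation with a stale port is still honoured but is corrected to the port from the configured server list.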

public abstract class RpcClient implements Closeable {
    protected volatile AtomicReference<RpcClientStatus> rpcClientStatus = new AtomicReference<>(RpcClientStatus.WAIT_INIT);

    protected ScheduledExecutorService clientEventExecutor;

    protected BlockingQueue<ConnectionEvent> eventLinkedBlockingQueue = new LinkedBlockingQueue<>();

    private final BlockingQueue<ReconnectContext> reconnectionSignal = new ArrayBlockingQueue<>(1);
    ...
    public final void start() throws NacosException {
        //Use CAS to change the RpcClient status from INITIALIZED to STARTING
        boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);
        if (!success) {
            return;
        }
        //Create the scheduled thread pool executor and submit two tasks
        clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.remote.worker");
            t.setDaemon(true);
            return t;
        });
        //Task 1: handles connection-established and connection-lost events
        clientEventExecutor.submit(() -> {
            ...
        });
        //Task 2: sends heartbeats (health checks) to the server or reconnects
        clientEventExecutor.submit(() -> {
            while (true) {
                try {
                    if (isShutdown()) {
                        break;
                    }
                    //Reconnect signals are read from reconnectionSignal with poll(), not take(), waiting at most keepAliveTime
                    //BlockingQueue.take() blocks indefinitely when no data is available
                    //BlockingQueue.poll(timeout, unit) returns null if no data arrives within the given time
                    ReconnectContext reconnectContext = reconnectionSignal.poll(keepAliveTime, TimeUnit.MILLISECONDS);
                    //A null reconnectContext means nothing was read from reconnectionSignal
                    //Since reconnectionSignal only receives data when a reconnection is requested,
                    //reading nothing means no reconnection is pending
                    if (reconnectContext == null) {
                        //check alive time.
                        //keepAliveTime defaults to 5s; once that much time has passed since the last activity, run a health check
                        if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
                            //Call RpcClient.healthCheck() to send a health-check request
                            boolean isHealthy = healthCheck();
                            //If the health check failed, try to re-establish the connection
                            if (!isHealthy) {
                                if (currentConnection == null) {
                                    continue;
                                }
                                LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Server healthy check fail, currentConnection = {}", name, currentConnection.getConnectionId());
                                //If the client has been shut down, end this task
                                RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();
                                if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
                                    break;
                                }
                                //Change the RpcClient status to UNHEALTHY
                                boolean statusFLowSuccess = RpcClient.this.rpcClientStatus.compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);
                                //Assign reconnectContext to prepare for reconnection
                                if (statusFLowSuccess) {
                                    //Note there is no continue here, so execution falls through to the reconnect logic below
                                    reconnectContext = new ReconnectContext(null, false);
                                } else {
                                    continue;
                                }
                            } else {
                                //The health check succeeded: refresh the RpcClient's last active time
                                lastActiveTimeStamp = System.currentTimeMillis();
                                continue;
                            }
                        } else {
                            continue;
                        }
                    }
                    if (reconnectContext.serverInfo != null) {
                        //clear recommend server if server is not in server list.
                        boolean serverExist = false;
                        //Iterate over the server list
                        for (String server : getServerListFactory().getServerList()) {
                            ServerInfo serverInfo = resolveServerInfo(server);
                            if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) {
                                serverExist = true;
                                reconnectContext.serverInfo.serverPort = serverInfo.serverPort;
                                break;
                            }
                        }
                        //reconnectContext.serverInfo is not in the server list: clear it by setting it to null
                        if (!serverExist) {
                            LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Recommend server is not in server list, ignore recommend server {}", name, reconnectContext.serverInfo.getAddress());
                            reconnectContext.serverInfo = null;
                        }
                    }
                    //Reconnect: RpcClient.reconnect() calls GrpcClient.connectToServer() to try to connect to the server
                    reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
                } catch (Throwable throwable) {
                    //Do nothing
                }
            }
        });
    }

    private boolean healthCheck() {
        HealthCheckRequest healthCheckRequest = new HealthCheckRequest();
        if (this.currentConnection == null) {
            return false;
        }
        try {
            //Send an RPC request of type HealthCheckRequest through the currentConnection object
            Response response = this.currentConnection.request(healthCheckRequest, 3000L);
            //not only check server is ok, also check connection is register.
            return response != null && response.isSuccess();
        } catch (NacosException e) {
            //ignore
        }
        return false;
    }
    ...
}

(3) Summary

http://www.xdnf.cn/news/428419.html
