
Dynamically creating a thread pool per Nacos instance and listening for services going online and offline

Versions

  • Spring Boot 3.2.4
  • Spring Cloud 2023.0.1
  • Spring Cloud Alibaba 2023.0.1.2

Dependencies

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring-boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-alibaba-dependencies</artifactId>
            <version>${spring-cloud-alibaba.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

Configuration files

  • bootstrap.yml
spring:
  application:
    name: ${app_name:${APP_NAME:demo}}
  cloud:
    nacos:
      username: ${config_username:${CONFIG_USERNAME:nacos}}
      password: ${config_password:${CONFIG_PASSWORD:nacos}}
      discovery:
        server-addr: ${config_server_uri:${CONFIG_SERVER_URI:10.244.36.167:8848}}
        namespace: ${config_profiles_active:${CONFIG_PROFILES_ACTIVE:dev}}
        group: ${config_group:${CONFIG_GROUP:demo}}
        service: ${spring.application.name}
      config:
        server-addr: ${config_server_uri:${CONFIG_SERVER_URI:10.244.36.167:8848}}
        namespace: ${config_profiles_active:${CONFIG_PROFILES_ACTIVE:dev}}
        group: ${config_group:${CONFIG_GROUP:demo}}
        file-extension: yml
        refresh-enabled: true # enable automatic config refresh; on by default, declared explicitly here
  • application-dev.yml
thread-pool:
  core-size: 2          # core pool size
  max-size: 2           # maximum pool size
  queue-capacity: 100   # task queue capacity
  keep-alive-time: 60   # keep-alive time of idle threads (seconds)
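
To make the effect of these four values concrete, here is a minimal, self-contained sketch in plain Java (no Spring or Nacos) that builds a pool with the same settings. The class name `PoolSettingsSketch` and the method `newPool` are hypothetical and exist only for illustration. Because core-size equals max-size, the pool never grows beyond two threads; once the 100-slot queue is full, `CallerRunsPolicy` runs the task on the submitting thread instead of rejecting it.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolSettingsSketch {

    /** Builds a pool from the four values configured under thread-pool. */
    static ThreadPoolExecutor newPool(int coreSize, int maxSize, int queueCapacity, int keepAliveSeconds) {
        return new ThreadPoolExecutor(
                coreSize,
                maxSize,
                keepAliveSeconds,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueCapacity),
                // When the queue is full, the submitting thread runs the task itself.
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool(2, 2, 100, 60);
        System.out.println("core=" + pool.getCorePoolSize()
                + ", max=" + pool.getMaximumPoolSize()
                + ", free queue slots=" + pool.getQueue().remainingCapacity());
        pool.shutdown();
    }
}
```

With a bounded queue and `CallerRunsPolicy`, a slow downstream node slows the caller down rather than dropping work, which is usually the safer default for a reload-style task.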

Configuration classes

  • NacosThreadPoolProperties
package com.xxx.amedia.media.properties;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * Nacos thread pool properties.
 *
 * @author xxx
 * @date 2025-5-23 13:54:21
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "thread-pool")
public class NacosThreadPoolProperties {

    /** Core pool size. */
    private int coreSize;

    /** Maximum pool size. */
    private int maxSize;

    /** Task queue capacity. */
    private int queueCapacity;

    /** Keep-alive time of idle threads, in seconds. */
    private int keepAliveTime;
}

  • NacosDiscoveryProperties
package com.xxx.amedia.media.properties;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * Nacos discovery properties.
 *
 * @author xxx
 * @date 2025-5-23 13:54:21
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "spring.cloud.nacos.discovery")
public class NacosDiscoveryProperties {

    /** Nacos server address. */
    private String serverAddr;

    /** Namespace. */
    private String namespace;

    /** Group. */
    private String group;
}

  • NacosTokenProperties
package com.xxx.amedia.media.properties;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * Nacos credential properties (username and password).
 *
 * @author xxx
 * @date 2025-5-23 13:54:21
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "spring.cloud.nacos")
public class NacosTokenProperties {

    /** Username. */
    private String username;

    /** Password. */
    private String password;
}

Constants class

package com.xxx.amedia.media.constant;

/**
 * Nacos constants.
 *
 * @author xxx
 * @date 2025-5-26 10:07:23
 */
public final class NacosConstants {

    private NacosConstants() {
    }

    /** Nacos property key: username. */
    public static final String USERNAME = "username";

    /** Nacos property key: password. */
    public static final String PASSWORD = "password";

    /** Nacos property key: server address. */
    public static final String SERVER_ADDR = "serverAddr";

    /** Nacos property key: namespace. */
    public static final String NAMESPACE = "namespace";

    /** Nacos property key: group. */
    public static final String GROUP = "group";
}

Configuring the core Nacos class, NamingService

package com.xxx.amedia.media.config;

import com.xxx.amedia.media.constant.NacosConstants;
import com.xxx.amedia.media.properties.NacosDiscoveryProperties;
import com.xxx.amedia.media.properties.NacosTokenProperties;

import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Properties;

/**
 * Configuration for the core Nacos class, NamingService.
 *
 * @author xxx
 * @date 2025-5-23 15:18:24
 */
@Configuration
public class NamingServiceConfig {

    @Autowired
    private NacosTokenProperties nacosTokenProperties;

    @Autowired
    private NacosDiscoveryProperties nacosDiscoveryProperties;

    @Bean
    public NamingService namingService() throws NacosException {
        Properties properties = new Properties();
        // Nacos server address
        properties.put(NacosConstants.SERVER_ADDR, nacosDiscoveryProperties.getServerAddr());
        // Namespace
        properties.put(NacosConstants.NAMESPACE, nacosDiscoveryProperties.getNamespace());
        // Group
        properties.put(NacosConstants.GROUP, nacosDiscoveryProperties.getGroup());
        // Credentials
        properties.put(NacosConstants.USERNAME, nacosTokenProperties.getUsername());
        properties.put(NacosConstants.PASSWORD, nacosTokenProperties.getPassword());
        return NacosFactory.createNamingService(properties);
    }
}
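
One detail worth guarding against when building these properties: `java.util.Properties` extends `Hashtable`, so `put` throws a `NullPointerException` if a value is null, for example when no username is configured. The following is a minimal sketch of a null-safe variant in plain Java. The class `NamingPropertiesSketch` and the helpers `putIfPresent` and `build` are hypothetical names, and `127.0.0.1:8848` is a placeholder address; the keys mirror the values in NacosConstants.

```java
import java.util.Properties;

public class NamingPropertiesSketch {

    /** Adds the entry only when the value is non-null and not blank. */
    static void putIfPresent(Properties properties, String key, String value) {
        if (value != null && !value.isBlank()) {
            properties.put(key, value);
        }
    }

    /** Builds the client properties, skipping any setting that was left unset. */
    static Properties build(String serverAddr, String namespace, String username, String password) {
        Properties properties = new Properties();
        putIfPresent(properties, "serverAddr", serverAddr);
        putIfPresent(properties, "namespace", namespace);
        putIfPresent(properties, "username", username);
        putIfPresent(properties, "password", password);
        return properties;
    }

    public static void main(String[] args) {
        // Credentials are left unset here, so only two entries are stored.
        Properties properties = build("127.0.0.1:8848", "dev", null, null);
        System.out.println(properties.stringPropertyNames());
    }
}
```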

Thread pool manager for Nacos instances

package com.xxx.amedia.media.manager;

import com.xxx.amedia.media.properties.NacosDiscoveryProperties;
import com.xxx.amedia.media.properties.NacosThreadPoolProperties;
import com.xxx.amedia.rpc.config.MediaRpcProperties;

import cn.hutool.core.thread.ThreadFactoryBuilder;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * Manages one thread pool per Nacos instance.
 *
 * @author xxx
 * @date 2025-5-23 10:30:21
 */
@Slf4j
@Component
public class NacosThreadPoolManager {

    @Autowired
    private MediaRpcProperties mediaRpcProperties;

    @Autowired
    private NacosDiscoveryProperties nacosDiscoveryProperties;

    @Autowired
    private NacosThreadPoolProperties nacosThreadPoolConfig;

    /** Used to subscribe to service online and offline events. */
    @Autowired
    private NamingService namingService;

    /** Maps each instance IP to its thread pool. */
    private final Map<String, ThreadPoolExecutor> ipThreadPoolMap = new ConcurrentHashMap<>();

    @PostConstruct
    public void init() {
        // Register the listener
        registerServiceListener(mediaRpcProperties.getServiceName());
    }

    /**
     * Registers a listener for instance changes of the given service.
     *
     * @param serviceName service name
     */
    private void registerServiceListener(String serviceName) {
        try {
            namingService.subscribe(serviceName, nacosDiscoveryProperties.getGroup(), event -> {
                if (event instanceof NamingEvent namingEvent) {
                    String serviceNameFromEvent = namingEvent.getServiceName();
                    if (StringUtils.equals(serviceNameFromEvent, serviceName)) {
                        handleInstanceChange(namingEvent);
                    }
                }
            });
        } catch (NacosException ex) {
            // Subscription failed; log it
            log.error("Register service listener occur exception.", ex);
        }
    }

    /**
     * Handles an instance change event.
     *
     * @param event NamingEvent
     */
    private void handleInstanceChange(NamingEvent event) {
        // Collect the IPs of all current instances
        Set<String> currentIps = event.getInstances().stream()
                .map(Instance::getIp)
                .collect(Collectors.toSet());
        handleInstanceChange(currentIps);
    }

    /**
     * Reconciles the thread pools with the current set of instance IPs.
     *
     * @param currentIps IPs of all current instances
     */
    private void handleInstanceChange(Set<String> currentIps) {
        // IPs known before this event
        Set<String> previousIps = new HashSet<>(ipThreadPoolMap.keySet());
        // Newly added IPs
        Set<String> newIps = new HashSet<>(currentIps);
        newIps.removeAll(previousIps);
        // Removed IPs
        Set<String> removedIps = new HashSet<>(previousIps);
        removedIps.removeAll(currentIps);
        // Create a thread pool for each new IP
        for (String ip : newIps) {
            createThreadPoolForIp(ip);
        }
        // Destroy the thread pool of each removed IP
        for (String ip : removedIps) {
            destroyThreadPoolForIp(ip);
        }
    }

    /**
     * Creates the thread pool for the given IP.
     *
     * @param ip instance IP
     * @return the newly created thread pool
     */
    public ThreadPoolExecutor createThreadPoolForIp(String ip) {
        // Hutool's ThreadFactoryBuilder appends an incrementing counter to the prefix,
        // so no "%d" placeholder is needed in the name.
        ThreadFactory customThreadFactory = new ThreadFactoryBuilder()
                .setNamePrefix("nacos-pool-" + ip + "-thread-")
                .setDaemon(false)
                .setPriority(Thread.NORM_PRIORITY)
                .build();
        // When the queue is full, the task runs on the caller's thread
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                nacosThreadPoolConfig.getCoreSize(),
                nacosThreadPoolConfig.getMaxSize(),
                nacosThreadPoolConfig.getKeepAliveTime(),
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(nacosThreadPoolConfig.getQueueCapacity()),
                customThreadFactory,
                new ThreadPoolExecutor.CallerRunsPolicy());
        log.info("Created thread pool for IP: {}", ip);
        ipThreadPoolMap.put(ip, executor);
        return executor;
    }

    /**
     * Destroys the thread pool of the given IP.
     *
     * @param ip instance IP
     */
    private void destroyThreadPoolForIp(String ip) {
        ThreadPoolExecutor executor = ipThreadPoolMap.remove(ip);
        if (executor == null) {
            return;
        }
        // Stop accepting new tasks
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                // Force shutdown
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        log.info("Destroyed thread pool for IP: {}", ip);
    }

    /**
     * Returns the thread pool of the given IP.
     *
     * @param ip instance IP
     * @return the matching thread pool; one is created if none exists yet
     */
    public ThreadPoolExecutor getThreadPoolForIp(String ip) {
        ThreadPoolExecutor executor = ipThreadPoolMap.get(ip);
        // No pool yet: create one on demand
        if (executor == null) {
            executor = createThreadPoolForIp(ip);
        }
        return executor;
    }

    /**
     * Destroys all thread pools when the application shuts down.
     */
    @PreDestroy
    public void shutdownAllThreadPools() {
        for (Map.Entry<String, ThreadPoolExecutor> entry : ipThreadPoolMap.entrySet()) {
            ThreadPoolExecutor executor = entry.getValue();
            executor.shutdown();
            try {
                if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                    executor.shutdownNow();
                }
            } catch (InterruptedException e) {
                executor.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
        ipThreadPoolMap.clear();
        log.info("succeed shutdown all thread pools.");
    }

    /**
     * Logs the state of every thread pool once a minute.
     * Requires @EnableScheduling on a configuration class.
     */
    @Scheduled(fixedRate = 60000)
    public void monitorThreadPool() {
        for (Map.Entry<String, ThreadPoolExecutor> entry : ipThreadPoolMap.entrySet()) {
            String ip = entry.getKey();
            ThreadPoolExecutor executor = entry.getValue();
            log.info("Monitor thread pool. ip: {}, Active Threads: {}, Queue Size: {}",
                    ip,
                    executor.getActiveCount(),
                    executor.getQueue().size());
        }
    }
}
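
The manager above looks up a pool with `get` and creates it in a separate step, so two threads asking for the same unknown IP at the same moment could each create a pool and one of them would be orphaned. A minimal sketch of one way to close that gap, assuming plain Java without Spring or Nacos, is to create pools through `ConcurrentHashMap.computeIfAbsent`. The class `PoolRegistrySketch` and its methods `poolFor`, `sync` and `knownIps` are hypothetical names, and `Executors.newFixedThreadPool(2)` stands in for the configured `ThreadPoolExecutor`.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolRegistrySketch {

    private final Map<String, ExecutorService> pools = new ConcurrentHashMap<>();

    /** Returns the pool for an IP, creating it atomically on first use. */
    public ExecutorService poolFor(String ip) {
        return pools.computeIfAbsent(ip, key -> Executors.newFixedThreadPool(2));
    }

    /** Reconciles the registry with the IPs that are currently online. */
    public void sync(Set<String> currentIps) {
        // Create pools for IPs that have just come online.
        for (String ip : currentIps) {
            poolFor(ip);
        }
        // Shut down pools whose IP is no longer in the instance list.
        Set<String> removed = new HashSet<>(pools.keySet());
        removed.removeAll(currentIps);
        for (String ip : removed) {
            ExecutorService pool = pools.remove(ip);
            if (pool != null) {
                pool.shutdown();
            }
        }
    }

    /** Snapshot of the IPs that currently have a pool. */
    public Set<String> knownIps() {
        return Set.copyOf(pools.keySet());
    }

    public static void main(String[] args) {
        PoolRegistrySketch registry = new PoolRegistrySketch();
        registry.sync(Set.of("10.0.0.1", "10.0.0.2"));
        registry.sync(Set.of("10.0.0.2", "10.0.0.3"));
        // 10.0.0.1 went offline, 10.0.0.3 came online (set order is not guaranteed).
        System.out.println(registry.knownIps());
        registry.sync(Set.of()); // shut everything down
    }
}
```

`computeIfAbsent` runs the factory at most once per key, so concurrent callers for the same IP all receive the same pool.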

Manager class that executes the thread tasks

package com.xxx.amedia.media.manager;

import com.xxx.amedia.media.bo.FailureTaskBO;
import com.xxx.amedia.rpc.enums.RpcTypeEnum;
import com.xxx.amedia.rpc.model.RpcReloadResponse;
import com.xxx.amedia.rpc.proxy.RpcReloadProxy;
import com.xxx.amedia.system.api.NacosService;

import cn.hutool.core.lang.Pair;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Runs RPC tasks on the per-instance thread pools.
 *
 * @author xxx
 * @date 2025-5-23 11:01:07
 */
@Slf4j
@Service
public class RpcReloadManager {

    @Autowired
    private NacosService nacosService;

    @Autowired
    private RpcReloadProxy rpcReloadProxy;

    @Autowired
    private NacosThreadPoolManager threadPoolManager;

    /**
     * Runs the task on all nodes, without parameters.
     *
     * @param rpcTypeEnum RPC type
     * @return list of failed nodes
     */
    public List<FailureTaskBO> execute(RpcTypeEnum rpcTypeEnum) {
        return execute(rpcTypeEnum, Collections.emptyList());
    }

    /**
     * Runs the task on all nodes, with parameters.
     *
     * @param rpcTypeEnum RPC type
     * @param params      parameters of the RPC call
     * @return list of failed nodes
     */
    public List<FailureTaskBO> execute(RpcTypeEnum rpcTypeEnum, Object params) {
        List<String> ips = nacosService.getInstanceIps();
        List<FailureTaskBO> failureList = new ArrayList<>();
        if (CollectionUtils.isEmpty(ips)) {
            return failureList;
        }
        CountDownLatch countDownLatch = new CountDownLatch(ips.size());
        ips.forEach(execNodeIp -> {
            try {
                execute(execNodeIp, rpcTypeEnum, params, failureList);
            } catch (Exception ex) {
                FailureTaskBO failureTaskBO = FailureTaskBO.builder()
                        .ip(execNodeIp).code(500).msg("rpc task execution failed").build();
                failureList.add(failureTaskBO);
                log.error("execute task occur exception: ip is {}, rpcTypeEnum is {}, msg is {}.",
                        execNodeIp, rpcTypeEnum, ex.getMessage(), ex);
            } finally {
                countDownLatch.countDown();
            }
        });
        try {
            // The calling thread waits for all tasks to finish
            countDownLatch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt status
            interrupt();
        }
        return failureList;
    }

    /**
     * Runs the task on the given nodes, without parameters.
     *
     * @param rpcTypeEnum RPC type
     * @param ips         target node IPs
     * @return list of failed nodes
     */
    public List<FailureTaskBO> executeForIps(RpcTypeEnum rpcTypeEnum, List<String> ips) {
        return executeForIps(rpcTypeEnum, ips, Collections.emptyList());
    }

    /**
     * Runs the task on the given nodes, with parameters.
     *
     * @param rpcTypeEnum RPC type
     * @param ips         target node IPs
     * @param params      parameters of the RPC call
     * @return list of failed nodes
     */
    public List<FailureTaskBO> executeForIps(RpcTypeEnum rpcTypeEnum, List<String> ips, Object params) {
        List<FailureTaskBO> failureList = new ArrayList<>();
        if (CollectionUtils.isEmpty(ips)) {
            return failureList;
        }
        CountDownLatch countDownLatch = new CountDownLatch(ips.size());
        ips.forEach(execNodeIp -> {
            try {
                execute(execNodeIp, rpcTypeEnum, params, failureList);
            } catch (Exception ex) {
                FailureTaskBO failureTaskBO = FailureTaskBO.builder()
                        .ip(execNodeIp).code(500).msg("rpc reload error").build();
                failureList.add(failureTaskBO);
                log.error("execute task occur exception: ip is {}, rpcTypeEnum is {}, msg is {}.",
                        execNodeIp, rpcTypeEnum, ex.getMessage(), ex);
            } finally {
                countDownLatch.countDown();
            }
        });
        try {
            // The calling thread waits for all tasks to finish
            countDownLatch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt status
            interrupt();
        }
        return failureList;
    }

    /**
     * Restores the interrupt status of the current thread.
     */
    private void interrupt() {
        Thread.currentThread().interrupt();
        log.info("Thread {} is interrupted.", Thread.currentThread().getName());
    }

    /**
     * Submits the RPC task to the node's thread pool and waits for its result.
     * Because futureTask.get() blocks, the callers above process nodes one at a time.
     *
     * @param execNodeIp  IP of the node to call
     * @param rpcTypeEnum RPC type
     * @param params      parameters of the RPC call
     * @param failureList collects information about failed nodes
     * @throws ExecutionException   if the task throws
     * @throws InterruptedException if the wait is interrupted
     */
    private void execute(String execNodeIp, RpcTypeEnum rpcTypeEnum, Object params, List<FailureTaskBO> failureList)
            throws ExecutionException, InterruptedException {
        ThreadPoolExecutor executor = threadPoolManager.getThreadPoolForIp(execNodeIp);
        FutureTask<Pair<Boolean, RpcReloadResponse>> futureTask =
                new FutureTask<>(() -> rpcReloadProxy.reload(execNodeIp, rpcTypeEnum, params));
        executor.submit(futureTask);
        Pair<Boolean, RpcReloadResponse> pair = futureTask.get();
        if (!pair.getKey()) {
            FailureTaskBO failureTaskBO = buildFailureTaskBO(pair, execNodeIp);
            failureList.add(failureTaskBO);
        }
    }

    /**
     * Builds the failure information.
     *
     * @param pair       RPC response
     * @param execNodeIp IP of the node that was called
     * @return FailureTaskBO
     */
    private FailureTaskBO buildFailureTaskBO(Pair<Boolean, RpcReloadResponse> pair, String execNodeIp) {
        RpcReloadResponse res = pair.getValue();
        FailureTaskBO failureTaskBO;
        if (res == null) {
            failureTaskBO = FailureTaskBO.builder().ip(execNodeIp).code(500).msg("rpc reload error").build();
        } else {
            failureTaskBO = FailureTaskBO.builder().ip(execNodeIp).code(res.getCode()).msg(res.getMsg()).build();
        }
        return failureTaskBO;
    }
}
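
In the class above, each node's call is submitted and then awaited before the next node is touched, so the nodes are effectively processed in sequence. If overlapping calls are wanted, one possible approach is to submit every task first and collect the results afterwards. The following is a minimal sketch under stated assumptions: plain Java, a single shared executor instead of one pool per IP, and a `Predicate<String>` as a hypothetical stand-in for the RPC call. The class `FanOutSketch` and the method `callAll` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Predicate;

public class FanOutSketch {

    /**
     * Submits one call per IP first, then collects the results, so the calls overlap.
     * Returns the IPs whose call returned false, threw, or could not be awaited.
     */
    static List<String> callAll(List<String> ips, ExecutorService executor, Predicate<String> call) {
        Map<String, Future<Boolean>> pending = new LinkedHashMap<>();
        for (String ip : ips) {
            Callable<Boolean> task = () -> call.test(ip);
            pending.put(ip, executor.submit(task));
        }
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, Future<Boolean>> entry : pending.entrySet()) {
            try {
                if (!entry.getValue().get()) {
                    failed.add(entry.getKey());
                }
            } catch (ExecutionException ex) {
                // The call itself threw an exception.
                failed.add(entry.getKey());
            } catch (InterruptedException ex) {
                // Restore the interrupt status and report the node as failed.
                Thread.currentThread().interrupt();
                failed.add(entry.getKey());
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            // Hypothetical stand-in for the RPC: succeeds for every IP except 10.0.0.2.
            List<String> failed = callAll(List.of("10.0.0.1", "10.0.0.2"), executor,
                    ip -> !ip.equals("10.0.0.2"));
            System.out.println(failed); // prints [10.0.0.2]
        } finally {
            executor.shutdown();
        }
    }
}
```

Collecting failures on the calling thread keeps the result list single-threaded, so a plain `ArrayList` remains safe even though the calls run concurrently.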

Service class for Nacos service instances

package com.xxx.amedia.system.api;

import com.xxx.amedia.system.converter.ServiceInstanceConverter;
import com.xxx.amedia.system.vo.ServiceInstanceVO;
import com.xxx.amedia.tool.common.constants.CommonConstants;

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/**
 * Service class for Nacos service instances.
 *
 * @author xxx
 * @date 2025-5-17 11:38:22
 */
@Service
public class NacosService {

    @Value("${media.rpc.service-name}")
    private String serviceName;

    @Autowired
    private DiscoveryClient discoveryClient;

    @Autowired
    private ServiceInstanceConverter serviceInstanceConverter;

    public List<ServiceInstanceVO> list() {
        List<ServiceInstance> serviceInstances = getInstances();
        return serviceInstanceConverter.toServiceInstanceVOList(serviceInstances);
    }

    /**
     * Gets the instance IPs of the configured service from Nacos.
     *
     * @return list of instance IPs
     */
    public List<String> getInstanceIps() {
        List<ServiceInstance> instances = getInstances();
        if (CollectionUtils.isEmpty(instances)) {
            return Collections.emptyList();
        }
        List<String> instanceIps = new ArrayList<>(instances.size());
        instances.forEach(instance -> instanceIps.add(instance.getHost()));
        return instanceIps;
    }

    /**
     * Checks whether a node with the given IP exists in the icc-amedia-admin service.
     *
     * @param instanceIps IPs of all instances of the icc-amedia-rpc service
     * @param targetIp    target IP (for example 10.199.1.1)
     * @return true if it exists, false otherwise
     */
    public boolean checkNodeExists(List<String> instanceIps, String targetIp) {
        // Walk the instances and compare IPs
        for (String instanceIp : instanceIps) {
            if (StringUtils.equals(targetIp, instanceIp)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Gets the instances of the configured service from Nacos.
     *
     * @return list of instances
     */
    private List<ServiceInstance> getInstances() {
        List<ServiceInstance> serviceInstances = discoveryClient.getInstances(serviceName);
        // Sort by host in ascending order
        serviceInstances.sort(Comparator.comparing(ServiceInstance::getHost));
        return serviceInstances;
    }
}
