策略调度平台实现总结
策略调度平台实现总结
一.背景
在当前的调度系统中,为了充分利用票台资源,针对票台返回的失败原因,系统设计了多种调度策略(如取消订单、暂停、中断等),以减少无用的重试操作,避免资源浪费。然而,这些策略的配置分散在QConfig中,缺乏统一管理和可视化支持,导致以下问题:
- 配置入口分散:暂停、取消、中断等策略配置未统一管理,修改时需跨多个模块检查。
- 人工校验依赖:新增配置时需手动排查冲突或重复配置,易遗漏。
- 多策略同步修改困难:C、Z、Q三家策略需分别修改,缺乏批量操作或关联同步机制。
- 无可视化工具:依赖开发人员直接修改代码或QConfig文件,易出错。
- 缺乏监控机制:无法及时感知未知的失败原因。
- 策略差异对比低效:需人工逐行对比三家策略配置,效率低下且容易出错。
为了解决上述问题,提升系统的可维护性和可扩展性,我们提出了一套全面的调度策略可视化改造方案。以下是具体实现细节和功能描述。
二.目标
通过本次改造,我们旨在构建一个高效、可靠、易用的配置管理系统,实现以下目标:
- 集中化管理:将所有调度策略配置集中在一个平台,避免分散和混乱。
- 可视化操作:提供用户友好的界面,支持非技术人员轻松查看和修改配置。
- 自动化工具支持
- 自动化策略差异对比,快速识别不一致。
- 自动检测配置冲突,避免重复或错误配置。
- 实时监控:及时发现未知失败原因,快速响应问题。
- 批量更新:支持批量操作,简化多策略同步修改流程。
- 高性能与扩展性:支持大规模配置存储和高效查询,满足快速迭代需求。
- 灰度功能支持:通过自定义灰度实现,满足不同业务场景下的灰度需求。
- 告警功能支持:当发现未知失败原因时,根据配置的告警规则,自动发送邮件和短信通知到用户。
三.技术实现
1. 自定义注解扫描Bean并注册到Spring容器中
为了实现调度策略的动态加载和管理,我们设计了一个基于Spring注解扫描的机制,通过自定义注解@ScheduleStrategyScan
扫描指定包路径下的策略实现类,并将其手动注册到Spring容器中。以下是具体实现:
自定义注解@ScheduleStrategyScan
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(ScheduleStrategyRegister.class)
public @interface ScheduleStrategyScan {String[] basePackages() default {}; // 扫描的基础包路径Line line(); // 产线类型
}
注册Bean到Spring容器
通过实现ImportBeanDefinitionRegistrar
接口,我们可以在Spring启动时动态注册Bean。以下是ScheduleStrategyRegister
的核心实现:
public class ScheduleStrategyRegister implements ImportBeanDefinitionRegistrar {private final GrabTicketLogger logger = GrabTicketLoggerFactory.getLogger(ScheduleStrategyRegister.class);@Overridepublic void registerBeanDefinitions(AnnotationMetadata annotationMetadata, BeanDefinitionRegistry registry) {registerBeanByClass(registry,ScheduleStrategyFactory.class);AnnotationAttributes attributes = AnnotationAttributes.fromMap(annotationMetadata.getAnnotationAttributes(ScheduleStrategyScan.class.getName()));String[] basePackages = attributes.getStringArray("basePackages");Line line = attributes.getEnum("line");for (String scanPackage : basePackages) {ClassPathScanningCandidateComponentProvider packageScanner = new ClassPathScanningCandidateComponentProvider(false);packageScanner.addIncludeFilter(new AssignableTypeFilter(IScheduleStrategy.class));Set<BeanDefinition> definitions = packageScanner.findCandidateComponents(scanPackage);registerScheduleStrategyBeans(registry, line, basePackages, definitions);}}/*** @param registry* @param line* @param scanPackages* @param scheduleGroupCandidates 注册调度策略bean* @author ljm* @date 15:51 2025/3/5**/private void registerScheduleStrategyBeans(BeanDefinitionRegistry registry, Line line, String[] scanPackages, Set<BeanDefinition> scheduleGroupCandidates) {// 获取所有可能的Bean定义Map<String, BeanDefinition> possibleBeanMap = Arrays.stream(registry.getBeanDefinitionNames()).collect(Collectors.toMap(Function.identity(), registry::getBeanDefinition));// 移除不满足条件的BeanDefinitionpossibleBeanMap.entrySet().removeIf(entry -> {String className = entry.getValue().getBeanClassName();return StringUtils.isBlank(className) || Arrays.stream(scanPackages).noneMatch(p -> StringUtils.startsWith(className, p));});// 注册新的Bean定义并移除重复的for (BeanDefinition candidate : scheduleGroupCandidates) {try {Class<?> strategyClazz = Class.forName(candidate.getBeanClassName());// 移除重复的定义(通过@Component注解的)if (AnnotationUtils.findAnnotation(strategyClazz, Component.class) != null) {possibleBeanMap.entrySet().stream().filter(entry -> Objects.equals(entry.getValue().getBeanClassName(), candidate.getBeanClassName())).forEach(entry -> registry.removeBeanDefinition(entry.getKey()));}// 根据规则生成Bean的名称String beanName = BeanHelper.mapBeanName(line, strategyClazz.getSimpleName());// 注册Bean定义registry.registerBeanDefinition(beanName, candidate);} catch (Exception e) {// 处理类未找到的异常logger.error("registerScheduleStrategyBeans", e);}}}private void registerBeanByClass(BeanDefinitionRegistry registry, Class clazz) {if (registry.containsBeanDefinition(clazz.getName())) {return;}RootBeanDefinition beanDefinition = new RootBeanDefinition(clazz);registry.registerBeanDefinition(clazz.getName(), beanDefinition);}
}
2. 定时刷新策略配置至本地内存
为了支持动态更新调度策略,我们设计了一个ScheduleRulesProvider
类,通过定时任务定期从远程同步规则,并将规则刷新到本地缓存中。以下是核心实现:
public class ScheduleRulesProvider {private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat("ScheduleRulesRefresher-%d").build(), new ThreadPoolExecutor.DiscardPolicy());@Getterprivate final Line line;private Map<String, List<ScheduleStrategyRuleDto>> ruleMap = new ConcurrentHashMap<>();public ScheduleRulesProvider(Line line) {this.line = line;//定时刷新SCHEDULED_EXECUTOR_SERVICE.scheduleWithFixedDelay(this::refresh, 30, 10, TimeUnit.SECONDS);refresh();}public List<ScheduleStrategyRuleDto> getScheduleRules(String channel, String bizType) {return Collections.unmodifiableList(ruleMap.getOrDefault(composeKey(channel, bizType), Collections.emptyList()));}private void refresh() {List<ScheduleStrategyRuleDto> rules = AdapterSyncRemoteScheduleRulesManager.syncRemoteScheduleRules(line);if (CollectionUtils.isNotEmpty(rules)) {Map<String, List<ScheduleStrategyRuleDto>> result = rules.stream().collect(Collectors.groupingBy(x ->composeKey(x.getChannel(), x.getBizType())));result.forEach((k, l) -> l.sort(Comparator.comparingInt(ScheduleStrategyRuleDto::getSequence).reversed()));this.ruleMap = result;}}private String composeKey(String channel, String bizType) {String defaultChannel = StringUtils.defaultIfBlank(channel, "ALL");return String.format("%s_%s", defaultChannel, bizType);}
3. 通过原因按优先级高低匹配到执行策略类名
在众多策略中,失败原因的优先级决定了执行策略的选择。我们通过权重将策略排序,从而选择最佳的策略。
private void refresh() {List<ScheduleStrategyRuleDto> rules = AdapterSyncRemoteScheduleRulesManager.syncRemoteScheduleRules(line);if (CollectionUtils.isNotEmpty(rules)) {Map<String, List<ScheduleStrategyRuleDto>> result = rules.stream().collect(Collectors.groupingBy(x ->composeKey(x.getChannel(), x.getBizType())));// 根据权重降序排序result.forEach((k, l) -> l.sort(Comparator.comparingInt(ScheduleStrategyRuleDto::getSequence).reversed()));this.ruleMap = result;}}
4. 通过执行策略类名找到Bean
通过从远程获取的执行策略类名调用**getStrategy(Line line, String name)**查找到具体实现类的Bean
public class ScheduleStrategyFactory implements ApplicationContextAware {private final GrabTicketLogger logger = GrabTicketLoggerFactory.getLogger(ScheduleStrategyFactory.class);private static final Map<String, IScheduleStrategy> STRATEGY_CACHE = new ConcurrentHashMap<>();private ApplicationContext applicationContext;@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {try {this.applicationContext = applicationContext;cacheScheduleStrategy(applicationContext);} catch (Exception e) {logger.error("HandlerFactory", e);}}private void cacheScheduleStrategy(ApplicationContext applicationContext) {Map<String, IScheduleStrategy> result = applicationContext.getBeansOfType(IScheduleStrategy.class);if (MapUtils.isEmpty(result)) {return;}STRATEGY_CACHE.putAll(result);}/*** @param name* @return com.ctrip.train.grabticket.common.schedule.IScheduleStrategy* @author ljm* @date 10:02 2025/3/4**/public static IScheduleStrategy getStrategy(Line line, String name) {String beanName = BeanHelper.mapBeanName(line, name);return STRATEGY_CACHE.get(beanName);}}
5. 未知原因结合NLP技术和正则匹配智能提取关键信息并保存
对于未知失败原因,我们结合NLP技术和正则匹配提取关键信息,并保存到数据库中
public class ReasonUtil {// HNLP 关键字处理,强制使用自定义字典 目录\src\resources\data\dictionary\custompublic static final Segment SEGMENT = HanLP.newSegment().enableCustomDictionaryForcing(true).enableMultithreading(true);static {// 预热字典extractKeyInformation("测试");}/*** @param text* @return java.lang.String* 提取关键信息* @author ljm* @date 17:17 2025/4/8**/public static String extractKeyInformation(String text) {if (StringUtils.isBlank(text)) {return Strings.EMPTY;}// 去除空格text = text.replace(" ", "");text = RegularUtil.replaceMatches(text, RegularUtil.ORDER_ID_PATTERN);text = RegularUtil.replaceMatches(text, RegularUtil.MASKED_MOBILE_PATTERN);text = RegularUtil.replaceMatches(text, RegularUtil.ID_CARD_PATTERN);text = RegularUtil.replaceMatches(text, RegularUtil.MASKED_NAME_PATTERN);text = RegularUtil.replaceMatches(text, RegularUtil.PASSPORT_PATTERN);text = RegularUtil.replaceMatches(text, RegularUtil.MASKED_EMAIL_PATTERN);text = RegularUtil.replaceMatches(text, RegularUtil.TRAIN_NUMBER_PATTERN);text = RegularUtil.replaceMatches(text, RegularUtil.DATE_PATTERN);text = RegularUtil.replaceMatches(text, RegularUtil.ENGLISH_NAME_PATTERN);text = RegularUtil.replaceMatches(text, RegularUtil.USERNAME_PATTERN);text = RegularUtil.replaceMatches(text, RegularUtil.PASSENGER_NAME_PATTERN);text = RegularUtil.replaceMatches(text, RegularUtil.PHONE_PATTERN);List<Term> termList = SEGMENT.seg(text);StringBuilder result = new StringBuilder();for (Term term : termList) {String nature = term.nature.toString();//过滤人名,站名,地名if ("nr".equals(nature) || "ns".equals(nature)) {result.append("***");continue;}result.append(term.word);}return result.toString();}
}
6. 灰度发布策略
为了支持灵活的灰度发布策略功能,我们设计了一个通用的灰度匹配接口IGrayMatchProvider
,使用方通过实现该接口并注册到调度平台组件中,系统在使用灰度功能时会默认调用使用方的实现。
灰度匹配接口
public interface IGrayMatchProvider<T extends GrayMatchModel, U extends GrayConfigModel> {/*** 判断是否满足灰度* @param grayMatchModel 灰度匹配模型* @param config 灰度配置* @return 是否满足灰度*/boolean isMatch(T grayMatchModel, String config);
}
使用方实现灰度匹配逻辑
@Component
public class CustomGrayMatchProvider implements IGrayMatchProvider<MyGrayMatchModel, MyGrayConfigModel> {@Overridepublic boolean isMatch(MyGrayMatchModel grayMatchModel, String config) {MyGrayConfigModel grayConfig = parseConfig(config);return grayMatchModel.getUserId() % grayConfig.getDivisor() == 0;}private MyGrayConfigModel parseConfig(String config) {return new MyGrayConfigModel(config);}
}
7. 策略执行参数解析
在调度策略中,不同的策略可能需要不同的执行参数。为了支持动态解析这些参数,我们在策略执行类的接口中定义了一个parseConfig
方法。所有具体的策略执行类都实现该接口,并通过泛型实现 JSON 配置的反序列化。
public interface IScheduleStrategy<Config extends BaseStrategyConfig, Context extends BaseScheduleContext,R> {/*** 初始化策略配置*/Config parseConfig(String strategyConfig);/*** 执行处理逻辑*/HandleResult<R> handle(Context context);
}
8. 告警功能
为了能及时感知到未知原因没有匹配到执行策略的情况,我们设计了一个告警服务模块,当系统发现未知失败原因时,会根据配置的告警规则,自动发送邮件和短信通知到用户。以下是具体实现细节:
告警规则匹配逻辑
private void unMatchedReasonAlert(ProcessUnKnownReasonContext processUnKnownReasonContext, String keyInformation) {SchedulePlatformAlertSettingDaoExt schedulePlatformAlertSetting = SchedulePlatformDaoFactory.getInstance().schedulePlatformAlertSetting();SchedulePlatformAlertSetting platformAlertSetting = schedulePlatformAlertSetting.queryAlertSetting(processUnKnownReasonContext.getLine().getLineName(),List.of(AlertSettingStatus.OPEN.getStatus()));if (platformAlertSetting == null) {logger.warn("NotFindPlatformAlertSetting", String.format("Line:%s,未找到告警配置", processUnKnownReasonContext.getLine().getLineName()));MetricHelper.metricUnFindAlertSetting(processUnKnownReasonContext.getLine());return;}// 发送告警IRedisClient schedulePlatformInstance = RedisClientManager.getSchedulePlatformInstance();String bizType = StringUtils.defaultIfBlank(processUnKnownReasonContext.getBizType(), "ALL");String channel = StringUtils.defaultIfBlank(processUnKnownReasonContext.getChannel(), "ALL");String key = String.format("%s_%s_%s_%s", processUnKnownReasonContext.getLine().getLineName(), bizType, channel, keyInformation);String dailyLimitKey = String.format("dailyLimit_%s_%s_%s_%s", processUnKnownReasonContext.getLine().getLineName(), bizType, channel, keyInformation);Long sendTimesForToday = schedulePlatformInstance.get(dailyLimitKey, Long.class);if (sendTimesForToday != null && sendTimesForToday > platformAlertSetting.getDailyLimit()) {logger.info("overDailyLimit", String.format("Line:%s,超出当日发送最大次数%s", processUnKnownReasonContext.getLine().getLineName(), platformAlertSetting.getDailyLimit()));return;}boolean markedSuccess = schedulePlatformInstance.setIfNullWithAtom(key, keyInformation, Long.valueOf(platformAlertSetting.getIntervalSeconds()));if (!markedSuccess) {return;}String messageBody = String.format("策略调度平台未匹配到有效执行策略\n" +"产线:%s\n" +"业务:%s\n" +"渠道:%s\n" +"订单号:%s\n" +"原始原因:%s\n" +"提取后关键信息:%s",processUnKnownReasonContext.getLine().getLineName(), bizType, channel, processUnKnownReasonContext.getOrderNumber(),processUnKnownReasonContext.getOriginReason(), keyInformation);// 对当天发送次数进行计数schedulePlatformInstance.incrWithExpire(dailyLimitKey,DateUtils.ldtToUnixTime(LocalDateTime.now().with(LocalTime.MAX)));SmsUtils.send(platformAlertSetting.getNotifyMobiles(), messageBody);if (StringUtils.isBlank(platformAlertSetting.getNotifyEmails())) {return;}messageBody = String.format("策略调度平台未匹配到有效执行策略<br>" +"产线:%s<br>" +"业务:%s<br>" +"渠道:%s<br>" +"订单号:%s<br>" +"原始原因:%s<br>" +"提取后关键信息:%s",processUnKnownReasonContext.getLine().getLineName(), bizType, channel, processUnKnownReasonContext.getOrderNumber(),processUnKnownReasonContext.getOriginReason(), keyInformation);List<String> emailList = Splitter.on(";").splitToStream(platformAlertSetting.getNotifyEmails()).toList();EmailHelper.send(emailList, "策略调度平台", messageBody);}
四. 功能整合与性能优化
1. 功能点整合与协同
在调度策略可视化改造方案中,各个功能点通过紧密的协同工作,共同支撑系统的高效运行。以下是各功能点的整合与协同机制:
- 配置管理与动态加载的协同:
- 通过数据库表存储调度策略配置,结合自定义注解扫描和动态注册机制,实现了配置的集中化管理和动态加载。配置的修改无需重启服务,系统通过定时任务定期刷新规则,确保最新配置能够实时生效。
- 在配置加载过程中,系统会自动检测冲突或重复配置,避免人工校验的遗漏。通过自动化工具支持,用户可以快速对比策略差异,确保多策略同步修改的一致性。
- 失败原因匹配与灰度发布的协同:
- 通过借鉴Mysql
LIKE
查询逻辑实现失败原因的模糊匹配,结合优先级排序机制,确保系统能够快速找到最佳的调度策略。 - 在匹配到调度策略后,系统会根据灰度匹配接口的实现,判断是否满足灰度条件。灰度功能的引入使得新策略可以在小范围内逐步推广,降低风险。
- 通过借鉴Mysql
- 未知原因处理与告警机制的协同:
- 对于未匹配到的失败原因,系统会结合 NLP 技术和正则匹配提取关键信息,并将其保存到数据库中,为后续分析提供数据支持。
- 当发现未知失败原因时,系统会根据配置的告警规则,自动发送邮件和短信通知到相关人员,确保问题能够及时被感知和处理。
2. 系统性能与稳定性优化
为了确保调度策略可视化平台的高效运行和稳定性,我们在系统设计中引入了多项性能优化和稳定性保障措施:
- 高性能规则匹配:
- 通过将规则加载到内存中,避免了频繁的数据库查询操作,显著提升了匹配性能。
- 在规则加载时,系统会根据优先级对规则进行排序,确保高优先级规则优先匹配,减少不必要的匹配计算。
- 稳定性保障:
- 在定时刷新规则的任务中,系统引入了异常捕获机制,确保任务失败时不会影响系统的正常运行。
- 在告警功能中,系统通过 Redis 实现了限流和降级机制,避免因告警消息过多导致系统资源耗尽。
- 扩展性设计:
- 系统采用模块化设计,各个功能点(如配置管理、失败原因匹配、灰度发布、告警机制等)相互独立,便于后续扩展和维护。
- 通过通用的灰度匹配接口,系统支持不同业务场景下的灰度需求,满足多样化的业务需求。
五. 总结
通过本次调度策略可视化改造,我们成功解决了现有配置管理中的痛点问题,显著提升了系统的可维护性和可扩展性。具体成果包括:
- 集中化管理:所有配置集中存储在数据库中,避免分散和混乱。
- 可视化操作:用户友好的界面支持非技术人员轻松操作。
- 自动化工具支持:自动化差异对比和冲突检测功能,大幅提升操作效率。
- 实时监控:及时发现未知失败原因,保障系统稳定性。
- 批量更新:简化多策略同步修改流程,提高工作效率。
- 灰度功能支持:通过
IGrayMatchProvider
接口,支持灵活的灰度实现,满足不同业务场景下的灰度需求。 - 告警功能支持:通过配置的告警规则,自动发送邮件和短信通知到用户,确保问题能够及时被感知和处理。
未来,我们将继续优化平台功能,探索更多智能化工具,为调度系统的稳定运行和高效管理提供更强有力的支持