当前位置: 首页 > news >正文

策略调度平台实现总结

策略调度平台实现总结

一.背景

在当前的调度系统中,为了充分利用票台资源,针对票台返回的失败原因,系统设计了多种调度策略(如取消订单、暂停、中断等),以减少无用的重试操作,避免资源浪费。然而,这些策略的配置分散在QConfig中,缺乏统一管理和可视化支持,导致以下问题:

  • 配置入口分散:暂停、取消、中断等策略配置未统一管理,修改时需跨多个模块检查。
  • 人工校验依赖:新增配置时需手动排查冲突或重复配置,易遗漏。
  • 多策略同步修改困难:C、Z、Q三家策略需分别修改,缺乏批量操作或关联同步机制。
  • 无可视化工具:依赖开发人员直接修改代码或QConfig文件,易出错。
  • 缺乏监控机制:无法及时感知未知的失败原因。
  • 策略差异对比低效:需人工逐行对比三家策略配置,效率低下且容易出错。

为了解决上述问题,提升系统的可维护性和可扩展性,我们提出了一套全面的调度策略可视化改造方案。以下是具体实现细节和功能描述。


二.目标

通过本次改造,我们旨在构建一个高效、可靠、易用的配置管理系统,实现以下目标:

  1. 集中化管理:将所有调度策略配置集中在一个平台,避免分散和混乱。
  2. 可视化操作:提供用户友好的界面,支持非技术人员轻松查看和修改配置。
  3. 自动化工具支持
    • 自动化策略差异对比,快速识别不一致。
    • 自动检测配置冲突,避免重复或错误配置。
  4. 实时监控:及时发现未知失败原因,快速响应问题。
  5. 批量更新:支持批量操作,简化多策略同步修改流程。
  6. 高性能与扩展性:支持大规模配置存储和高效查询,满足快速迭代需求。
  7. 灰度功能支持:通过自定义灰度实现,满足不同业务场景下的灰度需求。
  8. 告警功能支持:当发现未知失败原因时,根据配置的告警规则,自动发送邮件和短信通知到用户。

三.技术实现

1. 自定义注解扫描Bean并注册到Spring容器中

为了实现调度策略的动态加载和管理,我们设计了一个基于Spring注解扫描的机制,通过自定义注解@ScheduleStrategyScan扫描指定包路径下的策略实现类,并将其手动注册到Spring容器中。以下是具体实现:

自定义注解@ScheduleStrategyScan
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(ScheduleStrategyRegister.class)
public @interface ScheduleStrategyScan {String[] basePackages() default {}; // 扫描的基础包路径Line line(); // 产线类型
}
注册Bean到Spring容器

通过实现ImportBeanDefinitionRegistrar接口,我们可以在Spring启动时动态注册Bean。以下是ScheduleStrategyRegister的核心实现:

public class ScheduleStrategyRegister implements ImportBeanDefinitionRegistrar {private final GrabTicketLogger logger = GrabTicketLoggerFactory.getLogger(ScheduleStrategyRegister.class);@Overridepublic void registerBeanDefinitions(AnnotationMetadata annotationMetadata, BeanDefinitionRegistry registry) {registerBeanByClass(registry,ScheduleStrategyFactory.class);AnnotationAttributes attributes = AnnotationAttributes.fromMap(annotationMetadata.getAnnotationAttributes(ScheduleStrategyScan.class.getName()));String[] basePackages = attributes.getStringArray("basePackages");Line line = attributes.getEnum("line");for (String scanPackage : basePackages) {ClassPathScanningCandidateComponentProvider packageScanner = new ClassPathScanningCandidateComponentProvider(false);packageScanner.addIncludeFilter(new AssignableTypeFilter(IScheduleStrategy.class));Set<BeanDefinition> definitions = packageScanner.findCandidateComponents(scanPackage);registerScheduleStrategyBeans(registry, line, basePackages, definitions);}}/*** @param registry* @param line* @param scanPackages* @param scheduleGroupCandidates 注册调度策略bean* @author ljm* @date 15:51 2025/3/5**/private void registerScheduleStrategyBeans(BeanDefinitionRegistry registry, Line line, String[] scanPackages, Set<BeanDefinition> scheduleGroupCandidates) {// 获取所有可能的Bean定义Map<String, BeanDefinition> possibleBeanMap = Arrays.stream(registry.getBeanDefinitionNames()).collect(Collectors.toMap(Function.identity(), registry::getBeanDefinition));// 移除不满足条件的BeanDefinitionpossibleBeanMap.entrySet().removeIf(entry -> {String className = entry.getValue().getBeanClassName();return StringUtils.isBlank(className) || Arrays.stream(scanPackages).noneMatch(p -> StringUtils.startsWith(className, p));});// 注册新的Bean定义并移除重复的for (BeanDefinition candidate : scheduleGroupCandidates) {try {Class<?> strategyClazz = Class.forName(candidate.getBeanClassName());// 移除重复的定义(通过@Component注解的)if (AnnotationUtils.findAnnotation(strategyClazz, Component.class) != null) {possibleBeanMap.entrySet().stream().filter(entry -> Objects.equals(entry.getValue().getBeanClassName(), candidate.getBeanClassName())).forEach(entry -> registry.removeBeanDefinition(entry.getKey()));}// 根据规则生成Bean的名称String beanName = BeanHelper.mapBeanName(line, strategyClazz.getSimpleName());// 注册Bean定义registry.registerBeanDefinition(beanName, candidate);} catch (Exception e) {// 处理类未找到的异常logger.error("registerScheduleStrategyBeans", e);}}}private void registerBeanByClass(BeanDefinitionRegistry registry, Class clazz) {if (registry.containsBeanDefinition(clazz.getName())) {return;}RootBeanDefinition beanDefinition = new RootBeanDefinition(clazz);registry.registerBeanDefinition(clazz.getName(), beanDefinition);}
}

2. 定时刷新策略配置至本地内存

为了支持动态更新调度策略,我们设计了一个ScheduleRulesProvider类,通过定时任务定期从远程同步规则,并将规则刷新到本地缓存中。以下是核心实现:

public class ScheduleRulesProvider {private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat("ScheduleRulesRefresher-%d").build(), new ThreadPoolExecutor.DiscardPolicy());@Getterprivate final Line line;private  Map<String, List<ScheduleStrategyRuleDto>> ruleMap = new ConcurrentHashMap<>();public ScheduleRulesProvider(Line line) {this.line = line;//定时刷新SCHEDULED_EXECUTOR_SERVICE.scheduleWithFixedDelay(this::refresh, 30, 10, TimeUnit.SECONDS);refresh();}public List<ScheduleStrategyRuleDto> getScheduleRules(String channel, String bizType) {return Collections.unmodifiableList(ruleMap.getOrDefault(composeKey(channel, bizType), Collections.emptyList()));}private void refresh() {List<ScheduleStrategyRuleDto> rules = AdapterSyncRemoteScheduleRulesManager.syncRemoteScheduleRules(line);if (CollectionUtils.isNotEmpty(rules)) {Map<String, List<ScheduleStrategyRuleDto>> result = rules.stream().collect(Collectors.groupingBy(x ->composeKey(x.getChannel(), x.getBizType())));result.forEach((k, l) -> l.sort(Comparator.comparingInt(ScheduleStrategyRuleDto::getSequence).reversed()));this.ruleMap = result;}}private String composeKey(String channel, String bizType) {String defaultChannel = StringUtils.defaultIfBlank(channel, "ALL");return String.format("%s_%s", defaultChannel, bizType);}

3. 通过原因按优先级高低匹配到执行策略类名

在众多策略中,失败原因的优先级决定了执行策略的选择。我们通过权重将策略排序,从而选择最佳的策略。

    private void refresh() {List<ScheduleStrategyRuleDto> rules = AdapterSyncRemoteScheduleRulesManager.syncRemoteScheduleRules(line);if (CollectionUtils.isNotEmpty(rules)) {Map<String, List<ScheduleStrategyRuleDto>> result = rules.stream().collect(Collectors.groupingBy(x ->composeKey(x.getChannel(), x.getBizType())));// 根据权重降序排序result.forEach((k, l) -> l.sort(Comparator.comparingInt(ScheduleStrategyRuleDto::getSequence).reversed()));this.ruleMap = result;}}

4. 通过执行策略类名找到Bean

通过从远程获取的执行策略类名调用**getStrategy(Line line, String name)**查找到具体实现类的Bean

       public class ScheduleStrategyFactory implements ApplicationContextAware {private final GrabTicketLogger logger = GrabTicketLoggerFactory.getLogger(ScheduleStrategyFactory.class);private static final Map<String, IScheduleStrategy> STRATEGY_CACHE = new ConcurrentHashMap<>();private ApplicationContext applicationContext;@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {try {this.applicationContext = applicationContext;cacheScheduleStrategy(applicationContext);} catch (Exception e) {logger.error("HandlerFactory", e);}}private void cacheScheduleStrategy(ApplicationContext applicationContext) {Map<String, IScheduleStrategy> result = applicationContext.getBeansOfType(IScheduleStrategy.class);if (MapUtils.isEmpty(result)) {return;}STRATEGY_CACHE.putAll(result);}/*** @param name* @return com.ctrip.train.grabticket.common.schedule.IScheduleStrategy* @author ljm* @date 10:02 2025/3/4**/public static IScheduleStrategy getStrategy(Line line, String name) {String beanName = BeanHelper.mapBeanName(line, name);return STRATEGY_CACHE.get(beanName);}}

5. 未知原因结合NLP技术和正则匹配智能提取关键信息并保存

对于未知失败原因,我们结合NLP技术和正则匹配提取关键信息,并保存到数据库中

public class ReasonUtil {// HNLP 关键字处理,强制使用自定义字典 目录\src\resources\data\dictionary\custompublic static final Segment SEGMENT = HanLP.newSegment().enableCustomDictionaryForcing(true).enableMultithreading(true);static {// 预热字典extractKeyInformation("测试");}/*** @param text* @return java.lang.String* 提取关键信息* @author ljm* @date 17:17 2025/4/8**/public static String extractKeyInformation(String text) {if (StringUtils.isBlank(text)) {return Strings.EMPTY;}// 去除空格text = text.replace(" ", "");text = RegularUtil.replaceMatches(text, RegularUtil.ORDER_ID_PATTERN);text = RegularUtil.replaceMatches(text, RegularUtil.MASKED_MOBILE_PATTERN);text = RegularUtil.replaceMatches(text, RegularUtil.ID_CARD_PATTERN);text = RegularUtil.replaceMatches(text, RegularUtil.MASKED_NAME_PATTERN);text = RegularUtil.replaceMatches(text, RegularUtil.PASSPORT_PATTERN);text = RegularUtil.replaceMatches(text, RegularUtil.MASKED_EMAIL_PATTERN);text = RegularUtil.replaceMatches(text, RegularUtil.TRAIN_NUMBER_PATTERN);text = RegularUtil.replaceMatches(text, RegularUtil.DATE_PATTERN);text = RegularUtil.replaceMatches(text, RegularUtil.ENGLISH_NAME_PATTERN);text = RegularUtil.replaceMatches(text, RegularUtil.USERNAME_PATTERN);text = RegularUtil.replaceMatches(text, RegularUtil.PASSENGER_NAME_PATTERN);text = RegularUtil.replaceMatches(text, RegularUtil.PHONE_PATTERN);List<Term> termList = SEGMENT.seg(text);StringBuilder result = new StringBuilder();for (Term term : termList) {String nature = term.nature.toString();//过滤人名,站名,地名if ("nr".equals(nature) || "ns".equals(nature)) {result.append("***");continue;}result.append(term.word);}return result.toString();}
}

6. 灰度发布策略

为了支持灵活的灰度发布策略功能,我们设计了一个通用的灰度匹配接口IGrayMatchProvider,使用方通过实现该接口并注册到调度平台组件中,系统在使用灰度功能时会默认调用使用方的实现。

灰度匹配接口
public interface IGrayMatchProvider<T extends GrayMatchModel, U extends GrayConfigModel> {/*** 判断是否满足灰度* @param grayMatchModel 灰度匹配模型* @param config 灰度配置* @return 是否满足灰度*/boolean isMatch(T grayMatchModel, String config);
}
使用方实现灰度匹配逻辑
@Component
public class CustomGrayMatchProvider implements IGrayMatchProvider<MyGrayMatchModel, MyGrayConfigModel> {@Overridepublic boolean isMatch(MyGrayMatchModel grayMatchModel, String config) {MyGrayConfigModel grayConfig = parseConfig(config);return grayMatchModel.getUserId() % grayConfig.getDivisor() == 0;}private MyGrayConfigModel parseConfig(String config) {return new MyGrayConfigModel(config);}
}

7. 策略执行参数解析

在调度策略中,不同的策略可能需要不同的执行参数。为了支持动态解析这些参数,我们在策略执行类的接口中定义了一个parseConfig方法。所有具体的策略执行类都实现该接口,并通过泛型实现 JSON 配置的反序列化。

  public interface IScheduleStrategy<Config extends BaseStrategyConfig, Context extends BaseScheduleContext,R> {/*** 初始化策略配置*/Config parseConfig(String strategyConfig);/*** 执行处理逻辑*/HandleResult<R> handle(Context context);
}

8. 告警功能

为了能及时感知到未知原因没有匹配到执行策略的情况,我们设计了一个告警服务模块,当系统发现未知失败原因时,会根据配置的告警规则,自动发送邮件和短信通知到用户。以下是具体实现细节:

告警规则匹配逻辑
    private void unMatchedReasonAlert(ProcessUnKnownReasonContext processUnKnownReasonContext, String keyInformation) {SchedulePlatformAlertSettingDaoExt schedulePlatformAlertSetting = SchedulePlatformDaoFactory.getInstance().schedulePlatformAlertSetting();SchedulePlatformAlertSetting platformAlertSetting = schedulePlatformAlertSetting.queryAlertSetting(processUnKnownReasonContext.getLine().getLineName(),List.of(AlertSettingStatus.OPEN.getStatus()));if (platformAlertSetting == null) {logger.warn("NotFindPlatformAlertSetting", String.format("Line:%s,未找到告警配置", processUnKnownReasonContext.getLine().getLineName()));MetricHelper.metricUnFindAlertSetting(processUnKnownReasonContext.getLine());return;}// 发送告警IRedisClient schedulePlatformInstance = RedisClientManager.getSchedulePlatformInstance();String bizType = StringUtils.defaultIfBlank(processUnKnownReasonContext.getBizType(), "ALL");String channel = StringUtils.defaultIfBlank(processUnKnownReasonContext.getChannel(), "ALL");String key = String.format("%s_%s_%s_%s", processUnKnownReasonContext.getLine().getLineName(), bizType, channel, keyInformation);String dailyLimitKey = String.format("dailyLimit_%s_%s_%s_%s", processUnKnownReasonContext.getLine().getLineName(), bizType, channel, keyInformation);Long sendTimesForToday = schedulePlatformInstance.get(dailyLimitKey, Long.class);if (sendTimesForToday != null && sendTimesForToday > platformAlertSetting.getDailyLimit()) {logger.info("overDailyLimit", String.format("Line:%s,超出当日发送最大次数%s", processUnKnownReasonContext.getLine().getLineName(), platformAlertSetting.getDailyLimit()));return;}boolean markedSuccess = schedulePlatformInstance.setIfNullWithAtom(key, keyInformation, Long.valueOf(platformAlertSetting.getIntervalSeconds()));if (!markedSuccess) {return;}String messageBody = String.format("策略调度平台未匹配到有效执行策略\n" +"产线:%s\n" +"业务:%s\n" +"渠道:%s\n" +"订单号:%s\n" +"原始原因:%s\n" +"提取后关键信息:%s",processUnKnownReasonContext.getLine().getLineName(), bizType, channel, processUnKnownReasonContext.getOrderNumber(),processUnKnownReasonContext.getOriginReason(), keyInformation);// 对当天发送次数进行计数schedulePlatformInstance.incrWithExpire(dailyLimitKey,DateUtils.ldtToUnixTime(LocalDateTime.now().with(LocalTime.MAX)));SmsUtils.send(platformAlertSetting.getNotifyMobiles(), messageBody);if (StringUtils.isBlank(platformAlertSetting.getNotifyEmails())) {return;}messageBody = String.format("策略调度平台未匹配到有效执行策略<br>" +"产线:%s<br>" +"业务:%s<br>" +"渠道:%s<br>" +"订单号:%s<br>" +"原始原因:%s<br>" +"提取后关键信息:%s",processUnKnownReasonContext.getLine().getLineName(), bizType, channel, processUnKnownReasonContext.getOrderNumber(),processUnKnownReasonContext.getOriginReason(), keyInformation);List<String> emailList = Splitter.on(";").splitToStream(platformAlertSetting.getNotifyEmails()).toList();EmailHelper.send(emailList, "策略调度平台", messageBody);}

四. 功能整合与性能优化

1. 功能点整合与协同

在调度策略可视化改造方案中,各个功能点通过紧密的协同工作,共同支撑系统的高效运行。以下是各功能点的整合与协同机制:

  • 配置管理与动态加载的协同
    • 通过数据库表存储调度策略配置,结合自定义注解扫描和动态注册机制,实现了配置的集中化管理和动态加载。配置的修改无需重启服务,系统通过定时任务定期刷新规则,确保最新配置能够实时生效。
    • 在配置加载过程中,系统会自动检测冲突或重复配置,避免人工校验的遗漏。通过自动化工具支持,用户可以快速对比策略差异,确保多策略同步修改的一致性。
  • 失败原因匹配与灰度发布的协同
    • 通过借鉴Mysql LIKE 查询逻辑实现失败原因的模糊匹配,结合优先级排序机制,确保系统能够快速找到最佳的调度策略。
    • 在匹配到调度策略后,系统会根据灰度匹配接口的实现,判断是否满足灰度条件。灰度功能的引入使得新策略可以在小范围内逐步推广,降低风险。
  • 未知原因处理与告警机制的协同
    • 对于未匹配到的失败原因,系统会结合 NLP 技术和正则匹配提取关键信息,并将其保存到数据库中,为后续分析提供数据支持。
    • 当发现未知失败原因时,系统会根据配置的告警规则,自动发送邮件和短信通知到相关人员,确保问题能够及时被感知和处理。

2. 系统性能与稳定性优化

为了确保调度策略可视化平台的高效运行和稳定性,我们在系统设计中引入了多项性能优化和稳定性保障措施:

  • 高性能规则匹配
    • 通过将规则加载到内存中,避免了频繁的数据库查询操作,显著提升了匹配性能。
    • 在规则加载时,系统会根据优先级对规则进行排序,确保高优先级规则优先匹配,减少不必要的匹配计算。
  • 稳定性保障
    • 在定时刷新规则的任务中,系统引入了异常捕获机制,确保任务失败时不会影响系统的正常运行。
    • 在告警功能中,系统通过 Redis 实现了限流和降级机制,避免因告警消息过多导致系统资源耗尽。
  • 扩展性设计
    • 系统采用模块化设计,各个功能点(如配置管理、失败原因匹配、灰度发布、告警机制等)相互独立,便于后续扩展和维护。
    • 通过通用的灰度匹配接口,系统支持不同业务场景下的灰度需求,满足多样化的业务需求。

五. 总结

通过本次调度策略可视化改造,我们成功解决了现有配置管理中的痛点问题,显著提升了系统的可维护性和可扩展性。具体成果包括:

  • 集中化管理:所有配置集中存储在数据库中,避免分散和混乱。
  • 可视化操作:用户友好的界面支持非技术人员轻松操作。
  • 自动化工具支持:自动化差异对比和冲突检测功能,大幅提升操作效率。
  • 实时监控:及时发现未知失败原因,保障系统稳定性。
  • 批量更新:简化多策略同步修改流程,提高工作效率。
  • 灰度功能支持:通过 IGrayMatchProvider 接口,支持灵活的灰度实现,满足不同业务场景下的灰度需求。
  • 告警功能支持:通过配置的告警规则,自动发送邮件和短信通知到用户,确保问题能够及时被感知和处理。

未来,我们将继续优化平台功能,探索更多智能化工具,为调度系统的稳定运行和高效管理提供更强有力的支持

http://www.xdnf.cn/news/553519.html

相关文章:

  • MySQL基础关键_014_MySQL 练习题
  • KeepassXC (Win10) 打不开的解决方法
  • Nginx笔记
  • 开疆智能Profinet转RS485网关连接电磁流量计到西门子PLC配置案例
  • STM32--串口函数
  • 随机数种子seed和相关系数ρ
  • vue3 + echarts(5.6.0)实现渐变漏斗图
  • vue2实现【瀑布流布局】
  • 粤港澳编程题
  • 【HTML-2】HTML 标题标签:构建网页结构的基础
  • Tomcat配置详情
  • 解码数据语言:如何优雅的进行数仓字典建设?
  • C++:迭代器
  • C++数据结构——红黑树
  • 如何使用通义灵码辅助开发鸿蒙OS - AI编程助手提升效率
  • centos7配置静态ip 网关 DNS
  • 数据实时同步:inotify + rsync 实现数据实时同步
  • 《深入理解指针数组:创建与使用指南》
  • 【C/C++】static关键字的作用
  • 计算机图形学Games101笔记--几何
  • 计算机视觉与深度学习 | matlab实现ARIMA-WOA-CNN-LSTM时间序列预测(完整源码和数据)
  • VMD查看蛋白质-配体的分子动力学模拟轨迹
  • 【Redis实战篇】达人探店
  • Golang的代码注释规范与实践
  • 机器学习第十八讲:混淆矩阵 → 诊断模型在医疗检查中的误诊情况
  • 33、魔法防御术——React 19 安全攻防实战
  • 每日算法刷题Day11 5.20:leetcode不定长滑动窗口求最长/最大6道题,结束不定长滑动窗口求最长/最大,用时1h20min
  • AMO——下层RL与上层模仿相结合的自适应运动优化:让人形行走操作(loco-manipulation)兼顾可行性和动力学约束
  • 【优秀三方库研读】在 quill 开源库中 QUILL_MAGIC_SEPARATOR 的作用是什么,解决了什么问题
  • 【爬虫】12306自动化购票