当前位置: 首页 > news >正文

设计一套流程引擎队列分发器

一、组件概述

ProcessQueueDistributor 是一个流程任务分发器,用于从流程消息队列中分配流程实例任务,并交由线程池调度执行。其主要职责是:

  • 定时轮询流程任务队列;

  • 控制任务执行并发度;

  • 管理运行中任务状态;

  • 在任务执行完毕后自动清理与唤醒等待线程。

 

二、关键设计目标

目标说明
支持异步并发执行使用线程池对多个流程任务并发执行,提高系统吞吐能力
控制最大并发数使用 Semaphore 限制正在执行的任务数量,避免线程池资源耗尽
保证线程安全所有共享状态通过线程安全容器或同步机制保护
自动唤醒等待

任务完成时通过 semaphore.release() 唤醒等待线程

 

三、核心类与职责

1. ProcessQueueDistributor

  • 职责:主调度类,用于从消息中心拉取任务并分发到线程池中执行。

  • 关键字段

    • threadPoolExecutor:线程池,用于执行流程任务。

    • semaphoreForCheck:信号量,控制“是否继续分发”的阻塞与唤醒。

    • runningTasks:Set,记录当前正在执行的流程任务 Holder。

  • 核心方法

    • start():启动调度器主循环。

    • checkRemainTask():检查是否仍有任务执行中。

    • distribute():尝试从消息中心拉取任务,构造执行单元并提交。

    • onShutdown():优雅关闭执行线程。

2. TaskRunnable implements Runnable

  • 职责:流程任务的实际执行体。

  • 执行流程

    1. 获取流程任务;

    2. 执行流程;

    3. 任务完成后,清除 runningTasks 中记录;

    4. 调用 semaphoreForCheck.release() 唤醒阻塞等待线程。

  • finally 关键逻辑

finally {runningTasks.remove(currentHolder);semaphoreForCheck.release();  // 关键点:通知主循环继续调度
}

四、工作机制详解

 

五、信号量机制详解(semaphoreForCheck)

  • 目的:避免空轮询、浪费 CPU 资源;

  • 用法

    • runningTasks 非空时,调用 semaphore.tryAcquire(3000, TimeUnit.MILLISECONDS) 暂时阻塞;

    • 等待线程池中某个任务执行完毕,调用 semaphore.release() 释放许可;

    • 释放后主线程被唤醒,重新检查是否有任务要执行;

  • 风险点

    • 若某个流程任务异常中断,未进入 finally 执行 release(),将导致调度线程长时间阻塞。

六、运行中任务管理

  • runningTasks 用于记录当前正在处理的流程任务;

  • 类型一般为 ConcurrentHashSet<ProcessHolder>

  • 在任务开始前加入,在 TaskRunnablefinally 中移除。

 

七、线程池配置建议

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,maxPoolSize,keepAliveTime,TimeUnit.SECONDS,new LinkedBlockingQueue<>(queueCapacity),new ThreadFactoryBuilder().setNameFormat("process-runner-%d").build(),new ThreadPoolExecutor.AbortPolicy()
);

建议设置:

  • 合理的 corePoolSizequeueCapacity 以控制任务积压;

  • 自定义线程名便于排查问题;

  • 若有外部监控系统,可添加线程池监控指标采集。

八、总结

ProcessQueueDistributor 是流程引擎中的调度中枢,设计合理地控制任务调度频率与并发性。通过 Semaphore 实现任务完成通知,是对传统线程池轮询机制的优化。配合 MessageCenterTaskRunnable,可实现稳定、高性能的分布式流程调度能力。

 

http://www.xdnf.cn/news/928405.html

相关文章:

  • 2025年AI编程工具推荐
  • 外部排序全解析:从基础到优化策略(王道)
  • go工具库:hertz api框架 hertz client的使用
  • 无线网络扫描与分析工具 LizardSystems Wi-Fi Scanner 25.05
  • 【python深度学习】Day 47 注意力热图可视化
  • 蓝牙 BLE 扫描面试题大全(1):从基础到实战的深度解析
  • transformers 的Trainer的用法
  • Cloudflare 免费域名邮箱 支持 Catch-all 无限别名收件
  • JAVA理论第四战-线程池
  • 【AI论文】反思、重试、奖励:通过强化学习实现大型语言模型的自我提升
  • archlinux中使用 Emoji 字体
  • keil 5打开编译keil 4解决方案,兼容exe查找下载
  • 编程关键字
  • 【区块链基础】区块链的 Fork(分叉)深度解析:原理、类型、历史案例及共识机制的影响
  • 分类与扩展
  • 【推荐算法】推荐算法演进史:从协同过滤到深度强化学习
  • 「Java基本语法」代码格式与注释规范
  • 第二十七课:手搓梯度提升树
  • AI掘金时代:探讨如何用价值杠杆撬动付费用户增长
  • 记录下three.js学习过程中不理解问题①
  • 测试(面经 八股)
  • 《真假信号》速读笔记
  • Python爬虫实战:研究Unirest库相关技术
  • 王劲松《人民日报》撰文 重读抗战家书不忘来时路
  • Windows小说阅读软件推荐
  • Linux 文件系统核心:inode 与 block 深度解析(附实战案例与源码级原理)
  • 618来了,推荐京东云服务器
  • ROS C++ 实现消息通信与服务通信
  • 交叉熵损失函数和极大似然估计是什么,区别是什么
  • 关于队列的使用