当前位置: 首页 > news >正文

蓝凌的流程引擎队列分发器

一、组件概述

ProcessQueueDistributor 是一个流程任务分发器,用于从流程消息队列中分配流程实例任务,并交由线程池调度执行。其主要职责是:

  • 定时轮询流程任务队列;

  • 控制任务执行并发度;

  • 管理运行中任务状态;

  • 在任务执行完毕后自动清理与唤醒等待线程。

二、关键设计目标

目标说明
支持异步并发执行使用线程池对多个流程任务并发执行,提高系统吞吐能力
控制最大并发数使用 Semaphore 限制正在执行的任务数量,避免线程池资源耗尽
保证线程安全所有共享状态通过线程安全容器或同步机制保护
自动唤醒等待

任务完成时通过 semaphore.release() 唤醒等待线程

三、核心类与职责

1. ProcessQueueDistributor

  • 职责:主调度类,用于从消息中心拉取任务并分发到线程池中执行。

  • 关键字段

    • threadPoolExecutor:线程池,用于执行流程任务。

    • semaphoreForCheck:信号量,控制“是否继续分发”的阻塞与唤醒。

    • runningTasks:Set,记录当前正在执行的流程任务 Holder。

  • 核心方法

    • start():启动调度器主循环。

    • checkRemainTask():检查是否仍有任务执行中。

    • distribute():尝试从消息中心拉取任务,构造执行单元并提交。

    • onShutdown():优雅关闭执行线程。

2. TaskRunnable implements Runnable

  • 职责:流程任务的实际执行体。

  • 执行流程

    1. 获取流程任务;

    2. 执行流程;

    3. 任务完成后,清除 runningTasks 中记录;

    4. 调用 semaphoreForCheck.release() 唤醒阻塞等待线程。

  • finally 关键逻辑

finally {runningTasks.remove(currentHolder);semaphoreForCheck.release();  // 关键点:通知主循环继续调度
}

四、工作机制详解

五、信号量机制详解(semaphoreForCheck)

  • 目的:避免空轮询、浪费 CPU 资源;

  • 用法

    • runningTasks 非空时,调用 semaphore.tryAcquire(3000, TimeUnit.MILLISECONDS) 暂时阻塞;

    • 等待线程池中某个任务执行完毕,调用 semaphore.release() 释放许可;

    • 释放后主线程被唤醒,重新检查是否有任务要执行;

  • 风险点

    • 若某个流程任务异常中断,未进入 finally 执行 release(),将导致调度线程长时间阻塞。

六、运行中任务管理

  • runningTasks 用于记录当前正在处理的流程任务;

  • 类型一般为 ConcurrentHashSet<ProcessHolder>

  • 在任务开始前加入,在 TaskRunnablefinally 中移除。

七、线程池配置建议

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,maxPoolSize,keepAliveTime,TimeUnit.SECONDS,new LinkedBlockingQueue<>(queueCapacity),new ThreadFactoryBuilder().setNameFormat("process-runner-%d").build(),new ThreadPoolExecutor.AbortPolicy()
);

建议设置:

  • 合理的 corePoolSizequeueCapacity 以控制任务积压;

  • 自定义线程名便于排查问题;

  • 若有外部监控系统,可添加线程池监控指标采集。

八、总结

ProcessQueueDistributor 是流程引擎中的调度中枢,设计合理地控制任务调度频率与并发性。通过 Semaphore 实现任务完成通知,是对传统线程池轮询机制的优化。配合 MessageCenterTaskRunnable,可实现稳定、高性能的分布式流程调度能力。

http://www.xdnf.cn/news/923023.html

相关文章:

  • Python whl安装包简介与制作完全指南
  • 【优选算法】前缀和
  • Windows 下端口占用排查与释放全攻略
  • LeetCode-413. 等差数列划分
  • Go深入学习延迟语句
  • 【QT】输入类控件 详解
  • 嵌入式里的时间魔法:RTC 与 BKP 深度拆解
  • 数据通信基础
  • 迷宫问题(一)(C++版本)
  • @ExceptionHandler 默认无法拦截 Aspect(切面)中抛出的异常
  • centos7编译安装LNMP架构
  • docker安装RabbitMQ
  • 一些因子的解释
  • 人工智能--AI换脸
  • iview框架主题色的应用
  • WebWorker-----高频面试题(浏览器篇)
  • 【每天一道算法题】用JavaScript实现的字符串比较算法
  • 【云架构】
  • 后端下载限速(redis记录实时并发,bucket4j动态限速)
  • Java 常用 API 分类总结(算法竞赛考前速记篇)- 适用于算法竞赛(如 CCF CSP、蓝桥杯、NOI)
  • 【PhysUnits】15.17 比例因子模块 (ratio.rs)
  • 河南建筑安全员B证考试最新精选题
  • Python 函数全攻略:函数基础
  • JavaSec-SpringBoot框架
  • JAVA理论第三章-多线程
  • Python实例题:Python计算微积分
  • 2025年06月07日Github流行趋势
  • go语言学习 第9章:映射(Map)
  • 推客系统小程序开发:告别低效推广,开启精准获客新时代
  • C++课设:实现简易文件加密工具(凯撒密码、异或加密、Base64编码)