蓝凌的流程引擎队列分发器
一、组件概述
ProcessQueueDistributor
是一个流程任务分发器,用于从流程消息队列中分配流程实例任务,并交由线程池调度执行。其主要职责是:
-
定时轮询流程任务队列;
-
控制任务执行并发度;
-
管理运行中任务状态;
-
在任务执行完毕后自动清理与唤醒等待线程。
二、关键设计目标
目标 | 说明 |
---|---|
支持异步并发执行 | 使用线程池对多个流程任务并发执行,提高系统吞吐能力 |
控制最大并发数 | 使用 Semaphore 限制正在执行的任务数量,避免线程池资源耗尽 |
保证线程安全 | 所有共享状态通过线程安全容器或同步机制保护 |
自动唤醒等待 | 任务完成时通过 |
三、核心类与职责
1. ProcessQueueDistributor
-
职责:主调度类,用于从消息中心拉取任务并分发到线程池中执行。
-
关键字段:
-
threadPoolExecutor
:线程池,用于执行流程任务。 -
semaphoreForCheck
:信号量,控制“是否继续分发”的阻塞与唤醒。 -
runningTasks
:Set,记录当前正在执行的流程任务 Holder。
-
-
核心方法:
-
start()
:启动调度器主循环。 -
checkRemainTask()
:检查是否仍有任务执行中。 -
distribute()
:尝试从消息中心拉取任务,构造执行单元并提交。 -
onShutdown()
:优雅关闭执行线程。
-
2. TaskRunnable implements Runnable
-
职责:流程任务的实际执行体。
-
执行流程:
-
获取流程任务;
-
执行流程;
-
任务完成后,清除
runningTasks
中记录; -
调用
semaphoreForCheck.release()
唤醒阻塞等待线程。
-
-
finally 关键逻辑:
finally {runningTasks.remove(currentHolder);semaphoreForCheck.release(); // 关键点:通知主循环继续调度
}
四、工作机制详解
五、信号量机制详解(semaphoreForCheck)
-
目的:避免空轮询、浪费 CPU 资源;
-
用法:
-
当
runningTasks
非空时,调用semaphore.tryAcquire(3000, TimeUnit.MILLISECONDS)
暂时阻塞; -
等待线程池中某个任务执行完毕,调用
semaphore.release()
释放许可; -
释放后主线程被唤醒,重新检查是否有任务要执行;
-
-
风险点:
-
若某个流程任务异常中断,未进入
finally
执行release()
,将导致调度线程长时间阻塞。
-
六、运行中任务管理
-
runningTasks
用于记录当前正在处理的流程任务; -
类型一般为
ConcurrentHashSet<ProcessHolder>
; -
在任务开始前加入,在
TaskRunnable
的finally
中移除。
七、线程池配置建议
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,maxPoolSize,keepAliveTime,TimeUnit.SECONDS,new LinkedBlockingQueue<>(queueCapacity),new ThreadFactoryBuilder().setNameFormat("process-runner-%d").build(),new ThreadPoolExecutor.AbortPolicy()
);
建议设置:
-
合理的
corePoolSize
和queueCapacity
以控制任务积压; -
自定义线程名便于排查问题;
-
若有外部监控系统,可添加线程池监控指标采集。
八、总结
ProcessQueueDistributor
是流程引擎中的调度中枢,设计合理地控制任务调度频率与并发性。通过 Semaphore
实现任务完成通知,是对传统线程池轮询机制的优化。配合 MessageCenter
和 TaskRunnable
,可实现稳定、高性能的分布式流程调度能力。