当前位置: 首页 > web >正文

异步并发控制代码详细分析

异步并发控制代码详细分析

完整代码

/*** 控制异步请求并发数的简单函数式实现* @param {Array<Function>} tasks - 异步任务数组,每个任务都是返回Promise的函数* @param {number} limit - 最大并发数* @returns {Promise<Array>} - 所有任务的结果数组(按照添加顺序)*/
function limitConcurrency(tasks, limit) {// 任务结果数组(保持与tasks相同顺序)const results = new Array(tasks.length);// 记录已完成的任务数量let completedCount = 0;// 当前正在执行的任务数量let runningCount = 0;// 下一个要执行的任务索引let nextIndex = 0;return new Promise((resolve) => {// 定义执行下一批任务的函数function runNextTasks() {// 当所有任务都已完成时,返回结果if (completedCount === tasks.length) {resolve(results);return;}// 尝试启动新任务,直到达到并发上限或任务都已分配while (runningCount < limit && nextIndex < tasks.length) {const taskIndex = nextIndex++;const task = tasks[taskIndex];// 增加运行计数runningCount++;// 执行任务并处理结果Promise.resolve(task()).then(result => {// 保存结果到对应位置results[taskIndex] = result;console.log(taskIndex,'taskIndextaskIndex');completedCount++;runningCount--;// 尝试执行更多任务runNextTasks();}).catch(error => {// 错误处理:记录错误并继续results[taskIndex] = { error };completedCount++;runningCount--;// 尝试执行更多任务runNextTasks();});}}// 开始执行任务runNextTasks();});
}// 使用示例:控制5个setTimeout的并发执行
const tasks = [() => new Promise(resolve => setTimeout(() => {console.log('任务1完成');resolve('结果1');}, 5000)),() => new Promise(resolve => setTimeout(() => {console.log('任务2完成');resolve('结果2');}, 1000)),() => new Promise(resolve => setTimeout(() => {console.log('任务3完成');resolve('结果3');}, 1000)),() => new Promise(resolve => setTimeout(() => {console.log('任务4完成');resolve('结果4');}, 1800)),() => new Promise(resolve => setTimeout(() => {console.log('任务5完成');resolve('结果5');}, 1200))
];// 限制并发数为2
limitConcurrency(tasks, 2).then(results => {console.log('所有任务完成,结果:', results);
});

详细执行流程分析

初始状态

  • tasks.length = 5(索引: 0, 1, 2, 3, 4)
  • limit = 2
  • results = [empty × 5]
  • completedCount = 0
  • runningCount = 0
  • nextIndex = 0

第一次调用 runNextTasks()

检查完成条件
if (completedCount === tasks.length) // 0 === 5 → false,继续执行
while循环 - 第一轮
while (runningCount < limit && nextIndex < tasks.length)
// 0 < 2 && 0 < 5 → true,进入循环// 第一次循环:
const taskIndex = nextIndex++; // taskIndex = 0, nextIndex = 1
const task = tasks[taskIndex]; // task = tasks[0](5秒任务)
runningCount++; // runningCount = 1// 启动 taskIndex=0 的任务(5秒延时)
Promise.resolve(task()).then(...)
while循环 - 第二轮
while (runningCount < limit && nextIndex < tasks.length)
// 1 < 2 && 1 < 5 → true,继续循环// 第二次循环:
const taskIndex = nextIndex++; // taskIndex = 1, nextIndex = 2
const task = tasks[taskIndex]; // task = tasks[1](1秒任务)
runningCount++; // runningCount = 2// 启动 taskIndex=1 的任务(1秒延时)
Promise.resolve(task()).then(...)
while循环 - 第三轮
while (runningCount < limit && nextIndex < tasks.length)
// 2 < 2 && 2 < 5 → false,退出while循环

此时状态:

  • 正在执行:taskIndex=0(5秒)、taskIndex=1(1秒)
  • 等待队列:taskIndex=2、taskIndex=3、taskIndex=4
  • runningCount = 2(已达上限)
  • nextIndex = 2

1秒后 - taskIndex=1 完成

Promise.then 回调执行
.then(result => {results[taskIndex] = result; // results[1] = '结果2'console.log(taskIndex,'taskIndextaskIndex'); // 输出: 1 taskIndextaskIndexcompletedCount++; // completedCount = 1runningCount--;   // runningCount = 1(释放一个槽位)runNextTasks();   // 递归调用
})
递归调用 runNextTasks()
// 检查完成条件
if (completedCount === tasks.length) // 1 === 5 → false// while循环
while (runningCount < limit && nextIndex < tasks.length)
// 1 < 2 && 2 < 5 → trueconst taskIndex = nextIndex++; // taskIndex = 2, nextIndex = 3
const task = tasks[taskIndex]; // task = tasks[2](1秒任务)
runningCount++; // runningCount = 2// 启动 taskIndex=2 的任务

此时状态:

  • 正在执行:taskIndex=0(还剩4秒)、taskIndex=2(1秒)
  • 等待队列:taskIndex=3、taskIndex=4
  • completedCount = 1
  • runningCount = 2
  • nextIndex = 3

2秒后 - taskIndex=2 完成

类似地,taskIndex=2 完成后会:

  1. results[2] = '结果3'
  2. completedCount = 2
  3. runningCount = 1
  4. 递归调用启动 taskIndex=3

3.8秒后 - taskIndex=3 完成

taskIndex=3(1800ms)完成后:

  1. results[3] = '结果4'
  2. completedCount = 3
  3. runningCount = 1
  4. 递归调用启动 taskIndex=4

5秒后 - taskIndex=0 完成

taskIndex=0(5000ms)完成后:

  1. results[0] = '结果1'
  2. completedCount = 4
  3. runningCount = 1
  4. 递归调用,但 nextIndex=5,无新任务启动

5.2秒后 - taskIndex=4 完成

taskIndex=4(1200ms,从3.8秒开始)完成后:

  1. results[4] = '结果5'
  2. completedCount = 5
  3. runningCount = 0
  4. 递归调用检查完成条件:completedCount === tasks.length5 === 5 → true
  5. 调用 resolve(results)

时间轴图示

时间 | 正在执行的任务              | 完成的任务 | 等待队列
-----|---------------------------|-----------|------------------
T0   | taskIndex=0, taskIndex=1  |           | 2,3,4
T1   | taskIndex=0, taskIndex=2  | 1         | 3,4
T2   | taskIndex=0, taskIndex=3  | 1,2       | 4
T3.8 | taskIndex=0, taskIndex=4  | 1,2,3     |
T5   | taskIndex=4               | 1,2,3,0   |
T5.2 | 无                        | 1,2,3,0,4 | → resolve

关键变量状态变化

时间点nextIndexrunningCountcompletedCount说明
初始000开始状态
启动后220启动了taskIndex=0,1
T1321taskIndex=1完成,启动taskIndex=2
T2422taskIndex=2完成,启动taskIndex=3
T3.8523taskIndex=3完成,启动taskIndex=4
T5514taskIndex=0完成
T5.2505taskIndex=4完成,resolve

控制台输出顺序

1 taskIndextaskIndex     // T1: taskIndex=1完成
任务2完成
2 taskIndextaskIndex     // T2: taskIndex=2完成
任务3完成
3 taskIndextaskIndex     // T3.8: taskIndex=3完成
任务4完成
0 taskIndextaskIndex     // T5: taskIndex=0完成
任务1完成
4 taskIndextaskIndex     // T5.2: taskIndex=4完成
任务5完成
所有任务完成,结果: ['结果1', '结果2', '结果3', '结果4', '结果5']

核心机制总结

  1. 并发槽位管理:通过 runningCountlimit 控制同时执行的任务数
  2. 动态调度:每个任务完成时立即尝试启动新任务(递归调用 runNextTasks()
  3. 顺序保证:使用闭包中的 taskIndex 确保结果按原数组顺序存储
  4. 流水线执行:不等待批次完成,而是任务完成即补充,最大化并发效率

这种设计实现了受限并发的流水线处理,既控制了资源使用,又保证了高效执行和结果有序性。

http://www.xdnf.cn/news/10028.html

相关文章:

  • (c++)string的模拟实现
  • 【Office】Excel两列数据比较方法总结
  • 基于大模型预测的FicatIII-IV期股骨头坏死综合治疗研究报告
  • 多模态大语言模型arxiv论文略读(100)
  • LNMP环境中php7.2升级到php7.4
  • Android Native 之 adbd进程分析
  • 视频监控汇聚平台EasyCVR安防小知识:如何通过视频融合平台解决信息孤岛问题?
  • @Pushgateway 数据自动清理
  • 碰一碰发视频系统--基于H5场景开发
  • 选择if day5
  • QPS 和 TPS 详解
  • 竞争加剧,美团的战略升维:反内卷、科技与全球化
  • C++ 游戏开发详细流程
  • 大规模JSON反序列化性能优化实战:Jackson vs FastJSON深度对比与定制化改造
  • Elasticsearch 分析器介绍
  • Camera相机人脸识别系列专题分析之六:MTK ISP6S平台人脸识别fdnode流程FdNodeImp.cpp详解
  • Xamarin劝退之踩坑笔记
  • 苹果签名!
  • 数据清理的例子
  • 【仿生机器人】仿生机器人认知-情感系统架构设计报告
  • 【Java工程师面试全攻略】Day4:JVM原理与性能调优深度解析
  • 达梦数据库:同1台服务器如何启动不同版本的DMAP服务
  • 【Docker管理工具】部署Docker管理面板DweebUI
  • 思维革命:DeepSeek-R1-0528 如何用一次小更新颠覆大模型格局
  • 每日算法-250530
  • 企业级安全实践:SSL/TLS 加密与权限管理(二)
  • 支持功能安全ASIL-B的矩阵管理芯片IS32LT3365,助力ADB大灯系统轻松实现功能安全等级
  • Tailwind CSS 实战:基于 Kooboo 构建 AI 对话框页面(五):语音合成输出与交互增强
  • JVM 性能调优
  • Day40打卡 @浙大疏锦行