当前位置: 首页 > ds >正文

Go语言手搓协程池

协程池介绍

协程池简单理解就是有一个池子一样的东西,里面装着固定数量的goroutine,当有一个任务到来的时候,会将这个任务交给池子里的一个空闲的goroutine去处理,如果池子里没有空闲的goroutine了,任务就会阻塞等待。所以协程池有三个角色Worker,Task,Pool。

相关角色和方法

协程池所有的角色:task 任务,poll 池子,worker 任务执行单元

属性定义:

Task:具体的任务

Pool:池子

worker:用于执行任务的 goroutine

方法定义:

  • NewTask:创建任务

  • NewPool:创建协程池

  • AddTask:向协程池添加任务

  • run:worker 的逻辑,开始执行

  • incRunning:增加 worker 的数量

  • decRunning:减少 worker 的数量

  • getRunningWorkers:获得当前正在工作的 worker 数量

  • getCap:获得当前 worker 的容量

注意:用 for range 遍历 channel 时,若通道未关闭且无数据就会被阻塞;若通道已关闭且无数据则不会阻塞,会直接退出循环。

Coding

package mainimport ("fmt""sync""sync/atomic""time"
)type Task struct {f func() error //具体的任务逻辑
}type Pool struct {RunningWorkers int64      //运行着的 worker 数量Capacity       int64      //协程池 worker 数量JobCh          chan *Task //任务存放的位置sync.Mutex
}func NewTask(funcArg func() error) *Task {return &Task{f: funcArg,}
}func NewPool(capacity int64, taskNum int64) *Pool {return &Pool{Capacity: capacity,JobCh:    make(chan *Task, taskNum),}
}func (p *Pool) GetCap() int64 {return p.Capacity
}func (p *Pool) incRunning() {atomic.AddInt64(&p.RunningWorkers, 1)
}func (p *Pool) decRunning() {atomic.AddInt64(&p.RunningWorkers, -1)
}func (p *Pool) getRunningWorkers() int64 {return atomic.LoadInt64(&p.RunningWorkers)
}func (p *Pool) run() {p.incRunning()go func() {defer func() {p.decRunning()}()// 如果 channel 没有 task,并且也没有被 close()// 那么会一直阻塞在这里,为了正确的退出,我们在 main 函数里设置了 3s 的等待时间,确保程序正确退出for task := range p.JobCh {task.f()}}()
}// AddTask 往协程池添加任务
func (p *Pool) AddTask(task *Task) {//加锁,防止启动多个 worker,导致下面的判断出问题p.Lock()defer p.Unlock()if p.getRunningWorkers() < p.GetCap() {//启动创建一个 workerp.run()}//将任务放入 channel,等待消费p.JobCh <- task
}func main() {//创建协程池:3个协程,容量为 10 的任务队列 channelpool := NewPool(3, 10)for i := 0; i < 20; i++ {//将任务放入池中pool.AddTask(NewTask(func() error {fmt.Printf("I am Task \n")return nil}))}//确保 20 个任务都执行完成time.Sleep(time.Second * 3)
}

http://www.xdnf.cn/news/2582.html

相关文章:

  • 11前端项目总结----详情页放大镜和轮播图
  • 基于STM32、HAL库的HX711模数转换器ADC驱动程序设计
  • TV Launcher汉化版下载-TV Launcher启动器极简pro下载
  • 【Misc】PNG宽高修改 - PNG图片宽高CRC爆破
  • 消息中间件
  • 传统行业的数字化转型:如何通过RTMP推流技术提升实时直播体验
  • Spring MVC 请求映射处理:@RequestMapping 与 @Pathvariable
  • H5实现一个二维码生成器页面
  • 华为OD机试真题——阿里巴巴找黄金宝箱Ⅰ(2025A卷:100分)Java/python/JavaScript/C/C++/GO最佳实现
  • MySQL 存储引擎与服务体系深度解析
  • 登高架设作业指的是什么?有什么安全操作规程?
  • 基于QT(C++)实现数字图像处理—Canny边缘检测
  • 【WEB3】web3.0是什么
  • FreeMarker语法深度解析与Node.js集成实践指南
  • 衡石科技:HENGSHI SENSE 数据权限解决方案
  • Shadertoy着色器移植到Three.js经验总结
  • 【Linux系统】详解Linux权限
  • AI工作流自动化与智能应用开发平台
  • WEB服务器的部署及优化
  • 线上JVM调优与全栈性能优化 - Java架构师面试实战
  • DataStreamAPI实践原理——快速上手
  • 学习笔记—双指针算法—移动零
  • [原创](现代Delphi 12指南):[macOS 64bit App开发]: NSString类型与CFStringRef类型字符串相互转换.
  • 通过数据增强打造抗噪音多模态大模型
  • MySQL 大数据量分页查询优化指南
  • Git 撤回合并提交
  • WPF之XAML基础
  • AlexNet网络搭建
  • OneNet云平台
  • java16