Go语言手搓协程池
协程池介绍
协程池简单理解就是有一个池子一样的东西,里面装着固定数量的goroutine,当有一个任务到来的时候,会将这个任务交给池子里的一个空闲的goroutine去处理,如果池子里没有空闲的goroutine了,任务就会阻塞等待。所以协程池有三个角色Worker,Task,Pool。
相关角色和方法
协程池所有的角色:task 任务,poll 池子,worker 任务执行单元
属性定义:
Task:具体的任务
Pool:池子
worker:用于执行任务的 goroutine
方法定义:
-
NewTask:创建任务
-
NewPool:创建协程池
-
AddTask:向协程池添加任务
-
run:worker 的逻辑,开始执行
-
incRunning:增加 worker 的数量
-
decRunning:减少 worker 的数量
-
getRunningWorkers:获得当前正在工作的 worker 数量
-
getCap:获得当前 worker 的容量
注意:用 for range 遍历 channel 时,若通道未关闭且无数据就会被阻塞;若通道已关闭且无数据则不会阻塞,会直接退出循环。
Coding
package mainimport ("fmt""sync""sync/atomic""time"
)type Task struct {f func() error //具体的任务逻辑
}type Pool struct {RunningWorkers int64 //运行着的 worker 数量Capacity int64 //协程池 worker 数量JobCh chan *Task //任务存放的位置sync.Mutex
}func NewTask(funcArg func() error) *Task {return &Task{f: funcArg,}
}func NewPool(capacity int64, taskNum int64) *Pool {return &Pool{Capacity: capacity,JobCh: make(chan *Task, taskNum),}
}func (p *Pool) GetCap() int64 {return p.Capacity
}func (p *Pool) incRunning() {atomic.AddInt64(&p.RunningWorkers, 1)
}func (p *Pool) decRunning() {atomic.AddInt64(&p.RunningWorkers, -1)
}func (p *Pool) getRunningWorkers() int64 {return atomic.LoadInt64(&p.RunningWorkers)
}func (p *Pool) run() {p.incRunning()go func() {defer func() {p.decRunning()}()// 如果 channel 没有 task,并且也没有被 close()// 那么会一直阻塞在这里,为了正确的退出,我们在 main 函数里设置了 3s 的等待时间,确保程序正确退出for task := range p.JobCh {task.f()}}()
}// AddTask 往协程池添加任务
func (p *Pool) AddTask(task *Task) {//加锁,防止启动多个 worker,导致下面的判断出问题p.Lock()defer p.Unlock()if p.getRunningWorkers() < p.GetCap() {//启动创建一个 workerp.run()}//将任务放入 channel,等待消费p.JobCh <- task
}func main() {//创建协程池:3个协程,容量为 10 的任务队列 channelpool := NewPool(3, 10)for i := 0; i < 20; i++ {//将任务放入池中pool.AddTask(NewTask(func() error {fmt.Printf("I am Task \n")return nil}))}//确保 20 个任务都执行完成time.Sleep(time.Second * 3)
}