Golang WaitGroup 用法 源码阅读笔记
使用
sync.WaitGroup
可以用来阻塞等待一组并发任务完成
下面是如何使用sync.WaitGroup的使用
最重要的就是不能并发调用Add()
和Wait()
var wg sync.WaitGroupfor ... {wg.Add(1) // 不能和wg.Wait()并发执行go func() {// 不能在启动的函数里面执行wg.Add(), 否则会panicdefer wg.Done()// dosomething} ()
}wg.Wait()
下面是官方示例
package mainimport ("sync"
)type httpPkg struct{}func (httpPkg) Get(url string) {}var http httpPkgfunc main() {var wg sync.WaitGroupvar urls = []string{"http://www.golang.org/","http://www.google.com/","http://www.example.com/",}for _, url := range urls {// Increment the WaitGroup counter.wg.Add(1)// Launch a goroutine to fetch the URL.go func(url string) {// Decrement the counter when the goroutine completes.defer wg.Done()// Fetch the URL.http.Get(url)}(url)}// Wait for all HTTP fetches to complete.wg.Wait()
}
源码解读
结构体
type WaitGroup struct {noCopy noCopystate atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.sema uint32
}
noCopy
标识结构体不能被复制
state
的高32位表示counter的值,低32位表示waiter的值
sema
用来阻塞和唤醒goroutine
方法
<font style="color:rgb(36, 41, 46);background-color:rgb(233, 236, 239);">func(wg *WaitGroup) Done()</font>
<font style="color:rgb(36, 41, 46);background-color:rgb(233, 236, 239);">func(wg *WaitGroup) Add(delta int)</font>
<font style="color:rgb(36, 41, 46);background-color:rgb(233, 236, 239);">func(wg *WaitGroup) Wait()</font>
Done()
// Done 将计数器(counter)值减 1
func (wg *WaitGroup) Done() {wg.Add(-1)
}
调用了add方法
Add()
func (wg *WaitGroup) Add(delta int) {state := wg.state.Add(uint64(delta) << 32)v := int32(state >> 32) // counter数量w := uint32(state) // waiter数量// counter < 0 -->> panicif v < 0 {panic("sync: negative WaitGroup counter")}// 并发调用Add、Wait-->> panicif w != 0 && delta > 0 && v == int32(delta) {panic("sync: WaitGroup misuse: Add called concurrently with Wait")}// 操作成功if v > 0 || w == 0 {return}// 并发调用panicif wg.state.Load() != state {panic("sync: WaitGroup misuse: Add called concurrently with Wait")}// v == 0 && counter != 0// 释放等待的waiter,将state的状态置为0wg.state.Store(0)for ; w != 0; w-- {runtime_Semrelease(&wg.sema, false, 0)}
}
从源码的panic来看,不能并发调用Add()
&Wait()
不能让counter < 0
Wait()
func (wg *WaitGroup) Wait() {// CAS操作失败后for {state := wg.state.Load()v := int32(state >> 32)w := uint32(state)if v == 0 {// counter == 0 直接返回,没有需要等待的goroutinereturn}// if wg.state.CompareAndSwap(state, state+1) {runtime_SemacquireWaitGroup(&wg.sema)// 检验,唤醒前就已经将state置为0if wg.state.Load() != 0 {panic("sync: WaitGroup is reused before previous Wait has returned")}return}}
}
总结
参考
https://segmentfault.com/a/1190000045998688