1.sync.WaitGroup 介绍
sync.WaitGroup 按照官方注释给的解释,它可以等待一组 Goroutine 集合的结束,主 goroutine
通过调用 Add()
函数来设置一定数量进行等待的 goroutines
,然后其余的一些 goroutines
则进行各自的运行结束之后再调用 Done()
,这样一来,等待的主 goroutine
会阻塞知道其余所有 goroutines
都结束。
A WaitGroup waits for a collection of goroutines to finish.The main goroutine calls Add to set the number of goroutines to wait for. Then each of the goroutines runs and calls Done when finished. At the same time, Wait can be used to block until all goroutines have finished.
注:以下golang
源码版本为:1.15
2.源码解析
结构体定义
1 | // A WaitGroup must not be copied after first use. |
第一个参数:noCopy
noCopy:它是 sync 包下的一个特殊标记,可以嵌入到结构中,在第一次使用后不可复制,使用go vet
作为检测使用,并因此只能进行指针传递,从而保证全局唯一;
比如下面的演示代码中:
1 | func main () { |
run 的时候没问题,但是如果你使用 go vet 做个检查就有警告了:
1 | $ go vet proc.go |
所以它能够保证 sync.WaitGroup 不会被开发者通过再赋值的方式拷贝;
第二个参数:state1
state1:是用来存放任务计数器和等待者计数器,应该会涉及到一些位操作相关的内容,估计还是挺棘手的,需要注意的是:WaitGroup 在 64 位和 32 位机器是持有不同的状态,其相对应的具体内容如下:
state [0] | state [1] | state [2] | |
---|---|---|---|
64 位 | waiter | counter | sema |
32 位 | sema | waiter | counter |
注意其中 waiter
是等待者计数,counter
是任务计数,sema
是信号量。具体可以看看注释(不过这是之前版本的注释,官方并未修改):
64-bit value: high 32 bits are counter, low 32 bits are waiter count.
64-bit atomic operations require 64-bit alignment, but 32-bit compilers do not ensure it.So we allocate 12 bytes and then use the aligned 8 bytes in them as state, and the other 4 as storage for the sema.
其实如果看其他书籍或者博客,会发现之前版本的 WaitGroup
结构体定义并非如此:
1 | type WaitGroup struct { |
关于新版本的 go
为什么重新设计可以看看注释,我认为应该是为了操作的便利性。
同时sync.WaitGroup
提供的私有方法 sync.WaitGroup.state
能够帮我们从 state1
字段中取出它的状态(statep = waiter + counter
)和信号量(semap
)。
1 | // 获取statep、semap的值 |
Add()
sync.WaitGroup
对外暴露了三个方法分别是 sync.WaitGroup.Add
、sync.WaitGroup.Wait
和 sync.WaitGroup.Done
,而有意思的是sync.WaitGroup.Done
向 sync.WaitGroup.Add
方法传入了 -1(没错,就是这么简单,也说明了这个 delta
可以为负数),接下来先从sync.WaitGroup.Add
开始分析起。
通过 Add()
函数我们传进了 delta
这么一个值,它要加到 WaitGroup
的计数器当中,它可以是负数;如果计数器变为零时,所有被阻塞的 goroutines
都会被释放,如果计数器变为负数时,则会报 panic
(意思就是,计数器不能为负)。
其源码具体如下:
1 | func (wg *WaitGroup) Add(delta int) { |
捋一捋,以上的 Add()
函数主要完成的以下内容:
- 首先通过
wg.state()
得到statep
(waiter + counter
) 和semap
,然后将Add()
传进的参数delta
添加至counter
,再将statep
中的waiter
和coutner
分别抽离出来并封装成为v
和w
,然后对v
和w
做一些校验,比如- 计数器
counter
为负,则触发panic
; - 还有
Wait()
方法没有在Add()
方法之后调用,则触发panic;
- 计数器
- 如果
Add()
添加正常则返回。 - 再对原
state
和新的state
不等的两种情况进行判断是否出错,出错则报panic,具体的两种情况为:- Add 与 Wait 同时调用;
- 如果counter已经0,但Wait 继续增加等待计数器 waiters,这种情况永远不会触发信号量;
- 最后对于
w > 0
的情况,会进行 for 循环直到调用计数器归零,也就是所有任务都执行完成时,就会通过sync.runtime_Semrelease
唤醒处于等待状态的所有Goroutine
。
Done()
Done()
就简单将counter
值减1,这里就能够理解为什么上面说 delta
可以为负数了。
1 | // Done decrements the WaitGroup counter by one. |
Wait()
Wait()
会在计数器 counter
大于 0 并且不存在等待的 Goroutine
时,调用 sync.runtime_Semacquire
陷入睡眠状态,直到 couner
的值为0时进行唤醒。
1 | func (wg *WaitGroup) Wait() { |
可以看到当 sync.WaitGroup
的计数器 Counter
归零时,陷入睡眠状态的 Goroutine 就被唤醒,上述方法会立刻返回。
3.小结
通过源码的解析,可以得到以下的一些认识:
WaitGroup
利用信号量来实现任务结束的通知;Wait()
可以被调用多次,也即可以同时有多个 Goroutine 等待当前sync.WaitGroup
计数器的归零,并且每个都会收到完成的通知;sync.WaitGroup.Done
只是对sync.WaitGroup.Add
方法的简单封装,我们可以向sync.WaitGroup.Add
方法传入任意负数(需要保证计数器非负)快速将计数器归零以唤醒其他等待的 Goroutine;WaitGroup
作为参数传递的时候需要注意传递指针,或者尽量避免传递;WaitGroup
不能保证多个goroutine
执行次序
但是使用的时候需要注意以下几点:
Add()
操作必须早于Wait()
,否则会panic
;Add()
设置的值必须与实际等待的goroutine
个数一致,否则会panic
;WaitGroup
必须在Wait()
方法返回之后才能被重新使用;WaitGroup
只可保持一份,不可拷贝给其他变量,否则会造成意想不到的BUG;
最后:实在忍不了吐槽一下,虽然很多地方我分析得不是很好,但是 go夜读
的一些源码解析也太烂了,直接将注解扔进谷歌翻译/百度翻译,最后也都不审核一下,简直就是不能入眼。