Nothing is at last sacred but the integrity of your own mind.
— Ralph Waldo Emerson
1. sync.WaitGroup 概述:
简介sync.WaitGroup的功能和作用。
Go语言作为云原生的代表,在并发编程方面以极为容易上手而出名。而在并发编程使用过程中,大多数情况人都会用到WaitGroup。在使用过程中,我时常在想它是如何实现的,所以本文重点讨论我对WaitGroup的一些理解。
在 Go 语言中,sync.WaitGroup
能够让主程序或者其他 goroutine 等待多个 goroutine 执行完毕后再继续执行。
它主要用于以下场景:
- 等待一组 goroutine 完成:当我们有多个并发任务需要执行,并且希望在这些任务全部完成后再继续执行后续操作时
- 确保资源安全释放:在并发操作中,需要确保所有 goroutine 执行完毕后再释放资源,以避免资源竞争和数据不一致。
举个例子:
1 | package main |
2 sync.WaitGroup in Go 1.17:
深入剖析sync.WaitGroup的结构和内部机制。
我了解到 Go 1.20 之前的结构有一些很巧妙的地方,所以这里会重点以 Go1.17举例进行说明,后面也会提及 Go 1.20之后的实现。
cs.opensource.google/go/go/+/refs/tags/go1.17:src/sync/waitgroup.go
1 | type WaitGroup struct { |
nocopy
是为了保证该结构不会被进行拷贝,这是一种保护机制,会在后面进行介绍。state1
主要是存储着计数状态和信号量,接下来会重点介绍。
2.1 注释剖析
想理解上面的注释,首先要理解内存对齐,以及 state1 在Add() 和 Wait() 的使用
内存对齐要求数据在内存中的地址必须是某个特定值的倍数,这样可以提升CPU读取内存数据的效率。例如:
- 32 位对齐:数据的起始地址必须是 4 的倍数。
- 64 位对齐:数据的起始地址必须是 8 的倍数。
而在 Add() 和 Wait() 中 counter 和 waiter 被合在了一起当成一个 64 位的整数进行使用。
1 | statep, semap := wg.state() |
当需要变化 counter 和 waiter 的值的时候,是通过 atomic 来原子操作这个 64 位整数。但atomic 在使用时有一些需要注意的点,golang 的官方文档 sync/atomic/#pkg-note-BUG 有这一段话:
On ARM, 386, and 32-bit MIPS, it is the caller’s responsibility to arrange for 64-bit alignment of 64-bit words accessed atomically via the primitive atomic functions (types Int64 and Uint64 are automatically aligned). The first word in an allocated struct, array, or slice; in a global variable; or in a local variable (because the subject of all atomic operations will escape to the heap) can be relied upon to be 64-bit aligned.
可以看出,基于这个前提,在32为系统中,我们需要自行保证 count+waiter
的 64 位对齐。那么问题来了,如果是你来保证,你会怎么实现呢?
2.2 state()
让我们来学习一下官方的实现:
1 | state1 [3]uint32 |
如图中所示:
- 64位系统均满足8字节对齐,32位系统有可能满足8字节对齐。
- 32位系统不满足8字节对齐时,sema前置4字节,来保证后面的state满足8字节对齐
只不过是改变 sema
的位置顺序,就可以保证 counter+waiter
一定会 64 位对齐,确实非常巧妙。
PS:后面或许可以介绍一下内存对齐的文章。
2.3 简易实现过程
现在,我们可以将原本的结构看成以下的代码,我们暂时先不考虑内存对齐和并发安全等方面因素,方便我们理解:
1 | type WaitGroup struct { |
这些字段的大概使用如下:
counter
代表目前尚未完成的个数。WaitGroup.Add(n)
将会导致counter += n
, 而WaitGroup.Done()
将导致counter--
。waiter
代表目前已调用WaitGroup.Wait
的 goroutine 的个数。sema
对应于 golang 中 runtime 内部的信号量的实现。WaitGroup 中会用到 sema 的两个相关函数,runtime_Semacquire
和runtime_Semrelease
。runtime_Semacquire
表示增加一个信号量,并挂起 当前 goroutine。runtime_Semrelease
表示减少一个信号量,并唤醒 sema 上其中一个正在等待的 goroutine。
3. Add()、Done()、Wait()
cs.opensource.google/go/go/+/refs/tags/go1.17:src/sync/waitgroup.go
打开链接,总共不到150行,请先看一遍代码帮助理解。
结合我们常用的使用场景, 重点过程如下:
- 当调用
WaitGroup.Add(n)
时,counter 将会自增:counter += n
1 | state := atomic.AddUint64(statep, uint64(delta)<<32) |
- 当调用
WaitGroup.Wait()
时,会将waiter++
。同时调用runtime_Semacquire(semap)
, 增加信号量,并挂起当前 goroutine。
1 | if atomic.CompareAndSwapUint64(statep, state, state+1) { |
- 当调用
WaitGroup.Done()
时,将会counter--
。如果自减后的 counter 等于 0,说明 WaitGroup 的等待过程已经结束,则需要调用 runtime_Semrelease 释放信号量,唤醒正在WaitGroup.Wait
的 goroutine。
1 | for ; w != 0; w-- { |
4. WaitGroup in Go 1.20
cs.opensource.google/go/go/+/refs/tags/go1.20:src/sync/waitgroup.go
相信应该有人意识到一个问题,counter
和 waiter
在改变时需要保证并发安全,为什么不直接使用 atomic.Uint64 呢?
那是因为 atomic.Uint64
在靠后的版本中才支持的。
在Go 1.20中,我们就可以注意到内存对齐的逻辑被 atomic.Uint64
代替了,虽然在 Go 1.20 release 文档中没有提到这个看起来很微小的变动,但其中确实有很多值得学习的地方。
参考:[go] sync: use atomic.Uint64 for WaitGroup state,atomic.Uint64实现并没有额外的性能损耗。
5. No Copy
在waitGroup
结构中,我们看到了nocopy
,为什么要有nocopy
呢?先看一个例子:
1 | package main |
Go 中的指针拷贝是浅拷贝,浅拷贝(仅复制顶层结构)将导致原始结构和副本都指向相同的基础数据。如果一个结构中的数据被修改,从而影响另一个结构,这可能会导致不一致。
而使用 nocopy
字段可以帮助进行静态编译检查,只要是该对象或对象中存在nocopy
字段,可以通过go vet功能,来检查代码中该对象是否有被copy。
More about Pointers: Pointers You Should Know in Golang | by Wesley Wei | Jul, 2024 | Programmer’s Career
6. WaitGroup 的注意事项:
探讨WaitGroup在使用上的一些局限性和潜在的陷阱,了解如何避免这些问题。
如果你查看了Go中的源码,可以发现以下大家总结好的经典注意事项:
- Add() 操作需要早于 Wait() 操作
- 调用 Done() 次数要与 Add() 计数器值相等
- 计数器 (counter) 的值小于 0,会 panic
- Add() 和 Wait() 不能并行调用,比如在 2 个不同 goroutine 里调用,会 panic
- 要重复调用 WaitGroup,必须等 Wait() 执行完才能进行下一轮调用
7. 信号量
前面提到了信号量,信号量是一种保护共享资源的机制,用于防止多个线程同时访问某个资源。先简单了解unix/linux系统下的信号量的实现:
信号量包含一个非负整型的变量,有两个原子操作 wait(down) 和 signal(up)。wait 又可以称为 P 或 down 操作,减 1 操作;signal 也被称为 V 或 up 操作,加 1 操作。信号量通过原子操作实现的 加 1
或 减 1
运算来实现对并发资源的控制。
- wait(P、down) 操作,如果信号量的非负整型变量 S > 0,wait 将其减 1;如果 S = 0,wait 将该线程阻塞。
- signal(V、up) 操作,增量后,如果预增量值为负(意味着有进程在等待资源),则将阻塞进程从信号量的等待队列转移到就绪队列;如果没有线程阻塞在信号量上,signal 将 S 加 1。
对应Go中使用 WaitGroup 的常用场景:
- 先执行 Wait() 中的 runtime_Semacquire(semap) ,此时semap 为0,应该会增加信号量,并挂起当前 goroutine。
- 当所有goroutine Done() 执行完毕,执行 runtime_Semrelease 释放信号量,唤醒正在
WaitGroup.Wait
的 goroutine。
对应的源码: sema.go:
1 | //go:linkname sync_runtime_Semacquire sync.runtime_Semacquire |
以 semacquire1 为例(wait、P、down):
- 尝试获取信号量:
1 | if cansemacquire(addr) { |
- **阻塞等待:
1 | for { |
具体的逻辑比较复杂,有兴趣的话,可以自行研究。更多关于信号量知识,参考:Semaphore (programming) - Wikipedia。
参考
Golang WaitGroup 原理深度剖析 | 编程沉思录
深入理解Go语言(08):sync.WaitGroup源码分析 - 九卷 - 博客园
English post: https://programmerscareer.com/golang-waitgroup/
作者:Wesley Wei – Twitter Wesley Wei – Medium
注意:原文在 2024-07-08 22:37 时创作于 https://programmerscareer.com/golang-waitgroup/. 本文为作者原创,转载请注明出处。
评论