Golang 中你应该知道的 WaitGroup 知识

Nothing is at last sacred but the integrity of your own mind.
— Ralph Waldo Emerson

1. sync.WaitGroup 概述:

简介sync.WaitGroup的功能和作用。

Go语言作为云原生的代表,在并发编程方面以极为容易上手而出名。而在并发编程使用过程中,大多数情况人都会用到WaitGroup。在使用过程中,我时常在想它是如何实现的,所以本文重点讨论我对WaitGroup的一些理解。

在 Go 语言中,sync.WaitGroup 能够让主程序或者其他 goroutine 等待多个 goroutine 执行完毕后再继续执行。
它主要用于以下场景:

  • 等待一组 goroutine 完成:当我们有多个并发任务需要执行,并且希望在这些任务全部完成后再继续执行后续操作时
  • 确保资源安全释放:在并发操作中,需要确保所有 goroutine 执行完毕后再释放资源,以避免资源竞争和数据不一致。

举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main

import (
"fmt"
"sync"
)

func main() {
var counter int64
var mu sync.Mutex
var wg sync.WaitGroup

for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
for j := 0; j < 1000; j++ {
mu.Lock()
counter++
mu.Unlock()
}
wg.Done()
}()
}

wg.Wait()
fmt.Println("Final Counter:", counter)
}

Run in Go 1.22

2 sync.WaitGroup in Go 1.17:

深入剖析sync.WaitGroup的结构和内部机制。

我了解到 Go 1.20 之前的结构有一些很巧妙的地方,所以这里会重点以 Go1.17举例进行说明,后面也会提及 Go 1.20之后的实现。

cs.opensource.google/go/go/+/refs/tags/go1.17:src/sync/waitgroup.go

1
2
3
4
5
6
7
8
9
10
type WaitGroup struct {
noCopy noCopy

// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
// 64-bit atomic operations require 64-bit alignment, but 32-bit
// compilers do not ensure it. So we allocate 12 bytes and then use
// the aligned 8 bytes in them as state, and the other 4 as storage
// for the sema.
state1 [3]uint32
}
  • nocopy是为了保证该结构不会被进行拷贝,这是一种保护机制,会在后面进行介绍。
  • state1主要是存储着计数状态和信号量,接下来会重点介绍。

2.1 注释剖析

想理解上面的注释,首先要理解内存对齐,以及 state1 在Add() 和 Wait() 的使用

内存对齐要求数据在内存中的地址必须是某个特定值的倍数,这样可以提升CPU读取内存数据的效率。例如:

  • 32 位对齐:数据的起始地址必须是 4 的倍数。
  • 64 位对齐:数据的起始地址必须是 8 的倍数。

而在 Add() 和 Wait() 中 counter 和 waiter 被合在了一起当成一个 64 位的整数进行使用。

1
2
3
4
5
statep, semap := wg.state()
...
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32)
w := uint32(state)

当需要变化 counter 和 waiter 的值的时候,是通过 atomic 来原子操作这个 64 位整数。但atomic 在使用时有一些需要注意的点,golang 的官方文档 sync/atomic/#pkg-note-BUG 有这一段话:

On ARM, 386, and 32-bit MIPS, it is the caller’s responsibility to arrange for 64-bit alignment of 64-bit words accessed atomically via the primitive atomic functions (types Int64 and Uint64 are automatically aligned). The first word in an allocated struct, array, or slice; in a global variable; or in a local variable (because the subject of all atomic operations will escape to the heap) can be relied upon to be 64-bit aligned.

可以看出,基于这个前提,在32为系统中,我们需要自行保证 count+waiter 的 64 位对齐。那么问题来了,如果是你来保证,你会怎么实现呢?

2.2 state()

让我们来学习一下官方的实现:

1
2
3
4
5
6
7
8
9
10
state1 [3]uint32

// state returns pointers to the state and sema fields stored within wg.state1.
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}

image.png
如图中所示:

  1. 64位系统均满足8字节对齐,32位系统有可能满足8字节对齐。
  2. 32位系统不满足8字节对齐时,sema前置4字节,来保证后面的state满足8字节对齐

只不过是改变 sema 的位置顺序,就可以保证 counter+waiter 一定会 64 位对齐,确实非常巧妙。

PS:后面或许可以介绍一下内存对齐的文章。

2.3 简易实现过程

现在,我们可以将原本的结构看成以下的代码,我们暂时先不考虑内存对齐和并发安全等方面因素,方便我们理解:

1
2
3
4
5
type WaitGroup struct {
counter int32
waiter uint32
sema uint32
}

这些字段的大概使用如下:

  • counter 代表目前尚未完成的个数。WaitGroup.Add(n) 将会导致 counter += n, 而 WaitGroup.Done() 将导致 counter--
  • waiter 代表目前已调用 WaitGroup.Wait 的 goroutine 的个数。
  • sema 对应于 golang 中 runtime 内部的信号量的实现。WaitGroup 中会用到 sema 的两个相关函数,runtime_Semacquire 和 runtime_Semreleaseruntime_Semacquire 表示增加一个信号量,并挂起 当前 goroutine。runtime_Semrelease 表示减少一个信号量,并唤醒 sema 上其中一个正在等待的 goroutine。

3. Add()、Done()、Wait()

cs.opensource.google/go/go/+/refs/tags/go1.17:src/sync/waitgroup.go

打开链接,总共不到150行,请先看一遍代码帮助理解。

结合我们常用的使用场景, 重点过程如下:

  1. 当调用 WaitGroup.Add(n) 时,counter 将会自增: counter += n
1
state := atomic.AddUint64(statep, uint64(delta)<<32)
  1. 当调用 WaitGroup.Wait() 时,会将 waiter++。同时调用 runtime_Semacquire(semap), 增加信号量,并挂起当前 goroutine。
1
2
3
4
if atomic.CompareAndSwapUint64(statep, state, state+1) {
...
runtime_Semacquire(semap)
...
  1. 当调用 WaitGroup.Done() 时,将会 counter--。如果自减后的 counter 等于 0,说明 WaitGroup 的等待过程已经结束,则需要调用 runtime_Semrelease 释放信号量,唤醒正在 WaitGroup.Wait 的 goroutine。
1
2
3
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}

4. WaitGroup in Go 1.20

cs.opensource.google/go/go/+/refs/tags/go1.20:src/sync/waitgroup.go

相信应该有人意识到一个问题,counter 和 waiter 在改变时需要保证并发安全,为什么不直接使用 atomic.Uint64 呢?

那是因为 atomic.Uint64 在靠后的版本中才支持的。

在Go 1.20中,我们就可以注意到内存对齐的逻辑被 atomic.Uint64 代替了,虽然在 Go 1.20 release 文档中没有提到这个看起来很微小的变动,但其中确实有很多值得学习的地方。

参考:[go] sync: use atomic.Uint64 for WaitGroup state,atomic.Uint64实现并没有额外的性能损耗。

5. No Copy

waitGroup结构中,我们看到了nocopy,为什么要有nocopy呢?先看一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import "fmt"

// Define a struct type
type Person struct {
Name string
Age int
}

func main() {
// Create a struct instance
person := Person{Name: "Alice", Age: 30}

// Create a pointer to the struct
p := &person

// Access and modify the struct's fields through the pointer
fmt.Println(p.Name) // Output: Alice
fmt.Println((*p).Name) // Output: Alice

p1 := p
p.Age = 32
fmt.Println(p).Age) // Output: 32
fmt.Println(p1).Age) // Output: 32
}

Go 中的指针拷贝是浅拷贝,浅拷贝(仅复制顶层结构)将导致原始结构和副本都指向相同的基础数据。如果一个结构中的数据被修改,从而影响另一个结构,这可能会导致不一致。

而使用 nocopy字段可以帮助进行静态编译检查,只要是该对象或对象中存在nocopy字段,可以通过go vet功能,来检查代码中该对象是否有被copy。

More about Pointers: Pointers You Should Know in Golang | by Wesley Wei | Jul, 2024 | Programmer’s Career

6. WaitGroup 的注意事项:

探讨WaitGroup在使用上的一些局限性和潜在的陷阱,了解如何避免这些问题。

如果你查看了Go中的源码,可以发现以下大家总结好的经典注意事项:

  • Add() 操作需要早于 Wait() 操作
  • 调用 Done() 次数要与 Add() 计数器值相等
  • 计数器 (counter) 的值小于 0,会 panic
  • Add() 和 Wait() 不能并行调用,比如在 2 个不同 goroutine 里调用,会 panic
  • 要重复调用 WaitGroup,必须等 Wait() 执行完才能进行下一轮调用

7. 信号量

前面提到了信号量,信号量是一种保护共享资源的机制,用于防止多个线程同时访问某个资源。先简单了解unix/linux系统下的信号量的实现:

信号量包含一个非负整型的变量,有两个原子操作 wait(down) 和 signal(up)。wait 又可以称为 P 或 down 操作,减 1 操作;signal 也被称为 V 或 up 操作,加 1 操作。信号量通过原子操作实现的 加 1 或 减 1 运算来实现对并发资源的控制。

  1. wait(P、down) 操作,如果信号量的非负整型变量 S > 0,wait 将其减 1;如果 S = 0,wait 将该线程阻塞。
  2. signal(V、up) 操作,增量后,如果预增量值为负(意味着有进程在等待资源),则将阻塞进程从信号量的等待队列转移到就绪队列;如果没有线程阻塞在信号量上,signal 将 S 加 1。

对应Go中使用 WaitGroup 的常用场景:

  1. 先执行 Wait() 中的 runtime_Semacquire(semap) ,此时semap 为0,应该会增加信号量,并挂起当前 goroutine。
  2. 当所有goroutine Done() 执行完毕,执行 runtime_Semrelease 释放信号量,唤醒正在 WaitGroup.Wait 的 goroutine。

对应的源码: sema.go

1
2
3
4
5
6
7
8
9
//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire
func sync_runtime_Semacquire(addr *uint32) {
semacquire1(addr, false, semaBlockProfile, 0, waitReasonSemacquire)
}

//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
semrelease1(addr, handoff, skipframes)
}

以 semacquire1 为例(wait、P、down):

  • 尝试获取信号量:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
if cansemacquire(addr) {
return
}
func cansemacquire(addr *uint32) bool {
for {
v := atomic.Load(addr)
if v == 0 {
return false
}
if atomic.Cas(addr, v, v-1) {
return true
}
}
}
  • **阻塞等待:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
for {
...
if cansemacquire(addr) {
root.nwait.Add(-1)
unlock(&root.lock)
break
}
root.queue(addr, s, lifo)
goparkunlock(&root.lock, reason, traceBlockSync, 4+skipframes)
if s.ticket != 0 || cansemacquire(addr) {
break
}
...
}

具体的逻辑比较复杂,有兴趣的话,可以自行研究。更多关于信号量知识,参考:Semaphore (programming) - Wikipedia

参考

Golang WaitGroup 原理深度剖析 | 编程沉思录
深入理解Go语言(08):sync.WaitGroup源码分析 - 九卷 - 博客园

English post: https://programmerscareer.com/golang-waitgroup/
作者:Wesley Wei – Twitter Wesley Wei – Medium
注意:原文在 2024-07-08 22:37 时创作于 https://programmerscareer.com/golang-waitgroup/. 本文为作者原创,转载请注明出处。

我的 10+ 个 obsidian 写作插件 Go 1.23 的 Timer 相关改动

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×