golang中你应该知道的协程协作知识

Golang中高效并发的高级技术

golang golang-advanced-01(from github.com/MariaLetta/free-gophers-pack)|300

2024已经过去了,这篇文章是2025年的第一篇文章,祝大家新年快乐。2024年是个值得纪念的一年,还有大约一个月就是中国的新年了,到时候我会空出来时间给过去的一年做出一些总结!

现在就让我们先看看2025年的第一篇文章吧。


虽然 Go 已经把协程做的非常的轻量级,但依然存在部分开销,其中最大的开销来自栈扩张的开销。

每个 Goroutine 初始 stack size 为 2KB ,在协程内调用函数时,会通过 morestack 判断是否需要栈扩张,如需要,则需要调用 copystack 拷贝完整的栈。

而在现实中,大部分线上程序的运行模式都有一定的固定规律,例如一个 Server 所有请求都会经过一系列固定的函数调用链,而如果这些函数最终会导致 stack 被固定扩张到 8KB,那么这个程序每一次请求都会重复调用 copystack。

基于此,我们希望每一个请求都能尽可能复用前一个请求已经扩张好栈的 Goroutine。

复用的协程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
type Pool struct {
tasks chan func()
}

func (p *Pool) Go(task func()) {
select {
case p.tasks <- task: // reuse exist worker
return
default:
}
go func() { // start a new worker
for {
task()
task = <-p.tasks // waiting for new task
}
}()
}

以上是一个最简单的 Goroutine 池实现,并未考虑超时销毁等逻辑。如果该系统的并发度是 3,那么我们仅需要常驻 3个 Goroutine 便可以完成所有任务。例如:

协作的协程

「不要通过共享内存来通信,而应该通过通信来共享内存」。

由于协程可以轻量切换上下文的,我们可以轻易地通过通信的方式将内存对象“传输”给其他需要使用的协程,而无需通过加互斥锁的方式来保证并发安全性。

在并发编程中,生产者消费者模型是最常见的编程场景,我们就以此举例,来看看在 Go 中如何达成多协程并发协作的目的。利用 Go 语言内置的 channel 数据结构,我们仅需几行代码就能够实现一个最简单的消息队列:

简单生产消费模型

1
2
3
4
5
6
7
8
9
10
11
12
13
type Factory struct {
queue chan string
}

func (f *Factory) Produce(msg string) {
f.queue <- val
}

func (f *Factory) Consume() {
for msg := range f.queue {
handler(msg)
}
}

在这个模型中,生产者负责生成数据并将其放入共享的缓冲区,而消费者则从缓冲区中取出并处理这些数据。缓冲区的存在使得生产者和消费者能够独立运行,提高了系统的效率和响应性。例如:

批量消费

基于 Channel 实现的消息队列有一个缺陷是获取一个元素的代价过大,且无法一次操作就获得全部的元素。在许多场景中我们希望能够批量消费当前所有元素。此时我们可以利用加锁的链表实现消息队列,同时利于 Channel 作为协程间的信号通知器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
type ListFactory struct {
mu sync.Mutex
queue []string
wakeup chan struct{} // size=1
}

func NewListFactory() *ListFactory {
f := new(ListFactory)
f.wakeup = make(chan struct{}, 1)
return f
}

func (f *ListFactory) Produce(msg string) {
f.mu.Lock()
f.queue = append(f.queue, msg)
if len(f.queue) == 1 {
select {
case f.wakeup <- struct{}{}:
default:
}
}
f.mu.Unlock()
}

func (f *ListFactory) Consume() {
for {
<-f.wakeup

f.mu.Lock()
cache := f.queue
f.queue = nil
f.mu.Unlock()
batchHandler(cache)
}
}
}

举个例子:

为了进一步理解代码,我们可以看一些错误case。

  1. 错误 case1 死锁:
1
2
// Close the wakeup channel to signal the consumer to stop
// close(factory.wakeup)

如果缺少close(factory.wakeup), 在执行最后一次 <-f.wakeup 后,下面的代码就会产生死锁,因为无法执行到,也无法正常退出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
		if len(f.queue) == 0 {
f.mu.Unlock()
break
}```

2. 错误case 2 未及时消费:

```go
func (f *ListFactory) Consume() {
defer f.done.Done()

for {
f.mu.Lock()
if len(f.queue) == 0 {
f.mu.Unlock()
break
}

cache := f.queue
f.queue = nil // Clear the queue.
f.mu.Unlock()

<-f.wakeup
batchHandler(cache)
time.Sleep(1 * time.Second) // Simulate batch processing time
}
}

以上代码,基本只会输出 All tasks completed,因为检测queue长度为空,提前break了。

提升消费效率

在上面的实现里,我们的消费者 Goroutine 每当队列中有 1 条消息,就有可能会唤醒执行。而在有些场景下,我们希望消费者能够积蓄一定数量的消息再进行批量处理,典型的场景如异步的日志写入,数据的合并发送等。

最简单的做法是每次 Sleep 一个窗口时间去积累消息发送,但这样做的代价是,即便后续没有任何数据需要写入,我们整体消费速度都会被拖长到至少一个窗口时间。

在这种情况下,我们可以利用 runtime.Gosched() 实现有限度的延迟等待:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (f *ListFactory) Consume() {
for {
<-f.wakeup
runtime.Gosched()
f.mu.Lock()
cache := f.queue
f.queue = nil
f.mu.Unlock()
batchHandler(cache)
}
}


func main() {
N := 32
f := new(ListFactory)
go func() {
for i := 0; i < N; i++ {
f.Produce("hello")
runtime.Gosched()
}
}()
f.Consume()
}

runtime.Gosched() 的作用

runtime.Gosched() 是 Go 语言 runtime 包中的一个函数,可以简单理解其作用是让出当前 goroutine 的 CPU 时间片,允许其他 goroutine 获得执行机会。

在程序处于繁忙状态时,会有大量(假设为 N 个) Goroutine 不停调用 Produce 函数,而消费者的 Goroutine 仅有 1 个,假设每个 Goroutine 的调度机会相同,那么消费者 Goroutine 每获得一次调度机会,意味着生产者 Goroutine 至少已经生产了 N 条消息,这样便批量消费至少 N 条消息(理想情况下)。
而如果该模型处于空闲状态下,生产者的 Goroutine 数量非常少,而此时即便消费者调用 runtime.Gosched(),也能够很快由于没有其他 Goroutine 需要被调度而获得执行机会,从而尽快地消费。

这种主动让渡执行权的方式,能够让协程间拥有更加主动地调度能力,从而发挥协程名副其实的”协作”能力。

同样为了进一步理解代码,我们可以看一些错误case。

  1. case 1: 死锁:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// Consume processes messages from the queue in batches.
func (f *ListFactory) Consume() {
defer f.done.Done()
for {
// Wait for a signal from the producer.
<-f.wakeup
// Yield CPU time to allow producer to run.
runtime.Gosched()

f.mu.Lock()
if len(f.queue) == 0 {
f.mu.Unlock()
break // Exit loop if queue is empty.
}

cache := f.queue
f.queue = nil // Clear the queue.
f.mu.Unlock()

// Process the batch of messages.
batchHandler(cache)
}
}

func main() {
N := 32 // Number of messages to produce
f := NewListFactory()

// Start the consumer in the main goroutine.
f.done.Add(1)
f.Consume()

// Start a producer goroutine.
go func() {
for i := 0; i < N*3; i++ {
f.Produce(fmt.Sprintf("Message %d", i+1))
runtime.Gosched() // Simulate production delay.
}

// Signal the consumer that no more tasks will come.
close(f.wakeup)
}()

f.done.Wait()
fmt.Println("All tasks completed.")
}

消费者挂起等待生产者,而生产者在主协程中也在等待,互相等待导致死锁。

  1. Case 2:未消费退出。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// Consume processes messages from the queue in batches.
func (f *ListFactory) Consume() {
defer f.done.Done()
for {
// Yield CPU time to allow producer to run.
runtime.Gosched()

f.mu.Lock()
if len(f.queue) == 0 {
f.mu.Unlock()
break // Exit loop if queue is empty.
}

cache := f.queue
f.queue = nil // Clear the queue.
f.mu.Unlock()

// Process the batch of messages.
batchHandler(cache)

// Wait for a signal from the producer.
<-f.wakeup
}
}

func main() {
N := 32 // Number of messages to produce
f := NewListFactory()

// Start the consumer in the main goroutine.
f.done.Add(1)
f.Consume()

// Start a producer goroutine.
go func() {
for i := 0; i < N*3; i++ {
f.Produce(fmt.Sprintf("Message %d", i+1))
runtime.Gosched() // Simulate production delay.
}

// Signal the consumer that no more tasks will come.
close(f.wakeup)
}()

f.done.Wait()
fmt.Println("All tasks completed.")
}

runtime.Gosched() 挂起无效,只有一个主协程,生产者的协程并没有启动。

更多内容

最近文章:

随机文章:


更多该系列文章,参考medium链接:

https://wesley-wei.medium.com/list/you-should-know-in-golang-e9491363cd9a

English post: https://programmerscareer.com/golang-advanced-01/
作者:微信公众号,Medium,LinkedIn,Twitter
发表日期:原文在 2025-01-01 16:32 时创作于 https://programmerscareer.com/zh-cn/golang-advanced-01/
版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证

golang中你应该知道的常见优化手段 在Golang中您应该知道的责任链模式(设计模式06)

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×