golang 中你应该知道的时间控制

Every man is a damn fool for at least five minutes every day; wisdom consists in not exceeding the limit.
— Elbert Hubbard

问题引入

最近我遇到一个需求,我的程序需要定时触发一些任务,这就需要对时间做相对精细的掌控。
先从程序的组件架构图开始说起:
image.png

这里具体的需求是 manager 需要周期性触发分配任务,而worker 也要周期性从db里拿到任务并执行,另外比较重要的一点是manager 和 worker 执行的时长都是固定的。

让我以manager 执行20min分配任务,worker 执行60 min执行任务来举例,从时间维度来绘图:
image.png

可以看出来,不管是manager还是worker,这里核心的要求,就是定时做一些事情。
而在我的Go使用生涯中,曾遇到过很多时间控制相关的场景,我想这是一次好机会,是时候整体梳理一下分享出来了。

所以,在Go中我们该如何进行时间控制呢?又用哪些使用场景呢?我的这个需求又该如何实现呢?

时间控制

在梳理过程中我发现可以将时间控制大致分为两类场景:

  1. 超时、资源回收场景(channel+context、channel + time.After/time.NewTimer)
  2. 周期性场景 (channel + time.After/time.NewTimer、cron package)

不同的场景下需要控制的内容,各有侧重,让我们继续往下看。

channel + context

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func main() {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Millisecond*800))
defer cancel()
done := make(chan bool)
go func(ctx context.Context) {
ctx, cancel := context.WithTimeout(ctx, time.Duration(time.Millisecond*100))
defer cancel()
// Call downstream interface
fmt.Println("Call downstream interface")
// if success
done <- true
}(ctx)

// do something
select {
case <-done:
fmt.Println("call successfully!!!")
// do something
return
case <-ctx.Done():
fmt.Println("timeout or do something else")
// retry or recycle resource
return
}
}

Better Go Playground
主要逻辑是使用context来进行时间控制。主 goroutine 设置了800毫秒的超时时间,如果子goroutine 没有执行完毕,我们最多等待800毫秒,超过这个时间,我们就可以考虑重试、资源回收等动作,如果子goroutine请求成功,我们则可以继续主goroutine的业务逻辑。

可以看出这里时间控制的侧重点是请求超时和资源回收。

PS: context 更深入的内容,可以参考我的另一篇文章:

channel + time.After/time.NewTimer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func main() {
ch := make(chan struct{}, 1)
go func() {
fmt.Println("do something...")
time.Sleep(4 * time.Second)
ch <- struct{}{}
}()

select {
case <-ch:
fmt.Println("thing done")
case <-time.After(3 * time.Second):
fmt.Println("timeout")
}
}

Better Go Playground
使用time.After也可以做超时控制。但在每次调用time.After时,都会创建一个新的定时器和对应的通道,并且没有好的资源回收方法,我并不推荐这种用法。

为了避免这种潜在的内存泄漏,一个常见的做法是使用 time.NewTimer 替代 time.After,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func main() {
exitCh := make(chan struct{})
ch := make(chan struct{}, 1)
timer := time.NewTimer(3 * time.Second)

// Listen for specified signal
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)

go func() {
fmt.Println("do something...")
time.Sleep(4 * time.Second)
ch <- struct{}{}
}()

// In a goroutine, listen to os.Interrupt and SIGTERM signals
go func() {
<-sig
fmt.Println("Received an interrupt, stopping services...")
close(exitCh) // Close exitCh when an interrupt signal is received
}()

for {
select {
case <-ch:
fmt.Println("done")
if !timer.Stop() {
<-timer.C
}
return
case <-timer.C:
fmt.Println("timeout")
return
case <-exitCh:
fmt.Println("Service exiting")
if !timer.Stop() {
<-timer.C
}
// Execute your resource recovery code here ...
return
}
}
}

Better Go Playground
和 context 相比,我认为context是一个更优雅的方式,因为可以结合context 的特性,做更精细的控制(比如主goroutine cancel、子goroutine也会cancel)。但是time channel 的方式,是一种更简洁的办法,两者在超时控制场景各有不同的应用场景。

当然,time channel 还可以用于做一些周期性工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// Define a task, use an empty struct as a simple example here
type Task struct{}

func worker(taskCh <-chan *Task) {
for task := range taskCh {
// Execute the task
// For demonstration, assume each task takes 1 second
time.Sleep(2 * time.Second)
_ = task
fmt.Println("run task")
}
}

func main() {
taskCh := make(chan *Task, 100)

// Start 3 worker goroutines to perform tasks
for i := 0; i < 3; i++ {
go worker(taskCh)
}

// Create a timer, execute once every second
timer := time.NewTicker(500 * time.Millisecond)

go func() {
for range timer.C {
// Every time the timer signal is received, print the current length of the queue
fmt.Printf("Current task queue length is: %d\n", len(taskCh))
}
}()

// Simulate continuous task adding
for {
taskCh <- &Task{}
time.Sleep(500 * time.Millisecond) // Simulate adding a task every 500 milliseconds
}
}

Note: In actual code, you might need a better way to stop the timer and exit the program. The for loop in this example will keep running until the program is forcefully terminated.
Also, the task in this example is an empty struct. You will need to define a task structure that fits your business requirements in real-world cases.

Better Go Playground
这里的例子,可以用于观测自身的服务是否正常。它会周期性检查任务channel长度,如果长度长时间为零,那么我们的任务大概率是出现了异常。

cron package

周期性观察任务channel的长度是一个轻量级任务,但当我们遇到更为复杂的场景怎么办呢?
答案是 cron package

Callers may register Funcs to be invoked on a given schedule. Cron will run them in their own goroutines.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
c := cron.New()
c.AddFunc("0 30 * * * *", func() { fmt.Println("Every hour on the half hour") })
c.AddFunc("@hourly", func() { fmt.Println("Every hour") })
c.AddFunc("@every 1h30m", func() { fmt.Println("Every hour thirty") })
c.Start()
..
// Funcs are invoked in their own goroutine, asynchronously.
...
// Funcs may also be added to a running Cron
c.AddFunc("@daily", func() { fmt.Println("Every day") })
..
// Inspect the cron job entries' next and previous run times.
inspect(c.Entries())
..
c.Stop() // Stop the scheduler (does not stop any jobs already running).

更多内容参考:cron package - github.com/robfig/cron - Go Packages

实际应用

我的需求显然可以使用 cron package 进行实现。
image.png

经过思考我做了以上实现,manager 在进程启动时就触发一次任务分配,后续则每隔一小时进行触发,这样worker显然可以更充分的利用时间去执行更多任务。

总结

不管是 time 标准库还是 cron 库底层都是借助了channel 的能力。感谢 golang context、channel 的设计,让我们开发时间控制相关的代码工作变得更轻松。

More

在整理文章时,我想到了以下问题:

  1. 为什么会有这种周期性触发的需求?
    1. 周期性是为了不让自身任务产生堆积,并且每个周期内动态的做更多任务。在每个周期内,我们根据下游接口承受的最大的能力范围,尽量多的去做任务。
  2. Go 1.23 Release Notes - The Go Programming Language提到 “Timers and Tickers that are no longer referred to by the program become eligible for garbage collection immediately” 这是怎么做到的呢?之前为什么不会立即被回收呢?
  3. select 中当 <-done 和 <-ctx.Done() 的消息同时出现,会选择走哪一个呢?为什么?
1
2
3
4
5
6
7
8
9
10
select {
case <-done:
fmt.Println("call successfully!!!")
// do something
return
case <-ctx.Done():
fmt.Println("timeout or do something else")
// retry or recycle resource
return
}

针对第二和第三点,欢迎留下你的思考。

参考

cron package - github.com/robfig/cron - Go Packages

更多该系列文章,参考medium链接:
https://wesley-wei.medium.com/list/you-should-know-in-golang-e9491363cd9a

English post: https://programmerscareer.com/golang-time-control/
作者:Wesley Wei – Twitter Wesley Wei – Medium
注意:原文在 2024-06-18 09:57 时创作于 https://programmerscareer.com/golang-time-control/. 本文为作者原创,转载请注明出处。

拥抱AI时代的到来:工具及概念介绍 介绍Go1.23中的iter包

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×