Go: 监控模式
Go: 监控模式
Go: 监控模式
Go 能实现监控模式,归功于 sync
包和 sync.Cond
结构体。监控模式允许 goroutine 在进入睡眠模式前等待一个定特定条件,而不会阻塞执行或消耗资源。
条件变量
我们举个例子,来看看这个模式可以带来的好处。我将使用 Bryan Mills 的演示文稿中提供的示例:
1type Item = int
2type Queue struct {
3 items []Item
4 *sync.Cond
5}
6func NewQueue() *Queue {
7 q := new(Queue)
8 q.Cond = sync.NewCond(&sync.Mutex{})
9 return q
10}
11func (q *Queue) Put(item Item) {
12 q.L.Lock()
13 defer q.L.Unlock()
14 q.items = append(q.items, item)
15 q.Signal()
16}
17func (q *Queue) GetMany(n int) []Item {
18 q.L.Lock()
19 defer q.L.Unlock()
20 for len(q.items) < n {
21 q.Wait()
22 }
23 items := q.items[:n:n]
24 q.items = q.items[n:]
25 return items
26}
27func main() {
28 q := NewQueue()
29 var wg sync.WaitGroup
30 for n := 10; n > 0; n-- {
31 wg.Add(1)
32 go func(n int) {
33 items := q.GetMany(n)
34 fmt.Printf("%2d: %2d\n", n, items)
35 wg.Done()
36 }(n)
37 }
38 for i := 0; i < 100; i++ {
39 q.Put(i)
40 }
41 wg.Wait()
42}
Queue
是一个非常简单的结体构,由一个切片和 sync.Cond
结构组成。然后,我们做两件事:
- 启动 10 个 goroutines,并将尝试一次消费 X 个元素。如果这些元素不够数目,那么 goroutine 将进去睡眠状态并等待被唤醒
- 主 goroutine 将用 100 个元素填入队列。每添加一个元素,它将唤醒一个等待消费的 goroutine。
程序的输出,
1 4: [31 32 33 34]
2 8: [10 11 12 13 14 15 16 17]
3 5: [35 36 37 38 39]
4 3: [ 7 8 9]
5 6: [40 41 42 43 44 45]
6 2: [18 19]
7 9: [46 47 48 49 50 51 52 53 54]
810: [21 22 23 24 25 26 27 28 29 30]
9 1: [20]
10 7: [ 0 1 2 3 4 5 6]
如果多次运行此程序,将获得不同的输出。我们可以看到,由于是按批次检索值的,每个 goroutine 获取的值是一个连续的序列。这一点对于理解 sync.Cond
与 channels
的差异很重要。
sync.Cond vs Channels
用单个 channel
解决这个问题并不容易,因为它会被消费者一个接一个地拉出来。
为了解决这个问题,Bryan Mills 编写了一个包含两个通道组合的等价解决方案(第65页):
1type Item = int
2type waiter struct {
3 n int
4 c chan []Item
5}
6type state struct {
7 items []Item
8 wait []waiter
9}
10type Queue struct {
11 s chan state
12}
13func NewQueue() *Queue {
14 s := make(chan state, 1)
15 s <- state{}
16 return &Queue{s}
17}
18func (q *Queue) Put(item Item) {
19 s := <-q.s
20 s.items = append(s.items, item)
21 for len(s.wait) > 0 {
22 w := s.wait[0]
23 if len(s.items) < w.n {
24 break
25 }
26 w.c <- s.items[:w.n:w.n]
27 s.items = s.items[w.n:]
28 s.wait = s.wait[1:]
29 }
30 q.s <- s
31}
32func (q *Queue) GetMany(n int) []Item {
33 s := <-q.s
34 if len(s.wait) == 0 && len(s.items) >= n {
35 items := s.items[:n:n]
36 s.items = s.items[n:]
37 q.s <- s
38 return items
39 }
40 c := make(chan []Item)
41 s.wait = append(s.wait, waiter{n, c})
42 q.s <- s
43 return <-c
44}
结果类似:
11: [ 0]
210: [ 1 2 3 4 5 6 7 8 9 10]
35: [11 12 13 14 15]
48: [16 17 18 19 20 21 22 23]
56: [24 25 26 27 28 29]
63: [37 38 39]
77: [30 31 32 33 34 35 36]
89: [46 47 48 49 50 51 52 53 54]
92: [44 45]
104: [40 41 42 43]
在可读性和语义方面,条件变量在这里可能有一个小优势。但是,它也有限制。
注意事项
我们运行包含 100 个元素的基准测试,如示例所示:
1WithCond-8 15.7µs ± 2%
2WithChan-8 19.4µs ± 1%
在这里使用条件变量要快一些。让我们试试 10k 个元素的基准测试:
1WithCond-8 2.84ms ± 1%
2WithChan-8 917µs ± 1%
可以看到 channel
的速度要快得多。 Bryan Mills 在“饥饿”部分(第45页)中解释了这个问题:
假设我们调用 GetMany(3000) 的同时有一个调用者在密集的循环中执行 GetMany(3)。两个服务可能几乎同时醒来,但 GetMany(3) 调用将能够消耗三个元素,而 GetMany(3000) 将没有足够的元素就绪。队列将保持耗尽状态,较大的调用将一直阻塞。 该演示文稿还强调了在处理条件变量时我们可能面临的其他问题。如果模式看起来很简单,我们在使用它时应该小心。之前看到的例子向我们展示了如何更有效地使用
channel
并通过通信进行共享。
内部流程
内部实现非常简单,基于发号系统。以下是上一个示例的简单表示:
进入等待模式的每个 goroutine 将从变量 wait
开始分号,该变量从 0 开始。这表示等待队列。
然后,每次调用 Signal()
都会增加另一个名为 notify
的计数器,该计数器代表需要通知或唤醒的 goroutine 队列。
我们的 sync.Cond
结构包含一个负责发号的结构:
1type notifyList struct {
2 wait uint32
3 notify uint32
4 lock uintptr
5 head unsafe.Pointer
6 tail unsafe.Pointer
7}
这是就是上面提到的 wait
和 notify
变量。该结构还通过 head
和 tail
保存等待的 goroutine 的链表,其中每个 goroutine 在其内部结构中保持对所获取的票号的引用。
当收到信号时,Go 会在链表上进行迭代,直到分配给被检查的 goroutine 的票号与 notify
变量的编号匹配,如匹配则唤醒当前票号的 goroutine。一旦找到 goroutine,其状态将从等待模式变为可运行模式,然后在 Go 调度程序中处理。
如果你想深入了解 Go 调度程序,我强烈建议你阅读 William Kennedy 关于 Go 调度程序的教程。
https://medium.com/a-journey-with-go/go-monitor-pattern-9decd26fb28
作者:Vincent Blanchon 译者:咔叽咔叽 校对:DingdingZhou