0%

深入理解 go sync.Cond

在 go 的标准库中,提供了 sync.Cond 这个并发原语,让我们可以实现多个 goroutine 等待某一条件满足之后再继续执行。 它需要配合 sync.Mutex 一起使用,因为 CondWait 方法需要在 Mutex 的保护下才能正常工作。 对于条件变量,可能大多数人只是知道它的存在,但是用到它的估计寥寥无几,因为很多并发场景的处理都能使用 chan 来实现, 而且 chan 的使用也更加简单。 但是在某些场景下,Cond 可能是最好的选择,本文就来探讨一下 Cond 的使用场景,基本用法,以及它的实现原理。

sync.Cond 是什么?

sync.Cond 表示的是条件变量,它是一种同步机制,用来协调多个 goroutine 之间的同步,当共享资源的状态发生变化的时候, 可以通过条件变量来通知所有等待的 goroutine 去重新获取共享资源。

适用场景

在实际使用中,我们可能会有多个 goroutine 在执行的过程中,由于某一条件不满足而阻塞的情况。 这个时候,我们就可以使用条件变量来实现 goroutine 之间的同步。比如,我们有一个 goroutine 用来获取数据, 但是可能会比较耗时,这个时候,我们就可以使用条件变量来实现 goroutine 之间的同步, 当数据准备好之后,就可以通过条件变量来通知所有等待的 goroutine 去重新获取共享资源。

sync.Cond 条件变量用来协调想要访问共享资源的那些 goroutine,当共享资源的状态发生变化的时候, 它可以用来通知所有等待的 goroutine 去重新获取共享资源。

sync.Cond 的基本用法

sync.Cond 的基本用法非常简单,我们只需要通过 sync.NewCond 方法来创建一个 Cond 实例, 然后通过 Wait 方法来等待条件满足,通过 Signal 或者 Broadcast 方法来通知所有等待的 goroutine 去重新获取共享资源。

NewCond 创建实例

sync.NewCond 方法用来创建一个 Cond 实例,它的参数是一个 Locker 接口,我们可以传入一个 Mutex 或者 RWMutex 实例。 这个条件变量的 Locker 接口就是用来保护共享资源的。

Wait 等待条件满足

Wait 方法用来等待条件满足,它会先释放 Cond 的锁(Cond.L),然后阻塞当前 goroutine(实际调用的是 goparkunlock),直到被 Signal 或者 Broadcast 唤醒。

它做了如下几件事情:

  1. 释放 Cond 的锁(Cond.L),然后阻塞当前 goroutine。(所以,使用之前需要先锁定)
  2. Signal 或者 Broadcast 唤醒之后,会重新获取 Cond 的锁(Cond.L)。
  3. 之后,就返回到 goroutine 阻塞的地方继续执行。

Signal 通知一个等待的 goroutine

Signal 方法用来通知一个等待的 goroutine,它会唤醒一个等待的 goroutine,然后继续执行当前 goroutine。 如果没有等待的 goroutine,则不会有任何操作。

Broadcast 通知所有等待的 goroutine

Broadcast 方法用来通知所有等待的 goroutine,它会唤醒所有等待的 goroutine,然后继续执行当前 goroutine。 如果没有等待的 goroutine,则不会有任何操作。

sync.Cond 使用实例

下面我们通过一个实例来看一下 sync.Cond 的使用方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package cond

import (
"fmt"
"sync"
"testing"
"time"
)

var done bool
var data string

func write(c *sync.Cond) {
fmt.Println("writing.")
// 让 reader 先获取锁,模拟条件不满足然后 wait 的情况
time.Sleep(time.Millisecond * 10)
c.L.Lock()
// 模拟耗时的写操作
time.Sleep(time.Millisecond * 50)
data = "hello world"
done = true
fmt.Println("writing done.")
c.L.Unlock()
c.Broadcast()
}

func read(c *sync.Cond) {
fmt.Println("reading")
c.L.Lock()
for !done {
fmt.Println("reader wait.")
c.Wait()
}
fmt.Println("read done.")
fmt.Println("data:", data)
defer c.L.Unlock()
}

func TestCond(t *testing.T) {
var c = sync.NewCond(&sync.Mutex{})

go read(c) // 读操作
go read(c) // 读操作
go write(c) // 写操作

time.Sleep(time.Millisecond * 100) // 等待操作完成
}

输出:

1
2
3
4
5
6
7
8
9
10
reading
reader wait. // 还没获取完数据,需要等待
writing.
reading
reader wait.
writing done. // 获取完数据了,通知所有等待的 reader
read done. // 读取到数据了
data: hello world // 输出读取到的数据
read done.
data: hello world

这个例子可以粗略地用下图来表示:

cond_1

说明:

  • read1reader2 表示两个 goroutine,它们都会调用 read 函数。
  • donefalse 的时候,reader1reader2 都会调用 c.Wait() 函数,然后阻塞等待。
  • write 表示一个 goroutine,它会调用 write 函数。
  • write 函数中,获取完数据之后,会将 done 设置为 true,然后调用 c.Broadcast() 函数,通知所有等待的 reader 去重新获取共享资源。
  • reader1reader2 在解除阻塞状态后,都会重新获取共享资源,然后输出读取到的数据。

在这个例子中,done 的功能是标记,用来表示共享资源是否已经获取完毕,如果没有获取完毕,那么 reader 就会阻塞等待。

为什么要用 sync.Cond?

在文章开头,我们说了,很多并发编程的问题都可以通过 channel 来解决。 同样的,在上面提到的 sync.Cond 的使用场景,使用 channel 也是可以实现的, 我们只要 close(ch) 来关闭 channel 就可以实现通知多个等待的协程了。

那么为什么还要用 sync.Cond 呢? 主要原因是,sync.Cond 可以重复地进行 Wait()Signal()Broadcast() 操作, 但是,如果想通过关闭 chan 来实现这个功能的话,那就只能通知一次了。 因为 channel 只能关闭一次,关闭一个已经关闭的 channel 会导致程序 panic。

使用 channel 的另外一种方式是,记录 reader 的数量,然后通过往 channel 中发送多次数据来实现通知多个 reader。 但是这样一来代码就会复杂很多,从另一个角度说,出错的概率大了很多。

close channel 广播实例

下面的例子模拟了使用 close(chan) 来实现 sync.Cond 中那种广播功能,但是只能通知一次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package close_chan

import (
"fmt"
"testing"
"time"
)

var data string

func read(c <-chan struct{}) {
fmt.Println("reading.")

// 从 chan 接收数据,如果 chan 中没有数据,会阻塞。
// 如果能接收到数据,或者 chan 被关闭,会解除阻塞状态。
<-c

fmt.Println("data:", data)
}

func write(c chan struct{}) {
fmt.Println("writing.")
// 模拟耗时的写操作
time.Sleep(time.Millisecond * 10)
data = "hello world"
fmt.Println("write done.")

// 关闭 chan 的时候,会通知所有的 reader
// 所有等待从 chan 接收数据的 goroutine 都会被唤醒
close(c)
}

func TestCloseChan(t *testing.T) {
ch := make(chan struct{})

go read(ch)
go read(ch)
go write(ch)

// 不能关闭已经关闭的 chan
time.Sleep(time.Millisecond * 20)
// panic: close of closed channel
// 下面这行代码会导致 panic
//go write(ch)

time.Sleep(time.Millisecond * 100)
}

输出:

1
2
3
4
5
6
writing.
reading. // 会阻塞直到写完
reading. // 会阻塞直到写完
write done. // 写完之后,才能读
data: hello world
data: hello world

上面例子的 write 不能多次调用,否则会导致 panic。

sync.Cond 基本原理

go 的 sync.Cond 中维护了一个链表,这个链表记录了所有阻塞的 goroutine,也就是由于调用了 Wait 而阻塞的 goroutine。 而 SignalBroadcast 方法就是用来唤醒这个链表中的 goroutine 的。 Signal 方法只会唤醒链表中的第一个 goroutine,而 Broadcast 方法会唤醒链表中的所有 goroutine

下图是 Signal 方法的效果,可以看到,Signal 方法只会唤醒链表中的第一个 goroutine

cond_2

说明:

  • notifyListsync.Cond 中维护的一个链表,这个链表记录了所有阻塞的 goroutine
  • head 是链表的头节点,tail 是链表的尾节点。
  • Signal 方法只会唤醒链表中的第一个 goroutine

Broadcast 方法会唤醒 notifyList 中的所有 goroutine

sync.Cond 的设计与实现

最后,我们来看一下 sync.Cond 的设计与实现。

sync.Cond 模型

sync.Cond 的模型如下所示:

1
2
3
4
5
6
7
8
9
type Cond struct {
noCopy noCopy

// L is held while observing or changing the condition
L Locker // L 在观察或改变条件时被持有

notify notifyList
checker copyChecker
}

属性说明:

  • noCopy 是一个空结构体,用来检查 sync.Cond 是否被复制。(在编译前通过 go vet 命令来检查)
  • L 是一个 Locker 接口,用来保护条件变量。
  • notify 是一个 notifyList 类型,用来记录所有阻塞的 goroutine
  • checker 是一个 copyChecker 类型,用来检查 sync.Cond 是否被复制。(如果在运行时被复制,会导致 panic

notifyList 结构体

notifyListsync.Cond 中维护的一个链表,这个链表记录了所有因为共享资源还没准备好而阻塞的 goroutine。它的定义如下所示:

1
2
3
4
5
6
7
8
9
type notifyList struct {
wait atomic.Uint32
notify uint32

// 阻塞的 waiter 名单。
lock mutex // 锁
head *sudog // 阻塞的 goroutine 链表(链表头)
tail *sudog // 阻塞的 goroutine 链表(链表尾)
}

属性说明:

  • wait 是下一个 waiter 的编号。它在锁外自动递增。
  • notify 是下一个要通知的 waiter 的编号。它可以在锁外读取,但只能在持有锁的情况下写入。
  • lock 是一个 mutex 类型,用来保护 notifyList
  • head 是一个 sudog 类型,用来记录阻塞的 goroutine 链表的头节点。
  • tail 是一个 sudog 类型,用来记录阻塞的 goroutine 链表的尾节点。

notifyList 的方法说明:

notifyList 中包含了几个操作阻塞的 goroutine 链表的方法。

  • notifyListAdd 方法将 waiter 的编号加 1。
  • notifyListWait 方法将当前的 goroutine 加入到 notifyList 中。(也就是将当前协程挂起)
  • notifyListNotifyOne 方法将 notifyList 中的第一个 goroutine 唤醒。
  • notifyListNotifyAll 方法将 notifyList 中的所有 goroutine 唤醒。
  • notifyListCheck 方法检查 notifyList 的大小是否正确。

sync.Cond 的方法

notifyList 就不细说了,本文重点讲解一下 sync.Cond 的实现。

Wait 方法

Wait 方法用在当条件不满足的时候,将当前运行的协程挂起。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (c *Cond) Wait() {
// 检查是否被复制
c.checker.check()
// 更新 notifyList 中需要等待的 waiter 的数量
// 返回当前需要插入 notifyList 的编号
t := runtime_notifyListAdd(&c.notify)
// 解锁
c.L.Unlock()
// 挂起当前 g,直到被唤醒
runtime_notifyListWait(&c.notify, t)
// 唤醒之后,重新加锁。
// 因为阻塞之前解锁了。
c.L.Lock()
}

对于 Wait 方法,我们需要注意的是,使用之前,我们需要先调用 L.Lock() 方法加锁,然后再调用 Wait 方法,否则会报错。

文档里面的例子:

1
2
3
4
5
6
7
c.L.Lock()
for !condition() {
c.Wait()
}
// ...使用条件...
// 这里是我们在条件满足之后,需要执行的代码。
c.L.Unlock()

好了,问题来了,调用 Wait 方法之前为什么要先加锁呢?

这是因为在我们使用共享资源的时候,可能一些代码是互斥的,所以我们需要加锁。 这样我们就可以保证在我们使用共享资源的时候,不会被其他协程修改。 但是如果因为条件不满足,我们需要等待的话,我们不可能在持有锁的情况下等待, 因为在修改条件的时候,可能也需要加锁,这样就会造成死锁。

另外一个问题是,为什么要使用 for 来检查条件是否满足,而不是使用 if 呢?

这是因为在我们调用 Wait 方法之后,可能会有其他协程唤醒我们,但是条件并没有满足, 这个时候依然是需要继续 Wait 的。

Signal 方法

Signal 方法用在当条件满足的时候,将 notifyList 中的第一个 goroutine 唤醒。

1
2
3
4
5
6
func (c *Cond) Signal() {
// 检查 sync.Cond 是否被复制了
c.checker.check()
// 唤醒 notifyList 中的第一个 goroutine
runtime_notifyListNotifyOne(&c.notify)
}

Broadcast 方法

Broadcast 方法用在当条件满足的时候,将 notifyList 中的所有 goroutine 唤醒。

1
2
3
4
5
6
func (c *Cond) Broadcast() {
// 检查 sync.Cond 是否被复制了
c.checker.check()
// 唤醒 notifyList 中的所有 goroutine
runtime_notifyListNotifyAll(&c.notify)
}

copyChecker 结构体

copyChecker 结构体用来检查 sync.Cond 是否被复制。它实际上只是一个 uintptr 类型的值。

1
2
3
4
5
6
7
8
9
10
type copyChecker uintptr

// check 方法检查 copyChecker 是否被复制了。
func (c *copyChecker) check() {
if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
panic("sync.Cond is copied")
}
}

copyChecker 的值只有两种可能:

  1. 0,表示还没有调用过 Wait, SignalBroadcast 方法。
  2. uintptr(unsafe.Pointer(&copyChecker)),表示已经调用过 Wait, SignalBroadcast 方法。在这几个方法里面会调用 check 方法,所以 copyChecker 的值会被修改。

所以如果 copyChecker 的值不是 0,也不是 uintptr(unsafe.Pointer(&copyChecker))(也就是最初的 copyChecker 的内存地址),则表示 copyChecker 被复制了。

需要注意的是,这个方法在调用 CompareAndSwapUintptr 还会检查一下,这是因为有可能会并发调用 CompareAndSwapUintptr, 如果另外一个协程调用了 CompareAndSwapUintptr 并且成功了,那么当前协程的这个 CompareAndSwapUintptr 调用会返回 false, 这个时候就需要检查是否是因为另外一个协程调用了 CompareAndSwapUintptr 而导致的,如果是的话,就不会 panic

为什么 sync.Cond 不能被复制?

从上一小节中我们可以看到,sync.Cond 其实是不允许被复制的,但是如果是在调用 Wait, SignalBroadcast 方法之前复制,那倒是没关系。

这是因为 sync.Cond 中维护了一个阻塞的 goroutine 列表。如果 sync.Cond 被复制了,那么这个列表就会被复制,这样就会导致两个 sync.Cond 都包含了这个列表;但是我们唤醒的时候,只会有其中一个 sync.Cond 被唤醒,另外一个 sync.Cond 就会一直阻塞。 所以 go 直接从语言层面限制了这种情况,不允许 sync.Cond 被复制。

总结

  • sync.Cond 是一个条件变量,它可以用来协调多个 goroutine 之间的同步,当条件满足的时候,去通知那些因为条件不满足被阻塞的 goroutine 继续执行。
  • sync.Cond 的接口比较简单,只有 Wait, SignalBroadcast 三个方法。
    • Wait 方法用来阻塞当前 goroutine,直到条件满足。调用 Wait 方法之前,需要先调用 L.Lock 方法加锁。
    • Signal 方法用来唤醒 notifyList 中的第一个 goroutine
    • Broadcast 方法用来唤醒 notifyList 中的所有 goroutine
  • sync.Cond 的实现也比较简单,它的核心就是 notifyList,它是一个链表,用来保存所有因为条件不满足而被阻塞的 goroutine
  • 用关闭 channel 的方式也可以实现类似的广播功能,但是有个问题是 channel 不能被重复关闭,所以这种方式无法被多次使用。也就是说使用这种方式无法多次广播。
  • 使用 channel 发送通知的方式也是可以的,但是这样实现起来就复杂很多了,就更容易出错了。
  • sync.Cond 中使用 copyChecker 来检查 sync.Cond 是否被复制,如果被复制了,就会 panic。需要注意的是,这里的复制是指调用了 WaitSignalBroadcast 方法之后,sync.Cond 被复制了。在调用这几个方法之前进行复制是没有影响的。