0%

在上一篇文章《深入理解 go Mutex》中, 我们已经对 go Mutex 的实现原理有了一个大致的了解,也知道了 Mutex 可以实现并发读写的安全。 今天,我们再来看看另外一种锁,RWMutex,有时候,其实我们读数据的频率要远远高于写数据的频率, 而且不同协程应该可以同时读取的,这个时候,RWMutex 就派上用场了。

RWMutex 的实现原理和 Mutex 类似,只是在 Mutex 的基础上,区分了读锁和写锁:

  • 读锁:只要没有写锁,就可以获取读锁,多个协程可以同时获取读锁(可以并行读)。
  • 写锁:只能有一个协程获取写锁,其他协程想获取读锁或写锁都只能等待。

下面就让我们来深入了解一下 RWMutex 的基本使用和实现原理等内容。

RWMutex 的整体模型

正如 RWMutex 的命名那样,它是区分了读锁和写锁的锁,所以我们可以从读和写两个方面来看 RWMutex 的模型。

下文中的 reader 指的是进行读操作的 goroutine,writer 指的是进行写操作的 goroutine。

读操作模型

我们可以用下图来表示 RWMutex 的读操作模型:

rwmutex_1

上图使用了 w.Lock,是因为 RWMutex 的实现中,写锁是使用 Mutex 来实现的。

说明:

  • 读操作的时候可以同时有多个 goroutine 持有 RLock,然后进入临界区。(也就是可以并行读),上图的 G1G2G3 就是同时持有 RLock 的几个 goroutine。
  • 在读操作的时候,如果有 goroutine 持有 RLock,那么其他 goroutine (不管是读还是写)就只能等待,直到所有持有 RLock 的 goroutine 释放锁。
  • 也就是上图的 G4 需要等待 G1G2G3 释放锁之后才能进入临界区。
  • 最后,因为 G5G6 这两个协程获取锁的时机比 G4 晚,所以它们会在 G4 释放锁之后才能进入临界区。

写操作模型

我们可以用下图来表示 RWMutex 的写操作模型:

rwmutex_2

说明:

  • 写操作的时候只能有一个 goroutine 持有 Lock,然后进入临界区,释放写锁之前,所有其他的 goroutine 都只能等待。
  • 上图的 G1~G5 表示的是按时间顺序先后获取锁的几个 goroutine。
  • 上面几个 goroutine 获取锁的过程是:
    • G1 获取写锁,进入临界区。然后 G2G3G4G5 都在等待。
    • G1 释放写锁之后,G2G3 可以同时获取读锁,进入临界区。然后 G3G4G5 都在等待。
    • G2G3 可以同时获取读锁,进入临界区。然后 G4G5 都在等待。
    • G2G3 释放读锁之后,G4 获取写锁,进入临界区。然后 G5 在等待。
    • 最后,G4 释放写锁,G5 获取读锁,进入临界区。

基本用法

RWMutex 中包含了以下的方法:

  • Lock:获取写锁,如果有其他 goroutine 持有读锁或写锁,那么就会阻塞等待。
  • Unlock:释放写锁。
  • RLock:获取读锁,如果有其他 goroutine 持有写锁,那么就会阻塞等待。
  • RUnlock:释放读锁。

其他不常用的方法:

  • RLocker:返回一个读锁,该锁包含了 RLockRUnlock 方法,可以用来获取读锁和释放读锁。
  • TryLock: 尝试获取写锁,如果获取成功,返回 true,否则返回 false。不会阻塞等待。
  • TryRLock: 尝试获取读锁,如果获取成功,返回 true,否则返回 false。不会阻塞等待。

一个简单的例子

我们可以通过下面的例子来看一下 RWMutex 的基本用法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package mutex

import (
"sync"
"testing"
)

var config map[string]string
var mu sync.RWMutex

func TestRWMutex(t *testing.T) {
config = make(map[string]string)

// 启动 10 个 goroutine 来写
var wg1 sync.WaitGroup
wg1.Add(10)
for i := 0; i < 10; i++ {
go func() {
set("foo", "bar")
wg1.Done()
}()
}

// 启动 100 个 goroutine 来读
var wg2 sync.WaitGroup
wg2.Add(100)
for i := 0; i < 100; i++ {
go func() {
get("foo")
wg2.Done()
}()
}

wg1.Wait()
wg2.Wait()
}

// 获取配置
func get(key string) string {
// 获取读锁,可以多个 goroutine 并发读取
mu.RLock()
defer mu.RUnlock()

if v, ok := config[key]; ok {
return v
}

return ""
}

// 设置配置
func set(key, val string) {
// 获取写锁
mu.Lock()
defer mu.Unlock()

config[key] = val
}

上面的例子中,我们启动了 10 个 goroutine 来写配置,启动了 100 个 goroutine 来读配置。 这跟我们现实开发中的场景是一样的,很多时候其实是读多写少的。 如果我们在读的时候也使用互斥锁,那么就会导致读的性能非常差,因为读操作一般都不会有副作用的,但是如果使用互斥锁,那么就只能一个一个的读了。

而如果我们使用 RWMutex,那么就可以同时有多个 goroutine 来读取配置,这样就可以大大提高读的性能。 因为我们进行读操作的时候,可以多个 goroutine 并发读取,这样就可以大大提高读的性能。

RWMutex 使用的注意事项

《深入理解 go Mutex》中,我们已经讲过了 Mutex 的使用注意事项, 其实 RWMutex 的使用注意事项也是差不多的:

  • 不要忘记释放锁,不管是读锁还是写锁。
  • Lock 之后,没有释放锁之前,不能再次使用 Lock
  • Unlock 之前,必须已经调用了 Lock,否则会 panic
  • 在第一次使用 RWMutex 之后,不能复制,因为这样一来 RWMutex 的状态也会被复制。这个可以使用 go vet 来检查。

源码剖析

RWMutex 的一些实现原理跟 Mutex 是一样的,比如阻塞的时候使用信号量等,在 Mutex 那一篇中已经有讲解了,这里不再赘述。 这里就 RWMutex 的实现原理进行一些简单的剖析。

RWMutex 结构体

RWMutex 的结构体定义如下:

1
2
3
4
5
6
7
type RWMutex struct {
w Mutex // 互斥锁,用于保护读写锁的状态
writerSem uint32 // writer 信号量
readerSem uint32 // reader 信号量
readerCount atomic.Int32 // 所有 reader 数量
readerWait atomic.Int32 // writer 等待完成的 reader 数量
}

各字段含义:

  • w:互斥锁,用于保护读写锁的状态。RWMutex 的写锁是互斥锁,所以直接使用 Mutex 就可以了。
  • writerSem:writer 信号量,用于实现写锁的阻塞等待。
  • readerSem:reader 信号量,用于实现读锁的阻塞等待。
  • readerCount:所有 reader 数量(包括已经获取读锁的和正在等待获取读锁的 reader)。
  • readerWait:writer 等待完成的 reader 数量(也就是获取写锁的时刻,已经获取到读锁的 reader 数量)。

因为要区分读锁和写锁,所以在 RWMutex 中,我们需要两个信号量,一个用于实现写锁的阻塞等待,一个用于实现读锁的阻塞等待。 我们需要特别注意的是 readerCountreaderWait 这两个字段,我们可能会比较好奇,为什么有了 readerCount 这个字段, 还需要 readerWait 这个字段呢?

这是因为,我们在尝试获取写锁的时候,可能会有多个 reader 正在使用读锁,这时候我们需要知道有多少个 reader 正在使用读锁, 等待这些 reader 释放读锁之后,就获取写锁了,而 readerWait 这个字段就是用来记录这个数量的。 在 Lock 中获取写锁的时候,如果观测到 readerWait 不为 0 则会阻塞等待,直到 readerWait 为 0 之后才会真正获取写锁,然后才可以进行写操作。

读锁源码剖析

获取读锁的方法如下:

1
2
3
4
5
6
7
// 获取读锁
func (rw *RWMutex) RLock() {
if rw.readerCount.Add(1) < 0 {
// 有 writer 在使用锁,阻塞等待 writer 完成
runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
}
}

读锁的实现很简单,先将 readerCount 加 1,如果加 1 之后的值小于 0,说明有 writer 正在使用锁,那么就需要阻塞等待 writer 完成。

释放读锁的方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 释放读锁
func (rw *RWMutex) RUnlock() {
// readerCount 减 1,如果 readerCount 小于 0 说明有 writer 在等待
if r := rw.readerCount.Add(-1); r < 0 {
// 有 writer 在等待,唤醒 writer
rw.rUnlockSlow(r)
}
}

// 唤醒 writer
func (rw *RWMutex) rUnlockSlow(r int32) {
// 未 Lock 就 Unlock,panic
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
fatal("sync: RUnlock of unlocked RWMutex")
}
// readerWait 减 1,返回值是新的 readerWait 值
if rw.readerWait.Add(-1) == 0 {
// 最后一个 reader 唤醒 writer
runtime_Semrelease(&rw.writerSem, false, 1)
}
}

读锁的实现总结:

  • 获取读锁的时候,会将 readerCount 加 1
  • 如果正在获取读锁的时候,发现 readerCount 小于 0,说明有 writer 正在使用锁,那么就需要阻塞等待 writer 完成。
  • 释放读锁的时候,会将 readerCount 减 1
  • 如果 readerCount 减 1 之后小于 0,说明有 writer 正在等待,那么就需要唤醒 writer。
  • 唤醒 writer 的时候,会将 readerWait 减 1,如果 readerWait 减 1 之后为 0,说明 writer 获取锁的时候存在的 reader 都已经释放了读锁,可以获取写锁了。

·rwmutexMaxReaders算是一个特殊的标识,在获取写锁的时候会将readerCount的值减去rwmutexMaxReaders, 所以在其他地方可以根据readerCount` 是否小于 0 来判断是否有 writer 正在使用锁。

写锁源码剖析

获取写锁的方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 获取写锁
func (rw *RWMutex) Lock() {
// 首先,解决与其他写入者的竞争。
rw.w.Lock()
// 向读者宣布有一个待处理的写入。
// r 就是当前还没有完成的读操作,等这部分读操作完成之后才可以获取写锁。
r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
// 等待活跃的 reader
if r != 0 && rw.readerWait.Add(r) != 0 {
// 阻塞,等待最后一个 reader 唤醒
runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
}
}

释放写锁的方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 释放写锁
func (rw *RWMutex) Unlock() {
// 向 readers 宣布没有活动的 writer。
r := rw.readerCount.Add(rwmutexMaxReaders)
if r >= rwmutexMaxReaders { // r >= 0 并且 < rwmutexMaxReaders 才是正常的(r 是持有写锁期间尝试获取读锁的 reader 数量)
fatal("sync: Unlock of unlocked RWMutex")
}
// 如果有 reader 在等待写锁释放,那么唤醒这些 reader。
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// 允许其他的 writer 继续进行。
rw.w.Unlock()
}

写锁的实现总结:

  • 获取写锁的时候,会将 readerCount 减去 rwmutexMaxReaders,这样就可以区分读锁和写锁了。
  • 如果 readerCount 减去 rwmutexMaxReaders 之后不为 0,说明有 reader 正在使用读锁,那么就需要阻塞等待这些 reader 释放读锁。
  • 释放写锁的时候,会将 readerCount 加上 rwmutexMaxReaders
  • 如果 readerCount 加上 rwmutexMaxReaders 之后大于 0,说明有 reader 正在等待写锁释放,那么就需要唤醒这些 reader。

TryRLock 和 TryLock

TryRLockTryLock 的实现都很简单,都是尝试获取读锁或者写锁,如果获取不到就返回 false,获取到了就返回 true,这两个方法不会阻塞等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// TryRLock 尝试锁定 rw 以进行读取,并报告是否成功。
func (rw *RWMutex) TryRLock() bool {
for {
c := rw.readerCount.Load()
// 有 goroutine 持有写锁
if c < 0 {
return false
}
// 尝试获取读锁
if rw.readerCount.CompareAndSwap(c, c+1) {
return true
}
}
}

// TryLock 尝试锁定 rw 以进行写入,并报告是否成功。
func (rw *RWMutex) TryLock() bool {
// 写锁被占用
if !rw.w.TryLock() {
return false
}
// 读锁被占用
if !rw.readerCount.CompareAndSwap(0, -rwmutexMaxReaders) {
// 释放写锁
rw.w.Unlock()
return false
}
// 成功获取到锁
return true
}

总结

RWMutex 使用起来比较简单,相比 Mutex 而言,它区分了读锁和写锁,可以提高并发性能。最后,总结一下本文内容:

  • RWMutex 有两种锁:读锁和写锁。
  • 读锁可以被多个 goroutine 同时持有,写锁只能被一个 goroutine 持有。也就是可以并发读,但只能互斥写。
  • 写锁被占用的时候,其他的读和写操作都会被阻塞。读锁被占用的时候,其他的写操作会被阻塞,但是读操作不会被阻塞。除非读操作发生在一个新的写操作之后。
  • RWMutex 包含以下几个方法:
    • Lock:获取写锁,如果有其他的写锁或者读锁被占用,那么就会阻塞等待。
    • Unlock:释放写锁。
    • RLock:获取读锁,如果写锁被占用,那么就会阻塞等待。
    • RUnlock:释放读锁。
  • 也包含了两个非阻塞的方法:
    • TryLock:尝试获取写锁,如果获取不到就返回 false,获取到了就返回 true
    • TryRLock:尝试获取读锁,如果获取不到就返回 false,获取到了就返回 true
  • RWMutex 使用的注意事项跟 Mutex 差不多:
    • 使用之后不能复制
    • Unlock 之前需要有 Lock 调用,否则 panicRUnlock 之前需要有 RLock 调用,否则 panic
    • 不要忘记使用 UnlockRUnlock 释放锁。
  • RWMutex 的实现:
    • 写锁还是使用 Mutex 来实现。
    • 获取读锁和写锁的时候,如果获取不到都会阻塞等待,直到被唤醒。
    • 获取写锁的时候,会将 readerCount 减去 rwmutexMaxReaders,这样就可以直到有写锁被占用。释放写锁的时候,会将 readerCount 加上 rwmutexMaxReaders
    • 获取写锁的时候,如果还有读操作未完成,那么这一次获取写锁只会等待这部分未完成的读操作完成。所有后续的操作只能等待这一次写锁释放。

在我们的日常开发中,总会有时候需要对一些变量做并发读写,比如 web 应用在同时接到多个请求之后, 需要对一些资源做初始化,而这些资源可能是只需要初始化一次的,而不是每一个 http 请求都初始化, 在这种情况下,我们需要限制只能一个协程来做初始化的操作,比如初始化数据库连接等, 这个时候,我们就需要有一种机制,可以限制只有一个协程来执行这些初始化的代码。 在 go 语言中,我们可以使用互斥锁(Mutex)来实现这种功能。

互斥锁的定义

这里引用一下维基百科的定义:

互斥锁(Mutual exclusion,缩写 Mutex)是一种用于多线程编程中,防止两个线程同时对同一公共资源 (比如全局变量)进行读写的机制。该目的通过将代码切片成一个一个的临界区域(critical section)达成。 临街区域指的是一块对公共资源进行访问的代码,并非一种机制或是算法。

互斥,顾名思义,也就是只有一个线程能持有锁。当然,在 go 中,是只有一个协程能持有锁。

下面是一个简单的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
var sum int // 和
var mu sync.Mutex // 互斥锁

// add 将 sum 加 1
func add() {
// 获取锁,只能有一个协程获取到锁,
// 其他协程需要阻塞等待锁释放才能获取到锁。
mu.Lock()
// 临界区域
sum++
mu.Unlock()
}

func TestMutex(t *testing.T) {
// 启动 1000 个协程
var wg sync.WaitGroup
wg.Add(1000)

for i := 0; i < 1000; i++ {
go func() {
// 每个协程里面调用 add()
add()
wg.Done()
}()
}

// 等待所有协程执行完毕
wg.Wait()
// 最终 sum 的值应该是 1000
assert.Equal(t, 1000, sum)
}

上面的例子中,我们定义了一个全局变量 sum,用于存储和,然后定义了一个互斥锁 mu, 在 add() 函数中,我们使用 mu.Lock() 来加锁,然后对 sum 进行加 1 操作, 最后使用 mu.Unlock() 来解锁,这样就保证了在任意时刻,只有一个协程能够对 sum 进行加 1 操作, 从而保证了在并发执行 add() 操作的时候 sum 的值是正确的。

上面这个例子,在我之前的文章中已经作为例子出现过很多次了,这里不再赘述了。

go Mutex 的基本用法

Mutex 我们一般只会用到它的两个方法:

  • Lock:获取互斥锁。(只会有一个协程可以获取到锁,通常用在临界区开始的地方。)
  • Unlock: 释放互斥锁。(释放获取到的锁,通常用在临界区结束的地方。)

Mutex 的模型可以用下图表示:

mutex_1

说明:

  • 同一时刻只能有一个协程获取到 Mutex 的使用权,其他协程需要排队等待(也就是上图的 G1->G2->Gn)。
  • 拥有锁的协程从临界区退出的时候需要使用 Unlock 来释放锁,这个时候等待队列的下一个协程可以获取到锁(实际实现比这里说的复杂很多,后面会细说),从而进入临界区。
  • 等待的协程会在 Lock 调用处阻塞,Unlock 的时候会使得一个等待的协程解除阻塞的状态,得以继续执行。

上面提到的这几点也是 Mutex 的基本原理。

互斥锁使用的两个例子

了解了 go Mutex 基本原理之后,让我们再来看看 Mutex 的一些使用的例子。

gin Context 中的 Set 方法

一个很常见的场景就是,并发对 map 进行读写,熟悉 go 的朋友应该知道,go 中的 map 是不支持并发读写的, 如果我们对 map 进行并发读写会导致 panic

而在 ginContext 结构体中,也有一个 map 类型的字段 Keys,用来在上下文间传递键值对数据, 所以在通过 Set 来设置键值对的时候需要使用 c.mu.Lock() 来先获取互斥锁,然后再对 Keys 做设置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Set is used to store a new key/value pair exclusively for this context.
// It also lazy initializes c.Keys if it was not used previously.
func (c *Context) Set(key string, value any) {
// 获取锁
c.mu.Lock()
// 如果 Keys 还没初始化,则进行初始化
if c.Keys == nil {
c.Keys = make(map[string]any)
}

// 设置键值对
c.Keys[key] = value
// 释放锁
c.mu.Unlock()
}

同样的,对 Keys 做读操作的时候也需要使用互斥锁:

1
2
3
4
5
6
7
8
9
10
11
// Get returns the value for the given key, ie: (value, true).
// If the value does not exist it returns (nil, false)
func (c *Context) Get(key string) (value any, exists bool) {
// 获取锁
c.mu.RLock()
// 读取 key
value, exists = c.Keys[key]
// 释放锁
c.mu.RUnlock()
return
}

可能会有人觉得奇怪,为什么从 map 中读也还需要锁。这是因为,如果读的时候没有锁保护, 那么就有可能在 Set 设置的过程中,同时也在进行读操作,这样就会 panic 了。

这个例子想要说明的是,像 map 这种数据结构本身就不支持并发读写,我们这种情况下只有使用 Mutex 了。

sync.Pool 中的 pinSlow 方法

sync.Pool 的实现中,有一个全局变量记录了进程内所有的 sync.Pool 对象,那就是 allPools 变量, 另外有一个锁 allPoolsMu 用来保护对 allPools 的读写操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
var (
// 保护 allPools 和 oldPools 的互斥锁。
allPoolsMu Mutex

// allPools is the set of pools that have non-empty primary
// caches. Protected by either 1) allPoolsMu and pinning or 2)
// STW.
allPools []*Pool

// oldPools is the set of pools that may have non-empty victim
// caches. Protected by STW.
oldPools []*Pool
)

pinSlow 方法中会在 allPoolsMu 的保护下对 allPools 做读写操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (p *Pool) pinSlow() (*poolLocal, int) {
// Retry under the mutex.
// Can not lock the mutex while pinned.
runtime_procUnpin()
allPoolsMu.Lock() // 获取锁
defer allPoolsMu.Unlock() // 函数返回的时候释放锁
pid := runtime_procPin()
// poolCleanup won't be called while we are pinned.
s := p.localSize
l := p.local
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
if p.local == nil {
allPools = append(allPools, p) // 全局变量修改
}
// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
runtime_StoreReluintptr(&p.localSize, uintptr(size)) // store-release
return &local[pid], pid
}

这个例子主要是为了说明使用 mu 的另外一种非常常见的场景:并发读写全局变量

互斥锁使用的注意事项

互斥锁如果使用不当,可能会导致死锁或者出现 panic 的情况,下面是一些常见的错误:

  1. 忘记使用 Unlock 释放锁。
  2. Lock 之后还没 Unlock 之前又使用 Lock 获取锁。也就是重复上锁,go 中的 Mutex 不可重入。
  3. 死锁:位于临界区内不同的两个协程都想获取对方持有的不同的锁。
  4. 还没 Lock 之前就 Unlock。这会导致 panic,因为这是没有任何意义的。
  5. 复制 Mutex,比如将 Mutex 作为参数传递。

对于第 1 点,我们往往可以使用 defer 关键字来做释放锁的操作。第 2 点不太好发现,只能在开发的时候多加注意。 第 3 点我们在使用锁的时候可以考虑尽量避免在临界区内再去使用别的锁。 最后,Mutex 是不可以复制的,这个可以在编译之前通过 go vet 来做检查。

为什么 Mutex 不能被复制呢?因为 Mutex 中包含了锁的状态,如果复制了,那么这个状态也会被复制, 如果在复制前进行 Lock,复制后进行 Unlock,那就意味着 LockUnlock 操作的其实是两个不同的状态, 这样显然是不行的,是释放不了锁的。

虽然不可以复制,但是我们可以通过传递指针类型的参数来传递 Mutex

互斥锁锁定的是什么?

在前一篇文章中,我们提到过,原子操作本质上是变量级的互斥锁。而互斥锁本身锁定的又是什么呢? 其实互斥锁本质上是一个信号量,它通过获取释放信号量,最终使得协程获得某一个代码块的执行权力。

也就是说,互斥锁,锁定的是一块代码块。

我们以 go-zero 里面的 collection/fifo.go 为例子说明一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Take takes the first element out of q if not empty.
func (q *Queue) Take() (any, bool) {
// 获取互斥锁(只能有一个协程获取到锁)
q.lock.Lock()
// 函数返回的时候释放互斥锁(获取到锁的协程释放锁之后,其他协程才能进行抢占锁)
defer q.lock.Unlock()

// 下面的代码只有抢占到(也就是互斥锁锁定的代码块)
if q.count == 0 {
return nil, false
}

element := q.elements[q.head]
q.head = (q.head + 1) % len(q.elements)
q.count--

return element, true
}

除了锁定代码块的这一个作用,有另外一个比较关键的地方也是我们不能忽视的, 那就是 互斥锁并不保证临界区内操作的变量不能被其他协程访问。 互斥锁只能保证一段代码只能一个协程执行,但是对于临界区内涉及的共享资源, 你在临界区外也依然是可以对其进行读写的。

我们以上面的代码说明一下:在上面的 Take 函数中,我们对 q.headq.count 都进行了操作, 虽然这些操作代码位于临界区内,但是临界区并不保证持有锁期间其他协程不会在临界区外去修改 q.headq.count

下面就是一个非常典型的错误的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import (
"fmt"
"sync"
"testing"
)

var mu sync.Mutex
var sum int

// 在锁的保护下对 sum 做读写操作
func test() {
mu.Lock()
sum++
mu.Unlock()
}

func TestMutex(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1000)

for i := 0; i < 500; i++ {
go func() {
test()
wg.Done()
}()

// 位于临界区外,也依然是可以对 sum 做读写操作的。
sum++
}

wg.Wait()

fmt.Println(sum)
}

靠谱的做法是,对于有共享资源的读写的操作都使用 Mutex 保护起来。

当然,如果我们只有一个变量,那么可能使用原子操作就足够了。

互斥锁实现原理

互斥锁的实现有以下几个关键的地方:

  • 信号量:这是操作系统中的同步对象。
  • 等待队列:获取不到互斥锁的协程,会放入到一个先入先出队列的队列尾部。这样信号量释放的时候,可以依次对它们唤醒。
  • 原子操作:互斥锁的实现中,使用了一个字段来记录了几种不同的状态,使用原子操作可以保证几种状态可以一次性变更完成。

我们先来看看 Mutex结构体定义:

1
2
3
4
type Mutex struct {
state int32 // 状态字段
sema uint32 // 信号量
}

其中 state 字段记录了四种不同的信息:

mutex_2

这四种不同信息在源码中定义了不同的常量:

1
2
3
4
5
6
7
8
const (
mutexLocked = 1 << iota // 表示有 goroutine 拥有锁
mutexWoken // 唤醒(就是第 2 位)
mutexStarving // 饥饿(第 3 位)
mutexWaiterShift = iota // 表示第 4 位开始,表示等待者的数量

starvationThresholdNs = 1e6 // 1ms 进入饥饿模式的等待时间阈值
)

sema 的含义比较简单,就是一个用作不同 goroutine 同步的信号量。

信号量

go 的 Mutex 是基于信号量来实现的,那信号量又是什么呢?

维基百科:信号量是一个同步对象,用于保持在 0 至指定最大值之间的一个计数值。当线程完成一次对该 semaphore 对象的等待(wait)时,该计数值减一;当线程完成一次对 semaphore 对象的释放(release)时,计数值加一。

上面这个解释有点难懂,通俗地说,就是一个数字,调用 wait 的时候,这个数字减去 1,调用 release 的时候,这个数字加上 1。 (还有一个隐含的逻辑是,如果这个数小于 0,那么调用 wait 的时候会阻塞,直到它大于 0。)

对应到 go 的 Mutex 中,有两个操作信号量的函数:

  • runtime_Semrelease: 自动递增信号量并通知等待的 goroutine。
  • runtime_SemacquireMutex: 是一直等到信号量大于 0,然后自动递减。

我们注意到了,其实 runtime_SemacquireMutex 是有一个前提条件的,那就是等到信号量大于 0。 其实信号量的两个操作 P/V 就是一个加 1 一个减 1,所以在实际使用的时候,也是需要一个获取锁的操作对应一个释放锁的操作, 否则,其他协程都无法获取到锁,因为信号量一直不满足。

等待队列

go 中如果已经有 goroutine 持有互斥锁,那么其他的协程会放入一个 FIFO 队列中,如下图:

mutex_3

说明:

  • G1 表示持有互斥锁的 goroutine,G2...Gn 表示一个 goroutine 的等待队列,这是一个先入先出的队列。
  • G1 先持有锁,得以进入临界区,其他想抢占锁的 goroutine 阻塞在 Lock 调用处。
  • G1 在使用完锁后,会使用 Unlock 来释放锁,本质上是释放了信号量,然后会唤醒 FIFO 队列头部的 goroutine
  • G2FIFO 队列中移除,进入临界区。G2 使用完锁之后也会使用 Unlock 来释放锁。

上面只是一个大概模型,在实际实现中,比这个复杂很多倍,下面会继续深入讲解。

原子操作

go 的 Mutex 实现中,state 字段是一个 32 位的整数,不同的位记录了四种不同信息,在这种情况下, 只需要通过原子操作就可以保证一次性实现对四种不同状态信息的更改,而不需要更多额外的同步机制。

但是毋庸置疑,这种实现会大大降低代码的可读性,因为通过一个整数来记录不同的信息, 就意味着,需要通过各种位运算来实现对这个整数不同位的修改,比如将上锁的操作:

1
new |= mutexLocked

当然,这只是 Mutex 实现中最简单的一种位运算了。下面以 state 记录的四种不同信息为维度来具体讲解一下:

  • mutexLocked:这是 state 的最低位,1 表示锁被占用,0 表示锁没有被占用。
    • new := mutexLocked 新状态为上锁状态
  • mutexWoken: 这是表示是否有协程被唤醒了的状态
    • new = (old - 1<<mutexWaiterShift) | mutexWoken 等待者数量减去 1 的同时,设置唤醒标识
    • new &^= mutexWoken 清除唤醒标识
  • mutexStarving:饥饿模式的标识
    • new |= mutexStarving 设置饥饿标识
  • 等待者数量:state >> mutexWaiterShift 就是等待者的数量,也就是上面提到的 FIFO 队列中 goroutine 的数量
    • new += 1 << mutexWaiterShift 等待者数量加 1
    • delta := int32(mutexLocked - 1<<mutexWaiterShift) 上锁的同时,将等待者数量减 1

这里并没有涵盖 Mutex 中所有的位运算,其他操作在下文讲解源码实现的时候会提到。

在上面做了这一系列的位运算之后,我们会得到一个新的 state 状态,假设名为 new,那么我们就可以通过 CAS 操作来将 Mutexstate 字段更新:

1
atomic.CompareAndSwapInt32(&m.state, old, new)

通过上面这个原子操作,我们就可以一次性地更新 Mutexstate 字段,也就是一次性更新了四种状态信息。

这种通过一个整数记录不同状态的写法在 sync 包其他的一些地方也有用到,比如 WaitGroup 中的 state 字段。

最后,对于这种操作,我们需要注意的是,因为我们在执行 CAS 前后是没有其他什么锁或者其他的保护机制的, 这也就意味着上面的这个 CAS 操作是有可能会失败的,那如果失败了怎么办呢?

如果失败了,也就意味着肯定有另外一个 goroutine 率先执行了 CAS 操作并且成功了,将 state 修改为了一个新的值。 这个时候,其实我们前面做的一系列位运算得到的结果实际上已经不对了,在这种情况下,我们需要获取最新的 state,然后再次计算得到一个新的 state

所以我们会在源码里面看到 CAS 操作是写在 for 循环里面的。

Mutex 的公平性

在前面,我们提到 goroutien 获取不到锁的时候,会进入一个 FIFO 队列的队列尾,在实际实现中,其实没有那么简单, 为了获得更好的性能,在实现的时候会尽量先让运行状态的 goroutine 获得锁,当然如果队列中的 goroutine 等待太久(大于 1ms), 那么就会先让队列中的 goroutine 获得锁。

下面是文档中的说明:

Mutex 可以处于两种操作模式:正常模式和饥饿模式。在正常模式下,等待者按照FIFO(先进先出)的顺序排队,但是被唤醒的等待者不拥有互斥锁,会与新到达的 Goroutine 竞争所有权。新到达的 Goroutine 有优势——它们已经在 CPU 上运行,数量可能很多,因此被唤醒的等待者有很大的机会失去锁。在这种情况下,它将排在等待队列的前面。如果等待者未能在1毫秒内获取到互斥锁,则将互斥锁切换到饥饿模式。 在饥饿模式下,互斥锁的所有权直接从解锁 Goroutine 移交给队列前面的等待者。新到达的 Goroutine 即使看起来未被锁定,也不会尝试获取互斥锁,也不会尝试自旋。相反,它们会将自己排队在等待队列的末尾。如果等待者获得互斥锁的所有权并发现(1)它是队列中的最后一个等待者,或者(2)它等待时间少于1毫秒,则将互斥锁切换回正常模式。 正常模式的性能要优于饥饿模式,因为 Goroutine 可以连续多次获取互斥锁,即使有被阻塞的等待者。饥饿模式很重要,可以防止尾部延迟的病态情况。

简单总结:

  • Mutex 有两种模式:正常模式、饥饿模式。
  • 正常模式下:
    • 被唤醒的 goroutine 和正在运行的 goroutine 竞争锁。这样可以运行中的协程有机会先获取到锁,从而避免了协程切换的开销。性能更好。
  • 饥饿模式下:
    • 优先让队列中的 goroutine 获得锁,并且直接放弃时间片,让给队列中的 goroutine,运行中的 goroutine 想获取锁要到队尾排队。更加公平。

Mutex 源码剖析

Mutex 本身的源码其实很少,但是复杂程度是非常高的,所以第一次看的时候可能会非常懵逼,但是不妨碍我们去了解它的大概实现原理。

Mutex 中主要有两个方法,LockUnlock,使用起来非常的简单,但是它的实现可不简单。下面我们就来深入了解一下它的实现。

Lock

Lock 方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
// Lock 获取锁。
// 如果锁已在使用中,则调用 goroutine 将阻塞,直到互斥量可用。
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
// 上锁成功则直接返回
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}

// Slow path (outlined so that the fast path can be inlined)
// 没有上锁成功,这个时候需要做的事情就有点多了。
m.lockSlow()
}

Lock 方法中,第一次获取锁的时候是非常简单的,一个简单的原子操作设置一下 mutexLocked 标识就完成了。 但是如果这个原子操作失败了,表示有其他 goroutine 先获取到了锁,这个时候就需要调用 lockSlow 来做一些额外的操作了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// 获取 mutex 锁
func (m *Mutex) lockSlow() {
var waitStartTime int64 // 当前协程开始等待的时间
starving := false // 当前协程是否是饥饿模式
awoke := false // 唤醒标志(是否当前协程就是被唤醒的协程)
iter := 0 // 自旋次数(超过一定次数如果还没能获得锁,就进入等待)
old := m.state // 旧的状态,每次 for 循环会重新获取当前的状态字段

for {
// 自旋:目的是让正在运行中的 goroutine 尽快获取到锁。
// 两种情况不会自旋:
// 1. 饥饿模式:在饥饿模式下,锁会直接交给等待队列中的 goroutine,所以不会自旋。
// 2. 锁被释放了:另外如果运行到这里的时候,发现锁已经被释放了,也就不需要自旋了。
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 设置 mutexWoken 标识
// 如果自旋是有意义的,则会进入到这里,尝试设置 mutexWoken 标识。
// 设置成功在持有锁的 goroutine 获取锁的时候不会唤醒等待队列中的 goroutine,下一个获取锁的就是当前 goroutine。
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
// 各个判断的含义:
// !awoke 已经被唤醒过一次了,说明当前协程是被从等待队列中唤醒的协程/又或者已经成功设置 mutexWoken 标识了,不需要再唤醒了。
// old&mutexWoken == 0 如果不等于 0 说明有 goroutine 被唤醒了,不会尝试设置 mutexWoken 标识
// old>>mutexWaiterShift != 0 如果等待队列为空,当前 goroutine 就是下一个抢占锁的 goroutine
// 前面的判断都通过了,才会进行 CAS 操作尝试设置 mutexWoken 标识
awoke = true
}
runtime_doSpin() // 自旋
iter++ // 自旋次数 +1(超过一定次数会停止自旋)
old = m.state // 再次获取锁的最新状态,之后会检查是否锁被释放了
continue // 继续下一次检查
}

new := old
// 饥饿模式下,新到达的 goroutines 必须排队。
// 不是饥饿状态,直接竞争锁。
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 进入等待队列的两种情况:
// 1. 锁依然被占用。
// 2. 进入了饥饿模式。
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift // 等待者数量 +1
}
// 已经等待超过了 1ms,且锁被其他协程占用,则进入饥饿模式
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// 唤醒之后,需要重置唤醒标志。
// 不管有没有获取到锁,都是要清除这个标识的:
// 获取到锁肯定要清除,如果获取到锁,需要让其他运行中的 goroutine 来抢占锁;
// 如果没有获取到锁,goroutine 会阻塞,这个时候是需要持有锁的 goroutine 来唤醒的,如果有 mutexWoken 标识,持有锁的 goroutine 唤醒不了。
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken // 重置唤醒标志
}

// 成功设置新状态
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 原来锁的状态已释放,并且不是饥饿状态,正常请求到了锁,返回
if old&(mutexLocked|mutexStarving) == 0 { // 这意味着当前的 goroutine 成功获取了锁
break
}

// 如果已经被唤醒过,会被加入到等待队列头。
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 阻塞等待
// queueLifo 为 true,表示加入到队列头。否则,加入到队列尾。
// (首次加入队列加入到队尾,不是首次加入则加入队头,这样等待最久的 goroutine 优先能够获取到锁。)
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 从等待队列中唤醒,检查锁是否应该进入饥饿模式。
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs

// 获取当前的锁最新状态
old = m.state
// 如果锁已经处于饥饿状态,直接抢到锁,返回。
// 饥饿模式下,被唤醒的协程可以直接获取到锁。
// 新来的 goroutine 都需要进入队列等待。
if old&mutexStarving != 0 {
// 如果这个 goroutine 被唤醒并且 Mutex 处于饥饿模式,P 的所有权已经移交给我们,
// 但 Mutex 处于不一致的状态:mutexLocked 未设置,我们仍然被视为等待者。修复这个问题。
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 加锁,并且减少等待者数量。
// 实际上是两步操作合成了一步:
// 1. m.state = m.state + 1 (获取锁)
// 2. m.state = m.state - 1<<mutexWaiterShift(waiter - 1)
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 清除饥饿状态的两种情况:
// 1. 如果不需要进入饥饿模式(当前被唤醒的 goroutine 的等待时间小于 1ms)
// 2. 原来的等待者数量为 1,说明是最后一个被唤醒的 goroutine。
if !starving || old>>mutexWaiterShift == 1 {
// 退出饥饿模式
delta -= mutexStarving
}
// 原子操作,设置新状态。
atomic.AddInt32(&m.state, delta)
break
}
// 设置唤醒标记,重新抢占锁(会与那些运行中的 goroutine 一起竞争锁)
awoke = true
iter = 0
} else {
// CAS 更新状态失败,获取最新状态,然后重试
old = m.state
}
}
}

我们可以看到,lockSlow 的处理非常的复杂,又要考虑让运行中的 goroutine 尽快获取到锁,又要考虑不能让等待队列中的 goroutine 等待太久。

代码中注释很多,再简单总结一下其中的流程:

  1. 为了让循环中的 goroutine 可以先获取到锁,会先让 goroutine 自旋等待锁的释放,这是因为运行中的 goroutine 正在占用 CPU,让它先获取到锁可以避免一些不必要的协程切换,从而获得更好的性能。
  2. 自旋完毕之后,会尝试获取锁,同时也要根据旧的锁状态来更新锁的不同状态信息,比如是否进入饥饿模式等。
  3. 计算得到一个新的 state 后,会进行 CAS 操作尝试更新 state 状态。
  4. CAS 失败会重试上面的流程。
  5. CAS 成功之后会做如下操作:
  • 判断当前是否已经获取到锁,如果是,则返回,Lock 成功了。
  • 会判断当前的 goroutine 是否是已经被唤醒过,如果是,会将当前 goroutine 加入到等待队列头部。
  • 调用 runtime_SemacquireMutex,进入阻塞状态,等待下一次唤醒。
  • 唤醒之后,判断是否需要进入饥饿模式。
  • 最后,如果已经是饥饿模式,当前 goroutine 直接获取到锁,退出循环,否则,再进行下一次抢占锁的循环中。

具体流程我们可以参考一下下面的流程图:

mutex_4

图中有一些矩形方框描述了 unlockSlow 的关键流程。

Unlock

Unlock 方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
// Unlock 释放互斥锁。
// 如果 m 在进入 Unlock 时未被锁定,则会出现运行时错误。
func (m *Mutex) Unlock() {
// Fast path: drop lock bit.
// unlock 成功
// unLock 操作实际上是将 state 减去 1。
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 { // 等待队列为空的时候直接返回了
// 唤醒一个等待锁的 goroutine
m.unlockSlow(new)
}
}

Unlock 做了两件事:

  1. 释放当前 goroutine 持有的互斥锁:也就是将 state 减去 1
  2. 唤醒等待队列中的下一个 goroutine

如果只有一个 goroutine 在使用锁,只需要简单地释放锁就可以了。 但是如果有其他的 goroutine 在阻塞等待,那么持有互斥锁的 goroutine 就有义务去唤醒下一个 goroutine。

唤醒的流程相对复杂一些:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// unlockSlow 唤醒下一个等待锁的协程。
func (m *Mutex) unlockSlow(new int32) {
// 如果未加锁,则会抛出错误。
if (new+mutexLocked)&mutexLocked == 0 {
fatal("sync: unlock of unlocked mutex")
}

// 下面的操作是唤醒一个在等待锁的协程。
// 存在两种情况:
// 1. 正常模式:
// a. 不需要唤醒:没有等待者、锁已经被抢占、有其他运行中的协程在尝试获取锁、已经进入了饥饿模式
// b. 需要唤醒:其他情况
// 2. 饥饿模式:唤醒等待队列头部的那个协程
if new&mutexStarving == 0 {
// 不是饥饿模式
old := new
// 自旋
for {
// 下面几种情况不需要唤醒:
// 1. 没有等待者了(没得唤醒)
// 2. 锁已经被占用(只能有一个 goroutine 持有锁)
// 3. 有其他运行中的协程已经被唤醒(运行中的 goroutine 通过自旋先抢占到了锁)
// 4. 饥饿模式(饥饿模式下,所有新的 goroutine 都要排队,饥饿模式会直接唤醒等待队列头部的 gorutine)
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 获取到唤醒等待者的权力,开始唤醒一个等待者。
// 下面这一行实际上是两个操作:
// 1. waiter 数量 - 1
// 2. 设置 mutexWoken 标志
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 正常模式下唤醒了一个 goroutine
//(第二个参数为 false,表示当前的 goroutine 在释放信号量后还会继续执行直到用完时间片)
runtime_Semrelease(&m.sema, false, 1)
return
}
// 唤醒失败,进行下一次尝试。
old = m.state
}
} else {
// 饥饿模式:将互斥锁的所有权移交给下一个等待者,并放弃我们的时间片,以便下一个等待者可以立即开始运行。
// 注意:如果“mutexLocked”未设置,等待者在唤醒后会将其设置。
// 但是,如果设置了“mutexStarving”,则仍然认为互斥锁已被锁定,因此新到来的goroutine不会获取它。
//
// 当前的 goroutine 放弃 CPU 时间片,让给阻塞在 sema 的 goroutine。
runtime_Semrelease(&m.sema, true, 1)
}
}

unlockSlow 逻辑相比 lockSlow 要简单许多,我们可以再结合下面的流程图来阅读上面的源码:

mutex_5

runtime_Semrelease 第二个参数的含义

细心的朋友可能注意到了,在 unlockSlow 的实现中,有两处地方调用了 runtime_Semrelease 这个方法, 这个方法的作用是释放一个信号量,这样可以让阻塞在信号量上的 goroutine 得以继续执行。 它的第一个参数我们都知道,是信号量,而第二个参数 truefalse 分别传递了一次, 那么 truefalse 分别有什么作用呢?

答案是,设置为 true 的时候,当前的 goroutine 会直接放弃自己的时间片, 将 P 的使用权交给 Mutex 等待队列中的第一个 goroutine, 这样的目的是,让 Mutex 等待队列中的 goroutine 可以尽快地获取到锁。

总结

互斥锁在并发编程中也算是非常常见的一种操作了,使用互斥锁可以限制只有一个 goroutine 可以进入临界区, 这对于并发修改全局变量、初始化等情况非常好用。最后,再总结一下本文所讲述的内容:

  • 互斥锁是一种用于多线程编程中,防止两个线程同时对同一公共资源进行读写的机制。go 中的互斥锁实现是 sync.Mutex
  • Mutex 的操作只有两个:
    • Lock 获取锁,同一时刻只能有一个 goroutine 可以获取到锁,其他 goroutine 会先通过自旋抢占锁,抢不到则阻塞等待。
    • Unlock 释放锁,释放锁之前必须有 goroutine 持有锁。释放锁之后,会唤醒等待队列中的下一个 goroutine。
  • Mutex 常见的使用场景有两个:
    • 并发读写 map:如 ginContextKeys 属性的读写。
    • 并发读写全局变量:如 sync.Pool 中对 allPools 的读写。
  • 使用 Mutex 需要注意以下几点:
    • 不要忘记使用 Unlock 释放锁
    • Lock 之后,没有释放锁之前,不能再次使用 Lock
    • 注意不同 goroutine 竞争不同锁的情况,需要考虑一下是否有可能会死锁
    • Unlock 之前,必须已经调用了 Lock,否则会 panic
    • 在第一次使用 Mutex 之后,不能复制,因为这样一来 Mutex 的状态也会被复制。这个可以使用 go vet 来检查。
  • 互斥锁可以保护一块代码块只能有一个 goroutine 执行,但是不保证临界区内操作的变量不被其他 goroutine 做并发读写操作。
  • go 的 Mutex 基于以下技术实现:
    • 信号量:这是操作系统层面的同步机制
    • 队列:在 goroutine 获取不到锁的时候,会将这些 goroutine 放入一个 FIFO 队列中,下次唤醒会唤醒队列头的 goroutine
    • 原子操作:state 字段记录了四种不同的信息,通过原子操作就可以保证数据的完整性
  • go Mutex 的公平性:
    • 正在运行的 goroutine 如果需要锁的话,尽量让它先获取到锁,可以避免不必要的协程上下文切换。会和被唤醒的 goroutine 一起竞争锁。
    • 但是如果等待队列中的 goroutine 超过了 1ms 还没有获取到锁,那么会进入饥饿模式
  • go Mutex 的两种模式:
    • 正常模式:运行中的 goroutine 有一定机会比等待队列中的 goroutine 先获取到锁,这种模式有更好的性能。
    • 饥饿模式:所有后来的 goroutine 都直接进入等待队列,会依次从等待队列头唤醒 goroutine。可以有效避免尾延迟。
  • 饥饿模式下,Unlock 的时候会直接将当前 goroutine 所在 P 的使用权交给等待队列头部的 goroutine,放弃原本属于自己的时间片。

在我们前面的一些介绍 sync 包相关的文章中,我们应该也发现了,其中有不少地方使用了原子操作。 比如 sync.WaitGroupsync.Map 再到 sync.Pool,这些结构体的实现中都有原子操作的身影。 原子操作在并发编程中是一种非常重要的操作,它可以保证并发安全,而且效率也很高。 本文将会深入探讨一下 go 中原子操作的原理、使用场景、用法等内容。

什么是原子操作?

原子操作是变量级别的互斥锁。

如果让我用一句话来说明什么是原子操作,那就是:原子操作是变量级别的互斥锁。 简单来说,就是同一时刻,只能有一个 CPU 对变量进行读或写。 当我们想要对某个变量做并发安全的修改,除了使用官方提供的 Mutex,还可以使用 sync/atomic 包的原子操作, 它能够保证对变量的读取或修改期间不被其他的协程所影响。

我们可以用下图来表示:

atomic_1

说明:在上图中,我们有三个 CPU 逻辑核,其中 CPU 1 正在对变量 v 做原子操作,这个时候 CPU 2 和 CPU 3 不能对 v 做任何操作, 在 CPU 1 操作完成后,CPU 2 和 CPU 3 可以获取到 v 的最新值。

从这个角度看,我们可以把 sync/atomic 包中的原子操作看成是变量级别的互斥锁。 就是说,在 go 中,当一个协程对变量做原子操作时,其他协程不能对这个变量做任何操作,直到这个协程操作完成。

原子操作的使用场景是什么?

拿一个简单的例子来说明一下原子操作的使用场景:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func TestAtomic(t *testing.T) {
var sum = 0
var wg sync.WaitGroup
wg.Add(1000)

// 启动 1000 个协程,每个协程对 sum 做加法操作
for i := 0; i < 1000; i++ {
go func() {
defer wg.Done()
sum++
}()
}

// 等待所有的协程都执行完毕
wg.Wait()
fmt.Println(sum) // 这里输出多少呢?
}

我们可以在自己的电脑上运行一下这段代码,看看输出的结果是多少。 不出意外的话,应该每次可能都不一样,而且应该也不是 1000,这是为什么呢?

这是因为,CPU 在对 sum 做加法的时候,需要先将 sum 目前的值读取到 CPU 的寄存器中,然后再进行加法操作,最后再写回到内存中。 如果有两个 CPU 同时取了 sum 的值,然后都进行了加法操作,然后都再写回到内存中,那么就会导致 sum 的值被覆盖,从而导致结果不正确。

举个例子,目前内存中的 sum 为 1,然后两个 CPU 同时取了这个 1 来做加法,然后都得到了结果 2, 然后这两个 CPU 将各自的计算结果写回到内存中,那么内存中的 sum 就变成了 2,而不是 3。

在这种场景下,我们可以使用原子操作来实现并发安全的加法操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func TestAtomic1(t *testing.T) {
// 将 sum 的类型改成 int32,因为原子操作只能针对 int32、int64、uint32、uint64、uintptr 这几种类型
var sum int32 = 0
var wg sync.WaitGroup
wg.Add(1000)

// 启动 1000 个协程,每个协程对 sum 做加法操作
for i := 0; i < 1000; i++ {
go func() {
defer wg.Done()
// 将 sum++ 改成下面这样
atomic.AddInt32(&sum, 1)
}()
}

wg.Wait()
fmt.Println(sum) // 输出 1000
}

在上面这个例子中,我们每次执行都能得到 1000 这个结果。

因为使用原子操作的时候,同一时刻只能有一个 CPU 对变量进行读或写,所以就不会出现上面的问题了。

所以很多需要对变量做并发读写的地方,我们都可以考虑一下,是否可以使用原子操作来实现并发安全的操作(而不是使用互斥锁,互斥锁效率相比原子操作要低一些)。

原子操作是怎么实现的?

看完上面原子操作的介绍,有没有觉得原子操作很神奇,居然有这么好用的东西。那它到底是怎么实现的呢?

一般情况下,原子操作的实现需要特殊的 CPU 指令或者系统调用。 这些指令或者系统调用可以保证在执行期间不会被其他操作或事件中断,从而保证操作的原子性。

例如,在 x86 架构的 CPU 中,可以使用 LOCK 前缀来实现原子操作。 LOCK 前缀可以与其他指令一起使用,用于锁定内存总线,防止其他 CPU 访问同一内存地址,从而实现原子操作。 在使用 LOCK 前缀的指令执行期间,CPU 会将当前处理器缓存中的数据写回到内存中,并锁定该内存地址, 防止其他 CPU 修改该地址的数据(所以原子操作总是可以读取到最新的数据)。 一旦当前 CPU 对该地址的操作完成,CPU 会释放该内存地址的锁定,其他 CPU 才能继续对该地址进行访问。

x86 LOCK 的时候发生了什么

我们再来捋一下上面的内容,看看 LOCK 前缀是如何实现原子操作的:

  1. CPU 会将当前处理器缓存中的数据写回到内存中。(因此我们总能读取到最新的数据)
  2. 然后锁定该内存地址,防止其他 CPU 修改该地址的数据。
  3. 一旦当前 CPU 对该地址的操作完成,CPU 会释放该内存地址的锁定,其他 CPU 才能继续对该地址进行访问。

其他架构的 CPU 可能会略有不同,但是原理是一样的。

原子操作有什么特征?

  1. 不会被中断:原子操作是一个不可分割的操作,要么全部执行,要么全部不执行,不会出现中间状态。这是保证原子性的基本前提。同时,原子操作过程中不会有上下文切换的过程。
  2. 操作对象是共享变量:原子操作通常是对共享变量进行的,也就是说,多个协程可以同时访问这个变量,因此需要采用原子操作来保证数据的一致性和正确性。
  3. 并发安全:原子操作是并发安全的,可以保证多个协程同时进行操作时不会出现数据竞争问题(虽然说是同时,但是实际上在操作那个变量的时候是互斥的)。
  4. 无需加锁:原子操作不需要使用互斥锁来保证数据的一致性和正确性,因此可以避免互斥锁的使用带来的性能损失。
  5. 适用场景比较局限:原子操作适用于操作单个变量,如果需要同时并发读写多个变量,可能需要考虑使用互斥锁。

go 里面有哪些原子操作?

在 go 中,主要有以下几种原子操作:AddCompareAndSwapLoadStoreSwap

增减(Add)

  1. 用于进行增加或减少的原子操作,函数名以 Add 为前缀,后缀针对特定类型的名称。
  2. 原子增被操作的类型只能是数值类型,即 int32int64uint32uint64uintptr
  3. 原子增减函数的第一个参数为原值,第二个参数是要增减多少。
  4. 方法:
1
2
3
4
5
func AddInt32(addr *int32, delta int32) (new int32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)

int32int64 的第二个参数可以是负数,这样就可以做原子减法了。

比较并交换(CompareAndSwap)

也就是我们常见的 CAS,在 CAS 操作中,会需要拿旧的值跟 old 比较,如果相等,就将 new 赋值给 addr。 如果不相等,则不做任何操作。最后返回一个 bool 值,表示是否成功 swap

也就是说,这个操作可能是不成功的。这很正常,在并发环境下,多个协程对同一个变量进行操作,肯定会存在竞争的情况。 在这种情况下,偶尔的失败是正常的,我们只需要在失败的时候,重新尝试即可。 因为原子操作需要的时间往往是比较短的,因此在失败的时候,我们可以通过自旋的方式来再次进行尝试。

在这种情况下,如果不自旋,那就需要将这个协程挂起,等待其他协程完成操作,然后再次尝试。这个过程相比自旋可能会更加耗时。 因为很有可能这次原子操作不成功,下一次就成功了。如果我们每次都将协程挂起,那么效率就会大大降低。

for + 原子操作的方式,在 go 的 sync 包中很多地方都有使用,比如 sync.Mapsync.Pool 等。 这也是使用原子操作时一个非常常见的使用模式。

CompareAndSwap 的功能:

  1. 用于比较并交换的原子操作,函数名以 CompareAndSwap 为前缀,后缀针对特定类型的名称。
  2. 原子比较并交换被操作的类型可以是数值类型或指针类型,即 int32int64uint32uint64uintptrunsafe.Pointer
  3. 原子比较并交换函数的第一个参数为原值指针,第二个参数是要比较的值,第三个参数是要交换的值。
  4. 方法:
1
2
3
4
5
6
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)

载入(Load)

原子性的读取操作接受一个对应类型的指针值,返回该指针指向的值。原子性读取意味着读取值的同时,当前计算机的任何 CPU 都不会进行针对值的读写操作。

如果不使用原子 Load,当使用 v := value 这种赋值方式为变量 v 赋值时,读取到的 value 可能不是最新的,因为在读取操作时其他协程对它的读写操作可能会同时发生。

Load 操作有下面这些:

1
2
3
4
5
6
func LoadInt32(addr *int32) (val int32)
func LoadInt64(addr *int64) (val int64)
func LoadUint32(addr *uint32) (val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)

存储(Store)

Store 可以将 val 值保存到 *addr 中,Store 操作是原子性的,因此在执行 Store 操作时,当前计算机的任何 CPU 都不会进行针对 *addr 的读写操作。

  1. 原子性存储会将 val 值保存到 *addr 中。
  2. 与读操作对应的写入操作,sync/atomic 提供了与原子值载入 Load 函数相对应的原子值存储 Store 函数,原子性存储函数均以 Store 为前缀。

Store 操作有下面这些:

1
2
3
4
5
6
func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintpre, val uintptr)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)

交换(Swap)

SwapStore 有点类似,但是它会返回 *addr 的旧值。

1
2
3
4
5
6
func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)

原子操作的使用场景是什么?

文章开头的地方,我们已经说了,原子操作本质上是一种变量级别的互斥锁。 因此,原子操作的使用场景也是和互斥锁类似的,但是不一样的是,我们的锁粒度只是一个变量而已。 也就是说,当我们不允许多个 CPU 同时对变量进行读写的时候(保证变量同一时刻只能一个 CPU 操作),就可以使用原子操作。

原子操作任意类型的值 - atomic.Value

从上一节中,我们知道了在 go 中原子操作可以操作 int32int64uint32uint64uintptrunsafe.Pointer 这些类型的值。 但是在实际开发中,我们的类型还有很多,比如 stringstruct 等等,那这些类型的值如何进行原子操作呢?答案是使用 atomic.Value

atomic.Value 是一个结构体,它的内部有一个 any 类型的字段,存储了我们要原子操作的值,也就是一个任意类型的值。

atomic.Value 支持以下操作:

  • Load:原子性的读取 Value 中的值。
  • Store:原子性的存储一个值到 Value 中。
  • Swap:原子性的交换 Value 中的值,返回旧值。
  • CompareAndSwap:原子性的比较并交换 Value 中的值,如果旧值和 old 相等,则将 new 存入 Value 中,返回 true,否则返回 false

atomic.Value 的这些操作跟上面讲到的那些操作其实差不多,只不过 atomic.Value 可以操作任意类型的值。 那 atomic.Value 是如何实现的呢?

atomic.Value 源码分析

atomic.Value 是一个结构体,这个结构体只有一个字段:

1
2
3
4
// Value 提供一致类型值的原子加载和存储。
type Value struct {
v any
}

Load - 读取

Load 返回由最近的 Store 设置的值。如果还没有 Store 过任何值,则返回 nil

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Load 返回由最近的 Store 设置的值。
func (v *Value) Load() (val any) {
// atomic.Value 转换为 efaceWords
vp := (*efaceWords)(unsafe.Pointer(v))

// 判断 atomic.Value 的类型
typ := LoadPointer(&vp.typ)
// 第一次 Store 还没有完成,直接返回 nil
if typ == nil || typ == unsafe.Pointer(&firstStoreInProgress) {
// firstStoreInProgress 是一个特殊的变量,存储到 typ 中用来表示第一次 Store 还没有完成
return nil
}

// 获取 atomic.Value 的值
data := LoadPointer(&vp.data)
// 将 val 转换为 efaceWords 类型
vlp := (*efaceWords)(unsafe.Pointer(&val))
// 分别赋值给 val 的 typ 和 data
vlp.typ = typ
vlp.data = data
return
}

atomic.Value 的源码中,我们都可以看到 efaceWords 的身影,它实际上代表的是 interface{}/any 类型:

1
2
3
4
5
// 表示一个 interface{}/any 类型
type efaceWords struct {
typ unsafe.Pointer
data unsafe.Pointer
}

看到这里我们会不会觉得很困惑,直接返回 val 不就可以了吗?为什么要将 val 转换为 efaceWords 类型呢?

这是因为 go 中的原子操作只能操作 int32int64uint32uint64uintptrunsafe.Pointer 这些类型的值, 不支持 interface{} 类型,但是如果了解 interface{} 底层结构的话,我们就知道 interface{} 底层其实就是一个结构体, 它有两个字段,一个是 type,一个是 datatype 用来存储 interface{} 的类型,data 用来存储 interface{} 的值。 而且这两个字段都是 unsafe.Pointer 类型的,所以其实我们可以对 interface{}typedata 分别进行原子操作, 这样最终其实也可以达到了原子操作 interface{} 的目的了,是不是非常地巧妙呢?

Store - 存储

StoreValue 的值设置为 val。对给定值的所有存储调用必须使用相同具体类型的值。不一致类型的存储会发生恐慌,Store(nil) 也会 panic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// Store 将 Value 的值设置为 val。
func (v *Value) Store(val any) {
// 不能存储 nil 值
if val == nil {
panic("sync/atomic: store of nil value into Value")
}
// atomic.Value 转换为 efaceWords
vp := (*efaceWords)(unsafe.Pointer(v))
// val 转换为 efaceWords
vlp := (*efaceWords)(unsafe.Pointer(&val))

// 自旋进行原子操作,这个过程不会很久,开销相比互斥锁小
for {
// LoadPointer 可以保证获取到的是最新的
typ := LoadPointer(&vp.typ)
// 第一次 store 的时候 typ 还是 nil,说明是第一次 store
if typ == nil {
// 尝试开始第一次 Store。
// 禁用抢占,以便其他 goroutines 可以自旋等待完成。
// (如果允许抢占,那么其他 goroutine 自旋等待的时间可能会比较长,因为可能会需要进行协程调度。)
runtime_procPin()
// 抢占失败,意味着有其他 goroutine 成功 store 了,允许抢占,再次尝试 Store
// 这也是一个原子操作。
if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) {
runtime_procUnpin()
continue
}
// 完成第一次 store
// 因为有 firstStoreInProgress 标识的保护,所以下面的两个原子操作是安全的。
StorePointer(&vp.data, vlp.data) // 存储值(原子操作)
StorePointer(&vp.typ, vlp.typ) // 存储类型(原子操作)
runtime_procUnpin() // 允许抢占
return
}

// 另外一个 goroutine 正在进行第一次 Store。自旋等待。
if typ == unsafe.Pointer(&firstStoreInProgress) {
continue
}

// 第一次 Store 已经完成了,下面不是第一次 Store 了。
// 需要检查当前 Store 的类型跟第一次 Store 的类型是否一致,不一致就 panic。
if typ != vlp.typ {
panic("sync/atomic: store of inconsistently typed value into Value")
}

// 后续的 Store 只需要 Store 值部分就可以了。
// 因为 atomic.Value 只能保存一种类型的值。
StorePointer(&vp.data, vlp.data)
return
}
}

Store 中,有以下几个注意的点:

  1. 使用 firstStoreInProgress 来确保第一次 Store 的时候,只有一个 goroutine 可以进行 Store 操作,其他的 goroutine 需要自旋等待。如果没有这个保护,那么存储 typdata 的时候就会出现竞争(因为需要两个原子操作),导致数据不一致。在这里其实可以将 firstStoreInProgress 看作是一个互斥锁。
  2. 在进行第一次 Store 的时候,会将当前的 goroutine 和 P 绑定,这样拿到 firstStoreInProgress 锁的协程就可以尽快地完成第一次 Store 操作,这样一来,其他的协程也不用等待太久。
  3. 在第一次 Store 的时候,会有两个原子操作,分别存储类型和值,但是因为有 firstStoreInProgress 的保护,所以这两个原子操作本质上是对 interface{} 的一个原子存储操作。
  4. 其他协程在看到有 firstStoreInProgress 标识的时候,就会自旋等待,直到第一次 Store 完成。
  5. 在后续的 Store 操作中,只需要存储值就可以了,因为 atomic.Value 只能保存一种类型的值。

Swap - 交换

SwapValue 的值设置为 new 并返回旧值。对给定值的所有交换调用必须使用相同具体类型的值。同时,不一致类型的交换会发生恐慌,Swap(nil) 也会 panic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// Swap 将 Value 的值设置为 new 并返回旧值。
func (v *Value) Swap(new any) (old any) {
// 不能存储 nil 值
if new == nil {
panic("sync/atomic: swap of nil value into Value")
}

// atomic.Value 转换为 efaceWords
vp := (*efaceWords)(unsafe.Pointer(v))
// new 转换为 efaceWords
np := (*efaceWords)(unsafe.Pointer(&new))

// 自旋进行原子操作,这个过程不会很久,开销相比互斥锁小
for {
// 下面这部分代码跟 Store 一样,不细说了。
// 这部分代码是进行第一次存储的代码。
typ := LoadPointer(&vp.typ)
if typ == nil {
runtime_procPin()
if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) {
runtime_procUnpin()
continue
}
StorePointer(&vp.data, np.data)
StorePointer(&vp.typ, np.typ)
runtime_procUnpin()
return nil
}
if typ == unsafe.Pointer(&firstStoreInProgress) {
continue
}
if typ != np.typ {
panic("sync/atomic: swap of inconsistently typed value into Value")
}

// ---- 下面是 Swap 的特有逻辑 ----
// op 是返回值
op := (*efaceWords)(unsafe.Pointer(&old))
// 返回旧的值
op.typ, op.data = np.typ, SwapPointer(&vp.data, np.data)
return old
}
}

CompareAndSwap - 比较并交换

CompareAndSwapValue 的值与 old 比较,如果相等则设置为 new 并返回 true,否则返回 false。 对给定值的所有比较和交换调用必须使用相同具体类型的值。同时,不一致类型的比较和交换会发生恐慌,CompareAndSwap(nil, nil) 也会 panic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// CompareAndSwap 比较并交换。
func (v *Value) CompareAndSwap(old, new any) (swapped bool) {
// 注意:old 是可以为 nil 的,new 不能为 nil。
// old 是 nil 表示是第一次进行 Store 操作。
if new == nil {
panic("sync/atomic: compare and swap of nil value into Value")
}

// atomic.Value 转换为 efaceWords
vp := (*efaceWords)(unsafe.Pointer(v))
// new 转换为 efaceWords
np := (*efaceWords)(unsafe.Pointer(&new))
// old 转换为 efaceWords
op := (*efaceWords)(unsafe.Pointer(&old))

// old 和 new 类型必须一致,且不能为 nil
if op.typ != nil && np.typ != op.typ {
panic("sync/atomic: compare and swap of inconsistently typed values")
}

// 自旋进行原子操作,这个过程不会很久,开销相比互斥锁小
for {
// LoadPointer 可以保证获取到的 typ 是最新的
typ := LoadPointer(&vp.typ)
if typ == nil { // atomic.Value 是 nil,还没 Store 过
// 准备进行第一次 Store,但是传递进来的 old 不是 nil,compare 这一步就失败了。直接返回 false
if old != nil {
return false
}

// 下面这部分代码跟 Store 一样,不细说了。
// 这部分代码是进行第一次存储的代码。
runtime_procPin()
if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) {
runtime_procUnpin()
continue
}
StorePointer(&vp.data, np.data)
StorePointer(&vp.typ, np.typ)
runtime_procUnpin()
return true
}
if typ == unsafe.Pointer(&firstStoreInProgress) {
continue
}
if typ != np.typ {
panic("sync/atomic: compare and swap of inconsistently typed value into Value")
}

// 通过运行时相等性检查比较旧版本和当前版本。
// 这允许对值类型进行比较,这是包函数所没有的。
// 下面的 CompareAndSwapPointer 仅确保 vp.data 自 LoadPointer 以来没有更改。
data := LoadPointer(&vp.data)
var i any
(*efaceWords)(unsafe.Pointer(&i)).typ = typ
(*efaceWords)(unsafe.Pointer(&i)).data = data
if i != old { // atomic.Value 跟 old 不相等
return false
}
// 只做 val 部分的 cas 操作
return CompareAndSwapPointer(&vp.data, data, np.data)
}
}

这里需要特别说明的只有最后那个比较相等的判断,也就是 data := LoadPointer(&vp.data) 以及往后的几行代码。 在开发 atomic.Value 第一版的时候,那个开发者其实是将这几行写成 CompareAndSwapPointer(&vp.data, old.data, np.data) 这种形式的。 但是在旧的写法中,会存在一个问题,如果我们做 CAS 操作的时候,如果传递的参数 old 是一个结构体的值这种类型,那么这个结构体的值是会被拷贝一份的, 同时再会被转换为 interface{}/any 类型,这个过程中,其实参数的 olddata 部分指针指向的内存跟 vp.data 指向的内存是不一样的。 这样的话,CAS 操作就会失败,这个时候就会返回 false,但是我们本意是要比较它的值,出现这种结果显然不是我们想要的。

将值作为 interface{} 参数使用的时候,会存在一个将值转换为 interface{} 的过程。具体我们可以看看 interface{} 的实现原理。

所以,在上面的实现中,会将旧值的 typdata 赋值给一个 any 类型的变量, 然后使用 i != old 这种方式进行判断,这样就可以实现在比较的时候,比较的是值,而不是由值转换为 interface{} 后的指针。

其他原子类型

我们现在知道了,atomic.Value 可以对任意类型做原子操作。 而对于其他的原子类型,比如 int32int64uint32uint64uintptrunsafe.Pointer 等, 其实在 go 中也提供了包装的类型,让我们可以以对象的方式来操作这些类型。

对应的类型如下:

  • atomic.Bool:这个比较特别,但底层实际上是一个 uint32 类型的值。我们对 atomic.Bool 做原子操作的时候,实际上是对 uint32 做原子操作。
  • atomic.Int32int32 类型的包装类型
  • atomic.Int64int64 类型的包装类型
  • atomic.Uint32uint32 类型的包装类型
  • atomic.Uint64uint64 类型的包装类型
  • atomic.Uintptruintptr 类型的包装类型
  • atomic.Pointerunsafe.Pointer 类型的包装类型

这几种类型的实现的代码基本一样,除了类型不一样,我们可以看看 atomic.Int32 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// An Int32 is an atomic int32. The zero value is zero.
type Int32 struct {
_ noCopy
v int32
}

// Load atomically loads and returns the value stored in x.
func (x *Int32) Load() int32 { return LoadInt32(&x.v) }

// Store atomically stores val into x.
func (x *Int32) Store(val int32) { StoreInt32(&x.v, val) }

// Swap atomically stores new into x and returns the previous value.
func (x *Int32) Swap(new int32) (old int32) { return SwapInt32(&x.v, new) }

// CompareAndSwap executes the compare-and-swap operation for x.
func (x *Int32) CompareAndSwap(old, new int32) (swapped bool) {
return CompareAndSwapInt32(&x.v, old, new)
}

可以看到,atomic.Int32 的实现都是基于 atomic 包中 int32 类型相关的原子操作函数来实现的。

原子操作与互斥锁比较

那我们有了互斥锁,为什么还要有原子操作呢?我们进行比较一下就知道了:

原子操作 互斥锁
保护的范围 变量 代码块
保护的粒度
性能
如何实现的 硬件指令 软件层面实现,逻辑较多

如果我们只需要对某一个变量做并发读写,那么使用原子操作就可以了,因为原子操作的性能比互斥锁高很多。 但是如果我们需要对多个变量做并发读写,那么就需要用到互斥锁了,这种场景往往是在一段代码中对不同变量做读写。

性能比较

我们前面这个表格提到了原子操作与互斥锁性能上有差异,我们写几行代码来进行比较一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 系统信息 cpu: Intel(R) Core(TM) i7-8700 CPU @ 3.20GHz
// 10.13 ns/op
func BenchmarkMutex(b *testing.B) {
var mu sync.Mutex

for i := 0; i < b.N; i++ {
mu.Lock()
mu.Unlock()
}
}

// 5.849 ns/op
func BenchmarkAtomic(b *testing.B) {
var sum atomic.Uint64

for i := 0; i < b.N; i++ {
sum.Add(uint64(1))
}
}

在对 Mutex 的性能测试中,我只是写了简单的 Lock()UnLock() 操作,因为这种比较才算是对 Mutex 本身的测试,而在 Atomic 的性能测试中,对 sum 做原子累加的操作。最终结果是,使用 Atomic 的操作耗时大概比 Mutex 少了 40% 以上。

在实际开发中,Mutex 保护的临界区内往往有更多操作,也就意味着 Mutex 锁需要耗费更长的时间才能释放,也就是会需要耗费比上面这个 40% 还要多的时间另外一个协程才能获取到 Mutex 锁。

go 的 sync 包中的原子操作

在文章的开头,我们就说了,在 go 的 sync.Mapsync.Pool 中都有用到了原子操作,本节就来看一看这些操作。

sync.Map 中的原子操作

sync.Map 中使用到了一个 entry 结构体,这个结构体中大部分操作都是原子操作,我们可以看看它下面这两个方法的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 删除 entry
func (e *entry) delete() (value any, ok bool) {
for {
p := e.p.Load()
// 已经被删除了,不需要再删除
if p == nil || p == expunged {
return nil, false
}
// 删除成功
if e.p.CompareAndSwap(p, nil) {
return *p, true
}
}
}

// 如果条目尚未删除,trySwap 将交换一个值。
func (e *entry) trySwap(i *any) (*any, bool) {
for {
p := e.p.Load()
// 已经被删除了
if p == expunged {
return nil, false
}
// swap 成功
if e.p.CompareAndSwap(p, i) {
return p, true
}
}
}

我们可以看到一个非常典型的特征就是 for + CompareAndSwap 的组合,这个组合在 entry 中出现了很多次。

如果我们也需要对变量做并发读写,也可以尝试一下这种 for + CompareAndSwap 的组合。

sync.WaitGroup 中的原子操作

sync.WaitGroup 中有一个类型为 atomic.Uint64state 字段,这个变量是用来记录 WaitGroup 的状态的。 在实际使用中,它的高 32 位用来记录 WaitGroup 的计数器,低 32 位用来记录 WaitGroupWaiter 的数量,也就是等待条件变量满足的协程数量。

如果不使用一个变量来记录这两个值,那么我们就需要使用两个变量来记录,这样就会导致我们需要对两个变量做并发读写, 在这种情况下,我们就需要使用互斥锁来保护这两个变量,这样就会导致性能的下降。

而使用一个变量来记录这两个值,我们就可以使用原子操作来保护这个变量,这样就可以保证并发读写的安全性,同时也能得到更好的性能:

1
2
3
4
5
6
// WaitGroup 的 Add 函数:高 32 位加上 delta
state := wg.state.Add(uint64(delta) << 32)

// WaitGroup 的 Wait 函数:低 32 位加 1
// 等待者的数量加 1
wg.state.CompareAndSwap(state, state+1)

CAS 操作有失败必然有成功

当然这里是指指向同一行 CAS 代码的时候(也就是有竞争的时候),如果是指向不同行 CAS 代码的时候,那么就不一定了。 比如下面这个例子,我们把前面计算 sum 的例子改一改,改成用 CAS 操作来完成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func TestCas(t *testing.T) {
var sum int32 = 0
var wg sync.WaitGroup
wg.Add(1000)

for i := 0; i < 1000; i++ {
go func() {
defer wg.Done()
// 这一行是有可能会失败的
atomic.CompareAndSwapInt32(&sum, sum, sum+1)
}()
}

wg.Wait()
fmt.Println(sum) // 不是 1000
}

在这个例子中,我们把 atomic.AddInt32(&sum, 1) 改成了 atomic.CompareAndSwapInt32(&sum, sum, sum+1), 这样就会导致有可能会有多个 goroutine 同时执行到 atomic.CompareAndSwapInt32(&sum, sum, sum+1) 这一行代码, 这样肯定会有不同的 goroutine 同时拿到一个相同的 sum 的旧值,那么在这种情况下,就会导致 CAS 操作失败。 也就是说,将 sum 替换为 sum + 1 的操作可能会失败。

失败意味着什么呢?意味着另外一个协程序先把 sum 的值加 1 了,这个时候其实我们不应该在旧的 sum 上加 1 了, 而是应该在最新的 sum 上加上 1,那我们应该怎么做呢?我们可以在 CAS 操作失败的时候,重新获取 sum 的值, 然后再次尝试 CAS 操作,直到成功为止:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func TestCas(t *testing.T) {
var sum int32 = 0
var wg sync.WaitGroup
wg.Add(1000)

for i := 0; i < 1000; i++ {
go func() {
defer wg.Done()

// cas 失败的时候,重新获取 sum 的值进行计算。
// cas 成功则返回。
for {
if atomic.CompareAndSwapInt32(&sum, sum, sum+1) {
return
}
}
}()
}

wg.Wait()
fmt.Println(sum)
}

总结

原子操作是并发编程中非常重要的一个概念,它可以保证并发读写的安全性,同时也能得到更好的性能。

最后,总结一下本文讲到的内容:

  • 原子操作是更加底层的操作,它保护的是单个变量,而互斥锁可以保护一个代码片段,它们的使用场景是不一样的。
  • 原子操作需要通过 CPU 指令来实现,而互斥锁是在软件层面实现的。
  • go 里面的原子操作有以下这些:
    • Add:原子增减
    • CompareAndSwap:原子比较并交换
    • Load:原子读取
    • Store:原子写入
    • Swap:原子交换
  • go 里面所有类型都能使用原子操作,只是不同类型的原子操作使用的函数不太一样。
  • atomic.Value 可以用来原子操作任意类型的变量。
  • go 里面有些底层实现也使用了原子操作,比如:
    • sync.WaitGroup:使用原子操作来保证计数器和等待者数量的并发读写安全性。
    • sync.Mapentry 结构体中基本所有操作都有原子操作的身影。
  • 原子操作有失败必然有成功(说的是同一行 CAS 操作),如果 CAS 操作失败了,那么我们可以重新获取旧值,然后再次尝试 CAS 操作,直到成功为止。

总的来说,原子操作本身其实没有太复杂的逻辑,我们理解了它的原理之后,就可以很容易的使用它了。

本文基于 Go 1.19

Pool 是一组可以安全在多个 goroutine 间共享的临时对象的集合。 存储在 Pool 中的任何项目都可能在任何时候被移除,因此 Pool 不适合保存那些有状态的对象,如数据库连接、TCP 连接等。 Pool 的目的是缓存已分配但未使用的项以供以后使用,从而减少垃圾收集器的压力。 也就是说,它可以轻松构建高效、线程安全的空闲列表,但是,它并不适用于所有空闲列表。

使用实例

下面以几个实际的例子来说明 Pool 的一些使用场景。 下面是两个非常典型的使用场景,但是在实际使用中,对于那些需要频繁创建和销毁的对象的场景,我们都可以考虑使用 Pool

gin 里面 Context 对象使用 Pool 保存

ginEngine 结构体中的 ServeHTTP 方法中,可以看到 Context 对象是从 Pool 中获取的。 然后在处理完请求之后,将 Context 对象放回 Pool 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (engine *Engine) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// 从 Pool 中获取 Context
c := engine.pool.Get().(*Context)
// 重置 writermem
c.writermem.reset(w)
// 重置 Request
c.Request = req
// 重置其他字段
c.reset()

engine.handleHTTPRequest(c)

// 将 Context 对象放回 Pool
engine.pool.Put(c)
}

我们可以去看看 ginContext 对象的定义,我们会发现,它其中有很多字段。 设想一下,如果每一个请求都创建一个 Context 对象,那么每一个请求都要对 Context 进行内存分配, 分配了之后,如果请求结束了,这些申请的内存在后续就要被回收掉(当然,不是马上就回收)。

这样一来,如果待回收的 Context 对象很多,那么垃圾回收器就会被压力很大。

同样的做法在 echo 这个框架中也有出现。

fmt 里面的 pp 结构体

在我们使用 fmt 包来打印的时候(比如,调用 fmt.Fprintf),其实底层是要使用一个名为 pp 的结构体来进行打印的。 如果我们的系统中需要大量地使用 fmt 库来做格式化字符串的操作,如果每次进行格式化操作的时候都 new 一个 pp 对象, 那么也会在某个时刻导致垃圾回收器的压力很大。

所以,fmt 包中使用 pp 的时候,都是从 Pool 中获取的:

1
2
3
4
5
6
7
8
9
10
// newPrinter allocates a new pp struct or grabs a cached one.
func newPrinter() *pp {
// 从 Pool 中获取 pp 对象
p := ppFree.Get().(*pp)
p.panicking = false
p.erroring = false
p.wrapErrs = false
p.fmt.init(&p.buf)
return p
}

上面这个方法是获取 pp 对象的方法。我们在这里没有看到重置 pp 字段的代码,因为这些操作在 ppfree 方法中了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// free saves used pp structs in ppFree; avoids an allocation per invocation.
func (p *pp) free() {
if cap(p.buf) > 64<<10 {
return
}

// 重置 p.buf
p.buf = p.buf[:0]
p.arg = nil
p.value = reflect.Value{}
p.wrappedErr = nil
// 将 pp 放回 Pool 中
ppFree.Put(p)
}

我们不能对 sync.Pool 有任何假设,比如,不要想着 Put 进去的对象带了一个状态,然后 Get 出来的时候就能拿到这个状态。 因为这个对象可能在任何时候被清除掉。

从上面这个例子中,我们可以看到 p.buf = p.buf[:0] 这一行对 buf 进行了重置, 这给我我们的启示是,我们在使用 sync.Pool 的时候,如果我们存取的对象会带有一些不同的字段值, 那么我们可能需要对这些字段进行重置后再使用,这样就可以避免 Get 到的对象带有之前的一些状态, 不过重置这些字段的开销相比分配新的内存以及后续 GC,其实开销可以忽略不计。

Pool 整体模型

Pool 本质上是一个双端队列,这个队列支持以下操作:

  • pushHead:将一个对象放到队列的头部,这也是唯一的入队操作。
  • popHead:从队列的头部取出一个对象。如果取不到则使用 New 方法创建一个新的对象。
  • popTail:从队列的尾部取出一个对象。如果取不到则使用 New 方法创建一个新的对象。

可以简单地用下图表示:

pool_1

在阅读本文过程中,想不清楚的时候,回想一下这个模型,可能会有所帮助。

当然,在实际的实现中,比这个复杂多了,但是这个模型已经足够我们理解 Pool 的工作原理了。

Pool 的双端队列是使用数组还是链表

我们知道,队列的存储通常有两种方式,一种是数组,一种是链表,两者的优缺点如下:

优点 缺点
数组 存储空间连续,可以根据下标快速访问 大小固定,扩容成本高
链表 大小不固定,可以很容易增加新的元素 访问效率不如数组。需要额外的空间来存储前后元素的指针

那么 Pool 里面用的是数组还是链表呢?答案是:数组 + 链表

Pool 为什么要用数组 + 链表

为了快速访问队列中的元素,使用数组是最好的选择,但是数组的大小是固定的,如果队列中的元素很多,那么数组就会很快被填满。 如果我们还想继续往队列中添加元素,那么就需要对数组进行扩容,这个成本是很高的(因为本来就是需要频繁分配/销毁对象的场景才会使用 Pool)。 同时,我们知道 Pool 设计的目的就是为了减少频繁内存分配带来的性能问题,如果在使用 Pool 的过程中频繁对其进行扩容,那么就违背了 Pool 设计的初衷了。

为了解决数组扩容的问题,我们可以考虑一下使用链表。在我们 Put 的时候往链表的头部添加一个元素,然后 Get 的时候从链表的尾部取出一个元素(还需要移除)。 但是这样我们就需要一个结构体来表示我们的节点了,那么问题来了,我们又需要频繁地分配/销毁这个结构体,这样就又回到了最开始的问题了(频繁创建/销毁对象)。 所以,只使用链表也不是一个好的选择。

所以,Pool 采用了 数组 + 链表 的方式来实现双端队列,它们的关系如下:

  • 数组:存储队列中的元素,数组的大小是固定的。
  • 链表:当一个数组存储满了之后,就会新建一个数组,然后通过链表将这两个数组串联起来。

最终,Pool 的双端队列的结构如下:

pool_2

数组如何实现队列

我们知道,数组的存储空间是固定的一块连续的内存,所以我们可以通过下标来访问数组中的元素。 我们 push 的时候,将 head 下标 +1,然后 pop 的时候,也要修改对应的下标, 但是这样会导致一个问题是,早晚 head 会超出数组的下标范围,但这个时候数组可能还有很多空间, 因为在我们 push 的时候,可能也同时在 pop,所以数组中的空间可能还有很多。

所以,就可以在 head 超出数组下标范围的时候,将 head 对数组长度取模,这样就可以循环使用数组了:

pool_3

不过对于这种情况,使用下面的图更加直观:

pool_4

Pool 里面是使用 poolDequeue 这个结构体来表示上图这个队列的,本身是一个数组,但是是当作环形队列使用的。

poolChain 的最终模型

poolChain 就是上面说的 数组 + 链表 的组合,它的最终模型如下:

pool_5

说明:

  • poolChain 本身是一个双向链表,每个节点都是一个 poolDequeue,每个节点都有 prevnext 指针指向前后节点。上图的 head 是头节点,tail 是尾节点。
  • poolDequeue 是一个数组,它的大小是固定的,但是它是当作环形队列使用的。
  • tail 初始化的时候长度为 8,具体可参考 poolChainpushHead 方法,后面会说到。
  • pushHead 的时候,如果 head 中的环形队列已经满了,那么就会新建一个 poolDequeue,然后将它插入到 head 的前面。(新的 head 节点的大小为前一个 head 节点的两倍)
  • popTailpopHead 的时候,如果 tail 或者 head 中的环形队列(poolDequeue)已经空了,那么就会将它从链表中移除。

多个 P 的情况下的 poolChain

这里假设 P 跟我们机器的逻辑处理器的数量一致。(这里涉及到了 goroutine 的调度机制,不了解可以先了解一下再回来看。)

我们知道,在 go 中,每一个 goroutine 都会绑定一个 P,这样才可以充分利用多核的优势。 设想一下,如果我们有多个 goroutine 同时存储一个 Pool,会出现什么情况呢? 会导致很激烈的数据竞争,虽然没有使用 Mutex 这种相对低效的互斥锁来解决竞争问题,使用的是原子操作,但是也会导致性能下降。

所以,在 Pool 的实现中,会为每一个 P 都创建一个 poolChain,每次存取,先操作本地 P 绑定的 poolChain,这样就可以减少多个 goroutine 同时操作一个 Pool 的竞争问题了。

所以,最终的 Pool 的模型会长成下面这个样子,每个 G 关联了一个 poolChain

pool_6

注意:这里不是 Pool 实际的样子,只是为了说明 Pool 的实现原理。

最终实现中的 Pool

在实际的实现中,其实会跟上一个图有一些差异,可以说复杂很多:

pool_7

从上图可以看出,其实每个 P 关联的并不是 poolChain,而是 poolLocalpoolLocal 里面包含了一个 poolLocalInternal 和一个 padpad 是为了避免伪共享而添加的。而 poolLocalInternal 就是实际上 Pool 中存储数据的一个结构体。poolLocalInternal 中包含了两个字段,privatesharedshared 就是我们上面说的 poolChain,而 private 是一个 any 类型的字段,在我们调用 PoolPut 方法的时候,会先尝试将数据存储到 private 中,如果 private 中已经有数据了,那么就会将数据存储到 shared 中。同样的,在 Get 的时候,也会先从 private 中获取数据,如果 private 中没有数据,那么就会从 shared 中获取数据。

而相应的,Pool 存取数据会变成:

  • pushHead:我们调用 PoolPut 方法的时候,只会写入到本地 P 关联的那个 poolLocal 中。
  • popHead:这个方法也只能从本地 P 关联的 poolLocal 中取数据。
  • popTail:这个方法会从本地 P 关联的 poolLocal 中取数据,如果取不到,那么就会从其他 P 关联的 poolLocal 中取数据。

这样就可以减少多个 goroutine 同时操作一个 Pool 的竞争问题了(但是无法避免)。

Pool 模型总结

最后,我们将这一节的内容总结一下,可以得到下面这个图:

pool_8

说明:

  • go 进程内会有多个 P,每个 P 都会关联一个 poolLocal
  • poolLocal 中包含了一个 poolLocalInternal 和一个 padpoolLocalInternal 中包含了两个字段,privateshared
  • private 是一个 any 类型的字段,用来存储我们调用 PoolPut 方法的时候传入的数据,Get 的时候如果 private 中有数据,那么就会直接返回 private 中的数据。
  • shared 是一个 poolChain,用来存储我们调用 PoolPut 方法的时候传入的数据,Get 的时候如果当前 P 绑定的 poolLocal 内是空的,那么可以从其他 P 绑定的 shared 的尾部获取。
  • poolChain 是一个双向链表,每个节点都是一个 poolDequeue,每个节点都有 prevnext 指针指向前后节点。上图的 head 是头节点,tail 是尾节点。
  • poolDequeue 是一个数组,它的大小是固定的,但是它是当作环形队列使用的。
  • pushHead 的时候,如果 poolChain 的节点满了,那么会新建一个节点,其容量为前一个节点的两倍。
  • poolChain 支持三种操作:pushHeadpopHeadpopTail
  • pushHead 会将数据存储到当前 P 关联的 poolChain 的头部。

Pool 中的结构体

在开始分析源码之前,我们先来看一下 Pool 的 UML 图:

pool_9

上图中已经包含 Pool 实现的所有关键结构体了,下面我们来分析一下这些结构体的作用。

sync.Pool 结构体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type Pool struct {
// noCopy 用于防止 Pool 被复制(可以使用 go vet 检测)
noCopy noCopy

// local 的主要作用是,多个 goroutine 同时访问 Pool 时,可以减少竞争,提升性能。
// 实际类型是 [P]poolLocal。长度是 localSize。
local unsafe.Pointer
// []poolLocal 的长度。也就是 local 切片的长度
localSize uintptr

// 存放的是上一轮 GC 时的 local 字段的值。
victim unsafe.Pointer
// victim 数组的长度
victimSize uintptr

// 新建对象的方法。
// Get 的时候如果 Pool 中没有对象可用,会调用这个方法来新建一个对象。
New func() any
}

字段说明:

  • local:就是我们上文说到的 poolLocal 类型的切片,长度是 runtime.GOMAXPROCS(0),也就是当前 P 的数量。之所以使用切片类型是因为我们可以在运行的过程中调整 P 的数量,所以它的长度并不是固定的,如果 P 的数量变了,poolLocal 也会跟着改变。
  • localSizelocal 的长度。
  • victim:上一轮 GC 时的 local 字段的值。
  • victimSizevictim 的长度。
  • New:新建对象的方法。

victim 的作用是在 GC 的时候,将 local 的值赋值给 victim,然后将 local 置为 nil,这样就可以避免在 GC 的时候,local 中的对象被回收掉。 当然,并不是完全不会回收,再经历一次 GC 之后,victim 中的对象就会被回收掉。这样做的好处是,可以避免 GC 的时候清除 Pool 中的所有对象, 这样在 GC 之后如果需要大量地从 Pool 中获取对象也不至于产生瞬时的性能抖动

victim cache 是计算机科学中的一个术语,victim cache 是位于 cpu cache 和主存之间的又一级 cache,用于存放由于失效而被丢弃(替换)的那些块。 每当失效发生时,在访问主存之前,victim cache 都会被检查,如果命中,就不会访问主存。

Pool 获取对象的流程

最终,当我们调用 PoolGet 方法的时候,会按下图的流程来获取对象:

pool_10

说明:

  1. 首先会从当前 P 关联的 poolLocal 中的 private 字段中获取对象,如果获取到了,那么直接返回。
  2. 如果 private 字段中没有对象,那么会从当前 P 关联的 poolLocal 中的 shared 字段中获取对象,如果获取到了,那么直接返回(这里使用的是 popHead 方法)。
  3. 尝试从其他 P 关联的 poolLocal 中的 shared 字段中获取对象,如果获取到了,那么直接返回(这里使用的是 popTail 方法)。
  4. 如果其他 P 关联的 poolLocal 中的 shared 字段中也没有对象,那么会从 victim 中获取对象,如果获取到了,那么直接返回。
  5. 如果 victim 中也没有对象,那么会调用 New 方法来创建一个新的对象(当然前提是我们创建 Pool 对象的时候设置了 New 字段)。
  6. 如果 New 字段也没有设置,那么会返回 nil

poolLocal 和 poolLocalInternal 结构体:

Pool 中,使用了 poolLocalpoolLocalInternal 两个结构体来表示实际存储数据的结构体。 当然我们也可以只使用 poolLocalInternal 这个结构体,但是为了避免伪共享,在 Pool 的实现中, 将 poolLocalInternal 放在了 poolLocal 的第一个字段,然后在 poolLocal 中添加了一个 pad 字段,用来填充 poolLocalInternal 到 cache line 的大小。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 实际存储 Pool 的数据的结构体
type poolLocalInternal struct {
// private 用于存储 Pool 的 Put 方法传入的数据。
// Get 的时候如果 private 不为空,那么直接返回 private 中的数据。
// 只能被当前 P 使用。
private any
// 本地 P 可以pushHead/popHead
// 任何 P 都可以 popTail。
shared poolChain
}

// Pool 中的 local 属性的元素类型。
// Pool 中的 local 是一个元素类型为 poolLocal 的切片,长度为 runtime.GOMAXPROCS(0)。
type poolLocal struct {
poolLocalInternal

// pad 用于填充 poolLocalInternal 到 cache line 的大小。
// 为了避免伪共享,将 poolLocalInternal 放在第一个字段。
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

其实这里比较关键的地方是 pad 字段的作用,我们知道,CPU 会将内存分成多个 cache line,CPU 从内存中读取数据的时候, 并不是一个字节一个字节地读取,而是一次性读取一个 cache line 的数据。但是如果我们的数据结构中的字段不是按照 cache line 的大小来排列的, 比如跨了两个 cache line,那么在读取的时候就会产生伪共享,这样就会降低性能。

pool_11

伪共享的原因是,数据跨了两个 cache line,那么在读取的时候,就会将两个 cache line 都读取到 CPU 的 cache 中, 这样有可能会导致不同 CPU 在发生数据竞争的时候,会使一些不相关的数据也会失效,从而导致性能下降。 如果对齐到 cache line,那么从内存读取数据的时候,就不会将一些不相关的数据也读取到 CPU 的 cache 中,从而避免了伪共享。

poolChain、poolChainElt 和 poolDequeue 结构体

为什么要把这三个放一起讲呢?因为这三个结构体就是 Pool 中做实际存取数据的结构体,三者作用如下:

  • poolChainpoolChain 是一个链表,每个节点都是 poolChainElt
  • poolChainElt:每个 poolChainElt 中包含了一个 poolDequeue,同时包含了指向前一个节点和后一个节点的指针。
  • poolDequeuepoolDequeue 是一个双端队列(环形队列,使用数组存储),用来存储 Pool 中的数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// poolChain 是 poolDequeue 的动态大小版本。
//
// 这是作为 poolDequeues 的双向链表队列实现的,其中每个 dequeue 都是前一个 dequeue 大小的两倍。
// 一旦 dequeue 填满,就会分配一个新的 dequeue,并且 pushHead 只会 push 到最新的 dequeue。
// pop 可以从头部或者尾部进行,一旦 dequeue 空了,它就会从 poolChain 中删除。
//
// poolChain 的实现是一个双向链表,每个 poolChainElt 都是一个 poolDequeue。
//(也就是元素是 poolDequeue 的双向链表)
type poolChain struct {
// head 是要推送到的 poolDequeue。
// 只能由生产者访问,因此不需要同步。
head *poolChainElt

// tail 是从 poolDequeue 中 pop 的节点。
// 这是由消费者访问的,因此读取和写入必须是原子的。
tail *poolChainElt
}

// poolChainElt 是 poolChain 中的元素。
type poolChainElt struct {
poolDequeue

// next 和 prev 链接到此 poolChain 中相邻的 poolChainElts。
//
// next 由生产者原子写入,由消费者原子读取。 它只从 nil 过渡到 non-nil。
// prev 由消费者原子写入,由生产者原子读取。 它只会从 non-nil 过渡到 nil。
next, prev *poolChainElt
}

// poolDequeue 是一个无锁的固定大小的单生产者、多消费者队列。
// 单个生产者既可以从头部 push 也可以从头部 pop,消费者只可以从尾部 pop。
//
// 它有一个附加功能,它会清空不使用的槽,从而避免不必要的对象保留。
type poolDequeue struct {
// headTail 将 32 位头索引和 32 位尾索引打包在一起。
// 两者都是 vals modulo len(vals)-1 的索引。
//
// tail = 队列尾的索引
// head = 下一个要填充的插槽的索引
//
// [tail, head) 范围内的槽位归消费者所有。
// 消费者继续拥有此范围之外的槽,直到它清空槽,此时所有权传递给生产者。
//
// 头索引存储在最高有效位中,以便我们可以原子地对它做 add 操作,同时溢出是无害的。
headTail uint64

// vals 是存储在此 dequeue 中的 interface{} 值的环形缓冲区。它的大小必须是 2 的幂。
// (队列存储的元素,接口类型)
//
// 如果插槽为空,则 vals[i].typ 为 nil,否则为非 nil。
// 一个插槽仍在使用中,直到 *both* 尾部索引超出它并且 typ 已设置为 nil。(both?)
// 这由消费者原子地设置为 nil,并由生产者原子地读取。
vals []eface
}

poolChain 的结构大概如下:

pool_12

poolDequeue 的结构大概如下:

pool_dequeue

poolDequeue 中,headTail 是一个 uint64 类型,它的高 32 位是 head,低 32 位是 tail。 这样一来,这样就可以使用原子操作来保证 headtail 的协程安全了。

poolDequeue 中,vals 是一个切片类型,元素类型是 efaceeface 是一个空接口类型,内存布局跟 interface{} 一样, 因此可以看作是一个 interface{} 类型。如果这里看不明白可以看看《go interface 设计与实现》

那为什么不直接使用 interface{} 类型呢?因为如果用了 interface{} 类型, 那么 poolDequeue 就无法区分保存的 valnil 还是一个空的槽。 那有什么解决办法呢?在 pushpop 的时候使用互斥锁可以解决这个问题(因为目前的实现是使用原子操作的,所以这才需要判断保存的 valnil 还是空槽), 但是这样就会导致性能下降。

Pool 源码剖析

在源码剖析的开始部分,不会深入去讲底层的存取细节,我们将其当作一个抽象的队列来看待即可,这样可能会更加便于理解。不过在讲完 Pool 的实现之后,最后还是会展开讲述这个复杂的 "队列" 的那些实现细节。

接下来,我们来看看 Pool 的源码实现。 Pool 提供的接口非常简单,只有 PutGet 两个方法,还有一个 New 字段,用来指定 Pool 中的元素是如何创建的:

  • New 属性:New 是一个函数,用来创建 Pool 中的元素。
  • Put 方法:Put 方法用来向 Pool 中放入一个元素。
  • Get 方法:Get 方法用来从 Pool 中获取一个元素。

Get

Get 方法的实现如下:

GetPool 中选择一个任意项,将其从 Pool 中移除,并将其返回给调用者。 Get 可能会选择忽略池并将其视为空的。 调用方不应假定传递给 Put 的值与 Get 返回的值之间存在任何关系。 (PutGet 之间可能会发生 GC,然后 Pool 里面的元素可能会被 GC 回收掉)

如果 Get 否则返回 nil 并且 p.New 不为 nil,则 Get 返回调用 p.New 的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Get 从 Pool 中获取一个对象
func (p *Pool) Get() any {
// ...
// pin 将当前的 goroutine 和 P 绑定,禁止被抢占,返回当前 P 的本地缓存(poolLocal)和 P 的 ID。
l, pid := p.pin()
// 先看 private 是否为 nil,如果不为 nil,就直接返回 private,并将 private 置为 nil。
x := l.private
l.private = nil
if x == nil {
// 尝试从本地 shared 的头部取。
x, _ = l.shared.popHead()
if x == nil { // 如果本地 shared 的头部取不到,就从其他 P 的 shared 的尾部取。
x = p.getSlow(pid)
}
}
// 将当前的 goroutine 和 P 解绑,允许被抢占。
runtime_procUnpin()
// ...
// 如果 x 为 nil 并且 p.New 不为 nil,则返回 p.New() 的结果。
// 没有就 New 一个。
if x == nil && p.New != nil {
x = p.New()
}
return x
}

Pool Get 的流程可以总结如下:

  1. 将当前的 goroutineP 绑定,禁止被抢占,返回当前 P 的本地缓存(poolLocal)和 PID
  2. 从本地 private 取,如果取不到,就从本地 shared 的头部取,如果取不到,就从其他 Pshared 的尾部取。获取到则返回
  3. 如果从其他的 Pshared 的尾部也获取不到,则从 victim 获取。获取到则返回
  4. 将当前的 goroutineP 解绑,允许被抢占。
  5. 如果 p.New 不为 nil,则返回 p.New 的结果。

再贴一下上面那个图(当然,下图包含了下面的 getSlow 的流程,并不只是 Get):

pool_10

Pool 中一个很关键的操作是 pin,它的作用是将当前的 goroutineP 绑定,禁止被抢占。 这样就可以保证在 GetPut 的时候,都可以获取到当前 P 的本地缓存(poolLocal), 否则,有可能在 GetPut 的时候,P 会被抢占,导致获取到的 poolLocal 不一致,这样 poolLocal 就会失去意义, 不得不再次陷入跟其他 goroutine 竞争的状态,又不得不考虑在如何在不同 goroutine 之间进行同步了。

而绑定了 P 后,在 GetPut 的时候,就可以使用原子操作来代替其他更大粒度的锁了, 但是我们也不必太担心,因为绑定 P 的时间窗口其实很小。

getSlow 源码剖析

Get 中,如果从 privateshared 中都取不到,就会调用 getSlow 方法。它的作用是:

  1. 尝试从其他 Pshared 的尾部取。
  2. 尝试从 victim 获取。

getSlow 的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 从其他 P 的 shared 的尾部取。
func (p *Pool) getSlow(pid int) any {
// 获取 local 的大小和 local。
size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// 尝试从其他 P 的 shared 的尾部取。
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size)) // pid+i+1 的用途从下一个 P 开始取。
if x, _ := l.shared.popTail(); x != nil { // 尝试从每一个 P 的 shared 的尾部取,获取到则返回。
return x
}
}

// 尝试从 victim cache 取。
// 我们在尝试从所有主缓存中偷取之后执行此操作,
// 因为我们希望 victim cache 中的对象尽可能地老化。
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size { // 如果 pid 大于 size,会发生越界,直接返回 nil。这意味着 gomaxprocs 相比上一次 poolCleanup 的时候变大了。
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil { // victim 实际上也是一个 poolLocal 数组,每个 poolLocal 都有一个 private 字段,这个字段就是 victim cache。
l.private = nil
return x
}
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}

// 将 victim cache 标记为空,以便将来的 Get 不会再考虑它。
atomic.StoreUintptr(&p.victimSize, 0)

return nil
}

pin 源码剖析

pin 方法的实现如下:

pin 将当前 goroutine 固定到 P 上,禁用抢占并返回 poolLocal 池中对应的 poolLocal。 调用方必须在完成取值后调用 runtime_procUnpin() 来取消抢占。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 将当前的 goroutine 和 P 绑定,禁止被抢。
func (p *Pool) pin() (*poolLocal, int) {
// procPin 函数的目的是为了当前 G 绑定到 P 上。
pid := runtime_procPin() // 返回当前 P 的 id。

// 在 pinSlow 中,我们会存储 local,然后再存储 localSize,
// 这里我们以相反的顺序读取。 由于我们禁用了抢占,
// 因此 GC 不能在两者之间发生。
s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
l := p.local // load-consume
if uintptr(pid) < s { // pid < s,说明当前 P 已经初始化过了。
return indexLocal(l, pid), pid // 返回当前 P 的 poolLocal。
}
return p.pinSlow() // 如果当前 P 没有初始化过,那么就调用 pinSlow()。
}

func (p *Pool) pinSlow() (*poolLocal, int) {
// 在互斥锁下重试。
// 在固定时无法锁定互斥锁。
runtime_procUnpin() // 解除当前 P 的绑定。
allPoolsMu.Lock() // 加全局锁。
defer allPoolsMu.Unlock() // 解锁。
pid := runtime_procPin() // 重新绑定当前 P。
// 在固定时不会调用 poolCleanup。(无法被抢占,GC 不会发生)
s := p.localSize
l := p.local
if uintptr(pid) < s { // 这其实是一个 double-checking,如果在加锁期间,其他 goroutine 已经初始化过了,就直接返回。
return indexLocal(l, pid), pid
}

// p.local == nil 说明 pool 还没有初始化过。
if p.local == nil { // 如果当前 P 没有初始化过,那么就将当前 P 添加到 allPools 中。
allPools = append(allPools, p)
}

// 当 local 数组为空,或者和当前的 runtime.GOMAXPROCS 不一致时,
// 将触发重新创建 local 数组,以和 P 的个数保持一致。
// 如果在 GC 之间更改了 GOMAXPROCS,我们将重新分配数组并丢弃旧数组。
size := runtime.GOMAXPROCS(0) // 获取当前 GOMAXPROCS(也就是 P 的个数)
local := make([]poolLocal, size) // 创建一个 poolLocal 数组
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release // 将当前 P 的 poolLocal 添加到 p.local 中
runtime_StoreReluintptr(&p.localSize, uintptr(size)) // store-release // 将当前 P 的 poolLocal 添加到 p.localSize 中
return &local[pid], pid // 返回当前 P 关联的 poolLocal,以及当前 P 的 id。
}

pinSlow 中比较关键的操作是,它会初始化当前 P 关联的 poolLocal,并将其添加到 allPools 中。 关于 allPools 的作用,可以看下一小节。

pinSlow 的流程: 1. 解除当前 P 的绑定。 2. 加全局 Pool 的锁。 3. 重新绑定当前 P。 4. 如果当前 Pid 小于 localSize,那么就返回当前 PpoolLocal。(典型的 double-checking) 5. 如果 local 还没初始化,那么将当前 PpoolLocal 添加到 allPools 中。 6. 初始化 local。最后返回当前 PpoolLocal

对于 local 的初始化,我们可以参考一下下图(我们需要知道的是,切片的底层结构体的第一个字段是一个数组):

pool_13

&local[0][]poolLocal 的首地址,unsafe.Pointer(&local[0]) 就是 poolLocal 数组的首地址。

indexLocal 源码剖析

我们在上面的代码中还可以看到一个 indexLocal 函数,它的作用是返回 poolLocal 数组中的第 i 个元素。

1
2
3
4
5
6
// l 指向了 poolLocal 数组的首地址,i 是数组的索引。
// 返回了数组中第 i 个元素,其类型是 poolLocal。
func indexLocal(l unsafe.Pointer, i int) *poolLocal {
lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
return (*poolLocal)(lp)
}

我们之前在看 Pool 结构体的时候,看到过 local 字段,它的类型是 unsafe.Pointer,也就是一个指针。 然后我们再结合一下 indexLocal 的实现,就可以知道 local 字段指向的是一个 poolLocal 数组了。 其中 lpoolLocal 数组的首地址,i 是数组的索引,unsafe.Sizeof(poolLocal{})poolLocal 的大小。

这一小节没看懂可以参考一下我写的另外一篇文章 《深入理解 go unsafe》

allPools 的作用

allPools 的目的是在 GC 的时候,遍历所有的 Pool 对象,将其中的 victim 替换为 local,然后将 local 设置为 nil。 这样后续的 Get 操作在 local 获取不到的时候,可以从 victim 中获取。一定程度上减少了 GC 后的性能抖动。

Pool 中,还定义了下面几个全局变量:

1
2
3
4
5
6
7
8
9
10
11
var (
// 保护 allPools 和 oldPools 的互斥锁。
allPoolsMu Mutex

// allPools 是所有非空 primary cache 的 pool 的集合。
// 该集合由 allPoolsMu 和 pinning 保护,或者 STW 保护。
allPools []*Pool

// oldPools 是所有可能非空 victim cache 的 pool 的集合。
oldPools []*Pool
)

如果我们第一次看这些变量,可能会有点懵,不知道它们的作用是什么。 我们可以再结合一下 poolCleanup 函数的实现,就可以知道它们的作用了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func poolCleanup() {
// 在垃圾收集开始时,这个函数会被调用。
// 它不能分配并且可能不应该调用任何运行时函数。

// 从 allPools 中删除 victim 缓存。
for _, p := range oldPools {
p.victim = nil // 作用是让 GC 可以回收 victim 缓存中的对象。
p.victimSize = 0
}

// 将 primary 缓存移动到 victim 缓存。
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}

// 具有非空主缓存的池现在具有非空 victim 缓存,并且没有池具有 primary 缓存。
oldPools, allPools = allPools, nil
}

poolCleanup 这个函数会在 GC 开始的时候被调用,它的作用是将 local 移动到 victim 中。 同时,将 victim 置为 nil,这样 GC 就可以回收 victim 中的对象了,也就是说要经历两轮 GC local 才会真正地被回收。 也就意味着,GC 的时候,local 其实并没有被回收,而是被移动到了 victim 中。

poolCleanup 可以用下图表示,实际上就是使用 locallocalSize 覆盖 victimvictimSize

pool_14

Put

Put 的实现比较简单,就是将对象放到 local 中,不需要 Get 那种操作其他 P 绑定的 poolLocal 的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Put 将 x 添加到 Pool 中。
func (p *Pool) Put(x any) {
if x == nil {
return
}
// ...
// 获取 poolLocal
l, _ := p.pin() // 将当前 goroutine 与 P 绑定。获取当前 P 关联的 poolLocal,以及当前 P 的 id。
if l.private == nil { // 优先放入 private
l.private = x
} else { // 如果 private 已经有值了,就放入 shared
l.shared.pushHead(x) // 这部分其他 P 也是可以访问的。
}
runtime_procUnpin() // 解除当前 P 的绑定。
// ...
}

Pool Put 的流程: 1. 如果 Put 的值是 nil,则直接返回。 2. 将当前的 goroutineP 绑定,禁止被抢占,返回当前 P 的本地缓存(poolLocal)和 PID。 3. 如果本地 private 为空,则将 x 放入本地 private。 4. 如果本地 private 不为空,则将 x 放入本地 shared 的头部。 5. 将当前的 goroutineP 解绑,允许被抢占。

这里面的 pinruntime_procUnpin,我们在前文已经介绍过了,这里就不再赘述了。

New

这里说的 Newsync.Pool 中的 New 字段,在我们尝试了所有方法都获取不到对象的时候, 会判断 PoolNew 属性是否为 nil,如果不为 nil,则会调用 New 方法,创建一个新的对象。

poolChain 和 poolDequeue 源码剖析

poolChain 是一个双向链表,它的每一个节点的元素是 poolDequeue

在上文中,对于 GetPut 的细节,还没有具体展开,因为不了解这些细节也不影响我们理解 Pool 的整体流程。 现在是时候来看看 poolChainpoolDequeue 这两个结构体的实现了,会结合起来一起看。

poolChainpoolDequeue 里面都提供了以下三个方法:

  • pushHead:将对象放到队列的头部。
  • popHead:从队列的头部取出一个对象。
  • popTail:从队列的尾部取出一个对象。

不一样的是,poolChain 里面的方法会处理链表节点的创建和销毁,而 poolDequeue 里面的方法才是实际从队列存取对象的方法。

pushHead

poolChainpushHead 方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 添加一个元素到队列头部
func (c *poolChain) pushHead(val any) {
// 链表头
d := c.head
if d == nil { // 链表为空
// 初始化链表。
// 新建 poolChainElt,然后 c 的 head 和 tail 都指向这个新建的元素。
const initSize = 8 // 初始化大小为 8
// 新建一个节点,类型为 poolChainElt
d = new(poolChainElt)
d.vals = make([]eface, initSize)
// 将 c 的 head 和 tail 都指向这个新建的元素
c.head = d
// 使用原子操作保存 c.tail,因为其他 goroutine 也可能会修改 c.tail。
storePoolChainElt(&c.tail, d)
}

// poolQueue 还没满的时候可以成功 push,pushHead 会返回 true。
// poolQueue 满的时候 pushHead 返回 false。
if d.pushHead(val) {
return
}

// 当前 dequeue 已满。分配一个两倍大小的新 dequeue。
newSize := len(d.vals) * 2
if newSize >= dequeueLimit { // 限制单个 dequeue 的最大大小
newSize = dequeueLimit
}

// 新建 poolChainElt,然后 c 的 head 指向这个新建的元素。
// 同时,d 的 next 指向这个新建的元素。
d2 := &poolChainElt{prev: d} // 因为是加到队列头,所以 prev 指向 d
d2.vals = make([]eface, newSize)
c.head = d2
storePoolChainElt(&d.next, d2)
d2.pushHead(val)
}

poolChainpushHead 方法的流程:

  1. 如果链表为空,那么就初始化链表。
  2. 如果链表不为空,那么就尝试 pushHead
  3. 如果 pushHead 失败,那么就分配一个两倍大小的新 dequeue
  4. 然后把新 dequeue 放到链表头部。(push 的时候已经锁定了 goroutineP 上,所以这一步是没有并发问题的)

poolChainpushHead 方法中,唯一需要特别注意的是 storePoolChainElt(&c.tail, d) 这一行代码。 这里使用了 storePoolChainElt 方法,而不是直接使用 c.tail = d。 这是因为 c.tail 是会和其他 goroutine 存在竞争的(其他 goroutine 获取对象的时候可能会修改 tail),因此不能直接赋值,而是使用了原子操作。

poolChainpushHead 方法的流程图如下:

pool_15

队列头来说,prev 实际上指向的是 head 的下一个元素,但是又不能叫 next,因为 next 被用来表示 tail 的下一个元素,所以就叫了 prev。我们需要知道 prevnext 本质上都是指向了下一个元素,就看你是从队列头还是队列尾来查找。

poolChain 中,其实实际存储对象的时候并不是在 poolChain,而是在 poolChain 的每一个节点中的 poolDequeue 中。 所以我们再来看看 poolDequeue 中的 pushHead 实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// pushHead 在队列的头部添加 val。
// 如果队列已满,则返回 false。
// 它只能由单个生产者调用。(也就是当前 goroutine 绑定的 P)
//
// 注意:head 指向的是下一个要插入的元素的位置,所以插入的时候,先将 head 指向的位置设置为 val,然后 head++。
func (d *poolDequeue) pushHead(val any) bool {
// 读取 head 和 tail 的值。
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head { // 队列满了
// 不能直接 tail == head,因为初始化的时候,head 和 tail 都是 0
return false
}
// 取模意味着,当 head 超过 len(d.vals) 时,会从头开始。也就是一个环。
slot := &d.vals[head&uint32(len(d.vals)-1)] // 获取 head 对应的槽位。

// push 只有当前协程能 push,所以不需要加锁。
// 但是 popTail 可能会在另一个协程中执行,所以需要判断当前的槽位是否被 popTail 释放了。
// 因为 popTail 的操作是先 cas 修改 headTail,然后再获取 slot 的值,最后才将 slot 置 0 的。
// 如果修改了 headTail 之后还没有来得及将 slot 置 0,那么这里就会判断出槽位还没有被释放。
typ := atomic.LoadPointer(&slot.typ) // 获取槽位的类型
if typ != nil { // 槽位不为空
// 队列依然是满的
return false
}

// 如果 typ 已经是 nil,那么这里后续的操作是安全的。

if val == nil { // put 进来的值是 nil,使用 dequeueNil 代替
val = dequeueNil(nil)
}
// 将 val 赋值给槽位
*(*any)(unsafe.Pointer(slot)) = val

// 增加 head。为什么是 1<<dequeueBits 呢?
// 因为 head 是高 32 位,所以要左移 32 位
// 本质上是:head = head + 1
atomic.AddUint64(&d.headTail, 1<<dequeueBits)
return true
}

poolDequeuepushHead 方法的流程:

pushHead 的处理流程: 1. 判断队列是否已满,如果已满则返回 false。 2. 获取下一个 push 的位置,先判断这个位置是否可用,如果不可用则返回 false。(可能和 popTail 冲突,如果 popTail 没来得及将其中的值取出来,那么这个槽就还不能使用) 3. 如果可用,则将值放入这个位置,然后将 head 指针加 1

poolDequeue 中,我们看到有一行代码比较奇怪:atomic.LoadUint64(&d.headTail),这是为了可以原子操作存取 headtail 两个值。这样就可以避免锁的使用了。

popHead

poolChainpopHead 方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// popHead 会从链表头部 pop 出一个元素。
// 返回值:
// 1. any:pop 出的元素。
// 2. bool:是否成功 pop 出元素。
func (c *poolChain) popHead() (any, bool) {
d := c.head
// d == nil 的情况:
// 1. 链表为空。
// 2. 链表只有一个元素,且这个元素已经 pop 完了。(被其他协程 pop 了)
for d != nil {
// 这是因为,在我们拿到 d 之后,还没来得及 pop 的话,其他协程可能已经 pop 了。
// 所以需要 for 循环。典型的无锁编程。
//
// poolQueue 还没空的时候可以成功 pop,popHead 会返回 true。
if val, ok := d.popHead(); ok {
return val, ok
}

// 之前的 dequeue 中可能仍有未消费的元素,因此尝试后退。
d = loadPoolChainElt(&d.prev) // 获取下一个 poolChainElt
}
return nil, false
}

prev 虽然从命名上看是前一个,但是实际上是下一个节点。从 head 开始遍历的话,prev 就是下一个节点,从 tail 开始遍历的话,next 就是下一个节点。

poolChainpopHead 的处理流程:

  1. 如果链表为空,那么就返回 false
  2. 如果链表不为空,那么就尝试 popHead
  3. 如果 popHead 失败,那么就尝试从链表下一个 dequeue popHead。(循环直到最后一个 poolChainElt

poolChainpopHead 方法的流程图如下:

pool_16

poolChain 中,实际上调用的是 poolDequeuepopHead 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// popHead 删除并返回队列头部的元素。
// 如果队列为空,则返回 false。
// 它只能由单个生产者调用。(也就是当前 goroutine 绑定的 P)
func (d *poolDequeue) popHead() (any, bool) {
// slot 用来保存从队列头部取出的值
var slot *eface
for { // 获取不到槽会继续循环,直到获取到槽或者发现队列为空为止。
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if tail == head { // 队列为空
return nil, false
}

// 先将 head 减 1,然后再获取槽位。
head--
ptrs2 := d.pack(head, tail)
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
// 成功获取到槽
slot = &d.vals[head&uint32(len(d.vals)-1)]
break
}
}

// 成功获取到 slot,将 slot 的值取出来
// head - 1 了,说明这个槽是可以被安全使用的,所以不需要加锁。
// 因为 popTail 不会影响到 head,所以不会影响到这里。
// 另外,pushHead 也没有影响,因为在实际使用中,只有一个协程会 pushHead。

val := *(*any)(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nils
}
*slot = eface{} // 将 slot 置 0
return val, true
}

poolDequeuepopHead 的处理流程:

  1. 判断队列是否为空,如果为空则返回 false
  2. 尝试将 head 指针减 1,如果失败则进行下一轮尝试(自旋,for + 原子操作是无锁编程中很常见的写法)。
  3. head 指针对应的槽位的值取出来,然后将槽位置为 nil

popTail

poolChainpopTail 方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// 从队列尾取出一个元素
func (c *poolChain) popTail() (any, bool) {
// 获取链表尾部的 poolChainElt
// 如果链表为空,返回 nil,false
d := loadPoolChainElt(&c.tail)
if d == nil {
return nil, false
}

for {
// 在我们 popTail 之前获取 next 指针很重要。
// 一般来说,d 可能暂时为空,但如果 next 在 pop 之前为非 nil,
// 并且 pop 失败,则 d 永久为空,这是唯一可以安全地将 d 从链中删除的条件。
//
// 解析:next 非 nil,但是 pop 失败:d 肯定是空的了,这个时候我们可以安全地将 d 从链表中删除。
d2 := loadPoolChainElt(&d.next)

// poolQueue 还没空的时候可以成功 pop,popTail 会返回 true。
if val, ok := d.popTail(); ok {
return val, ok
}

// 队列已经空了
if d2 == nil {
// d 是唯一的 dequeue。它现在是空的,但以后可能会有新的元素 push 进来。
return nil, false
}

// 链表的尾部已经被消费完了,所以转到下一个 dequeue。
// 尝试从链表中删除它,这样下一次 pop 就不必再次查看空的 dequeue。
// (本质:移除空的 tail 元素)
//
// 开始处理 d2,d2 是 d 的下一个 dequeue。(开始尝试从 d2 中 pop)
// cas:c.tail 由 d 变为 d2。
// 如果 cas 成功,说明 d2 是最新的 tail,d 可以被移除。
// d2 的 prev 指针设置为 nil,这样 gc 可以回收 d。(d2 没有下一个元素了)
//
// c.tail(d) 指向下一个 poolChainElt
// 同时下一个 poolChainElt 的 prev 指针(d)设置为 nil。
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
// 我们赢得了竞争。清除 prev 指针,以便垃圾收集器可以收集空的 dequeue,
// 以便 popHead 的时候不做多余的查找操作。
storePoolChainElt(&d2.prev, nil)
}
// d 指向 d2,继续处理下一个 dequeue
d = d2
}
}

poolChainpopTail 的处理流程:

  1. 如果链表为空,那么就返回 false
  2. 如果链表不为空,那么就尝试 popTail
  3. 如果 popTail 失败,那么就尝试从链表上一个 dequeue popTail。(循环直到第一个 poolChainElt

poolChainpopTail 方法的流程图如下:

pool_17

popTail 的时候,如果发现 poolChainElt 已经为空了,那么就会从链表中移除它:

pool_18

poolDequeuepopTail 方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// popHead 和 popTail 都有取值,然后将槽置空的过程,但是它们的实现是不一样的。
// 在 popHead 中,是直接将槽的值设置为 eface{},而在 popTail 中,
// 先将 val 设置为 nil,然后将 typ 通过原子操作设置为 nil。
// 这样在 pushHead 的时候就可以安全操作了,只要先使用原子操作判断 typ 是否为 nil 就可以了。

// popTail removes and returns the element at the tail of the queue.
// It returns false if the queue is empty. It may be called by any
// number of consumers.
//
// popTail 删除并返回队列尾部的元素。
// 如果队列为空,则返回 false。
// 它可以被任意数量的消费者调用。(如何保证并发安全?原子操作)
func (d *poolDequeue) popTail() (any, bool) {
// slot 用来保存从队列尾取出来的值
var slot *eface
// 获取队列尾部的值
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if tail == head {
// Queue is empty.
// 队列为空,直接返回
return nil, false
}

// Confirm head and tail (for our speculative check
// above) and increment tail. If this succeeds, then
// we own the slot at tail.
//
// 确认 head 和 tail(对于我们上面的推测性检查)并增加 tail。
// 如果成功,那么我们就拥有 tail 的插槽。
ptrs2 := d.pack(head, tail+1) // 新的 headTail
// 如果返回 false,说明从 Load 到 CompareAndSwap 期间,有其他 goroutine 修改了 headTail。
// 则需要重新 Load,再次尝试(再次执行 for 循环)。
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
// Success.
slot = &d.vals[tail&uint32(len(d.vals)-1)]
break
}
}

// 成功获取到 slot,将 slot 的值取出来
// 问题来了:
// 在这里 cas 成功的时候,这个 slot 实际上可能是还没有释放的,在这个时候,pushHead 其实不能写入到这个 slot 中。
// 因此,我们可以在 pushHead 的代码中看到,会先判断 slot.typ 是否为 nil,如果不为 nil,说明 slot 还没有被释放,那么就直接 return 了。
// 这种情况发生在 poolDequeue 满了的时候。

// We now own slot.
val := *(*any)(unsafe.Pointer(slot))
if val == dequeueNil(nil) { // 这是什么情况?非空,但是值等于 dequeueNil(nil) ?
val = nil
}

// 取出值之后,将 slot 置 0。
// 在 poolDequeue 中,值是允许为 nil 的,但是 pool 的 Put 中判断值为 nil 的时候就直接 return 了。

// 告诉 pushHead 我们已经操作完这个插槽。
// 将插槽归零也很重要,这样我们就不会留下可能使该对象比必要时间更长的引用。
//
// 我们首先写入 val,然后通过原子写入 typ 来发布我们已完成此插槽。
slot.val = nil
atomic.StorePointer(&slot.typ, nil) // 为什么要用原子操作?因为 pushHead 也会读取这个值,所以需要保证读取的是最新的值。

// 此时 pushHead 可以操作这个槽了。

return val, true
}

poolDequeuepopTail 的处理流程:

  1. 判断队列是否为空,如果为空则返回 false
  2. 尝试将 tail 指针加 1,如果失败则进行下一轮尝试(自旋)。
  3. tail 指针对应的槽位的值取出来,然后将槽位置为 nil

sync.Pool 设计要点

sync.Pool 中,我们可以看到它有许许多多的编程技巧,为了实现一个高性能的 Pool 要做的东西是非常复杂的, 但是对于我们而言,我们只会用到它暴露出来的两个非常简单的接口 PutGet,这其实也算是 Go 语言的一种设计哲学吧, 把复杂留给自己,把简单留给用户。但是,我们还是要知道它的实现原理,这样才能更好的使用它。

接下来,我们就再来总结一下 sync.Pool 高性能的一些设计要点:

  1. noCopy 字段,防止 sync.Pool 被复制。sync.Pool 是不能被复制的,否则会导致一些隐晦的错误。
  2. local 字段,用于存储与 P 关联的一个 poolLocal 对象,在多个 goroutine 同时操作的时候,可以减少不同 goroutine 之间的竞争,只有在本地的 poolLocal 中没有找到对象的时候,才会去其他 goroutine 关联的队列中去取。
  3. poolDequeuepushHeadpopTail 之间会存在 headtail 指针上的一些竞争,这些竞争问题是通过原子操作来解决的(相比互斥锁效率更高)。使用了原子操作可能就会有失败的时候,这个时候,再次重试就可以了。
  4. poolDequeue 中的 head/tail 指针使用一个字段来保存,然后通过原子操作保证 head/tail 的一致性。
  5. poolChain 使用链表的方式解决容量问题,并且新增一个元素到链表的时候,容量为上一个元素(poolChain 链表头)的两倍(非常常见的扩容策略)。双向链表,可以接受别的 PpopTail 操作,减少竞争的同时可以充分利用多核。
  6. pin 保证 P 不会被抢占。如果一个 goroutine 在执行 Put 或者 Get 期间被挂起,有可能下次恢复时,绑定的就不是上次的 P 了。那整个过程就会完全乱掉,因为获取到的 poolLocal 不是之前那个了。使用 pin 可以解决这种并发问题。
  7. 自旋操作,因为原子操作失败的时候可能存在竞争,这个时候再尝试一下就可能成功了。(for + 原子操作是无锁编程中很常见的一种编程模式,在 sync.Map 中也有很多类似操作)
  8. pad 内存对齐,可以避免伪共享。
  9. poolDequeue 中存储数据的结构是一个环形队列,是连续的内存,可以充分利用 CPU 的 cache。在访问 poolDequeue 某一项时,其附近的数据项都有可能加载到统一 cache line 中,有利于提升性能。同时它是预先分配内存的,因此其中的数据项可不断复用。

总结

最后,总结一下本文内容:

  • sync.Pool 是一个非常有用的工具,它可以帮助我们减少内存的分配和回收(通过复用对象),提升程序的性能。但是,我们要注意它的使用场景,它适合那些没有状态的对象,同时,我们不能对那些从 PoolGet 出来的对象做任何假设。
  • 我们在 Put 或者 Get 之前,可能需要对我们操作的对象重置一下,防止对后续的操作造成影响。
  • Pool 中的对象存储是使用队列的方式,这个队列的实现是一个链表(poolChain),链表的每一个节点都是一个环形队列(poolDequeue)。这个队列支持以下三种操作:
    • pushHead:将一个对象放到队列的头部。
    • popHead:将队列的头部的对象取出来。
    • popTail:将队列的尾部的对象取出来。
  • sync.Pool 的实现中,使用了很多编程技巧,比如 noCopypinpad、原子操作等等,这些技巧都是为了实现一个高性能的 Pool 而做的一些优化,我们可以学习一下,具体参考上一节。
  • sync.Pool 中,PutGet 操作的时候会先将 goroutineP 绑定,然后再去操作 P 关联的 poolLocal,这样可以减少竞争,提升性能。因为每一个 P 都有一个关联的 poolLocal,所以多个 goroutine 操作的时候,可以充分利用多核。在操作完成后,再解除绑定。
  • 考虑到 GC 直接清除 Pool 中的对象会在 GC 后可能会产生性能抖动,所以在 GC 的时候,其实并不会马上清除 Pool 中的对象,而是将这些对象放到 victim 字段中,在 Get 的过程中,如果所有的 poolLocal 中获取不到对象,则会从 victim 中去找。但是再进行 GC 的时候,旧的 victim 会被清除。也就是 Pool 中对象的淘汰会经历两次 GC

在 go 的标准库中,提供了 sync.Cond 这个并发原语,让我们可以实现多个 goroutine 等待某一条件满足之后再继续执行。 它需要配合 sync.Mutex 一起使用,因为 CondWait 方法需要在 Mutex 的保护下才能正常工作。 对于条件变量,可能大多数人只是知道它的存在,但是用到它的估计寥寥无几,因为很多并发场景的处理都能使用 chan 来实现, 而且 chan 的使用也更加简单。 但是在某些场景下,Cond 可能是最好的选择,本文就来探讨一下 Cond 的使用场景,基本用法,以及它的实现原理。

sync.Cond 是什么?

sync.Cond 表示的是条件变量,它是一种同步机制,用来协调多个 goroutine 之间的同步,当共享资源的状态发生变化的时候, 可以通过条件变量来通知所有等待的 goroutine 去重新获取共享资源。

适用场景

在实际使用中,我们可能会有多个 goroutine 在执行的过程中,由于某一条件不满足而阻塞的情况。 这个时候,我们就可以使用条件变量来实现 goroutine 之间的同步。比如,我们有一个 goroutine 用来获取数据, 但是可能会比较耗时,这个时候,我们就可以使用条件变量来实现 goroutine 之间的同步, 当数据准备好之后,就可以通过条件变量来通知所有等待的 goroutine 去重新获取共享资源。

sync.Cond 条件变量用来协调想要访问共享资源的那些 goroutine,当共享资源的状态发生变化的时候, 它可以用来通知所有等待的 goroutine 去重新获取共享资源。

sync.Cond 的基本用法

sync.Cond 的基本用法非常简单,我们只需要通过 sync.NewCond 方法来创建一个 Cond 实例, 然后通过 Wait 方法来等待条件满足,通过 Signal 或者 Broadcast 方法来通知所有等待的 goroutine 去重新获取共享资源。

NewCond 创建实例

sync.NewCond 方法用来创建一个 Cond 实例,它的参数是一个 Locker 接口,我们可以传入一个 Mutex 或者 RWMutex 实例。 这个条件变量的 Locker 接口就是用来保护共享资源的。

Wait 等待条件满足

Wait 方法用来等待条件满足,它会先释放 Cond 的锁(Cond.L),然后阻塞当前 goroutine(实际调用的是 goparkunlock),直到被 Signal 或者 Broadcast 唤醒。

它做了如下几件事情:

  1. 释放 Cond 的锁(Cond.L),然后阻塞当前 goroutine。(所以,使用之前需要先锁定)
  2. Signal 或者 Broadcast 唤醒之后,会重新获取 Cond 的锁(Cond.L)。
  3. 之后,就返回到 goroutine 阻塞的地方继续执行。

Signal 通知一个等待的 goroutine

Signal 方法用来通知一个等待的 goroutine,它会唤醒一个等待的 goroutine,然后继续执行当前 goroutine。 如果没有等待的 goroutine,则不会有任何操作。

Broadcast 通知所有等待的 goroutine

Broadcast 方法用来通知所有等待的 goroutine,它会唤醒所有等待的 goroutine,然后继续执行当前 goroutine。 如果没有等待的 goroutine,则不会有任何操作。

sync.Cond 使用实例

下面我们通过一个实例来看一下 sync.Cond 的使用方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package cond

import (
"fmt"
"sync"
"testing"
"time"
)

var done bool
var data string

func write(c *sync.Cond) {
fmt.Println("writing.")
// 让 reader 先获取锁,模拟条件不满足然后 wait 的情况
time.Sleep(time.Millisecond * 10)
c.L.Lock()
// 模拟耗时的写操作
time.Sleep(time.Millisecond * 50)
data = "hello world"
done = true
fmt.Println("writing done.")
c.L.Unlock()
c.Broadcast()
}

func read(c *sync.Cond) {
fmt.Println("reading")
c.L.Lock()
for !done {
fmt.Println("reader wait.")
c.Wait()
}
fmt.Println("read done.")
fmt.Println("data:", data)
defer c.L.Unlock()
}

func TestCond(t *testing.T) {
var c = sync.NewCond(&sync.Mutex{})

go read(c) // 读操作
go read(c) // 读操作
go write(c) // 写操作

time.Sleep(time.Millisecond * 100) // 等待操作完成
}

输出:

1
2
3
4
5
6
7
8
9
10
reading
reader wait. // 还没获取完数据,需要等待
writing.
reading
reader wait.
writing done. // 获取完数据了,通知所有等待的 reader
read done. // 读取到数据了
data: hello world // 输出读取到的数据
read done.
data: hello world

这个例子可以粗略地用下图来表示:

cond_1

说明:

  • read1reader2 表示两个 goroutine,它们都会调用 read 函数。
  • donefalse 的时候,reader1reader2 都会调用 c.Wait() 函数,然后阻塞等待。
  • write 表示一个 goroutine,它会调用 write 函数。
  • write 函数中,获取完数据之后,会将 done 设置为 true,然后调用 c.Broadcast() 函数,通知所有等待的 reader 去重新获取共享资源。
  • reader1reader2 在解除阻塞状态后,都会重新获取共享资源,然后输出读取到的数据。

在这个例子中,done 的功能是标记,用来表示共享资源是否已经获取完毕,如果没有获取完毕,那么 reader 就会阻塞等待。

为什么要用 sync.Cond?

在文章开头,我们说了,很多并发编程的问题都可以通过 channel 来解决。 同样的,在上面提到的 sync.Cond 的使用场景,使用 channel 也是可以实现的, 我们只要 close(ch) 来关闭 channel 就可以实现通知多个等待的协程了。

那么为什么还要用 sync.Cond 呢? 主要原因是,sync.Cond 可以重复地进行 Wait()Signal()Broadcast() 操作, 但是,如果想通过关闭 chan 来实现这个功能的话,那就只能通知一次了。 因为 channel 只能关闭一次,关闭一个已经关闭的 channel 会导致程序 panic。

使用 channel 的另外一种方式是,记录 reader 的数量,然后通过往 channel 中发送多次数据来实现通知多个 reader。 但是这样一来代码就会复杂很多,从另一个角度说,出错的概率大了很多。

close channel 广播实例

下面的例子模拟了使用 close(chan) 来实现 sync.Cond 中那种广播功能,但是只能通知一次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package close_chan

import (
"fmt"
"testing"
"time"
)

var data string

func read(c <-chan struct{}) {
fmt.Println("reading.")

// 从 chan 接收数据,如果 chan 中没有数据,会阻塞。
// 如果能接收到数据,或者 chan 被关闭,会解除阻塞状态。
<-c

fmt.Println("data:", data)
}

func write(c chan struct{}) {
fmt.Println("writing.")
// 模拟耗时的写操作
time.Sleep(time.Millisecond * 10)
data = "hello world"
fmt.Println("write done.")

// 关闭 chan 的时候,会通知所有的 reader
// 所有等待从 chan 接收数据的 goroutine 都会被唤醒
close(c)
}

func TestCloseChan(t *testing.T) {
ch := make(chan struct{})

go read(ch)
go read(ch)
go write(ch)

// 不能关闭已经关闭的 chan
time.Sleep(time.Millisecond * 20)
// panic: close of closed channel
// 下面这行代码会导致 panic
//go write(ch)

time.Sleep(time.Millisecond * 100)
}

输出:

1
2
3
4
5
6
writing.
reading. // 会阻塞直到写完
reading. // 会阻塞直到写完
write done. // 写完之后,才能读
data: hello world
data: hello world

上面例子的 write 不能多次调用,否则会导致 panic。

sync.Cond 基本原理

go 的 sync.Cond 中维护了一个链表,这个链表记录了所有阻塞的 goroutine,也就是由于调用了 Wait 而阻塞的 goroutine。 而 SignalBroadcast 方法就是用来唤醒这个链表中的 goroutine 的。 Signal 方法只会唤醒链表中的第一个 goroutine,而 Broadcast 方法会唤醒链表中的所有 goroutine

下图是 Signal 方法的效果,可以看到,Signal 方法只会唤醒链表中的第一个 goroutine

cond_2

说明:

  • notifyListsync.Cond 中维护的一个链表,这个链表记录了所有阻塞的 goroutine
  • head 是链表的头节点,tail 是链表的尾节点。
  • Signal 方法只会唤醒链表中的第一个 goroutine

Broadcast 方法会唤醒 notifyList 中的所有 goroutine

sync.Cond 的设计与实现

最后,我们来看一下 sync.Cond 的设计与实现。

sync.Cond 模型

sync.Cond 的模型如下所示:

1
2
3
4
5
6
7
8
9
type Cond struct {
noCopy noCopy

// L is held while observing or changing the condition
L Locker // L 在观察或改变条件时被持有

notify notifyList
checker copyChecker
}

属性说明:

  • noCopy 是一个空结构体,用来检查 sync.Cond 是否被复制。(在编译前通过 go vet 命令来检查)
  • L 是一个 Locker 接口,用来保护条件变量。
  • notify 是一个 notifyList 类型,用来记录所有阻塞的 goroutine
  • checker 是一个 copyChecker 类型,用来检查 sync.Cond 是否被复制。(如果在运行时被复制,会导致 panic

notifyList 结构体

notifyListsync.Cond 中维护的一个链表,这个链表记录了所有因为共享资源还没准备好而阻塞的 goroutine。它的定义如下所示:

1
2
3
4
5
6
7
8
9
type notifyList struct {
wait atomic.Uint32
notify uint32

// 阻塞的 waiter 名单。
lock mutex // 锁
head *sudog // 阻塞的 goroutine 链表(链表头)
tail *sudog // 阻塞的 goroutine 链表(链表尾)
}

属性说明:

  • wait 是下一个 waiter 的编号。它在锁外自动递增。
  • notify 是下一个要通知的 waiter 的编号。它可以在锁外读取,但只能在持有锁的情况下写入。
  • lock 是一个 mutex 类型,用来保护 notifyList
  • head 是一个 sudog 类型,用来记录阻塞的 goroutine 链表的头节点。
  • tail 是一个 sudog 类型,用来记录阻塞的 goroutine 链表的尾节点。

notifyList 的方法说明:

notifyList 中包含了几个操作阻塞的 goroutine 链表的方法。

  • notifyListAdd 方法将 waiter 的编号加 1。
  • notifyListWait 方法将当前的 goroutine 加入到 notifyList 中。(也就是将当前协程挂起)
  • notifyListNotifyOne 方法将 notifyList 中的第一个 goroutine 唤醒。
  • notifyListNotifyAll 方法将 notifyList 中的所有 goroutine 唤醒。
  • notifyListCheck 方法检查 notifyList 的大小是否正确。

sync.Cond 的方法

notifyList 就不细说了,本文重点讲解一下 sync.Cond 的实现。

Wait 方法

Wait 方法用在当条件不满足的时候,将当前运行的协程挂起。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (c *Cond) Wait() {
// 检查是否被复制
c.checker.check()
// 更新 notifyList 中需要等待的 waiter 的数量
// 返回当前需要插入 notifyList 的编号
t := runtime_notifyListAdd(&c.notify)
// 解锁
c.L.Unlock()
// 挂起当前 g,直到被唤醒
runtime_notifyListWait(&c.notify, t)
// 唤醒之后,重新加锁。
// 因为阻塞之前解锁了。
c.L.Lock()
}

对于 Wait 方法,我们需要注意的是,使用之前,我们需要先调用 L.Lock() 方法加锁,然后再调用 Wait 方法,否则会报错。

文档里面的例子:

1
2
3
4
5
6
7
c.L.Lock()
for !condition() {
c.Wait()
}
// ...使用条件...
// 这里是我们在条件满足之后,需要执行的代码。
c.L.Unlock()

好了,问题来了,调用 Wait 方法之前为什么要先加锁呢?

这是因为在我们使用共享资源的时候,可能一些代码是互斥的,所以我们需要加锁。 这样我们就可以保证在我们使用共享资源的时候,不会被其他协程修改。 但是如果因为条件不满足,我们需要等待的话,我们不可能在持有锁的情况下等待, 因为在修改条件的时候,可能也需要加锁,这样就会造成死锁。

另外一个问题是,为什么要使用 for 来检查条件是否满足,而不是使用 if 呢?

这是因为在我们调用 Wait 方法之后,可能会有其他协程唤醒我们,但是条件并没有满足, 这个时候依然是需要继续 Wait 的。

Signal 方法

Signal 方法用在当条件满足的时候,将 notifyList 中的第一个 goroutine 唤醒。

1
2
3
4
5
6
func (c *Cond) Signal() {
// 检查 sync.Cond 是否被复制了
c.checker.check()
// 唤醒 notifyList 中的第一个 goroutine
runtime_notifyListNotifyOne(&c.notify)
}

Broadcast 方法

Broadcast 方法用在当条件满足的时候,将 notifyList 中的所有 goroutine 唤醒。

1
2
3
4
5
6
func (c *Cond) Broadcast() {
// 检查 sync.Cond 是否被复制了
c.checker.check()
// 唤醒 notifyList 中的所有 goroutine
runtime_notifyListNotifyAll(&c.notify)
}

copyChecker 结构体

copyChecker 结构体用来检查 sync.Cond 是否被复制。它实际上只是一个 uintptr 类型的值。

1
2
3
4
5
6
7
8
9
10
type copyChecker uintptr

// check 方法检查 copyChecker 是否被复制了。
func (c *copyChecker) check() {
if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
panic("sync.Cond is copied")
}
}

copyChecker 的值只有两种可能:

  1. 0,表示还没有调用过 Wait, SignalBroadcast 方法。
  2. uintptr(unsafe.Pointer(&copyChecker)),表示已经调用过 Wait, SignalBroadcast 方法。在这几个方法里面会调用 check 方法,所以 copyChecker 的值会被修改。

所以如果 copyChecker 的值不是 0,也不是 uintptr(unsafe.Pointer(&copyChecker))(也就是最初的 copyChecker 的内存地址),则表示 copyChecker 被复制了。

需要注意的是,这个方法在调用 CompareAndSwapUintptr 还会检查一下,这是因为有可能会并发调用 CompareAndSwapUintptr, 如果另外一个协程调用了 CompareAndSwapUintptr 并且成功了,那么当前协程的这个 CompareAndSwapUintptr 调用会返回 false, 这个时候就需要检查是否是因为另外一个协程调用了 CompareAndSwapUintptr 而导致的,如果是的话,就不会 panic

为什么 sync.Cond 不能被复制?

从上一小节中我们可以看到,sync.Cond 其实是不允许被复制的,但是如果是在调用 Wait, SignalBroadcast 方法之前复制,那倒是没关系。

这是因为 sync.Cond 中维护了一个阻塞的 goroutine 列表。如果 sync.Cond 被复制了,那么这个列表就会被复制,这样就会导致两个 sync.Cond 都包含了这个列表;但是我们唤醒的时候,只会有其中一个 sync.Cond 被唤醒,另外一个 sync.Cond 就会一直阻塞。 所以 go 直接从语言层面限制了这种情况,不允许 sync.Cond 被复制。

总结

  • sync.Cond 是一个条件变量,它可以用来协调多个 goroutine 之间的同步,当条件满足的时候,去通知那些因为条件不满足被阻塞的 goroutine 继续执行。
  • sync.Cond 的接口比较简单,只有 Wait, SignalBroadcast 三个方法。
    • Wait 方法用来阻塞当前 goroutine,直到条件满足。调用 Wait 方法之前,需要先调用 L.Lock 方法加锁。
    • Signal 方法用来唤醒 notifyList 中的第一个 goroutine
    • Broadcast 方法用来唤醒 notifyList 中的所有 goroutine
  • sync.Cond 的实现也比较简单,它的核心就是 notifyList,它是一个链表,用来保存所有因为条件不满足而被阻塞的 goroutine
  • 用关闭 channel 的方式也可以实现类似的广播功能,但是有个问题是 channel 不能被重复关闭,所以这种方式无法被多次使用。也就是说使用这种方式无法多次广播。
  • 使用 channel 发送通知的方式也是可以的,但是这样实现起来就复杂很多了,就更容易出错了。
  • sync.Cond 中使用 copyChecker 来检查 sync.Cond 是否被复制,如果被复制了,就会 panic。需要注意的是,这里的复制是指调用了 WaitSignalBroadcast 方法之后,sync.Cond 被复制了。在调用这几个方法之前进行复制是没有影响的。