在我们前面的一些介绍 sync 包相关的文章中,我们应该也发现了,其中有不少地方使用了原子操作。 比如 sync.WaitGroupsync.Map 再到 sync.Pool,这些结构体的实现中都有原子操作的身影。 原子操作在并发编程中是一种非常重要的操作,它可以保证并发安全,而且效率也很高。 本文将会深入探讨一下 go 中原子操作的原理、使用场景、用法等内容。

什么是原子操作?

原子操作是变量级别的互斥锁。

如果让我用一句话来说明什么是原子操作,那就是:原子操作是变量级别的互斥锁。 简单来说,就是同一时刻,只能有一个 CPU 对变量进行读或写。 当我们想要对某个变量做并发安全的修改,除了使用官方提供的 Mutex,还可以使用 sync/atomic 包的原子操作, 它能够保证对变量的读取或修改期间不被其他的协程所影响。

我们可以用下图来表示:

atomic_1

说明:在上图中,我们有三个 CPU 逻辑核,其中 CPU 1 正在对变量 v 做原子操作,这个时候 CPU 2 和 CPU 3 不能对 v 做任何操作, 在 CPU 1 操作完成后,CPU 2 和 CPU 3 可以获取到 v 的最新值。

从这个角度看,我们可以把 sync/atomic 包中的原子操作看成是变量级别的互斥锁。 就是说,在 go 中,当一个协程对变量做原子操作时,其他协程不能对这个变量做任何操作,直到这个协程操作完成。

原子操作的使用场景是什么?

拿一个简单的例子来说明一下原子操作的使用场景:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func TestAtomic(t *testing.T) {
var sum = 0
var wg sync.WaitGroup
wg.Add(1000)

// 启动 1000 个协程,每个协程对 sum 做加法操作
for i := 0; i < 1000; i++ {
go func() {
defer wg.Done()
sum++
}()
}

// 等待所有的协程都执行完毕
wg.Wait()
fmt.Println(sum) // 这里输出多少呢?
}

我们可以在自己的电脑上运行一下这段代码,看看输出的结果是多少。 不出意外的话,应该每次可能都不一样,而且应该也不是 1000,这是为什么呢?

这是因为,CPU 在对 sum 做加法的时候,需要先将 sum 目前的值读取到 CPU 的寄存器中,然后再进行加法操作,最后再写回到内存中。 如果有两个 CPU 同时取了 sum 的值,然后都进行了加法操作,然后都再写回到内存中,那么就会导致 sum 的值被覆盖,从而导致结果不正确。

举个例子,目前内存中的 sum 为 1,然后两个 CPU 同时取了这个 1 来做加法,然后都得到了结果 2, 然后这两个 CPU 将各自的计算结果写回到内存中,那么内存中的 sum 就变成了 2,而不是 3。

在这种场景下,我们可以使用原子操作来实现并发安全的加法操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func TestAtomic1(t *testing.T) {
// 将 sum 的类型改成 int32,因为原子操作只能针对 int32、int64、uint32、uint64、uintptr 这几种类型
var sum int32 = 0
var wg sync.WaitGroup
wg.Add(1000)

// 启动 1000 个协程,每个协程对 sum 做加法操作
for i := 0; i < 1000; i++ {
go func() {
defer wg.Done()
// 将 sum++ 改成下面这样
atomic.AddInt32(&sum, 1)
}()
}

wg.Wait()
fmt.Println(sum) // 输出 1000
}

在上面这个例子中,我们每次执行都能得到 1000 这个结果。

因为使用原子操作的时候,同一时刻只能有一个 CPU 对变量进行读或写,所以就不会出现上面的问题了。

所以很多需要对变量做并发读写的地方,我们都可以考虑一下,是否可以使用原子操作来实现并发安全的操作(而不是使用互斥锁,互斥锁效率相比原子操作要低一些)。

原子操作是怎么实现的?

看完上面原子操作的介绍,有没有觉得原子操作很神奇,居然有这么好用的东西。那它到底是怎么实现的呢?

一般情况下,原子操作的实现需要特殊的 CPU 指令或者系统调用。 这些指令或者系统调用可以保证在执行期间不会被其他操作或事件中断,从而保证操作的原子性。

例如,在 x86 架构的 CPU 中,可以使用 LOCK 前缀来实现原子操作。 LOCK 前缀可以与其他指令一起使用,用于锁定内存总线,防止其他 CPU 访问同一内存地址,从而实现原子操作。 在使用 LOCK 前缀的指令执行期间,CPU 会将当前处理器缓存中的数据写回到内存中,并锁定该内存地址, 防止其他 CPU 修改该地址的数据(所以原子操作总是可以读取到最新的数据)。 一旦当前 CPU 对该地址的操作完成,CPU 会释放该内存地址的锁定,其他 CPU 才能继续对该地址进行访问。

x86 LOCK 的时候发生了什么

我们再来捋一下上面的内容,看看 LOCK 前缀是如何实现原子操作的:

  1. CPU 会将当前处理器缓存中的数据写回到内存中。(因此我们总能读取到最新的数据)
  2. 然后锁定该内存地址,防止其他 CPU 修改该地址的数据。
  3. 一旦当前 CPU 对该地址的操作完成,CPU 会释放该内存地址的锁定,其他 CPU 才能继续对该地址进行访问。

其他架构的 CPU 可能会略有不同,但是原理是一样的。

原子操作有什么特征?

  1. 不会被中断:原子操作是一个不可分割的操作,要么全部执行,要么全部不执行,不会出现中间状态。这是保证原子性的基本前提。同时,原子操作过程中不会有上下文切换的过程。
  2. 操作对象是共享变量:原子操作通常是对共享变量进行的,也就是说,多个协程可以同时访问这个变量,因此需要采用原子操作来保证数据的一致性和正确性。
  3. 并发安全:原子操作是并发安全的,可以保证多个协程同时进行操作时不会出现数据竞争问题(虽然说是同时,但是实际上在操作那个变量的时候是互斥的)。
  4. 无需加锁:原子操作不需要使用互斥锁来保证数据的一致性和正确性,因此可以避免互斥锁的使用带来的性能损失。
  5. 适用场景比较局限:原子操作适用于操作单个变量,如果需要同时并发读写多个变量,可能需要考虑使用互斥锁。

go 里面有哪些原子操作?

在 go 中,主要有以下几种原子操作:AddCompareAndSwapLoadStoreSwap

增减(Add)

  1. 用于进行增加或减少的原子操作,函数名以 Add 为前缀,后缀针对特定类型的名称。
  2. 原子增被操作的类型只能是数值类型,即 int32int64uint32uint64uintptr
  3. 原子增减函数的第一个参数为原值,第二个参数是要增减多少。
  4. 方法:
1
2
3
4
5
func AddInt32(addr *int32, delta int32) (new int32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)

int32int64 的第二个参数可以是负数,这样就可以做原子减法了。

比较并交换(CompareAndSwap)

也就是我们常见的 CAS,在 CAS 操作中,会需要拿旧的值跟 old 比较,如果相等,就将 new 赋值给 addr。 如果不相等,则不做任何操作。最后返回一个 bool 值,表示是否成功 swap

也就是说,这个操作可能是不成功的。这很正常,在并发环境下,多个协程对同一个变量进行操作,肯定会存在竞争的情况。 在这种情况下,偶尔的失败是正常的,我们只需要在失败的时候,重新尝试即可。 因为原子操作需要的时间往往是比较短的,因此在失败的时候,我们可以通过自旋的方式来再次进行尝试。

在这种情况下,如果不自旋,那就需要将这个协程挂起,等待其他协程完成操作,然后再次尝试。这个过程相比自旋可能会更加耗时。 因为很有可能这次原子操作不成功,下一次就成功了。如果我们每次都将协程挂起,那么效率就会大大降低。

for + 原子操作的方式,在 go 的 sync 包中很多地方都有使用,比如 sync.Mapsync.Pool 等。 这也是使用原子操作时一个非常常见的使用模式。

CompareAndSwap 的功能:

  1. 用于比较并交换的原子操作,函数名以 CompareAndSwap 为前缀,后缀针对特定类型的名称。
  2. 原子比较并交换被操作的类型可以是数值类型或指针类型,即 int32int64uint32uint64uintptrunsafe.Pointer
  3. 原子比较并交换函数的第一个参数为原值指针,第二个参数是要比较的值,第三个参数是要交换的值。
  4. 方法:
1
2
3
4
5
6
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)

载入(Load)

原子性的读取操作接受一个对应类型的指针值,返回该指针指向的值。原子性读取意味着读取值的同时,当前计算机的任何 CPU 都不会进行针对值的读写操作。

如果不使用原子 Load,当使用 v := value 这种赋值方式为变量 v 赋值时,读取到的 value 可能不是最新的,因为在读取操作时其他协程对它的读写操作可能会同时发生。

Load 操作有下面这些:

1
2
3
4
5
6
func LoadInt32(addr *int32) (val int32)
func LoadInt64(addr *int64) (val int64)
func LoadUint32(addr *uint32) (val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)

存储(Store)

Store 可以将 val 值保存到 *addr 中,Store 操作是原子性的,因此在执行 Store 操作时,当前计算机的任何 CPU 都不会进行针对 *addr 的读写操作。

  1. 原子性存储会将 val 值保存到 *addr 中。
  2. 与读操作对应的写入操作,sync/atomic 提供了与原子值载入 Load 函数相对应的原子值存储 Store 函数,原子性存储函数均以 Store 为前缀。

Store 操作有下面这些:

1
2
3
4
5
6
func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintpre, val uintptr)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)

交换(Swap)

SwapStore 有点类似,但是它会返回 *addr 的旧值。

1
2
3
4
5
6
func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)

原子操作的使用场景是什么?

文章开头的地方,我们已经说了,原子操作本质上是一种变量级别的互斥锁。 因此,原子操作的使用场景也是和互斥锁类似的,但是不一样的是,我们的锁粒度只是一个变量而已。 也就是说,当我们不允许多个 CPU 同时对变量进行读写的时候(保证变量同一时刻只能一个 CPU 操作),就可以使用原子操作。

原子操作任意类型的值 - atomic.Value

从上一节中,我们知道了在 go 中原子操作可以操作 int32int64uint32uint64uintptrunsafe.Pointer 这些类型的值。 但是在实际开发中,我们的类型还有很多,比如 stringstruct 等等,那这些类型的值如何进行原子操作呢?答案是使用 atomic.Value

atomic.Value 是一个结构体,它的内部有一个 any 类型的字段,存储了我们要原子操作的值,也就是一个任意类型的值。

atomic.Value 支持以下操作:

  • Load:原子性的读取 Value 中的值。
  • Store:原子性的存储一个值到 Value 中。
  • Swap:原子性的交换 Value 中的值,返回旧值。
  • CompareAndSwap:原子性的比较并交换 Value 中的值,如果旧值和 old 相等,则将 new 存入 Value 中,返回 true,否则返回 false

atomic.Value 的这些操作跟上面讲到的那些操作其实差不多,只不过 atomic.Value 可以操作任意类型的值。 那 atomic.Value 是如何实现的呢?

atomic.Value 源码分析

atomic.Value 是一个结构体,这个结构体只有一个字段:

1
2
3
4
// Value 提供一致类型值的原子加载和存储。
type Value struct {
v any
}

Load - 读取

Load 返回由最近的 Store 设置的值。如果还没有 Store 过任何值,则返回 nil

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Load 返回由最近的 Store 设置的值。
func (v *Value) Load() (val any) {
// atomic.Value 转换为 efaceWords
vp := (*efaceWords)(unsafe.Pointer(v))

// 判断 atomic.Value 的类型
typ := LoadPointer(&vp.typ)
// 第一次 Store 还没有完成,直接返回 nil
if typ == nil || typ == unsafe.Pointer(&firstStoreInProgress) {
// firstStoreInProgress 是一个特殊的变量,存储到 typ 中用来表示第一次 Store 还没有完成
return nil
}

// 获取 atomic.Value 的值
data := LoadPointer(&vp.data)
// 将 val 转换为 efaceWords 类型
vlp := (*efaceWords)(unsafe.Pointer(&val))
// 分别赋值给 val 的 typ 和 data
vlp.typ = typ
vlp.data = data
return
}

atomic.Value 的源码中,我们都可以看到 efaceWords 的身影,它实际上代表的是 interface{}/any 类型:

1
2
3
4
5
// 表示一个 interface{}/any 类型
type efaceWords struct {
typ unsafe.Pointer
data unsafe.Pointer
}

看到这里我们会不会觉得很困惑,直接返回 val 不就可以了吗?为什么要将 val 转换为 efaceWords 类型呢?

这是因为 go 中的原子操作只能操作 int32int64uint32uint64uintptrunsafe.Pointer 这些类型的值, 不支持 interface{} 类型,但是如果了解 interface{} 底层结构的话,我们就知道 interface{} 底层其实就是一个结构体, 它有两个字段,一个是 type,一个是 datatype 用来存储 interface{} 的类型,data 用来存储 interface{} 的值。 而且这两个字段都是 unsafe.Pointer 类型的,所以其实我们可以对 interface{}typedata 分别进行原子操作, 这样最终其实也可以达到了原子操作 interface{} 的目的了,是不是非常地巧妙呢?

Store - 存储

StoreValue 的值设置为 val。对给定值的所有存储调用必须使用相同具体类型的值。不一致类型的存储会发生恐慌,Store(nil) 也会 panic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// Store 将 Value 的值设置为 val。
func (v *Value) Store(val any) {
// 不能存储 nil 值
if val == nil {
panic("sync/atomic: store of nil value into Value")
}
// atomic.Value 转换为 efaceWords
vp := (*efaceWords)(unsafe.Pointer(v))
// val 转换为 efaceWords
vlp := (*efaceWords)(unsafe.Pointer(&val))

// 自旋进行原子操作,这个过程不会很久,开销相比互斥锁小
for {
// LoadPointer 可以保证获取到的是最新的
typ := LoadPointer(&vp.typ)
// 第一次 store 的时候 typ 还是 nil,说明是第一次 store
if typ == nil {
// 尝试开始第一次 Store。
// 禁用抢占,以便其他 goroutines 可以自旋等待完成。
// (如果允许抢占,那么其他 goroutine 自旋等待的时间可能会比较长,因为可能会需要进行协程调度。)
runtime_procPin()
// 抢占失败,意味着有其他 goroutine 成功 store 了,允许抢占,再次尝试 Store
// 这也是一个原子操作。
if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) {
runtime_procUnpin()
continue
}
// 完成第一次 store
// 因为有 firstStoreInProgress 标识的保护,所以下面的两个原子操作是安全的。
StorePointer(&vp.data, vlp.data) // 存储值(原子操作)
StorePointer(&vp.typ, vlp.typ) // 存储类型(原子操作)
runtime_procUnpin() // 允许抢占
return
}

// 另外一个 goroutine 正在进行第一次 Store。自旋等待。
if typ == unsafe.Pointer(&firstStoreInProgress) {
continue
}

// 第一次 Store 已经完成了,下面不是第一次 Store 了。
// 需要检查当前 Store 的类型跟第一次 Store 的类型是否一致,不一致就 panic。
if typ != vlp.typ {
panic("sync/atomic: store of inconsistently typed value into Value")
}

// 后续的 Store 只需要 Store 值部分就可以了。
// 因为 atomic.Value 只能保存一种类型的值。
StorePointer(&vp.data, vlp.data)
return
}
}

Store 中,有以下几个注意的点:

  1. 使用 firstStoreInProgress 来确保第一次 Store 的时候,只有一个 goroutine 可以进行 Store 操作,其他的 goroutine 需要自旋等待。如果没有这个保护,那么存储 typdata 的时候就会出现竞争(因为需要两个原子操作),导致数据不一致。在这里其实可以将 firstStoreInProgress 看作是一个互斥锁。
  2. 在进行第一次 Store 的时候,会将当前的 goroutine 和 P 绑定,这样拿到 firstStoreInProgress 锁的协程就可以尽快地完成第一次 Store 操作,这样一来,其他的协程也不用等待太久。
  3. 在第一次 Store 的时候,会有两个原子操作,分别存储类型和值,但是因为有 firstStoreInProgress 的保护,所以这两个原子操作本质上是对 interface{} 的一个原子存储操作。
  4. 其他协程在看到有 firstStoreInProgress 标识的时候,就会自旋等待,直到第一次 Store 完成。
  5. 在后续的 Store 操作中,只需要存储值就可以了,因为 atomic.Value 只能保存一种类型的值。

Swap - 交换

SwapValue 的值设置为 new 并返回旧值。对给定值的所有交换调用必须使用相同具体类型的值。同时,不一致类型的交换会发生恐慌,Swap(nil) 也会 panic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// Swap 将 Value 的值设置为 new 并返回旧值。
func (v *Value) Swap(new any) (old any) {
// 不能存储 nil 值
if new == nil {
panic("sync/atomic: swap of nil value into Value")
}

// atomic.Value 转换为 efaceWords
vp := (*efaceWords)(unsafe.Pointer(v))
// new 转换为 efaceWords
np := (*efaceWords)(unsafe.Pointer(&new))

// 自旋进行原子操作,这个过程不会很久,开销相比互斥锁小
for {
// 下面这部分代码跟 Store 一样,不细说了。
// 这部分代码是进行第一次存储的代码。
typ := LoadPointer(&vp.typ)
if typ == nil {
runtime_procPin()
if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) {
runtime_procUnpin()
continue
}
StorePointer(&vp.data, np.data)
StorePointer(&vp.typ, np.typ)
runtime_procUnpin()
return nil
}
if typ == unsafe.Pointer(&firstStoreInProgress) {
continue
}
if typ != np.typ {
panic("sync/atomic: swap of inconsistently typed value into Value")
}

// ---- 下面是 Swap 的特有逻辑 ----
// op 是返回值
op := (*efaceWords)(unsafe.Pointer(&old))
// 返回旧的值
op.typ, op.data = np.typ, SwapPointer(&vp.data, np.data)
return old
}
}

CompareAndSwap - 比较并交换

CompareAndSwapValue 的值与 old 比较,如果相等则设置为 new 并返回 true,否则返回 false。 对给定值的所有比较和交换调用必须使用相同具体类型的值。同时,不一致类型的比较和交换会发生恐慌,CompareAndSwap(nil, nil) 也会 panic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// CompareAndSwap 比较并交换。
func (v *Value) CompareAndSwap(old, new any) (swapped bool) {
// 注意:old 是可以为 nil 的,new 不能为 nil。
// old 是 nil 表示是第一次进行 Store 操作。
if new == nil {
panic("sync/atomic: compare and swap of nil value into Value")
}

// atomic.Value 转换为 efaceWords
vp := (*efaceWords)(unsafe.Pointer(v))
// new 转换为 efaceWords
np := (*efaceWords)(unsafe.Pointer(&new))
// old 转换为 efaceWords
op := (*efaceWords)(unsafe.Pointer(&old))

// old 和 new 类型必须一致,且不能为 nil
if op.typ != nil && np.typ != op.typ {
panic("sync/atomic: compare and swap of inconsistently typed values")
}

// 自旋进行原子操作,这个过程不会很久,开销相比互斥锁小
for {
// LoadPointer 可以保证获取到的 typ 是最新的
typ := LoadPointer(&vp.typ)
if typ == nil { // atomic.Value 是 nil,还没 Store 过
// 准备进行第一次 Store,但是传递进来的 old 不是 nil,compare 这一步就失败了。直接返回 false
if old != nil {
return false
}

// 下面这部分代码跟 Store 一样,不细说了。
// 这部分代码是进行第一次存储的代码。
runtime_procPin()
if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) {
runtime_procUnpin()
continue
}
StorePointer(&vp.data, np.data)
StorePointer(&vp.typ, np.typ)
runtime_procUnpin()
return true
}
if typ == unsafe.Pointer(&firstStoreInProgress) {
continue
}
if typ != np.typ {
panic("sync/atomic: compare and swap of inconsistently typed value into Value")
}

// 通过运行时相等性检查比较旧版本和当前版本。
// 这允许对值类型进行比较,这是包函数所没有的。
// 下面的 CompareAndSwapPointer 仅确保 vp.data 自 LoadPointer 以来没有更改。
data := LoadPointer(&vp.data)
var i any
(*efaceWords)(unsafe.Pointer(&i)).typ = typ
(*efaceWords)(unsafe.Pointer(&i)).data = data
if i != old { // atomic.Value 跟 old 不相等
return false
}
// 只做 val 部分的 cas 操作
return CompareAndSwapPointer(&vp.data, data, np.data)
}
}

这里需要特别说明的只有最后那个比较相等的判断,也就是 data := LoadPointer(&vp.data) 以及往后的几行代码。 在开发 atomic.Value 第一版的时候,那个开发者其实是将这几行写成 CompareAndSwapPointer(&vp.data, old.data, np.data) 这种形式的。 但是在旧的写法中,会存在一个问题,如果我们做 CAS 操作的时候,如果传递的参数 old 是一个结构体的值这种类型,那么这个结构体的值是会被拷贝一份的, 同时再会被转换为 interface{}/any 类型,这个过程中,其实参数的 olddata 部分指针指向的内存跟 vp.data 指向的内存是不一样的。 这样的话,CAS 操作就会失败,这个时候就会返回 false,但是我们本意是要比较它的值,出现这种结果显然不是我们想要的。

将值作为 interface{} 参数使用的时候,会存在一个将值转换为 interface{} 的过程。具体我们可以看看 interface{} 的实现原理。

所以,在上面的实现中,会将旧值的 typdata 赋值给一个 any 类型的变量, 然后使用 i != old 这种方式进行判断,这样就可以实现在比较的时候,比较的是值,而不是由值转换为 interface{} 后的指针。

其他原子类型

我们现在知道了,atomic.Value 可以对任意类型做原子操作。 而对于其他的原子类型,比如 int32int64uint32uint64uintptrunsafe.Pointer 等, 其实在 go 中也提供了包装的类型,让我们可以以对象的方式来操作这些类型。

对应的类型如下:

  • atomic.Bool:这个比较特别,但底层实际上是一个 uint32 类型的值。我们对 atomic.Bool 做原子操作的时候,实际上是对 uint32 做原子操作。
  • atomic.Int32int32 类型的包装类型
  • atomic.Int64int64 类型的包装类型
  • atomic.Uint32uint32 类型的包装类型
  • atomic.Uint64uint64 类型的包装类型
  • atomic.Uintptruintptr 类型的包装类型
  • atomic.Pointerunsafe.Pointer 类型的包装类型

这几种类型的实现的代码基本一样,除了类型不一样,我们可以看看 atomic.Int32 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// An Int32 is an atomic int32. The zero value is zero.
type Int32 struct {
_ noCopy
v int32
}

// Load atomically loads and returns the value stored in x.
func (x *Int32) Load() int32 { return LoadInt32(&x.v) }

// Store atomically stores val into x.
func (x *Int32) Store(val int32) { StoreInt32(&x.v, val) }

// Swap atomically stores new into x and returns the previous value.
func (x *Int32) Swap(new int32) (old int32) { return SwapInt32(&x.v, new) }

// CompareAndSwap executes the compare-and-swap operation for x.
func (x *Int32) CompareAndSwap(old, new int32) (swapped bool) {
return CompareAndSwapInt32(&x.v, old, new)
}

可以看到,atomic.Int32 的实现都是基于 atomic 包中 int32 类型相关的原子操作函数来实现的。

原子操作与互斥锁比较

那我们有了互斥锁,为什么还要有原子操作呢?我们进行比较一下就知道了:

原子操作 互斥锁
保护的范围 变量 代码块
保护的粒度
性能
如何实现的 硬件指令 软件层面实现,逻辑较多

如果我们只需要对某一个变量做并发读写,那么使用原子操作就可以了,因为原子操作的性能比互斥锁高很多。 但是如果我们需要对多个变量做并发读写,那么就需要用到互斥锁了,这种场景往往是在一段代码中对不同变量做读写。

性能比较

我们前面这个表格提到了原子操作与互斥锁性能上有差异,我们写几行代码来进行比较一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 系统信息 cpu: Intel(R) Core(TM) i7-8700 CPU @ 3.20GHz
// 10.13 ns/op
func BenchmarkMutex(b *testing.B) {
var mu sync.Mutex

for i := 0; i < b.N; i++ {
mu.Lock()
mu.Unlock()
}
}

// 5.849 ns/op
func BenchmarkAtomic(b *testing.B) {
var sum atomic.Uint64

for i := 0; i < b.N; i++ {
sum.Add(uint64(1))
}
}

在对 Mutex 的性能测试中,我只是写了简单的 Lock()UnLock() 操作,因为这种比较才算是对 Mutex 本身的测试,而在 Atomic 的性能测试中,对 sum 做原子累加的操作。最终结果是,使用 Atomic 的操作耗时大概比 Mutex 少了 40% 以上。

在实际开发中,Mutex 保护的临界区内往往有更多操作,也就意味着 Mutex 锁需要耗费更长的时间才能释放,也就是会需要耗费比上面这个 40% 还要多的时间另外一个协程才能获取到 Mutex 锁。

go 的 sync 包中的原子操作

在文章的开头,我们就说了,在 go 的 sync.Mapsync.Pool 中都有用到了原子操作,本节就来看一看这些操作。

sync.Map 中的原子操作

sync.Map 中使用到了一个 entry 结构体,这个结构体中大部分操作都是原子操作,我们可以看看它下面这两个方法的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 删除 entry
func (e *entry) delete() (value any, ok bool) {
for {
p := e.p.Load()
// 已经被删除了,不需要再删除
if p == nil || p == expunged {
return nil, false
}
// 删除成功
if e.p.CompareAndSwap(p, nil) {
return *p, true
}
}
}

// 如果条目尚未删除,trySwap 将交换一个值。
func (e *entry) trySwap(i *any) (*any, bool) {
for {
p := e.p.Load()
// 已经被删除了
if p == expunged {
return nil, false
}
// swap 成功
if e.p.CompareAndSwap(p, i) {
return p, true
}
}
}

我们可以看到一个非常典型的特征就是 for + CompareAndSwap 的组合,这个组合在 entry 中出现了很多次。

如果我们也需要对变量做并发读写,也可以尝试一下这种 for + CompareAndSwap 的组合。

sync.WaitGroup 中的原子操作

sync.WaitGroup 中有一个类型为 atomic.Uint64state 字段,这个变量是用来记录 WaitGroup 的状态的。 在实际使用中,它的高 32 位用来记录 WaitGroup 的计数器,低 32 位用来记录 WaitGroupWaiter 的数量,也就是等待条件变量满足的协程数量。

如果不使用一个变量来记录这两个值,那么我们就需要使用两个变量来记录,这样就会导致我们需要对两个变量做并发读写, 在这种情况下,我们就需要使用互斥锁来保护这两个变量,这样就会导致性能的下降。

而使用一个变量来记录这两个值,我们就可以使用原子操作来保护这个变量,这样就可以保证并发读写的安全性,同时也能得到更好的性能:

1
2
3
4
5
6
// WaitGroup 的 Add 函数:高 32 位加上 delta
state := wg.state.Add(uint64(delta) << 32)

// WaitGroup 的 Wait 函数:低 32 位加 1
// 等待者的数量加 1
wg.state.CompareAndSwap(state, state+1)

CAS 操作有失败必然有成功

当然这里是指指向同一行 CAS 代码的时候(也就是有竞争的时候),如果是指向不同行 CAS 代码的时候,那么就不一定了。 比如下面这个例子,我们把前面计算 sum 的例子改一改,改成用 CAS 操作来完成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func TestCas(t *testing.T) {
var sum int32 = 0
var wg sync.WaitGroup
wg.Add(1000)

for i := 0; i < 1000; i++ {
go func() {
defer wg.Done()
// 这一行是有可能会失败的
atomic.CompareAndSwapInt32(&sum, sum, sum+1)
}()
}

wg.Wait()
fmt.Println(sum) // 不是 1000
}

在这个例子中,我们把 atomic.AddInt32(&sum, 1) 改成了 atomic.CompareAndSwapInt32(&sum, sum, sum+1), 这样就会导致有可能会有多个 goroutine 同时执行到 atomic.CompareAndSwapInt32(&sum, sum, sum+1) 这一行代码, 这样肯定会有不同的 goroutine 同时拿到一个相同的 sum 的旧值,那么在这种情况下,就会导致 CAS 操作失败。 也就是说,将 sum 替换为 sum + 1 的操作可能会失败。

失败意味着什么呢?意味着另外一个协程序先把 sum 的值加 1 了,这个时候其实我们不应该在旧的 sum 上加 1 了, 而是应该在最新的 sum 上加上 1,那我们应该怎么做呢?我们可以在 CAS 操作失败的时候,重新获取 sum 的值, 然后再次尝试 CAS 操作,直到成功为止:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func TestCas(t *testing.T) {
var sum int32 = 0
var wg sync.WaitGroup
wg.Add(1000)

for i := 0; i < 1000; i++ {
go func() {
defer wg.Done()

// cas 失败的时候,重新获取 sum 的值进行计算。
// cas 成功则返回。
for {
if atomic.CompareAndSwapInt32(&sum, sum, sum+1) {
return
}
}
}()
}

wg.Wait()
fmt.Println(sum)
}

总结

原子操作是并发编程中非常重要的一个概念,它可以保证并发读写的安全性,同时也能得到更好的性能。

最后,总结一下本文讲到的内容:

  • 原子操作是更加底层的操作,它保护的是单个变量,而互斥锁可以保护一个代码片段,它们的使用场景是不一样的。
  • 原子操作需要通过 CPU 指令来实现,而互斥锁是在软件层面实现的。
  • go 里面的原子操作有以下这些:
    • Add:原子增减
    • CompareAndSwap:原子比较并交换
    • Load:原子读取
    • Store:原子写入
    • Swap:原子交换
  • go 里面所有类型都能使用原子操作,只是不同类型的原子操作使用的函数不太一样。
  • atomic.Value 可以用来原子操作任意类型的变量。
  • go 里面有些底层实现也使用了原子操作,比如:
    • sync.WaitGroup:使用原子操作来保证计数器和等待者数量的并发读写安全性。
    • sync.Mapentry 结构体中基本所有操作都有原子操作的身影。
  • 原子操作有失败必然有成功(说的是同一行 CAS 操作),如果 CAS 操作失败了,那么我们可以重新获取旧值,然后再次尝试 CAS 操作,直到成功为止。

总的来说,原子操作本身其实没有太复杂的逻辑,我们理解了它的原理之后,就可以很容易的使用它了。

本文基于 Go 1.19

Pool 是一组可以安全在多个 goroutine 间共享的临时对象的集合。 存储在 Pool 中的任何项目都可能在任何时候被移除,因此 Pool 不适合保存那些有状态的对象,如数据库连接、TCP 连接等。 Pool 的目的是缓存已分配但未使用的项以供以后使用,从而减少垃圾收集器的压力。 也就是说,它可以轻松构建高效、线程安全的空闲列表,但是,它并不适用于所有空闲列表。

使用实例

下面以几个实际的例子来说明 Pool 的一些使用场景。 下面是两个非常典型的使用场景,但是在实际使用中,对于那些需要频繁创建和销毁的对象的场景,我们都可以考虑使用 Pool

gin 里面 Context 对象使用 Pool 保存

ginEngine 结构体中的 ServeHTTP 方法中,可以看到 Context 对象是从 Pool 中获取的。 然后在处理完请求之后,将 Context 对象放回 Pool 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (engine *Engine) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// 从 Pool 中获取 Context
c := engine.pool.Get().(*Context)
// 重置 writermem
c.writermem.reset(w)
// 重置 Request
c.Request = req
// 重置其他字段
c.reset()

engine.handleHTTPRequest(c)

// 将 Context 对象放回 Pool
engine.pool.Put(c)
}

我们可以去看看 ginContext 对象的定义,我们会发现,它其中有很多字段。 设想一下,如果每一个请求都创建一个 Context 对象,那么每一个请求都要对 Context 进行内存分配, 分配了之后,如果请求结束了,这些申请的内存在后续就要被回收掉(当然,不是马上就回收)。

这样一来,如果待回收的 Context 对象很多,那么垃圾回收器就会被压力很大。

同样的做法在 echo 这个框架中也有出现。

fmt 里面的 pp 结构体

在我们使用 fmt 包来打印的时候(比如,调用 fmt.Fprintf),其实底层是要使用一个名为 pp 的结构体来进行打印的。 如果我们的系统中需要大量地使用 fmt 库来做格式化字符串的操作,如果每次进行格式化操作的时候都 new 一个 pp 对象, 那么也会在某个时刻导致垃圾回收器的压力很大。

所以,fmt 包中使用 pp 的时候,都是从 Pool 中获取的:

1
2
3
4
5
6
7
8
9
10
// newPrinter allocates a new pp struct or grabs a cached one.
func newPrinter() *pp {
// 从 Pool 中获取 pp 对象
p := ppFree.Get().(*pp)
p.panicking = false
p.erroring = false
p.wrapErrs = false
p.fmt.init(&p.buf)
return p
}

上面这个方法是获取 pp 对象的方法。我们在这里没有看到重置 pp 字段的代码,因为这些操作在 ppfree 方法中了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// free saves used pp structs in ppFree; avoids an allocation per invocation.
func (p *pp) free() {
if cap(p.buf) > 64<<10 {
return
}

// 重置 p.buf
p.buf = p.buf[:0]
p.arg = nil
p.value = reflect.Value{}
p.wrappedErr = nil
// 将 pp 放回 Pool 中
ppFree.Put(p)
}

我们不能对 sync.Pool 有任何假设,比如,不要想着 Put 进去的对象带了一个状态,然后 Get 出来的时候就能拿到这个状态。 因为这个对象可能在任何时候被清除掉。

从上面这个例子中,我们可以看到 p.buf = p.buf[:0] 这一行对 buf 进行了重置, 这给我我们的启示是,我们在使用 sync.Pool 的时候,如果我们存取的对象会带有一些不同的字段值, 那么我们可能需要对这些字段进行重置后再使用,这样就可以避免 Get 到的对象带有之前的一些状态, 不过重置这些字段的开销相比分配新的内存以及后续 GC,其实开销可以忽略不计。

Pool 整体模型

Pool 本质上是一个双端队列,这个队列支持以下操作:

  • pushHead:将一个对象放到队列的头部,这也是唯一的入队操作。
  • popHead:从队列的头部取出一个对象。如果取不到则使用 New 方法创建一个新的对象。
  • popTail:从队列的尾部取出一个对象。如果取不到则使用 New 方法创建一个新的对象。

可以简单地用下图表示:

pool_1

在阅读本文过程中,想不清楚的时候,回想一下这个模型,可能会有所帮助。

当然,在实际的实现中,比这个复杂多了,但是这个模型已经足够我们理解 Pool 的工作原理了。

Pool 的双端队列是使用数组还是链表

我们知道,队列的存储通常有两种方式,一种是数组,一种是链表,两者的优缺点如下:

优点 缺点
数组 存储空间连续,可以根据下标快速访问 大小固定,扩容成本高
链表 大小不固定,可以很容易增加新的元素 访问效率不如数组。需要额外的空间来存储前后元素的指针

那么 Pool 里面用的是数组还是链表呢?答案是:数组 + 链表

Pool 为什么要用数组 + 链表

为了快速访问队列中的元素,使用数组是最好的选择,但是数组的大小是固定的,如果队列中的元素很多,那么数组就会很快被填满。 如果我们还想继续往队列中添加元素,那么就需要对数组进行扩容,这个成本是很高的(因为本来就是需要频繁分配/销毁对象的场景才会使用 Pool)。 同时,我们知道 Pool 设计的目的就是为了减少频繁内存分配带来的性能问题,如果在使用 Pool 的过程中频繁对其进行扩容,那么就违背了 Pool 设计的初衷了。

为了解决数组扩容的问题,我们可以考虑一下使用链表。在我们 Put 的时候往链表的头部添加一个元素,然后 Get 的时候从链表的尾部取出一个元素(还需要移除)。 但是这样我们就需要一个结构体来表示我们的节点了,那么问题来了,我们又需要频繁地分配/销毁这个结构体,这样就又回到了最开始的问题了(频繁创建/销毁对象)。 所以,只使用链表也不是一个好的选择。

所以,Pool 采用了 数组 + 链表 的方式来实现双端队列,它们的关系如下:

  • 数组:存储队列中的元素,数组的大小是固定的。
  • 链表:当一个数组存储满了之后,就会新建一个数组,然后通过链表将这两个数组串联起来。

最终,Pool 的双端队列的结构如下:

pool_2

数组如何实现队列

我们知道,数组的存储空间是固定的一块连续的内存,所以我们可以通过下标来访问数组中的元素。 我们 push 的时候,将 head 下标 +1,然后 pop 的时候,也要修改对应的下标, 但是这样会导致一个问题是,早晚 head 会超出数组的下标范围,但这个时候数组可能还有很多空间, 因为在我们 push 的时候,可能也同时在 pop,所以数组中的空间可能还有很多。

所以,就可以在 head 超出数组下标范围的时候,将 head 对数组长度取模,这样就可以循环使用数组了:

pool_3

不过对于这种情况,使用下面的图更加直观:

pool_4

Pool 里面是使用 poolDequeue 这个结构体来表示上图这个队列的,本身是一个数组,但是是当作环形队列使用的。

poolChain 的最终模型

poolChain 就是上面说的 数组 + 链表 的组合,它的最终模型如下:

pool_5

说明:

  • poolChain 本身是一个双向链表,每个节点都是一个 poolDequeue,每个节点都有 prevnext 指针指向前后节点。上图的 head 是头节点,tail 是尾节点。
  • poolDequeue 是一个数组,它的大小是固定的,但是它是当作环形队列使用的。
  • tail 初始化的时候长度为 8,具体可参考 poolChainpushHead 方法,后面会说到。
  • pushHead 的时候,如果 head 中的环形队列已经满了,那么就会新建一个 poolDequeue,然后将它插入到 head 的前面。(新的 head 节点的大小为前一个 head 节点的两倍)
  • popTailpopHead 的时候,如果 tail 或者 head 中的环形队列(poolDequeue)已经空了,那么就会将它从链表中移除。

多个 P 的情况下的 poolChain

这里假设 P 跟我们机器的逻辑处理器的数量一致。(这里涉及到了 goroutine 的调度机制,不了解可以先了解一下再回来看。)

我们知道,在 go 中,每一个 goroutine 都会绑定一个 P,这样才可以充分利用多核的优势。 设想一下,如果我们有多个 goroutine 同时存储一个 Pool,会出现什么情况呢? 会导致很激烈的数据竞争,虽然没有使用 Mutex 这种相对低效的互斥锁来解决竞争问题,使用的是原子操作,但是也会导致性能下降。

所以,在 Pool 的实现中,会为每一个 P 都创建一个 poolChain,每次存取,先操作本地 P 绑定的 poolChain,这样就可以减少多个 goroutine 同时操作一个 Pool 的竞争问题了。

所以,最终的 Pool 的模型会长成下面这个样子,每个 G 关联了一个 poolChain

pool_6

注意:这里不是 Pool 实际的样子,只是为了说明 Pool 的实现原理。

最终实现中的 Pool

在实际的实现中,其实会跟上一个图有一些差异,可以说复杂很多:

pool_7

从上图可以看出,其实每个 P 关联的并不是 poolChain,而是 poolLocalpoolLocal 里面包含了一个 poolLocalInternal 和一个 padpad 是为了避免伪共享而添加的。而 poolLocalInternal 就是实际上 Pool 中存储数据的一个结构体。poolLocalInternal 中包含了两个字段,privatesharedshared 就是我们上面说的 poolChain,而 private 是一个 any 类型的字段,在我们调用 PoolPut 方法的时候,会先尝试将数据存储到 private 中,如果 private 中已经有数据了,那么就会将数据存储到 shared 中。同样的,在 Get 的时候,也会先从 private 中获取数据,如果 private 中没有数据,那么就会从 shared 中获取数据。

而相应的,Pool 存取数据会变成:

  • pushHead:我们调用 PoolPut 方法的时候,只会写入到本地 P 关联的那个 poolLocal 中。
  • popHead:这个方法也只能从本地 P 关联的 poolLocal 中取数据。
  • popTail:这个方法会从本地 P 关联的 poolLocal 中取数据,如果取不到,那么就会从其他 P 关联的 poolLocal 中取数据。

这样就可以减少多个 goroutine 同时操作一个 Pool 的竞争问题了(但是无法避免)。

Pool 模型总结

最后,我们将这一节的内容总结一下,可以得到下面这个图:

pool_8

说明:

  • go 进程内会有多个 P,每个 P 都会关联一个 poolLocal
  • poolLocal 中包含了一个 poolLocalInternal 和一个 padpoolLocalInternal 中包含了两个字段,privateshared
  • private 是一个 any 类型的字段,用来存储我们调用 PoolPut 方法的时候传入的数据,Get 的时候如果 private 中有数据,那么就会直接返回 private 中的数据。
  • shared 是一个 poolChain,用来存储我们调用 PoolPut 方法的时候传入的数据,Get 的时候如果当前 P 绑定的 poolLocal 内是空的,那么可以从其他 P 绑定的 shared 的尾部获取。
  • poolChain 是一个双向链表,每个节点都是一个 poolDequeue,每个节点都有 prevnext 指针指向前后节点。上图的 head 是头节点,tail 是尾节点。
  • poolDequeue 是一个数组,它的大小是固定的,但是它是当作环形队列使用的。
  • pushHead 的时候,如果 poolChain 的节点满了,那么会新建一个节点,其容量为前一个节点的两倍。
  • poolChain 支持三种操作:pushHeadpopHeadpopTail
  • pushHead 会将数据存储到当前 P 关联的 poolChain 的头部。

Pool 中的结构体

在开始分析源码之前,我们先来看一下 Pool 的 UML 图:

pool_9

上图中已经包含 Pool 实现的所有关键结构体了,下面我们来分析一下这些结构体的作用。

sync.Pool 结构体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type Pool struct {
// noCopy 用于防止 Pool 被复制(可以使用 go vet 检测)
noCopy noCopy

// local 的主要作用是,多个 goroutine 同时访问 Pool 时,可以减少竞争,提升性能。
// 实际类型是 [P]poolLocal。长度是 localSize。
local unsafe.Pointer
// []poolLocal 的长度。也就是 local 切片的长度
localSize uintptr

// 存放的是上一轮 GC 时的 local 字段的值。
victim unsafe.Pointer
// victim 数组的长度
victimSize uintptr

// 新建对象的方法。
// Get 的时候如果 Pool 中没有对象可用,会调用这个方法来新建一个对象。
New func() any
}

字段说明:

  • local:就是我们上文说到的 poolLocal 类型的切片,长度是 runtime.GOMAXPROCS(0),也就是当前 P 的数量。之所以使用切片类型是因为我们可以在运行的过程中调整 P 的数量,所以它的长度并不是固定的,如果 P 的数量变了,poolLocal 也会跟着改变。
  • localSizelocal 的长度。
  • victim:上一轮 GC 时的 local 字段的值。
  • victimSizevictim 的长度。
  • New:新建对象的方法。

victim 的作用是在 GC 的时候,将 local 的值赋值给 victim,然后将 local 置为 nil,这样就可以避免在 GC 的时候,local 中的对象被回收掉。 当然,并不是完全不会回收,再经历一次 GC 之后,victim 中的对象就会被回收掉。这样做的好处是,可以避免 GC 的时候清除 Pool 中的所有对象, 这样在 GC 之后如果需要大量地从 Pool 中获取对象也不至于产生瞬时的性能抖动

victim cache 是计算机科学中的一个术语,victim cache 是位于 cpu cache 和主存之间的又一级 cache,用于存放由于失效而被丢弃(替换)的那些块。 每当失效发生时,在访问主存之前,victim cache 都会被检查,如果命中,就不会访问主存。

Pool 获取对象的流程

最终,当我们调用 PoolGet 方法的时候,会按下图的流程来获取对象:

pool_10

说明:

  1. 首先会从当前 P 关联的 poolLocal 中的 private 字段中获取对象,如果获取到了,那么直接返回。
  2. 如果 private 字段中没有对象,那么会从当前 P 关联的 poolLocal 中的 shared 字段中获取对象,如果获取到了,那么直接返回(这里使用的是 popHead 方法)。
  3. 尝试从其他 P 关联的 poolLocal 中的 shared 字段中获取对象,如果获取到了,那么直接返回(这里使用的是 popTail 方法)。
  4. 如果其他 P 关联的 poolLocal 中的 shared 字段中也没有对象,那么会从 victim 中获取对象,如果获取到了,那么直接返回。
  5. 如果 victim 中也没有对象,那么会调用 New 方法来创建一个新的对象(当然前提是我们创建 Pool 对象的时候设置了 New 字段)。
  6. 如果 New 字段也没有设置,那么会返回 nil

poolLocal 和 poolLocalInternal 结构体:

Pool 中,使用了 poolLocalpoolLocalInternal 两个结构体来表示实际存储数据的结构体。 当然我们也可以只使用 poolLocalInternal 这个结构体,但是为了避免伪共享,在 Pool 的实现中, 将 poolLocalInternal 放在了 poolLocal 的第一个字段,然后在 poolLocal 中添加了一个 pad 字段,用来填充 poolLocalInternal 到 cache line 的大小。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 实际存储 Pool 的数据的结构体
type poolLocalInternal struct {
// private 用于存储 Pool 的 Put 方法传入的数据。
// Get 的时候如果 private 不为空,那么直接返回 private 中的数据。
// 只能被当前 P 使用。
private any
// 本地 P 可以pushHead/popHead
// 任何 P 都可以 popTail。
shared poolChain
}

// Pool 中的 local 属性的元素类型。
// Pool 中的 local 是一个元素类型为 poolLocal 的切片,长度为 runtime.GOMAXPROCS(0)。
type poolLocal struct {
poolLocalInternal

// pad 用于填充 poolLocalInternal 到 cache line 的大小。
// 为了避免伪共享,将 poolLocalInternal 放在第一个字段。
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

其实这里比较关键的地方是 pad 字段的作用,我们知道,CPU 会将内存分成多个 cache line,CPU 从内存中读取数据的时候, 并不是一个字节一个字节地读取,而是一次性读取一个 cache line 的数据。但是如果我们的数据结构中的字段不是按照 cache line 的大小来排列的, 比如跨了两个 cache line,那么在读取的时候就会产生伪共享,这样就会降低性能。

pool_11

伪共享的原因是,数据跨了两个 cache line,那么在读取的时候,就会将两个 cache line 都读取到 CPU 的 cache 中, 这样有可能会导致不同 CPU 在发生数据竞争的时候,会使一些不相关的数据也会失效,从而导致性能下降。 如果对齐到 cache line,那么从内存读取数据的时候,就不会将一些不相关的数据也读取到 CPU 的 cache 中,从而避免了伪共享。

poolChain、poolChainElt 和 poolDequeue 结构体

为什么要把这三个放一起讲呢?因为这三个结构体就是 Pool 中做实际存取数据的结构体,三者作用如下:

  • poolChainpoolChain 是一个链表,每个节点都是 poolChainElt
  • poolChainElt:每个 poolChainElt 中包含了一个 poolDequeue,同时包含了指向前一个节点和后一个节点的指针。
  • poolDequeuepoolDequeue 是一个双端队列(环形队列,使用数组存储),用来存储 Pool 中的数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// poolChain 是 poolDequeue 的动态大小版本。
//
// 这是作为 poolDequeues 的双向链表队列实现的,其中每个 dequeue 都是前一个 dequeue 大小的两倍。
// 一旦 dequeue 填满,就会分配一个新的 dequeue,并且 pushHead 只会 push 到最新的 dequeue。
// pop 可以从头部或者尾部进行,一旦 dequeue 空了,它就会从 poolChain 中删除。
//
// poolChain 的实现是一个双向链表,每个 poolChainElt 都是一个 poolDequeue。
//(也就是元素是 poolDequeue 的双向链表)
type poolChain struct {
// head 是要推送到的 poolDequeue。
// 只能由生产者访问,因此不需要同步。
head *poolChainElt

// tail 是从 poolDequeue 中 pop 的节点。
// 这是由消费者访问的,因此读取和写入必须是原子的。
tail *poolChainElt
}

// poolChainElt 是 poolChain 中的元素。
type poolChainElt struct {
poolDequeue

// next 和 prev 链接到此 poolChain 中相邻的 poolChainElts。
//
// next 由生产者原子写入,由消费者原子读取。 它只从 nil 过渡到 non-nil。
// prev 由消费者原子写入,由生产者原子读取。 它只会从 non-nil 过渡到 nil。
next, prev *poolChainElt
}

// poolDequeue 是一个无锁的固定大小的单生产者、多消费者队列。
// 单个生产者既可以从头部 push 也可以从头部 pop,消费者只可以从尾部 pop。
//
// 它有一个附加功能,它会清空不使用的槽,从而避免不必要的对象保留。
type poolDequeue struct {
// headTail 将 32 位头索引和 32 位尾索引打包在一起。
// 两者都是 vals modulo len(vals)-1 的索引。
//
// tail = 队列尾的索引
// head = 下一个要填充的插槽的索引
//
// [tail, head) 范围内的槽位归消费者所有。
// 消费者继续拥有此范围之外的槽,直到它清空槽,此时所有权传递给生产者。
//
// 头索引存储在最高有效位中,以便我们可以原子地对它做 add 操作,同时溢出是无害的。
headTail uint64

// vals 是存储在此 dequeue 中的 interface{} 值的环形缓冲区。它的大小必须是 2 的幂。
// (队列存储的元素,接口类型)
//
// 如果插槽为空,则 vals[i].typ 为 nil,否则为非 nil。
// 一个插槽仍在使用中,直到 *both* 尾部索引超出它并且 typ 已设置为 nil。(both?)
// 这由消费者原子地设置为 nil,并由生产者原子地读取。
vals []eface
}

poolChain 的结构大概如下:

pool_12

poolDequeue 的结构大概如下:

pool_dequeue

poolDequeue 中,headTail 是一个 uint64 类型,它的高 32 位是 head,低 32 位是 tail。 这样一来,这样就可以使用原子操作来保证 headtail 的协程安全了。

poolDequeue 中,vals 是一个切片类型,元素类型是 efaceeface 是一个空接口类型,内存布局跟 interface{} 一样, 因此可以看作是一个 interface{} 类型。如果这里看不明白可以看看《go interface 设计与实现》

那为什么不直接使用 interface{} 类型呢?因为如果用了 interface{} 类型, 那么 poolDequeue 就无法区分保存的 valnil 还是一个空的槽。 那有什么解决办法呢?在 pushpop 的时候使用互斥锁可以解决这个问题(因为目前的实现是使用原子操作的,所以这才需要判断保存的 valnil 还是空槽), 但是这样就会导致性能下降。

Pool 源码剖析

在源码剖析的开始部分,不会深入去讲底层的存取细节,我们将其当作一个抽象的队列来看待即可,这样可能会更加便于理解。不过在讲完 Pool 的实现之后,最后还是会展开讲述这个复杂的 "队列" 的那些实现细节。

接下来,我们来看看 Pool 的源码实现。 Pool 提供的接口非常简单,只有 PutGet 两个方法,还有一个 New 字段,用来指定 Pool 中的元素是如何创建的:

  • New 属性:New 是一个函数,用来创建 Pool 中的元素。
  • Put 方法:Put 方法用来向 Pool 中放入一个元素。
  • Get 方法:Get 方法用来从 Pool 中获取一个元素。

Get

Get 方法的实现如下:

GetPool 中选择一个任意项,将其从 Pool 中移除,并将其返回给调用者。 Get 可能会选择忽略池并将其视为空的。 调用方不应假定传递给 Put 的值与 Get 返回的值之间存在任何关系。 (PutGet 之间可能会发生 GC,然后 Pool 里面的元素可能会被 GC 回收掉)

如果 Get 否则返回 nil 并且 p.New 不为 nil,则 Get 返回调用 p.New 的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Get 从 Pool 中获取一个对象
func (p *Pool) Get() any {
// ...
// pin 将当前的 goroutine 和 P 绑定,禁止被抢占,返回当前 P 的本地缓存(poolLocal)和 P 的 ID。
l, pid := p.pin()
// 先看 private 是否为 nil,如果不为 nil,就直接返回 private,并将 private 置为 nil。
x := l.private
l.private = nil
if x == nil {
// 尝试从本地 shared 的头部取。
x, _ = l.shared.popHead()
if x == nil { // 如果本地 shared 的头部取不到,就从其他 P 的 shared 的尾部取。
x = p.getSlow(pid)
}
}
// 将当前的 goroutine 和 P 解绑,允许被抢占。
runtime_procUnpin()
// ...
// 如果 x 为 nil 并且 p.New 不为 nil,则返回 p.New() 的结果。
// 没有就 New 一个。
if x == nil && p.New != nil {
x = p.New()
}
return x
}

Pool Get 的流程可以总结如下:

  1. 将当前的 goroutineP 绑定,禁止被抢占,返回当前 P 的本地缓存(poolLocal)和 PID
  2. 从本地 private 取,如果取不到,就从本地 shared 的头部取,如果取不到,就从其他 Pshared 的尾部取。获取到则返回
  3. 如果从其他的 Pshared 的尾部也获取不到,则从 victim 获取。获取到则返回
  4. 将当前的 goroutineP 解绑,允许被抢占。
  5. 如果 p.New 不为 nil,则返回 p.New 的结果。

再贴一下上面那个图(当然,下图包含了下面的 getSlow 的流程,并不只是 Get):

pool_10

Pool 中一个很关键的操作是 pin,它的作用是将当前的 goroutineP 绑定,禁止被抢占。 这样就可以保证在 GetPut 的时候,都可以获取到当前 P 的本地缓存(poolLocal), 否则,有可能在 GetPut 的时候,P 会被抢占,导致获取到的 poolLocal 不一致,这样 poolLocal 就会失去意义, 不得不再次陷入跟其他 goroutine 竞争的状态,又不得不考虑在如何在不同 goroutine 之间进行同步了。

而绑定了 P 后,在 GetPut 的时候,就可以使用原子操作来代替其他更大粒度的锁了, 但是我们也不必太担心,因为绑定 P 的时间窗口其实很小。

getSlow 源码剖析

Get 中,如果从 privateshared 中都取不到,就会调用 getSlow 方法。它的作用是:

  1. 尝试从其他 Pshared 的尾部取。
  2. 尝试从 victim 获取。

getSlow 的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 从其他 P 的 shared 的尾部取。
func (p *Pool) getSlow(pid int) any {
// 获取 local 的大小和 local。
size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// 尝试从其他 P 的 shared 的尾部取。
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size)) // pid+i+1 的用途从下一个 P 开始取。
if x, _ := l.shared.popTail(); x != nil { // 尝试从每一个 P 的 shared 的尾部取,获取到则返回。
return x
}
}

// 尝试从 victim cache 取。
// 我们在尝试从所有主缓存中偷取之后执行此操作,
// 因为我们希望 victim cache 中的对象尽可能地老化。
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size { // 如果 pid 大于 size,会发生越界,直接返回 nil。这意味着 gomaxprocs 相比上一次 poolCleanup 的时候变大了。
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil { // victim 实际上也是一个 poolLocal 数组,每个 poolLocal 都有一个 private 字段,这个字段就是 victim cache。
l.private = nil
return x
}
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}

// 将 victim cache 标记为空,以便将来的 Get 不会再考虑它。
atomic.StoreUintptr(&p.victimSize, 0)

return nil
}

pin 源码剖析

pin 方法的实现如下:

pin 将当前 goroutine 固定到 P 上,禁用抢占并返回 poolLocal 池中对应的 poolLocal。 调用方必须在完成取值后调用 runtime_procUnpin() 来取消抢占。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 将当前的 goroutine 和 P 绑定,禁止被抢。
func (p *Pool) pin() (*poolLocal, int) {
// procPin 函数的目的是为了当前 G 绑定到 P 上。
pid := runtime_procPin() // 返回当前 P 的 id。

// 在 pinSlow 中,我们会存储 local,然后再存储 localSize,
// 这里我们以相反的顺序读取。 由于我们禁用了抢占,
// 因此 GC 不能在两者之间发生。
s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
l := p.local // load-consume
if uintptr(pid) < s { // pid < s,说明当前 P 已经初始化过了。
return indexLocal(l, pid), pid // 返回当前 P 的 poolLocal。
}
return p.pinSlow() // 如果当前 P 没有初始化过,那么就调用 pinSlow()。
}

func (p *Pool) pinSlow() (*poolLocal, int) {
// 在互斥锁下重试。
// 在固定时无法锁定互斥锁。
runtime_procUnpin() // 解除当前 P 的绑定。
allPoolsMu.Lock() // 加全局锁。
defer allPoolsMu.Unlock() // 解锁。
pid := runtime_procPin() // 重新绑定当前 P。
// 在固定时不会调用 poolCleanup。(无法被抢占,GC 不会发生)
s := p.localSize
l := p.local
if uintptr(pid) < s { // 这其实是一个 double-checking,如果在加锁期间,其他 goroutine 已经初始化过了,就直接返回。
return indexLocal(l, pid), pid
}

// p.local == nil 说明 pool 还没有初始化过。
if p.local == nil { // 如果当前 P 没有初始化过,那么就将当前 P 添加到 allPools 中。
allPools = append(allPools, p)
}

// 当 local 数组为空,或者和当前的 runtime.GOMAXPROCS 不一致时,
// 将触发重新创建 local 数组,以和 P 的个数保持一致。
// 如果在 GC 之间更改了 GOMAXPROCS,我们将重新分配数组并丢弃旧数组。
size := runtime.GOMAXPROCS(0) // 获取当前 GOMAXPROCS(也就是 P 的个数)
local := make([]poolLocal, size) // 创建一个 poolLocal 数组
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release // 将当前 P 的 poolLocal 添加到 p.local 中
runtime_StoreReluintptr(&p.localSize, uintptr(size)) // store-release // 将当前 P 的 poolLocal 添加到 p.localSize 中
return &local[pid], pid // 返回当前 P 关联的 poolLocal,以及当前 P 的 id。
}

pinSlow 中比较关键的操作是,它会初始化当前 P 关联的 poolLocal,并将其添加到 allPools 中。 关于 allPools 的作用,可以看下一小节。

pinSlow 的流程: 1. 解除当前 P 的绑定。 2. 加全局 Pool 的锁。 3. 重新绑定当前 P。 4. 如果当前 Pid 小于 localSize,那么就返回当前 PpoolLocal。(典型的 double-checking) 5. 如果 local 还没初始化,那么将当前 PpoolLocal 添加到 allPools 中。 6. 初始化 local。最后返回当前 PpoolLocal

对于 local 的初始化,我们可以参考一下下图(我们需要知道的是,切片的底层结构体的第一个字段是一个数组):

pool_13

&local[0][]poolLocal 的首地址,unsafe.Pointer(&local[0]) 就是 poolLocal 数组的首地址。

indexLocal 源码剖析

我们在上面的代码中还可以看到一个 indexLocal 函数,它的作用是返回 poolLocal 数组中的第 i 个元素。

1
2
3
4
5
6
// l 指向了 poolLocal 数组的首地址,i 是数组的索引。
// 返回了数组中第 i 个元素,其类型是 poolLocal。
func indexLocal(l unsafe.Pointer, i int) *poolLocal {
lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
return (*poolLocal)(lp)
}

我们之前在看 Pool 结构体的时候,看到过 local 字段,它的类型是 unsafe.Pointer,也就是一个指针。 然后我们再结合一下 indexLocal 的实现,就可以知道 local 字段指向的是一个 poolLocal 数组了。 其中 lpoolLocal 数组的首地址,i 是数组的索引,unsafe.Sizeof(poolLocal{})poolLocal 的大小。

这一小节没看懂可以参考一下我写的另外一篇文章 《深入理解 go unsafe》

allPools 的作用

allPools 的目的是在 GC 的时候,遍历所有的 Pool 对象,将其中的 victim 替换为 local,然后将 local 设置为 nil。 这样后续的 Get 操作在 local 获取不到的时候,可以从 victim 中获取。一定程度上减少了 GC 后的性能抖动。

Pool 中,还定义了下面几个全局变量:

1
2
3
4
5
6
7
8
9
10
11
var (
// 保护 allPools 和 oldPools 的互斥锁。
allPoolsMu Mutex

// allPools 是所有非空 primary cache 的 pool 的集合。
// 该集合由 allPoolsMu 和 pinning 保护,或者 STW 保护。
allPools []*Pool

// oldPools 是所有可能非空 victim cache 的 pool 的集合。
oldPools []*Pool
)

如果我们第一次看这些变量,可能会有点懵,不知道它们的作用是什么。 我们可以再结合一下 poolCleanup 函数的实现,就可以知道它们的作用了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func poolCleanup() {
// 在垃圾收集开始时,这个函数会被调用。
// 它不能分配并且可能不应该调用任何运行时函数。

// 从 allPools 中删除 victim 缓存。
for _, p := range oldPools {
p.victim = nil // 作用是让 GC 可以回收 victim 缓存中的对象。
p.victimSize = 0
}

// 将 primary 缓存移动到 victim 缓存。
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}

// 具有非空主缓存的池现在具有非空 victim 缓存,并且没有池具有 primary 缓存。
oldPools, allPools = allPools, nil
}

poolCleanup 这个函数会在 GC 开始的时候被调用,它的作用是将 local 移动到 victim 中。 同时,将 victim 置为 nil,这样 GC 就可以回收 victim 中的对象了,也就是说要经历两轮 GC local 才会真正地被回收。 也就意味着,GC 的时候,local 其实并没有被回收,而是被移动到了 victim 中。

poolCleanup 可以用下图表示,实际上就是使用 locallocalSize 覆盖 victimvictimSize

pool_14

Put

Put 的实现比较简单,就是将对象放到 local 中,不需要 Get 那种操作其他 P 绑定的 poolLocal 的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Put 将 x 添加到 Pool 中。
func (p *Pool) Put(x any) {
if x == nil {
return
}
// ...
// 获取 poolLocal
l, _ := p.pin() // 将当前 goroutine 与 P 绑定。获取当前 P 关联的 poolLocal,以及当前 P 的 id。
if l.private == nil { // 优先放入 private
l.private = x
} else { // 如果 private 已经有值了,就放入 shared
l.shared.pushHead(x) // 这部分其他 P 也是可以访问的。
}
runtime_procUnpin() // 解除当前 P 的绑定。
// ...
}

Pool Put 的流程: 1. 如果 Put 的值是 nil,则直接返回。 2. 将当前的 goroutineP 绑定,禁止被抢占,返回当前 P 的本地缓存(poolLocal)和 PID。 3. 如果本地 private 为空,则将 x 放入本地 private。 4. 如果本地 private 不为空,则将 x 放入本地 shared 的头部。 5. 将当前的 goroutineP 解绑,允许被抢占。

这里面的 pinruntime_procUnpin,我们在前文已经介绍过了,这里就不再赘述了。

New

这里说的 Newsync.Pool 中的 New 字段,在我们尝试了所有方法都获取不到对象的时候, 会判断 PoolNew 属性是否为 nil,如果不为 nil,则会调用 New 方法,创建一个新的对象。

poolChain 和 poolDequeue 源码剖析

poolChain 是一个双向链表,它的每一个节点的元素是 poolDequeue

在上文中,对于 GetPut 的细节,还没有具体展开,因为不了解这些细节也不影响我们理解 Pool 的整体流程。 现在是时候来看看 poolChainpoolDequeue 这两个结构体的实现了,会结合起来一起看。

poolChainpoolDequeue 里面都提供了以下三个方法:

  • pushHead:将对象放到队列的头部。
  • popHead:从队列的头部取出一个对象。
  • popTail:从队列的尾部取出一个对象。

不一样的是,poolChain 里面的方法会处理链表节点的创建和销毁,而 poolDequeue 里面的方法才是实际从队列存取对象的方法。

pushHead

poolChainpushHead 方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 添加一个元素到队列头部
func (c *poolChain) pushHead(val any) {
// 链表头
d := c.head
if d == nil { // 链表为空
// 初始化链表。
// 新建 poolChainElt,然后 c 的 head 和 tail 都指向这个新建的元素。
const initSize = 8 // 初始化大小为 8
// 新建一个节点,类型为 poolChainElt
d = new(poolChainElt)
d.vals = make([]eface, initSize)
// 将 c 的 head 和 tail 都指向这个新建的元素
c.head = d
// 使用原子操作保存 c.tail,因为其他 goroutine 也可能会修改 c.tail。
storePoolChainElt(&c.tail, d)
}

// poolQueue 还没满的时候可以成功 push,pushHead 会返回 true。
// poolQueue 满的时候 pushHead 返回 false。
if d.pushHead(val) {
return
}

// 当前 dequeue 已满。分配一个两倍大小的新 dequeue。
newSize := len(d.vals) * 2
if newSize >= dequeueLimit { // 限制单个 dequeue 的最大大小
newSize = dequeueLimit
}

// 新建 poolChainElt,然后 c 的 head 指向这个新建的元素。
// 同时,d 的 next 指向这个新建的元素。
d2 := &poolChainElt{prev: d} // 因为是加到队列头,所以 prev 指向 d
d2.vals = make([]eface, newSize)
c.head = d2
storePoolChainElt(&d.next, d2)
d2.pushHead(val)
}

poolChainpushHead 方法的流程:

  1. 如果链表为空,那么就初始化链表。
  2. 如果链表不为空,那么就尝试 pushHead
  3. 如果 pushHead 失败,那么就分配一个两倍大小的新 dequeue
  4. 然后把新 dequeue 放到链表头部。(push 的时候已经锁定了 goroutineP 上,所以这一步是没有并发问题的)

poolChainpushHead 方法中,唯一需要特别注意的是 storePoolChainElt(&c.tail, d) 这一行代码。 这里使用了 storePoolChainElt 方法,而不是直接使用 c.tail = d。 这是因为 c.tail 是会和其他 goroutine 存在竞争的(其他 goroutine 获取对象的时候可能会修改 tail),因此不能直接赋值,而是使用了原子操作。

poolChainpushHead 方法的流程图如下:

pool_15

队列头来说,prev 实际上指向的是 head 的下一个元素,但是又不能叫 next,因为 next 被用来表示 tail 的下一个元素,所以就叫了 prev。我们需要知道 prevnext 本质上都是指向了下一个元素,就看你是从队列头还是队列尾来查找。

poolChain 中,其实实际存储对象的时候并不是在 poolChain,而是在 poolChain 的每一个节点中的 poolDequeue 中。 所以我们再来看看 poolDequeue 中的 pushHead 实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// pushHead 在队列的头部添加 val。
// 如果队列已满,则返回 false。
// 它只能由单个生产者调用。(也就是当前 goroutine 绑定的 P)
//
// 注意:head 指向的是下一个要插入的元素的位置,所以插入的时候,先将 head 指向的位置设置为 val,然后 head++。
func (d *poolDequeue) pushHead(val any) bool {
// 读取 head 和 tail 的值。
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head { // 队列满了
// 不能直接 tail == head,因为初始化的时候,head 和 tail 都是 0
return false
}
// 取模意味着,当 head 超过 len(d.vals) 时,会从头开始。也就是一个环。
slot := &d.vals[head&uint32(len(d.vals)-1)] // 获取 head 对应的槽位。

// push 只有当前协程能 push,所以不需要加锁。
// 但是 popTail 可能会在另一个协程中执行,所以需要判断当前的槽位是否被 popTail 释放了。
// 因为 popTail 的操作是先 cas 修改 headTail,然后再获取 slot 的值,最后才将 slot 置 0 的。
// 如果修改了 headTail 之后还没有来得及将 slot 置 0,那么这里就会判断出槽位还没有被释放。
typ := atomic.LoadPointer(&slot.typ) // 获取槽位的类型
if typ != nil { // 槽位不为空
// 队列依然是满的
return false
}

// 如果 typ 已经是 nil,那么这里后续的操作是安全的。

if val == nil { // put 进来的值是 nil,使用 dequeueNil 代替
val = dequeueNil(nil)
}
// 将 val 赋值给槽位
*(*any)(unsafe.Pointer(slot)) = val

// 增加 head。为什么是 1<<dequeueBits 呢?
// 因为 head 是高 32 位,所以要左移 32 位
// 本质上是:head = head + 1
atomic.AddUint64(&d.headTail, 1<<dequeueBits)
return true
}

poolDequeuepushHead 方法的流程:

pushHead 的处理流程: 1. 判断队列是否已满,如果已满则返回 false。 2. 获取下一个 push 的位置,先判断这个位置是否可用,如果不可用则返回 false。(可能和 popTail 冲突,如果 popTail 没来得及将其中的值取出来,那么这个槽就还不能使用) 3. 如果可用,则将值放入这个位置,然后将 head 指针加 1

poolDequeue 中,我们看到有一行代码比较奇怪:atomic.LoadUint64(&d.headTail),这是为了可以原子操作存取 headtail 两个值。这样就可以避免锁的使用了。

popHead

poolChainpopHead 方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// popHead 会从链表头部 pop 出一个元素。
// 返回值:
// 1. any:pop 出的元素。
// 2. bool:是否成功 pop 出元素。
func (c *poolChain) popHead() (any, bool) {
d := c.head
// d == nil 的情况:
// 1. 链表为空。
// 2. 链表只有一个元素,且这个元素已经 pop 完了。(被其他协程 pop 了)
for d != nil {
// 这是因为,在我们拿到 d 之后,还没来得及 pop 的话,其他协程可能已经 pop 了。
// 所以需要 for 循环。典型的无锁编程。
//
// poolQueue 还没空的时候可以成功 pop,popHead 会返回 true。
if val, ok := d.popHead(); ok {
return val, ok
}

// 之前的 dequeue 中可能仍有未消费的元素,因此尝试后退。
d = loadPoolChainElt(&d.prev) // 获取下一个 poolChainElt
}
return nil, false
}

prev 虽然从命名上看是前一个,但是实际上是下一个节点。从 head 开始遍历的话,prev 就是下一个节点,从 tail 开始遍历的话,next 就是下一个节点。

poolChainpopHead 的处理流程:

  1. 如果链表为空,那么就返回 false
  2. 如果链表不为空,那么就尝试 popHead
  3. 如果 popHead 失败,那么就尝试从链表下一个 dequeue popHead。(循环直到最后一个 poolChainElt

poolChainpopHead 方法的流程图如下:

pool_16

poolChain 中,实际上调用的是 poolDequeuepopHead 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// popHead 删除并返回队列头部的元素。
// 如果队列为空,则返回 false。
// 它只能由单个生产者调用。(也就是当前 goroutine 绑定的 P)
func (d *poolDequeue) popHead() (any, bool) {
// slot 用来保存从队列头部取出的值
var slot *eface
for { // 获取不到槽会继续循环,直到获取到槽或者发现队列为空为止。
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if tail == head { // 队列为空
return nil, false
}

// 先将 head 减 1,然后再获取槽位。
head--
ptrs2 := d.pack(head, tail)
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
// 成功获取到槽
slot = &d.vals[head&uint32(len(d.vals)-1)]
break
}
}

// 成功获取到 slot,将 slot 的值取出来
// head - 1 了,说明这个槽是可以被安全使用的,所以不需要加锁。
// 因为 popTail 不会影响到 head,所以不会影响到这里。
// 另外,pushHead 也没有影响,因为在实际使用中,只有一个协程会 pushHead。

val := *(*any)(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nils
}
*slot = eface{} // 将 slot 置 0
return val, true
}

poolDequeuepopHead 的处理流程:

  1. 判断队列是否为空,如果为空则返回 false
  2. 尝试将 head 指针减 1,如果失败则进行下一轮尝试(自旋,for + 原子操作是无锁编程中很常见的写法)。
  3. head 指针对应的槽位的值取出来,然后将槽位置为 nil

popTail

poolChainpopTail 方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// 从队列尾取出一个元素
func (c *poolChain) popTail() (any, bool) {
// 获取链表尾部的 poolChainElt
// 如果链表为空,返回 nil,false
d := loadPoolChainElt(&c.tail)
if d == nil {
return nil, false
}

for {
// 在我们 popTail 之前获取 next 指针很重要。
// 一般来说,d 可能暂时为空,但如果 next 在 pop 之前为非 nil,
// 并且 pop 失败,则 d 永久为空,这是唯一可以安全地将 d 从链中删除的条件。
//
// 解析:next 非 nil,但是 pop 失败:d 肯定是空的了,这个时候我们可以安全地将 d 从链表中删除。
d2 := loadPoolChainElt(&d.next)

// poolQueue 还没空的时候可以成功 pop,popTail 会返回 true。
if val, ok := d.popTail(); ok {
return val, ok
}

// 队列已经空了
if d2 == nil {
// d 是唯一的 dequeue。它现在是空的,但以后可能会有新的元素 push 进来。
return nil, false
}

// 链表的尾部已经被消费完了,所以转到下一个 dequeue。
// 尝试从链表中删除它,这样下一次 pop 就不必再次查看空的 dequeue。
// (本质:移除空的 tail 元素)
//
// 开始处理 d2,d2 是 d 的下一个 dequeue。(开始尝试从 d2 中 pop)
// cas:c.tail 由 d 变为 d2。
// 如果 cas 成功,说明 d2 是最新的 tail,d 可以被移除。
// d2 的 prev 指针设置为 nil,这样 gc 可以回收 d。(d2 没有下一个元素了)
//
// c.tail(d) 指向下一个 poolChainElt
// 同时下一个 poolChainElt 的 prev 指针(d)设置为 nil。
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
// 我们赢得了竞争。清除 prev 指针,以便垃圾收集器可以收集空的 dequeue,
// 以便 popHead 的时候不做多余的查找操作。
storePoolChainElt(&d2.prev, nil)
}
// d 指向 d2,继续处理下一个 dequeue
d = d2
}
}

poolChainpopTail 的处理流程:

  1. 如果链表为空,那么就返回 false
  2. 如果链表不为空,那么就尝试 popTail
  3. 如果 popTail 失败,那么就尝试从链表上一个 dequeue popTail。(循环直到第一个 poolChainElt

poolChainpopTail 方法的流程图如下:

pool_17

popTail 的时候,如果发现 poolChainElt 已经为空了,那么就会从链表中移除它:

pool_18

poolDequeuepopTail 方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// popHead 和 popTail 都有取值,然后将槽置空的过程,但是它们的实现是不一样的。
// 在 popHead 中,是直接将槽的值设置为 eface{},而在 popTail 中,
// 先将 val 设置为 nil,然后将 typ 通过原子操作设置为 nil。
// 这样在 pushHead 的时候就可以安全操作了,只要先使用原子操作判断 typ 是否为 nil 就可以了。

// popTail removes and returns the element at the tail of the queue.
// It returns false if the queue is empty. It may be called by any
// number of consumers.
//
// popTail 删除并返回队列尾部的元素。
// 如果队列为空,则返回 false。
// 它可以被任意数量的消费者调用。(如何保证并发安全?原子操作)
func (d *poolDequeue) popTail() (any, bool) {
// slot 用来保存从队列尾取出来的值
var slot *eface
// 获取队列尾部的值
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if tail == head {
// Queue is empty.
// 队列为空,直接返回
return nil, false
}

// Confirm head and tail (for our speculative check
// above) and increment tail. If this succeeds, then
// we own the slot at tail.
//
// 确认 head 和 tail(对于我们上面的推测性检查)并增加 tail。
// 如果成功,那么我们就拥有 tail 的插槽。
ptrs2 := d.pack(head, tail+1) // 新的 headTail
// 如果返回 false,说明从 Load 到 CompareAndSwap 期间,有其他 goroutine 修改了 headTail。
// 则需要重新 Load,再次尝试(再次执行 for 循环)。
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
// Success.
slot = &d.vals[tail&uint32(len(d.vals)-1)]
break
}
}

// 成功获取到 slot,将 slot 的值取出来
// 问题来了:
// 在这里 cas 成功的时候,这个 slot 实际上可能是还没有释放的,在这个时候,pushHead 其实不能写入到这个 slot 中。
// 因此,我们可以在 pushHead 的代码中看到,会先判断 slot.typ 是否为 nil,如果不为 nil,说明 slot 还没有被释放,那么就直接 return 了。
// 这种情况发生在 poolDequeue 满了的时候。

// We now own slot.
val := *(*any)(unsafe.Pointer(slot))
if val == dequeueNil(nil) { // 这是什么情况?非空,但是值等于 dequeueNil(nil) ?
val = nil
}

// 取出值之后,将 slot 置 0。
// 在 poolDequeue 中,值是允许为 nil 的,但是 pool 的 Put 中判断值为 nil 的时候就直接 return 了。

// 告诉 pushHead 我们已经操作完这个插槽。
// 将插槽归零也很重要,这样我们就不会留下可能使该对象比必要时间更长的引用。
//
// 我们首先写入 val,然后通过原子写入 typ 来发布我们已完成此插槽。
slot.val = nil
atomic.StorePointer(&slot.typ, nil) // 为什么要用原子操作?因为 pushHead 也会读取这个值,所以需要保证读取的是最新的值。

// 此时 pushHead 可以操作这个槽了。

return val, true
}

poolDequeuepopTail 的处理流程:

  1. 判断队列是否为空,如果为空则返回 false
  2. 尝试将 tail 指针加 1,如果失败则进行下一轮尝试(自旋)。
  3. tail 指针对应的槽位的值取出来,然后将槽位置为 nil

sync.Pool 设计要点

sync.Pool 中,我们可以看到它有许许多多的编程技巧,为了实现一个高性能的 Pool 要做的东西是非常复杂的, 但是对于我们而言,我们只会用到它暴露出来的两个非常简单的接口 PutGet,这其实也算是 Go 语言的一种设计哲学吧, 把复杂留给自己,把简单留给用户。但是,我们还是要知道它的实现原理,这样才能更好的使用它。

接下来,我们就再来总结一下 sync.Pool 高性能的一些设计要点:

  1. noCopy 字段,防止 sync.Pool 被复制。sync.Pool 是不能被复制的,否则会导致一些隐晦的错误。
  2. local 字段,用于存储与 P 关联的一个 poolLocal 对象,在多个 goroutine 同时操作的时候,可以减少不同 goroutine 之间的竞争,只有在本地的 poolLocal 中没有找到对象的时候,才会去其他 goroutine 关联的队列中去取。
  3. poolDequeuepushHeadpopTail 之间会存在 headtail 指针上的一些竞争,这些竞争问题是通过原子操作来解决的(相比互斥锁效率更高)。使用了原子操作可能就会有失败的时候,这个时候,再次重试就可以了。
  4. poolDequeue 中的 head/tail 指针使用一个字段来保存,然后通过原子操作保证 head/tail 的一致性。
  5. poolChain 使用链表的方式解决容量问题,并且新增一个元素到链表的时候,容量为上一个元素(poolChain 链表头)的两倍(非常常见的扩容策略)。双向链表,可以接受别的 PpopTail 操作,减少竞争的同时可以充分利用多核。
  6. pin 保证 P 不会被抢占。如果一个 goroutine 在执行 Put 或者 Get 期间被挂起,有可能下次恢复时,绑定的就不是上次的 P 了。那整个过程就会完全乱掉,因为获取到的 poolLocal 不是之前那个了。使用 pin 可以解决这种并发问题。
  7. 自旋操作,因为原子操作失败的时候可能存在竞争,这个时候再尝试一下就可能成功了。(for + 原子操作是无锁编程中很常见的一种编程模式,在 sync.Map 中也有很多类似操作)
  8. pad 内存对齐,可以避免伪共享。
  9. poolDequeue 中存储数据的结构是一个环形队列,是连续的内存,可以充分利用 CPU 的 cache。在访问 poolDequeue 某一项时,其附近的数据项都有可能加载到统一 cache line 中,有利于提升性能。同时它是预先分配内存的,因此其中的数据项可不断复用。

总结

最后,总结一下本文内容:

  • sync.Pool 是一个非常有用的工具,它可以帮助我们减少内存的分配和回收(通过复用对象),提升程序的性能。但是,我们要注意它的使用场景,它适合那些没有状态的对象,同时,我们不能对那些从 PoolGet 出来的对象做任何假设。
  • 我们在 Put 或者 Get 之前,可能需要对我们操作的对象重置一下,防止对后续的操作造成影响。
  • Pool 中的对象存储是使用队列的方式,这个队列的实现是一个链表(poolChain),链表的每一个节点都是一个环形队列(poolDequeue)。这个队列支持以下三种操作:
    • pushHead:将一个对象放到队列的头部。
    • popHead:将队列的头部的对象取出来。
    • popTail:将队列的尾部的对象取出来。
  • sync.Pool 的实现中,使用了很多编程技巧,比如 noCopypinpad、原子操作等等,这些技巧都是为了实现一个高性能的 Pool 而做的一些优化,我们可以学习一下,具体参考上一节。
  • sync.Pool 中,PutGet 操作的时候会先将 goroutineP 绑定,然后再去操作 P 关联的 poolLocal,这样可以减少竞争,提升性能。因为每一个 P 都有一个关联的 poolLocal,所以多个 goroutine 操作的时候,可以充分利用多核。在操作完成后,再解除绑定。
  • 考虑到 GC 直接清除 Pool 中的对象会在 GC 后可能会产生性能抖动,所以在 GC 的时候,其实并不会马上清除 Pool 中的对象,而是将这些对象放到 victim 字段中,在 Get 的过程中,如果所有的 poolLocal 中获取不到对象,则会从 victim 中去找。但是再进行 GC 的时候,旧的 victim 会被清除。也就是 Pool 中对象的淘汰会经历两次 GC

在 go 的标准库中,提供了 sync.Cond 这个并发原语,让我们可以实现多个 goroutine 等待某一条件满足之后再继续执行。 它需要配合 sync.Mutex 一起使用,因为 CondWait 方法需要在 Mutex 的保护下才能正常工作。 对于条件变量,可能大多数人只是知道它的存在,但是用到它的估计寥寥无几,因为很多并发场景的处理都能使用 chan 来实现, 而且 chan 的使用也更加简单。 但是在某些场景下,Cond 可能是最好的选择,本文就来探讨一下 Cond 的使用场景,基本用法,以及它的实现原理。

sync.Cond 是什么?

sync.Cond 表示的是条件变量,它是一种同步机制,用来协调多个 goroutine 之间的同步,当共享资源的状态发生变化的时候, 可以通过条件变量来通知所有等待的 goroutine 去重新获取共享资源。

适用场景

在实际使用中,我们可能会有多个 goroutine 在执行的过程中,由于某一条件不满足而阻塞的情况。 这个时候,我们就可以使用条件变量来实现 goroutine 之间的同步。比如,我们有一个 goroutine 用来获取数据, 但是可能会比较耗时,这个时候,我们就可以使用条件变量来实现 goroutine 之间的同步, 当数据准备好之后,就可以通过条件变量来通知所有等待的 goroutine 去重新获取共享资源。

sync.Cond 条件变量用来协调想要访问共享资源的那些 goroutine,当共享资源的状态发生变化的时候, 它可以用来通知所有等待的 goroutine 去重新获取共享资源。

sync.Cond 的基本用法

sync.Cond 的基本用法非常简单,我们只需要通过 sync.NewCond 方法来创建一个 Cond 实例, 然后通过 Wait 方法来等待条件满足,通过 Signal 或者 Broadcast 方法来通知所有等待的 goroutine 去重新获取共享资源。

NewCond 创建实例

sync.NewCond 方法用来创建一个 Cond 实例,它的参数是一个 Locker 接口,我们可以传入一个 Mutex 或者 RWMutex 实例。 这个条件变量的 Locker 接口就是用来保护共享资源的。

Wait 等待条件满足

Wait 方法用来等待条件满足,它会先释放 Cond 的锁(Cond.L),然后阻塞当前 goroutine(实际调用的是 goparkunlock),直到被 Signal 或者 Broadcast 唤醒。

它做了如下几件事情:

  1. 释放 Cond 的锁(Cond.L),然后阻塞当前 goroutine。(所以,使用之前需要先锁定)
  2. Signal 或者 Broadcast 唤醒之后,会重新获取 Cond 的锁(Cond.L)。
  3. 之后,就返回到 goroutine 阻塞的地方继续执行。

Signal 通知一个等待的 goroutine

Signal 方法用来通知一个等待的 goroutine,它会唤醒一个等待的 goroutine,然后继续执行当前 goroutine。 如果没有等待的 goroutine,则不会有任何操作。

Broadcast 通知所有等待的 goroutine

Broadcast 方法用来通知所有等待的 goroutine,它会唤醒所有等待的 goroutine,然后继续执行当前 goroutine。 如果没有等待的 goroutine,则不会有任何操作。

sync.Cond 使用实例

下面我们通过一个实例来看一下 sync.Cond 的使用方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package cond

import (
"fmt"
"sync"
"testing"
"time"
)

var done bool
var data string

func write(c *sync.Cond) {
fmt.Println("writing.")
// 让 reader 先获取锁,模拟条件不满足然后 wait 的情况
time.Sleep(time.Millisecond * 10)
c.L.Lock()
// 模拟耗时的写操作
time.Sleep(time.Millisecond * 50)
data = "hello world"
done = true
fmt.Println("writing done.")
c.L.Unlock()
c.Broadcast()
}

func read(c *sync.Cond) {
fmt.Println("reading")
c.L.Lock()
for !done {
fmt.Println("reader wait.")
c.Wait()
}
fmt.Println("read done.")
fmt.Println("data:", data)
defer c.L.Unlock()
}

func TestCond(t *testing.T) {
var c = sync.NewCond(&sync.Mutex{})

go read(c) // 读操作
go read(c) // 读操作
go write(c) // 写操作

time.Sleep(time.Millisecond * 100) // 等待操作完成
}

输出:

1
2
3
4
5
6
7
8
9
10
reading
reader wait. // 还没获取完数据,需要等待
writing.
reading
reader wait.
writing done. // 获取完数据了,通知所有等待的 reader
read done. // 读取到数据了
data: hello world // 输出读取到的数据
read done.
data: hello world

这个例子可以粗略地用下图来表示:

cond_1

说明:

  • read1reader2 表示两个 goroutine,它们都会调用 read 函数。
  • donefalse 的时候,reader1reader2 都会调用 c.Wait() 函数,然后阻塞等待。
  • write 表示一个 goroutine,它会调用 write 函数。
  • write 函数中,获取完数据之后,会将 done 设置为 true,然后调用 c.Broadcast() 函数,通知所有等待的 reader 去重新获取共享资源。
  • reader1reader2 在解除阻塞状态后,都会重新获取共享资源,然后输出读取到的数据。

在这个例子中,done 的功能是标记,用来表示共享资源是否已经获取完毕,如果没有获取完毕,那么 reader 就会阻塞等待。

为什么要用 sync.Cond?

在文章开头,我们说了,很多并发编程的问题都可以通过 channel 来解决。 同样的,在上面提到的 sync.Cond 的使用场景,使用 channel 也是可以实现的, 我们只要 close(ch) 来关闭 channel 就可以实现通知多个等待的协程了。

那么为什么还要用 sync.Cond 呢? 主要原因是,sync.Cond 可以重复地进行 Wait()Signal()Broadcast() 操作, 但是,如果想通过关闭 chan 来实现这个功能的话,那就只能通知一次了。 因为 channel 只能关闭一次,关闭一个已经关闭的 channel 会导致程序 panic。

使用 channel 的另外一种方式是,记录 reader 的数量,然后通过往 channel 中发送多次数据来实现通知多个 reader。 但是这样一来代码就会复杂很多,从另一个角度说,出错的概率大了很多。

close channel 广播实例

下面的例子模拟了使用 close(chan) 来实现 sync.Cond 中那种广播功能,但是只能通知一次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package close_chan

import (
"fmt"
"testing"
"time"
)

var data string

func read(c <-chan struct{}) {
fmt.Println("reading.")

// 从 chan 接收数据,如果 chan 中没有数据,会阻塞。
// 如果能接收到数据,或者 chan 被关闭,会解除阻塞状态。
<-c

fmt.Println("data:", data)
}

func write(c chan struct{}) {
fmt.Println("writing.")
// 模拟耗时的写操作
time.Sleep(time.Millisecond * 10)
data = "hello world"
fmt.Println("write done.")

// 关闭 chan 的时候,会通知所有的 reader
// 所有等待从 chan 接收数据的 goroutine 都会被唤醒
close(c)
}

func TestCloseChan(t *testing.T) {
ch := make(chan struct{})

go read(ch)
go read(ch)
go write(ch)

// 不能关闭已经关闭的 chan
time.Sleep(time.Millisecond * 20)
// panic: close of closed channel
// 下面这行代码会导致 panic
//go write(ch)

time.Sleep(time.Millisecond * 100)
}

输出:

1
2
3
4
5
6
writing.
reading. // 会阻塞直到写完
reading. // 会阻塞直到写完
write done. // 写完之后,才能读
data: hello world
data: hello world

上面例子的 write 不能多次调用,否则会导致 panic。

sync.Cond 基本原理

go 的 sync.Cond 中维护了一个链表,这个链表记录了所有阻塞的 goroutine,也就是由于调用了 Wait 而阻塞的 goroutine。 而 SignalBroadcast 方法就是用来唤醒这个链表中的 goroutine 的。 Signal 方法只会唤醒链表中的第一个 goroutine,而 Broadcast 方法会唤醒链表中的所有 goroutine

下图是 Signal 方法的效果,可以看到,Signal 方法只会唤醒链表中的第一个 goroutine

cond_2

说明:

  • notifyListsync.Cond 中维护的一个链表,这个链表记录了所有阻塞的 goroutine
  • head 是链表的头节点,tail 是链表的尾节点。
  • Signal 方法只会唤醒链表中的第一个 goroutine

Broadcast 方法会唤醒 notifyList 中的所有 goroutine

sync.Cond 的设计与实现

最后,我们来看一下 sync.Cond 的设计与实现。

sync.Cond 模型

sync.Cond 的模型如下所示:

1
2
3
4
5
6
7
8
9
type Cond struct {
noCopy noCopy

// L is held while observing or changing the condition
L Locker // L 在观察或改变条件时被持有

notify notifyList
checker copyChecker
}

属性说明:

  • noCopy 是一个空结构体,用来检查 sync.Cond 是否被复制。(在编译前通过 go vet 命令来检查)
  • L 是一个 Locker 接口,用来保护条件变量。
  • notify 是一个 notifyList 类型,用来记录所有阻塞的 goroutine
  • checker 是一个 copyChecker 类型,用来检查 sync.Cond 是否被复制。(如果在运行时被复制,会导致 panic

notifyList 结构体

notifyListsync.Cond 中维护的一个链表,这个链表记录了所有因为共享资源还没准备好而阻塞的 goroutine。它的定义如下所示:

1
2
3
4
5
6
7
8
9
type notifyList struct {
wait atomic.Uint32
notify uint32

// 阻塞的 waiter 名单。
lock mutex // 锁
head *sudog // 阻塞的 goroutine 链表(链表头)
tail *sudog // 阻塞的 goroutine 链表(链表尾)
}

属性说明:

  • wait 是下一个 waiter 的编号。它在锁外自动递增。
  • notify 是下一个要通知的 waiter 的编号。它可以在锁外读取,但只能在持有锁的情况下写入。
  • lock 是一个 mutex 类型,用来保护 notifyList
  • head 是一个 sudog 类型,用来记录阻塞的 goroutine 链表的头节点。
  • tail 是一个 sudog 类型,用来记录阻塞的 goroutine 链表的尾节点。

notifyList 的方法说明:

notifyList 中包含了几个操作阻塞的 goroutine 链表的方法。

  • notifyListAdd 方法将 waiter 的编号加 1。
  • notifyListWait 方法将当前的 goroutine 加入到 notifyList 中。(也就是将当前协程挂起)
  • notifyListNotifyOne 方法将 notifyList 中的第一个 goroutine 唤醒。
  • notifyListNotifyAll 方法将 notifyList 中的所有 goroutine 唤醒。
  • notifyListCheck 方法检查 notifyList 的大小是否正确。

sync.Cond 的方法

notifyList 就不细说了,本文重点讲解一下 sync.Cond 的实现。

Wait 方法

Wait 方法用在当条件不满足的时候,将当前运行的协程挂起。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (c *Cond) Wait() {
// 检查是否被复制
c.checker.check()
// 更新 notifyList 中需要等待的 waiter 的数量
// 返回当前需要插入 notifyList 的编号
t := runtime_notifyListAdd(&c.notify)
// 解锁
c.L.Unlock()
// 挂起当前 g,直到被唤醒
runtime_notifyListWait(&c.notify, t)
// 唤醒之后,重新加锁。
// 因为阻塞之前解锁了。
c.L.Lock()
}

对于 Wait 方法,我们需要注意的是,使用之前,我们需要先调用 L.Lock() 方法加锁,然后再调用 Wait 方法,否则会报错。

文档里面的例子:

1
2
3
4
5
6
7
c.L.Lock()
for !condition() {
c.Wait()
}
// ...使用条件...
// 这里是我们在条件满足之后,需要执行的代码。
c.L.Unlock()

好了,问题来了,调用 Wait 方法之前为什么要先加锁呢?

这是因为在我们使用共享资源的时候,可能一些代码是互斥的,所以我们需要加锁。 这样我们就可以保证在我们使用共享资源的时候,不会被其他协程修改。 但是如果因为条件不满足,我们需要等待的话,我们不可能在持有锁的情况下等待, 因为在修改条件的时候,可能也需要加锁,这样就会造成死锁。

另外一个问题是,为什么要使用 for 来检查条件是否满足,而不是使用 if 呢?

这是因为在我们调用 Wait 方法之后,可能会有其他协程唤醒我们,但是条件并没有满足, 这个时候依然是需要继续 Wait 的。

Signal 方法

Signal 方法用在当条件满足的时候,将 notifyList 中的第一个 goroutine 唤醒。

1
2
3
4
5
6
func (c *Cond) Signal() {
// 检查 sync.Cond 是否被复制了
c.checker.check()
// 唤醒 notifyList 中的第一个 goroutine
runtime_notifyListNotifyOne(&c.notify)
}

Broadcast 方法

Broadcast 方法用在当条件满足的时候,将 notifyList 中的所有 goroutine 唤醒。

1
2
3
4
5
6
func (c *Cond) Broadcast() {
// 检查 sync.Cond 是否被复制了
c.checker.check()
// 唤醒 notifyList 中的所有 goroutine
runtime_notifyListNotifyAll(&c.notify)
}

copyChecker 结构体

copyChecker 结构体用来检查 sync.Cond 是否被复制。它实际上只是一个 uintptr 类型的值。

1
2
3
4
5
6
7
8
9
10
type copyChecker uintptr

// check 方法检查 copyChecker 是否被复制了。
func (c *copyChecker) check() {
if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
panic("sync.Cond is copied")
}
}

copyChecker 的值只有两种可能:

  1. 0,表示还没有调用过 Wait, SignalBroadcast 方法。
  2. uintptr(unsafe.Pointer(&copyChecker)),表示已经调用过 Wait, SignalBroadcast 方法。在这几个方法里面会调用 check 方法,所以 copyChecker 的值会被修改。

所以如果 copyChecker 的值不是 0,也不是 uintptr(unsafe.Pointer(&copyChecker))(也就是最初的 copyChecker 的内存地址),则表示 copyChecker 被复制了。

需要注意的是,这个方法在调用 CompareAndSwapUintptr 还会检查一下,这是因为有可能会并发调用 CompareAndSwapUintptr, 如果另外一个协程调用了 CompareAndSwapUintptr 并且成功了,那么当前协程的这个 CompareAndSwapUintptr 调用会返回 false, 这个时候就需要检查是否是因为另外一个协程调用了 CompareAndSwapUintptr 而导致的,如果是的话,就不会 panic

为什么 sync.Cond 不能被复制?

从上一小节中我们可以看到,sync.Cond 其实是不允许被复制的,但是如果是在调用 Wait, SignalBroadcast 方法之前复制,那倒是没关系。

这是因为 sync.Cond 中维护了一个阻塞的 goroutine 列表。如果 sync.Cond 被复制了,那么这个列表就会被复制,这样就会导致两个 sync.Cond 都包含了这个列表;但是我们唤醒的时候,只会有其中一个 sync.Cond 被唤醒,另外一个 sync.Cond 就会一直阻塞。 所以 go 直接从语言层面限制了这种情况,不允许 sync.Cond 被复制。

总结

  • sync.Cond 是一个条件变量,它可以用来协调多个 goroutine 之间的同步,当条件满足的时候,去通知那些因为条件不满足被阻塞的 goroutine 继续执行。
  • sync.Cond 的接口比较简单,只有 Wait, SignalBroadcast 三个方法。
    • Wait 方法用来阻塞当前 goroutine,直到条件满足。调用 Wait 方法之前,需要先调用 L.Lock 方法加锁。
    • Signal 方法用来唤醒 notifyList 中的第一个 goroutine
    • Broadcast 方法用来唤醒 notifyList 中的所有 goroutine
  • sync.Cond 的实现也比较简单,它的核心就是 notifyList,它是一个链表,用来保存所有因为条件不满足而被阻塞的 goroutine
  • 用关闭 channel 的方式也可以实现类似的广播功能,但是有个问题是 channel 不能被重复关闭,所以这种方式无法被多次使用。也就是说使用这种方式无法多次广播。
  • 使用 channel 发送通知的方式也是可以的,但是这样实现起来就复杂很多了,就更容易出错了。
  • sync.Cond 中使用 copyChecker 来检查 sync.Cond 是否被复制,如果被复制了,就会 panic。需要注意的是,这里的复制是指调用了 WaitSignalBroadcast 方法之后,sync.Cond 被复制了。在调用这几个方法之前进行复制是没有影响的。

本文基于 Go 1.19

在上一篇文章中(《深入理解 go sync.Map - 基本原理》),我们探讨了 go 中 sync.Map 的一些基本内容,如 map 并发使用下存在的问题,如何解决这些问题等。 我们也知道了 sync.Map 的一些基本操作,但是我们还是不知道 sync.Map 是如何实现的,以及为什么在特定场景下,sync.Mapmap + Mutex/RWMutex 快。 本篇文章就来继续深入探讨 sync.Map,对 sync.Map 的设计与实现进行更加详尽的讲解。

sync.Map 概览

开始之前,我们先来了解一下 sync.Map 的数据结构,以及其一个大概的模型。这对于我们了解 sync.Map 的设计非常有好处。

本文用到的一些名词解析

  • readread map:都是指 sync.Map 中的只读 map,即 sync.Map 中的 m.read
  • dirtydirty map:都是指 sync.Map 中的可写 map,即 sync.Map 中的 m.dirty
  • entrysync.Map 中的 entry,这是保存值的结构体,它是一个原子类型的指针。其中的指针指向 key 对应的值。

sync.Map 的数据结构

sync.Map 的数据结构如下:

readdirtysync.Map 中最关键的两个数据结构,它们之间可以相互转化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 在 sync.Map 中的作用是一个特殊的标记
var expunged = new(any)

// sync.Map
type Map struct {
// 互斥锁
mu sync.Mutex
// 只读 map,用于读操作
read atomic.Pointer[readOnly]
// dirty map,写入操作会先写入 dirty map
dirty map[any]*entry
// 记录需要从 dirty map 中读取 key 的次数。
// 也就是没有在 read map 中找到 key 的次数。
misses int
}

// readOnly 是一个只读的 map
type readOnly struct {
m map[any]*entry // dirty map 中的 key 的一份快照
amended bool // 记录是否在 dirty map 中有部分 read map 中不存在的 key
}

// 实际存储值的结构体。
// p 有三种状态:nil, expunged, 正常状态。
type entry struct {
p atomic.Pointer[any]
}

说明:

  • expunged 是一个特殊的标记,用于表示 entry 中的值已经被删除。并且那个 keydirty map 中已经不存在了。
  • Map 也就是我们使用的 sync.Map,它有一个 mu 互斥锁,用于保护 dirty map
  • Map 中有两个 map,一个是 read map,一个是 dirty map
    • read map 是一个只读的 map,但不是我们在其他地方说的只读。它的只读的含义是,它的 key 是不能增加或者删除的。但是 value 是可以修改的。
    • dirty map 是一个可读写的 map,新增 key 的时候会写入 dirty map
  • misses 是一个 int 类型的变量,用于记录 read map 中没有找到 key 的次数。当 misses 达到一定的值的时候,会将 dirty map 中的 key 同步到 read map 中。
  • readOnly 是一个只读的 map,它的 m 字段是一个 map,用于保存 dirty map 中的 key 的一份快照。readOnly 中的 amended 字段用于记录 dirty map 中是否有 read map 中不存在的 key
  • entry 是一个结构体,它有一个 p 字段,用于保存 key 对应的值。p 字段有三种状态:nilexpunged、正常状态。expunged 是一个特殊的标记,用于表示 key 对应的值已经被删除,并且那个 keydirty map 中已经不存在了。

因为在 sync.Map 中是使用了特殊的标记来表示删除的,也就是不需要使用 delete 函数来删除 key。这样就可以利用到了原子操作了,而不需要加锁。这样就能获得更好的性能了。

sync.Map 的整体模型

上一小节我们已经介绍了 sync.Map 的数据结构,现在让我们来看一下 sync.Map 的整体模型。 它的整体模型如下:

sync_map_1

关键说明:

  • read map 是一个只读的 map,不能往里面添加 key。而 dirty map 是一个可读写的 map,可以往里面添加 key
  • sync.Map 实现中,基本都是会先从 read map 中查找 key,如果没有找到,再从 dirty map 中查找 key。然后根据查找结果来进行后续的操作。
  • 如果 read map 中没有找到 key,需要加锁才能从 dirty map 中查找 key。因为 dirty map 是一个可读写的 map,所以需要加锁来保证并发安全。

这实际上是一种读写分离的理念。

sync.Map 的工作流程

通过看它的数据结构和整体模型,想必我们依然对 sync.Map 感到很陌生。现在再来看看 sync.Map 的工作流程,这样我们就能知道其中一些字段或者结构体的实际作用了。

下面,我们通过一些 map 的常规操作来看一下 sync.Map 的工作流程:

  1. 添加 key:如果是第一次写入 key 的话(假设其值为 value),会先写入 dirty map,在 dirty map 中的 value 是一个指向 entry 结构体的指针。entry 结构体中的 p 字段也是一个指针,它指向了 value 的内存地址。
  2. 读取 key:先从 read 中读取(无锁,原子操作),read 中找不到的时候再去 dirty 中查找(有锁)。
  3. 修改 key:如果 keyread map 中存在的话,会直接修改 key 对应的 value。如果 keyread map 中不存在的话,会去 dirty map 中查找(有锁),如果在 dirty map 中也不存在的话,则修改失败。
  4. 删除 key:如果 keyread map 中存在的话,会将 key 对应的 entry 指针设置为 nil(实际上是打标记而已,并没有删除底层 mapkey)。如果在 read 中找不到,并且 dirty 有部分 read 中不存在的 key 的话,会去 dirty map 中查找(有锁),如果在 dirty map 中也不存在的话,则删除失败。

可能我们看完这一大段说明还是不会太懂,但是没关系,下面对每一个操作都有图,结合我画的图应该可以更好地理解。

深入之前需要了解的一些背景知识

sync.Map 中有一些我们需要有基本了解的背景知识,这里简单说一下。

sync.Map 中,需要读写 dirty map 的时候,都需要加锁,加的锁是 sync.Mutex。对于这把锁,我们需要知道的是: sync.Mutex 是一个互斥锁。当一个 goroutine 获得了 sync.Mutex 的使用权之后(Lock 调用成功),其他的 goroutine 就只能等待,直到该 goroutine 释放了 sync.Mutex(持有锁的 goroutine 使用了 Unlock 释放锁)。

所以,我们在源码中看到 m.mu.Lock() 这行代码的时候,就应该知道,从这一行代码直到 m.mu.Unlock() 调用之前,其他 goroutine 调用 m.mu.Lock() 的时候都会被阻塞。

sync.Map 中,dirty map 的读写都需要加锁,而读 read map 的时候不需要锁的。

原子操作

go 语言中的原子操作是指,不会被打断的操作。也就是说,当一个 goroutine 执行了一个原子操作之后,其他的 goroutine 就不能打断它,直到它执行完毕。 这可以保证我们的一些操作是完整的,比如给一个整数加上一个增量,如果不使用原子操作,而是先取出来再进行加法运算,再写回去这样操作的话, 就会出现问题,因为这个过程有可能被打断,如果另外一个 goroutine 也在进行这个操作的话,就有可能会出现数据错乱的问题。

而原子操作的 Add(比如 atomic.Int32Add 方法)可以在加法过程中不被打断,所以我们可以保证数据的完整性。 这里说的不被打断说的是:这个原子操作完成之前,其他 goroutine 不能操作这个原子类型

除了 Add 方法,atomic 包中还有 LoadStoreSwap 等方法,这些方法都是原子操作,可以保证数据的完整性。

sync.Map 中,对 entry 状态的修改都是通过原子操作实现的。

CAS

CAS 是 Compare And Swap 的缩写,意思是比较并交换。CAS 操作是一种原子操作,它的原理是:当且仅当 内存值 == 预期值 时,才会将 内存值 修改为 新值。 使用代码表示的话,大概如下:

1
2
3
4
5
6
if *addr == old {
*addr = new
return true
}

return false

也就是说:

  • CAS 原子操作会先进行比较,如果 内存值 == 预期值,则执行交换操作,将 内存值 修改为 新值,并返回 true
  • 否则,不执行交换操作,直接返回 false

CAS 如果比较发现相同就会交换,如果不相同就不交换,这个过程是原子的,不会被打断。在 sync.Map 中,修改 entry 的状态的时候,有可能会使用到 CAS。

double-checking(双重检测)

这是一种尽量减少锁占用的策略,在单例模式中可能会用到:

1
2
3
4
5
6
7
8
9
10
11
12
// 第一次检查不使用锁
if instance == nil {
mu.Lock()
defer mu.Unlock()
// 获取到锁后,还要再次检查,
// 因为有可能在等待锁的时候 instance 已经被初始化了
if instance == nil {
instance = new()
}
}

return instance

上面这个例子中,在获取到锁之后,还进行了一次检查,这是因为 mu.Lock() 如果获取不到锁,那么当前 goroutine 就会被挂起,等待锁被释放。 如果在等待锁的过程中,另外一个 goroutine 已经初始化了 instance,那么当前 goroutine 就不需要再初始化了,所以需要再次检查。

如果第二次检查发现 instance 已经被初始化了,那么就不需要再初始化了,直接返回 instance 即可。

sync.Map 中,也有类似的双重检测,比如在 Load 方法中,会先从 read 中获取 entry,如果没有,就会加锁,获取到锁后,再去检查一下 read 中是否有 entry,如果没有,才会从 dirty 中获取 entry。这是因为在等待锁的时候可能有其他 goroutine 已经将 key 放入 read 中了(比如做了 Range 遍历)。

dirty map 和 read map 之间的转换

上面我们说了,写入新的 key 的时候,其实是写入到 dirty 中的,那什么时候会将 key 写入到 read 中呢? 准确来说,sync.Map 是不会往 read map 中写入 key 的,但是可以使用 dirty map 来覆盖 read map

dirty map 转换为 read map

dirty map 转换为 read map 的时机是:

  • missess 的次数达到了 len(dirty) 的时候。这意味着,很多次在 read map 中都找不到 key,这种情况下是需要加锁才能再从 dirty map 中查找的。这种情况下,就会将 dirty map 转换为 read map,这样后续在 read map 中能找到 key 的话就不需要加锁了。
  • 使用 Range 遍历的时候,如果发现 dirty map 中有些 keyread map 中没有,那么就会将 dirty map 转换为 read map。然后遍历的时候就遍历一下 read map 就可以了。(如果 read map 中的 keydirty map 中的 key 完全一致,那直接遍历 read map 就足够了。)
sync_map_2

dirty map 转换为 read map 的操作其实是很简单的,就是使用 dirty map 直接覆盖掉 read map,然后将 dirty map 置为 nil,同时 misses 重置为 0

简单来说,如果因为新增了 key 需要频繁加锁的时候,就会将 dirty map 转换为 read map

read map 转换为 dirty map

read map 转换为 dirty map 的时机是:

  • dirty mapnil 的情况下,需要往 dirty map 中增加新的 key
sync_map_3

read map 转换为 dirty map 的时候,会将 read map 中正常的 key 复制到 dirty map 中。 但是这个操作完了之后,read map 中的那些被删除的 key 占用的空间是还没有被释放的。 那什么时候释放呢?那就是上面说的 dirty map 转换为 read map 的时候。

sync.Map 中 entry 的状态

sync.Map 中,read mapdirty map 中相同 keyentry 都指向了相同的内容(共享的)。 这样一来,我们就不需要维护两份相同的 value 了,这一方面减少了内存使用的同时,也可以保证同一个 key 的数据在 readdirty 中看到都是一致的。 因为我们可以通过原子操作来保证对 entry 的修改是安全的(但是增加 key 依然是需要加锁的)。

entry 的状态有三种:

  • nil:被删除了,read mapdirty map 都有这个 key
  • expunged:被删除了,但是 dirty map 中没有这个 key
  • 正常状态:可以被正常读取。

它们的转换关系如下:

sync_map_4

说明:

  1. key 被删除
  2. dirty mapnil 的时候,需要写入新的 keyread 中被删除的 key 状态会由 nil 修改为 expunged
  3. 被删除的 key,重新写入
  4. read 中被删除的 keydirty map 中不存在的),在再次写入的时候会发生

注意:expunged 和正常状态之间不能直接转换,expungedkey 需要写入的话,需要先修改其状态为 nil。正常状态被删除之后先转换为 nil,然后在创建新的 map 的时候才会转换为正常状态。也就是 1->24->3 这两种转换)

不存在由正常状态转换为 expunged 或者由 expunged 转换为正常状态的情况。

entry 状态存在的意义

entry 的状态存在的意义是什么呢?我们去翻阅源码的时候会发现,其实 sync.Map 在删除的时候, 如果在 read map 中找到了 key,那么删除操作只是将 entry 的状态修改为 nil(通过原子操作修改),并没有真正的删除 key

也就是并不像我们使用普通 map 的时候那种 delete 操作,会将 keymap 中删除。 这样带来的一个好处就是,删除操作我们也不需要加锁了,因为我们只是修改了 entry 的状态,而不是真正的删除 key。 这样就可以获得更好的性能了。

就算转换为了 nil 状态,也依然可以转换为 expunged 或者正常状态,具体看上一个图。

read.amended 的含义

我们往 sync.Map 中写入新的 key 的时候,会先写入 dirty map,但是不会写入 read map。 这样一来,我们在读取的时候就需要注意了,因为我们要查找的 key 是有可能只存在于 dirty map 中的, 那么我们是不是每次在 read map 中找不到的时候都需要先去 dirty map 中查找呢?

答案是否定的。我们从 dirty map 中进行查找是有代价的,因为要加锁。如果不加锁,遇到其他 goroutine 写入 dirty map 的时候就报错了。 针对这种情况,一种比较简单的解决方法是,增加一个标志,记录一下 read mapdirty map 中的 key 是否是完全一致的。 如果是一致的,那么我们就不需要再加锁,然后去 dirty map 中查找了。否则,我们就需要加锁,然后去 dirty map 中查找。

sync.Map 中的 amended 字段就是这里说的标志字段。单单说文字可能有点抽象,我们可以结合下图理解一下:

sync_map_5

read.amended 的含义就是 read mapdirty map 中的 key 是否是完全一致的。如果为 true,说明有些 key 只存在于 dirty map 中。

sync.Map 源码剖析

sync.Map 提供的方法并不多,它能做的操作跟普通的 map 差不多,只是在并发的情况下,它能保证线程安全。 下面是 sync.Map 所能提供的方法:

  • Store/Swap(增/改): 往 sync.Map 中写入新的 key。(Store 实际调用了 Swap 方法)
  • Load(查): 从 sync.Map 中读取 key
  • LoadOrStore(查/增/改): 从 sync.Map 中读取 key,如果不存在,就写入新的 key
  • Delete/LoadAndDelete(删): 从 sync.Map 中删除 key。(Delete 实际调用了 LoadAndDelete 方法)
  • Range: 遍历 sync.Map 中的所有 key

还有两个可能比较少用到的方法:

  • CompareAndDelete: 从 sync.Map 中删除 key,但是只有在 key 的值跟 old 相等的时候才会删除。
  • CompareAndSwap: 从 sync.Map 中写入新的 key,但是只有在 key 的值跟 old 相等的时候才会写入。

接下来我们会从源码的角度来分析一下 sync.Map 的实现。

Store/Swap 源码剖析

Store 实际上是对 Swap 方法的调用,所以我们看 Swap 方法的源码就够了:

Swap 方法的作用是:交换一个 key 的值,并返回之前的值(如果有的话)。 返回值中的 prev 就是之前的值,loaded 表示 key 是否存在。

下面是 Swap 方法的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
func (m *Map) Swap(key, value any) (previous any, loaded bool) {
// 读取 read map
read := m.loadReadOnly()
// 先从 read map 中读取 key
if e, ok := read.m[key]; ok {
// 在 read map 中读取到了 key
if v, ok := e.trySwap(&value); ok { // ok 表示是否成功交换
// swap 成功
if v == nil { // 之前的值为 nil,表示 key 之前已经被删除的了
return nil, false
} // 之前的值不为 nil,表示存在
return *v, true
}

// 执行到这里表示:
// read map 中存在 key,但是已经被删除。(为 expunged 状态)
}

// read map 中找不到 key,加锁,从 dirty map 中继续找
m.mu.Lock()
// double checking,二次检查,因为有可能等待锁的时候 read map 已经发生了变化
read = m.loadReadOnly()
if e, ok := read.m[key]; ok { // read map 中存在 key
if e.unexpungeLocked() { // 将 entry 由 expunged 状态改为 nil 状态
// key 之前已经被删除了,并且之前 dirty map 中不存在 key,
// 所以这里需要将 key 添加到 dirty map 中。
m.dirty[key] = e
}
// 写入新的值,v 是旧的值
if v := e.swapLocked(&value); v != nil {
// v 不为 nil,表示之前存在
loaded = true
previous = *v
}
} else if e, ok := m.dirty[key]; ok { // read map 中不存在 key,但是 dirty map 中存在 key
// 写入新的值,v 是旧的值
if v := e.swapLocked(&value); v != nil {
// v 不为 nil,表示之前存在
loaded = true
previous = *v
}
} else { // read map 中不存在 key,dirty map 中也不存在 key(需要写入新的 key)
if !read.amended { // dirty map 和 read map 的 key 完全一致)
// 现在要写入新的 key 了,所以这个 amended 状态得修改了。
// 我们正在将第一个新键添加到 dirty map 中。
// 确保它已分配并将 read map 的 amended 标记设置为 true。
m.dirtyLocked()
// amended 设置为 true,因为下面要写入一个在 read map 中不存在的 key
m.read.Store(&readOnly{m: read.m, amended: true})
}
// 新增的 key,dirty map 中不存在,所以直接写入
m.dirty[key] = newEntry(value)
}
// 解锁
m.mu.Unlock()
return previous, loaded
}

Swap/Store 图示

sync_map_6

注意:这里的 read mapdirty map 中都没有包含 entry,我们知道它们中相同的 key 都指向相同的 entry 就可以了。

Swap 的操作流程

  1. read map 中读取 key,如果存在,就直接交换 value,并返回之前的 value
  2. 如果 read map 中不存在 key,就加锁,加锁后,再从 read map 中读取 key,如果存在,就直接交换 value,并返回之前的 value。(double checking
  3. 加锁后,如果在 read map 中依然找不到 key,再从 dirty map 中读取 key,如果存在,就直接交换 value,并返回之前的 value
  4. 如果 read mapdirty map 都不存在 key,就将 key 添加到 dirty map 中,并返回 nil。在这一步中,如果 read mapdirty mapkey 完全一致,就将 read mapamended 状态设置为 true

在第 4 步中,还有一个关键操作就是 dirtyLocked(),这个操作的作用是保证 dirty map 初始化,如果 dirty map 已经初始化,就不会做任何操作。 如果 dirty mapnil,那么会初始化,然后将 read map 中未被删除的 key 添加到 dirty map 中。

dirtyLocked() 源码剖析

dirtyLocked() 的作用是保证 dirty map 初始化,如果 dirty map 已经初始化,就不会做任何操作。

之所以 dirty map 需要初始化,是因为在 dirty map 转换为 read map 的时候,dirty map 会被设置为 nil, 但是新增 key 的时候是要写入到 dirty map 的,所以需要重新初始化。 具体可以看上面的 dirty map 和 read map 的之间的转换 这一节。

dirtyLocked() 的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 1. 如果 m.dirty 为 nil,则创建一个新的 dirty map。
// 2. 否则,不做任何操作
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}

read := m.loadReadOnly()
// dirty map 初始化
m.dirty = make(map[any]*entry, len(read.m))
// 对于 read map 中的 key,如果不是 expunged,则将其复制到 dirty map 中。
// read map 中 nil 的 key 会被转换为 expunged 状态。
for k, e := range read.m {
// 不是 expunged 的 entry,才会被复制到 dirty map 中。
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}

dirtyLocked() 图示:

sync_map_7

dirtyLocked() 里有个需要注意的地方就是,它会将 read map 中的 nilkey 转换为 expunged 状态。 expunged 状态表明这个 key 只是在 read map 中,而不在 dirty map 中。 做完迁移之后,dirty map 其实就不包含那些被删除的 key 了。

Swap/Store 关键说明

Swap 方法里面其实基本已经包含了 sync.Map 主要设计理念了,下文讲解其他方法的时候,其中一些细节不再做过多的解释了:

  1. sync.Map 在做很多操作的时候,都会先从 read map 中读取,如果 read map 中不存在,再从 dirty map 中读取。
  2. 如果需要从 dirty map 中读取,那么会先加锁,然后再从 dirty map 中读取。
  3. sync.Map 在对 entry 进行操作的时候,都是通过原子操作进行的。(这是因为有些写操作是没有 mu.Lock() 保护的

而对于 dirty mapread map 的转换等只是一些实现细节的上的问题,我们如果了解了它的设计理念,那么就可以很容易的理解它的实现了。

Swap/Store 里的原子操作

这里面用了很多原子操作:

  • m.loadReadOnly(): 读取 read map
  • e.trySwap(&value): 交换 key 的值。key 存在的时候,直接通过原子操作使用新的值覆盖旧的。(如果 key 只存在于 read map 中的话,这个操作会失败。)
  • e.unexpungeLocked(): 将 entryexpunged 状态改为 nil 状态。
  • e.swapLocked(&value): 交换 key 的值。key 存在的时候,直接通过原子操作使用新的值覆盖旧的。
  • m.read.Store(&readOnly{m: read.m, amended: true}): 将 read mapamended 状态设置为 true

为什么使用原子操作

为什么要使用原子操作呢?这是因为 sync.Map 中有一些写操作是没有加锁的,比如删除的时候, 删除的时候只是将 entry 的状态通过原子操作改成了 nil 状态。 如果不使用原子操作,那么就会出现并发问题。

比如:在 m.mu.Lock() 保护的临界区内先读取了 entry 的状态,我们还没来得及对其做任何操作, 在另外一个 goroutineentry 的状态被修改了,那么我们临界区内的 entry 状态已经成为它的历史状态了, 如果这个时候再基于这个状态做任何操作都会导致并发问题。

Load 源码剖析

Load 方法的作用是从 sync.Map 中读取 key 对应的值。 在 sync.Map 的实现中,key 的查找都遵循以下的查找流程:

sync_map_8

注意:从 read map 查找不需要加锁,从 dirty map 中查找需要加锁。

下面是 Load 方法的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Load 返回存储在 map 中的键值,如果不存在值则返回 nil。
// ok 结果表明是否在 map 中找到了值。
func (m *Map) Load(key any) (value any, ok bool) {
// 通过原子操作获取只读 map
read := m.loadReadOnly()
e, ok := read.m[key]
// 不在只读 map 中,并且 dirty map 包含一些 key 不在 read.m 中。
if !ok && read.amended {
m.mu.Lock()

// double checking
read = m.loadReadOnly()
e, ok = read.m[key]
if !ok && read.amended { // 仍然不在只读 map 中,并且 dirty map 包含一些 key 不在 read.m 中。
e, ok = m.dirty[key] // 从 dirty map 中获取
// 不管条目是否存在,记录一个未命中:这个键将走慢路径,直到脏映射被提升为读映射。
m.missLocked() // read 中读不到
}
m.mu.Unlock()
}
// key 不存在
if !ok {
return nil, false
}
// key 存在,通过原子操作获取值
return e.load()
}

Load 图示

sync_map_9

其实 Load 的过程大概就是前一个图的查找 key 的过程,只不过其中有一步 missLocked(), 这个操作是用来记录 key 未命中的次数的。在达到一定次数之后,会将 dirty map 提升为 read map

missLocked 源码剖析

missLocked 的实现是很简单的,就是将 misses 加 1,如果 misses 达到了 dirty map 的大小, 就会将 dirty map 提升为 read map

1
2
3
4
5
6
7
8
9
10
11
12
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
// 未命中的次数达到 len(m.dirty),将 dirty map 提升为 read map
m.read.Store(&readOnly{m: m.dirty})
// 重置 dirty map
m.dirty = nil
// 重置 misses
m.misses = 0
}

这个过程可以用下图表示:

sync_map_10

Load 工作流程

Load 方法的工作流程如下:

  1. 通过原子操作获取 read map。如果 read map 中存在 key,则直接返回 key 对应的值。
  2. 如果 dirty map 中包含了一些 read map 中不存在的 key,则需要加锁,再次获取 read map
  3. 如果 read map 中不存在 key,则从 dirty map 中获取 key 对应的值(同时调用 missLocked())。否则返回从 read map 中获取到的 key 对应的值。

LoadOrStore 源码剖析

LoadOrStore 方法的作用是从 sync.Map 中读取 key 对应的值,如果不存在则将 keyvalue 存入 sync.Map 中。 其实它跟 Load 方法整体流程上也是差不多的,只不过它在找到 key 的时候,会将 keyvalue 存入 sync.Map 中。 如果没有找到 key,则新增 keydirty map 中。

下面是 LoadOrStore 方法的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// LoadOrStore 返回键的现有值(如果存在)。
// 否则,它存储并返回给定的值。
// 返回值:loaded 表明是否是加载的值,而不是存储的值。actual 是当前存储的值。
func (m *Map) LoadOrStore(key, value any) (actual any, loaded bool) {
// 如果从 read map 中获取到了 key,则不需要加锁。
read := m.loadReadOnly()
if e, ok := read.m[key]; ok { // key 是 expunged 状态的时候,ok 为 false
actual, loaded, ok := e.tryLoadOrStore(value)
if ok { // Load 或者 Store 成功
return actual, loaded
}
}

// 加锁
m.mu.Lock()
// double checking
read = m.loadReadOnly()
if e, ok := read.m[key]; ok {
// key 存在于 read map 中
if e.unexpungeLocked() { // 状态:expunged => nil
// 之前是 expunged 状态,现在变成了 nil 状态。需要在 dirty map 中写入 e。
m.dirty[key] = e
}
// 再次对 entry 执行尝试 Load 或者 Store 新的值的操作
actual, loaded, _ = e.tryLoadOrStore(value)
} else if e, ok := m.dirty[key]; ok {
// key 存在于 dirty map 中
actual, loaded, _ = e.tryLoadOrStore(value)
m.missLocked() // misses++,表示 read map 中没有该 key
} else {
// key 不存在于 read map 和 dirty map 中。
if !read.amended {
// 下面需要往 dirty map 中写入新的 key,所以需要确保 dirty map 被初始化。
m.dirtyLocked()
// dirty map 中现在有一些 read map 中不存在的 key,所以需要将 read map 的 amended 置为 true。
m.read.Store(&readOnly{m: read.m, amended: true})
}
// 写入 dirty map
m.dirty[key] = newEntry(value)
actual, loaded = value, false
}
m.mu.Unlock()

return actual, loaded
}

LoadOrStore 图示

sync_map_11

LoadOrStore 工作流程

  1. keyread map 中找到,尝试在 read mapLoadStore,操作成功则返回。找不到则加锁,然后二次检查(double checking)。
  2. read map 中依然找不到,但是 keydirty map 中找到,尝试在 dirty mapLoadStore,操作成功则返回。(missLocked
  3. key 不存在,往 dirty map 中写入 keyvalue。(如果 dirty mapnil,则先进行初始化),然后read mapamended 修改为 true

tryLoadOrStore 源码剖析

我们发现,在 LoadOrStore 方法中,找到 key 之后,都是调用 tryLoadOrStore 方法来进行 LoadStore 操作的。 它的作用就是在 entry 上尝试 LoadStore 操作,简单来说就是,如果 key 已经存在则 Load,否则 Store(当然,实际上没有这么简单)。

我们先来看看它的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 如果 entry 未被删除,tryLoadOrStore 会自动加载或存储一个值。
// 如果 entry 被删除,tryLoadOrStore 将保持条目不变并返回 ok==false。
//
// 返回值:
// ok:操作是否成功(Load 成功、Store 成功)
// loaded:表示是否是 Load 出来的
// actual:Load 到的值
func (e *entry) tryLoadOrStore(i any) (actual any, loaded, ok bool) {
// 获取 entry 的状态
p := e.p.Load()
// 这个 key 只存在于 read map 中,并且它已经被删除了
if p == expunged {
return nil, false, false
}
// key 是正常状态,Load 成功,返回
if p != nil {
return *p, true, true
}

// p 是 nil,说明 key 不存在,需要 Store
ic := i
for { // 循环直到 Load 或者 Store 成功(类似自旋锁)
// Store 成功
if e.p.CompareAndSwap(nil, &ic) {
return i, false, true
}
// Store 失败,重新获取 entry 的状态
p = e.p.Load()
// 被删除了
if p == expunged {
return nil, false, false
}
// 还没被删除,说明 key 存在
if p != nil {
return *p, true, true
}
}
}

tryLoadOrStore 的逻辑可以用下图表示:

sync_map_12

pnil 的情况下,会有一个 for 循环一直尝试 Load 或者 Store,一旦成功就会返回。

unexpungeLocked 的作用

LoadOrStore 方法中,我们发现,如果 keyread map 中找到,会先调用 unexpungeLocked 方法。 读到这里,可能很多读者对 expungeunexpunge 有点懵逼,不知道它们是干什么的。

简单来说,expunge 就是表明 key 已经被删除了,并且这个 key 只存在于 read map 中(在 dirty map 中不存在)。 而 unexpunge 的作用就是取消 expunge 的效果(因为要往这个 key 写入新的值了),紧接着我们会往 dirty map 中写入这个 key

我们可以结合下图来思考一下:

sync_map_13

注意:实际中 entry 并不是连续存储的。

expunged 状态说明:

  1. p == expungedkey 已被删除,并且 dirty map 不为 nil,并且 dirty 中没有这个 key
  2. p == nilkey 已被删除,并且 dirty mapnil,或 dirty[k] 指向该 entry。(Store)
  3. p != nilkey 正常,返回其值。(Load)

Delete 源码剖析

Delete 方法实际上只是 LoadAndDelete 的 wrapper 函数,所以我们看 LoadAndDelete 就够了。 删除操作在 sync.Map 中是一个很简单的操作,如果在 read map 中找到了要删除的 key, 那么我们只需要将其设置为 nil 就可以了。虽然它是一个写操作,但是依然不需要加锁。

如果在 read map 中找到了 key,则可以不加锁也把它删除。因为 sync.Map 中的删除只是一个标记。

例外的情况是,它在 read map 中找不到,然后就需要加锁,然后做 double checking,然后再去 dirty map 中查找了。

LoadAndDelete 的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// LoadAndDelete 删除键的值,返回以前的值(如果有)。
// loaded 报告 key 是否存在。
func (m *Map) LoadAndDelete(key any) (value any, loaded bool) {
// 获取 read map
read := m.loadReadOnly()
// 从 read map 查找 key
e, ok := read.m[key]
if !ok && read.amended { // read map 找不到那个 key,需要继续从 dirty map 中查找
m.mu.Lock() // 加锁
read = m.loadReadOnly() // double checking
e, ok = read.m[key]
if !ok && read.amended { // 需要继续从 dirty map 中查找
e, ok = m.dirty[key] // 从 dirty map 中删除 key
delete(m.dirty, key) // 直接做删除 key 的操作
// 累加未命中 read map 的次数
m.missLocked()
}
m.mu.Unlock()
}
if ok { // key 存在,做删除操作(设置 entry 为 nil 状态)
return e.delete()
}
// key 找不到,不需要做删除操作
return nil, false
}

删除的操作会有两种情况:

  • 存在于 read map 中,则直接删除。(设置 entry 指针为 nil,但是不会删除 read map 中的 key
  • 只存在于 dirty map 中,则直接删除。这种情况下,会删除 dirty map 中的 key

LoadAndDelete 图示

sync_map_14

LoadAndDelete 工作流程

  1. read map 中查找 key,如果找到了,那么直接删除 key(将 entry 的指针设置为 nil),并返回 value
  2. 如果 read map 中没有找到 key,并且 read.amendedtrue,那么就需要加锁,然后做 double checking
  3. 加锁后在 read map 依然找不到,并且 read.amendedtrue,那么就需要从 dirty map 中查找 key
  4. 同时在临界区内直接执行 delete 操作,将 keydirty map 中删除。同时累加 misses 次数。
  5. 最后,如果找到了 key 对应的 entry,则将其删除(设置 entry 指针为 nil),并返回 value

Range 源码剖析

Range 方法的作用是遍历 sync.Map 中的所有 keyvalue,它接受一个函数作为参数,如果这个函数返回 false,那么就会停止遍历。

Range 的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// Range 依次为映射中存在的每个键和值调用 f。 如果 f 返回 false,则 range 停止迭代。
func (m *Map) Range(f func(key, value any) bool) {
// 我们需要能够遍历在调用 Range 开始时已经存在的所有键。
read := m.loadReadOnly()
if read.amended {
// dirty map 中包含了 read map 中没有的 key
m.mu.Lock()
read = m.loadReadOnly()
if read.amended {
// 使用 m.dirty 中的数据覆盖 m.read 中的数据
read = readOnly{m: m.dirty}
m.read.Store(&read)
// 重置 dirty map
m.dirty = nil
// 重置 misses
m.misses = 0
}
m.mu.Unlock()
}

// 所有的 key 都在 read map 中了,遍历 read map 即可
for k, e := range read.m {
v, ok := e.load()
if !ok { // 已经被删除
continue
}
if !f(k, v) { // f 可以返回一个 bool 值,如果返回 false,那么就停止遍历
break
}
}
}

Range 图示

sync_map_15

Range 遍历的时候,只会遍历 read map 中的 key。如果 read.amendedtrue,那么就需要加锁,然后做 double checking, 如果二次检查 read.amended 还是 true,那么就需要将 dirty map 中的数据覆盖到 read map 中。

Range 工作流程

  1. 为了保证能遍历 sync.Map 中所有的 key,需要判断 read.amended 是否为 true
  2. 如果为 true,说明只有 dirty map 中包含了所有的 key,那么就需要将 dirty map 转换为 read map。(这样的好处是,可以在遍历过程中,不需要加锁)
  3. 然后开始遍历,遍历的时候只需要遍历 read map 即可,因为这个时候 read map 中包含了所有的 key
  4. 遍历过程中,如果发现 key 已经被删除,则直接跳过。否则将 keyvalue 传递给 f 函数,如果 f 函数返回 false,那么就停止遍历。

CompareAndSwap 源码剖析

CompareAndSwap 方法的作用是比较 key 对应的 value 是否为 old,如果是,则将 key 对应的 value 设置为 new

CompareAndSwap 的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 如果映射中存储的值等于旧值,则 CompareAndSwap 会交换 key 的旧值和新值
// 旧值必须是可比较的类型。
func (m *Map) CompareAndSwap(key, old, new any) bool {
// 获取 read map
read := m.loadReadOnly()
// 从 read map 读取 key 对应的 value
if e, ok := read.m[key]; ok {
// 在 read map 中找到了,进行 CAS 操作
return e.tryCompareAndSwap(old, new)
} else if !read.amended {
// 在 dirty map 也没有,返回 false
return false
}

// 加锁
m.mu.Lock()
defer m.mu.Unlock()
read = m.loadReadOnly()
swapped := false
if e, ok := read.m[key]; ok { // double checking
// 在 read map 中找到了,进行 CAS 操作
swapped = e.tryCompareAndSwap(old, new)
} else if e, ok := m.dirty[key]; ok {
// 在 dirty map 中找到了,进行 CAS 操作
swapped = e.tryCompareAndSwap(old, new)
// 累加 misses 次数
m.missLocked()
}
return swapped
}

CompareAndSwap 图示

sync_map_16

其实到这里,我们应该发现了,其实 sync.Map 的大多数方法的实现都是先从 read map 中读取,如果没有找到,那么就从 dirty map 中读取。 只是从 read map 中读取的时候,需要加锁,然后做 double checking

CompareAndSwap 工作流程

  1. 首先从 read map 中读取 key 对应的 value。如果找到则进行 CAS 操作,如果没有找到,那么就需要加锁,然后做 double checking
  2. 如果还是没找到。则从 dirty map 中查找,找到则做 CAS 操作,然后累加 misses 次数。
  3. 如果还是没找到,那么就返回 false

CompareAndDelete 源码剖析

CompareAndDelete 方法的作用是比较 key 对应的 value 是否为 old,如果是,则将 key 对应的 value 删除。

CompareAndDelete 的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 如果 key 的值等于 old,CompareAndDelete 会删除它的条目。
// 旧值必须是可比较的类型。
//
// 如果 map 中的 key 的值不等于 old,则 CompareAndDelete 返回 false(即使旧值是 nil 接口值)。
func (m *Map) CompareAndDelete(key, old any) (deleted bool) {
// 获取 read map
read := m.loadReadOnly()
e, ok := read.m[key]
// read map 中不存在这个 key,并且 dirty map 中包含了一些 read map 中没有的 key
if !ok && read.amended {
// 加锁
m.mu.Lock()
read = m.loadReadOnly()
e, ok = read.m[key]
// double checking
if !ok && read.amended { // dirty map 中包含 read map 中不存在的 key
e, ok = m.dirty[key]
// 累加 misses 次数
m.missLocked()
}
m.mu.Unlock()
}

// 如果 key 存在,并且其值等于 old,则将其删除。
for ok {
p := e.p.Load()
// 已经被删除,或者值不等于 old,返回 false,表示删除失败
if p == nil || p == expunged || *p != old {
return false
}
// 将其删除(本质上是一个 CAS 操作,将其状态修改为了 nil)
if e.p.CompareAndSwap(p, nil) {
return true
}
}
// key 找不到,返回 false
return false
}

CompareAndDelete 图示

sync_map_17

CompareAndDelete 工作流程

  1. 首先从 read map 中读取 key 对应的 value。如果找到则进行 CAS 操作,如果没有找到,那么就需要加锁,然后做 double checking
  2. 如果还是没找到。并且 dirty map 中包含了部分 read map 中不存在的 key,则从 dirty map 中查找,找到则做 CAS 操作,然后累加 misses 次数。
  3. 如果找到了 key,会通过原子操作读取其之前的值。如果发现它已经被删除或者旧值不等于 old,则返回 false。否则通过 CAS 操作将其删除,然后返回 true
  4. 如果没有找到 key,则返回 false

entry 的一些说明

entry 这个结构体是 sync.Map 中实际保存值的结构体,它保存了指向了 key 对应值的指针。

在上面阅读代码的过程中,我们发现,entry 中有很多方法使用了 try 前缀,比如 trySwap, tryLoadOrStore 等。对于这类方法,我们需要知道的是:

  1. 它并不保证操作一定成功,因为一些写操作是不需要持有互斥锁就可以进行的(比如删除操作,只是一个原子操作,将 entry 指向了 nil)。
  2. 这类方法里面,有一个 for 循环,来进行多次尝试,直到操作成功,又或者发现 entry 已经被删除的时候就返回。类似自旋锁。
  3. 这类方法里面对 entry 状态的修改是通过 CAS 操作来实现的。

sync.Map 源码总结

一顿源码看下来,我们不难发现,sync.Map 的大部分方法整体处理流程上是非常相似的,都是先从 read map 中读取,如果没有找到,那么就需要加锁,然后做 double checking。如果还是没找到,那么就从 dirty map 中查找,如果还是没找到,那么就返回 false

这样做的目的都是在尽量地减少锁的占用,从而获得更好的性能。

同时,如果在 dirty map 中查找的次数多了,会触发 dirty map 转换为 read map 的操作流程,这样一来,下一次搜索同样的 key 就不再需要加锁了。

最后一个关键的点是,在 sync.Map 中没有被锁保护的地方,都是通过原子操作来实现的,这样一来,就可以保证在多核 CPU 上的并发安全。

总结

  • sync.Map 中的 key 有两份,一份在 read map 中,一份在 dirty map 中。read map 中的 key 是不可变的,而 dirty map 中的 key 是可变的。
  • sync.Map 中的大多数操作的操作流程如下:
    • 首先从 read map 中读取 key 对应的 value。找到则做相应操作。
    • 如果没找到,则加锁,再做一次 double checking。找到则做相应操作。
    • 如果还是没找到,那么就从 dirty map 中查找,找到则做相应操作。
    • dirty map 找到的时候,需要累加 misses 次数,如果 misses 次数超过了 dirty map 的大小,那么就会触发 dirty map 转换为 read map 的操作流程。
  • sync.Map 中的 read mapdirty map 中相同的 key 指向了同一个 value(是一个 entry 结构体实例)。
  • entry 有三种状态:
    • nil: 表示 key 已被删除。
    • expunged: 表示 key 已被删除,并且 dirty map 中没有这个 key,这个 key 只存在于 read map 中。
    • *v: 表示一个指向具体值的指针,是正常状态。
  • sync.Map 中的大部分方法都是通过原子操作来实现的,这样一来,就可以保证在多核 CPU 上的并发安全。就算没有在锁保护的临界区内,这种操作依然可以保证对 map 的操作不会出现错乱的情况。
  • read map 中有一个字段标识了是否 dirty map 中存在部分 read map 中不存在的 key。这样一来,如果在 read map 中找不到 key 的时候,就可以先判断一下 read.amended 是否为 true,如果是 true,才需要进行加锁,然后再去 dirty map 中查找。这样一来,就可以减少加锁的次数,从而获得更好的性能。
  • dirty mapread map 之间是会相互转换:
    • dirty map 中查找 key 的次数超过了 dirty map 的大小,就会触发 dirty map 转换为 read map 的操作流程。
    • 需要写入新的 key 的时候,如果 dirty mapnil,那么会将 read map 中未删除的 key 写入到一个新创建的 dirty map 中。
  • sync.Map 性能更好的原因:尽量减少了加锁的次数,很多地方使用原子操作来保证并发安全。(如果我们的业务场景是写多读少,那么这一点可能就不成立了。)

我们知道,go 里面提供了 map 这种类型让我们可以存储键值对数据,但是如果我们在并发的情况下使用 map 的话,就会发现它是不支持并发地进行读写的(会报错)。 在这种情况下,我们可以使用 sync.Mutex 来保证并发安全,但是这样会导致我们在读写的时候,都需要加锁,这样就会导致性能的下降。 除了使用互斥锁这种相对低效的方式,我们还可以使用 sync.Map 来保证并发安全,它在某些场景下有比使用 sync.Mutex 更高的性能。 本文就来探讨一下 sync.Map 中的一些大家比较感兴趣的问题,比如为什么有了 map 还要 sync.Map?它为什么快?sync.Map 的适用场景(注意:不是所有情况下都快。)等。

关于 sync.Map 的设计与实现原理,会在下一篇中再做讲解。

map 在并发下的问题

如果我们看过 map 的源码,就会发现其中有不少会引起 fatal 错误的地方,比如 mapaccess1(从 map 中读取 key 的函数)里面,如果发现正在写 map,则会有 fatal 错误。 (如果还没看过,可以跟着这篇 《go map 设计与实现》 看一下)

1
2
3
if h.flags&hashWriting != 0 {
fatal("concurrent map read and map write")
}

map 并发读写异常的例子

下面是一个实际使用中的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
var m = make(map[int]int)

// 往 map 写 key 的协程
go func() {
// 往 map 写入数据
for i := 0; i < 10000; i++ {
m[i] = i
}
}()

// 从 map 读取 key 的协程
go func() {
// 从 map 读取数据
for i := 10000; i > 0; i-- {
_ = m[i]
}
}()

// 等待两个协程执行完毕
time.Sleep(time.Second)

这会导致报错:

1
fatal error: concurrent map read and map write

这是因为我们同时对 map 进行读写,而 map 不支持并发读写,所以会报错。如果 map 允许并发读写,那么可能在我们使用的时候会有很多错乱的情况出现。 (具体如何错乱,我们可以对比多线程的场景思考一下,本文不展开了)。

使用 sync.Mutex 保证并发安全

对于 map 并发读写报错的问题,其中一种解决方案就是使用 sync.Mutex 来保证并发安全, 但是这样会导致我们在读写的时候,都需要加锁,这样就会导致性能的下降。

使用 sync.Mutex 来保证并发安全,上面的代码可以改成下面这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
var m = make(map[int]int)
// 互斥锁
var mu sync.Mutex

// 写 map 的协程
go func() {
for i := 0; i < 10000; i++ {
mu.Lock() // 写 map,加互斥锁
m[i] = i
mu.Unlock()
}
}()

// 读 map 的协程序
go func() {
for i := 10000; i > 0; i-- {
mu.Lock() // 读 map,加互斥锁
_ = m[i]
mu.Unlock()
}
}()

time.Sleep(time.Second)

这样就不会报错了,但是性能会有所下降,因为我们在读写的时候都需要加锁。(如果需要更高性能,可以继续读下去,不要急着使用 sync.Mutex

sync.Mutex 的常见的用法是在结构体中嵌入 sync.Mutex,而不是定义独立的两个变量。

使用 sync.RWMutex 保证并发安全

在上一小节中,我们使用了 sync.Mutex 来保证并发安全,但是在读和写的时候我们都需要加互斥锁。 这就意味着,就算多个协程进行并发读,也需要等待锁。 但是互斥锁的粒度太大了,但实际上,并发读是没有什么太大问题的,应该被允许才对,如果我们允许并发读,那么就可以提高性能

当然 go 的开发者也考虑到了这一点,所以在 sync 包中提供了 sync.RWMutex,这个锁可以允许进行并发读,但是写的时候还是需要等待锁。 也就是说,一个协程在持有写锁的时候,其他协程是既不能读也不能写的,只能等待写锁释放才能进行读写

使用 sync.RWMutex 来保证并发安全,我们可以改成下面这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
var m = make(map[int]int)
// 读写锁(允许并发读,写的时候是互斥的)
var mu sync.RWMutex

// 写入 map 的协程
go func() {
for i := 0; i < 10000; i++ {
// 写入的时候需要加锁
mu.Lock()
m[i] = i
mu.Unlock()
}
}()

// 读取 map 的协程
go func() {
for i := 10000; i > 0; i-- {
// 读取的时候需要加锁,但是这个锁是读锁
// 多个协程可以同时使用 RLock 而不需要等待
mu.RLock()
_ = m[i]
mu.RUnlock()
}
}()

// 另外一个读取 map 的协程
go func() {
for i := 20000; i > 10000; i-- {
// 读取的时候需要加锁,但是这个锁是读锁
// 多个协程可以同时使用 RLock 而不需要等待
mu.RLock()
_ = m[i]
mu.RUnlock()
}
}()

time.Sleep(time.Second)

这样就不会报错了,而且性能也提高了,因为我们在读的时候,不需要等待锁。

说明:

  • 多个协程可以同时使用 RLock 而不需要等待,这是读锁。
  • 只有一个协程可以使用 Lock,这是写锁,有写锁的时候,其他协程不能读也不能写。
  • 持有写锁的协程,可以使用 Unlock 来释放锁。
  • 写锁释放之后,其他协程才能获取到锁(读锁或者写锁)。

也就是说,使用 sync.RWMutex 的时候,读操作是可以并发执行的,但是写操作是互斥的。 这样一来,相比 sync.Mutex 来说等待锁的次数就少了,自然也就能获得更好的性能了。

gin 框架里面就使用了 sync.RWMutex 来保证 Keys 读写操作的并发安全。

有了读写锁为什么还要有 sync.Map?

通过上面的内容,我们知道了,有下面两种方式可以保证并发安全:

  • 使用 sync.Mutex,但是这样的话,读写都是互斥的,性能不好。
  • 使用 sync.RWMutex,可以并发读,但是写的时候是互斥的,性能相对 sync.Mutex 要好一些。

但是就算我们使用了 sync.RWMutex,也还是有一些锁的开销。那么我们能不能再优化一下呢?答案是可以的。那就是使用 sync.Map

sync.Map 在锁的基础上做了进一步优化,在一些场景下使用原子操作来保证并发安全,性能更好。

使用原子操作替代读锁

但是就算使用 sync.RWMutex,读操作依然还有锁的开销,那么有没有更好的方式呢? 答案是有的,就是使用原子操作来替代读锁。

举一个很常见的例子就是多个协程同时读取一个变量,然后对这个变量进行累加操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
var a int32

var wg sync.WaitGroup
wg.Add(2)

go func() {
for i := 0; i < 10000; i++ {
a++
}
wg.Done()
}()

go func() {
for i := 0; i < 10000; i++ {
a++
}
wg.Done()
}()

wg.Wait()

// a 期望结果应该是 20000才对。
fmt.Println(a) // 实际:17089,而且每次都不一样

这个例子中,我们期望的结果是 a 的值是 20000,但是实际上,每次运行的结果都不一样,而且都不会等于 20000。 其中很简单粗暴的一种解决方法是加锁,但是这样的话,性能就不好了,但是我们可以使用原子操作来解决这个问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
var a atomic.Int32

var wg sync.WaitGroup
wg.Add(2)

go func() {
for i := 0; i < 10000; i++ {
a.Add(1)
}
wg.Done()
}()

go func() {
for i := 0; i < 10000; i++ {
a.Add(1)
}
wg.Done()
}()

wg.Wait()

fmt.Println(a.Load()) // 20000

锁跟原子操作的性能差多少?

我们来看一下,使用锁和原子操作的性能差多少:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func BenchmarkMutexAdd(b *testing.B) {
var a int32
var mu sync.Mutex

for i := 0; i < b.N; i++ {
mu.Lock()
a++
mu.Unlock()
}
}

func BenchmarkAtomicAdd(b *testing.B) {
var a atomic.Int32
for i := 0; i < b.N; i++ {
a.Add(1)
}
}

结果:

1
2
BenchmarkMutexAdd-12     	100000000	        10.07 ns/op
BenchmarkAtomicAdd-12 205196968 5.847 ns/op

我们可以看到,使用原子操作的性能比使用锁的性能要好一些。

也许我们会觉得上面这个例子是写操作,那么读操作呢?我们来看一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func BenchmarkMutex(b *testing.B) {
var mu sync.RWMutex

for i := 0; i < b.N; i++ {
mu.RLock()
mu.RUnlock()
}
}

func BenchmarkAtomic(b *testing.B) {
var a atomic.Int32
for i := 0; i < b.N; i++ {
_ = a.Load()
}
}

结果:

1
2
BenchmarkMutex-12     	100000000	        10.12 ns/op
BenchmarkAtomic-12 1000000000 0.3133 ns/op

我们可以看到,使用原子操作的性能比使用锁的性能要好很多。而且在 BenchmarkMutex 里面甚至还没有做读取数据的操作。

sync.Map 里面的原子操作

sync.Map 里面相比 sync.RWMutex,性能更好的原因就是使用了原子操作。 在我们从 sync.Map 里面读取数据的时候,会先使用一个原子 Load 操作来读取 sync.Map 里面的 key(从 read 中读取)。 注意:这里拿到的是 key 的一份快照,我们对其进行读操作的时候也可以同时往 sync.Map 中写入新的 key,这是保证它高性能的一个很关键的设计(类似读写分离)。

sync.Map 里面的 Load 方法里面就包含了上述的流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Load 方法从 sync.Map 里面读取数据。
func (m *Map) Load(key any) (value any, ok bool) {
// 先从只读 map 里面读取数据。
// 这一步是不需要锁的,只有一个原子操作。
read := m.loadReadOnly()
e, ok := read.m[key]
if !ok && read.amended { // 如果没有找到,并且 dirty 里面有一些 read 中没有的 key,那么就需要从 dirty 里面读取数据。
// 这里才需要锁
m.mu.Lock()
read = m.loadReadOnly()
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
m.missLocked()
}
m.mu.Unlock()
}

// key 不存在
if !ok {
return nil, false
}
// 使用原子操作读取
return e.Load()
}

上面的代码我们可能还看不懂,但是没关系,这里我们只需要知道的是,从 sync.Map 读取数据的时候,会先做原子操作,如果没找到,再进行加锁操作,这样就减少了使用锁的频率了,自然也就可以获得更好的性能(但要注意的是并不是所有情况下都能获得更好的性能)。至于具体实现,在下一篇文章中会进行更加详细的分析。

也就是说,sync.Map 之所以更快,是因为相比 RWMutex,进一步减少了锁的使用,而这也就是 sync.Map 存在的原因了

sync.Map 的基本用法

现在我们知道了,sync.Map 里面是利用了原子操作来减少锁的使用。但是我们好像连 sync.Map 的一些基本操作都还不了解,现在就让我们再来看看 sync.Map 的基本用法。

sync.Map 的使用还是挺简单的,map 中有的操作,在 sync.Map 都有,只不过区别是,在 sync.Map 中,所有的操作都需要通过调用其方法来进行。 sync.Map 里面几个常用的方法有(CRUD):

  • Store:我们新增或者修改数据的时候,都可以使用 Store 方法。
  • Load:读取数据的方法。
  • Range:遍历数据的方法。
  • Delete:删除数据的方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
var m sync.Map

// 写入/修改
m.Store("foo", 1)

// 读取
fmt.Println(m.Load("foo")) // 1 true

// 遍历
m.Range(func(key, value interface{}) bool {
fmt.Println(key, value) // foo 1
return true
})

// 删除
m.Delete("foo")
fmt.Println(m.Load("foo")) // nil false

注意:在 sync.Map 中,keyvalue 都是 interface{} 类型的,也就是说,我们可以使用任意类型的 keyvalue。 而不像 map,只能存在一种类型的 keyvalue。从这个角度来看,它的类型类似于 map[any]any

另外一个需要注意的是,Range 方法的参数是一个函数,这个函数如果返回 false,那么遍历就会停止。

sync.Map 的使用场景

sync.Map 源码中,已经告诉了我们 sync.Map 的使用场景:

1
2
3
4
5
The Map type is optimized for two common use cases: (1) when the entry for a given
key is only ever written once but read many times, as in caches that only grow,
or (2) when multiple goroutines read, write, and overwrite entries for disjoint
sets of keys. In these two cases, use of a Map may significantly reduce lock
contention compared to a Go map paired with a separate Mutex or RWMutex.

翻译过来就是,Map 类型针对两种常见用例进行了优化:

  • 当给定 key 的条目只写入一次但读取多次时,如在只会增长的缓存中。(读多写少)
  • 当多个 goroutine 读取、写入和覆盖不相交的键集的条目。(不同 goroutine 操作不同的 key)

在这两种情况下,与 Go map 与单独的 MutexRWMutex 配对相比,使用 sync.Map 可以显著减少锁竞争(很多时候只需要原子操作就可以)。

总结

  • 普通的 map 不支持并发读写。
  • 有以下两种方式可以实现 map 的并发读写:
    • 使用 sync.Mutex 互斥锁。读和写的时候都使用互斥锁,性能相比 sync.RWMutex 会差一些。
    • 使用 sync.RWMutex 读写锁。读的锁是可以共享的,但是写锁是独占的。性能相比 sync.Mutex 会好一些。
  • sync.Map 里面会先进行原子操作来读取 key,如果读取不到的时候,才会需要加锁。所以性能相比 sync.Mutexsync.RWMutex 会好一些。
  • sync.Map 里面几个常用的方法有(CRUD):
    • Store:我们新增或者修改数据的时候,都可以使用 Store 方法。
    • Load:读取数据的方法。
    • Range:遍历数据的方法。
    • Delete:删除数据的方法。
  • sync.Map 的使用场景,sync.Map 针对以下两种场景做了优化:
    • key 只会写入一次,但是会被读取多次的场景。
    • 多个 goroutine 读取、写入和覆盖不相交的键集的条目。
0%