0%

在 go 的标准库中,提供了 sync.Cond 这个并发原语,让我们可以实现多个 goroutine 等待某一条件满足之后再继续执行。 它需要配合 sync.Mutex 一起使用,因为 CondWait 方法需要在 Mutex 的保护下才能正常工作。 对于条件变量,可能大多数人只是知道它的存在,但是用到它的估计寥寥无几,因为很多并发场景的处理都能使用 chan 来实现, 而且 chan 的使用也更加简单。 但是在某些场景下,Cond 可能是最好的选择,本文就来探讨一下 Cond 的使用场景,基本用法,以及它的实现原理。

sync.Cond 是什么?

sync.Cond 表示的是条件变量,它是一种同步机制,用来协调多个 goroutine 之间的同步,当共享资源的状态发生变化的时候, 可以通过条件变量来通知所有等待的 goroutine 去重新获取共享资源。

适用场景

在实际使用中,我们可能会有多个 goroutine 在执行的过程中,由于某一条件不满足而阻塞的情况。 这个时候,我们就可以使用条件变量来实现 goroutine 之间的同步。比如,我们有一个 goroutine 用来获取数据, 但是可能会比较耗时,这个时候,我们就可以使用条件变量来实现 goroutine 之间的同步, 当数据准备好之后,就可以通过条件变量来通知所有等待的 goroutine 去重新获取共享资源。

sync.Cond 条件变量用来协调想要访问共享资源的那些 goroutine,当共享资源的状态发生变化的时候, 它可以用来通知所有等待的 goroutine 去重新获取共享资源。

sync.Cond 的基本用法

sync.Cond 的基本用法非常简单,我们只需要通过 sync.NewCond 方法来创建一个 Cond 实例, 然后通过 Wait 方法来等待条件满足,通过 Signal 或者 Broadcast 方法来通知所有等待的 goroutine 去重新获取共享资源。

NewCond 创建实例

sync.NewCond 方法用来创建一个 Cond 实例,它的参数是一个 Locker 接口,我们可以传入一个 Mutex 或者 RWMutex 实例。 这个条件变量的 Locker 接口就是用来保护共享资源的。

Wait 等待条件满足

Wait 方法用来等待条件满足,它会先释放 Cond 的锁(Cond.L),然后阻塞当前 goroutine(实际调用的是 goparkunlock),直到被 Signal 或者 Broadcast 唤醒。

它做了如下几件事情:

  1. 释放 Cond 的锁(Cond.L),然后阻塞当前 goroutine。(所以,使用之前需要先锁定)
  2. Signal 或者 Broadcast 唤醒之后,会重新获取 Cond 的锁(Cond.L)。
  3. 之后,就返回到 goroutine 阻塞的地方继续执行。

Signal 通知一个等待的 goroutine

Signal 方法用来通知一个等待的 goroutine,它会唤醒一个等待的 goroutine,然后继续执行当前 goroutine。 如果没有等待的 goroutine,则不会有任何操作。

Broadcast 通知所有等待的 goroutine

Broadcast 方法用来通知所有等待的 goroutine,它会唤醒所有等待的 goroutine,然后继续执行当前 goroutine。 如果没有等待的 goroutine,则不会有任何操作。

sync.Cond 使用实例

下面我们通过一个实例来看一下 sync.Cond 的使用方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package cond

import (
"fmt"
"sync"
"testing"
"time"
)

var done bool
var data string

func write(c *sync.Cond) {
fmt.Println("writing.")
// 让 reader 先获取锁,模拟条件不满足然后 wait 的情况
time.Sleep(time.Millisecond * 10)
c.L.Lock()
// 模拟耗时的写操作
time.Sleep(time.Millisecond * 50)
data = "hello world"
done = true
fmt.Println("writing done.")
c.L.Unlock()
c.Broadcast()
}

func read(c *sync.Cond) {
fmt.Println("reading")
c.L.Lock()
for !done {
fmt.Println("reader wait.")
c.Wait()
}
fmt.Println("read done.")
fmt.Println("data:", data)
defer c.L.Unlock()
}

func TestCond(t *testing.T) {
var c = sync.NewCond(&sync.Mutex{})

go read(c) // 读操作
go read(c) // 读操作
go write(c) // 写操作

time.Sleep(time.Millisecond * 100) // 等待操作完成
}

输出:

1
2
3
4
5
6
7
8
9
10
reading
reader wait. // 还没获取完数据,需要等待
writing.
reading
reader wait.
writing done. // 获取完数据了,通知所有等待的 reader
read done. // 读取到数据了
data: hello world // 输出读取到的数据
read done.
data: hello world

这个例子可以粗略地用下图来表示:

cond_1

说明:

  • read1reader2 表示两个 goroutine,它们都会调用 read 函数。
  • donefalse 的时候,reader1reader2 都会调用 c.Wait() 函数,然后阻塞等待。
  • write 表示一个 goroutine,它会调用 write 函数。
  • write 函数中,获取完数据之后,会将 done 设置为 true,然后调用 c.Broadcast() 函数,通知所有等待的 reader 去重新获取共享资源。
  • reader1reader2 在解除阻塞状态后,都会重新获取共享资源,然后输出读取到的数据。

在这个例子中,done 的功能是标记,用来表示共享资源是否已经获取完毕,如果没有获取完毕,那么 reader 就会阻塞等待。

为什么要用 sync.Cond?

在文章开头,我们说了,很多并发编程的问题都可以通过 channel 来解决。 同样的,在上面提到的 sync.Cond 的使用场景,使用 channel 也是可以实现的, 我们只要 close(ch) 来关闭 channel 就可以实现通知多个等待的协程了。

那么为什么还要用 sync.Cond 呢? 主要原因是,sync.Cond 可以重复地进行 Wait()Signal()Broadcast() 操作, 但是,如果想通过关闭 chan 来实现这个功能的话,那就只能通知一次了。 因为 channel 只能关闭一次,关闭一个已经关闭的 channel 会导致程序 panic。

使用 channel 的另外一种方式是,记录 reader 的数量,然后通过往 channel 中发送多次数据来实现通知多个 reader。 但是这样一来代码就会复杂很多,从另一个角度说,出错的概率大了很多。

close channel 广播实例

下面的例子模拟了使用 close(chan) 来实现 sync.Cond 中那种广播功能,但是只能通知一次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package close_chan

import (
"fmt"
"testing"
"time"
)

var data string

func read(c <-chan struct{}) {
fmt.Println("reading.")

// 从 chan 接收数据,如果 chan 中没有数据,会阻塞。
// 如果能接收到数据,或者 chan 被关闭,会解除阻塞状态。
<-c

fmt.Println("data:", data)
}

func write(c chan struct{}) {
fmt.Println("writing.")
// 模拟耗时的写操作
time.Sleep(time.Millisecond * 10)
data = "hello world"
fmt.Println("write done.")

// 关闭 chan 的时候,会通知所有的 reader
// 所有等待从 chan 接收数据的 goroutine 都会被唤醒
close(c)
}

func TestCloseChan(t *testing.T) {
ch := make(chan struct{})

go read(ch)
go read(ch)
go write(ch)

// 不能关闭已经关闭的 chan
time.Sleep(time.Millisecond * 20)
// panic: close of closed channel
// 下面这行代码会导致 panic
//go write(ch)

time.Sleep(time.Millisecond * 100)
}

输出:

1
2
3
4
5
6
writing.
reading. // 会阻塞直到写完
reading. // 会阻塞直到写完
write done. // 写完之后,才能读
data: hello world
data: hello world

上面例子的 write 不能多次调用,否则会导致 panic。

sync.Cond 基本原理

go 的 sync.Cond 中维护了一个链表,这个链表记录了所有阻塞的 goroutine,也就是由于调用了 Wait 而阻塞的 goroutine。 而 SignalBroadcast 方法就是用来唤醒这个链表中的 goroutine 的。 Signal 方法只会唤醒链表中的第一个 goroutine,而 Broadcast 方法会唤醒链表中的所有 goroutine

下图是 Signal 方法的效果,可以看到,Signal 方法只会唤醒链表中的第一个 goroutine

cond_2

说明:

  • notifyListsync.Cond 中维护的一个链表,这个链表记录了所有阻塞的 goroutine
  • head 是链表的头节点,tail 是链表的尾节点。
  • Signal 方法只会唤醒链表中的第一个 goroutine

Broadcast 方法会唤醒 notifyList 中的所有 goroutine

sync.Cond 的设计与实现

最后,我们来看一下 sync.Cond 的设计与实现。

sync.Cond 模型

sync.Cond 的模型如下所示:

1
2
3
4
5
6
7
8
9
type Cond struct {
noCopy noCopy

// L is held while observing or changing the condition
L Locker // L 在观察或改变条件时被持有

notify notifyList
checker copyChecker
}

属性说明:

  • noCopy 是一个空结构体,用来检查 sync.Cond 是否被复制。(在编译前通过 go vet 命令来检查)
  • L 是一个 Locker 接口,用来保护条件变量。
  • notify 是一个 notifyList 类型,用来记录所有阻塞的 goroutine
  • checker 是一个 copyChecker 类型,用来检查 sync.Cond 是否被复制。(如果在运行时被复制,会导致 panic

notifyList 结构体

notifyListsync.Cond 中维护的一个链表,这个链表记录了所有因为共享资源还没准备好而阻塞的 goroutine。它的定义如下所示:

1
2
3
4
5
6
7
8
9
type notifyList struct {
wait atomic.Uint32
notify uint32

// 阻塞的 waiter 名单。
lock mutex // 锁
head *sudog // 阻塞的 goroutine 链表(链表头)
tail *sudog // 阻塞的 goroutine 链表(链表尾)
}

属性说明:

  • wait 是下一个 waiter 的编号。它在锁外自动递增。
  • notify 是下一个要通知的 waiter 的编号。它可以在锁外读取,但只能在持有锁的情况下写入。
  • lock 是一个 mutex 类型,用来保护 notifyList
  • head 是一个 sudog 类型,用来记录阻塞的 goroutine 链表的头节点。
  • tail 是一个 sudog 类型,用来记录阻塞的 goroutine 链表的尾节点。

notifyList 的方法说明:

notifyList 中包含了几个操作阻塞的 goroutine 链表的方法。

  • notifyListAdd 方法将 waiter 的编号加 1。
  • notifyListWait 方法将当前的 goroutine 加入到 notifyList 中。(也就是将当前协程挂起)
  • notifyListNotifyOne 方法将 notifyList 中的第一个 goroutine 唤醒。
  • notifyListNotifyAll 方法将 notifyList 中的所有 goroutine 唤醒。
  • notifyListCheck 方法检查 notifyList 的大小是否正确。

sync.Cond 的方法

notifyList 就不细说了,本文重点讲解一下 sync.Cond 的实现。

Wait 方法

Wait 方法用在当条件不满足的时候,将当前运行的协程挂起。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (c *Cond) Wait() {
// 检查是否被复制
c.checker.check()
// 更新 notifyList 中需要等待的 waiter 的数量
// 返回当前需要插入 notifyList 的编号
t := runtime_notifyListAdd(&c.notify)
// 解锁
c.L.Unlock()
// 挂起当前 g,直到被唤醒
runtime_notifyListWait(&c.notify, t)
// 唤醒之后,重新加锁。
// 因为阻塞之前解锁了。
c.L.Lock()
}

对于 Wait 方法,我们需要注意的是,使用之前,我们需要先调用 L.Lock() 方法加锁,然后再调用 Wait 方法,否则会报错。

文档里面的例子:

1
2
3
4
5
6
7
c.L.Lock()
for !condition() {
c.Wait()
}
// ...使用条件...
// 这里是我们在条件满足之后,需要执行的代码。
c.L.Unlock()

好了,问题来了,调用 Wait 方法之前为什么要先加锁呢?

这是因为在我们使用共享资源的时候,可能一些代码是互斥的,所以我们需要加锁。 这样我们就可以保证在我们使用共享资源的时候,不会被其他协程修改。 但是如果因为条件不满足,我们需要等待的话,我们不可能在持有锁的情况下等待, 因为在修改条件的时候,可能也需要加锁,这样就会造成死锁。

另外一个问题是,为什么要使用 for 来检查条件是否满足,而不是使用 if 呢?

这是因为在我们调用 Wait 方法之后,可能会有其他协程唤醒我们,但是条件并没有满足, 这个时候依然是需要继续 Wait 的。

Signal 方法

Signal 方法用在当条件满足的时候,将 notifyList 中的第一个 goroutine 唤醒。

1
2
3
4
5
6
func (c *Cond) Signal() {
// 检查 sync.Cond 是否被复制了
c.checker.check()
// 唤醒 notifyList 中的第一个 goroutine
runtime_notifyListNotifyOne(&c.notify)
}

Broadcast 方法

Broadcast 方法用在当条件满足的时候,将 notifyList 中的所有 goroutine 唤醒。

1
2
3
4
5
6
func (c *Cond) Broadcast() {
// 检查 sync.Cond 是否被复制了
c.checker.check()
// 唤醒 notifyList 中的所有 goroutine
runtime_notifyListNotifyAll(&c.notify)
}

copyChecker 结构体

copyChecker 结构体用来检查 sync.Cond 是否被复制。它实际上只是一个 uintptr 类型的值。

1
2
3
4
5
6
7
8
9
10
type copyChecker uintptr

// check 方法检查 copyChecker 是否被复制了。
func (c *copyChecker) check() {
if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
panic("sync.Cond is copied")
}
}

copyChecker 的值只有两种可能:

  1. 0,表示还没有调用过 Wait, SignalBroadcast 方法。
  2. uintptr(unsafe.Pointer(&copyChecker)),表示已经调用过 Wait, SignalBroadcast 方法。在这几个方法里面会调用 check 方法,所以 copyChecker 的值会被修改。

所以如果 copyChecker 的值不是 0,也不是 uintptr(unsafe.Pointer(&copyChecker))(也就是最初的 copyChecker 的内存地址),则表示 copyChecker 被复制了。

需要注意的是,这个方法在调用 CompareAndSwapUintptr 还会检查一下,这是因为有可能会并发调用 CompareAndSwapUintptr, 如果另外一个协程调用了 CompareAndSwapUintptr 并且成功了,那么当前协程的这个 CompareAndSwapUintptr 调用会返回 false, 这个时候就需要检查是否是因为另外一个协程调用了 CompareAndSwapUintptr 而导致的,如果是的话,就不会 panic

为什么 sync.Cond 不能被复制?

从上一小节中我们可以看到,sync.Cond 其实是不允许被复制的,但是如果是在调用 Wait, SignalBroadcast 方法之前复制,那倒是没关系。

这是因为 sync.Cond 中维护了一个阻塞的 goroutine 列表。如果 sync.Cond 被复制了,那么这个列表就会被复制,这样就会导致两个 sync.Cond 都包含了这个列表;但是我们唤醒的时候,只会有其中一个 sync.Cond 被唤醒,另外一个 sync.Cond 就会一直阻塞。

总结

  • sync.Cond 是一个条件变量,它可以用来协调多个 goroutine 之间的同步,当条件满足的时候,去通知那些因为条件不满足被阻塞的 goroutine 继续执行。
  • sync.Cond 的接口比较简单,只有 Wait, SignalBroadcast 三个方法。
    • Wait 方法用来阻塞当前 goroutine,直到条件满足。调用 Wait 方法之前,需要先调用 L.Lock 方法加锁。
    • Signal 方法用来唤醒 notifyList 中的第一个 goroutine
    • Broadcast 方法用来唤醒 notifyList 中的所有 goroutine
  • sync.Cond 的实现也比较简单,它的核心就是 notifyList,它是一个链表,用来保存所有因为条件不满足而被阻塞的 goroutine
  • 用关闭 channel 的方式也可以实现类似的广播功能,但是有个问题是 channel 不能被重复关闭,所以这种方式无法被多次使用。也就是说使用这种方式无法多次广播。
  • 使用 channel 发送通知的方式也是可以的,但是这样实现起来就复杂很多了,就更容易出错了。
  • sync.Cond 中使用 copyChecker 来检查 sync.Cond 是否被复制,如果被复制了,就会 panic。需要注意的是,这里的复制是指调用了 WaitSignalBroadcast 方法之后,sync.Cond 被复制了。在调用这几个方法之前进行复制是没有影响的。

本文基于 Go 1.19

在上一篇文章中(《深入理解 go sync.Map - 基本原理》),我们探讨了 go 中 sync.Map 的一些基本内容,如 map 并发使用下存在的问题,如何解决这些问题等。 我们也知道了 sync.Map 的一些基本操作,但是我们还是不知道 sync.Map 是如何实现的,以及为什么在特定场景下,sync.Mapmap + Mutex/RWMutex 快。 本篇文章就来继续深入探讨 sync.Map,对 sync.Map 的设计与实现进行更加详尽的讲解。

sync.Map 概览

开始之前,我们先来了解一下 sync.Map 的数据结构,以及其一个大概的模型。这对于我们了解 sync.Map 的设计非常有好处。

本文用到的一些名词解析

  • readread map:都是指 sync.Map 中的只读 map,即 sync.Map 中的 m.read
  • dirtydirty map:都是指 sync.Map 中的可写 map,即 sync.Map 中的 m.dirty
  • entrysync.Map 中的 entry,这是保存值的结构体,它是一个原子类型的指针。其中的指针指向 key 对应的值。

sync.Map 的数据结构

sync.Map 的数据结构如下:

readdirtysync.Map 中最关键的两个数据结构,它们之间可以相互转化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 在 sync.Map 中的作用是一个特殊的标记
var expunged = new(any)

// sync.Map
type Map struct {
// 互斥锁
mu sync.Mutex
// 只读 map,用于读操作
read atomic.Pointer[readOnly]
// dirty map,写入操作会先写入 dirty map
dirty map[any]*entry
// 记录需要从 dirty map 中读取 key 的次数。
// 也就是没有在 read map 中找到 key 的次数。
misses int
}

// readOnly 是一个只读的 map
type readOnly struct {
m map[any]*entry // dirty map 中的 key 的一份快照
amended bool // 记录是否在 dirty map 中有部分 read map 中不存在的 key
}

// 实际存储值的结构体。
// p 有三种状态:nil, expunged, 正常状态。
type entry struct {
p atomic.Pointer[any]
}

说明:

  • expunged 是一个特殊的标记,用于表示 entry 中的值已经被删除。并且那个 keydirty map 中已经不存在了。
  • Map 也就是我们使用的 sync.Map,它有一个 mu 互斥锁,用于保护 dirty map
  • Map 中有两个 map,一个是 read map,一个是 dirty map
    • read map 是一个只读的 map,但不是我们在其他地方说的只读。它的只读的含义是,它的 key 是不能增加或者删除的。但是 value 是可以修改的。
    • dirty map 是一个可读写的 map,新增 key 的时候会写入 dirty map
  • misses 是一个 int 类型的变量,用于记录 read map 中没有找到 key 的次数。当 misses 达到一定的值的时候,会将 dirty map 中的 key 同步到 read map 中。
  • readOnly 是一个只读的 map,它的 m 字段是一个 map,用于保存 dirty map 中的 key 的一份快照。readOnly 中的 amended 字段用于记录 dirty map 中是否有 read map 中不存在的 key
  • entry 是一个结构体,它有一个 p 字段,用于保存 key 对应的值。p 字段有三种状态:nilexpunged、正常状态。expunged 是一个特殊的标记,用于表示 key 对应的值已经被删除,并且那个 keydirty map 中已经不存在了。

因为在 sync.Map 中是使用了特殊的标记来表示删除的,也就是不需要使用 delete 函数来删除 key。这样就可以利用到了原子操作了,而不需要加锁。这样就能获得更好的性能了。

sync.Map 的整体模型

上一小节我们已经介绍了 sync.Map 的数据结构,现在让我们来看一下 sync.Map 的整体模型。 它的整体模型如下:

sync_map_1

关键说明:

  • read map 是一个只读的 map,不能往里面添加 key。而 dirty map 是一个可读写的 map,可以往里面添加 key
  • sync.Map 实现中,基本都是会先从 read map 中查找 key,如果没有找到,再从 dirty map 中查找 key。然后根据查找结果来进行后续的操作。
  • 如果 read map 中没有找到 key,需要加锁才能从 dirty map 中查找 key。因为 dirty map 是一个可读写的 map,所以需要加锁来保证并发安全。

这实际上是一种读写分离的理念。

sync.Map 的工作流程

通过看它的数据结构和整体模型,想必我们依然对 sync.Map 感到很陌生。现在再来看看 sync.Map 的工作流程,这样我们就能知道其中一些字段或者结构体的实际作用了。

下面,我们通过一些 map 的常规操作来看一下 sync.Map 的工作流程:

  1. 添加 key:如果是第一次写入 key 的话(假设其值为 value),会先写入 dirty map,在 dirty map 中的 value 是一个指向 entry 结构体的指针。entry 结构体中的 p 字段也是一个指针,它指向了 value 的内存地址。
  2. 读取 key:先从 read 中读取(无锁,原子操作),read 中找不到的时候再去 dirty 中查找(有锁)。
  3. 修改 key:如果 keyread map 中存在的话,会直接修改 key 对应的 value。如果 keyread map 中不存在的话,会去 dirty map 中查找(有锁),如果在 dirty map 中也不存在的话,则修改失败。
  4. 删除 key:如果 keyread map 中存在的话,会将 key 对应的 entry 指针设置为 nil(实际上是打标记而已,并没有删除底层 mapkey)。如果在 read 中找不到,并且 dirty 有部分 read 中不存在的 key 的话,会去 dirty map 中查找(有锁),如果在 dirty map 中也不存在的话,则删除失败。

可能我们看完这一大段说明还是不会太懂,但是没关系,下面对每一个操作都有图,结合我画的图应该可以更好地理解。

深入之前需要了解的一些背景知识

sync.Map 中有一些我们需要有基本了解的背景知识,这里简单说一下。

sync.Map 中,需要读写 dirty map 的时候,都需要加锁,加的锁是 sync.Mutex。对于这把锁,我们需要知道的是: sync.Mutex 是一个互斥锁。当一个 goroutine 获得了 sync.Mutex 的使用权之后(Lock 调用成功),其他的 goroutine 就只能等待,直到该 goroutine 释放了 sync.Mutex(持有锁的 goroutine 使用了 Unlock 释放锁)。

所以,我们在源码中看到 m.mu.Lock() 这行代码的时候,就应该知道,从这一行代码直到 m.mu.Unlock() 调用之前,其他 goroutine 调用 m.mu.Lock() 的时候都会被阻塞。

sync.Map 中,dirty map 的读写都需要加锁,而读 read map 的时候不需要锁的。

原子操作

go 语言中的原子操作是指,不会被打断的操作。也就是说,当一个 goroutine 执行了一个原子操作之后,其他的 goroutine 就不能打断它,直到它执行完毕。 这可以保证我们的一些操作是完整的,比如给一个整数加上一个增量,如果不使用原子操作,而是先取出来再进行加法运算,再写回去这样操作的话, 就会出现问题,因为这个过程有可能被打断,如果另外一个 goroutine 也在进行这个操作的话,就有可能会出现数据错乱的问题。

而原子操作的 Add(比如 atomic.Int32Add 方法)可以在加法过程中不被打断,所以我们可以保证数据的完整性。 这里说的不被打断说的是:这个原子操作完成之前,其他 goroutine 不能操作这个原子类型

除了 Add 方法,atomic 包中还有 LoadStoreSwap 等方法,这些方法都是原子操作,可以保证数据的完整性。

sync.Map 中,对 entry 状态的修改都是通过原子操作实现的。

CAS

CAS 是 Compare And Swap 的缩写,意思是比较并交换。CAS 操作是一种原子操作,它的原理是:当且仅当 内存值 == 预期值 时,才会将 内存值 修改为 新值。 使用代码表示的话,大概如下:

1
2
3
4
5
6
if *addr == old {
*addr = new
return true
}

return false

也就是说:

  • CAS 原子操作会先进行比较,如果 内存值 == 预期值,则执行交换操作,将 内存值 修改为 新值,并返回 true
  • 否则,不执行交换操作,直接返回 false

CAS 如果比较发现相同就会交换,如果不相同就不交换,这个过程是原子的,不会被打断。在 sync.Map 中,修改 entry 的状态的时候,有可能会使用到 CAS。

double-checking(双重检测)

这是一种尽量减少锁占用的策略,在单例模式中可能会用到:

1
2
3
4
5
6
7
8
9
10
11
12
// 第一次检查不使用锁
if instance == nil {
mu.Lock()
defer mu.Unlock()
// 获取到锁后,还要再次检查,
// 因为有可能在等待锁的时候 instance 已经被初始化了
if instance == nil {
instance = new()
}
}

return instance

上面这个例子中,在获取到锁之后,还进行了一次检查,这是因为 mu.Lock() 如果获取不到锁,那么当前 goroutine 就会被挂起,等待锁被释放。 如果在等待锁的过程中,另外一个 goroutine 已经初始化了 instance,那么当前 goroutine 就不需要再初始化了,所以需要再次检查。

如果第二次检查发现 instance 已经被初始化了,那么就不需要再初始化了,直接返回 instance 即可。

sync.Map 中,也有类似的双重检测,比如在 Load 方法中,会先从 read 中获取 entry,如果没有,就会加锁,获取到锁后,再去检查一下 read 中是否有 entry,如果没有,才会从 dirty 中获取 entry。这是因为在等待锁的时候可能有其他 goroutine 已经将 key 放入 read 中了(比如做了 Range 遍历)。

dirty map 和 read map 之间的转换

上面我们说了,写入新的 key 的时候,其实是写入到 dirty 中的,那什么时候会将 key 写入到 read 中呢? 准确来说,sync.Map 是不会往 read map 中写入 key 的,但是可以使用 dirty map 来覆盖 read map

dirty map 转换为 read map

dirty map 转换为 read map 的时机是:

  • missess 的次数达到了 len(dirty) 的时候。这意味着,很多次在 read map 中都找不到 key,这种情况下是需要加锁才能再从 dirty map 中查找的。这种情况下,就会将 dirty map 转换为 read map,这样后续在 read map 中能找到 key 的话就不需要加锁了。
  • 使用 Range 遍历的时候,如果发现 dirty map 中有些 keyread map 中没有,那么就会将 dirty map 转换为 read map。然后遍历的时候就遍历一下 read map 就可以了。(如果 read map 中的 keydirty map 中的 key 完全一致,那直接遍历 read map 就足够了。)
sync_map_2

dirty map 转换为 read map 的操作其实是很简单的,就是使用 dirty map 直接覆盖掉 read map,然后将 dirty map 置为 nil,同时 misses 重置为 0

简单来说,如果因为新增了 key 需要频繁加锁的时候,就会将 dirty map 转换为 read map

read map 转换为 dirty map

read map 转换为 dirty map 的时机是:

  • dirty mapnil 的情况下,需要往 dirty map 中增加新的 key
sync_map_3

read map 转换为 dirty map 的时候,会将 read map 中正常的 key 复制到 dirty map 中。 但是这个操作完了之后,read map 中的那些被删除的 key 占用的空间是还没有被释放的。 那什么时候释放呢?那就是上面说的 dirty map 转换为 read map 的时候。

sync.Map 中 entry 的状态

sync.Map 中,read mapdirty map 中相同 keyentry 都指向了相同的内容(共享的)。 这样一来,我们就不需要维护两份相同的 value 了,这一方面减少了内存使用的同时,也可以保证同一个 key 的数据在 readdirty 中看到都是一致的。 因为我们可以通过原子操作来保证对 entry 的修改是安全的(但是增加 key 依然是需要加锁的)。

entry 的状态有三种:

  • nil:被删除了,read mapdirty map 都有这个 key
  • expunged:被删除了,但是 dirty map 中没有这个 key
  • 正常状态:可以被正常读取。

它们的转换关系如下:

sync_map_4

说明:

  1. key 被删除
  2. dirty mapnil 的时候,需要写入新的 keyread 中被删除的 key 状态会由 nil 修改为 expunged
  3. 被删除的 key,重新写入
  4. read 中被删除的 keydirty map 中不存在的),在再次写入的时候会发生

注意:expunged 和正常状态之间不能直接转换,expungedkey 需要写入的话,需要先修改其状态为 nil。正常状态被删除之后先转换为 nil,然后在创建新的 map 的时候才会转换为正常状态。也就是 1->24->3 这两种转换)

不存在由正常状态转换为 expunged 或者由 expunged 转换为正常状态的情况。

entry 状态存在的意义

entry 的状态存在的意义是什么呢?我们去翻阅源码的时候会发现,其实 sync.Map 在删除的时候, 如果在 read map 中找到了 key,那么删除操作只是将 entry 的状态修改为 nil(通过原子操作修改),并没有真正的删除 key

也就是并不像我们使用普通 map 的时候那种 delete 操作,会将 keymap 中删除。 这样带来的一个好处就是,删除操作我们也不需要加锁了,因为我们只是修改了 entry 的状态,而不是真正的删除 key。 这样就可以获得更好的性能了。

就算转换为了 nil 状态,也依然可以转换为 expunged 或者正常状态,具体看上一个图。

read.amended 的含义

我们往 sync.Map 中写入新的 key 的时候,会先写入 dirty map,但是不会写入 read map。 这样一来,我们在读取的时候就需要注意了,因为我们要查找的 key 是有可能只存在于 dirty map 中的, 那么我们是不是每次在 read map 中找不到的时候都需要先去 dirty map 中查找呢?

答案是否定的。我们从 dirty map 中进行查找是有代价的,因为要加锁。如果不加锁,遇到其他 goroutine 写入 dirty map 的时候就报错了。 针对这种情况,一种比较简单的解决方法是,增加一个标志,记录一下 read mapdirty map 中的 key 是否是完全一致的。 如果是一致的,那么我们就不需要再加锁,然后去 dirty map 中查找了。否则,我们就需要加锁,然后去 dirty map 中查找。

sync.Map 中的 amended 字段就是这里说的标志字段。单单说文字可能有点抽象,我们可以结合下图理解一下:

sync_map_5

read.amended 的含义就是 read mapdirty map 中的 key 是否是完全一致的。如果为 true,说明有些 key 只存在于 dirty map 中。

sync.Map 源码剖析

sync.Map 提供的方法并不多,它能做的操作跟普通的 map 差不多,只是在并发的情况下,它能保证线程安全。 下面是 sync.Map 所能提供的方法:

  • Store/Swap(增/改): 往 sync.Map 中写入新的 key。(Store 实际调用了 Swap 方法)
  • Load(查): 从 sync.Map 中读取 key
  • LoadOrStore(查/增/改): 从 sync.Map 中读取 key,如果不存在,就写入新的 key
  • Delete/LoadAndDelete(删): 从 sync.Map 中删除 key。(Delete 实际调用了 LoadAndDelete 方法)
  • Range: 遍历 sync.Map 中的所有 key

还有两个可能比较少用到的方法:

  • CompareAndDelete: 从 sync.Map 中删除 key,但是只有在 key 的值跟 old 相等的时候才会删除。
  • CompareAndSwap: 从 sync.Map 中写入新的 key,但是只有在 key 的值跟 old 相等的时候才会写入。

接下来我们会从源码的角度来分析一下 sync.Map 的实现。

Store/Swap 源码剖析

Store 实际上是对 Swap 方法的调用,所以我们看 Swap 方法的源码就够了:

Swap 方法的作用是:交换一个 key 的值,并返回之前的值(如果有的话)。 返回值中的 prev 就是之前的值,loaded 表示 key 是否存在。

下面是 Swap 方法的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
func (m *Map) Swap(key, value any) (previous any, loaded bool) {
// 读取 read map
read := m.loadReadOnly()
// 先从 read map 中读取 key
if e, ok := read.m[key]; ok {
// 在 read map 中读取到了 key
if v, ok := e.trySwap(&value); ok { // ok 表示是否成功交换
// swap 成功
if v == nil { // 之前的值为 nil,表示 key 之前已经被删除的了
return nil, false
} // 之前的值不为 nil,表示存在
return *v, true
}

// 执行到这里表示:
// read map 中存在 key,但是已经被删除。(为 expunged 状态)
}

// read map 中找不到 key,加锁,从 dirty map 中继续找
m.mu.Lock()
// double checking,二次检查,因为有可能等待锁的时候 read map 已经发生了变化
read = m.loadReadOnly()
if e, ok := read.m[key]; ok { // read map 中存在 key
if e.unexpungeLocked() { // 将 entry 由 expunged 状态改为 nil 状态
// key 之前已经被删除了,并且之前 dirty map 中不存在 key,
// 所以这里需要将 key 添加到 dirty map 中。
m.dirty[key] = e
}
// 写入新的值,v 是旧的值
if v := e.swapLocked(&value); v != nil {
// v 不为 nil,表示之前存在
loaded = true
previous = *v
}
} else if e, ok := m.dirty[key]; ok { // read map 中不存在 key,但是 dirty map 中存在 key
// 写入新的值,v 是旧的值
if v := e.swapLocked(&value); v != nil {
// v 不为 nil,表示之前存在
loaded = true
previous = *v
}
} else { // read map 中不存在 key,dirty map 中也不存在 key(需要写入新的 key)
if !read.amended { // dirty map 和 read map 的 key 完全一致)
// 现在要写入新的 key 了,所以这个 amended 状态得修改了。
// 我们正在将第一个新键添加到 dirty map 中。
// 确保它已分配并将 read map 的 amended 标记设置为 true。
m.dirtyLocked()
// amended 设置为 true,因为下面要写入一个在 read map 中不存在的 key
m.read.Store(&readOnly{m: read.m, amended: true})
}
// 新增的 key,dirty map 中不存在,所以直接写入
m.dirty[key] = newEntry(value)
}
// 解锁
m.mu.Unlock()
return previous, loaded
}

Swap/Store 图示

sync_map_6

注意:这里的 read mapdirty map 中都没有包含 entry,我们知道它们中相同的 key 都指向相同的 entry 就可以了。

Swap 的操作流程

  1. read map 中读取 key,如果存在,就直接交换 value,并返回之前的 value
  2. 如果 read map 中不存在 key,就加锁,加锁后,再从 read map 中读取 key,如果存在,就直接交换 value,并返回之前的 value。(double checking
  3. 加锁后,如果在 read map 中依然找不到 key,再从 dirty map 中读取 key,如果存在,就直接交换 value,并返回之前的 value
  4. 如果 read mapdirty map 都不存在 key,就将 key 添加到 dirty map 中,并返回 nil。在这一步中,如果 read mapdirty mapkey 完全一致,就将 read mapamended 状态设置为 true

在第 4 步中,还有一个关键操作就是 dirtyLocked(),这个操作的作用是保证 dirty map 初始化,如果 dirty map 已经初始化,就不会做任何操作。 如果 dirty mapnil,那么会初始化,然后将 read map 中未被删除的 key 添加到 dirty map 中。

dirtyLocked() 源码剖析

dirtyLocked() 的作用是保证 dirty map 初始化,如果 dirty map 已经初始化,就不会做任何操作。

之所以 dirty map 需要初始化,是因为在 dirty map 转换为 read map 的时候,dirty map 会被设置为 nil, 但是新增 key 的时候是要写入到 dirty map 的,所以需要重新初始化。 具体可以看上面的 dirty map 和 read map 的之间的转换 这一节。

dirtyLocked() 的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 1. 如果 m.dirty 为 nil,则创建一个新的 dirty map。
// 2. 否则,不做任何操作
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}

read := m.loadReadOnly()
// dirty map 初始化
m.dirty = make(map[any]*entry, len(read.m))
// 对于 read map 中的 key,如果不是 expunged,则将其复制到 dirty map 中。
// read map 中 nil 的 key 会被转换为 expunged 状态。
for k, e := range read.m {
// 不是 expunged 的 entry,才会被复制到 dirty map 中。
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}

dirtyLocked() 图示:

sync_map_7

dirtyLocked() 里有个需要注意的地方就是,它会将 read map 中的 nilkey 转换为 expunged 状态。 expunged 状态表明这个 key 只是在 read map 中,而不在 dirty map 中。 做完迁移之后,dirty map 其实就不包含那些被删除的 key 了。

Swap/Store 关键说明

Swap 方法里面其实基本已经包含了 sync.Map 主要设计理念了,下文讲解其他方法的时候,其中一些细节不再做过多的解释了:

  1. sync.Map 在做很多操作的时候,都会先从 read map 中读取,如果 read map 中不存在,再从 dirty map 中读取。
  2. 如果需要从 dirty map 中读取,那么会先加锁,然后再从 dirty map 中读取。
  3. sync.Map 在对 entry 进行操作的时候,都是通过原子操作进行的。(这是因为有些写操作是没有 mu.Lock() 保护的

而对于 dirty mapread map 的转换等只是一些实现细节的上的问题,我们如果了解了它的设计理念,那么就可以很容易的理解它的实现了。

Swap/Store 里的原子操作

这里面用了很多原子操作:

  • m.loadReadOnly(): 读取 read map
  • e.trySwap(&value): 交换 key 的值。key 存在的时候,直接通过原子操作使用新的值覆盖旧的。(如果 key 只存在于 read map 中的话,这个操作会失败。)
  • e.unexpungeLocked(): 将 entryexpunged 状态改为 nil 状态。
  • e.swapLocked(&value): 交换 key 的值。key 存在的时候,直接通过原子操作使用新的值覆盖旧的。
  • m.read.Store(&readOnly{m: read.m, amended: true}): 将 read mapamended 状态设置为 true

为什么使用原子操作

为什么要使用原子操作呢?这是因为 sync.Map 中有一些写操作是没有加锁的,比如删除的时候, 删除的时候只是将 entry 的状态通过原子操作改成了 nil 状态。 如果不使用原子操作,那么就会出现并发问题。

比如:在 m.mu.Lock() 保护的临界区内先读取了 entry 的状态,我们还没来得及对其做任何操作, 在另外一个 goroutineentry 的状态被修改了,那么我们临界区内的 entry 状态已经成为它的历史状态了, 如果这个时候再基于这个状态做任何操作都会导致并发问题。

Load 源码剖析

Load 方法的作用是从 sync.Map 中读取 key 对应的值。 在 sync.Map 的实现中,key 的查找都遵循以下的查找流程:

sync_map_8

注意:从 read map 查找不需要加锁,从 dirty map 中查找需要加锁。

下面是 Load 方法的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Load 返回存储在 map 中的键值,如果不存在值则返回 nil。
// ok 结果表明是否在 map 中找到了值。
func (m *Map) Load(key any) (value any, ok bool) {
// 通过原子操作获取只读 map
read := m.loadReadOnly()
e, ok := read.m[key]
// 不在只读 map 中,并且 dirty map 包含一些 key 不在 read.m 中。
if !ok && read.amended {
m.mu.Lock()

// double checking
read = m.loadReadOnly()
e, ok = read.m[key]
if !ok && read.amended { // 仍然不在只读 map 中,并且 dirty map 包含一些 key 不在 read.m 中。
e, ok = m.dirty[key] // 从 dirty map 中获取
// 不管条目是否存在,记录一个未命中:这个键将走慢路径,直到脏映射被提升为读映射。
m.missLocked() // read 中读不到
}
m.mu.Unlock()
}
// key 不存在
if !ok {
return nil, false
}
// key 存在,通过原子操作获取值
return e.load()
}

Load 图示

sync_map_9

其实 Load 的过程大概就是前一个图的查找 key 的过程,只不过其中有一步 missLocked(), 这个操作是用来记录 key 未命中的次数的。在达到一定次数之后,会将 dirty map 提升为 read map

missLocked 源码剖析

missLocked 的实现是很简单的,就是将 misses 加 1,如果 misses 达到了 dirty map 的大小, 就会将 dirty map 提升为 read map

1
2
3
4
5
6
7
8
9
10
11
12
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
// 未命中的次数达到 len(m.dirty),将 dirty map 提升为 read map
m.read.Store(&readOnly{m: m.dirty})
// 重置 dirty map
m.dirty = nil
// 重置 misses
m.misses = 0
}

这个过程可以用下图表示:

sync_map_10

Load 工作流程

Load 方法的工作流程如下:

  1. 通过原子操作获取 read map。如果 read map 中存在 key,则直接返回 key 对应的值。
  2. 如果 dirty map 中包含了一些 read map 中不存在的 key,则需要加锁,再次获取 read map
  3. 如果 read map 中不存在 key,则从 dirty map 中获取 key 对应的值(同时调用 missLocked())。否则返回从 read map 中获取到的 key 对应的值。

LoadOrStore 源码剖析

LoadOrStore 方法的作用是从 sync.Map 中读取 key 对应的值,如果不存在则将 keyvalue 存入 sync.Map 中。 其实它跟 Load 方法整体流程上也是差不多的,只不过它在找到 key 的时候,会将 keyvalue 存入 sync.Map 中。 如果没有找到 key,则新增 keydirty map 中。

下面是 LoadOrStore 方法的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// LoadOrStore 返回键的现有值(如果存在)。
// 否则,它存储并返回给定的值。
// 返回值:loaded 表明是否是加载的值,而不是存储的值。actual 是当前存储的值。
func (m *Map) LoadOrStore(key, value any) (actual any, loaded bool) {
// 如果从 read map 中获取到了 key,则不需要加锁。
read := m.loadReadOnly()
if e, ok := read.m[key]; ok { // key 是 expunged 状态的时候,ok 为 false
actual, loaded, ok := e.tryLoadOrStore(value)
if ok { // Load 或者 Store 成功
return actual, loaded
}
}

// 加锁
m.mu.Lock()
// double checking
read = m.loadReadOnly()
if e, ok := read.m[key]; ok {
// key 存在于 read map 中
if e.unexpungeLocked() { // 状态:expunged => nil
// 之前是 expunged 状态,现在变成了 nil 状态。需要在 dirty map 中写入 e。
m.dirty[key] = e
}
// 再次对 entry 执行尝试 Load 或者 Store 新的值的操作
actual, loaded, _ = e.tryLoadOrStore(value)
} else if e, ok := m.dirty[key]; ok {
// key 存在于 dirty map 中
actual, loaded, _ = e.tryLoadOrStore(value)
m.missLocked() // misses++,表示 read map 中没有该 key
} else {
// key 不存在于 read map 和 dirty map 中。
if !read.amended {
// 下面需要往 dirty map 中写入新的 key,所以需要确保 dirty map 被初始化。
m.dirtyLocked()
// dirty map 中现在有一些 read map 中不存在的 key,所以需要将 read map 的 amended 置为 true。
m.read.Store(&readOnly{m: read.m, amended: true})
}
// 写入 dirty map
m.dirty[key] = newEntry(value)
actual, loaded = value, false
}
m.mu.Unlock()

return actual, loaded
}

LoadOrStore 图示

sync_map_11

LoadOrStore 工作流程

  1. keyread map 中找到,尝试在 read mapLoadStore,操作成功则返回。找不到则加锁,然后二次检查(double checking)。
  2. read map 中依然找不到,但是 keydirty map 中找到,尝试在 dirty mapLoadStore,操作成功则返回。(missLocked
  3. key 不存在,往 dirty map 中写入 keyvalue。(如果 dirty mapnil,则先进行初始化),然后read mapamended 修改为 true

tryLoadOrStore 源码剖析

我们发现,在 LoadOrStore 方法中,找到 key 之后,都是调用 tryLoadOrStore 方法来进行 LoadStore 操作的。 它的作用就是在 entry 上尝试 LoadStore 操作,简单来说就是,如果 key 已经存在则 Load,否则 Store(当然,实际上没有这么简单)。

我们先来看看它的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 如果 entry 未被删除,tryLoadOrStore 会自动加载或存储一个值。
// 如果 entry 被删除,tryLoadOrStore 将保持条目不变并返回 ok==false。
//
// 返回值:
// ok:操作是否成功(Load 成功、Store 成功)
// loaded:表示是否是 Load 出来的
// actual:Load 到的值
func (e *entry) tryLoadOrStore(i any) (actual any, loaded, ok bool) {
// 获取 entry 的状态
p := e.p.Load()
// 这个 key 只存在于 read map 中,并且它已经被删除了
if p == expunged {
return nil, false, false
}
// key 是正常状态,Load 成功,返回
if p != nil {
return *p, true, true
}

// p 是 nil,说明 key 不存在,需要 Store
ic := i
for { // 循环直到 Load 或者 Store 成功(类似自旋锁)
// Store 成功
if e.p.CompareAndSwap(nil, &ic) {
return i, false, true
}
// Store 失败,重新获取 entry 的状态
p = e.p.Load()
// 被删除了
if p == expunged {
return nil, false, false
}
// 还没被删除,说明 key 存在
if p != nil {
return *p, true, true
}
}
}

tryLoadOrStore 的逻辑可以用下图表示:

sync_map_12

pnil 的情况下,会有一个 for 循环一直尝试 Load 或者 Store,一旦成功就会返回。

unexpungeLocked 的作用

LoadOrStore 方法中,我们发现,如果 keyread map 中找到,会先调用 unexpungeLocked 方法。 读到这里,可能很多读者对 expungeunexpunge 有点懵逼,不知道它们是干什么的。

简单来说,expunge 就是表明 key 已经被删除了,并且这个 key 只存在于 read map 中(在 dirty map 中不存在)。 而 unexpunge 的作用就是取消 expunge 的效果(因为要往这个 key 写入新的值了),紧接着我们会往 dirty map 中写入这个 key

我们可以结合下图来思考一下:

sync_map_13

注意:实际中 entry 并不是连续存储的。

expunged 状态说明:

  1. p == expungedkey 已被删除,并且 dirty map 不为 nil,并且 dirty 中没有这个 key
  2. p == nilkey 已被删除,并且 dirty mapnil,或 dirty[k] 指向该 entry。(Store)
  3. p != nilkey 正常,返回其值。(Load)

Delete 源码剖析

Delete 方法实际上只是 LoadAndDelete 的 wrapper 函数,所以我们看 LoadAndDelete 就够了。 删除操作在 sync.Map 中是一个很简单的操作,如果在 read map 中找到了要删除的 key, 那么我们只需要将其设置为 nil 就可以了。虽然它是一个写操作,但是依然不需要加锁。

如果在 read map 中找到了 key,则可以不加锁也把它删除。因为 sync.Map 中的删除只是一个标记。

例外的情况是,它在 read map 中找不到,然后就需要加锁,然后做 double checking,然后再去 dirty map 中查找了。

LoadAndDelete 的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// LoadAndDelete 删除键的值,返回以前的值(如果有)。
// loaded 报告 key 是否存在。
func (m *Map) LoadAndDelete(key any) (value any, loaded bool) {
// 获取 read map
read := m.loadReadOnly()
// 从 read map 查找 key
e, ok := read.m[key]
if !ok && read.amended { // read map 找不到那个 key,需要继续从 dirty map 中查找
m.mu.Lock() // 加锁
read = m.loadReadOnly() // double checking
e, ok = read.m[key]
if !ok && read.amended { // 需要继续从 dirty map 中查找
e, ok = m.dirty[key] // 从 dirty map 中删除 key
delete(m.dirty, key) // 直接做删除 key 的操作
// 累加未命中 read map 的次数
m.missLocked()
}
m.mu.Unlock()
}
if ok { // key 存在,做删除操作(设置 entry 为 nil 状态)
return e.delete()
}
// key 找不到,不需要做删除操作
return nil, false
}

删除的操作会有两种情况:

  • 存在于 read map 中,则直接删除。(设置 entry 指针为 nil,但是不会删除 read map 中的 key
  • 只存在于 dirty map 中,则直接删除。这种情况下,会删除 dirty map 中的 key

LoadAndDelete 图示

sync_map_14

LoadAndDelete 工作流程

  1. read map 中查找 key,如果找到了,那么直接删除 key(将 entry 的指针设置为 nil),并返回 value
  2. 如果 read map 中没有找到 key,并且 read.amendedtrue,那么就需要加锁,然后做 double checking
  3. 加锁后在 read map 依然找不到,并且 read.amendedtrue,那么就需要从 dirty map 中查找 key
  4. 同时在临界区内直接执行 delete 操作,将 keydirty map 中删除。同时累加 misses 次数。
  5. 最后,如果找到了 key 对应的 entry,则将其删除(设置 entry 指针为 nil),并返回 value

Range 源码剖析

Range 方法的作用是遍历 sync.Map 中的所有 keyvalue,它接受一个函数作为参数,如果这个函数返回 false,那么就会停止遍历。

Range 的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// Range 依次为映射中存在的每个键和值调用 f。 如果 f 返回 false,则 range 停止迭代。
func (m *Map) Range(f func(key, value any) bool) {
// 我们需要能够遍历在调用 Range 开始时已经存在的所有键。
read := m.loadReadOnly()
if read.amended {
// dirty map 中包含了 read map 中没有的 key
m.mu.Lock()
read = m.loadReadOnly()
if read.amended {
// 使用 m.dirty 中的数据覆盖 m.read 中的数据
read = readOnly{m: m.dirty}
m.read.Store(&read)
// 重置 dirty map
m.dirty = nil
// 重置 misses
m.misses = 0
}
m.mu.Unlock()
}

// 所有的 key 都在 read map 中了,遍历 read map 即可
for k, e := range read.m {
v, ok := e.load()
if !ok { // 已经被删除
continue
}
if !f(k, v) { // f 可以返回一个 bool 值,如果返回 false,那么就停止遍历
break
}
}
}

Range 图示

sync_map_15

Range 遍历的时候,只会遍历 read map 中的 key。如果 read.amendedtrue,那么就需要加锁,然后做 double checking, 如果二次检查 read.amended 还是 true,那么就需要将 dirty map 中的数据覆盖到 read map 中。

Range 工作流程

  1. 为了保证能遍历 sync.Map 中所有的 key,需要判断 read.amended 是否为 true
  2. 如果为 true,说明只有 dirty map 中包含了所有的 key,那么就需要将 dirty map 转换为 read map。(这样的好处是,可以在遍历过程中,不需要加锁)
  3. 然后开始遍历,遍历的时候只需要遍历 read map 即可,因为这个时候 read map 中包含了所有的 key
  4. 遍历过程中,如果发现 key 已经被删除,则直接跳过。否则将 keyvalue 传递给 f 函数,如果 f 函数返回 false,那么就停止遍历。

CompareAndSwap 源码剖析

CompareAndSwap 方法的作用是比较 key 对应的 value 是否为 old,如果是,则将 key 对应的 value 设置为 new

CompareAndSwap 的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 如果映射中存储的值等于旧值,则 CompareAndSwap 会交换 key 的旧值和新值
// 旧值必须是可比较的类型。
func (m *Map) CompareAndSwap(key, old, new any) bool {
// 获取 read map
read := m.loadReadOnly()
// 从 read map 读取 key 对应的 value
if e, ok := read.m[key]; ok {
// 在 read map 中找到了,进行 CAS 操作
return e.tryCompareAndSwap(old, new)
} else if !read.amended {
// 在 dirty map 也没有,返回 false
return false
}

// 加锁
m.mu.Lock()
defer m.mu.Unlock()
read = m.loadReadOnly()
swapped := false
if e, ok := read.m[key]; ok { // double checking
// 在 read map 中找到了,进行 CAS 操作
swapped = e.tryCompareAndSwap(old, new)
} else if e, ok := m.dirty[key]; ok {
// 在 dirty map 中找到了,进行 CAS 操作
swapped = e.tryCompareAndSwap(old, new)
// 累加 misses 次数
m.missLocked()
}
return swapped
}

CompareAndSwap 图示

sync_map_16

其实到这里,我们应该发现了,其实 sync.Map 的大多数方法的实现都是先从 read map 中读取,如果没有找到,那么就从 dirty map 中读取。 只是从 read map 中读取的时候,需要加锁,然后做 double checking

CompareAndSwap 工作流程

  1. 首先从 read map 中读取 key 对应的 value。如果找到则进行 CAS 操作,如果没有找到,那么就需要加锁,然后做 double checking
  2. 如果还是没找到。则从 dirty map 中查找,找到则做 CAS 操作,然后累加 misses 次数。
  3. 如果还是没找到,那么就返回 false

CompareAndDelete 源码剖析

CompareAndDelete 方法的作用是比较 key 对应的 value 是否为 old,如果是,则将 key 对应的 value 删除。

CompareAndDelete 的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 如果 key 的值等于 old,CompareAndDelete 会删除它的条目。
// 旧值必须是可比较的类型。
//
// 如果 map 中的 key 的值不等于 old,则 CompareAndDelete 返回 false(即使旧值是 nil 接口值)。
func (m *Map) CompareAndDelete(key, old any) (deleted bool) {
// 获取 read map
read := m.loadReadOnly()
e, ok := read.m[key]
// read map 中不存在这个 key,并且 dirty map 中包含了一些 read map 中没有的 key
if !ok && read.amended {
// 加锁
m.mu.Lock()
read = m.loadReadOnly()
e, ok = read.m[key]
// double checking
if !ok && read.amended { // dirty map 中包含 read map 中不存在的 key
e, ok = m.dirty[key]
// 累加 misses 次数
m.missLocked()
}
m.mu.Unlock()
}

// 如果 key 存在,并且其值等于 old,则将其删除。
for ok {
p := e.p.Load()
// 已经被删除,或者值不等于 old,返回 false,表示删除失败
if p == nil || p == expunged || *p != old {
return false
}
// 将其删除(本质上是一个 CAS 操作,将其状态修改为了 nil)
if e.p.CompareAndSwap(p, nil) {
return true
}
}
// key 找不到,返回 false
return false
}

CompareAndDelete 图示

sync_map_17

CompareAndDelete 工作流程

  1. 首先从 read map 中读取 key 对应的 value。如果找到则进行 CAS 操作,如果没有找到,那么就需要加锁,然后做 double checking
  2. 如果还是没找到。并且 dirty map 中包含了部分 read map 中不存在的 key,则从 dirty map 中查找,找到则做 CAS 操作,然后累加 misses 次数。
  3. 如果找到了 key,会通过原子操作读取其之前的值。如果发现它已经被删除或者旧值不等于 old,则返回 false。否则通过 CAS 操作将其删除,然后返回 true
  4. 如果没有找到 key,则返回 false

entry 的一些说明

entry 这个结构体是 sync.Map 中实际保存值的结构体,它保存了指向了 key 对应值的指针。

在上面阅读代码的过程中,我们发现,entry 中有很多方法使用了 try 前缀,比如 trySwap, tryLoadOrStore 等。对于这类方法,我们需要知道的是:

  1. 它并不保证操作一定成功,因为一些写操作是不需要持有互斥锁就可以进行的(比如删除操作,只是一个原子操作,将 entry 指向了 nil)。
  2. 这类方法里面,有一个 for 循环,来进行多次尝试,直到操作成功,又或者发现 entry 已经被删除的时候就返回。类似自旋锁。
  3. 这类方法里面对 entry 状态的修改是通过 CAS 操作来实现的。

sync.Map 源码总结

一顿源码看下来,我们不难发现,sync.Map 的大部分方法整体处理流程上是非常相似的,都是先从 read map 中读取,如果没有找到,那么就需要加锁,然后做 double checking。如果还是没找到,那么就从 dirty map 中查找,如果还是没找到,那么就返回 false

这样做的目的都是在尽量地减少锁的占用,从而获得更好的性能。

同时,如果在 dirty map 中查找的次数多了,会触发 dirty map 转换为 read map 的操作流程,这样一来,下一次搜索同样的 key 就不再需要加锁了。

最后一个关键的点是,在 sync.Map 中没有被锁保护的地方,都是通过原子操作来实现的,这样一来,就可以保证在多核 CPU 上的并发安全。

总结

  • sync.Map 中的 key 有两份,一份在 read map 中,一份在 dirty map 中。read map 中的 key 是不可变的,而 dirty map 中的 key 是可变的。
  • sync.Map 中的大多数操作的操作流程如下:
    • 首先从 read map 中读取 key 对应的 value。找到则做相应操作。
    • 如果没找到,则加锁,再做一次 double checking。找到则做相应操作。
    • 如果还是没找到,那么就从 dirty map 中查找,找到则做相应操作。
    • dirty map 找到的时候,需要累加 misses 次数,如果 misses 次数超过了 dirty map 的大小,那么就会触发 dirty map 转换为 read map 的操作流程。
  • sync.Map 中的 read mapdirty map 中相同的 key 指向了同一个 value(是一个 entry 结构体实例)。
  • entry 有三种状态:
    • nil: 表示 key 已被删除。
    • expunged: 表示 key 已被删除,并且 dirty map 中没有这个 key,这个 key 只存在于 read map 中。
    • *v: 表示一个指向具体值的指针,是正常状态。
  • sync.Map 中的大部分方法都是通过原子操作来实现的,这样一来,就可以保证在多核 CPU 上的并发安全。就算没有在锁保护的临界区内,这种操作依然可以保证对 map 的操作不会出现错乱的情况。
  • read map 中有一个字段标识了是否 dirty map 中存在部分 read map 中不存在的 key。这样一来,如果在 read map 中找不到 key 的时候,就可以先判断一下 read.amended 是否为 true,如果是 true,才需要进行加锁,然后再去 dirty map 中查找。这样一来,就可以减少加锁的次数,从而获得更好的性能。
  • dirty mapread map 之间是会相互转换:
    • dirty map 中查找 key 的次数超过了 dirty map 的大小,就会触发 dirty map 转换为 read map 的操作流程。
    • 需要写入新的 key 的时候,如果 dirty mapnil,那么会将 read map 中未删除的 key 写入到一个新创建的 dirty map 中。
  • sync.Map 性能更好的原因:尽量减少了加锁的次数,很多地方使用原子操作来保证并发安全。(如果我们的业务场景是写多读少,那么这一点可能就不成立了。)

我们知道,go 里面提供了 map 这种类型让我们可以存储键值对数据,但是如果我们在并发的情况下使用 map 的话,就会发现它是不支持并发地进行读写的(会报错)。 在这种情况下,我们可以使用 sync.Mutex 来保证并发安全,但是这样会导致我们在读写的时候,都需要加锁,这样就会导致性能的下降。 除了使用互斥锁这种相对低效的方式,我们还可以使用 sync.Map 来保证并发安全,它在某些场景下有比使用 sync.Mutex 更高的性能。 本文就来探讨一下 sync.Map 中的一些大家比较感兴趣的问题,比如为什么有了 map 还要 sync.Map?它为什么快?sync.Map 的适用场景(注意:不是所有情况下都快。)等。

关于 sync.Map 的设计与实现原理,会在下一篇中再做讲解。

map 在并发下的问题

如果我们看过 map 的源码,就会发现其中有不少会引起 fatal 错误的地方,比如 mapaccess1(从 map 中读取 key 的函数)里面,如果发现正在写 map,则会有 fatal 错误。 (如果还没看过,可以跟着这篇 《go map 设计与实现》 看一下)

1
2
3
if h.flags&hashWriting != 0 {
fatal("concurrent map read and map write")
}

map 并发读写异常的例子

下面是一个实际使用中的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
var m = make(map[int]int)

// 往 map 写 key 的协程
go func() {
// 往 map 写入数据
for i := 0; i < 10000; i++ {
m[i] = i
}
}()

// 从 map 读取 key 的协程
go func() {
// 从 map 读取数据
for i := 10000; i > 0; i-- {
_ = m[i]
}
}()

// 等待两个协程执行完毕
time.Sleep(time.Second)

这会导致报错:

1
fatal error: concurrent map read and map write

这是因为我们同时对 map 进行读写,而 map 不支持并发读写,所以会报错。如果 map 允许并发读写,那么可能在我们使用的时候会有很多错乱的情况出现。 (具体如何错乱,我们可以对比多线程的场景思考一下,本文不展开了)。

使用 sync.Mutex 保证并发安全

对于 map 并发读写报错的问题,其中一种解决方案就是使用 sync.Mutex 来保证并发安全, 但是这样会导致我们在读写的时候,都需要加锁,这样就会导致性能的下降。

使用 sync.Mutex 来保证并发安全,上面的代码可以改成下面这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
var m = make(map[int]int)
// 互斥锁
var mu sync.Mutex

// 写 map 的协程
go func() {
for i := 0; i < 10000; i++ {
mu.Lock() // 写 map,加互斥锁
m[i] = i
mu.Unlock()
}
}()

// 读 map 的协程序
go func() {
for i := 10000; i > 0; i-- {
mu.Lock() // 读 map,加互斥锁
_ = m[i]
mu.Unlock()
}
}()

time.Sleep(time.Second)

这样就不会报错了,但是性能会有所下降,因为我们在读写的时候都需要加锁。(如果需要更高性能,可以继续读下去,不要急着使用 sync.Mutex

sync.Mutex 的常见的用法是在结构体中嵌入 sync.Mutex,而不是定义独立的两个变量。

使用 sync.RWMutex 保证并发安全

在上一小节中,我们使用了 sync.Mutex 来保证并发安全,但是在读和写的时候我们都需要加互斥锁。 这就意味着,就算多个协程进行并发读,也需要等待锁。 但是互斥锁的粒度太大了,但实际上,并发读是没有什么太大问题的,应该被允许才对,如果我们允许并发读,那么就可以提高性能

当然 go 的开发者也考虑到了这一点,所以在 sync 包中提供了 sync.RWMutex,这个锁可以允许进行并发读,但是写的时候还是需要等待锁。 也就是说,一个协程在持有写锁的时候,其他协程是既不能读也不能写的,只能等待写锁释放才能进行读写

使用 sync.RWMutex 来保证并发安全,我们可以改成下面这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
var m = make(map[int]int)
// 读写锁(允许并发读,写的时候是互斥的)
var mu sync.RWMutex

// 写入 map 的协程
go func() {
for i := 0; i < 10000; i++ {
// 写入的时候需要加锁
mu.Lock()
m[i] = i
mu.Unlock()
}
}()

// 读取 map 的协程
go func() {
for i := 10000; i > 0; i-- {
// 读取的时候需要加锁,但是这个锁是读锁
// 多个协程可以同时使用 RLock 而不需要等待
mu.RLock()
_ = m[i]
mu.RUnlock()
}
}()

// 另外一个读取 map 的协程
go func() {
for i := 20000; i > 10000; i-- {
// 读取的时候需要加锁,但是这个锁是读锁
// 多个协程可以同时使用 RLock 而不需要等待
mu.RLock()
_ = m[i]
mu.RUnlock()
}
}()

time.Sleep(time.Second)

这样就不会报错了,而且性能也提高了,因为我们在读的时候,不需要等待锁。

说明:

  • 多个协程可以同时使用 RLock 而不需要等待,这是读锁。
  • 只有一个协程可以使用 Lock,这是写锁,有写锁的时候,其他协程不能读也不能写。
  • 持有写锁的协程,可以使用 Unlock 来释放锁。
  • 写锁释放之后,其他协程才能获取到锁(读锁或者写锁)。

也就是说,使用 sync.RWMutex 的时候,读操作是可以并发执行的,但是写操作是互斥的。 这样一来,相比 sync.Mutex 来说等待锁的次数就少了,自然也就能获得更好的性能了。

gin 框架里面就使用了 sync.RWMutex 来保证 Keys 读写操作的并发安全。

有了读写锁为什么还要有 sync.Map?

通过上面的内容,我们知道了,有下面两种方式可以保证并发安全:

  • 使用 sync.Mutex,但是这样的话,读写都是互斥的,性能不好。
  • 使用 sync.RWMutex,可以并发读,但是写的时候是互斥的,性能相对 sync.Mutex 要好一些。

但是就算我们使用了 sync.RWMutex,也还是有一些锁的开销。那么我们能不能再优化一下呢?答案是可以的。那就是使用 sync.Map

sync.Map 在锁的基础上做了进一步优化,在一些场景下使用原子操作来保证并发安全,性能更好。

使用原子操作替代读锁

但是就算使用 sync.RWMutex,读操作依然还有锁的开销,那么有没有更好的方式呢? 答案是有的,就是使用原子操作来替代读锁。

举一个很常见的例子就是多个协程同时读取一个变量,然后对这个变量进行累加操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
var a int32

var wg sync.WaitGroup
wg.Add(2)

go func() {
for i := 0; i < 10000; i++ {
a++
}
wg.Done()
}()

go func() {
for i := 0; i < 10000; i++ {
a++
}
wg.Done()
}()

wg.Wait()

// a 期望结果应该是 20000才对。
fmt.Println(a) // 实际:17089,而且每次都不一样

这个例子中,我们期望的结果是 a 的值是 20000,但是实际上,每次运行的结果都不一样,而且都不会等于 20000。 其中很简单粗暴的一种解决方法是加锁,但是这样的话,性能就不好了,但是我们可以使用原子操作来解决这个问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
var a atomic.Int32

var wg sync.WaitGroup
wg.Add(2)

go func() {
for i := 0; i < 10000; i++ {
a.Add(1)
}
wg.Done()
}()

go func() {
for i := 0; i < 10000; i++ {
a.Add(1)
}
wg.Done()
}()

wg.Wait()

fmt.Println(a.Load()) // 20000

锁跟原子操作的性能差多少?

我们来看一下,使用锁和原子操作的性能差多少:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func BenchmarkMutexAdd(b *testing.B) {
var a int32
var mu sync.Mutex

for i := 0; i < b.N; i++ {
mu.Lock()
a++
mu.Unlock()
}
}

func BenchmarkAtomicAdd(b *testing.B) {
var a atomic.Int32
for i := 0; i < b.N; i++ {
a.Add(1)
}
}

结果:

1
2
BenchmarkMutexAdd-12     	100000000	        10.07 ns/op
BenchmarkAtomicAdd-12 205196968 5.847 ns/op

我们可以看到,使用原子操作的性能比使用锁的性能要好一些。

也许我们会觉得上面这个例子是写操作,那么读操作呢?我们来看一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func BenchmarkMutex(b *testing.B) {
var mu sync.RWMutex

for i := 0; i < b.N; i++ {
mu.RLock()
mu.RUnlock()
}
}

func BenchmarkAtomic(b *testing.B) {
var a atomic.Int32
for i := 0; i < b.N; i++ {
_ = a.Load()
}
}

结果:

1
2
BenchmarkMutex-12     	100000000	        10.12 ns/op
BenchmarkAtomic-12 1000000000 0.3133 ns/op

我们可以看到,使用原子操作的性能比使用锁的性能要好很多。而且在 BenchmarkMutex 里面甚至还没有做读取数据的操作。

sync.Map 里面的原子操作

sync.Map 里面相比 sync.RWMutex,性能更好的原因就是使用了原子操作。 在我们从 sync.Map 里面读取数据的时候,会先使用一个原子 Load 操作来读取 sync.Map 里面的 key(从 read 中读取)。 注意:这里拿到的是 key 的一份快照,我们对其进行读操作的时候也可以同时往 sync.Map 中写入新的 key,这是保证它高性能的一个很关键的设计(类似读写分离)。

sync.Map 里面的 Load 方法里面就包含了上述的流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Load 方法从 sync.Map 里面读取数据。
func (m *Map) Load(key any) (value any, ok bool) {
// 先从只读 map 里面读取数据。
// 这一步是不需要锁的,只有一个原子操作。
read := m.loadReadOnly()
e, ok := read.m[key]
if !ok && read.amended { // 如果没有找到,并且 dirty 里面有一些 read 中没有的 key,那么就需要从 dirty 里面读取数据。
// 这里才需要锁
m.mu.Lock()
read = m.loadReadOnly()
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
m.missLocked()
}
m.mu.Unlock()
}

// key 不存在
if !ok {
return nil, false
}
// 使用原子操作读取
return e.Load()
}

上面的代码我们可能还看不懂,但是没关系,这里我们只需要知道的是,从 sync.Map 读取数据的时候,会先做原子操作,如果没找到,再进行加锁操作,这样就减少了使用锁的频率了,自然也就可以获得更好的性能(但要注意的是并不是所有情况下都能获得更好的性能)。至于具体实现,在下一篇文章中会进行更加详细的分析。

也就是说,sync.Map 之所以更快,是因为相比 RWMutex,进一步减少了锁的使用,而这也就是 sync.Map 存在的原因了

sync.Map 的基本用法

现在我们知道了,sync.Map 里面是利用了原子操作来减少锁的使用。但是我们好像连 sync.Map 的一些基本操作都还不了解,现在就让我们再来看看 sync.Map 的基本用法。

sync.Map 的使用还是挺简单的,map 中有的操作,在 sync.Map 都有,只不过区别是,在 sync.Map 中,所有的操作都需要通过调用其方法来进行。 sync.Map 里面几个常用的方法有(CRUD):

  • Store:我们新增或者修改数据的时候,都可以使用 Store 方法。
  • Load:读取数据的方法。
  • Range:遍历数据的方法。
  • Delete:删除数据的方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
var m sync.Map

// 写入/修改
m.Store("foo", 1)

// 读取
fmt.Println(m.Load("foo")) // 1 true

// 遍历
m.Range(func(key, value interface{}) bool {
fmt.Println(key, value) // foo 1
return true
})

// 删除
m.Delete("foo")
fmt.Println(m.Load("foo")) // nil false

注意:在 sync.Map 中,keyvalue 都是 interface{} 类型的,也就是说,我们可以使用任意类型的 keyvalue。 而不像 map,只能存在一种类型的 keyvalue。从这个角度来看,它的类型类似于 map[any]any

另外一个需要注意的是,Range 方法的参数是一个函数,这个函数如果返回 false,那么遍历就会停止。

sync.Map 的使用场景

sync.Map 源码中,已经告诉了我们 sync.Map 的使用场景:

1
2
3
4
5
The Map type is optimized for two common use cases: (1) when the entry for a given
key is only ever written once but read many times, as in caches that only grow,
or (2) when multiple goroutines read, write, and overwrite entries for disjoint
sets of keys. In these two cases, use of a Map may significantly reduce lock
contention compared to a Go map paired with a separate Mutex or RWMutex.

翻译过来就是,Map 类型针对两种常见用例进行了优化:

  • 当给定 key 的条目只写入一次但读取多次时,如在只会增长的缓存中。(读多写少)
  • 当多个 goroutine 读取、写入和覆盖不相交的键集的条目。(不同 goroutine 操作不同的 key)

在这两种情况下,与 Go map 与单独的 MutexRWMutex 配对相比,使用 sync.Map 可以显著减少锁竞争(很多时候只需要原子操作就可以)。

总结

  • 普通的 map 不支持并发读写。
  • 有以下两种方式可以实现 map 的并发读写:
    • 使用 sync.Mutex 互斥锁。读和写的时候都使用互斥锁,性能相比 sync.RWMutex 会差一些。
    • 使用 sync.RWMutex 读写锁。读的锁是可以共享的,但是写锁是独占的。性能相比 sync.Mutex 会好一些。
  • sync.Map 里面会先进行原子操作来读取 key,如果读取不到的时候,才会需要加锁。所以性能相比 sync.Mutexsync.RWMutex 会好一些。
  • sync.Map 里面几个常用的方法有(CRUD):
    • Store:我们新增或者修改数据的时候,都可以使用 Store 方法。
    • Load:读取数据的方法。
    • Range:遍历数据的方法。
    • Delete:删除数据的方法。
  • sync.Map 的使用场景,sync.Map 针对以下两种场景做了优化:
    • key 只会写入一次,但是会被读取多次的场景。
    • 多个 goroutine 读取、写入和覆盖不相交的键集的条目。

我们选择 go 语言的一个重要原因是,它有非常高的性能。但是它反射的性能却一直为人所诟病,本篇文章就来看看 go 反射的性能问题。

go 的性能测试

在开始之前,有必要先了解一下 go 的性能测试。在 go 里面进行性能测试很简单,只需要在测试函数前面加上 Benchmark 前缀, 然后在函数体里面使用 b.N 来进行循环,就可以得到每次循环的耗时。如下面这个例子:

1
2
3
4
5
6
func BenchmarkNew(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
New()
}
}

我们可以使用命令 go test -bench=. reflect_test.go 来运行这个测试函数,又或者如果使用 goland 的话,直接点击运行按钮就可以了。

说明:

  • *_test.go 文件中 Benchmark* 前缀函数是性能测试函数,它的参数是 *testing.B 类型。
  • b.ReportAllocs():报告内存分配次数,这是一个非常重要的指标,因为内存分配相比单纯的 CPU 计算是比较耗时的操作。在性能测试中,我们需要关注内存分配次数,以及每次内存分配的大小。
  • b.N:是一个循环次数,每次循环都会执行 New() 函数,然后记录下来每次循环的耗时。

go 里面很多优化都致力于减少内存分配,减少内存分配很多情况下都可以提高性能。

输出:

1
BenchmarkNew-20    1000000000	  0.1286 ns/op	  0 B/op   0 allocs/op

输出说明:

  • BenchmarkNew-20BenchmarkNew 是测试函数名,-20 是 CPU 核数。
  • 1000000000:循环次数。
  • 0.1286 ns/op:每次循环的耗时,单位是纳秒。这里表示每次循环耗时 0.1286 纳秒。
  • 0 B/op:每次循环内存分配的大小,单位是字节。这里表示每次循环没有分配内存。
  • 0 allocs/op:每次循环内存分配的次数。这里表示每次循环没有分配内存。

go 反射慢的原因

动态语言的灵活性是以牺牲性能为代价的,go 语言也不例外,go 的 interface{} 提供了一定的灵活性,但是处理 interface{} 的时候就要有一些性能上的损耗了。

我们都知道,go 是一门静态语言,这意味着我们在编译的时候就知道了所有的类型,而不是在运行时才知道类型。 但是 go 里面有一个 interface{} 类型,它可以表示任意类型,这就意味着我们可以在运行时才知道类型。 但本质上,interface{} 类型还是静态类型,只不过它的类型和值是动态的。 在 interface{} 类型里面,存储了两个指针,一个指向类型信息,一个指向值信息。具体可参考《go interface 设计与实现》

go interface{} 带来的灵活性

有了 interface{} 类型,让 go 也拥有了动态语言的特性,比如,定义一个函数,它的参数是 interface{} 类型, 那么我们就可以传入任意类型的值给这个函数。比如下面这个函数(做任意整型的加法,返回 int64 类型):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func convert(i interface{}) int64 {
typ := reflect.TypeOf(i)
switch typ.Kind() {
case reflect.Int:
return int64(i.(int))
case reflect.Int8:
return int64(i.(int8))
case reflect.Int16:
return int64(i.(int16))
case reflect.Int32:
return int64(i.(int32))
case reflect.Int64:
return i.(int64)
default:
panic("not support")
}
}

func add(a, b interface{}) int64 {
return convert(a) + convert(b)
}

说明:

  • convert() 函数:将 interface{} 类型转换为 int64 类型。对于非整型的类型,会 panic。(当然不是很严谨,还没涵盖 uint* 类型)
  • add() 函数:做任意整型的加法,返回 int64 类型。

相比之下,如果是确定的类型,我们根本不需要判断类型,直接相加就可以了:

1
2
3
func add1(a, b int64) int64 {
return a + b
}

我们可以通过以下的 benchmark 来对比一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
func BenchmarkAdd(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
add(1, 2)
}
}

func BenchmarkAdd1(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
add1(1, 2)
}
}

结果:

1
2
BenchmarkAdd-12         179697526                6.667 ns/op           0 B/op          0 allocs/op
BenchmarkAdd1-12 1000000000 0.2353 ns/op 0 B/op 0 allocs/op

我们可以看到非常明显的性能差距,add() 要比 add1() 慢了非常多,而且这还只是做了一些简单的类型判断及类型转换的情况下。

go 灵活性的代价(慢的原因)

通过这个例子我们知道,go 虽然通过 interface{} 为我们提供了一定的灵活性支持,但是使用这种动态的特性是有一定代价的,比如:

  • 我们在运行时才知道类型,那么我们就需要在运行时去做类型判断(也就是通过反射),这种判断会有一定开销(本来是确定的一种类型,但是现在可能要在 20 多个类型中匹配才能确定它的类型是什么)。同时,判断到属于某一类型之后,往往需要转换为具体的类型,这也是一种开销。
  • 同时,我们可能需要去做一些属性、方法的查找等操作(Field, FieldByName, Method, MethodByName),这些操作都是在运行时做的,所以会有一定的性能损耗。
  • 另外,在做属性、方法之类的查找的时候,查找性能取决于属性、方法的数量,如果属性、方法的数量很多,那么查找性能就会相对慢。通过 index (Field, Method)查找相比通过 name (FieldByName, MethodByName)查找快很多,后者有内存分配的操作
  • 在我们通过反射来做这些操作的时候,多出了很多操作,比如,简单的两个 int 类型相加,本来可以直接相加。但是通过反射,我们不得不先根据 interface{} 创建一个反射对象,然后再做类型判断,再做类型转换,最后再做加法。

总的来说,go 的 interface{} 类型虽然给我们提供了一定的灵活性,让开发者也可以在 go 里面实现一些动态语言的特性, 但是这种灵活性是以牺牲一定的性能来作为代价的,它会让一些简单的操作变得复杂,一方面生成的编译指令会多出几十倍,另一方面也有可能在这过程有内存分配的发生(比如 FieldByName)。

慢是相对的

从上面的例子中,我们发现 go 的反射好像慢到了让人无法忍受的地步,然后就有人提出了一些解决方案, 比如:通过代码生成的方式避免运行时的反射操作,从而提高性能。比如 easyjson

但是这类方案都会让代码变得繁杂起来。我们需要权衡之后再做决定。为什么呢?因为反射虽然慢,但我们要知道的是,如果我们的应用中有网络调用,任何一次网络调用的时间往往都不会少于 1ms,而这 1ms 足够 go 做很多次反射操作了。这给我们什么启示呢?如果我们不是做中间件或者是做一些高性能的服务,而是做一些 web 应用,那么我们可以考虑一下性能瓶颈是不是在反射这里,如果是,那么我们就可以考虑一下代码生成的方式来提高性能,如果不是,那么我们真的需要牺牲代码的可维护性、可读性来提高反射的性能吗?优化几个慢查询带来的收益是不是更高呢?

go 反射性能优化

如果可以的话,最好的优化就是不要用反射

通过代码生成的方式避免序列化和反序列化时的反射操作

这里以 easyjson 为例,我们来看一下它是怎么做的。假设我们有如下结构体,我们需要对其进行 json 序列化/反序列化:

1
2
3
4
5
// person.go
type Person struct {
Name string `json:"name"`
Age int `json:"age"`
}

使用 easyjson 的话,我们需要为结构体生成代码,这里我们使用 easyjson 的命令行工具来生成代码:

1
easyjson -all person.go

这样,我们就会在当前目录下生成 person_easyjson.go 文件,里面包含了 MarshalJSONUnmarshalJSON 方法,这两个方法就是我们需要的序列化和反序列化方法。不同于标准库里面的 json.Marshaljson.Unmarshal,这两个方法是不需要反射的,它们的性能会比标准库的方法要好很多。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func easyjsonDb0593a3EncodeGithubComGinGonicGinCEasy(out *jwriter.Writer, in Person) {
out.RawByte('{')
first := true
_ = first
{
const prefix string = ",\"name\":"
out.RawString(prefix[1:])
out.String(string(in.Name))
}
{
const prefix string = ",\"age\":"
out.RawString(prefix)
out.Int(int(in.Age))
}
out.RawByte('}')
}

// MarshalJSON supports json.Marshaler interface
func (v Person) MarshalJSON() ([]byte, error) {
w := jwriter.Writer{}
easyjsonDb0593a3EncodeGithubComGinGonicGinCEasy(&w, v)
return w.Buffer.BuildBytes(), w.Error
}

我们看到,我们对 Person 的序列化操作现在只需要几行代码就可以完成了,但是也有很明显的缺点,生成的代码会很多。

性能差距:

1
2
3
4
5
6
7
8
goos: darwin
goarch: amd64
pkg: github.com/gin-gonic/gin/c/easy
cpu: Intel(R) Core(TM) i7-8700 CPU @ 3.20GHz
BenchmarkJson
BenchmarkJson-12 3680560 305.9 ns/op 152 B/op 2 allocs/op
BenchmarkEasyJson
BenchmarkEasyJson-12 16834758 71.37 ns/op 128 B/op 1 allocs/op

我们可以看到,使用 easyjson 生成的代码,序列化的性能比标准库的方法要好很多,好了 4 倍以上。

反射结果缓存

这种方法适用于需要根据名称查找结构体字段或者查找方法的场景。

假设我们有一个结构体 Person,其中有 5 个方法,M1M2M3M4M5,我们需要通过名称来查找其中的方法,那么我们可以使用 reflect 包来实现:

1
2
3
p := &Person{}
v := reflect.ValueOf(p)
v.MethodByName("M4")

这是很容易想到的办法,但是性能如何呢?通过性能测试,我们可以看到,这种方式的性能是非常差的:

1
2
3
4
5
6
7
8
9
func BenchmarkMethodByName(b *testing.B) {
p := &Person{}
v := reflect.ValueOf(p)

b.ReportAllocs()
for i := 0; i < b.N; i++ {
v.MethodByName("M4")
}
}

结果:

1
BenchmarkMethodByName-12         5051679               237.1 ns/op           120 B/op          3 allocs/op

相比之下,我们如果使用索引来获取其中的方法的话,性能会好很多:

1
2
3
4
5
6
7
8
9
func BenchmarkMethod(b *testing.B) {
p := &Person{}
v := reflect.ValueOf(p)

b.ReportAllocs()
for i := 0; i < b.N; i++ {
v.Method(3)
}
}

结果:

1
BenchmarkMethod-12              200091475                5.958 ns/op           0 B/op          0 allocs/op

我们可以看到两种性能相差几十倍。那么我们是不是可以通过 Method 方法来替代 MethodByName 从而获得更好的性能呢?答案是可以的,我们可以缓存 MethodByName 的结果(就是方法名对应的下标),下次通过反射获取对应方法的时候直接通过这个下标来获取:

这里需要通过 reflect.Type 的 MethodByName 来获取反射的方法对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 缓存方法名对应的方法下标
var indexCache = make(map[string]int)

func methodIndex(p interface{}, method string) int {
if _, ok := indexCache[method]; !ok {
m, ok := reflect.TypeOf(p).MethodByName(method)
if !ok {
panic("method not found!")
}

indexCache[method] = m.Index
}

return indexCache[method]
}

性能测试:

1
2
3
4
5
6
7
8
9
10
11
func BenchmarkMethodByNameCache(b *testing.B) {
p := &Person{}
v := reflect.ValueOf(p)

b.ReportAllocs()
var idx int
for i := 0; i < b.N; i++ {
idx = methodIndex(p, "M4")
v.Method(idx)
}
}

结果:

1
2
3
// 相比原来的 MethodByName 快了将近 20 倍
BenchmarkMethodByNameCache-12 86208202 13.65 ns/op 0 B/op 0 allocs/op
BenchmarkMethodByName-12 5082429 235.9 ns/op 120 B/op 3 allocs/op

跟这个例子类似的是 Field/FieldByName 方法,可以采用同样的优化方式。这个可能是更加常见的操作,反序列化可能需要通过字段名查找字段,然后进行赋值。

使用类型断言代替反射

在实际使用中,如果只是需要进行一些简单的类型判断的话,比如判断是否实现某一个接口,那么可以使用类型断言来实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
type Talk interface {
Say()
}

type person struct {
}

func (p person) Say() {
}

func BenchmarkReflectCall(b *testing.B) {
p := person{}
v := reflect.ValueOf(p)

for i := 0; i < b.N; i++ {
idx := methodIndex(&p, "Say")
v.Method(idx).Call(nil)
}
}

func BenchmarkAssert(b *testing.B) {
p := person{}

for i := 0; i < b.N; i++ {
var inter interface{} = p
if v, ok := inter.(Talk); ok {
v.Say()
}
}
}

结果:

1
2
3
4
5
goos: darwin
goarch: amd64
cpu: Intel(R) Core(TM) i7-8700 CPU @ 3.20GHz
BenchmarkReflectCall-12 6906339 173.1 ns/op
BenchmarkAssert-12 171741784 6.922 ns/op

在这个例子中,我们就算使用了缓存版本的反射,性能也跟类型断言差了将近 25 倍。

因此,在我们使用反射之前,我们需要先考虑一下是否可以通过类型断言来实现,如果可以的话,那么就不需要使用反射了。

总结

  • go 提供了性能测试的工具,我们可以通过 go test -bench=. 这种命令来进行性能测试,运行命令之后,文件夹下的测试文件中的 Benchmark* 函数会被执行。
  • 性能测试的结果中,除了平均执行耗时之外,还有内存分配的次数和内存分配的字节数,这些都是我们需要关注的指标。其中内存分配的次数和内存分配的字节数是可以通过 b.ReportAllocs() 来进行统计的。内存分配的次数和内存分配的字节数越少,性能越好。
  • 反射虽然慢,但是也带来了一定的灵活性,它的慢主要由以下几个方面的原因造成的:
    • 运行时需要进行类型判断,相比确定的类型,运行时可能需要在 20 多种类型中进行判断。
    • 类型判断之后,往往需要将 interface{} 转换为具体的类型,这个转换也是需要消耗一定时间的。
    • 方法、字段的查找也是需要消耗一定时间的。尤其是 FieldByName, MethodByName 这种方法,它们需要遍历所有的字段和方法,然后进行比较,这个比较的过程也是需要消耗一定时间的。而且这个过程还需要分配内存,这会进一步降低性能。
  • 慢不慢是一个相对的概念,如果我们的应用大部分时间是在 IO 等待,那么反射的性能大概率不会成为瓶颈。优化其他地方可能会带来更大的收益,同时也可以在不影响代码可维护性的前提下,使用一些时空复杂度更低的反射方法,比如使用 Field 代替 FieldByName 等。
  • 如果可以的话,尽量不使用反射就是最好的优化。
  • 反射的一些性能优化方式有如下几种(不完全,需要根据实际情况做优化):
    • 使用生成代码的方式,生成特定的序列化和反序列化方法,这样就可以避免反射的开销。
    • 将第一次反射拿到的结果缓存起来,这样如果后续需要反射的话,就可以直接使用缓存的结果,避免反射的开销。(空间换时间
    • 如果只是需要进行简单的类型判断,可以先考虑一下类型断言能不能实现我们想要的效果,它相比反射的开销要小很多。

反射是一个很庞大的话题,这里只是简单的介绍了一小部分反射的性能问题,讨论了一些可行的优化方案,但是每个人使用反射的场景都不一样,所以需要根据实际情况来做优化。

go 的反射是很脆弱的,保证反射代码正确运行的前提是,在调用反射对象的方法之前, 先问一下自己正在调用的方法是不是适合于所有用于创建反射对象的原始类型。 go 反射的错误大多数都来自于调用了一个不适合当前类型的方法(比如在一个整型反射对象上调用 Field() 方法)。 而且,这些错误通常是在运行时才会暴露出来,而不是在编译时,如果我们传递的类型在反射代码中没有被覆盖到那么很容易就会 panic

本文就介绍一下使用 go 反射时很大概率会出现的错误。

获取 Value 的值之前没有判断类型

对于 reflect.Value,我们有很多方法可以获取它的值,比如 Int()String() 等等。 但是,这些方法都有一个前提,就是反射对象底层必须是我们调用的那个方法对应的类型,否则会 panic,比如下面这个例子:

1
2
3
4
var f float32 = 1.0
v := reflect.ValueOf(f)
// 报错:panic: reflect: call of reflect.Value.Int on float32 Value
fmt.Println(v.Int())

上面这个例子中,f 是一个 float32 类型的浮点数,然后我们尝试通过 Int() 方法来获取一个整数,但是这个方法只能用于 int 类型的反射对象,所以会报错。

  • 涉及的方法:Addr, Bool, Bytes, Complex, Int, Uint, Float, Interface;调用这些方法的时候,如果类型不对则会 panic
  • 判断反射对象能否转换为某一类型的方法:CanAddr, CanInterface, CanComplex, CanFloat, CanInt, CanUint
  • 其他类型是否能转换判断方法:CanConvert,可以判断一个反射对象能否转换为某一类型。

通过 CanConvert 方法来判断一个反射对象能否转换为某一类型:

1
2
// true
fmt.Println(v.CanConvert(reflect.TypeOf(1.0)))

如果我们想将反射对象转换为我们的自定义类型,就可以通过 CanConvert 来判断是否能转换,然后再调用 Convert 方法来转换:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type Person struct {
Name string
}

func TestReflect(t *testing.T) {
p := Person{Name: "foo"}
v := reflect.ValueOf(p)

// v 可以转换为 Person 类型
assert.True(t, v.CanConvert(reflect.TypeOf(Person{})))

// v 可以转换为 Person 类型
p1 := v.Convert(reflect.TypeOf(Person{}))
assert.Equal(t, "foo", p1.Interface().(Person).Name)
}

说明:

  • reflect.TypeOf(Person{}) 可以取得 Person 类型的信息
  • v.Convert 可以将 v 转换为 reflect.TypeOf(Person{}) 指定的类型

没有传递指针给 reflect.ValueOf

如果我们想通过反射对象来修改原变量,就必须传递一个指针,否则会报错(暂不考虑 slice, map, 结构体字段包含指针字段的特殊情况):

1
2
3
4
5
6
7
func TestReflect(t *testing.T) {
p := Person{Name: "foo"}
v := reflect.ValueOf(p)

// 报错:panic: reflect: reflect.Value.SetString using unaddressable value
v.FieldByName("Name").SetString("bar")
}

这个错误的原因是,v 是一个 Person 类型的值,而不是指针,所以我们不能通过 v.FieldByName("Name") 来修改它的字段。

对于反射对象来说,只拿到了 p 的拷贝,而不是 p 本身,所以我们不能通过反射对象来修改 p。

在一个无效的 Value 上操作

我们有很多方法可以创建 reflect.Value,而且这类方法没有 error 返回值,这就意味着,就算我们创建 reflect.Value 的时候传递了一个无效的值,也不会报错,而是会返回一个无效的 reflect.Value

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func TestReflect(t *testing.T) {
var p = Person{}
v := reflect.ValueOf(p)

// Person 不存在 foo 方法
// FieldByName 返回一个表示 Field 的反射对象 reflect.Value
v1 := v.FieldByName("foo")
assert.False(t, v1.IsValid())

// v1 是无效的,只有 String 方法可以调用
// 其他方法调用都会 panic
assert.Panics(t, func() {
// panic: reflect: call of reflect.Value.NumMethod on zero Value
fmt.Println(v1.NumMethod())
})
}

对于这个问题,我们可以通过 IsValid 方法来判断 reflect.Value 是否有效:

1
2
3
4
5
6
7
8
9
10
11
12
func TestReflect(t *testing.T) {
var p = Person{}
v := reflect.ValueOf(p)

v1 := v.FieldByName("foo")
// 通过 IsValid 判断 reflect.Value 是否有效
if v1.IsValid() {
fmt.Println("p has foo field")
} else {
fmt.Println("p has no foo field")
}
}

Field() 方法在传递的索引超出范围的时候,直接 panic,而不会返回一个 invalid 的 reflect.Value。

IsValid 报告反射对象 v 是否代表一个值。 如果 v 是零值,则返回 false。 如果 IsValid 返回 false,则除 String 之外的所有其他方法都将发生 panic。 大多数函数和方法从不返回无效值。

什么时候 IsValid 返回 false

reflect.ValueIsValid 的返回值表示 reflect.Value 是否有效,而不是它代表的值是否有效。比如:

1
2
3
4
5
var b *int = nil
v := reflect.ValueOf(b)
fmt.Println(v.IsValid()) // true
fmt.Println(v.Elem().IsValid()) // false
fmt.Println(reflect.Indirect(v).IsValid()) // false

在上面这个例子中,v 是有效的,它表示了一个指针,指针指向的对象为 nil。 但是 v.Elem()reflect.Indirect(v) 都是无效的,因为它们表示的是指针指向的对象,而指针指向的对象为 nil。 我们无法基于 nil 来做任何反射操作。

其他情况下 IsValid 返回 false

除了上面的情况,IsValid 还有其他情况下会返回 false

  • 空的反射值对象,获取通过 nil 创建的反射对象,其 IsValid 会返回 false
  • 结构体反射对象通过 FieldByName 获取了一个不存在的字段,其 IsValid 会返回 false
  • 结构体反射对象通过 MethodByName 获取了一个不存在的方法,其 IsValid 会返回 false
  • map 反射对象通过 MapIndex 获取了一个不存在的 key,其 IsValid 会返回 false

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func TestReflect(t *testing.T) {
// 空的反射对象
fmt.Println(reflect.Value{}.IsValid()) // false
// 基于 nil 创建的反射对象
fmt.Println(reflect.ValueOf(nil).IsValid()) // false

s := struct{}{}
// 获取不存在的字段
fmt.Println(reflect.ValueOf(s).FieldByName("").IsValid()) // false
// 获取不存在的方法
fmt.Println(reflect.ValueOf(s).MethodByName("").IsValid()) // false

m := map[int]int{}
// 获取 map 的不存在的 key
fmt.Println(reflect.ValueOf(m).MapIndex(reflect.ValueOf(3)).IsValid())
}

注意:还有其他一些情况也会使 IsValid 返回 false,这里只是列出了部分情况。 我们在使用的时候需要注意我们正在使用的反射对象会不会是无效的。

通过反射修改不可修改的值

对于 reflect.Value 对象,我们可以通过 CanSet 方法来判断它是否可以被设置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func TestReflect(t *testing.T) {
p := Person{Name: "foo"}

// 传递值来创建的发射对象,
// 不能修改其值,因为它是一个副本
v := reflect.ValueOf(p)
assert.False(t, v.CanSet())
assert.False(t, v.Field(0).CanSet())

// 下面这一行代码会 panic:
// panic: reflect: reflect.Value.SetString using unaddressable value
// v.Field(0).SetString("bar")

// 指针反射对象本身不能修改,
// 其指向的对象(也就是 v1.Elem())可以修改
v1 := reflect.ValueOf(&p)
assert.False(t, v1.CanSet())
assert.True(t, v1.Elem().CanSet())
}

CanSet 报告 v 的值是否可以更改。只有可寻址(addressable)且不是通过使用未导出的结构字段获得的值才能更改。 如果 CanSet 返回 false,调用 Set 或任何类型特定的 setter(例如 SetBoolSetInt)将 panicCanSet 的条件是可寻址。

对于传值创建的反射对象,我们无法通过反射对象来修改原变量,CanSet 方法返回 false例外的情况是,如果这个值中包含了指针,我们依然可以通过那个指针来修改其指向的对象。

只有通过 Elem 方法的返回值才能设置指针指向的对象。

在错误的 Value 上调用 Elem 方法

reflect.ValueElem() 返回 interface 的反射对象包含的值或指针反射对象指向的值。如果反射对象的 Kind 不是 reflect.Interfacereflect.Pointer,它会发生 panic。 如果反射对象为 nil,则返回零值。

我们知道,interface 类型实际上包含了类型和数据。而我们传递给 reflect.ValueOf 的参数就是 interface,所以在反射对象中也提供了方法来获取 interface 类型的类型和数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func TestReflect(t *testing.T) {
p := Person{Name: "foo"}

v := reflect.ValueOf(p)

// 下面这一行会报错:
// panic: reflect: call of reflect.Value.Elem on struct Value
// v.Elem()
fmt.Println(v.Type())

// v1 是 *Person 类型的反射对象,是一个指针
v1 := reflect.ValueOf(&p)
fmt.Println(v1.Elem(), v1.Type())
}

在上面的例子中,v 是一个 Person 类型的反射对象,它不是一个指针,所以我们不能通过 v.Elem() 来获取它指向的对象。 而 v1 是一个指针,所以我们可以通过 v1.Elem() 来获取它指向的对象。

调用了一个其类型不能调用的方法

这可能是最常见的一类错误了,因为在 go 的反射系统中,我们调用的一些方法又会返回一个相同类型的反射对象,但是这个新的反射对象可能是一个不同的类型了。同时返回的这个反射对象是否有效也是未知的。

在 go 中,反射有两大对象 reflect.Typereflect.Value,它们都存在一些方法只适用于某些特定的类型,也就是说, 在 go 的反射设计中,只分为了类型两大类。但是实际的 go 中的类型就有很多种,比如 intstringstructinterfaceslicemapchanfunc 等等。

我们先不说 reflect.Type,我们从 reflect.Value 的角度看看,将这么多类型的值都抽象为 reflect.Value 之后, 我们如何获取某些类型值特定的信息呢?比如获取结构体的某一个字段的值,或者调用某一个方法。 这个问题很好解决,需要获取结构体字段是吧,那给你提供一个 Field() 方法,需要调用方法吧,那给你提供一个 Call() 方法。

但是这样一来,有另外一个问题就是,如果我们的 reflect.Value 是从一个 int 类型的值创建的, 那么我们调用 Field() 方法就会发生 panic,因为 int 类型的值是没有 Field() 方法的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func TestReflect(t *testing.T) {
p := Person{Name: "foo"}
v := reflect.ValueOf(p)

// 获取反射对象的 Name 字段
assert.Equal(t, "foo", v.Field(0).String())

var i = 1
v1 := reflect.ValueOf(i)
assert.Panics(t, func() {
// 下面这一行会 panic:
// v1 没有 Field 方法
fmt.Println(v1.Field(0).String())
})
}

至于有哪些方法是某些类型特定的,可以参考一下下面两个文档:

总结

  • 在调用 Int()Float() 等方法时,需要确保反射对象的类型是正确的类型,否则会 panic,比如在一个 flaot 类型的反射对象上调用 Int() 方法就会 panic
  • 如果想修改原始的变量,创建 reflect.Value 时需要传入原始变量的指针。
  • 如果 reflect.ValueIsValid() 方法返回 false,那么它就是一个无效的反射对象,调用它的任何方法都会 panic,除了 String 方法。
  • 对于基于值创建的 reflect.Value,如果想要修改它的值,我们无法调用这个反射对象的 Set* 方法,因为修改一个变量的拷贝没有任何意义。
  • 同时,我们也无法通过 reflect.Value 去修改结构体中未导出的字段,即使我们创建 reflect.Value 时传入的是结构体的指针。
  • Elem() 只可以在指针或者 interface 类型的反射对象上调用,否则会 panic,它的作用是获取指针指向的对象的反射对象,又或者获取接口 data 的反射对象。
  • reflect.Valuereflect.Type 都有很多类型特定的方法,比如 Field()Call() 等,这些方法只能在某些类型的反射对象上调用,否则会 panic