0%

本文基于 go 1.19

哈希表作为一种非常常用的数据结构,存在于各种编程语言中,它可以让我们保存键值对数据,而且有着非常高的查找效率。 本文就以 go 语言中的 map 为例子,讲述一下哈希表在 go 中的实现。

哈希表基本操作

在开始之前,需要大概讲解一下哈希表这种数据结构,哈希表会预先在内存中分配一段比较大的内存,这段内存用在将来往里面写入数据的时候使用。 哈希表有点类似数组,都是一段连续的内存,但是我们往哈希表读写数据的时候不同于数组,数组的时候是直接通过下标访问的, 而哈希表的读写需要先计算 key 的哈希值,根据这个哈希值对哈希表长度取模得到 key 对应的哈希表的下标,然后对哈希表这个下标进行读写操作。

对于哈希表有以下几种常见的操作:

  1. 写入:根据 key 计算 hash 值,对哈希表长度取模得到 key 在内存中的地址,然后往这个地址写入数据。
  2. 读取:根据 key 计算 hash 值,对哈希表长度取模得到 key 在内存中的地址,然后读取这个地址中的数据。
  3. 修改:根据 key 计算 hash 值,对哈希表长度取模得到 key 在内存中的地址,然后修改这个内存地址里面的数据。
  4. 删除:根据 key 计算 hash 值,对哈希表长度取模得到 key 在内存中的地址,然后清空保存这个 key 的那一小块内存。

注意:计算出的 hash 值可能比分配的内存大小要大,所以才需要对其取模(hash 值 / 哈希表长度 => key 在哈希表的索引), 保证计算出的 hash 最终落到哈希表的内存范围之内。比如,keyn 计算出来的哈希值为 100,但是我们的哈希表长度只有 8, 那么 keyn 最终会落在哈希表中下标为 2 的地方(100 % 7 = 2,下标是从 0 开始的,所以这里是 7

哈希表的写入

假设我们现在要有一个长度为 8 的哈希表(下图左),我们有数据 {"a": 1, "b": 2} 需要存入这个哈希表,存入之后的哈希表为下图右:

哈希表的写入

说明:hash(a) = 5,计算 a 的哈希值得到 5,所以将 a:1 存入了哈希表中下标为 5 的地址处。b 同理。

注意:键值都会存储。

哈希表的读取

从哈希表中读取 keyb 的键值对:

哈希表的读取

哈希表的修改

现在,我们需要将 a 的值修改为 3,同样的,计算其 hash 值,得到 5,然后将哈希表中 5 这个下标的内存修改成 3

哈希表的修改

哈希表的删除

假设要将哈希表中 b 这个键删除,会先计算其 hash 值,得到 0,然后将哈希表中 0 这个下标的内存清空:

哈希表的删除

哈希表的高效之处

通过上面的分析,我们可以知道,哈希表的内存布局跟数组类似,但是哈希表的存储要通过计算 key 的哈希值来得到其在哈希表中的索引,最后对这个索引的内存进行 CRUD 操作。这样一来,如果我们需要频繁的根据键查找其对应值的话,使用哈希表无疑会大大提高效率。相比之下,如果使用数组来存储,那么每次搜索都需要将整个数组遍历一次,效率非常低下。

比如下图,假设我们一个数组内存布局如下,那么在我们查找 a:1 的时候,需要从数组的第一个元素开始遍历,每一个元素读取出来看看它的键是不是 a, 取到第 6 个元素(下标 5)的时候,发现它的键是 a,然后取出对应的值 1

哈希表的高效之处

在数组的元素个数少的时候,这种查找效率其实影响不大,但如果我们有上万个元素的时候,每次查找都要从第一个元素开始遍历,效率无疑会非常低。 相比之下,不管数据再怎么多,使用哈希表的方式,我们直接通过哈希算法计算一下就可以知道键保存到了哪个槽中。

哈希冲突解决方法

在上一节中,我们的图将哈希表的每一个槽(slot,又或者叫 cell,都是同一个东西)都表示成只有一个元素了。 但在实际中,往往会出现计算出来的哈希值对哈希表长度取模后是相等的,也就是不同的 key 会落到同一个槽中(这就是 哈希冲突),

这种情况下,一个槽存放不下的话,有两种办法可以处理:开放地址法链表法。go 里面的 map 使用的是链表法, 具体来说,就是在 hash 冲突的地方,建立一个链表来保存相同 hash 值的 key

这样一来,我们通过 hash 算法计算出哈希值的时候,并不能唯一确定对应的值了,因为有可能两个 key 经过哈希算法计算之后,得到的哈希值是一样的。 这种情况怎么办呢?很简单,因为虽然哈希值是一样的,但是它们的 key 是不一样的,再比较一下 key 就可以确定了。具体可以参考下图:

哈希冲突

有哈希冲突的情况下,读取哈希表数据的过程:

  • 计算 chash 值,得到 0,就是哈希表的索引
  • 获取哈希表中地址 0 上的数据,这会遍历冲突产生的链表
  • 比较 cb,不相等,继续比较链表下一个元素
  • 比较 cc,相等,返回 3

哈希扩容

go 里面 map 扩容有两种方式:增量扩容等量扩容

哈希表总元素个数过多导致的扩容

这种扩容方式也叫增量扩容

我们在上面说过了,其实哈希表的内存布局跟数组类似,都是先分配一段连续的内存。然后在哈希冲突的时候,对于冲突的 key 建立一个链表来保存。 这样就会出现一种情况,在链表中存在很多冲突的键,这样一来,在查找冲突 key 的时候,需要在这一堆冲突的 key 中进行查找,这个查找类似数组的查找,效率较低。

为了避免这种情况的出现,一般的哈希表设计会在元素个数总数超过一定数量的时候,对哈希表进行扩容, 这样一来,那些哈希冲突的键就可以相对均匀地分布在哈希表中,从而避免了很多哈希冲突情况下导致的查找效率低下的问题。

扩容之后的容量为原来容量的 2 倍。

增量扩容

go map 的负载因子

在 go 中,map 在实际存储的元素数量超过 mapbucket 总数量的 6.5 倍的时候(也就是平均每个 bucket 中的元素个数大于 6.5 的时候),会进行扩容, 这个 6.5 是实现 map 的那个开发者经过实验计算出来的比较合适的数,这个 6.5 被称为负载因子。

为什么负载因子是 6.5 呢,在 go 的 map 源码中有相关的说明:

选择负载因子,如果太大,会有很多溢出桶,太小,则会浪费很多空间。 我编写了一个简单的程序来检查不同负载的一些统计数据:(64位、8字节 key 和元素)

loadFactor %overflow bytes/entry hitprobe missprobe
4.00 2.13 20.77 3.00 4.00
4.50 4.05 17.30 3.25 4.50
5.00 6.85 14.77 3.50 5.00
5.50 10.55 12.94 3.75 5.50
6.00 15.27 11.67 4.00 6.00
6.50 20.90 10.79 4.25 6.50
7.00 27.14 10.15 4.50 7.00
7.50 34.03 9.73 4.75 7.50
8.00 41.10 9.40 5.00 8.00

列说明:

  • %overflow: 具有溢出桶的桶的百分比
  • bytes/entry: 每个 key/elem 对使用的开销字节
  • hitprobe: 查找当前 key 时要检查的条目数
  • missprobe: 查找缺少的 key 时要检查的条目数

哈希冲突链上元素太少导致的扩容

这种扩容方式也叫等量扩容

我们知道,在哈希冲突的时候,会建立链表来保存键冲突的元素,但是我们删除那些哈希冲突的键的时候,并不会对删除元素的内存进行释放, 如果每次删除都释放的话,在我们频繁插入跟删除的时候,效率就非常低下了。 因为插入一个元素就分配内存,删除一个元素就释放内存(分配内存和释放内存都是相对耗时的操作)。 而这样的结果是,保存冲突键的链表上,有很多空的元素,这样就会导致冲突的时候,查找键的效率降低,因为要遍历很多空的键。

删除的时候不释放,那什么时候会释放呢?哈希冲突的元素很多都被删除的时候,在 go 里面,map 会判断就算没有超过负载因子的情况下, 如果冲突链表占用的空间过大的话,也会进行扩容。但这里说的扩容其实并不是真正意义上的扩容,只是 map 的实现里面,使用的函数是同一个函数。

具体实现方式是,分配跟原哈希表相同大小的空间,然后将旧哈希表的数据迁移到新的哈希表。 这样迁移之后,对于哈希冲突链表上的那些元素,只会迁移非空的元素,最终结果就是,扩容之后,哈希冲突链表上的元素更加紧凑,在查找冲突的键的时候会更加高效。

虽然哈希表的总容量没变,但是数据分布更加紧凑了,省去了遍历空元素的时间。

等量扩容

go map 概述

  • map 只是一个哈希表。数据被排列成一组 bucket
  • 每个 bucket 最多包含 8键/值 对。
  • 哈希值的低位字节位用于选择 bucket
  • 每个 bucket 包含每个哈希的几个高位字节位(tophash),以区分单个桶中的条目。
  • 如果超过 8key 哈希到同一个桶,我们将额外的桶以链表的方式起来。(解决哈希冲突,链表法)
  • 当哈希表扩容时,我们会分配一个两倍大的新 bucket 数组。然后 bucket 从旧 bucket 数组增量复制到新 bucket 数组。
  • map 迭代器遍历 bucket 数组,并按遍历顺序返回键(遍历完普通桶之后,遍历溢出桶)。
  • 为了保持迭代语义,我们永远不会在它们的桶中移动键(bucket 内键的顺序在扩容的时候不变。如果改变了桶内键的相对顺序,键可能会返回 0 或 2 次)。
  • 在扩容哈希表时,迭代器仍在旧的桶中迭代,并且必须检查新桶,检查正在迭代的 bucket 是否已经被迁移到新桶。

go map 的整体模型

上面讲了哈希表的基本设计思路,接下来就要开始讲 go 里面 map 的设计与实现了。大体上其实就是上面说的样子,但是有下面几个不一样的地方:

下文的 bucketbmap 都是指的 "桶"。

  • go map 里面存储数据的地方是 bucket(桶),一个 bucket 可以存储 8 个元素,也就是说哈希冲突的时候还是会在同一个 bucket 中先存储。
  • 如果 bucket 也存放不下冲突的元素了,那么会新建另外一个桶(叫做溢出桶),旧的 bucket 记录这个新桶的指针,旧的 bucket 存放不下的元素,会存放到这个溢出桶中。
  • 如果溢出桶还是存放不下,那么再新建一个溢出桶,链接到上一个溢出桶中。

也就是说,在 go 的 map 实现中,哈希计算出来的值决定了 key 应该存放在哪一个 bucket 中。

go map 的整体结构如下图:

go map 整体模型
  • buckets 记录了保存 map 数据的所有 bucket(这种下文统一称为普通桶),go 中使用 bmap 这个结构体来表示 bucket,溢出桶也是使用 bmap 结构体表示。
  • 如果 bucket(普通桶)哈希冲突太多导致存放不下,会新建一个 bucket,在原来的 bucket 上会有一个指针记录新建 bucket 的地址,这个新 bucket 下文统一称为溢出桶
  • 在创建 map 的时候,如果我们指定的容量比较大(B >= 4 的时候),那么会预创建一个溢出桶。

也就是说,go 中解决哈希冲突的链表法,链表上的每一个元素是一个 bucket。go map 的实现里面,一个 bucket 可以存放 8 个键值对

上面的 bmap 的数据结构如下图:

bmap
  • bmap 就是 bucket(桶),不管是普通的桶还是溢出的桶,都是使用 bmap 结构体表示。
  • bmap 中存储数据的方式有点特别,它先存储了 8tophash 值,一个 tophash 的大小为 1 个字节,每一个 tophash 记录的是 bmap 中每一个元素的哈希值的最高的 8 bit。
  • 接下来是 bmap 存储的 8 个元素的 key,在 8 个 key 之后是 8 个 bmap 存储的值。我们会发现 keyvalue 的存储是分开的,而不是 key/valuekey/value 这种方式。go 中这种分开存储的方式有一个好处是可以减少内存对齐的开销,从而更省内存。
  • 最后是 overflow(溢出桶),如果 bmap 存满了,那就会新建一个溢出桶来保存新的数据,通过在旧的 bmap 上记录指针来记录溢出桶。

tophash 的作用是,在哈希冲突的时候,在 bucket 内进行查找的时候,是需要在 bucket 内从第一个元素遍历到最后一个元素来查找的。 如果 key 太大,直接比较 key 的话效率会比较低下,通过记录哈希值的高 8 位,我们就可以在 buckeet 内查找的时候,先比较哈希值的 前 8 位,这样一来,map 的效率受到 key 大小的影响就会比较小。当然哈希值的高 8 位有可能相同,在这种情况下,我们再比较一下 key 本身 就可以确定 bucket 的那个槽(slot/cell)是否是我们正在查找的那一个 key

go map 相关数据结构

我们大部分内容是跟下面两个结构体打交道:

  • hmap: map 的数据结构,包含了 bucket 的指针、bucket 的数量、键值对的数量等信息。
  • bmap: 桶,存储 key/value 的地方
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// go map 数据结构
type hmap struct {
count int // map 的元素数量。调用 len(map) 的时候返回此值
flags uint8 // map 标记:iterator/oldIterator/hashWriting/sameSizeGrow
B uint8 // 指示了当前哈希表持有的 buckets 的数量(2^B 是 bucket 的数量)
noverflow uint16 // 溢出桶的数量
hash0 uint32 // 哈希种子,计算 key 的哈希的时候会传入哈希函数
buckets unsafe.Pointer // 指向 buckets 的数组,大小为 2^B。 如果元素个数为 0 则为 nil。

// 哈希表扩容的时候记录 buckets 字段。
// 增量扩容时,oldbuckets 的长度是 buckets 的一半。
oldbuckets unsafe.Pointer

nevacuate uintptr // 指示扩容进度,小于此地址的 buckets 完成迁移
extra *mapextra // 可选字段
}

// mapextra 包含并非在所有 map 上都存在的字段。
// 下面 mapextra 注释是原文的翻译(看完了 map 的全部源码也还不是很懂这个结构体的作用,除了 nextOverflow 字段)。
type mapextra struct {
// 如果 key 和 elem 都不包含指针并且是内联的,那么我们将 bucket type 标记为不包含指针。这避免了扫描此类 map。
// 但是,bmap.overflow 是一个指针。 为了让溢出桶保持活动状态,
// 我们将指向所有溢出桶的指针存储在 hmap.extra.overflow 和 hmap.extra.oldoverflow 中。
// overflow 和 oldoverflow 仅在 key 和 elem 不包含指针时使用。
// overflow 包含 hmap.buckets 的溢出桶。
// oldoverflow 包含 hmap.oldbuckets 的溢出桶。
// 间接允许在 hiter 中存储指向切片的指针。
overflow *[]*bmap
oldoverflow *[]*bmap

// nextOverflow 持有指向空闲溢出桶的指针。
nextOverflow *bmap
}

// go map 中的 bucket 结构体(实际保存 key/value 的地方)
type bmap struct {
// tophash 通常包含此 bucket 中每个键的哈希值的最高的 8 位(1 字节)。
// 如果 tophash[0] < minTopHash,则 tophash[0] 为桶已迁移状态。
// 这是一个长度为 8 的数组,因为一个 bucket 只能存储 8 个元素。
// tophash 存储的是每一个元素的键的哈希的高 8 位。
//(通过比较不同键的哈希的高 8 位可以提高 bucket 内的查找性能,因为键可能很大)
tophash [bucketCnt]uint8

// 然后是 8 个键,然后是 8 个值。(这里的 8 是代码写死的)
// 注意:将所有键放在在一起,然后将所有值放在一起使代码比交替的 key/elem/key/elem/… 复杂一些,
// 但它允许我们消除填充(减少内存对齐导致的内存浪费),例如 map[int64]int8,
// 这种如果使用 key/elem 的方式存储则需要浪费几个字节用来对齐。
keys [bucketCnt]keytype // 8 个键
values [bucketCnt]valuetype // 8 个值
overflow *bmap // 指向溢出桶的指针
}

对于 map 的数据结构,需要特别说明的是,bmap 的源码中实际只包含了 tophash 字段,而后面的三个字段 keys/values/overflow 都是在编译期间动态添加的。这是因为 map 中可能存储不同类型的键值对,所以键值对占据的内存空间大小只能在编译时进行推导。 这样一来,最终的结果是,我们在 map 的源码中,访问 keyvalue 的时候都需要通过 bmap 的首地址加上偏移量来进行访问的。

比如获取 bucket 中第 ikey 的方式:

1
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))

这行代码中,add 是做指针加法运算的函数,具体来说就是第一个参数的地址加上第二个参数(偏移量),得到一个我们想要的指针。 dataOffset 代表了 bmapkeys 第一个元素的偏移量,i 代表了我们想要获取的 keykeys 中的索引:

bmap 结构

这样一来,我们就可以通过 k 这个指针来访问 bucket 中的 key 了。同样的,要访问 value 的方式也是类似的, 只要将 dataOffset + i * uintptr(t.keysize) 替换成 dataOffset + bucketCnt * uintptr(t.keysize) 即可。

这种方式虽然不太优雅,但是在性能上可以达到最优。

bmap(桶)源码剖析

bmap 就是保存键值对的地方,但是它本身的方法并不多:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// bucket 是否已经完成迁移
// b 是 bucket 的指针
func evacuated(b *bmap) bool {
h := b.tophash[0]
return h > emptyOne && h < minTopHash
}

// 获取 b 的溢出桶
func (b *bmap) overflow(t *maptype) *bmap {
// bmap 数据结构的最后一个指针就是指向溢出桶的指针
return *(**bmap)(add(unsafe.Pointer(b), uintptr(t.bucketsize)-goarch.PtrSize))
}

// 设置 b 的溢出桶
// bmap 数据结构的最后一个指针指向溢出桶
func (b *bmap) setoverflow(t *maptype, ovf *bmap) {
*(**bmap)(add(unsafe.Pointer(b), uintptr(t.bucketsize)-goarch.PtrSize)) = ovf
}

// 获取 b 中保存 keys 的指针(指向了桶内的第一个 key)
func (b *bmap) keys() unsafe.Pointer {
return add(unsafe.Pointer(b), dataOffset)
}

ev 中用到了两个常量,在 buckettophash 里面,会通过下面几个标志来记录桶里面槽的状态:

1
2
3
4
5
6
7
8
9
10
11
12
// 这个 cell 是空的,并且在更高的索引或溢出处没有更多的非空 cell。
emptyRest = 0
// 这个 cell 是空的
emptyOne = 1
// key/elem 有效。 entry 已被迁移到较大的哈希表的前半部分(扩容了)。
evacuatedX = 2
// 同上,但迁移到大的哈希表的后半部分(扩容了)。
evacuatedY = 3
// cell 是空的,bucket 已经被迁移了
evacuatedEmpty = 4
// 正常填充单元格的最小 tophash。
minTopHash = 5

为了跟正常的 tophash 区分开来,如果计算出来的 tophash 小于 minTopHash,会将计算出来的 tophash 加上 minTopHash

1
2
3
4
5
6
7
8
9
10
// tophash 计算 hash 的 tophash 值。
// 这是一个字节的大小的。(hash 最高的 8 位)
func tophash(hash uintptr) uint8 {
// top 本质上就是 hash 的前面 8 个字节(goarch.PtrSize*8 - 8,左移位数:指针的字节大小 - 8 字节)
top := uint8(hash >> (goarch.PtrSize*8 - 8))
if top < minTopHash {
top += minTopHash
}
return top
}

这样一来,通过 tophash 这一个字节就可以记录桶里面槽的状态了,非常节省空间。

map 的创建的实现

我们已经了解了哈希表的基本工作机制了,现在就让我们来深入了解一下 go 里 map 的实现,先是 map 的创建, map 的创建是通过 makemap() 函数实现的(对应我们写的代码是 make(map[int]int, 10)),map 的创建步骤如下:

  1. 计算 map 所需内存,判断是否在一个合理范围之内。
  2. 使用 new 初始化 hmap 结构体。
  3. 生成随机哈希种子。
  4. 计算出一个最小的 B,也就是根据用户传递给 make 的第二个参数算出一个最小的 B 的值,最终桶的数量为 2^B 个。
  5. 如果 B 大于 0,则给哈希表的 buckets 分配内存。
  6. 最后,返回新创建好的 hmap

下文在寻址过程中,大量使用了指针的运算,所以如果对 unsafe.Pointer 比较熟悉的话,看起来会比较轻松,如果不熟悉也没关系,可以看看我另外一篇文章《深入理解 go unsafe》。

makemap 实现

makemap 具体源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// makemap 是 make(map[k]v, hint) 的实现,创建一个 map。
// 如果编译器可以确定 map 或者第一个 bucket 可以在栈上创建,h 和/或 bucket 可能为 non-nil。
// 如果 h != nil,map 可以直接在 h 中创建。
// 如果 h.buckets != nil,buckets 指针指向的那个元素可以作为第一个 bucket。
func makemap(t *maptype, hint int, h *hmap) *hmap {
// 计算所需要的内存大小
mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size)
// 如果溢出或者超出最大分配内存,则设置 hint = 0;
// 这样的话,B 也会等于 0;
// 则最终的 map 只会有一个 bucket(2^B = 1)
if overflow || mem > maxAlloc {
hint = 0
}

// 初始化 hmap(分配 hmap 结构体本身所需要的内存)
if h == nil {
h = new(hmap)
}
// 生成一个随机的哈希种子
h.hash0 = fastrand()

// 根据传入的 hint,计算出需要的最小桶数量。
// 实际上是计算 B 的大小,桶的数量都是运行时通过 2^B 计算的。
B := uint8(0)
// 如果 hint 导致超过了负载因子,则将 B 加 1,一直加到小于负载因子。
// 简单来说就是:hint / (2^B) > 负载因子,也就是 hint 个键值对放到所有桶中,
// 每个桶中元素数量大于负载因子(6.5)的时候,则将 B 加 1。
// 注:map 扩容的时候也是这个判断标准。
for overLoadFactor(hint, B) {
B++
}
h.B = B

// 分配初始哈希表。
// 如果 B==0,则稍后(在mapassign中)延迟分配桶字段。
// 若 hint 较大,则清零此内存可能需要一段时间。
if h.B != 0 {
var nextOverflow *bmap
// 初始化 buckets(分配 buckets 所需要的内存)
h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
// 如果 hint 比较大,则会预先分配溢出桶,记录到 extra 字段中。
if nextOverflow != nil {
h.extra = new(mapextra)
h.extra.nextOverflow = nextOverflow
}
}

return h
}

overLoadFactor 实现

这里面有一个比较重要的函数,那就是 overLoadFactor,这个函数用来判断某一个数量是否超过 map 的负载因子,如果超过,那就需要扩容了:

1
2
3
4
// overLoadFactor 报告放置在 2^B 个桶中的键值对数量是否超过负载因子。
func overLoadFactor(count int, B uint8) bool {
return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
}
  • count > bucketCnt,前半部分判断很简单,就是判断数量一个桶能不能放得下。一个桶就能装下所有数据的话,根本就不用计算了,肯定没超过负载因子。
  • 后半部分判断翻译过来是:桶数量 * 负载因子(6.5) < 总键值对数量,意味着平均每个桶存储的元素个数大于 6.5 了,也就是说超过了负载因子了。
  • 在其他函数中,判断是否超过负载因子的时候都是使用上面这个函数
  • loadFactorNumloadFactorDen 是预定义的变量,它们相除就是负载因子 6.5
  • bucketShift(B) 很简单,就是 2^B

makeBucketArray 实现

在创建 map 的时候,使用了 makeBucketArray 来给 map 的桶分配内存,makeBucketArray 实现步骤如下:

  1. 如果判断到 B >= 4,也就是初始化时需要分配的桶数量大于等于 2^4 = 16 的时候,则会预先分配溢出桶,分配的溢出桶个数为 2^(b-4) 个。
  2. 接着,给 map 的桶(buckets 字段)分配内存(包含了普通桶和溢出桶,普通桶和溢出桶的内存一次性分配,溢出桶的内存在普通桶后面)。
  3. 最后,判断到需要分配溢出桶的话(B >= 4),则将溢出桶的指针写入到最后一个普通桶的 overflow 字段。

分配 buckets 内存的两种情况:

  1. 如果 B < 4,那么分配内存的过程很简单,就是分配 buckets 所需要的内存,也就是分配普通桶所需要的内存就足够了,如下图:
makeBucketArray 1
  1. 如果 B >= 4,那么分配内存的过程就相对复杂,会预先分配一部分溢出桶。在后面需要创建溢出桶的时候,就会先使用这时候创建的溢出桶,而不是直接新建,如下图:
makeBucketArray 2

说明:

  • 分配 buckets 所需要的内存的时候,会分配一部分溢出桶所需要的内存,普通桶和溢出桶的内存是连续的,分配给溢出桶的内存就在普通桶的后面。
  • makemap 中会新建 mapextra 结构体,用 nextOverflow 字段来保存溢出桶的指针,指向第一个溢出桶的位置。
  • 最后一个溢出桶的 overflow 指针(指向溢出桶的指针),指向了 buckets 入口,这里并不是说将第一个普通桶作为最后一个溢出桶的溢出桶,而是一个标记作用。因为前面的溢出桶的 overflow 字段都是 nil,而最后一个溢出桶的 overflow 不是 nil,这样一来,我们通过判断溢出桶的 overflow 是否为 nil 就可以知道是否是最后一个溢出桶。如果是最后一个溢出桶,那么将 map 里面的 extra.nextOverflow 字段设置为 nil,表示预分配的溢出桶用完了,后面如果再需要溢出桶的时候,就只能直接 new 一个了。
  • buckets 指针下面的普通桶和溢出桶所需要的内存大小都是 t.bucketsize,也就是 bmap 所需要的内存大小(当然是内存对齐之后的)。

具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// makeBucketArray 初始化 map bucket 底层的数组(分配 buckets 的内存)。
// 1<<b 是要分配的最小存储桶数。
// dirtyalloc 应该是 nil(不为 nil,表示清空 map),或者是 makeBucketArray 之前使用相同的 t 和 b 参数分配的桶数组。
// 如果 dirtyalloc 为 nil,将分配一段新的内存;否则将清除 dirtyalloc 指向的内存,将其作为新分配的内存。
//
// 参数:
// t:底层表示 map 的类型
// b:bucket 的大小为 2^b
// dirtyalloc: 不为 nil 表示要清空,用于 mapclear 函数。
//
// 返回值:
// buckets:正常桶数组入口
// nextOverflow:溢出桶数组入口
func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) {
// bucket 数量 = 1 << b
base := bucketShift(b)
nbuckets := base
// 对于小的 b,溢出桶不太可能。避免计算开销。
// 桶的数量小于 2^4 时候,由于数据较少、使用溢出桶的可能性较低,
// 会省略创建溢出桶的过程以减少额外开销。
//
// 但是大于等于 2^4 的时候,使用到溢出桶的可能性就会比较大,所以需要预先分配溢出桶。
if b >= 4 {
// 大于等于 2^4 的时候,额外创建 2^(B-4) 个溢出桶。
nbuckets += bucketShift(b - 4)
sz := t.bucket.size * nbuckets // 溢出桶所需要的内存大小
up := roundupsize(sz) // 计算需要的内存大小
if up != sz {
// 分配的内存与实际需要的内存不一样,
// 可能会比 sz 大一点,重新计算 nbuckets。
// 下面的 nbuckets 才是最终的 bucket 数量(普通桶 + 溢出桶的数量)
nbuckets = up / t.bucket.size
}
}

// buckets 所在的内存初始化/清空(mapclear)
if dirtyalloc == nil {
// 创建新的 bucket 数组
buckets = newarray(t.bucket, int(nbuckets))
} else {
// 清空原有的内存
buckets = dirtyalloc
// buckets 所需要的总内存大小(单位:字节)
size := t.bucket.size * nbuckets
// 清空 buckets 开始的 size 字节大小的内存
if t.bucket.ptrdata != 0 {
// 有指针
memclrHasPointers(buckets, size)
} else {
memclrNoHeapPointers(buckets, size)
}
}

// 上面 b >= 4 的情况
if base != nbuckets {
// 处理一下预先分配的溢出桶。
nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize))) // buckets 和溢出桶内存是相邻的,计算第一个溢出桶的指针
last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize))) // 最后一个溢出桶
last.setoverflow(t, (*bmap)(buckets)) // 最后一个溢出桶的 overflow 指针链接到第一个普通桶
}

// 返回普通桶、溢出桶的指针
return buckets, nextOverflow
}

map 定位 key 的实现

我们从上面的讲解中应该很清楚 mapbucket 这个数据结构了(上面的 bmap 那个图),在做查找、修改、删除操作的时候, 都需要先根据 key 找到具体的键值对保存在哪一个 bucket 以及 bucket 中的哪一个位置,所以这个操作其实是非常关键的。 在开始下文之前,就先来讲讲 map 中是如何定位一个 key 的。

其实定位的过程比较简单,假设现在要查找一个 key,那定位 key 的大概步骤如下:

  1. 计算 hash: 根据 key 计算出其哈希值 hash
  2. 计算 bucket: 哈希值对 buckets 长度取模(hash % len(buckets)),不过实际实现的时候使用了一种优化的方式,位运算(hash & (2^B - 1)),也就是由哈希值的最低 B 位来决定 key 最终使用哪一个 bucket(结果跟直接取模不一样,但是思想一样,都能保证得出的结果落在 len(buckets) 范围内)。
  3. 遍历 bucket(先是普通桶)里面的每一个槽(有 8 个),比较哈希值的最高 8 位(8 bit,也就是 tophash)是否相等,如果相等,则获取存储在 bucket 里面的 key,跟我们需要定位的 key 做比较,如果相等,则说明已经找到了 key,如果 key 不相等,则继续遍历下一个槽,直到 bucket 中所有的槽都被遍历完毕。
  4. 如果 bucket 里面的 8 个槽都遍历完了,仍然没有找到我们需要找的 key。那么会从 bucket 的溢出桶去查找,溢出桶内的查找过程跟普通桶内的查找过程是一样的。
  5. 如此遍历,直到所有溢出桶都遍历完(在找不到的情况下才会遍历 bucket (普通桶)所有的溢出桶)。

这个查找过程可以表示为下图:

key 定位

注意:确定一个 key 需要 tophashkey 都相等,如果 tophash 相等而 key 不相等,则需要继续比较 bucket 中其他的槽。

map 读取数据的实现

map 中读取某一个键的方法主要有三个:mapaccess1mapaccess2mapaccessK,这三个方法的代码其实是大同小异的,所以这里只拿 mapaccessK 来讲解。

这三个方法的不同之处在于:

  • mapaccess1 只返回 key 对应的值,对应 v := map["k"] 这种写法。
  • mapaccess2 返回 key 对应的值,以及是否存在的标志,对应 v, ok := map["k"] 这种写法。
  • mapaccessK 用于遍历 map 的时候,返回 keyvalue,对应 for k, v := range map {} 这种写法(在迭代的时候在 mapiternext 里面调用)。

mapaccessK 的查找步骤

mapaccessK 的查找步骤大概就是上一个图说的,只不过下面的描述更加详细一点(em... 有点重复了):

  • 判断 map 是否为空,为空直接返回
  • 计算 key 对应的哈希值 => hash
  • 计算桶的掩码 => mhash & m 得到 bucket 的索引
  • 判断是否正在扩容,如果是,需要判断 key 对应的桶的数据是否已经被迁移到新的桶里面:
    • 如果是,则需要从新的桶里面查找。
    • 如果还没有被迁移,则需要从旧桶中读取。
  • 计算 tophash => top(也就是 hash 的最高 8 位)
  • 遍历找到的桶的每一个槽(slot/cell
    • 比较 tophash 是否相等
    • 如果不等,判断桶后面是否都没有数据了(b.tophash[i] == emptyRest
    • 如果没有数据了,跳出循环 => 找不到 key 对应的值
    • 如果还有数据,则继续遍历下一个槽
  • 如果找到一个槽的 tophash 跟上面计算的 tophash 相等,则比较 key 是否相等
    • 是,则返回对应的值(tophashkey 都相等,则表明找到了相应的 key,返回对应的值)。
    • 否,继续遍历下一个槽(依然是先比较 tophashtophash 相等则再比较 key 是否相等)
  • 溢出桶也找不到,则返回零值(及是否找到的标志)。

mapaccessK 的实现

对于整型的键值的 map,go 里面有针对优化的实现,但其实代码逻辑上都是差不多的,所以不细说了。下面来看看 mapaccessK 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// 在迭代 map 的时候,返回键值对。
// 参数:
// t: map 类型元信息(记录了 map 中的 key/value 的类型等信息,比如 key 的大小,可用于计算内存偏移)
// h: map 结构体类型(也就是实际的哈希表类型)
// key: 需要查找的 key
// 返回值:
// 第一个返回值:当前遍历到的 key 的指针
// 第二个返回值:当前遍历到的 key 对应的值的指针
func mapaccessK(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, unsafe.Pointer) {
// map 是空的
if h == nil || h.count == 0 {
return nil, nil
}
// 根据 key 和 hash0 计算 hash
hash := t.hasher(key, uintptr(h.hash0))
// hash 的掩码,类似 IP 的掩码
//(比如,假设 B = 3,一共有 8 个元素,索引为 0~7,那么掩码 m 表示为二进制就是 111)。
// 用于跟 hash 值做 & 运算(hash & m),得到 hash 对应 bucket 的索引(0~m)
m := bucketMask(h.B)
// 根据 hash 计算 bucket 地址,b 是 hash 匹配到的 bucket
b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
// 正在扩容,如果 bucket 还没迁移到新的地址,则需要从 oldbuckets 中访问
if c := h.oldbuckets; c != nil {
// 不是等量扩容,需要从旧桶中读取,所以 m 要移除最高位(右移一位)
if !h.sameSizeGrow() {
// 不是等量扩容,则将 m 除以 2,因为是 2 倍扩容,
// 所以 buckets 的大小为 oldbuckets 长度的 2 倍,
// 除以 2 才是旧的桶数量
m >>= 1
}
// key 在 oldbuckets 中的地址
oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
if !evacuated(oldb) {
// b 尚未迁移到新的 buckets 中,还在 oldbuckets 中
// 则需要从旧桶中查找
b = oldb
}
}
// 计算 tophash
top := tophash(hash)
bucketloop:
for ; b != nil; b = b.overflow(t) {
// 从 bucket 中查找,一个 bucket 可以存储的元素个数是 bucketCnt,也就是 8
for i := uintptr(0); i < bucketCnt; i++ {
// tophash 不匹配,肯定不是这个槽
if b.tophash[i] != top {
// bucket 剩余的槽是空的,不用再找了,跳出循环
if b.tophash[i] == emptyRest {
break bucketloop
}
// 继续比较下一个槽
continue
}
// tophash 匹配
// 接下来将这个槽的 key 取出来
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
// key 是指针类型
if t.indirectkey() {
k = *((*unsafe.Pointer)(k))
}
// 比较 key 跟 k 是否相等
if t.key.equal(key, k) {
// 相等则读取对应的值(表示找到了匹配的 key 了)
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
// 值是指针类型
if t.indirectelem() {
e = *((*unsafe.Pointer)(e))
}
// 找到了,返回键值对
return k, e
}

// 注意:执行到这里,说明 tophash 相等,
// 但是 key 不匹配,仍然需要继续遍历。
}
}
// 普通桶和溢出桶的所有槽都找不到,返回 nil
return nil, nil
}

这里需要注意的是,如果在扩容的过程中查找,会先判断数据是否已经被迁移到新桶,如果还没有被迁移,则需要从旧的桶中查找。

mapaccessK 关键代码

  1. bucket 的定位代码
1
2
3
m := bucketMask(h.B)
// b 就是定位到的 bucket 所在的内存地址
b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))

bucket 索引(hash & m)的定位方式如下:

bucket 定位1

因为 buckets 是指向 bmap 的指针数组,所以我们可以通过 buckets 加上 bucket 的索引,就可以定位到 bucket 的内存地址。 因为每一个 bmap 的大小是 t.bucketsize,所以 bucket 的索引乘以 t.bucketsize,也即 (hash&m)*uintptr(t.bucketsize),就是 bucket 的相对偏移量。 然后 buckets 的地址加上 bucket 的相对偏移量,就可以定位到 bucket 的内存地址。

bucket 定位2

我们需要注意的是,我们计算得到 bucket 的指针后,需要将其转换为 bmap 类型的指针,才能进行后续的操作。

然后 keyvalue 也是通过类似的指针运算来定位的。需要说明的是:

  • add 函数是做指针算术运算的函数,具体来说就是 add(a, b) 就是 a 地址加上 b 的偏移量,返回的是 unsafe.Pointer 类型的指针。
  • unsafe.Pointer(b)bucket 的内存地址
  • dataOffsetbucket 中第一个 key 的地址偏移量,
  • t.keysizekey 的大小
  • t.elemsizemap 值的大小
  • bucketCntbucket 中槽的数量(8 个,预定义的常量)。

然后大家可以结合上面的 bmap 内存布局图来理解上面指针计算的代码。

1
2
3
4
// 读取 bucket 中的第 i 个 key。
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
// 读取 bucket 中的第 i 个 value
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
  1. b.tophash[i] == emptyRest 判断的理解
emptyRest

emptyRest 是一个比较特殊的标记,它表示 bucket 中的后续槽都是空的。 在 map 删除元素的时候,会判断后面还有没有元素,如果没有元素的话,就会将 b.tophash[i] 设置为 emptyRest。 这样在查找的时候,就可以通过 b.tophash[i] == emptyRest 来判断后面的槽都是空的,就不需要继续遍历了。

  1. indirectkeyindirectelem 的理解

我们发现上面读取 keyvalue 的时候有一个比较特别的操作:

1
2
3
4
5
6
if t.indirectkey() {
k = *((*unsafe.Pointer)(k))
}
if t.indirectelem() {
e = *((*unsafe.Pointer)(e))
}

相信不少读者第一次看到这几行代码的时候会跟我一样有点懵逼,从 maptype 的定义我们可以看出一点端倪:

1
2
3
4
5
6
func (mt *maptype) indirectkey() bool { // store ptr to key instead of key itself
return mt.flags&1 != 0 // 存储了 key 的指针,而不是 key 本身
}
func (mt *maptype) indirectelem() bool { // store ptr to elem instead of elem itself
return mt.flags&2 != 0 // 存储了 elem 的指针,而不是 elem 本身
}

简单来说,就是 go 底层在有时候会将 keyvalue 保存为指针,而不是直接保存 keyvalue 本身。 这样一来,go 里面的 map 操作就需要根据 keyvalue 的类型来判断是否需要进行指针解引用,也就是取出实际的 keyvalue

map 写入和修改的设计与实现

在 go 中,map 的写入和修改都是通过 mapassign 函数来实现的,因为写入和修改本质上是同一个操作,都是找到对应的 key,然后修改对应的值。

mapassign 的实现步骤

  1. 计算 keyhash 值。
  2. 如果 buckets 还没有初始化,则进行分配内存。
  3. 计算出 bucket 的索引。
  4. 如果正在扩容,则迁移当前即将要操作的 bucket(也就是上一步计算出来的索引对应的 bucket)。
  5. 计算 tophash
  6. 遍历 bucket 中的槽,记录下第一个空的槽的 tophash 索引指针、key 指针,value 指针。如果最后找不到 key 的时候,会插入到这里。
  7. 如果 bucket 的所有桶都找不到 key,则判断是否需要扩容,需要的话就进行扩容,然后再执行 6 的操作。
  8. 另外一种情况,不需要扩容,而且 bucket 以及它的溢出桶也满了,则需要新建溢出桶来保存 key
  9. 最后,将 tophash/key/value 插入到需要 bucket 第一个空的槽。又或者如果已经存在,对 value 进行更新。

mapassign 图解

可以分两种情况:

  1. 普通桶和溢出桶都找不到 key 的情况下,将 key 插入桶中第一个空的槽
map_5_1
  1. bucket 中找到了 key,则会对其进行更新
map_5_2

对于 keyvalue 的存储,有以下两种情况:

  1. 直接保存在 bucket 中:

这样在我们需要修改 key/value 的时候,通过 bucket 加上 索引 * keysize/valuesize 就可以得到对应键值存储的实际内存。

map_5_4
  1. bucket 中保存的是 key/value 的内存地址(unsafe.Pointer)类型

这样如果我们需要修改/读取实际的键值的时候,就需要先从 bucket 中获取这个指针,然后解引用得到实际存储键值的内存指针。

map_5_3

注意:在我们做如下运算的时候(假设 bucket 没有存储实际的 key,而是存储了 key 的指针):

1
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))

得到的结果是,指向 keys[i](也就是第 ikey)的指针(unsafe.Pointer 类型)。 如果 key 保存在 bucket 中,通过这个指针我们就可以读写 key 了。 否则,表示这个指针指向的内存存储的只是一个指针,如果我们需要修改实际的 key, 就需要通过 key 指针(A)拿到这里存储的指针(B),然后再通过 B 来修改实际的 key

map_5_5

value 的读写同理。

mapassign 源码剖析

扩容的操作后面会有解析,这一节就先不细说了。

这个函数的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
// 功能:插入 key 或者修改 map 中的 key
// 参数:
// t:map 类型元信息
// h:map 结构体(实际保存键值对的地方)
// key:我们要修改或者插入的 key
// 返回值:
// 只有一个,那就是我们插入或者修改之后的值。
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
if h == nil {
panic(plainError("assignment to entry in nil map"))
}
// 并发写,直接报错
if h.flags&hashWriting != 0 {
fatal("concurrent map writes")
}
// 计算 key 的哈希值(是一个无符号整数)
hash := t.hasher(key, uintptr(h.hash0))

// 在调用 t.hasher 之后设置 hashWriting,
// 因为 t.hasher 可能会 panic,在这种情况下,我们实际上还没有进行写操作。
// 写标记。(如果读操作发现有写标志则会报错)
h.flags ^= hashWriting

// 如果 buckets 是空,则新建一个 bucket
if h.buckets == nil {
h.buckets = newobject(t.bucket) // newarray(t.bucket, 1)
}

again:
// bucket 指示第几个 bucket(命名貌似有点不合适)
bucket := hash & bucketMask(h.B)
// 正在扩容的话,则将 bucket 迁移到新桶
if h.growing() {
growWork(t, h, bucket)
}
// b 为即将要写入的 bucket
b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
// 计算 key 的 tophash
top := tophash(hash)

var inserti *uint8 // 要插入的 tophash 地址
var insertk unsafe.Pointer // 要插入的键地址
var elem unsafe.Pointer // 要插入的值地址
bucketloop:
for {
// 循环 bucket 的槽
for i := uintptr(0); i < bucketCnt; i++ {
if b.tophash[i] != top {
// tophash 不匹配,并且当前槽为空,则记录要插入的位置(找不到 key 的时候,最后会插入到这里)
if isEmpty(b.tophash[i]) && inserti == nil {
inserti = &b.tophash[i]
insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
// 为什么找到了可以插入的地方,不中断循环?
// 原因是,这个函数是寻找已经存在的 key 的(插入和修改都是用这个函数),
// 如果对应的 key 保存到了后面的槽里面的话,这里直接中断循环就是不对的。
// 因为在这种情况下,理应更新后面的那个槽。
}
// bucket 没有找到对应的 key,同时 bucket 中剩余的槽都是空的。
// (这意味着 map 中找不到 key,需要插入这个 key 了)
// 中止对 bucket 里面槽的遍历。
if b.tophash[i] == emptyRest {
break bucketloop
}
// 虽然找到了空闲的槽,但还是要查看 bucket 中的其他槽,看 key 是否已经存在。
// (如果存在的话,修改 key 对应的值就可以了,当然这个函数里面不会修改,而是返回值的地址,从函数外部修改)
// 这个很好理解,保存值的指针都拿到了,想修改就很简单了。
// 如果 key 已经存在,则返回已存在 key 的对应的 elem 的地址。
continue
}

// tophash 相等,依然需要比较 key 是否相等。
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
if t.indirectkey() {
k = *((*unsafe.Pointer)(k))
}
// 但是 key 不想等,继续比较下一个槽
if !t.key.equal(key, k) {
continue
}
// tophash 相等、key 也相等,说明已经存在,更新它即可
// 用 key 更新 k
if t.needkeyupdate() {
typedmemmove(t.key, k, key)
}
// 计算 value 所在的地址
elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
// 找到了 key,直接跳转到 done
goto done
}
// 当前的桶里面所有槽都找不到。
// 继续遍历溢出桶(在溢出桶中查找 key 是否存在/有没有空余的槽)
ovf := b.overflow(t)
if ovf == nil {
// 溢出桶也找不到,跳出循环
break
}
// b 指向下一个溢出桶,下次循环遍历这个溢出桶
b = ovf
}

// 未找到 key,需要插入这个新的 key。

// 如果我们达到了最大负载系数或者我们有太多的溢出桶,
// 同时,如果还没有开始扩容,那么现在开始扩容。
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
hashGrow(t, h)
// 扩容哈希表会使所有内容无效,因此需要再次尝试插入。
// (上面循环获取到的插入位置的指针已经失效了,扩容之后插入的位置改变了,所以需要再次计算要插入的 bucket,以及要插入的槽中的位置)
goto again
}

// 表明没有找到可以插入的地方,则新建一个溢出桶来保存,
// 溢出桶的第一个元素就用来保存 key,返回溢出桶第一个元素 elem 的地址
// (这意味着,我们要插入的桶,所有的槽都有数据了,并且也不是我们要找的 key,所需要溢出桶了)
if inserti == nil {
// 当前存储桶及其连接的所有溢出存储桶已满,分配一个新的溢出桶。
newb := h.newoverflow(t, b)
// 溢出桶的第一个 tophash 的指针
inserti = &newb.tophash[0]
// 溢出桶的第一个 key 的指针
insertk = add(unsafe.Pointer(newb), dataOffset)
// 溢出桶的第一个 value 的指针
elem = add(insertk, bucketCnt*uintptr(t.keysize))
}

// 在插入位置存储新 key/elem
if t.indirectkey() { // 这意味着需要给 key 分配内存来保存它
kmem := newobject(t.key) // 给 key 分配内存(kmem 是保存 key 的内存指针)
*(*unsafe.Pointer)(insertk) = kmem // bucket 里面的 key 保存新分配的内存指针
insertk = kmem // insertk 指向新分配的地址(跟上一行并不重复)
// 最终效果是,insertk 和 kmem 指向了新分配的保存 key 的内存地址。
// 当然,insertk = kmem 不需要也可以,但这样一来下面也要改成:
// if t.indirectkey() {
// typedmemmove(t.key, kmem, key)
// } else {
// typedmemmove(t.key, insertk, key)
// }
}
if t.indirectelem() {
vmem := newobject(t.elem) // 给 elem 分配内存
*(*unsafe.Pointer)(elem) = vmem // bucket 里面 elem 的槽保存新分配的地址
}
// 移动 key 到 insertK(保存新的 key)
typedmemmove(t.key, insertk, key)
// 保存 tophash
*inserti = top
// map 元素个数 +1
h.count++

done:
// 并发写则报错(多个协程同时写 map)
if h.flags&hashWriting == 0 { // 写标志被清除了
fatal("concurrent map writes")
}
// 清除写标志
h.flags &^= hashWriting
// bucket 中的值保存的是指针,这个时候就不能返回 bucket 中值的地址了,而是返回 bucket 中值指向的另外一个地址的指针。
if t.indirectelem() {
// 获取指向 elem 实际存储地址的指针
elem = *((*unsafe.Pointer)(elem))
}
// 返回存储值的指针
return elem
}

有几点要注意的:

  1. go 中 map 是不允许并发读写的,如果有,直接报错。
  2. 这里面我们看到了有扩容的操作,在 go 中,map 的扩容发生在插入、修改和删除的时候,是一种渐进式扩容的方式,每次扩容会迁移两个 bucket,详细的后面讲到扩容的时候会细说。
  3. bucketloop 这个循环中,会记录下第一个空的槽,在找不到 key 的时候会进行插入操作。
  4. 如果找到,则返回保存值的地址的指针。如果没找到,则将 key 插入到上一步找到的空的槽中,如果也没有空的槽,则会新建溢出桶来保存新插入的 key
  5. 在这个函数中,会判断插入之后是否超过负载因子,又或者溢出桶是否太多,来决定是否扩容。

关于 key 匹配的过程,其实跟上面的 mapaccess 是一样的过程,先找普通桶,然后查找溢出桶。

map 删除 key 的实现

对于删除操作,其实有一些操作上面已经说过了,如如何定位一个 key。所以下面的讲述会侧重讲解跟删除操作密切相关的操作。

map 删除 key 的步骤

  1. 定位 key 所在 bucket,计算 tophash
  2. 遍历 bucket 的每一个槽,比较 tophash 以及 key,普通桶中查找不到会继续查找溢出桶。
  3. 如果查找到 key 的话,会清空对应 keybucket 内存中的 tophashkeyvalue
  4. 如果后面的槽没有元素了,则设置 emptyRest 标记。这样在后续查找的时候就可以避免不必要的搜索了。

map 删除过程图解

删除的过程比较简单,定位 key 的过程上面有过详细的讲解了,这里只详细画图阐述一下 emptyRest 的标记设置:

map_6_1

map 删除源码剖析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
// 从 map 中删除 key(以及对应的 elem)
// 参数:
// t:map 类型元数据的结构体
// h:实际保存键值对的桶
// key:需要删除的 key
func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
// map 是空的
if h == nil || h.count == 0 {
// 如果 key 的类型不可哈希则 panic
if t.hashMightPanic() {
t.hasher(key, 0) // see issue 23734
}
return
}
// 并发写则抛出 fatal 错误
if h.flags&hashWriting != 0 {
fatal("concurrent map writes")
}

// 计算 key 的 hash
hash := t.hasher(key, uintptr(h.hash0))

// 在调用 t.hasher 之后设置 hashWriting,
// 因为 t.hasher 可能会 panic ,在这种情况下,我们实际上没有执行 write(delete)。
h.flags ^= hashWriting

// 计算 key 落在哪一个 bucket 中
bucket := hash & bucketMask(h.B)
// 如果正在扩容,则迁移 bucket 到新的内存地址中(迁移到新的桶)
// (迁移当前正在访问的 bucket)
if h.growing() {
growWork(t, h, bucket)
}
// 获取 bucket 实例(key 所在的 bucket)
b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
// 记录原始的 bucket 实例
//(目的是为了方便加 emptyRest 标记)
bOrig := b
// 计算 tophash
top := tophash(hash)
search:
// 在 bucket 内进行搜索
for ; b != nil; b = b.overflow(t) {
// 遍历 bucket 的每一个槽
for i := uintptr(0); i < bucketCnt; i++ {
// tophash 不想等的时候,需要判断后面是否还有元素
if b.tophash[i] != top {
// 从 i 开始 bucket 后面都是空的了,
// 中止搜索过程(去除写标记,函数执行完毕)
if b.tophash[i] == emptyRest {
break search
}
// 还有元素,继续搜索
continue
}
// tophash 相等
// 获取 key 在 bucket 中的内存地址
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
k2 := k // k2 代表的是指向实际存储 key 的指针(unsafe.Pointer)
// k2 指向原始的地址
if t.indirectkey() {
k2 = *((*unsafe.Pointer)(k2))
}
// key 不想等,继续检查下一个 slot(continue)
if !t.key.equal(key, k2) {
continue
}
// 只有在键中有指针时才清除键。
// 清空 key 的内存
if t.indirectkey() {
// 清除 bucket 里面保存 key 的内存
//(bucket 只是存储了 key 的指针)
*(*unsafe.Pointer)(k) = nil
} else if t.key.ptrdata != 0 {
// 清除包含指针的 key
memclrHasPointers(k, t.key.size)
}
// 值的地址
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
// 清空值
if t.indirectelem() {
// 清除 bucket 里面保存 elem 的内存
*(*unsafe.Pointer)(e) = nil
} else if t.elem.ptrdata != 0 {
// 清除包含指针的 elem(值)
memclrHasPointers(e, t.elem.size)
} else {
// 清除不包含指针的 elem 的内存
memclrNoHeapPointers(e, t.elem.size)
}
// tophash 设置为空标记
b.tophash[i] = emptyOne
// 如果 bucket 现在以一堆 emptyOne 状态结束,
// 将这些状态更改为 emptyRest 状态。
// 最好将其作为一个单独的函数,但 for 循环当前不可内联。(所以用 goto)
if i == bucketCnt-1 {
// 要删除的 key 是 bucket 里面的最后一个元素。
// 同时还有溢出桶,并且溢出桶里面还有元素 => 表明当前删除的 key 不是 bucket 以及溢出桶里面的最后一个元素。
if b.overflow(t) != nil && b.overflow(t).tophash[0] != emptyRest {
// 不是最后一个元素
goto notLast
}
} else {
// 要删除的 key 不是 bucket 的最后一个元素。
// 并且后面还有元素。
if b.tophash[i+1] != emptyRest {
goto notLast
}
}

// 执行到这里的时候,表明刚刚删除的 key 是 bucket 以及溢出桶中的最后一个元素。
// 如果不是最后一个元素的话,上面的 if 判断已经跳转了。

// 下面的 for 循环做的事情是:
// 1. 将当前 key 的标志设置为:emptyRest,表示后面没有元素了。
// 2. 从 bucket 的第一个 key 出发遍历所有的槽(包含溢出桶),
// 将最后一个元素以后一直到被删除的 key 的中间的所有槽标记为 emptyRest
// 目的是,在后续遍历的时候可以避免一些不必要的查找操作,见到 emptyRest 就可以直接中断循环了。

// 例子,bucket 里 key 的内存布局为 | nil | a | nil | nil | b | nil |
// b 被删除的时候,b 以及 a 后面的两个元素都要标记为 emptyRest(溢出桶同理)
for {
// 给当前遍历到的 bucket 槽打上 emptyRest 标记
b.tophash[i] = emptyRest
if i == 0 { // 当前的桶遍历完了(因为是从后往前遍历)
// b 是普通桶(不是溢出桶)
if b == bOrig {
// 所有空的槽都处理完了
break
}
// b 是上一次循环处理的桶
c := b // c 是上一个 b
// 查找上一个存储桶,在其最后一个条目处继续。
// 如:bucket <- overflow1 <- overflow2 <- ... <- overflowN
// 假设 c 是 overflow2,那么下面的循环过后,b 就是 overflow1
// prevB.overflow = b => 找 prevB,也即:遍历完当前的桶,找前一个桶。
for b = bOrig; b.overflow(t) != c; b = b.overflow(t) {
}
// b 指向了前一个 bucket(前一个桶)

// 处理前一个 bucket 的最后一个槽
i = bucketCnt - 1
} else {
// 继续处理前一个槽
i--
}
// 找到了一个不是空的槽,表示有数据了,不需要再打 emptyRest 标记了。
if b.tophash[i] != emptyOne {
break
}
}
// 被删除的元素如果不是最后一个元素,直接跳转到这里。
notLast:
// 找到了元素,并且删除了。
// map 的元素个数减 1
h.count--
// 重置哈希种子,使攻击者更难重复触发哈希冲突。见 issue 25237。
if h.count == 0 {
h.hash0 = fastrand()
}
// 在 bucket 内找到了对应的元素,并且删除了。
// 退出循环。
break search
}
}

// 如果当前没有写标记,则抛出 fatal 错误(不能并发读写 map)
if h.flags&hashWriting == 0 {
fatal("concurrent map writes")
}
// 清除写标记
h.flags &^= hashWriting
}

注意:

  1. go map 不允许并发写,所以如果发现有并发读写,则抛出 fatal 错误。
  2. 如果删除的是最后一个元素,则需要往前遍历,将每一个空的槽设置为 emptyRest 状态。
  3. 如果是 indirectkeyindirectelem,在删除的时候,只会将 bucket 中的指针置为 nil,对于实际的 keyvalue 不会进行处理。(无所谓,GC 会出手)。

map 的扩容实现

从上面的讲解中,我们知道,底层存储 key/value 的是 bucket,而 bucekt 的大小是一段有一定大小的连续内存。 如果我们插入的元素过多,我们初始化时分配的 bucket 的内存就会放不下了,这个时候 go 的 map 会有两种方式解决这个问题:

  1. 使用溢出桶(在 bmap 的上再链接一个 bmap,也就是溢出桶,普通桶放不下的时候,就放溢出桶中)
  2. 分配新的更大的空间来存放现有的这些键值对。在 go 里面新分配的内存空间将会是原来的 2 倍。

map 扩容的条件

map 的扩容发生在插入或者修改或者删除 key 的时候,扩容的条件在 mapassign 中写了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 条件:
// 1. 没有在扩容
// 2. 超过负载因子
// 3. 太多溢出桶
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
hashGrow(t, h)
}

// 判断是否超过负载因子。
func overLoadFactor(count int, B uint8) bool {
return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
}

// 判断是否有太多的溢出桶了
// 多的标准是:
// B <= 15 的时候,溢出桶数量大于 2^B 的时候
// B > 15 的时候,溢出桶的数量大于 2^15 的时候
func tooManyOverflowBuckets(noverflow uint16, B uint8) bool {
// 如果阈值太低,我们会做额外的工作。
// 如果阈值太高,则增长和收缩的 map 会保留大量未使用的内存。
// “太多” 意味着(大约)与常规桶一样多的溢出桶。
// 有关详细信息,请参见incrnoverflow。
// B > 15 => 2 ^ 15,B <= 15 => 2 ^ B
if B > 15 {
B = 15
}
return noverflow >= uint16(1)<<(B&15)
}

hashGrow 实现

hashGrow 是被用来分配新的内存空间的,新的内存空间将被用来保存旧的 buckets需要注意的是,这个函数里面并没有做数据迁移的操作。 go 的 map 扩容的时候,数据迁移的方式是渐进式扩容,在我们插入/修改/删除 key 的时候会迁移 2 个 bucket,这样可以避免性能的瞬时抖动。 我们熟知的 redis 的扩容过程也是渐进式扩容的。

下面是 hashGrow 的实现源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// hash 扩容(这个方法只是分配了空间,实际上还没有做数据复制的操作)
// 参数:
// t:map 类型元信息
// h:实际保存键值对的结构体
func hashGrow(t *maptype, h *hmap) {
// 扩容的两种情况:
// 1、如果我们达到了负载系数,就要进行 2 倍扩容。
// 2、否则,如果溢出桶太多,进行等量扩容。
bigger := uint8(1)
// 尚未超过负载因子,进行等量扩容
if !overLoadFactor(h.count+1, h.B) {
bigger = 0 // 等量扩容
h.flags |= sameSizeGrow
}
// 记录旧的 buckets
oldbuckets := h.buckets
// 分配新的内存空间,oldbuckets 将会被渐进式迁移到 newbuckets 中
newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil)

// flags 标记有使用旧桶
// 正在迭代的时候扩容
// 先把 h.flags 中的迭代标记位清除。
// 最后如果发现 h.flags 中还有迭代标记位,说明在扩容的过程中有新的迭代操作,
// 那就把它转移到 oldIterator 中。
flags := h.flags &^ (iterator | oldIterator)
if h.flags&iterator != 0 {
flags |= oldIterator
}
// 提交扩容操作
h.B += bigger // 加上扩容的数量
h.flags = flags
h.oldbuckets = oldbuckets // 记录旧桶
h.buckets = newbuckets // 指向新的桶数组
h.nevacuate = 0 // 0 个桶已完成迁移(当前函数只是分配空间,不做迁移)
h.noverflow = 0 // 所有溢出桶没有了(移动到了 oldoverflow)

// 记录扩容前的溢出桶
if h.extra != nil && h.extra.overflow != nil {
if h.extra.oldoverflow != nil {
throw("oldoverflow is not nil")
}
h.extra.oldoverflow = h.extra.overflow
h.extra.overflow = nil
}

// 扩容之后,h.B+bigger >= 4 了,预分配了溢出桶(扩容前没有溢出桶)
// 所以这里要记录溢出桶
if nextOverflow != nil {
if h.extra == nil {
h.extra = new(mapextra)
}
h.extra.nextOverflow = nextOverflow
}

// 哈希表数据的实际迁移过程是通过 growWork() 和 evacuate() 增量完成的。
}

数据迁移的两条线

go map 在扩容的时候,数据迁移会有两条线进行:

  1. 从第一个 bucket 开始迁移。
  2. 插入、修改、删除的时候,key 的哈希值定位到的 bucket 会被迁移。

具体如下图:

map_7_1

如果已经被迁移,则不再需要迁移。

这样一来就可以保证在一定的操作次数之后,全部 bucket 都被迁移。就算你每次插入、修改、删除都是同一个 key(也就是同一个 bucket), 我们第一条线的迁移都会在每次写操作的时候,迁移一个 bucket。这样无论如何,写操作到了一定次数之后,所有的 bucket 都会被迁移了。

什么时候 bucket 迁移之后下标会改变?

当然这里说的是增量扩容,如果是等量扩容,bucket 的下标不会改变。

先说答案:hash & m 的最高位是 1 的时候。

我们知道,在定位 bucket 的时候是通过 hash & m 的方式来定位 bucket 的索引的(具体可以看上面定位 key 的那一节), 而 2 倍扩容之后,bucket 的长度是原来的 2 倍,转换为二进制的时候,就是在原来的基础上多了一位,所以 hash & m 的结果就会多一位, 而最高的那个二进制位如果是 1,说明 hash & m 的结果是大于新的 bucket 数组长度的一半的(也就是比原来的索引都要大)。 那么会最高位是 1 会比原来的索引会大多少呢?答案是 bucket 数组的长度的一半(也就是 2^(B-1)):

map_7_2

我们可以再看看之前的那个计算 bucket 索引的图:

map_7_3

我们会发现,当 B = 3 的时候,不管 hash 是什么,hash & m 的结果都是 0~7。 而只有当 B = 4 的时候,hash & m 的结果才有可能落入 8~15 范围内,而且只有 hash & m 的最高位是 1 的时候才有可能。

所以结论是,当 hash & m 的最高位是 1 的时候,bucket 的下标就会改变。hash % m 的其他位跟之前是一样的,所以下标增加的范围其实就是 2^(B - 1),也就是旧的 buckets 的个数。

bucket 迁移图解

map_7_4

说明:

  • oldbuckets 是迁移前的桶,buckets 是迁移后的桶(也就是当前的 h.buckets)。
  • hash & m0xxx 的时候,会迁移到 x 这个 bucket 中。
  • hash & m1xxx 的时候,会迁移到 x + 2^(B-1) 这个 bucket 中(也就是 y 中),因为 1000 就是 2^(4 - 1)

假设需要迁移的是 oldbucket,下标为 3,那么 oldbucket 里面的 key 可能迁移的位置只可能是右边的 xy 指向的 bucket。 这取决于 oldbucket 里面的 key 哈希值的倒数第 4 位; 如果是 0,那么就迁移到 x 指向的 bucket,如果是 1,那么就迁移到 y 指向的 bucket

oldbucket 中所有的 key 只会迁移到 xy 中。同时 xy 也只可能有 oldbucketkey,不可能有其他旧的 bucketkey。这是由 hash & m 可以推断出来的。

bucket 迁移源码剖析

开始之前,我们要记住 xy 是怎么来的。

在开始看代码之前,我们需要明确一点:虽然整个哈希表是渐进式迁移的,但是单个 bucket 的迁移不是渐进式的。

  1. 我们先看看 growWork 函数:

这是迁移桶的函数,一次会迁移两个桶。不过实际上并不是严格的两个,因为迁移的函数会先判断桶是否已经被迁移, 如果桶还没有被迁移,才会进行迁移,如果桶已经被迁移则不做任何操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
// 桶迁移
// 参数:h 需要扩容的 map,t map 类型信息,bucket 旧桶的索引
// 1、迁移当前访问到的桶(mapassign、mapdelete 的时候)
// 2、继续逐个迁移 bucket,直到迁移完成(这个是从第一个桶开始迁移的)
func growWork(t *maptype, h *hmap, bucket uintptr) {
// 迁移当前访问到的桶
evacuate(t, h, bucket&h.oldbucketmask())

// 继续逐个迁移 bucket,直到迁移完成
if h.growing() {
evacuate(t, h, h.nevacuate)
}
}
  1. 然后看看 evacuate 函数:

这个就是实际做迁移操作的函数。它会根据 hash & m 的倒数第 B 位是否为 1 来决定将 key 迁移到 h.buckets 的前半部分还是后半部分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
// 迁移的目的地(旧桶 -> 目标桶)
// evacuate 中会定义两个这个 evacDst 变量,
// 一个指向 h.buckets 的前半部分,一个指向后半部分。(对应前一个图的 x,y)
// 迁移的时候,会根据 key 的哈希值的倒数第 4 位来决定迁移到哪个 evacDst 中。
type evacDst struct {
b *bmap // 迁移目的地 bucket
i int // key/elem 在迁移目标 bucket 里面对应的下标
k unsafe.Pointer // 目标 bucket 下一个保存 key 的地址指针
e unsafe.Pointer // 目标 bucket 下一个保存 elem 的地址指针
}

// 扩容的时候,bucket 迁移的实现
// oldbucket 需要迁移的旧桶索引
func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
// b 指向旧桶
b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
// 扩容之前的桶的数量
// oldbucket+newbit 对应 y,oldbucket 对应 x
newbit := h.noldbuckets()
// 如果 b 尚未迁移,则进行迁移
if !evacuated(b) {
// xy 包含 x 和 y(低和高)迁移目的地。
// 迁移的时候,在 h.buckets 中,前 noldbuckets 个桶就是 x,后 noldbuckets 个桶就是代表 y。
var xy [2]evacDst
// x 存储了新桶的地址
x := &xy[0]
// x 指向 oldbucket 可能迁移到的新桶(h.buckets 的前半部分)
x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
// x 桶中下一个用来保存旧桶的 key 的地址
x.k = add(unsafe.Pointer(x.b), dataOffset)
// x 桶中下一个用来保存旧桶的 elem 的地址
x.e = add(x.k, bucketCnt*uintptr(t.keysize))

// 如果不是等量扩容(有可能会迁移到 oldbucket+newbit 的位置上)
if !h.sameSizeGrow() {
y := &xy[1]
// y 指向增量空间上的第 oldbucket+newbit 个位置
y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
// y 桶中下一个保存旧桶 key 的地址
y.k = add(unsafe.Pointer(y.b), dataOffset)
// y 桶中下一个保存旧桶 elem 的地址
y.e = add(y.k, bucketCnt*uintptr(t.keysize))
}

// 开始迁移旧桶(同时也会迁移溢出桶)
for ; b != nil; b = b.overflow(t) {
// 旧 bucket 上第一个 key
k := add(unsafe.Pointer(b), dataOffset)
// 旧 bucket 上第一个 value
e := add(k, bucketCnt*uintptr(t.keysize))
// 遍历 bucket 的槽
// 获取需要迁移的 key/elem(k/e) 对,迁移到新桶上(&xy[useY])
for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.keysize)), add(e, uintptr(t.elemsize)) {
// 获取旧的 tophash
top := b.tophash[i]
// bucket 的这个槽是空的
if isEmpty(top) {
// 写入已迁移标记
b.tophash[i] = evacuatedEmpty
// 处理下一个 key/elem
continue
}
// tophash 错误
if top < minTopHash {
throw("bad map state")
}
// 复制一份 k
k2 := k
// k2 指向实际的 key
if t.indirectkey() {
k2 = *((*unsafe.Pointer)(k2))
}
// useY 决定了是迁移到 x 还是 y,如果是等量扩容,那么就是迁移到 x
var useY uint8
// 如果不是等量扩容
if !h.sameSizeGrow() {
// 计算哈希以做出迁移决定(是否需要将此 key/elem 迁移到桶 x 或桶 y)。
hash := t.hasher(k2, uintptr(h.hash0))
if h.flags&iterator != 0 && !t.reflexivekey() && !t.key.equal(k2, k2) {
// 下面这段是原英文注释的翻译(可能不太准确):
// 如果 key != key(不是一个数字),则散列可能(并且可能)与旧散列完全不同。
// 此外,它是不可再现的。
// 在迭代器存在的情况下,再现性是必需的,因为我们的迁移决策必须与迭代器所做的任何决策相匹配。
// 幸运的是,我们可以任意发送这些 key。
// 此外,tophash 对于这些类型的 key 没有意义。我们让 tophash 的最低位的决定如何迁移。
// 我们为下一个级别重新计算一个新的随机 tophash,这样在多次扩容后,这些 key 将均匀分布在所有桶中。
useY = top & 1
top = tophash(hash)
} else {
// 原理参考上面那个图。
if hash&newbit != 0 { // 取决于 oldB + 1 位是 0 还是 1
useY = 1
}
}
}

if evacuatedX+1 != evacuatedY || evacuatedX^1 != evacuatedY {
throw("bad evacuatedN")
}

// 记录旧桶的迁移状态
b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY
// dst 是迁移的目标 bucket
dst := &xy[useY]

// 目标 bucket 装不下了,使用溢出桶
if dst.i == bucketCnt {
// 创建溢出桶
dst.b = h.newoverflow(t, dst.b)
dst.i = 0
// dst.k 指向溢出桶的第一个 key 的地址
dst.k = add(unsafe.Pointer(dst.b), dataOffset)
// dst.e 指向溢出桶的第一个 elem 的地址
dst.e = add(dst.k, bucketCnt*uintptr(t.keysize))
}
// 使用 & 运算优化,不用进行边界检查
// 记录 tophash
dst.b.tophash[dst.i&(bucketCnt-1)] = top // mask dst.i as an optimization, to avoid a bounds check
// 将旧的 key/elem 复制到 dst 指向的槽
if t.indirectkey() { // bucket 的 key 保存的是指针
// 修改实际存储 key 的内存
*(*unsafe.Pointer)(dst.k) = k2 // copy pointer
} else {
// 修改 bucket 中保存 key 的内存
typedmemmove(t.key, dst.k, k) // copy elem
}
if t.indirectelem() { // bucket 的 elem 保存的是指针
// 修改实际存储 elem 的内存
*(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e)
} else {
// 修改 bucket 中保存 elem 的内存
typedmemmove(t.elem, dst.e, e)
}
// 目标桶的元素个数 +1
// dst 指向下一个空的槽(slot/cell)
dst.i++
// 这些更新可能会将这些指针推到 key 或 elem 数组的末尾。
// 这没关系,因为我们在桶的末端有溢出桶指针,以防止指针指向桶的末端。
// 如果指向数组末尾,在下次迁移的时候,会创建溢出桶。
dst.k = add(dst.k, uintptr(t.keysize))
dst.e = add(dst.e, uintptr(t.elemsize))
// 上面有判断 dst.i == bucketCnt,所以这里不会溢出
}
}
// 取消链接溢出桶并清除 key/elem 以帮助 GC。(清除旧桶的内存)
if h.flags&oldIterator == 0 && t.bucket.ptrdata != 0 {
b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))
// tophash 状态不能清除。
// 但是 key/elem 都可以清除。
ptr := add(b, dataOffset)
n := uintptr(t.bucketsize) - dataOffset
memclrHasPointers(ptr, n)
}
}

// 刚刚迁移的桶,就是顺序迁移的下一个桶,
// 则需要更新 nevacuate 字段,表示已经迁移了多少个桶
if oldbucket == h.nevacuate {
advanceEvacuationMark(h, t, newbit)
}
}
  1. advanceEvacuationMark 的作用是更新 nevacuate 字段,表示已经迁移了多少个桶。

我们上面说了,哈希表扩容的时候,会有两条线,advanceEvacuationMark 就是处理顺序迁移的那条线,让 nevacuate 指向下一个未迁移的桶。 为什么需要做这个处理呢?这是因为另外一条线的迁移是随机的,访问到哪个桶就迁移哪个桶,这就导致了,顺序迁移的那条线,在将 nevacuate 指向下一个桶的时候,其实下一个桶是已经迁移了的,我们下次顺序迁移的时候肯定不需要迁移这个桶。 那么解决办法就是,继续向后查找,找到第一个未迁移的桶。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 记录迁移进度(从顺序迁移的位置往后遍历,保证 nevacuate 指向下一个尚未迁移的桶)
// 参数:h 需要扩容的 map,t map 类型信息,newbit 是旧桶的数量
func advanceEvacuationMark(h *hmap, t *maptype, newbit uintptr) {
// nevacuate 索引加 1
h.nevacuate++
// 往后找一个未迁移的桶(最多遍历 1024 个桶)。
stop := h.nevacuate + 1024
if stop > newbit {
stop = newbit
}
// 向后遍历,找到第一个未迁移的桶。
for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) {
h.nevacuate++
}
// 这意味着所有旧桶都迁移完了
if h.nevacuate == newbit {
// 扩容已经完成。释放旧的普通桶数组。
h.oldbuckets = nil
// 也可以丢弃旧的溢出桶。
// 如果它们仍然被迭代器引用,那么迭代器将保存指向切片的指针。
//(但是 h 不再需要保存指向切片的指针)
if h.extra != nil {
h.extra.oldoverflow = nil
}
// 移除等量扩容标志
h.flags &^= sameSizeGrow
}
}

这里需要注意的是下面几行代码:

1
2
3
4
stop := h.nevacuate + 1024
if stop > newbit {
stop = newbit
}

我们在将 nevacuate 加上 1 之后,还会继续往后遍历 1024 个 bucket,如果 bucket 已迁移,则将 nevacuate 加 1。

如果没有这个操作会怎样?那就意味着可能有很多的 bucket 都已经迁移了,但是顺序迁移的位置(nevacuate)还没有更新, 这样可能会导致顺序迁移的位置每次都指向了已迁移的 bucket。 最终导致,迁移的时候,只迁移了访问到的 bucket,而没有迁移顺序迁移位置上的那个 bucket

也就是说,每次迁移的时候只会迁移 1 个 bucket,而不是 2 个,这样一来 growWork 需要调用的次数比原来更多,也就是说,需要写操作次数更多才能完成全部 bucket 的迁移

这个函数做的事情可以表示成下图:

map_7_5

在这个图中,nevacuate 一开始是 1,然后因为 3 这个 bucket 在这次 growWork 中已经迁移了, nevacuate 如果要指向下一个未迁移的 bucket 的话,就要跳过之前已经迁移的 2,以及本次 growWork 中已经迁移的 3, 所以最终 nevacuate 指向了 4,也就是下一个未迁移的 bucket

等量扩容的效果

在上面我们讲解的时候,其实已经假设了扩容是增量扩容(2 倍扩容),但实际上还有一种扩容方式,就是等量扩容。 等量扩容的时候,扩容前后的 bucket 其实数量是一样的,那么为什么还要进行扩容呢?

这是因为,溢出桶太多了,数据非常零散地分布在了很多的溢出桶中,这样会导致 bucket 中很多槽都是空的, 这样一来,进行查找、修改、删除的时候,需要遍历很多的溢出桶,这样会导致性能下降。如下图:

map_7_6

这个图中,我们假设要查找的 key 所在的普通桶以及前两个溢出桶都是空的,又或者 key 不在前面三个桶中,那只有遍历到最后一个溢出桶的时候才能找到我们要查找的 key。为了针对这种键值对数量没有达到扩容的阈值,但是溢出桶太多的情况,Go 语言提供了等量扩容的方式。

在等量扩容的时候,会将所有的溢出桶都迁移到新的 bucket 中,这样一来,bucket 中的槽就会被填满,而溢出桶也可能不再需要。

最后,针对上图的情况,key = foo 会被迁移到普通桶中,这样在查找的时候,只需要遍历普通桶就可以找到了。 当然,实际中的情况是,对于零散分布在多个溢出桶中的键值对,会被逐个往前挪,最终效果就是,桶中没有空的槽,除了最后一个 key 以后的槽。

大家可以结合下图想象一下:

当然下图只是描述了一下部分 key,如果 key 分布。实际上 key 在触发等量扩容的情况下,是零散地分布在不同的 bucket 中的(包括溢出桶)。

map_7_7

map 的迭代实现

go 的 map 迭代的时候,我们会发现,返回结果的顺序并不固定,这是因为 map 的迭代是无序的。 在 map 遍历的时候,每次都会从一个随机的 bucket 开始遍历,而且选了一个 bucket 之后, 还会从 bucket 中随机选择一个槽开始遍历,这样一来,每次遍历的结果都是不一样的。

map 迭代器数据结构

迭代器的功能:记录要遍历的 map,以及当前遍历到的 bucket 以及槽的索引,以便进行下一次遍历。

go 的 map 迭代器的数据结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 哈希迭代结构。
type hiter struct {
// 必须排在第一位。 写入 nil 表示迭代结束
key unsafe.Pointer
// 必须在第二个位置(参见 cmd/compile/internal/walk/range.go)。
elem unsafe.Pointer
t *maptype // map 的类型信息,包括 key、elem 的类型等信息
h *hmap // 需要迭代的 hmap
// hash_iter 初始化时的 bucket 指针
buckets unsafe.Pointer
// 当前正在遍历的 bucket
bptr *bmap
// 保持 hmap.buckets 的溢出桶存活
overflow *[]*bmap
// 保持 hmap.oldbuckets 的溢出桶存活
oldoverflow *[]*bmap
// bucket 迭代开始位置(随机选择的 bucket)
startBucket uintptr
// 在迭代期间开始的桶内偏移量(应该足够大以容纳 bucketCnt-1)
offset uint8
// 已经从桶数组的末尾环绕到开始
wrapped bool
B uint8 // 就是当前遍历的 hmap 的那个 B
i uint8 // 当前遍历的 bucket 内 key 的索引
bucket uintptr // 当前遍历的 bucket
checkBucket uintptr
}

几点注意的:

  1. 全部键值对遍历完的时候,key 会被置为 nil,这样一来,我们就可以通过 key 是否为 nil 来判断是否遍历完了。(当然这个不用开发者来判断,for...range 底层已经帮我们做了这个判断)。
  2. hiter 结构体保存了当前正在迭代的 bucketbptr)、bucket 中的 key 的索引(i)等信息,这样一来,我们就可以通过这些信息来确定下一个 key 的位置。

迭代器的初始化实现

go 的 map 初始化是通过 runtime.mapiterinit 来实现的,这个函数的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// mapiterinit 初始化用于 range 遍历 map 的 hiter 结构。(初始化 hiter)
// 'it' 指向的 hiter 结构由编译器顺序传递在堆栈上分配,或由 reflect_mapiterinit 在堆上分配。
// 两者都需要将 hiter 置零,因为结构包含指针。
func mapiterinit(t *maptype, h *hmap, it *hiter) {
// hiter 记录 map 的类型信息
it.t = t
// 如果 map 是空的直接返回
if h == nil || h.count == 0 {
return
}

// 个人猜测:内存对齐判断,这个由编译器决定的
if unsafe.Sizeof(hiter{})/goarch.PtrSize != 12 {
throw("hash_iter size incorrect") // see cmd/compile/internal/reflectdata/reflect.go
}
// it 关联上 map
it.h = h

// 获取桶状态的快照
it.B = h.B
// 当前的 buckets
it.buckets = h.buckets
// bucket 里面没有包含指针
if t.bucket.ptrdata == 0 {
// 分配当前切片并记住指向当前切片和旧切片的指针。
// 这将保持所有相关的溢出桶处于活动状态,即使在迭代时表增长和/或溢出桶添加到表中。
h.createOverflow()
it.overflow = h.extra.overflow
it.oldoverflow = h.extra.oldoverflow
}

// 决定从哪里开始遍历。
// 策略:随机选定一个 bucket,然后从该 bucket 开始遍历。
// 生成随机数
var r uintptr
if h.B > 31-bucketCntBits {
r = uintptr(fastrand64())
} else {
r = uintptr(fastrand())
}
// r => 扫描的入口(随机选的 bucket)
it.startBucket = r & bucketMask(h.B)
// 随机定位的 bucket 中的槽。
it.offset = uint8(r >> h.B & (bucketCnt - 1))

// 记录当前扫描的 bucket
it.bucket = it.startBucket

// 记住我们有一个迭代器。
// 可以与另一个 mapiteinit() 同时运行。
if old := h.flags; old&(iterator|oldIterator) != iterator|oldIterator {
atomic.Or8(&h.flags, iterator|oldIterator)
}

// 开始遍历
mapiternext(it)
}

这个函数主要功能是初始化迭代器,我们最需要关心的是,这个函数里面会生成一个随机数,然后通过这个随机数来决定从哪一个 bucket 开始遍历, 以及从 bucket 中的哪一个槽开始遍历(也是随机的)。

map 遍历图解

那为什么从随机定位的 bucket 以及随机定位的 key 就可以实现遍历呢?其实很简单,如果看过我之前写的 《go chan 设计与实现》的话, 就会知道 chan 的实现中是通过数组来实现环形队列的。而我们可以借助环形队列的特性来理解 map 的遍历。遍历到最后一个 bucket 之后, 下一个 bucket 就是第一个 bucket,这样就实现了环形遍历。同样的,遍历到最后一个 key 之后,下一个 key 就是第一个 key,这样也实现了环形遍历。

我们需要做的就是,在遍历开始的时候,记录第一个 bucket,然后每次遍历 bucket 的时候, 比较当前的 bucket 是否是第一个 bucket,是的话,意味着遍历结束了。 同样的,对于 bucketkey 的遍历也是。

不过,在实际实现中,key 有点不一样,key 的遍历是通过将 i0 遍历到 7,对于每一个 i,加上 it.offset,然后对 8 取模, 这样就可以实现环形遍历了,代码如下:

1
2
3
4
for ; i < bucketCnt; i++ {
// 桶内第 i 个元素
offi := (i + it.offset) & (bucketCnt - 1)
}

具体遍历过程如下图:

map_8_1

发生在扩容期间的遍历

go 的 map 设计中,是不允许在迭代的时候进行插入、修改、删除的,但只是在获取下一个键值对的时候不允许, 在迭代器获取了键值对之后,就算还没有全部遍历完 map 的所有元素,还是可以允许做插入、修改、删除操作的。 这样一来,有可能会出现一些奇怪的现象,比如插入的 key 遍历不出来,或者遍历出来的 key 有重复等等。这是需要开发者注意的地方。

另外,迭代可以发生在扩容的过程中,但是扩容其实对迭代其实是没有什么影响的。因为迭代的时候会做一些判断尽量保证所有 key 都能被遍历到。 但不能保证我们对 map 做了写操作后依然可以全部 key 都遍历。

在遍历的过程中,如果 map 发生了扩容,那么遍历的过程就会变得复杂一些。因为在扩容的过程中,map 会新建一个 bucket, 然后将原来的 bucket 中的 key 重新散列到新的 bucket 中。所以在遍历的过程中,如果发现当前的 bucket 已经发生了扩容, 需要做一些判断,比如:

  1. 如果发现 bucket 还没有迁移,则从 oldbuckets 中遍历。
  2. 如果发现 bucket 在迁移之后索引跟原来的不一样,则跳过。

具体可以参考下图:

map_8_2

这里假设的条件是:旧桶个数为 4,增量扩容后,新桶个数为 8hiter 当前迭代的是新桶。

说明:

  1. h.oldbuckets 指向了还没迁移完的桶,h.buckets 是当前的桶。
  2. hiter 迭代器要迭代的是新的桶。迭代器初始化的时候正在进行 2 倍扩容。
  3. checkBucket 是下一个要遍历的桶(索引为 1),图中的情况是,这个桶还没有被迁移。所以需要从 h.oldbuckets 中读取。
  4. checkBucket 中的 key 有可能会迁移到 h.buckets 中的 1 或者 5 位置。(具体可以看上面桶迁移的实现那一节)
  5. 如果 key 是要被迁移到 5 中的话,那么遍历的时候会跳过,因为后面会遍历到 5 中的 key
  6. 对于第 5 点,在遍历 5 这个 bucket 的时候,由于我们是使用当前遍历的 bucket 的下标结合旧桶的长度计算在旧桶中的下标的,所以还是可以取得到旧桶,然后遍历的时候取出那些应该迁移到 5 这个 bucketkey,对于那些应该要迁移到 1key 则跳过。
  7. 下一个要遍历的桶的索引为 2

对于第 6 点桶索引计算的特别说明,如果是增量扩容,计算 bucket 的下标方式如下:

1
2
3
// bucket 是当前要遍历的 bucket 的下标
// it.h.oldbucketmask() 是旧桶的 B 的掩码
oldbucket := bucket & it.h.oldbucketmask()

当然如果这个桶已经迁移,那么还是会从新桶遍历(也就是 bucket & it.h.oldbucketmask() 里的 bucket 本身)。

键值对遍历源码剖析

map 中实现遍历的函数是 mapiternext,这个函数做的事情,也就是上面两个图描述的遍历过程,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
// 参数:迭代器实例
func mapiternext(it *hiter) {
// 获取底层的 hmap 实例
h := it.h
// 正在插入、修改、删除 key 但时候,不能获取下一个 key,
// 注意:这不能保证插入、修改、删除之后,进行迭代。
if h.flags&hashWriting != 0 {
fatal("concurrent map iteration and map write")
}
t := it.t // *maptype
bucket := it.bucket // 当前遍历到的 bucket 下标(int 类型)
b := it.bptr // 当前 bucket 的指针(实际的 bucket,bmap 指针类型)
i := it.i // 迭代器当前遍历到的位置(bucket 内 key 的位置)
checkBucket := it.checkBucket // 这个用来判断是否是增量扩容的

next:
// 注意:下面的 if 的功能是,获取下一个桶。
// 如果 b 是 nil,有以下两种情况:
// a. 第一次遍历,还没有遍历到任何 bucket。
// b. 遍历完最后一个 bucket 了。
if b == nil {
// 已经迭代完了,直接退出函数
if bucket == it.startBucket && it.wrapped {
// for...range 查看到 key 是 nil 会中止迭代
it.key = nil
it.elem = nil
return
}
// 如果正在扩容
if h.growing() && it.B == h.B {
// 迭代器是在扩容过程中启动的,扩容尚未完成。
// 如果我们正在查看的存储桶尚未迁移,
// 那么我们需要遍历旧存储桶,同时只返回将迁移到此存储桶的数据。
// 那些需要迁移到另一个下标的桶则跳过。
oldbucket := bucket & it.h.oldbucketmask()
// 获取旧桶
b = (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
if !evacuated(b) { // 旧桶还没有迁移
checkBucket = bucket
} else { // 已经迁移了,获取新桶
b = (*bmap)(add(it.buckets, bucket*uintptr(t.bucketsize)))
checkBucket = noCheck
}
} else {
// 并没有在扩容,获取新的桶
b = (*bmap)(add(it.buckets, bucket*uintptr(t.bucketsize)))
checkBucket = noCheck
}

// bucket 指向了下一个要迭代的桶的下标
bucket++
// 判断当前遍历的桶是否到最后一个桶了
if bucket == bucketShift(it.B) {
bucket = 0
it.wrapped = true // 已经从第一个随机定位的 bucket 遍历到最后一个 bucket 了,下一个应该是第一个 bucket 了
}
// 开始遍历新的 bucket 的时候,重置 i
i = 0
}

// 开始遍历桶内的键值对
for ; i < bucketCnt; i++ {
// 计算桶内键值对的下标(从随机的 it.offset 下标开始遍历)
offi := (i + it.offset) & (bucketCnt - 1)
// 如果槽是空的,或者 key 已经迁移,则跳过。
if isEmpty(b.tophash[offi]) || b.tophash[offi] == evacuatedEmpty {
continue
}
// 桶内第 i 个元素的 key 的指针
// 获取 key 或 elem 的解析前面的小节有详细的解释了,不再赘述。
k := add(unsafe.Pointer(b), dataOffset+uintptr(offi)*uintptr(t.keysize))
if t.indirectkey() {
k = *((*unsafe.Pointer)(k))
}
// 桶内第 i 个元素的 elem 的指针
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+uintptr(offi)*uintptr(t.elemsize))

// 增量扩容的 key 判断
// 需要判断迁移之后的 key 落入的是 h.buckets 的前半部分还是后半部分(x 还是 y)
// 具体看上面的迁移实现。
if checkBucket != noCheck && !h.sameSizeGrow() {
if t.reflexivekey() || t.key.equal(k, k) {
// key 是可比较的
// 如果这个 key 将会被迁移到 h.buckets 的后半部分,跳过它
hash := t.hasher(k, uintptr(h.hash0))
if hash&bucketMask(it.B) != checkBucket {
continue
}
} else {
// 对于不可比较的 key,
// 是由 tophash 的最低位来决定迁移到前半部分还是后半部分的。
// 取 checkBucket 的最高位来比较,因为 checkBucket 的最高位决定了
// 当前遍历的 bucket 是上半部分还是后半部分的 bucket。
if checkBucket>>(it.B-1) != uintptr(b.tophash[offi]&1) {
// 如果当前的 key 没有落入当前遍历的 bucket,则跳过它
continue
}
}
}

if (b.tophash[offi] != evacuatedX && b.tophash[offi] != evacuatedY) ||
!(t.reflexivekey() || t.key.equal(k, k)) {
// 上面的判断:
// 1、表示 key 还没有迁移。
// 2、或者,key 是不可比较的。
// 则直接返回
it.key = k
if t.indirectelem() {
e = *((*unsafe.Pointer)(e))
}
it.elem = e
} else {
// 自迭代器启动以来,哈希表已扩容。
// key 已经被迁移到其他地方。
// 检查当前哈希表中的数据。
// 此代码处理已删除、更新或删除并重新插入 key 的情况。
// 注意:我们需要重新标记 key,因为它可能已更新为 equal() 但不是相同的 key(例如 +0.0 vs -0.0)。
// 获取当前遍历到的 key/elem。
rk, re := mapaccessK(t, h, k)
if rk == nil {
// key 已经被删除了,继续遍历下一个 key
continue
}
// 外部是通过 it 的 key/elem 来获取当前遍历到的键值对的
it.key = rk
it.elem = re
}
// 记录当前迭代的 bucket
it.bucket = bucket
// 遍历到下一个 bucket 了,更新 bptr
if it.bptr != b { // avoid unnecessary write barrier; see issue 14921
it.bptr = b
}
// 迭代器的 i 指向 bucket 内的下一个 key
it.i = i + 1
// 记录是否需要检查 key 的标记
it.checkBucket = checkBucket
// 找到了键值对(保存在 it.key/it.elem 中了),返回。
// mapiternext 外部可以通过 hiter 的 key/elem 属性来获取当前遍历到的 key/val。
// 遍历下一个元素的时候,再次调用 mapiternext 函数。
//(无所谓,hiter 会记住迭代到哪里的)
// 也即:每遍历一个元素,调用一次 mapiternext 函数。
return
}
// 当前的 bucket 遍历完了,那么继续从溢出桶中查找下一个元素。
// b 指向溢出桶,迭代溢出桶
b = b.overflow(t)
// 遍历下一个桶了,key 从 0 开始遍历
i = 0
// 遍历当前 bucket 的溢出桶
goto next
}

小结

  • 哈希表相比数组,可以很快速地查找,所以常见的编程语言都会有对应的实现。
  • 哈希冲突有两种解决方法:开放地址法链表法,go 里的 map 使用的是链表法
  • 哈希表初始化分配的是比较小的内存,只能存放少量键值对,但是随着我们插入的数据越来越多,哈希表会进行扩容,防止哈希表的读写性能下降。
  • go 的 map 扩容有两种方式:键值对太多的时候会进行 2 倍扩容,溢出桶太多会进行等量扩容。
  • map 中使用 buckets 来存储桶,一个桶里面可以存储 8 个键值对,一个桶满了的时候,会创建溢出桶来保存多出来的 key
  • map 中桶的结构体是 bmap,它里面的会将所有 key 连续存储,所有的 value 也会连续存储。
  • map 定位 key 的时候,会使用哈希值与 B 的掩码做 & 运算(我们可以将其理解为一种模运算的另外一种实现),从而得到 bucket 的下标,然后遍历这个 bucket 中的每一个槽。先比较 tophash,如果 tophash 相等再比较 key,如果 tophashkey 都相等,则表明找到了我们要找的 key。如果这两者有一个不等,继续比较下一个 key
  • 如果一个 bucket 中的所有 key 被遍历完了也没有找到,那么继续从溢出桶中查找。
  • map 读取数据是通过 mapaccess1mapaccess2mapaccessK 实现的,对于整型键值的 map 有优化的 mapaccess 实现(对于 bmap 里键值的访问更加高效)。
  • map 写入和修改数据都是通过 mapassign 函数实现的,这个函数在找不到 key 的时候会进行插入操作。
  • map 删除数据是通过 mapdelete 函数实现的。删除的时候会需要将 bucket 末尾的所有空的槽的标记更新为 emptyRest
  • map 扩容的条件有两个:超过负载因子、溢出桶太多。只有在增量扩容的时候,key 所对应的 bucket 的下标才有可能发生变化。
  • map 扩容的时候,会迁移当前正在写入、删除的 bucket,同时也会从第一个 bucket 开始迁移,一次写操作会迁移两个 bucket。这样可以保证在一定的写操作以后,所有 bucket 都能迁移完成。
  • 等量扩容的时候,key 在新桶中的 bucket 下标不变,但是 key 在桶内的分布会更加地紧凑,从而会提高查找效率。
  • map 的迭代是通过 hiter 结构体来实现的,迭代的过程中 hiter 会记录当前的 bucketkey,普通桶迭代完后,迭代溢出桶。map 的迭代是通过 mapiternext 函数实现的,每次获取键值对都是通过这个 mapiternext 函数。
  • map 迭代如果发生在增量扩容的时候,对于未迁移的 bucket,会判断 keybucket 是否会发生变化,如果 key 对应的 bucket 已经改变,则迭代的时候会跳过。

在上一篇文章《go interface 基本用法》中,我们了解了 go 中 interface 的一些基本用法,其中提到过 接口本质是一种自定义类型,本文就来详细说说为什么说 接口本质是一种自定义类型,以及这种自定义类型是如何构建起 go 的 interface 系统的。

本文使用的源码版本: go 1.19。另外本文中提到的 interface接口 是同一个东西。

前言

在了解 go interface 的设计过程中,看了不少资料,但是大多数资料都有生成汇编的操作,但是在我的电脑上指向生成汇编的操作的时候, 生成的汇编代码却不太一样,所以有很多的东西无法验证正确性,这部分内容不会出现在本文中。本文只写那些经过本机验证正确的内容,但也不用担心,因为涵盖了 go interface 设计与实现的核心部分内容,但由于水平有限,所以只能尽可能地传达我所知道的关于 interface 的一切东西。对于有疑问的部分,有兴趣的读者可以自行探索。

如果想详细地了解,建议还是去看看 iface.go,里面有接口实现的一些关键的细节。但是还是有一些东西被隐藏了起来, 导致我们无法知道我们 go 代码会是 iface.go 里面的哪一段代码实现的。

接口是什么?

接口(interface)本质上是一种结构体。

我们先来看看下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// main.go
package main

type Flyable interface {
Fly()
}

// go tool compile -N -S -l main.go
func main() {
var f1 interface{}
println(f1) // CALL runtime.printeface(SB)

var f2 Flyable
println(f2) // CALL runtime.printiface(SB)
}

我们可以通过 go tool compile -N -S -l main.go 命令来生成 main.go 的伪汇编代码,生成的代码会很长,下面省略所有跟本文主题无关的代码:

1
2
3
4
// main.go:10 => println(f1)
0x0029 00041 (main.go:10) CALL runtime.printeface(SB)
// main.go:13 => println(f2)
0x004f 00079 (main.go:13) CALL runtime.printiface(SB)

我们从这段汇编代码中可以看到,我们 println(f1) 实际上是对 runtime.printeface 的调用,我们看看这个 printeface 方法:

1
2
3
func printeface(e eface) {
print("(", e._type, ",", e.data, ")")
}

我们看到了,这个 printeface 接收的参数实际上是 eface 类型,而不是 interface{} 类型,我们再来看看 println(f2) 实际调用的 runtime.printiface 方法:

1
2
3
func printiface(i iface) {
print("(", i.tab, ",", i.data, ")")
}

也就是说 interface{} 类型在底层实际上是 eface 类型,而 Flyable 类型在底层实际上是 iface 类型。

这就是本文要讲述的内容,go 中的接口变量其实是用 ifaceeface 这两个结构体来表示的:

  • iface 表示某一个具体的接口(含有方法的接口)。
  • eface 表示一个空接口(interface{}

iface 和 eface 结构体

ifaceeface 的结构体定义(runtime/iface.go):

1
2
3
4
5
6
7
8
9
10
11
// 非空接口(如:io.Reader)
type iface struct {
tab *itab // 方法表
data unsafe.Pointer // 指向变量本身的指针
}

// 空接口(interface{})
type eface struct {
_type *_type // 接口变量的类型
data unsafe.Pointer // 指向变量本身的指针
}

go 底层的类型信息是使用 _type 结构体来存储的。

比如,我们有下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package main

type Bird struct {
name string
}

func (b Bird) Fly() {
}

type Flyable interface {
Fly()
}

func main() {
bird := Bird{name: "b1"}
var efc interface{} = bird // efc 是 eface
var ifc Flyable = bird // ifc 是 iface

println(efc) // runtime.printeface
println(ifc) // runtime.printiface
}

在上面代码中,efceface 类型的变量,对应到 eface 结构体的话,_type 就是 Bird 这个类型本身,而 data 就是 &bird 这个指针:

类似的,ifciface 类型的变量,对应到 iface 结构体的话,data 也是 &bird 这个指针:

_type 是什么?

在 go 中,_type 是保存了变量类型的元数据的结构体,定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// _type 是 go 里面所有类型的一个抽象,里面包含 GC、反射、大小等需要的细节,
// 它也决定了 data 如何解释和操作。
// 里面包含了非常多信息:类型的大小、哈希、对齐及 kind 等信息
type _type struct {
size uintptr // 数据类型共占用空间的大小
ptrdata uintptr // 含有所有指针类型前缀大小
hash uint32 // 类型 hash 值;避免在哈希表中计算
tflag tflag // 额外类型信息标志
align uint8 // 该类型变量对齐方式
fieldAlign uint8 // 该类型结构体字段对齐方式
kind uint8 // 类型编号
// 用于比较此类型对象的函数
equal func(unsafe.Pointer, unsafe.Pointer) bool
// gc 相关数据
gcdata *byte
str nameOff // 类型名字的偏移
ptrToThis typeOff
}

这个 _type 结构体定义大家随便看看就好了,实际上,go 底层的类型表示也不是上面这个结构体这么简单。

但是,我们需要知道的一点是(与本文有关的信息),通过 _type 我们可以得到结构体里面所包含的方法这些信息。 具体我们可以看 itabinit 方法(runtime/iface.go),我们会看到如下几行:

1
2
3
4
5
6
typ := m._type
x := typ.uncommon() // 结构体类型

nt := int(x.mcount) // 实际类型的方法数量
// 实际类型的方法数组,数组元素为 method
xmhdr := (*[1 << 16]method)(add(unsafe.Pointer(x), uintptr(x.moff)))[:nt:nt]

在底层,go 是通过 _type 里面 uncommon 返回的地址,加上一个偏移量(x.moff)来得到实际结构体类型的方法列表的。

我们可以参考一下下图想象一下:

itab 是什么?

我们从 iface 中可以看到,它包含了一个 *itab 类型的字段,我们看看这个 itab 的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 编译器已知的 itab 布局
type itab struct {
inter *interfacetype // 接口类型
_type *_type
hash uint32
_ [4]byte
fun [1]uintptr // 变长数组. fun[0]==0 意味着 _type 没有实现 inter 这个接口
}

// 接口类型
// 对应源代码:type xx interface {}
type interfacetype struct {
typ _type // 类型信息
pkgpath name // 包路径
mhdr []imethod // 接口的方法列表
}

根据 interfacetype 我们可以得到关于接口所有方法的信息。同样的,通过 _type 也可以获取结构体类型的所有方法信息。

从定义上,我们可以看到 itab*interfacetype*_type 有关,但实际上有什么关系从定义上其实不太能看得出来, 但是我们可以看它是怎么被使用的,现在,假设我们有如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// i 在底层是一个 interfacetype 类型
type i interface {
A()
C()
}

// t 底层会用 _type 来表示
// t 里面有 A、B、C、D 方法
// 因为实现了 i 中的所有方法,所以 t 实现了接口 i
type t struct {}
func (t) A() {}
func (t) B() {}
func (t) C() {}
func (t) D() {}

下图描述了上面代码对应的 itab 生成的过程:

说明:

  • itab 里面的 inter 是接口类型的指针(比如通过 type Reader interface{} 这种形式定义的接口,记录的是这个类型本身的信息),这个接口类型本身定义了一系列的方法,如图中的 i 包含了 AC 两个方法。
  • _type 是实际类型的指针,记录的是这个实际类型本身的信息,比如这个类型包含哪些方法。图中的 i 实现了 ABCD 四个方法,因为实现了 i 的所有方法,所以说 t 实现了 i 接口。
  • 在底层做类型转换的时候,比如 t 转换为 i 的时候(var v i = t{}),会生成一个 itab,如果 t 没有实现 i 中的所有方法,那么生成的 itab 中不包含任何方法。
  • 如果 t 实现了 i 中的所有方法,那么生成的 itab 中包含了 i 中的所有方法指针,但是实际指向的方法是实际类型的方法(也就是指向的是 t 中的方法地址)
  • mhdr 就是 itab 中的方法表,里面的方法名就是接口的所有方法名,这个方法表中保存了实际类型(t)中同名方法的函数地址,通过这个地址就可以调用实际类型的方法了。

所以,我们有如下结论:

  • itab 实际上定义了 interfacetype_type 之间方法的交集。作用是什么呢?就是用来判断一个结构体是否实现某个接口的。
  • itab 包含了接口的所有方法,这里面的方法是实际类型的子集。
  • itab 里面的方法列表包含了实际类型的方法指针(也就是实际类型的方法的地址),通过这个地址可以对实际类型进行方法的调用。
  • itab 在实际类型没有实现接口的所有方法的时候,生成失败(失败的意思是,生成的 itab 里面的方法列表是空的,在底层实现上是用 fun[0] = 0 来表示)。

生成的 itab 是怎么被使用的?

go 里面定义了一个全局变量 itabTable,用来缓存 itab,因为在判断某一个结构体是否实现了某一个接口的时候, 需要比较两者的方法集,如果结构体实现了接口的所有方法,那么就表明结构体实现了接口(这也就是生成 itab 的过程)。 如果在每一次做接口断言的时候都要做一遍这个比较,性能无疑会大大地降低,因此 go 就把这个比较得出的结果缓存起来,也就是 itab。 这样在下一次判断结构体是否实现了某一个接口的时候,就可以直接使用之前的 itab,性能也就得到提升了。

1
2
3
4
5
6
7
8
9
10
// 表里面缓存了 itab
itabTable = &itabTableInit
itabTableInit = itabTableType{size: itabInitSize}

// 全局的 itab 表
type itabTableType struct {
size uintptr // entries 的长度,2 的次方
count uintptr // 当前 entries 的数量
entries [itabInitSize]*itab // 保存 itab 的哈希表
}

itabTableType 里面的 entries 是一个哈希表,在实际保存的时候,会用 interfacetype_type 这两个生成一个哈希表的键。 也就是说,这个保存 itab 的缓存哈希表中,只要我们有 interfacetype_type 这两个信息,就可以获取一个 itab

具体怎么使用,我们可以看看下面的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package main

type Flyable interface {
Fly()
}

type Runnable interface {
Run()
}

var _ Flyable = (*Bird)(nil)
var _ Runnable = (*Bird)(nil)

type Bird struct {
}

func (b Bird) Fly() {
}

func (b Bird) Run() {
}

// GOOS=linux GOARCH=amd64 go tool compile -N -S -l main.go > main.s
func test() {
// f 的类型是 iface
var f Flyable = Bird{}
// Flyable 转 Runnable 本质上是 iface 到 iface 的转换
f.(Runnable).Run() // CALL runtime.assertI2I(SB)
// 这个 switch 里面的类型断言本质上也是 iface 到 iface 的转换
// 但是 switch 里面的类型断言失败不会引发 panic
switch f.(type) {
case Flyable: // CALL runtime.assertI2I2(SB)
case Runnable: // CALL runtime.assertI2I2(SB)
}
if _, ok := f.(Runnable); ok { // CALL runtime.assertI2I2(SB)
}

// i 的类型是 eface
var i interface{} = Bird{}
// i 转 Flyable 本质上是 eface 到 iface 的转换
i.(Flyable).Fly() // CALL runtime.assertE2I(SB)
// 这个 switch 里面的类型断言本质上也是 eface 到 iface 的转换
// 但是 switch 里面的类型断言失败不会引发 panic
switch i.(type) {
case Flyable: // CALL runtime.assertE2I2(SB)
case Runnable: // CALL runtime.assertE2I2(SB)
}
if _, ok := i.(Runnable); ok { // CALL runtime.assertE2I2(SB)
}
}

我们对上面的代码生成伪汇编代码:

1
GOOS=linux GOARCH=amd64 go tool compile -N -S -l main.go > main.s

然后我们去查看 main.s,就会发现类型断言的代码,本质上是对 runtime.assert* 方法的调用(assertI2IassertI2I2assertE2IassertE2I2), 这几个方法名都是以 assert 开头的,assert 在编程语言中的含义是,判断后面的条件是否为 true,如果 false 则抛出异常或者其他中断程序执行的操作,为 true 则接着执行。 这里的用处就是,判断一个接口是否能够转换为另一个接口或者另一个类型

但在这里有点不太一样,这里有两个函数最后有个数字 2 的,表明了我们对接口的类型转换会有两种情况,我们上面的代码生成的汇编其实已经很清楚了, 一种情况是直接断言,使用 i.(T) 这种形式,另外一种是在 switch...case 里面使用,

我们可以看看它们的源码,看看有什么不一样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 直接根据 interfacetype/_type 获取 itab
func assertE2I(inter *interfacetype, t *_type) *itab {
if t == nil {
// 显式转换需要非nil接口值。
panic(&TypeAssertionError{nil, nil, &inter.typ, ""})
}
// getitab 的第三个参数是 false
// 表示 getiab 获取不到 itab 的时候需要 panic
return getitab(inter, t, false)
}

// 将 eface 转换为 iface
// 因为 e 包含了 *_type
func assertE2I2(inter *interfacetype, e eface) (r iface) {
t := e._type
if t == nil {
return
}
// getitab 的第三个参数是 true
// 表示 getitab 获取不到 itab 的时候不需要 panic
tab := getitab(inter, t, true)
if tab == nil {
return
}
r.tab = tab
r.data = e.data
return
}

getitab 的源码后面会有。

从上面的代码可以看到,其实带 2 和不带 2 后缀的关键区别在于:getitab 的调用允不允许失败。 这有点类似于 chan 里面的 selectchanselect 语句中读写 chan 不会阻塞,而其他地方会阻塞。

assertE2I2 是用在 switch...case 中的,这个调用是允许失败的,因为我们还需要判断能否转换为其他类型; 又或者 v, ok := i.(T) 的时候,也是允许失败的,但是这种情况会返回第二个值给用户判断是否转换成功。 而直接使用类型断言的时候,如 i.(T) 这种,如果 i 不能转换为 T 类型,则直接 panic

对于 go 中的接口断言可以总结如下:

  • assertI2I 用于将一个 iface 转换为另一个 iface,转换失败的时候 panic
  • assertI2I2 用于将一个 iface 转换为另一个 iface,转换失败的时候不会 panic
  • assertE2I 用于将一个 eface 转换为另一个 iface,转换失败的时候 panic
  • assertE2I2 用于将一个 eface 转换为另一个 iface,转换失败的时候不会 panic
  • assert 相关的方法后缀的 I2IE2E 里面的 I 表示的是 ifaceE 表示的是 eface
  • 2 后缀的允许失败,用于 v, ok := i.(T) 或者 switch x.(type) ... case
  • 不带 2 后缀的不允许失败,用于 i.(T) 这种形式中

当然,这里说的转换不是说直接转换,只是说,在转换的过程中会用到 assert* 方法。

如果我们足够细心,然后也去看了 assertI2IassertI2I2 的源码,就会发现,这几个方法本质上都是, 通过 interfacetype_type 来获取一个 itab 然后转换为另外一个 itab 或者 `iface。

同时,我们也应该注意到,上面的转换都是转换到 iface 而没有转换到 eface 的操作,这是因为,所有类型都可以转换为空接口(interface{},也就是 eface)。根本就不需要断言。

上面的内容可以结合下图理解一下:

itab 关键方法的实现

下面,让我们再来深入了解一下 itab 是怎么被创建出来的,以及是怎么保存到全局的哈希表中的。我们先来看看下图:

这个图描述了 go 底层存储 itab 的方式:

  • 通过一个 itabTableType 类型来存储所有的 itab
  • 在调用 getitab 的时候,会先根据 inter_type 计算出哈希值,然后从 entries 中查找是否存在,存在就返回对应的 itab,不存在则新建一个 itab
  • 在调用 itabAdd 的时候,会将 itab 加入到 itabTableType 类型变量里面的 entries 中,其中 entries 里面的键是根据 inter_type 做哈希运算得出的。

itab 两个比较关键的方法:

  • getitab 让我们可以通过 interfacetype_type 获取一个 itab,会现在缓存中找,找不到会新建一个。
  • itabAdd 是在我们缓存找不到 itab,然后新建之后,将这个新建的 itab 加入到缓存的方法。

getitab 方法的第三个参数 canfail 表示当前操作是否允许失败,上面说了,如果是用在 switch...case 或者 v, ok := i.(T) 这种是允许失败的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// 获取某一个类型的 itab(从 itabTable 中查找,键是 inter 和 _type 的哈希值)
// 查找 interfacetype + _type 对应的 itab
// 找不到就新增。
func getitab(inter *interfacetype, typ *_type, canfail bool) *itab {
if len(inter.mhdr) == 0 {
throw("internal error - misuse of itab")
}

// 不包含 Uncommon 信息的类型直接报错
if typ.tflag&tflagUncommon == 0 {
if canfail {
return nil
}
name := inter.typ.nameOff(inter.mhdr[0].name)
panic(&TypeAssertionError{nil, typ, &inter.typ, name.name()})
}

// 保存返回的 itab
var m *itab

// t 指向了 itabTable(全局的 itab 表)
t := (*itabTableType)(atomic.Loadp(unsafe.Pointer(&itabTable)))
// 会先从全局 itab 表中查找,找到就直接返回
if m = t.find(inter, typ); m != nil {
goto finish
}

// 没有找到,获取锁,再次查找。
// 找到则返回
lock(&itabLock)
if m = itabTable.find(inter, typ); m != nil {
unlock(&itabLock)
goto finish
}

// 没有在缓存中找到,新建一个 itab
m = (*itab)(persistentalloc(unsafe.Sizeof(itab{})+uintptr(len(inter.mhdr)-1)*goarch.PtrSize, 0, &memstats.other_sys))
// itab 的
m.inter = inter
m._type = typ
m.hash = 0
// itab 初始化
m.init()
// 将新创建的 itab 加入到全局的 itabTable 中
itabAdd(m)
// 释放锁
unlock(&itabLock)
finish:
// == 0 表示没有任何方法
// 下面 != 0 表示有 inter 和 typ 有方法的交集
if m.fun[0] != 0 {
return m
}
// 用在 switch x.(type) 中的时候,允许失败而不是直接 panic
// 但在 x.(Flyable).Fly() 这种场景会直接 panic
if canfail {
return nil
}

// 没有找到有方法的交集,panic
panic(&TypeAssertionError{concrete: typ, asserted: &inter.typ, missingMethod: m.init()})
}

itabAdd 将给定的 itab 添加到 itab 哈希表中(itabTable)。

注意:itabAdd 中在判断到哈希表的使用量超过 75% 的时候,会进行扩容,新的容量为旧容量的 2 倍。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 必须保持 itabLock。
func itabAdd(m *itab) {
// 正在分配内存的时候调用的话报错
if getg().m.mallocing != 0 {
throw("malloc deadlock")
}

t := itabTable
// 容量已经超过 75% 的负载了,hash 表扩容
if t.count >= 3*(t.size/4) {
// 75% load factor(实际上是:t.size *0.75)
// 扩展哈希表。原来 2 倍大小。
// 我们撒谎告诉 malloc 我们需要无指针内存,因为所有指向的值都不在堆中。
// 2 是 size 和 count 这两个字段需要的空间
t2 := (*itabTableType)(mallocgc((2+2*t.size)*goarch.PtrSize, nil, true))
t2.size = t.size * 2

// 复制条目。
// 注意:在复制时,其他线程可能会查找itab,但找不到它。
// 没关系,然后它们会尝试获取itab锁,因此等待复制完成。
iterate_itabs(t2.add) // 遍历旧的 hash 表,复制函数指针到 t2 中
if t2.count != t.count { // 复制出错
throw("mismatched count during itab table copy")
}

// 发布新哈希表。使用原子写入:请参见 getitab 中的注释。
// 使用 t2 覆盖 itabTable
atomicstorep(unsafe.Pointer(&itabTable), unsafe.Pointer(t2))
// 使用新的 hash 表
// 因为 t 是局部变量,指向旧的地址,
// 但是扩容之后是新的地址了,所以现在需要将新的地址赋给 t
t = itabTable
// 注:旧的哈希表可以在此处进行GC。
}
// 将 itab 加入到全局哈希表
t.add(m)
}

其实 itabAdd 的关键路径比较清晰,只是因为它是一个哈希表,所以里面在判断到当前 itab 的数量超过 itabTable 容量的 75% 的时候,会对 itabTable 进行 2 倍扩容。

根据 interfacetype 和 _type 初始化 itab

上面那个图我们说过,itab 本质上是 interfacetype_type 方法的交集,这一节我们就来看看,itab 是怎么根据这两个类型来进行初始化的。

itabinit 方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// init 用 m.inter/m._type 对的所有代码指针填充 m.fun 数组。
// 如果该类型不实现接口,它将 m.fun[0] 设置为 0 ,并返回缺少的接口函数的名称。
// 可以在同一个m上多次调用,甚至同时调用。
func (m *itab) init() string {
inter := m.inter // 接口
typ := m._type // 实际的类型
x := typ.uncommon()

// inter 和 typ 都具有按名称排序的方法,并且接口名称是唯一的,因此可以在锁定步骤中迭代这两个;
// 循环时间复杂度是 O(ni+nt),不是 O(ni*nt)
ni := len(inter.mhdr) // 接口的方法数量
nt := int(x.mcount) // 实际类型的方法数量
// 实际类型的方法数组,数组元素为 method
xmhdr := (*[1 << 16]method)(add(unsafe.Pointer(x), uintptr(x.moff)))[:nt:nt] // 大小无关紧要,因为下面的指针访问不会超出范围
j := 0
// 用来保存 inter/_type 对方法列表的数组,数组元素为 unsafe.Pointer(是实际类型方法的指针)
methods := (*[1 << 16]unsafe.Pointer)(unsafe.Pointer(&m.fun[0]))[:ni:ni] // 保存 itab 方法的数组
// 第一个方法的指针
var fun0 unsafe.Pointer
imethods:
for k := 0; k < ni; k++ { // 接口方法遍历
i := &inter.mhdr[k] // i 是接口方法, imethod 类型
itype := inter.typ.typeOff(i.ityp) // 接口的方法类型
name := inter.typ.nameOff(i.name) // 接口的方法名称
iname := name.name() // 接口的方法名
ipkg := name.pkgPath() // 接口的包路径
if ipkg == "" {
ipkg = inter.pkgpath.name()
}

// 根据接口方法查找实际类型的方法
for ; j < nt; j++ { // 实际类型的方法遍历
t := &xmhdr[j] // t 是实际类型的方法,method 类型
tname := typ.nameOff(t.name) // 实际类型的方法名
// 比较接口的方法跟实际类型的方法是否一致
if typ.typeOff(t.mtyp) == itype && tname.name() == iname {
// 实际类型的包路径
pkgPath := tname.pkgPath()
if pkgPath == "" {
pkgPath = typ.nameOff(x.pkgpath).name()
}

// 如果是导出的方法
// 则保存到 itab 中
if tname.isExported() || pkgPath == ipkg {
if m != nil {
ifn := typ.textOff(t.ifn) // 实际类型的方法指针(通过这个指针可以调用实际类型的方法)
if k == 0 {
// 第一个方法
fun0 = ifn // we'll set m.fun[0] at the end
} else {
methods[k] = ifn
}
}
// 比较下一个方法
continue imethods
}
}
}
// 没有实现接口(实际类型没有实现 interface 中的任何一个方法)
m.fun[0] = 0
return iname // 返回缺失的方法名,返回值在类型断言失败的时候会需要提示用户
}
// 实现了接口
m.fun[0] = uintptr(fun0)
return ""
}

接口断言过程总览(类型转换的关键)

具体来说有四种情况,对应上面提到的 runtime.assert* 方法:

  • 实际类型转换到 iface
  • iface 转换到另一个 iface
  • 实际类型转换到 eface
  • eface 转换到 iface

这其中的关键是 interfacetype + _type 可以生成一个 itab

上面的内容可能有点混乱,让人摸不着头脑,但是我们通过上面的讲述,相信已经了解了 go 接口中底层的一些实现细节,现在,就让我们重新来捋一下,看看 go 接口到底是怎么实现的:

首先,希望我们可以达成的一个共识就是,go 的接口断言本质上是类型转换,switch...case 里面或 v, ok := i.(T) 允许转换失败,而 i.(T).xx() 这种不允许转换失败,转换失败的时候会 panic

接着,我们就可以通过下图来了解 go 里面的接口整体的实现原理了(还是以上面的代码作为例子):

  1. 将结构体赋值给接口类型:var f Flyable = Bird{}

在这个赋值过程中,创建了一个 iface 类型的变量,这个变量中的 itab 的方法表只包含了 Flyable 定义的方法。

  1. iface 转另一个 iface:
  • f.(Runnable)
  • _, ok := f.(Runnable)
  • switch f.(type) 里面的 caseRunnable

在这个断言过程中,会将 Flyable 转换为 Runnable,本质上是一个 iface 转换到另一个 iface。但是有个不同之处在于, 两个 iface 里面的方法列表是不一样的,只包含了当前 interfacetype 里面定义的方法。

  1. 将结构体赋值给空接口:var i interface{} = Bird{}

在这个过程中,创建了一个 eface 类型的变量,这个 eface 里面只包含了类型信息以及实际的 Bird 结构体实例。

  1. eface 转换到 iface
  • i.(Flyable)
  • _, ok := i.(Runnable)
  • switch i.(type) 里面的 caseFlyable

因为 _type 包含了 Bird 类型的所有信息,而 data 包含了 Bird 实例的值,所以这个转换是可行的。

panicdottypeI 与 panicdottypeE

从前面的几个小节,我们知道,go 的 iface 类型转换使用的是 runtime.assert* 几个方法,还有另外一种情况就是, 在编译期间编译器就已经知道了无法转换成功的情况,比如下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

type Flyable interface {
Fly()
}

type Cat struct {
}

func (c Cat) Fly() {
}

func (c Cat) test() {
}

// GOOS=linux GOARCH=amd64 go tool compile -N -S -l main.go > main.s
func main() {
var b interface{}
var _ = b.(int) // CALL runtime.panicdottypeE(SB)

var c Flyable = &Cat{}
c.(Cat).test() // CALL runtime.panicdottypeI(SB)
}

上面的两个转换都是错误的,第一个 b.(int) 尝试将 nil 转换为 int 类型,第二个尝试将 *Cat 类型转换为 Cat 类型, 这两个错误的类型转换都在编译期可以发现,因此它们生成的汇编代码调用的是 runtime.panicdottypeEruntime.panicdottypeI 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 在执行 e.(T) 转换时如果转换失败,则调用 panicdottypeE
// have:我们的动态类型。
// want:我们试图转换为的静态类型。
// iface:我们正在转换的静态类型。
// 转换的过程:尝试将 iface 的 have 转换为 want 失败了。
// 不是调用方法的时候的失败。
func panicdottypeE(have, want, iface *_type) {
panic(&TypeAssertionError{iface, have, want, ""})
}

// 当执行 i.(T) 转换并且转换失败时,调用 panicdottypeI
// 跟 panicdottypeE 参数相同,但是 hava 是动态的 itab 类型
func panicdottypeI(have *itab, want, iface *_type) {
var t *_type
if have != nil {
t = have._type
}
panicdottypeE(t, want, iface)
}

这两个方法都是引发一个 panic,因为我们的类型转换失败了:

iface 和 eface 里面的 data 是怎么来的?

我们先看看下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package main

type Bird struct {
}

func (b Bird) Fly() {
}

type Flyable interface {
Fly()
}

// GOOS=linux GOARCH=amd64 go tool compile -N -S -l main.go > main.s
func main() {
bird := Bird{}
var efc interface{} = bird // CALL runtime.convT(SB)
var ifc Flyable = bird // CALL runtime.convT(SB)
println(efc, ifc)
}

我们生成伪汇编代码发现,里面将结构体变量赋值给接口类型变量的时候,实际上是调用了 convT 方法。

convT* 方法

iface 里面还包含了几个 conv* 前缀的函数,在我们将某一具体类型的值赋值给接口类型的时候,go 底层会将具体类型的值通过 conv* 函数转换为 iface 里面的 data 指针:

1
2
3
4
5
6
7
8
9
10
// convT 将 v 指向的 t 类型的值转换为可以用作接口值的第二个字的指针(接口的第二个字是指向 data 的指针)。
// data(Pointer) => 指向 interface 第 2 个字的 Pointer
func convT(t *_type, v unsafe.Pointer) unsafe.Pointer {
// ... 其他代码
// 分配 _type 类型所需要的内存
x := mallocgc(t.size, t, true)
// 将 v 指向的值复制到刚刚分配的内存上
typedmemmove(t, x, v)
return x
}

我们发现,在这个过程,实际上是将值复制了一份:

iface.go 里面还有将无符号值转换为 data 指针的函数,但是还不知道在什么地方会用到这些方法,如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 转换 uint16 类型值为 interface 里面 data 的指针。
// 如果是 0~255 的整数,返回指向 staticuint64s 数组里面对应下标的指针。
// 否则,分配新的内存地址。
func convT16(val uint16) (x unsafe.Pointer) {
// 如果小于 256,则使用共享的内存地址
if val < uint16(len(staticuint64s)) {
x = unsafe.Pointer(&staticuint64s[val])
if goarch.BigEndian {
x = add(x, 6)
}
} else {
// 否则,分配新的内存
x = mallocgc(2, uint16Type, false)
*(*uint16)(x) = val
}
return
}

个人猜测,仅仅代表个人猜测,在整数赋值给 iface 或者 eface 的时候会调用这类方法。不管调不调用,我们依然可以看看它的设计,因为有些值得学习的地方:

staticuint64s 是一个全局整型数组,里面存储的是 0~255 的整数。上面的代码可以表示为下图:

这个函数跟上面的 convT 的不同之处在于,它在判断整数如果小于 256 的时候,则使用的是 staticuint64s 数组里面对应下标的地址。 为什么这样做呢?本质上是为了节省内存,因为对于数字来说,其实除了值本身,没有包含其他的信息了,所以如果对于每一个整数都分配新的内存来保存, 无疑会造成浪费。按 convT16 里面的实现方式,对于 0~255 之间的整数,如果需要给它们分配内存,就可以使用同一个指针(指向 staticuint64s[] 数组中元素的地址)。

这实际上是享元模式。

Java 里面的小整数享元模式

go 里使用 staticuint64s 的方式,其实在 Java 里面也有类似的实现,Java 中对于小整数也是使用了享元模式, 这样在装箱的时候,就不用分配新的内存了,就可以使用共享的一块内存了,当然,某一个整数能节省的内存非常有限,如果需要分配内存的小整数非常大,那么节省下来的内存就非常客观了。 当然,也不只是能节省内存这唯一的优点,从另一方面说,它也节省了垃圾回收器回收内存的开销,因为不需要管理那么多内存。

我们来看看 Java 中的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
class Test {
public static void main(String[] args) {
Integer k1 = 127;
Integer k2 = 127;
System.out.println(k1 == k2); // true
System.out.println(k1.equals(k2)); // true

Integer k10 = 128;
Integer k20 = 128;
System.out.println(k10 == k20); // false
System.out.println(k10.equals(k20)); // true
}
}

Java 里面有点不一样,它是对 -128~127 范围内的整数做了享元模式的处理,而 go 里面是 0~255

上面的代码中,当我们使用 == 来比较 Integer 的时候,值相等的两个数,在 -128~127 的范围的时候,返回的是 true,超出这个范围的时候比较返回的是 false。 这是因为在 -128~127 的时候,值相等的两个数字指向了相同的内存地址,超出这个范围的时候,值相等的两个数指向了不同的地址。

Java 的详细实现可以看 java.lang.Integer.IntegerCache

总结

  • go 的的接口(interface)本质上是一种结构体,底册实现是 ifaceefaceiface 表示我们通过 type i interface{} 定义的接口,而 eface 表示 interface{}/any,也就是空接口。
  • iface 里面保存的 itab 中保存了具体类型的方法指针列表,data 保存了具体类型值的内存地址。
  • eface 里面保存的 _type 包含了具体类型的所有信息,data 保存了具体类型值的内存地址。
  • itab 是底层保存接口类型跟具体类型方法交集的结构体,如果具体类型实现了接口的所有方法,那么这个 itab 里面的保存有指向具体类型方法的指针。如果具体类型没有实现接口的全部方法,那么 itab 中的不会保存任何方法的指针(从 itab 的作用上看,我们可以看作是一个空的 itab)。
  • 不管 itab 的方法列表是否为空,interfacetype_type 比较之后生成的 itab 会缓存下来,在后续比较的时候可以直接使用缓存。
  • _type 是 go 底层用来表示某一个类型的结构体,包含了类型所需空间大小等信息。
  • 类型断言 i.(T) 本质上是 ifaceiface 的转换,或者是 efaceiface 的转换,如果没有第二个返回值,那么转换失败的时候会引发 panic
  • switch i.(type) { case ...} 本质上也是 ifaceefaceiface 的转换,但是转换失败的时候不会引发 panic
  • 全局的保存 itab 的缓存结构体,底层是使用了一个哈希表来保存 itab 的,在哈希表使用超过 75% 的时候,会触发扩容,新的哈希表容量为旧的 2 倍。
  • staticuint64s 使用了享元模式,Java 中也有类似的实现。

Go 中接口也是一个使用得非常频繁的特性,好的软件设计往往离不开接口的使用,比如依赖倒置原则(通过抽象出接口,分离了具体实现与实际使用的耦合)。 今天,就来给大家介绍一下 Go 中接口的一些基本用法。

概述

Go 中的接口跟我们常见的编程语言中的接口不太一样,go 里面实现接口是不需要使用 implements 关键字显式声明的, go 的接口为我们提供了难以置信的一系列的灵活性和抽象性。接口有两个特点:

  • 接口本质是一种自定义类型。(跟 Java 中的接口不一样)
  • 接口是一种特殊的自定义类型,其中没有数据成员,只有方法(也可以为空)

go 中的接口定义方式如下:

1
2
3
type Flyable interface {
Fly() string
}

接口是完全抽象的,不能将其实例化。但是我们创建变量的时候可以将其类型声明为接口类型:

1
var a Flyable

然后,对于接口类型变量,我们可以把任何实现了接口所有方法的类型变量赋值给它,这个过程不需要显式声明。 例如,假如 Bird 实现了 Fly 方法,那么下面的赋值就是合法的:

1
2
// Bird 实现了 Flyable 的所有方法
var a Flyable = Bird{}

go 实现接口不需要显式声明。

由此我们引出 go 接口的最重要的特性是:

  • 只要某个类型实现了接口的所有方法,那么我们就说该类型实现了此接口。该类型的值可以赋给该接口的值。
  • 因为 interface{} 没有任何方法,所以任何类型的值都可以赋值给它(类似 Java 中的 Object)

基本使用

Java 中的 interface(接口)

先看看其他语言中的 interface 是怎么使用的。

我们知道,很多编程语言里面都有 interface 这个关键字,表示的是接口,应该也用过,比如 Java 里面的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 定义一个 Flyable 接口
interface Flyable {
public void fly();
}

// 定义一个名为 Bird 的类,显式实现了 Flyable 接口
class Bird implements Flyable {
public void fly() {
System.out.println("Bird fly.");
}
}

class Test {
// fly 方法接收一个实现了 Flyable 接口的类
public static void fly(Flyable flyable) {
flyable.fly();
}

public static void main(String[] args) {
Bird b = new Bird();
// b 实现了 Flyable 接口,所以可以作为 fly 的参数
fly(b);
}
}

在这个例子中,我们定义了一个 Flyable 接口,然后定义了一个实现了 Flyable 接口的 Bird 类, 最后,定义了一个测试的类,这个类的 fly 方法接收一个 Flyable 接口类型的参数, 因为 Bird 类实现了 Flyable 接口,所以可以将 b 作为参数传递给 fly 方法。

这个例子就是 Java 中 interface 的典型用法,如果一个类要实现一个接口,我们必须显式地通过 implements 关键字来声明。 然后使用的时候,对于需要某一接口类型的参数的方法,我们可以传递实现了那个接口的对象进去。

Java 中类实现接口必须显式通过 implements 关键字声明。

go 中的 interface(接口)

go 里面也有 interface 这个关键字,但是 go 与其他语言不太一样。 go 里面结构体与接口之间不需要显式地通过 implements 关键字来声明的,在 go 中,只要一个结构体实现了 interface 的所有方法,我们就可以将这个结构体当做这个 interface 类型,比如下面这个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import "fmt"

// 定义一个 Flyable 接口
type Flyable interface {
Fly() string
}

// Bird 结构体没有显式声明实现了 Flyable 接口(没有 implements 关键字)
// 但是 Bird 定义了 Fly() 方法,
// 所以可以作为下面 fly 函数的参数使用。
type Bird struct {
}

func (b Bird) Fly() string {
return "bird fly."
}

// 只要实现了 Flyable 的所有方法,
// 就可以作为 output 的参数。
func fly(f Flyable) {
fmt.Println(f.Fly())
}

func main() {
var b = Bird{}
// 在 go 看来,b 实现了 Fly 接口,
// 因为 Bird 里面实现了 Fly 接口的所有方法。
fly(b)
}

在上面这个例子中,Person 结构体实现了 Stringer 接口的所有方法,所以在需要 Stringer 接口的地方,都可以用 Person 的实例作为参数。

Go 中结构体实现接口不用通过 implements 关键字声明。(实际上,Go 也没有这个关键字)

go interface 的优势

go 接口的这种实现方式,有点类似于动态类型的语言,比如 Python,但是相比 Python,go 在编译期间就可以发现一些明显的错误。

比如像 Python 中下面这种代码,如果传递的 coder 没有 say_hello 方法,这种错误只有运行时才能发现:

1
2
def hello_world(coder):
coder.say_hello()

但如果是 go 的话,下面这种写法中,如果传递给 hello_world 没有实现 say 接口,那么编译的时候就会报错,无法通过编译:

1
2
3
4
5
6
7
type say interface {
say_hello()
}

func hello_world(coder say) {
coder.say_hello()
}

因此,go 的这种接口实现方式有点像动态类型的语言,在一定程度上给了开发者自由,但是也在语言层面帮开发者做了类型检查。

go 中不必像静态类型语言那样,所有地方都明确写出类型,go 的编译器帮我们做了很多工作,让我们在写 go 代码的时候更加的轻松。 interface 也是,我们无需显式实现接口,只要我们的结构体实现了接口的所有类型,那么它就可以当做那个接口类型使用(duck typing)。

空接口

go 中的 interface{} 表示一个空接口(在比较新版本中也可以使用 any 关键字来代替 interface{}),这个接口没有任何方法。因此可以将任何变量赋值给 interface{} 类型的变量。

这在一些允许不同类型或者不确定类型参数的方法中用得比较广泛,比如 fmt 里面的 println 等方法。

如何使用 interface{} 类型的参数?

这个可能是大部分人所需要关心的地方,因为这可能在日常开发中经常需要用到。

类型断言

当实际开发中,我们接收到一个接口类型参数的时候,我们可能会知道它是几种可能的情况之一了,我们就可以使用类型断言来判断 interface{} 变量是否实现了某一个接口:

1
2
3
4
5
6
7
8
9
10
11
12
func fly(f interface{}) {
// 第一个返回值 v 是 f 转换为接口之前的值,
// ok 为 true 表示 f 是 Bird 类型
if v, ok := f.(Flyable); ok {
fmt.Println("bird " + v.Fly())
}

// 断言形式:接口.(类型)
if _, ok := f.(Bird); ok {
fmt.Println("bird flying...")
}
}

在实际开发中,我们可以使用 xx.(Type) 这种形式来判断:

  • interface{} 类型的变量是否是某一个类型
  • interface{} 类型的变量是否实现了某一个接口

如,f.(Flyable) 就是判断 f 是否实现了 Flyable 接口,f.(Bird) 就是判断 f 是否是 Bird 类型。

另外一种类型断言方式

可能我们会觉得上面的那种 if 的判断方式有点繁琐,确实如此,但是如果我们不能保证 f 是某一类型的情况下,用上面这种判断方式是比较安全的。

还有另外一种判断方式,用在我们确切地知道 f 具体类型的情况:

1
2
3
func fly2(f interface{}) {
fmt.Println("bird " + f.(Flyable).Fly())
}

在这里,我们断言 fFlyable 类型,然后调用了它的 Fly 方法。

这是一种不安全的调用,如果 f 实际上没有实现了 Flyable 接口,上面这行代码会引发 panic。 而相比之下,v, ok := f.(Flyable) 这种方式会返回第二个值让我们判断这个断言是否成立。

switch...case 中判断接口类型

除了上面的断言方式,还有另外一种判断 interface{} 类型的方法,那就是使用 switch...case 语句:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func str(f interface{}) string {
// 判断 f 的类型
switch f.(type) {
case int:
// f 是 int 类型
return "int: " + strconv.Itoa(f.(int))
case int64:
// f 是 int64 类型
return "int64: " + strconv.FormatInt(f.(int64), 10)
case Flyable:
return "flyable..."
}
return "???"
}

编译器自动检测类型是否实现接口

上面我们说过了,在 go 里面,类型不用显式地声明实现了某个接口(也不能)。那么问题来了,我们开发的时候, 如果我们就是想让某一个类型实现某个接口的时候,但是漏实现了一个方法的话,IDE 是没有办法知道我们漏了的那个方法的:

1
2
3
4
5
6
7
8
9
10
11
type Flyable interface {
Fly() string
}

// 没有实现 Flyable 接口,因为没有 Fly() 方法
type Bird struct {
}

func (b Bird) Eat() string {
return "eat."
}

比如这段代码中,我们本意是要 Bird 也实现 Fly 方法的,但是因为没有显式声明,所以 IDE 没有办法知道我们的意图。 这样一来,在实际运行的时候,那些我们需要 Flyable 的地方,如果我们传了 Bird 实例的话,就会报错了。

一种简单的解决方法

如果我们明确知道 Bird 将来是要当做 Flyable 参数使用的话,我们可以加一行声明:

1
var _ Flyable = Bird{}

这样一来,因为我们有 BirdFlyable 类型的操作,所以编译器就会去帮我们检查 Bird 是否实现了 Flyable 接口了。 如果 Bird 没有实现 Flyable 中的所有方法,那么编译的时候会报错,这样一来,这些错误就不用等到实际运行的时候才能发现了

实际上,很多开源项目都能看到这种写法。看起来定义了一个空变量,但是实际上却可以帮我们进行类型检查。

这种解决方法还有另外一种写法如下:

1
var _ Flyable = (*Bird)(nil)

类型转换与接口断言

我们知道了,接口断言可以获得一个具体类型(也可以是接口)的变量,同时我们也知道了,在 go 里面也有类型转换这东西, 实际上,接口断言与类型转换都是类型转换,它们的差别只是:

interface{} 只能通过类型断言来转换为某一种具体的类型,而一般的类型转换只是针对普通类型之间的转换。

1
2
3
4
5
6
7
// 类型转换:f 由 float32 转换为 int
var f float32 = 10.8
i := int(f)

// 接口的类型断言
var f interface{}
v, ok := f.(Flyable)

如果是 interface{},需要使用类型断言转换为某一具体类型。

一个类型可以实现多个接口

上文我们说过了,只要一个类型实现了接口中的所有方法,那么那个类型就可以当作是那个接口来使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type Writer interface {
Write(p []byte) (n int, err error)
}

type Closer interface {
Close() error
}

type myFile struct {
}

// 实现了 Writer 接口
func (m myFile) Write(p []byte) (n int, err error) {
return 0, nil
}

// 实现了 Closer 接口
func (m myFile) Close() error {
return nil
}

在上面这个例子中,myFile 实现了 WriteClose 方法,而这两个方法分别是 WriterCloser 接口中的所有方法。 在这种情况下,myFile 的实例既可以作为 Writer 使用,也可以作为 Closer 使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func foo(w Writer) {
w.Write([]byte("foo"))
}

func bar(c Closer) {
c.Close()
}

func test() {
m := myFile{}
// m 可以作为 Writer 接口使用
foo(m)
// m 也可以作为 Closer 接口使用
bar(m)
}

接口与 nil 不相等

有时候我们会发现,明明传了一个 nilinterface{} 类型的参数,但在我们判断实参是否与 nil 相等的时候,却发现并不相等,如下面这个例子:

1
2
3
4
5
6
7
8
9
10
func test(i interface{}) {
fmt.Println(reflect.TypeOf(i))
fmt.Println(i == nil)
}

func main() {
var b *int = nil
test(b) // 会输出:*int false
test(nil) // 会输出:<nil> true
}

这是因为 go 里面的 interface{} 实际上是包含两部分的,一部分是 type,一部分是 data,如果我们传递的 nil 是某一个类型的 nil, 那么 interface{} 类型的参数实际上接收到的值会包含对应的类型。 但如果我们传递的 nil 就是一个普通的 nil,那么 interface{} 类型参数接收到的 typedata 都为 nil, 这个时候再与 nil 比较的时候才是相等的。

嵌套的接口

在 go 中,不仅结构体与结构体之间可以嵌套,接口与接口也可以通过嵌套创造出新的接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
type Writer interface {
Write(p []byte) (n int, err error)
}

type Closer interface {
Close() error
}

// 下面这个接口包含了 Writer 和 Closer 的所有方法
type WriteCloser interface {
Writer
Closer
}

WriteCloser 是一个包含了 WriterCloser 两个接口所有方法的新接口,也就是说, WriteCloser 包含了 WriteClose 方法。

这样的好处是,可以将接口拆分为更小的粒度。比如,对于某些只需要 Close 方法的地方,我们就可以用 Closer 作为参数的类型, 即使参数也实现了 Write 方法,因为我们并不关心除了 Close 以外的其他方法:

1
2
3
4
func foo(c Closer) {
// ...
c.Close()
}

而对于上面的 myFile,因为同时实现了 Writer 接口和 Closer 接口,而 WriteCloser 包含了这两个接口, 所以实际上 myFile 可以当作 WriteCloser 或者 WriterCloser 类型使用。

总结

  • 接口里面只声明了方法,没有数据成员。
  • go 中的接口不需要显式声明(也不能)。
  • 只要一个类型实现了接口的所有方法,那么该类型实现了此接口。该类型的值可以赋值给该接口类型。
  • interface{}/any 是空接口,任何类型的值都可以赋值给它。
  • 通过类型断言我们可以将 interface{} 类型转换为具体的类型。
  • 我们通过声明接口类型的 _ 变量来让编译器帮我们检查我们的类型是否实现了某一接口。
  • 一个类型可以同时实现多个接口,可以当作多个接口类型来使用。
  • nil 与值为 nilinterface{} 实际上不想等,需要注意。
  • go 中的接口可以嵌套,类似结构体的嵌套。

在上一篇文章《深入理解 go chan》中,我们讲解了 chan 相关的一些概念、原理等东西, 今天让我们再深入一下,读一下它的源码,看看底层实际上是怎么实现的。

整体设计

我们可以从以下三个角度看 chan 的设计(源码位于 runtime/chan.go,结构体 hchan 就是 chan 的底层数据结构):

  • 存储:chan 里面的数据是通过一个环形队列来存储的(实际上是一个数组,但是我们视作环形队列来操作。无缓冲 chan 不用存储,会直接从 sender 复制到 receiver
  • 发送:数据发送到 chan 的时候,如果 chan 满了,则会将发送数据的协程挂起,将其放入一个协程队列中,chan 空闲的时候会唤醒这个协程队列。如果 chan 没满,则发送队列为空。
  • 接收:从 chan 中接收数据的时候,如果 chan 是空的,则会将接收数据的协程挂起,将其放入一个协程队列中,当 chan 有数据的时候会唤醒这个协程队列。如果 chan 有数据,则接收队列为空。

文中一些比较关键的名词解释:

  • sender: 表示尝试写入 changoroutine
  • receiver: 表示尝试从 chan 读取数据的 goroutine
  • sendq 是一个队列,存储那些尝试写入 channel 但被阻塞的 goroutine
  • recvq 是一个队列,存储那些尝试读取 channel 但被阻塞的 goroutine
  • g 表示一个协程。
  • gopark 是将协程挂起的函数,协程状态:_Grunning => _Gwaiting
  • goready 是将协程改为可运行状态的函数,协程状态: _Gwaiting => _Grunnable

现在,假设我们有下面这样的一段代码,通过这段代码,我们可以大概看一下 chan 的总体设计:

1
2
3
4
5
6
7
8
9
10
11
12
package main

func main() {
// 创建一个缓冲区大小为 9 的 chan
ch := make(chan int, 9)
// 往 chan 写入 [1,2,3,4,5,6,7]
for i := 0; i < 7; i++ {
ch <- i + 1
}
// 将 1 从缓冲区移出来
<-ch
}

现在,我们的 chan 大概长得像下面这个样子,后面会详细展开将这个图中的所有元素:

上图为了说明而在 recvq 和 sendq 都画了 3 个 G,但实际上 recvq 和 sendq 至少有一个为空。因为不可能有协程正在等待接收数据的时候,还有协程的数据因为发不出去数据而阻塞。

数据结构

在底层,go 是使用 hchan 这个结构体来表示 chan 的,下面是结构体的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type hchan struct {
qcount uint // 缓冲区(环形队列)元素个数
dataqsiz uint // 缓冲区的大小(最多可容纳的元素个数)
buf unsafe.Pointer // 指向缓冲区入口的指针(从 buf 开始 qcount * elemsize 大小的内存就是缓冲区所用的内存)
elemsize uint16 // chan 对应类型元素的大小(主要用以计算第 i 个元素的内存地址)
closed uint32 // chan 是否已经关闭(0-未关闭,1-已关闭)
elemtype *_type // chan 的元素类型
sendx uint // chan 发送操作处理到的位置
recvx uint // chan 接收操作处理到的位置
recvq waitq // 等待接收数据的协程队列(双向链表)
sendq waitq // 等待发送数据的协程队列(双向链表)

// 锁
lock mutex
}

waitq 的数据结构如下:

1
2
3
4
type waitq struct {
first *sudog
last *sudog
}

waitq 用来保存阻塞在等待或接收数据的协程列表(是一个双向链表),在解除阻塞的时候,需要唤醒这两个队列中的数据。

对应上图各字段详细说明

hchan,对于 hchan 这个结构体,我们知道,在 go 里面,结构体字段是存储在一段连续的内存上的(可以看看《深入理解 go unsafe》),所以图中用了连续的一段单元格表示。

下面是各字段说明:

  • qcount: 写入 chan 缓冲区元素个数。我们的代码往 chan 中存入了 7 个数,然后从中取出了一个数,最终还剩 6 个,因此 qcount6
  • dataqsiz: hchan 缓冲区的长度。它在内存中是连续的一段内存,是一个数组,是通过 make 创建的时候传入的,是 9
  • bufhchan 缓冲区指针。指向了一个数组,这个数组就是用来保存发送到 chan 的数据的。
  • sendxrecvx:写、读操作的下标。指向了 buf 指向的数组中的下标,sendx 是下一个发送操作保存的下标,recvx 是下一个接收操作的下标。
  • recvqsendq: 阻塞在 chan 读写上的协程列表。底层是双向链表,链表的元素是 sudogsudog 是一个对 g 的封装),我们可以简单地理解为 recvqsendq 的元素就是 g(协程)。

g 和 sudog 是什么?

上面提到了 gsudogg 是底层用来表示协程的结构体,而 sudog 是对 g 的封装,记录了一些额外的信息,比如关联的 hchan

在 go 里面,协程调度的模型是 GMP 模型,G 代表协程、M 代表线程、P 表示协程调度器。我上图里面的 G 就是代表协程(当然,实际上是 sudog)。 还有一个下面会提到的就是 g0g0 表示 P 上启动的第一个协程。

GMP 模型是另外一个庞大的话题了,大家可以自行去了解一下,对理解本文也很有好处。因为在 chan 阻塞的时候实际上也是一个协程调度的过程。 具体来说,就是从 g 的栈切换到 g0 的栈,然后重新进行协程调度。这个时候 g 因为从运行状态修改为了等待状态,所以在协程调度中不会将它调度来执行, 而是会去找其他可执行的协程来执行。

创建 chan

我们的 make(chan int, 9) 最终会调用 makechan 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// chantype 是 chan 元素类型,size 是缓冲区大小
func makechan(t *chantype, size int) *hchan {
elem := t.elem

// compiler checks this but be safe.
// 检查元素个数是否合法(不能超过 1<<16 个)
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
// 判断内存是否对齐
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}

// mem 是 chan 缓冲区(环形队列)所需要的内存大小
// mem = 元素大小 * 元素个数
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}

// 定义 hchan
var c *hchan
switch {
case mem == 0:
// 队列或者元素大小是 0(比如 make(chan int, 0))
// 只需要分配 hchan 所需要的内存
c = (*hchan)(mallocgc(hchanSize, nil, true))
// ...
case elem.ptrdata == 0:
// elem 类型里面不包含指针
// 分配的内存 = hchan 所需内存 + 缓冲区内存
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
// 分配的是连续的一段内存,缓冲区内存在 hchan 后面
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 元素类型里面包含指针
c = new(hchan)
// buf 需要另外分配内存
c.buf = mallocgc(mem, elem, true)
}

// 单个元素的大小
c.elemsize = uint16(elem.size)
// 元素类型
c.elemtype = elem
// 缓冲区大小
c.dataqsiz = uint(size)
// ...
}

创建 chan 的过程主要就是给 hchan 分配内存的过程:

  • 非缓冲 chan,只需要分配 hchan 结构体所需要的内存,无需分配环形队列内存(数据会直接从 sender 复制到 receiver
  • 缓冲 chan(不包含指针),分配 hchan 所需要的内存和环形队列所需要的内存,其中 buf 会紧挨着 hchan
  • 缓冲 chan(含指针),hchan 和环形队列所需要的内存单独进行分配

对应到文章开头的图就是,底下的 hchanbuf 那两段内存。

发送数据

<- 语法糖

《深入理解 go chan》中,我们说也过,<- 这个操作符号是一种语法糖, 实际上,<- 会被编译成一个函数调用,对于发送操作而言,c <- x 会编译为对下面的函数的调用:

1
2
3
4
5
// elem 是被发送到 chan 的数据的指针。
// 对于 ch <- x,ch 对应参数中的 c,unsafe.Pointer(&x) 对应参数中的 elem。
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}

另外,对于 select 里面的调用,chansend 会返回一个布尔值给 select 用来判断是否是要选中当前 case 分支。 如果 chan 发送成功,则返回 true,则 select 的那个分支得以执行。(select...case 本质上是 if...else,返回 false 表示判断失败。)

chansend 第二个参数的含义

chansend 第二个参数 true 表示是一个阻塞调用,另外一种是在 select 里面的发送操作,在 select 中的操作是非阻塞的。

1
2
3
4
5
6
7
8
9
package main

func main() {
ch := make(chan int, 2)
ch <- 1 // 如果 ch 满了,会阻塞
select {
case ch <- 3: // 非阻塞
}
}

select 中对 chan 的读写是非阻塞的,不会导致当前协程阻塞,如果是因为 chan 满或者空无法发送或接收, 则不会导致阻塞在 case 的某一个分支上,还可以继续判断其他 case 分支。

select 中的 send 实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// go 代码:
// select {
// case c <- v:
// ... foo
// default:
// ... bar
// }
//
// 实际效果:
//
// if selectnbsend(c, v) {
// ... foo
// } else {
// ... bar
// }
// select 里面往 chan 发送数据的分支,返回的 selected 表示当前的分支是否被选中
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}

chansend 发送实现

  1. 发送到 nil chanselect 中发送不阻塞,其他情况阻塞)

如果是在 selectcase 里面发送,则不会阻塞,其他情况会导致当前 goroutine 挂起,永远阻塞

示例代码:

1
2
3
4
5
6
7
8
// 下面的代码运行会报错:
var ch chan int
// 发送到 nil chan 会永久阻塞
ch <- 1
select {
// 这个发送失败,但是不会阻塞,可继续判断其他分支。
case ch <- 3:
}
  1. 发送到满了的 chanselect 中发送不阻塞,其他情况阻塞)

对于无缓冲而且又没有 receiver,或者是有缓冲但是缓冲满了的情况,发送也会阻塞(我们称其为 full,也就是满了,满了的 chan 是放不下任何数据了的,所以就无法再往 chan 发送数据了):

receiver 表示等待从 chan 接收数据的协程。

对于满了的 chan,什么时候可以再次发送呢?那就是receiver 接收数据的时候chan 之所以会满就是因为没有 receiver,也就是没有从 chan 接收数据的协程。

A. 对于无缓冲的 chan,在满了的情况下,当有 receiver 来读取数据的时候,数据会直接从 sender 复制到 receiver 中:

B. 对于有缓冲,但是缓冲满了的情况(图中 chan 满了,并且有两个 g 正在等待写入 chan):

这个发送过程大概如下:

  • receiverchan 中获取到 chan 队头元素,然后 chan 的队头元素出队。
  • 发送队列 sendq 对头元素出队,将其要发送的数据写入到 chan 缓冲中。最后,sendq 只剩下一个等待写入 chang

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

// 注意:以下代码可能不能正常执行,只是为了描述问题。
func main() {
// 情况 2.A.
var ch1 = make(chan int) // 无缓冲的 chan
ch1 <- 1 // 阻塞
select {
// 不阻塞,但是不会执行这个分支
case ch1 <- 1:
}

// 情况 2.B.
var ch2 = make(chan int, 1) // 有缓冲,缓冲区容量为 1
ch2 <- 1 // 1 写入之后,ch2 的缓冲区满了
go func() {
ch2 <- 2 // 阻塞,调用 gopark 挂起
}()
go func() {
ch2 <- 3 // 阻塞
}()
select {
// 不阻塞,但是不会执行这个分支
case ch2 <- 4:
}
}
  1. 发送到有缓冲,但是缓冲还没满的 chan(不阻塞,发送成功)

这种情况比较简单,就是将 sender 要发送的数据写入到 chan 缓冲区:

示例代码:

1
2
3
var ch = make(chan int, 1)
// 不阻塞,1 写入 chan 缓冲区
ch <- 1

chansend 源码解读

阻塞模式下,在发送的过程中,如果遇到无法发送成功的情况,会调用 gopark 来将协程挂起,然后当前协程陷入阻塞状态。

非阻塞模式下(select),在发送过程中,任何无法发送的情况,都会直接返回 false,表示发送失败。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
// 参数说明:
// c 表示 hchan 实例
// ep 表示要发送的数据所在的地址
// block 是否是阻塞模式(select 语句的 case 里面的发送是非阻塞模式,其他情况是阻塞模式)
// 非阻塞模式下,遇到无法发送的情况,会返回 false。阻塞模式下,遇到无法发送的情况,协程会挂起。
// 返回值:表示是否发送成功。false 的时候,如果是 select 的 case,则表示没有选中这个 case。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 情况 1:nil chan
if c == nil {
// select 语句里面发送数据到 chan 的操作失败,直接返回 false,表示当前的 case 没有被选中。
if !block {
// select 分支没有被选中
return false
}
// 阻塞模式,协程挂起
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}

// ... 其他代码...

// 不获取锁的情况下快速失败。select 中 chan 满了的时候无法发送成功,直接返回 false,协程无需挂起。
// 场景:非阻塞模式、chan 未关闭、chan 已满(无缓冲且没有接收数据的协程、或者有缓冲但是缓冲区满)
if !block && c.closed == 0 && full(c) {
return false
}

// ... 其他代码...

// 获取锁
lock(&c.lock)

// 如果 chan 已经关闭,则释放锁并 panic,不能往一个已经关闭的 chan 发送数据
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}

// 情况 2.A,又或者是有缓冲但是缓冲区空,有一个正在等待接收数据的 receiver。
// 如果有协程在等待接收数据(说明 chan 缓冲区空、或者 chan 是无缓冲的)
// 则直接将元素传递给这个接收数据的协程,这样就避免了 sender -> chan -> receiver 这个数据复制的过程,效率更高。
// 返回 true 表示 select 的分支可以执行(发送成功)
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}

// 情况 3,发送到缓冲 chan,且 chan 未满
// 没有协程在等待接收数据。
// 缓冲区还有空余,则将数据写入到 chan 的缓冲区
if c.qcount < c.dataqsiz {
// 获取写入的地址
qp := chanbuf(c, c.sendx)
// 通过内存复制的方式写入
typedmemmove(c.elemtype, qp, ep)
// 写入的下标指向下一个位置
c.sendx++
// 如果到超出环形队列尾了,则指向第一个位置
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// chan 里面的元素个数加上 1
c.qcount++
// 释放锁
unlock(&c.lock)
// 发送成功,返回 true
return true
}

// 没有协程在接收数据,而且缓冲区满了。
// 如果是 select 语句里面的发送,则释放锁,返回 false
if !block {
unlock(&c.lock)
return false
}

// 发不出去,当前协程阻塞。
// 阻塞模式下,缓冲区满了,需要将当前协程挂起。
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep // chan 要操作的元素指针
mysg.waitlink = nil
mysg.g = gp // sudog 上的 g 属性
mysg.isSelect = false // 如果是 select,上面已经返回了,因此这里是 false
mysg.c = c // sudog 上的 c 属性
gp.waiting = mysg // g 正在等待的 sudog
gp.param = nil // 当通道操作唤醒被阻塞的 goroutine 时,它将 param 设置为指向已完成的阻塞操作的 sudog
c.sendq.enqueue(mysg) // 将 sudog 放入发送队列
// 在 chan 读写上阻塞的标志
gp.parkingOnChan.Store(true)
// 最关键的一步:将当前协程挂起
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// 保证 ep 指向的地址不被垃圾回收器回收
KeepAlive(ep)

// ...被唤醒了之后的一些收尾操作...

return true
}

// 参数说明:c 是 chan 实例,sg 是等待接收数据的 g,ep 是被发送进 chan 的数据,unlockf 是释放锁的函数。
// 空 chan 上发送,会直接发送给等待接收数据的协程。
// ep 指向的值会被复制到 sg 中(ep -> sg,ep 是被发送的值,sg 是要接收数据的 g)。
// 接收数据的协程会被唤醒。
// 通道 c 必须是空的并且获取了锁。send 会通过 unlockf 来释放锁。
// sg 必须已从 c 中退出队列(从 recvq 这个接收队列中移除)。
// ep 必须不能为 nil,同时指向堆或者调用者的栈。
// sg 是接收队列上的 g。
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// ...其他代码...
// 如果没有忽略返回值,将值直接从 ep 复制到 sg 中
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
// 释放锁
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 最关键的一步:唤醒等待队列中的那个接收到数据的 g
//(也就是之前因为接收不到数据而被阻塞的那个 g)
goready(gp, skip+1)
}

// 参数:t 是 chan 的元素类型,sg 是接收数据的 g(协程),src 是被发送的数据的指针。
// 场景:无缓冲 chan、有缓冲但是缓冲区没数据。
// 作用:将数据直接从发送数据的协程复制到接收数据的协程。
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
// 将 ep 的值直接复制到 sg 中
memmove(dst, src, t.size)
}

// full 报告 c 上的发送是否会阻塞(即通道已满)。
func full(c *hchan) bool {
// c.dataqsiz 是不可变的(创建 chan 后不会再去修改)
// 因此在 chan 操作期间的任何时间读取都是安全的。
if c.dataqsiz == 0 {
// 如果是非缓冲 chan,则看接收队列有没有数据,有则表明满了(没有正在发送的 g)
return c.recvq.first == nil
}
// 如果是缓冲 chan,只需要比较实际元素总数跟缓冲区容量即可
return c.qcount == c.dataqsiz
}

接收数据

<- 语法糖

在发送数据的那一节我们提到了,ch <- x 编译之后,实际上是对 chansend1 的函数调用。同样的,在接收数据的时候, <- 这个操作符也会根据不同情况编译成不同的函数调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// elem 是用来保存从 c 中接收到的值的地址的指针
// <- c 编译器处理之后实际上就是下面的这个函数调用。(从通道接收,但是忽略接收到的值)
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}

// received 表示是否是从 chan 中接收到的(如果 chan 关闭,则接收到的是零值,received 是 false)
// v, ok := <-c 编译之后的函数(从通道接收,第一个 v 对应 elem,第二个 ok 对应 received)
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}

// select 里面的接收操作:
//
// select {
// case v, ok = <-c:
// ... foo
// default:
// ... bar
// }
//
// 实际 go 实现
//
// if selected, ok = selectnbrecv(&v, c); selected {
// ... foo
// } else {
// ... bar
// }
//
// select 里面从 chan 接收数据的分支,返回的 selected 表示当前的分支是否被选中,received 表示是否有数据被接收到
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
return chanrecv(c, elem, false)
}

还需要再提醒一下的是:chan 关闭之后,并且 chan 缓冲区所有数据被接收完之后,received 才会是 false,并不是一关闭 received 马上返回 false

chanrecv 函数 block 参数的含义

chansend 中的 block 参数的作用一样,用来判断是否是 select 模式下的接收操作,如果是,则在需要阻塞的时候不会阻塞,取而代之的是直接返回。

chanrecv 接收数据实现

  1. nil chan 接收(select 中接收不阻塞,其他情况阻塞)

nil chan 中读取的时候,如果是阻塞模式,会调用 gopark 将协程阻塞起来。

示例代码:

1
2
var ch chan int
<-ch
  1. 从空 chan 接收(select 中接收不阻塞,其他情况阻塞)

判断空的条件为:无缓冲并且没有等待发送数据的 g,或者有缓冲但是缓冲区无数据。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

// 注意:以下代码执行不了,只是展示一下实际中对应的代码
func main() {
// 情况 1,无缓冲的 chan,空的
var ch1 = make(chan int)
<-ch1 // 阻塞
select {
// 不阻塞,但是该分支不会执行
case <-ch1:

}

// 情况 2,有缓冲的 chan,空的
var ch2 = make(chan int, 1)
<-ch2 // 阻塞
select {
// 不阻塞,但是该分支不会执行
case <-ch2:

}
}
  1. 从缓冲区满的 chan 接收(不会阻塞,这个时候 sendq 一定不为空)

这种情况不会阻塞,上面已经有图了,这里不再贴了。

  1. 从缓冲区不满的 chan 接收(不会阻塞)

示例代码:

1
2
3
4
5
6
7
8
package main

func main() {
var ch = make(chan int, 2)
ch <- 1
// 从缓冲区没满的 chan 接收
<-ch
}

chanrecv 源码解读

chanrecv 函数:

  • 参数:cchan 实例,ep 是用来接收数据的指针,block 表示是否是阻塞模式。
  • 返回值:selected 表示 select 语句的 case 是否被选中,received 表示接收到的值是否有效。
  • 功能:从 c 这个通道接收数据,同时将接收到的数据写入到 ep 里。

概览:

  • ep 可能是 nil,这意味着接收到的值被忽略了(对应 <-c 这种形式的接收)。
  • 如果是非阻塞模式,并且通道无数据,返回 (false, false),也就是 select 语句中的 case 不会被选中。
  • 否则,如果 c 关闭了,会对 ep 指向的地址设置零值,然后返回 (true, false)。如果是 select 语句,意味被选中,
  • 但是 receivedfalse 表明返回的数不是通道关闭之前发送的。
  • 否则,将从通道中获取到的值写入 ep 指向的地址,并且返回 (true, true)
  • 一个非 nilep 必须指向堆或者调用者的栈。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
// 从 c 读取数据,写入到 ep 指向的地址。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// ...
// c 是 nil chan
if c == nil {
// select 里面的 case 不会被选中
if !block {
return
}
// 阻塞模式时,协程挂起
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
// 在实际执行的时候,如果其他协程都执行完了,只剩下这一个协程(又或者全部协程都是睡眠状态,并且无法被唤醒的那种),那么会报错:
// "fatal error: all goroutines are asleep - deadlock!"
throw("unreachable")
}

// 如果是非阻塞模式(select),并且 c 是空的
if !block && empty(c) {
// chan 未关闭,并且是空的,返回 false,false
if atomic.Load(&c.closed) == 0 {
return
}

// chan 已经关闭,并且 chan 是空的
if empty(c) {
// ...
// 返回一个零值
if ep != nil {
typedmemclr(c.elemtype, ep)
}
// select 分支被选中,但是返回值是无效的,是一个零值
return true, false
}
}
// ...

// 获取锁
lock(&c.lock)

// chan 已关闭
if c.closed != 0 {
// chan 已经关闭,同时也没有数据
if c.qcount == 0 {
// ...
// 释放锁
unlock(&c.lock)
if ep != nil {
// 设置零值
typedmemclr(c.elemtype, ep)
}
// select 的分支被选中,但是返回值无效
return true, false
}
} else {
// chan 未关闭,并且有一个等待发送的元素(对应情况:chan 是满的或者无缓冲而且没有 receiver)
// 如果无缓冲:则将元素直接从 sender 复制到 receiver 中。
// 否则:意味着 c 的缓冲区满了,从环形队列中接收值,将 sg 需要发送的值添加到环形队列尾,
// 实际上这个时候,队列头和队列尾都是同一个位置,因为队列满了。
// 只不过,队列头和队列尾指向的位置会发生变化(都加 1,然后对缓冲区长度取模)。
if sg := c.sendq.dequeue(); sg != nil {
// 找到一个 sender。
// 如果无缓冲,直接从 sender 复制到 receiver
// 否则,环形队列对头元素复制给 receiver,sender 要发送的元素复制进环形队列队尾。
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
// select 分支被选中,接收成功,并且接收的值是有效的。
return true, true
}
}

// 缓冲区有数据,并且缓冲区没满
if c.qcount > 0 {
// qp 是被接收元素的地址
qp := chanbuf(c, c.recvx)
// ...
// 将 qp 指向的值复制到 ep
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 清空队列中 ep 的空间(设置为零值)
typedmemclr(c.elemtype, qp)
// 被接收的下标指向下一个元素
c.recvx++
// 环形队列,回到开头
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 缓冲区长度减 1
c.qcount--
// 释放锁
unlock(&c.lock)
// select 分支被选中,并且接收的值是有效的。
return true, true
}

// 缓冲区空的,并且是非阻塞(select)
if !block {
// 释放锁
unlock(&c.lock)
// 返回 false,false
return false, false
}

// 缓冲区空,并且是阻塞模式,同时没有等待发送的 g

// 没有 sender,阻塞
gp := getg()
mysg := acquireSudog()
// ...
// c 的 recvq,也就是等待接收的队列,在队尾添加当前的 g
c.recvq.enqueue(mysg)
// ...
// g 挂起,等待下一个发送数据的协程
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

// ... 被唤醒后的操作 ...
return true, success
}

// recv 处理缓冲区已满的 chan 的接收操作(或者无缓冲,这个函数处理这两种情况)。
// 有两部分:
// 1. 等待发送数据的协程(sender),会将其要发送的数据放入 chan 中,然后这个协程会被唤醒
// 2. 被接收协程接收的值会写入到 ep 中
//
// 对于同步 chan(无缓冲 chan),两个值是同一个。
// 对于异步 chan,接收者从 chan 的缓冲区获取数据,发送方的输入放入 chan 缓冲区。
// 通道 c 必须已满并锁定。recv 会使用 unlockf 来解锁 c。
// sg 必须已经从 c 中移除(准确来说是 c.sendq)。
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 如果无缓冲区
if c.dataqsiz == 0 {
// ...
// 直接将 sender 的要发送的值复制到 ep
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
} else {
// 有缓冲区,但是缓冲区满了。
// 从队列头获取元素,将要发送的值放入队列尾。(实际上操作的是同一个位置,因为环形队列满了)
// 需要获取的值的指针地址
qp := chanbuf(c, c.recvx)
// ...
// 如果需要接收值,则将 qp 复制到 ep(没有忽略返回值)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 将要发送的值写入到 qp(sendq 对头元素要发送的值写入到 qp,也就是 chan 刚刚空出来的位置)
typedmemmove(c.elemtype, qp, sg.elem)
// 队列头、尾指针移动
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
// 释放锁
unlockf()
// ...
// 唤醒协程(这个被唤醒的协程是之前因为发送不出去被阻塞的协程)
goready(gp, skip+1)
}

// 将数据直接从 sender 复制到 receiver
// 场景:发送到无缓冲的 chan
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
src := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
// dst 是 receiver 栈里保存接收值的地址,src 是 sender 栈里要被发送的值的地址
memmove(dst, src, t.size)
}

关闭 chan

chan 关闭的过程比较简单,修改 closed 为 1,然后唤醒发送队列和接收队列里面的 g,如果发送队列有 g,被唤醒之后会 panic,因为不能往一个已经关闭的 chan 发送数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// 关闭 chan
func closechan(c *hchan) {
// 不能关闭 nil chan
if c == nil {
panic(plainError("close of nil channel"))
}

// 开启锁
lock(&c.lock)
if c.closed != 0 {
// chan 已经关闭,panic,不能重复关闭。释放锁
unlock(&c.lock)
panic(plainError("close of closed channel"))
}

// ...
// 设置 closed 标志
c.closed = 1

// gList 用来保存阻塞在 chan 上的 g(链表,包括了 sender 和 receiver)
var glist gList

// 释放所有等待读取 chan 的协程(解除阻塞状态)
for {
// recvq 队头元素出队
sg := c.recvq.dequeue()
if sg == nil {
// sendq 已经没有元素了
break
}
// 关闭之后,从 chan 接收到的都是零值
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
// ...
glist.push(gp)
}

// 释放所有正在等待写入 chan 的协程(解除阻塞状态,这些协程会 panic)
for {
// sendq 队头元素出队
sg := c.sendq.dequeue()
if sg == nil {
// sendq 已经没有元素了
break
}
// ...
glist.push(gp)
}
// 释放锁
unlock(&c.lock)

// 将所有等待的协程修改为就绪态
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
// g 状态修改为可运行状态
goready(gp, 3)
}
}

对于实际开发的作用

在上一篇文章和本文中,花了很大的篇幅来讲述 chan 的设计、实现与使用,这么多东西对我们有什么用呢?

其中非常重要的一个作用是,清楚地了解 chan 的工作机制,便于我们对程序实际运行情况进行分析, 尤其是一些非常隐晦的读写 chan 场景,毕竟稍有不慎就会导致协程泄漏,这对进程影响可能是非常大的。

比如下面的这种代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import (
"fmt"
"runtime"
"time"
)

func main() {
for i := 0; i < 10000; i++ {
time.Sleep(time.Second)
go func() {
// 永远阻塞,协程泄漏
var ch chan int
ch <- 1
}()
// 我们会看到协程数量逐渐增长。
// 但是这部分挂起的协程永远不会被调度。
fmt.Printf("goroutine count: %d\n", runtime.NumGoroutine())
}
time.Sleep(time.Hour)
}

tips:在 chan 读写的地方需要注意自己的写法会不会让 goroutine 永远陷入阻塞,或者长时间阻塞。

总结

  • chan 底层是 hchan 结构体。
  • go 语法里面的 <- 不过是语法糖,在编译的时候,会编译成 hchan 相关的方法调用。最终都会调用 chansend 或者 chanrecvselect...case 里面的 chan 读写最终也会编译为对 chansendchanrecv 的调用。
  • chan 总体设计:维护了三个队列:
    • hchan.buf: chan 中暂存 sender 发送数据的队列(在有 receiver 读取的时候会从这个队列中复制到 receiver 中)
    • hchan.recvq: 接收队列,存储那些尝试读取 channel 但被阻塞的 goroutine
    • hchan.sendq: 发送队列,存储那些尝试写入 channel 但被阻塞的 goroutine
  • 读写 chan 的协程阻塞是通过 gopark 实现的,而从阻塞态转换为可运行状态是通过 goready 实现的。
  • chan 读写操作阻塞的时候,如果是在 select 语句中,则会直接返回(表示当前的分支没有被选中),否则,会调用 gopark 挂起当前协程。
  • 在关闭 chan 的时候,会调用 goready 唤醒阻塞在发送或者接收操作上的 g(协程)。
  • 无缓冲 chan 的操作有点特殊,对于无缓冲 chan,必须同时有 senderreceiver 才能发送和接收成功,否则另一边都会陷入阻塞(当然,select 不会阻塞)。

go 里面,在实际程序运行的过程中,往往会有很多协程在执行,通过启动多个协程的方式,我们可以更高效地利用系统资源。 而不同协程之间往往需要进行通信,不同于以往多线程程序的那种通信方式,在 go 里面是通过 channel (也就是 chan 类型)来进行通信的, 实现的方式简单来说就是,一个协程往 channel 里面写数据,然后其他的协程可以从 channel 中将其读取出来。 (注意:文中的 chan 表示是 go 语言里面的 chan 关键字,而 channel 只是我们描述它的时候用的一个术语)

通道(chan)的模型

在开始讲 channel 之前,也许了解一下它要解决什么样的问题会比较好,所以先来聊聊一些背景知识。

关于通道,一个比较潦草的图大概是下面这个样子的:

在图中,协程 A 将消息 msg 写入到 channel 中,然后协程 Bchannel 中读取消息,如果 B 没来得及从中读取消息,那么消息会在 chan 中存留。

这就是 go 的哲学:通过通信来实现共享内存。这不同于以往的多线程程序,在多线程程序中,往往是一块内存在不同线程之间进行共享, 然后通过一些保护机制,保证不允许多个线程同时对这块内存进行读写,比如通过 synchronized 关键字。 可能很多人都没有真正写过多线程的程序,但好像我们都有一种共识,多线程不安全。

多线程为什么不安全?

这是因为我们的程序除了通过共享一段内存之外,每一个 CPU 核心都有它本地的缓存,而 CPU 上的缓存是不共享的, 而线程可以同时在不同的 CPU 上执行。CPU 的执行过程是,先从内存中读取数据到 CPU 中,CPU 做完计算再更新到内存中。 这样一来,就有可能存在不同线程对同一段内存同时读写的问题。

这是什么问题呢?比如,A 线程计算完了但是还没有写回内存的时候,B 线程从内存读取出了 A 线程写入计算结果前的数据, 但是按我们的逻辑,B 应该是拿 A 线程的结算结果来进行逻辑运算的,这样就会出现数据不一致了,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class Main {
int a = 0;

public static void main(String[] args) throws InterruptedException {
Main main = new Main();
main.run();
}

// 将 a 加 1
private void add() {
a++;
}

public void run() throws InterruptedException {
// 启动两个线程来对 a 进行加 1 的操作
Thread t1 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
add();
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
add();
}
});
// 启动线程
t1.start();
t2.start();

// 等待线程结束
t1.join();
t2.join();

// 我们的预期结果是 20000,但是实际运行显示了 14965
System.out.println(a);
}
}

在上面的代码中,我们预期的运行结果是 20000 的,但是实际得到了 14965(实际上,每次执行结果都会不一样),这也就是我上面所说的问题, 其中有一个线程读取到了另一个线程的计算结果写入内存前的数据,也就是说,这个线程的计算结果被覆盖了, 因为线程将计算结果写回内存的时候是相互覆盖的。

所以我们可以回答刚才的问题了,多线程不安全是因为多个线程可以对同一段内存进行读写,这就存在其中一个线程还没来得及更新内存, 然后另一个线程读取到的数据是旧的。(也即数据竞争的问题)

具体可以看下图:

CPU 执行的时候,会需要将数据从内存读取到 CPU 中,计算完毕之后,再更新内存里面的数据。

错乱发生的过程大概如下:

  1. CPU 1 先计算完了,计算的结果是 a = 3,但是还没来得及写入内存
  2. CPU 2 也从内存里面获取 a 来进行计算,但是这个时候 a 还没有被 CPU 1 更新,所以 CPU 2 拿到的还是 2
  3. CPU 2 进行计算的时候,CPU 1 将它的计算结果写入了内存,所以这个时候内存中的 a 是 3
  4. CPU 2 计算完毕,将等于 2 的变量 a 加 1 得到结果 3
  5. CPU 2 将结果 3 写入到内存,这个时候 a 的内存被更新,但是结果依然是 3

一种可行的办法 - 锁

其中一种可行的办法就是,给 add 方法加上 synchronized 关键字:

1
2
3
private synchronized void add() {
a++;
}

这个时候,在我们的代码中,对 a 读写的代码都被 synchronized 保护起来了,在这段更新之后的代码中,我们得到了正确的结果 20000

a++ 其实包含了读和写两个操作,程序运行的时候,会先将 a 读取出来,将其加上 1,然后写回到内存中。

synchronized 是同步锁,它修饰的方法不允许多个线程同时执行。synchronized 锁的粒度可大可小,粒度太大的话对性能影响也较大。

正如我们所看到的那样,synchronized 允许修饰一段代码,但是在实际中我们往往只是想保护其中某一个变量而已, 如果直接使用 synchronized 关键字来修饰一大段代码,那就意味着一个线程在执行这段代码的时候,其他线程就只能等待, 但是实际上,其中那些不涉及数据竞争的代码我们也无法执行,这样效率自然会降低,具体降低多少,取决于我们 synchronized 块的代码有多大。

go 中的处理办法

上面我们说到的多线程是通过共享内存来进行通信的,而在 go 里面,采用了 CSP(communicating sequential processes)并发模型, CSP 模型用于描述两个独立的并发实体通过共享 channel(管道)进行通信的并发模型。

CSP 是一套很复杂的东西,go 语言并没有完全实现它,仅仅是实现了 processchannel 这两个概念。process 就是 go 语言 中的 goroutine,每个 goroutine 之间是通过 channel 通讯来实现数据共享的。

然后我们上面说到,java 里面的 synchronized 关键字的粒度可能会比较大,这个是相比 go 里面的 channel 而言的, 在 go 里面,我们的代码在通信过程中很常见的一种阻塞场景是:

  • goroutine 需要从 channel 读取数据才能继续执行,但是 channel 里面还没数据,这个时候 goroutine 需要等待(会阻塞)另一个 goroutinechannel 写入数据。

对于这种场景,它隐含的逻辑是,阻塞的这个 goroutine 需要等待其他 goroutine 的结果才能继续往下执行,也就是 CSP 中的 sequential。下图是实际运行中的 chan

我们上面的 chan 模型那个图,读和写都只有一个协程,但在实际中,读 chan 和写 chan 的协程都有一个队列来保存。 我们需要明确的一点事实是:队列中的协程会一个接一个执行,队列头的协程先执行,然后我们对 chan 的读写是按顺序来读写的,先取 chan 队列头的元素,然后下一个元素

对应到上面 java 这个例子,我们在 go 里面可以怎么做呢?我们先把没有锁的 java 代码先写成 go 的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import "fmt"

var a = 0

func add(ch chan int, done chan<- struct{}) {
for i := 0; i < 10000; i++ {
a++
}
done <- struct{}{}
}

func main() {
done := make(chan struct{}, 2)

// ch 充当协程之间同步的角色
ch := make(chan int, 1)
// 这里可以传任意数字
ch <- 1

go add(ch, done)
go add(ch, done)

// 等待 2 个协程执行完毕
<-done
<-done
fmt.Println(a) // 15504 每次结果不一样
}

在 go 里面,我们可以把 add 方法改成下面这个样子:

1
2
3
4
5
6
7
8
9
10
11
12
13
func add(ch chan int, done chan<- struct{}) {
for i := 0; i < 10000; i++ {
// 阻塞,只有另外一个协程往 ch 里面写入数据的时候,
// <-ch 才得以解除阻塞状态
<-ch
// 这一行同一时刻只能一个协程执行
a++
// 往 ch 写入数据,
// 等待从 ch 中读取数据的协程得以继续执行
ch <- i
}
done <- struct{}{}
}

这种写法看起来很笨拙,我们在实际使用中可能会稍有不同,所以不需要太纠结这个例子的合理性,这里想表达的是:在 go 中,我们的协程使用 chan 的时候只会阻塞在 chan 读写的地方,其他代码不受影响,当然,这个例子也没能很好体现。

假设我们有很大一段代码,但是涉及到数据竞争的时候,协程只会阻塞在 chan 读写的那一行代码上。这样一来我们就不用通过锁来覆盖一大段代码。

这里,我们可以看到 chan 其中一个很明显的优势是,我们没有了 synchronized 那种大粒度的锁,我们的 goroutine 只会阻塞在某一个 channel 上, 在读取 channel 之前的代码,goroutine 都是可以执行的,这样就在语言层面帮我们解决了一个很大的问题, 因为粒度更小,我们的代码自然也就能处理更大的并发请求了。

进程的几种状态

在开始讲述 channel 之前,再来回忆一下进程的几种状态会便于我们理解。

我们知道,我们的电脑上,同一时刻会有很多进程一直在运行,但是我们也发现很多进程的 CPU 占用其实都是 0%,也就是不占用 CPU。 其实进程会有几种状态,进程不是一直在运行的,一般来说,会有 执行阻塞就绪 几种状态,进程不是运行态的时候,那它就不会占用你的 CPU,因此会看到 CPU 占用是 0%,它们之间的转换如下图:

  • 执行:这表示进程正在运行中,是正在使用 CPU 的进程。在就绪状态的进程会在得到 CPU 时间片的时候得以执行。
  • 阻塞:这表示进程因为某些需要的资源得不到满足而挂起了(比如,正在进行磁盘读写),这种状态下,是不用占用 CPU 资源的。
  • 就绪:这表示一个状态所需要的资源都准备好了,可以继续执行了。

进程的几种状态跟 channel 有什么关系?

在 go 里面,其实协程也存在类似的调度机制,在协程需要的资源得不到满足的时候,也会被阻塞,然后协程调度器会去执行其他可以执行的协程。

比如下面这个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
func main() {
done := make(chan struct{})
// 这个协程在 main 协程序阻塞的时候依然在执行
go func() {
// 陷入睡眠状态
time.Sleep(time.Second)
fmt.Println("done")
// 往 done 这个 chan 写入数据
done <- struct{}{}
}()
// main 协程陷入阻塞状态
<-done
}

在这个例子中 done <- struct{}{} 这一行往 done 这个 chan 写入了数据,之前一直在等待 chanmain 协程的阻塞状态解除,得以继续执行。

goroutine 在等待 chan 返回数据的时候,会陷入阻塞状态。一个因为读取 chan 陷入阻塞状态的 goroutine 在获取到数据的时候,会继续往后执行。

channel 是什么?

我们在文章开头的第一张图,其实不是很准确。在 go 里面,channel 实际上是一个队列(准确来说是环形队列),大概长得像下面这样:

队列我们都知道,我们可以从队列头读取数据,也可以将数据推入到队列尾。上图中,1 是队列头,当我们从 channel 读取数据的时候, 读取到的是 16 是队列尾,当我们往 channel 中写入数据的时候,写入的位置是 6 后面的那个空间。

channel 是一个环形队列,goroutine 通过 channel 通信的方式是,一个 goroutine 将数据写入队列尾,然后另一个 goroutine 将数据从队列头读数据。

如何使用 channel

我们再仔细看看上面的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package main

import (
"fmt"
"time"
)

func main() {
done := make(chan struct{})
go func() {
time.Sleep(time.Second)
fmt.Println("done")
// 发送取消信号
done <- struct{}{}
}()
// 等待结束信号
<-done
}

这里面包含了使用 channel 的基本用法:

  • done := make(chan struct{}):创建 channel,在 go 里面是使用 chan 关键字来代表一个 channel 的。而在这个语句中,创建了一个接收 struct{} 类型数据的 chan
  • done <- struct{}{}:写入到 chan,这里,我们创建了一个空结构体,然后通过 <- 操作符将这个空结构体写入到了 chan 中。
  • <-done:从 chan 中读取数据,也是使用了 <- 操作符,然后我们丢弃了它的返回结果。

这段代码的执行过程如下图:

  1. CPU 1 上启动了 main 协程
  2. 接着在 main 协程中通过 go func 启动了一个新的协程,go 的调度机制允许不同的协程在不同的线程上执行,所以 main 执行的时候,go func 也在执行,然后,因为 done 这个 chan 中没有数据,所以 main 协程陷入阻塞。
  3. go func 在短暂的睡眠之后,输出了 done,然后向名字为 done 这个 chan 中发送了一个空结构体实例。
  4. done 里面没有写入数据之前,main 一直阻塞,在 go func 写入数据之后,main<-done,解除了阻塞状态,得以继续执行
  5. 56 因为可能是在不同的线程上执行的,所以哪一个先结束其实不一定。

下面详细说说 channel 的具体用法

创建 chan

chan 是 go 的关键字,channel 是我用来描述 chan 所表示的东西的一个术语而已,我们在 go 里面使用的话还是得用 chan 关键字。

创建 chan 是通过 make 关键字创建的:

1
ch := make(chan int)

make 函数的参数是 chan 然后加一个数据类型,这个数据类型是我们的 chan 这个环形队列里面所能存储的数据类型。 不能传递不同的类型进一个 chan 里面。

也可以传递第二个参数作为 chan 的容量,比如:

1
ch := make(chan int, 3)

这里第二个参数表明了 ch 这个 chan 到底能存储多少个 int 类型的数据。

不传递或者传 0 表示 chan 本身不能存储数据,go 底层会直接在两个 goroutine 之间传递,而不经过 chan 的复制。 (如果第二个参数大于 0,我们往 chan 写数据的时候,会先复制到 chan 这个数据结构,然后其他的 goroutinechan 中读取数据的时候,chan 会将数据复制到这个 goroutine 中)

chan 读写的几种操作

  • 写:ch <- x,将 x 发送到 channel 中
  • 读:x = <-ch,从 channel 中接收,保存到 x
  • 读,但是忽略返回值(用作协程同步,上面的例子就是):<-ch,从 ch 中接收,但是忽略接收到的结果
  • 读,并且判断是否是关闭前发送的:x, ok := <-ch,这里使用了两个返回值接收,第二个返回值表明了接收到的 x 是不是 chan 关闭之前发送进去的,true 就代表是。

需要注意的是 <-chch<- 这两个看起来好像一样,但是效果是完全不同的,ch 位于 <- 操作符右边的时候, 表示是

有一个简单区分的方法是,将 <- 想象为数据流动的方向,具体来说就是看数据是流向 chan 还是从 chan 流出,流向 chan 就是写入到 chan,从 chan 流出就是读取。

缓冲 chan 与非缓冲 chan

上面我们说到,创建 chan 的时候可以传递第二个参数表示 chan 的容量是多少,这个容量表示的是, 在没有 goroutine 从这个 chan 读取数据的时候,chan 能存放多少数据,也就是 chan 底层环形队列的长度。

下面描述了缓冲的实际场景:

无缓冲 chan

还是用我们上面的那段代码:

1
2
3
4
5
6
7
8
9
10
11
12
package main

import "fmt"

func main() {
done := make(chan struct{})
go func() {
fmt.Println("done")
done <- struct{}{}
}()
<-done
}

这里 make(chan struct{}),只有一个参数,所以 done 是一个无缓冲的 chan,这种 chan 会在发送的时候阻塞,直到有另一个协程从 chan 中获取数据。

有缓冲 chan

有缓冲的 chan 在协程往里面写入数据的时候,可以进行缓冲。缓冲的作用是,在需要读取 chan 的 goroutine 的处理速度比较慢的时候,写入 chan 的 goroutine 也可以持续运行,直到写满 chan 的缓冲区

上图的 chan 是一个有缓冲的 chan,在 chan 里面的数据还没来得及被接收的时候,chan 可以充当一个缓冲的角色。但是,如果 chan 的数据一直没有被接收,然后满了的时候,往 chan 写入数据的协程依然会陷入阻塞。但这种阻塞状态会在 chan 的数据被读取的时候解除。

下面是一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
"fmt"
"time"
)

func main() {
done := make(chan struct{})
// 定义一个缓冲数量为 2 的 chan
ch := make(chan int, 2)
go func() {
for {
// 模拟比较慢的处理速度
time.Sleep(time.Second)

i, ok := <-ch
// ok 为 false 表示 ch 已经关闭并且数据已经被读取完
// 这个时候中断循环
if !ok {
break
}

fmt.Printf("[%d] get from ch: %d\n", time.Now().Unix(), i)
}
// 处理完数据之后,发送结束的信号
done <- struct{}{}
}()
go func() {
// 在循环结束之后关闭 chan
defer close(ch)
for i := 0; i < 3; i++ {
// 在写入 2 个数之后,会陷入阻塞状态
// 直到上面那个协程从 ch 读取出数据,ch 才会有空余的地方可以继续接收数据
ch <- i
fmt.Printf("[%d] write to ch: %d\n", time.Now().Unix(), i)
}
}()
// 收到结束信号,解除阻塞状态,继续往下执行
<-done
}

输出如下:

1
2
3
4
5
6
[1669381752] write to ch: 0
[1669381752] write to ch: 1
[1669381753] get from ch: 0
[1669381753] write to ch: 2
[1669381754] get from ch: 1
[1669381755] get from ch: 2

我们可以看到,写入 chan 的协程在 1669381752 的时候没有写入了,然后在读取 chan 的协程从 chan 中读取了一个数出来后才能继续写入。

nil chan

chan 的零值是 nilclose 一个 nil 通道会引发 panic。往 nil 通道写入或从中读取数据会永久阻塞:

1
2
3
4
5
6
package main

func main() {
var ch chan int
<-ch
}

执行的时候会报错:fatal error: all goroutines are asleep - deadlock!

len 和 cap

  • len:通过 len 我们可以查询一个 chan 的长度,也就是有多少被发送到这个 chan 但是还没有被接收的值。
  • cap:通过 cap 可以查询一个容道的容量,也就是我们传给 make 函数的第二个参数,它表示 chan 最多可以容纳多少数据。

如果 channil,那么 lencap 都会返回 0。

chan 的方向

chan 还有一个非常重要的特性就是它是可以有方向的,这里说的方向指的是,数据的流向。在我们上面的例子中,数据既可以流入 chan,也可以从 chan 中流出,因为我们没有指定方向,没有指定那么 chan 就是双向的。

具体来说,有以下几种情况:

  • chan,没有指定方向,既可以读又可以写。
  • chan<-,只写 chan,只能往 chan 中写入数据,如果从中读数据的话,编译不会通过。
  • <-chan,只读 chan,只能从 chan 中读取数据,如果往其中写入数据的话,编译不会通过。

另外,无方向的 chan 可以转换为 chan<- 或者 <-chan,但是反过来不行

在实际使用 chan 的时候,在某些地方我们其实是只允许往 chan 里面写数据,然后另一个地方只允许从 chan 中读数据。比如下面这个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main

import "fmt"

var done = make(chan struct{})

// ch 是只写 chan,如果在这个函数里面从 ch 读取数据编译不会通过
func producer(ch chan<- int) {
for i := 0; i < 3; i++ {
ch <- i
fmt.Printf("produce %d\n", i)
}
// 发送 3 个数之后,关闭 chan
close(ch)
}

// ch 是只读 chan,如果在这个函数里往 ch 写入数据编译不会通过
func consumer(ch <-chan int) {
for {
i, ok := <-ch
if !ok {
// chan 的数据已经被全部接收完,
// 发送 done 信号
done <- struct{}{}
break
}
fmt.Printf("consume %d\n", i)
}
}

func main() {
nums := make(chan int, 10)
go producer(nums)
go consumer(nums)
// 收到结束信号之后继续往下执行
<-done
}

在这个例子中,producer 这个协程里面往 chan 写入数据,写入 3 个数之后关闭,然后 consumer 这个协程序从 chan 读取数据, 在读取完所有数据之后,发送结束信号(通过 done 这个 chan),最后 main 协程收到 done 信号后退出。

这样有个好处就是,从语法层面限制了对 chan 的读写操作。而不用担心有误操作。

什么时候阻塞?什么时候不阻塞?

在开始这个话题之前,很有必要说一下,go 里面 chan 的一些实现原理,在 chan 的实现中,维护了三个队列:

  • 数据缓冲队列(chan):也就是上面说的环形队列,是一种先进先出结构(FIFO,"First In, First Out"),它的长度是 chan 的容量。此队列存放的都是同一种类型的元素。
  • 接收数据协程队列(recvq):当 chan 里面没有数据可以读取的时候,这个队列会有数据,这个队列中的协程都在等待从 chan 中读取数据。
  • 发送数据协程队列(sendq):当数据缓冲队列满了的时候(又或者如果是一个无缓冲的 chan),那么这个队列不为空,这个队列中的协程都在等待往 chan 中写入数据。

大家在实际使用的时候可以参考一下下图,下图列出了对 chan 操作的所有场景:

对于阻塞或者非阻塞,其实有一个很简单的判断标准,下面描述了所有会阻塞的情况:

  • 发送:如果没有地方能存放发送的数据,则阻塞,具体有下面几种情况:
    • nil chan
    • 有缓冲但是缓冲满了
    • 无缓冲并且没有协程在等待从 chan 中读取数据
  • 接收:如果没有可以读取的数据,则阻塞,具体有下面几种情况:
    • nil chan
    • 有缓冲,但是缓冲区空的
    • 无缓冲,但是没有协程正在往 chan 中发送数据

大家觉得抽象可以结合下面这个图想象一下:

结合现实场景想象一下,我们可以把 chan 想象成为配送员,sendq 想象为商家,recvq 想象成用户,配送员装餐点的箱子想象成缓冲区:

一个假设的前提:假设商家只能在送出去一份餐点后,才能开始制作下一份餐点。

  • 发送
    • nil chan。没有配送员了,商家的餐点肯定是送不出去了,商家只能等着关门大吉了。
    • 有缓冲但是缓冲满了。配送员会有一个箱子(缓冲区)来存放外卖,但是这个箱子现在满了,虽然接了一个单,但是没有办法再从商家那里取得外卖来送了
    • 无缓冲并且没有协程在等待从 chan 中读取数据。这个外卖是用户自取的订单,但是用户联系不上。(当然现实中商家不用等,我们假设现在商家只能送出去一份后才能开始制作下一份)
  • 接收
    • nil chan。没有配送员,用户的餐没人送,用户只能等着饿死了。
    • 有缓冲,但是缓冲区空的。商家还没制作好餐点,配送员没有取到餐,这个时候用户打电话给配送员叫他快点送,但是这个时候配送员也没有办法,因为他也没有拿到用户的餐点。这个时候用户快饿死了,但也没有办法,只有干等着,先吃饱才能搬砖。
    • 无缓冲,但是没有协程正在往 chan 中发送数据。这天,用户是下了自取的订单,然后去到店里的时候,商家还没做好,这个时候,用户啥事也干不了,也只能等了。

需要注意的是,上图中发送和接收只有一个协程,但是在实际中,正如这一节开头讲的那样,发送和接收都维护了一个队列的。 对应到上面那个现实的例子,那就是配送员可以同时从多个商家那里取餐,也可以同时给多个用户送餐,这个过程,有可能多个商家在制作需要这个配送员配送的餐点,也有可能有多个用户在等着这个配送员送餐。

<- 操作符只是语法糖

在 go 里面我们操作 chan 的方式好像非常简单,就通过 <- 操作符就已经绰绰有余了,这也是 go 的设计理念吧,尽量把语言设计得简单。 (但是,简单并不容易)但是,从另外一个角度看,go 把对 chan 的操作简化成我们现在看到的这个样子,也说明了 chan 在 go 里面的地位(一等公民)。

在 go 中,chan 实际上是一个结构体(runtime/chan.go 里面的 hchan 结构体),而且,还是一个非常复杂的结构体,但是我们在使用的时候却非常简单, 这其实是 go 设计者给开发者提供的一种语法糖,直接在语法层面极大地简化了开发者对 chan 的使用,

如果没有这个语法糖,那就需要开发者自己去创建 hchan 结构体,然后发送或者接收的时候还需要调用这个结构体的方法。 相比之下,<- 就写一个操作符就行了,而且这个符号还非常形象,指向哪就代表了数据是流向 chan (写)还是从 chan 流出(读)。

for...range 语法糖

我们上面说过了,从 chan 读取数据的时候,可能需要用两个值来接收 chan 的返回值,第二个值用来判断接收到的值是否是 chan 关闭之前发送的。

for...range 语法也可以用来从 chan 中读取数据,它会循环,直到 chan 关闭,这样直接免去了我们判断的操作,比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import "fmt"

func main() {
done := make(chan struct{})

nums := make(chan int)
go func() {
for i := 0; i < 3; i++ {
fmt.Printf("send %d\n", i)
nums <- i

}
close(nums)
}()

go func() {
// 传统写法
//for {
// num, ok := <-nums
// if !ok {
// break
// }
// fmt.Printf("receive %d\n", num)
//}

// range 语法糖
for num := range nums {
fmt.Printf("receive %d\n", num)
}
done <- struct{}{}
}()

<-done
}

select 语句里面使用 chan

go 里面有一个关键字 select,可以让我们同时监听几个 chan,在任意一个 chan 有数据的时候,select 里面的 case 块得以执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
"fmt"
"time"
)

func main() {
ch1 := make(chan int)
ch2 := make(chan int)

// ch1 会先收到数据
go func() {
time.Sleep(time.Second)
ch1 <- 1
}()
go func() {
time.Sleep(time.Second * 2)
ch2 <- 1
}()

// select 会阻塞,直到其中某一个分支收到数据
select {
case <-ch1:
// 执行这一行代码
fmt.Println("from ch1")
case <-ch2:
// 这一行不会被执行
fmt.Println("from ch2")
}
}

select-case 的用法类似于 switch-case,也有一个 default 语句,在 select 里面

  • 如果 default 之前的 case 都不满足,则执行 default 块的代码。
  • 如果没有 default 语句,则会一直阻塞,直到某一个 case 上面的 chan 返回(有数据、或者 chan 被关闭都会返回)

当然,case 后面可以从 chan 读取数据,也可以往 chan 写数据,比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main

import (
"fmt"
"time"
)

func main() {
ch1 := make(chan int)
// 往 nil chan 写入数据会阻塞
var ch2 chan int

// ch1 会先收到数据
go func() {
time.Sleep(time.Second)
ch1 <- 1
}()

// 会阻塞,直到其中一个 case 返回
select {
case <-ch1:
// 执行这一行代码
fmt.Println("from ch1")
case ch2 <- 1: // 永远不会满足,因为 ch2 是 nil
fmt.Println("from ch2")
}
}

select 的另外一种很常见的用法是,等待一个 chan 和一个定时器(实现控制超时的功能),比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main

import (
"fmt"
"time"
)

func main() {
ch1 := make(chan int)

// ch1 一秒后才收到数据
go func() {
time.Sleep(time.Second)
ch1 <- 1
}()

select {
case <-ch1:
fmt.Println("from ch1")
case <-time.After(time.Millisecond * 100):
// 执行如下代码,因为这个 case 在 100ms 后就返回了
fmt.Println("from ch2")
}
}

如果我们需要控制某些操作的超时时间,那么就可以在时间到了之后,做一些清理操作,然后终止一些工作,最后退出协程。

总结

  • go 里面通过 chan 来实现协程之间的通信,chan 大概就是一个协程给另一个协程发送信息的代理。
  • 多线程程序执行的时候,因为有 CPU 缓存,然后需要对同一块内存进行并发读写,可能会导致数据竞争的问题。
  • 在很多语言中,都提供了锁的机制,来保护一片内存同一时刻只能一个线程操作,比如 java 里面的 synchronized 关键字。
  • go 里面很多情况下,在不同协程之间通信都是使用 chan 来实现的。
  • 进程会有阻塞态、运行态,go 里面的协程也有阻塞的状态,当需要的资源得不到满足的时候就会陷入阻塞。比如等待别的协程往 chan 里面写入数据。
  • chan 的几种常见操作:make 创建、<-chan 读、chan<- 写、len 获取 chan 中未读取的元素个数、cap 获取 chan 的缓冲区容量。
  • chan 类型上不加 <- 表示是一个可读可写的 chan<-chan T 表示只读 chanchan<- T 表示只写 chan,双向的 chan 可以转换为只读或者只写 chan,但是反过来不行,只读 chan 和只写 chan 之间也不能相互转换。
  • 协程的阻塞跟不阻塞,很简单的判断方式就是,发送的时候就看有没有地方能接得住,接收的时候就看有没有数据可以拿,没有则陷入阻塞。
  • <- 是 go 语言在设计层面提供给开发者的一种语法糖,chan 底层是一个很复杂的结构体。
  • for...range 结构在遍历 chan 的时候不用判断返回值是否有效,因为返回值无效的时候会退出循环。
  • 我们可以通过 select 来同时等待多个 chan 的操作返回。