本文基于 go 1.19
哈希表作为一种非常常用的数据结构,存在于各种编程语言中,它可以让我们保存键值对 数据,而且有着非常高的查找效率。
本文就以 go 语言中的 map
为例子,讲述一下哈希表在 go
中的实现。
哈希表基本操作
在开始之前,需要大概讲解一下哈希表这种数据结构,哈希表会预先在内存中分配一段比较大的内存,这段内存用在将来往里面写入数据的时候使用。
哈希表有点类似数组,都是一段连续的内存,但是我们往哈希表读写数据的时候不同于数组,数组的时候是直接通过下标访问 的,
而哈希表的读写需要先计算 key
的哈希值,根据这个哈希值对哈希表长度取模得到 key
对应的哈希表的下标,然后对哈希表这个下标进行读写操作。
对于哈希表有以下几种常见的操作:
写入:根据 key
计算 hash
值,对哈希表长度取模得到 key
在内存中的地址,然后往这个地址写入数据。
读取:根据 key
计算 hash
值,对哈希表长度取模得到 key
在内存中的地址,然后读取这个地址中的数据。
修改:根据 key
计算 hash
值,对哈希表长度取模得到 key
在内存中的地址,然后修改这个内存地址里面的数据。
删除:根据 key
计算 hash
值,对哈希表长度取模得到 key
在内存中的地址,然后清空保存这个 key
的那一小块内存。
注意:计算出的 hash
值可能比分配的内存大小要大,所以才需要对其取模(hash
值 /
哈希表长度 => key
在哈希表的索引), 保证计算出的
hash
最终落到哈希表的内存范围之内。比如,keyn
计算出来的哈希值为 100
,但是我们的哈希表长度只有
8
, 那么 keyn
最终会落在哈希表中下标为
2
的地方(100 % 7 = 2
,下标是从 0
开始的,所以这里是 7
)
哈希表的写入
假设我们现在要有一个长度为 8 的哈希表(下图左),我们有数据
{"a": 1, "b": 2}
需要存入这个哈希表,存入之后的哈希表为下图右:
哈希表的写入
说明:hash(a) = 5
,计算 a
的哈希值得到
5
,所以将 a:1
存入了哈希表中下标为
5
的地址处。b
同理。
注意:键值都会存储。
哈希表的读取
从哈希表中读取 key
为 b
的键值对:
哈希表的读取
哈希表的修改
现在,我们需要将 a
的值修改为
3
,同样的,计算其 hash
值,得到
5
,然后将哈希表中 5
这个下标的内存修改成
3
:
哈希表的修改
哈希表的删除
假设要将哈希表中 b
这个键删除,会先计算其
hash
值,得到 0
,然后将哈希表中 0
这个下标的内存清空:
哈希表的删除
哈希表的高效之处
通过上面的分析,我们可以知道,哈希表的内存布局跟数组类似,但是哈希表的存储要通过计算
key
的哈希值来得到其在哈希表中的索引,最后对这个索引的内存进行 CRUD
操作。这样一来,如果我们需要频繁的根据键查找其对应值的话,使用哈希表无疑会大大提高效率。相比之下,如果使用数组来存储,那么每次搜索都需要将整个数组遍历一次,效率非常低下。
比如下图,假设我们一个数组内存布局如下,那么在我们查找
a:1
的时候,需要从数组的第一个元素开始遍历,每一个元素读取出来看看它的键是不是
a
, 取到第 6
个元素(下标
5
)的时候,发现它的键是 a
,然后取出对应的值
1
。
哈希表的高效之处
在数组的元素个数少的时候,这种查找效率其实影响不大,但如果我们有上万个元素的时候,每次查找都要从第一个元素开始遍历,效率无疑会非常低。
相比之下,不管数据再怎么多,使用哈希表的方式,我们直接通过哈希算法计算一下就可以知道键保存到了哪个槽中。
哈希冲突解决方法
在上一节中,我们的图将哈希表的每一个槽(slot
,又或者叫
cell
,都是同一个东西)都表示成只有一个元素了。
但在实际中,往往会出现计算出来的哈希值对哈希表长度取模后是相等的,也就是不同的
key
会落到同一个槽中(这就是
哈希冲突 ),
这种情况下,一个槽存放不下的话,有两种办法可以处理:开放地址法 和链表法 。go
里面的 map
使用的是链表法 ,
具体来说,就是在 hash
冲突的地方,建立一个链表来保存相同
hash
值的 key
。
这样一来,我们通过 hash
算法计算出哈希值的时候,并不能唯一确定对应的值了,因为有可能两个
key
经过哈希算法计算之后,得到的哈希值是一样的。
这种情况怎么办呢?很简单,因为虽然哈希值是一样的,但是它们的
key
是不一样的,再比较一下 key
就可以确定了。具体可以参考下图:
哈希冲突
有哈希冲突的情况下,读取哈希表数据的过程:
计算 c
的 hash
值,得到
0
,就是哈希表的索引
获取哈希表中地址 0
上的数据,这会遍历冲突产生的链表
比较 c
跟
b
,不相等,继续比较链表下一个元素
比较 c
跟 c
,相等,返回
3
哈希扩容
go 里面 map
扩容有两种方式:增量扩容 和等量扩容 。
哈希表总元素个数过多导致的扩容
这种扩容方式也叫增量扩容 。
我们在上面说过了,其实哈希表的内存布局跟数组类似,都是先分配一段连续的内存。然后在哈希冲突的时候,对于冲突的
key
建立一个链表来保存。
这样就会出现一种情况,在链表中存在很多冲突的键,这样一来,在查找冲突
key
的时候,需要在这一堆冲突的 key
中进行查找,这个查找类似数组的查找,效率较低。
为了避免这种情况的出现,一般的哈希表设计会在元素个数总数超过一定数量的时候,对哈希表进行扩容,
这样一来,那些哈希冲突的键就可以相对均匀地分布在哈希表中,从而避免了很多哈希冲突情况下导致的查找效率低下的问题。
扩容之后的容量为原来容量的 2 倍。
增量扩容
go map 的负载因子
在 go 中,map
在实际存储的元素数量超过 map
里 bucket
总数量的 6.5
倍的时候(也就是平均每个 bucket
中的元素个数大于
6.5
的时候),会进行扩容, 这个 6.5
是实现
map
的那个开发者经过实验计算出来的比较合适的数,这个
6.5
被称为负载因子。
为什么负载因子是 6.5
呢,在 go 的 map
源码中有相关的说明:
选择负载因子,如果太大,会有很多溢出桶,太小,则会浪费很多空间 。
我编写了一个简单的程序来检查不同负载的一些统计数据:(64位、8字节 key
和元素)
4.00
2.13
20.77
3.00
4.00
4.50
4.05
17.30
3.25
4.50
5.00
6.85
14.77
3.50
5.00
5.50
10.55
12.94
3.75
5.50
6.00
15.27
11.67
4.00
6.00
6.50
20.90
10.79
4.25
6.50
7.00
27.14
10.15
4.50
7.00
7.50
34.03
9.73
4.75
7.50
8.00
41.10
9.40
5.00
8.00
列说明:
%overflow
: 具有溢出桶的桶的百分比
bytes/entry
: 每个 key/elem 对使用的开销字节
hitprobe
: 查找当前 key 时要检查的条目数
missprobe
: 查找缺少的 key 时要检查的条目数
哈希冲突链上元素太少导致的扩容
这种扩容方式也叫等量扩容 。
我们知道,在哈希冲突的时候,会建立链表来保存键冲突的元素,但是我们删除那些哈希冲突的键的时候,并不会对删除元素的内存进行释放,
如果每次删除都释放的话,在我们频繁插入跟删除的时候,效率就非常低下了。
因为插入一个元素就分配内存,删除一个元素就释放内存(分配内存和释放内存都是相对耗时的操作)。
而这样的结果是,保存冲突键的链表上,有很多空的元素,这样就会导致冲突的时候,查找键的效率降低,因为要遍历很多空的键。
删除的时候不释放,那什么时候会释放呢?哈希冲突的元素很多都被删除的时候,在
go 里面,map
会判断就算没有超过负载因子的情况下,
如果冲突链表占用的空间过大的话,也会进行扩容。但这里说的扩容其实并不是真正意义上的扩容,只是
map
的实现里面,使用的函数是同一个函数。
具体实现方式是,分配跟原哈希表相同大小的空间,然后将旧哈希表的数据迁移到新的哈希表。
这样迁移之后,对于哈希冲突链表上的那些元素,只会迁移非空的元素,最终结果就是,扩容之后,哈希冲突链表上的元素更加紧凑,在查找冲突的键的时候会更加高效。
虽然哈希表的总容量没变,但是数据分布更加紧凑了,省去了遍历空元素的时间。
等量扩容
go map 概述
map
只是一个哈希表。数据被排列成一组
bucket
。
每个 bucket
最多包含 8
个
键/值
对。
哈希值的低位字节位用于选择 bucket
。
每个 bucket
包含每个哈希的几个高位字节位(tophash
),以区分单个桶中的条目。
如果超过 8
个 key
哈希到同一个桶,我们将额外的桶以链表的方式起来。(解决哈希冲突,链表法)
当哈希表扩容时,我们会分配一个两倍大的新 bucket
数组。然后 bucket
从旧 bucket
数组增量复制到新
bucket
数组。
map
迭代器遍历 bucket
数组,并按遍历顺序返回键(遍历完普通桶之后,遍历溢出桶)。
为了保持迭代语义,我们永远不会在它们的桶中移动键(bucket
内键的顺序在扩容的时候不变。如果改变了桶内键的相对顺序,键可能会返回 0
或 2 次)。
在扩容哈希表时,迭代器仍在旧的桶中迭代,并且必须检查新桶,检查正在迭代的
bucket
是否已经被迁移到新桶。
go map 的整体模型
上面讲了哈希表的基本设计思路,接下来就要开始讲 go 里面
map
的设计与实现了。大体上其实就是上面说的样子,但是有下面几个不一样的地方:
下文的 bucket
和 bmap
都是指的 "桶"。
go map
里面存储数据的地方是
bucket
(桶),一个 bucket
可以存储
8
个元素,也就是说哈希冲突的时候还是会在同一个
bucket
中先存储。
如果 bucket
也存放不下冲突的元素了,那么会新建另外一个桶(叫做溢出桶),旧的
bucket
记录这个新桶的指针,旧的 bucket
存放不下的元素,会存放到这个溢出桶中。
如果溢出桶还是存放不下,那么再新建一个溢出桶,链接到上一个溢出桶中。
也就是说,在 go 的 map
实现中,哈希计算出来的值决定了
key
应该存放在哪一个 bucket
中。
go map
的整体结构如下图:
go map 整体模型
buckets
记录了保存 map
数据的所有
bucket
(这种下文统一称为普通桶 ),go
中使用 bmap
这个结构体来表示
bucket
,溢出桶也是使用 bmap
结构体表示。
如果
bucket
(普通桶)哈希冲突太多导致存放不下,会新建一个
bucket
,在原来的 bucket
上会有一个指针记录新建
bucket
的地址,这个新 bucket
下文统一称为溢出桶 。
在创建 map
的时候,如果我们指定的容量比较大(B >= 4
的时候),那么会预创建一个溢出桶。
也就是说,go 中解决哈希冲突的链表法,链表上的每一个元素是一个
bucket
。go map
的实现里面,一个
bucket
可以存放 8
个键值对 。
上面的 bmap
的数据结构如下图:
bmap
bmap
就是
bucket
(桶),不管是普通的桶还是溢出的桶,都是使用
bmap
结构体表示。
bmap
中存储数据的方式有点特别,它先存储了
8
个 tophash
值,一个 tophash
的大小为 1 个字节,每一个 tophash
记录的是
bmap
中每一个元素的哈希值的最高的 8 bit。
接下来是 bmap
存储的 8 个元素的 key
,在 8
个 key
之后是 8 个 bmap
存储的值。我们会发现
key
和 value
的存储是分开的,而不是
key/value
、key/value
这种方式。go
中这种分开存储的方式有一个好处是可以减少内存对齐的开销,从而更省内存。
最后是 overflow
(溢出桶),如果 bmap
存满了,那就会新建一个溢出桶来保存新的数据,通过在旧的 bmap
上记录指针来记录溢出桶。
tophash
的作用是,在哈希冲突的时候,在
bucket
内进行查找的时候,是需要在 bucket
内从第一个元素遍历到最后一个元素来查找的。 如果 key
太大,直接比较 key
的话效率会比较低下,通过记录哈希值的高 8
位,我们就可以在 buckeet
内查找的时候,先比较哈希值的 前 8
位,这样一来,map
的效率受到 key
大小的影响就会比较小。当然哈希值的高 8
位有可能相同,在这种情况下,我们再比较一下 key
本身
就可以确定 bucket
的那个槽(slot
/cell
)是否是我们正在查找的那一个
key
。
go map 相关数据结构
我们大部分内容是跟下面两个结构体打交道:
hmap
: map
的数据结构,包含了
bucket
的指针、bucket
的数量、键值对的数量等信息。
bmap
: 桶,存储 key/value
的地方
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 type hmap struct { count int flags uint8 B uint8 noverflow uint16 hash0 uint32 buckets unsafe.Pointer oldbuckets unsafe.Pointer nevacuate uintptr extra *mapextra } type mapextra struct { overflow *[]*bmap oldoverflow *[]*bmap nextOverflow *bmap } type bmap struct { tophash [bucketCnt]uint8 keys [bucketCnt]keytype values [bucketCnt]valuetype overflow *bmap }
对于 map
的数据结构,需要特别说明的是,bmap
的源码中实际只包含了 tophash
字段,而后面的三个字段
keys/values/overflow
都是在编译期间动态添加的。这是因为
map
中可能存储不同类型的键值对,所以键值对占据的内存空间大小只能在编译时进行推导。
这样一来,最终的结果是,我们在 map
的源码中,访问
key
和 value
的时候都需要通过
bmap
的首地址加上偏移量来进行访问的。
比如获取 bucket
中第 i
个 key
的方式:
1 k := add(unsafe.Pointer(b), dataOffset+i*uintptr (t.keysize))
这行代码中,add
是做指针加法运算的函数,具体来说就是第一个参数的地址加上第二个参数(偏移量),得到一个我们想要的指针。
dataOffset
代表了 bmap
的 keys
第一个元素的偏移量,i
代表了我们想要获取的 key
在 keys
中的索引:
bmap 结构
这样一来,我们就可以通过 k
这个指针来访问
bucket
中的 key
了。同样的,要访问
value
的方式也是类似的, 只要将
dataOffset + i * uintptr(t.keysize)
替换成
dataOffset + bucketCnt * uintptr(t.keysize)
即可。
这种方式虽然不太优雅,但是在性能上可以达到最优。
bmap(桶)源码剖析
bmap
就是保存键值对的地方,但是它本身的方法并不多:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 func evacuated (b *bmap) bool { h := b.tophash[0 ] return h > emptyOne && h < minTopHash } func (b *bmap) overflow(t *maptype) *bmap { return *(**bmap)(add(unsafe.Pointer(b), uintptr (t.bucketsize)-goarch.PtrSize)) } func (b *bmap) setoverflow(t *maptype, ovf *bmap) { *(**bmap)(add(unsafe.Pointer(b), uintptr (t.bucketsize)-goarch.PtrSize)) = ovf } func (b *bmap) keys() unsafe.Pointer { return add(unsafe.Pointer(b), dataOffset) }
在 ev
中用到了两个常量,在 bucket
的
tophash
里面,会通过下面几个标志来记录桶里面槽的状态:
1 2 3 4 5 6 7 8 9 10 11 12 emptyRest = 0 emptyOne = 1 evacuatedX = 2 evacuatedY = 3 evacuatedEmpty = 4 minTopHash = 5
为了跟正常的 tophash
区分开来,如果计算出来的
tophash
小于 minTopHash
,会将计算出来的
tophash
加上 minTopHash
:
1 2 3 4 5 6 7 8 9 10 func tophash (hash uintptr ) uint8 { top := uint8 (hash >> (goarch.PtrSize*8 - 8 )) if top < minTopHash { top += minTopHash } return top }
这样一来,通过 tophash
这一个字节就可以记录桶里面槽的状态了,非常节省空间。
map 的创建的实现
我们已经了解了哈希表的基本工作机制了,现在就让我们来深入了解一下 go
里 map
的实现,先是 map
的创建,
map
的创建是通过 makemap()
函数实现的(对应我们写的代码是
make(map[int]int, 10)
),map
的创建步骤如下:
计算 map
所需内存,判断是否在一个合理范围之内。
使用 new
初始化 hmap
结构体。
生成随机哈希种子。
计算出一个最小的 B
,也就是根据用户传递给
make
的第二个参数算出一个最小的 B
的值,最终桶的数量为 2^B
个。
如果 B
大于 0
,则给哈希表的
buckets
分配内存。
最后,返回新创建好的 hmap
。
下文在寻址过程中,大量使用了指针的运算,所以如果对
unsafe.Pointer
比较熟悉的话,看起来会比较轻松,如果不熟悉也没关系,可以看看我另外一篇文章《深入理解
go unsafe》。
makemap 实现
makemap
具体源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 func makemap (t *maptype, hint int , h *hmap) *hmap { mem, overflow := math.MulUintptr(uintptr (hint), t.bucket.size) if overflow || mem > maxAlloc { hint = 0 } if h == nil { h = new (hmap) } h.hash0 = fastrand() B := uint8 (0 ) for overLoadFactor(hint, B) { B++ } h.B = B if h.B != 0 { var nextOverflow *bmap h.buckets, nextOverflow = makeBucketArray(t, h.B, nil ) if nextOverflow != nil { h.extra = new (mapextra) h.extra.nextOverflow = nextOverflow } } return h }
overLoadFactor 实现
这里面有一个比较重要的函数,那就是
overLoadFactor
,这个函数用来判断某一个数量是否超过
map
的负载因子,如果超过,那就需要扩容了:
1 2 3 4 func overLoadFactor (count int , B uint8 ) bool { return count > bucketCnt && uintptr (count) > loadFactorNum*(bucketShift(B)/loadFactorDen) }
count > bucketCnt
,前半部分判断很简单,就是判断数量一个桶能不能放得下。一个桶就能装下所有数据的话,根本就不用计算了,肯定没超过负载因子。
后半部分判断翻译过来是:桶数量 * 负载因子(6.5) < 总键值对数量
,意味着平均每个桶存储的元素个数大于
6.5
了,也就是说超过了负载因子了。
在其他函数中,判断是否超过负载因子的时候都是使用上面这个函数 。
loadFactorNum
和 loadFactorDen
是预定义的变量,它们相除就是负载因子 6.5
。
bucketShift(B)
很简单,就是 2^B
。
makeBucketArray 实现
在创建 map
的时候,使用了 makeBucketArray
来给 map
的桶分配内存,makeBucketArray
实现步骤如下:
如果判断到
B >= 4
,也就是初始化时需要分配的桶数量大于等于
2^4 = 16
的时候,则会预先分配溢出桶,分配的溢出桶个数为
2^(b-4)
个。
接着,给 map
的桶(buckets
字段)分配内存(包含了普通桶和溢出桶,普通桶和溢出桶的内存一次性分配,溢出桶的内存在普通桶后面)。
最后,判断到需要分配溢出桶的话(B >= 4
),则将溢出桶的指针写入到最后一个普通桶的
overflow
字段。
分配 buckets
内存的两种情况:
如果 B < 4
,那么分配内存的过程很简单,就是分配
buckets
所需要的内存,也就是分配普通桶所需要的内存就足够了,如下图:
makeBucketArray 1
如果
B >= 4
,那么分配内存的过程就相对复杂,会预先分配一部分溢出桶。在后面需要创建溢出桶的时候,就会先使用这时候创建的溢出桶,而不是直接新建,如下图:
makeBucketArray 2
说明:
分配 buckets
所需要的内存的时候,会分配一部分溢出桶所需要的内存,普通桶和溢出桶的内存是连续的,分配给溢出桶的内存就在普通桶的后面。
在 makemap
中会新建 mapextra
结构体,用
nextOverflow
字段来保存溢出桶的指针,指向第一个溢出桶的位置。
最后一个溢出桶的 overflow
指针(指向溢出桶的指针),指向了 buckets
入口,这里并不是说将第一个普通桶作为最后一个溢出桶的溢出桶,而是一个标记作用。因为前面的溢出桶的
overflow
字段都是 nil
,而最后一个溢出桶的
overflow
不是
nil
,这样一来,我们通过判断溢出桶的 overflow
是否为 nil
就可以知道是否是最后一个溢出桶。如果是最后一个溢出桶,那么将
map
里面的 extra.nextOverflow
字段设置为
nil
,表示预分配的溢出桶用完了,后面如果再需要溢出桶的时候,就只能直接
new
一个了。
buckets
指针下面的普通桶和溢出桶所需要的内存大小都是
t.bucketsize
,也就是 bmap
所需要的内存大小(当然是内存对齐之后的)。
具体实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 func makeBucketArray (t *maptype, b uint8 , dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) { base := bucketShift(b) nbuckets := base if b >= 4 { nbuckets += bucketShift(b - 4 ) sz := t.bucket.size * nbuckets up := roundupsize(sz) if up != sz { nbuckets = up / t.bucket.size } } if dirtyalloc == nil { buckets = newarray(t.bucket, int (nbuckets)) } else { buckets = dirtyalloc size := t.bucket.size * nbuckets if t.bucket.ptrdata != 0 { memclrHasPointers(buckets, size) } else { memclrNoHeapPointers(buckets, size) } } if base != nbuckets { nextOverflow = (*bmap)(add(buckets, base*uintptr (t.bucketsize))) last := (*bmap)(add(buckets, (nbuckets-1 )*uintptr (t.bucketsize))) last.setoverflow(t, (*bmap)(buckets)) } return buckets, nextOverflow }
map 定位 key 的实现
我们从上面的讲解中应该很清楚 map
中 bucket
这个数据结构了(上面的 bmap
那个图),在做查找、修改、删除操作的时候, 都需要先根据 key
找到具体的键值对保存在哪一个 bucket
以及
bucket
中的哪一个位置,所以这个操作其实是非常关键的。
在开始下文之前,就先来讲讲 map
中是如何定位一个
key
的。
其实定位的过程比较简单,假设现在要查找一个 key
,那定位
key
的大概步骤如下:
计算 hash
: 根据 key
计算出其哈希值
hash
。
计算 bucket
: 哈希值对 buckets
长度取模(hash % len(buckets)
),不过实际实现的时候使用了一种优化的方式,位运算(hash & (2^B - 1)
),也就是由哈希值的最低
B
位来决定 key
最终使用哪一个
bucket
(结果跟直接取模不一样,但是思想一样,都能保证得出的结果落在
len(buckets)
范围内)。
遍历 bucket
(先是普通桶)里面的每一个槽(有 8
个),比较哈希值的最高 8 位(8 bit,也就是
tophash
)是否相等,如果相等,则获取存储在
bucket
里面的 key
,跟我们需要定位的
key
做比较,如果相等,则说明已经找到了
key
,如果 key
不相等,则继续遍历下一个槽,直到
bucket
中所有的槽都被遍历完毕。
如果 bucket
里面的 8
个槽都遍历完了,仍然没有找到我们需要找的 key
。那么会从
bucket
的溢出桶去查找,溢出桶内的查找过程跟普通桶内的查找过程是一样的。
如此遍历,直到所有溢出桶都遍历完(在找不到的情况下才会遍历
bucket
(普通桶)所有的溢出桶)。
这个查找过程可以表示为下图:
key 定位
注意:确定一个 key
需要 tophash
和
key
都相等,如果 tophash
相等而
key
不相等,则需要继续比较 bucket
中其他的槽。
map 读取数据的实现
从 map
中读取某一个键的方法主要有三个:mapaccess1
、mapaccess2
、mapaccessK
,这三个方法的代码其实是大同小异的,所以这里只拿
mapaccessK
来讲解。
这三个方法的不同之处在于:
mapaccess1
只返回 key
对应的值,对应
v := map["k"]
这种写法。
mapaccess2
返回 key
对应的值,以及是否存在的标志,对应 v, ok := map["k"]
这种写法。
mapaccessK
用于遍历 map
的时候,返回
key
和 value
,对应
for k, v := range map {}
这种写法(在迭代的时候在
mapiternext
里面调用)。
mapaccessK
的查找步骤
mapaccessK
的查找步骤大概就是上一个图说的,只不过下面的描述更加详细一点(em...
有点重复了):
判断 map
是否为空,为空直接返回
计算 key
对应的哈希值 => hash
计算桶的掩码 => m
,hash & m
得到
bucket
的索引
判断是否正在扩容,如果是,需要判断 key
对应的桶的数据是否已经被迁移到新的桶里面:
如果是,则需要从新的桶里面查找。
如果还没有被迁移,则需要从旧桶中读取。
计算 tophash
=> top
(也就是
hash
的最高 8 位)
遍历找到的桶的每一个槽(slot/cell
)
比较 tophash
是否相等
如果不等,判断桶后面是否都没有数据了(b.tophash[i] == emptyRest
)
如果没有数据了,跳出循环 => 找不到 key
对应的值
如果还有数据,则继续遍历下一个槽
如果找到一个槽的 tophash
跟上面计算的
tophash
相等,则比较 key
是否相等
是,则返回对应的值(tophash
和 key
都相等,则表明找到了相应的
key
,返回对应的值 )。
否,继续遍历下一个槽(依然是先比较
tophash
,tophash
相等则再比较 key
是否相等)
溢出桶也找不到,则返回零值(及是否找到的标志)。
mapaccessK
的实现
对于整型的键值的 map
,go
里面有针对优化的实现,但其实代码逻辑上都是差不多的,所以不细说了。下面来看看
mapaccessK
的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 func mapaccessK (t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, unsafe.Pointer) { if h == nil || h.count == 0 { return nil , nil } hash := t.hasher(key, uintptr (h.hash0)) m := bucketMask(h.B) b := (*bmap)(add(h.buckets, (hash&m)*uintptr (t.bucketsize))) if c := h.oldbuckets; c != nil { if !h.sameSizeGrow() { m >>= 1 } oldb := (*bmap)(add(c, (hash&m)*uintptr (t.bucketsize))) if !evacuated(oldb) { b = oldb } } top := tophash(hash) bucketloop: for ; b != nil ; b = b.overflow(t) { for i := uintptr (0 ); i < bucketCnt; i++ { if b.tophash[i] != top { if b.tophash[i] == emptyRest { break bucketloop } continue } k := add(unsafe.Pointer(b), dataOffset+i*uintptr (t.keysize)) if t.indirectkey() { k = *((*unsafe.Pointer)(k)) } if t.key.equal(key, k) { e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr (t.keysize)+i*uintptr (t.elemsize)) if t.indirectelem() { e = *((*unsafe.Pointer)(e)) } return k, e } } } return nil , nil }
这里需要注意的是,如果在扩容的过程中查找,会先判断数据是否已经被迁移到新桶,如果还没有被迁移,则需要从旧的桶中查找。
mapaccessK
关键代码
bucket
的定位代码
1 2 3 m := bucketMask(h.B) b := (*bmap)(add(h.buckets, (hash&m)*uintptr (t.bucketsize)))
bucket
索引(hash & m
)的定位方式如下:
bucket 定位1
因为 buckets
是指向 bmap
的指针数组,所以我们可以通过 buckets
加上
bucket
的索引,就可以定位到 bucket
的内存地址。 因为每一个 bmap
的大小是
t.bucketsize
,所以 bucket
的索引乘以
t.bucketsize
,也即
(hash&m)*uintptr(t.bucketsize)
,就是
bucket
的相对偏移量。 然后 buckets
的地址加上
bucket
的相对偏移量,就可以定位到 bucket
的内存地址。
bucket 定位2
我们需要注意的是,我们计算得到 bucket
的指针后,需要将其转换为 bmap
类型的指针,才能进行后续的操作。
然后 key
和 value
也是通过类似的指针运算来定位的。需要说明的是:
add
函数是做指针算术运算的函数,具体来说就是
add(a, b)
就是 a
地址加上 b
的偏移量,返回的是 unsafe.Pointer
类型的指针。
unsafe.Pointer(b)
是 bucket
的内存地址
dataOffset
是 bucket
中第一个
key
的地址偏移量,
t.keysize
是 key
的大小
t.elemsize
是 map
值的大小
bucketCnt
是 bucket
中槽的数量(8
个,预定义的常量)。
然后大家可以结合上面的 bmap
内存布局图来理解上面指针计算的代码。
1 2 3 4 k := add(unsafe.Pointer(b), dataOffset+i*uintptr (t.keysize)) e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr (t.keysize)+i*uintptr (t.elemsize))
b.tophash[i] == emptyRest
判断的理解
emptyRest
emptyRest
是一个比较特殊的标记,它表示
bucket
中的后续槽都是空的。 在 map
删除元素的时候,会判断后面还有没有元素,如果没有元素的话,就会将
b.tophash[i]
设置为 emptyRest
。
这样在查找的时候,就可以通过 b.tophash[i] == emptyRest
来判断后面的槽都是空的,就不需要继续遍历了。
indirectkey
和 indirectelem
的理解
我们发现上面读取 key
和 value
的时候有一个比较特别的操作:
1 2 3 4 5 6 if t.indirectkey() { k = *((*unsafe.Pointer)(k)) } if t.indirectelem() { e = *((*unsafe.Pointer)(e)) }
相信不少读者第一次看到这几行代码的时候会跟我一样有点懵逼,从
maptype
的定义我们可以看出一点端倪:
1 2 3 4 5 6 func (mt *maptype) indirectkey() bool { return mt.flags&1 != 0 } func (mt *maptype) indirectelem() bool { return mt.flags&2 != 0 }
简单来说,就是 go 底层在有时候会将 key
和
value
保存为指针,而不是直接保存 key
和
value
本身。 这样一来,go 里面的 map
操作就需要根据 key
和 value
的类型来判断是否需要进行指针解引用,也就是取出实际的 key
和
value
。
map 写入和修改的设计与实现
在 go 中,map
的写入和修改都是通过
mapassign
函数来实现的,因为写入和修改本质上是同一个操作,都是找到对应的
key
,然后修改对应的值。
mapassign
的实现步骤
计算 key
的 hash
值。
如果 buckets
还没有初始化,则进行分配内存。
计算出 bucket
的索引。
如果正在扩容,则迁移当前即将要操作的
bucket
(也就是上一步计算出来的索引对应的
bucket
)。
计算 tophash
。
遍历 bucket
中的槽,记录下第一个空的槽的
tophash
索引指针、key
指针,value
指针。如果最后找不到 key
的时候,会插入到这里。
如果 bucket
的所有桶都找不到
key
,则判断是否需要扩容,需要的话就进行扩容,然后再执行 6
的操作。
另外一种情况,不需要扩容,而且 bucket
以及它的溢出桶也满了,则需要新建溢出桶来保存 key
最后,将 tophash/key/value
插入到需要
bucket
第一个空的槽。又或者如果已经存在,对
value
进行更新。
mapassign
图解
可以分两种情况:
普通桶和溢出桶都找不到 key
的情况下,将
key
插入桶中第一个空的槽
map_5_1
在 bucket
中找到了
key
,则会对其进行更新
map_5_2
对于 key
和 value
的存储,有以下两种情况:
直接保存在 bucket
中:
这样在我们需要修改 key
/value
的时候,通过
bucket
加上 索引 * keysize/valuesize
就可以得到对应键值存储的实际内存。
map_5_4
在 bucket
中保存的是 key/value
的内存地址(unsafe.Pointer
)类型
这样如果我们需要修改/读取实际的键值的时候,就需要先从
bucket
中获取这个指针,然后解引用得到实际存储键值的内存指针。
map_5_3
注意:在我们做如下运算的时候(假设 bucket
没有存储实际的
key
,而是存储了 key
的指针):
1 k := add(unsafe.Pointer(b), dataOffset+i*uintptr (t.keysize))
得到的结果是,指向 keys[i]
(也就是第 i
个
key
)的指针(unsafe.Pointer
类型)。 如果
key
保存在 bucket
中,通过这个指针我们就可以读写 key
了。
否则,表示这个指针指向的内存存储的只是一个指针,如果我们需要修改实际的
key
, 就需要通过 key
指针(A
)拿到这里存储的指针(B
),然后再通过
B
来修改实际的 key
。
map_5_5
对 value
的读写同理。
mapassign
源码剖析
扩容的操作后面会有解析,这一节就先不细说了。
这个函数的定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 func mapassign (t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer { if h == nil { panic (plainError("assignment to entry in nil map" )) } if h.flags&hashWriting != 0 { fatal("concurrent map writes" ) } hash := t.hasher(key, uintptr (h.hash0)) h.flags ^= hashWriting if h.buckets == nil { h.buckets = newobject(t.bucket) } again: bucket := hash & bucketMask(h.B) if h.growing() { growWork(t, h, bucket) } b := (*bmap)(add(h.buckets, bucket*uintptr (t.bucketsize))) top := tophash(hash) var inserti *uint8 var insertk unsafe.Pointer var elem unsafe.Pointer bucketloop: for { for i := uintptr (0 ); i < bucketCnt; i++ { if b.tophash[i] != top { if isEmpty(b.tophash[i]) && inserti == nil { inserti = &b.tophash[i] insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr (t.keysize)) elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr (t.keysize)+i*uintptr (t.elemsize)) } if b.tophash[i] == emptyRest { break bucketloop } continue } k := add(unsafe.Pointer(b), dataOffset+i*uintptr (t.keysize)) if t.indirectkey() { k = *((*unsafe.Pointer)(k)) } if !t.key.equal(key, k) { continue } if t.needkeyupdate() { typedmemmove(t.key, k, key) } elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr (t.keysize)+i*uintptr (t.elemsize)) goto done } ovf := b.overflow(t) if ovf == nil { break } b = ovf } if !h.growing() && (overLoadFactor(h.count+1 , h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) { hashGrow(t, h) goto again } if inserti == nil { newb := h.newoverflow(t, b) inserti = &newb.tophash[0 ] insertk = add(unsafe.Pointer(newb), dataOffset) elem = add(insertk, bucketCnt*uintptr (t.keysize)) } if t.indirectkey() { kmem := newobject(t.key) *(*unsafe.Pointer)(insertk) = kmem insertk = kmem } if t.indirectelem() { vmem := newobject(t.elem) *(*unsafe.Pointer)(elem) = vmem } typedmemmove(t.key, insertk, key) *inserti = top h.count++ done: if h.flags&hashWriting == 0 { fatal("concurrent map writes" ) } h.flags &^= hashWriting if t.indirectelem() { elem = *((*unsafe.Pointer)(elem)) } return elem }
有几点要注意的:
go 中 map
是不允许并发读写的,如果有,直接报错。
这里面我们看到了有扩容的操作,在 go 中,map
的扩容发生在插入、修改和删除的时候,是一种渐进式扩容的方式,每次扩容会迁移两个
bucket
,详细的后面讲到扩容的时候会细说。
在 bucketloop
这个循环中,会记录下第一个空的槽,在找不到 key
的时候会进行插入操作。
如果找到,则返回保存值的地址的指针。如果没找到,则将
key
插入到上一步找到的空的槽中,如果也没有空的槽,则会新建溢出桶来保存新插入的
key
。
在这个函数中,会判断插入之后是否超过负载因子,又或者溢出桶是否太多,来决定是否扩容。
关于 key
匹配的过程,其实跟上面的 mapaccess
是一样的过程,先找普通桶,然后查找溢出桶。
map 删除 key 的实现
对于删除操作,其实有一些操作上面已经说过了,如如何定位一个
key
。所以下面的讲述会侧重讲解跟删除操作密切相关的操作。
map 删除 key 的步骤
定位 key
所在 bucket
,计算
tophash
。
遍历 bucket
的每一个槽,比较 tophash
以及
key
,普通桶中查找不到会继续查找溢出桶。
如果查找到 key
的话,会清空对应 key
在
bucket
内存中的 tophash
、key
和
value
。
如果后面的槽没有元素了,则设置 emptyRest
标记。这样在后续查找的时候就可以避免不必要的搜索了。
map 删除过程图解
删除的过程比较简单,定位 key
的过程上面有过详细的讲解了,这里只详细画图阐述一下
emptyRest
的标记设置:
map_6_1
map 删除源码剖析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 func mapdelete (t *maptype, h *hmap, key unsafe.Pointer) { if h == nil || h.count == 0 { if t.hashMightPanic() { t.hasher(key, 0 ) } return } if h.flags&hashWriting != 0 { fatal("concurrent map writes" ) } hash := t.hasher(key, uintptr (h.hash0)) h.flags ^= hashWriting bucket := hash & bucketMask(h.B) if h.growing() { growWork(t, h, bucket) } b := (*bmap)(add(h.buckets, bucket*uintptr (t.bucketsize))) bOrig := b top := tophash(hash) search: for ; b != nil ; b = b.overflow(t) { for i := uintptr (0 ); i < bucketCnt; i++ { if b.tophash[i] != top { if b.tophash[i] == emptyRest { break search } continue } k := add(unsafe.Pointer(b), dataOffset+i*uintptr (t.keysize)) k2 := k if t.indirectkey() { k2 = *((*unsafe.Pointer)(k2)) } if !t.key.equal(key, k2) { continue } if t.indirectkey() { *(*unsafe.Pointer)(k) = nil } else if t.key.ptrdata != 0 { memclrHasPointers(k, t.key.size) } e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr (t.keysize)+i*uintptr (t.elemsize)) if t.indirectelem() { *(*unsafe.Pointer)(e) = nil } else if t.elem.ptrdata != 0 { memclrHasPointers(e, t.elem.size) } else { memclrNoHeapPointers(e, t.elem.size) } b.tophash[i] = emptyOne if i == bucketCnt-1 { if b.overflow(t) != nil && b.overflow(t).tophash[0 ] != emptyRest { goto notLast } } else { if b.tophash[i+1 ] != emptyRest { goto notLast } } for { b.tophash[i] = emptyRest if i == 0 { if b == bOrig { break } c := b for b = bOrig; b.overflow(t) != c; b = b.overflow(t) { } i = bucketCnt - 1 } else { i-- } if b.tophash[i] != emptyOne { break } } notLast: h.count-- if h.count == 0 { h.hash0 = fastrand() } break search } } if h.flags&hashWriting == 0 { fatal("concurrent map writes" ) } h.flags &^= hashWriting }
注意:
go map
不允许并发写,所以如果发现有并发读写,则抛出
fatal 错误。
如果删除的是最后一个元素,则需要往前遍历,将每一个空的槽设置为
emptyRest
状态。
如果是
indirectkey
、indirectelem
,在删除的时候,只会将
bucket
中的指针置为 nil
,对于实际的
key
和 value
不会进行处理。(无所谓,GC
会出手)。
map 的扩容实现
从上面的讲解中,我们知道,底层存储 key/value
的是
bucket
,而 bucekt
的大小是一段有一定大小的连续内存。
如果我们插入的元素过多,我们初始化时分配的 bucket
的内存就会放不下了,这个时候 go 的 map
会有两种方式解决这个问题:
使用溢出桶(在 bmap
的上再链接一个
bmap
,也就是溢出桶,普通桶放不下的时候,就放溢出桶中)
分配新的更大的空间来存放现有的这些键值对。在 go
里面新分配的内存空间将会是原来的 2 倍。
map 扩容的条件
map
的扩容发生在插入或者修改或者删除 key
的时候,扩容的条件在 mapassign
中写了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 if !h.growing() && (overLoadFactor(h.count+1 , h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) { hashGrow(t, h) } func overLoadFactor (count int , B uint8 ) bool { return count > bucketCnt && uintptr (count) > loadFactorNum*(bucketShift(B)/loadFactorDen) } func tooManyOverflowBuckets (noverflow uint16 , B uint8 ) bool { if B > 15 { B = 15 } return noverflow >= uint16 (1 )<<(B&15 ) }
hashGrow 实现
hashGrow
是被用来分配新的内存空间的,新的内存空间将被用来保存旧的
buckets
。需要注意的是,这个函数里面并没有做数据迁移的操作。
go 的 map
扩容的时候,数据迁移的方式是渐进式扩容,在我们插入/修改/删除
key
的时候会迁移 2 个
bucket
,这样可以避免性能的瞬时抖动。 我们熟知的
redis
的扩容过程也是渐进式扩容的。
下面是 hashGrow
的实现源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 func hashGrow (t *maptype, h *hmap) { bigger := uint8 (1 ) if !overLoadFactor(h.count+1 , h.B) { bigger = 0 h.flags |= sameSizeGrow } oldbuckets := h.buckets newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil ) flags := h.flags &^ (iterator | oldIterator) if h.flags&iterator != 0 { flags |= oldIterator } h.B += bigger h.flags = flags h.oldbuckets = oldbuckets h.buckets = newbuckets h.nevacuate = 0 h.noverflow = 0 if h.extra != nil && h.extra.overflow != nil { if h.extra.oldoverflow != nil { throw("oldoverflow is not nil" ) } h.extra.oldoverflow = h.extra.overflow h.extra.overflow = nil } if nextOverflow != nil { if h.extra == nil { h.extra = new (mapextra) } h.extra.nextOverflow = nextOverflow } }
数据迁移的两条线
go map
在扩容的时候,数据迁移会有两条线进行:
从第一个 bucket
开始迁移。
插入、修改、删除的时候,key
的哈希值定位到的
bucket
会被迁移。
具体如下图:
map_7_1
如果已经被迁移,则不再需要迁移。
这样一来就可以保证在一定的操作次数之后,全部 bucket
都被迁移。就算你每次插入、修改、删除都是同一个
key
(也就是同一个 bucket
),
我们第一条线的迁移都会在每次写操作的时候,迁移一个
bucket
。这样无论如何,写操作到了一定次数之后,所有的
bucket
都会被迁移了。
什么时候 bucket
迁移之后下标会改变?
当然这里说的是增量扩容,如果是等量扩容,bucket
的下标不会改变。
先说答案:hash & m
的最高位是 1 的时候。
我们知道,在定位 bucket
的时候是通过
hash & m
的方式来定位 bucket
的索引的(具体可以看上面定位 key
的那一节), 而 2
倍扩容之后,bucket
的长度是原来的 2
倍,转换为二进制的时候,就是在原来的基础上多了一位,所以
hash & m
的结果就会多一位, 而最高的那个二进制位如果是
1,说明 hash & m
的结果是大于新的 bucket
数组长度的一半的(也就是比原来的索引都要大 )。
那么会最高位是 1 会比原来的索引会大多少呢?答案是 bucket
数组的长度的一半(也就是 2^(B-1)
):
map_7_2
我们可以再看看之前的那个计算 bucket
索引的图:
map_7_3
我们会发现,当 B = 3
的时候,不管 hash
是什么,hash & m
的结果都是 0~7
。 而只有当
B = 4
的时候,hash & m
的结果才有可能落入
8~15
范围内,而且只有 hash & m
的最高位是
1 的时候才有可能。
所以结论是,当 hash & m
的最高位是 1
的时候,bucket
的下标就会改变。 而
hash % m
的其他位跟之前是一样的,所以下标增加的范围其实就是
2^(B - 1)
,也就是旧的 buckets
的个数。
bucket 迁移图解
map_7_4
说明:
oldbuckets
是迁移前的桶,buckets
是迁移后的桶(也就是当前的 h.buckets
)。
hash & m
为 0xxx
的时候,会迁移到
x
这个 bucket
中。
hash & m
为 1xxx
的时候,会迁移到
x + 2^(B-1)
这个 bucket
中(也就是
y
中),因为 1000
就是
2^(4 - 1)
。
假设需要迁移的是 oldbucket
,下标为 3
,那么
oldbucket
里面的 key
可能迁移的位置只可能是右边的 x
和 y
指向的
bucket
。 这取决于 oldbucket
里面的
key
哈希值的倒数第 4
位; 如果是
0
,那么就迁移到 x
指向的
bucket
,如果是 1
,那么就迁移到 y
指向的 bucket
。
oldbucket
中所有的 key
只会迁移到
x
或 y
中。同时 x
和
y
也只可能有 oldbucket
的
key
,不可能有其他旧的 bucket
的
key
。这是由 hash & m
可以推断出来的。
bucket 迁移源码剖析
开始之前,我们要记住 x
和 y
是怎么来的。
在开始看代码之前,我们需要明确一点:虽然整个哈希表是渐进式迁移的,但是单个
bucket
的迁移不是渐进式的。
我们先看看 growWork
函数:
这是迁移桶的函数,一次会迁移两个桶。不过实际上并不是严格的两个,因为迁移的函数会先判断桶是否已经被迁移,
如果桶还没有被迁移,才会进行迁移,如果桶已经被迁移则不做任何操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 func growWork (t *maptype, h *hmap, bucket uintptr ) { evacuate(t, h, bucket&h.oldbucketmask()) if h.growing() { evacuate(t, h, h.nevacuate) } }
然后看看 evacuate
函数:
这个就是实际做迁移操作的函数。它会根据 hash & m
的倒数第 B
位是否为 1 来决定将 key
迁移到
h.buckets
的前半部分还是后半部分。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 type evacDst struct { b *bmap i int k unsafe.Pointer e unsafe.Pointer } func evacuate (t *maptype, h *hmap, oldbucket uintptr ) { b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr (t.bucketsize))) newbit := h.noldbuckets() if !evacuated(b) { var xy [2 ]evacDst x := &xy[0 ] x.b = (*bmap)(add(h.buckets, oldbucket*uintptr (t.bucketsize))) x.k = add(unsafe.Pointer(x.b), dataOffset) x.e = add(x.k, bucketCnt*uintptr (t.keysize)) if !h.sameSizeGrow() { y := &xy[1 ] y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr (t.bucketsize))) y.k = add(unsafe.Pointer(y.b), dataOffset) y.e = add(y.k, bucketCnt*uintptr (t.keysize)) } for ; b != nil ; b = b.overflow(t) { k := add(unsafe.Pointer(b), dataOffset) e := add(k, bucketCnt*uintptr (t.keysize)) for i := 0 ; i < bucketCnt; i, k, e = i+1 , add(k, uintptr (t.keysize)), add(e, uintptr (t.elemsize)) { top := b.tophash[i] if isEmpty(top) { b.tophash[i] = evacuatedEmpty continue } if top < minTopHash { throw("bad map state" ) } k2 := k if t.indirectkey() { k2 = *((*unsafe.Pointer)(k2)) } var useY uint8 if !h.sameSizeGrow() { hash := t.hasher(k2, uintptr (h.hash0)) if h.flags&iterator != 0 && !t.reflexivekey() && !t.key.equal(k2, k2) { useY = top & 1 top = tophash(hash) } else { if hash&newbit != 0 { useY = 1 } } } if evacuatedX+1 != evacuatedY || evacuatedX^1 != evacuatedY { throw("bad evacuatedN" ) } b.tophash[i] = evacuatedX + useY dst := &xy[useY] if dst.i == bucketCnt { dst.b = h.newoverflow(t, dst.b) dst.i = 0 dst.k = add(unsafe.Pointer(dst.b), dataOffset) dst.e = add(dst.k, bucketCnt*uintptr (t.keysize)) } dst.b.tophash[dst.i&(bucketCnt-1 )] = top if t.indirectkey() { *(*unsafe.Pointer)(dst.k) = k2 } else { typedmemmove(t.key, dst.k, k) } if t.indirectelem() { *(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e) } else { typedmemmove(t.elem, dst.e, e) } dst.i++ dst.k = add(dst.k, uintptr (t.keysize)) dst.e = add(dst.e, uintptr (t.elemsize)) } } if h.flags&oldIterator == 0 && t.bucket.ptrdata != 0 { b := add(h.oldbuckets, oldbucket*uintptr (t.bucketsize)) ptr := add(b, dataOffset) n := uintptr (t.bucketsize) - dataOffset memclrHasPointers(ptr, n) } } if oldbucket == h.nevacuate { advanceEvacuationMark(h, t, newbit) } }
advanceEvacuationMark
的作用是更新
nevacuate
字段,表示已经迁移了多少个桶。
我们上面说了,哈希表扩容的时候,会有两条线,advanceEvacuationMark
就是处理顺序迁移的那条线,让 nevacuate
指向下一个未迁移的桶。
为什么需要做这个处理呢?这是因为另外一条线的迁移是随机的,访问到哪个桶就迁移哪个桶,这就导致了,顺序迁移的那条线,在将
nevacuate
指向下一个桶的时候,其实下一个桶是已经迁移了的,我们下次顺序迁移的时候肯定不需要迁移这个桶。
那么解决办法就是,继续向后查找,找到第一个未迁移的桶。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 func advanceEvacuationMark (h *hmap, t *maptype, newbit uintptr ) { h.nevacuate++ stop := h.nevacuate + 1024 if stop > newbit { stop = newbit } for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) { h.nevacuate++ } if h.nevacuate == newbit { h.oldbuckets = nil if h.extra != nil { h.extra.oldoverflow = nil } h.flags &^= sameSizeGrow } }
这里需要注意的是下面几行代码:
1 2 3 4 stop := h.nevacuate + 1024 if stop > newbit { stop = newbit }
我们在将 nevacuate
加上 1 之后,还会继续往后遍历 1024 个
bucket,如果 bucket 已迁移,则将 nevacuate
加 1。
如果没有这个操作会怎样?那就意味着可能有很多的 bucket
都已经迁移了,但是顺序迁移的位置(nevacuate
)还没有更新,
这样可能会导致顺序迁移的位置每次都指向了已迁移的 bucket
。
最终导致,迁移的时候,只迁移了访问到的
bucket
,而没有迁移顺序迁移位置上的那个
bucket
。
也就是说,每次迁移的时候只会迁移 1 个
bucket
,而不是 2 个,这样一来 growWork
需要调用的次数比原来更多,也就是说,需要写操作次数更多才能完成全部
bucket
的迁移 。
这个函数做的事情可以表示成下图:
map_7_5
在这个图中,nevacuate
一开始是 1
,然后因为
3
这个 bucket
在这次 growWork
中已经迁移了, nevacuate
如果要指向下一个未迁移的
bucket
的话,就要跳过之前已经迁移的
2
,以及本次 growWork
中已经迁移的
3
, 所以最终 nevacuate
指向了
4
,也就是下一个未迁移的 bucket
。
等量扩容的效果
在上面我们讲解的时候,其实已经假设了扩容是增量扩容(2
倍扩容),但实际上还有一种扩容方式,就是等量扩容。
等量扩容的时候,扩容前后的 bucket
其实数量是一样的,那么为什么还要进行扩容呢?
这是因为,溢出桶太多了,数据非常零散地分布在了很多的溢出桶中,这样会导致
bucket
中很多槽都是空的,
这样一来,进行查找、修改、删除的时候,需要遍历很多的溢出桶,这样会导致性能下降。如下图:
map_7_6
这个图中,我们假设要查找的 key
所在的普通桶以及前两个溢出桶都是空的,又或者 key
不在前面三个桶中,那只有遍历到最后一个溢出桶的时候才能找到我们要查找的
key
。为了针对这种键值对数量没有达到扩容的阈值,但是溢出桶太多的情况,Go
语言提供了等量扩容的方式。
在等量扩容的时候,会将所有的溢出桶都迁移到新的 bucket
中,这样一来,bucket
中的槽就会被填满,而溢出桶也可能不再需要。
最后,针对上图的情况,key = foo
会被迁移到普通桶中,这样在查找的时候,只需要遍历普通桶就可以找到了。
当然,实际中的情况是,对于零散分布在多个溢出桶中的键值对,会被逐个往前挪,最终效果就是,桶中没有空的槽,除了最后一个
key
以后的槽。
大家可以结合下图想象一下:
当然下图只是描述了一下部分 key
,如果 key
分布。实际上 key
在触发等量扩容的情况下,是零散地分布在不同的 bucket
中的(包括溢出桶)。
map_7_7
map 的迭代实现
go 的 map
迭代的时候,我们会发现,返回结果的顺序并不固定,这是因为
map
的迭代是无序的。 在 map
遍历的时候,每次都会从一个随机的 bucket
开始遍历,而且选了一个 bucket
之后, 还会从
bucket
中随机选择一个槽开始遍历,这样一来,每次遍历的结果都是不一样的。
map 迭代器数据结构
迭代器的功能:记录要遍历的 map
,以及当前遍历到的
bucket
以及槽的索引,以便进行下一次遍历。
go 的 map
迭代器的数据结构如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 type hiter struct { key unsafe.Pointer elem unsafe.Pointer t *maptype h *hmap buckets unsafe.Pointer bptr *bmap overflow *[]*bmap oldoverflow *[]*bmap startBucket uintptr offset uint8 wrapped bool B uint8 i uint8 bucket uintptr checkBucket uintptr }
几点注意的:
全部键值对遍历完的时候,key
会被置为
nil
,这样一来,我们就可以通过 key
是否为
nil
来判断是否遍历完了。(当然这个不用开发者来判断,for...range
底层已经帮我们做了这个判断)。
hiter
结构体保存了当前正在迭代的
bucket
(bptr
)、bucket
中的
key
的索引(i
)等信息,这样一来,我们就可以通过这些信息来确定下一个
key
的位置。
迭代器的初始化实现
go 的 map
初始化是通过 runtime.mapiterinit
来实现的,这个函数的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 func mapiterinit (t *maptype, h *hmap, it *hiter) { it.t = t if h == nil || h.count == 0 { return } if unsafe.Sizeof(hiter{})/goarch.PtrSize != 12 { throw("hash_iter size incorrect" ) } it.h = h it.B = h.B it.buckets = h.buckets if t.bucket.ptrdata == 0 { h.createOverflow() it.overflow = h.extra.overflow it.oldoverflow = h.extra.oldoverflow } var r uintptr if h.B > 31 -bucketCntBits { r = uintptr (fastrand64()) } else { r = uintptr (fastrand()) } it.startBucket = r & bucketMask(h.B) it.offset = uint8 (r >> h.B & (bucketCnt - 1 )) it.bucket = it.startBucket if old := h.flags; old&(iterator|oldIterator) != iterator|oldIterator { atomic.Or8(&h.flags, iterator|oldIterator) } mapiternext(it) }
这个函数主要功能是初始化迭代器,我们最需要关心的是,这个函数里面会生成一个随机数,然后通过这个随机数来决定从哪一个
bucket
开始遍历, 以及从 bucket
中的哪一个槽开始遍历(也是随机的)。
map 遍历图解
那为什么从随机定位的 bucket
以及随机定位的
key
就可以实现遍历呢?其实很简单,如果看过我之前写的 《go
chan 设计与实现》的话, 就会知道 chan
的实现中是通过数组来实现环形队列的。而我们可以借助环形队列的特性来理解
map
的遍历。遍历到最后一个 bucket
之后,
下一个 bucket
就是第一个
bucket
,这样就实现了环形遍历。同样的,遍历到最后一个
key
之后,下一个 key
就是第一个
key
,这样也实现了环形遍历。
我们需要做的就是,在遍历开始的时候,记录第一个
bucket
,然后每次遍历 bucket
的时候,
比较当前的 bucket
是否是第一个
bucket
,是的话,意味着遍历结束了。 同样的,对于
bucket
内 key
的遍历也是。
不过,在实际实现中,key
有点不一样,key
的遍历是通过将 i
从 0
遍历到
7
,对于每一个 i
,加上
it.offset
,然后对 8
取模,
这样就可以实现环形遍历了,代码如下:
1 2 3 4 for ; i < bucketCnt; i++ { offi := (i + it.offset) & (bucketCnt - 1 ) }
具体遍历过程如下图:
map_8_1
发生在扩容期间的遍历
go 的 map
设计中,是不允许在迭代的时候进行插入、修改、删除的,但只是在获取下一个键值对的时候不允许,
在迭代器获取了键值对之后,就算还没有全部遍历完 map
的所有元素,还是可以允许做插入、修改、删除操作的。
这样一来,有可能会出现一些奇怪的现象,比如插入的 key
遍历不出来,或者遍历出来的 key
有重复等等。这是需要开发者注意的地方。
另外,迭代可以发生在扩容的过程中,但是扩容其实对迭代其实是没有什么影响的。因为迭代的时候会做一些判断尽量保证所有
key
都能被遍历到。 但不能保证我们对 map
做了写操作后依然可以全部 key
都遍历。
在遍历的过程中,如果 map
发生了扩容,那么遍历的过程就会变得复杂一些。因为在扩容的过程中,map
会新建一个 bucket
, 然后将原来的 bucket
中的
key
重新散列到新的 bucket
中。所以在遍历的过程中,如果发现当前的 bucket
已经发生了扩容, 需要做一些判断,比如:
如果发现 bucket
还没有迁移,则从
oldbuckets
中遍历。
如果发现 bucket
在迁移之后索引跟原来的不一样,则跳过。
具体可以参考下图:
map_8_2
这里假设的条件是:旧桶个数为 4
,增量扩容后,新桶个数为
8
。hiter
当前迭代的是新桶。
说明:
h.oldbuckets
指向了还没迁移完的桶,h.buckets
是当前的桶。
hiter
迭代器要迭代的是新的桶。迭代器初始化的时候正在进行 2 倍扩容。
checkBucket
是下一个要遍历的桶(索引为
1
),图中的情况是,这个桶还没有被迁移。所以需要从
h.oldbuckets
中读取。
checkBucket
中的 key
有可能会迁移到
h.buckets
中的 1
或者 5
位置。(具体可以看上面桶迁移的实现那一节)
如果 key
是要被迁移到 5
中的话,那么遍历的时候会跳过,因为后面会遍历到 5
中的
key
。
对于第 5 点,在遍历 5
这个 bucket
的时候,由于我们是使用当前遍历的 bucket
的下标结合旧桶的长度计算在旧桶中的下标的,所以还是可以取得到旧桶,然后遍历的时候取出那些应该迁移到
5
这个 bucket
的
key
,对于那些应该要迁移到 1
的
key
则跳过。
下一个要遍历的桶的索引为 2
。
对于第 6 点桶索引计算的特别说明,如果是增量扩容,计算
bucket
的下标方式如下:
1 2 3 oldbucket := bucket & it.h.oldbucketmask()
当然如果这个桶已经迁移,那么还是会从新桶遍历(也就是
bucket & it.h.oldbucketmask()
里的 bucket
本身)。
键值对遍历源码剖析
map
中实现遍历的函数是
mapiternext
,这个函数做的事情,也就是上面两个图描述的遍历过程,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 func mapiternext (it *hiter) { h := it.h if h.flags&hashWriting != 0 { fatal("concurrent map iteration and map write" ) } t := it.t bucket := it.bucket b := it.bptr i := it.i checkBucket := it.checkBucket next: if b == nil { if bucket == it.startBucket && it.wrapped { it.key = nil it.elem = nil return } if h.growing() && it.B == h.B { oldbucket := bucket & it.h.oldbucketmask() b = (*bmap)(add(h.oldbuckets, oldbucket*uintptr (t.bucketsize))) if !evacuated(b) { checkBucket = bucket } else { b = (*bmap)(add(it.buckets, bucket*uintptr (t.bucketsize))) checkBucket = noCheck } } else { b = (*bmap)(add(it.buckets, bucket*uintptr (t.bucketsize))) checkBucket = noCheck } bucket++ if bucket == bucketShift(it.B) { bucket = 0 it.wrapped = true } i = 0 } for ; i < bucketCnt; i++ { offi := (i + it.offset) & (bucketCnt - 1 ) if isEmpty(b.tophash[offi]) || b.tophash[offi] == evacuatedEmpty { continue } k := add(unsafe.Pointer(b), dataOffset+uintptr (offi)*uintptr (t.keysize)) if t.indirectkey() { k = *((*unsafe.Pointer)(k)) } e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr (t.keysize)+uintptr (offi)*uintptr (t.elemsize)) if checkBucket != noCheck && !h.sameSizeGrow() { if t.reflexivekey() || t.key.equal(k, k) { hash := t.hasher(k, uintptr (h.hash0)) if hash&bucketMask(it.B) != checkBucket { continue } } else { if checkBucket>>(it.B-1 ) != uintptr (b.tophash[offi]&1 ) { continue } } } if (b.tophash[offi] != evacuatedX && b.tophash[offi] != evacuatedY) || !(t.reflexivekey() || t.key.equal(k, k)) { it.key = k if t.indirectelem() { e = *((*unsafe.Pointer)(e)) } it.elem = e } else { rk, re := mapaccessK(t, h, k) if rk == nil { continue } it.key = rk it.elem = re } it.bucket = bucket if it.bptr != b { it.bptr = b } it.i = i + 1 it.checkBucket = checkBucket return } b = b.overflow(t) i = 0 goto next }
小结
哈希表相比数组,可以很快速地查找,所以常见的编程语言都会有对应的实现。
哈希冲突有两种解决方法:开放地址法 和链表法 ,go
里的 map
使用的是链表法 。
哈希表初始化分配的是比较小的内存,只能存放少量键值对,但是随着我们插入的数据越来越多,哈希表会进行扩容,防止哈希表的读写性能下降。
go 的 map
扩容有两种方式:键值对太多的时候会进行 2
倍扩容,溢出桶太多会进行等量扩容。
map
中使用 buckets
来存储桶,一个桶里面可以存储 8
个键值对,一个桶满了的时候,会创建溢出桶来保存多出来的
key
。
map
中桶的结构体是 bmap
,它里面的会将所有
key
连续存储,所有的 value
也会连续存储。
map
定位 key
的时候,会使用哈希值与
B
的掩码做 &
运算(我们可以将其理解为一种模运算的另外一种实现),从而得到
bucket
的下标,然后遍历这个 bucket
中的每一个槽。先比较 tophash
,如果 tophash
相等再比较 key
,如果 tophash
和
key
都相等,则表明找到了我们要找的
key
。如果这两者有一个不等,继续比较下一个
key
。
如果一个 bucket
中的所有 key
被遍历完了也没有找到,那么继续从溢出桶中查找。
map
读取数据是通过
mapaccess1
、mapaccess2
、mapaccessK
实现的,对于整型键值的 map
有优化的 mapaccess
实现(对于 bmap
里键值的访问更加高效)。
map
写入和修改数据都是通过 mapassign
函数实现的,这个函数在找不到 key
的时候会进行插入操作。
map
删除数据是通过 mapdelete
函数实现的。删除的时候会需要将 bucket
末尾的所有空的槽的标记更新为 emptyRest
。
map
扩容的条件有两个:超过负载因子、溢出桶太多。只有在增量扩容的时候,key
所对应的 bucket
的下标才有可能发生变化。
map
扩容的时候,会迁移当前正在写入、删除的
bucket
,同时也会从第一个 bucket
开始迁移,一次写操作会迁移两个
bucket
。这样可以保证在一定的写操作以后,所有
bucket
都能迁移完成。
等量扩容的时候,key
在新桶中的 bucket
下标不变,但是 key
在桶内的分布会更加地紧凑,从而会提高查找效率。
map
的迭代是通过 hiter
结构体来实现的,迭代的过程中 hiter
会记录当前的
bucket
、key
,普通桶迭代完后,迭代溢出桶。map
的迭代是通过 mapiternext
函数实现的,每次获取键值对都是通过这个 mapiternext
函数。
map
迭代如果发生在增量扩容的时候,对于未迁移的
bucket
,会判断 key
的 bucket
是否会发生变化,如果 key
对应的 bucket
已经改变,则迭代的时候会跳过。