0%

go sync.Pool 设计与实现

本文基于 Go 1.19

Pool 是一组可以安全在多个 goroutine 间共享的临时对象的集合。 存储在 Pool 中的任何项目都可能在任何时候被移除,因此 Pool 不适合保存那些有状态的对象,如数据库连接、TCP 连接等。 Pool 的目的是缓存已分配但未使用的项以供以后使用,从而减少垃圾收集器的压力。 也就是说,它可以轻松构建高效、线程安全的空闲列表,但是,它并不适用于所有空闲列表。

使用实例

下面以几个实际的例子来说明 Pool 的一些使用场景。 下面是两个非常典型的使用场景,但是在实际使用中,对于那些需要频繁创建和销毁的对象的场景,我们都可以考虑使用 Pool

gin 里面 Context 对象使用 Pool 保存

ginEngine 结构体中的 ServeHTTP 方法中,可以看到 Context 对象是从 Pool 中获取的。 然后在处理完请求之后,将 Context 对象放回 Pool 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (engine *Engine) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// 从 Pool 中获取 Context
c := engine.pool.Get().(*Context)
// 重置 writermem
c.writermem.reset(w)
// 重置 Request
c.Request = req
// 重置其他字段
c.reset()

engine.handleHTTPRequest(c)

// 将 Context 对象放回 Pool
engine.pool.Put(c)
}

我们可以去看看 ginContext 对象的定义,我们会发现,它其中有很多字段。 设想一下,如果每一个请求都创建一个 Context 对象,那么每一个请求都要对 Context 进行内存分配, 分配了之后,如果请求结束了,这些申请的内存在后续就要被回收掉(当然,不是马上就回收)。

这样一来,如果待回收的 Context 对象很多,那么垃圾回收器就会被压力很大。

同样的做法在 echo 这个框架中也有出现。

fmt 里面的 pp 结构体

在我们使用 fmt 包来打印的时候(比如,调用 fmt.Fprintf),其实底层是要使用一个名为 pp 的结构体来进行打印的。 如果我们的系统中需要大量地使用 fmt 库来做格式化字符串的操作,如果每次进行格式化操作的时候都 new 一个 pp 对象, 那么也会在某个时刻导致垃圾回收器的压力很大。

所以,fmt 包中使用 pp 的时候,都是从 Pool 中获取的:

1
2
3
4
5
6
7
8
9
10
// newPrinter allocates a new pp struct or grabs a cached one.
func newPrinter() *pp {
// 从 Pool 中获取 pp 对象
p := ppFree.Get().(*pp)
p.panicking = false
p.erroring = false
p.wrapErrs = false
p.fmt.init(&p.buf)
return p
}

上面这个方法是获取 pp 对象的方法。我们在这里没有看到重置 pp 字段的代码,因为这些操作在 ppfree 方法中了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// free saves used pp structs in ppFree; avoids an allocation per invocation.
func (p *pp) free() {
if cap(p.buf) > 64<<10 {
return
}

// 重置 p.buf
p.buf = p.buf[:0]
p.arg = nil
p.value = reflect.Value{}
p.wrappedErr = nil
// 将 pp 放回 Pool 中
ppFree.Put(p)
}

我们不能对 sync.Pool 有任何假设,比如,不要想着 Put 进去的对象带了一个状态,然后 Get 出来的时候就能拿到这个状态。 因为这个对象可能在任何时候被清除掉。

从上面这个例子中,我们可以看到 p.buf = p.buf[:0] 这一行对 buf 进行了重置, 这给我我们的启示是,我们在使用 sync.Pool 的时候,如果我们存取的对象会带有一些不同的字段值, 那么我们可能需要对这些字段进行重置后再使用,这样就可以避免 Get 到的对象带有之前的一些状态, 不过重置这些字段的开销相比分配新的内存以及后续 GC,其实开销可以忽略不计。

Pool 整体模型

Pool 本质上是一个双端队列,这个队列支持以下操作:

  • pushHead:将一个对象放到队列的头部,这也是唯一的入队操作。
  • popHead:从队列的头部取出一个对象。如果取不到则使用 New 方法创建一个新的对象。
  • popTail:从队列的尾部取出一个对象。如果取不到则使用 New 方法创建一个新的对象。

可以简单地用下图表示:

pool_1

在阅读本文过程中,想不清楚的时候,回想一下这个模型,可能会有所帮助。

当然,在实际的实现中,比这个复杂多了,但是这个模型已经足够我们理解 Pool 的工作原理了。

Pool 的双端队列是使用数组还是链表

我们知道,队列的存储通常有两种方式,一种是数组,一种是链表,两者的优缺点如下:

优点 缺点
数组 存储空间连续,可以根据下标快速访问 大小固定,扩容成本高
链表 大小不固定,可以很容易增加新的元素 访问效率不如数组。需要额外的空间来存储前后元素的指针

那么 Pool 里面用的是数组还是链表呢?答案是:数组 + 链表

Pool 为什么要用数组 + 链表

为了快速访问队列中的元素,使用数组是最好的选择,但是数组的大小是固定的,如果队列中的元素很多,那么数组就会很快被填满。 如果我们还想继续往队列中添加元素,那么就需要对数组进行扩容,这个成本是很高的(因为本来就是需要频繁分配/销毁对象的场景才会使用 Pool)。 同时,我们知道 Pool 设计的目的就是为了减少频繁内存分配带来的性能问题,如果在使用 Pool 的过程中频繁对其进行扩容,那么就违背了 Pool 设计的初衷了。

为了解决数组扩容的问题,我们可以考虑一下使用链表。在我们 Put 的时候往链表的头部添加一个元素,然后 Get 的时候从链表的尾部取出一个元素(还需要移除)。 但是这样我们就需要一个结构体来表示我们的节点了,那么问题来了,我们又需要频繁地分配/销毁这个结构体,这样就又回到了最开始的问题了(频繁创建/销毁对象)。 所以,只使用链表也不是一个好的选择。

所以,Pool 采用了 数组 + 链表 的方式来实现双端队列,它们的关系如下:

  • 数组:存储队列中的元素,数组的大小是固定的。
  • 链表:当一个数组存储满了之后,就会新建一个数组,然后通过链表将这两个数组串联起来。

最终,Pool 的双端队列的结构如下:

pool_2

数组如何实现队列

我们知道,数组的存储空间是固定的一块连续的内存,所以我们可以通过下标来访问数组中的元素。 我们 push 的时候,将 head 下标 +1,然后 pop 的时候,也要修改对应的下标, 但是这样会导致一个问题是,早晚 head 会超出数组的下标范围,但这个时候数组可能还有很多空间, 因为在我们 push 的时候,可能也同时在 pop,所以数组中的空间可能还有很多。

所以,就可以在 head 超出数组下标范围的时候,将 head 对数组长度取模,这样就可以循环使用数组了:

pool_3

不过对于这种情况,使用下面的图更加直观:

pool_4

Pool 里面是使用 poolDequeue 这个结构体来表示上图这个队列的,本身是一个数组,但是是当作环形队列使用的。

poolChain 的最终模型

poolChain 就是上面说的 数组 + 链表 的组合,它的最终模型如下:

pool_5

说明:

  • poolChain 本身是一个双向链表,每个节点都是一个 poolDequeue,每个节点都有 prevnext 指针指向前后节点。上图的 head 是头节点,tail 是尾节点。
  • poolDequeue 是一个数组,它的大小是固定的,但是它是当作环形队列使用的。
  • tail 初始化的时候长度为 8,具体可参考 poolChainpushHead 方法,后面会说到。
  • pushHead 的时候,如果 head 中的环形队列已经满了,那么就会新建一个 poolDequeue,然后将它插入到 head 的前面。(新的 head 节点的大小为前一个 head 节点的两倍)
  • popTailpopHead 的时候,如果 tail 或者 head 中的环形队列(poolDequeue)已经空了,那么就会将它从链表中移除。

多个 P 的情况下的 poolChain

这里假设 P 跟我们机器的逻辑处理器的数量一致。(这里涉及到了 goroutine 的调度机制,不了解可以先了解一下再回来看。)

我们知道,在 go 中,每一个 goroutine 都会绑定一个 P,这样才可以充分利用多核的优势。 设想一下,如果我们有多个 goroutine 同时存储一个 Pool,会出现什么情况呢? 会导致很激烈的数据竞争,虽然没有使用 Mutex 这种相对低效的互斥锁来解决竞争问题,使用的是原子操作,但是也会导致性能下降。

所以,在 Pool 的实现中,会为每一个 P 都创建一个 poolChain,每次存取,先操作本地 P 绑定的 poolChain,这样就可以减少多个 goroutine 同时操作一个 Pool 的竞争问题了。

所以,最终的 Pool 的模型会长成下面这个样子,每个 G 关联了一个 poolChain

pool_6

注意:这里不是 Pool 实际的样子,只是为了说明 Pool 的实现原理。

最终实现中的 Pool

在实际的实现中,其实会跟上一个图有一些差异,可以说复杂很多:

pool_7

从上图可以看出,其实每个 P 关联的并不是 poolChain,而是 poolLocalpoolLocal 里面包含了一个 poolLocalInternal 和一个 padpad 是为了避免伪共享而添加的。而 poolLocalInternal 就是实际上 Pool 中存储数据的一个结构体。poolLocalInternal 中包含了两个字段,privatesharedshared 就是我们上面说的 poolChain,而 private 是一个 any 类型的字段,在我们调用 PoolPut 方法的时候,会先尝试将数据存储到 private 中,如果 private 中已经有数据了,那么就会将数据存储到 shared 中。同样的,在 Get 的时候,也会先从 private 中获取数据,如果 private 中没有数据,那么就会从 shared 中获取数据。

而相应的,Pool 存取数据会变成:

  • pushHead:我们调用 PoolPut 方法的时候,只会写入到本地 P 关联的那个 poolLocal 中。
  • popHead:这个方法也只能从本地 P 关联的 poolLocal 中取数据。
  • popTail:这个方法会从本地 P 关联的 poolLocal 中取数据,如果取不到,那么就会从其他 P 关联的 poolLocal 中取数据。

这样就可以减少多个 goroutine 同时操作一个 Pool 的竞争问题了(但是无法避免)。

Pool 模型总结

最后,我们将这一节的内容总结一下,可以得到下面这个图:

pool_8

说明:

  • go 进程内会有多个 P,每个 P 都会关联一个 poolLocal
  • poolLocal 中包含了一个 poolLocalInternal 和一个 padpoolLocalInternal 中包含了两个字段,privateshared
  • private 是一个 any 类型的字段,用来存储我们调用 PoolPut 方法的时候传入的数据,Get 的时候如果 private 中有数据,那么就会直接返回 private 中的数据。
  • shared 是一个 poolChain,用来存储我们调用 PoolPut 方法的时候传入的数据,Get 的时候如果当前 P 绑定的 poolLocal 内是空的,那么可以从其他 P 绑定的 shared 的尾部获取。
  • poolChain 是一个双向链表,每个节点都是一个 poolDequeue,每个节点都有 prevnext 指针指向前后节点。上图的 head 是头节点,tail 是尾节点。
  • poolDequeue 是一个数组,它的大小是固定的,但是它是当作环形队列使用的。
  • pushHead 的时候,如果 poolChain 的节点满了,那么会新建一个节点,其容量为前一个节点的两倍。
  • poolChain 支持三种操作:pushHeadpopHeadpopTail
  • pushHead 会将数据存储到当前 P 关联的 poolChain 的头部。

Pool 中的结构体

在开始分析源码之前,我们先来看一下 Pool 的 UML 图:

pool_9

上图中已经包含 Pool 实现的所有关键结构体了,下面我们来分析一下这些结构体的作用。

sync.Pool 结构体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type Pool struct {
// noCopy 用于防止 Pool 被复制(可以使用 go vet 检测)
noCopy noCopy

// local 的主要作用是,多个 goroutine 同时访问 Pool 时,可以减少竞争,提升性能。
// 实际类型是 [P]poolLocal。长度是 localSize。
local unsafe.Pointer
// []poolLocal 的长度。也就是 local 切片的长度
localSize uintptr

// 存放的是上一轮 GC 时的 local 字段的值。
victim unsafe.Pointer
// victim 数组的长度
victimSize uintptr

// 新建对象的方法。
// Get 的时候如果 Pool 中没有对象可用,会调用这个方法来新建一个对象。
New func() any
}

字段说明:

  • local:就是我们上文说到的 poolLocal 类型的切片,长度是 runtime.GOMAXPROCS(0),也就是当前 P 的数量。之所以使用切片类型是因为我们可以在运行的过程中调整 P 的数量,所以它的长度并不是固定的,如果 P 的数量变了,poolLocal 也会跟着改变。
  • localSizelocal 的长度。
  • victim:上一轮 GC 时的 local 字段的值。
  • victimSizevictim 的长度。
  • New:新建对象的方法。

victim 的作用是在 GC 的时候,将 local 的值赋值给 victim,然后将 local 置为 nil,这样就可以避免在 GC 的时候,local 中的对象被回收掉。 当然,并不是完全不会回收,再经历一次 GC 之后,victim 中的对象就会被回收掉。这样做的好处是,可以避免 GC 的时候清除 Pool 中的所有对象, 这样在 GC 之后如果需要大量地从 Pool 中获取对象也不至于产生瞬时的性能抖动

victim cache 是计算机科学中的一个术语,victim cache 是位于 cpu cache 和主存之间的又一级 cache,用于存放由于失效而被丢弃(替换)的那些块。 每当失效发生时,在访问主存之前,victim cache 都会被检查,如果命中,就不会访问主存。

Pool 获取对象的流程

最终,当我们调用 PoolGet 方法的时候,会按下图的流程来获取对象:

pool_10

说明:

  1. 首先会从当前 P 关联的 poolLocal 中的 private 字段中获取对象,如果获取到了,那么直接返回。
  2. 如果 private 字段中没有对象,那么会从当前 P 关联的 poolLocal 中的 shared 字段中获取对象,如果获取到了,那么直接返回(这里使用的是 popHead 方法)。
  3. 尝试从其他 P 关联的 poolLocal 中的 shared 字段中获取对象,如果获取到了,那么直接返回(这里使用的是 popTail 方法)。
  4. 如果其他 P 关联的 poolLocal 中的 shared 字段中也没有对象,那么会从 victim 中获取对象,如果获取到了,那么直接返回。
  5. 如果 victim 中也没有对象,那么会调用 New 方法来创建一个新的对象(当然前提是我们创建 Pool 对象的时候设置了 New 字段)。
  6. 如果 New 字段也没有设置,那么会返回 nil

poolLocal 和 poolLocalInternal 结构体:

Pool 中,使用了 poolLocalpoolLocalInternal 两个结构体来表示实际存储数据的结构体。 当然我们也可以只使用 poolLocalInternal 这个结构体,但是为了避免伪共享,在 Pool 的实现中, 将 poolLocalInternal 放在了 poolLocal 的第一个字段,然后在 poolLocal 中添加了一个 pad 字段,用来填充 poolLocalInternal 到 cache line 的大小。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 实际存储 Pool 的数据的结构体
type poolLocalInternal struct {
// private 用于存储 Pool 的 Put 方法传入的数据。
// Get 的时候如果 private 不为空,那么直接返回 private 中的数据。
// 只能被当前 P 使用。
private any
// 本地 P 可以pushHead/popHead
// 任何 P 都可以 popTail。
shared poolChain
}

// Pool 中的 local 属性的元素类型。
// Pool 中的 local 是一个元素类型为 poolLocal 的切片,长度为 runtime.GOMAXPROCS(0)。
type poolLocal struct {
poolLocalInternal

// pad 用于填充 poolLocalInternal 到 cache line 的大小。
// 为了避免伪共享,将 poolLocalInternal 放在第一个字段。
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

其实这里比较关键的地方是 pad 字段的作用,我们知道,CPU 会将内存分成多个 cache line,CPU 从内存中读取数据的时候, 并不是一个字节一个字节地读取,而是一次性读取一个 cache line 的数据。但是如果我们的数据结构中的字段不是按照 cache line 的大小来排列的, 比如跨了两个 cache line,那么在读取的时候就会产生伪共享,这样就会降低性能。

pool_11

伪共享的原因是,数据跨了两个 cache line,那么在读取的时候,就会将两个 cache line 都读取到 CPU 的 cache 中, 这样有可能会导致不同 CPU 在发生数据竞争的时候,会使一些不相关的数据也会失效,从而导致性能下降。 如果对齐到 cache line,那么从内存读取数据的时候,就不会将一些不相关的数据也读取到 CPU 的 cache 中,从而避免了伪共享。

poolChain、poolChainElt 和 poolDequeue 结构体

为什么要把这三个放一起讲呢?因为这三个结构体就是 Pool 中做实际存取数据的结构体,三者作用如下:

  • poolChainpoolChain 是一个链表,每个节点都是 poolChainElt
  • poolChainElt:每个 poolChainElt 中包含了一个 poolDequeue,同时包含了指向前一个节点和后一个节点的指针。
  • poolDequeuepoolDequeue 是一个双端队列(环形队列,使用数组存储),用来存储 Pool 中的数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// poolChain 是 poolDequeue 的动态大小版本。
//
// 这是作为 poolDequeues 的双向链表队列实现的,其中每个 dequeue 都是前一个 dequeue 大小的两倍。
// 一旦 dequeue 填满,就会分配一个新的 dequeue,并且 pushHead 只会 push 到最新的 dequeue。
// pop 可以从头部或者尾部进行,一旦 dequeue 空了,它就会从 poolChain 中删除。
//
// poolChain 的实现是一个双向链表,每个 poolChainElt 都是一个 poolDequeue。
//(也就是元素是 poolDequeue 的双向链表)
type poolChain struct {
// head 是要推送到的 poolDequeue。
// 只能由生产者访问,因此不需要同步。
head *poolChainElt

// tail 是从 poolDequeue 中 pop 的节点。
// 这是由消费者访问的,因此读取和写入必须是原子的。
tail *poolChainElt
}

// poolChainElt 是 poolChain 中的元素。
type poolChainElt struct {
poolDequeue

// next 和 prev 链接到此 poolChain 中相邻的 poolChainElts。
//
// next 由生产者原子写入,由消费者原子读取。 它只从 nil 过渡到 non-nil。
// prev 由消费者原子写入,由生产者原子读取。 它只会从 non-nil 过渡到 nil。
next, prev *poolChainElt
}

// poolDequeue 是一个无锁的固定大小的单生产者、多消费者队列。
// 单个生产者既可以从头部 push 也可以从头部 pop,消费者只可以从尾部 pop。
//
// 它有一个附加功能,它会清空不使用的槽,从而避免不必要的对象保留。
type poolDequeue struct {
// headTail 将 32 位头索引和 32 位尾索引打包在一起。
// 两者都是 vals modulo len(vals)-1 的索引。
//
// tail = 队列尾的索引
// head = 下一个要填充的插槽的索引
//
// [tail, head) 范围内的槽位归消费者所有。
// 消费者继续拥有此范围之外的槽,直到它清空槽,此时所有权传递给生产者。
//
// 头索引存储在最高有效位中,以便我们可以原子地对它做 add 操作,同时溢出是无害的。
headTail uint64

// vals 是存储在此 dequeue 中的 interface{} 值的环形缓冲区。它的大小必须是 2 的幂。
// (队列存储的元素,接口类型)
//
// 如果插槽为空,则 vals[i].typ 为 nil,否则为非 nil。
// 一个插槽仍在使用中,直到 *both* 尾部索引超出它并且 typ 已设置为 nil。(both?)
// 这由消费者原子地设置为 nil,并由生产者原子地读取。
vals []eface
}

poolChain 的结构大概如下:

pool_12

poolDequeue 的结构大概如下:

pool_dequeue

poolDequeue 中,headTail 是一个 uint64 类型,它的高 32 位是 head,低 32 位是 tail。 这样一来,这样就可以使用原子操作来保证 headtail 的协程安全了。

poolDequeue 中,vals 是一个切片类型,元素类型是 efaceeface 是一个空接口类型,内存布局跟 interface{} 一样, 因此可以看作是一个 interface{} 类型。如果这里看不明白可以看看《go interface 设计与实现》

那为什么不直接使用 interface{} 类型呢?因为如果用了 interface{} 类型, 那么 poolDequeue 就无法区分保存的 valnil 还是一个空的槽。 那有什么解决办法呢?在 pushpop 的时候使用互斥锁可以解决这个问题(因为目前的实现是使用原子操作的,所以这才需要判断保存的 valnil 还是空槽), 但是这样就会导致性能下降。

Pool 源码剖析

在源码剖析的开始部分,不会深入去讲底层的存取细节,我们将其当作一个抽象的队列来看待即可,这样可能会更加便于理解。不过在讲完 Pool 的实现之后,最后还是会展开讲述这个复杂的 "队列" 的那些实现细节。

接下来,我们来看看 Pool 的源码实现。 Pool 提供的接口非常简单,只有 PutGet 两个方法,还有一个 New 字段,用来指定 Pool 中的元素是如何创建的:

  • New 属性:New 是一个函数,用来创建 Pool 中的元素。
  • Put 方法:Put 方法用来向 Pool 中放入一个元素。
  • Get 方法:Get 方法用来从 Pool 中获取一个元素。

Get

Get 方法的实现如下:

GetPool 中选择一个任意项,将其从 Pool 中移除,并将其返回给调用者。 Get 可能会选择忽略池并将其视为空的。 调用方不应假定传递给 Put 的值与 Get 返回的值之间存在任何关系。 (PutGet 之间可能会发生 GC,然后 Pool 里面的元素可能会被 GC 回收掉)

如果 Get 否则返回 nil 并且 p.New 不为 nil,则 Get 返回调用 p.New 的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Get 从 Pool 中获取一个对象
func (p *Pool) Get() any {
// ...
// pin 将当前的 goroutine 和 P 绑定,禁止被抢占,返回当前 P 的本地缓存(poolLocal)和 P 的 ID。
l, pid := p.pin()
// 先看 private 是否为 nil,如果不为 nil,就直接返回 private,并将 private 置为 nil。
x := l.private
l.private = nil
if x == nil {
// 尝试从本地 shared 的头部取。
x, _ = l.shared.popHead()
if x == nil { // 如果本地 shared 的头部取不到,就从其他 P 的 shared 的尾部取。
x = p.getSlow(pid)
}
}
// 将当前的 goroutine 和 P 解绑,允许被抢占。
runtime_procUnpin()
// ...
// 如果 x 为 nil 并且 p.New 不为 nil,则返回 p.New() 的结果。
// 没有就 New 一个。
if x == nil && p.New != nil {
x = p.New()
}
return x
}

Pool Get 的流程可以总结如下:

  1. 将当前的 goroutineP 绑定,禁止被抢占,返回当前 P 的本地缓存(poolLocal)和 PID
  2. 从本地 private 取,如果取不到,就从本地 shared 的头部取,如果取不到,就从其他 Pshared 的尾部取。获取到则返回
  3. 如果从其他的 Pshared 的尾部也获取不到,则从 victim 获取。获取到则返回
  4. 将当前的 goroutineP 解绑,允许被抢占。
  5. 如果 p.New 不为 nil,则返回 p.New 的结果。

再贴一下上面那个图(当然,下图包含了下面的 getSlow 的流程,并不只是 Get):

pool_10

Pool 中一个很关键的操作是 pin,它的作用是将当前的 goroutineP 绑定,禁止被抢占。 这样就可以保证在 GetPut 的时候,都可以获取到当前 P 的本地缓存(poolLocal), 否则,有可能在 GetPut 的时候,P 会被抢占,导致获取到的 poolLocal 不一致,这样 poolLocal 就会失去意义, 不得不再次陷入跟其他 goroutine 竞争的状态,又不得不考虑在如何在不同 goroutine 之间进行同步了。

而绑定了 P 后,在 GetPut 的时候,就可以使用原子操作来代替其他更大粒度的锁了, 但是我们也不必太担心,因为绑定 P 的时间窗口其实很小。

getSlow 源码剖析

Get 中,如果从 privateshared 中都取不到,就会调用 getSlow 方法。它的作用是:

  1. 尝试从其他 Pshared 的尾部取。
  2. 尝试从 victim 获取。

getSlow 的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 从其他 P 的 shared 的尾部取。
func (p *Pool) getSlow(pid int) any {
// 获取 local 的大小和 local。
size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// 尝试从其他 P 的 shared 的尾部取。
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size)) // pid+i+1 的用途从下一个 P 开始取。
if x, _ := l.shared.popTail(); x != nil { // 尝试从每一个 P 的 shared 的尾部取,获取到则返回。
return x
}
}

// 尝试从 victim cache 取。
// 我们在尝试从所有主缓存中偷取之后执行此操作,
// 因为我们希望 victim cache 中的对象尽可能地老化。
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size { // 如果 pid 大于 size,会发生越界,直接返回 nil。这意味着 gomaxprocs 相比上一次 poolCleanup 的时候变大了。
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil { // victim 实际上也是一个 poolLocal 数组,每个 poolLocal 都有一个 private 字段,这个字段就是 victim cache。
l.private = nil
return x
}
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}

// 将 victim cache 标记为空,以便将来的 Get 不会再考虑它。
atomic.StoreUintptr(&p.victimSize, 0)

return nil
}

pin 源码剖析

pin 方法的实现如下:

pin 将当前 goroutine 固定到 P 上,禁用抢占并返回 poolLocal 池中对应的 poolLocal。 调用方必须在完成取值后调用 runtime_procUnpin() 来取消抢占。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 将当前的 goroutine 和 P 绑定,禁止被抢。
func (p *Pool) pin() (*poolLocal, int) {
// procPin 函数的目的是为了当前 G 绑定到 P 上。
pid := runtime_procPin() // 返回当前 P 的 id。

// 在 pinSlow 中,我们会存储 local,然后再存储 localSize,
// 这里我们以相反的顺序读取。 由于我们禁用了抢占,
// 因此 GC 不能在两者之间发生。
s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
l := p.local // load-consume
if uintptr(pid) < s { // pid < s,说明当前 P 已经初始化过了。
return indexLocal(l, pid), pid // 返回当前 P 的 poolLocal。
}
return p.pinSlow() // 如果当前 P 没有初始化过,那么就调用 pinSlow()。
}

func (p *Pool) pinSlow() (*poolLocal, int) {
// 在互斥锁下重试。
// 在固定时无法锁定互斥锁。
runtime_procUnpin() // 解除当前 P 的绑定。
allPoolsMu.Lock() // 加全局锁。
defer allPoolsMu.Unlock() // 解锁。
pid := runtime_procPin() // 重新绑定当前 P。
// 在固定时不会调用 poolCleanup。(无法被抢占,GC 不会发生)
s := p.localSize
l := p.local
if uintptr(pid) < s { // 这其实是一个 double-checking,如果在加锁期间,其他 goroutine 已经初始化过了,就直接返回。
return indexLocal(l, pid), pid
}

// p.local == nil 说明 pool 还没有初始化过。
if p.local == nil { // 如果当前 P 没有初始化过,那么就将当前 P 添加到 allPools 中。
allPools = append(allPools, p)
}

// 当 local 数组为空,或者和当前的 runtime.GOMAXPROCS 不一致时,
// 将触发重新创建 local 数组,以和 P 的个数保持一致。
// 如果在 GC 之间更改了 GOMAXPROCS,我们将重新分配数组并丢弃旧数组。
size := runtime.GOMAXPROCS(0) // 获取当前 GOMAXPROCS(也就是 P 的个数)
local := make([]poolLocal, size) // 创建一个 poolLocal 数组
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release // 将当前 P 的 poolLocal 添加到 p.local 中
runtime_StoreReluintptr(&p.localSize, uintptr(size)) // store-release // 将当前 P 的 poolLocal 添加到 p.localSize 中
return &local[pid], pid // 返回当前 P 关联的 poolLocal,以及当前 P 的 id。
}

pinSlow 中比较关键的操作是,它会初始化当前 P 关联的 poolLocal,并将其添加到 allPools 中。 关于 allPools 的作用,可以看下一小节。

pinSlow 的流程: 1. 解除当前 P 的绑定。 2. 加全局 Pool 的锁。 3. 重新绑定当前 P。 4. 如果当前 Pid 小于 localSize,那么就返回当前 PpoolLocal。(典型的 double-checking) 5. 如果 local 还没初始化,那么将当前 PpoolLocal 添加到 allPools 中。 6. 初始化 local。最后返回当前 PpoolLocal

对于 local 的初始化,我们可以参考一下下图(我们需要知道的是,切片的底层结构体的第一个字段是一个数组):

pool_13

&local[0][]poolLocal 的首地址,unsafe.Pointer(&local[0]) 就是 poolLocal 数组的首地址。

indexLocal 源码剖析

我们在上面的代码中还可以看到一个 indexLocal 函数,它的作用是返回 poolLocal 数组中的第 i 个元素。

1
2
3
4
5
6
// l 指向了 poolLocal 数组的首地址,i 是数组的索引。
// 返回了数组中第 i 个元素,其类型是 poolLocal。
func indexLocal(l unsafe.Pointer, i int) *poolLocal {
lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
return (*poolLocal)(lp)
}

我们之前在看 Pool 结构体的时候,看到过 local 字段,它的类型是 unsafe.Pointer,也就是一个指针。 然后我们再结合一下 indexLocal 的实现,就可以知道 local 字段指向的是一个 poolLocal 数组了。 其中 lpoolLocal 数组的首地址,i 是数组的索引,unsafe.Sizeof(poolLocal{})poolLocal 的大小。

这一小节没看懂可以参考一下我写的另外一篇文章 《深入理解 go unsafe》

allPools 的作用

allPools 的目的是在 GC 的时候,遍历所有的 Pool 对象,将其中的 victim 替换为 local,然后将 local 设置为 nil。 这样后续的 Get 操作在 local 获取不到的时候,可以从 victim 中获取。一定程度上减少了 GC 后的性能抖动。

Pool 中,还定义了下面几个全局变量:

1
2
3
4
5
6
7
8
9
10
11
var (
// 保护 allPools 和 oldPools 的互斥锁。
allPoolsMu Mutex

// allPools 是所有非空 primary cache 的 pool 的集合。
// 该集合由 allPoolsMu 和 pinning 保护,或者 STW 保护。
allPools []*Pool

// oldPools 是所有可能非空 victim cache 的 pool 的集合。
oldPools []*Pool
)

如果我们第一次看这些变量,可能会有点懵,不知道它们的作用是什么。 我们可以再结合一下 poolCleanup 函数的实现,就可以知道它们的作用了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func poolCleanup() {
// 在垃圾收集开始时,这个函数会被调用。
// 它不能分配并且可能不应该调用任何运行时函数。

// 从 allPools 中删除 victim 缓存。
for _, p := range oldPools {
p.victim = nil // 作用是让 GC 可以回收 victim 缓存中的对象。
p.victimSize = 0
}

// 将 primary 缓存移动到 victim 缓存。
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}

// 具有非空主缓存的池现在具有非空 victim 缓存,并且没有池具有 primary 缓存。
oldPools, allPools = allPools, nil
}

poolCleanup 这个函数会在 GC 开始的时候被调用,它的作用是将 local 移动到 victim 中。 同时,将 victim 置为 nil,这样 GC 就可以回收 victim 中的对象了,也就是说要经历两轮 GC local 才会真正地被回收。 也就意味着,GC 的时候,local 其实并没有被回收,而是被移动到了 victim 中。

poolCleanup 可以用下图表示,实际上就是使用 locallocalSize 覆盖 victimvictimSize

pool_14

Put

Put 的实现比较简单,就是将对象放到 local 中,不需要 Get 那种操作其他 P 绑定的 poolLocal 的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Put 将 x 添加到 Pool 中。
func (p *Pool) Put(x any) {
if x == nil {
return
}
// ...
// 获取 poolLocal
l, _ := p.pin() // 将当前 goroutine 与 P 绑定。获取当前 P 关联的 poolLocal,以及当前 P 的 id。
if l.private == nil { // 优先放入 private
l.private = x
} else { // 如果 private 已经有值了,就放入 shared
l.shared.pushHead(x) // 这部分其他 P 也是可以访问的。
}
runtime_procUnpin() // 解除当前 P 的绑定。
// ...
}

Pool Put 的流程: 1. 如果 Put 的值是 nil,则直接返回。 2. 将当前的 goroutineP 绑定,禁止被抢占,返回当前 P 的本地缓存(poolLocal)和 PID。 3. 如果本地 private 为空,则将 x 放入本地 private。 4. 如果本地 private 不为空,则将 x 放入本地 shared 的头部。 5. 将当前的 goroutineP 解绑,允许被抢占。

这里面的 pinruntime_procUnpin,我们在前文已经介绍过了,这里就不再赘述了。

New

这里说的 Newsync.Pool 中的 New 字段,在我们尝试了所有方法都获取不到对象的时候, 会判断 PoolNew 属性是否为 nil,如果不为 nil,则会调用 New 方法,创建一个新的对象。

poolChain 和 poolDequeue 源码剖析

poolChain 是一个双向链表,它的每一个节点的元素是 poolDequeue

在上文中,对于 GetPut 的细节,还没有具体展开,因为不了解这些细节也不影响我们理解 Pool 的整体流程。 现在是时候来看看 poolChainpoolDequeue 这两个结构体的实现了,会结合起来一起看。

poolChainpoolDequeue 里面都提供了以下三个方法:

  • pushHead:将对象放到队列的头部。
  • popHead:从队列的头部取出一个对象。
  • popTail:从队列的尾部取出一个对象。

不一样的是,poolChain 里面的方法会处理链表节点的创建和销毁,而 poolDequeue 里面的方法才是实际从队列存取对象的方法。

pushHead

poolChainpushHead 方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 添加一个元素到队列头部
func (c *poolChain) pushHead(val any) {
// 链表头
d := c.head
if d == nil { // 链表为空
// 初始化链表。
// 新建 poolChainElt,然后 c 的 head 和 tail 都指向这个新建的元素。
const initSize = 8 // 初始化大小为 8
// 新建一个节点,类型为 poolChainElt
d = new(poolChainElt)
d.vals = make([]eface, initSize)
// 将 c 的 head 和 tail 都指向这个新建的元素
c.head = d
// 使用原子操作保存 c.tail,因为其他 goroutine 也可能会修改 c.tail。
storePoolChainElt(&c.tail, d)
}

// poolQueue 还没满的时候可以成功 push,pushHead 会返回 true。
// poolQueue 满的时候 pushHead 返回 false。
if d.pushHead(val) {
return
}

// 当前 dequeue 已满。分配一个两倍大小的新 dequeue。
newSize := len(d.vals) * 2
if newSize >= dequeueLimit { // 限制单个 dequeue 的最大大小
newSize = dequeueLimit
}

// 新建 poolChainElt,然后 c 的 head 指向这个新建的元素。
// 同时,d 的 next 指向这个新建的元素。
d2 := &poolChainElt{prev: d} // 因为是加到队列头,所以 prev 指向 d
d2.vals = make([]eface, newSize)
c.head = d2
storePoolChainElt(&d.next, d2)
d2.pushHead(val)
}

poolChainpushHead 方法的流程:

  1. 如果链表为空,那么就初始化链表。
  2. 如果链表不为空,那么就尝试 pushHead
  3. 如果 pushHead 失败,那么就分配一个两倍大小的新 dequeue
  4. 然后把新 dequeue 放到链表头部。(push 的时候已经锁定了 goroutineP 上,所以这一步是没有并发问题的)

poolChainpushHead 方法中,唯一需要特别注意的是 storePoolChainElt(&c.tail, d) 这一行代码。 这里使用了 storePoolChainElt 方法,而不是直接使用 c.tail = d。 这是因为 c.tail 是会和其他 goroutine 存在竞争的(其他 goroutine 获取对象的时候可能会修改 tail),因此不能直接赋值,而是使用了原子操作。

poolChainpushHead 方法的流程图如下:

pool_15

队列头来说,prev 实际上指向的是 head 的下一个元素,但是又不能叫 next,因为 next 被用来表示 tail 的下一个元素,所以就叫了 prev。我们需要知道 prevnext 本质上都是指向了下一个元素,就看你是从队列头还是队列尾来查找。

poolChain 中,其实实际存储对象的时候并不是在 poolChain,而是在 poolChain 的每一个节点中的 poolDequeue 中。 所以我们再来看看 poolDequeue 中的 pushHead 实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// pushHead 在队列的头部添加 val。
// 如果队列已满,则返回 false。
// 它只能由单个生产者调用。(也就是当前 goroutine 绑定的 P)
//
// 注意:head 指向的是下一个要插入的元素的位置,所以插入的时候,先将 head 指向的位置设置为 val,然后 head++。
func (d *poolDequeue) pushHead(val any) bool {
// 读取 head 和 tail 的值。
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head { // 队列满了
// 不能直接 tail == head,因为初始化的时候,head 和 tail 都是 0
return false
}
// 取模意味着,当 head 超过 len(d.vals) 时,会从头开始。也就是一个环。
slot := &d.vals[head&uint32(len(d.vals)-1)] // 获取 head 对应的槽位。

// push 只有当前协程能 push,所以不需要加锁。
// 但是 popTail 可能会在另一个协程中执行,所以需要判断当前的槽位是否被 popTail 释放了。
// 因为 popTail 的操作是先 cas 修改 headTail,然后再获取 slot 的值,最后才将 slot 置 0 的。
// 如果修改了 headTail 之后还没有来得及将 slot 置 0,那么这里就会判断出槽位还没有被释放。
typ := atomic.LoadPointer(&slot.typ) // 获取槽位的类型
if typ != nil { // 槽位不为空
// 队列依然是满的
return false
}

// 如果 typ 已经是 nil,那么这里后续的操作是安全的。

if val == nil { // put 进来的值是 nil,使用 dequeueNil 代替
val = dequeueNil(nil)
}
// 将 val 赋值给槽位
*(*any)(unsafe.Pointer(slot)) = val

// 增加 head。为什么是 1<<dequeueBits 呢?
// 因为 head 是高 32 位,所以要左移 32 位
// 本质上是:head = head + 1
atomic.AddUint64(&d.headTail, 1<<dequeueBits)
return true
}

poolDequeuepushHead 方法的流程:

pushHead 的处理流程: 1. 判断队列是否已满,如果已满则返回 false。 2. 获取下一个 push 的位置,先判断这个位置是否可用,如果不可用则返回 false。(可能和 popTail 冲突,如果 popTail 没来得及将其中的值取出来,那么这个槽就还不能使用) 3. 如果可用,则将值放入这个位置,然后将 head 指针加 1

poolDequeue 中,我们看到有一行代码比较奇怪:atomic.LoadUint64(&d.headTail),这是为了可以原子操作存取 headtail 两个值。这样就可以避免锁的使用了。

popHead

poolChainpopHead 方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// popHead 会从链表头部 pop 出一个元素。
// 返回值:
// 1. any:pop 出的元素。
// 2. bool:是否成功 pop 出元素。
func (c *poolChain) popHead() (any, bool) {
d := c.head
// d == nil 的情况:
// 1. 链表为空。
// 2. 链表只有一个元素,且这个元素已经 pop 完了。(被其他协程 pop 了)
for d != nil {
// 这是因为,在我们拿到 d 之后,还没来得及 pop 的话,其他协程可能已经 pop 了。
// 所以需要 for 循环。典型的无锁编程。
//
// poolQueue 还没空的时候可以成功 pop,popHead 会返回 true。
if val, ok := d.popHead(); ok {
return val, ok
}

// 之前的 dequeue 中可能仍有未消费的元素,因此尝试后退。
d = loadPoolChainElt(&d.prev) // 获取下一个 poolChainElt
}
return nil, false
}

prev 虽然从命名上看是前一个,但是实际上是下一个节点。从 head 开始遍历的话,prev 就是下一个节点,从 tail 开始遍历的话,next 就是下一个节点。

poolChainpopHead 的处理流程:

  1. 如果链表为空,那么就返回 false
  2. 如果链表不为空,那么就尝试 popHead
  3. 如果 popHead 失败,那么就尝试从链表下一个 dequeue popHead。(循环直到最后一个 poolChainElt

poolChainpopHead 方法的流程图如下:

pool_16

poolChain 中,实际上调用的是 poolDequeuepopHead 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// popHead 删除并返回队列头部的元素。
// 如果队列为空,则返回 false。
// 它只能由单个生产者调用。(也就是当前 goroutine 绑定的 P)
func (d *poolDequeue) popHead() (any, bool) {
// slot 用来保存从队列头部取出的值
var slot *eface
for { // 获取不到槽会继续循环,直到获取到槽或者发现队列为空为止。
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if tail == head { // 队列为空
return nil, false
}

// 先将 head 减 1,然后再获取槽位。
head--
ptrs2 := d.pack(head, tail)
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
// 成功获取到槽
slot = &d.vals[head&uint32(len(d.vals)-1)]
break
}
}

// 成功获取到 slot,将 slot 的值取出来
// head - 1 了,说明这个槽是可以被安全使用的,所以不需要加锁。
// 因为 popTail 不会影响到 head,所以不会影响到这里。
// 另外,pushHead 也没有影响,因为在实际使用中,只有一个协程会 pushHead。

val := *(*any)(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nils
}
*slot = eface{} // 将 slot 置 0
return val, true
}

poolDequeuepopHead 的处理流程:

  1. 判断队列是否为空,如果为空则返回 false
  2. 尝试将 head 指针减 1,如果失败则进行下一轮尝试(自旋,for + 原子操作是无锁编程中很常见的写法)。
  3. head 指针对应的槽位的值取出来,然后将槽位置为 nil

popTail

poolChainpopTail 方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// 从队列尾取出一个元素
func (c *poolChain) popTail() (any, bool) {
// 获取链表尾部的 poolChainElt
// 如果链表为空,返回 nil,false
d := loadPoolChainElt(&c.tail)
if d == nil {
return nil, false
}

for {
// 在我们 popTail 之前获取 next 指针很重要。
// 一般来说,d 可能暂时为空,但如果 next 在 pop 之前为非 nil,
// 并且 pop 失败,则 d 永久为空,这是唯一可以安全地将 d 从链中删除的条件。
//
// 解析:next 非 nil,但是 pop 失败:d 肯定是空的了,这个时候我们可以安全地将 d 从链表中删除。
d2 := loadPoolChainElt(&d.next)

// poolQueue 还没空的时候可以成功 pop,popTail 会返回 true。
if val, ok := d.popTail(); ok {
return val, ok
}

// 队列已经空了
if d2 == nil {
// d 是唯一的 dequeue。它现在是空的,但以后可能会有新的元素 push 进来。
return nil, false
}

// 链表的尾部已经被消费完了,所以转到下一个 dequeue。
// 尝试从链表中删除它,这样下一次 pop 就不必再次查看空的 dequeue。
// (本质:移除空的 tail 元素)
//
// 开始处理 d2,d2 是 d 的下一个 dequeue。(开始尝试从 d2 中 pop)
// cas:c.tail 由 d 变为 d2。
// 如果 cas 成功,说明 d2 是最新的 tail,d 可以被移除。
// d2 的 prev 指针设置为 nil,这样 gc 可以回收 d。(d2 没有下一个元素了)
//
// c.tail(d) 指向下一个 poolChainElt
// 同时下一个 poolChainElt 的 prev 指针(d)设置为 nil。
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
// 我们赢得了竞争。清除 prev 指针,以便垃圾收集器可以收集空的 dequeue,
// 以便 popHead 的时候不做多余的查找操作。
storePoolChainElt(&d2.prev, nil)
}
// d 指向 d2,继续处理下一个 dequeue
d = d2
}
}

poolChainpopTail 的处理流程:

  1. 如果链表为空,那么就返回 false
  2. 如果链表不为空,那么就尝试 popTail
  3. 如果 popTail 失败,那么就尝试从链表上一个 dequeue popTail。(循环直到第一个 poolChainElt

poolChainpopTail 方法的流程图如下:

pool_17

popTail 的时候,如果发现 poolChainElt 已经为空了,那么就会从链表中移除它:

pool_18

poolDequeuepopTail 方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// popHead 和 popTail 都有取值,然后将槽置空的过程,但是它们的实现是不一样的。
// 在 popHead 中,是直接将槽的值设置为 eface{},而在 popTail 中,
// 先将 val 设置为 nil,然后将 typ 通过原子操作设置为 nil。
// 这样在 pushHead 的时候就可以安全操作了,只要先使用原子操作判断 typ 是否为 nil 就可以了。

// popTail removes and returns the element at the tail of the queue.
// It returns false if the queue is empty. It may be called by any
// number of consumers.
//
// popTail 删除并返回队列尾部的元素。
// 如果队列为空,则返回 false。
// 它可以被任意数量的消费者调用。(如何保证并发安全?原子操作)
func (d *poolDequeue) popTail() (any, bool) {
// slot 用来保存从队列尾取出来的值
var slot *eface
// 获取队列尾部的值
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if tail == head {
// Queue is empty.
// 队列为空,直接返回
return nil, false
}

// Confirm head and tail (for our speculative check
// above) and increment tail. If this succeeds, then
// we own the slot at tail.
//
// 确认 head 和 tail(对于我们上面的推测性检查)并增加 tail。
// 如果成功,那么我们就拥有 tail 的插槽。
ptrs2 := d.pack(head, tail+1) // 新的 headTail
// 如果返回 false,说明从 Load 到 CompareAndSwap 期间,有其他 goroutine 修改了 headTail。
// 则需要重新 Load,再次尝试(再次执行 for 循环)。
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
// Success.
slot = &d.vals[tail&uint32(len(d.vals)-1)]
break
}
}

// 成功获取到 slot,将 slot 的值取出来
// 问题来了:
// 在这里 cas 成功的时候,这个 slot 实际上可能是还没有释放的,在这个时候,pushHead 其实不能写入到这个 slot 中。
// 因此,我们可以在 pushHead 的代码中看到,会先判断 slot.typ 是否为 nil,如果不为 nil,说明 slot 还没有被释放,那么就直接 return 了。
// 这种情况发生在 poolDequeue 满了的时候。

// We now own slot.
val := *(*any)(unsafe.Pointer(slot))
if val == dequeueNil(nil) { // 这是什么情况?非空,但是值等于 dequeueNil(nil) ?
val = nil
}

// 取出值之后,将 slot 置 0。
// 在 poolDequeue 中,值是允许为 nil 的,但是 pool 的 Put 中判断值为 nil 的时候就直接 return 了。

// 告诉 pushHead 我们已经操作完这个插槽。
// 将插槽归零也很重要,这样我们就不会留下可能使该对象比必要时间更长的引用。
//
// 我们首先写入 val,然后通过原子写入 typ 来发布我们已完成此插槽。
slot.val = nil
atomic.StorePointer(&slot.typ, nil) // 为什么要用原子操作?因为 pushHead 也会读取这个值,所以需要保证读取的是最新的值。

// 此时 pushHead 可以操作这个槽了。

return val, true
}

poolDequeuepopTail 的处理流程:

  1. 判断队列是否为空,如果为空则返回 false
  2. 尝试将 tail 指针加 1,如果失败则进行下一轮尝试(自旋)。
  3. tail 指针对应的槽位的值取出来,然后将槽位置为 nil

sync.Pool 设计要点

sync.Pool 中,我们可以看到它有许许多多的编程技巧,为了实现一个高性能的 Pool 要做的东西是非常复杂的, 但是对于我们而言,我们只会用到它暴露出来的两个非常简单的接口 PutGet,这其实也算是 Go 语言的一种设计哲学吧, 把复杂留给自己,把简单留给用户。但是,我们还是要知道它的实现原理,这样才能更好的使用它。

接下来,我们就再来总结一下 sync.Pool 高性能的一些设计要点:

  1. noCopy 字段,防止 sync.Pool 被复制。sync.Pool 是不能被复制的,否则会导致一些隐晦的错误。
  2. local 字段,用于存储与 P 关联的一个 poolLocal 对象,在多个 goroutine 同时操作的时候,可以减少不同 goroutine 之间的竞争,只有在本地的 poolLocal 中没有找到对象的时候,才会去其他 goroutine 关联的队列中去取。
  3. poolDequeuepushHeadpopTail 之间会存在 headtail 指针上的一些竞争,这些竞争问题是通过原子操作来解决的(相比互斥锁效率更高)。使用了原子操作可能就会有失败的时候,这个时候,再次重试就可以了。
  4. poolDequeue 中的 head/tail 指针使用一个字段来保存,然后通过原子操作保证 head/tail 的一致性。
  5. poolChain 使用链表的方式解决容量问题,并且新增一个元素到链表的时候,容量为上一个元素(poolChain 链表头)的两倍(非常常见的扩容策略)。双向链表,可以接受别的 PpopTail 操作,减少竞争的同时可以充分利用多核。
  6. pin 保证 P 不会被抢占。如果一个 goroutine 在执行 Put 或者 Get 期间被挂起,有可能下次恢复时,绑定的就不是上次的 P 了。那整个过程就会完全乱掉,因为获取到的 poolLocal 不是之前那个了。使用 pin 可以解决这种并发问题。
  7. 自旋操作,因为原子操作失败的时候可能存在竞争,这个时候再尝试一下就可能成功了。(for + 原子操作是无锁编程中很常见的一种编程模式,在 sync.Map 中也有很多类似操作)
  8. pad 内存对齐,可以避免伪共享。
  9. poolDequeue 中存储数据的结构是一个环形队列,是连续的内存,可以充分利用 CPU 的 cache。在访问 poolDequeue 某一项时,其附近的数据项都有可能加载到统一 cache line 中,有利于提升性能。同时它是预先分配内存的,因此其中的数据项可不断复用。

总结

最后,总结一下本文内容:

  • sync.Pool 是一个非常有用的工具,它可以帮助我们减少内存的分配和回收(通过复用对象),提升程序的性能。但是,我们要注意它的使用场景,它适合那些没有状态的对象,同时,我们不能对那些从 PoolGet 出来的对象做任何假设。
  • 我们在 Put 或者 Get 之前,可能需要对我们操作的对象重置一下,防止对后续的操作造成影响。
  • Pool 中的对象存储是使用队列的方式,这个队列的实现是一个链表(poolChain),链表的每一个节点都是一个环形队列(poolDequeue)。这个队列支持以下三种操作:
    • pushHead:将一个对象放到队列的头部。
    • popHead:将队列的头部的对象取出来。
    • popTail:将队列的尾部的对象取出来。
  • sync.Pool 的实现中,使用了很多编程技巧,比如 noCopypinpad、原子操作等等,这些技巧都是为了实现一个高性能的 Pool 而做的一些优化,我们可以学习一下,具体参考上一节。
  • sync.Pool 中,PutGet 操作的时候会先将 goroutineP 绑定,然后再去操作 P 关联的 poolLocal,这样可以减少竞争,提升性能。因为每一个 P 都有一个关联的 poolLocal,所以多个 goroutine 操作的时候,可以充分利用多核。在操作完成后,再解除绑定。
  • 考虑到 GC 直接清除 Pool 中的对象会在 GC 后可能会产生性能抖动,所以在 GC 的时候,其实并不会马上清除 Pool 中的对象,而是将这些对象放到 victim 字段中,在 Get 的过程中,如果所有的 poolLocal 中获取不到对象,则会从 victim 中去找。但是再进行 GC 的时候,旧的 victim 会被清除。也就是 Pool 中对象的淘汰会经历两次 GC