0%

在上一篇文章《深入理解 go chan》中,我们讲解了 chan 相关的一些概念、原理等东西, 今天让我们再深入一下,读一下它的源码,看看底层实际上是怎么实现的。

整体设计

我们可以从以下三个角度看 chan 的设计(源码位于 runtime/chan.go,结构体 hchan 就是 chan 的底层数据结构):

  • 存储:chan 里面的数据是通过一个环形队列来存储的(实际上是一个数组,但是我们视作环形队列来操作。无缓冲 chan 不用存储,会直接从 sender 复制到 receiver
  • 发送:数据发送到 chan 的时候,如果 chan 满了,则会将发送数据的协程挂起,将其放入一个协程队列中,chan 空闲的时候会唤醒这个协程队列。如果 chan 没满,则发送队列为空。
  • 接收:从 chan 中接收数据的时候,如果 chan 是空的,则会将接收数据的协程挂起,将其放入一个协程队列中,当 chan 有数据的时候会唤醒这个协程队列。如果 chan 有数据,则接收队列为空。

文中一些比较关键的名词解释:

  • sender: 表示尝试写入 changoroutine
  • receiver: 表示尝试从 chan 读取数据的 goroutine
  • sendq 是一个队列,存储那些尝试写入 channel 但被阻塞的 goroutine
  • recvq 是一个队列,存储那些尝试读取 channel 但被阻塞的 goroutine
  • g 表示一个协程。
  • gopark 是将协程挂起的函数,协程状态:_Grunning => _Gwaiting
  • goready 是将协程改为可运行状态的函数,协程状态: _Gwaiting => _Grunnable

现在,假设我们有下面这样的一段代码,通过这段代码,我们可以大概看一下 chan 的总体设计:

1
2
3
4
5
6
7
8
9
10
11
12
package main

func main() {
// 创建一个缓冲区大小为 9 的 chan
ch := make(chan int, 9)
// 往 chan 写入 [1,2,3,4,5,6,7]
for i := 0; i < 7; i++ {
ch <- i + 1
}
// 将 1 从缓冲区移出来
<-ch
}

现在,我们的 chan 大概长得像下面这个样子,后面会详细展开将这个图中的所有元素:

上图为了说明而在 recvq 和 sendq 都画了 3 个 G,但实际上 recvq 和 sendq 至少有一个为空。因为不可能有协程正在等待接收数据的时候,还有协程的数据因为发不出去数据而阻塞。

数据结构

在底层,go 是使用 hchan 这个结构体来表示 chan 的,下面是结构体的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type hchan struct {
qcount uint // 缓冲区(环形队列)元素个数
dataqsiz uint // 缓冲区的大小(最多可容纳的元素个数)
buf unsafe.Pointer // 指向缓冲区入口的指针(从 buf 开始 qcount * elemsize 大小的内存就是缓冲区所用的内存)
elemsize uint16 // chan 对应类型元素的大小(主要用以计算第 i 个元素的内存地址)
closed uint32 // chan 是否已经关闭(0-未关闭,1-已关闭)
elemtype *_type // chan 的元素类型
sendx uint // chan 发送操作处理到的位置
recvx uint // chan 接收操作处理到的位置
recvq waitq // 等待接收数据的协程队列(双向链表)
sendq waitq // 等待发送数据的协程队列(双向链表)

// 锁
lock mutex
}

waitq 的数据结构如下:

1
2
3
4
type waitq struct {
first *sudog
last *sudog
}

waitq 用来保存阻塞在等待或接收数据的协程列表(是一个双向链表),在解除阻塞的时候,需要唤醒这两个队列中的数据。

对应上图各字段详细说明

hchan,对于 hchan 这个结构体,我们知道,在 go 里面,结构体字段是存储在一段连续的内存上的(可以看看《深入理解 go unsafe》),所以图中用了连续的一段单元格表示。

下面是各字段说明:

  • qcount: 写入 chan 缓冲区元素个数。我们的代码往 chan 中存入了 7 个数,然后从中取出了一个数,最终还剩 6 个,因此 qcount6
  • dataqsiz: hchan 缓冲区的长度。它在内存中是连续的一段内存,是一个数组,是通过 make 创建的时候传入的,是 9
  • bufhchan 缓冲区指针。指向了一个数组,这个数组就是用来保存发送到 chan 的数据的。
  • sendxrecvx:写、读操作的下标。指向了 buf 指向的数组中的下标,sendx 是下一个发送操作保存的下标,recvx 是下一个接收操作的下标。
  • recvqsendq: 阻塞在 chan 读写上的协程列表。底层是双向链表,链表的元素是 sudogsudog 是一个对 g 的封装),我们可以简单地理解为 recvqsendq 的元素就是 g(协程)。

g 和 sudog 是什么?

上面提到了 gsudogg 是底层用来表示协程的结构体,而 sudog 是对 g 的封装,记录了一些额外的信息,比如关联的 hchan

在 go 里面,协程调度的模型是 GMP 模型,G 代表协程、M 代表线程、P 表示协程调度器。我上图里面的 G 就是代表协程(当然,实际上是 sudog)。 还有一个下面会提到的就是 g0g0 表示 P 上启动的第一个协程。

GMP 模型是另外一个庞大的话题了,大家可以自行去了解一下,对理解本文也很有好处。因为在 chan 阻塞的时候实际上也是一个协程调度的过程。 具体来说,就是从 g 的栈切换到 g0 的栈,然后重新进行协程调度。这个时候 g 因为从运行状态修改为了等待状态,所以在协程调度中不会将它调度来执行, 而是会去找其他可执行的协程来执行。

创建 chan

我们的 make(chan int, 9) 最终会调用 makechan 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// chantype 是 chan 元素类型,size 是缓冲区大小
func makechan(t *chantype, size int) *hchan {
elem := t.elem

// compiler checks this but be safe.
// 检查元素个数是否合法(不能超过 1<<16 个)
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
// 判断内存是否对齐
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}

// mem 是 chan 缓冲区(环形队列)所需要的内存大小
// mem = 元素大小 * 元素个数
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}

// 定义 hchan
var c *hchan
switch {
case mem == 0:
// 队列或者元素大小是 0(比如 make(chan int, 0))
// 只需要分配 hchan 所需要的内存
c = (*hchan)(mallocgc(hchanSize, nil, true))
// ...
case elem.ptrdata == 0:
// elem 类型里面不包含指针
// 分配的内存 = hchan 所需内存 + 缓冲区内存
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
// 分配的是连续的一段内存,缓冲区内存在 hchan 后面
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 元素类型里面包含指针
c = new(hchan)
// buf 需要另外分配内存
c.buf = mallocgc(mem, elem, true)
}

// 单个元素的大小
c.elemsize = uint16(elem.size)
// 元素类型
c.elemtype = elem
// 缓冲区大小
c.dataqsiz = uint(size)
// ...
}

创建 chan 的过程主要就是给 hchan 分配内存的过程:

  • 非缓冲 chan,只需要分配 hchan 结构体所需要的内存,无需分配环形队列内存(数据会直接从 sender 复制到 receiver
  • 缓冲 chan(不包含指针),分配 hchan 所需要的内存和环形队列所需要的内存,其中 buf 会紧挨着 hchan
  • 缓冲 chan(含指针),hchan 和环形队列所需要的内存单独进行分配

对应到文章开头的图就是,底下的 hchanbuf 那两段内存。

发送数据

<- 语法糖

《深入理解 go chan》中,我们说也过,<- 这个操作符号是一种语法糖, 实际上,<- 会被编译成一个函数调用,对于发送操作而言,c <- x 会编译为对下面的函数的调用:

1
2
3
4
5
// elem 是被发送到 chan 的数据的指针。
// 对于 ch <- x,ch 对应参数中的 c,unsafe.Pointer(&x) 对应参数中的 elem。
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}

另外,对于 select 里面的调用,chansend 会返回一个布尔值给 select 用来判断是否是要选中当前 case 分支。 如果 chan 发送成功,则返回 true,则 select 的那个分支得以执行。(select...case 本质上是 if...else,返回 false 表示判断失败。)

chansend 第二个参数的含义

chansend 第二个参数 true 表示是一个阻塞调用,另外一种是在 select 里面的发送操作,在 select 中的操作是非阻塞的。

1
2
3
4
5
6
7
8
9
package main

func main() {
ch := make(chan int, 2)
ch <- 1 // 如果 ch 满了,会阻塞
select {
case ch <- 3: // 非阻塞
}
}

select 中对 chan 的读写是非阻塞的,不会导致当前协程阻塞,如果是因为 chan 满或者空无法发送或接收, 则不会导致阻塞在 case 的某一个分支上,还可以继续判断其他 case 分支。

select 中的 send 实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// go 代码:
// select {
// case c <- v:
// ... foo
// default:
// ... bar
// }
//
// 实际效果:
//
// if selectnbsend(c, v) {
// ... foo
// } else {
// ... bar
// }
// select 里面往 chan 发送数据的分支,返回的 selected 表示当前的分支是否被选中
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}

chansend 发送实现

  1. 发送到 nil chanselect 中发送不阻塞,其他情况阻塞)

如果是在 selectcase 里面发送,则不会阻塞,其他情况会导致当前 goroutine 挂起,永远阻塞

示例代码:

1
2
3
4
5
6
7
8
// 下面的代码运行会报错:
var ch chan int
// 发送到 nil chan 会永久阻塞
ch <- 1
select {
// 这个发送失败,但是不会阻塞,可继续判断其他分支。
case ch <- 3:
}
  1. 发送到满了的 chanselect 中发送不阻塞,其他情况阻塞)

对于无缓冲而且又没有 receiver,或者是有缓冲但是缓冲满了的情况,发送也会阻塞(我们称其为 full,也就是满了,满了的 chan 是放不下任何数据了的,所以就无法再往 chan 发送数据了):

receiver 表示等待从 chan 接收数据的协程。

对于满了的 chan,什么时候可以再次发送呢?那就是receiver 接收数据的时候chan 之所以会满就是因为没有 receiver,也就是没有从 chan 接收数据的协程。

A. 对于无缓冲的 chan,在满了的情况下,当有 receiver 来读取数据的时候,数据会直接从 sender 复制到 receiver 中:

B. 对于有缓冲,但是缓冲满了的情况(图中 chan 满了,并且有两个 g 正在等待写入 chan):

这个发送过程大概如下:

  • receiverchan 中获取到 chan 队头元素,然后 chan 的队头元素出队。
  • 发送队列 sendq 对头元素出队,将其要发送的数据写入到 chan 缓冲中。最后,sendq 只剩下一个等待写入 chang

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

// 注意:以下代码可能不能正常执行,只是为了描述问题。
func main() {
// 情况 2.A.
var ch1 = make(chan int) // 无缓冲的 chan
ch1 <- 1 // 阻塞
select {
// 不阻塞,但是不会执行这个分支
case ch1 <- 1:
}

// 情况 2.B.
var ch2 = make(chan int, 1) // 有缓冲,缓冲区容量为 1
ch2 <- 1 // 1 写入之后,ch2 的缓冲区满了
go func() {
ch2 <- 2 // 阻塞,调用 gopark 挂起
}()
go func() {
ch2 <- 3 // 阻塞
}()
select {
// 不阻塞,但是不会执行这个分支
case ch2 <- 4:
}
}
  1. 发送到有缓冲,但是缓冲还没满的 chan(不阻塞,发送成功)

这种情况比较简单,就是将 sender 要发送的数据写入到 chan 缓冲区:

示例代码:

1
2
3
var ch = make(chan int, 1)
// 不阻塞,1 写入 chan 缓冲区
ch <- 1

chansend 源码解读

阻塞模式下,在发送的过程中,如果遇到无法发送成功的情况,会调用 gopark 来将协程挂起,然后当前协程陷入阻塞状态。

非阻塞模式下(select),在发送过程中,任何无法发送的情况,都会直接返回 false,表示发送失败。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
// 参数说明:
// c 表示 hchan 实例
// ep 表示要发送的数据所在的地址
// block 是否是阻塞模式(select 语句的 case 里面的发送是非阻塞模式,其他情况是阻塞模式)
// 非阻塞模式下,遇到无法发送的情况,会返回 false。阻塞模式下,遇到无法发送的情况,协程会挂起。
// 返回值:表示是否发送成功。false 的时候,如果是 select 的 case,则表示没有选中这个 case。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 情况 1:nil chan
if c == nil {
// select 语句里面发送数据到 chan 的操作失败,直接返回 false,表示当前的 case 没有被选中。
if !block {
// select 分支没有被选中
return false
}
// 阻塞模式,协程挂起
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}

// ... 其他代码...

// 不获取锁的情况下快速失败。select 中 chan 满了的时候无法发送成功,直接返回 false,协程无需挂起。
// 场景:非阻塞模式、chan 未关闭、chan 已满(无缓冲且没有接收数据的协程、或者有缓冲但是缓冲区满)
if !block && c.closed == 0 && full(c) {
return false
}

// ... 其他代码...

// 获取锁
lock(&c.lock)

// 如果 chan 已经关闭,则释放锁并 panic,不能往一个已经关闭的 chan 发送数据
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}

// 情况 2.A,又或者是有缓冲但是缓冲区空,有一个正在等待接收数据的 receiver。
// 如果有协程在等待接收数据(说明 chan 缓冲区空、或者 chan 是无缓冲的)
// 则直接将元素传递给这个接收数据的协程,这样就避免了 sender -> chan -> receiver 这个数据复制的过程,效率更高。
// 返回 true 表示 select 的分支可以执行(发送成功)
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}

// 情况 3,发送到缓冲 chan,且 chan 未满
// 没有协程在等待接收数据。
// 缓冲区还有空余,则将数据写入到 chan 的缓冲区
if c.qcount < c.dataqsiz {
// 获取写入的地址
qp := chanbuf(c, c.sendx)
// 通过内存复制的方式写入
typedmemmove(c.elemtype, qp, ep)
// 写入的下标指向下一个位置
c.sendx++
// 如果到超出环形队列尾了,则指向第一个位置
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// chan 里面的元素个数加上 1
c.qcount++
// 释放锁
unlock(&c.lock)
// 发送成功,返回 true
return true
}

// 没有协程在接收数据,而且缓冲区满了。
// 如果是 select 语句里面的发送,则释放锁,返回 false
if !block {
unlock(&c.lock)
return false
}

// 发不出去,当前协程阻塞。
// 阻塞模式下,缓冲区满了,需要将当前协程挂起。
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep // chan 要操作的元素指针
mysg.waitlink = nil
mysg.g = gp // sudog 上的 g 属性
mysg.isSelect = false // 如果是 select,上面已经返回了,因此这里是 false
mysg.c = c // sudog 上的 c 属性
gp.waiting = mysg // g 正在等待的 sudog
gp.param = nil // 当通道操作唤醒被阻塞的 goroutine 时,它将 param 设置为指向已完成的阻塞操作的 sudog
c.sendq.enqueue(mysg) // 将 sudog 放入发送队列
// 在 chan 读写上阻塞的标志
gp.parkingOnChan.Store(true)
// 最关键的一步:将当前协程挂起
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// 保证 ep 指向的地址不被垃圾回收器回收
KeepAlive(ep)

// ...被唤醒了之后的一些收尾操作...

return true
}

// 参数说明:c 是 chan 实例,sg 是等待接收数据的 g,ep 是被发送进 chan 的数据,unlockf 是释放锁的函数。
// 空 chan 上发送,会直接发送给等待接收数据的协程。
// ep 指向的值会被复制到 sg 中(ep -> sg,ep 是被发送的值,sg 是要接收数据的 g)。
// 接收数据的协程会被唤醒。
// 通道 c 必须是空的并且获取了锁。send 会通过 unlockf 来释放锁。
// sg 必须已从 c 中退出队列(从 recvq 这个接收队列中移除)。
// ep 必须不能为 nil,同时指向堆或者调用者的栈。
// sg 是接收队列上的 g。
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// ...其他代码...
// 如果没有忽略返回值,将值直接从 ep 复制到 sg 中
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
// 释放锁
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 最关键的一步:唤醒等待队列中的那个接收到数据的 g
//(也就是之前因为接收不到数据而被阻塞的那个 g)
goready(gp, skip+1)
}

// 参数:t 是 chan 的元素类型,sg 是接收数据的 g(协程),src 是被发送的数据的指针。
// 场景:无缓冲 chan、有缓冲但是缓冲区没数据。
// 作用:将数据直接从发送数据的协程复制到接收数据的协程。
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
// 将 ep 的值直接复制到 sg 中
memmove(dst, src, t.size)
}

// full 报告 c 上的发送是否会阻塞(即通道已满)。
func full(c *hchan) bool {
// c.dataqsiz 是不可变的(创建 chan 后不会再去修改)
// 因此在 chan 操作期间的任何时间读取都是安全的。
if c.dataqsiz == 0 {
// 如果是非缓冲 chan,则看接收队列有没有数据,有则表明满了(没有正在发送的 g)
return c.recvq.first == nil
}
// 如果是缓冲 chan,只需要比较实际元素总数跟缓冲区容量即可
return c.qcount == c.dataqsiz
}

接收数据

<- 语法糖

在发送数据的那一节我们提到了,ch <- x 编译之后,实际上是对 chansend1 的函数调用。同样的,在接收数据的时候, <- 这个操作符也会根据不同情况编译成不同的函数调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// elem 是用来保存从 c 中接收到的值的地址的指针
// <- c 编译器处理之后实际上就是下面的这个函数调用。(从通道接收,但是忽略接收到的值)
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}

// received 表示是否是从 chan 中接收到的(如果 chan 关闭,则接收到的是零值,received 是 false)
// v, ok := <-c 编译之后的函数(从通道接收,第一个 v 对应 elem,第二个 ok 对应 received)
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}

// select 里面的接收操作:
//
// select {
// case v, ok = <-c:
// ... foo
// default:
// ... bar
// }
//
// 实际 go 实现
//
// if selected, ok = selectnbrecv(&v, c); selected {
// ... foo
// } else {
// ... bar
// }
//
// select 里面从 chan 接收数据的分支,返回的 selected 表示当前的分支是否被选中,received 表示是否有数据被接收到
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
return chanrecv(c, elem, false)
}

还需要再提醒一下的是:chan 关闭之后,并且 chan 缓冲区所有数据被接收完之后,received 才会是 false,并不是一关闭 received 马上返回 false

chanrecv 函数 block 参数的含义

chansend 中的 block 参数的作用一样,用来判断是否是 select 模式下的接收操作,如果是,则在需要阻塞的时候不会阻塞,取而代之的是直接返回。

chanrecv 接收数据实现

  1. nil chan 接收(select 中接收不阻塞,其他情况阻塞)

nil chan 中读取的时候,如果是阻塞模式,会调用 gopark 将协程阻塞起来。

示例代码:

1
2
var ch chan int
<-ch
  1. 从空 chan 接收(select 中接收不阻塞,其他情况阻塞)

判断空的条件为:无缓冲并且没有等待发送数据的 g,或者有缓冲但是缓冲区无数据。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

// 注意:以下代码执行不了,只是展示一下实际中对应的代码
func main() {
// 情况 1,无缓冲的 chan,空的
var ch1 = make(chan int)
<-ch1 // 阻塞
select {
// 不阻塞,但是该分支不会执行
case <-ch1:

}

// 情况 2,有缓冲的 chan,空的
var ch2 = make(chan int, 1)
<-ch2 // 阻塞
select {
// 不阻塞,但是该分支不会执行
case <-ch2:

}
}
  1. 从缓冲区满的 chan 接收(不会阻塞,这个时候 sendq 一定不为空)

这种情况不会阻塞,上面已经有图了,这里不再贴了。

  1. 从缓冲区不满的 chan 接收(不会阻塞)

示例代码:

1
2
3
4
5
6
7
8
package main

func main() {
var ch = make(chan int, 2)
ch <- 1
// 从缓冲区没满的 chan 接收
<-ch
}

chanrecv 源码解读

chanrecv 函数:

  • 参数:cchan 实例,ep 是用来接收数据的指针,block 表示是否是阻塞模式。
  • 返回值:selected 表示 select 语句的 case 是否被选中,received 表示接收到的值是否有效。
  • 功能:从 c 这个通道接收数据,同时将接收到的数据写入到 ep 里。

概览:

  • ep 可能是 nil,这意味着接收到的值被忽略了(对应 <-c 这种形式的接收)。
  • 如果是非阻塞模式,并且通道无数据,返回 (false, false),也就是 select 语句中的 case 不会被选中。
  • 否则,如果 c 关闭了,会对 ep 指向的地址设置零值,然后返回 (true, false)。如果是 select 语句,意味被选中,
  • 但是 receivedfalse 表明返回的数不是通道关闭之前发送的。
  • 否则,将从通道中获取到的值写入 ep 指向的地址,并且返回 (true, true)
  • 一个非 nilep 必须指向堆或者调用者的栈。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
// 从 c 读取数据,写入到 ep 指向的地址。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// ...
// c 是 nil chan
if c == nil {
// select 里面的 case 不会被选中
if !block {
return
}
// 阻塞模式时,协程挂起
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
// 在实际执行的时候,如果其他协程都执行完了,只剩下这一个协程(又或者全部协程都是睡眠状态,并且无法被唤醒的那种),那么会报错:
// "fatal error: all goroutines are asleep - deadlock!"
throw("unreachable")
}

// 如果是非阻塞模式(select),并且 c 是空的
if !block && empty(c) {
// chan 未关闭,并且是空的,返回 false,false
if atomic.Load(&c.closed) == 0 {
return
}

// chan 已经关闭,并且 chan 是空的
if empty(c) {
// ...
// 返回一个零值
if ep != nil {
typedmemclr(c.elemtype, ep)
}
// select 分支被选中,但是返回值是无效的,是一个零值
return true, false
}
}
// ...

// 获取锁
lock(&c.lock)

// chan 已关闭
if c.closed != 0 {
// chan 已经关闭,同时也没有数据
if c.qcount == 0 {
// ...
// 释放锁
unlock(&c.lock)
if ep != nil {
// 设置零值
typedmemclr(c.elemtype, ep)
}
// select 的分支被选中,但是返回值无效
return true, false
}
} else {
// chan 未关闭,并且有一个等待发送的元素(对应情况:chan 是满的或者无缓冲而且没有 receiver)
// 如果无缓冲:则将元素直接从 sender 复制到 receiver 中。
// 否则:意味着 c 的缓冲区满了,从环形队列中接收值,将 sg 需要发送的值添加到环形队列尾,
// 实际上这个时候,队列头和队列尾都是同一个位置,因为队列满了。
// 只不过,队列头和队列尾指向的位置会发生变化(都加 1,然后对缓冲区长度取模)。
if sg := c.sendq.dequeue(); sg != nil {
// 找到一个 sender。
// 如果无缓冲,直接从 sender 复制到 receiver
// 否则,环形队列对头元素复制给 receiver,sender 要发送的元素复制进环形队列队尾。
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
// select 分支被选中,接收成功,并且接收的值是有效的。
return true, true
}
}

// 缓冲区有数据,并且缓冲区没满
if c.qcount > 0 {
// qp 是被接收元素的地址
qp := chanbuf(c, c.recvx)
// ...
// 将 qp 指向的值复制到 ep
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 清空队列中 ep 的空间(设置为零值)
typedmemclr(c.elemtype, qp)
// 被接收的下标指向下一个元素
c.recvx++
// 环形队列,回到开头
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 缓冲区长度减 1
c.qcount--
// 释放锁
unlock(&c.lock)
// select 分支被选中,并且接收的值是有效的。
return true, true
}

// 缓冲区空的,并且是非阻塞(select)
if !block {
// 释放锁
unlock(&c.lock)
// 返回 false,false
return false, false
}

// 缓冲区空,并且是阻塞模式,同时没有等待发送的 g

// 没有 sender,阻塞
gp := getg()
mysg := acquireSudog()
// ...
// c 的 recvq,也就是等待接收的队列,在队尾添加当前的 g
c.recvq.enqueue(mysg)
// ...
// g 挂起,等待下一个发送数据的协程
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

// ... 被唤醒后的操作 ...
return true, success
}

// recv 处理缓冲区已满的 chan 的接收操作(或者无缓冲,这个函数处理这两种情况)。
// 有两部分:
// 1. 等待发送数据的协程(sender),会将其要发送的数据放入 chan 中,然后这个协程会被唤醒
// 2. 被接收协程接收的值会写入到 ep 中
//
// 对于同步 chan(无缓冲 chan),两个值是同一个。
// 对于异步 chan,接收者从 chan 的缓冲区获取数据,发送方的输入放入 chan 缓冲区。
// 通道 c 必须已满并锁定。recv 会使用 unlockf 来解锁 c。
// sg 必须已经从 c 中移除(准确来说是 c.sendq)。
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 如果无缓冲区
if c.dataqsiz == 0 {
// ...
// 直接将 sender 的要发送的值复制到 ep
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
} else {
// 有缓冲区,但是缓冲区满了。
// 从队列头获取元素,将要发送的值放入队列尾。(实际上操作的是同一个位置,因为环形队列满了)
// 需要获取的值的指针地址
qp := chanbuf(c, c.recvx)
// ...
// 如果需要接收值,则将 qp 复制到 ep(没有忽略返回值)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 将要发送的值写入到 qp(sendq 对头元素要发送的值写入到 qp,也就是 chan 刚刚空出来的位置)
typedmemmove(c.elemtype, qp, sg.elem)
// 队列头、尾指针移动
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
// 释放锁
unlockf()
// ...
// 唤醒协程(这个被唤醒的协程是之前因为发送不出去被阻塞的协程)
goready(gp, skip+1)
}

// 将数据直接从 sender 复制到 receiver
// 场景:发送到无缓冲的 chan
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
src := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
// dst 是 receiver 栈里保存接收值的地址,src 是 sender 栈里要被发送的值的地址
memmove(dst, src, t.size)
}

关闭 chan

chan 关闭的过程比较简单,修改 closed 为 1,然后唤醒发送队列和接收队列里面的 g,如果发送队列有 g,被唤醒之后会 panic,因为不能往一个已经关闭的 chan 发送数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// 关闭 chan
func closechan(c *hchan) {
// 不能关闭 nil chan
if c == nil {
panic(plainError("close of nil channel"))
}

// 开启锁
lock(&c.lock)
if c.closed != 0 {
// chan 已经关闭,panic,不能重复关闭。释放锁
unlock(&c.lock)
panic(plainError("close of closed channel"))
}

// ...
// 设置 closed 标志
c.closed = 1

// gList 用来保存阻塞在 chan 上的 g(链表,包括了 sender 和 receiver)
var glist gList

// 释放所有等待读取 chan 的协程(解除阻塞状态)
for {
// recvq 队头元素出队
sg := c.recvq.dequeue()
if sg == nil {
// sendq 已经没有元素了
break
}
// 关闭之后,从 chan 接收到的都是零值
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
// ...
glist.push(gp)
}

// 释放所有正在等待写入 chan 的协程(解除阻塞状态,这些协程会 panic)
for {
// sendq 队头元素出队
sg := c.sendq.dequeue()
if sg == nil {
// sendq 已经没有元素了
break
}
// ...
glist.push(gp)
}
// 释放锁
unlock(&c.lock)

// 将所有等待的协程修改为就绪态
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
// g 状态修改为可运行状态
goready(gp, 3)
}
}

对于实际开发的作用

在上一篇文章和本文中,花了很大的篇幅来讲述 chan 的设计、实现与使用,这么多东西对我们有什么用呢?

其中非常重要的一个作用是,清楚地了解 chan 的工作机制,便于我们对程序实际运行情况进行分析, 尤其是一些非常隐晦的读写 chan 场景,毕竟稍有不慎就会导致协程泄漏,这对进程影响可能是非常大的。

比如下面的这种代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import (
"fmt"
"runtime"
"time"
)

func main() {
for i := 0; i < 10000; i++ {
time.Sleep(time.Second)
go func() {
// 永远阻塞,协程泄漏
var ch chan int
ch <- 1
}()
// 我们会看到协程数量逐渐增长。
// 但是这部分挂起的协程永远不会被调度。
fmt.Printf("goroutine count: %d\n", runtime.NumGoroutine())
}
time.Sleep(time.Hour)
}

tips:在 chan 读写的地方需要注意自己的写法会不会让 goroutine 永远陷入阻塞,或者长时间阻塞。

总结

  • chan 底层是 hchan 结构体。
  • go 语法里面的 <- 不过是语法糖,在编译的时候,会编译成 hchan 相关的方法调用。最终都会调用 chansend 或者 chanrecvselect...case 里面的 chan 读写最终也会编译为对 chansendchanrecv 的调用。
  • chan 总体设计:维护了三个队列:
    • hchan.buf: chan 中暂存 sender 发送数据的队列(在有 receiver 读取的时候会从这个队列中复制到 receiver 中)
    • hchan.recvq: 接收队列,存储那些尝试读取 channel 但被阻塞的 goroutine
    • hchan.sendq: 发送队列,存储那些尝试写入 channel 但被阻塞的 goroutine
  • 读写 chan 的协程阻塞是通过 gopark 实现的,而从阻塞态转换为可运行状态是通过 goready 实现的。
  • chan 读写操作阻塞的时候,如果是在 select 语句中,则会直接返回(表示当前的分支没有被选中),否则,会调用 gopark 挂起当前协程。
  • 在关闭 chan 的时候,会调用 goready 唤醒阻塞在发送或者接收操作上的 g(协程)。
  • 无缓冲 chan 的操作有点特殊,对于无缓冲 chan,必须同时有 senderreceiver 才能发送和接收成功,否则另一边都会陷入阻塞(当然,select 不会阻塞)。

go 里面,在实际程序运行的过程中,往往会有很多协程在执行,通过启动多个协程的方式,我们可以更高效地利用系统资源。 而不同协程之间往往需要进行通信,不同于以往多线程程序的那种通信方式,在 go 里面是通过 channel (也就是 chan 类型)来进行通信的, 实现的方式简单来说就是,一个协程往 channel 里面写数据,然后其他的协程可以从 channel 中将其读取出来。 (注意:文中的 chan 表示是 go 语言里面的 chan 关键字,而 channel 只是我们描述它的时候用的一个术语)

通道(chan)的模型

在开始讲 channel 之前,也许了解一下它要解决什么样的问题会比较好,所以先来聊聊一些背景知识。

关于通道,一个比较潦草的图大概是下面这个样子的:

在图中,协程 A 将消息 msg 写入到 channel 中,然后协程 Bchannel 中读取消息,如果 B 没来得及从中读取消息,那么消息会在 chan 中存留。

这就是 go 的哲学:通过通信来实现共享内存。这不同于以往的多线程程序,在多线程程序中,往往是一块内存在不同线程之间进行共享, 然后通过一些保护机制,保证不允许多个线程同时对这块内存进行读写,比如通过 synchronized 关键字。 可能很多人都没有真正写过多线程的程序,但好像我们都有一种共识,多线程不安全。

多线程为什么不安全?

这是因为我们的程序除了通过共享一段内存之外,每一个 CPU 核心都有它本地的缓存,而 CPU 上的缓存是不共享的, 而线程可以同时在不同的 CPU 上执行。CPU 的执行过程是,先从内存中读取数据到 CPU 中,CPU 做完计算再更新到内存中。 这样一来,就有可能存在不同线程对同一段内存同时读写的问题。

这是什么问题呢?比如,A 线程计算完了但是还没有写回内存的时候,B 线程从内存读取出了 A 线程写入计算结果前的数据, 但是按我们的逻辑,B 应该是拿 A 线程的结算结果来进行逻辑运算的,这样就会出现数据不一致了,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class Main {
int a = 0;

public static void main(String[] args) throws InterruptedException {
Main main = new Main();
main.run();
}

// 将 a 加 1
private void add() {
a++;
}

public void run() throws InterruptedException {
// 启动两个线程来对 a 进行加 1 的操作
Thread t1 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
add();
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 10000; i++) {
add();
}
});
// 启动线程
t1.start();
t2.start();

// 等待线程结束
t1.join();
t2.join();

// 我们的预期结果是 20000,但是实际运行显示了 14965
System.out.println(a);
}
}

在上面的代码中,我们预期的运行结果是 20000 的,但是实际得到了 14965(实际上,每次执行结果都会不一样),这也就是我上面所说的问题, 其中有一个线程读取到了另一个线程的计算结果写入内存前的数据,也就是说,这个线程的计算结果被覆盖了, 因为线程将计算结果写回内存的时候是相互覆盖的。

所以我们可以回答刚才的问题了,多线程不安全是因为多个线程可以对同一段内存进行读写,这就存在其中一个线程还没来得及更新内存, 然后另一个线程读取到的数据是旧的。(也即数据竞争的问题)

具体可以看下图:

CPU 执行的时候,会需要将数据从内存读取到 CPU 中,计算完毕之后,再更新内存里面的数据。

错乱发生的过程大概如下:

  1. CPU 1 先计算完了,计算的结果是 a = 3,但是还没来得及写入内存
  2. CPU 2 也从内存里面获取 a 来进行计算,但是这个时候 a 还没有被 CPU 1 更新,所以 CPU 2 拿到的还是 2
  3. CPU 2 进行计算的时候,CPU 1 将它的计算结果写入了内存,所以这个时候内存中的 a 是 3
  4. CPU 2 计算完毕,将等于 2 的变量 a 加 1 得到结果 3
  5. CPU 2 将结果 3 写入到内存,这个时候 a 的内存被更新,但是结果依然是 3

一种可行的办法 - 锁

其中一种可行的办法就是,给 add 方法加上 synchronized 关键字:

1
2
3
private synchronized void add() {
a++;
}

这个时候,在我们的代码中,对 a 读写的代码都被 synchronized 保护起来了,在这段更新之后的代码中,我们得到了正确的结果 20000

a++ 其实包含了读和写两个操作,程序运行的时候,会先将 a 读取出来,将其加上 1,然后写回到内存中。

synchronized 是同步锁,它修饰的方法不允许多个线程同时执行。synchronized 锁的粒度可大可小,粒度太大的话对性能影响也较大。

正如我们所看到的那样,synchronized 允许修饰一段代码,但是在实际中我们往往只是想保护其中某一个变量而已, 如果直接使用 synchronized 关键字来修饰一大段代码,那就意味着一个线程在执行这段代码的时候,其他线程就只能等待, 但是实际上,其中那些不涉及数据竞争的代码我们也无法执行,这样效率自然会降低,具体降低多少,取决于我们 synchronized 块的代码有多大。

go 中的处理办法

上面我们说到的多线程是通过共享内存来进行通信的,而在 go 里面,采用了 CSP(communicating sequential processes)并发模型, CSP 模型用于描述两个独立的并发实体通过共享 channel(管道)进行通信的并发模型。

CSP 是一套很复杂的东西,go 语言并没有完全实现它,仅仅是实现了 processchannel 这两个概念。process 就是 go 语言 中的 goroutine,每个 goroutine 之间是通过 channel 通讯来实现数据共享的。

然后我们上面说到,java 里面的 synchronized 关键字的粒度可能会比较大,这个是相比 go 里面的 channel 而言的, 在 go 里面,我们的代码在通信过程中很常见的一种阻塞场景是:

  • goroutine 需要从 channel 读取数据才能继续执行,但是 channel 里面还没数据,这个时候 goroutine 需要等待(会阻塞)另一个 goroutinechannel 写入数据。

对于这种场景,它隐含的逻辑是,阻塞的这个 goroutine 需要等待其他 goroutine 的结果才能继续往下执行,也就是 CSP 中的 sequential。下图是实际运行中的 chan

我们上面的 chan 模型那个图,读和写都只有一个协程,但在实际中,读 chan 和写 chan 的协程都有一个队列来保存。 我们需要明确的一点事实是:队列中的协程会一个接一个执行,队列头的协程先执行,然后我们对 chan 的读写是按顺序来读写的,先取 chan 队列头的元素,然后下一个元素

对应到上面 java 这个例子,我们在 go 里面可以怎么做呢?我们先把没有锁的 java 代码先写成 go 的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import "fmt"

var a = 0

func add(ch chan int, done chan<- struct{}) {
for i := 0; i < 10000; i++ {
a++
}
done <- struct{}{}
}

func main() {
done := make(chan struct{}, 2)

// ch 充当协程之间同步的角色
ch := make(chan int, 1)
// 这里可以传任意数字
ch <- 1

go add(ch, done)
go add(ch, done)

// 等待 2 个协程执行完毕
<-done
<-done
fmt.Println(a) // 15504 每次结果不一样
}

在 go 里面,我们可以把 add 方法改成下面这个样子:

1
2
3
4
5
6
7
8
9
10
11
12
13
func add(ch chan int, done chan<- struct{}) {
for i := 0; i < 10000; i++ {
// 阻塞,只有另外一个协程往 ch 里面写入数据的时候,
// <-ch 才得以解除阻塞状态
<-ch
// 这一行同一时刻只能一个协程执行
a++
// 往 ch 写入数据,
// 等待从 ch 中读取数据的协程得以继续执行
ch <- i
}
done <- struct{}{}
}

这种写法看起来很笨拙,我们在实际使用中可能会稍有不同,所以不需要太纠结这个例子的合理性,这里想表达的是:在 go 中,我们的协程使用 chan 的时候只会阻塞在 chan 读写的地方,其他代码不受影响,当然,这个例子也没能很好体现。

假设我们有很大一段代码,但是涉及到数据竞争的时候,协程只会阻塞在 chan 读写的那一行代码上。这样一来我们就不用通过锁来覆盖一大段代码。

这里,我们可以看到 chan 其中一个很明显的优势是,我们没有了 synchronized 那种大粒度的锁,我们的 goroutine 只会阻塞在某一个 channel 上, 在读取 channel 之前的代码,goroutine 都是可以执行的,这样就在语言层面帮我们解决了一个很大的问题, 因为粒度更小,我们的代码自然也就能处理更大的并发请求了。

进程的几种状态

在开始讲述 channel 之前,再来回忆一下进程的几种状态会便于我们理解。

我们知道,我们的电脑上,同一时刻会有很多进程一直在运行,但是我们也发现很多进程的 CPU 占用其实都是 0%,也就是不占用 CPU。 其实进程会有几种状态,进程不是一直在运行的,一般来说,会有 执行阻塞就绪 几种状态,进程不是运行态的时候,那它就不会占用你的 CPU,因此会看到 CPU 占用是 0%,它们之间的转换如下图:

  • 执行:这表示进程正在运行中,是正在使用 CPU 的进程。在就绪状态的进程会在得到 CPU 时间片的时候得以执行。
  • 阻塞:这表示进程因为某些需要的资源得不到满足而挂起了(比如,正在进行磁盘读写),这种状态下,是不用占用 CPU 资源的。
  • 就绪:这表示一个状态所需要的资源都准备好了,可以继续执行了。

进程的几种状态跟 channel 有什么关系?

在 go 里面,其实协程也存在类似的调度机制,在协程需要的资源得不到满足的时候,也会被阻塞,然后协程调度器会去执行其他可以执行的协程。

比如下面这个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
func main() {
done := make(chan struct{})
// 这个协程在 main 协程序阻塞的时候依然在执行
go func() {
// 陷入睡眠状态
time.Sleep(time.Second)
fmt.Println("done")
// 往 done 这个 chan 写入数据
done <- struct{}{}
}()
// main 协程陷入阻塞状态
<-done
}

在这个例子中 done <- struct{}{} 这一行往 done 这个 chan 写入了数据,之前一直在等待 chanmain 协程的阻塞状态解除,得以继续执行。

goroutine 在等待 chan 返回数据的时候,会陷入阻塞状态。一个因为读取 chan 陷入阻塞状态的 goroutine 在获取到数据的时候,会继续往后执行。

channel 是什么?

我们在文章开头的第一张图,其实不是很准确。在 go 里面,channel 实际上是一个队列(准确来说是环形队列),大概长得像下面这样:

队列我们都知道,我们可以从队列头读取数据,也可以将数据推入到队列尾。上图中,1 是队列头,当我们从 channel 读取数据的时候, 读取到的是 16 是队列尾,当我们往 channel 中写入数据的时候,写入的位置是 6 后面的那个空间。

channel 是一个环形队列,goroutine 通过 channel 通信的方式是,一个 goroutine 将数据写入队列尾,然后另一个 goroutine 将数据从队列头读数据。

如何使用 channel

我们再仔细看看上面的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package main

import (
"fmt"
"time"
)

func main() {
done := make(chan struct{})
go func() {
time.Sleep(time.Second)
fmt.Println("done")
// 发送取消信号
done <- struct{}{}
}()
// 等待结束信号
<-done
}

这里面包含了使用 channel 的基本用法:

  • done := make(chan struct{}):创建 channel,在 go 里面是使用 chan 关键字来代表一个 channel 的。而在这个语句中,创建了一个接收 struct{} 类型数据的 chan
  • done <- struct{}{}:写入到 chan,这里,我们创建了一个空结构体,然后通过 <- 操作符将这个空结构体写入到了 chan 中。
  • <-done:从 chan 中读取数据,也是使用了 <- 操作符,然后我们丢弃了它的返回结果。

这段代码的执行过程如下图:

  1. CPU 1 上启动了 main 协程
  2. 接着在 main 协程中通过 go func 启动了一个新的协程,go 的调度机制允许不同的协程在不同的线程上执行,所以 main 执行的时候,go func 也在执行,然后,因为 done 这个 chan 中没有数据,所以 main 协程陷入阻塞。
  3. go func 在短暂的睡眠之后,输出了 done,然后向名字为 done 这个 chan 中发送了一个空结构体实例。
  4. done 里面没有写入数据之前,main 一直阻塞,在 go func 写入数据之后,main<-done,解除了阻塞状态,得以继续执行
  5. 56 因为可能是在不同的线程上执行的,所以哪一个先结束其实不一定。

下面详细说说 channel 的具体用法

创建 chan

chan 是 go 的关键字,channel 是我用来描述 chan 所表示的东西的一个术语而已,我们在 go 里面使用的话还是得用 chan 关键字。

创建 chan 是通过 make 关键字创建的:

1
ch := make(chan int)

make 函数的参数是 chan 然后加一个数据类型,这个数据类型是我们的 chan 这个环形队列里面所能存储的数据类型。 不能传递不同的类型进一个 chan 里面。

也可以传递第二个参数作为 chan 的容量,比如:

1
ch := make(chan int, 3)

这里第二个参数表明了 ch 这个 chan 到底能存储多少个 int 类型的数据。

不传递或者传 0 表示 chan 本身不能存储数据,go 底层会直接在两个 goroutine 之间传递,而不经过 chan 的复制。 (如果第二个参数大于 0,我们往 chan 写数据的时候,会先复制到 chan 这个数据结构,然后其他的 goroutinechan 中读取数据的时候,chan 会将数据复制到这个 goroutine 中)

chan 读写的几种操作

  • 写:ch <- x,将 x 发送到 channel 中
  • 读:x = <-ch,从 channel 中接收,保存到 x
  • 读,但是忽略返回值(用作协程同步,上面的例子就是):<-ch,从 ch 中接收,但是忽略接收到的结果
  • 读,并且判断是否是关闭前发送的:x, ok := <-ch,这里使用了两个返回值接收,第二个返回值表明了接收到的 x 是不是 chan 关闭之前发送进去的,true 就代表是。

需要注意的是 <-chch<- 这两个看起来好像一样,但是效果是完全不同的,ch 位于 <- 操作符右边的时候, 表示是

有一个简单区分的方法是,将 <- 想象为数据流动的方向,具体来说就是看数据是流向 chan 还是从 chan 流出,流向 chan 就是写入到 chan,从 chan 流出就是读取。

缓冲 chan 与非缓冲 chan

上面我们说到,创建 chan 的时候可以传递第二个参数表示 chan 的容量是多少,这个容量表示的是, 在没有 goroutine 从这个 chan 读取数据的时候,chan 能存放多少数据,也就是 chan 底层环形队列的长度。

下面描述了缓冲的实际场景:

无缓冲 chan

还是用我们上面的那段代码:

1
2
3
4
5
6
7
8
9
10
11
12
package main

import "fmt"

func main() {
done := make(chan struct{})
go func() {
fmt.Println("done")
done <- struct{}{}
}()
<-done
}

这里 make(chan struct{}),只有一个参数,所以 done 是一个无缓冲的 chan,这种 chan 会在发送的时候阻塞,直到有另一个协程从 chan 中获取数据。

有缓冲 chan

有缓冲的 chan 在协程往里面写入数据的时候,可以进行缓冲。缓冲的作用是,在需要读取 chan 的 goroutine 的处理速度比较慢的时候,写入 chan 的 goroutine 也可以持续运行,直到写满 chan 的缓冲区

上图的 chan 是一个有缓冲的 chan,在 chan 里面的数据还没来得及被接收的时候,chan 可以充当一个缓冲的角色。但是,如果 chan 的数据一直没有被接收,然后满了的时候,往 chan 写入数据的协程依然会陷入阻塞。但这种阻塞状态会在 chan 的数据被读取的时候解除。

下面是一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
"fmt"
"time"
)

func main() {
done := make(chan struct{})
// 定义一个缓冲数量为 2 的 chan
ch := make(chan int, 2)
go func() {
for {
// 模拟比较慢的处理速度
time.Sleep(time.Second)

i, ok := <-ch
// ok 为 false 表示 ch 已经关闭并且数据已经被读取完
// 这个时候中断循环
if !ok {
break
}

fmt.Printf("[%d] get from ch: %d\n", time.Now().Unix(), i)
}
// 处理完数据之后,发送结束的信号
done <- struct{}{}
}()
go func() {
// 在循环结束之后关闭 chan
defer close(ch)
for i := 0; i < 3; i++ {
// 在写入 2 个数之后,会陷入阻塞状态
// 直到上面那个协程从 ch 读取出数据,ch 才会有空余的地方可以继续接收数据
ch <- i
fmt.Printf("[%d] write to ch: %d\n", time.Now().Unix(), i)
}
}()
// 收到结束信号,解除阻塞状态,继续往下执行
<-done
}

输出如下:

1
2
3
4
5
6
[1669381752] write to ch: 0
[1669381752] write to ch: 1
[1669381753] get from ch: 0
[1669381753] write to ch: 2
[1669381754] get from ch: 1
[1669381755] get from ch: 2

我们可以看到,写入 chan 的协程在 1669381752 的时候没有写入了,然后在读取 chan 的协程从 chan 中读取了一个数出来后才能继续写入。

nil chan

chan 的零值是 nilclose 一个 nil 通道会引发 panic。往 nil 通道写入或从中读取数据会永久阻塞:

1
2
3
4
5
6
package main

func main() {
var ch chan int
<-ch
}

执行的时候会报错:fatal error: all goroutines are asleep - deadlock!

len 和 cap

  • len:通过 len 我们可以查询一个 chan 的长度,也就是有多少被发送到这个 chan 但是还没有被接收的值。
  • cap:通过 cap 可以查询一个容道的容量,也就是我们传给 make 函数的第二个参数,它表示 chan 最多可以容纳多少数据。

如果 channil,那么 lencap 都会返回 0。

chan 的方向

chan 还有一个非常重要的特性就是它是可以有方向的,这里说的方向指的是,数据的流向。在我们上面的例子中,数据既可以流入 chan,也可以从 chan 中流出,因为我们没有指定方向,没有指定那么 chan 就是双向的。

具体来说,有以下几种情况:

  • chan,没有指定方向,既可以读又可以写。
  • chan<-,只写 chan,只能往 chan 中写入数据,如果从中读数据的话,编译不会通过。
  • <-chan,只读 chan,只能从 chan 中读取数据,如果往其中写入数据的话,编译不会通过。

另外,无方向的 chan 可以转换为 chan<- 或者 <-chan,但是反过来不行

在实际使用 chan 的时候,在某些地方我们其实是只允许往 chan 里面写数据,然后另一个地方只允许从 chan 中读数据。比如下面这个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main

import "fmt"

var done = make(chan struct{})

// ch 是只写 chan,如果在这个函数里面从 ch 读取数据编译不会通过
func producer(ch chan<- int) {
for i := 0; i < 3; i++ {
ch <- i
fmt.Printf("produce %d\n", i)
}
// 发送 3 个数之后,关闭 chan
close(ch)
}

// ch 是只读 chan,如果在这个函数里往 ch 写入数据编译不会通过
func consumer(ch <-chan int) {
for {
i, ok := <-ch
if !ok {
// chan 的数据已经被全部接收完,
// 发送 done 信号
done <- struct{}{}
break
}
fmt.Printf("consume %d\n", i)
}
}

func main() {
nums := make(chan int, 10)
go producer(nums)
go consumer(nums)
// 收到结束信号之后继续往下执行
<-done
}

在这个例子中,producer 这个协程里面往 chan 写入数据,写入 3 个数之后关闭,然后 consumer 这个协程序从 chan 读取数据, 在读取完所有数据之后,发送结束信号(通过 done 这个 chan),最后 main 协程收到 done 信号后退出。

这样有个好处就是,从语法层面限制了对 chan 的读写操作。而不用担心有误操作。

什么时候阻塞?什么时候不阻塞?

在开始这个话题之前,很有必要说一下,go 里面 chan 的一些实现原理,在 chan 的实现中,维护了三个队列:

  • 数据缓冲队列(chan):也就是上面说的环形队列,是一种先进先出结构(FIFO,"First In, First Out"),它的长度是 chan 的容量。此队列存放的都是同一种类型的元素。
  • 接收数据协程队列(recvq):当 chan 里面没有数据可以读取的时候,这个队列会有数据,这个队列中的协程都在等待从 chan 中读取数据。
  • 发送数据协程队列(sendq):当数据缓冲队列满了的时候(又或者如果是一个无缓冲的 chan),那么这个队列不为空,这个队列中的协程都在等待往 chan 中写入数据。

大家在实际使用的时候可以参考一下下图,下图列出了对 chan 操作的所有场景:

对于阻塞或者非阻塞,其实有一个很简单的判断标准,下面描述了所有会阻塞的情况:

  • 发送:如果没有地方能存放发送的数据,则阻塞,具体有下面几种情况:
    • nil chan
    • 有缓冲但是缓冲满了
    • 无缓冲并且没有协程在等待从 chan 中读取数据
  • 接收:如果没有可以读取的数据,则阻塞,具体有下面几种情况:
    • nil chan
    • 有缓冲,但是缓冲区空的
    • 无缓冲,但是没有协程正在往 chan 中发送数据

大家觉得抽象可以结合下面这个图想象一下:

结合现实场景想象一下,我们可以把 chan 想象成为配送员,sendq 想象为商家,recvq 想象成用户,配送员装餐点的箱子想象成缓冲区:

一个假设的前提:假设商家只能在送出去一份餐点后,才能开始制作下一份餐点。

  • 发送
    • nil chan。没有配送员了,商家的餐点肯定是送不出去了,商家只能等着关门大吉了。
    • 有缓冲但是缓冲满了。配送员会有一个箱子(缓冲区)来存放外卖,但是这个箱子现在满了,虽然接了一个单,但是没有办法再从商家那里取得外卖来送了
    • 无缓冲并且没有协程在等待从 chan 中读取数据。这个外卖是用户自取的订单,但是用户联系不上。(当然现实中商家不用等,我们假设现在商家只能送出去一份后才能开始制作下一份)
  • 接收
    • nil chan。没有配送员,用户的餐没人送,用户只能等着饿死了。
    • 有缓冲,但是缓冲区空的。商家还没制作好餐点,配送员没有取到餐,这个时候用户打电话给配送员叫他快点送,但是这个时候配送员也没有办法,因为他也没有拿到用户的餐点。这个时候用户快饿死了,但也没有办法,只有干等着,先吃饱才能搬砖。
    • 无缓冲,但是没有协程正在往 chan 中发送数据。这天,用户是下了自取的订单,然后去到店里的时候,商家还没做好,这个时候,用户啥事也干不了,也只能等了。

需要注意的是,上图中发送和接收只有一个协程,但是在实际中,正如这一节开头讲的那样,发送和接收都维护了一个队列的。 对应到上面那个现实的例子,那就是配送员可以同时从多个商家那里取餐,也可以同时给多个用户送餐,这个过程,有可能多个商家在制作需要这个配送员配送的餐点,也有可能有多个用户在等着这个配送员送餐。

<- 操作符只是语法糖

在 go 里面我们操作 chan 的方式好像非常简单,就通过 <- 操作符就已经绰绰有余了,这也是 go 的设计理念吧,尽量把语言设计得简单。 (但是,简单并不容易)但是,从另外一个角度看,go 把对 chan 的操作简化成我们现在看到的这个样子,也说明了 chan 在 go 里面的地位(一等公民)。

在 go 中,chan 实际上是一个结构体(runtime/chan.go 里面的 hchan 结构体),而且,还是一个非常复杂的结构体,但是我们在使用的时候却非常简单, 这其实是 go 设计者给开发者提供的一种语法糖,直接在语法层面极大地简化了开发者对 chan 的使用,

如果没有这个语法糖,那就需要开发者自己去创建 hchan 结构体,然后发送或者接收的时候还需要调用这个结构体的方法。 相比之下,<- 就写一个操作符就行了,而且这个符号还非常形象,指向哪就代表了数据是流向 chan (写)还是从 chan 流出(读)。

for...range 语法糖

我们上面说过了,从 chan 读取数据的时候,可能需要用两个值来接收 chan 的返回值,第二个值用来判断接收到的值是否是 chan 关闭之前发送的。

for...range 语法也可以用来从 chan 中读取数据,它会循环,直到 chan 关闭,这样直接免去了我们判断的操作,比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import "fmt"

func main() {
done := make(chan struct{})

nums := make(chan int)
go func() {
for i := 0; i < 3; i++ {
fmt.Printf("send %d\n", i)
nums <- i

}
close(nums)
}()

go func() {
// 传统写法
//for {
// num, ok := <-nums
// if !ok {
// break
// }
// fmt.Printf("receive %d\n", num)
//}

// range 语法糖
for num := range nums {
fmt.Printf("receive %d\n", num)
}
done <- struct{}{}
}()

<-done
}

select 语句里面使用 chan

go 里面有一个关键字 select,可以让我们同时监听几个 chan,在任意一个 chan 有数据的时候,select 里面的 case 块得以执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
"fmt"
"time"
)

func main() {
ch1 := make(chan int)
ch2 := make(chan int)

// ch1 会先收到数据
go func() {
time.Sleep(time.Second)
ch1 <- 1
}()
go func() {
time.Sleep(time.Second * 2)
ch2 <- 1
}()

// select 会阻塞,直到其中某一个分支收到数据
select {
case <-ch1:
// 执行这一行代码
fmt.Println("from ch1")
case <-ch2:
// 这一行不会被执行
fmt.Println("from ch2")
}
}

select-case 的用法类似于 switch-case,也有一个 default 语句,在 select 里面

  • 如果 default 之前的 case 都不满足,则执行 default 块的代码。
  • 如果没有 default 语句,则会一直阻塞,直到某一个 case 上面的 chan 返回(有数据、或者 chan 被关闭都会返回)

当然,case 后面可以从 chan 读取数据,也可以往 chan 写数据,比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main

import (
"fmt"
"time"
)

func main() {
ch1 := make(chan int)
// 往 nil chan 写入数据会阻塞
var ch2 chan int

// ch1 会先收到数据
go func() {
time.Sleep(time.Second)
ch1 <- 1
}()

// 会阻塞,直到其中一个 case 返回
select {
case <-ch1:
// 执行这一行代码
fmt.Println("from ch1")
case ch2 <- 1: // 永远不会满足,因为 ch2 是 nil
fmt.Println("from ch2")
}
}

select 的另外一种很常见的用法是,等待一个 chan 和一个定时器(实现控制超时的功能),比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main

import (
"fmt"
"time"
)

func main() {
ch1 := make(chan int)

// ch1 一秒后才收到数据
go func() {
time.Sleep(time.Second)
ch1 <- 1
}()

select {
case <-ch1:
fmt.Println("from ch1")
case <-time.After(time.Millisecond * 100):
// 执行如下代码,因为这个 case 在 100ms 后就返回了
fmt.Println("from ch2")
}
}

如果我们需要控制某些操作的超时时间,那么就可以在时间到了之后,做一些清理操作,然后终止一些工作,最后退出协程。

总结

  • go 里面通过 chan 来实现协程之间的通信,chan 大概就是一个协程给另一个协程发送信息的代理。
  • 多线程程序执行的时候,因为有 CPU 缓存,然后需要对同一块内存进行并发读写,可能会导致数据竞争的问题。
  • 在很多语言中,都提供了锁的机制,来保护一片内存同一时刻只能一个线程操作,比如 java 里面的 synchronized 关键字。
  • go 里面很多情况下,在不同协程之间通信都是使用 chan 来实现的。
  • 进程会有阻塞态、运行态,go 里面的协程也有阻塞的状态,当需要的资源得不到满足的时候就会陷入阻塞。比如等待别的协程往 chan 里面写入数据。
  • chan 的几种常见操作:make 创建、<-chan 读、chan<- 写、len 获取 chan 中未读取的元素个数、cap 获取 chan 的缓冲区容量。
  • chan 类型上不加 <- 表示是一个可读可写的 chan<-chan T 表示只读 chanchan<- T 表示只写 chan,双向的 chan 可以转换为只读或者只写 chan,但是反过来不行,只读 chan 和只写 chan 之间也不能相互转换。
  • 协程的阻塞跟不阻塞,很简单的判断方式就是,发送的时候就看有没有地方能接得住,接收的时候就看有没有数据可以拿,没有则陷入阻塞。
  • <- 是 go 语言在设计层面提供给开发者的一种语法糖,chan 底层是一个很复杂的结构体。
  • for...range 结构在遍历 chan 的时候不用判断返回值是否有效,因为返回值无效的时候会退出循环。
  • 我们可以通过 select 来同时等待多个 chan 的操作返回。

学过 C 的朋友应该知道,有一种类型是指针类型,指针类型存储的是一个内存地址,通过这个内存地址可以找到它指向的变量。 go 虽然是一种高级语言,但是也还是给开发者提供了指针的类型 unsafe.Pointer,我们可以通过它来直接读写变量的内存。 正因为如此,如果我们操作不当,极有可能会导致程序崩溃。今天就来了解一下 unsafe 里所能提供的关于指针的一些功能, 以及使用 unsafe.Pointer 的一些注意事项。

内存里面的二进制数据表示什么?

我们知道,计算机存储数据的时候是以二进制的方式存储的,当然,内存里面存储的数据也是二进制的。二进制的 01 本身其实并没有什么特殊的含义。

它们的具体含义完全取决于我们怎么去理解它们,比如 0010 0000,如果我们将其看作是一个十进制数字,那么它就是 32, 如果我们将其看作是字符,那么他就是一个空格(具体可参考 ASCII 码表)。

对应到编程语言层面,其实我们的变量存储在内存里面也是 01 表示的二进制,这些二进制数表示是什么类型都是语言层面的事, 更准确来说,是编译器来处理的,我们写代码的时候将变量声明为整数,那么我们取出来的时候也会表示成一个整数。

这跟本文有什么关系呢?我们下面会讲到很多关于类型转换的内容,如果我们理解了这一节说的内容,下面的内容会更容易理解

在我们做类型转换的时候,实际上底层的二进制表示是没有变的,变的只是我们所看到的表面的东西。

内存布局

有点想直接开始讲 unsafe 里的 Pointer 的,但是如果读者对计算机内存怎么存储变量不太熟悉的话, 看起来可能会比较费解,所以在文章开头会花比较大的篇幅来讲述计算机是怎么存储数据的, 相信读完会再阅读后面的内容(比如指针的算术运算、通过指针修改结构体字段)会没有那么多障碍。

变量在内存中是怎样的?

我们先来看一段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package main

import (
"fmt"
"unsafe"
)

func main() {
var a int8 = 1
var b int16 = 2
// unsafe.Sizeof() 可以获取存储变量需要的内存大小,单位为字节
// 输出:1 2
// int8 意味着,用 8 位,也就是一个字节来存储整型数据
// int16 意味着,用 16 位,也就是两个字节来存储整型数据
fmt.Println(unsafe.Sizeof(a), unsafe.Sizeof(b))
}

在这段代码中我们定义了两个变量,占用一个字节的 a 和占用两个字节的 b,在内存中它们大概如下图:

我们可以看到,在图中,a 存储在低地址,占用一个字节,而 b 存储在 a 相邻的地方,占用两个字节。

结构体在内存中是怎样的?

我们再来看看结构体在内存中的存储:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package main

import (
"fmt"
"unsafe"
)

type Person struct {
age int8
score int8
}

func main() {
var p Person
// 输出:2 1 1
// 意味着 p 占用两个字节,
// 其中 age 占用一个字节,score 占用一个字节
fmt.Println(unsafe.Sizeof(p), unsafe.Sizeof(p.age), unsafe.Sizeof(p.score))
}

这段代码中,我们定义了一个 Person 结构体,其中两个字段 agescore 都是 int8 类型,都是只占用一个字节的,它的内存布局大概如下图:

我们可以看到,在内存中,结构体字段是占用了内存中连续的一段存储空间的,具体来说是占用了连续的两个字节。

指针在内存中是怎么存储的?

在下面的代码中,我们定义了一个 a 变量,大小为 1 字节,然后我们定义了一个指向 a 的指针 p

需要先说明的是,下面有两个操作符,一个是 &,这个是取地址的操作符,var p = &a 意味着,取得 a 的内存地址,将其存储在变量 p 中, 另一个操作符是 *,这个操作符的意思是解指针,*p 就是通过 p 的地址取得 p 指向的内容(也就是 a)然后进行操作。 *p = 4 意味着,将 p 指向的 a 修改为 4。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package main

import (
"fmt"
"unsafe"
)

func main() {
var a int8 = 3
// ... 其他变量
var p = &a
fmt.Println(unsafe.Sizeof(p))
fmt.Println(*p) // 3
*p = 4
fmt.Println(a) // 4
}

需要注意的是,这里面不再是一个单元格一个字节了,p(指针变量)是要占用 8 个字节的(这个跟机器有关,我的是 64 位的 CPU,所以是 8 个字节)。

从这个图,我们可以得知,指针实际上存储的是一个内存地址,通过这个地址我们可以找到它实际存储的内容。

结构体的内存布局真的是我们上面说的那样吗?

上面我们说了,下面这个结构体占用了两个字节,结构体里面的一个字段占用一个字节:

1
2
3
4
type Person struct {
age int8
score int8
}

然后我们再来看看下面这个结构体,它会占用多少字节呢?

1
2
3
4
type Person struct {
age int8
score int16 // 类型由 int8 改为了 int16
}

也许我们这个时候已经算好了 1 + 2 = 3,3 个字节不是吗?说实话,真的不是,它会占用 4 个字节, 这可能会有点反常理,但是这跟计算机的体系结构有着密切的关系,先看具体的运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package main

import (
"fmt"
"unsafe"
)

type Person struct {
age int8
score int16
}

func main() {
var p Person
// 输出:4 1 2
// 意味着 p 占用 4 个字节,
// 其中 age 占用 2 个字节,score 占用 2 个字节
fmt.Println(unsafe.Sizeof(p), unsafe.Sizeof(p.age), unsafe.Sizeof(p.score))
}

为什么会这样呢?因为 CPU 运行的时候,需要从内存读取数据,而从内存取数据的过程是按字读取的,如果我们数据的内存没有对齐, 则可能会导致 CPU 本来一次可以读取完的数据现在需要多次读取,这样就会造成效率的下降。

关于内存对齐,是一个比较庞大的话题,这里不展开了,我们需要明确的是,go 编译器会对我们的结构体字段进行内存对齐。

内存对我们的影响就是,它可能会导致结构体所占用的空间比它字段类型所需要的空间大(所以我们做指针的算术运算的时候需要非常注意), 具体大多少其实我们其实不需要知道,因为有方法可以知道,哪就是 unsafe.Offsetof,下面会说到。

uintptr 是什么意思?

在开始下文之前,还是得啰嗦一句,uintptr 这种命名方式是 C 语言里面的一种类型命名的惯例, u 前缀表示是无符号数(unsigned),ptr 是指针(pointer)的缩写,这个 uintptr 按这个命名惯例解析的话,就是一个指向无符号整数的指针。

另外,还有另外一种命名惯例,就是在整型类型的后面加上一个表示占用 bit 数的数字,(1字节=8bit) 比如 int8 表示一个占用 8 位的整数,只可以存储 1 个字节的数据,然后 int64 表示的是一个 8 字节数(64位)。

unsafe 包定义的三个新类型

ArbitraryType

type ArbitraryType int,这个类型实际上是一个 int 类型,但是从名字上我们可以看到,它被命名为任意类型, 也就是说,他会被我们用来表示任意的类型,具体怎么用,是下面说的 unsafe.Pointer 用的。

IntegerType

type IntegerType int,它表示的是一个任意的整数,在 unsafe 包中它被用来作为表示切片或者指针加减的长度。

Pointer

type Pointer *ArbitraryType,这个就是我们上一节提到的指针了,它可以指向任何类型的数据(*ArbitraryType)。

内存地址实际上就是计算机内存的编号,是一个整数,所以我们才可以使用 int 来表示指针。

unsafe 包计算内存的三个方法

这几个方法在我们对内存进行操作的时候会非常有帮助,因为根据这几个方法,我们才可以得知底层数据类型的实际大小。

Sizeof

计算 x 所需要的内存大小(单位为字节),如果其中包含了引用类型,Sizeof 不会计算引用指向的内容的大小。

有几种常见的情况(没有涵盖全部情况):

  • 基本类型,如 int8intSizeof 返回的是这个类型本身的大小,如 unsafe.Sizeof(int8(x)) 为 1,因为 int8 只占用一个字节。
  • 引用类型,如 var x *intSizeof(x) 会返回 8(在我的机器上,不同机器可能不一样),另外就算引用指向了一个复合类型,比如结构体,返回的还是 8(因为变量本身存储的只是内存地址)。
  • 结构体类型,如果是结构体,那么 Sizeof 返回的大小包含了用于内存对齐的内存(所以可能会比结构体底层类型所需要的实际大小要大)
  • 切片,Sizeof 返回的是 24(返回的是切片这个类型所需要占用空间的大小,我们需要知道,切片底层是 slice 结构体,里面三个字段分别是 array unsafe.Pointerlen intcap int,这三个字段所需要的大小为 24)
  • 字符串,跟切片类似,Sizeof 会返回 16,因为字符串底层是一个用来存储字符串内容的 unsafe.Pointer 指针和一个表示长度的 int,所以是 16。

这个方法返回的大小跟机器密切相关,但一般开发者的电脑都是 64 位的,调用这个函数的值应该跟我的机器上得到的一样。

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package main

import (
"fmt"
"unsafe"
)

type Person struct {
age int8
score int16
}

type School struct {
students []Person
}

func main() {
var x int8
var y int
// 1 8
// int8 占用 1 个字节,int 占用 8 个字节
fmt.Println(unsafe.Sizeof(x), unsafe.Sizeof(y))

var p *int
// 8
// 指针变量占用 8 个字节
fmt.Println(unsafe.Sizeof(p))

var person Person
// 4
// age 内存对齐需要 2 个字节
// score 也需要两个字节
fmt.Println(unsafe.Sizeof(person))

var school School
// 24
// 只有一个切片字段,切片需要 24 个字节
// 不管这个切片里面有多少数据,school 所需要占用的内存空间都是 24 字节
fmt.Println(unsafe.Sizeof(school))

var s string
// 16
// 字符串底层是一个 unsafe.Pointer 和一个 int
fmt.Println(unsafe.Sizeof(s))
}

Offsetof 方法

这个方法用于计算结构体字段的内存地址相对于结构体内存地址的偏移。具体来说就是,我们可以通过 &(取地址)操作符获取结构体地址。

实际上,结构体地址就是结构体中第一个字段的地址。

拿到了结构体的地址之后,我们可以通过 Offsetof 方法来获取结构体其他字段的偏移量,下面是一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package main

import (
"fmt"
"unsafe"
)

type Person struct {
age int8
score int16
}

func main() {
var person Person
// 0 2
// person.age 是第一个字段,所以是 0
// person.score 是第二个字段,因为需要内存对齐,实际上 age 占用了 2 个字节,
// 因此 unsafe.Offsetof(person.score) 是 2,也就是说从第二个字节开始才是 person.score
fmt.Println(unsafe.Offsetof(person.age), unsafe.Offsetof(person.score))
}

我们上面也说了,编译器会对结构体做一些内存对齐的操作,这会导致结构体底层字段占用的内存大小会比实际需要的大小要大。 因此,我们在取结构体字段地址的时候,最好是通过结构体地址加上 unsafe.Offsetof(x.y) 拿到的地址来操作。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
"fmt"
"unsafe"
)

type Person struct {
age int8
score int16
}

func main() {
var person = Person{
age: 10,
score: 20,
}
// {10 20}
fmt.Println(person)
// 取得 score 字段的指针
// 通过结构体地址,加上 score 字段的偏移量,得到 score 字段的地址
score := (*int16)(unsafe.Pointer(uintptr(unsafe.Pointer(&person)) + unsafe.Offsetof(person.score)))
*score = 30
// {10 30}
fmt.Println(person)
}

这个例子看起来有点复杂,但是没关系,后面会详细展开的,这里主要要说明的是:

我们通过 unsafe.Pointer 来操作结构体底层字段的时候,我们是通过 unsafe.Offsetof 来获取结构体字段地址偏移量的, 因为我们看到的类型大小并不是内存实际占用的大小,通过 Offsetof 拿到的结果是已经将内存对齐等因素考虑在内的了。 (如果我们错误的认为 age 只占用一个字节,然后将 unsafe.Offsetof(person.score) 替换为 1,那么我们就修改不了 score 字段了)

Alignof 方法

这个方法用以获取某一个类型的对齐系数,就是对齐一个类型的时候需要多少个字节。 这个对开发者而言意义不是非常大,go 里面只有 WaitGroup 用到了一下, 没有看到其他地方有用到这个方法,所以本文不展开了,有兴趣的自行了解。

unsafe.Pointer 是什么?

让我们再来回顾一下,Pointer 的定义是 type Pointer *ArbitraryType,也就是一个指向任意类型的指针类型。 首先它是指针类型,所以我们初始化 unsafe.Pointer 的时候,需要通过 & 操作符来将变量的地址传递进去。我们可以将其想象为指针类型的包装类型。

例子:

1
2
3
4
5
6
7
8
9
10
11
12
package main

import (
"fmt"
"unsafe"
)

func main() {
var a int
// 打印出 a 的地址:0xc0000240a8
fmt.Println(unsafe.Pointer(&a))
}

unsafe.Pointer 类型转换

在使用 unsafe.Pointer 的时候,往往需要另一个类型来配合,那就是 uintptr,这个 uintptr 在文档里面的描述是: uintptr 是一种整数类型,其大小足以容纳任何指针的位模式。这里的关键是 "任何指针", 也就是说,它设计出来是被用来存储指针的,而且其大小保证能存储下任何指针。

而我们知道 unsafe.Pointer 也是表示指针,那么 uintptrunsafe.Pointer 有什么区别呢?

只需要记住最关键的一点,uintptr 是内存地址的整数表示,而且可以进行算术运算,而 unsafe.Pointer 除了可以表示一个内存地址之外,还能保证其指向的内存不会被垃圾回收器回收,但是 uintptr 这个地址不能保证其指向的内存不被垃圾回收器回收。

我们先来看看与 unsafe.Pointer 相关的几种类型转换,这在我们下文几乎所有地方都会用到:

  • 任何类型的指针值都能转换为 unsafe.Pointer
  • unsafe.Pointer 可以转换为一个指向任何类型的指针值
  • unsafe.Pointer 可以转换为 uintptr
  • uintptr 可以转换为 unsafe.Pointer

例子(下面这个例子中输出的地址都是变量 a 所在的内存地址,都是一样的地址):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main

import (
"fmt"
"unsafe"
)

func main() {
var a int
var p = &a

// 1. int 类型指针转换为 unsafe.Pointer
fmt.Println(unsafe.Pointer(p)) // 0xc0000240a8

// 2. unsafe.Pointer 转换为普通类型的指针
pointer := unsafe.Pointer(&a)
var pp *int = (*int)(pointer) // 0xc0000240a8
fmt.Println(pp)

// 3. unsafe.Pointer 可以转换为 uintptr
var p1 = uintptr(unsafe.Pointer(p))
fmt.Printf("%x\n", p1) // c0000240a8,没有 0x 前缀

// 4. uintptr 可以转换为 unsafe.Pointer
p2 := unsafe.Pointer(p1)
fmt.Println(p2) // 0xc0000240a8
}

如何正确地使用指针?

指针允许我们忽略类型系统而对任意内存进行读写,这是非常危险的,所以我们在使用指针的时候要格外的小心。

我们使用 Pointer 的模式有以下几种,如果我们不是按照以下模式来使用 Pointer 的话,那使用的方式很可能是无效的, 或者在将来变得无效,但就算是下面的几种使用模式,也有需要注意的地方。

运行 go vet 可以帮助查找不符合这些模式的 Pointer 的用法,但 go vet 没有警告也并不能保证代码有效。

以下我们就来详细学习一下使用 Pointer 的几种正确的模式:

1. 将 *T1 转换为指向 *T2Pointer

前提条件:

  • T2 类型所需要的大小不大于 T1 类型的大小。(大小大的类型转换为占用空间更小的类型)
  • T1T2 的内存布局一样。

这是因为如果直接将占用空间小的类型转换为占用空间更大的类型的话,多出来的部分是不确定的内容,当然我们也可以通过 unsafe.Pointer 来修改这部分内容。

这种转换允许将一种类型的数据重新解释为另外一种数据类型。下面是一个例子(为了方便演示用了 int32int8 类型):

在这个例子中,int8 类型不大于 int32 类型,而且它们的内存布局是一样的,所以可以转换。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package main

import (
"fmt"
"unsafe"
)

func main() {
var a int32 = 2
// p 是 *int8 类型,由 *int32 转换而来
var p = (*int8)(unsafe.Pointer(&a))
var b int8 = *p
fmt.Println(b) // 2
}

unsafe.Pointer(&a) 是指向 aunsafe.Pointer(本质上是指向 int32 的指针),(*int8) 表示类型转换,将这个 unsafe.Pointer 转换为 (*int8) 类型。

觉得代码不好理解的可以看下图:

在上图,我们实际上是创建了一个指向了 a 最低位那 1 字节的指针,然后取出了这个字节里面存储的内容,将其存入了 b 中。

上面提到有一个比较重要的地方,那就是:转换的时候是占用空间大的类型,转换为占用空间小的类型,比如 int32int8 就是符合这个条件的, 那么如果我们将一个小的类型转换为大的类型会发生什么呢?我们来看看下面这个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main

import (
"fmt"
"unsafe"
)

type A struct {
a int8
}

type B struct {
b int8
c int8
}

func main() {
var a = A{1}
var b = B{2, 3}

// 1. 大转小
var pa = (*A)(unsafe.Pointer(&b))
fmt.Println(*pa) // {2}

// 2. 错误示例:小转大(危险,A 里面 a 后面的内存其实是未知的)
var pb = (*B)(unsafe.Pointer(&a))
fmt.Println(*pb) // {1 2}
}

大转小:*B 转换为 *A 的具体转换过程可以表示为下图:

在这个过程中,其实 ab 都没有改变,本质上我们只是创建了一个 A 类型的指针, 这个指针指向变量 b 的地址(但是 *pa 会被看作是 A 类型),所以 pa 实际上是跟 b 共享了内存。 我们可以尝试修改 (*pa).a = 3,我们就会发现 b.b 也变成了 3。

也就是说,最终的内存布局是下图这样的:

小转大:*A 转换为 *B 的具体转换过程可以表示为下图:

注意:这是错误的用法。(当然也不是完全不行)

*A 转换为 *B 的过程中,因为 B 需要 2 个字节空间,所以我们拿到的 pb 实际上是包含了 a 后面的 1 个字节, 但是这个字节本来是属于 b 变量的,这个时候 b*pb 都引用了第 2 个字节,这样依赖它们在修改这个字节的时候, 会相互影响,这可能不是我们想要的结果,而且这种操作非常危险。

2. 将 Pointer 转换为 uintptr(但不转换回 Pointer

Pointer 转换为 uintptr 会得到 Pointer 指向的内存地址,是一个整数。这种 uintptr 的通常用途是打印它。

但是,uintptr 转换回 Pointer 通常无效uintptr 是一个整数,而不是一个引用。将指针转换为 uintptr 会创建一个没有指针语义的整数值。 即使 uintptr 持有某个对象的地址,如果该对象移动,垃圾收集器也不会更新该 uintotr 的值, 也不会阻止该对象被回收。

如下面这种,我们取得了变量的地址 p,然后做了一些其他操作,最后再从这个地址里面读取数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package main

import (
"fmt"
"unsafe"
)

func main() {
var a int = 10
var p = uintptr(unsafe.Pointer(&a))
// ... 其他代码
// 下面这种转换是危险的,因为有可能 p 指向的对象已经被垃圾回收器回收
fmt.Println(*(*int)(unsafe.Pointer(p)))
}

具体如下图:

只有下面的模式中转换 uintptrPointer 是有效的。

3. 使用算术运算将 Pointer 转换为 uintptr 并转换回去

如果 p 指向一个已分配的对象,我们可以将 p 转换为 uintptr 然后加上一个偏移量,再转换回 Pointer。如:

1
p = unsafe.Pointer(uintptr(p) + offset)

这种模式最常见的用法是访问结构体或者数组元素中的字段:

1
2
3
4
5
// 等价于 f := unsafe.Pointer(&s.f)
f := unsafe.Pointer(uintptr(unsafe.Pointer(&s)) + unsafe.Offsetof(s.f))

// 等价于 e := unsafe.Pointer(&x[i])
e := unsafe.Pointer(uintptr(unsafe.Pointer(&x)) + i*unsafe.Sizeof(x[0]))

对于第一个例子,完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package main

import (
"fmt"
"unsafe"
)

type S struct {
d int8
f int8
}

func main() {
var s = S{
d: 1,
f: 2,
}
f := unsafe.Pointer(uintptr(unsafe.Pointer(&s)) + unsafe.Offsetof(s.f))
fmt.Println(*(*int8)(f)) // 2
}

最终的内存布局如下图(s 的两个字段都是 1 字节,所以图中 df 都是 1 字节):

详细说明一下:

第一小节我们说过了,结构体字段的内存布局是连续的。上面没有说的是,其实数组的内存布局也是连续的。这对理解下面的内容很有帮助。

  • &s 取得了结构体 s 的地址
  • unsafe.Pointer(&s) 转换为 Pointer 对象,这个指针对象指向的是结构体 s
  • uintptr(unsafe.Pointer(&s)) 取得 Pointer 对象的内存地址(整数)
  • unsafe.Offsetof(s.f) 取得了 f 字段的内存偏移地址(相对地址,相对于 s 的地址)
  • uintptr(unsafe.Pointer(&s)) + unsafe.Offsetof(s.f) 就是 s.f 的实际内存地址了(绝对地址)
  • 最后转换回 unsafe.Pointer 对象,这个对象指向的地址是 s.f 的地址

最终 f 指向的地址是 s.f,然后我们可以通过 (*int8)(f)unsafe.Pointer 转换为 *int8 类型指针,最后通过 * 操作符取得这个指针指向的值。

对于第二个例子,完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
package main

import (
"fmt"
"unsafe"
)

func main() {
var x = [3]int8{4, 5, 6}
e := unsafe.Pointer(uintptr(unsafe.Pointer(&x)) + 2*unsafe.Sizeof(x[0]))
fmt.Println(*(*int8)(e)) // 6
}

最终的内存布局如下图,e 指向了数组的第 3 个元素(下标从 0 开始算的):

代码中的 2 可以是其他任何有效的数组下标。

  • &s 取得了数组 x 的地址
  • unsafe.Pointer(&x) 转换为 Pointer 对象,这个指针对象指向的是数组 x
  • uintptr(unsafe.Pointer(&x)) 取得 Pointer 对象的内存地址(也就是 0xab
  • unsafe.Sizeof(x[0]) 是数组 x 里面每一个元素所需要的内存大小,乘以 2 表示是元素 x[2] 的地址偏移量(相对地址,相对于 x[0] 的地址)
  • uintptr(unsafe.Pointer(&x)) + 2*unsafe.Sizeof(x[0]) 表示的是数组元素 x[2] 的实际内存地址(绝对地址)
  • 最后转换回 unsafe.Pointer 对象,这个对象指向的地址是 x[2] 的地址(也就是 0xab + 2)。

最终,我们可以通过 (*int8)e 转换为 *int8 类型的指针,最后通过 * 操作符获取其指向的内容,也就是 6。

以这种方式对指针进行加减偏移量的运算都是有效的。(em...这里说的是写在同一行的这种方式)。这种情况下使用 &^ 这两个操作符也是有效的(通常用于内存对齐)。 在所有情况下,得到的结果必须指向原始分配的对象。

不像 C 语言,将指针加上一个超出其原始分配的内存区域的偏移量是无效的:

1
2
3
// 无效: end 指向了分配的空间以外的区域
var s thing
end = unsafe.Pointer(uintptr(unsafe.Pointer(&s)) + unsafe.Sizeof(s))

下面对切片的这种操作也跟上图类似。

1
2
3
// 无效: end 指向了分配的空间以外的区域
b := make([]byte, n)
end = unsafe.Pointer(uintptr(unsafe.Pointer(&b[0])) + uintptr(n))

这是因为,内存的地址范围是 [start, end),是不包含终点的那个地址的,上面的 end 都指向了地址的边界,这是无效的。 当然,除了边界上,边界以外都是无效的。(end 指向的内存不是属于那个变量的)

注意:两个转换(Pointer => uintptr, uintptr => Pointer)必须出现在同一个表达式中,只有中间的算术运算:

1
2
3
4
// 无效: uintptr 在转换回 Pointer 之前不能存储在变量中
// 原因上面也说过了,就是 p 指向的内容可能会被垃圾回收器回收。
u := uintptr(p)
p = unsafe.Pointer(u + offset)

注意:指针必须指向已分配的对象,因此它不能是 nil

1
2
3
// 无效: nil 指针转换
u := unsafe.Pointer(nil)
p := unsafe.Pointer(uintptr(u) + offset)

4. 调用 syscall.Syscall 时将指针转换为 uintptr

觉得文字太啰嗦可以直接看图:

syscall 包中的 Syscall 函数将其 uintptr 参数直接传递给操作系统,然后操作系统可以根据调用的细节将其中一些参数重新解释为指针。 也就是说,系统调用实现隐式地将某些参数从 uintptr 转换回指针。

如果必须将指针参数转换为 uintptr 以用作参数,则该转换必须出现在调用表达式本身中:

1
syscall.Syscall(SYS_READ, uintptr(fd), uintptr(unsafe.Pointer(p)), uintptr(n))

编译器通过安排被引用的分配对象(如果有的话)被保留,并且在调用完成之前不移动,来处理在调用程序集中实现的函数的参数列表中转换为 uintptr 的指针, 即使仅从类型来看,在调用期间似乎不再需要对象。

为了使编译器识别该模式,转换必须出现在参数列表中:

1
2
3
4
// 无效:在系统调用期间隐式转换回指针之前,
// uintptr 不能存储在变量中。
u := uintptr(unsafe.Pointer(p))
syscall.Syscall(SYS_READ, uintptr(fd), u, uintptr(n))

5. 将 reflect.Value.Pointerreflect.Value.UnsafeAddr 的结果从 uintptr 转换为 Pointer

reflect.ValuePointerUnsafeAddr 方法返回类型 uintptr 而不是 unsafe.Pointer, 从而防止调用者在未导入 unsafe 包的情况下将结果更改为任意类型。(这是为了防止开发者对 Pointer 的误操作。) 然而,这也意味着这个返回的结果是脆弱的,我们必须在调用之后立即转换为 Pointer(如果我们确切的需要一个 Pointer):

其实就是为了让开发者明确自己知道在干啥,要不然写出了 bug 都不知道。

1
2
3
// 在调用了 reflect.Value 的 Pointer 方法后,
// 立即转换为 unsafe.Pointer。
p := (*int)(unsafe.Pointer(reflect.ValueOf(new(int)).Pointer()))

与上述情况一样,在转换之前存储结果是无效的:

1
2
3
// 无效: uintptr 在转换回 Pointer 之前不能保存在变量中
u := reflect.ValueOf(new(int)).Pointer() // uintptr 保存到了 u 中
p := (*int)(unsafe.Pointer(u))

原因上面也说了,因为 u 指向的内存是不受保护的,可能会被垃圾回收器收集。

6. 将 reflect.SliceHeaderreflect.StringHeaderData 字段跟 Pointer 互相转换

与前面的情况一样,反射数据结构 SliceHeaderStringHeader 将字段 Data 声明为 uintptr, 以防止调用者在不首先导入 unsafe 的情况下将结果更改为任意类型。 然而,这意味着 SliceHeaderStringHeader 仅在解析实际切片或字符串值的内容时有效。

我们先来看看这两个结构体的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// SliceHeader 是切片的运行时表示(内存布局跟切片一致)
// 它不能安全或可移植地使用,其表示形式可能会在以后的版本中更改。
// 此外,Data 字段不足以保证它引用的数据不会被垃圾回收器收集,
// 因此程序必须保留一个指向底层数据的单独的、正确类型的指针。
type SliceHeader struct {
Data uintptr
Len int
Cap int
}

// StringHeader 字符串的运行时表示(内存布局跟字符串一致)
// ... 其他注意事项跟 SliceHeader 一样
type StringHeader struct {
Data uintptr
Len int
}

使用示例:

1
2
3
4
5
// 将字符串的内容修改为 p 指向的内容
var s string
hdr := (*reflect.StringHeader)(unsafe.Pointer(&s))
hdr.Data = uintptr(unsafe.Pointer(p))
hdr.Len = n

这种转换是有效的,因为 SliceHeader 的内存布局和 StringHeader 的内存布局一致,并且 SliceHeader 所占用的内存空间比 StringHeader 所占用内存空间大,也就是说,这是一种大小更大的类型转换为大小更小的类型,这会丢失 SliceHeader 的一部分数据, 但是丢失的那部分对我们程序正常运行是没有任何影响的。

在这个用法中,hdr.Data 实际上是引用字符串头中的基础指针的另一种方式,而不是 uintptr 变量本身。 (我们这里也是使用了 uintptr 表达式,而不是一个存储了 uintptr 类型的变量)

通常来说,reflect.SliceHeaderreflect.StringHeader 通常用在指向实际切片或者字符串的 *reflect.SliceHeader*reflect.StringHeader永远不会被当作普通结构体使用。 程序不应该声明或者分配这些结构体类型的变量,下面的写法是有风险的。

1
2
3
4
5
// 无效: 直接声明的 Header 不会将 Data 作为引用
var hdr reflect.StringHeader
hdr.Data = uintptr(unsafe.Pointer(p))
hdr.Len = n
s := *(*string)(unsafe.Pointer(&hdr)) // p 可能已经丢失

Add 函数

函数原型是:func Add(ptr Pointer, len IntegerType) Pointer

这个函数的作用是,可以将 unsafe.Pointer 类型加上一个偏移量得到一个指向新地址的 unsafe.Pointer。 简单点来说,就是对 unsafe.Pointer 做算术运算的,上面我们说过 unsafe.Pointer 是不能直接进行算术运算的, 因此需要先转换为 uintptr 然后再进行算术运算,算完再转换回 unsafe.Pointer 类型,所以会很繁琐。 有了 Add 方法,我们可以写得简单一些,不用做 uintptr 的转换。

有了 Add,我们可以简化一下上面那个通过数组指针加偏移量的例子,示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
package main

import (
"fmt"
"unsafe"
)

func main() {
var x = [3]int8{4, 5, 6}
//e := unsafe.Pointer(uintptr(unsafe.Pointer(&x)) + 2*unsafe.Sizeof(x[0]))
e := unsafe.Add(unsafe.Pointer(&x), 2 * unsafe.Sizeof(x[0]))
fmt.Println(*(*int8)(e)) // 6
}

在这个例子中,我们先是通过 unsafe.Pointer(&x) 获取到了一个指向 xunsafe.Pointer 对象, 然后通过 unsafe.Add 加上了 2 个 int8 类型大小的偏移量,最终得到的是一个指向 x[2]unsafe.Pointer

Add 方法可以简化我们对指针的一些操作。

Slice 函数

Slice 函数的原型是:func Slice(ptr *ArbitraryType, len IntegerType) []ArbitraryType

函数 Slice 返回一个切片,其底层数组以 ptr 开头,长度和容量为 len

unsafe.Slice(ptr, len) 等价于:

1
(*[len]ArbitraryType)(unsafe.Pointer(ptr))[:]

除了这个,作为一种特殊情况,如果 ptrnillen 为零,则 Slice 返回 nil

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package main

import (
"fmt"
"unsafe"
)

func main() {
var x = [6]int8{4, 5, 6, 7, 8, 9}
// 这里取了数组第一个元素 x[1] 的地址,
// 从这个地址开始取了 3 个元素作为新的切片底层数组,
// 返回这个新的切片
s := unsafe.Slice(&x[1], 3)
fmt.Println(s) // [5 6 7]
}

需要非常注意的是,第一个参数实际上隐含传递了该地址对应的类型信息,上面用了 &x[1],传递的类型实际上是 int8

如果我们按照下面这样写,得到的结果就是错误的,因为它隐式传递的类型是 [6]int8(这是一个数组),而不是 int8

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 错误示例:
package main

import (
"fmt"
"unsafe"
)

func main() {
var x = [6]int8{4, 5, 6, 7, 8, 9}
// unsafe.Slice 第一个参数接收到的类型是 [6]int,
// 所以最终返回了一个切片,这个切片有三个元素,
// 每一个元素都是长度为 6 数据类型为 int8 的数组。
// 也即形如 [[6]int8, [6]int8, [6]int8] 的切片
s := unsafe.Slice(&x, 3)
// [[4 5 6 7 8 9] [91 91 52 32 53 32] [54 32 4 5 6 7]]
fmt.Println(s)
}

这样显然不是我们想要的结果,因为它读取到了一部分未知的内存,如果我们修改这部分内存,可能会造成程序崩溃。

一个很常见的用法

在实际应用中,很多框架为了提高性能,在做 []bytestring 的切换的时候,往往会使用 unsafe.Pointer 来实现(比如 gin 框架):

下面这个例子实现了 []bytestring 的转换,而且避免了内存分配。这是因为,切片和字符串的内存布局是一致的,只不过切片比字符串占用 的空间多了一点,还有一个 cap 容量字段,用来表示切片的容量是多少。具体我们可以再看看上面的 reflect.SliceHeaderreflect.StringHeader, 在下面这个字节切片到字符串的转换过程中,是从占用空间更大的类型转换为占用空间更小的类型,所以是安全的,丢失的那个 cap 对我们程序正常运行无影响。

先看看 []bytestring 的类型底层定义:

1
2
3
4
5
6
7
8
9
10
11
12
// 字符串
type stringStruct struct {
str unsafe.Pointer
len int
}

// 切片,比 string 的结构体多了一个 cap 字段,但是前面的两个字段是一样的
type slice struct {
array unsafe.Pointer
len int
cap int
}

[]byte 转字符串的示例:

1
2
3
4
func BytesToString(b []byte) string {
// 将 b 解析为字符串
return *(*string)(unsafe.Pointer(&b))
}

这个操作如下图:

在这个转换过程中,其实只是将 b 表示的类型转由 []byte 转换为了 string,之所以可以这么转, 是因为 []byte 的内存布局跟 string 的内存布局是一样的, 但是由于字符串实际占用空间比切片类型要小(不包括其底层指针指向的内容), 所以在转换过程中,cap 字段丢失了,但是 strin 也不需要这个字段,所以对程序运行没影响。

同时字符串长度是按照字节计算的,所以字节切片和字符串的 len 字段是一样的,不需要做额外处理。

字符串转 []byte 的示例:

1
2
3
4
5
6
7
8
9
10
func StringToBytes(s string) []byte {
return *(*[]byte)(unsafe.Pointer(
// 定义匿名结构体变量,内存布局跟 []byte 一致,
// 这样就可以转换为 []byte 了。
&struct {
string
Cap int
}{s, len(s)},
))
}

这个操作如下图:

这个过程只是需要分配很小一部分内存就可以完成了,效率比 go 自带的转换高。

go 里面字符串是不可变的,但 go 为了维持字符串不可变的特性,在字符串和字节切片之间转换一般都是通过数据拷贝的方式实现的。 因为这样就不会影响到原来的字符串或者字节切片了,但是这样做的性能会非常低。 具体可参考 slicebytetostringstringtoslicebyte 函数,这两个函数位于 runtime/string.go 中。

总结

本文主要讲了如下内容:

  • 内存布局:结构体的字段存储是占用了连续的一段内存,而且结构体可能会占用比实际需要空间更大的内存,因为需要对齐内存。
  • 指针存储了指向变量的地址,对这个地址使用 * 操作符可以获取这个地址指向的内容。
  • uintptr 是 C 里面的一种命名惯例,u 前缀的意思是 unsignedint 表示是 int 类型,ptr 表示这个类型是用来表示指针的。
  • unsafe 定义的 Pointer 类型是一种可以指向任何类型的指针,ArbitraryType 可用于表示任意类型。
  • 我们通过 unsafe.Pointer 修改结构体字段的时候,要使用 unsafe.Offsetof 获取结构体的偏移量。
  • 通过 unsafe.Sizeof 可以获得某一种类型所需要的内存空间大小(其中包括了用于内存对齐的内存)。
  • unsafe.Pointeruintptr 之间的类型转换。
  • 几种使用 unsafe.Pointer 的模式:
    • *T1*T2 的转换
    • unsafe.Pointer 转换为 uintptr
    • 使用算术运算将 unsafe.Pointer 转换为 uintptr 并转换回去(需要注意不能使用中间变量来保存 uintptr(unsafe.Pointer(p))
    • 调用 syscall.Syscall 时将指针转换为 uintptr
    • reflect.ValuePointerUnsafeAddr 的结果从 uintptr 转换为 unsafe.Pointer
    • reflect.SliceHeaderreflect.StringHeaderData 字段跟 Pointer 互相转换
  • Add 函数可以简化指针的算术运算,不用来回转换类型(比如 unsafe.Pointer 转换为 uintptr,然后再转换为 unsafe.Pointer)。
  • Slice 函数可以获取指针指向内存的一部分。
  • 最后介绍了 string[]byte 之间通过 unsafe.Pointer 实现高效转换的方法。

版本:go 1.19

在前一篇文章中我们讨论了 go Context 的一些常见使用方式,今天我们再来从源码的角度深入了解一下 Context 的设计与实现。 Context 的源码数量不多,去掉注释大概只有两三百行,但是包含的信息量巨大(所以本文也比较长),而且设计得非常巧妙,值得读一读。 然后,下面的 图解 propagateCancel 这一小节的几个图描述了 Context 的工作机制,如果不想看代码,可以直接拉到下面。

再了解一下 chan

在开始本文之前,先来了解一下 Context 实现的关键:chan,对于 chan(再准确一点,我们这里讨论的其实是只读 chan),我们需要清楚以下几点:

  • <-ch 表示从 chan 中获取值。
  • <-ch 在通道(chan)尚未关闭的时候,会一直阻塞,直到通道接收到值。所以有时候通过 select 语句来监听 chan,从而实现协程间的通信。
  • <-ch 在通道(chan)关闭了之后,会立即返回,但是返回的是 chan 关联类型的零值,如果我们还需要判断是否是因为关闭才返回的话,需要用两个值来接收 <-ch 的返回值,如 v, ok := <-chok 表明了通道是否已经关闭,如果是关闭导致它返回,则返回的是 false

下面这个例子展示了 Context 实现的关键(通过 close(chan),所有 <-chan 会返回,本质上来说是一种 "广播机制"):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import (
"fmt"
"time"
)

func main() {
// 创建一个 chan,类型是 struct{}
ch := make(chan struct{})

go func() {
select {
// 这个 case 会在 chan 关闭或者收到值的时候执行,
// 在这里的情况是关闭了 chan。
case v, ok := <-ch:
if !ok {
// 输出 "chan ch is closed."
fmt.Println("chan ch is closed.")
}
// 关闭 chan 之后得到的是 ch 的零值,也就是一个空结构体实例
fmt.Println(v) // {}
}
}()

// 关闭 chan,所有从 chan 读取的操作都会立即返回。
// 关闭 chan 之后,<-ch 返回的第一个值是 chan 对应类型的零值,第二个参数是 false。
// 如果不是关闭的 chan,第二个参数是 true,表示可以从 chan 获取到数据。
close(ch)

// 防止程序退出看不到效果
time.Sleep(time.Second)

// {},chan struct{} 关闭后,从中获取值的时候会立即返回一个空结构体实例
fmt.Println(<-ch)
}

Context 中,context.Done() 方法返回的 chan,不会接收任何的值,但是在调用 CancelFunc 的时候,会关闭这个 chan,因此 所有的 <-context.Done() 会返回一个零值,返回什么不重要,重要的是,它返回的时候就代表着被上游取消了(代表的是一个取消信号)。

Context 的 UML 图

先来看看它的 UML 图,在后面会陆续展开细说。

  • 首先,有两个关键的接口,分别是 ContextcancelerContext 里面的接口是我们实际开发的时候用的,而 cancelercontext 包内部使用, 这个 canceler 接口定义了一个 cancel 方法,这个方法就是用来发送取消信号的。
  • emptyCtx 代表一个空的 Context,往往用作根 ContextvalueCtx 在父 Context 的基础上加了一个键值对。
  • cancelCtx 同时实现了 Contextcanceler 接口,表示一个可以取消的 Context
  • timerCtxcancelCtx 的基础上加了一个定时器,表示可以在指定时间之后由定时器进行取消操作。又或者由开发者自行取消。

context 包的结构体、方法说明

  • Context 接口:定义了 Context 接口的四个方法
  • emptyCtx 结构体:一个空 Context
  • CancelFunc 函数类型:Context 的取消函数
  • canceler 接口:Context 取消接口
  • cancelCtx 结构体:实现了取消接口的 Context
  • timerCtx 结构体:超时会取消的 Context
  • valueCtx 结构体:可以存储键值对的 Context
  • Background() 函数:返回空 Context,常作为根 Context
  • TODO() 函数:返回一个空 Context,在需要 Context 的地方又没有合适的 Context 就用这个
  • WithCancel() 函数:基于父 Context,创建一个可取消的 Context
  • newCancelCtx() 函数:创建一个可取消的 Context
  • propagateCancel() 函数:将节点挂载到上游第一个 cancelCtx 上,又或者启动协程监听 Context 取消事件
  • parentCancelCtx() 函数:返回上游的第一个 cancelCtx
  • removeChild() 函数:移除 Context 节点
  • init() 函数:包初始化函数,创建了一个关闭的 chan
  • WithDeadline() 函数:创建一个有 deadlineContext
  • WithTimeout() 函数:创建一个有 timeoutContext
  • WithValue() 函数:创建一个存储键值对的 Context

Context 接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
type Context interface {
// 返回一个 channel,当 context 被取消或者到了 deadline 的时候,
// 这个 channel 会被 close,从而 <-chan struct{} 会返回。
// 在没有关闭之前,一直阻塞,因为不会有任何地方往这个 channel 中发送值。
Done() <-chan struct{}

// 在 channel Done 返回的 channel 关闭后,返回 context 取消原因。
Err() error

// 返回 context 是否会被取消以及自动取消时间(即 deadline)
// ok 为 true,表明设置了 deadline,第一个返回值就是设置的 deadline
// ok 为 false,表示没有设置 deadline,第一个返回值没意义。
Deadline() (deadline time.Time, ok bool)

// 获取 key 对应的 value
Value(key interface{}) interface{}
}

Context 接口定义了 4 个方法,它们都是幂等的,也就是说连续多次调用同一个方法,得到的结果都是相同的。

Deadline() 示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
"context"
"github.com/davecgh/go-spew/spew"
"time"
)

func main() {
ctx, _ := context.WithTimeout(context.Background(), time.Second)
// 输出 ctx 的 deadline,具体时间为 1 秒之后
spew.Dump(ctx.Deadline())

ctx1 := context.Background()
// ctx1 的超时时间是一个零值
spew.Dump(ctx1.Deadline())

// 输出:
// (time.Time) 2022-11-19 11:45:38.702281 +0800 CST m=+1.000233039
// (bool) true
// (time.Time) 0001-01-01 00:00:00 +0000 UTC
// (bool) false
}

我们通过 Deadline() 方法可以知道当前拿到的 Context 参数是否设置了 deadline,以及 deadline 是什么时候, 从而决定接下来是否还需要做一些操作,如果时间太少的话,就可以考虑不做了,因为最终的结果还是超时。

canceler 接口

先看源码:

1
2
3
4
type canceler interface {
cancel(removeFromParent bool, err error)
Done() <-chan struct{}
}

实现了 canceler 两个方法的 Context,就表明该 Context 是可取消的。

cancel 方法的第一个参数 removeFromParent 表示的是,是否从父 Context 移除自身,这是因为 Context 是一个树状结构。 在 Context 取消的时候,它会给所有派生的 Context 也发送取消信号,所以派生新的 Context 的时候会记录从当前 Context 派生出去的 Context

但同样的,在 Context 被取消的时候,父 Context 也就再也不需要给这个 Context 发送取消信号啥的。

我们可以看这个图,Context 派生出了三个 Context,当 child 3 这个 Context cancel 的时候,只会影响到 child 3-1child 3-2 以及其自身, cancel 之后,根结点的 Context 再发送取消信号,child 3 就再也收不到了,因为它已经从这棵树中移除。

emptyCtx 结构体

emptyCtx 本身没有什么实际的作用,一般用作根 Context,比如在 main 函数里面创建的 Context

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}

func (*emptyCtx) Done() <-chan struct{} {
return nil
}

func (*emptyCtx) Err() error {
return nil
}

func (*emptyCtx) Value(key any) any {
return nil
}

func (e *emptyCtx) String() string {
switch e {
case background:
return "context.Background"
case todo:
return "context.TODO"
}
return "unknown empty Context"
}

我们使用 context.Background() 或者 context.TODO() 的时候返回的就是一个 emptyCtx

1
2
3
4
5
6
7
8
9
10
11
12
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)

func Background() Context {
return background
}

func TODO() Context {
return todo
}

emptyCtx 永远不会被取消,也没有值和 deadline。TODO 用在需要 Context 但又没有合适的 Context 可以用的时候。

cancelCtx 结构体

cancel 的操作实际上只会做一次,后续调用 cancel 的时候会返回第一次 cancel 的结果,cancel 是一个幂等操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 可以被取消,取消的时候,所有实现了 canceler 接口的派生出来的 Context 也会被取消。
type cancelCtx struct {
// cancelCtx 也实现了 `Context` 接口
Context

// mu 用以保护后面的 done、children、err 字段
mu sync.Mutex

// 是一个 chan struct{},懒汉式创建,
// 在第一次 cancel 的时候被关闭
done atomic.Value
// 记录所有可以取消的子 Context
// 在第一次 cancel 的时候会被设置为 nil。
children map[canceler]struct{}
// 在第一次 cancel 的时候会被设置为非 nil 的值
err error
}

cancelCtx.done 是一个支持原子操作的 chan struct{}

先来看看它的 Done() 方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 返回一个只读的 chan,但没有任何地方会往这个 chan 写入数据,
// cancel 的时候会关闭这个 chan,从而任何 <-ch 的操作都会立即返回。
func (c *cancelCtx) Done() <-chan struct{} {
// 如果 done 这个 chan 已经初始化了,就直接返回。
d := c.done.Load()
if d != nil {
return d.(chan struct{})
}
// 如果 done 还没初始化,则会进行初始化。
// 也就是上面说的 "懒汉式" 的创建方式,只有在需要的时候才会初始化。
c.mu.Lock()
defer c.mu.Unlock()
d = c.done.Load()
if d == nil {
d = make(chan struct{})
c.done.Store(d)
}
return d.(chan struct{})
}

再来看看 Err 方法:

1
2
3
4
5
6
7
func (c *cancelCtx) Err() error {
// 使用 mu 保证并发安全,本质是 return c.err
c.mu.Lock()
err := c.err
c.mu.Unlock()
return err
}

cancel 方法

然后再来看 cancel 方法的实现:

cancel 方法做了如下操作:

  • 关闭 c.done
  • 取消 c 的所有孩子 Context
  • 如果 removeFromParenttrue,会将 c 从其父 Contextchildren 属性中移除
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 发送取消信号
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
// 必须传递一个 err
if err == nil {
panic("context: internal error: missing cancel error")
}
// 如果已经取消,直接返回。(幂等的设计)
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // already canceled
}
// 记录取消原因,在调用 c.Err() 的时候会返回这个原因
c.err = err
// 关闭 done 这个通道,通知其他协程
d, _ := c.done.Load().(chan struct{})
if d == nil {
c.done.Store(closedchan)
} else {
close(d)
}
// 遍历它的所有子结点,并对其子结点进行取消操作
for child := range c.children {
child.cancel(false, err)
}
// 将子结点置空
c.children = nil
c.mu.Unlock()

// 从父结点中移除自己
if removeFromParent {
removeChild(c.Context, c)
}
}

WithCancel 方法

我们再来看看创建 cancelCtx 的方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 返回值里的 Context 的 Done 方法返回的 channel 关闭或者 parent 被 cancel 的时候,
// 返回值的 CancelFunc 会被执行。
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
// 必须从其他 Context 派生
if parent == nil {
panic("cannot create context from nil parent")
}
c := newCancelCtx(parent)
// 将 c 挂靠到 parent 的 children 属性中,
// 从而在 parent 取消的时候,可以感知得到。
propagateCancel(parent, &c) // 具体实现后面有详细说明
return &c, func() { c.cancel(true, Canceled) }
}

// 创建一个 cancelCtx 实例
func newCancelCtx(parent Context) cancelCtx {
return cancelCtx{Context: parent}
}

我们看到这里的 CancelFunc 的里面调用 cancel 的时候,第一个参数是 true,这表示在取消的时候,需要从 parent 中移除自身。

parentCancelCtx 方法

parentCancelCtx 方法用以从 parent 开始直到根节点的路径搜索第一个 cancelCtx,会跳过中间的 valueCtx

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 返回当前节点到根节点路径上的第一个 *cancelCtx。
// 如果 parent 就是 *cancelCtx,那么返回的就是 parent。
// 如果不是,它会从当前结点往 Context 树根结点遍历,找到父结点中的第一个 *cancelCtx,假设是 p。
// 然后检查 p.done 是否跟 parent.Done() 一样,不一样的话意味着 *cancelCtx 已经被包装在自定义实现中了,
// 这个时候,我们不应该绕过它,直接返回 nil 和 false。
// (注意:如果是我们的结构体嵌套了 Context,那么一样会被当做普通的 Context 处理。)
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
// 判断 parent 的 done 是否已经关闭或者并没有 done chan。
// 如果是,则返回 nil 和 false
done := parent.Done()
// done 为 nil 表示 parent 或者到根 Context 这条路径上并没有 *cancelCtx(只有 valueCtx 或 emptyCtx)。
if done == closedchan || done == nil {
return nil, false
}
// 判断 parent 是否是一个 cancelCtx
// 如果不是,则返回 nil 和 false
// 讲道理,parent.Value(&cancelCtxKey) 的返回值只有两种情况:
// emptyCtx(找到根节点也没找到)或者 *cancelCtx(找到了)
// (parent.Value 实现细节见下面的 value 那一小节)
p, ok := parent.Value(&cancelCtxKey).(*cancelCtx)
if !ok {
return nil, false
}
// 执行到这里的时候:p 是一个 *cancelCtx
// 判断 parent.Done() 和 p.done 是否相等:
// 不等则意味着 *cancelCtx 已经被包装在自定义实现中了,这个时候,我们不应该绕过它。
// 详细请参考:go issue 28728(google)
pdone, _ := p.done.Load().(chan struct{})
if pdone != done {
return nil, false
}
return p, true
}

// 从父结点移除自己(从 parent 移除 child)
func removeChild(parent Context, child canceler) {
p, ok := parentCancelCtx(parent)
if !ok {
return
}
// 从父结点的 children 中移除 child
p.mu.Lock()
if p.children != nil {
delete(p.children, child)
}
p.mu.Unlock()
}

propagateCancel 函数

这个函数会在两个地方调用,一个是 WithCancel,另一个是 WithDeadline,它的主要作用是,找到 parent 以及其父级 Context 路径上 第一个 cancelCtx,目的是,将 child 挂载到找到的这个 cancelCtxchildren 属性上,从而在这个 cancelCtx 取消的时候, 可以通过遍历 cancelCtx.childrenchild 进行通知。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// 由 parent 往根节点搜索第一个 cancelCtx,如果找到则将 child 写入到 cancelCtx.children 中。
// 如果找到的 cancelCtx 自定义了 Done,则启动协程监听 cancelCtx.Done()。
func propagateCancel(parent Context, child canceler) {
// 如果 Context 树上完全不存在 cancelCtx,则直接返回
done := parent.Done()
if done == nil {
return // parent is never canceled
}

// 如果 parent 已经取消,则直接取消 child
select {
case <-done:
// parent is already canceled
child.cancel(false, parent.Err())
return
default:
}

// 往根节点搜索第一个 cancelCtx
if p, ok := parentCancelCtx(parent); ok {
// 找到了,但是已经取消了,则取消 child
p.mu.Lock()
if p.err != nil {
// parent has already been canceled
child.cancel(false, p.err)
} else {
// 找到了,尚未取消。
// 将 child 写入到 p 的 children 属性中。
// p.children 是懒汉式创建的。
if p.children == nil {
p.children = make(map[canceler]struct{})
}
p.children[child] = struct{}{}
}
p.mu.Unlock()
} else {
// 执行到这里的原因是:
// 用户自定义了 Done 通道(跟 parent 不是同一个 done),
// 所以不能以父节点路径上的 done 来决定 child 是否取消,
// 需要通过启动新协程的方式来监听 Done 通道,从而可以正常取消 parent 的孩子节点。
atomic.AddInt32(&goroutines, +1)
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}

图解 propagateCancel

propagateCancel 的实现可能只看代码不好理解,所以我画了几个图来帮助我们理解:

我们可以看到 propagateCancel 里面有一个 parentCancelCtx,对于 parentCancelCtx,下一小节有比较详细的说明。 这个图描述了 parentCancelCtx 的实际执行过程,在我们调用 propagateCancel 的时候,搜索 cancelCtx 的过程:

  • 首先,我们知道,Context 是一个树状结构,每一个 Context 都可以派生出子 Context
  • 图中 parentAchild 到根节点 emptyCtx 路径上的第一个 cancelCtx
  • parentCancelCtx 拿到的是 parentA,然后将 child 写入到 parentAchildren 属性中。
  • 从而在我们手动取消 parentA 的时候,parentA 可以通过遍历 children 的方式,告知 child 协程取消了。

注意:搜索的时候会跳过 valueCtx

childemptyCtx 路径上搜索第一个 cancelCtx 的过程

我们取消的过程大概如下图:

我们理解取消的过程的时候,可以忽略掉 Context 树中那些非 cancelCtx 节点,正如上图这样,实际上取消过程只涉及到 parentA 以及 child, 其他节点如何并不影响。

如果觉得这个图不太好懂,可以再看看这个图:

用户覆盖了 done 的特殊情况

如果用户覆盖了 done 通道,这表明用户想自行控制什么时候 parentB 结束。(也就是说,parentB 脱离了路径上 cancelCtx 的控制, 也就是假设 parentA 还是 cancelCtx,在 parentA 取消的时候,parentB 是收不到信号的,parentB 收到信号是在其 Done() 返回的通道关闭的时候。)

这个时候因为我们从 parentB 派生出了一个新的 cancelCtxchild),所以 parentB 需要对 child 进行控制, 也就是说在 parentB 取消的时候,也取消 child。这种情况下,就是通过 propagateCancel 里面的协程里面实现的。

parentCancelCtx 函数

parentCancelCtx 的描述比较晦涩,如果没有实际的例子我们很难看得懂它的意思。里面有一个比较, 是针对 parent.Done()p.done.Load().(chan struct{}) 的,源码里面判断如果这两者不一样,则返回 nilfalse

注释里说,如果两者不一样,我们不应该绕过它(bypass it),但是这里的绕过是什么意思呢?我们来看一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package c

import (
"testing"
"time"
)

type A struct {
Context
ch <-chan struct{}
}

func (a *A) Done() <-chan struct{} {
return a.ch
}

func TestCancel(t *testing.T) {
// 创建一个 cancel context
ctx, cancel0 := WithCancel(TODO())

// 创建一个 A 实例
// 这个实例可以内嵌了 Context,所以可以当作 Context 使用,
// 但是我们覆盖了 Context 本身的 Done 方法。
ch := make(<-chan struct{})
a := A{ctx, ch}

// 因为我们覆盖了 Done 方法,所以 go 底层会认为是开发者想要
// 自行控制协程取消,所以在 WithCancel 的时候并不会把 ctx1
// 挂载到 a 的 children 属性下,这样一来,
// go 底层只能再启动一个协程来监听 a 的 Done chan,
// 从而可以在 a cancel 的时候可以正常通知到 ctx1。
ctx1, cancel := WithCancel(&a)

go func() {
time.Sleep(time.Millisecond * 10)
cancel()
}()

// ctx2 会写入到 ctx1 的 children 属性中,
// 这样就不需要启动新的协程来监测 ctx1 的 done。
ctx2, cancel2 := WithCancel(ctx1)

time.Sleep(time.Millisecond * 20)
}

下面这个图描述了上面这个例子中的 Context 树结构:

我们调用了三次 WithCancel,这三次的效果都不太一样:

  • 第一次调用的时候,parent.Done 返回 nil,这个时候,我们取消只有调用 cancel0 这一种途径,也就是手动取消。
  • 第二次调用的时候,parent.Done 返回的不是 nil,但是和 go 语言底层的那个 done 属性不一致(一个是 A.Done() ,另一个是 cancelCtx.done)。这种情况下,go 底层就知道,开发者自己定义了一个 done 通道,这个时候,会需要另外启动一个协程来监听 A.Done() 返回的 done,从而可以在 A 结束的时候,通知 A 的孩子 ctx1
  • 第三次调用的时候,parent.Done 返回的不是 nil,而且和 cancelCtx.done 相等,说明用户没有重写 Done 方法,这样就可以直接将 ctx2 挂载到 ctx1children 属性上,而不用另外启动协程来监听 ctx1done

具体怎么实现的可以看上面的 propagateCancel 这一小节。

之所以这样是为了给开发者一定的控制权,如果忽略了用户自定义的 Done 方法,那么可能取消的操作用户就无法控制了。 但我们覆盖 Done 方法就是为了可以自主去控制取消的操作。

child 什么时候从父 Context 移除?

如果我们足够细心,就会发现我们在 cancel 的时候,有的地方需要将 childContext 中移除,而有的地方不需要,那什么时候需要呢?

需要移除的情况:

  1. WithCancel 派生出新的 Context 的时候,假设叫 root,这个时候派生的这个 root 也是可以继续派生出新的 Context 的,而这个 root 对于它的子孙 Context 它就是根节点,所以当 root 被取消的时候,它和它的子孙 Context 也要被取消了,所以以 root 为根节点的子树需要被移除。
  2. WithDeadline 里面,当给定的 d 其实已经小于当前时间的时候(也就是父 Context 已经超时了),这个时候会将刚挂载到父节点的 timerCtx 移除,同时返回的 CancelFunc 中,cancel 的第一个参数是 false,因为它已经被移除了。

不需要移除的情况:

  1. propagateCancel 中监测到 parent 已经被取消的时候,因为这个时候 child 并没有关联上 parent,所以自然也没有移除的这种操作。
  2. 就是上面提到的第二种情况中,WithDeadline 的时候就监测到 deadline 已经比当前时间小了(超时了)。
  3. cancelCtxcancel 方法里面,遍历 cancelCtx 的孩子节点的时候,不需要做移除的操作,因为 cancelCtx 本身就需要被从 Context 树中移除。
  4. timerCtx 在没有挂载到 parent 上就已经过期了。

timerCtx 结构体

timerCtx 是一个带有定时器的 cancelCtx,我们既可以手动取消,也可以由底层定时器在到达 deadline 的时候进行取消。

1
2
3
4
5
6
7
8
9
// timerCtx 嵌套了 cancelCtx,这表示我们可以手动取消。
// 另外还有一个定时器,这个定时器的执行时间定在 deadline 这个时间点,
// 一旦时间到了,就会调用 cancelCtx 的 cancel 方法。
type timerCtx struct {
cancelCtx
timer *time.Timer // Under cancelCtx.mu.

deadline time.Time
}

我们有两种方法来创建 timerCtx,一个是 WithDeadline

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 基于 parent 创建一个 cancelCtx,内嵌到 timerCtx 中。
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
// 不能基于 nil 创建一个 timerCtx
if parent == nil {
panic("cannot create context from nil parent")
}
// 如果当前设置的超时时间比 parent 设置的超时时间更长,
// 那么不用 timerCtx 开启定时器了,因为 parent 会先到期取消,
// 这里再启动一个定时器也没有执行的机会了。
if cur, ok := parent.Deadline(); ok && cur.Before(d) {
// The current deadline is already sooner than the new one.
return WithCancel(parent)
}
// 创建一个 timerCtx
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: d,
}
// 将刚创建的 timerCtx 挂载到父 Context 中
propagateCancel(parent, c)
// 判断还有多久到达 deadline
dur := time.Until(d)
// 如果 deadline 已经过去了,那么直接执行 timerCtx 的 cancel 逻辑,
// 同时移除跟父节点的关联。(创建了还没来得及启动定时器就到期了)
if dur <= 0 {
c.cancel(true, DeadlineExceeded) // deadline has already passed
// cancel 不再需要从父节点移除自身,上一行已经移除了
return c, func() { c.cancel(false, Canceled) }
}
// 启动一个定时器,在到达 deadline 的时候执行 cancel 操作。
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
c.timer = time.AfterFunc(dur, func() {
c.cancel(true, DeadlineExceeded)
})
}
// 除了定时器之外,返回一个 CancelFunc 给用户提供自行取消的方式。
return c, func() { c.cancel(true, Canceled) }
}

另外一个是 WithTimeout

WithTimeout 本质上是对 WithDeadline 的调用而已,只不过描述到期时间的方式不一样而已。 WithDeadline 描述的是具体的到期时间,WithTimeout 描述的是多久以后的时间,两者其实都代表未来的某一个时间点。

1
2
3
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}

valueCtx

1
2
3
4
5
6
// valueCtx 在父 Context 的基础上,带有一对键值对。
// 实现了一个 Value 方法,其他方法都是调用 parent 的。
type valueCtx struct {
Context
key, val any
}

WithValue 方法

WithValue 方法一般用在请求范围内的数据共享,WithValue 方法很简单,就是在 parent 的基础上加上了一个 key 和 一个 value

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 返回的子 Context 里面有 key val 对
func WithValue(parent Context, key, val any) Context {
// parent 不能为 nil
if parent == nil {
panic("cannot create context from nil parent")
}
// key 不能为 nil
if key == nil {
panic("nil key")
}
// key 必须是可以比较的,
// 因为在获取值的时候需要进行 key 的比较。
if !reflectlite.TypeOf(key).Comparable() {
panic("key is not comparable")
}
// 返回 valueCtx
return &valueCtx{parent, key, val}
}

valueCtx

valueCtx 是从其他 Context 派生出来的,所以内嵌了 Context 接口,同时还有两个字段是 keyval,表示的是父 Context 传递的键值。

1
2
3
4
type valueCtx struct {
Context
key, val any
}

value 方法

value 方法用以从 Context 中获取对应的值,它会从 Context 树自底向上进行递归搜索,具体来说会有以下几种情况:

  • 如果 ctx*valueCtx,则会判断 key 是否等于 ctx 里面的 key,如果相等,返回 ctx.val否则,再去搜索 ctx 的父 Context
  • 如果 ctx*cancelCtx,同时 key&cancelCtxKey,则会返回 ctx否则,会继续搜索 ctx 到根结点这个路径上的第一个 cancelCtx
  • 如果 ctx*timerCtx,同时 key&cancelCtxKey,则会返回 ctx.cancelCtx否则,会继续搜索 ctx 到根结点这个路径上的第一个 cancelCtx
  • 如果 ctx*emptyCtx,则会返回 nil。(因为这时候是最顶层的 Context 了,也找不到对应的值)。
  • 如果都不是以上的几种情况,则有可能是开发者自定义的 Context 实现,则直接返回 c.Value(key)

它要解决的问题是:

  • 获取父级 ContextWithValue 共享的值。
  • 获取父级 Context 中最靠近当前节点的 cancelCtx非常重要:它的一个很重要的作用是,将当前节点设置为这个 cancelCtxchildren,从而可以实现在这个父级的 cancelCtx 取消的时候,当前的 Context 可以感知到)。
  • 如果是开发者自己实现的 Context,则直接调用用户自定义的 Value 方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 根据 key 从 c 中获取对应的值,会从 Context 树自底向上递归搜索。
func value(c Context, key any) any {
for {
switch ctx := c.(type) {
case *valueCtx:
if key == ctx.key {
return ctx.val
}
c = ctx.Context
case *cancelCtx:
if key == &cancelCtxKey {
return c
}
c = ctx.Context
case *timerCtx:
if key == &cancelCtxKey {
return &ctx.cancelCtx
}
c = ctx.Context
case *emptyCtx:
return nil
default:
return c.Value(key)
}
}
}

看下面的图可能会更加直观:

这个方法大概做的事情是,从当前的 Context 中查找指定的 key,如果找不到则递归地从其父级 Context 上找。

但我们最需要关注的是,它这里面 case *cancelCtxcase *timerCtx 的逻辑表明了,在我们调用 parent.Value(&cancelCtxKey) 的 时候,实际上获取到的是当前 Context 到根 Context 上第一个 cancelCtx

一个更简单的图是下面这样的(当然在实际中,会有多个子节点,这里假设都是只有一个子节点):

为什么是通过关闭 chan 的方式取消?

上面我们说了,在 Context 取消的时候,是通过关闭 chan 的方式来实现的,那么为什么要这么做呢?这是因为,如果说要通过往 chan 写入数据的方式来 通知其他子孙 Context 的话,我们就需要有多少个子孙 Context 就要往 chan 里面发多少次,但是如果选择使用 close 的方式的话, 我们就完全不用管派生出了多少个可以 cancelContext,因为一旦 chan 关闭了,所有的 <-chan 操作立即得以返回,这同样也实现了通信。 但是 close 这种方式无疑更加高明,更加简洁,当然也更加巧妙。

总结

本文主要讲了如下内容:

  • 介绍了 context 包的几个 Context 结构体,其中 emptyCtx 一般用作根 ContextvalueCtx 一般用作请求范围内的数据共享,而 cancelCtx 给开发者控制下游 Context 提供了一种很好的方式,timerCtxcancelCtx 的基础上加了一个定时器,时间到会发送取消信号。
  • timerCtx 有两种取消方式,一种是开发者手动取消,这个和 cancelCtx 一致,另外一种方法是到达 deadline 的时候,由定时器来取消。
  • go 里面 Context 取消的时候,是通过关闭 chan 的方式来让下游的 Context 感知的,因为 chan 的工作机制就是如果被关闭则调用 <-chan 会立即返回。
  • 创建 cancelCtx 的时候,会将派生的 Context 挂载到上游 Context 中第一个 cancelCtxchildren 上,这样在上游取消的时候,这个派生的 Context 可以感知得到。
  • 上游取消的信号会跨过中间的所有 valueCtx,传达到下游那些 cancelCtx,在取消的时候,Context 会从 Context 树中移除。
  • 如果开发者实现了自己的 Done 通道,并且返回的是跟底层 cancelCtx 中不一样的 done 通道,则会导致 go 底层启动一个协程来监测这个被覆盖的 done 通道。
  • value 也会从 Context 树中自底向上搜索,直到根节点。

在上一篇文章中,讲了很多跟 Context 相关的东西,我们也知道了 go 里面 Context 的一些比较常见的用法、使用场景,比如超时控制、变量共享等, 但是对于 go Context 本身还没有太多的讲解,可能看起来会有点费解,今天就来详细说说 Context 的设计以及其用法。

context.Context 模型

在开始之前,我们先来看看这张图,这张图涵盖了所有创建 context.Context 的方法:

首先,是最上层的 context.Background()context.TODO(),看过源码的同学应该知道,这两个方法返回的 Context 是一样的,都是 new(emptyCtx), 而这个 emptyCtx 其实是没有任何实际功能的,但是他们又是最重要的,因为创建 Context 只有这两个方法,其他的几个方法都是从这里创建的 Context 派生的。 我们一般会使用 context.Background() 来创建一个最顶级的 Context,比如,go 的 http 服务器中 request.Context() 方法的那个 Context 就是通过 context.Background() 创建的。 而 context.TODO() 往往用在需要 Context 的地方,但是我们还没确定使用一个什么样的 Context 的时候。

其次,中间的 context.Context 表示通过 context.Background() 或者 context.TODO() 方法创建的 Context。 这个 Context 往往就是一个请求中的根 Context,所有子协程里面的 Context 都是从这个 Context 派生的,又或者是直接使用了这个 Context

然后,从父 Context 创建新的 Context 的几个方法需要详细说一下:

  • WithCancel: 这个方法返回一个新的 Context,同时返回一个 CancelFunc,通过调用 CancelFunc,我们可以在子协程中的 context.Done() 方法接收到取消的信号,从而作出相应的操作(比如清理、中止执行等)。
  • WithDeadline: 这个方法也会返回一个新的 Context,同时也返回了一个 CancelFunc,本质上来说,这两个返回值跟 WithCancel 的两个返回值并无二致。我们通过 WithDeadline 返回的 CancelFunc 也是可以给子协程发送取消信号的。但是通过 WithDeadline 创建的 Context,会有一个定时器在运行,到了指定时间如果我们的子协程依然没有结束,同样也会收到取消的信号,这个定时器的作用就是在指定时间后执行 CancelFunc
  • WithTimeout: 这个其实跟 WithDeadline 是一样的,只是参数上有点不一样,最终效果都是在一定时间后发送取消信号。
  • WithValue: 这个方法只是返回一个带有我们传递变量的新的 Context,没有其他什么特别的功能了。

所以,除了基础的 context.Background()context.TODO(),对于怎么基于这两个基础的 Context 创建新的 Context,可以简单总结如下:

  • 如果我们只是想有一个机制可以取消子协程的执行,可以使用 WithCancel,拿到 CancelFunc 之后,在我们需要的时候调用 CancelFunc 就可以给子 Context 传递取消信号。
  • 如果我们想对子协程进行超时控制,可以使用 WithDeadline 或者 WithTimeout,这两个方法的本质上都是启动一个定时器,在到达一定时间后,会给子协程发送取消信号。但是除了定时器,它们还返回了一个 CancelFunc,这意味着我们在到达定时器指定的时间之前,也可以手动调用 CancelFunc 来发送取消信号。
  • 如果我们只是想给子协程传递一些数据,从而实现变量共享的话,可以使用 WithValue

实际使用中的 Context

我们再来看一张图,上面的描述可能会比较抽象,下面这个图展示了实际使用中的 Context

根结点的 Context 只有两种创建方式 context.Background() 或者 context.TODO(),在我们做一些 io 操作的时候,比如 rpc 调用,数据库查询等, 我们会需要做一些超时控制,这个时候我们就会需要新建一个有超时控制功能的 Context(使用 context.WithDeadline 或者 context.WithTimeout), 假设是上图的 child 2,然后 child 2 这个 Context 所在的 goroutine 里面也需要做一些 io 操作,然后也需要限制这些操作的超时时间, 然后在 child 2 的基础上再通过 context.WithTimeout 创建了一个新的 Context,假设是 child 2-2

需要注意的是,这里每一级都是一个新的 Context 实例,而不是在原有 Context 上增加或者修改其属性。

假设 child 2Deadline 到了,这个时候 child 2 的定时器会调用 CancelFunc 来给子 Context 发送取消信号。 child 2-2 里的 select 语句的 context.Done() 得以返回,从而开始执行清理操作,然后中止协程的执行。

在这个过程中,取消信号的传播是从上往下一级级有序传递的,每一级的 Context 会给那些从其派生的 Context 传传递取消信号,直到叶子结点。

需要注意的是,虽然信号传播是从上往下的,但是不代表子协程需要等待父协程的 context.Done() 里面的逻辑执行完再执行,因为我们之前也说过, 在 go 里面,协程是平等的,父子协程的执行是同时进行的。

我们可以看看下面的例子,有点啰嗦,大概看一下就好:

主要是想通过这个例子说明,在调用 CancelFunc 的时候,所有子孙 Context 都能接收到这个信号(当然它的父 Context 不会收到)。 这也跟我们实际的应用场景一致。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package main

import (
"context"
"fmt"
"sync"
"time"
)

func main() {
var wg sync.WaitGroup
wg.Add(4)

ctx, cancel := context.WithCancel(context.Background())

go func(c context.Context) {
ctx1, _ := context.WithCancel(c)

go func(c1 context.Context) {
ctx2, _ := context.WithCancel(c1)

go func(c2 context.Context) {
select {
case <-c2.Done():
fmt.Println("ctx2 done.")
wg.Done()
}
}(ctx2)

select {
case <-c1.Done():
fmt.Println("ctx1 done.")
wg.Done()
}
}(ctx1)

select {
case <-c.Done():
fmt.Println("ctx1 done.")
wg.Done()
}
}(ctx)

// main ctx
go func() {
time.Sleep(time.Second)
// 父协程通过调用 CancelFunc 发送了取消信号
cancel()
wg.Done()
}()

wg.Wait()

// 输出:
// ctx2 done.
// ctx1 done.
// ctx1 done.
}

整个过程大概如下图:

实际使用中的 goroutine

在实际的场景中,goroutine 类似 Context,也是树状的结构,每一个协程都可以启动新的协程,同样子协程也可以启动新的协程,最终会如下图这样:

同样的,而在父协程里面通过 Context 发送取消信号的时候,所有子孙协程都能感知得到,所以虽然看起来这棵树可能变得有点庞大,但是也不是完全不可控的。

go 中监控协程的一个工具

我们现在直到了,go 的协程里面可以启动新的协程,最终可能会有非常多的协程,但是到底有多少呢?

对于这个问题,go 官方的标准库已经给我们提供了一个工具 net/http/pprof,具体使用方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
"net/http"
_ "net/http/pprof"
"time"
)

func main() {
// 启动之后,在 localhost:6060 可以看到当前进程的一些指标,比如当前的协程有多少个
go func() {
http.ListenAndServe("localhost:6060", nil)
}()

ch := make(chan struct{}, 1)

go func() {
time.Sleep(time.Second * 120)
ch <- struct{}{}
}()

<-ch
}

通过 pprof 我们可以知道应用的健康状况,如协程数量等,这不是本文重点,不赘述了。

总结

本文主要讲述了如下内容:

  • 我们先是讲解了创建Context 的几种方式,其中,根 Context 只有两种创建方式,分别是 context.Background()context.TODO(),其他种类的 Context 可以通过 context.WithXXX() 创建。
  • 在 go 里面,如果我们只是想要取消一个协程,那么我们可以通过 WithCancel 来实现,如果要进行超时控制,可以使用 WithTimeoutWithDeadline
  • Context 是一个树状结构,每一个 Context 都可以作为父 Context 创建新的 Context,然后在调用 CancelFunc 或者超时的时候,会由父到子传递取消的信号。
  • Context 也可以用来传递参数,比如我们可以通过 WithValue 来传递参数,然后在子协程里面通过 Value 来获取参数。
  • 最后,我们讲解了如何通过 pprof 来监控协程的数量。