在上一篇文章《深入理解
go chan》中,我们讲解了 chan
相关的一些概念、原理等东西,
今天让我们再深入一下,读一下它的源码,看看底层实际上是怎么实现的。
整体设计
我们可以从以下三个角度看 chan
的设计(源码位于
runtime/chan.go
,结构体 hchan
就是
chan
的底层数据结构):
- 存储:
chan
里面的数据是通过一个环形队列来存储的(实际上是一个数组,但是我们视作环形队列来操作。无缓冲
chan
不用存储,会直接从 sender
复制到
receiver
)
- 发送:数据发送到
chan
的时候,如果 chan
满了,则会将发送数据的协程挂起,将其放入一个协程队列中,chan
空闲的时候会唤醒这个协程队列。如果 chan
没满,则发送队列为空。
- 接收:从
chan
中接收数据的时候,如果 chan
是空的,则会将接收数据的协程挂起,将其放入一个协程队列中,当
chan
有数据的时候会唤醒这个协程队列。如果 chan
有数据,则接收队列为空。
文中一些比较关键的名词解释:
sender
: 表示尝试写入 chan
的
goroutine
。
receiver
: 表示尝试从 chan
读取数据的
goroutine
。
sendq
是一个队列,存储那些尝试写入 channel
但被阻塞的 goroutine
。
recvq
是一个队列,存储那些尝试读取 channel
但被阻塞的 goroutine
。
g
表示一个协程。
gopark
是将协程挂起的函数,协程状态:_Grunning
=>
_Gwaiting
。
goready
是将协程改为可运行状态的函数,协程状态:
_Gwaiting
=> _Grunnable
。
现在,假设我们有下面这样的一段代码,通过这段代码,我们可以大概看一下
chan
的总体设计:
1 2 3 4 5 6 7 8 9 10 11 12
| package main
func main() { ch := make(chan int, 9) for i := 0; i < 7; i++ { ch <- i + 1 } <-ch }
|
现在,我们的 chan
大概长得像下面这个样子,后面会详细展开将这个图中的所有元素:
上图为了说明而在 recvq 和 sendq 都画了 3 个 G,但实际上 recvq 和
sendq
至少有一个为空。因为不可能有协程正在等待接收数据的时候,还有协程的数据因为发不出去数据而阻塞。
数据结构
在底层,go 是使用 hchan
这个结构体来表示
chan
的,下面是结构体的定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| type hchan struct { qcount uint dataqsiz uint buf unsafe.Pointer elemsize uint16 closed uint32 elemtype *_type sendx uint recvx uint recvq waitq sendq waitq
lock mutex }
|
waitq
的数据结构如下:
1 2 3 4
| type waitq struct { first *sudog last *sudog }
|
waitq
用来保存阻塞在等待或接收数据的协程列表(是一个双向链表),在解除阻塞的时候,需要唤醒这两个队列中的数据。
对应上图各字段详细说明
hchan
,对于 hchan
这个结构体,我们知道,在
go 里面,结构体字段是存储在一段连续的内存上的(可以看看《深入理解
go unsafe》),所以图中用了连续的一段单元格表示。
下面是各字段说明:
qcount
: 写入 chan
缓冲区元素个数。我们的代码往 chan
中存入了 7
个数,然后从中取出了一个数,最终还剩 6
个,因此
qcount
是 6
。
dataqsiz
: hchan
缓冲区的长度。它在内存中是连续的一段内存,是一个数组,是通过
make
创建的时候传入的,是 9
。
buf
:hchan
缓冲区指针。指向了一个数组,这个数组就是用来保存发送到 chan
的数据的。
sendx
、recvx
:写、读操作的下标。指向了
buf
指向的数组中的下标,sendx
是下一个发送操作保存的下标,recvx
是下一个接收操作的下标。
recvq
、sendq
: 阻塞在 chan
读写上的协程列表。底层是双向链表,链表的元素是
sudog
(sudog
是一个对 g
的封装),我们可以简单地理解为 recvq
和 sendq
的元素就是 g
(协程)。
g 和 sudog 是什么?
上面提到了 g
和 sudog
,g
是底层用来表示协程的结构体,而 sudog
是对 g
的封装,记录了一些额外的信息,比如关联的 hchan
。
在 go 里面,协程调度的模型是 GMP
模型,G
代表协程、M
代表线程、P
表示协程调度器。我上图里面的 G
就是代表协程(当然,实际上是
sudog
)。 还有一个下面会提到的就是
g0
,g0
表示 P
上启动的第一个协程。
GMP
模型是另外一个庞大的话题了,大家可以自行去了解一下,对理解本文也很有好处。因为在
chan
阻塞的时候实际上也是一个协程调度的过程。
具体来说,就是从 g
的栈切换到 g0
的栈,然后重新进行协程调度。这个时候 g
因为从运行状态修改为了等待状态,所以在协程调度中不会将它调度来执行,
而是会去找其他可执行的协程来执行。
创建 chan
我们的 make(chan int, 9)
最终会调用
makechan
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| func makechan(t *chantype, size int) *hchan { elem := t.elem
if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") }
mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) }
var c *hchan switch { case mem == 0: c = (*hchan)(mallocgc(hchanSize, nil, true)) case elem.ptrdata == 0: c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: c = new(hchan) c.buf = mallocgc(mem, elem, true) }
c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) }
|
创建 chan
的过程主要就是给 hchan
分配内存的过程:
- 非缓冲
chan
,只需要分配 hchan
结构体所需要的内存,无需分配环形队列内存(数据会直接从
sender
复制到 receiver
)
- 缓冲
chan
(不包含指针),分配 hchan
所需要的内存和环形队列所需要的内存,其中 buf
会紧挨着
hchan
- 缓冲
chan
(含指针),hchan
和环形队列所需要的内存单独进行分配
对应到文章开头的图就是,底下的 hchan
和 buf
那两段内存。
发送数据
<- 语法糖
在《深入理解
go chan》中,我们说也过,<-
这个操作符号是一种语法糖, 实际上,<-
会被编译成一个函数调用,对于发送操作而言,c <- x
会编译为对下面的函数的调用:
1 2 3 4 5
|
func chansend1(c *hchan, elem unsafe.Pointer) { chansend(c, elem, true, getcallerpc()) }
|
另外,对于 select
里面的调用,chansend
会返回一个布尔值给 select
用来判断是否是要选中当前
case
分支。 如果 chan
发送成功,则返回
true
,则 select
的那个分支得以执行。(select...case
本质上是
if...else
,返回 false
表示判断失败。)
chansend 第二个参数的含义
chansend
第二个参数 true
表示是一个阻塞调用,另外一种是在 select
里面的发送操作,在
select
中的操作是非阻塞的。
1 2 3 4 5 6 7 8 9
| package main
func main() { ch := make(chan int, 2) ch <- 1 select { case ch <- 3: } }
|
在 select
中对 chan
的读写是非阻塞的,不会导致当前协程阻塞,如果是因为 chan
满或者空无法发送或接收, 则不会导致阻塞在 case
的某一个分支上,还可以继续判断其他 case
分支。
select
中的 send
实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) { return chansend(c, elem, false, getcallerpc()) }
|
chansend 发送实现
- 发送到
nil chan
(select
中发送不阻塞,其他情况阻塞)
如果是在 select
的 case
里面发送,则不会阻塞,其他情况会导致当前 goroutine
挂起,永远阻塞:
示例代码:
1 2 3 4 5 6 7 8
| var ch chan int
ch <- 1 select {
case ch <- 3: }
|
- 发送到满了的
chan
(select
中发送不阻塞,其他情况阻塞)
对于无缓冲而且又没有
receiver
,或者是有缓冲但是缓冲满了的情况,发送也会阻塞(我们称其为
full
,也就是满了,满了的 chan
是放不下任何数据了的,所以就无法再往 chan
发送数据了):
receiver 表示等待从 chan 接收数据的协程。
对于满了的
chan
,什么时候可以再次发送呢?那就是有
receiver
接收数据的时候。chan
之所以会满就是因为没有 receiver
,也就是没有从
chan
接收数据的协程。
A. 对于无缓冲的 chan
,在满了的情况下,当有
receiver
来读取数据的时候,数据会直接从 sender
复制到 receiver
中:
B. 对于有缓冲,但是缓冲满了的情况(图中 chan
满了,并且有两个 g
正在等待写入 chan
):
这个发送过程大概如下:
receiver
从 chan
中获取到
chan
队头元素,然后 chan
的队头元素出队。
- 发送队列
sendq
对头元素出队,将其要发送的数据写入到
chan
缓冲中。最后,sendq
只剩下一个等待写入
chan
的 g
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| package main
func main() { var ch1 = make(chan int) ch1 <- 1 select { case ch1 <- 1: }
var ch2 = make(chan int, 1) ch2 <- 1 go func() { ch2 <- 2 }() go func() { ch2 <- 3 }() select { case ch2 <- 4: } }
|
- 发送到有缓冲,但是缓冲还没满的
chan
(不阻塞,发送成功)
这种情况比较简单,就是将 sender
要发送的数据写入到
chan
缓冲区:
示例代码:
1 2 3
| var ch = make(chan int, 1)
ch <- 1
|
chansend 源码解读
阻塞模式下,在发送的过程中,如果遇到无法发送成功的情况,会调用
gopark
来将协程挂起,然后当前协程陷入阻塞状态。
非阻塞模式下(select
),在发送过程中,任何无法发送的情况,都会直接返回
false
,表示发送失败。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
|
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { if c == nil { if !block { return false } gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") }
if !block && c.closed == 0 && full(c) { return false } lock(&c.lock)
if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) }
if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true }
if c.qcount < c.dataqsiz { qp := chanbuf(c, c.sendx) typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true }
if !block { unlock(&c.lock) return false }
gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) gp.parkingOnChan.Store(true) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) KeepAlive(ep)
return true }
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if sg.elem != nil { sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1) }
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) { dst := sg.elem typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size) memmove(dst, src, t.size) }
func full(c *hchan) bool { if c.dataqsiz == 0 { return c.recvq.first == nil } return c.qcount == c.dataqsiz }
|
接收数据
<- 语法糖
在发送数据的那一节我们提到了,ch <- x
编译之后,实际上是对 chansend1
的函数调用。同样的,在接收数据的时候, <-
这个操作符也会根据不同情况编译成不同的函数调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
|
func chanrecv1(c *hchan, elem unsafe.Pointer) { chanrecv(c, elem, true) }
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) { _, received = chanrecv(c, elem, true) return }
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) { return chanrecv(c, elem, false) }
|
还需要再提醒一下的是:chan
关闭之后,并且
chan
缓冲区所有数据被接收完之后,received
才会是 false
,并不是一关闭 received
马上返回
false
chanrecv 函数 block
参数的含义
跟 chansend
中的 block
参数的作用一样,用来判断是否是 select
模式下的接收操作,如果是,则在需要阻塞的时候不会阻塞,取而代之的是直接返回。
chanrecv 接收数据实现
- 从
nil chan
接收(select
中接收不阻塞,其他情况阻塞)
从 nil chan
中读取的时候,如果是阻塞模式,会调用
gopark
将协程阻塞起来。
示例代码:
- 从空
chan
接收(select
中接收不阻塞,其他情况阻塞)
判断空的条件为:无缓冲并且没有等待发送数据的
g,或者有缓冲但是缓冲区无数据。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| package main
func main() { var ch1 = make(chan int) <-ch1 select { case <-ch1:
}
var ch2 = make(chan int, 1) <-ch2 select { case <-ch2:
} }
|
- 从缓冲区满的
chan
接收(不会阻塞,这个时候
sendq
一定不为空)
这种情况不会阻塞,上面已经有图了,这里不再贴了。
- 从缓冲区不满的
chan
接收(不会阻塞)
示例代码:
1 2 3 4 5 6 7 8
| package main
func main() { var ch = make(chan int, 2) ch <- 1 <-ch }
|
chanrecv 源码解读
chanrecv
函数:
- 参数:
c
是 chan
实例,ep
是用来接收数据的指针,block
表示是否是阻塞模式。
- 返回值:
selected
表示 select
语句的
case
是否被选中,received
表示接收到的值是否有效。
- 功能:从
c
这个通道接收数据,同时将接收到的数据写入到
ep
里。
概览:
ep
可能是
nil
,这意味着接收到的值被忽略了(对应 <-c
这种形式的接收)。
- 如果是非阻塞模式,并且通道无数据,返回
(false, false)
,也就是 select
语句中的
case
不会被选中。
- 否则,如果
c
关闭了,会对 ep
指向的地址设置零值,然后返回 (true, false)
。如果是
select
语句,意味被选中,
- 但是
received
为 false
表明返回的数不是通道关闭之前发送的。
- 否则,将从通道中获取到的值写入
ep
指向的地址,并且返回
(true, true)
- 一个非
nil
的 ep
必须指向堆或者调用者的栈。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
| func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") }
if !block && empty(c) { if atomic.Load(&c.closed) == 0 { return } if empty(c) { if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } }
lock(&c.lock)
if c.closed != 0 { if c.qcount == 0 { unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } } else { if sg := c.sendq.dequeue(); sg != nil { recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } }
if c.qcount > 0 { qp := chanbuf(c, c.recvx) if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true }
if !block { unlock(&c.lock) return false, false }
gp := getg() mysg := acquireSudog() c.recvq.enqueue(mysg) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
return true, success }
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if c.dataqsiz == 0 { if ep != nil { recvDirect(c.elemtype, sg, ep) } } else { qp := chanbuf(c, c.recvx) if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx } sg.elem = nil gp := sg.g unlockf() goready(gp, skip+1) }
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) { src := sg.elem typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size) memmove(dst, src, t.size) }
|
关闭 chan
chan
关闭的过程比较简单,修改 closed
为
1,然后唤醒发送队列和接收队列里面的 g
,如果发送队列有
g
,被唤醒之后会
panic
,因为不能往一个已经关闭的 chan
发送数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| func closechan(c *hchan) { if c == nil { panic(plainError("close of nil channel")) }
lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) }
c.closed = 1
var glist gList
for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } glist.push(gp) }
for { sg := c.sendq.dequeue() if sg == nil { break } glist.push(gp) } unlock(&c.lock)
for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) } }
|
对于实际开发的作用
在上一篇文章和本文中,花了很大的篇幅来讲述 chan
的设计、实现与使用,这么多东西对我们有什么用呢?
其中非常重要的一个作用是,清楚地了解 chan
的工作机制,便于我们对程序实际运行情况进行分析,
尤其是一些非常隐晦的读写 chan
场景,毕竟稍有不慎就会导致协程泄漏,这对进程影响可能是非常大的。
比如下面的这种代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| package main
import ( "fmt" "runtime" "time" )
func main() { for i := 0; i < 10000; i++ { time.Sleep(time.Second) go func() { var ch chan int ch <- 1 }() fmt.Printf("goroutine count: %d\n", runtime.NumGoroutine()) } time.Sleep(time.Hour) }
|
tips:在 chan
读写的地方需要注意自己的写法会不会让
goroutine 永远陷入阻塞,或者长时间阻塞。
总结
chan
底层是 hchan
结构体。
- go 语法里面的
<-
不过是语法糖,在编译的时候,会编译成 hchan
相关的方法调用。最终都会调用 chansend
或者
chanrecv
。select...case
里面的
chan
读写最终也会编译为对 chansend
或
chanrecv
的调用。
chan
总体设计:维护了三个队列:
hchan.buf
: chan
中暂存 sender
发送数据的队列(在有 receiver
读取的时候会从这个队列中复制到 receiver
中)
hchan.recvq
: 接收队列,存储那些尝试读取
channel
但被阻塞的 goroutine
。
hchan.sendq
: 发送队列,存储那些尝试写入
channel
但被阻塞的 goroutine
。
- 读写
chan
的协程阻塞是通过 gopark
实现的,而从阻塞态转换为可运行状态是通过 goready
实现的。
- 在
chan
读写操作阻塞的时候,如果是在
select
语句中,则会直接返回(表示当前的分支没有被选中),否则,会调用
gopark
挂起当前协程。
- 在关闭
chan
的时候,会调用 goready
唤醒阻塞在发送或者接收操作上的 g
(协程)。
- 无缓冲
chan
的操作有点特殊,对于无缓冲
chan
,必须同时有 sender
和
receiver
才能发送和接收成功,否则另一边都会陷入阻塞(当然,select
不会阻塞)。