在上一篇文章《深入理解
go chan》中,我们讲解了 chan
相关的一些概念、原理等东西,
今天让我们再深入一下,读一下它的源码,看看底层实际上是怎么实现的。
整体设计
我们可以从以下三个角度看 chan
的设计(源码位于
runtime/chan.go
,结构体 hchan
就是
chan
的底层数据结构):
- 存储:
chan
里面的数据是通过一个环形队列来存储的(实际上是一个数组,但是我们视作环形队列来操作。无缓冲
chan
不用存储,会直接从 sender
复制到
receiver
)
- 发送:数据发送到
chan
的时候,如果 chan
满了,则会将发送数据的协程挂起,将其放入一个协程队列中,chan
空闲的时候会唤醒这个协程队列。如果 chan
没满,则发送队列为空。
- 接收:从
chan
中接收数据的时候,如果 chan
是空的,则会将接收数据的协程挂起,将其放入一个协程队列中,当
chan
有数据的时候会唤醒这个协程队列。如果 chan
有数据,则接收队列为空。
文中一些比较关键的名词解释:
sender
: 表示尝试写入 chan
的
goroutine
。
receiver
: 表示尝试从 chan
读取数据的
goroutine
。
sendq
是一个队列,存储那些尝试写入 channel
但被阻塞的 goroutine
。
recvq
是一个队列,存储那些尝试读取 channel
但被阻塞的 goroutine
。
g
表示一个协程。
gopark
是将协程挂起的函数,协程状态:_Grunning
=>
_Gwaiting
。
goready
是将协程改为可运行状态的函数,协程状态:
_Gwaiting
=> _Grunnable
。
现在,假设我们有下面这样的一段代码,通过这段代码,我们可以大概看一下
chan
的总体设计:
1 2 3 4 5 6 7 8 9 10 11 12
| package main
func main() { ch := make(chan int, 9) for i := 0; i < 7; i++ { ch <- i + 1 } <-ch }
|
现在,我们的 chan
大概长得像下面这个样子,后面会详细展开将这个图中的所有元素:
上图为了说明而在 recvq 和 sendq 都画了 3 个 G,但实际上 recvq 和
sendq
至少有一个为空。因为不可能有协程正在等待接收数据的时候,还有协程的数据因为发不出去数据而阻塞。
数据结构
在底层,go 是使用 hchan
这个结构体来表示
chan
的,下面是结构体的定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| type hchan struct { qcount uint dataqsiz uint buf unsafe.Pointer elemsize uint16 closed uint32 elemtype *_type sendx uint recvx uint recvq waitq sendq waitq
lock mutex }
|
waitq
的数据结构如下:
1 2 3 4
| type waitq struct { first *sudog last *sudog }
|
waitq
用来保存阻塞在等待或接收数据的协程列表(是一个双向链表),在解除阻塞的时候,需要唤醒这两个队列中的数据。
对应上图各字段详细说明
hchan
,对于 hchan
这个结构体,我们知道,在
go 里面,结构体字段是存储在一段连续的内存上的(可以看看《深入理解
go unsafe》),所以图中用了连续的一段单元格表示。
下面是各字段说明:
qcount
: 写入 chan
缓冲区元素个数。我们的代码往 chan
中存入了 7
个数,然后从中取出了一个数,最终还剩 6
个,因此
qcount
是 6
。
dataqsiz
: hchan
缓冲区的长度。它在内存中是连续的一段内存,是一个数组,是通过
make
创建的时候传入的,是 9
。
buf
:hchan
缓冲区指针。指向了一个数组,这个数组就是用来保存发送到 chan
的数据的。
sendx
、recvx
:写、读操作的下标。指向了
buf
指向的数组中的下标,sendx
是下一个发送操作保存的下标,recvx
是下一个接收操作的下标。
recvq
、sendq
: 阻塞在 chan
读写上的协程列表。底层是双向链表,链表的元素是
sudog
(sudog
是一个对 g
的封装),我们可以简单地理解为 recvq
和 sendq
的元素就是 g
(协程)。
g 和 sudog 是什么?
上面提到了 g
和 sudog
,g
是底层用来表示协程的结构体,而 sudog
是对 g
的封装,记录了一些额外的信息,比如关联的 hchan
。
在 go 里面,协程调度的模型是 GMP
模型,G
代表协程、M
代表线程、P
表示协程调度器。我上图里面的 G
就是代表协程(当然,实际上是
sudog
)。 还有一个下面会提到的就是
g0
,g0
表示 P
上启动的第一个协程。
GMP
模型是另外一个庞大的话题了,大家可以自行去了解一下,对理解本文也很有好处。因为在
chan
阻塞的时候实际上也是一个协程调度的过程。
具体来说,就是从 g
的栈切换到 g0
的栈,然后重新进行协程调度。这个时候 g
因为从运行状态修改为了等待状态,所以在协程调度中不会将它调度来执行,
而是会去找其他可执行的协程来执行。
创建 chan
我们的 make(chan int, 9)
最终会调用
makechan
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| func makechan(t *chantype, size int) *hchan { elem := t.elem
if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") }
mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) }
var c *hchan switch { case mem == 0: c = (*hchan)(mallocgc(hchanSize, nil, true)) case elem.ptrdata == 0: c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: c = new(hchan) c.buf = mallocgc(mem, elem, true) }
c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) }
|
创建 chan
的过程主要就是给 hchan
分配内存的过程:
- 非缓冲
chan
,只需要分配 hchan
结构体所需要的内存,无需分配环形队列内存(数据会直接从
sender
复制到 receiver
)
- 缓冲
chan
(不包含指针),分配 hchan
所需要的内存和环形队列所需要的内存,其中 buf
会紧挨着
hchan
- 缓冲
chan
(含指针),hchan
和环形队列所需要的内存单独进行分配
对应到文章开头的图就是,底下的 hchan
和 buf
那两段内存。
发送数据
<- 语法糖
在《深入理解
go chan》中,我们说也过,<-
这个操作符号是一种语法糖, 实际上,<-
会被编译成一个函数调用,对于发送操作而言,c <- x
会编译为对下面的函数的调用:
1 2 3 4 5
|
func chansend1(c *hchan, elem unsafe.Pointer) { chansend(c, elem, true, getcallerpc()) }
|
另外,对于 select
里面的调用,chansend
会返回一个布尔值给 select
用来判断是否是要选中当前
case
分支。 如果 chan
发送成功,则返回
true
,则 select
的那个分支得以执行。(select...case
本质上是
if...else
,返回 false
表示判断失败。)
chansend 第二个参数的含义
chansend
第二个参数 true
表示是一个阻塞调用,另外一种是在 select
里面的发送操作,在
select
中的操作是非阻塞的。
1 2 3 4 5 6 7 8 9
| package main
func main() { ch := make(chan int, 2) ch <- 1 select { case ch <- 3: } }
|
在 select
中对 chan
的读写是非阻塞的,不会导致当前协程阻塞,如果是因为 chan
满或者空无法发送或接收, 则不会导致阻塞在 case
的某一个分支上,还可以继续判断其他 case
分支。
select
中的 send
实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) { return chansend(c, elem, false, getcallerpc()) }
|
chansend 发送实现
- 发送到
nil chan
(select
中发送不阻塞,其他情况阻塞)
如果是在 select
的 case
里面发送,则不会阻塞,其他情况会导致当前 goroutine
挂起,永远阻塞:
示例代码:
1 2 3 4 5 6 7 8
| var ch chan int
ch <- 1 select {
case ch <- 3: }
|
- 发送到满了的
chan
(select
中发送不阻塞,其他情况阻塞)
对于无缓冲而且又没有
receiver
,或者是有缓冲但是缓冲满了的情况,发送也会阻塞(我们称其为
full
,也就是满了,满了的 chan
是放不下任何数据了的,所以就无法再往 chan
发送数据了):
receiver 表示等待从 chan 接收数据的协程。
对于满了的
chan
,什么时候可以再次发送呢?那就是有
receiver
接收数据的时候。chan
之所以会满就是因为没有 receiver
,也就是没有从
chan
接收数据的协程。
A. 对于无缓冲的 chan
,在满了的情况下,当有
receiver
来读取数据的时候,数据会直接从 sender
复制到 receiver
中:
B. 对于有缓冲,但是缓冲满了的情况(图中 chan
满了,并且有两个 g
正在等待写入 chan
):
这个发送过程大概如下:
receiver
从 chan
中获取到
chan
队头元素,然后 chan
的队头元素出队。
- 发送队列
sendq
对头元素出队,将其要发送的数据写入到
chan
缓冲中。最后,sendq
只剩下一个等待写入
chan
的 g
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| package main
func main() { var ch1 = make(chan int) ch1 <- 1 select { case ch1 <- 1: }
var ch2 = make(chan int, 1) ch2 <- 1 go func() { ch2 <- 2 }() go func() { ch2 <- 3 }() select { case ch2 <- 4: } }
|
- 发送到有缓冲,但是缓冲还没满的
chan
(不阻塞,发送成功)
这种情况比较简单,就是将 sender
要发送的数据写入到
chan
缓冲区:
示例代码:
1 2 3
| var ch = make(chan int, 1)
ch <- 1
|
chansend 源码解读
阻塞模式下,在发送的过程中,如果遇到无法发送成功的情况,会调用
gopark
来将协程挂起,然后当前协程陷入阻塞状态。
非阻塞模式下(select
),在发送过程中,任何无法发送的情况,都会直接返回
false
,表示发送失败。

|
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { if c == nil { if !block { return false } gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") }
if !block && c.closed == 0 && full(c) { return false } lock(&c.lock)
if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) }
if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true }
if c.qcount < c.dataqsiz { qp := chanbuf(c, c.sendx) typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true }
if !block { unlock(&c.lock) return false }
gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) gp.parkingOnChan.Store(true) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) KeepAlive(ep)
return true }
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if sg.elem != nil { sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1) }
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) { dst := sg.elem typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size) memmove(dst, src, t.size) }
func full(c *hchan) bool { if c.dataqsiz == 0 { return c.recvq.first == nil } return c.qcount == c.dataqsiz }
|
接收数据
<- 语法糖
在发送数据的那一节我们提到了,ch <- x
编译之后,实际上是对 chansend1
的函数调用。同样的,在接收数据的时候, <-
这个操作符也会根据不同情况编译成不同的函数调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
|
func chanrecv1(c *hchan, elem unsafe.Pointer) { chanrecv(c, elem, true) }
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) { _, received = chanrecv(c, elem, true) return }
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) { return chanrecv(c, elem, false) }
|
还需要再提醒一下的是:chan
关闭之后,并且
chan
缓冲区所有数据被接收完之后,received
才会是 false
,并不是一关闭 received
马上返回
false
chanrecv 函数 block
参数的含义
跟 chansend
中的 block
参数的作用一样,用来判断是否是 select
模式下的接收操作,如果是,则在需要阻塞的时候不会阻塞,取而代之的是直接返回。
chanrecv 接收数据实现
- 从
nil chan
接收(select
中接收不阻塞,其他情况阻塞)
从 nil chan
中读取的时候,如果是阻塞模式,会调用
gopark
将协程阻塞起来。
示例代码:
- 从空
chan
接收(select
中接收不阻塞,其他情况阻塞)
判断空的条件为:无缓冲并且没有等待发送数据的
g,或者有缓冲但是缓冲区无数据。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| package main
func main() { var ch1 = make(chan int) <-ch1 select { case <-ch1:
}
var ch2 = make(chan int, 1) <-ch2 select { case <-ch2:
} }
|
- 从缓冲区满的
chan
接收(不会阻塞,这个时候
sendq
一定不为空)
这种情况不会阻塞,上面已经有图了,这里不再贴了。
- 从缓冲区不满的
chan
接收(不会阻塞)
示例代码:
1 2 3 4 5 6 7 8
| package main
func main() { var ch = make(chan int, 2) ch <- 1 <-ch }
|
chanrecv 源码解读
chanrecv
函数:
- 参数:
c
是 chan
实例,ep
是用来接收数据的指针,block
表示是否是阻塞模式。
- 返回值:
selected
表示 select
语句的
case
是否被选中,received
表示接收到的值是否有效。
- 功能:从
c
这个通道接收数据,同时将接收到的数据写入到
ep
里。
概览:
ep
可能是
nil
,这意味着接收到的值被忽略了(对应 <-c
这种形式的接收)。
- 如果是非阻塞模式,并且通道无数据,返回
(false, false)
,也就是 select
语句中的
case
不会被选中。
- 否则,如果
c
关闭了,会对 ep
指向的地址设置零值,然后返回 (true, false)
。如果是
select
语句,意味被选中,
- 但是
received
为 false
表明返回的数不是通道关闭之前发送的。
- 否则,将从通道中获取到的值写入
ep
指向的地址,并且返回
(true, true)
- 一个非
nil
的 ep
必须指向堆或者调用者的栈。

| func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") }
if !block && empty(c) { if atomic.Load(&c.closed) == 0 { return } if empty(c) { if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } }
lock(&c.lock)
if c.closed != 0 { if c.qcount == 0 { unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } } else { if sg := c.sendq.dequeue(); sg != nil { recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } }
if c.qcount > 0 { qp := chanbuf(c, c.recvx) if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true }
if !block { unlock(&c.lock) return false, false }
gp := getg() mysg := acquireSudog() c.recvq.enqueue(mysg) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
return true, success }
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if c.dataqsiz == 0 { if ep != nil { recvDirect(c.elemtype, sg, ep) } } else { qp := chanbuf(c, c.recvx) if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx } sg.elem = nil gp := sg.g unlockf() goready(gp, skip+1) }
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) { src := sg.elem typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size) memmove(dst, src, t.size) }
|
关闭 chan
chan
关闭的过程比较简单,修改 closed
为
1,然后唤醒发送队列和接收队列里面的 g
,如果发送队列有
g
,被唤醒之后会
panic
,因为不能往一个已经关闭的 chan
发送数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| func closechan(c *hchan) { if c == nil { panic(plainError("close of nil channel")) }
lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) }
c.closed = 1
var glist gList
for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } glist.push(gp) }
for { sg := c.sendq.dequeue() if sg == nil { break } glist.push(gp) } unlock(&c.lock)
for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) } }
|
对于实际开发的作用
在上一篇文章和本文中,花了很大的篇幅来讲述 chan
的设计、实现与使用,这么多东西对我们有什么用呢?
其中非常重要的一个作用是,清楚地了解 chan
的工作机制,便于我们对程序实际运行情况进行分析,
尤其是一些非常隐晦的读写 chan
场景,毕竟稍有不慎就会导致协程泄漏,这对进程影响可能是非常大的。
比如下面的这种代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| package main
import ( "fmt" "runtime" "time" )
func main() { for i := 0; i < 10000; i++ { time.Sleep(time.Second) go func() { var ch chan int ch <- 1 }() fmt.Printf("goroutine count: %d\n", runtime.NumGoroutine()) } time.Sleep(time.Hour) }
|
tips:在 chan
读写的地方需要注意自己的写法会不会让
goroutine 永远陷入阻塞,或者长时间阻塞。
总结
chan
底层是 hchan
结构体。
- go 语法里面的
<-
不过是语法糖,在编译的时候,会编译成 hchan
相关的方法调用。最终都会调用 chansend
或者
chanrecv
。select...case
里面的
chan
读写最终也会编译为对 chansend
或
chanrecv
的调用。
chan
总体设计:维护了三个队列:
hchan.buf
: chan
中暂存 sender
发送数据的队列(在有 receiver
读取的时候会从这个队列中复制到 receiver
中)
hchan.recvq
: 接收队列,存储那些尝试读取
channel
但被阻塞的 goroutine
。
hchan.sendq
: 发送队列,存储那些尝试写入
channel
但被阻塞的 goroutine
。
- 读写
chan
的协程阻塞是通过 gopark
实现的,而从阻塞态转换为可运行状态是通过 goready
实现的。
- 在
chan
读写操作阻塞的时候,如果是在
select
语句中,则会直接返回(表示当前的分支没有被选中),否则,会调用
gopark
挂起当前协程。
- 在关闭
chan
的时候,会调用 goready
唤醒阻塞在发送或者接收操作上的 g
(协程)。
- 无缓冲
chan
的操作有点特殊,对于无缓冲
chan
,必须同时有 sender
和
receiver
才能发送和接收成功,否则另一边都会陷入阻塞(当然,select
不会阻塞)。