Using Channels in Go
Basic usage of channels
Sending and receiving on a channel
package main import "fmt" func main() { jobs := make(chan int, 5) done := make(chan bool) go func() { for { j, more := <-jobs if more { fmt.Println("received job", j) } else { fmt.Println("received all jobs") done <- true return } } }() for j := 1; j <= 3; j++ { jobs <- j fmt.Println("sent job", j) } fmt.Println("sent all jobs") close(jobs) <-done }
Program output:
```
sent job 1
sent job 2
sent job 3
sent all jobs
received job 1
received job 2
received job 3
received all jobs
```
Sending to a closed channel
```go
package main

import (
	"log"
)

func main() {
	log.SetFlags(log.LstdFlags | log.Lshortfile)
	defer func() {
		err := recover()
		if err != nil {
			log.Println(err)
		}
	}()
	done := make(chan bool)
	close(done)
	done <- true
}
```
Sending to a channel that has already been closed panics.
Receiving from a closed channel
```go
package main

import (
	"log"
)

func main() {
	log.SetFlags(log.LstdFlags | log.Lshortfile)
	defer func() {
		err := recover()
		if err != nil {
			log.Println(err)
		}
	}()
	done := make(chan bool)
	close(done)
	ret := <-done
	log.Println(ret)
}
```
Receiving from a closed channel does not panic: it returns immediately with a value from the channel, or the element type's zero value if the buffer is empty. If the buffer still holds data, receives keep draining it first, as the next example shows:
```go
package main

import (
	"log"
)

func main() {
	log.SetFlags(log.LstdFlags | log.Lshortfile)
	defer func() {
		err := recover()
		if err != nil {
			log.Println(err)
		}
	}()
	done := make(chan bool, 3)
	done <- true
	close(done)
	ret := <-done
	log.Println(ret)
}
```
How chan is implemented
The hchan struct
src/runtime/chan.go
```go
type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

type waitq struct {
	first *sudog
	last  *sudog
}
```
buf points to an array of dataqsiz elements organized as a circular queue; qcount is the number of elements currently in the queue; dataqsiz is the capacity of the circular queue. For a buffered chan, the buffer is in fact allocated immediately after the hchan struct itself.
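To make the index bookkeeping concrete, here is a minimal runnable sketch of how sendx and recvx wrap around the circular buffer; ring, send, and recv are illustrative names of ours, not runtime functions, and the real runtime parks goroutines where this sketch just returns false:

```go
package main

import "fmt"

// ring mimics hchan's circular-buffer bookkeeping: a fixed buffer
// plus the sendx/recvx indexes and the qcount/dataqsiz counters.
type ring struct {
	buf      []int
	sendx    uint
	recvx    uint
	qcount   uint
	dataqsiz uint
}

func (r *ring) send(v int) bool {
	if r.qcount == r.dataqsiz {
		return false // buffer full: the real runtime would park the sender
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == r.dataqsiz { // wrap around, exactly as chansend does
		r.sendx = 0
	}
	r.qcount++
	return true
}

func (r *ring) recv() (int, bool) {
	if r.qcount == 0 {
		return 0, false // buffer empty: the real runtime would park the receiver
	}
	v := r.buf[r.recvx]
	r.recvx++
	if r.recvx == r.dataqsiz { // wrap around, exactly as chanrecv does
		r.recvx = 0
	}
	r.qcount--
	return v, true
}

func main() {
	r := &ring{buf: make([]int, 3), dataqsiz: 3}
	for i := 1; i <= 5; i++ {
		fmt.Println("send", i, "ok:", r.send(i)) // sends 4 and 5 fail: full
	}
	for {
		v, ok := r.recv()
		if !ok {
			break
		}
		fmt.Println("recv", v)
	}
}
```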
Creating a chan
A chan is created with make. In the compiler's type-checking phase, the OMAKE node representing the make keyword is converted, according to the argument type, into one of three node kinds: OMAKESLICE, OMAKEMAP, or OMAKECHAN; each of these calls a different runtime function to initialize the corresponding data structure. The chan type calls makechan(), shown below (src/runtime/chan.go):
```go
func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// compiler checks this but be safe.
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}

	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	case mem == 0:
		// Queue or element size is zero.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
	}
	return c
}
```
makechan declares an hchan pointer and picks one of three allocation strategies: if the buffer size or the element size is zero, only the hchan header is allocated with mallocgc; if the element type contains no pointers, the hchan and its buffer are allocated as one contiguous block with a single mallocgc call; otherwise (elements contain pointers) the hchan is allocated with new and the buffer is allocated separately with mallocgc so the GC can scan the elements.
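As a rough illustration (the allocation itself is not observable from safe Go code), these declarations fall into the three branches:

```go
package main

func main() {
	// mem == 0: zero buffer size or zero-size element;
	// only the hchan header is allocated.
	c0 := make(chan int)         // unbuffered
	c1 := make(chan struct{}, 8) // zero-size element

	// elem.ptrdata == 0: the element type holds no pointers;
	// hchan and buf come from one contiguous mallocgc allocation.
	c2 := make(chan int, 8)

	// default: the element type contains pointers; hchan via new,
	// buf via a separate mallocgc so the GC scans the elements.
	c3 := make(chan *int, 8)

	_, _, _, _ = c0, c1, c2, c3
}
```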
Sending on a chan
```go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not closed, we observe that the channel is
	// not ready for sending. Each of these observations is a single word-sized read
	// (first c.closed and second full()).
	// Because a closed channel cannot transition from 'ready for sending' to
	// 'not ready for sending', even if the channel is closed between the two observations,
	// they imply a moment between the two when the channel was both not yet closed
	// and not ready for sending. We behave as if we observed the channel at that moment,
	// and report that the send cannot proceed.
	//
	// It is okay if the reads are reordered here: if we observe that the channel is not
	// ready for sending and then observe that it is not closed, that implies that the
	// channel wasn't closed during the first observation. However, nothing here
	// guarantees forward progress. We rely on the side effects of lock release in
	// chanrecv() and closechan() to update this thread's view of c.closed and full().
	if !block && c.closed == 0 && full(c) {
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}

	if !block {
		unlock(&c.lock)
		return false
	}

	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if gp.param == nil {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	return true
}
```
On a send, the lock is acquired first and the recvq queue is checked for waiting receivers. If there is one, the element is copied directly into the receiving goroutine's memory, bypassing the buffer, and the receiver's goroutine is woken with goready.
If recvq has no waiting receiver and the buffer has free space, the send first computes the slot that can hold the pending value, copies the element into it with typedmemmove, updates sendx and qcount, and releases the lock.
A send on an unbuffered (or full) channel blocks: getg fetches the goroutine performing the send; acquireSudog obtains a sudog struct and fills in the details of this blocked send; the newly created and initialized sudog is enqueued on the channel's sendq wait queue; gopark (whose chanparkcommit callback releases the lock) moves the goroutine into the Gwaiting state, from which goready can wake it again; the current goroutine then stays blocked.
To summarize, sending a value to a channel falls into one of three cases (a runnable demo of all three follows the list):
- If a goroutine is already blocked on the channel's recvq, the value is handed directly to that goroutine and it is made runnable as the next goroutine to execute;
- If the channel has a buffer with free capacity, the value is stored in the buffer slot at sendx;
- Otherwise a sudog struct is created and enqueued on the channel's sendq, and the current goroutine blocks until some other goroutine receives from the channel.
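A small runnable demo of the three cases, in order; the time.Sleep calls are a crude way to control which goroutine parks first, so the interleaving is approximate:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Case 1: a receiver is already parked on recvq, so the send
	// hands the value over directly, bypassing any buffer.
	unbuf := make(chan int)
	go func() { fmt.Println("direct handoff:", <-unbuf) }()
	time.Sleep(10 * time.Millisecond) // crude: let the receiver park first
	unbuf <- 1

	// Case 2: the buffer has free capacity, so the value lands in
	// the slot at sendx and the send returns immediately.
	buf := make(chan int, 1)
	buf <- 2
	fmt.Println("from buffer:", <-buf)

	// Case 3: the buffer is full and no receiver is waiting; the
	// sender parks on sendq until the goroutine below frees a slot.
	buf <- 3
	go func() {
		time.Sleep(10 * time.Millisecond)
		fmt.Println("unblocked by receiving:", <-buf)
	}()
	buf <- 4 // blocks here until 3 has been received
	fmt.Println("after blocking send:", <-buf)
}
```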
Receiving from a chan
```go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled: don't need to check ep, as it is always on the stack
	// or is new memory allocated by reflect.

	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}

	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	if !block && empty(c) {
		// After observing that the channel is not ready for receiving, we observe whether the
		// channel is closed.
		//
		// Reordering of these checks could lead to incorrect behavior when racing with a close.
		// For example, if the channel was open and not empty, was closed, and then drained,
		// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
		// we use atomic loads for both checks, and rely on emptying and closing to happen in
		// separate critical sections under the same lock. This assumption fails when closing
		// an unbuffered channel with a blocked send, but that is an error condition anyway.
		if atomic.Load(&c.closed) == 0 {
			// Because a channel cannot be reopened, the later observation of the channel
			// being not closed implies that it was also not closed at the moment of the
			// first observation. We behave as if we observed the channel at that moment
			// and report that the receive cannot proceed.
			return
		}
		// The channel is irreversibly closed. Re-check whether the channel has any pending data
		// to receive, which could have arrived between the empty and closed checks above.
		// Sequential consistency is also required here, when racing with such a send.
		if empty(c) {
			// The channel is irreversibly closed and empty.
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}

	if sg := c.sendq.dequeue(); sg != nil {
		// Found a waiting sender. If buffer is size 0, receive value
		// directly from sender. Otherwise, receive from head of queue
		// and add sender's value to the tail of the queue (both map to
		// the same buffer slot because the queue is full).
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}

	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	if !block {
		unlock(&c.lock)
		return false, false
	}

	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	closed := gp.param == nil
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, !closed
}
```
On a receive, the lock is acquired first and the sendq queue is checked for waiting senders. If there is one, the value is delivered to the receiver's target memory directly: for an unbuffered channel the sender's value is copied straight across, while for a full buffered channel typedmemmove copies the value at the head of the queue into the receiver's memory, the parked sender's value is copied into the freed slot at the tail, and the sending goroutine is released.
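A short demo of that full-buffer handoff; the time.Sleep calls are a crude way to make sure the extra sender has parked before the receives start:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2 // buffer is now full

	go func() {
		ch <- 3 // parks on sendq: no free slot
		fmt.Println("sender released")
	}()
	time.Sleep(10 * time.Millisecond) // let the sender park

	// The first receive takes the head of the queue (1); the runtime
	// copies the parked sender's 3 into the freed tail slot, so FIFO
	// order is preserved across the handoff.
	fmt.Println(<-ch) // 1
	fmt.Println(<-ch) // 2
	fmt.Println(<-ch) // 3
	time.Sleep(10 * time.Millisecond) // give "sender released" time to print
}
```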
The non-blocking (block = false) receive path is used when the receive is combined with a select statement.
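For instance, a select with a default case performs exactly this non-blocking receive (the runtime routes it through chanrecv with block = false):

```go
package main

import "fmt"

func main() {
	ch := make(chan int, 1)

	// The channel is empty, so the non-blocking receive fails fast
	// and the default case runs instead of parking the goroutine.
	select {
	case v := <-ch:
		fmt.Println("received", v)
	default:
		fmt.Println("nothing ready")
	}

	ch <- 42
	select {
	case v := <-ch:
		fmt.Println("received", v) // taken: a value is buffered
	default:
		fmt.Println("nothing ready")
	}
}
```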
To summarize, receiving from a channel falls into one of these cases:
- If the channel is nil, a blocking receive calls gopark directly to suspend the current goroutine (forever);
- If the channel is already closed and the buffer holds no data, chanrecv returns immediately with the element type's zero value (see the comma-ok demo after this list);
- If there is a goroutine waiting on the channel's sendq, the data at index recvx is copied into the receiving variable's memory and the waiting sender's value is copied into the buffer (for an unbuffered channel the sender's value is copied across directly);
- If the channel's buffer contains data, the value at index recvx is read from it directly;
- Otherwise the current goroutine is suspended: a sudog struct is enqueued on the recvq queue and the goroutine waits for the scheduler to wake it.
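The received return value of chanrecv is what the comma-ok receive form exposes; it is the only way to tell a sent zero value apart from the zero value produced by the "closed and empty" path:

```go
package main

import "fmt"

func main() {
	ch := make(chan int, 1)
	ch <- 0
	close(ch)

	v, ok := <-ch
	fmt.Println(v, ok) // 0 true: a real value drained from the buffer

	v, ok = <-ch
	fmt.Println(v, ok) // 0 false: channel closed and empty, zero value returned
}
```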
Question
Why does the chan structure use a circular queue rather than a plain queue? What is the advantage?
Dequeuing from a circular queue is O(1): the receive just advances the recvx index. A plain array-backed queue takes O(n) per dequeue, because every remaining element has to be shifted toward the head; the ring buffer also lets the channel reuse one fixed allocation without ever moving data. See the sketch below.
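A minimal sketch of the difference; dequeueShift and dequeueRing are illustrative names of ours, not runtime code:

```go
package main

import "fmt"

// dequeueShift models a plain array-backed queue: removing the head
// shifts every remaining element left, O(n) per dequeue.
func dequeueShift(q []int) (int, []int) {
	v := q[0]
	copy(q, q[1:])
	return v, q[:len(q)-1]
}

// dequeueRing models hchan's approach: removing the head only
// advances the recvx index, O(1) per dequeue, and the fixed buffer
// is reused without moving data.
func dequeueRing(buf []int, recvx uint) (int, uint) {
	v := buf[recvx]
	recvx++
	if recvx == uint(len(buf)) {
		recvx = 0
	}
	return v, recvx
}

func main() {
	v, rest := dequeueShift([]int{1, 2, 3})
	fmt.Println(v, rest) // 1 [2 3]

	v2, recvx := dequeueRing([]int{1, 2, 3}, 0)
	fmt.Println(v2, recvx) // 1 1
}
```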