目录
- 背景
- go版本
- selectgo函数解释
- 【1】函数参数解释
- 【2】函数具体解释
- 第一步:遍历pollorder,选出准备好的case
- 第二步:将当前goroutine放到所有case通道中对应的收发队列上
- 第三步:唤醒groutine
- 总结
背景
select多路复用在go的异步和并发控制场景中非常好用,对于无case和只有单个case的情况,编译器在编译的时候就会对其做优化,无case就相当于调用了一个阻塞函数,单个case就相当于对一个通道进行读写操作,如果单个case中有default分支时,就相当于是一个if else逻辑,对于多个case的情况,是在运行时调用selectgo函数决定的,接下来我们就来研究一下selectgo函数。
go版本
$ go version go version go1.21.4 Windows/386
selectgo函数解释
【1】函数参数解释
selectgo函数位于:src/runtime/select.go中,定义如下:
//cas0:case数组地址,按照往通道写数据在前,从通道读数据在后的排列顺序(编译时编译器优化行为操作的) //nsends:往通道写数据的case数量 //nrecvs:从通道读数据的case数量 //block:是否阻塞 //返回值分别代表选中规定case位置和是否成功从通道接收数据,如果选中的是default,第一个返回值就返回-1 func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool)
select中每一个case都对应一个scase结构,定义如下:
type scase struct { c *hchan //case对应的读或写通道 elem unsafe.Pointer //指向要写入元素或存放读取元素js的地址 }
【2】函数具体解释
selectgo函数中会遍历所有的case,为确保遍历case的随机性和安全性,有两个关键的顺序:pollorder和lockorder,不用关心其具体实现,明白其的作用就行。
pollorder
:随机的case顺序,确保公平的处理每一个case。lockorder
:加锁的case顺序,确保并发安全。
计算出pollorder和lockorder顺序之后,会根据这2个顺序进行遍历分为了3步。
第一步:遍历pollorder,选出准备好的case
第一部分的代码如下:
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) { ... var casi int //准备好的case位置 var cas *scase //case对象 var caseSuccess bool var caseReleaseTime int64 = -1 var recvOK bool //如果是从通道读取数据,是否读取成功 for _, casei := range pollorder { //遍历随机顺序的case casi = int(casei) //case的位置 cas = &scases[casi] //case对象 c = cas.c //case通道 if casi >= nsends { //前面讲过,写通道在前,读通道在后,所以这里是读通道case sg = c.sendq.dequeue() //取出往读通道写数据的协程队列中的第一个协程 if sg != nil { //如果存在往通道写数据的协程 goto recv //从往通道写数据的协程中读取数据并返回case位置和读取结果 } if c.qcount > 0 { //如果缓冲区还有数据 goto bufrecv //从缓冲区读取数据并返回case位置和读取结果 } if c.closed != 0 { //如果通道已关闭 goto rclose //释放相关资源 } } else { //写通道的case if raceenabled { racereadpc(c.raceaddr(), casePC(casi), chansendpc) } if c.closed != 0 { //如果通道已经关闭 goto sclose //直接panic } sg = c.recvq.dequeue() //从正在往通道读数据的协程队列中取得第一个 if sg != nil { //如果往通道读数据的协程存在 goto send //发送数据到读通道的协程 } if c.qcount < c.dataqsiz { //缓冲区还有位置 goto bufsend } } } if !block { //如果不阻塞,也就是带default分支 selunlock(scases, lockorder) casi = -1 //case位置为-1 goto retc //直接返回,不用进入下一步 } ... }
bufrecv标签:
bufrecv: recvOK = true //返回读数据成功 qp = chanbuf(c, c.recvx) //缓冲区中要读取数据的地址 if cas.elem != nil { typedmjsemmove(c.elemtype, cas.elem, qp) //将读取的缓冲区数据拷贝到case中的elem位置 } typedmemclr(c.elemtype, qp) //清理缓冲区被读的数据 c.recvx++ //读取缓冲区的位置+1 if c.recvx == c.dataqsiz { //下一个要读取缓冲区的位置如果等于缓冲区大小就编程将下次要读取的缓冲区位置置为0 c.recvx = 0 } c.qcount-- //缓冲区中元素个数-1 selunlock(scases, lockorder) goto retc
bufsend标签:
bufsend: typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem) //将case中要写入的元素写到缓冲区 c.sendx++ //写入缓冲区的位置+1 if c.sendx == c.dataqsiz { //如果下次要写入缓冲区的位置等于缓冲区的大小就将缓冲区写入位置置为开头 c.sendx = 0 } c.qcount++ //缓冲区元素数量+1 selunlock(scases, lockorder) goto retc
recv标签:
recv: recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2) //从写通道的协程读取数据 if debugSelect { print("syncrecv: cas0=", cas0, " c=", c, "\n") } recvOK = true //返回成功读取 goto retc
rclose标签:
rclose: selunlock(scases, lockorder) recvOK = false //从通道中读取数据失败 if cas.elem != nil { typedmemclr(c.elemtype, cas.elem) //释放case中元素的空间 } if raceenabled { raceacquire(c.raceaddr()) } goto retc
send标签:
send: send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2) //发送数据到往通道读数据的协程 if debugSelect { print("syncsend: cas0=", cas0, " c=", cwww.devze.com, "\n") } goto retc
retc标签:
retc: if caseReleaseTime > 0 { blockevent(caseReleaseTime-t0, 1) } return casi, recvOK //返回case位置和是否从通道成功读取数据
sclose标签:
sclose: selunlock(scases, lockorder) panic(plainError("send on closed channel"))
上面就是selectgo函数第一部分的逻辑,第一部分就是遍历一个随机的case顺序,如果有符合条件的case就返回case的位置并且返回读数据的结果,如果没有case符合条件但是有default分支就返回-1,如果没default分支就进入下一步。
第二步:将当前goroutine放到所有case通道中对应的收发队列上
第二部分的代码如下:
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) { ... gp = getg() //获取当前协程 if gp.waiting != nil { throw("gp.waiting != nil") } nextp = &gp.waiting for _, casei := range lockorder { //按照对case加锁的顺序遍历case casi = int(casei) //case的位置 cas = &scases[casi] //case对象 c = cas.c //case对象中的通道 sg := acquireSudog() //初始化一个协程等待结构 sg.g = gp //协程等待结构绑定协程 sg.isSelect = true //表示该协程等待结构与select操作相关 sg.elem = cas.elem sg.releasetime = 0 if t0 != 0 { sg.releasetime = -1 } sg.c = c *nextp = sg nextp = &sg.waitlink if casi < nsends { //如果case上是往通道写数据,就将绑定当前协程的等待对象插入当前case通道的发送队列中 c.sendq.enqueue(sg) } else { //如果case上是往通道读数据,就将绑定当前协程的等待对象插入当前case通道的接收队列中 c.recvq.enqueue(sg) } } ... }
第二部分就是将当前协程放到每个case中的通道对应的收发队列中去。
第三步:唤醒groutine
第三部分代码如下:
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) { ... sg = (*sudog)(gp.param) //被唤醒的协程等待结构 gp.param = nil casi = -1 //case位置 cas = nil //case对象 caseSuccess = false sglist = gp.waiting //lockorder顺序的协程等待结构队列,这里是队列中的第一个协程等待结构 for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink { //清空协程等待结构队列中元素便于进行垃圾回收 sg1.isSelect = false sg1.elem = nil sg1.c = nil } gp.waiting = nil for _, casei := range lockorder { //根据对case的加锁顺序进行遍历 k = &scases[casei] //当前case if sg == sglist { //唤醒的协程等待结构是当前case的 casi = int(casei) //唤醒的case位置 cas = k //唤醒的case对象 caseSuccess = sglist.success //往通道读取或写数据结果 if sglist.releasetime > 0 { caseReleaseTime = sglist.releasetime } } else { //唤醒的协程等待结构不是当前case的 c = k.c if int(casei) < nsends { //case为发送通道,就是释放当前case通道里sendq队列的协程等待结构对象 c.sendq.dequeueSudoG(sglist) } else { //case为读取通道,就是释放当前case通道里recvq队列的协程等待结构对象 c.recvq.dequeueSudoG(sglist) } } sgnext = sglist.waitlink //下一个协程等待结构 sglist.waitlink = nil releaseSudog(sglist) //释放上一个协程等待结构 sglist = sgnext } ... }
第三部分就是某一个case上的协程等待结构被唤醒时,会先执行通道上对应的收发操作, 然后去将所有case上的协程等待结构释放掉。
javascript总结
select虽然使用起来简单,但其实现逻辑还是比较复杂的,通过熟悉其实现,我们能理解对多个通道进行操作时候,可以为每一个通道创建一个协程去操作,这无疑增加了GC开销,但是使用select采用了多路复用的思想,将一个协程绑定在多个协程等待对象上,而且对case使用了随机顺序,确保每一个case都能公平的被执行。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。
精彩评论