Skip to main content

go channel 实现

· 11 min read
Softwore Developer

Channelgo之间通信的一种方式,从实现上说,channel是一个链表的数据结构,可以将数据放入和读取。

Channelgo之间通信的一种方式,从实现上说,channel是一个链表的数据结构,可以将数据放入和读取。

基本使用

channel分为有缓冲区的和无缓冲区的通道。

原理

channel就是通过一个共享队列来做协程之间的数据传递的,是一种数据结构,低层还是通过锁来控制数据之间的安全;和我们自己实现的差别,加入了协程挂起和通知的功能,使得协程更加的快速。

发送消息时:

channel已经关闭,那就不能发。panic掉。 看一下有没有阻塞在读操作上的goroutine,有的话取出上一次接收的位置,然后把发送的元素插入到缓冲槽尾部。所以写入有序,输出也是有序的。 没有被阻塞的goroutine。如果带buffer,buffer还有空位,就放在buffer里。否则就阻塞挂起当前发送消息的goroutine。

读取消息时:

channel已经关闭,也可以读,只是读出来的数据为空。 看一下有没有阻塞的写操作的goroutine,有的话唤醒它。读取它发送的数据(A)。 读取后的数据(A)放哪,视乎是带buffer还是无buffer。无buffer的话,就直接把写数据(A)给读取者。带buffer的话,就先看buffer里是否有数据(B),有就把数据(B)给读取者,再把数据(A)放到原来数据(B)空出来的位置上。

chan 结构体解读

type hchan struct {
qcount uint // 队列中的数据量
dataqsiz uint // 队列的容量
buf unsafe.Pointer // 存储数据的缓冲区
elemsize uint16 //元素的占位大小
closed uint32 //关闭标志位,0 未关闭,1 关闭
elemtype *_type // 元素类型
sendx uint // 发送index
recvx uint // 接收index
recvq waitq // 接收等待队列,存储了接收挂起的g
sendq waitq // 存储了挂起的发送队列
//锁,保护hchan里面的字段
lock mutex
}

chansend 发送源码解读

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
//加锁
lock(&c.lock)
//1、如果关闭就panic,不能发送到一个关闭的channel
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
//2、接收队列中取出一个g
if sg := c.recvq.dequeue(); sg != nil {
//找到一个等待的接收g,把数据直接复制到g的stack上,并把它安排在下一次调度上
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
//3、如果缓冲队列还有空间
if c.qcount < c.dataqsiz {
// 将要发送的消息放入队列中
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
//记得归位
if c.sendx == c.dataqsiz {
c.sendx = 0
}
//放入一个+1,取出一个-1
c.qcount++
unlock(&c.lock)
return true
}

if !block {
unlock(&c.lock)
return false
}
//4、如果没有缓冲区可用,把当前的g挂起,并加入到发送队列中
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)

// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}

chanrecv 接收解读

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {

//先检查如果队列中没有元素,并且发送等待队列中也没有挂起的g,并且未关闭就直接返回
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
//加锁
lock(&c.lock)
//1、如果channel已经关闭并且队列元素个数为0,则返回,不会报panic
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
//2、
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
// 如果buffer是0,从发送中直接获取并接收;除此之外,从队列头接收并且发送值到队尾
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
//3、如果缓存队列中还有元素,就取出来接收
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}

if !block {
unlock(&c.lock)
return false, false
}

// 没有可用的,就挂起当前的g,并放入接收等待队列中
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}

waitq 双向链表解读

// waitq 只保存了一个执行头和一个执行尾的指针,保存的元素都是sudog
type waitq struct {
first *sudog
last *sudog
}
// sudog 代表一个go在一个等待队列中
type sudog struct {
//下面的字段受hchan.lock锁的保护,
g *g

// isSelect 为true表示g正在参与select选择,g.selectDone必须被cas算法调用才能唤醒g
isSelect bool
next *sudog // 下一个 这个两个字段组成了一个双向链表
prev *sudog // 上一个
...
}

入队列

// 队头和队尾分别指向第一个元素和最后一个元素,如果只有一个元素,那么队头和队尾都指向他
func (q *waitq) enqueue(sgp *sudog) {
sgp.next = nil
x := q.last
if x == nil {
sgp.prev = nil
q.first = sgp
q.last = sgp
return
}
sgp.prev = x
x.next = sgp
q.last = sgp
}

出队列

// 出队列的时候,移动队头往后移动,并设置队头的元素指向下一个为nil,是一个FIFO队列
func (q *waitq) dequeue() *sudog {
for {
sgp := q.first
if sgp == nil {
return nil
}
y := sgp.next
if y == nil {
q.first = nil
q.last = nil
} else {
y.prev = nil
q.first = y
sgp.next = nil // mark as removed (see dequeueSudog)
}
//当g是在select阻塞的时候需要设置selectDone为1才能唤醒g
if sgp.isSelect {
if !atomic.Cas(&sgp.g.selectDone, 0, 1) {
continue
}
}

return sgp
}
}

lock 实现解读

加锁和解锁没有使用mutex,而是使用了futex技术,这个是在linux系统环境下的,futex是一种高效的加锁方式。

lock 加锁

func lock(l *mutex) {
gp := getg()

if gp.m.locks < 0 {
throw("runtime·lock: lock count")
}
gp.m.locks++

// Speculative grab for lock.
v := atomic.Xchg(key32(&l.key), mutex_locked)
if v == mutex_unlocked {
return
}

// wait is either MUTEX_LOCKED or MUTEX_SLEEPING
// depending on whether there is a thread sleeping
// on this mutex. If we ever change l->key from
// MUTEX_SLEEPING to some other value, we must be
// careful to change it back to MUTEX_SLEEPING before
// returning, to ensure that the sleeping thread gets
// its wakeup call.
wait := v

// On uniprocessors, no point spinning.
// On multiprocessors, spin for ACTIVE_SPIN attempts.
spin := 0
if ncpu > 1 {
spin = active_spin
}
for {
// Try for lock, spinning.
for i := 0; i < spin; i++ {
for l.key == mutex_unlocked {
if atomic.Cas(key32(&l.key), mutex_unlocked, wait) {
return
}
}
procyield(active_spin_cnt)
}

// Try for lock, rescheduling.
for i := 0; i < passive_spin; i++ {
for l.key == mutex_unlocked {
if atomic.Cas(key32(&l.key), mutex_unlocked, wait) {
return
}
}
osyield()
}

// Sleep.
v = atomic.Xchg(key32(&l.key), mutex_sleeping)
if v == mutex_unlocked {
return
}
wait = mutex_sleeping
futexsleep(key32(&l.key), mutex_sleeping, -1)
}
}

unlock 解锁

func unlock(l *mutex) {
v := atomic.Xchg(key32(&l.key), mutex_unlocked)
if v == mutex_unlocked {
throw("unlock of unlocked lock")
}
if v == mutex_sleeping {
futexwakeup(key32(&l.key), 1)
}

gp := getg()
gp.m.locks--
if gp.m.locks < 0 {
throw("runtime·unlock: lock count")
}
if gp.m.locks == 0 && gp.preempt { // restore the preemption request in case we've cleared it in newstack
gp.stackguard0 = stackPreempt
}
}

Channel 的优点

1、和goroutine调度结合起来,使得数据传输更加的高效和快速。 2、使用的锁不是操作系统的mutex互斥锁,而是使用的futex技术。

参考