Go channel implementation

September 18, 2019

What is a channel

A channel is one of the ways goroutines communicate with each other. From the implementation's point of view, a channel is a data structure built around a ring buffer plus two linked wait lists, into which values can be written and from which they can be read.

Basic usage

Channels come in two kinds: buffered and unbuffered.
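
A minimal usage sketch (the variable names are just for illustration): an unbuffered channel blocks the sender until a receiver is ready, while a buffered channel only blocks once its buffer is full.

package main

import "fmt"

func main() {
	// Unbuffered: the send blocks until another goroutine receives.
	unbuffered := make(chan int)
	go func() { unbuffered <- 1 }()
	fmt.Println(<-unbuffered) // 1

	// Buffered with capacity 2: two sends complete without any receiver.
	buffered := make(chan int, 2)
	buffered <- 1
	buffered <- 2
	fmt.Println(<-buffered, <-buffered) // 1 2
}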

How it works

A channel passes data between goroutines through a shared queue. It is a data structure whose underlying implementation still relies on a lock to keep access to the data safe. The difference from a queue we might write ourselves is that it can park and wake goroutines through the scheduler, which makes the handoff much faster.

When sending a value:

If the channel is already closed, the send is not allowed: it panics (a short demonstration follows this list).
Otherwise, check whether any goroutine is blocked waiting to receive. If there is one, take it off the wait queue and hand the value straight to it, then schedule it to run. Waiters are served in FIFO order, so values are written in order and come out in the same order.
If no goroutine is blocked on a receive: when the channel is buffered and the buffer still has room, put the value into the buffer; otherwise block and park the sending goroutine.
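
A short demonstration of the closed-channel rule above, using only standard behavior: the panic can be observed with recover, and it happens even if the buffer still has room.

package main

import "fmt"

func main() {
	ch := make(chan int, 1) // buffered, with free space
	close(ch)

	defer func() {
		// Recovered error: "send on closed channel"
		fmt.Println("recovered:", recover())
	}()
	ch <- 1 // sending on a closed channel always panics
}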

When receiving a value:

A closed channel can still be read; it does not panic, and once there is nothing left the receive returns the element type's zero value (see the example after this list).
Otherwise, check whether a goroutine is blocked waiting to send. If there is one, wake it and take the value it was sending (call it A).
Where A ends up depends on whether the channel is buffered. With no buffer, A is handed directly to the receiver. With a buffer (which must be full, since a sender was blocked), the value at the head of the buffer (B) is handed to the receiver, and A is placed into the tail slot that B just freed.
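
The closed-channel receive rule can be checked with a few lines (nothing here beyond standard behavior): buffered values are still delivered after close, and once the buffer is drained the comma-ok form returns the zero value with ok == false.

package main

import "fmt"

func main() {
	ch := make(chan int, 2)
	ch <- 10
	ch <- 20
	close(ch)

	fmt.Println(<-ch) // 10: values buffered before close are still delivered
	fmt.Println(<-ch) // 20

	v, ok := <-ch
	fmt.Println(v, ok) // 0 false: channel is closed and drained
}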

The hchan struct

type hchan struct {
	qcount   uint           // number of elements currently in the queue
	dataqsiz uint           // capacity of the circular buffer
	buf      unsafe.Pointer // pointer to the buffer that stores the elements
	elemsize uint16         // size of one element
	closed   uint32         // close flag: 0 open, 1 closed
	elemtype *_type         // element type
	sendx    uint           // send index into the buffer
	recvx    uint           // receive index into the buffer
	recvq    waitq          // receive wait queue: goroutines parked waiting to receive
	sendq    waitq          // send wait queue: goroutines parked waiting to send
	// lock protects every field in hchan
	lock mutex
}
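
buf, sendx, recvx and qcount together form a circular (ring) buffer. The sketch below mimics just that bookkeeping in user code; the ring type and its methods are invented for illustration and are not part of the runtime.

package main

import "fmt"

// ring is a simplified, single-goroutine mock-up of hchan's buffer fields.
type ring struct {
	buf          []int // plays the role of buf (with dataqsiz == len(buf))
	sendx, recvx int   // send / receive indices
	qcount       int   // number of stored elements
}

func (r *ring) put(v int) bool {
	if r.qcount == len(r.buf) { // buffer full: the runtime would park the sender
		return false
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == len(r.buf) { // wrap around, like c.sendx = 0
		r.sendx = 0
	}
	r.qcount++
	return true
}

func (r *ring) take() (int, bool) {
	if r.qcount == 0 { // buffer empty: the runtime would park the receiver
		return 0, false
	}
	v := r.buf[r.recvx]
	r.recvx++
	if r.recvx == len(r.buf) { // wrap around, like c.recvx = 0
		r.recvx = 0
	}
	r.qcount--
	return v, true
}

func main() {
	r := &ring{buf: make([]int, 2)}
	fmt.Println(r.put(1), r.put(2), r.put(3)) // true true false
	v, _ := r.take()
	fmt.Println(v) // 1
}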

chansend: the send path

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	// acquire the channel lock
	lock(&c.lock)
	// 1. Sending on a closed channel is not allowed: panic
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}
	// 2. Try to dequeue a goroutine from the receive wait queue
	if sg := c.recvq.dequeue(); sg != nil {
		// found a waiting receiver: copy the value straight onto its stack and make it runnable on the next schedule
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}
	// 3. The buffer still has free slots
	if c.qcount < c.dataqsiz {
		// copy the value being sent into the buffer
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		// wrap the send index back to the start of the ring
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		// a send adds one to the count, a receive subtracts one
		c.qcount++
		unlock(&c.lock)
		return true
	}

	if !block {
		unlock(&c.lock)
		return false
	}
	// 4. No receiver and no buffer space: park the current g and put it on the send wait queue
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	if gp.param == nil {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	return true
}
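
One practical consequence of the block parameter: when a send appears in a select with a default case, the compiler emits a non-blocking call (block == false), so instead of parking at step 4 the function simply returns false and the default branch runs. A small illustration:

package main

import "fmt"

func main() {
	ch := make(chan int, 1)
	ch <- 1 // buffer is now full

	// Roughly a non-blocking chansend (block == false): instead of
	// parking the goroutine, the default branch runs.
	select {
	case ch <- 2:
		fmt.Println("sent")
	default:
		fmt.Println("would block, took default")
	}
}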

chanrecv: the receive path

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// Fast path for the non-blocking case: if there is nothing to receive
	// (no waiting sender on an unbuffered channel, empty buffer on a
	// buffered one) and the channel is not closed, return without locking
	if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
		c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
		atomic.Load(&c.closed) == 0 {
		return
	}
	// acquire the channel lock
	lock(&c.lock)
	// 1. If the channel is closed and the buffer is empty, return the zero
	// value; receiving from a closed channel does not panic
	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}
	// 2. Try to dequeue a goroutine from the send wait queue
	if sg := c.sendq.dequeue(); sg != nil {
		// Found a waiting sender. If buffer is size 0, receive value
		// directly from sender. Otherwise, receive from head of queue
		// and add sender's value to the tail of the queue (both map to
		// the same buffer slot because the queue is full).
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}
	// 3. The buffer still holds elements: receive straight from it
	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	if !block {
		unlock(&c.lock)
		return false, false
	}

	// 4. Nothing available: park the current g and put it on the receive wait queue
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	closed := gp.param == nil
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, !closed
}
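
The (selected, received) return pair surfaces in user code: the second value of a comma-ok receive mirrors received, and a receive inside select with a default case calls chanrecv with block == false. For example:

package main

import "fmt"

func main() {
	ch := make(chan int)

	// Non-blocking receive: roughly chanrecv with block == false.
	select {
	case v := <-ch:
		fmt.Println("got", v)
	default:
		fmt.Println("nothing ready, took default")
	}

	close(ch)
	v, ok := <-ch      // ok mirrors chanrecv's "received" result
	fmt.Println(v, ok) // 0 false: closed and empty
}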

waitq: the doubly-linked wait queue

// waitq stores only a head pointer and a tail pointer; every element is a sudog
type waitq struct {
	first *sudog
	last  *sudog
}
// sudog represents a goroutine sitting in a wait queue
type sudog struct {
	// the fields below are protected by hchan.lock
	g *g

	// isSelect marks a g taking part in a select; the wakeup must win a CAS on g.selectDone
	isSelect bool
	next     *sudog // next sudog; together with prev this forms a doubly-linked list
	prev     *sudog // previous sudog
    ...
}

Enqueue

// first and last point at the first and last elements; with a single element both point at it
func (q *waitq) enqueue(sgp *sudog) {
	sgp.next = nil
	x := q.last
	if x == nil {
		sgp.prev = nil
		q.first = sgp
		q.last = sgp
		return
	}
	sgp.prev = x
	x.next = sgp
	q.last = sgp
}

Dequeue

// dequeue takes from the head: first moves on to the next element and the removed element's next pointer is cleared, so this is a FIFO queue
func (q *waitq) dequeue() *sudog {
	for {
		sgp := q.first
		if sgp == nil {
			return nil
		}
		y := sgp.next
		if y == nil {
			q.first = nil
			q.last = nil
		} else {
			y.prev = nil
			q.first = y
			sgp.next = nil // mark as removed (see dequeueSudog)
		}
		// when g is blocked in a select, selectDone must be CAS'd from 0 to 1 to claim the wakeup
		if sgp.isSelect {
			if !atomic.Cas(&sgp.g.selectDone, 0, 1) {
				continue
			}
		}

		return sgp
	}
}
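
To watch the FIFO behavior in isolation, here is a stand-alone sketch of the same doubly-linked list with an invented node type standing in for sudog (not runtime code); elements come out in the order they went in.

package main

import "fmt"

// node is a stripped-down stand-in for sudog.
type node struct {
	val        int
	next, prev *node
}

// fifo mirrors waitq: only a head and a tail pointer.
type fifo struct{ first, last *node }

func (q *fifo) enqueue(n *node) {
	n.next = nil
	if q.last == nil {
		n.prev = nil
		q.first, q.last = n, n
		return
	}
	n.prev = q.last
	q.last.next = n
	q.last = n
}

func (q *fifo) dequeue() *node {
	n := q.first
	if n == nil {
		return nil
	}
	if n.next == nil {
		q.first, q.last = nil, nil
	} else {
		n.next.prev = nil
		q.first = n.next
		n.next = nil // mark as removed
	}
	return n
}

func main() {
	q := &fifo{}
	for i := 1; i <= 3; i++ {
		q.enqueue(&node{val: i})
	}
	for n := q.dequeue(); n != nil; n = q.dequeue() {
		fmt.Print(n.val, " ") // 1 2 3
	}
	fmt.Println()
}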

How lock is implemented

Locking and unlocking here do not use an operating-system mutex; on Linux they are built on the futex primitive, which is a very efficient way to implement a lock.
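
futexsleep and futexwakeup are runtime internals and not reachable from user code, but the overall shape (grab the lock speculatively with an atomic operation, spin briefly, then give up the CPU and eventually sleep) can be sketched with sync/atomic. The spinLock type below is a simplified illustration under those assumptions, not the runtime's mutex:

package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

const (
	mutexUnlocked int32 = iota
	mutexLocked
)

// spinLock sketches only the spin/yield part of runtime.lock; the real
// implementation falls through to futexsleep instead of yielding forever.
type spinLock struct{ key int32 }

func (l *spinLock) Lock() {
	for {
		// Active spin: try the atomic grab a few times while staying on the CPU.
		for i := 0; i < 4; i++ {
			if atomic.CompareAndSwapInt32(&l.key, mutexUnlocked, mutexLocked) {
				return
			}
		}
		// Passive spin: yield the processor, like osyield;
		// runtime.lock would eventually call futexsleep here.
		runtime.Gosched()
	}
}

func (l *spinLock) Unlock() {
	atomic.StoreInt32(&l.key, mutexUnlocked)
	// runtime.unlock calls futexwakeup here if a thread was sleeping.
}

func main() {
	var l spinLock
	var wg sync.WaitGroup
	n := 0
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			l.Lock()
			n++
			l.Unlock()
		}()
	}
	wg.Wait()
	fmt.Println(n) // 4
}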

lock: acquiring the lock

func lock(l *mutex) {
	gp := getg()

	if gp.m.locks < 0 {
		throw("runtime·lock: lock count")
	}
	gp.m.locks++

	// Speculative grab for lock.
	v := atomic.Xchg(key32(&l.key), mutex_locked)
	if v == mutex_unlocked {
		return
	}

	// wait is either MUTEX_LOCKED or MUTEX_SLEEPING
	// depending on whether there is a thread sleeping
	// on this mutex. If we ever change l->key from
	// MUTEX_SLEEPING to some other value, we must be
	// careful to change it back to MUTEX_SLEEPING before
	// returning, to ensure that the sleeping thread gets
	// its wakeup call.
	wait := v

	// On uniprocessors, no point spinning.
	// On multiprocessors, spin for ACTIVE_SPIN attempts.
	spin := 0
	if ncpu > 1 {
		spin = active_spin
	}
	for {
		// Try for lock, spinning.
		for i := 0; i < spin; i++ {
			for l.key == mutex_unlocked {
				if atomic.Cas(key32(&l.key), mutex_unlocked, wait) {
					return
				}
			}
			procyield(active_spin_cnt)
		}

		// Try for lock, rescheduling.
		for i := 0; i < passive_spin; i++ {
			for l.key == mutex_unlocked {
				if atomic.Cas(key32(&l.key), mutex_unlocked, wait) {
					return
				}
			}
			osyield()
		}

		// Sleep.
		v = atomic.Xchg(key32(&l.key), mutex_sleeping)
		if v == mutex_unlocked {
			return
		}
		wait = mutex_sleeping
		futexsleep(key32(&l.key), mutex_sleeping, -1)
	}
}

unlock: releasing the lock

func unlock(l *mutex) {
	v := atomic.Xchg(key32(&l.key), mutex_unlocked)
	if v == mutex_unlocked {
		throw("unlock of unlocked lock")
	}
	if v == mutex_sleeping {
		futexwakeup(key32(&l.key), 1)
	}

	gp := getg()
	gp.m.locks--
	if gp.m.locks < 0 {
		throw("runtime·unlock: lock count")
	}
	if gp.m.locks == 0 && gp.preempt { // restore the preemption request in case we've cleared it in newstack
		gp.stackguard0 = stackPreempt
	}
}

Advantages of channels

1. It is integrated with goroutine scheduling, which makes passing data between goroutines efficient and fast.
2. The lock it uses is not the operating system's mutex; it is built on the futex primitive.
