Go sync package

June 18, 2019

Go sync package

sync包提供了基本的同步原语,比如同步锁,还有其他的OnceWaitGroup,大多数都用于低级别的库,高级别的同步控制最好用channels

Cond

Cond实现了一个条件变量,等待或宣布事件发生的goroutines的集合点。

任何一个Cond都有任意一个相关的锁对象L,可以是同步锁或者读写锁。不论选择哪一个必须在条件改变和等待调用方法前不被改变。

Cond使用之后不能被复制。

Cond主要用于go之间的消息通知。

func (c *Cond) Wait()

func (c *Cond) Wait()

等待c.L原子解锁以及暂停执行调用goroutine,之后再恢复执行,在返回前加锁c.L,除非被广播或信号唤醒,否则等待无法返回。

使用例子:

c.L.Lock()
for !condition() {
    c.Wait()
}
... make use of condition ...
c.L.Unlock()

func (*Cond) Signal

func (*Cond) Signal

Signal函数是发送一个信号,用于唤醒一个等待的go,如果有多个go处于等待状态,则不能控制唤醒那个go

func (*Cond) Broadcast

func (*Cond) Broadcast

Broadcast广播函数用于唤醒所有等待的go

Mutex

Mutex就是一个互斥锁的。保证任何资源只能被任何一个go使用。

type Mutex struct {
	state int32 # 状态
	sema  uint32 # 信号量字段semaphore
}

func (*Mutex) Lock

func (m *Mutex) Lock()

m,如果锁已被使用,则调用goroutine将阻塞,直到互斥锁可用。

# Lock 函数,使用CAS算法,如果锁状态是0,期望值也是0,就修改为1,如果再次加锁时,内存值是1,期望值是0,操作就是失败,得到的就是false;第一次加锁时m.state是0
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		if race.Enabled {
			race.Acquire(unsafe.Pointer(m))
		}
		return
	}
	.....
# 如果再次加锁就会进入go等待队列中。如果lifo为true,队列等待在等待队列的头部。	
# m.sema 是信号量,这个方法可Semacquire方法一样,是为了Mutexes实现的,如果lifo为true,则在等待队列的头部排队服务员
runtime_SemacquireMutex(&m.sema, queueLifo)
# Semacquire等待直到* s> 0然后以原子方式递减它。
func runtime_Semacquire(s *uint32)

func (*Mutex) Unlock

func (m *Mutex) Unlock()
# 解锁的时候就进行m.state原子加-1,如果多次解锁就会运行时异常。
new := atomic.AddInt32(&m.state, -mutexLocked)
if (new+mutexLocked)&mutexLocked == 0 {
		throw("sync: unlock of unlocked mutex")
	}
....
# 这个方法主要是执行原子的递增m.sema,并通知一个阻塞在Semacquire的等待goroutines
runtime_Semrelease(&m.sema, false)

互斥锁与go不绑定,可以go1进行加锁,go2进行解锁。只要解锁在加锁后操作就行。

go中的互斥锁是通过信号量和

Once

Once是一个只执行一次动作的对象。一般可以用于连接关闭,保证只执行一次;或者用于执行初始化,保证只初始化一次。

func (*Once) Do

func (o *Once) Do(f func()) {
	if atomic.LoadUint32(&o.done) == 1 {
		return
	}
	// Slow-path.
	o.m.Lock()
	defer o.m.Unlock()
	if o.done == 0 {
		defer atomic.StoreUint32(&o.done, 1)
		f()
	}
}

如果once.Do(f)被调用多次,也只会执行一次,即时参数f不一致也只会执行一次。每个需要执行一次的函数都需要创建一个Once对象。执行f函数时如果出现panics也会认为执行过了,不会再次调用这个函数。

如果出现循环调用就会出现死锁;这个点看源码就知道了。

once.Do(func() {
		once.Do(func() {
			f("test")
		})
	})

Pool

池是一组可以单独保存和检索的临时对象,池中的对象可能会被自动删除,池使用时时安全的;池主要时缓存已经创建但是未使用到对象,这样可以减轻GC的压力;fmt包中使用的池时比较好的,在大负载下进行扩展,在负载降低时进行收缩。

RWMutex

type RWMutex struct {
	w           Mutex  // 写互斥锁
	writerSem   uint32 // 写信号量
	readerSem   uint32 // 读信号量
	readerCount int32  // 读阻塞的数量
	readerWait  int32  // 读放弃的数量
}
const rwmutexMaxReaders = 1 << 30

加读锁最大次数是2^30次。

RWMutex是一个读写互斥锁,可以拥有多个读锁或者一个写锁,RWMutex的零值状态时是互斥锁,写锁的优先级高于读写锁。

解释一下写锁的优先级高于读锁:比如有10goroutines获取了读锁,此时再有一个go来获取写锁,这个go就会进入阻塞状态,然后又有一组go来获取读锁,此时就获取不到读锁了,需要等前面的读锁全部释放完,并且写锁释放过后才能再次获取读锁,所以就是说写锁优先级高于读锁。不这样处理的话写锁会处于永久饥饿中。

func (*RWMutex) Lock

func (rw *RWMutex) Lock 

用于rw的写锁,如果锁是已经被读或写使用,那锁将阻塞知道可用。

func (*RWMutex) RLock

func (rw *RWMutex) RLock 

用于rw的读锁,次锁不能被递归调用。

func (*RWMutex) RLocker

func (rw *RWMutex) RLocker

RLocker返回一个Locker接口,通过调用LockUnlock来实现rw.RLockrw.RUnlock方法。

func (*RWMutex) RUnlock

func (rw *RWMutex) RUnlock 

RUnlock用于解锁其中一个读锁,如果没有加的读锁就会出现异常。

func (*RWMutex) Unlock

func (rw *RWMutex) Unlock

解除rw的写锁,如果rw是没有加写锁就会出现运行时异常。和互斥锁一样Mutex,锁对象和goroutines没有关联,可以任意go加锁,任意go解锁。

Context

Context主要用于在一个http request中,启动了多个go去访问其他资源,context主要用于可以统一控制其他的go一起退出。 Context方法可以由多个go同时使用。

Context定义的接口如下,主要实现如下的几个功能。

type Context interface {
    //子go用于监听父go是否发起结束当前go的消息
    Done() <-chan struct{}
    // 返回关闭的原因
    Err() error
    //返回到期时间,比如设置了5s取消,那返回的值就是5s后的时间,ok is true if your setting deadling
    Deadline() (deadline time.Time, ok bool)
    //从context中取出key的值
    Value(key interface{}) interface{}
}

使用方法主要由如下:

# 设置超时返回,并返回一个新的context
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
# 没有超时时间,只是用于通知子go执行关闭
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
# 设置一个值到context中,并返回一个新的ctx
func WithValue(parent Context, key, val interface{}) Context

主要使用的一个Http请求中,用来控制子go。如下的代码是模拟在一个democontroller中,去并发的请求其他资源,比如数据库资源,还有gRpc调用等,这些操作都是用子go去调用的。

func main() {

	http.HandleFunc("/demo",demo)
	http.ListenAndServe(":8888",nil)
}
func demo(w http.ResponseWriter,r *http.Request)  {
	timeout, err := time.ParseDuration(r.FormValue("timeout"))
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	var (
		ctx context.Context
		cancel context.CancelFunc
	)
	if err != nil {
		ctx, cancel = context.WithTimeout(r.Context(), timeout)
	}else {
		ctx,cancel = context.WithCancel(r.Context())
	}

	defer cancel()
	database := getDatabaseData(ctx)
	age,ct := gRpcServer(ctx)
	if deadline, ok := ctx.Deadline() ; ok {
		fmt.Println(deadline.String())
		fmt.Println(time.Now().String())
	}
	if err := ctx.Err(); err != nil {
		fmt.Println(err)
	}
	if value := ct.Value(ageKey); value != nil {
		fmt.Println("value:",value)
	}
	fmt.Fprint(w,fmt.Sprintf("Database:%s\nAge:%d",database,age))
}


func getDatabaseData(ctx context.Context) string {
	c := make(chan string)
	go func() {
		time.Sleep(time.Second*2)
		c <- "database"
	}()
	select {
	case <- ctx.Done():
		return ""
	case r := <- c:
		return r
	}
}

func gRpcServer(ctx context.Context) (int,context.Context) {
	c := make(chan int)
	go func() {
		time.Sleep(2*time.Second)
		c <- 24
	}()
	ct := context.WithValue(ctx, ageKey, ageKey)
	select {
	case <- ctx.Done():
		return 0,ct
	case v := <- c:
		return v,ct
	}
}

上面这两个方法有点问题,会出现go不能会回收,泄漏的情况,在这种情况下会出现:函数中启动的go还在休眠,还没有写入到c中,此时收到ctx.Done()的信号,就返回了,此时就会出现写入不了c这个chan中,于是这个go就永远不能被释放了。

解决办法有两种:

  • 接收到关闭信号之后等待c返回
func getDatabaseData(ctx context.Context) string {
	c := make(chan string)
	go func() {
		time.Sleep(time.Second*2)
		c <- "database"
	}()
	select {
	case <- ctx.Done():
	    <- c //在此等待c返回,但是这样就会阻塞http请求,就会出现变成同步操作。
		return ""
	case r := <- c:
		return r
	}
}
# 下一个方法同上
  • 还有一种就是关闭这个chan,这种既可以快速返回,也不会出现go不能释放
func getDatabaseData(ctx context.Context) string {
	c := make(chan string)
	go func() {
	    // 如果加入关闭通道的操作,那就一定要加捕获panic的操作,不然会导致程序推出。因为发送到一个关闭的chan中会出现panic
	    defer func() {
			if err := recover() ; err != nil {

			}
		}()
		time.Sleep(time.Second*2)
		c <- "database"
	}()
	select {
	case <- ctx.Done():
	    close(c)
		return ""
	case r := <- c:
		return r
	}
}

涉及到的算法

CAS 算法

CAS算法又叫无锁算法,全称是(Compare-and-swap)比较交换算法。CAS算法是原子操作,是并发安全的。

CAS算法有三个操作数,第一个是内存值V,第二个是旧预期值A,第三个是新值B;当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。

信号量算法

信号量是Unix系统提供的一种保护共享资源的机制,用于防止多个线程同时访问某个资源。

可简单理解为信号量为一个数值:

  • 当信号量>0时,表示资源可用,获取信号量时系统自动将信号量减1;
  • 当信号量==0时,表示资源暂不可用,获取信号量时,当前线程会进入睡眠,当信号量为正时被唤醒;

参考


LRF 记录学习、生活的点滴