Go sync package
sync
包提供了基本的同步原语,比如同步锁,还有其他的Once
和WaitGroup
,大多数都用于低级别的库,高级别的同步控制最好用channels
。
Cond
Cond
实现了一个条件变量,等待或宣布事件发生的goroutines
的集合点。
任何一个Cond
都有任意一个相关的锁对象L
,可以是同步锁或者读写锁。不论选择哪一个必须在条件改变和等待调用方法前不被改变。
Cond
使用之后不能被复制。
Cond
主要用于go
之间的消息通知。
func (c *Cond) Wait()
func (c *Cond) Wait()
等待c.L
原子解锁以及暂停执行调用goroutine
,之后再恢复执行,在返回前加锁c.L
,除非被广播或信号唤醒,否则等待无法返回。
使用例子:
c.L.Lock()
for !condition() {
c.Wait()
}
... make use of condition ...
c.L.Unlock()
func (*Cond) Signal
func (*Cond) Signal
Signal
函数是发送一个信号,用于唤醒一个等待的go
,如果有多个go
处于等待状态,则不能控制唤醒那个go
。
func (*Cond) Broadcast
func (*Cond) Broadcast
Broadcast
广播函数用于唤醒所有等待的go
。
Mutex
Mutex
就是一个互斥锁的。保证任何资源只能被任何一个go
使用。
type Mutex struct {
state int32 # 状态
sema uint32 # 信号量字段semaphore
}
func (*Mutex) Lock
func (m *Mutex) Lock()
锁m
,如果锁已被使用,则调用goroutine
将阻塞,直到互斥锁可用。
# Lock 函数,使用CAS算法,如果锁状态是0,期望值也是0,就修改为1,如果再次加锁时,内存值是1,期望值是0,操作就是失败,得到的就是false;第一次加锁时m.state是0
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
.....
# 如果再次加锁就会进入go等待队列中。如果lifo为true,队列等待在等待队列的头部。
# m.sema 是信号量,这个方法可Semacquire方法一样,是为了Mutexes实现的,如果lifo为true,则在等待队列的头部排队服务员
runtime_SemacquireMutex(&m.sema, queueLifo)
# Semacquire等待直到* s> 0然后以原子方式递减它。
func runtime_Semacquire(s *uint32)
func (*Mutex) Unlock
func (m *Mutex) Unlock()
# 解锁的时候就进行m.state原子加-1,如果多次解锁就会运行时异常。
new := atomic.AddInt32(&m.state, -mutexLocked)
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
....
# 这个方法主要是执行原子的递增m.sema,并通知一个阻塞在Semacquire的等待goroutines
runtime_Semrelease(&m.sema, false)
互斥锁与go
不绑定,可以go1
进行加锁,go2
进行解锁。只要解锁在加锁后操作就行。
go
中的互斥锁是通过信号量和
Once
Once
是一个只执行一次动作的对象。一般可以用于连接关闭,保证只执行一次;或者用于执行初始化,保证只初始化一次。
func (*Once) Do
func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 1 {
return
}
// Slow-path.
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}
如果once.Do(f)
被调用多次,也只会执行一次,即时参数f
不一致也只会执行一次。每个需要执行一次的函数都需要创建一个Once
对象。执行f
函数时如果出现panics
也会认为执行过了,不会再次调用这个函数。
如果出现循环调用就会出现死锁;这个点看源码就知道了。
once.Do(func() {
once.Do(func() {
f("test")
})
})
Pool
池是一组可以单独保存和检索的临时对象,池中的对象可能会被自动删除,池使用时时安全的;池主要时缓存已经创建但是未使用到对象,这样可以减轻GC的压力;fmt
包中使用的池时比较好的,在大负载下进行扩展,在负载降低时进行收缩。
RWMutex
type RWMutex struct {
w Mutex // 写互斥锁
writerSem uint32 // 写信号量
readerSem uint32 // 读信号量
readerCount int32 // 读阻塞的数量
readerWait int32 // 读放弃的数量
}
const rwmutexMaxReaders = 1 << 30
加读锁最大次数是2^30
次。
RWMutex
是一个读写互斥锁,可以拥有多个读锁或者一个写锁,RWMutex
的零值状态时是互斥锁,写锁的优先级高于读写锁。
解释一下写锁的优先级高于读锁:比如有10
个goroutines
获取了读锁,此时再有一个go
来获取写锁,这个go
就会进入阻塞状态,然后又有一组go
来获取读锁,此时就获取不到读锁了,需要等前面的读锁全部释放完,并且写锁释放过后才能再次获取读锁,所以就是说写锁优先级高于读锁。不这样处理的话写锁会处于永久饥饿中。
func (*RWMutex) Lock
func (rw *RWMutex) Lock
用于rw
的写锁,如果锁是已经被读或写使用,那锁将阻塞知道可用。
func (*RWMutex) RLock
func (rw *RWMutex) RLock
用于rw
的读锁,次锁不能被递归调用。
func (*RWMutex) RLocker
func (rw *RWMutex) RLocker
RLocker
返回一个Locker
接口,通过调用Lock
和Unlock
来实现rw.RLock
和rw.RUnlock
方法。
func (*RWMutex) RUnlock
func (rw *RWMutex) RUnlock
RUnlock
用于解锁其中一个读锁,如果没有加的读锁就会出现异常。
func (*RWMutex) Unlock
func (rw *RWMutex) Unlock
解除rw
的写锁,如果rw
是没有加写锁就会出现运行时异常。和互斥锁一样Mutex
,锁对象和goroutines
没有关联,可以任意go
加锁,任意go
解锁。
Context
Context
主要用于在一个http request
中,启动了多个go
去访问其他资源,context
主要用于可以统一控制其他的go
一起退出。
Context
方法可以由多个go
同时使用。
Context
定义的接口如下,主要实现如下的几个功能。
type Context interface {
//子go用于监听父go是否发起结束当前go的消息
Done() <-chan struct{}
// 返回关闭的原因
Err() error
//返回到期时间,比如设置了5s取消,那返回的值就是5s后的时间,ok is true if your setting deadling
Deadline() (deadline time.Time, ok bool)
//从context中取出key的值
Value(key interface{}) interface{}
}
使用方法主要由如下:
# 设置超时返回,并返回一个新的context
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
# 没有超时时间,只是用于通知子go执行关闭
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
# 设置一个值到context中,并返回一个新的ctx
func WithValue(parent Context, key, val interface{}) Context
主要使用的一个Http
请求中,用来控制子go
。如下的代码是模拟在一个demo
的controller
中,去并发的请求其他资源,比如数据库资源,还有gRpc
调用等,这些操作都是用子go
去调用的。
func main() {
http.HandleFunc("/demo",demo)
http.ListenAndServe(":8888",nil)
}
func demo(w http.ResponseWriter,r *http.Request) {
timeout, err := time.ParseDuration(r.FormValue("timeout"))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
var (
ctx context.Context
cancel context.CancelFunc
)
if err != nil {
ctx, cancel = context.WithTimeout(r.Context(), timeout)
}else {
ctx,cancel = context.WithCancel(r.Context())
}
defer cancel()
database := getDatabaseData(ctx)
age,ct := gRpcServer(ctx)
if deadline, ok := ctx.Deadline() ; ok {
fmt.Println(deadline.String())
fmt.Println(time.Now().String())
}
if err := ctx.Err(); err != nil {
fmt.Println(err)
}
if value := ct.Value(ageKey); value != nil {
fmt.Println("value:",value)
}
fmt.Fprint(w,fmt.Sprintf("Database:%s\nAge:%d",database,age))
}
func getDatabaseData(ctx context.Context) string {
c := make(chan string)
go func() {
time.Sleep(time.Second*2)
c <- "database"
}()
select {
case <- ctx.Done():
return ""
case r := <- c:
return r
}
}
func gRpcServer(ctx context.Context) (int,context.Context) {
c := make(chan int)
go func() {
time.Sleep(2*time.Second)
c <- 24
}()
ct := context.WithValue(ctx, ageKey, ageKey)
select {
case <- ctx.Done():
return 0,ct
case v := <- c:
return v,ct
}
}
上面这两个方法有点问题,会出现go
不能会回收,泄漏的情况,在这种情况下会出现:函数中启动的go
还在休眠,还没有写入到c
中,此时收到ctx.Done()
的信号,就返回了,此时就会出现写入不了c
这个chan
中,于是这个go
就永远不能被释放了。
解决办法有两种:
- 接收到关闭信号之后等待
c
返回
func getDatabaseData(ctx context.Context) string {
c := make(chan string)
go func() {
time.Sleep(time.Second*2)
c <- "database"
}()
select {
case <- ctx.Done():
<- c //在此等待c返回,但是这样就会阻塞http请求,就会出现变成同步操作。
return ""
case r := <- c:
return r
}
}
# 下一个方法同上
- 还有一种就是关闭这个
chan
,这种既可以快速返回,也不会出现go
不能释放
func getDatabaseData(ctx context.Context) string {
c := make(chan string)
go func() {
// 如果加入关闭通道的操作,那就一定要加捕获panic的操作,不然会导致程序推出。因为发送到一个关闭的chan中会出现panic
defer func() {
if err := recover() ; err != nil {
}
}()
time.Sleep(time.Second*2)
c <- "database"
}()
select {
case <- ctx.Done():
close(c)
return ""
case r := <- c:
return r
}
}
涉及到的算法
CAS 算法
CAS
算法又叫无锁算法,全称是(Compare-and-swap)
比较交换算法。CAS
算法是原子操作,是并发安全的。
CAS
算法有三个操作数,第一个是内存值V,第二个是旧预期值A,第三个是新值B;当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。
信号量算法
信号量是Unix
系统提供的一种保护共享资源的机制,用于防止多个线程同时访问某个资源。
可简单理解为信号量为一个数值:
- 当信号量>0时,表示资源可用,获取信号量时系统自动将信号量减1;
- 当信号量==0时,表示资源暂不可用,获取信号量时,当前线程会进入睡眠,当信号量为正时被唤醒;