1.Sync包 简明介绍
golang
语言有着天生适合高并发的特性,作为原生支持用户态进程(Goroutine
)的语言,当涉及到并发编程,多线程编程时候,则离不开锁相关的概念了,于是编写 golang
源码的大叔们就给了src/sync 这么个包,通过浏览源码可以知道src/sync包中提供了用于同步的一些基本原语,可以说这是一个很重要的的包了,如果掌握这个包,在编写代码的工程中肯定能够大有帮助,这个包大体上有:sync.Mutex、sync.RWMutex、sync.WaitGroup、sync.Map、sync.Pool、sync.Once、sync.Cond,也可以看一下官方给的sync包的一些方法解释: sync
,同时为了节省一下 Clone 源码时间,或者只想简单回顾一下源码,这里给了快捷的入口:src/sync。
在并发编程中,同步原语或者锁,他们的主要作用是保证多个线程或者多个goroutine
在访问同一片内存时不会出现混乱的问题,这个是一个非常重要的内容,很多人在编写代码的时候常常没有注重对这些并发访问的内容进行关注,导致出现事故也不清楚是什么原因,而sync
包中所有的结构都适用于goroutine
并发执行的情况,需要好好掌握。
注:以下golang
源码版本为:1.15
2.sync.Mutex 源码解析
sync.Mutex 背景更迭
我认为了解知识的最好方式是从其发展背景进行了解,比如它在发展的过程中,做了哪些改变?是因为什么问题而做了这些改变?是否能够使得现在的框架更加优秀?我想如果有这些问题的话,了解会更加深入一些。
我们可以看看Russ Cox在2008提交的第一版的Mutex
实现
1 | type Mutex struct { |
这个版本相对简单,只需要通过cas
对 key
进行加一, 如果key
的值是从0
加到1
, 则直接获得了锁。否则通过semacquire
进行sleep, 被唤醒的时候就获得了锁。
2012年, commit dd2074c8做了一次大的改动,它将waiter count
(等待者的数量)和锁标识
分开来(内部实现还是合用使用state
字段)。新来的 goroutine 占优势,会有更大的机会获取锁。
2015年, commit edcad863, Go 1.5中mutex
实现为全协作式的,增加了spin机制,一旦有竞争,当前goroutine就会进入调度器。在临界区执行很短的情况下可能不是最好的解决方案。
2016年 commit 0556e262, Go 1.9中增加了饥饿模式,让锁变得更公平,不公平的等待时间限制在1毫秒,并且修复了一个大bug,唤醒的goroutine总是放在等待队列的尾部会导致更加不公平的等待时间。
而现在的 Go 1.15 中的也稍微更加复杂了一些,初次看很容易迷糊,绝对不是说像初次版本中只有加锁解锁那么简单,对正常模式和饥饿模式的认识我也是通过源码才得知的,还有这其中对字段的一些共用、标识位的一些位操作都挺麻烦的,需要对源码进行进一步的理解。
sync.Mutex 结构体及常量定义
sync.Mutex 可能是sync
包中使用最广泛的原语,顾名思义就是相互排斥的锁,它确保同一时刻只有一个协程能访问某对象,也即它允许在共享资源上互斥访问(不能同时访问),初始状态为unlock
。
Mutex
的结构体定义如下:
1 | type Mutex struct { |
可以看到 Mutex
是由 state
和 sema
两个字段构成的,int32
和 uint32
均为四个字节,所以一个 Mutex
一般需要消耗八个字节的空间。通过源码可以很容易理解 state
为状态表示,而 sema 的用途是什么呢? sema
其实是用来控制锁状态的信号量,它是一个非负数。
查看源码我们会发现有以下几个常量:
1 | const ( |
对这几个常量稍微解释一下:
- mutexLocked :值为 1,第一位为 1,表示
mutex
已经被加锁。根据mutex.state & mutexLocked
的结果来判断mutex
的状态:该位为 1 表示已加锁,0 表示未加锁。 - mutexWoken:值为 2(二进制:10),第二位为 1,表示
mutex
是否被唤醒。根据mutex.state & mutexWoken
的结果判断mutex
是否被唤醒:该位为 1 表示已被唤醒,0 表示未被唤醒。 - mutexStarving:值为 4(二进制:100),第三位为 1,表示
mutex
是否处于饥饿模式。根据mutex.state & mutexWoken
的结果判断mutex
是否处于饥饿模式:该位为 1 表示处于饥饿模式,0 表示正常模式。 - mutexWaiterShift:值为 3,表示
mutex.state
右移 3 位后即为等待的goroutine
的数量,也即表示统计阻塞在该mutex
上的goroutine
数目需要移位的数值。根据mutex.state >> mutexWaiterShift
得到当前等待的goroutine
数目
Mutex
中的 state
其32位 bit 的分布具体如下所示:
1 | 1111 1111 ...... 1111 1 1 1 1 |
还有最后一个常量,这个常量尤其重要,因为它引出了 引出了 sync.Mutex
的一个特性:保证公平。怎样来保证公平呢?通过引入正常状态和饥饿状态模式进行。
- starvationThresholdNs:值为 1000000 纳秒,即 1ms,表示将
mutex
切换到饥饿模式的等待时间阈值。这个常量在源码中有大篇幅的注释,理解这段注释对理解程序逻辑至关重要。
注解原文:Mutex fairness
饥饿模式与正常模式
将上面的注解原文,进行一个简单的翻译处理之后的认识如下。(从源码作者口中讲出的东西,更有说服力!)
饥饿模式是在 Go 语言 1.9 版本引入的优化,引入的目的是保证互斥锁的公平性(Fairness)。互斥锁有两种状态:正常状态和饥饿状态。
在正常状态下,所有等待锁的goroutine按照FIFO顺序等待。唤醒的goroutine
不会直接拥有锁,而是会和新请求锁的goroutine竞争锁的拥有。新请求锁的goroutine
具有优势:它正在CPU上执行,而且可能有好几个,所以刚刚唤醒的goroutine
有很大可能在锁竞争中失败。在这种情况下,这个被唤醒的goroutine
会加入到等待队列的前面。 如果一个等待的goroutine
超过1ms没有获取锁,那么它将会把锁转变为饥饿模式。
在饥饿模式下,锁的所有权将从unlock
的gorutine
直接交给交给等待队列中的第一个。新来的goroutine
将不会尝试去获得锁,即使锁看起来是unlock
状态, 也不会去尝试自旋操作(等也白等,在饥饿模式下是不会给你的),而是乖乖地待在等待队列的尾部。
如果一个等待的goroutine
获取了锁,并且满足一以下其中的任何一个条件:(1)它是等待队列中的最后一个;(2)它等待的时候小于1ms。它会将锁的状态转换为正常状态。
相比于饥饿模式,正常模式有更好地性能,因为一个 goroutine
可以连续获得好几次 mutex
,即使有阻塞的等待者。而饥饿模式可以有效防止出现位于等待队列尾部的等待者一直无法获取到 mutex
的情况。
加锁
sync.Mutex 有加锁和解锁,它们分别使用 sync.Mutex.Lock 和 sync.Mutex.Unlock 方法。
互斥锁的加锁是靠 sync.Mutex.Lock 进行完成的,如果参考其他以往版本的 Golang 源码解析,会发现最新版本的 Golang 源码将 sync.Mutex.Lock 进行了简化,方法的主干只保留最常见、简单的情况 — 当锁的状态是 0 时,将 mutexLocked
位置成 1:
1 | func (m *Mutex) Lock() { |
这个 atomic.CompareAndSwapInt32()
方法的签名如下:
1 | // CompareAndSwapInt32 executes the compare-and-swap operation for an int32 value. |
这里对 atomic.CompareAndSwapInt32()
方法进行一个解释,atomic
包是由golang提供的low-level的原子操作封装,主要用来解决进程同步问题,但官方并不建议直接使用。CompareAndSwapInt32()
就是int32型数字的compare-and-swap
实现。cas(&addr, old, new)
的意思是if *addr==old, *addr=new
。大部分操作系统支持CAS,x86指令集上的CAS汇编指令是CMPXCHG
。
当然,这里还是可以深入理解一下 CAS
的底层实现,源码可见:src/runtime/internal/atomic/asm_amd64.s,对汇编语言的了解较少,读起来较为吃力,具体的代码片段及注释可见下面:
1 | // bool Cas(int32 *val, int32 old, int32 new) |
从上面的汇编源码可以知道,大概的流程为:看看这把锁是不是空闲状态,如果是的话,直接原子性地修改一下 state
为已被获取就行了。大概了解一下,知道该方法是原子性的即可。
继续回到 sync.Mutex.Lock 当中,接下来判断如果 sync.Mutex 的状态不为0的时候, sync.Mutex.Lock 就会进入 m.lockSlow()
方法,那么m.lockSlow()
方法做了些什么事情呢?注意,我将分段解析这个方法,具体合并的注释理解在最后附上。
我们先对其一些变量做一些简单的注解解释:
1 | func (m *Mutex) lockSlow() { |
接下来分成几个部分介绍获取锁的过程:
- 判断
Goroutine
的状态,看是否能够进行自旋等锁;
1 | for { |
总的来说需要注意如果是饥饿模式则不进行自旋,因为锁的所有权会直接交给队列头部的goroutine,所以在这个饥饿状态下,无论如何都无法获得mutex。
需要了解的是自旋是一种多线程同步机制,当前的进程在进入自旋的过程中会一直保持 CPU 的占用,持续检查某个条件是否为真。在多核的 CPU 上,自旋可以避免 Goroutine 的切换,使用恰当会对性能带来很大的增益,但是使用的不恰当就会拖慢整个程序,所以 Goroutine 进入自旋的条件非常苛刻:
- 互斥锁只有在普通模式才能进入自旋;
- 需要等待 runtime_canSpin() 返回 True;
而 runtime_canSpin() 究竟是做些什么判断呢?其源码片段如下:
1 | // Active spinning for sync.Mutex. |
源码注释认为因为 sync.Mutex
是协作的,所以对于 Spin
我们应该要保守一些使用,使用 Spin
的条件还挺严苛,看看其需要满足什么条件:
- 旋次数小于active_spin(这里是4)次;
- 然后应该运行在多内核的机器上,且
GOMAXPROCS
的数目应该要大于1;(如果GOMAXPROCS
不了解,可以看看Goroutine
相对应的GMP
模型) - 还有当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空;
如果当前的 Goroutine
能够满足以上进入 Spin
的条件,则会调用 runtime_doSpin 进行 Spin
。所以可以看出来,并不是一直无限自旋下去的,当自旋次数到达 4 次或者其它条件不符合的时候,就改为信号量拿锁了。
- 通过自旋等待互斥锁的释放;
上面已经分析了,m.lockSlow()
会调用 runtime_doSpin 进行 Spin
进行自旋操作,其源码片段如下:
1 | //go:linkname sync_runtime_doSpin sync.runtime_doSpin |
procyield()
会执行 30 次的 PAUSE
指令,该指令只会占用 CPU 并消耗 CPU 时间:
1 | TEXT runtime·procyield(SB),NOSPLIT,$0-0 |
- 计算互斥锁的最新状态;
如果此时 Goroutine
不能进行自旋操作,则会进入剩余的代码逻辑;到了这一步, state的状态可能是:
- 锁还没有被释放,锁处于正常状态;
- 锁还没有被释放, 锁处于饥饿状态;
- 锁已经被释放, 锁处于正常状态;
- 锁已经被释放, 锁处于饥饿状态;
接下来互斥锁会根据上下文计算当前互斥锁最新的状态:
- 如果
mutex
当前处于正常模式,将new
的第一位即锁位设置为 1; - 如果
mutex
当前已经被加锁或处于饥饿模式,则当前goroutine
进入等待队列; - 如果
mutex
当前处于饥饿模式,而且mutex
已被加锁,则将new
的第三位即饥饿模式位设置为 1。 - 如果
goroutine
已经设置为唤醒状态, 需要清除new state
的唤醒标记
具体源码片段如下:
1 | // new 复制 state的当前状态, 用来设置新的状态 |
- 通过CAS来尝试设置互斥锁的状态;
m.lockSlow()
剩下代码片段如下:
1 | // 调用 CAS 更新 state 状态 |
计算了新的互斥锁状态之后,就会使用 CAS
函数 atomic.CompareAndSwapInt32
更新该状态,但是如果通过 CAS
没有获得锁,则会调用 sync.runtime_SemacquireMutex 使用信号量保证资源不会被两个 Goroutine 获取,sync.runtime_SemacquireMutex 的源码如下,可以尝试一下解读:
1 | //go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex |
semacquire1 会在方法中不断调用尝试获取锁并休眠当前 Goroutine
等待信号量的释放,一旦当前 Goroutine
可以获取信号量,它就会立刻返回,其源码片段如下:
1 | func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) { |
通过上面进行 sleep
之后,然后被唤醒,会接着继续执行剩下的代码,剩下会重新获得当前的模式,然后进行判断:
- 在正常模式下,这段代码会设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环;
- 在饥饿模式下,当前 Goroutine 会获得互斥锁,如果等待队列中只存在当前 Goroutine,互斥锁还会从饥饿模式中退出;
具体 sync.Mutex 源码及注释如下:
1 | func (m *Mutex) Lock() { |
解锁
sync.Mutex.Unlock 代码比较少,总的来说,该过程会先使用 AddInt32
函数快速解锁,这时会发生下面的两种情况:
- 如果该函数返回的新状态等于 0,当前 Goroutine 就成功解锁了互斥锁;
- 如果该函数返回的新状态不等于 0,这段代码会调用
sync.Mutex.unlockSlow
方法开始慢速解锁:
其源码片段如下:
1 | func (m *Mutex) Unlock() { |
而 sync.Mutex.unlockSlow 方法首先会校验锁状态的合法性 — 如果当前互斥锁已经被解锁过了就会直接抛出异常 sync: unlock of unlocked mutex
中止当前程序。
在正常情况下会根据当前互斥锁的状态,分别处理正常模式和饥饿模式下的互斥锁,具体看以下源码及注释
1 | func (m *Mutex) unlockSlow(new int32) { |
3.小结
互斥锁的加锁过程比较复杂,它涉及自旋、信号量以及调度等概念,加锁流程小结如下:
- 如果互斥锁处于初始化状态,就会直接通过置位
mutexLocked
加锁; - 如果互斥锁处于
mutexLocked
并且在普通模式下工作,就会进入自旋,执行 30 次PAUSE
指令消耗 CPU 时间等待锁的释放; - 如果当前 Goroutine 等待锁的时间超过了 1ms,互斥锁就会切换到饥饿模式;
- 互斥锁在正常情况下会通过
sync.runtime_SemacquireMutex
函数将尝试获取锁的 Goroutine 切换至休眠状态,等待锁的持有者唤醒当前 Goroutine; - 如果当前 Goroutine 是互斥锁上的最后一个等待的协程或者等待的时间小于 1ms,当前 Goroutine 会将互斥锁切换回正常模式;
而解锁流程较为简单,小结如下:
- 当互斥锁已经被解锁时,那么调用
sync.Mutex.Unlock
会直接抛出异常; - 当互斥锁处于饥饿模式时,会直接将锁的所有权交给队列中的下一个等待者,等待者会负责设置
mutexLocked
标志位; - 当互斥锁处于普通模式时,如果没有 Goroutine 等待锁的释放或者已经有被唤醒的 Goroutine 获得了锁,就会直接返回;在其他情况下会通过
sync.runtime_Semrelease
唤醒对应的 Goroutine;