1.sync.RWMutex 介绍
Sync.RWMutex 官方给出的定义是读/写互斥锁,锁可以由任意数量的读者或单个写者持有。 RWMutex
的零值是未锁定的互斥锁。
A RWMutex is a reader/writer mutual exclusion lock. The lock can be held by an arbitrary number of readers or a single writer. The zero value for a RWMutex is an unlocked mutex.
其包含了四种方法,如下所示:
1 | func (rw *RWMutex) Lock // 提供写锁加锁操作 |
这四个方法的源码解析在下面可以深入理解,可以大概了解到在 RWMutex
读操作可并发重入,而写操作是互斥的,读写锁通常用互斥锁、条件变量、信号量实现。
以下有四个场景可以先了解一下,方便理解该读/写互斥锁的源码:
(1):写操作如何防止写操作?
- 写操作依赖互斥锁阻止其他的写操作:读写锁包含一个互斥锁(Mutex),写锁定必须要先获取该互斥锁,如果互斥锁已被协程A获取(或者协程A在阻塞等待读结束),意味着协程A获取了互斥锁,那么协程B只能阻塞等待该互斥锁。
(2):写操作是如何阻止读操作的?
- 首先了解一下最多可支持的并发读者:
RWMutex.readerCount
是个整型值,用于表示读者数量,不考虑写操作的情况下,每次读锁定将该值+1,每次解除读锁定将该值-1,所以readerCount
取值为[0, N]
,N为读者个数,实际上最大可支持2^30
个并发读者。 - 写操作将
readerCount
变成负值来阻止读操作的:当写锁定进行时,会先将readerCount
减去2^30
,从而readerCount
变成了负值,此时再有读锁定到来时检测到readerCount
为负值,便知道有写操作在进行,只好阻塞等待。而真实的读操作个数并不会丢失,只需要将readerCount
加上2^30
即可获得。
(3):读操作是如何阻止写操作的?
- 读操作通过
readerCount
来将来阻止写操作的:读锁定会先将RWMutext.readerCount
加1,此时写操作到来时发现读者数量不为0,会阻塞等待所有读操作结束。
(4):为什么写锁定不会被饿死?
- 首先解释一下为什么可能有饿死的情况发生:写操作要等待读操作结束后才可以获得锁,写操作等待期间可能还有新的读操作持续到来,如果写操作等待所有读操作结束,很可能被饿死。
- 可以通过
RWMutex.readerWait
解决这个问题写操作到来时,会把RWMutex.readerCount
值拷贝到RWMutex.readerWait
中,用于标记排在写操作前面的读者个数。前面的读操作结束后,除了会递减RWMutex.readerCount
,还会递减RWMutex.readerWait
值,当RWMutex.readerWait
值变为0时唤醒写操作。所以,写操作就相当于把一段连续的读操作划分成两部分,前面的读操作结束后唤醒写操作,写操作结束后唤醒后面的读操作。
注:以下源码解析基于 go 1.15
2.源码解析
结构体及其常量定义
RWMutex
包含了五个字段及一个常量,其结构体定义如下:
1 | type RWMutex struct { |
关于信号量(semaphore)的问题,这里做一个补充:
这是一个由 Edsger Dijkstra
提出的数据结构,解决很多关于同步的问题时,它提供了两种操作的整数:
- 获取(acquire,又称 wait、decrement 或者 P)
- 释放(release,又称 signal、increment 或者 V)
获取操作把信号量减一,如果减一的结果是非负数,那么线程可以继续执行。如果结果是负数,那么线程将会被阻塞,除非有其它线程把信号量增加回非负数,该线程才有可能恢复运行)。
释放操作把信号量加一,如果当前有被阻塞的线程,那么它们其中一个会被唤醒,恢复执行。
Go 语言的运行时提供了 runtime_SemacquireMutex
和 runtime_Semrelease
函数,像 sync.RWMutex 这些对象的实现会用到这两个函数。
写锁加锁 Lock()
当资源的使用者想要获取写锁时,需要调用 sync.RWMutex.Lock
方法:
1 | func (rw *RWMutex) Lock() { |
其主要的内容:
- 使用
sync.Mutex
中的互斥锁sync.Mutex.Lock()
先解决与其他写者的竞争问题; - 判断当前是否存在读锁:先通过原子操作改变
readerCount(readerCount-rwmutexMaxReaders)
,使其变为负数,告诉 RUnLock 当前存在写锁等待,然后再加回rwmutexMaxReaders
并赋给r,若r仍然不为0,代表当前还有读锁 - 判断是否还有其他
Goroutine
持有RWMutex
互斥锁的读锁(r != 0),如果有则会先将当前的readerCount
的数量加到readerWait
中,从而防止后面源源不断的读者请求读锁,从而进来导致写锁饿死的情况发生,然后该Goroutine
会调用sync.runtime_SemacquireMutex
进入休眠状态,并等待当前持有读锁的Goroutine
结束后释放writerSem
信号量将当前Goroutine
唤醒。
注意:上面的 readerWait
就是用以解决写锁饥饿的问题,代码与注释已经解释得很清楚了,不再阐述了。
写锁释放 UnLock()
写锁的释放会调用 sync.RWMutex.Unlock
方法:
1 | func (rw *RWMutex) Unlock() { |
若抛开竞态检测的代码的话,其实这个写锁释放的代码也是挺简洁的,主要完成了以下几件事情:
- 释放读锁:通过调用
atomic.AddInt32
函数将readerCount
加上rwmutexMaxReaders
从而变回正数; - 通过 for 循环触发所有由于获取读锁而陷入等待的 Goroutine,也即解除阻塞的读锁(若有);
- 调用
sync.Mutex.Unlock()
方法释放写互斥锁。
读锁加锁 RLock()
读锁操作调用 RLock()
1 | func (rw *RWMutex) RLock() { |
该读锁加锁整体流程较为简单,分两种情况进行判断:
- 此时无写锁
(readerCount + 1) > 0
(注意,在写锁是加锁那里,我们对readerCount 进行了readerCount-rwmutexMaxReaders
处理),那么可以上读锁, 并且readerCount
原子加1(读锁可重入[只要匹配了释放次数就行]); - 此时有写锁
(readerCount + 1) < 0,
所以通过readerSem
读信号量, 使读操作睡眠等待;
读锁释放 RUnlock()
RUnLock()
方法对读锁进行解锁:
1 | func (rw *RWMutex) RUnlock() { |
关于读锁释放的详细如下:
- 首先
readerCount
减1,然后进行两种情况的判断:- 若 r 大于等于0,读锁直接解锁成功,直接结束本次操作;
- 若 r 小于0, 有一个正在执行的写操作,在这时会调用
sync.RWMutex.rUnlockSlow
方法;
- 然后倘若上面判断 r 小于0,则进入
rUnlockSlow()
慢解锁,先进行一个判断,若有以下两种情况发生:r + 1 == 0
表示直接执行RUnlock()
,r + 1 == -rwmutexMaxReaders
表示执行Lock()
再执行RUnlock()
,这两种情况都会进行报错。 - 如果没有上述两种情况发生,则
sync.RWMutex.rUnlockSlow
会减少获取锁的写操作等待的读操作数readerWait
,并在所有读操作都被释放之后触发写操作的信号量writerSem
,该信号量被触发时,调度器就会唤醒尝试获取写锁的Goroutine
。
3.小结
之前的sync.Mutex
分析起来简直是复杂得不得了,但是这个读写互斥锁 sync.RWMutex
看起来功能非常复杂源码却相对较为简单,因为它建立在 sync.Mutex
上,复杂的工作都被sync.Mutex
完成了,所以其简单很多了,总的来说读锁和写锁的关系:
- 读锁不能阻塞读锁,引入
readerCount
实现; - 读锁需要阻塞写锁,直到当前的所有读锁都释放,引入
readerSem
实现(同时防止写锁饥饿); - 写锁需要阻塞读锁,直到所有写锁都释放,引入
writerSem
实现; - 写锁需要阻塞写锁,引入
Metux
实现;
使用好读写互斥锁可以得到更细粒度的控制,能够在读操作远远多于写操作的时候提升性能,所以对于究竟是使用sync.Mutex
还是 sync.RWMutex
就要看具体的业务分析了。