- 原文地址:
https://medium.com/golangspec/sync-rwmutex-ca6c6c3208a0
- 原文作者:Michał Łowicki
- 譯文出處:https://medium.com
- 本文永久連結:
https://github.com/gocn/translator/blob/master/2019/w13_sync_mutex_translation.md
- 譯者:fivezh
- 校對者:咔嘰咔嘰
當多個執行緒訪問共享資料時,會出現併發讀寫問題(reader-writer problems)。有兩種訪問資料的執行緒型別:
- 讀執行緒 reader:只進行資料讀取
- 寫執行緒 writer:進行資料修改
當 writer 獲取到資料的訪問許可權後,其他任何執行緒(reader 或 writer)都無許可權訪問此資料。這種約束亦存在於現實中,比如,當 writer 在修改資料無法保證原子性時(如資料庫),此時讀取未完成的修改必須被阻塞,以防止載入臟資料(譯者註:資料庫中的臟讀)。還有許多諸如此類的核心問題,例如:
- writer 不能無限等待
- reader 不能無限等待
- 不允許執行緒出現無限等待
多讀/單寫互斥鎖(如sync.RWMutex)的具體實現解決了一種併發讀寫問題。接下來,讓我們看下在 Go 語言中是如何實現的,同時它提供了哪些的資料可靠性保證機制。
作為額外的工作,我們將深入研究分析競態情況下的互斥鎖。
用法
在深入研究實現細節之前,我們先看看sync.RWMutex
的使用實體。下麵的程式使用讀寫互斥鎖來保護臨界區–sleep()
。為了更好的展示整個過程,臨界區部分計算了當前正在執行的 reader 和 writer 的數量(原始碼)。
package main
import (
"fmt"
"math/rand"
"strings"
"sync"
"time"
)
func init() {
rand.Seed(time.Now().Unix())
}
func sleep() {
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
}
func reader(c chan int, m *sync.RWMutex, wg *sync.WaitGroup) {
sleep()
m.RLock()
c 1
sleep()
c -1
m.RUnlock()
wg.Done()
}
func writer(c chan int, m *sync.RWMutex, wg *sync.WaitGroup) {
sleep()
m.Lock()
c 1
sleep()
c -1
m.Unlock()
wg.Done()
}
func main() {
var m sync.RWMutex
var rs, ws int
rsCh := make(chan int)
wsCh := make(chan int)
go func() {
for {
select {
case n := rs += n
case n := ws += n
}
fmt.Printf("%s%s\n", strings.Repeat("R", rs),
strings.Repeat("W", ws))
}
}()
wg := sync.WaitGroup{}
for i := 0; i 10; i++ {
wg.Add(1)
go reader(rsCh, &m;, &wg;)
}
for i := 0; i 3; i++ {
wg.Add(1)
go writer(wsCh, &m;, &wg;)
}
wg.Wait()
}
play.golang.org 載入的程式環境是確定的(比如開始時間),所以
rand.Seed(time.Now().Unix())
總是傳回相同的數值,此時程式的執行結果可能總是相同的。為了避免這種情況,可透過修改不同的隨機種子值或者在自己的機器上執行程式。
程式執行結果:
W
R
RR
RRR
RRRR
RRRRR
RRRR
RRR
RRRR
RRR
RR
R
W
R
RR
RRR
RRRR
RRR
RR
R
W
譯者註:不同機器上執行的結果會有所不同
每次執行完一組 goroutine(reader 和 writer)的臨界區程式碼後,都會列印新的一行。很顯然,RWMutex 允許至少一個 reader(一個或多個 reader)存在而 writer 同時只能存在一個。
同樣重要且將進一步討論的是:writer 呼叫到Lock()
時,將會使新的 reader/writer 被阻塞。當存在 reader 加了 RLock 時,writer 會等待這一組 reader 完成正在執行的任務,當這一組任務完成後,writer 將開始執行。從輸出可以很明顯的看到,每一行的 R 都會遞減一個,直到沒有 R 之後將列印一個 W。
...
RRRRR
RRRR
RRR
RR
R
W
...
一旦 writer 結束,之前被阻塞的 reader 將恢復執行,然後下一個 writer 也將開始啟動。值得一提的是,如果一個 writer 完成,並且有 reader 和 writer 都在等待,那麼首個 reader 將解除阻塞,然後才輪到 writer。這種交替執行的方式使得 writer 需等待當前這組 reader 完成,所以無論 reader 還是 writer 都不會有無限等待的情況。
實現
註意,本文針對的
RWMutex
實現(Go commit: 718d6c58)在 Go 不同版本中可能隨時有修改。RWMutex
為 reader 提供兩個方法(RLock
和RUnlock
)、也為 writer 提供了兩個方法(Lock
和Unlock
)
讀鎖 RLock
為了簡潔起見,我們先跳過原始碼中競態檢測相關部分(它們將被...
代替)。
func (rw *RWMutex) RLock() {
...
if atomic.AddInt32(&rw.readerCount;, 1) 0 {
runtime_SemacquireMutex(&rw.ReadeSem;, false)
}
...
}
readerCount
欄位是int32
型別的值,表示待處理的 reader 數量(正在讀取資料或被 writer 阻塞)。這基本上是已呼叫 RLock 函式,但尚未呼叫 RUnlock 函式的 reader 數量。
atomic.AddInt32等價於如下原子性表達:
*addr += delta
return *addr
addr
是*int32
型別變數,delta
是int32
型別。因為此操作具有原子性,所以累加delta
操作不會影響其他執行緒(更多詳見Fetch-and-add)。
如果沒有 writer,則
readerCount
總是會大於或等於 0(譯者註:因為 writer 會把 readerCount 置為負數,透過 Lock 函式的 atomic.AddInt32(&rw.readerCount;, -rwmutexMaxreaders),此時 reader 是一種執行速度很快的非阻塞方式,因為只需要呼叫atomic.AddInt32
。
訊號量 Semaphore
訊號量是 Edsger Dijkstra 發明的資料結構,在解決多種同步問題時很有用。其本質是一個整數,並關聯兩個操作:
- 申請
acquire
(也稱為wait
、decrement
或P
操作) - 釋放
release
(也稱signal
、increment
或V
操作)
acquire
操作將訊號量減 1,如果結果值為負則執行緒阻塞,且直到其他執行緒進行了訊號量累加為正數才能恢復。如結果為正數,執行緒則繼續執行。
release
操作將訊號量加 1,如存在被阻塞的執行緒,此時他們中的一個執行緒將解除阻塞。
Go 執行時提供的runtime_SemacquireMutex
和runtime_Semrelease
函式可用來實現sync.RWMutex
互斥鎖。
鎖 Lock
實現原始碼:
func (rw *RWMutex) Lock() {
...
rw.w.Lock()
r := atomic.AddInt32(&rw.readerCount;, -rwmutexMaxreader) + rwmutexMaxreader
if r != 0 && atomic.AddInt32(&rw.readerWait;, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem;, false)
}
...
}
writer 透過Lock
方法獲取共享資料的獨佔許可權。首先,它會申請阻塞其他寫操作的互斥鎖(rw.w.Lock()
),此互斥鎖在Unlock
函式的最後才會進行解鎖。下一步,將readerCount
減去rwmutexMaxreader
(值為 1 左移 30 位, 1<<30
)使其為負數。當readerCount
變為負數時,Rlock 將阻塞接下來的所有讀請求。
再回過頭來看下Rlock()
函式中邏輯:
if atomic.AddInt32(&rw.readerCount;, 1) 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.SeadeSem;, false)
}
後續的 reader 將會被阻塞,那麼已執行的 reader 會怎樣呢?readerWait
欄位用來記錄當前 reader 執行的數量。writer 被訊號量writerSem
阻塞,直到最後一個 reader 在使用後面討論的RUnlock
方法解鎖後會把writerSem
加 1,此時訊號量將變成 0,writer
被解除阻塞(譯者註:RUnlock 函式中的runtime_Semrelease(&rw.writerSem;, false)
)
如果沒有有效的 reader,那麼 writer 將繼續其執行。
最大 reader 數 rwmutexMaxreader
在rwmutex.go中定義的常量:
const rwmutexMaxreader = 1 <30
那麼,其用途是什麼,以及1<<30
表示什麼意義呢?
readerCount
欄位是int32型別,其範圍為:
[-1 <
RWMutext
使用此欄位來計算掛起的 reader 和 writer 的標記(置為負數)。在Lock
方法中:
r := atomic.AddInt32(&rw.readerCount;, -rwmutexMaxreader) + rwmutexMaxreader
Lock
會將readerCount
欄位減去1<<30
,當readerCount
負值時表示 writer 呼叫了Lock
正等待被處理,atomic.AddInt32(&rw.readerCount;, -rwmutexMaxreaders) + rwmutexMaxreaders
這個操作既讓readerCount
變為負數又使r
儲存回了 readerCount。rwmutexMaxreaders
也可以限制被掛起 reader 的數量。如果有rwmutexMaxreader
個或更多掛起的 reader,那麼readerCount
將是非負值,此時將導致整個機制的崩潰。所以,reader 實際的限制數是:rwmutexMaxreader - 1
,此值1073741823
超過了10億
。
解讀鎖 RUnlock
實現原始碼:
func (rw *RWMutex) RUnlock() {
...
if r := atomic.AddInt32(&rw.readerCount;, -1); r 0 {
if r+1 == 0 || r+1 == -rwmutexMaxreader {
race.Enable()
thrSw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait;, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.WriteSem;, false)
}
}
...
}
每次呼叫此方法將使readerCount
減 1(RLock 方法中增加 1)。如果減完後readerCount
值為負,則表示當前存在 writer 正在等待或執行。這是因為在Lock()
方法中readerCount
減去了rwmutexMaxreader
。然後,當檢查到將完成的 reader 數量(readerWait 數值)最終為 0 時,則表示 writer 可以最終申請訊號量。(譯者註:r < 0
時,存在兩個分支,當走 r+1 == 0 的分支時,表示 readerCount 此時為 0 即沒有 RLock,所以 throw 了。當走下麵那個分支時,r < 0
則是因為存在 writer 把 readerCount 置為了負數在等待 reader 結束,那麼當最後一個 reader 解鎖時需要將 WriteSem 訊號量加 1,喚醒 writer)
解鎖 Unlock
實現原始碼:
func (rw *RWMutex) Unlock() {
...
r := atomic.AddInt32(&rw.readerCount;, rwmutexMaxreader)
if r >= rwmutexMaxreader {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
for i := 0; i int(r); i++ {
runtime_Semrelease(&rw.readerSem;, false)
}
rw.w.Unlock()
...
}
解鎖被 writer 持有的互斥鎖時,首先透過atomic.AddInt32
將readerCount
加上rwmutexMaxreader
,這時readerCount
將變成非負值。如readerCount
比 0 大,則表示存在 reader 正在等待 writer 執行完成,此時應喚醒這些等待的 reader。之後寫鎖將被釋放,從而允許其他 writer 為了寫入而鎖定互斥鎖。(譯者註:如果還存在掛起的 reader,則在 writer 解鎖之前需要透過訊號量 readerSem 喚醒這些 reader 執行)
如果 reader 或 writer 嘗試解鎖未鎖定的互斥鎖時,呼叫Unlock
或Runlock
方法將丟擲錯誤(示例原始碼)。
m := sync.RWMutex{}
m.Unlock()
輸出:
fatal error: sync: Unlock of unlocked RWMutex
...
遞迴讀鎖定 Recursive read locking
檔案描述:
如果一個 reader goroutine 持有了讀鎖,而此時另一個 writer goroutine 呼叫
Lock
申請加寫鎖,此後在最初的讀鎖被釋放前其他 goroutine 不能獲取到讀鎖。特定情況下,這能防止遞迴讀鎖,這種策略保證了鎖的可用性,Lock
的呼叫會阻止其他新的 reader 來獲得鎖。
RWMutex 的工作方式是,如果有一個 writer 呼叫了 Lock,則所有呼叫 RLock 都將被鎖定,無論是否已經獲得了讀鎖定(示例原始碼):
示例程式碼:
package main
import (
"fmt"
"sync"
"time"
)
var m sync.RWMutex
func f(n int) int {
if n 1 {
return 0
}
fmt.Println("RLock")
m.RLock()
defer func() {
fmt.Println("RUnlock")
m.RUnlock()
}()
time.Sleep(100 * time.Millisecond)
return f(n-1) + n
}
func main() {
done := make(chan int)
go func() {
time.Sleep(200 * time.Millisecond)
fmt.Println("Lock")
m.Lock()
fmt.Println("Unlock")
m.Unlock()
done 1
}()
f(4)
}
輸出:
RLock
RLock
RLock
Lock
RLock
fatal error: all goroutines are asleep - deadlock!
譯者註(至下一節以前均為譯者註):為什麼會傳送死鎖呢?原作者用遞迴函式在 defer 裡面解鎖,那麼在加第三層讀鎖的時候,還沒有讀鎖解鎖。這時,readCount 是 3,此時正好加了一個 Lock 寫鎖,由於 readCount 是 3
if r != 0 && atomic.AddInt32(&rw.readerWait;, r) != 0 {
runtime_Semacquire(&rw.writerSem;)
}
由上可知,此時 writer 需要等待所有進行中的 reader 完成,此時又呼叫了 RLock,
if atomic.AddInt32(&rw.readerCount;, 1) 0 {
// A writer is pending, wait for it.
runtime_Semacquire(&rw.readerSem;)
}
由於在第四個 RLock 前,加了 Lock 操作,使得 readerCount 為負數。所以就造成了死鎖,即 reader 在等待 readerSem,writer 在等待 writerSem*
複製鎖 Copying locks
go tool vet
可以檢測鎖是否被覆制了,因為複製鎖會導致死鎖。更多關於此問題可參考之前的文章:Detect locks passed by value in Go
效能 Performance
之前有人發現,在 CPU 核數增多時 RWMutex 的效能會有下降,詳見:sync: RWMutex scales poorly with CPU count
爭用 Contention
Go 版本 ≥ 1.8 之後,支援分析爭用的互斥鎖(runtime: Profile goroutines holding contended mutexes.)。我們來看下如何做:
package main
import (
"net/http"
_ "net/http/pprof"
"runtime"
"sync"
"time"
)
func main() {
var mu sync.Mutex
runtime.SetMutexProfileFraction(5)
for i := 0; i 10; i++ {
go func() {
for {
mu.Lock()
time.Sleep(100 * time.Millisecond)
mu.Unlock()
}
}()
}
http.ListenAndServe(":8888", nil)
}
> go build mutexcontention.go
> ./mutexcontention
當mutexcontention
程式執行時,執行 pprof:
> go tool pprof mutexcontention http://localhost:8888/debug/pprof/mutex?debug=1
Fetching profile over HTTP from http://localhost:8888/debug/pprof/mutex?debug=1
Saved profile in /Users/mlowicki/pprof/pprof.mutexcontention.contentions.delay.003.pb.gz
File: mutexcontention
Type: delay
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) list main
Total: 57.28s
ROUTINE main.main.func1 in .../src/github.com/mlowicki/mutexcontention/mutexcontention.go
0 57.28s (flat, cum) 100% of Total
. . 14: for i := 0; i . . 15: go func() {
. . 16: for {
. . 17: mu.Lock()
. . 18: time.Sleep(100 * time.Millisecond)
. 57.28s 19: mu.Unlock()
. . 20: }
. . 21: }()
. . 22: }
. . 23:
. . 24: http.ListenAndServe(":8888", nil)
註意,為什麼這裡耗時 57.28s,且指向了mu.Unlock()
呢?
當 goroutine 呼叫Lock
而阻塞時,會記錄當前發生的準確時間–叫做acquiretime
。當另一個 groutine 解鎖,至少存在一個 goroutine 在等待獲得鎖,則其中一個解除阻塞並呼叫其mutexevent
函式。該mutexevent
函式透過檢查SetMutexProfileFraction
設定的速率來決定此事件應被保留還是丟棄。此事件包含整個等待的時間(當前時間 – 獲得時間)。從上面的例子可以看出,所有阻塞在特定互斥鎖的 goroutines 的總等待時間會被收集和展示。
在 Go 1.11(sync: enable profiling of RWMutex)中將增加讀鎖(Rlock 和 RUnlock)的爭用。
資料 Resources
- Allen B. Downey: The Little Book of Semaphores
- Documentation of sync.RWMutex
- Wikipedia: reader-writer problem
- Reusable barriers in Golang
- Synchronization queues in Golang
備註:
- critical section:臨界區
- Mutual exclusion,縮寫 Mutex:互斥鎖