歡迎光臨
每天分享高質量文章

C#並行程式設計(6):執行緒同步面面觀

理解執行緒同步

執行緒的資料訪問

在並行(多執行緒)環境中,不可避免地會存在多個執行緒同時訪問某個資料的情況。多個執行緒對共享資料的訪問有下麵3種情形:

  1. 多個執行緒同時讀取資料;
  2. 單個執行緒更新資料,此時其他執行緒讀取資料;
  3. 多個執行緒同時更新資料。

顯而易見,多個執行緒同時讀取資料是不會產生任何問題的。僅有一個執行緒更新資料的時候,貌似也沒有問題,但真的沒有問題嗎?多個執行緒同時更新資料,很明顯,你可能把我的更改改寫掉了,資料從此不再可信。

什麼是執行緒同步

為瞭解決多執行緒同時訪問共享資料可能導致資料被破壞的問題,我們需要採取一些措施來保證資料的一致性,讓每個執行緒都能準確地讀取或更新資料。

問題的根源在於多個執行緒同時訪問資料,那麼只要我們保證同一時間只有一個執行緒訪問資料,就能解決問題。保證同一時間只有一個執行緒訪問資料的處理,就是執行緒同步了。我在訪問資料的時候,你們都先等著,我完事了你們再來。

C#中的執行緒同步

.NET提供了很多執行緒同步的方式,這些方式分為使用者樣式和核心樣式以及混合樣式(即使用者樣式與核心樣式的結合),下麵會總結C#/.NET中各樣式下的執行緒同步。

使用者樣式與核心樣式

Windows作業系統下,CPU跟據所執行程式碼的不同,會在兩種樣式下進行切換。CPU執行應用程式程式碼(如我們開發的.NET程式)時,一般執行在使用者樣式下;執行作業系統核心程式碼(核心函式或者某些裝置驅動程式)時,CPU則切換到核心樣式。

使用者樣式的程式碼只能訪問自身行程的專有地址空間,程式碼異常不會影響到其他程式或者作業系統;核心樣式的所有程式碼共享單個地址空間,程式碼異常將可能導致系統崩潰。CPU的樣式切換,是為了保證應用程式和作業系統的穩定性。

應用程式中,執行緒可以透過Windows API呼叫作業系統核心函式,這時候執行執行緒的CPU將從使用者樣式切換到核心樣式,執行完作業系統函式後,再由核心樣式切換到使用者樣式。CPU的樣式切換是很耗時的,據《Windows核心程式設計》中的描述,CPU樣式的切換,要佔用1000個以上的CPU週期。因此,在我們的.NET程式中,應該盡可能地避免CPU的樣式切換。

使用者樣式執行緒同步

使用者樣式下,利用特殊的CPU指令來協調執行緒,使同一時間只有一個執行緒能訪問某記憶體地址,這種協調在硬體中發生,速度很快。這種樣式下,CPU指令對執行緒的阻塞很短暫,作業系統排程執行緒時不會認為該執行緒已被阻塞,這種情況下,執行緒池不會建立新的執行緒來替換該執行緒。

使用者樣式下,等待資源的執行緒會一直被作業系統排程,導致執行緒的“自旋”並因此浪費很多的CPU資源。如果某執行緒一直佔著資源不釋放,等待該資源的執行緒將一直處於自旋狀態,這樣就造成了“活鎖”,活鎖除了浪費記憶體外,還會浪費大量CPU。

.NET提供兩種使用者樣式的執行緒同步,volatileinterlocked,即易變和互鎖。

volatile關鍵字和Volatile

上面我們遺留了一個問題:只有一個執行緒更新資料,其他執行緒讀取資料,會不會出現問題?先看一個例子:

private static bool _stop;
public static void Run()
{
    Task.Run(() =>
    {
        int number = 1;
        while (!_stop) 
        {
            number++;
        }
        Console.WriteLine($"increase stopped,value = {number}");
    });

    Thread.Sleep(1000);
    _stop = true;
}

編譯器和CPU會對上面的程式碼進行最佳化(除錯樣式不會最佳化),任務執行緒在執行時,會把_stop讀取到CPU暫存器中,while迴圈的時候,每次都從當前CPU暫存器中讀取_stop;同樣,主執行緒執行的時候CPU也會把_stop讀取到暫存器,更新_stop時,先更新是CPU暫存器中的_stop值,再把值存到變數_stop;在並行環境中,主執行緒和任務執行緒獨立執行,主執行緒對_stop的更新並不會公開到任務執行緒,這樣,任務執行緒的while迴圈便不會停止,永遠無法得到輸出。

把變數讀到暫存器只是CPU最佳化程式碼的一種方式,CPU還可能調整程式碼的執行順序,當前,CPU任務這種調整不會改變程式碼的意圖。上面的程式碼說明,由於編譯器和CPU的最佳化,只有一個執行緒更新資料,也可能存在問題

這種情況,我們可以使用volatile關鍵字或者類System.Threading.Volatile來阻止編譯器和CPU的最佳化,這種阻止利用的是記憶體屏障MemoryBarrier,它告訴CPU在執行完屏障之前的記憶體存取後才能執行屏障後面的記憶體存取。上面程式碼的問題在於,while迴圈讀取到的值總是CPU暫存器中的false。我們把while迴圈的條件改成!Volatile.Read(ref _stop)或者把用volatile宣告變數_stop,while條件直接讀取記憶體中的值,問題就能得到解決。

Interlocked原子訪問

.NET提供的另一種使用者樣式執行緒同步方式是System.Threading.InterlockedInterlocked的工作依賴於程式碼執行的CPU平臺,如果是X86的CPU,Interlocked函式會在匯流排上維持一個硬體訊號,來阻止其他CPU訪問同一記憶體地址(《Windows核心程式設計第五版》)。計算機對變數的修改一般來說並不是原子性的,而是分為3個步驟:

  1. 將變數值載入到CPU暫存器
  2. 改變值
  3. 將更新後的值儲存到記憶體中

假如執行了前兩個步驟後,CPU被搶佔,變數在之前執行緒中的修改將丟失。Interlocked函式保證對值的修改是原子性的,一個執行緒完成變數的修改和儲存後,另一個執行緒才能修改變數

System.Threading.Interlocked提供了很多方法,例如遞增、遞減、求和等,下麵用Interlocked的遞增方法展示其執行緒同步功能。

public static void Run()
{
    DoIncrease(100000);
}

private static void DoIncrease(int incrementPerThread)
{
    int number1 = 0;
    int number2 = 0;

    Console.WriteLine($"use two threads to increase zero. each thread increase {incrementPerThread}.");

    IList increaseTasks = new List();

    increaseTasks.Add(Task.Run(() =>
    {
        Console.WriteLine($"thread #{Thread.CurrentThread.ManagedThreadId} increasing number1.");
        for (int i = 0; i < incrementPerThread; i++)
        {
            Interlocked.Increment(ref number1);
        }
    }));
    increaseTasks.Add(Task.Run(() =>
    {
        Console.WriteLine($"thread #{Thread.CurrentThread.ManagedThreadId} increasing number1.");
        for (int i = 0; i < incrementPerThread; i++)
        {
            Interlocked.Increment(ref number1);
        }
    }));
    increaseTasks.Add(Task.Run(() =>
    {
        Console.WriteLine($"thread #{Thread.CurrentThread.ManagedThreadId} increasing number2.");
        for (int i = 0; i < incrementPerThread; i++)
        {
            number2++;
        }
    }));
    increaseTasks.Add(Task.Run(() =>
    {
        Console.WriteLine($"thread #{Thread.CurrentThread.ManagedThreadId} increasing number2.");
        for (int i = 0; i < incrementPerThread; i++)
        {
            number2++;
        }
    }));

    Task.WaitAll(increaseTasks.ToArray());

    Console.WriteLine($"use interlocked: number1 result = {number1}");
    Console.WriteLine($"normal increase: number2 result = {number2}");
}

執行上面的程式碼多次(每個執行緒增加的數量儘量大,否則不容易體現結果),每次number1的結果都一樣,number2的結果都不同,足以體現Interlocked的執行緒同步功能。

SpinLock自旋鎖

System.Threading.SpinLock是基於InterLocked和SpinWait實現的輕量級自旋鎖,具體的實現方式這裡不去關心。SpinLock的簡單用法如下:

private static SpinLock _spinlock = new SpinLock();
public static void DoWork()
{
    bool lockTaken = false;
    try
    {
        _spinlock.Enter(ref lockTaken);
        
    }
    finally
    {
        if (lockTaken)
        {
            _spinlock.Exit(false);
        }
    }
}

SpinLock很輕量級,效能較高,但由於是自旋鎖,鎖定的操作應該是很快完成,否則會因執行緒自旋而浪費CPU。

核心樣式執行緒同步

除了使用者樣式的兩種執行緒同步方式,我們還會利用Windows系統的核心物件實現執行緒的同步。使用系統核心物件將會導致執行執行緒的CPU執行樣式的切換,這會有很大的消耗,所以能夠使用使用者樣式的執行緒同步就儘量避免使用核心樣式。

核心樣式下,執行緒在等待資源時會被系統阻塞,避免了CPU的浪費,這是核心樣式優勢。假如執行緒等待的資源一直被佔用則執行緒將一直處於阻塞狀態,造成“死鎖”。相對於活鎖,死鎖只會浪費記憶體資源。

我們使用系統核心中的事件、訊號量和互斥量進行核心樣式的執行緒同步。

利用核心事件實現執行緒同步

事件實際上是由系統核心維護的一個布林值。

.NET提供System.Threading.EventWaitHandle進行執行緒的訊號互動。EventWaitHandle繼承WaitHandle(封裝等待對共享資源獨佔訪問的作業系統特定的物件),有三個關鍵方法:

  • Set():將事件狀態設定為終止狀態,允許一個或多個等待執行緒繼續。
  • Reset():將事件狀態設定為非終止狀態,導致執行緒阻塞
  • WaitOne():阻塞執行緒直到收到事件狀態訊號

執行緒互動事件有自動重置和手動重置兩種型別,分別由AutoResetEventManualResetEvent繼承EventWaitHandle得到。自動重置事件在Set喚醒第一個阻塞執行緒之後,會自動Reset事件,其他阻塞執行緒仍保持阻塞狀態;而手動重置事件Set時,會喚醒所有被該事件阻塞的執行緒,手動Reset後,事件才會繼續起作用。手動重置事件的這種性質,導致它不能用於執行緒同步,因為不能保證同一時間只有一個執行緒訪問資源;相反,自動重置時間則很適合用來處理執行緒同步。

下麵的例子演示了利用自動重置時間進行的執行緒同步。

public static void Run()
{
    DoIncrease(100000);
}

private static void DoIncrease(int incrementPerThread)
{
    int number = 0;
    Console.WriteLine($"use two threads to increase zero. each thread increase {incrementPerThread}.");

    AutoResetEvent are = new AutoResetEvent(true);

    IList increaseTasks = new List();
    increaseTasks.Add(Task.Run(() =>
    {
        Console.WriteLine($"thread #{Thread.CurrentThread.ManagedThreadId} is increasing the number.");
        for (int i = 0; i < incrementPerThread; i++)
        {
            are.WaitOne();
            number++;
            are.Set();
        }
    }));
    increaseTasks.Add(Task.Run(() =>
    {
        Console.WriteLine($"thread #{Thread.CurrentThread.ManagedThreadId} is increasing the number.");
        for (int i = 0; i < incrementPerThread; i++)
        {
            are.WaitOne();
            number++;
            are.Set();
        }
    }));

    Task.WaitAll(increaseTasks.ToArray());
    are.Dispose();
    Console.WriteLine($"use AutoResetEvent: result = {number}");
}

利用訊號量進行執行緒同步

訊號量是系統核心維護的一個整型變數。

訊號量值為0時,所有等待訊號量的執行緒會被阻塞;訊號量值大於零0,等待的執行緒會被解除阻塞,每喚醒一個阻塞的執行緒,系統核心就會把訊號量的值減1。此外,我們能夠對訊號量進行最大值限制,從而控制訪問同一資源的最大執行緒數量。

.Net中,利用System.Threading.Semaphore進行訊號量操作。下麵時利用訊號量實現執行緒同步的一個例子。

public static void Run()
{
    DoIncrease(100000);
}

private static void DoIncrease(int incrementPerThread)
{
    int number = 0;
    Console.WriteLine($"use two threads to increase zero. each thread increase {incrementPerThread}.");

    Semaphore semaphore = new Semaphore(1,1);

    IList increaseTasks = new List();
    increaseTasks.Add(Task.Run(() =>
    {
        Console.WriteLine($"thread #{Thread.CurrentThread.ManagedThreadId} is increasing the number.");
        for (int i = 0; i < incrementPerThread; i++)
        {
            semaphore.WaitOne();
            number++;
            semaphore.Release(1);

        }
    }));
    increaseTasks.Add(Task.Run(() =>
    {
        Console.WriteLine($"thread #{Thread.CurrentThread.ManagedThreadId} is increasing the number.");
        for (int i = 0; i < incrementPerThread; i++)
        {
            semaphore.WaitOne();
            number++;
            semaphore.Release(1);

        }
    }));

    Task.WaitAll(increaseTasks.ToArray());
    semaphore.Dispose();
    Console.WriteLine($"use Semaphore: result = {number}");
}

利用互斥體行程執行緒同步

互斥體Mutex的使用與自動重置事件和訊號量類似,這裡不再進行詳細的總結。

互斥體常被用來保證應用程式只有一個實體執行,具體用法如下:

bool createNew;
using (new Mutex(true, Assembly.GetExecutingAssembly().FullName, out createNew))
{
    if (!createNew)
    {
        
        Environment.Exit(0);
    }
    else
    {
        
    }
}

執行緒同步的混合樣式

透過上面的總結我們知道,使用者樣式和核心樣式由各自的優缺點,需要有一種樣式既能兼顧使用者和核心樣式的優點又能避免他們的缺點,這就是混合樣式。

混合樣式會優先使用使用者樣式的執行緒同步處理,當多個執行緒競爭同步鎖的時候,才會使用核心物件進行處理。如果多個執行緒一直不產生資源競爭,就不會發生CPU使用者樣式到核心樣式的轉換,開始資源競爭時,又會透過執行緒阻塞來防止CPU資源的浪費。

.NET中提供了多種混合樣式的執行緒同步方式。例如手工重置事件和訊號量的簡化版本ManualResetEventSlimSemaphoreSlim,他們是執行緒在使用者樣式中自旋,直到發生資源競爭。具體使用與各自的核心樣式一樣,這裡不再贅述。

lock關鍵字和Monitor

相信lock加鎖是很多人做常用的執行緒同步方式。lock的使用很簡單,如下:

private static readonly object _syncObject = new object();
public static void DoWork()
{
    lock (_syncObject)
    {
        
    }
}

實際上,lock語法是對System.Threading.Monitor使用的一種簡化,Monitor的用法如下:

private static readonly object _syncObject = new object();
public static void DoWork()
{
    Monitor.Enter(_syncObject);
    
    Monitor.Exit(_syncObject);
}

使用Monitor的可能會出先一些意象不到的問題。例如,如果不相關的業務程式碼在使用Monitor進行執行緒同步的時候,鎖定了同一字串,將會造成不相關業務程式碼的同步執行;此外需要註意的是,Monitor不能使用值型別作為鎖物件,值型別會被裝箱,裝箱後的物件不同,將導致無法同步。

讀寫鎖ReaderWriterLockSlim

ReaderWriterLockSlim可以用來實現多執行緒讀取或獨佔寫入的資源訪問。讀寫鎖的執行緒控制邏輯如下:

  • 一個執行緒寫資料時,其他請求資源的執行緒全部被阻塞;
  • 一個執行緒讀資料時,寫執行緒被阻塞,其他讀執行緒能繼續執行;
  • 寫結束時,解除其他某個寫執行緒的阻塞,或者解除所有讀執行緒的阻塞;
  • 讀結束時,解除一個寫執行緒的阻塞。

下麵是讀寫鎖的簡單用法,詳細用法可參考msdn檔案。

private static readonly ReaderWriterLockSlim _rwlock = new ReaderWriterLockSlim();
public static void DoWork()
{
_rwlock.EnterWriteLock();

_rwlock.ExitWriteLock();
}

ReaderWriterLockSlim還有一個比較老的版本ReaderWriterLock,據說存在較多問題應儘量避免使用。

執行緒安全集合

.NET除了提供包含上面總結到的各種執行緒同步的諸多方式外,還封裝了一些執行緒安全集合。這些集合在內部實現了執行緒同步,我們直接使用即可,很友好。執行緒安全集合在名稱空間System.Collections.Concurrent下,包括ConcurrentQueue (T),ConcurrentStack,ConcurrentDictionary,ConcurrentBag,BlockingCollection,具體可閱讀《何時使用執行緒安全集合》。

各種執行緒同步效能對比

下麵我們對整數零進行多執行緒遞增操作,每個執行緒固定遞增量,來測試以下各種同步方式的效能對比。測試程式碼如下。




private static int _numberToIncrease;

public static void Run()
{
    int increment = 100000;
    int threadCount = 4;
    DoIncrease(increment, threadCount, DoIncreaseByInterLocked);
    DoIncrease(increment, threadCount, DoIncreaseWithSpinLock);
    DoIncrease(increment, threadCount, DoIncreaseWithEvent);
    DoIncrease(increment, threadCount, DoIncreaseWithSemaphore);
    DoIncrease(increment, threadCount, DoIncreaseWithMonitor);
    DoIncrease(increment, threadCount, DoIncreaseWithReaderWriterLockSlim);

}

public static void DoIncrease(int increment, int threadCount, Action<int> action)
{
_numberToIncrease = 0;
IList increaseTasks = new List(threadCount);
Stopwatch watch = Stopwatch.StartNew();
for (int i = 0; i < threadCount; i++)
{
increaseTasks.Add(Task.Run(() => action(increment)));
}
Task.WaitAll(increaseTasks.ToArray());
Console.WriteLine($"{action.Method.Name}=> Result: {_numberToIncrease} , Time: {watch.ElapsedMilliseconds} ms.");
}

#region 使用Interlocked,使用者樣式

public static void DoIncreaseByInterLocked(int increment)
{
for (int i = 0; i < increment; i++)
{
Interlocked.Increment(ref _numberToIncrease);
}
}

#endregion

#region 使用SpinLock,使用者樣式

private static SpinLock _spinlock = new SpinLock();
public static void DoIncreaseWithSpinLock(int increment)
{
for (int i = 0; i < increment; i++)
{
bool lockTaken = false;
try
{
_spinlock.Enter(ref lockTaken);
_numberToIncrease++;
}
finally
{
if (lockTaken)
{
_spinlock.Exit(false);
}
}
}
}

#endregion

#region 使用訊號量Semaphore,核心樣式

private static readonly Semaphore _semaphore = new Semaphore(1, 10);

public static void DoIncreaseWithSemaphore(int increment)
{
for (int i = 0; i < increment; i++)
{
_semaphore.WaitOne();
_numberToIncrease++;
_semaphore.Release(1);
}
}

#endregion

#region 使用事件AutoResetEvent,核心樣式

private static readonly AutoResetEvent _are = new AutoResetEvent(true);
public static void DoIncreaseWithEvent(int increment)
{
for (int i = 0; i < increment; i++)
{
_are.WaitOne();
_numberToIncrease++;
_are.Set();
}
}

#endregion

#region 使用Monitor,混合樣式

private static readonly object _monitorLocker = new object();
public static void DoIncreaseWithMonitor(int increment)
{
for (int i = 0; i < increment; i++)
{
bool lockTaken = false;
try
{
Monitor.Enter(_monitorLocker, ref lockTaken);
_numberToIncrease++;
}
finally
{
if (lockTaken)
{
Monitor.Exit(_monitorLocker);
}
}
}
}

#endregion

#region 使用ReaderWriterLockSlim,混合樣式

private static readonly ReaderWriterLockSlim _rwlock = new ReaderWriterLockSlim();
public static void DoIncreaseWithReaderWriterLockSlim(int increment)
{
for (int i = 0; i < increment; i++)
{
_rwlock.EnterWriteLock();
_numberToIncrease++;
_rwlock.ExitWriteLock();
}
}

#endregion

下麵是一組測試結果,可以很明顯地看出,核心樣式是相當耗時的,應儘量避免使用。而使用者樣式和混合樣式,也需要根據具體的場景進行選擇。這個測試過於簡單,不具有普遍性。

DoIncreaseByInterLocked=> Result: 400000 , Time: 15 ms.
DoIncreaseWithSpinLock=> Result: 400000 , Time: 75 ms.
DoIncreaseWithEvent=> Result: 400000 , Time: 1892 ms.
DoIncreaseWithSemaphore=> Result: 400000 , Time: 1779 ms.
DoIncreaseWithMonitor=> Result: 400000 , Time: 14 ms.
DoIncreaseWithReaderWriterLockSlim=> Result: 400000 , Time: 22 ms.

小結

本文對C#/.NET中的執行緒同步進行了儘量詳盡的總結,並行環境中在追求程式的高效能、響應性的同時,務必要保證資料的安全性。

C#並行程式設計系列的文章暫時就告一段落了。剛開始寫部落格,文章肯定存在不少問題,歡迎各位博友指出。

已同步到看一看
贊(0)

分享創造快樂