併發是程式開發中不可避免的問題,根據系統面向使用者、功能場景的不同,併發的重視程度會有不同。從程式的角度來說,併發意味著相同的時間點執行了相同的程式碼,而有些情況是不被允許的,比如:轉賬、搶購佔庫存等,如果沒有做好臨界條件的驗證,會帶來非常嚴重的後果。追根結底是因為併發引起的資料不一致問題,面對併發,我們通常會採用鎖來最佳化。
場景模擬
如下模擬搶購的示例程式碼(C#):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
// 有10個商品庫存 private static int stockCount = 10; public bool Buy() { // 模擬執行的邏輯程式碼花費的時間 Thread.Sleep(new Random().Next(100,500)); if (stockCount > 0) { stockCount--; return true; } return false; } |
1 2 3 4 5 6 7 8 9 10 11 |
var test = new Test(); Parallel.For(1, 16, (i) => { var stopwatch = new Stopwatch(); stopwatch.Start(); var data = test.Buy(); stopwatch.Stop(); Console.WriteLine($"ThreadId:{Thread.CurrentThread.ManagedThreadId}, Result:{data}, Time:{stopwatch.ElapsedMilliseconds}"); }); Console.ReadKey(); |
模擬並行呼叫 Buy 方法 15 次(內部使用的是執行緒池,所以 ThreadId 會有重覆),實際上只有 10 個庫存,傳回結果卻顯示 11 個請求都購買成功了。
單機部署樣式解決方案
在單機部署樣式下,我們只需要加 lock(){} 就可以解決問題:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
// 有10個商品庫存 private static int stockCount = 10; private static object obj = new object(); public bool Buy() { lock (obj) { // 模擬執行的邏輯程式碼花費的時間 Thread.Sleep(new Random().Next(100, 500)); if (stockCount > 0) { stockCount--; return true; } return false; } } |
從輸出結果中可以看出,確實只有10個請求是顯示購買成功,但同時發現部分請求的執行時間明顯變長,這就是加鎖帶來的最直觀影響,當某個執行緒獲得鎖之後,在沒有釋放之前,其他執行緒只能繼續等待,併發越高,更多的執行緒需要等待輪流被處理。
各種語言一般都提供了鎖的實現,用法大同小異,語言本身實現的鎖只能作用於當前行程內,所以在單機樣式部署的系統中使用基本沒什麼問題。
叢集部署樣式解決方案(分散式鎖)
在叢集樣式下,系統部署於多臺機器(一個系統執行在多個行程中),語言本身實現的鎖只能確保當前行程內有效(基於記憶體),多行程就沒辦法共享鎖狀態,這時我們就得考慮採用分散式鎖,分散式鎖可以採用 資料庫、ZooKeeper、Redis 等來實現,最終都是為了達到在不同的行程、執行緒內能共享鎖狀態的目的。
這裡將介紹基於 Redis 的 RedLock.net 來解決分散式下的併發問題,RedLock.net 是 RedLock 分散式鎖演演算法的 .NET 版實現 (大部分語言都有對應的實現,檢視) ,RedLock 分散式鎖演演算法是由 Redis 的作者提出。
RedLock 簡介
RedLock 的思想是使用多臺 Redis Master ,節點完全獨立,節點間不需要進行資料同步,因為 Master-Slave 架構一旦 Master 發生故障時資料沒有複製到 Slave,被選為 Master 的 Slave 就丟掉了鎖,另一個客戶端就可以再次拿到鎖。鎖透過 setNX(原子操作) 命令設定,在有效時間內當獲得鎖的數量大於 (n/2+1) 代表成功,失敗後需要向所有節點傳送釋放鎖的訊息。
獲取鎖:
1
|
SET resource_name my_random_value NX PX 30000
|
釋放鎖:
1 2 3 4 5 |
if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end |
RedLock.net 整合
-
建立 .NETCore API 專案
-
Nuget 安裝 RedLock.net
1
Install-Package RedLock.net
-
appsettings.json 新增 redis 配置
1 2 3 4
{ "RedisUrl": "127.0.0.1:6379", // 多個用,分割 ... }
-
新增 ProductService.cs,模擬商品購買
1 2 3 4 5 6 7 8 9 10 11 12 13
// 有10個商品庫存,如果同時啟動多個API服務進行測試,這裡改成存資料庫或其他方式 private static int stockCount = 10; public async Task BuyAsync() { // 模擬執行的邏輯程式碼花費的時間 await Task.Delay(new Random().Next(100, 500)); if (stockCount > 0) { stockCount--; return true; } return false; }
-
修改 Startup.cs ,建立 RedLockFactory
定義 RedLockFactory 變數:
1
private RedLockFactory lockFactory;
新增方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
private RedLockFactory GetRedLockFactory() { var redisUrl = Configuration["RedisUrl"]; if (string.IsNullOrEmpty(redisUrl)) { throw new ArgumentException("RedisUrl 不能為空"); } var urls = redisUrl.Split(",").ToList(); var endPoints = new List(); foreach (var item in urls) { var arr = item.Split(":"); endPoints.Add(new DnsEndPoint(arr[0], Convert.ToInt32(arr[1]))); } return RedLockFactory.Create(endPoints); }
在 ConfigureServices 註入 IDistributedLockFactory:
1 2 3
lockFactory = GetRedLockFactory(); services.AddSingleton(typeof(IDistributedLockFactory), lockFactory); services.AddScoped(typeof(ProductService));
修改 Configure,應用程式結束時釋放 lockFactory :
1 2 3 4 5 6 7 8 9
public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime) { ... lifetime.ApplicationStopping.Register(() => { lockFactory.Dispose(); }); }
-
在 Controller 新增方法 DistributedLockTest
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
private readonly IDistributedLockFactory _distributedLockFactory; private readonly ProductService _productService; public HomeController(IDistributedLockFactory distributedLockFactory, ProductService productService) { _distributedLockFactory = distributedLockFactory; _productService = productService; } [HttpGet] public async Task DistributedLockTest() { var productId = "id"; // resource 鎖定的物件 // expiryTime 鎖定過期時間,鎖區域內的邏輯執行如果超過過期時間,鎖將被釋放 // waitTime 等待時間,相同的 resource 如果當前的鎖被其他執行緒佔用,最多等待時間 // retryTime 等待時間內,多久嘗試獲取一次 using (var redLock = await _distributedLockFactory.CreateLockAsync(productId, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(20))) { if (redLock.IsAcquired) { var result = await _productService.BuyAsync(); return result; } else { Console.WriteLine($"獲取鎖失敗:{DateTime.Now}"); } } return false; }
-
呼叫介面測試
1 2 3 4 5 6 7 8
Parallel.For(1, 16, (i) => { var stopwatch = new Stopwatch(); stopwatch.Start(); var data = GetAsync($"http://localhost:5000/home/distributedLockTest").Result; stopwatch.Stop(); Console.WriteLine($"ThreadId:{Thread.CurrentThread.ManagedThreadId}, Result:{data}, Time:{stopwatch.ElapsedMilliseconds}"); });
關於 RedLock 分散式鎖演演算法的爭議大家可以參考:
How to do distributed locking
Is Redlock safe?
總結
如果使用鎖,必然對效能上會有一定影響,我們需要根據實際場景來判斷是真正需要。在指定鎖過期時間時要相對合理,避免出現鎖已過期,但邏輯還沒執行完成,這樣就失去了鎖的意義,當然這種情況下我們還可以考慮重入鎖。
最後推薦一下微軟開源的一個基於 Actor 模型的分散式框架 Orleans,也可以達到分散式鎖的效果。
參考連結
- Distributed locks with Redis
- RedLock.net
朋友會在“發現-看一看”看到你“在看”的內容