作者:碼農阿宇
連結:http://www.cnblogs.com/CoderAyu/p/10680805.html
AsyncStreamsInCShaper 8.0
很開心今天能與大家一起聊聊C# 8.0中的新特性-Async Streams,一般人通常看到這個詞表情是這樣.
簡單說,其實就是C# 8.0中支援await foreach.
或者說,C# 8.0中支援非同步傳回列舉型別async Task>.
好吧,還不懂?Good,這篇文章就是為你寫的,看完這篇文章,你就能明白它的神奇之處了.
為什麼寫這篇文章
Async Streams這個功能已經釋出很久了,在去年的Build 2018 The future of C#就有演示,最近VS 2019釋出,在該版本的Release Notes中,我再次看到了這個新特性,因為對非同步程式設計不太熟悉,所以藉著這個機會,學習新特性的同時,把非同步程式設計重溫一遍.
Async / Await
C# 5引入了 Async/Await,用以提高使用者介面響應能力和對 Web 資源的訪問能力。換句話說,非同步方法用於執行不阻塞執行緒並傳回一個標量結果的非同步操作。
微軟多次嘗試簡化非同步操作,因為 Async/Await 樣式易於理解,所以在開發人員當中獲得了良好的認可。
常規示例
要瞭解問什麼需要Async Streams,我們先來看看這樣的一個示例,求出5以內的整數的和.
static int SumFromOneToCount(int count)
{
ConsoleExt.WriteLine("SumFromOneToCount called!");
var sum = 0;
for (var i = 0; i <= count; i++)
{
sum = sum + i;
}
return sum;
}
呼叫方法.
static void Main(string[] args)
{
const int count = 5;
ConsoleExt.WriteLine($"Starting the application with count: {count}!");
ConsoleExt.WriteLine("Classic sum starting.");
ConsoleExt.WriteLine($"Classic sum result: {SumFromOneToCount(count)}");
ConsoleExt.WriteLine("Classic sum completed.");
ConsoleExt.WriteLine("################################################");
}
輸出結果.
可以看到,整個過程就一個執行緒Id為1的執行緒自上而下執行,這是最基礎的做法.
Yield Return
接下來,我們使用yield運運算元使得這個方法程式設計延遲載入,如下所示.
static IEnumerable<int> SumFromOneToCountYield(int count)
{
ConsoleExt.WriteLine("SumFromOneToCountYield called!");
var sum = 0;
for (var i = 0; i <= count; i++)
{
sum = sum + i;
yield return sum;
}
}
主函式
static void Main(string[] args)
{
const int count = 5;
ConsoleExt.WriteLine("Sum with yield starting.");
foreach (var i in SumFromOneToCountYield(count))
{
ConsoleExt.WriteLine($"Yield sum: {i}");
}
ConsoleExt.WriteLine("Sum with yield completed.");
ConsoleExt.WriteLine("################################################");
ConsoleExt.WriteLine(Environment.NewLine);
}
執行結果如下.
正如你在輸出視窗中看到的那樣,結果被分成幾個部分傳回,而不是作為一個值傳回。以上顯示的累積結果被稱為惰性列舉。但是,仍然存在一個問題,即 sum 方法阻塞了程式碼的執行。如果你檢視執行緒ID,可以看到所有東西都在主執行緒1中執行,這顯然不完美,繼續改造.
Async Return
我們試著將async用於SumFromOneToCount方法(沒有yield關鍵字).
static async Task<int> SumFromOneToCountAsync(int count)
{
ConsoleExt.WriteLine("SumFromOneToCountAsync called!");
var result = await Task.Run(() =>
{
var sum = 0;
for (var i = 0; i <= count; i++)
{
sum = sum + i;
}
return sum;
});
return result;
}
主函式
static async Task Main(string[] args)
{
const int count = 5;
ConsoleExt.WriteLine("async example starting.");
// Sum runs asynchronously! Not enough. We need sum to be async with lazy behavior.
var result = await SumFromOneToCountAsync(count);
ConsoleExt.WriteLine("async Result: " + result);
ConsoleExt.WriteLine("async completed.");
ConsoleExt.WriteLine("################################################");
ConsoleExt.WriteLine(Environment.NewLine);
}
執行結果.
我們可以看到計算過程是在另一個執行緒中執行,但結果仍然是作為一個值傳回!任然不完美.
如果我們想把惰性列舉(yield return)與非同步方法結合起來,即傳回Task
Task
我們根據假設把程式碼改造一遍,使用Task>來進行計算.
可以看到,直接出現錯誤.
IAsyncEnumerable
其實,在C# 8.0中Task這種組合稱為IAsyncEnumerable。
這個新功能為我們提供了一種很好的技術來解決拉非同步延遲載入的問題,例如從網站下載資料或從檔案或資料庫中讀取記錄,與 IEnumerable 和 IEnumerator 類似,Async Streams 提供了兩個新介面 IAsyncEnumerable 和 IAsyncEnumerator,定義如下:
public interface IAsyncEnumerable
{
IAsyncEnumerator GetAsyncEnumerator();
}
public interface IAsyncEnumerator : IAsyncDisposable
{
Task<bool> MoveNextAsync();
T Current { get; }
}
// Async Streams Feature 可以被非同步銷毀
public interface IAsyncDisposable
{
Task DiskposeAsync();
}
AsyncStream
下麵,我們就來見識一下AsyncStrema的威力,我們使用IAsyncEnumerable來對函式進行改造,如下.
static async Task ConsumeAsyncSumSeqeunc(IAsyncEnumerable<int> sequence)
{
ConsoleExt.WriteLineAsync("ConsumeAsyncSumSeqeunc Called");
await foreach (var value in sequence)
{
ConsoleExt.WriteLineAsync($"Consuming the value: {value}");
// simulate some delay!
await Task.Delay(TimeSpan.FromSeconds(1));
};
}
private static async IAsyncEnumerable<int> ProduceAsyncSumSeqeunc(int count)
{
ConsoleExt.WriteLineAsync("ProduceAsyncSumSeqeunc Called");
var sum = 0;
for (var i = 0; i <= count; i++)
{
sum = sum + i;
// simulate some delay!
await Task.Delay(TimeSpan.FromSeconds(0.5));
yield return sum;
}
}
主函式
static async Task Main(string[] args)
{
const int count = 5;
ConsoleExt.WriteLine("Starting Async Streams Demo!");
// Start a new task. Used to produce async sequence of data!
IAsyncEnumerable<int> pullBasedAsyncSequence = ProduceAsyncSumSeqeunc(count);
// Start another task; Used to consume the async data sequence!
var consumingTask = Task.Run(() => ConsumeAsyncSumSeqeunc(pullBasedAsyncSequence));
await Task.Delay(TimeSpan.FromSeconds(3));
ConsoleExt.WriteLineAsync("X#X#X#X#X#X#X#X#X#X# Doing some other work X#X#X#X#X#X#X#X#X#X#");
// Just for demo! Wait until the task is finished!
await consumingTask;
ConsoleExt.WriteLineAsync("Async Streams Demo Done!");
}
如果一切順利,那麼就能看到這樣的執行結果了.
最後,看到這就是我們想要的結果,在列舉的基礎上,進行了非同步迭代.
可以看到,整個計算過程並沒有造成主執行緒的阻塞,其中,值得重點關註的是紅色方框區域的執行緒5!執行緒5!執行緒5!執行緒5在請求下一個結果後,並沒有等待結果傳回,而是去了Main()函式中做了別的事情,等待請求的結果傳回後,執行緒5又接著執行foreach中任務.
Client/Server的非同步拉取
如果還沒有理解Async Streams的好處,那麼我藉助客戶端 / 伺服器端架構是演示這一功能優勢的絕佳方法。
同步呼叫
客戶端向伺服器端傳送請求,客戶端必須等待(客戶端被阻塞),直到伺服器端做出響應.
示例中Yield Return就是以這種方式執行的,所以整個過程只有一個執行緒即執行緒1在處理.
非同步呼叫
客戶端發出資料塊請求,然後繼續執行其他操作。一旦資料塊到達,客戶端就處理接收到的資料塊並詢問下一個資料塊,依此類推,直到達到最後一個資料塊為止。這正是 Async Streams 想法的來源。
最後一個示例就是以這種方式執行的,執行緒5詢問下一個資料後並沒有等待結果傳回,而是去做了Main()函式中的別的事情,資料到達後,執行緒5又繼續處理foreach中的任務.
Tips
如果你使用的是.net core 2.2及以下版本,會遇到這樣的報錯.
需要安裝.net core 3.0 preview的SDK(截至至部落格撰寫日期4月9日,.net core SDK最新版本為3.0.100-preview3-010431),安裝好SDK後,如果你是VS 2019正式版,可能無法選擇.net core 3.0,vs 2019 正式版預設情況下沒有開啟對預覽版.net core 3.0的支援.
根據網友補充,需要在VS 2019正式版本中需要開啟使用 .Net core SDK 預覽版,才能建立3.0的專案.
工具 > 選項 > 專案和解決方案 > .Net Core > 使用 .Net core SDK 預覽版
總結
我們已經討論過 Async Streams,它是一種出色的非同步拉取技術,可用於進行生成多個值的非同步計算。
Async Streams 背後的程式設計概念是非同步拉取模型。我們請求獲取序列的下一個元素,並最終得到答覆。
Async Streams 提供了一種處理非同步資料源的絕佳方法,希望對大家能夠有所幫助。
文章中涉及的所有程式碼已儲存在我的GitHub中,請盡情享用!
https://github.com/liuzhenyulive/AsyncStreamsInCShaper8.0
朋友會在“發現-看一看”看到你“在看”的內容