C#中的任務Task
在C#程式設計中,實現並行可以直接使用執行緒,但使用起來很繁瑣;也可以使用執行緒池,執行緒池很大程度上簡化了執行緒的使用,但是也有著一些侷限,比如我們不知道作業什麼時候完成,也取不到作業的傳回值;解決執行緒池侷限性的方案是使用任務
。本文將總結C#中Task
的使用。
類似於執行緒池工作項對非同步操作的封裝,任務是對非同步操作的另一種形式的封裝,這種封裝抽象層次更高,讓我們能夠對非同步操作進行更多的控制。
任務啟動後,透過任務排程器TaskScheduler
來排程。.NET中提供兩種任務排程器,一種是執行緒池任務排程器,也是預設排程器,它會將任務派發給執行緒池工作者執行緒;另一種是背景關係同步任務排程器,它會將任務派發給當前背景關係執行緒,例如GUI執行緒。此外,我們也能自定義任務排程器,例如可以將非同步IO任務派發給執行緒池IO執行緒。
Task的使用方法
隱式使用
Parallel
靜態類除了提供並行迴圈的各種多載,還提供了一個方法Parallel.Invoke
。這個方法可以建立並執行一個或多個非同步任務,使用方法如下:
private static void DoWork(int workId = 0)
{
Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] started work[{workId}].");
Thread.Sleep(3000);
Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] done work[{workId}].");
}
public static void ImplicitUsingOfTask()
{
Parallel.Invoke(()=>DoWork(1),()=>DoWork(2),() => DoWork(3));
}
上例的執行結果如下:
2019/3/27 20:40:18=> Thread[9] started work[1].
2019/3/27 20:40:18=> Thread[12] started work[3].
2019/3/27 20:40:18=> Thread[10] started work[2].
2019/3/27 20:40:21=> Thread[9] done work[1].
2019/3/27 20:40:21=> Thread[12] done work[3].
2019/3/27 20:40:21=> Thread[10] done work[2].
對於簡單的多工並行,使用上述的方式很方便,但是這種方式與執行緒池一樣,我們不能控制任務的執行或者獲取任務傳回值。
顯式使用
相對於使用Parallel.Invoke
執行並行操作,更常用的是使用Task
和Task
提供的方法進行非同步和並行處理。下麵是任務最基本的使用:
Task.Run(() =>
{
});
Task.Factory.StartNew(() =>
{
});
任務的常用操作
獲取任務的傳回值
具有傳回值的任務使用Task
,T
可根據我們的需求指定,下麵是獲取任務傳回值的方法。
Task<int> task = Task<int>.Factory.StartNew(() =>
{
Thread.Sleep(3000);
return DateTime.Now.Day;
});
int day = task.Result;
需要說明的是,獲取任務的結果會阻塞當前執行緒。
等待任務完成
有時候,我們需要等待一些任務全部完成後才能執行後續操作,有時候只要多個任務中的一個完成了,就可以執行後續操作。Task
提供了Wait
、WaitAll
和WaitAny
等方法滿足我們的需求。下麵的例子展示了各種等待方法的使用。
public static void TaskWait()
{
Stopwatch watch = new Stopwatch();
#region 場景1:等待一個任務完成
Task task = Task.Run(() => DoWorkOfTask(1000));
Console.WriteLine("start wait. work duration: 1000");
watch.Start();
task.Wait();
watch.Stop();
Console.WriteLine($"end wait. time: {watch.ElapsedMilliseconds}");
#endregion
#region 場景2:等待多個任務完成
Task[] tasks = new Task[3]
{
Task.Run(() => DoWorkOfTask(1000)),
Task.Run(() => DoWorkOfTask(2000)),
Task.Run(() => DoWorkOfTask(3000)),
};
Console.WriteLine("start wait all. work duration: min 1000 max 3000.");
watch.Restart();
Task.WaitAll(tasks);
watch.Stop();
Console.WriteLine($"end wait. time: {watch.ElapsedMilliseconds}");
#endregion
#region 場景3:等待某個任務完成
tasks = new Task[3]
{
Task.Run(() => DoWorkOfTask(1000)),
Task.Run(() => DoWorkOfTask(2000)),
Task.Run(() => DoWorkOfTask(3000)),
};
Console.WriteLine("start wait any. work duration: min 1000 max 3000.");
watch.Restart();
Task.WaitAny(tasks);
watch.Stop();
Console.WriteLine($"end wait. time: {watch.ElapsedMilliseconds}");
#endregion
}
private static void DoWorkOfTask(int workDuration)
{
Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] started task[{Task.CurrentId}].");
Thread.Sleep(workDuration);
Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] completed task[{Task.CurrentId}].");
}
使用Wait
、WaitAll
和WaitAny
方法時,我們可以設定超時時間或者傳入取消Token,以控制等待時間。但這些方法傳回布林值,只能表明是否等待成功;假如我們需要知道所等待的任務傳回值,則可以使用WhenAll
或WhenAny
方法,這兩個方法不能控制等待時間,但會傳回一個完成的任務。如下例:
Task<int>[] tasks = new Task<int>[3]
{
Task<int>.Factory.StartNew(() =>
{
Console.WriteLine($"task #{Task.CurrentId} run");
Thread.Sleep(100);
Console.WriteLine($"task #{Task.CurrentId} done");
return 100;
}),
Task<int>.Factory.StartNew(() =>
{
Console.WriteLine($"task #{Task.CurrentId} run");
Thread.Sleep(500);
Console.WriteLine($"task #{Task.CurrentId} done");
return 1000;
}),
Task<int>.Factory.StartNew(() =>
{
Console.WriteLine($"task #{Task.CurrentId} run");
Thread.Sleep(1000);
Console.WriteLine($"task #{Task.CurrentId} done");
return 10000;
}),
};
Task<int> task = Task.WhenAny(tasks).Result;
Console.WriteLine($"task #{task.Id}. result {task.Result}");
Task.WhenAll
和Task.WhenAny
在等待結束時,都會建立一個完成狀態的任務,WhenAll
將等待的所有已完成任務的結果放入建立任務的結果中,WhenAny
則將等待的已完成任務放到建立任務的結果中。
任務延續
有時候,我們需要在一個任務完成時開始另一個任務。對於這種需求,我們可以使用Task
的ContinueWith
等方法來處理。
Task task = Task.Run(() => DoWorkOfTask(3000));
task.ContinueWith(t => DoWorkOfTask(1000));
執行結果:
2019/3/27 21:25:09=> Thread[10] started task[1].
2019/3/27 21:25:12=> Thread[10] completed task[1].
2019/3/27 21:25:12=> Thread[11] started task[2].
2019/3/27 21:25:13=> Thread[11] completed task[2].
我們還可以透過TaskContinuationOptions
指定延續任務的執行條件,如任務取消時或者任務出現異常時才執行,等。
子任務的使用
有時候,我們要在一個任務裡面建立一些其他任務,並且還要在任務裡面等待建立的任務完成,此時我們可以使用子任務。
Task parent = Task.Factory.StartNew(() =>
{
Console.WriteLine($"parent task #{Task.CurrentId} run.");
for (int i = 0; i < 10; i++)
{
Task.Factory.StartNew(() =>
{
Console.WriteLine($"child task #{Task.CurrentId} run.");
Thread.Sleep(1000);
Console.WriteLine($"child task #{Task.CurrentId} done.");
}, TaskCreationOptions.AttachedToParent);
}
});
parent.Wait();
Console.WriteLine($"parent task #{parent.Id} done.");
在一個任務中建立的新任務,預設情況下與父級任務是分離的,各自的執行不受影響,除非在建立任務時顯式附加到父級任務中。例如,上例中如果不指定TaskCreationOptions.AttachedToParent
,parent.Wait()
就不會持續到所有子任務都執行完成。
任務的取消
我們在啟動任務時,傳入取消令牌CancellationToken
,當收到取消請求時,丟擲取消異常併在等待任務完成時捕獲異常TaskCanceledException
。我們透過這種方式控制任務的取消。
public static void TaskCancle()
{
Console.WriteLine("Press any key to begin. Press 'c' to cancel. ");
Console.ReadKey(true);
Console.WriteLine();
CancellationTokenSource tokenSource = new CancellationTokenSource();
ConcurrentBag tasks = new ConcurrentBag();
Task task1 = Task.Factory.StartNew(() => DoWorkOfTask(5000, tokenSource.Token), tokenSource.Token);
tasks.Add(task1);
Task task2 = Task.Factory.StartNew(() =>
{
for (int i = 0; i < 10; i++)
{
int duration = 1000 * i;
tasks.Add(Task.Factory.StartNew(()=>DoWorkOfTask(duration, tokenSource.Token), tokenSource.Token));
}
DoWorkOfTask(5000,tokenSource.Token);
}, tokenSource.Token);
tasks.Add(task2);
char ch = Console.ReadKey().KeyChar;
if (ch == 'c' || ch == 'C')
{
tokenSource.Cancel();
Console.WriteLine($"{DateTime.Now}=> Task cancellation requested.");
}
try
{
Task.WaitAll(tasks.ToArray());
}
catch (AggregateException ae)
{
foreach (Exception ex in ae.InnerExceptions)
{
TaskCanceledException tce = ex as TaskCanceledException;
string cancelledTask = tce == null ? string.Empty : $"Task #{tce.Task.Id}";
Console.WriteLine($"Exception: {ex.GetType().Name}. {cancelledTask}");
}
}
finally
{
tokenSource.Dispose();
}
Console.WriteLine();
foreach (Task task in tasks)
{
Console.WriteLine($"Task: #{task.Id} now is {task.Status}");
}
}
private static void DoWorkOfTask(int workDuration, CancellationToken cancleToken)
{
if (cancleToken.IsCancellationRequested)
{
Console.WriteLine($"{DateTime.Now}=> Task #{Task.CurrentId} was cancelled before it got started.");
cancleToken.ThrowIfCancellationRequested();
}
Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] started task #{Task.CurrentId}.");
Thread.Sleep(workDuration);
if (cancleToken.IsCancellationRequested)
{
Console.WriteLine($"{DateTime.Now}=> Task #{Task.CurrentId} was cancelled.");
cancleToken.ThrowIfCancellationRequested();
}
Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] completed task #{Task.CurrentId}.");
}
任務的異常處理
上面提到透過取消令牌丟擲TaskCanceledException
的方式控制任務的取消,實際上,Task會把自身執行過程中的所有異常都包裝到一個AggregateException
中,並傳回呼用執行緒。我們在主執行緒中透過捕獲AggregateException
來進行異常處理。
簡單的處理方式
我們可以在任務的呼叫執行緒捕獲並遍歷AggregateException
的內部異常,或者使用AggregateException
提供的Handle方法進行處理,如下:
Task task = Task.Run(() =>
{
throw new Exception($"Task #{Task.CurrentId} thrown an exception");
});
try
{
task.Wait();
}
catch (AggregateException ae)
{
foreach (Exception ex in ae.InnerExceptions)
{
Console.WriteLine($"foreach: {ex.Message}");
}
ae.Handle(ex=>
{
Console.WriteLine($"handle: {ex.Message}");
return true ;
});
}
使用延續任務處理任務的異常
有時候,我們可以給任務附加一個任務異常時才會執行的延續任務,併在延續任務中進行異常處理。
Task.Run(() => { throw new Exception($"Task #{Task.CurrentId} thrown an exception"); })
.ContinueWith(t =>
{
Console.WriteLine($"{t.Exception?.InnerException?.Message}");
}, TaskContinuationOptions.OnlyOnFaulted);
巢狀任務的異常處理
下麵是一個3層巢狀的任務。
Task parent = Task.Factory.StartNew(() =>
{
for (int i = 0; i < 10; i++)
{
Task.Factory.StartNew(() =>
{
for (int j = 0; j < 10; j++)
{
Task.Factory.StartNew(() =>
{
throw new Exception($"Task #{Task.CurrentId} thrown an exception. ");
});
}
throw new Exception($"Task #{Task.CurrentId} thrown an exception. ");
});
}
throw new Exception($"Task #{Task.CurrentId} thrown an exception. ");
});
try
{
parent.Wait();
}
catch (AggregateException ae)
{
ae.Flatten().Handle(ex =>
{
Console.WriteLine(ex.Message);
return true;
});
}
執行上面的程式碼只會得到一行輸出:
Task #1 thrown an exception.
看起來有點奇怪,為什麼只捕獲到一個異常呢?其實也是在情理之中的:任務預設只會把自身異常傳遞到它自己的呼叫執行緒,子任務是在父任務中呼叫的,其異常只會傳遞到父任務的執行執行緒,所以我們在父任務的呼叫執行緒,也就是我們的主執行緒中是捕獲不到子任務的異常的。
取消上面程式碼的兩處/*, TaskCreationOptions.AttachedToParent*/
,就會捕獲到所有異常。
任務排程器
.NET提供的任務排程器
任務是由TaskScheduler
排程的,啟動任務時,預設使用執行緒池任務排程器,任務將會被派發到執行緒池工作執行緒。執行緒池的排程前面已經總結過,這裡不再展開。.NET提供的另一種任務排程器是同步背景關係排程器,用TaskScheduler.FromCurrentSynchronizationContext()
獲取,這個排程器會把任務派發給當前的背景關係執行緒,常用在GUI應用程式中。
例如,我們在一個窗體中新建一個ListBox,新建幾個任務向其中新增項,程式碼如下:
this.lbxMsg.Items.Add($"{DateTime.Now:O}=>Current thread is thread #{Thread.CurrentThread.ManagedThreadId} .");
for (int i = 0; i < 10; i++)
{
new Task(() =>
{
for (int j = 0; j < 3; j++)
{
this.lbxMsg.Items.Add($"{DateTime.Now:O}=> Task #{Task.CurrentId} add an item with thread #{Thread.CurrentThread.ManagedThreadId}.");
}
}).Start(TaskScheduler.FromCurrentSynchronizationContext());
}
執行上面的程式碼可以發現建立的任務都是由介面執行緒執行的。這裡如果使用預設的任務排程器將產生“執行緒間操作無效”的異常。
實際使用時,可以給一個非同步任務新增延續任務,來處理非同步任務的結果或者異常等。如下:
Task.Run(() =>
{
Thread.Sleep(3000);
return 1000;
}).ContinueWith(t =>
{
this.lbxMsg.Items.Add(t.Result);
}, TaskScheduler.FromCurrentSynchronizationContext());
自定義任務排程器
除了使用.NET提供的排程器外,我們能夠繼承類TaskScheduler
來實現自己的任務排程器。這裡不再展開,需要瞭解的可以參考Samples for Parallel Programming with the .NET Framework。
朋友會在“發現-看一看”看到你“在看”的內容