1. 引言
An API for asynchronous programming with observable streams.
ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming.ReactiveX 使用可觀察資料流進行非同步程式設計的API。
ReactiveX結合了觀察者樣式、迭代器樣式和函式式程式設計的精華。
關於Reactive(本文統一譯作響應式),有一個The Reactive Manifesto【響應式宣言】:響應式系統(Reactive System)具備以下特質:即時響應性(Responsive)、回彈性(Resilient)、彈性(Elastic)以及訊息驅動(Message Driven)。
很顯然開發一個響應式系統,並不簡單。
那本文就來講一講如何基於Rx.NET進行響應式程式設計,進而開發更加靈活、松耦合、可伸縮的響應式系統。
2. 程式設計正規化
在開始之前呢,我們有必要瞭解下幾種程式設計正規化:指令式程式設計、宣告式程式設計、函式式程式設計和響應式程式設計。
指令式程式設計:指令式程式設計的主要思想是關註計算機執行的步驟,即一步一步告訴計算機先做什麼再做什麼。
//1. 宣告變數
List results = new List();
//2. 迴圈變數
foreach(var num in Enumerable.Range(1,10))
{
//3. 新增條件
if (num > 5)
{
//4. 新增處理邏輯
results.Add(num);
Console.WriteLine(num);
}
}
宣告式程式設計:宣告式程式設計是以資料結構的形式來表達程式執行的邏輯。它的主要思想是告訴計算機應該做什麼,但不指定具體要怎麼做。
var nums = from num in Enumerable.Range(1,10) where num > 5 select num
函式式程式設計:主要思想是把運算過程儘量寫成一系列巢狀的函式呼叫。
Enumerable.Range(1, 10).Where(num => num > 5).ToList().ForEach(Console.WriteLine);
響應式程式設計:響應式程式設計是一種面向資料流和變化傳播的程式設計正規化,旨在簡化事件驅動應用的實現。響應式程式設計專註於如何建立依賴於變更的資料流並對變化做出響應。
IObservable nums = Enumerable.Range(1, 10).ToObservable();
IDisposable subscription = nums.Where(num => num > 5).Subscribe(Console.WriteLine);
subscription.Dispose();
3. Hello Rx.NET
從一個簡單的Demo開始。
假設我們現在模擬電熱壺燒水,實時輸出當前水溫,一般我們會這樣做:
Enumerable.Range(1, 100).ToList().ForEach(Console.WriteLine);
// do something else. 阻塞
假設當前程式是智慧家居的中控裝置,不僅控制電熱壺燒水,還控制其他裝置,為了避免阻塞主執行緒。一般我們會建立一個Thread或Task去做。
Task.Run(() => Enumerable.Range(1, 100).ToList().ForEach(Console.WriteLine));
// do something else. 非阻塞
假設現在我們不僅要在控制檯輸出而且還要實時透過揚聲器報警。這時我們應該想到委託和事件。
class Heater
{
private delegate void TemperatureChanged(int temperature);
private event TemperatureChanged TemperatureChangedEvent;
public void BoilWater()
{
TemperatureChangedEvent += ShowTemperature;
TemperatureChangedEvent += MakeAlerm;
Task.Run(
() =>
Enumerable.Range(1, 100).ToList().ForEach((temperature) => TemperatureChangedEvent(temperature))
);
}
private void ShowTemperature(int temperature)
{
Console.WriteLine($"當前溫度:{temperature}");
}
private void MakeAlerm(int temperature)
{
Console.WriteLine($"嘟嘟嘟,當前水溫{temperature}");
}
}
class Program
{
static void Main(string[] args)
{
Heater heater = new Heater();
heater.BoilWater();
}
}
瞬間程式碼量就上去了。但是藉助Rx.NET,我們可以簡化成以下程式碼:
var observable = Enumerable.Range(1, 100).ToObservable(NewTheadScheduler.Default);//申明可觀察序列
Subject subject = new Subject();//申明Subject
subject.Subscribe((temperature) => Console.WriteLine($"當前溫度:{temperature}"));//訂閱subject
subject.Subscribe((temperature) => Console.WriteLine($"嘟嘟嘟,當前水溫:{temperature}"));//訂閱subject
observable.Subscribe(subject);//訂閱observable
僅僅透過以下三步:
- 呼叫
ToObservable
將列舉序列轉換為可觀察序列。 - 透過指定
NewTheadScheduler.Default
來指定在單獨的執行緒進行列舉。 - 呼叫
Subscribe
方法進行事件註冊。 - 藉助
Subject
進行多播傳輸
透過以上我們可以看到Rx.NET大大簡化了事件處理的步驟,而這隻是Rx的冰山一角。
4. Rx.NET 核心
Reactive Extensions(Rx)是一個為.NET應用提供響應式程式設計模型的庫,用來構建非同步基於事件流的應用,透過安裝 System.Reactive
Nuget包進行取用。Rx將事件流抽象為Observable sequences(可觀察序列)表示非同步資料流,使用LINQ運運算元查詢非同步資料流,並使用 Scheduler
來控制非同步資料流中的併發性。簡單地說:Rx = Observables + LINQ + Schedulers。
在軟體系統中,事件是一種訊息用於指示發生了某些事情。事件由Event Source(事件源)引發並由Event Handler(事件處理程式)使用。
在Rx中,事件源可以由observable表示,事件處理程式可以由observer表示。
但是應用程式使用的資料如何表示呢,例如資料庫中的資料或從Web伺服器獲取的資料。而在應用程式中我們一般處理的資料無外乎兩種:靜態資料和動態資料。 但無論使用何種型別的資料,其都可以作為流來觀察。換句話說,資料流本身也是可觀察的。也就意味著,我們也可以用observable來表示資料流。
講到這裡,Rx.NET的核心也就一目瞭然了:
- 一切皆為資料流
- Observable 是對資料流的抽象
- Observer是對Observable的響應
在Rx中,分別使用 IObservable<T>
和 IObserver<T>
介面來表示可觀察序列和觀察者。它們預置在system名稱空間下,其定義如下:
public interface IObservable<out T>
{
//Notifies the provider that an observer is to receive notifications.
IDisposable Subscribe(IObserver<T> observer);
}
public interface IObserver<in T>
{
//Notifies the observer that the provider has finished sending push-based notifications.
void OnCompleted();
//Notifies the observer that the provider has experienced an error condition.
void OnError(Exception error);
//Provides the observer with new data.
void OnNext(T value);
}
5. 建立IObservable
建立 IObservable<T>
主要有以下幾種方式:
1. 直接實現 IObservable<T>
介面
2. 使用 Observable.Create
建立
Observable.Create(observer=>{
for (int i = 0; i < 5; i++)
{
observer.OnNext(i);
}
observer.OnCompleted();
return Disposable.Empty;
})
3. 使用 Observable.Deffer
進行延遲建立(當有觀察者訂閱時才建立)比如要連線資料庫進行查詢,如果沒有觀察者,那麼資料庫連線會一直被佔用,這樣會造成資源浪費。使用Deffer可以解決這個問題。
Observable.Defer(() =>
{
var connection = Connect(user, password);
return connection.ToObservable();
});
4. 使用 Observable.Generate
建立迭代型別的可觀察序列
IObservable observable =
Observable.Generate(
0, //initial state
i => i < 10, //condition (false means terminate)
i => i + 1, //next iteration step
i => i * 2); //the value in each iteration
5. 使用 Observable.Range
建立指定區間的可觀察序列
IObservable observable = Observable.Range (0, 10).Select (i => i * 2);
6. 建立特殊用途的可觀察序列
Observable.Return ("Hello World");//建立單個元素的可觀察序列
Observable.Never ();//建立一個空的永遠不會結束的可觀察序列
Observable.Throw<ApplicationException> (
new ApplicationException ("something bad happened"))//建立一個丟擲指定異常的可觀察序列
Observable.Empty ()//建立一個空的立即結束的可觀察序列
7. 使用 ToObservable
轉換 IEnumerate
和Task型別
Enumerable.Range(1, 10).ToObservable();
IObservable<IEnumerable> resultsA = searchEngineA.SearchAsync(term).ToObservable();
8. 使用 Observable.FromEventPattern<T>
和 Observable.FromEvent<TDelegate,TEventArgs>
進行事件的轉換
public delegate void RoutedEventHandler(object sender,
System.Windows.RoutedEventArgs e)
IObservable<EventPattern<RoutedEventArgs>> clicks =
Observable.FromEventPattern<RoutedEventHandler, RoutedEventArgs>(
h => theButton.Click += h,
h => theButton.Click -= h);
clicks.Subscribe(eventPattern => output.Text += "button clicked" + Environment.NewLine);
9. 使用 Observable.Using
進行資源釋放
IObservable lines =
Observable.Using (
() => File.OpenText ("TextFile.txt"), // opens the file and returns the stream we work with
stream =>
Observable.Generate (
stream, //initial state
s => !s.EndOfStream, //we continue until we reach the end of the file
s => s, //the stream is our state, it holds the position in the file
s => s.ReadLine ()) //each iteration will emit the current line (and moves to the next)
);
10. 使用 Observable.Interval
建立指定間隔可觀察序列
11. 使用 Observable.Timer
建立可觀察的計時器
6. RX 運運算元
建立完IObservable後,我們可以對其應用系列Linq運運算元,對其進行查詢、過濾、聚合等等。Rx內建了以下系列運運算元:下麵透過圖示來解釋常用運運算元的作用:
7. 多播傳輸靠:Subject
基於以上示例,我們瞭解到,藉助Rx可以簡化事件模型的實現,而其實質上就是對觀察者樣式的擴充套件。提到觀察者樣式,我們知道一個Subject可以被多個觀察者訂閱,從而完成訊息的多播。同樣,在Rx中,也引入了Subject用於多播訊息傳輸,不過Rx中的Subject具有雙重身份——即是觀察者也是被觀察者。
interface ISubject<in TSource, out TResult> : IObserver<TSource>,IObservable<TResult>
{
}
Rx中預設提供了以下四種實現:
Subject– 向所有觀察者廣播每個通知
AsyncSubject– 當可觀察序列完成後有且僅傳送一個通知
ReplaySubject– 快取指定通知以對後續訂閱的觀察者進行重放
BehaviorSubject– 推送預設值或最新值給觀察者
但對於第一種 Subject<T>
有一點需要指出,當其有多個觀察者序列時,一旦其中一個停止傳送訊息,則Subject就停止廣播所有其他序列後續傳送的任何訊息。
8. 有溫度的可觀察者序列
對於Observable,它們是有溫度的,有冷熱之分。它們的區別如下圖所示:
Cold Observable:有且僅當有觀察者訂閱時才傳送通知,且每個觀察者獨享一份完整的觀察者序列。
Hot Observable:不管有無觀察者訂閱都會傳送通知,且所有觀察者共享同一份觀察者序列。
9. 一切皆在掌控:Scheduler
在Rx中,使用Scheduler來控制併發。而對於Scheduler我們可以理解為程式排程,透過Scheduler來規定在什麼時間什麼地點執行什麼事情。Rx提供了以下幾種Scheduler:
-
NewThreadScheduler:即在新執行緒上執行
-
ThreadPoolScheduler:即在執行緒池中執行
-
TaskPoolScheduler:同ThreadPoolScheduler
-
CurrentThreadScheduler:在當前執行緒執行
-
ImmediateScheduler:在當前執行緒立即執行
-
EventLoopScheduler:建立一個後臺執行緒按序執行所有操作
舉例而言:
Observable.Return("Hello",NewThreadScheduler.Default)
.Subscribe(str=>Console.WriteLine($"{str} on ThreadId:{Thread.CurrentThread.ManagedThreadId}")
);
Console.WriteLine($"Current ThreadId:{Thread.CurrentThread.ManagedThreadId}");
以上輸出:
Current ThreadId:1
Hello on ThreadId:4
10. 最後
羅裡吧嗦的總算把《Rx.NET In Action》這本書的內容大致梳理了一遍,對Rx也有了一個更深的認識,Rx擴充套件了觀察者樣式用於支援資料和事件序列,內建系列運運算元允許我們以宣告式的方式組合這些序列,且無需關註底層的實現進行事件驅動開發:如執行緒、同步、執行緒安全、併發資料結構和非阻塞IO。
但事無巨細,難免疏漏。對響應式程式設計有興趣的不妨拜讀下此書,相信對你會大有裨益。
參考資料:
Rx.NET in Action.pdf
ReactiveX
.Net中的反應式程式設計(Reactive Programming)