https://blog.angularindepth.com/the-extensive-guide-to-creating-streams-in-rxjs-aaa02baaff9a
作者 | Oliver Flaggl
譯者 | 周家未 (BriFuture) ???共計翻譯:25 篇 貢獻時間:794 天
對大多數開發者來說,與 RxJS 的初次接觸是透過庫的形式,就像 Angular。一些函式會傳回流,要使用它們就得把註意力放在運運算元上。
有些時候,混用響應式和非響應式程式碼似乎很有用。然後大家就開始熱衷流的創造。不論是在編寫非同步程式碼或者是資料處理時,流都是一個不錯的方案。
RxJS 提供很多方式來建立流。不管你遇到的是什麼情況,都會有一個完美的建立流的方式。你可能根本用不上它們,但瞭解它們可以節省你的時間,讓你少碼一些程式碼。
我把所有可能的方法,按它們的主要目的,放在四個分類當中:
註意:示例用的是 RxJS 6,可能會以前的版本有所不同。已知的區別是你匯入函式的方式不同了。
RxJS 6
import {of, from} from 'rxjs';
of(...);
from(...);
RxJS < 6
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/from';
Observable.of(...);
Observable.from(...);
//或
import { of } from 'rxjs/observable/of';
import { from } from 'rxjs/observable/from';
of(...);
from(...);
流的圖示中的標記:
|
表示流結束了X
表示流出現錯誤並被終結...
表示流的走向不定流式化已有資料
你有一些資料,想把它們放到流中。有三種方式,並且都允許你把排程器當作最後一個引數傳入(你如果想深入瞭解排程器,可以看看我的 上一篇文章[1])。這些生成的流都是靜態的。
of
如果只有一個或者一些不同的元素,使用 of
:
of(1,2,3)
.subscribe();
// 結果
// 1 2 3 |
from
如果有一個陣列或者 可迭代的物件 ,而且你想要其中的所有元素傳送到流中,使用 from
。你也可以用它來把一個 promise 物件變成可觀測的。
const foo = [1,2,3];
from(foo)
.subscribe();
// 結果
// 1 2 3 |
pairs
流式化一個物件的鍵/值對。用這個物件表示字典時特別有用。
const foo = { a: 1, b: 2};
pairs(foo)
.subscribe();
// 結果
// [a,1] [b,2] |
那麼其他的資料結構呢?
也許你的資料儲存在自定義的結構中,而它又沒有實現 可迭代的物件 介面,又或者說你的結構是遞迴的、樹狀的。也許下麵某種選擇適合這些情況:
generate
函式,遍歷所有資料稍後會講到選項 2 和 3 ,因此這裡的重點是建立一個迭代器。我們可以對一個 可迭代的物件 呼叫 from
建立一個流。 可迭代的物件 是一個物件,可以產生一個迭代器(如果你對細節感興趣,參考 這篇 mdn 文章[2])。
建立一個迭代器的簡單方式是 生成函式[3]。當你呼叫一個生成函式時,它傳回一個物件,該物件同時遵循 可迭代的物件 介面和 迭代器 介面。
// 自定義的資料結構
class List {
add(element) ...
get(index) ...
get size() ...
...
}
function* listIterator(list) {
for (let i = 0; i<list.size; i++) {
yield list.get(i);
}
}
const myList = new List();
myList.add(1);
myList.add(3);
from(listIterator(myList))
.subscribe(console.log);
// 結果
// 1 3 |
呼叫 listIterator
函式時,傳回值是一個 可迭代的物件 / 迭代器 。函式裡面的程式碼在呼叫 subscribe
前不會執行。
生成資料
你知道要傳送哪些資料,但想(或者必須)動態生成它。所有函式的最後一個引數都可以用來接收一個排程器。他們產生靜態的流。
範圍(range
)
從初始值開始,傳送一系列數字,直到完成了指定次數的迭代。
range(10, 2) // 從 10 開始,傳送兩個值
.subscribe();
// 結果
// 10 11 |
間隔(interval
) / 定時器(timer
)
有點像範圍,但定時器是週期性的傳送累加的數字(就是說,不是立即傳送)。兩者的區別在於在於定時器允許你為第一個元素設定一個延遲。也可以只產生一個值,只要不指定週期。
interval(1000) // 每 1000ms = 1 秒 傳送資料
.subscribe()
// 結果
// 0 1 2 3 4 ...
delay(5000, 1000) // 和上面相同,在開始前先等待 5000ms
delay(5000)
.subscribe(i => console.log("foo");
// 5 秒後列印 foo
大多數定時器將會用來週期性的處理資料:
interval(10000).pipe(
flatMap(i => fetch("https://server/stockTicker")
).subscribe(updateChart)
這段程式碼每 10 秒獲取一次資料,更新螢幕。
生成(generate
)
這是個更加複雜的函式,允許你傳送一系列任意型別的物件。它有一些多載,這裡你看到的是最有意思的部分:
generate(
0, // 從這個值開始
x => x < 10, // 條件:只要值小於 10,就一直傳送
x => x*2 // 迭代:前一個值加倍
).subscribe();
// 結果
// 1 2 4 8 |
你也可以用它來迭代值,如果一個結構沒有實現 可迭代的物件 介面。我們用前面的串列例子來進行演示:
const myList = new List();
myList.add(1);
myList.add(3);
generate(
0, // 從這個值開始
i => i < list.size, // 條件:傳送資料,直到遍歷完整個串列
i => ++i, // 迭代:獲取下一個索引
i => list.get(i) // 選擇器:從串列中取值
).subscribe();
// 結果
// 1 3 |
如你所見,我添加了另一個引數:選擇器。它和 map
運運算元作用類似,將生成的值轉換為更有用的東西。
空的流
有時候你要傳遞或傳回一個不用傳送任何資料的流。有三個函式分別用於不同的情況。你可以給這三個函式傳遞排程器。empty
和 throwError
接收一個排程器引數。
empty
建立一個空的流,一個值也不傳送。
empty()
.subscribe();
// 結果
// |
never
建立一個永遠不會結束的流,仍然不傳送值。
never()
.subscribe();
// 結果
// ...
throwError
建立一個流,流出現錯誤,不傳送資料。
throwError('error')
.subscribe();
// 結果
// X
掛鉤已有的 API
不是所有的庫和所有你之前寫的程式碼使用或者支援流。幸運的是 RxJS 提供函式用來橋接非響應式和響應式程式碼。這一節僅僅討論 RxJS 為橋接程式碼提供的模版。
你可能還對這篇出自 Ben Lesh[4] 的 全面的文章[5] 感興趣,這篇文章講了幾乎所有能與 promises 互動操作的方式。
from
我們已經用過它,把它列在這裡是因為,它可以封裝一個含有 observable 物件的 promise 物件。
from(new Promise(resolve => resolve(1)))
.subscribe();
// 結果
// 1 |
fromEvent
fromEvent 為 DOM 元素新增一個事件監聽器,我確定你知道這個。但你可能不知道的是,也可以透過其它型別來新增事件監聽器,例如,一個 jQuery 物件。
const element = $('#fooButton'); // 從 DOM 元素中建立一個 jQuery 物件
from(element, 'click')
.subscribe();
// 結果
// clickEvent ...
fromEventPattern
要理解為什麼有 fromEvent 了還需要 fromEventPattern,我們得先理解 fromEvent 是如何工作的。看這段程式碼:
from(document, 'click')
.subscribe();
這告訴 RxJS 我們想要監聽 document 中的點選事件。在提交過程中,RxJS 發現 document 是一個 EventTarget 型別,因此它可以呼叫它的 addEventListener
方法。如果我們傳入的是一個 jQuery 物件而非 document,那麼 RxJs 知道它得呼叫 on 方法。
這個例子用的是 fromEventPattern ,和 fromEvent 的工作基本上一樣:
function addClickHandler(handler) {
document.addEventListener('click', handler);
}
function removeClickHandler(handler) {
document.removeEventListener('click', handler);
}
fromEventPattern(
addClickHandler,
removeClickHandler,
)
.subscribe(console.log);
// 等效於
fromEvent(document, 'click')
RxJS 自動建立實際的監聽器( handler )你的工作是新增或者移除監聽器。fromEventPattern的目的基本上是告訴 RxJS 如何註冊和移除事件監聽器。
現在想象一下你使用了一個庫,你可以呼叫一個叫做 registerListener 的方法。我們不能再用 fromEvent,因為它並不知道該怎麼處理這個物件。
const listeners = [];
class Foo {
registerListener(listener) {
listeners.push(listener);
}
emit(value) {
listeners.forEach(listener => listener(value));
}
}
const foo = new Foo();
fromEventPattern(listener => foo.registerListener(listener))
.subscribe();
foo.emit(1);
// 結果
// 1 ...
當我們呼叫 foo.emit(1)
時,RxJS 中的監聽器將被呼叫,然後它就能把值傳送到流中。
你也可以用它來監聽多個事件型別,或者結合所有可以透過回呼進行通訊的 API,例如,WebWorker API:
const myWorker = new Worker('worker.js');
fromEventPattern(
handler => { myWorker.onmessage = handler },
handler => { myWorker.onmessage = undefined }
)
.subscribe();
// 結果
// workerMessage ...
bindCallback
它和 fromEventPattern 相似,但它能用於單個值。就在回呼函式被呼叫時,流就結束了。用法當然也不一樣 —— 你可以用 bindCallBack 封裝函式,然後它就會在呼叫時魔術般的傳回一個流:
function foo(value, callback) {
callback(value);
}
// 沒有流
foo(1, console.log); //prints 1 in the console
// 有流
const reactiveFoo = bindCallback(foo);
// 當我們呼叫 reactiveFoo 時,它傳回一個 observable 物件
reactiveFoo(1)
.subscribe(console.log); // 在控制檯列印 1
// 結果
// 1 |
websocket
是的,你完全可以建立一個 websocket 連線然後把它暴露給流:
import { webSocket } from 'rxjs/webSocket';
let socket$ = webSocket('ws://localhost:8081');
// 接收訊息
socket$.subscribe(
(msg) => console.log('message received: ' + msg),
(err) => console.log(err),
() => console.log('complete') * );
// 傳送訊息
socket$.next(JSON.stringify({ op: 'hello' }));
把 websocket 功能新增到你的應用中真的很簡單。websocket 建立一個 subject。這意味著你可以訂閱它,透過呼叫 next
來獲得訊息和傳送訊息。
ajax
如你所知:類似於 websocket,提供 AJAX 查詢的功能。你可能用了一個帶有 AJAX 功能的庫或者框架。或者你沒有用,那麼我建議使用 fetch(或者必要的話用 polyfill),把傳回的 promise 封裝到一個 observable 物件中(參考稍後會講到的 defer
函式)。
定製流
有時候已有的函式用起來並不是足夠靈活。或者你需要對訂閱有更強的控制。
主題(Subject
)
Subject
是一個特殊的物件,它使得你的能夠把資料傳送到流中,並且能夠控制資料。Subject
本身就是一個可觀察物件,但如果你想要把流暴露給其它程式碼,建議你使用 asObservable
方法。這樣你就不能意外呼叫原始方法。
const subject = new Subject();
const observable = subject.asObservable();
observable.subscribe();
subject.next(1);
subject.next(2);
subject.complete();
// 結果
// 1 2 |
註意在訂閱前傳送的值將會“丟失”:
const subject = new Subject();
const observable = subject.asObservable();
subject.next(1);
observable.subscribe(console.log);
subject.next(2);
subject.complete();
// 結果
// 2
除了常規的 Subject
,RxJS 還提供了三種特殊的版本。
AsyncSubject
在結束後只傳送最後的一個值。
const subject = new AsyncSubject();
const observable = subject.asObservable();
observable.subscribe(console.log);
subject.next(1);
subject.next(2);
subject.complete();
// 輸出
// 2
BehaviorSubject
使得你能夠提供一個(預設的)值,如果當前沒有其它值傳送的話,這個值會被髮送給每個訂閱者。否則訂閱者收到最後一個傳送的值。
const subject = new BehaviorSubject(1);
const observable = subject.asObservable();
const subscription1 = observable.subscribe(console.log);
subject.next(2);
subscription1.unsubscribe();
// 輸出
// 1
// 2
const subscription2 = observable.subscribe(console.log);
// 輸出
// 2
ReplaySubject
儲存一定數量、或一定時間或所有的傳送過的值。所有新的訂閱者將會獲得所有儲存了的值。
const subject = new ReplaySubject();
const observable = subject.asObservable();
subject.next(1);
observable.subscribe(console.log);
subject.next(2);
subject.complete();
// 輸出
// 1
// 2
你可以在 ReactiveX 檔案[6](它提供了一些其它的連線) 裡面找到更多關於 Subject
的資訊。Ben Lesh[4] 在 On The Subject Of Subjects[7] 上面提供了一些關於 Subject
的理解,Nicholas Jamieson[8] 在 in RxJS: Understanding Subjects[9] 上也提供了一些理解。
可觀察物件
你可以簡單地用 new 運運算元建立一個可觀察物件。透過你傳入的函式,你可以控制流,只要有人訂閱了或者它接收到一個可以當成 Subject
使用的觀察者,這個函式就會被呼叫,比如,呼叫 next
、complet
和 error
。
讓我們回顧一下串列示例:
const myList = new List();
myList.add(1);
myList.add(3);
new Observable(observer => {
for (let i = 0; i<list.size; i++) {
observer.next(list.get(i));
}
observer.complete();
})
.subscribe();
// 結果
// 1 3 |
這個函式可以傳回一個 unsubcribe
函式,當有訂閱者取消訂閱時這個函式就會被呼叫。你可以用它來清楚或者執行一些收尾操作。
new Observable(observer => {
// 流式化
return () => {
//clean up
};
})
.subscribe();
繼承可觀察物件
在有可用的運運算元前,這是一種實現自定義運運算元的方式。RxJS 在內部擴充套件了 可觀察物件 。Subject
就是一個例子,另一個是 publisher
運運算元。它傳回一個 ConnectableObservable
物件,該物件提供額外的方法 connect
。
實現 Subscribable
介面
有時候你已經用一個物件來儲存狀態,並且能夠傳送值。如果你實現了 Subscribable
介面,你可以把它轉換成一個可觀察物件。Subscribable
介面中只有一個 subscribe
方法。
interface Subscribable<T> { subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void), error?: (error: any) => void, complete?: () => void): Unsubscribable}
結合和選擇現有的流
知道怎麼建立一個獨立的流還不夠。有時候你有好幾個流但其實只需要一個。有些函式也可作為運運算元,所以我不打算在這裡深入展開。推薦看看 Max NgWizard K[10] 所寫的一篇 文章[11],它還包含一些有趣的動畫。
還有一個建議:你可以透過拖拽元素的方式互動式的使用結合操作,參考 RxMarbles[12]。
ObservableInput 型別
期望接收流的運運算元和函式通常不單獨和可觀察物件一起工作。相反,它們實際上期望的引數型別是 ObservableInput,定義如下:
type ObservableInput<T> = SubscribableOrPromise<T> | ArrayLike<T> | Iterable<T>;
這意味著你可以傳遞一個 promises 或者陣列卻不需要事先把他們轉換成可觀察物件。
defer
主要的目的是把一個 observable 物件的建立延遲(defer
)到有人想要訂閱的時間。在以下情況,這很有用:
最後一點包含了一個並不起眼的用例:Promises(defer
也可以傳回一個 promise 物件)。看看這個用到了 fetch API 的例子:
function getUser(id) {
console.log("fetching data");
return fetch(`https://server/user/${id}`);
}
const userPromise = getUser(1);
console.log("I don't want that request now");
// 其它地方
userPromise.then(response => console.log("done");
// 輸出
// fetching data
// I don't want that request now
// done
只要流在你訂閱的時候執行了,promise 就會立即執行。我們呼叫 getUser
的瞬間,就發送了一個請求,哪怕我們這個時候不想傳送請求。當然,我們可以使用 from
來把一個 promise 物件轉換成可觀察物件,但我們傳遞的 promise 物件已經建立或執行了。defer
讓我們能夠等到訂閱才傳送這個請求:
const user$ = defer(() => getUser(1));
console.log("I don't want that request now");
// 其它地方
user$.subscribe(response => console.log("done");
// 輸出
// I don't want that request now
// fetching data
// done
iif
iif
包含了一個關於 defer
的特殊用例:在訂閱時選擇兩個流中的一個:
iif(
() => new Date().getHours() < 12,
of("AM"),
of("PM")
)
.subscribe();
// 結果
// AM before noon, PM afterwards
取用該檔案:
實際上 iif[13] 能夠輕鬆地用 defer[14] 實現,它僅僅是出於方便和可讀性的目的。
onErrorResumeNext
開啟第一個流並且在失敗的時候繼續進行下一個流。錯誤被忽略掉。
const stream1$ = of(1, 2).pipe(
tap(i => { if(i>1) throw 'error'}) //fail after first element
);
const stream2$ = of(3,4);
onErrorResumeNext(stream1$, stream2$)
.subscribe(console.log);
// 結果
// 1 3 4 |
如果你有多個 web 服務,這就很有用了。萬一主伺服器開啟失敗,那麼備份的服務就能自動呼叫。
forkJoin
它讓流並行執行,當流結束時傳送存在陣列中的最後的值。由於每個流只有最後一個值被髮送,它一般用在只傳送一個元素的流的情況,就像 HTTP 請求。你讓請求並行執行,在所有流收到響應時執行某些任務。
function handleResponses([user, account]) {
// 執行某些任務
}
forkJoin(
fetch("https://server/user/1"),
fetch("https://server/account/1")
)
.subscribe(handleResponses);
merge / concat
傳送每一個從可觀察物件源中發出的值。
merge
接收一個引數,讓你定義有多少流能被同時訂閱。預設是無限制的。設為 1 就意味著監聽一個源流,在它結束的時候訂閱下一個。由於這是一個常見的場景,RxJS 為你提供了一個顯示的函式:concat
。
merge(
interval(1000).pipe(mapTo("Stream 1"), take(2)),
interval(1200).pipe(mapTo("Stream 2"), take(2)),
timer(0, 1000).pipe(mapTo("Stream 3"), take(2)),
2 //two concurrent streams
)
.subscribe();
// 只訂閱流 1 和流 2
// 輸出
// Stream 1 -> after 1000ms
// Stream 2 -> after 1200ms
// Stream 1 -> after 2000ms
// 流 1 結束後,開始訂閱流 3
// 輸出
// Stream 3 -> after 0 ms
// Stream 2 -> after 400 ms (2400ms from beginning)
// Stream 3 -> after 1000ms
merge(
interval(1000).pipe(mapTo("Stream 1"), take(2)),
interval(1200).pipe(mapTo("Stream 2"), take(2))
1
)
// 等效於
concat(
interval(1000).pipe(mapTo("Stream 1"), take(2)),
interval(1200).pipe(mapTo("Stream 2"), take(2))
)
// 輸出
// Stream 1 -> after 1000ms
// Stream 1 -> after 2000ms
// Stream 2 -> after 3200ms
// Stream 2 -> after 4400ms
zip / combineLatest
merge
和 concat
一個接一個的傳送所有從源流中讀到的值,而 zip
和 combineLatest
是把每個流中的一個值結合起來一起傳送。zip
結合所有源流中傳送的第一個值。如果流的內容相關聯,那麼這就很有用。
zip(
interval(1000),
interval(1200),
)
.subscribe();
// 結果
// [0, 0] [1, 1] [2, 2] ...
combineLatest
與之類似,但結合的是源流中傳送的最後一個值。直到所有源流至少傳送一個值之後才會觸發事件。這之後每次源流傳送一個值,它都會把這個值與其他流傳送的最後一個值結合起來。
combineLatest(
interval(1000),
interval(1200),
)
.subscribe();
// 結果
// [0, 0] [1, 0] [1, 1] [2, 1] ...
兩個函式都讓允許傳遞一個選擇器函式,把元素結合成其它物件而不是陣列:
zip(
interval(1000),
interval(1200),
(e1, e2) -> e1 + e2
)
.subscribe();
// 結果
// 0 2 4 6 ...
race
選擇第一個傳送資料的流。產生的流基本是最快的。
race(
interval(1000),
of("foo")
)
.subscribe();
// 結果
// foo |
由於 of
立即產生一個值,因此它是最快的流,然而這個流就被選中了。
總結
已經有很多建立可觀察物件的方式了。如果你想要創造響應式的 API 或者想用響應式的 API 結合傳統 API,那麼瞭解這些方法很重要。
我已經向你展示了所有可用的方法,但它們其實還有很多內容可以講。如果你想更加深入地瞭解,我極力推薦你查閱 檔案[15] 或者閱讀相關文章。
RxViz[16] 是另一種值得瞭解的有意思的方式。你編寫 RxJS 程式碼,產生的流可以用圖形或動畫進行顯示。
via: https://blog.angularindepth.com/the-extensive-guide-to-creating-streams-in-rxjs-aaa02baaff9a
作者:Oliver Flaggl[18] 譯者:BriFuture 校對:wxy
本文由 LCTT 原創編譯,Linux中國 榮譽推出