作者:遺失的美好yxj2
連結:https://juejin.im/post/5cce6fb05188254177317fdc
這是一篇需要用心去感受的文章,快讀的閱讀可能效果不好喲。
概述
你是不是看過了很多分析Rxjava原始碼的文章,但依舊無法在心中勾勒出Rxjava原理的樣貌。
是什麼讓我們閱讀Rxjava原始碼變得如此艱難?
是Rxjava的程式碼封裝,以及各種細節問題的解決。
本文我把Rxjava的各種封裝、抽象統統剝去,只專註於基本的事件變換。在理解了事件變換大概是做了件什麼事情時,再去看原始碼,考慮一些其它問題就會更加容易。說明:這是一篇Rxjava原始碼分析的入門文章。旨在讓讀者腦中有個概念Rxjava最主要幹了件什麼事情,幾個常用運運算元的主要原理。今後再去看其它原始碼分析文章或原始碼能夠更容易理解。因此本文先不去考慮Rxjava原始碼中複雜的抽象封裝,執行緒間通訊,onComplete、onError、dispose等方法,僅專註於“onNext”的最基本呼叫方式。
專案原始碼
https://github.com/OliverY/RxjavaYxj
本文目錄:
-
手寫Rxjava核心程式碼,create,nullMap(核心)運運算元
-
map,observeOn,subscribeOn,flatMap運運算元
-
響應式程式設計思想的理解
手寫Rxjava核心程式碼,create,nullMap運運算元
手寫Rxjava核心程式碼,create,nullMap運運算元
Create運運算元
我們先來看一個最簡單呼叫
MainActivity.java
Observable.create(new Observable() {
@Override
public void subscribe(Observer observer) {
observer.onNext("hello");
observer.onNext("world");
observer.onComplete();
}
}).subscribe(new Observer() {
@Override
public void onNext(String s) {
Log.e("yxj",s);
}
@Override
public void onComplete() {
Log.e("yxj","onComplete");
}
});
Observable.java
public abstract class Observable<T> {
public abstract void subscribe(Observer observer);
public static Observable create(Observable observable){
return observable;
}
}
Observer.java
public interface Observer<T> {
void onNext(T t);
void onComplete();
}
本篇文章我把Observable稱為“節點”,Observer稱為“處理者”,一是因為我被觀察者、被觀察者、誰訂閱誰給繞暈了,更重要的是我覺得這個名稱比較符合Rxjava的設計思想。
Observable呼叫create方法建立一個自己,重寫subscribe方法說:如果 我有一個處理者Observer,我就把“hello”,“world”交給它處理。
Observable呼叫了subscribe方法,真的找到了Observer。於是兌現承諾,完成整個呼叫邏輯。
這裡是“如果”有處理者,需要subscribe方法被呼叫時,“如果”才成立。Rxjava就是建立在一系列的“如果”(回呼)操作上的。
“nullMap”運運算元(核心)
1、建立一個observable
2、呼叫空map運運算元做變換
3、交給observer處理
MainActivity.java
Observable.create(new Observable() {
@Override
public void subscribe(Observer observer) {
observer.onNext("hello");
observer.onNext("world");
observer.onComplete();
}
})
.nullMap()
.subscribe(new Observer() {
@Override
public void onNext(String s) {
Log.e("yxj",s);
}
@Override
public void onComplete() {
Log.e("yxj","onComplete");
}
});
nullMap()等價於 下麵這段程式碼。
即把上個節點的資料不做任何修改的傳遞給下一節點的map操作
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return s;
}
})
“nullMap”運運算元在Rxjava原始碼裡並不存在,是我方便大家理解Rxjava執行機制寫出來的。
因為nullMap操作是一個 base變換操作,map,flatMap,subscribeOn,observeOn運運算元都是在nullMap上修改而來。所以Rxjava的變換的基礎就是nullMap運運算元。
Observable.java
// 這就是Rxjava的變換核心
public Observable nullMap() {
return new Observable() {
@Override
public void subscribe(final Observer observerC) {
Observer observerB = new Observer() {
@Override
public void onNext(T t) {
observerC.onNext(t);
}
@Override
public void onComplete() {
observerC.onComplete();
}
};
Observable.this.subscribe(observerB);
}
};
}
“nullMap”運運算元做了件什麼事情:
-
上一個節點Observable A呼叫nullMap(),在內部new一個新的節點Observable B。
-
節點B重寫subscribe方法,說”如果”自己有操作者Observer C,就new一個操作者Observer B,然後讓節點A subscribe 操作者B。
-
節點A subscribe 操作者B,讓操作者B執行onNext方法。操作者B的onNext方法內部,呼叫了操作者C的onNext。從而完成了整個呼叫。
請註意2中的”如果“。意味著,當節點B中的subscribe方法沒有被呼叫的時候,2,3步驟都不會執行(他們都是回呼),沒有Observer B,節點A也不會呼叫subscribe方法。
接下來分兩種情況:
-
節點B呼叫了subscribe方法,則執行2,3,完成整個流程。
-
節點B呼叫nullMap,從新走一遍1,2,3步驟,相當於節點B把任務交給了下一個節點C。
概況一下就是:
Observable每呼叫一次運運算元,其實就是建立一個新的Observable。新Observable內部透過subscribe方法“逆向的”與上一Observable關聯。在新Observable中的new出來的Observer內的onNext方法中做了和下一個Observer之間的關聯。
圖文詳細解說nullMap整體呼叫過程
第一階段
上圖代表節點還未最終subscribe一個Observer,圖中
步驟1:節點A呼叫map方法,在內部建立了一個新的節點B
這一階段:主要就是節點與節點之間做連線,之間有各種“如果”(回呼)的承諾。節點B這時候對節點A做了個承諾:“如果”我有處理者Observer C,那我就內部new一個 Observer B給你(節點A)用”。節點B中的操作者Observer B內部做了與Observer C的銜接工作。
第二階段:逆向subscribe
這一階段是subscribe方法被呼叫,傳入了最終的Observer。
圖中步驟2、3
步驟2. 節點B呼叫subscribe方法,找到處理者Observer C
步驟3. 節點B兌現對節點A的承諾:如果我有處理者Observer C,那我就內部new一個 Observer B給你(節點A)用”,這裡的Observable.this == Observable A。
這一階段:是把原來各個節點的“如果”一一兌現的過程,從最末一個Observable的subscribe方法開始,按節點順序逆向的兌現承諾。每個subscribe方法內部都會新建一個Observer,然後用上一個節點Observable來subcriber這個Observer。這是一個逆序的過程.
第三階段:執行業務
圖中步驟4,5
步驟 4:節點A呼叫subscribe,讓Observer B呼叫onNext方法傳入“hello”,“world”資料
步驟 5:在Observer B的onNext()方法中,通知ObserverC呼叫onNext方法
這一階段:是透過各個節點的Observer順序執行具體的業務操作的過程,只有這個階段是與具體業務相關的階段。
大家可以先思考一下,如果是一個普通的map(Function function),這個變換髮生在哪?
答案是:在第三階段中,Observer B的內部的onNext方法中。
整個Rxjava就是這9行核心變換程式碼了。如果以上不是特別理解我非常建議你繼續看完剩下的部分,再回過頭來看一遍第一部分。
map,observeOn,subscribeOn,flatMap運運算元
接下來讓我們看看這4個運運算元,僅僅是在nullMap中做了小改動而已。
https://github.com/OliverY/RxjavaYxj/blob/master/app/src/main/java/com/yxj/rxjavayxj/rxjava/Observable.java
map運運算元
Observable.java
public Observable map(final Function function) {
return new Observable() {
@Override
public void subscribe(final Observer observer1) {
Observable.this.subscribe(new Observer() {
@Override
public void onNext(T t) {
R r = function.apply(t); // 僅僅在這裡加了變換操作
observer1.onNext(r);
}
@Override
public void onComplete() {
observer1.onComplete();
}
});
}
};
}
和“nullMap”相比,僅僅加了一行程式碼function.apply() 方法的呼叫。
observeOn運運算元
Observable.java
public Observable observeOn() {
return new Observable() {
@Override
public void subscribe(final Observer observer) {
Observable.this.subscribe(new Observer() {
@Override
public void onNext(final T t) {
//模擬切換到主執行緒(通常上個節點是執行在子執行緒的情況)
handler.post(new Runnable() {
@Override
public void run() {
observer.onNext(t);
}
});
}
@Override
public void onComplete() {
}
});
}
};
}
與“nullMap”相比,修改了最內部的onNext方法執行所在的執行緒。Rxjava原始碼會更加靈活,observerOn方法引數讓你可以指定切換到的執行緒,其實就是傳入了一個執行緒排程器,用於指定observer.onNext()方法要在哪個執行緒執行。
原理是一樣的。
我這裡就簡寫,直接寫了切換到主執行緒,這你肯定能看明白。
subscribeOn運運算元
Observable.java
public Observable subscribeOn() {
return new Observable() {
@Override
public void subscribe(final Observer observer) {
new Thread() {
@Override
public void run() {
// 這裡簡寫了,沒有new Observer做中轉,github上有完整程式碼
Observable.this.subscribe(observer);
}
}.start();
}
};
}
將上一個節點切換到新的執行緒,修改了Observable.this.subscribe()執行的執行緒,Observable.this指的是呼叫subscribeOn()的Observable,即上一個節點。
因此subscribeOn運運算元修改了上一個節點的執行所在的執行緒。
flatMap運運算元
public Observable flatMap(final Function> function) {
return new Observable() {
@Override
public void subscribe(final Observer observer) {
Observable.this.subscribe(new Observer() {
@Override
public void onNext(T t) {
try {
Observable observable = function.apply(t);
observable.subscribe(observer);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void onComplete() {
}
});
}
};
}
flatmap和map極為相似,只不過function.apply()的傳回值是一個Observable。
Observable是一個節點,既可以用來封裝非同步操作,也可以用來封裝同步操作(封裝同步操作 == map運運算元)。所以這樣就可以很方便的寫出一個 耗時1操作 —> 耗時2操作 —> 耗時3操作…的操作
到這裡相信大家已經對Rxjava怎樣執行,幾個常見的運運算元內部基本原理有了初步的理解,本文的目的就已經達到了。
在之後看Rxjava原始碼或者其它分析文章時,就能少受各種變換的幹擾。
接下來就可以思考Rxjava是如何對各個Observable做封裝,執行緒之間如何通訊,onComplete、onError、dispose等方法如何實現了。
響應式程式設計的思想
響應式程式設計是一種面向資料流和變化傳播的程式設計正規化。
直接看這句話其實不太容易理解。讓我們換個說法,實際程式設計中是什麼會幹擾我們,使我們無法專註於資料流和變化傳播呢?
答案是:非同步,它會讓我們的程式碼形成巢狀,不夠順序化。
因為非同步,我們的業務邏輯會寫成回呼巢狀的形式,導致過一段時間看自己程式碼看不懂,語意化不強,不是按著順序一個節點一個節點的往下執行的。
Rxjava將所有的業務操作變成一步一步,每一步不管你是同步、非同步,統統用一個節點包裹起來,節點與節點之間是同步呼叫的關係。如此,整個程式碼的節點都是按順序執行的。
限於作者個人水平有限,本文部分表述難免有不對之處,請留言指出,相互交流。
朋友會在“發現-看一看”看到你“在看”的內容