歡迎光臨
每天分享高質量文章

是什麼讓我們閱讀RxJava原始碼如此艱難?

作者:遺失的美好yxj2

連結:https://juejin.im/post/5cce6fb05188254177317fdc

這是一篇需要用心去感受的文章,快讀的閱讀可能效果不好喲。

概述

你是不是看過了很多分析Rxjava原始碼的文章,但依舊無法在心中勾勒出Rxjava原理的樣貌。

是什麼讓我們閱讀Rxjava原始碼變得如此艱難?

是Rxjava的程式碼封裝,以及各種細節問題的解決。

本文我把Rxjava的各種封裝、抽象統統剝去,只專註於基本的事件變換。在理解了事件變換大概是做了件什麼事情時,再去看原始碼,考慮一些其它問題就會更加容易。說明:這是一篇Rxjava原始碼分析的入門文章。旨在讓讀者腦中有個概念Rxjava最主要幹了件什麼事情,幾個常用運運算元的主要原理。今後再去看其它原始碼分析文章或原始碼能夠更容易理解。因此本文先不去考慮Rxjava原始碼中複雜的抽象封裝,執行緒間通訊,onComplete、onError、dispose等方法,僅專註於“onNext”的最基本呼叫方式。

專案原始碼

https://github.com/OliverY/RxjavaYxj

本文目錄:

  • 手寫Rxjava核心程式碼,create,nullMap(核心)運運算元

  • map,observeOn,subscribeOn,flatMap運運算元

  • 響應式程式設計思想的理解

手寫Rxjava核心程式碼,create,nullMap運運算元

手寫Rxjava核心程式碼,create,nullMap運運算元

Create運運算元

 

我們先來看一個最簡單呼叫

MainActivity.java

Observable.create(new Observable() {
    @Override
    public void subscribe(Observer observer) {
        observer.onNext("hello");
        observer.onNext("world");
        observer.onComplete();
    }
}).subscribe(new Observer() {
    @Override
    public void onNext(String s) {
        Log.e("yxj",s);
    }

    @Override
    public void onComplete() {
        Log.e("yxj","onComplete");
    }
});          

 

Observable.java

public abstract class Observable<T{

    public abstract void subscribe(Observer observer);

    public static  Observable create(Observable observable){
        return observable;
    }

}       

Observer.java

public interface Observer<T{

    void onNext(T t);
    void onComplete();
}

 

本篇文章我把Observable稱為“節點”,Observer稱為“處理者”,一是因為我被觀察者、被觀察者、誰訂閱誰給繞暈了,更重要的是我覺得這個名稱比較符合Rxjava的設計思想。

Observable呼叫create方法建立一個自己,重寫subscribe方法說:如果 我有一個處理者Observer,我就把“hello”,“world”交給它處理。

Observable呼叫了subscribe方法,真的找到了Observer。於是兌現承諾,完成整個呼叫邏輯。

這裡是“如果”有處理者,需要subscribe方法被呼叫時,“如果”才成立。Rxjava就是建立在一系列的“如果”(回呼)操作上的。

“nullMap”運運算元(核心)

1、建立一個observable

2、呼叫空map運運算元做變換

3、交給observer處理

 

MainActivity.java

Observable.create(new Observable() {
            @Override
            public void subscribe(Observer observer) {
                observer.onNext("hello");
                observer.onNext("world");
                observer.onComplete();
            }
        })
        .nullMap()
        .subscribe(new Observer() {
            @Override
            public void onNext(String s) {
                Log.e("yxj",s);
            }

            @Override
            public void onComplete() {
                Log.e("yxj","onComplete");
            }
        });

nullMap()等價於 下麵這段程式碼。

即把上個節點的資料不做任何修改的傳遞給下一節點的map操作

 


.map(new Function<StringString>() {
    @Override
    public String apply(String s) throws Exception {
        return s;
    }
})

“nullMap”運運算元在Rxjava原始碼裡並不存在,是我方便大家理解Rxjava執行機制寫出來的。

 因為nullMap操作是一個 base變換操作,map,flatMap,subscribeOn,observeOn運運算元都是在nullMap上修改而來。所以Rxjava的變換的基礎就是nullMap運運算元。

Observable.java
// 這就是Rxjava的變換核心

public Observable nullMap() {

        return new Observable() {
            @Override
            public void subscribe(final Observer observerC) {

                Observer observerB = new Observer() {
                    @Override
                    public void onNext(T t) {
                        observerC.onNext(t);
                    }

                    @Override
                    public void onComplete() {
                        observerC.onComplete();
                    }
                };
                Observable.this.subscribe(observerB);
            }
        };
    }

“nullMap”運運算元做了件什麼事情:

  1. 上一個節點Observable A呼叫nullMap(),在內部new一個新的節點Observable B。

  2. 節點B重寫subscribe方法,說”如果”自己有操作者Observer C,就new一個操作者Observer B,然後讓節點A subscribe 操作者B。

  3. 節點A subscribe 操作者B,讓操作者B執行onNext方法。操作者B的onNext方法內部,呼叫了操作者C的onNext。從而完成了整個呼叫。

請註意2中的”如果“。意味著,當節點B中的subscribe方法沒有被呼叫的時候,2,3步驟都不會執行(他們都是回呼),沒有Observer B,節點A也不會呼叫subscribe方法。 

接下來分兩種情況:

  1. 節點B呼叫了subscribe方法,則執行2,3,完成整個流程。

  2. 節點B呼叫nullMap,從新走一遍1,2,3步驟,相當於節點B把任務交給了下一個節點C。

概況一下就是:

Observable每呼叫一次運運算元,其實就是建立一個新的Observable。新Observable內部透過subscribe方法“逆向的”與上一Observable關聯。在新Observable中的new出來的Observer內的onNext方法中做了和下一個Observer之間的關聯。

 

圖文詳細解說nullMap整體呼叫過程

 

第一階段

 

上圖代表節點還未最終subscribe一個Observer,圖中

 

步驟1節點A呼叫map方法,在內部建立了一個新的節點B

 

這一階段:主要就是節點與節點之間做連線,之間有各種“如果”(回呼)的承諾。節點B這時候對節點A做了個承諾:“如果”我有處理者Observer C,那我就內部new一個 Observer B給你(節點A)用”。節點B中的操作者Observer B內部做了與Observer C的銜接工作。

 

第二階段:逆向subscribe

 

這一階段是subscribe方法被呼叫,傳入了最終的Observer。

 

圖中步驟2、3

 

步驟2. 節點B呼叫subscribe方法,找到處理者Observer C

步驟3. 節點B兌現對節點A的承諾:如果我有處理者Observer C,那我就內部new一個 Observer B給你(節點A)用”,這裡的Observable.this == Observable A。

這一階段:是把原來各個節點的“如果”一一兌現的過程,從最末一個Observable的subscribe方法開始,按節點順序逆向的兌現承諾。每個subscribe方法內部都會新建一個Observer,然後用上一個節點Observable來subcriber這個Observer。這是一個逆序的過程.

 

第三階段:執行業務
 

 
圖中步驟4,5

步驟 4:節點A呼叫subscribe,讓Observer B呼叫onNext方法傳入“hello”,“world”資料

步驟 5:在Observer B的onNext()方法中,通知ObserverC呼叫onNext方法

 

這一階段:是透過各個節點的Observer順序執行具體的業務操作的過程,只有這個階段是與具體業務相關的階段。

大家可以先思考一下,如果是一個普通的map(Function function),這個變換髮生在哪?

答案是:在第三階段中,Observer B的內部的onNext方法中。

 

整個Rxjava就是這9行核心變換程式碼了。如果以上不是特別理解我非常建議你繼續看完剩下的部分,再回過頭來看一遍第一部分。

 

map,observeOn,subscribeOn,flatMap運運算元

接下來讓我們看看這4個運運算元,僅僅是在nullMap中做了小改動而已。

https://github.com/OliverY/RxjavaYxj/blob/master/app/src/main/java/com/yxj/rxjavayxj/rxjava/Observable.java

map運運算元

 

Observable.java

public  Observable map(final Function function) {

        return new Observable() {
            @Override
            public void subscribe(final Observer observer1) {
                Observable.this.subscribe(new Observer() {
                    @Override
                    public void onNext(T t) {
                        R r = function.apply(t); // 僅僅在這裡加了變換操作
                        observer1.onNext(r);
                    }

                    @Override
                    public void onComplete() {
                        observer1.onComplete();
                    }
                });
            }
        };
    }

和“nullMap”相比,僅僅加了一行程式碼function.apply() 方法的呼叫。

 

observeOn運運算元

 

Observable.java

public Observable observeOn() {
        return new Observable() {
            @Override
            public void subscribe(final Observer observer) {
                Observable.this.subscribe(new Observer() {
                    @Override
                    public void onNext(final T t) {
                //模擬切換到主執行緒(通常上個節點是執行在子執行緒的情況)
                        handler.post(new Runnable() {
                            @Override
                            public void run() {
                                observer.onNext(t);
                            }
                        });
                    }

                    @Override
                    public void onComplete() {

                    }
                });
            }
        };
    }

與“nullMap”相比,修改了最內部的onNext方法執行所在的執行緒。Rxjava原始碼會更加靈活,observerOn方法引數讓你可以指定切換到的執行緒,其實就是傳入了一個執行緒排程器,用於指定observer.onNext()方法要在哪個執行緒執行。

原理是一樣的。

我這裡就簡寫,直接寫了切換到主執行緒,這你肯定能看明白。

 

subscribeOn運運算元

 

Observable.java

public Observable subscribeOn() {
        return new Observable() {
            @Override
            public void subscribe(final Observer observer) {

                new Thread() {
                    @Override
                    public void run() {
                    // 這裡簡寫了,沒有new Observer做中轉,github上有完整程式碼
                        Observable.this.subscribe(observer);
                    }
                }.start();
            }
        };
    }

將上一個節點切換到新的執行緒,修改了Observable.this.subscribe()執行的執行緒,Observable.this指的是呼叫subscribeOn()的Observable,即上一個節點。

因此subscribeOn運運算元修改了上一個節點的執行所在的執行緒。

 

flatMap運運算元

 

public  Observable flatMap(final Function> function) {

        return new Observable() {
            @Override
            public void subscribe(final Observer observer) {
                Observable.this.subscribe(new Observer() {
                    @Override
                    public void onNext(T t) {
                        try {
                            Observable observable = function.apply(t);
                            observable.subscribe(observer);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void onComplete() {

                    }
                });
            }
        };

    }

flatmap和map極為相似,只不過function.apply()的傳回值是一個Observable。

Observable是一個節點,既可以用來封裝非同步操作,也可以用來封裝同步操作(封裝同步操作 == map運運算元)。所以這樣就可以很方便的寫出一個 耗時1操作 —> 耗時2操作 —> 耗時3操作…的操作

 

到這裡相信大家已經對Rxjava怎樣執行,幾個常見的運運算元內部基本原理有了初步的理解,本文的目的就已經達到了。

 

在之後看Rxjava原始碼或者其它分析文章時,就能少受各種變換的幹擾。

 

接下來就可以思考Rxjava是如何對各個Observable做封裝,執行緒之間如何通訊,onComplete、onError、dispose等方法如何實現了。

響應式程式設計的思想

響應式程式設計是一種面向資料流和變化傳播的程式設計正規化。

直接看這句話其實不太容易理解。讓我們換個說法,實際程式設計中是什麼會幹擾我們,使我們無法專註於資料流和變化傳播呢?

答案是:非同步,它會讓我們的程式碼形成巢狀,不夠順序化。

因為非同步,我們的業務邏輯會寫成回呼巢狀的形式,導致過一段時間看自己程式碼看不懂,語意化不強,不是按著順序一個節點一個節點的往下執行的。

Rxjava將所有的業務操作變成一步一步,每一步不管你是同步、非同步,統統用一個節點包裹起來,節點與節點之間是同步呼叫的關係。如此,整個程式碼的節點都是按順序執行的。

 

限於作者個人水平有限,本文部分表述難免有不對之處,請留言指出,相互交流。

贊(0)

分享創造快樂