RxJava原理及內核分析

前言

RxJava是近兩年來越來越流行的一個異步開發框架,其使用起來十分簡單方便,功能包羅萬象,十分強大。

但當我們想跟進去源碼一窺究竟的時候卻發現,它的實現邏輯比較複雜,註釋又寫得比較簡單,加之其中的類與對象名一直在subscribe,observe,observable等幾個單詞來來回回,讓人眼睛越看越瞎,經常看著看著就被繞進去了。

雖然網上關於RxJava分析的文章有很多,但剛接觸的時候,也跟很多人一樣,看完還是覺得似懂非懂。在這裡,我們將試圖只抽出主幹代碼,以代碼執行順序來描述RxJava,回答這幾個問題。

  • RxJava框架內部的主幹邏輯是怎樣的?
  • RxJava究竟是如何實現一句話線程切換的?
  • 多次調用subscribeOn()或observeOn()切換線程會有什麼效果?

回答了這幾個問題以後,我們大概就能對RxJava的基本原理有初步的認知了。

這裡的代碼抽取自RxJava2,RxJava1雖然在實現上有一定程度的不同,但其核心思想與原理是一樣的。

RxJava的基本角色

一般我們使用RxJava都是編寫類似如下的代碼:

Observable.create(new ObservableOnSubscribe()) //創建一個事件流,參數是我們創建的一個事件源
.map(...)//有時我們會需要使用操作符進行變換
.subscribeOn(Schedulers.io())//指定事件源代碼執行的線程
.observeOn(AndroidSchedulers.mainThread())//指定訂閱者代碼執行的線程
.subscribe(new Observer())//參數是我們創建的一個訂閱者,在這裡與事件流建立訂閱關係

RxJava是一種基於觀察者模式的響應式編程框架,其中的主要角色有:

Observable 是RxJava描述的事件流,在鏈式調用中非常清晰,事件從創建到加工處理再到被訂閱者接收到,就是一個接一個的Observable形成的一個事件流。在上面代碼中,每一步方法的調用,都會返回一個新的Observable給下一步,這個是RxJava源碼的基礎。同樣是鏈式調用,但它與我們常見的Builder模式不太一樣,每個操作符,每次線程切換,每步都會新建一個Observable而非直接加工上一步的Observable返回給下一步。(在源碼中不同的加工會創建不同的Observable,比如map()會創建一個ObservableMap,subscribeOn()會創建一個ObservableSubscribeOn,但它們實際上都是Observable的子類)。

ObservableOnSubscribe 是這個事件流的源頭,下面我們稱之為事件源,一般由我們自己創建並傳入。我們創建時,需要重寫其subscribe()方法,為了和Observable中的subscribe()方法區別,我們將在下面貼出的代碼中將其改名為call()。我們在調用鏈中有時會用到各種操作符進行一些變換,事實上每個操作符都會重寫這麼一個call()方法,相對於我們創建事件源時在這裡寫入的源業務代碼,這些操作符在這裡要做的事是由RxJava欽定的,一般是連接事件流的上下游。在這裡我們將準備好被訂閱的數據,並調用subscribe()參數中ObservableEmitter的onNext(),onCompleted()或onError()通知訂閱者數據準備情況。

Observer 是整個事件流的訂閱者,也就是說,它將會訂閱前面事件創建,加工以後的最終結果。它也是由我們創建的,我們將要重寫它的onNext(),onCompleted(),onError()和onSubscribe(),在接下來的分析中我們將簡化一些,只關注onNext()。我們創建出了Observer以後,將會使用經過上面所有步驟的最後一步生成的Observable,調用它的subscribe(),與事件源產生聯繫。

最簡單情況

我們先來看最簡單的情況,沒有使用操作符,沒有切換線程。我們寫的調用代碼是下面這樣的(以下源碼都是精簡、改寫過後的主幹邏輯代碼),為了區分,我將ObservableOnSubscribe中的subscribe()改叫call(),於是:

Observable.create(new ObservableOnSubscribe<object>() {
@Override
public void call(@NonNull Observer<object> e) throws Exception {
//事件源業務代碼
e.onNext(o);
}
}).subscribe(new Observer<object>() {
@Override
public void onNext(@NonNull Object o) {
//訂閱者業務代碼
}
});
/<object>/<object>/<object>

去繁就簡,訂閱者中我們暫時只關注onNext()。我們在這裡,創建了一個事件源,一個訂閱者,並以訂閱者為參數調用了subscribe()。那麼其中的調用過程是怎樣的呢?我們跟進源碼看看。

先看Observable類的靜態方法create():

public class Observable {
final ObservableOnSubscribe source;
...
public static Observable create(ObservableOnSubscribe source) {
return new Observable(source);
}

可以看到create()使用我們在參數中提供的source創建了一個Observable(實際上創建的是ObservableCreate,但是為了簡化代碼,我們統一視作Observable,下同)。其中ObservableOnSubscribe是個接口,我們創建的時候必須重寫它的call()方法,我們的源業務代碼就寫在這裡面。

然後看Observable類中的subscribe()方法:

public class Observable {
final ObservableOnSubscribe source;

protected void subscribe(Observer super T> observer) {
source.call(observer);
}

其中參數中的observer就是我們創建傳入的訂閱者。可以看到,subscribe()中回調了我們重寫的call(),並把我們創建的訂閱者以call()參數的形式交給了事件源。這樣在call()中,我們就可以在我們的事件源業務代碼中適時調用onNext(o),這樣,就將事件源和訂閱者聯繫了起來。也就是說,調用subscribe()方法實際上建立了我們創建的事件源與訂閱者的訂閱關係。如果畫圖來描述代碼執行的流程的話,就是:

RxJava原理及內核分析

RxJava代碼執行流程(最簡).png

上面這張是代碼執行順序圖,在這張圖中我們發現,所有的業務代碼都是在subscribe()調用以後才會執行的。也就是說,在我們使用RxJava的時候,不管前面調用了create(),map()或是observeOn()等等有多少東西,只要沒有調用subscribe(),所有的業務代碼,包括事件源的代碼,都不會執行。

關注我私信回覆【學習】獲取高級UI、性能優化、架構師課程、Rxjava、NDK、Kotlin進階、混合式開發(ReactNative+Weex)全方面的Android進階實踐技術分享。

流程中加上一個操作符

有時我們會使用操作符對事件源產生的數據進行變化,這裡以比較常見的map()為例,我們來看一下作為一個鏈中的中間過程,RxJava是怎樣做到承上啟下的。

map()的作用是將傳遞的數據類型進行轉換,比如調用map()的上游是一個Observable,經過轉換以後,會將一個Observable返回給調用鏈的下游,代碼一般如下:

Observable.create(new ObservableOnSubscribe<object>() {
@Override
public void call(@NonNull Observer<object> e) throws Exception {
//源業務代碼
}
}).map(new Function<object>() {
@Override
public String apply(@NonNull Object o) throws Exception {
//轉換業務代碼
return "Obj";
}
}).subscribe(new Observer<string>() {
@Override
public void onNext(@NonNull String s) {
//訂閱業務代碼
}
});
/<string>/<object>/<object>/<object>

我們在這段代碼中先是創建了一個事件源,然後中間經過了一次map()的轉換,使創建出的Observable<object>轉換成為了Observable<string>傳遞到下游,最後進行訂閱。那麼在這個過程中,RxJava是如何連接起這個事件流的,內部又進行了什麼處理使得我們的業務代碼最終能按順序執行呢?我們將map()的源碼精簡一下:/<string>/<object>

 public final  Observable map(Function super T, ? extends R> mapper) {
//第一層
return create(new ObservableOnSubscribe() { //新建了一個事件源
@Override
public void call(@NonNull Observer<object> e) {
//第二層
subscribe(new Observer() { //新建了一個訂閱者,並且與新建的事件源建立了訂閱關係


@Override
public void onNext(T var1) {
//第三層
e.onNext(mapper.call(var1));
}
});
}
});
}
/<object>

這段代碼中,有幾個很重要的地方:

首先,我們必須要清楚,這裡的代碼的執行順序並不是順序執行的,它包含了三層,其中後兩層都是回調層,只有第一層會被立即執行,後面兩層要等到回調時才會執行。

我們先看第一層,我們發現在RxJava在map()中,跟我們創建原始Observable一模一樣地調用了一次create()創建了一個Observable,並將它返回給了下游。我們可以理解為,我們使用map()操作符時,實際上用它創建的新的Observable替換掉了我們自己用create()創建的包含了我們業務代碼的那個Observable。那我們創建的原始Observable去哪裡了呢?接著看。

接著我們看到第二層,在Observable中我們寫源業務代碼的地方(重寫的call()中)調用了subscribe()。subscribe()在上文中我們已經提到,他聯繫起了事件源與訂閱者。要注意的是,在這裡subscribe()的調用者是調用map()的Observable,也就是我們創建的原始Observable。所以它建立訂閱關係的是我們創建的原始Observable與map()創建的新訂閱者。根據上文,這個call()方法的調用時機,是下游也就是我們自己手動調用subscribe()時。而這裡第二層的subscribe()又將回調原始Observable創建時我們重寫的call()。也就是說,這一層的執行順序是向上的,由我們手動調用subscribe()起,向上回調call()並調用上游的subscribe(),直到調用我們創建的原始Observable中重寫的call(),也就是我們的源業務代碼。

我們再看第三層,在新創建的訂閱者中的onNext(),調用了call()參數中的訂閱者,也就是下游由我們創建的,包含我們訂閱業務代碼的訂閱者的onNext()。也就是說,現在我們的源業務代碼執行結果將交給由map()創建的訂閱者,而它又將交給我們的訂閱業務代碼。這裡的實現,有點類似於鏈表中增加一個節點,創建一個新節點,由上游直接指向下游,變為上游指向新節點,新節點再指向下游。

而我們使用map()產生的類型轉換,就發生在mapper.call(var1)中,在第三層。

我們畫個圖總結一下這裡的順序:

RxJava原理及內核分析

RxJava代碼執行流程(map)

黃色是我們上面代碼使用RxJava時調用的三個步驟。由這個流程圖我們可以很清楚地看到,RxJava的整體執行順序分成了三層。

第一層,是我們使用RxJava時進行的一系列鏈式調用,每次調用實際上都創建了一個新的Observable返回給下層,這一層的目的,就是創建一個個Observable,終點是subscribe()的調用,由此開始進入第二層。第一層按順序執行完畢時,這些新建的Observable之間實際上還並沒有建立聯繫。

第二層,是逆向順序的回調層,以我們手動調用的subscribe()開始,逐級向上游回溯。每一級的subscribe()都會調用上級重寫的call(),而中間每一級的call()又會調用subscribe(),直到調用到我們創建的原始Observable中重寫的call(),開始執行我們的源業務代碼,進入第三層。這一層的目的,是逐級建立訂閱關係,我們的訂閱業務代碼被包含在原始Observer中,由我們手動調用subscribe()(黃色subscribe())開始,向上逐級傳遞。這一層執行完畢以後,第一層創建的每個Observable才真正形成了一條鏈。

第三層,是正向順序的回調層,從我們的源業務代碼所在的call()開始,順序執行,並由onNext()將結果傳遞至下一級。可以看到,我們在map()中寫入的告訴RxJava具體如何進行轉換的代碼,同其他業務代碼一樣,也是在這一層執行的。

切換線程

我們再來看RxJava是如何實現切換線程的。通常我們使用RxJava的線程切換功能時,只需要在調用鏈中加上一句subscribeOn()或observeOn()。參數中傳入的是一個Scheduler,這個Scheduler是一個調度器,這裡不具體解釋其運作原理,簡單來說,在我們調用subscribeOn(Schedulers.io())或是subscribeOn(AndroidSchedulers.mainThread())時,RxJava將會依照我們提出的不同線程要求,在不同的線程池中選擇線程為我們執行代碼。

subscribeOn

先看subscribeOn()的簡化源碼:

 public final Observable subscribeOn(Scheduler scheduler){
//第一層
return Observable.create(new ObservableOnSubscribe() {
@Override
public void call(@NonNull ObservableEmitter<object> e) {
//第二層
// 在調度器分配的新線程中執行。
scheduler.createWorker().schedule(new SubscribeTask() {
@Override
public void run() {
source.call(e);
}
});
}
});
}
/<object>

我們發現跟map()類似,它同樣是創建一個新的Observable返回給下游。然而它重寫的call()中,沒有調用subscribe()而是直接調用了上游的call(),因為它除了切換線程之外並沒有其它的事情需要做。而且,調用上游的call()這件事是在使用調度器分配的一個新的線程中進行的。注意,我們在這裡與之前的map()相比發現,切換線程這件事是執行在第二層,訂閱回調層的,而它並沒有創建相應的第三層。也就是說,我們使用subscribeOn()切換線程時,在第二層就已經切換了,這個時候我們所有的業務代碼都還沒有執行,它們要到第三層才會執行!

這裡看圖更加直觀:

RxJava原理及內核分析

RxJava代碼執行流程(subscribeOn)

可以看到subscribeOn()產生的線程切換髮生在代碼執行的第二層,而它的回溯又將會執行在新的線程中。因此,在subscribeOn切換線程以後的流程,均將在新的線程中執行。也就是說,如果我們只調用了subscribeOn()進行線程切換,將改變後面包括源業務代碼、操作符轉換、訂閱業務代碼的所有業務代碼,的執行的線程

RxJava原理及內核分析

RxJava代碼執行流程(多次subscribeOn)

如果我們多次調用subscribeOn()進行線程切換會產生什麼效果呢?從圖中可以看出,雖然在第二層中每次切換以後後續的代碼都會在新線程中執行,但是真正影響到執行層的只有最後一次切換,也就是我們手動調用subscribeOn()的第一次。我們第二次調用subscribeOn()實際上也起了作用,但它隻影響了圖中第二層中的一段代碼的,而我們的業務代碼均執行在第三層,所以我們在平時使用RxJava時是無法感覺到其效果的。

observeOn()

我們再來看observeOn():

public final Observable observeOn(Scheduler scheduler) {
//第一層
return Observable.create(new ObservableOnSubscribe() {
@Override
public void call(@NonNull Observer<object> e) {
//第二層
source.call(new Observer() {
@Override
public void onNext(T var1) {
//第三層
// 在調度器分配的新線程中執行。
scheduler.createWorker().schedule(new Runnable() {
@Override
public void run() {

e.onNext(var1);
}
});
}
});
}
});
}
/<object>

可以看到,observeOn()的結構就跟map()非常相似了,同樣是分了三層。它在第二層與subscribeOn()一樣,調用了上層的call()進行回溯,不同的是在這裡它並沒有在一個新線程中做這件事。observeOn()的線程切換髮生在第三層,也就是在執行層。此時,它上游的業務代碼已經執行了,將結果傳遞到它這裡,它才進行線程的切換並在新線程中繼續向下遊執行。也就是說,observeOn()不會影響調用鏈中在它前面的流程,而在它後面的流程全部都將在切換後的新線程中執行

Observable.create(...) //不受影響
.map(...)//不受影響
.observeOn(線程1)//切換到線程1
.map(...)//在線程1執行
.subscribe(new Observer<object>() {
@Override
public void onNext(@NonNull Object o) {
//在線程1執行
}
});
/<object>

我們再畫個流程圖,比較清楚:

RxJava原理及內核分析

RxJava代碼執行流程(observeOn)

圖中看的非常清楚,兩次調用observeOn()產生的線程切換都發生在第三層執行層,而在切換線程前的業務代碼由於已經執行了,故不受切換線程切換的影響。

這裡我們還看到,在我們多次調用observeOn()進行線程切換時,每次調用都將改變下次調用observeOn()前部分的代碼執行線程,而我們的訂閱業務代碼將執行在鏈中最後一次調用observeOn()指定的線程

總結

RxJava的主幹分為了三層,其中第一層創建Observable,第二層從下游回溯建立起上下游的訂閱關係,第三層開始真正執行我們提供的所有的業務代碼。其中,操作符進行的轉換髮生在第三層,而我們調用subscribeOn()切換線程發生在第二層,調用observeOn()切換線程發生在第三層。我個人給大家整理了Rxjava方面的思維導圖可以看一下:

RxJava原理及內核分析

java語言進階及Android相關技術內核

到此,我們只是通過精簡過的代碼初步瞭解了RxJava的主幹邏輯和基本原理,實際上RxJava的源碼中進行了大量的封裝與解耦,同時還包括了錯誤處理,背壓處理以及訂閱回收等一系列的東西。我們要完全瞭解這個框架的精妙之處,還需要下一番功夫。

關注我私信回覆【學習】獲取高級UI、性能優化、架構師課程、Rxjava、NDK、Kotlin進階、混合式開發(ReactNative+Weex)全方面的Android進階實踐技術分享。


分享到:


相關文章: