深入簡出 RxJava

深入簡出 RxJava

核心對象

ReactiveX 是基於觀察者模式設計的,核心對象只有 Observable 和 Observer。它們最簡單的代碼為:

interface Observable {
 void subscribe(Observer observer);
}
interface Observer {
 void onNext(T t);
}

Observable 的核心方法是 subscribe(),它接收一個 Observer。當調用 subscribe() 的時候,就開始通過調用 Observer 的 onNext() 方法發射數據。

上下游

以下代碼中:

ob1 = Observable.create(Func1);
ob2 = ob1.map(Func2);
ob3 = ob2.subscribeOn(SchedulerA);

ob1 是 ob2 的上游,ob3 是 ob2 的下游。可以看出,對 Observable 進行一次「操作」後會得到一個新的 Observable。

官方定義:Doubt about the terms Upstream vs Downstream · Issue #5022 · ReactiveX/RxJava

操作符

Rx 裡的操作符,例如上面的 map(Func2),其內部實現是這樣(官方命名稍有不同):

ob2 = new MapObservable(ob1, Func2);

所以上一小節的代碼可以寫成:

ob1 = new CreateObservable(Func1);
ob2 = new MapObservable(ob1, Func2);
ob3 = new SubscribeOnObservable(ob2, SchedulerA);

可以說 Rx 裡最重要的是「組合操作符,加工數據流」。

深入簡出 RxJava

官方操作符文檔:ReactiveX - Operators

操作符的內部實現的核心思想是「Wrap」,例如我們可以通過 Wrap Observable 來實現一個最簡單的沒有任何操作的操作符:

class NoOpObservable {
 Observable upstream;
 NoOpObservable(Observable upstream) {
 this.upstream = upstream;
 }
 void subscribe(Observer downstream) {
 upstream.subscribe(downstream);
 }
}

可以看到,在我們 NoOpObservable 的核心方法 subscribe() 裡,我們直接通過調用上游 Observable 的 subscribe() 方法,把下游的 Observer 往上游傳。這樣,我們就成功把下游和上游之間建立聯繫了。

接下來我們增加難度,通過 Wrap Observer(注意是 Observer)來實現一個對數據流進行加工的操作符。舉個例子,MapObservable 就是一個能對數據流進行加工的操作符,它在構造時傳入一個 Func 參數,類型為:

(T) -> U

這個 Func 的作用,就是把上游發射的數據 T 加工成 U 然後繼續往下游傳遞。例如,我們可以使用 Func 把上游發射的數據轉換為 String 再傳遞給下游:

(T t) -> t.toString();

前面有說到,Observable 是通過調用 onNext() 來向 Observer 發射數據的,為了避免 Observable 直接向最下游的 Observer 直接發射數據(因為我們還要進行加工),所以我們需要對 Observer 也進行 Wrap,於是我們可以把 MapObservable 設計成這樣:

class MapObservable {
 MapObservable(Observable upstream, Func mapFunc) { /* ... */ }
 void subscribe(Observer downstream) {
 upstream.subscribe(new WrapObserver(downstream, mapFunc));
 }
 
 class WrapObserver() {
 WrapObserver(Observer downstream, Func mapFunc) { /* ... */ }
 void onNext(T data) {
 U newData = mapFunc.apply(data);
 downstream.onNext(newData);
 }
 }
}

subscribeOn & observeOn

這是 RxJava 中最常用,但又不好理解的兩個操作符。因為它們自身不對數據進行任何加工,而是對其它操作符產生「副作用」。

為了更好地理解這兩個操作符,我製作了個動畫來展示它們的工作流程:


深入簡出 RxJava

圖中分別有 A B C 三個不同的 Scheduler,它們會把 Runnable/Func 扔到不同的線程上去執行,而上圖中不同的顏色代表被不同的 Scheduler 執行。

可以看到,subscribeOn() 其實影響的是上游操作符中的 subscribe() 操作,而 observeOn() 影響的是下游操作符中的 onNext() 操作(這裡是泛指,當然還包括 onComplete()、onError() 等)。

所以,想要確定操作符上的某個 Func 是在哪個 Scheduler 上工作時,先要確定這個 Func 是在操作符上的 subscribe() 還是 onNext() 上執行的。例如 Create 操作符傳入的 Func 是在 subscribe() 上執行的,所以它優先受下游最近的 subscribeOn() 調度。而 Map 操作符傳入的 Func 是在 onNext() 上執行的,所以它優先受上游最近的 observeOn() 調度。

再例如 Collect 操作符:

.collect(()->V func1, (T)->U func2)

傳入的 func1 是在 subscribe() 上執行的,而 func2 是在 onNext() 上執行,所以 func1 和 func2 可能被兩個不同的 Scheduler 調度。

深入簡出 RxJava

私信小編可以免費獲得Java學習資料


分享到:


相關文章: