Reactor 3快速上手

1.3.2 Project Reactor

Project Reactor(以下簡稱“Reactor”)與Spring是兄弟項目,側重於Server端的響應式編程,主要 artifact 是 reactor-core,這是一個基於 Java 8 的實現了響應式流規範 (Reactive Streams specification)的響應式庫。

本文對Reactor的介紹以基本的概念和簡單的使用為主,深度以能夠滿足基本的Spring WebFlux使用為準。在下一章,我會結合Reactor的設計模式、併發調度模型等原理層面的內容系統介紹Reactor的使用。

光說不練假把式,我們先把練習用的項目搭起來。先創建一個maven項目,然後添加依賴:

 <dependency>
<groupid>io.projectreactor/<groupid>
<artifactid>reactor-core/<artifactid>
<version>3.1.4.RELEASE/<version>
/<dependency>

最新版本可到 http://search.maven.org 查詢,複製過來即可。另外出於測試的需要,添加如下依賴:

 <dependency>
<groupid>io.projectreactor/<groupid>
<artifactid>reactor-test/<artifactid>
<version>3.1.4.RELEASE/<version>
<scope>test/<scope>
/<dependency>
<dependency>
<groupid>junit/<groupid>
<artifactid>junit/<artifactid>
<version>4.12/<version>
<scope>test/<scope>
/<dependency>

好了,我們開始Coding吧。

1.3.2.1 Flux與Mono

Reactor中的發佈者(Publisher)由Flux和Mono兩個類定義,它們都提供了豐富的操作符(operator)。一個Flux對象代表一個包含0..N個元素的響應式序列,而一個Mono對象代表一個包含零/一個(0..1)元素的結果。

既然是“數據流”的發佈者,Flux和Mono都可以發出三種“數據信號”:元素值、錯誤信號、完成信號,錯誤信號和完成信號都是終止信號,完成信號用於告知下游訂閱者該數據流正常結束,錯誤信號終止數據流的同時將錯誤傳遞給下游訂閱者。

下圖所示就是一個Flux類型的數據流,黑色箭頭是時間軸。它連續發出“1” - “6”共6個元素值,以及一個完成信號(圖中⑥後邊的加粗豎線來表示),完成信號告知訂閱者數據流已經結束。

Reactor 3快速上手

(4)Reactor 3快速上手——響應式Spring的道法術器

下圖所示是一個Mono類型的數據流,它發出一個元素值後,又發出一個完成信號。

Reactor 3快速上手

(4)Reactor 3快速上手——響應式Spring的道法術器

既然Flux具有發佈一個數據元素的能力,為什麼還要專門定義一個Mono類呢?舉個例子,一個HTTP請求產生一個響應,所以對其進行“count”操作是沒有多大意義的。表示這樣一個結果的話,應該用Mono<httpresponse>而不是Flux<httpresponse>,對於的操作通常只用於處理 0/1 個元素。它們從語義上就原生包含著元素個數的信息,從而避免了對Mono對象進行多元素場景下的處理。/<httpresponse>/<httpresponse>

有些操作可以改變基數,從而需要切換類型。比如,count操作用於Flux,但是操作返回的結果是Mono<long>。/<long>

我們可以用如下代碼聲明上邊兩幅圖所示的Flux和Mono:

Flux.just(1, 2, 3, 4, 5, 6);
Mono.just(1);


Flux和Mono提供了多種創建數據流的方法,just就是一種比較直接的聲明數據流的方式,其參數就是數據元素。

對於圖中的Flux,還可以通過如下方式聲明(分別基於數組、集合和Stream生成):

Integer[] array = new Integer[]{1,2,3,4,5,6};
Flux.fromArray(array);
List<integer> list = Arrays.asList(array);

Flux.fromIterable(list);
Stream<integer> stream = list.stream();
Flux.fromStream(stream);
/<integer>/<integer>

不過,這三種信號都不是一定要具備的:

  • 首先,錯誤信號和完成信號都是終止信號,二者不可能同時共存;
  • 如果沒有發出任何一個元素值,而是直接發出完成/錯誤信號,表示這是一個空數據流;
  • 如果沒有錯誤信號和完成信號,那麼就是一個無限數據流。

比如,對於只有完成/錯誤信號的數據流:

// 只有完成信號的空數據流
Flux.just();
Flux.empty();
Mono.empty();
Mono.justOrEmpty(Optional.empty());
// 只有錯誤信號的數據流
Flux.error(new Exception("some error"));
Mono.error(new Exception("some error"));

你可能會納悶,空的數據流有什麼用?舉個例子,當我們從響應式的DB中獲取結果的時候(假設DAO層是ReactiveRepository<user>),就有可能為空:/<user>

 Mono<user> findById(long id); 

Flux<user> findAll();
/<user>/<user>


無論是空還是發生異常,都需要通過完成/錯誤信號告知訂閱者,已經查詢完畢,但是抱歉沒有得到值,禮貌問題嘛~

1.3.2.2 訂閱前什麼都不會發生

數據流有了,假設我們想把每個數據元素原封不動地打印出來:

Flux.just(1, 2, 3, 4, 5, 6).subscribe(System.out::print);
System.out.println();
Mono.just(1).subscribe(System.out::println);

輸出如下:

123456
1


可見,subscribe方法中的lambda表達式作用在了每一個數據元素上。此外,Flux和Mono還提供了多個subscribe方法的變體:

// 訂閱並觸發數據流
subscribe();
// 訂閱並指定對正常數據元素如何處理
subscribe(Consumer super T> consumer);
// 訂閱並定義對正常數據元素和錯誤信號的處理

subscribe(Consumer super T> consumer,
Consumer super Throwable> errorConsumer);
// 訂閱並定義對正常數據元素、錯誤信號和完成信號的處理
subscribe(Consumer super T> consumer,
Consumer super Throwable> errorConsumer,
Runnable completeConsumer);
// 訂閱並定義對正常數據元素、錯誤信號和完成信號的處理,以及訂閱發生時的處理邏輯
subscribe(Consumer super T> consumer,
Consumer super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer super Subscription> subscriptionConsumer);
Flux.just(1, 2, 3, 4, 5, 6).subscribe(
System.out::println,
System.err::println,
() -> System.out.println("Completed!"));

輸出如下:

1
2
3
4
5
6
Completed!

2)再舉一個有錯誤信號的例子:

Mono.error(new Exception("some error")).subscribe(
System.out::println,
System.err::println,
() -> System.out.println("Completed!")
);

輸出如下:

java.lang.Exception: some error 


打印出了錯誤信號,沒有輸出Completed!表明沒有發出完成信號。

這裡需要注意的一點是,Flux.just(1, 2, 3, 4, 5, 6)僅僅聲明瞭這個數據流,此時數據元素並未發出,只有subscribe()方法調用的時候才會觸發數據流。所以,訂閱前什麼都不會發生

1.3.2.3 測試與調試

從命令式和同步式編程切換到響應式和異步式編程有時候是令人生畏的。學習曲線中最陡峭的地方就是出錯時如何分析和調試。

在命令式世界,調試通常都是非常直觀的:直接看 stack trace 就可以找到問題出現的位置, 以及其他信息:是否問題責任全部出在你自己的代碼?問題是不是發生在某些庫代碼?如果是, 那你的哪部分代碼調用了庫,是不是傳參不合適導致的問題?等等。

當你切換到響應式的異步代碼,事情就變得複雜的多了。不過我們先不接觸過於複雜的內容,先了解一個基本的單元測試工具——StepVerifier。

最常見的測試 Reactor 序列的場景就是定義一個 Flux 或 Mono,然後在訂閱它的時候測試它的行為。

當你的測試關注於每一個數據元素的時候,就非常貼近使用 StepVerifier 的測試場景:下一個期望的數據或信號是什麼?你是否期望使用 Flux 來發出某一個特別的值?或者是否接下來 300ms 什麼都不做?——所有這些都可以使用 StepVerifier API 來表示。

還是以那個1-6的Flux以及會發出錯誤信號的Mono為例:

private Flux<integer> generateFluxFrom1To6() {
return Flux.just(1, 2, 3, 4, 5, 6);
}
private Mono<integer> generateMonoWithError() {
return Mono.error(new Exception("some error"));
}
@Test
public void testViaStepVerifier() {
StepVerifier.create(generateFluxFrom1To6())
.expectNext(1, 2, 3, 4, 5, 6)
.expectComplete()
.verify();
StepVerifier.create(generateMonoWithError())
.expectErrorMessage("some error")
.verify();
}
/<integer>/<integer>

其中,expectNext用於測試下一個期望的數據元素,expectErrorMessage用於校驗下一個元素是否為錯誤信號,expectComplete用於測試下一個元素是否為完成信號。

StepVerifier還提供了其他豐富的測試方法,我們會在後續的介紹中陸續接觸到。

1.3.2.4 操作符(Operator)

通常情況下,我們需要對源發佈者發出的原始數據流進行多個階段的處理,並最終得到我們需要的數據。這種感覺就像是一條流水線,從流水線的源頭進入傳送帶的是原料,經過流水線上各個工位的處理,逐漸由原料變成半成品、零件、組件、成品,最終成為消費者需要的包裝品。這其中,流水線源頭的下料機就相當於源發佈者,消費者就相當於訂閱者,流水線上的一道道工序就相當於一個一個的操作符(Operator)。

下面介紹一些我們常用的操作符。

1)map - 元素映射為新元素

map操作可以將數據元素進行轉換/映射,得到一個新元素。

Reactor 3快速上手

map

public final  Flux map(Function super T,? extends V> mapper)
public final Mono map(Function super T, ? extends R> mapper)


上圖是Flux的map操作示意圖,上方的箭頭是原始序列的時間軸,下方的箭頭是經過map處理後的數據序列時間軸。

map接受一個Function的函數式接口為參數,這個函數式的作用是定義轉換操作的策略。舉例說明:

StepVerifier.create(Flux.range(1, 6) // 1
.map(i -> i * i)) // 2
.expectNext(1, 4, 9, 16, 25, 36) //3
.expectComplete(); // 4
  1. Flux.range(1, 6)用於生成從“1”開始的,自增為1的“6”個整型數據;
  2. map接受lambdai -> i * i為參數,表示對每個數據進行平方;
  3. 驗證新的序列的數據;
  4. verifyComplete()相當於expectComplete().verify()。

2)flatMap - 元素映射為流

flatMap操作可以將每個數據元素轉換/映射為一個流,然後將這些流合併為一個大的數據流。

Reactor 3快速上手

flatMap

注意到,流的合併是異步的,先來先到,並非是嚴格按照原始序列的順序(如圖藍色和紅色方塊是交叉的)。

public final  Flux flatMap(Function super T, ? extends Publisher extends R>> mapper)
public final Mono flatMap(Function super T, ? extends Mono extends R>> transformer)


flatMap也是接收一個Function的函數式接口為參數,這個函數式的輸入為一個T類型數據值,對於Flux來說輸出可以是Flux和Mono,對於Mono來說輸出只能是Mono。舉例說明:

 StepVerifier.create(
Flux.just("flux", "mono")
.flatMap(s -> Flux.fromArray(s.split("\\\\s*")) // 1
.delayElements(Duration.ofMillis(100))) // 2
.doOnNext(System.out::print)) // 3
.expectNextCount(8) // 4
.verifyComplete();
  1. 對於每一個字符串s,將其拆分為包含一個字符的字符串流;
  2. 對每個元素延遲100ms;
  3. 對每個元素進行打印(注doOnNext方法是“偷窺式”的方法,不會消費數據流);
  4. 驗證是否發出了8個元素。

打印結果為mfolnuox,原因在於各個拆分後的小字符串都是間隔100ms發出的,因此會交叉。

flatMap通常用於每個元素又會引入數據流的情況,比如我們有一串url數據流,需要請求每個url並收集response數據。假設響應式的請求方法如下:

Mono<httpresponse> requestUrl(String url) {...}
/<httpresponse>


而url數據流為一個Flux<string> urlFlux,那麼為了得到所有的HttpResponse,就需要用到flatMap:/<string>

urlFlux.flatMap(url -> requestUrl(url));


其返回內容為Flux<httpresponse>類型的HttpResponse流。/<httpresponse>

3)filter - 過濾

filter操作可以對數據元素進行篩選。

Reactor 3快速上手

filter

public final Flux filter(Predicate super T> tester)
public final Mono filter(Predicate super T> tester)


filter接受一個Predicate的函數式接口為參數,這個函數式的作用是進行判斷並返回boolean。舉例說明:

StepVerifier.create(Flux.range(1, 6)
.filter(i -> i % 2 == 1) // 1
.map(i -> i * i))
.expectNext(1, 9, 25) // 2
.verifyComplete();
  1. filter的lambda參數表示過濾操作將保留奇數;
  2. 驗證僅得到奇數的平方。


4)zip - 一對一合併

看到zip這個詞可能會聯想到拉鍊,它能夠將多個流一對一的合併起來。zip有多個方法變體,我們介紹一個最常見的二合一的。

Reactor 3快速上手

zip

它對兩個Flux/Mono流每次各取一個元素,合併為一個二元組(Tuple2):

public static  Flux<tuple2>> zip(Publisher extends T1> source1,
Publisher extends T2> source2)
public static Mono<tuple2>> zip(Mono extends T1> p1, Mono extends T2> p2)
/<tuple2>
/<tuple2>

Flux的zip方法接受Flux或Mono為參數,Mono的zip方法只能接受Mono類型的參數。

舉個例子,假設我們有一個關於zip方法的說明:“Zip two sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple2.”,我們希望將這句話拆分為一個一個的單詞並以每200ms一個的速度發出,除了前面flatMap的例子中用到的delayElements,可以如下操作:

private Flux<string> getZipDescFlux() {
String desc = "Zip two sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple2.";
return Flux.fromArray(desc.split("\\\\s+")); // 1
}
@Test
public void testSimpleOperators() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1); // 2
Flux.zip(
getZipDescFlux(),
Flux.interval(Duration.ofMillis(200))) // 3
.subscribe(t -> System.out.println(t.getT1()), null, countDownLatch::countDown); // 4
countDownLatch.await(10, TimeUnit.SECONDS); // 5
}
/<string>
  1. 將英文說明用空格拆分為字符串流;
  2. 定義一個CountDownLatch,初始為1,則會等待執行1次countDown方法後結束,不使用它的話,測試方法所在的線程會直接返回而不會等待數據流發出完畢;
  3. 使用Flux.interval聲明一個每200ms發出一個元素的long數據流;因為zip操作是一對一的,故而將其與字符串流zip之後,字符串流也將具有同樣的速度;
  4. zip之後的流中元素類型為Tuple2,使用getT1方法拿到字符串流的元素;定義完成信號的處理為countDown;
  5. countDownLatch.await(10, TimeUnit.SECONDS)會等待countDown倒數至0,最多等待10秒鐘。

除了zip靜態方法之外,還有zipWith等非靜態方法,效果與之類似:

getZipDescFlux().zipWith(Flux.interval(Duration.ofMillis(200)))


在異步條件下,數據流的流速不同,使用zip能夠一對一地將兩個或多個數據流的元素對齊發出。

5)更多

Reactor中提供了非常豐富的操作符,除了以上幾個常見的,還有:

  • 用於編程方式自定義生成數據流的create和generate等及其變體方法;
  • 用於“無副作用的peek”場景的doOnNext、doOnError、doOncomplete、doOnSubscribe、doOnCancel等及其變體方法;
  • 用於數據流轉換的when、and/or、merge、concat、collect、count、repeat等及其變體方法;
  • 用於過濾/揀選的take、first、last、sample、skip、limitRequest等及其變體方法;
  • 用於錯誤處理的timeout、onErrorReturn、onErrorResume、doFinally、retryWhen等及其變體方法;
  • 用於分批的window、buffer、group等及其變體方法;
  • 用於線程調度的publishOn和subscribeOn方法。

使用這些操作符,你幾乎可以搭建出能夠進行任何業務需求的數據處理管道/流水線。

抱歉以上這些暫時不能一一介紹,更多詳情請參考JavaDoc,在下一章我們還會回頭對Reactor從更深層次進行系統的分析。

此外,也可閱讀我翻譯的Reactor參考文檔,我會盡量及時更新翻譯的內容。文檔源碼位於github,如有翻譯不當,歡迎提交Pull-Request。

1.3.2.5 調度器與線程模型

在Reactor中,對於多線程併發調度的處理變得異常簡單。

在以往的多線程開發場景中,我們通常使用Executors工具類來創建線程池,通常有如下四種類型:

  • newCachedThreadPool創建一個彈性大小緩存線程池,如果線程池長度超過處理需要,可靈活回收空閒線程,若無可回收,則新建線程;
  • newFixedThreadPool創建一個大小固定的線程池,可控制線程最大併發數,超出的線程會在隊列中等待;
  • newScheduledThreadPool創建一個大小固定的線程池,支持定時及週期性的任務執行;
  • newSingleThreadExecutor創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。

此外,newWorkStealingPool還可以創建支持work-stealing的線程池。

說良心話,Java提供的Executors工具類使得我們對ExecutorService使用已經非常得心應手了。BUT~ Reactor讓線程管理和任務調度更加“傻瓜”——調度器(Scheduler)幫助我們搞定這件事。Scheduler是一個擁有多個實現類的抽象接口。Schedulers類(按照通常的套路,最後為s的就是工具類咯)提供的靜態方法可搭建以下幾種線程執行環境:

  • 當前線程(Schedulers.immediate());
  • 可重用的單線程(Schedulers.single())。注意,這個方法對所有調用者都提供同一個線程來使用, 直到該調度器被廢棄。如果你想使用獨佔的線程,請使用Schedulers.newSingle();
  • 彈性線程池(Schedulers.elastic())。它根據需要創建一個線程池,重用空閒線程。線程池如果空閒時間過長 (默認為 60s)就會被廢棄。對於 I/O 阻塞的場景比較適用。Schedulers.elastic()能夠方便地給一個阻塞 的任務分配它自己的線程,從而不會妨礙其他任務和資源;
  • 固定大小線程池(Schedulers.parallel()),所創建線程池的大小與CPU個數等同;
  • 自定義線程池(Schedulers.fromExecutorService(ExecutorService))基於自定義的ExecutorService創建 Scheduler(雖然不太建議,不過你也可以使用Executor來創建)。

Schedulers類已經預先創建了幾種常用的線程池:使用single()、elastic()和parallel()方法可以分別使用內置的單線程、彈性線程池和固定大小線程池。如果想創建新的線程池,可以使用newSingle()、newElastic()和newParallel()方法。

Executors提供的幾種線程池在Reactor中都支持:

  • Schedulers.single()和Schedulers.newSingle()對應Executors.newSingleThreadExecutor();
  • Schedulers.elastic()和Schedulers.newElastic()對應Executors.newCachedThreadPool();
  • Schedulers.parallel()和Schedulers.newParallel()對應Executors.newFixedThreadPool();
  • 下一章會介紹到,Schedulers提供的以上三種調度器底層都是基於ScheduledExecutorService的,因此都是支持任務定時和週期性執行的;
  • Flux和Mono的調度操作符subscribeOn和publishOn支持work-stealing。

舉例:將同步的阻塞調用變為異步的

前面介紹到Schedulers.elastic()能夠方便地給一個阻塞的任務分配專門的線程,從而不會妨礙其他任務和資源。我們就可以利用這一點將一個同步阻塞的調用調度到一個自己的線程中,並利用訂閱機制,待調用結束後異步返回。

假設我們有一個同步阻塞的調用方法:

private String getStringSync() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello, Reactor!";
}

正常情況下,調用這個方法會被阻塞2秒鐘,然後同步地返回結果。我們藉助elastic調度器將其變為異步,由於是異步的,為了保證測試方法所在的線程能夠等待結果的返回,我們使用CountDownLatch:

@Test
public void testSyncToAsync() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Mono.fromCallable(() -> getStringSync()) // 1
.subscribeOn(Schedulers.elastic()) // 2
.subscribe(System.out::println, null, countDownLatch::countDown);
countDownLatch.await(10, TimeUnit.SECONDS);
}
  1. 使用fromCallable聲明一個基於Callable的Mono;
  2. 使用subscribeOn將任務調度到Schedulers內置的彈性線程池執行,彈性線程池會為Callable的執行任務分配一個單獨的線程。


切換調度器的操作符

Reactor 提供了兩種在響應式鏈中調整調度器 Scheduler的方法:publishOn和subscribeOn。它們都接受一個 Scheduler作為參數,從而可以改變調度器。但是publishOn在鏈中出現的位置是有講究的,而subscribeOn 則無所謂。

Reactor 3快速上手

調度

假設與上圖對應的代碼是:Flux.range(1, 1000).map(…).publishOn(Schedulers.elastic()).filter(…).publishOn(Schedulers.parallel()).flatMap(…).subscribeOn(Schedulers.single())
  • 如圖所示,publishOn會影響鏈中其後的操作符,比如第一個publishOn調整調度器為elastic,則filter的處理操作是在彈性線程池中執行的;同理,flatMap是執行在固定大小的parallel線程池中的;
  • subscribeOn無論出現在什麼位置,都隻影響源頭的執行環境,也就是range方法是執行在單線程中的,直至被第一個publishOn切換調度器之前,所以range後的map也在單線程中執行。


關於publishOn和subscribeOn為什麼會出現如此的調度策略,需要深入討論Reactor的實現原理,我們將在下一章展開。

1.3.2.6 錯誤處理

在響應式流中,錯誤(error)是終止信號。當有錯誤發生時,它會導致流序列停止,並且錯誤信號會沿著操作鏈條向下傳遞,直至遇到subscribe中的錯誤處理方法。這樣的錯誤還是應該在應用層面解決的。否則,你可能會將錯誤信息顯示在用戶界面,或者通過某個REST endpoint發出。所以還是建議在subscribe時通過錯誤處理方法妥善解決錯誤。

@Test
public void testErrorHandling() {
Flux.range(1, 6)
.map(i -> 10/(i-3)) // 1
.map(i -> i*i)
.subscribe(System.out::println, System.err::println);
}
  1. 當i為3時會導致異常。


輸出為:

25
100
java.lang.ArithmeticException: / by zero //注:這一行是紅色,表示標準錯誤輸出

subscribe方法的第二個參數定義了對錯誤信號的處理,從而測試方法exit為0(即正常退出),可見錯誤沒有蔓延出去。不過這還不夠~

此外,Reactor還提供了其他的用於在鏈中處理錯誤的操作符(error-handling operators),使得對於錯誤信號的處理更加及時,處理方式更加多樣化。

在討論錯誤處理操作符的時候,我們藉助命令式編程風格的 try 代碼塊來作比較。我們都很熟悉在 try-catch 代碼塊中處理異常的幾種方法。常見的包括如下幾種:

  1. 捕獲並返回一個靜態的缺省值。
  2. 捕獲並執行一個異常處理方法或動態計算一個候補值來頂替。
  3. 捕獲,並再包裝為某一個 業務相關的異常,然後再拋出業務異常。
  4. 捕獲,記錄錯誤日誌,然後繼續拋出。
  5. 使用 finally 來清理資源,或使用 Java 7 引入的 "try-with-resource"。

以上所有這些在 Reactor 都有相應的基於 error-handling 操作符處理方式。

1. 捕獲並返回一個靜態的缺省值

onErrorReturn方法能夠在收到錯誤信號的時候提供一個缺省值:

Flux.range(1, 6)
.map(i -> 10/(i-3))
.onErrorReturn(0) // 1
.map(i -> i*i)
.subscribe(System.out::println, System.err::println);
  1. 當發生異常時提供一個缺省值0


輸出如下:

25
100
0

2. 捕獲並執行一個異常處理方法或計算一個候補值來頂替

onErrorResume方法能夠在收到錯誤信號的時候提供一個新的數據流:

Flux.range(1, 6)
.map(i -> 10/(i-3))
.onErrorResume(e -> Mono.just(new Random().nextInt(6))) // 提供新的數據流
.map(i -> i*i)
.subscribe(System.out::println, System.err::println);

輸出如下:

25
100
16

舉一個更有業務含義的例子:

Flux.just(endpoint1, endpoint2)
.flatMap(k -> callExternalService(k)) // 1
.onErrorResume(e -> getFromCache(k)); // 2
  1. 調用外部服務;
  2. 如果外部服務異常,則從緩存中取值代替。


3. 捕獲,並再包裝為某一個業務相關的異常,然後再拋出業務異常

有時候,我們收到異常後並不想立即處理,而是會包裝成一個業務相關的異常交給後續的邏輯處理,可以使用onErrorMap方法:

Flux.just("timeout1")
.flatMap(k -> callExternalService(k)) // 1
.onErrorMap(original -> new BusinessException("SLA exceeded", original)); // 2
  1. 調用外部服務;
  2. 如果外部服務異常,將其包裝為業務相關的異常後再次拋出。


這一功能其實也可以用onErrorResume實現,略麻煩一點:

Flux.just("timeout1")
.flatMap(k -> callExternalService(k))
.onErrorResume(original -> Flux.error(
new BusinessException("SLA exceeded", original)
);

4. 捕獲,記錄錯誤日誌,然後繼續拋出

如果對於錯誤你只是想在不改變它的情況下做出響應(如記錄日誌),並讓錯誤繼續傳遞下去, 那麼可以用doOnError 方法。前面提到,形如doOnXxx是隻讀的,對數據流不會造成影響:

Flux.just(endpoint1, endpoint2)
.flatMap(k -> callExternalService(k))
.doOnError(e -> { // 1
log("uh oh, falling back, service failed for key " + k); // 2
})
.onErrorResume(e -> getFromCache(k));

  1. 只讀地拿到錯誤信息,錯誤信號會繼續向下遊傳遞;
  2. 記錄日誌。


5. 使用 finally 來清理資源,或使用 Java 7 引入的 "try-with-resource"

Flux.using(
() -> getResource(), // 1
resource -> Flux.just(resource.getAll()), // 2
MyResource::clean // 3
);
  1. 第一個參數獲取資源;
  2. 第二個參數利用資源生成數據流;
  3. 第三個參數最終清理資源。

另一方面, doFinally在序列終止(無論是 onComplete、onError還是取消)的時候被執行, 並且能夠判斷是什麼類型的終止事件(完成、錯誤還是取消),以便進行針對性的清理。如:

LongAdder statsCancel = new LongAdder(); // 1
Flux<string> flux =
Flux.just("foo", "bar")
.doFinally(type -> {

if (type == SignalType.CANCEL) // 2
statsCancel.increment(); // 3
})
.take(1); // 4
/<string>
  1. 用LongAdder進行統計;
  2. doFinally用SignalType檢查了終止信號的類型;
  3. 如果是取消,那麼統計數據自增;
  4. take(1)能夠在發出1個元素後取消流。

重試

還有一個用於錯誤處理的操作符你可能會用到,就是retry,見文知意,用它可以對出現錯誤的序列進行重試。

請注意:**retry對於上游Flux是採取的重訂閱(re-subscribing)的方式,因此重試之後實際上已經一個不同的序列了, 發出錯誤信號的序列仍然是終止了的。舉例如下:

Flux.range(1, 6)
.map(i -> 10 / (3 - i))
.retry(1)
.subscribe(System.out::println, System.err::println);
Thread.sleep(100); // 確保序列執行完

輸出如下:

5 

10
5
10
java.lang.ArithmeticException: / by zero

可見,retry不過是再一次從新訂閱了原始的數據流,從1開始。第二次,由於異常再次出現,便將異常傳遞到下游了。

1.3.2.7 回壓

前邊的例子並沒有進行流量控制,也就是,當執行.subscribe(System.out::println)這樣的訂閱的時候,直接發起了一個無限的請求(unbounded request),就是對於數據流中的元素無論快慢都“照單全收”。

subscribe方法還有一個變體:

// 接收一個Subscriber為參數,該Subscriber可以進行更加靈活的定義
subscribe(Subscriber subscriber)
注:其實這才是subscribe方法本尊,前邊介紹到的可以接收0~4個函數式接口為參數的subscribe最終都是拼裝為這個方法,所以按理說前邊的subscribe方法才是“變體”。

我們可以通過自定義具有流量控制能力的Subscriber進行訂閱。Reactor提供了一個BaseSubscriber,我們可以通過擴展它來定義自己的Subscriber。

假設,我們現在有一個非常快的Publisher——Flux.range(1, 6),然後自定義一個每秒處理一個數據元素的慢的Subscriber,Subscriber就需要通過request(n)的方法來告知上游它的需求速度。代碼如下:

@Test
public void testBackpressure() {
Flux.range(1, 6) // 1
.doOnRequest(n -> System.out.println("Request " + n + " values...")) // 2
.subscribe(new BaseSubscriber<integer>() { // 3
@Override
protected void hookOnSubscribe(Subscription subscription) { // 4
System.out.println("Subscribed and make a request...");
request(1); // 5
}
@Override
protected void hookOnNext(Integer value) { // 6
try {
TimeUnit.SECONDS.sleep(1); // 7
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Get value [" + value + "]"); // 8
request(1); // 9
}
});
}
/<integer>
  1. Flux.range是一個快的Publisher;
  2. 在每次request的時候打印request個數;
  3. 通過重寫BaseSubscriber的方法來自定義Subscriber;
  4. hookOnSubscribe定義在訂閱的時候執行的操作;
  5. 訂閱時首先向上游請求1個元素;
  6. hookOnNext定義每次在收到一個元素的時候的操作;
  7. sleep 1秒鐘來模擬慢的Subscriber;
  8. 打印收到的元素;
  9. 每次處理完1個元素後再請求1個。

輸出如下(我們也可以使用log()來打印類似下邊的輸出,以代替上邊代碼中的System.out.println):

Subscribed and make a request...
Request 1 values...
Get value [1]
Request 1 values...
Get value [2]
Request 1 values...
Get value [3]
Request 1 values...
Get value [4]
Request 1 values...
Get value [5]
Request 1 values...
Get value [6]
Request 1 values...

這6個元素是以每秒1個的速度被處理的。由此可見range方法生成的Flux採用的是緩存的回壓策略,能夠緩存下游暫時來不及處理的元素。

1.3.2.8 總結

以上關於Reactor的介紹主要是概念層面和使用層面的介紹,不過應該也足以應對常見的業務環境了。

從命令式編程到響應式編程的切換並不是一件容易的事,需要一個適應的過程。不過相信你通過本節的瞭解和實操,已經可以體會到使用Reactor編程的一些特點:

  • 相對於傳統的基於回調和Future的異步開發方式,響應式編程更加具有可編排性和可讀性,配合lambda表達式,代碼更加簡潔,處理邏輯的表達就像裝配“流水線”,適用於對數據流的處理;
  • 訂閱(subscribe)時才觸發數據流,這種數據流叫做“冷”數據流,就像插座插上電器才會有電流一樣,還有一種數據流不管是否有訂閱者訂閱它都會一直髮出數據,稱之為“熱”數據流,Reactor中幾乎都是“冷”數據流;
  • 調度器對線程管理進行更高層次的抽象,使得我們可以非常容易地切換線程執行環境;
  • 靈活的錯誤處理機制有利於編寫健壯的程序;
  • “回壓”機制使得訂閱者可以無限接受數據並讓它的源頭“滿負荷”推送所有的數據,也可以通過使用request方法來告知源頭它一次最多能夠處理 n 個元素,從而將“推送”模式轉換為“推送+拉取”混合的模式。

後續隨著對Reactor的瞭解我們還會逐漸瞭解它更多的好玩又好用的特性。

Reactor的開發者中也有來自RxJava的大牛,因此Reactor中甚至許多方法名都是來自RxJava的API的,學習了Reactor之後,很輕鬆就可以上手Rx家族的庫了。


分享到:


相關文章: