Reactive 理解 SpringBoot 響應式的核心-Reactor

一、前言

關於 響應式 Reactive,前面的兩篇文章談了不少概念,基本都離不開下面兩點:

  • 響應式編程是面向流的、異步化的開發方式
  • 響應式是非常通用的概念,無論在前端領域、還是實時流、離線處理場景中都是適用的。

有興趣的朋友可以看看這兩篇文章:

Reactive(1) 從響應式編程到“好萊塢”Reactive(2) 響應式流與制奶廠業務

這次,我們把目光轉向 SpringBoot,在SpringBoot 2.0版本之後,提供了對響應式編程的全面支持。因此在升級到 2.x版本之後,便能方便的實現事件驅動模型的後端編程,這其中離不開 webflux這個模塊。其同時也被 Spring 5 用作開發響應式 web 應用的核心基礎。 那麼, webflux 是一個怎樣的東西?

Webflux

Webflux 模塊的名稱是 spring-webflux,名稱中的 Flux 來源於 Reactor 中的類 Flux。該模塊中包含了對 響應式 HTTP、服務器推送 和 WebSocket 的支持。

Webflux 支持兩種不同的編程模型:

  • 第一種是 Spring MVC 中使用了基於 Java 註解的方式,一個使用Reactive風格的Controller如下所示:
<code>@RestController
public class EchoController {
@GetMapping("/echo")
public Mono<string> sayHelloWorld() {
return Mono.just("Echo!");
}
}/<string>/<code>
  • 第二種是 基於 Java 8 的 lambda 表達式的函數式編程模型。

這兩種編程模型只是在代碼編寫方式上存在不同,但底層的基礎模塊仍然是一樣的。除此之外,Webflux 可以運行在支持 Servlet 3.1 非阻塞 IO API 的 Servlet 容器上,或是其他異步運行時環境,如 Netty 和 Undertow。

關於Webflux 與 SpringMVC 的區別,可以參考下圖:

Reactive  理解 SpringBoot 響應式的核心-Reactor

SpringBoot、Webflux、Reactor 可以說是層層包含的關係,其中,響應式能力的核心仍然是來自 Reactor組件。由此可見,掌握Reactor的用法 必然是熟練進行 Spring 響應式編程的重點。

二、 Mono 與 Flux

在理解響應式Web編程之前,我們需要對Reactor 兩個核心概念做一些澄清,一個是Mono,另一個是Flux。

Flux 表示的是包含 0 到 N 個元素的異步序列。在該序列中可以包含三種不同類型的消息通知:

  • 正常的包含元素的消息
  • 序列結束的消息
  • 序列出錯的消息

當消息通知產生時,訂閱者中對應的方法 onNext(), onComplete()和 onError()會被調用。

Mono 表示的是包含 0 或者 1 個元素的異步序列。該序列中同樣可以包含與 Flux 相同的三種類型的消息通知。Flux 和 Mono 之間可以進行轉換,比如對一個 Flux 序列進行計數操作,得到的結果是一個 Mono對象,或者把兩個 Mono 序列合併在一起,得到的是一個 Flux 對象。

構造器

Reactor提供了非常方便的API來創建 Flux、Mono 對象,如下:

使用靜態工廠類創建Flux

<code>Flux.just("Hello", "World").subscribe(System.out::println);
Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);
Flux.empty().subscribe(System.out::println);
Flux.range(1, 10).subscribe(System.out::println);
Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);/<code>
  • just():可以指定序列中包含的全部元素。創建出來的 Flux 序列在發佈這些元素之後會自動結束。
  • fromArray():可以從一個數組、Iterable 對象或 Stream 對象中創建 Flux 對象。
  • empty():創建一個不包含任何元素,只發布結束消息的序列。
  • range(int start, int count):創建包含從 start 起始的 count 個數量的 Integer 對象的序列。
  • interval(Duration period)和 interval(Duration delay, Duration period):創建一個包含了從 0 開始遞增的 Long 對象的序列。其中包含的元素按照指定的間隔來發布。除了間隔時間之外,還可以指定起始元素髮布之前的延遲時間。

除了上述的方式之外,還可以使用 generate()、create()方法來自定義流數據的產生過程:

generate()

<code>Flux.generate(sink -> {
sink.next("Echo");
sink.complete();
}).subscribe(System.out::println);/<code>

generate 只提供序列中單個消息的產生邏輯(同步通知),其中的 sink.next()最多隻能調用一次,比如上面的代碼中,產生一個Echo消息後就結束了。

create()

<code>Flux.create(sink -> {
for (char i = 'a'; i <= 'z'; i++) {
sink.next(i);
}
sink.complete();
}).subscribe(System.out::print);/<code>

create 提供的是整個序列的產生邏輯,sink.next()可以調用多次(異步通知),如上面的代碼將會產生a-z的小寫字母。

使用靜態工廠類創建Mono

Mono 的創建方式與 Flux 是很相似的。 除了Flux 所擁有的構造方式之外,還可以支持與Callable、Runnable、Supplier 等接口集成。

參考下面的代碼:

<code>Mono.fromSupplier(() -> "Mono1").subscribe(System.out::println);
Mono.justOrEmpty(Optional.of("Mono2")).subscribe(System.out::println);
Mono.create(sink -> sink.success("Mono3")).subscribe(System.out::println);/<code>

三、 流計算

1. 緩衝

在Reactive(1) 從響應式編程到“好萊塢” 一文中曾經提到過緩衝(buffer)的概念。buffer 是流處理中非常常用的一種處理,意思就是將流的一段截停後再做處理。

比如下面的代碼:

<code>Flux.range(1, 100).buffer(20).subscribe(System.out::println);
Flux.interval(Duration.of(0, ChronoUnit.SECONDS),
Duration.of(1, ChronoUnit.SECONDS))
.buffer(Duration.of(5, ChronoUnit.SECONDS)).
take(2).toStream().forEach(System.out::println);
Flux.range(1, 10).bufferUntil(i -> i % 2 == 0).subscribe(System.out::println);
Flux.range(1, 10).bufferWhile(i -> i % 2 == 0).subscribe(System.out::println);/<code>

第一個buffer(20)是指湊足20個數字後再進行處理,該語句會輸出5組數據(按20分組)第二個buffer(Duration duration)是指湊足一段時間後的數據再近些處理,這裡是5秒鐘做一次處理第三個bufferUtil(Predicate p)是指等到某個元素滿足斷言(條件)時進行收集處理,這裡將會輸出[1,2],[3,4]..這樣的奇偶數字對第四個bufferWhile(Predicate p)則僅僅是收集滿足斷言(條件)的元素,這裡將會輸出2,4,6..這樣的偶數

與 buffer 類似的是window函數,後者的不同在於其在緩衝截停後並不會輸出一些元素列表,而是直接轉換為Flux對象,如下:

<code>Flux.range(1, 100).window(20)
.subscribe(flux ->
flux.buffer(5).subscribe(System.out::println));/<code>

window(20)返回的結果是一個Flux類型的對象,我們進而對其進行了緩衝處理。
因此上面的代碼會按5個一組輸出:

<code>[1, 2, 3, 4, 5]
[6, 7, 8, 9, 10]
[11, 12, 13, 14, 15]
.../<code>

2. 過濾/提取

上面的bufferWhile 其實充當了過濾的作用,當然,對於流元素的過濾也可以使用filter函數來處理:

<code>Flux.range(1, 10).filter(i -> i % 2 == 0).subscribe(System.out::println);/<code>

take 函數可以用來提取想要的元素,這與filter 過濾動作是恰恰相反的,來看看take的用法:

<code>Flux.range(1, 10).take(2).subscribe(System.out::println);
Flux.range(1, 10).takeLast(2).subscribe(System.out::println);
Flux.range(1, 10).takeWhile(i -> i < 5).subscribe(System.out::println);
Flux.range(1, 10).takeUntil(i -> i == 6).subscribe(System.out::println);/<code>

第一個take(2)指提取前面的兩個元素;第二個takeLast(2)指提取最後的兩個元素;第三個takeWhile(Predicate p)指提取滿足條件的元素,這裡是1-4第四個takeUtil(Predicate p)指一直提取直到滿足條件的元素出現為止,這裡是1-6

3. 轉換

使用map函數可以將流中的元素進行個體轉換,如下:

<code>Flux.range(1, 10).map(x -> x*x).subscribe(System.out::println);/<code>

這裡的map使用的JDK8 所定義的 Function接口

4. 合併

某些情況下我們需要對兩個流中的元素進行合併處理,這與合併兩個數組有點相似,但結合流的特點又會有不同的需求。

使用zipWith函數可以實現簡單的流元素合併處理:

<code>Flux.just("I", "You")
.zipWith(Flux.just("Win", "Lose"))
.subscribe(System.out::println);
Flux.just("I", "You")
.zipWith(Flux.just("Win", "Lose"),
(s1, s2) -> String.format("%s!%s!", s1, s2))
.subscribe(System.out::println);/<code>

上面的代碼輸出為:

<code>[I,Win]
[You,Lose]
I!Win!
You!Lose!/<code>

第一個zipWith輸出的是Tuple對象(不可變的元祖),第二個zipWith增加了一個BiFunction來實現合併計算,輸出的是字符串。

注意到zipWith是分別按照元素在流中的順序進行兩兩合併的,合併後的流長度則最短的流為準,遵循最短對齊原則。

用於實現合併的還有 combineLastest函數,combinLastest 會動態的將流中新產生元素(末位)進行合併,注意是隻要產生新元素都會觸發合併動作併產生一個結果元素,如下面的代碼:

<code>Flux.combineLatest(
Arrays::toString,
Flux.interval(Duration.of(0, ChronoUnit.MILLIS),
Duration.of(100, ChronoUnit.MILLIS)).take(2),
Flux.interval(Duration.of(50, ChronoUnit.MILLIS),
Duration.of(100, ChronoUnit.MILLIS)).take(2)
).toStream().forEach(System.out::println);/<code>

輸出為:

<code>[0, 0]
[1, 0]
[1, 1]/<code>

5. 合流

與合併比較類似的處理概念是合流,合流的不同之處就在於元素之間不會產生合併,最終流的元素個數(長度)是兩個源的個數之和。合流的計算可以使用 merge或mergeSequential 函數,這兩者的區別在於:

merge後的元素是按產生時間排序的,而mergeSequential 則是按整個流被訂閱的時間來排序,如下面的代碼:

<code>Flux.merge(Flux.interval(
Duration.of(0, ChronoUnit.MILLIS),
Duration.of(100, ChronoUnit.MILLIS)).take(2),
Flux.interval(
Duration.of(50, ChronoUnit.MILLIS),
Duration.of(100, ChronoUnit.MILLIS)).take(2))
.toStream()

.forEach(System.out::println);
System.out.println("---");
Flux.mergeSequential(Flux.interval(
Duration.of(0, ChronoUnit.MILLIS),
Duration.of(100, ChronoUnit.MILLIS)).take(2),
Flux.interval(
Duration.of(50, ChronoUnit.MILLIS),
Duration.of(100, ChronoUnit.MILLIS)).take(2))
.toStream()
.forEach(System.out::println);
/<code>

輸出為:

<code>0
0
1
1
---
0
1
0
1/<code>

merge 是直接將Flux 元素進行合流之外,而flatMap則提供了更加高級的處理:flatMap 函數會先將Flux中的元素轉換為 Flux(流),然後再新產生的Flux進行合流處理,如下:

<code>Flux.just(1, 2)
.flatMap(x -> Flux.interval(Duration.of(x * 10, ChronoUnit.MILLIS),
Duration.of(10, ChronoUnit.MILLIS)).take(x))
.toStream()
.forEach(System.out::println);/<code>

flatMap也存在flatMapSequential的一個兄弟版本,後者決定了合併流元素的順序是與流的訂閱順序一致的。

6. 累積

reduce 和 reduceWith 操作符

對流中包含的所有元素進行累積操作,得到一個包含計算結果的 Mono 序列。累積操作是通過一個 BiFunction 來表示的。reduceWith 允許在在操作時指定一個起始值(與第一個元素進行運算)

如下面的代碼:

<code>Flux.range(1, 100).reduce((x, y) -> x + y).subscribe(System.out::println);
Flux.range(1, 100).reduceWith(() -> 100, (x, y) -> x + y).subscribe(System.out::println);/<code>

這裡通過reduce計算出1-100的累加結果(1+2+3+...100),結果輸出為:

<code>5050
5150/<code>

四、異常處理

在前面所提及的這些功能基本都屬於正常的流處理,然而對於異常的捕獲以及採取一些修正手段也是同樣重要的。

利用Flux/Mono 框架可以很方便的做到這點。

將正常消息和錯誤消息分別打印

<code>Flux.just(1, 2)
.concatWith(Mono.error(new IllegalStateException()))
.subscribe(System.out::println, System.err::println);/<code>

當產生錯誤時默認返回0

<code>Flux.just(1, 2) 

.concatWith(Mono.error(new IllegalStateException()))
.onErrorReturn(0)
.subscribe(System.out::println);/<code>

自定義異常時的處理

<code>Flux.just(1, 2)
.concatWith(Mono.error(new IllegalArgumentException()))
.onErrorResume(e -> {
if (e instanceof IllegalStateException) {
return Mono.just(0);
} else if (e instanceof IllegalArgumentException) {
return Mono.just(-1);
}
return Mono.empty();
})
.subscribe(System.out::println);/<code>

當產生錯誤時重試

<code>Flux.just(1, 2)
.concatWith(Mono.error(new IllegalStateException()))
.retry(1)
.subscribe(System.out::println);/<code>

這裡的retry(1)表示最多重試1次,而且重試將從訂閱的位置開始重新發送流事件

五、線程調度

我們說過,響應式是異步化的,那麼就會涉及到多線程的調度。

Reactor 提供了非常方便的調度器(Scheduler)工具方法,可以指定流的產生以及轉換(計算)發佈所採用的線程調度方式。這些方式包括:

類別描述immediate採用當前線程single單一可複用的線程elastic彈性可複用的線程池(IO型)parallel並行操作優化的線程池(CPU計算型)timer支持任務調度的線程池fromExecutorService自定義線程池

下面,以一個簡單的實例來演示不同的線程調度:

<code>Flux.create(sink -> {
sink.next(Thread.currentThread().getName());
sink.complete();
})
.publishOn(Schedulers.single())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.publishOn(Schedulers.elastic())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.subscribeOn(Schedulers.parallel())
.toStream()
.forEach(System.out::println);
/<code>

在這段代碼中,使用publishOn指定了流發佈的調度器,subscribeOn則指定的是流產生的調度器。首先是parallel調度器進行流數據的生成,接著使用一個single單線程調度器進行發佈,此時經過第一個map轉換為另一個Flux流,其中的消息疊加了當前線程的名稱。最後進入的是一個elastic彈性調度器,再次進行一次同樣的map轉換。

最終,經過多層轉換後的輸出如下:

<code>[elastic-2] [single-1] parallel-1/<code>

小結

SpringBoot 2.x、Spring 5 對於響應式的Web編程(基於Reactor)都提供了全面的支持,藉助於框架的能力可以快速的完成一些簡單的響應式代碼開發。本文提供了較多 Reactor API的代碼樣例,旨在幫助讀者能快速的理解 響應式編程的概念及方式。

對於習慣了傳統編程範式的開發人員來說,熟練使用 Reactor 仍然需要一些思維上的轉變。就筆者的自身感覺來看,Reactor 存在一些學習和適應的成本,但一旦熟悉使用之後便能體會它的先進之處。 就如 JDK8 引入的Stream API之後,許多開發者則漸漸拋棄forEach的方式..

本身這就是一種生產效率的提升機會,何樂而不為? 更何況,從應用框架的發展前景來看,響應式的前景是明朗的。

參考閱讀

使用 Reactor 進行反應式編程https://www.ibm.com/developerworks/cn/java/j-cn-with-reactor-response-encode/index.html

Spring 5 的 WebFlux 開發介紹https://www.ibm.com/developerworks/cn/java/spring5-webflux-reactive/index.html


分享到:


相關文章: