Spring Reactor 入門與實踐

適合閱讀的人群:本文適合對 Spring、Netty 等框架,以及 Java 8 的 Lambda、Stream 等特性有基本認識,希望瞭解 Spring 5 的反應式編程特性的技術人員閱讀。

一、前言

最近幾年,隨著 Node.js、Golang 等新技術、新語言的出現,Java 的服務器端開發語言老大的地位受到了不小的挑戰。雖然,Java 的市場份額依舊很大,短時間內也不會改變,但 Java 社區對於挑戰也並沒有無動於衷。相反,Java 社區積極應對這些挑戰,不斷提高自身應對高併發服務器端開發場景的能力。

為了應對高併發的服務器端開發,在2009年的時候,微軟提出了一個更優雅地實現異步編程的方式 —— Reactive Programming,中文稱反應式編程。隨後,其它技術也迅速地跟上了腳步,像 ES6 通過 Promise 引入了類似的異步編程方式。Java 社區也沒有落後很多,Netflix 和 TypeSafe 公司提供了 RxJava 和 Akka Stream 技術,讓 Java 平臺也有了能夠實現反應式編程的框架。

其實,在更早之前,像 Mina 和 Netty 這樣的 NIO 框架其實也能搞定高併發的服務器端開發任務,但這樣的技術相對來說只是少數高級開發人員手中的工具。對於更多的普通開發者來說,難度顯得大了些,所以不容易普及。

很多年過去了,到了2017年,雖然已經有不少公司在實踐反應式編程。但整體來說,應用範圍依舊不大。原因在於缺少簡單易用的技術將反應式編程推廣普及,並同諸如 MVC 框架、HTTP 客戶端、數據庫技術等整合。

終於,在2017年9月28日,解決上面問題的利器浮出水面 —— Spring 5 正式發佈。Spring 5 其最大的意義就是能將反應式編程技術的普及向前推進一大步。而作為在背後支持 Spring 5 反應式編程的框架 Reactor,也相應的發佈了 3.1.0 版本。

Spring Reactor 入門與實踐

本文接下來將會向大家介紹 Reactive Programming(反應式編程)、Reactor 的入門以及實踐技巧等相關的內容。文章中的實踐內容來自作者使用 Spring 5 和 Reactor 等技術改造實際項目的經歷。

二、Reactor 簡介

先介紹一下 Reactor 技術。Reactor 框架是 Pivotal 公司(開發 Spring 等技術的公司)開發的,實現了 Reactive Programming 思想,符合 Reactive Streams 規範(Reactive Streams 是由 Netflix、TypeSafe、Pivotal 等公司發起的)的一項技術。其名字有反應堆之意,反映了其背後的強大的性能。

Reactive Programming

Reactive Programming,中文稱反應式編程,是一種高性能應用的編程方式。其最早是由微軟提出並引入到 .NET 平臺中,隨後 ES6 也引入了類似的技術。在 Java 平臺上,較早採用反應式編程技術的是 Netflix 公司開源的 RxJava 框架。現在大家比較熟知的 Hystrix 就是以 RxJava 為基礎開發的。

反應式編程其實並不神秘,通過與我們熟悉的迭代器模式對比便可瞭解其基本思想:

event Iterable (pull) Observable (push) retrieve data T next() onNext(T) discover error throws Exception onError(Exception) complete !hasNext() onCompleted()

上面表格的中的 Observable 那一列便代表反應式編程的 API 使用方式。可見,它就是常見的觀察者模式的一種延伸。如果將迭代器看作是拉模式,那觀測者模式便是推模式。被訂閱者(Publisher)主動的推送數據給訂閱者(Subscriber),觸發 onNext 方法。異常和完成時觸發另外兩個方法。如果 Publisher 發佈消息太快了,超過了 Subscriber 的處理速度,那怎麼辦。這就是 Backpressure 的由來,Reactive Programming 框架需要提供機制,使得 Subscriber 能夠控制消費消息的速度。

在 Java 平臺上,Netflix(開發了 RxJava)、TypeSafe(開發了 Scala、Akka)、Pivatol(開發了 Spring、Reactor)共同制定了一個被稱為 Reactive Streams 項目(規範),用於制定反應式編程相關的規範以及接口。其主要的接口有這三個:

  • Publisher
  • Subscriber
  • Subcription

其中,Subcriber 中便包含了上面表格提到的 onNext、onError、onCompleted 這三個方法。

對於 Reactive Streams,大家只需要理解其思想就可以,包括基本思想以及 Backpressure 等思想即可。

Imperative vs Reactive

對於上面表格裡提到的 Iterable 和 Observale 兩種風格,還有另一個稱呼,便是 Imperative(指令式編程)和 Reactive(反應式編程)這兩種風格。其實就是拉模型和推模型的另一種表述,大家理解其中的思想即可。對於 Imperative,老外寫的文章有時會用,直譯就是指令式編程,其實就是我們大家平時用 Java、Python 等語言寫代碼的常見風格,代碼執行順序和編寫順序基本一致(這裡不考慮 JVM 指令重排)

Reactor 的主要模塊

Reactor 框架主要有兩個主要的模塊:reactor-core 和 reactor-ipc。前者主要負責 Reactive Programming 相關的核心 API 的實現,後者負責高性能網絡通信的實現,目前是基於 Netty 實現的。

Reactor 的主要類

在 Reactor 中,經常使用的類並不是很多,主要有以下兩個:

  • Mono 實現了 org.reactivestreams.Publisher 接口,代表0到1個元素的發佈者。
  • Flux 同樣實現了 org.reactivestreams.Publisher 接口,代表0到N個元素的發表者。

可能會使用到的類

  • Scheduler 表示背後驅動反應式流的調度器,通常由各種線程池實現。

Web Flux

Spring Reactor 入門與實踐

Spring 5 引入的一個基於 Netty 而不是 Servlet 的高性能的 Web 框架,但是使用方式並沒有同傳統的基於 Servlet 的 Spring MVC 有什麼大的不同。

▼ Web Flux 中 MVC 接口的示例

<code>

@RequestMapping

(

"/demo"

)

@RestController

public class DemoController {

@RequestMapping

(value =

"/foobar"

) public Mono foobar() {

return

Mono

.just

(new Foobar()); } } /<code>

最大的變化就是返回值從 Foobar 所表示的一個對象變為 Mono (或 Flux)

當然,實際的程序並不會像示例那樣就一行代碼。關於如何開發實際的應用,這些正是後面介紹 Reactor 的部分所要詳細敘述的。

Reactive Streams、Reactor 和 Web Flux

上面介紹了反應式編程的一些概念,以及 Reactor 和 Web Flux。可能讀者看到這裡有些亂。這裡介紹一下三者的關係。其實很簡單:

Reactive Streams 是規範,Reactor 實現了 Reactive Streams。Web Flux 以 Reactor 為基礎,實現 Web 領域的反應式編程框架。

其實,對於大部分業務開發人員來說,當編寫反應式代碼時,我們通常只會接觸到 Publisher 這個接口,對應到 Reactor 便是 Mono 和 Flux。對於 Subscriber 和 Subcription 這兩個接口,Reactor 必然也有相應的實現。但是,這些都是 Web Flux 和 Spring Data Reactive 這樣的框架用到的。如果不開發中間件,通常開發人員是不會接觸到的。

比如,在 Web Flux,你的方法只需返回 Mono 或 Flux 即可。你的代碼基本也只和 Mono 或 Flux 打交道。而 Web Flux 則會實現 Subscriber ,onNext 時將業務開發人員編寫的 Mono 或 Flux 轉換為 HTTP Response 返回給客戶端。

三、Reactor 入門

接下來介紹一下 Reactor 中 Mono 和 Flux 這兩個類中的主要方法的使用。

如同 Java 8 所引入的 Stream 一樣,Reactor 的使用方式基本上也是分三步:開始階段的創建、中間階段的處理和最終階段的消費。只不過創建和消費可能是通過像 Spring 5 這樣框架完成的(比如通過 Web Flux 中的 WebClient 調用 HTTP 接口,返回值便是一個 Mono)。但我們還是需要基本瞭解這些階段的開發方式。

1. 創建 Mono 和 Flux(開始階段)

使用 Reactor 編程的開始必然是先創建出 Mono 或 Flux。有些時候不需要我們自己創建,而是實現例如 WebFlux 中的 WebClient 或 Spring Data Reactive 得到一個 Mono 或 Flux。

▼ 使用 WebFlux WebClient 調用 HTTP 接口

<code>WebClient webClient = WebClient.create(

"http://localhost:8080"

);

public

Mono findById(

Long

userId) {

return

webClient .

get

() .uri(

"/users/"

+ userId) .accept(MediaType.APPLICATION_JSON) .exchange() .flatMap(cr -> cr.bodyToMono(User

.

class

));

} /<code>

▼ 使用 ReactiveMongoRepository 查詢 User

<code>

public

interface

UserRepository

extends

ReactiveMongoRepository

<

User

,

Long

>

{

Mono

findByUsername

(String username)

; } /<code>

但有些時候,我們也需要主動地創建一個 Mono 或 Flux。

“普通”的創建方式

簡單的創建方式是主要是使用像 just 這樣的方法創建

<code>Mono<

String

> helloWorld = Mono.just(

"Hello World"

); Flux<

String

> fewWords = Flux.just(

"Hello"

,

"World"

); Flux<

String

> manyWords = Flux.fromIterable(words); /<code>

這樣的創建方式在什麼時候用呢?一般是用在當你在經過一系列非 IO 型的操作之後,得到了一個對象。接下來要基於這個對象運用 Reactor 進行高性能的 IO 操作時,可以用這種方式將你之前得到的對象轉換為 Mono 或 Flux。

“文藝”的創建方式

上述是我們通過一個同步調用得到的結果創建出 Mono 或 Flux,但有時我們需要從一個非 Reactive 的異步調用的結果創建出 Mono 或 Flux。那如何實現呢。

如果這個異步方法返回一個 CompletableFuture,那我們可以基於這個 CompletableFuture 創建一個 Mono:

<code>

Mono

.fromFuture

(

aCompletableFuture

); /<code>

如果這個異步調用不會返回 CompletableFuture,是有自己的回調方法,那怎麼創建 Mono 呢?我們可以使用 static Mono create(Consumer> callback) 方法:

<code>Mono.create(sink -> {
    ListenableFuture> entity = asyncRestTemplate.getForEntity(url, String

.

class

)

; entity.addCallback(

new

ListenableFutureCallback>() {

public

void

onFailure

(Throwable ex)

{ sink.error(ex); }

public

void

onSuccess

(ResponseEntity result)

{ sink.success(result.getBody()); } }); }); /<code>

在使用 WebFlux 之後,AsyncRestTemplate 已經不推薦使用,這裡只是做演示。

2. 處理 Mono 和 Flux(中間階段)

中間階段的 Mono 和 Flux 的方法主要有 filter、map、flatMap、then、zip、reduce 等。這些方法使用方法和 Stream 中的方法類似。對於這些方法的介紹,將會放在下一節“Reactor 進階”中,主要介紹這些方法不容易理解和使用容易出問題的點。

3. 消費 Mono 和 Flux(結束階段)

直接消費的 Mono 或 Flux 的方式就是調用 subscribe 方法。如果在 Web Flux 接口中開發,直接返回 Mono 或 Flux 即可。Web Flux 框架會為我們完成最後的 Response 輸出工作。

四、Reactor 進階

接下來我將介紹一下我在使用 Reactor 開發實際項目時遇到的一些稍顯複雜的問題,以及解決方法。

問題一:map、flatMap 和 then 分別在什麼時候使用?

本段內容將涉及到如下類和方法:

  • 方法:Mono.map
  • 方法:Mono.flatMap
  • 方法:Mono.then
  • 類:Function

在 Mono 和 Flux 中間環節處理的處理過程中,有三個有些類似的方法:map、flatMap 和 then。這三個方法可以說是 Reactor 中使用頻率很高的方法。

▼ 傳統的命令式編程

<code> 

Object

result1 = doStep1(params);

Object

result2 = doStep2(result1);

Object

result3 = doStep3(result2); /<code>

▼ 對應的反應式編程

<code>

Mono

.just

(

params

)

.flatMap

(

v

-

>

doStep1

(

v

))

.flatMap

(

v

-

>

doStep2

(

v

))

.flatMap

(

v

-

>

doStep3

(

v

)); /<code>

從上面兩段代碼的對比就很容易看出來 flatMap 方法在其中起到的作用,map 和 then 方法也有類似的作用。但這些方法之間的區別是什麼呢?我們先來看看這三個方法的簽名(以 Mono 為例):

  • flatMap(Function super T, ? extends Mono extends R>> transformer)
  • map(Function super T, ? extends R> mapper)
  • then(Mono other)

可見,最複雜的是 flatMap 方法,map 次之,then 最簡單。從方法名字上看,flatMap 和 map 都是做映射之用。而 then 則是下一步的意思,最適合用於鏈式調用,但為什麼上面的例子使用的是 flatMap 而不是 then?

then 表面看上去是下一步的意思,但它只表示執行順序的下一步,不表示下一步依賴於上一步。這個語義同 ES6 Promise 中的 then 方法是不同的。從 then 方法的參數只是一個 Mono,無從接受上一步的執行結果。而 flatMap 和 map 的參數都是一個 Function。入參是上一步的執行結果。

而 flatMap 和 map 的區別在於,flatMap 中的入參 Function 的返回值要求是一個 Mono(不明白的複習一下 Function 接口的定義),而 map 的入參 Function 只要求返回一個普通對象。因為我們在業務處理中常需要調用 WebClient 或 ReactiveXxxRepository 中的方法,這些方法的返回值都是 Mono(或 Flux)。所以要將這些調用串聯為一個整體鏈式調用,就必須使用 flatMap,而不是 map。

所以,我們要正確理解 flatMap、map 和 then 這三個方法的用法和背後的含義,這樣才能正確實踐反應式編程。

問題二:如何實現併發執行

本段內容將涉及到如下類和方法:

  • 方法:Mono.zip
  • 類:Tuple2
  • 類:BiFunction

併發執行是常見的一個需求。Reactive Programming 雖然是一種異步編程方式,但是異步不代表就是併發並行的。

在傳統的命令式開發方式中,併發執行是通過線程池加 Future 的方式實現的。

<code>Future result1Future = doStep1(

params

); Future result2Future = doStep2(

params

); Result1 result1 = result1Future.

get

(); Result2 result2 = result2Future.

get

();

return

mergeResult; /<code>

因為上面的代碼雖然有一些異步效果在裡面,但 Future.get() 方法是阻塞的。所以,當我們使用 Reactor 開發有併發執行場景的反應式代碼時,肯定不能用上面的方式。這時,需要使用到 Mono 和 Flux 中的 zip 方法。這裡我們以 Mono 為例演示。代碼如下:

<code>Mono item1Mono = ...;
Mono item2Mono = ...;
Mono.zip(items -> {
    CustomType1 item1 = CustomType1

.

class

.

cast

(items[

0

]); CustomType2 item2 = CustomType2

.

class

.

cast

(items[

1

]);

return

mergeResult; }, item1Mono, item2Mono); /<code>

上述代碼中,產生 item1Mono 和 item2Mono 的過程是並行的。比如,調用一個 HTTP 接口的同時,執行一個數據庫查詢操作。這樣就可以加快程序的執行。

但上述代碼存在一個問題,就是 zip 方法需要做強制類型轉換。而強制類型轉換是不安全的。所以我們需要更優雅的方式。

好在 zip 方法存在多種重載形式。除了最基本的形式以外,還有多種類型安全的形式:

<code>

static

Mono> zip(Mono

extends

T1> p1, Mono

extends

T2> p2);

static

Mono zip(Mono

extends

T1> p1, Mono

extends

T2> p2, BiFunction

super

T1, ?

super

T2, ?

extends

O> combinator);

static

Mono> zip(Mono

extends

T1> p1, Mono

extends

T2> p2, Mono

extends

T3> p3); /<code>

對於不超過7個元素的合併操作,都有類型安全的 zip 方法可選。

以兩個元素的合併為例,介紹一下使用方法:

<code>

Mono

.

zip

(item1Mono, item2Mono).

map

(tuple -> {

CustomType1

item1 = tuple.getT1();

CustomType2

item2 = tuple.getT2();

return

mergeResult; }); /<code>

上述代碼中,map 方法的參數是一個 Tuple2,表示一個二元數組,相應的還有 Tuple3、Tuple4 等。

另外,對於兩個元素的併發執行,也可以通過 zip(Mono extends T1> p1, Mono extends T2> p2, BiFunction super T1, ? super T2, ? extends O> combinator) 方法直接將結果合併。方法是傳遞 BiFunction 實現合併算法。

問題三:集合循環之後的匯聚

本段內容將涉及到如下類和方法:

  • 方法:Flux.fromIterable
  • 方法:Flux.reduce
  • 類:BiFunction

另外一個稍微複雜的場景是對一個對象中的一個類型為集合類的(List、Set)進行處理之後,再對原本的對象進行處理。使用 Imperative 風格的代碼很容易編寫:

<code>List subDataList = 

data

.getSubDataList();

for

(SubData item : subDataList) { } /<code>

是不是簡單到無以復加的地步了。但當我們要用 Reactive 風格的代碼實現上述邏輯時,就不是那麼簡單了。

要在 Reactive 風格的代碼中實現上述邏輯,我們主要是要用到 Flux 的 reduce 方法。我們先來看 reduce 方法的簽名:

<code>

<

A

>

Mono

<

A

>

reduce(A initial, BiFunction

<

A,

?

super

T

,

A

>

accumulator); /<code>

從方法簽名我們可以看出 reduce 方法的功能就是講一個 Flux 聚合成一個 Mono。參數中第一個參數是返回值 Mono 中元素的初始值。

第二個參數是一個 BiFunction,用來實現聚合操作的邏輯。泛型參數 中,第一個 A 表示每次聚合操作(因為需要對集合中每個元素進行操作)之後的結果的類型,它作為 BiFunction.apply 方法的第一個入參 ;? super T 表示集合中的每個元素,它作為 BiFunction.apply 方法的第二個入參;最後一個 A 表示聚合操作的結果,它作為 BiFunction.apply 方法的返回值。

接下來看一下示例:

<code>Data initData = ...;
List aList = ...;
Flux.fromIterable(aList)
    .reduce(initData, 

(data, itemInList)

->

{

//

Do something

on

data

and

itemInList

return

data; }); /<code>

上面的示例代碼中,initData 和 data 的類型相同,我們,但是命名不能重複。執行完上述代碼之後, reduce 方法會返回 Mono。

五、小結

本文介紹了反應式編程的一些概念和 Spring 的 Reactor 框架的基本用法,還介紹瞭如何用 Reactor 解決一些稍微複雜一點的問題。文章中的這些示例都來自我使用 Reactor 改造真實項目的實踐過程,因為精力和時間有限,上面的例子還有很多侷限性。但是希望本文能起到拋磚引玉的作用,讓我們能看到更多關於反應式編程和 Reactor 使用方面的實踐分享。

上面的示例為了簡單清晰,都是直接調用 Reactor 中相關的方法。但這裡給大家一個建議,就是實踐 Reactive 編程,更需要不斷地使用像提取方法這樣的手段進行重構以提高代碼可讀性,否則項目代碼的可讀性只會比傳統風格的代碼低。

除了代碼可讀性方面的問題,實踐反應式編程還有很多很多問題需要考慮。比如引入像 Netty 這樣的 NIO 框架所帶來的技術複雜度、單元測試的難度提升、和其它框架技術的整合等等。所以,對於像反應式編程這樣的新技術,我們一方面要積極探索,另一方面也要評估它所帶來的技術風險和它所帶來的價值對於你的項目是否匹配。

當然,整體而言,因為能帶來更高的吞吐量,所以反應式編程的前景是非常光明。可以說,未來高性能的 Java Web 應用基本上都是反應式編程技術的天下,也是 Java 平臺對抗 Golang、Node.js 等新技術的利器。


分享到:


相關文章: