什麼是反應式編程 (Reactive programming)?

理解反應式編程

什麼是反應式編程 (Reactive programming)?

你曾有過訂閱報紙或者雜誌的經歷嗎?互聯網的確從傳統的出版發行商那兒分得了一杯羹,但是過去訂閱報紙真的是我們瞭解時事的最佳方式。那時,我們每天早上都會收到一份新鮮出爐的報紙,並在早飯時間或上班路上閱讀。現在假設一下,在支付完訂閱費用之後,幾天的時間過去了,你卻沒有收到任何報紙。又過了幾天,你打電話給報社的銷售部門詢問為什麼還沒有收到報紙。

想象一下,如果他們告訴你:“因為你支付的是一整年的訂閱費用,而現在這一年還沒有結束,當這一年結束時,你肯定可以一次性完整地收到它們。”那麼你會有多麼驚訝。值得慶幸的是,這並非訂閱的真正運作方式。報紙具有一定的時效性。在出版後,報紙需要及時投遞,以確保在閱讀它們時內容仍然是新鮮的。此外,當你在閱讀最新一期的報紙時,記者們正在為未來的版本撰寫內容,同時印刷機正在滿速運轉,印刷下一期的內容——一切都是並行的。在開發應用程序代碼時,我們可以編寫兩種風格的代碼,即命令式和反應式。

•命令式(Imperative)的代碼:非常類似於上文所提的虛構的報紙訂閱方式。它由一組任務組成,每次只運行一項任務,每項任務又都依賴於前面的任務。數據會按批次進行處理,在前一項任務還沒有完成對當前數據批次的處理時,不能將這些數據遞交給下一項處理任務。

•反應式(Reactive)的代碼:非常類似於真實的報紙訂閱方式。它定義了一組用來處理數據的任務,但是這些任務可以並行地執行。每項任務處理數據的一部分子集,並將結果交給處理流程中的下一項任務,同時繼續處理數據的另一部分子集。

反應式編程簡介

What

反應式編程(Reactive programming,Rx)最初來源於函數式語言裡面的函數式反應編程(Functional Reactive programming,FRP)。後來隨著微軟.Net Framework增加了Reactive Extension而在主流語言中流行起來。

反應式編程是一種編程思想、編程方式,是為了簡化併發編程而出現的。與傳統的處理方式相比,它能夠基於數據流中的事件進行反應處理。例如:a+b=c的場景,在傳統編程方式下如果a、b發生變化,那麼我們需要重新計算a+b來得到c的新值。而反應式編程中,我們不需要重新計算,a、b的變化事件會觸發c的值自動更新。這種方式類似於我們在消息中間件中常見的發佈/訂閱模式。由流發佈事件,而我們的代碼邏輯作為訂閱方基於事件進行處理,並且是異步處理的。

反應式編程中,最基本的處理單元是事件流(事件流是不可變的,對流進行操作只會返回新的流)中的事件。流中的事件包括正常事件(對象代表的數據、數據流結束標識)和異常事件(異常對象,例如Exception)。同時,只有當訂閱者第一次發佈者,發佈者發佈的事件流才會被消費,後續的訂閱者只能從訂閱點開始消費,但是我們可以通過背壓、流控等方式控制消費。

常用的反應式編程實現類庫包括:Reactor、RxJava 2,、Akka Streams、Vert.x以及Ratpack。本文基於Reactor (由於Reactor有Spring背書,同時反應式編程已經集成於Java 9)。

反應式編程與Java8提供的Streams有眾多相似之處(尤其是API上),且提供了相互轉化的API。但是反應式編程更加強調異步非阻塞,通過onComplete等註冊監聽的方式避免阻塞,同時支持delay、interval等特性。而Streams本質上是對集合的並行處理,並不是非阻塞的。

Why

反應式編程的核心是基於事件流、無阻塞、異步的,使用反應式編程不需要編寫底層的併發、並行代碼。並且由於其聲明式編寫代碼的方式,使得異步代碼易讀且易維護。

How

基本概念

  • Flux,是Reactor中的一種發佈者,包含0到N個元素的異步序列。通過其提供的操作可以生成、轉換、編排序列。如果不觸發異常事件,Flux是無限的。
  • Mono,是Reactor中的一種發佈者,包含0或者1個的異步序列。可以用於類似於Runnable的場景。背壓(backpressure),由訂閱者聲明的、限定本消費者可處理的流中的元素個數。

操作

所有的流都是不可變的,所以對流的操作都會返回一個新的流。

創建(數據流模型)

just,根據參數創建數據流

never,創建一個不會發出任何數據的無限運行的數據流

empty,創建一個不包含任何數據的數據流,不會無限運行。

error,創建一個訂閱後立刻返回異常的數據流

concact,從多個Mono創建Flux

generate,同步、逐一的創建複雜流。重載方法支持生成狀態。在方法內部的lambda中通過調用next和complete、error來指定當前循環返回的流中的元素(並不是return)。

create,支持同步、異步、批量的生成流中的元素。

zip,將多個流合併為一個流,流中的元素一一對應

delay,Mono方法,用於指定流中的第一個元素產生的延遲時間

interval,Flux方法,用於指定流中各個元素產生時間的間隔(包括第一個元素產生時間的延遲),從0開始的Long對象組成的流

justOrEmpty,Mono方法,用於指定當初始化時的值為null時返回空的流

defaultIfEmpty,Mono方法,用於指定當流中元素為空時產生的默認值

range,生成一個範圍的Integer隊列

轉化(就是一些標準函數算子)

map,將流中的數據按照邏輯逐個映射為一個新的數據,當流是通過zip創建時,有一個元組入參,元組內元素代表zip前的各個流中的元素。

flatMap,將流中的數據按照邏輯逐個映射一個新的流,新的流之間是異步的。

take,從流中獲取N個元素,有多個擴展方法。

zipMap,將當前流和另一個流合併為一個流,兩個流中的元素一一對應。

mergeWith,將當前流和另一個流合併為一個流,兩個流中的元素按照生成順序合併,無對應關係。

join,將當前流和另一個流合併為一個流,流中的元素不是一一對應的關係,而是根據產生時間進行合併。

concactWith,將當前流和另一個流按聲明順序(不是元素的生成時間)鏈接在一起,保證第一個流消費完後再消費第二流

zipWith,將當前流和另一個流合併為一個新的流,這個流可以通過lambda表達式設定合併邏輯,並且流中元素一一對應

first,對於Mono返回多個流中,第一個產生元素的Mono。對於Flux,返回多個Flux流中第一個產生元素的Flux。

block,Mono和Flux中類似的方法,用於阻塞當前線程直到流中生成元素

toIterable,Flux方法,將Flux生成的元素返回一個迭代器

defer,Flux方法,用於從一個Lambda表達式獲取結果來生成Flux,這個Lambda一般是線程阻塞的

buffer相關方法,用於將流中的元素按照時間、邏輯規則分組為多個元素集合,並且這些元素集合組成一個元素類型為集合的新流。

window,與buffer類似,但是window返回的流中元素類型還是流,而不是buffer的集合。

filter,顧名思義,返回負責規則的元素組成的新流

reduce,用於將流中的各個元素與初始值(可以設置)逐一累積,最終得到一個Mono。

其他

doOnXXX,當流發生XXX時間時的回調方法,可以有多個,類似於監聽。XXX包括Subscribe、Next、Complete、Error等。

onErrorResume,設置流發生異常時返回的發佈者,此方法的lambda是異常對象

onErrorReturn,設置流發生異常時返回的元素,無法捕獲異常

then,返回Mono,跳過整個流的消費

ignoreElements,忽略整個流中的元素

subscribeOn,配合Scheduler使用,訂閱時的線程模型。

publisherOn,配合Scheduler使用,發佈時的線程模型。

retry,訂閱者重試次數

異步 Web 框架

異步的Web框架能夠以更少的線程獲得更高的可擴展性,通常它們只需要與CPU核心數量相同的線程。通過使用所謂的事件輪詢(event looping)機制(如圖11.1所示),這些框架能夠用一個線程處理很多請求,這樣每次連接的成本會更低。

什麼是反應式編程 (Reactive programming)?

在事件輪詢中,所有事情都是以事件的方式來進行處理的,包括請求以及密集型操作(如數據庫和網絡操作)的回調。當需要執行成本高昂的操作時,事件輪詢會為該操作註冊一個回調,這樣操作可以並行執行,而事件輪詢則會繼續處理其他的事件。當操作完成時,事件輪詢機制會將其作為一個事件,這一點與請求是相同的。這樣達到的效果就是,在面臨大量負載的時候,異步Web框架能夠以更少的線程實現更好的可擴展性,這樣會減少線程管理的開銷。Spring 5引入了一個非阻塞、異步的Web框架,該框架在很大程度上是基於Reactor項目的,能夠解決Web應用和API中對更好的可擴展性的需求。接下來我們看一下Spring WebFlux:面向Spring的反應式Web框架。

當Spring團隊思考如何向Web層添加反應式編程模型時,如果不在Spring MVC中做大量工作,顯然很難實現這一點。這會在代碼中產生分支以決定是否要以反應式的方式來處理請求。如果這樣做,本質上就是將兩個Web框架打包成一個,依靠if語句來區分反應式和非反應式。與其將反應式編程模型硬塞進Spring MVC中,還不如創建一個單獨的反應式Web框架,並儘可能多地借鑑Spring MVC。這樣,Spring WebFlux就應運而生了。Spring 5定義的完整Web開發技術棧如圖11.2所示。

什麼是反應式編程 (Reactive programming)?

在圖11.2的左側,我們會看到Spring MVC技術棧,這是Spring框架2.5版本就引入的。SpringMVC 建立在Java Servlet API之上,因此需要Servlet容器(比如Tomcat)才能執行。

與之不同,Spring WebFlux(在圖11.2的右側,和Spring MVC系出同門,並且很多核心組件都是公用的)並不會綁定Servlet API,所以它構建在Reactive HTTP API之上,這個API與ServletAPI具有相同的功能,只不過是採用了反應式的方式。因為Spring WebFlux沒有與Servlet API耦合,所以它的運行並不需要Servlet容器。它可以運行在任意非阻塞Web容器中,包括Netty、Undertow、Tomcat、Jetty或任意Servlet 3.1及以上的容器。

在圖11.2中,最值得注意的是左上角,它代表了Spring MVC和Spring WebFlux公用的組件,主要用來定義控制器的註解。因為Spring MVC和Spring WebFlux會使用相同的註解,所以SpringWebFlux與Spring MVC在很多方面並沒有區別。右上角的方框表示另一種編程模型,它使用函數式編程範式來定義控制器,而不是使用註解。

Spring MVC和Spring WebFlux之間最顯著的區別在於函數式Web編程模型。

什麼是反應式編程 (Reactive programming)?

在使用Spring WebFlux時,我們需要添加Spring Boot WebFlux starter依賴項.

反應式宣言(The Reactive Manifesto)

反應式系統是:

響應:該系統及時響應,如果在所有可能的。響應能力是可用性和實用性的基石,但更重要的是,響應能力意味著可以快速發現問題並進行有效處理。響應系統專注於提供快速且一致的響應時間,建立可靠的上限,以便它們提供一致的服務質量。這種一致的行為又簡化了錯誤處理,建立了最終用戶的信心,並鼓勵了進一步的交互。

彈性:面對故障時,系統保持響應能力。這不僅適用於高可用性,關鍵任務系統,任何非彈性的系統在發生故障後都將無響應。彈性是通過複製,遏制,隔離和委派實現的。故障包含在每個組件中,使組件彼此隔離,從而確保系統的各個部分可以發生故障並可以恢復而不會損害整個系統。每個組件的恢復都委派給另一個(外部)組件,並在必要時通過複製來確保高可用性。組件的客戶端不承擔處理其故障的負擔。

彈性:系統在變化的工作負載下保持響應能力。無功系統可以通過增加或減少分配給這些輸入的資源來對輸入速率的變化做出反應。這意味著沒有爭用點或中央瓶頸的設計,從而具有分片或複製組件並在其中分配輸入的能力。反應性系統通過提供相關的實時性能指標來支持預測性和反應性縮放算法。它們在商品硬件和軟件平臺上以經濟高效的方式實現了彈性。

什麼是反應式編程 (Reactive programming)?

消息驅動:響應式系統依靠異步 消息傳遞在組件之間建立邊界,以確保鬆散的耦合,隔離和位置透明性。此邊界還提供了將故障委派為消息的方法。通過使用顯式消息傳遞,可以通過成形和監視系統中的消息隊列並在必要時施加背壓來實現負載管理,彈性和流量控制。位置透明消息傳遞作為一種通信手段,使得故障管理有可能在整個集群或單個主機內以相同的構造和語義進行工作。不阻塞通信允許接收者僅在活動狀態下消耗資源,從而減少了系統開銷。

大型系統由較小的系統組成,因此取決於其組成部分的反應性。這意味著反應式系統將應用設計原則,以便這些屬性可應用於所有級別的規模,從而使其可組合。世界上最大的系統依賴於基於這些屬性的體系結構,每天滿足數十億人的需求。現在是時候從一開始就有意識地應用這些設計原則,而不是每次都重新發現它們。

英文閱讀能力提升:

Reactive Systems are:

Reactive Systems are:

Responsive: The system responds in a timely manner if at all possible. Responsiveness is the cornerstone of usability and utility, but more than that, responsiveness means that problems may be detected quickly and dealt with effectively. Responsive systems focus on providing rapid and consistent response times, establishing reliable upper bounds so they deliver a consistent quality of service. This consistent behaviour in turn simplifies error handling, builds end user confidence, and encourages further interaction.

Resilient: The system stays responsive in the face of failure. This applies not only to highly-available, mission-critical systems — any system that is not resilient will be unresponsive after a failure. Resilience is achieved by replication, containment, isolation and delegation. Failures are contained within each component, isolating components from each other and thereby ensuring that parts of the system can fail and recover without compromising the system as a whole. Recovery of each component is delegated to another (external) component and high-availability is ensured by replication where necessary. The client of a component is not burdened with handling its failures.

Elastic: The system stays responsive under varying workload. Reactive Systems can react to changes in the input rate by increasing or decreasing the resources allocated to service these inputs. This implies designs that have no contention points or central bottlenecks, resulting in the ability to shard or replicate components and distribute inputs among them. Reactive Systems support predictive, as well as Reactive, scaling algorithms by providing relevant live performance measures. They achieve elasticity in a cost-effective way on commodity hardware and software platforms.

Message Driven: Reactive Systems rely on asynchronous message-passing to establish a boundary between components that ensures loose coupling, isolation and location transparency. This boundary also provides the means to delegate failures as messages. Employing explicit message-passing enables load management, elasticity, and flow control by shaping and monitoring the message queues in the system and applying back-pressure when necessary. Location transparent messaging as a means of communication makes it possible for the management of failure to work with the same constructs and semantics across a cluster or within a single host. Non-blocking communication allows recipients to only consume resources while active, leading to less system overhead.

Large systems are composed of smaller ones and therefore depend on the Reactive properties of their constituents. This means that Reactive Systems apply design principles so these properties apply at all levels of scale, making them composable. The largest systems in the world rely upon architectures based on these properties and serve the needs of billions of people daily. It is time to apply these design principles consciously from the start instead of rediscovering them each time.


分享到:


相關文章: