朱曄和你聊Spring系列S1E5:Spring WebFlux小探

本文會來做一些應用對比Spring MVC和Spring WebFlux,觀察線程模型的區別,然後做一下簡單的壓力測試。

創建一個傳統的Spring MVC應用

先來創建一個新的webflux-mvc的模塊:


xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
me.josephzhu
spring101-webflux-mvc
0.0.1-SNAPSHOT
jar
spring101-webflux-mvc


me.josephzhu
spring101
0.0.1-SNAPSHOT



org.springframework.boot

spring-boot-starter-data-mongodb


org.springframework.boot
spring-boot-starter-web





org.springframework.boot
spring-boot-maven-plugin




複製代碼

然後在項目裡定義一個我們會使用到的POJO:

package me.josephzhu.spring101webfluxmvc;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;

import org.springframework.data.mongodb.core.mapping.Document;
@Data
@AllArgsConstructor
@NoArgsConstructor
@Document(collection = "mydata")
public class MyData {
@Id
private String id;
private String payload;
private long time;
}
複製代碼

這裡的@Document和@Id是為Mongodb服務的,我們定義了MyData將會以mydata作為Collection的名字,然後id字段是Document的Id列。 然後我們來創建Controller,在這個Controller裡面我們嘗試三種不同的操作:

  1. Sleep 100ms的純獲取數據的方法。從請求中獲得length參數作為payload字符串的長度,從請求中獲得size參數作為MyData的個數。我們在之後的測試過程中可以隨意調節這兩個參數來調整我們的數據量。
  2. 從Mongodb獲取數據的方法,獲取到數據後直接返回。
  3. 複合邏輯。先走HTTP請求從data方法獲取數據,然後把數據保存進入Mongodb,最後返回這些數據。
package me.josephzhu.spring101webfluxmvc;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;

import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@RestController
public class MyController {
@Autowired
private RestTemplate restTemplate;
@Autowired
private MyRepository myRepository;
@GetMapping("/data")
public List getData(@RequestParam(value = "size", defaultValue = "10") int size,@RequestParam(value = "length", defaultValue = "100") int length) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
String payload = IntStream.rangeClosed(1,length).mapToObj(i->"a").collect(Collectors.joining());
return IntStream.rangeClosed(1, size)
.mapToObj(i->new MyData(UUID.randomUUID().toString(), payload, System.currentTimeMillis()))
.collect(Collectors.toList());
}
@GetMapping("/dbData")
public List getDbData() {
return myRepository.findAll();
}
@GetMapping("/saveData")
public List saveData(@RequestParam(value = "size", defaultValue = "10") int size,@RequestParam(value = "length", defaultValue = "100") int length){
UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl("http://localhost:8080/data")
.queryParam("size", size)
.queryParam("length", length);
ResponseEntity> responseEntity =
restTemplate.exchange(builder.toUriString(),
HttpMethod.GET, null, new ParameterizedTypeReference>() {});
return responseEntity.getBody().stream().map(myRepository::save).collect(Collectors.toList());
}
}
複製代碼

注意,在這裡我們使用了Java 8的Steam來做一些操作避免使用for循環:

  1. 通過length參數構建payload(payload由length個字符a構成)。
  2. 通過size參數構建MyData的List。
  3. 在RestTemplate獲取到MyData的List後,把每一個對象交由myRepository的save方法來處理,然後統一收集返回結果。 這些Stream的代碼都是同步處理,也不涉及外部IO,和非阻塞沒有任何關係,只是方便代碼編寫。為了讓代碼可以運行,我們還需要繼續來配置下Mongodb的Repository:
package me.josephzhu.spring101webfluxmvc;
import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface MyRepository extends MongoRepository { }
複製代碼

因為我們沒有用到複雜的查詢,在代碼裡只是用到了findAll方法,所以這裡我們無需定義額外的方法,只是聲明接口即可。 最後,我們創建主應用程序,順便配置一下Mongodb和RestTemplate:

package me.josephzhu.spring101webfluxmvc;
import com.mongodb.MongoClientOptions;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.client.RestTemplateBuilder;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;
@SpringBootApplication
@Configuration
public class Spring101WebfluxMvcApplication {
@Bean
MongoClientOptions mongoClientOptions(){
return MongoClientOptions.builder().connectionsPerHost(1000).build();
}
@Bean
public RestTemplate restTemplate(RestTemplateBuilder builder) {
return builder.build();
}
public static void main(String[] args) {
SpringApplication.run(Spring101WebfluxMvcApplication.class, args);
}
}
複製代碼

這裡我們配置了Mongodb客戶端使得之後在進行壓力測試的時候能有超過100個連接連接到Mongodb,否則會出現無法獲取連接的問題。

創建WebFlux版本的應用

現在我們再來新建一個webflux模塊:


xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
me.josephzhu
spring101-webflux
0.0.1-SNAPSHOT

jar
spring101-webflux


me.josephzhu
spring101
0.0.1-SNAPSHOT



org.springframework.boot
spring-boot-starter-data-mongodb-reactive


org.springframework.boot
spring-boot-starter-webflux



io.projectreactor
reactor-test
test





org.springframework.boot
spring-boot-maven-plugin




複製代碼

這裡可以注意到,我們引入了webflux這個starter以及data-mongodb-reactive這個starter。在之前的Spring MVC項目中,我們引入的是mvc和data-mongodb兩個starter。 然後,我們同樣需要創建一下MyData類(代碼和之前一模一樣,這裡省略)。 最關鍵的一步,我們來創建三個Controller方法的定義:

package me.josephzhu.spring101webflux;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;
@Component
public class MyHandler {
@Autowired
private MyReactiveRepository myReactiveRepository;
public Mono getData(ServerRequest serverRequest) {
int size = Integer.parseInt(serverRequest.queryParam("size").orElse("10"));
int length = Integer.parseInt(serverRequest.queryParam("length").orElse("100"));
String payload = IntStream.rangeClosed(1,length).mapToObj(i->"a").collect(Collectors.joining());
Flux data = Flux.fromStream(IntStream.rangeClosed(1, size)
.mapToObj(i->new MyData(UUID.randomUUID().toString(), payload, System.currentTimeMillis()))).delaySequence(Duration.ofMillis(100));
return ok()
.contentType(MediaType.APPLICATION_JSON)
.body(data, MyData.class);
}
public Mono getDbData(ServerRequest serverRequest) {
Flux data = myReactiveRepository.findAll();
return ok()
.contentType(MediaType.APPLICATION_JSON)
.body(data, MyData.class);
}
public Mono saveData(ServerRequest serverRequest) {
int size = Integer.parseInt(serverRequest.queryParam("size").orElse("10"));
int length = Integer.parseInt(serverRequest.queryParam("length").orElse("100"));
Flux data = WebClient.create().get()
.uri(builder -> builder
.scheme("http")
.host("localhost")
.port(8080)
.path("data")
.queryParam("size", size)
.queryParam("length", length)
.build())
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToFlux(MyData.class)
.flatMap(myReactiveRepository::save);
return ok()

.contentType(MediaType.APPLICATION_JSON)
.body(data, MyData.class);
}
}
複製代碼

這裡要說明幾點:

  1. 在WebFlux中,我們可以採用傳統的@Controller方式來定義Controller,也可以採用函數式方式來聲明對外的Endpoint,也就是聲明Handler+Router。我們這裡採用的是更有特色的後者來演示。
  2. 請你比較一下三個方法的實現對於兩個版本的區別。最主要的區別,我們返回的實際數據是Mono<>和Flux<>,分別代表0~1個對象和0~N對象的響應式流。
  3. 在saveData方法中,對於Spring MVC我們使用的是阻塞的RestTemplate來從遠端獲取數據,對於Spring WebFlux我們使用的是非阻塞的WebClient來獲取數據。獲取數據後,我們直接使用flatMap獲取到了所有的MyData轉給我們的響應式的Mongodb Repository來處理數據。
  4. 對於saveData方法中插入Mongodb的操作,這裡和MVC的例子有很大的不同需要注意。在MVC中,我們把遠程服務返回的結果轉為Stream數據流,同步依次調用save方法,整個過程只會有佔用一個Mongodb的連接。而在這裡,直接對Flux流進行了Map,整個過程相當於併發進行了Mongodb的調用。在之後做壓測的時候,我們會再次提到這點。 剛才有提到,採用函數式聲明對外的Endpoint的話除了定義Handler,還需要配置Router來和Handler關聯,配置如下:
package me.josephzhu.spring101webflux;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
@Configuration
public class RouterConfig {
@Autowired
private MyHandler myHandler;
@Bean
public RouterFunction config() {
return route(GET("/data"), myHandler::getData)
.andRoute(GET("/dbData"), myHandler::getDbData)
.andRoute(GET("/saveData"), myHandler::saveData);
}
}
複製代碼

這段代碼沒有太多需要說明,這裡我們定義了三個GET請求(相當於MVC的@GetMapping),然後對應到注入的myHandler的三個方法上。 然後我們還需要創建Mongodb的Repository:

package me.josephzhu.spring101webflux;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface MyReactiveRepository extends ReactiveMongoRepository { }
複製代碼

以及配置和啟動類:

package me.josephzhu.spring101webflux;
import com.mongodb.ConnectionString;
import com.mongodb.async.client.MongoClientSettings;

import com.mongodb.connection.ClusterSettings;
import com.mongodb.connection.ConnectionPoolSettings;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@SpringBootApplication
@Configuration
public class Spring101WebfluxApplication {
@Bean
MongoClient mongoClient(){
return MongoClients.create(mongoClientSettings());
}
@Bean
MongoClientSettings mongoClientSettings(){
return MongoClientSettings.builder()
.clusterSettings(ClusterSettings.builder().applyConnectionString(new ConnectionString("mongodb://localhost")).build())
.connectionPoolSettings(ConnectionPoolSettings.builder().minSize(200).maxSize(1000).maxWaitQueueSize(1000000).build())
.build();
}
public static void main(String[] args) {
SpringApplication.run(Spring101WebfluxApplication.class, args);
}
}
複製代碼

這裡對Mongodb做了一些配置,主要也是希望放大連接池這塊的默認限制,為今後的壓測服務。注意,在這裡配置的Bean是com.mongodb.reactivestream.client下的MongoClient,如下圖所示,還有其它兩個MongoClient,如果修改了不匹配的MongoClient的話是不會有作用的,我在這個坑裡躺了兩小時。

朱曄和你聊Spring系列S1E5:Spring WebFlux小探

完成後可以打開瀏覽器測試一下接口:

朱曄和你聊Spring系列S1E5:Spring WebFlux小探

Spring MVC還是WebFlux?

下圖是官網的一個圖說明了兩者的關係,然後官網也給出了一些建議:

  1. 如果你現在的Spring MVC運行的沒啥問題的話就別改了,有大量的類庫可以使用,實現簡單易於理解。
  2. 如果你希望實現輕量級的,函數式Web框架,那麼可以考慮WebFlux的函數Web端點。
  3. 如果你依賴阻塞的持久化API比如JPA和JDBC那麼也就只能選擇Spring MVC了。目前對於非阻塞的JDBC實現有一些早期的項目在探索,但是沒有到可以上生產的成熟度。
  4. 在Spring MVC應用程序中進行遠程調用也是可以使用響應式的WebClient的。Spring MVC也可以使用其它的響應式組件。每次調用延遲越厲害受益越大。
  5. 對於大型應用程序要考慮到非阻塞方式實現的學習曲線。最簡單的起步方式就是使用WebClient,完全切換到非阻塞需要花時間熟悉函數式聲明式的編程API。
朱曄和你聊Spring系列S1E5:Spring WebFlux小探

官方的意思也是可以在一些小引用上嘗試WebFlux,對於大型應用不建議冒然轉到WebFlux。

觀察線程模型

我們知道對於阻塞的實現方式,我們採用線程池來服務請求(線程池中的會維護一組普通的線程,線程池只是節省線程創建的時間),對於每一個請求的處理,至始至終都是在一個線程中進行,如果處理的過程中我們需要訪問外部的網絡或數據庫,那麼線程就處於阻塞狀態,這個線程無法服務其它請求,如果當時還有更多的併發的話,就需要創建更多的線程來服務其它請求。這種實現方式是非常簡單的,應對壓力的增長擴容方式也是粗暴的,那就是增加更多線程。 對於非阻塞的方式,採用的是EventLoop的方式,IO操作的時候是不佔用工作線程的,因此只會創建一組和CPU核數相當的工作線程用於工作處理(NodeJS甚至是單線程的,這種就更危險了,就那麼一個工作線程,一旦被長時間佔用其它請求都無法處理)。由於整個處理過程中IO請求不佔用線程時間,線程不會阻塞等待,再增加超過CPU核數的工作線程也是沒有意義的(只會白白增加線程切換的開銷)。對於這種方式在壓力增長後,因為我們不需要增加額外的線程,也就沒有了絕對的瓶頸。 試想一下在阻塞模型下,對於5000的併發,而且每一個併發阻塞的時間非常長,那麼我們其實需要5000個線程來服務(這麼多線程99%其實都是在等待,屬於空耗系統資源),創建5000的線程不談其它的,如果線程棧大小是1M的話就需要5GB的內存。對於非阻塞的線程模型在8核機器上還是8個工作線程,內存佔用還是這麼小,可以以最小的開銷應對大併發,系統的損耗很少。非阻塞的Reactive模式是內耗非常小的模式,但是這是有代價的,在實現上我們需要確保處理過程中沒有阻塞產生,否則就會浪費寶貴的數目固定的工作線程,也就是說我們需要依賴配套的非阻塞IO類庫來使用。 在默認情況下tomcat的工作線程池初始化為10,最大200,我們通過啟動本文創建的Spring101WebfluxMvcApplication程序,用jvisualvm工具來看下初始的情況(35個線程):

朱曄和你聊Spring系列S1E5:Spring WebFlux小探

在項目的application.properties文件中我們配置tomcat的最大線程數: server.tomcat.max-threads=250 在壓力的情況下,我們再來觀察一下線程的情況(272個線程):

朱曄和你聊Spring系列S1E5:Spring WebFlux小探

的確是創建多達250個工作線程。這裡看到大部分線程都在休眠,因為我們這裡運行的是剛才的data()方法,在方法內我們休眠了100毫秒。對於同樣的壓力,我們再來看一下Spring101WebfluxApplication程序的線程情況(44個線程):

朱曄和你聊Spring系列S1E5:Spring WebFlux小探

可以看到用於處理HTTP的Reactor線程只有8個,和本機CPU核數量一致(下面有十個Thread打頭的線程是處理和Mongodb交互的,忽略),只需要這8個線程處理HTTP請求足以,因為HTTP請求的IO處理不會佔用線程。

使用Gatling進行壓力測試

我們可以使用Gatling類庫進行壓力測試,我個人感覺比Jmeter方便。配置很簡單,首先我們要安裝Scala的SDK,然後我們新建一個模塊:


xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
me.josephzhu
spring101-webstresstest
0.0.1-SNAPSHOT
jar
spring101-webstresstest



io.gatling.highcharts
gatling-charts-highcharts
2.3.1





io.gatling
gatling-maven-plugin
2.2.4

me.josephzhu.spring101.webstresstest.StressTest
/Users/zyhome/gatling





複製代碼

引入了garling的maven插件,在這裡配置了測試結果輸出路徑以及壓測的類。接下去創建一下這個Scala測試類:

package me.josephzhu.spring101.webstresstest
import io.gatling.core.Predef._
import io.gatling.core.scenario.Simulation
import io.gatling.http.Predef._
class StressTest extends Simulation {
val scn = scenario("data").repeat(1000) {
exec(

http("data")
.get("http://localhost:8080/data?size=10&length=1000")
.header("Content-Type", "application/json")
.check(status.is(200)).check(substring("payload")))
}
setUp(scn.inject(atOnceUsers(200)))
}
複製代碼

這段代碼定義瞭如下的測試行為:

  1. 聲明一個data測試場景,重複進行1000次測試,發起一個遠程調用,驗證調用結果的響應狀態碼是200並且返回的結果包含字符串payload。
  2. 測試啟動的時候直接壓上去200個用戶,每一個用戶運行完這1000次測試後結束了,所以這種方式一開始會是200用戶到測試最後階段用戶數會慢慢減少。當然還有其它一些測試方式(比如慢慢遞增用戶的方式),詳見官網:
 nothingFor(4 seconds), // 1
atOnceUsers(10), // 2
rampUsers(10) over (5 seconds), // 3
constantUsersPerSec(20) during (15 seconds), // 4
constantUsersPerSec(20) during (15 seconds) randomized, // 5
rampUsersPerSec(10) to 20 during (10 minutes), // 6
rampUsersPerSec(10) to 20 during (10 minutes) randomized, // 7
splitUsers(1000) into (rampUsers(10) over (10 seconds)) separatedBy (10 seconds), // 8
splitUsers(1000) into (rampUsers(10) over (10 seconds)) separatedBy atOnceUsers(30), // 9
heavisideUsers(1000) over (20 seconds) // 10
複製代碼

壓力測試一

先來進行第一個測試,1000併發對data接口進行100次循環(還記得嗎,接口有100ms休眠or延遲的):

class StressTest extends Simulation {
val scn = scenario("data").repeat(100) {
exec(
http("mvc data")
.get("http://localhost:8080/data?size=10&length=1000")
.header("Content-Type", "application/json")
.check(status.is(200)).check(substring("payload")))
}
setUp(scn.inject(atOnceUsers(1000)))
}
複製代碼

下面兩個圖分別是MVC和WebFlux的測試結果(因為都是8080端口,所以測試的時候記得切換重啟兩個應用哦):

朱曄和你聊Spring系列S1E5:Spring WebFlux小探

朱曄和你聊Spring系列S1E5:Spring WebFlux小探

可以看到WebFlux的吞吐幾乎是MVC的翻倍,平均響應時間少了兩倍不止,很明顯,在等待的時候,2000個併發用戶大大超過了我們配置的250個線程池的線程數量,這個時候只能排隊,對於非阻塞的方式,延遲是不會佔用處理線程的,在延遲結束後才會去佔用處理線程的資源進行處理,不會收到併發用戶數受限於線程池線程數的情況。 我們把Sleep相關代碼註釋再進行一次測試看看情況,分別是MVC和WebFlux:

朱曄和你聊Spring系列S1E5:Spring WebFlux小探

朱曄和你聊Spring系列S1E5:Spring WebFlux小探

這個時候WebFlux優勢沒有那麼明顯了。

性能測試二

現在我們來訪問一下http://localhost:8080/saveData?size=100&length=1000 接口往Mongodb來初始化100條數據,然後修改一下測試腳本壓測dbData接口: class StressTest extends Simulation {

val scn = scenario("data").repeat(100) { exec( http("data") .get("http://localhost:8080/dbData") .header("Content-Type", "application/json") .check(status.is(200)).check(substring("payload"))) }

setUp(scn.inject(atOnceUsers(1000))) } 下面看下這次的測試結果 ,分別是MVC和WebFlux:

朱曄和你聊Spring系列S1E5:Spring WebFlux小探

朱曄和你聊Spring系列S1E5:Spring WebFlux小探

吞吐量沒有太多提高,平均響應時間快不少。

性能測試三

再來試一下第三個saveData接口的情況。修改測試代碼:

class StressTest extends Simulation {

val scn = scenario("data").repeat(100) { exec( http("data") .get("http://localhost:8080/saveData?size=5&length=100000") .header("Content-Type", "application/json") .check(status.is(200)).check(substring("payload"))) }

setUp(scn.inject(atOnceUsers(200))) }

這裡我們修改併發用戶為200,每個用戶進行100次測試,每次測試存入Mongodb 5條100KB的數據,一次測試後總數據量在10萬條。這次測試我們並沒有使用1000併發用戶,原因是這個測試我們會先從遠端獲取數據然後再存入Mongodb,遠端的服務也是來自於當前應用程序,我們的Tomcat最多隻有250個線程,在啟動1000個用戶後,一些線程服務於saveData接口,一些線程服務於data接口(saveData接口用到的),這樣相當於造成了循環依賴問題,請求在等待更多的可用線程執行服務data接口的響應,而這個時候線程又都被佔了導致無法分配更多的請求,測試幾乎全部超時。 下面看下這次的測試結果 ,分別是MVC和WebFlux:

朱曄和你聊Spring系列S1E5:Spring WebFlux小探

朱曄和你聊Spring系列S1E5:Spring WebFlux小探

WebFlux也是併發略高,性能略好的優勢。對於響應時間的分佈我們再來細看下下面的圖:

朱曄和你聊Spring系列S1E5:Spring WebFlux小探

朱曄和你聊Spring系列S1E5:Spring WebFlux小探

第一個圖是MVC版本的響應時間分佈,可以看到抖動比第二個圖的WebFlux的大不少。 最後來看看測試過程中MVC的JVM情況(263個線程):

朱曄和你聊Spring系列S1E5:Spring WebFlux小探

以及WebFlux的(41線程):

朱曄和你聊Spring系列S1E5:Spring WebFlux小探

性能測試四:

我們來測試一下下面兩種情況下對於WebFlux版本Mongodb側的情況:

class StressTest extends Simulation {
val scn = scenario("data").repeat(1000) {
exec(
http("data")
.get("http://localhost:8080/saveData?size=1&length=1000")
.header("Content-Type", "application/json")
.check(status.is(200)).check(substring("payload")))
}
setUp(scn.inject(atOnceUsers(200)))
}
複製代碼

以及

class StressTest extends Simulation {
val scn = scenario("data").repeat(1000) {
exec(
http("data")
.get("http://localhost:8080/saveData?size=5&length=1000")
.header("Content-Type", "application/json")
.check(status.is(200)).check(substring("payload")))
}
setUp(scn.inject(atOnceUsers(200)))
}
複製代碼

區別就在遠程服務返回的Flux是1個還是5個。在1個的時候運行測試可以看到我們Mongodb有64個連接(需要把之前連接池的配置最小設置為小一點,比如50):

> db.serverStatus().connections
{ "current" : 64, "available" : 3212, "totalCreated" : 8899 }
複製代碼

在size為5的時候,Flux返回的是5個對象,使用這個請求壓測的時候Mongodb的連接數如下:

> db.serverStatus().connections
{ "current" : 583, "available" : 2693, "totalCreated" : 10226 }
複製代碼

這是因為Flux拿到的數據直接以響應式進入Mongodb,並沒有等到所有數據拿到之後串行調用方法。 總結一下這幾次的測試,我們發現WebFlux方式對於MVC方式能有略微的性能提升,對於請求阻塞的時候性能優勢明顯。我本金的測試並沒有看到現象中的幾倍甚至幾十倍的性能提升,我猜原因如下:

  1. 本機有性能瓶頸了,壓測客戶端、Mongodb服務器、服務端都在本機運行,干擾因素太多,CPU的使用你爭我奪,測試不公平
  2. 測試的時候CPU永遠是100%還死機好幾次,我根本無法測試更高的併發,無法完全把非阻塞的性能壓出來
  3. 我本機測試的時候走的是localhost而不是內網,不經過物理網卡,可能無法體現非阻塞的性能 如果有條件可以使用三臺獨立服務器在內網進行1萬以上併發用戶的性能測試或許可以得到更科學的結果。

總結

本文我們創建了WebFlux和MVC兩套應用對比演示了簡單返回數據、發出遠程請求、使用Mongodb等一些簡單的應用場景,然後來看了一下ThreadPerRequest和EventLoop方式線程模型的區別,最後使用Gatling進行了幾個Case的壓力測試並且觀察結果。我覺得:

  1. 非阻塞模型肯定是好東西,在IO壓力和IO延遲很大的情況下,非阻塞模型因為不需要更多的線程,內耗小,性能略好,而且也穩定,所以更利於高併發
  2. WebFlux的函數式和聲明方式實現需要有很高的API熟悉使用門檻,對於複雜的邏輯這種方式的實現比回調地獄更容易繞暈,而且容易產生Bug(或許以後有可能響應式的編程在API上有可能和傳統方式進行統一)
  3. 目前和WebFlux配套的其它一些Reactive的庫還不是很全面成熟,要對複雜的業務邏輯全面啟用響應式編程有點難,阻塞調用不是不能在WebFlux中混用,但是這種方式還是採用了線程池來處理,現在容器也是NIO的了,有又多大區別
  4. 採用阻塞方式實現,由阻塞的線程進行天然背壓進行流控,非阻塞方式很直接一竿子到底,從外部請求直接到最底層存儲,需要做好流控,這是非常容易產生問題的一個點,當請求的處理無需通過線程來承載的時候,前端壓力會直通最底層數據源,不收任何擴容方面的限制,直接擊潰底層
  5. 對於阻塞的方式,多線程的調度天然就是一個任務的負載均衡,並不會出現太嚴重的卡死工作線程的問題,非阻塞應用編程我們要有意識代碼在哪個線程上運行,如果是reactor線程的話千萬不能長時間阻塞 綜上所述,使用WebFlux進行響應式編程我個人認為目前只適合做類IO轉發的高併發的又看中資源使用效率的應用場景(比如Gateway網關服務),對於複雜的業務邏輯不太適合,在90%的情況下響應式的編程模型和線程模型不會享受大幅性能優勢,更不建議盲目把現有的應用使用WebFlux來重寫。當然,這肯定是一個會持續發展的方向,可以先接觸研究起來。


分享到:


相關文章: