鳥瞰 Java 併發框架,看了都說好

點擊上方 "程序員小樂"關注, 星標或置頂一起成長

每天凌晨00點00分, 第一時間與你相約


每日英文

There's no one that can influence the way you live your life. Sometimes, we just need a bit more confidence to stick with our choices.

沒有人可以左右你的人生,只是很多時候我們需要多一些勇氣,去堅定自己的選擇。


每日掏心話

有錢的人,把自己的房子裝飾得很漂亮;有德的人,把自己的身心修養得很健康。真正的富有是健康,真正的貧窮是無知。


鏈接:suo.im/6oynK

鳥瞰 Java 併發框架,看了都說好

程序員小樂(ID:study_tech)第 803 次推文 圖片來自百度


往日回顧:拒絕重複編碼:這款IDEA插件代碼神器瞭解一下


正文


1. 為什麼要寫這篇文章


幾年前 NoSQL 開始流行的時候,像其他團隊一樣,我們的團隊也熱衷於令人興奮的新東西,並且計劃替換一個應用程序的數據庫。但是,當深入實現細節時,我們想起了一位智者曾經說過的話:“細節決定成敗”。最終我們意識到 NoSQL 不是解決所有問題的銀彈,而 NoSQL vs RDMS 的答案是:“視情況而定”。類似地,去年RxJava 和 Spring Reactor 這樣的併發庫加入了讓人充滿激情的語句,如異步非阻塞方法等。為了避免再犯同樣的錯誤,我們嘗試評估諸如 ExecutorService、 RxJava、Disruptor 和 Akka 這些併發框架彼此之間的差異,以及如何確定各自框架的正確用法。


本文中用到的術語在這裡有更詳細的描述。


2. 分析併發框架的示例用例


鳥瞰 Java 併發框架,看了都說好


3. 快速更新線程配置


在開始比較併發框架的之前,讓我們快速複習一下如何配置最佳線程數以提高並行任務的性能。這個理論適用於所有框架,並且在所有框架中使用相同的線程配置來度量性能。


  • 對於內存任務,線程的數量大約等於具有最佳性能的內核的數量,儘管它可以根據各自處理器中的超線程特性進行一些更改。

  • 例如,在8核機器中,如果對應用程序的每個請求都必須在內存中並行執行4個任務,那麼這臺機器上的負載應該保持為 @2 req/sec,在 ThreadPool 中保持8個線程。

  • 對於 I/O 任務,ExecutorService 中配置的線程數應該取決於外部服務的延遲。

  • 與內存中的任務不同,I/O 任務中涉及的線程將被阻塞,並處於等待狀態,直到外部服務響應或超時。因此,當涉及 I/O 任務線程被阻塞時,應該增加線程的數量,以處理來自併發請求的額外負載。

  • I/O 任務的線程數應該以保守的方式增加,因為處於活動狀態的許多線程帶來了上下文切換的成本,這將影響應用程序的性能。為了避免這種情況,應該根據 I/O 任務中涉及的線程的等待時間按比例增加此機器的線程的確切數量以及負載。


參考: http://baddotrobot.com/blog/2013/06/01/optimum-number-of-threads/


4. 性能測試結果


性能測試配置 GCP -> 處理器:Intel(R) Xeon(R) CPU @ 2.30GHz;架構:x86_64;CPU 內核:8個(注意:這些結果僅對該配置有意義,並不表示一個框架比另一個框架更好)。


鳥瞰 Java 併發框架,看了都說好


5. 使用執行器服務並行化 IO 任務


5.1 何時使用?


如果一個應用程序部署在多個節點上,並且每個節點的 req/sec 小於可用的核心數量,那麼 ExecutorService 可用於並行化任務,更快地執行代碼。


5.2 什麼時候適用?


如果一個應用程序部署在多個節點上,並且每個節點的 req/sec 遠遠高於可用的核心數量,那麼使用 ExecutorService 進一步並行化只會使情況變得更糟。

當外部服務延遲增加到 400ms 時,性能測試結果如下(請求速率 @50 req/sec,8核)。


鳥瞰 Java 併發框架,看了都說好


5.3 所有任務按順序執行示例


// I/O 任務:調用外部服務
String posts = JsonService.getPosts();
String comments = JsonService.getComments();
String albums = JsonService.getAlbums();
String photos = JsonService.getPhotos();

// 合併來自外部服務的響應
// (內存中的任務將作為此操作的一部分執行)
int userId = new Random().nextInt(10) + 1;
String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);
String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);

// 構建最終響應並將其發送回客戶端
String response = postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;
return response;


5.4 I/O 任務與 ExecutorService 並行執行代碼示例


// 添加 I/O 任務
List<callable>> ioCallableTasks = new ArrayList<>();
ioCallableTasks.add(JsonService::getPosts);
ioCallableTasks.add(JsonService::getComments);
ioCallableTasks.add(JsonService::getAlbums);
ioCallableTasks.add(JsonService::getPhotos);

// 調用所有並行任務


ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);
List<future>> futuresOfIOTasks = ioExecutorService.invokeAll(ioCallableTasks);

// 獲取 I/O 操作(阻塞調用)結果
String posts = futuresOfIOTasks.get(0).get();
String comments = futuresOfIOTasks.get(1).get();
String albums = futuresOfIOTasks.get(2).get();
String photos = futuresOfIOTasks.get(3).get();

// 合併響應(內存中的任務是此操作的一部分)
String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);
String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);

// 構建最終響應並將其發送回客戶端
return postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;/<future>/<callable>


6. 使用執行器服務並行化 IO 任務(CompletableFuture)


與上述情況類似:處理傳入請求的 HTTP 線程被阻塞,而 CompletableFuture 用於處理並行任務


6.1 何時使用?


如果沒有 AsyncResponse,性能與 ExecutorService 相同。如果多個 API 調用必須異步並且鏈接起來,那麼這種方法更好(類似 Node 中的 Promises)。


ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);

// I/O 任務
CompletableFuture<string> postsFuture = CompletableFuture.supplyAsync(JsonService::getPosts, ioExecutorService);
CompletableFuture<string> commentsFuture = CompletableFuture.supplyAsync(JsonService::getComments,
ioExecutorService);
CompletableFuture<string> albumsFuture = CompletableFuture.supplyAsync(JsonService::getAlbums,
ioExecutorService);
CompletableFuture<string> photosFuture = CompletableFuture.supplyAsync(JsonService::getPhotos,
ioExecutorService);
CompletableFuture.allOf(postsFuture, commentsFuture, albumsFuture, photosFuture).get();

// 從 I/O 任務(阻塞調用)獲得響應
String posts = postsFuture.get();
String comments = commentsFuture.get();
String albums = albumsFuture.get();
String photos = photosFuture.get();

// 合併響應(內存中的任務將是此操作的一部分)
String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);
String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);

// 構建最終響應並將其發送回客戶端
return postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;/<string>/<string>/<string>/<string>


7. 使用 ExecutorService 並行處理所有任務


使用 ExecutorService 並行處理所有任務,並使用 @suspended AsyncResponse response 以非阻塞方式發送響應。


鳥瞰 Java 併發框架,看了都說好

圖片來自 http://tutorials.jenkov.com/java-nio/nio-vs-io.html


  • HTTP 線程處理傳入請求的連接,並將處理傳遞給 Executor Pool,當所有任務完成後,另一個 HTTP 線程將把響應發送回客戶端(異步非阻塞)。

  • 性能下降原因:

  • 在同步通信中,儘管 I/O 任務中涉及的線程被阻塞,但是隻要進程有額外的線程來承擔併發請求負載,它仍然處於運行狀態。

  • 因此,以非阻塞方式保持線程所帶來的好處非常少,而且在此模式中處理請求所涉及的成本似乎很高。

  • 通常,對這裡討論採用的例子使用異步非阻塞方法會降低應用程序的性能。


7.1 何時使用?


如果用例類似於服務器端聊天應用程序,在客戶端響應之前,線程不需要保持連接,那麼異步、非阻塞方法比同步通信更受歡迎。在這些用例中,系統資源可以通過異步、非阻塞方法得到更好的利用,而不僅僅是等待。


// 為異步執行提交併行任務


ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);
CompletableFuture<string> postsFuture = CompletableFuture.supplyAsync(JsonService::getPosts, ioExecutorService);
CompletableFuture<string> commentsFuture = CompletableFuture.supplyAsync(JsonService::getComments,
ioExecutorService);
CompletableFuture<string> albumsFuture = CompletableFuture.supplyAsync(JsonService::getAlbums,
ioExecutorService);
CompletableFuture<string> photosFuture = CompletableFuture.supplyAsync(JsonService::getPhotos,
ioExecutorService);

// 當 /posts API 返回響應時,它將與來自 /comments API 的響應結合在一起
// 作為這個操作的一部分,將執行內存中的一些任務
CompletableFuture<string> postsAndCommentsFuture = postsFuture.thenCombineAsync(commentsFuture,
(posts, comments) -> ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments),
ioExecutorService);

// 當 /albums API 返回響應時,它將與來自 /photos API 的響應結合在一起
// 作為這個操作的一部分,將執行內存中的一些任務
CompletableFuture<string> albumsAndPhotosFuture = albumsFuture.thenCombineAsync(photosFuture,
(albums, photos) -> ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos),
ioExecutorService);

// 構建最終響應並恢復 http 連接,把響應發送回客戶端
postsAndCommentsFuture.thenAcceptBothAsync(albumsAndPhotosFuture, (s1, s2) -> {
LOG.info("Building Async Response in Thread " + Thread.currentThread().getName());
String response = s1 + s2;
asyncHttpResponse.resume(response);
}, ioExecutorService);/<string>/<string>/<string>/<string>/<string>/<string>


8. RxJava


  • 這與上面的情況類似,唯一的區別是 RxJava 提供了更好的 DSL 可以進行流式編程,下面的例子中沒有體現這一點。

  • 性能優於 CompletableFuture 處理並行任務。


8.1 何時使用?


如果編碼的場景適合異步非阻塞方式,那麼可以首選 RxJava 或任何響應式開發庫。還具有諸如 back-pressure 之類的附加功能,可以在生產者和消費者之間平衡負載。


int userId = new Random().nextInt(10) + 1;
ExecutorService executor = CustomThreads.getExecutorService(8);

// I/O 任務
Observable<string> postsObservable = Observable.just(userId).map(o -> JsonService.getPosts())
.subscribeOn(Schedulers.from(executor));
Observable<string> commentsObservable = Observable.just(userId).map(o -> JsonService.getComments())
.subscribeOn(Schedulers.from(executor));
Observable<string> albumsObservable = Observable.just(userId).map(o -> JsonService.getAlbums())
.subscribeOn(Schedulers.from(executor));
Observable<string> photosObservable = Observable.just(userId).map(o -> JsonService.getPhotos())
.subscribeOn(Schedulers.from(executor));

// 合併來自 /posts 和 /comments API 的響應
// 作為這個操作的一部分,將執行內存中的一些任務
Observable<string> postsAndCommentsObservable = Observable
.zip(postsObservable, commentsObservable,
(posts, comments) -> ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments))
.subscribeOn(Schedulers.from(executor));



// 合併來自 /albums 和 /photos API 的響應
// 作為這個操作的一部分,將執行內存中的一些任務
Observable<string> albumsAndPhotosObservable = Observable
.zip(albumsObservable, photosObservable,
(albums, photos) -> ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos))
.subscribeOn(Schedulers.from(executor));

// 構建最終響應
Observable.zip(postsAndCommentsObservable, albumsAndPhotosObservable, (r1, r2) -> r1 + r2)
.subscribeOn(Schedulers.from(executor))
.subscribe((response) -> asyncResponse.resume(response), e -> asyncResponse.resume("error"));/<string>/<string>/<string>/<string>/<string>/<string>


9. Disruptor


鳥瞰 Java 併發框架,看了都說好

[Queue vs RingBuffer]

鳥瞰 Java 併發框架,看了都說好

圖片1:http://tutorials.jenkov.com/java-concurrency/blocking-queues.html

圖片2:https://www.baeldung.com/lmax-disruptor-concurrency


  • 在本例中,HTTP 線程將被阻塞,直到 disruptor 完成任務,並且使用 countdowlatch 將 HTTP 線程與 ExecutorService 中的線程同步。

  • 這個框架的主要特點是在沒有任何鎖的情況下處理線程間通信。在 ExecutorService 中,生產者和消費者之間的數據將通過 Queue傳遞,在生產者和消費者之間的數據傳輸過程中涉及到一個鎖。Disruptor 框架通過一個名為 Ring Buffer 的數據結構(它是循環數組隊列的擴展版本)來處理這種生產者-消費者通信,並且不需要任何鎖。

  • 這個庫不適用於我們在這裡討論的這種用例。僅出於好奇而添加。


9.1 何時使用?


Disruptor 框架在下列場合性能更好:與事件驅動的體系結構一起使用,或主要關注內存任務的單個生產者和多個消費者。


static {
int userId = new Random().nextInt(10) + 1;

// 示例 Event-Handler; count down latch 用於使線程與 http 線程同步


EventHandler<event> postsApiHandler = (event, sequence, endOfBatch) -> {
event.posts = JsonService.getPosts();
event.countDownLatch.countDown();
};

// 配置 Disputor 用於處理事件
DISRUPTOR.handleEventsWith(postsApiHandler, commentsApiHandler, albumsApiHandler)
.handleEventsWithWorkerPool(photosApiHandler1, photosApiHandler2)
.thenHandleEventsWithWorkerPool(postsAndCommentsResponseHandler1, postsAndCommentsResponseHandler2)
.handleEventsWithWorkerPool(albumsAndPhotosResponseHandler1, albumsAndPhotosResponseHandler2);
DISRUPTOR.start();
}

// 對於每個請求,在 RingBuffer 中發佈一個事件:
Event event = null;
RingBuffer<event> ringBuffer = DISRUPTOR.getRingBuffer();
long sequence = ringBuffer.next();
CountDownLatch countDownLatch = new CountDownLatch(6);
try {
event = ringBuffer.get(sequence);
event.countDownLatch = countDownLatch;
event.startTime = System.currentTimeMillis();
} finally {
ringBuffer.publish(sequence);
}
try {
event.countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}/<event>/<event>


10. Akka


鳥瞰 Java 併發框架,看了都說好


圖片來自:https://blog.codecentric.de/en/2015/08/introduction-to-akka-actors/


  • Akka 庫的主要優勢在於它擁有構建分佈式系統的本地支持。

  • 它運行在一個叫做 Actor System 的系統上。這個系統抽象了線程的概念,Actor System 中的 Actor 通過異步消息進行通信,這類似於生產者和消費者之間的通信。

  • 這種額外的抽象級別有助於 Actor System 提供諸如容錯、位置透明等特性。

  • 使用正確的 Actor-to-Thread 策略,可以對該框架進行優化,使其性能優於上表所示的結果。雖然它不能在單個節點上與傳統方法的性能匹敵,但是由於其構建分佈式和彈性系統的能力,仍然是首選。


10.1 示例代碼


// 來自 controller :
Actors.masterActor.tell(new Master.Request("Get Response", event, Actors.workerActor), ActorRef.noSender());

// handler :
public Receive createReceive() {
return receiveBuilder().match(Request.class, request -> {
Event event = request.event; // Ideally, immutable data structures should be used here.
request.worker.tell(new JsonServiceWorker.Request("posts", event), getSelf());
request.worker.tell(new JsonServiceWorker.Request("comments", event), getSelf());
request.worker.tell(new JsonServiceWorker.Request("albums", event), getSelf());
request.worker.tell(new JsonServiceWorker.Request("photos", event), getSelf());
}).match(Event.class, e -> {
if (e.posts != null && e.comments != null & e.albums != null & e.photos != null) {
int userId = new Random().nextInt(10) + 1;
String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, e.posts,
e.comments);
String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, e.albums,
e.photos);
String response = postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;
e.response = response;
e.countDownLatch.countDown();
}
}).build();
}


11. 總結


  • 根據機器的負載決定 Executor 框架的配置,並檢查是否可以根據應用程序中並行任務的數量進行負載平衡。

  • 對於大多數傳統應用程序來說,使用響應式開發庫或任何異步庫都會降低性能。只有當用例類似於服務器端聊天應用程序時,這個模式才有用,其中線程在客戶機響應之前不需要保留連接。

  • Disruptor 框架在與事件驅動的架構模式一起使用時性能很好; 但是當 Disruptor 模式與傳統架構混合使用時,就我們在這裡討論的用例而言,它並不符合標準。這裡需要注意的是,Akka 和 Disruptor 庫值得單獨寫一篇文章,介紹如何使用它們來實現事件驅動的架構模式。

  • 這篇文章的源代碼可以在 GitHub 上找到。


鳥瞰 Java 併發框架,看了都說好

歡迎在留言區留下你的觀點,一起討論提高。如果今天的文章讓你有新的啟發,學習能力的提升上有新的認識,歡迎轉發分享給更多人。


猜你還想看


阿里、騰訊、百度、華為、京東最新面試題彙集

集合裡的元素怎麼“不見了”?

Github 3.4k星,200餘行代碼,讓你實時從視頻中隱身

一次SQL查詢優化原理分析(900W+數據,從17s到300ms)

關注訂閱號「程序員小樂」,收看更多精彩內容
嘿,你在看嗎?



分享到:


相關文章: