使用Cettia構建實時Web應用程序第2部分

本教程的第二部分討論套接字和套接字組的特性和狀態,以及縮放Cettia。

使用Cettia構建實時Web應用程序第2部分

廣播事件

要將事件發送到多個套接字,您可以創建一個套接字,向該套接字添加套接字併發送迭代到該套接字上的事件。它應該可以工作,但是socket是有狀態的而不是可序列化的,這意味著調用者應該總是檢查每次這個套接字是否可用; 無法在導線的另一側處理此插座。Cettia以功能性的方式解決了這些問題。

應用程序創建一個套接字動作並將其傳遞給服務器的其中一個查找器方法。

服務器找到相應的套接字並執行傳遞的動作,逐個傳遞套接字。

在這裡,動作表示用於處理給定參數的功能接口。通過這種方式,您可以將狀態管理委託給服務器,並通過構建套接字操作來關注套接字處理; 您還可以序列化並向群集中的其他服務器廣播一個操作而不是套接字,然後讓服務器為其自己的套接字執行操作。

查找方法來查找服務器中的所有套接字 server.all(Action action)。將以下chat 事件處理程序添加 到套接字處理程序以將給定chat 事件發送 到服務器中的每個套接字:

socket.on("chat", data -> {

server.all((Action & Serializable) s -> {

s.send("chat", data);

});

});

不要將它與通過註冊的套接字處理程序混淆 server.onsocket(Action action)。Finder方法包括 server.all 處理服務器中的現有套接字(集群中的每個服務器), server.onsocket 並且要初始化由此新接受的套接字 server。

事實上,編寫和提交一個動作只有在你需要做一些比發送事件更復雜的事情時才有用。如果它不那麼複雜,可以使用一行代碼完成 Sentence。chat 在一行中重寫上述 事件處理程序:

socket.on("chat", data -> server.all().send("chat", data));

Sentence 是由服務器創建並返回的,當它的finder方法被調用時沒有動作,即 server.all()。Sentence與上面一樣, 每個方法 send都映射到預先實現的通用套接字操作,因此如果執行該方法,則其映射操作將根據被調用的查找器方法與服務器找到的套接字一起執行。這就是為什麼上述兩個代碼片段完全相同的原因。

要演示 chat 事件處理程序,請在一個選項卡中打開2個套接字,或在每個瀏覽器中打開2個瀏覽器和一個套接字。在跟蹤套接字的狀態時,將第1部分的logState 事件處理程序添加到內置事件中很方便 。

var socket1 = cettia.open("http://127.0.0.1:8080/cettia");

socket1.on("chat", data => console.log("socket1", data));

var socket2 = cettia.open("http://127.0.0.1:8080/cettia");

socket2.on("chat", data => console.log("socket2", data));

一旦所有套接字打開,選擇其中一個併發送一個 chat 事件。然後,你應該看到一個聊天活動通過發送 socket1 和廣播 socket1 和 socket2。

socket1.send("chat", "Is it safe to invest in Bitcoin?");

您可能急於回答這個問題。在控制檯上使用它。

使用特定的套接字

在大多數情況下,您可能會處理代表特定實體的一組套接字,而不是簡單的所有套接字。例如,實體可以是登錄到多個瀏覽器的用戶,進入聊天室的用戶,遊戲中的紅隊員等等。正如所解釋的,服務器的finder方法接受一個標準來查找套接字和一個用找到的套接字執行的動作,這裡使用的標準是標籤。Cettia允許在套接字中添加和移除標籤,並提供查找器方法來查找標籤套接字,例如查詢數據庫。

作為一個簡單的例子,我們來編寫 myself 事件處理程序,它將給定的事件發送到使用我的用戶名標記的套接字。這裡,這些套接字表示一個名為我自己的實體。假設用戶名包含在username URI中的命名查詢參數 中,並且URI編碼為safe。例如,如果套接字的URI是 /cettia?username=alice,那麼套接字處理程序將通過添加 alice 標記到套接字 socket.tag(String tagName),並且在myself 分派事件時 ,服務器將查找包含該alice 標記的 套接字 server.byTag(String... names) 並將事件發送給它們。

這是一個myself 事件處理程序的實現 。假設有一種方法被稱為 findUsernameParameter 從給定的URI查找用戶名參數。

String username = findUsernameParameter(socket.uri());

socket.tag(username).on("myself", data -> server.byTag(username).send("myself", data));

要測試 myself 事件處理程序,請在一個選項卡中打開3個套接字,或在每個瀏覽器中打開3個瀏覽器和一個套接字

var socket1 = cettia.open("http://127.0.0.1:8080/cettia?username=alice");

socket1.on("myself", data => console.log("socket1", data));

var socket2 = cettia.open("http://127.0.0.1:8080/cettia?username=alice");

socket2.on("myself", data => console.log("socket2", data));

var socket3 = cettia.open("http://127.0.0.1:8080/cettia?username=bob");

socket3.on("myself", data => console.log("socket3", data));

一旦所有套接字打開後,選擇其中一個併發送一個 myself 事件。

socket2.send("myself", "A private message for me");

你應該看到,發送的事件 socket2 被廣播到 socket1 和 socket2,但不 socket3,他的用戶名是不同的。這樣,如果您向自己發送一條直接消息,無論您使用哪個瀏覽器或設備,它都將廣播到您打開套接字的每個瀏覽器和設備,這對於改善多設備用戶體驗非常有用。

您可能需要比較 myself 與事件echo 和 chat 事件,從第1部分。運行以下代碼並找出這些事件之間的不同之處。

[socket1, socket2, socket3].forEach((socket, i) => {

const log = data => console.log(`socket${i + 1}`, data);

socket.on("echo", log).on("chat", log);

});

斷開處理

到目前為止,我們只處理打開狀態的套接字,但斷開連接是不可避免的。如果任何事件由於斷開而無法發送給用戶,並且在連接恢復時儘管延遲發送,但情況變得複雜。並非所有斷線都是相同的; 它們在斷開和重新連接之間的時間段中變化。通常,臨時斷開比永久斷開更常見,特別是在移動環境中,並且每種情況的用戶體驗都不相同。如果某些事件由於暫時斷開而延遲幾秒鐘後傳遞,則客戶可以將它們視為按時交付,但如果延遲由於永久斷開而延遲幾分鐘或幾小時,則可能會更好發送關於錯過活動的電子郵件。

Cettia將臨時斷開定義為在60秒內重新連接後的斷開連接。它將套接字的生命週期設計為不受臨時斷開影響,並提供事件驅動的方式來處理斷開連接。這是一個服務器端示例,用於發送由於下一個連接斷開連接而導致事件失敗的事件。附加以下導入:

import java.util.Queue;

import java.util.concurrent.ConcurrentLinkedQueue;

以下代碼給套接字處理程序:

Queue queue = new ConcurrentLinkedQueue<>();

socket.oncache(args -> queue.offer(args));

socket.onopen(v -> {

while (socket.state() == ServerSocket.State.OPENED && !queue.isEmpty()) {

Object[] args = queue.poll();

socket.send((String) args[0], args[1], (Action>) args[2], (Action>) args[3]);

}

});

socket.ondelete(v -> queue.forEach(args -> System.out.println(socket + " missed event - name: " + args[0] + ", data: " + args[1])));

請參閱第1部分的套接字生命週期部分,瞭解服務器的socket 事件與套接字的 open 事件之間的區別 ,以及套接字 open 和 delete 事件的發送時間。默認情況下,客戶端重新連接到服務器,延遲間隔由延遲500和比率2(500,1000,2000,4000 ...)的幾何級數確定。

  • 如果send 調用該方法時套接字沒有活動連接 ,cache 則會使用用於調用send 方法的參數數組觸發該 事件 。在這種情況下,您可以決定並收集下次重新連接時發送的事件 queue。

  • 如果 open 事件被觸發,queue 通過一個新的連接逐個發送項目來清除 。即使在 open 事件中,您也應該檢查套接字是否已打開,以免干擾 queue 。

  • 如果 delete 事件被觸發並且 queue 不為空,則必須根據您希望提供的其他事件的用戶體驗與應用程序的其他構建塊一起工作。例如,可以使用數據庫來存儲錯過的事件,並在下次訪問服務時顯示它們。可以使用推送通知系統來通知用戶錯過的事件,並且可以使用SMTP服務器發送錯過事件的摘要電子郵件。

請注意,在編寫套接字操作並將其提交給服務器時,您無需關心給定套接字的狀態。即使套接字沒有連接並且未能發送事件,也可以在cache 處理程序中安全地處理它們 。

模擬暫時斷開的最簡單方法是設置一個 name 選項來打開套接字並刷新網頁。該 name 選項是瀏覽上下文中的一個標識符,以允許套接字name 在下一頁共享相同的 選項,並繼承當前頁面中套接字的生命週期。由於此選項可幫助在頁面導航過程中恢復錯過的事件,因此將實時Web功能添加到多頁面應用程序時非常有用。打開開發人員工具 index.html 並運行以下代碼片段:

var socket1 = cettia.open("http://127.0.0.1:8080/cettia", {name: "main"});

socket1.on("chat", data => console.log("socket1", "message", data.message, "with", Date.now() - data.sentAt, "ms delay"));

刷新網頁,然後 socket1 應該斷開連接。在刷新的頁面上運行以下代碼片段:

var socket2 = cettia.open("http://127.0.0.1:8080/cettia");

socket2.once("open", () => socket2.send("chat", {message: "ㅇㅅㅇ", sentAt: Date.now()}));

socket2.on("chat", data => console.log("socket2", "message", data.message, "with", Date.now() - data.sentAt, "ms delay"));

發送的聊天事件 socket2 無法到達, socket1 因為它沒有活動連接,而是將事件緩存在隊列中 socket1。如果您在刷新的頁面上再次運行第一個代碼段,以便 socket1延長生命週期,則應該看到 socket1 接收緩存的事件。當然,如果您推遲運行第一個代碼片段1分鐘,您將看到 socket1 調度該 delete 事件,因此它的緩存事件在服務器中記錄為錯過的事件。

縮放Cettia應用程序

最後但並非最不重要的是擴展應用程序。如前所述,任何發佈 - 訂閱消息系統都可用於水平擴展Cettia應用程序,並且不需要對現有應用程序進行任何修改。縮放Cettia應用程序的想法非常簡單:

當調用服務器的其中一個finder方法時,它會將此方法調用序列化為消息並將其發佈到群集。

當一個服務器從集群接收到一些消息時,它將反序列化為方法調用並將其應用於其自己的套接字。

無論傳遞給finder方法的套接字動作多麼複雜,只要它是可序列化的,它就可以在其他服務器中的套接字中執行; 你不需要擔心很多序列化。由Sentence 所提供的操作 都是可序列化的,並且您可以使用Java 8的強制表達式簡單地進行操作,就像 (Action & Serializable) socket -> {} 您必須使用普通操作一樣。

在本教程中,我們將使用Hazelcast作為發佈 - 訂閱消息系統。添加以下依賴項:

com.hazelcast

hazelcast

3.9.3

com.hazelcast

hazelcast-client

3.9.3

現在,添加以下導入:

import com.hazelcast.config.Config;

import com.hazelcast.core.HazelcastInstance;

import com.hazelcast.core.ITopic;

import com.hazelcast.instance.HazelcastInstanceFactory;

import io.cettia.ClusteredServer;

import java.util.Map;

將Cettia零件的第一行替換為 Server server = new DefaultServer();以下行:

ClusteredServer server = new ClusteredServer();

ClusteredServer 類有兩種方法:

  1. onpublish(Action> action) - 服務器攔截並序列化對包裝服務器的finder方法調用,並將它們傳遞給參數操作。該操作應該將其發佈到群集。

  2. messageAction() - 此操作將已發佈的消息進行反序列化,並調用已包裝的服務器的查找程序方法。應該在從群集到達時用消息調用它。

為了給你一個想法server.onpublish(message -> server.messageAction().on(message));, ClusteredServer 將會和你的想法 完全一樣 DefaultServer。將下面的代碼添加到 CettiaConfigListener#contextInitialized 方法中:

// Hazelcast part

HazelcastInstance hazelcast = HazelcastInstanceFactory.newHazelcastInstance(new Config());

ITopic> topic = hazelcast.getTopic("cettia");

// It publishes messages given by the server

server.onpublish(message -> topic.publish(message));

// It relays published messages to the server

topic.addMessageListener(message -> server.messageAction().on(message.getMessageObject()));

現在,如果應用程序server.all 使用動作調用 ,則傳遞的動作將被序列化並廣播到群集中的所有服務器,並由群集中的每個服務器反序列化並執行。讓我們在端口8080上重新啟動服務器,打開一個新的shell,並通過運行在端口8090上啟動另一臺服務器 mvn jetty:run -Djetty.port=8090。然後您會看到8080和8090上的Hazelcast節點形成群集。

要測試實現,請在每個端口的一個選項卡中打開2個套接字,或者在每個瀏覽器中打開2個瀏覽器和一個套接字:

var socket1 = cettia.open("http://127.0.0.1:8080/cettia");

socket1.on("chat", data => console.log("socket1", data));

var socket2 = cettia.open("http://127.0.0.1:8090/cettia");

socket2.on("chat", data => console.log("socket2", data));

所有套接字打開後,選擇其中一個併發送 chat 事件:

socket1.send("chat", "Greetings from 8080");

正如您所看到的,chat 從8080上連接到服務器的客戶端發送的 事件會傳播到連接到8090和8080上的服務器的客戶端。

至於部署,畢竟它只是一個Web應用程序,所以您可以像往常一樣部署應用程序並配置環境。請記住,您應該啟用“粘性會話”來部署集群式Cettia應用程序。需要管理由多個傳輸組成的套接字生命週期,並啟用由多個HTTP請求響應交換組成的HTTP傳輸。

結論

Cettia 是一個功能齊全的實時Web應用程序框架,可用於實時在服務器和客戶端之間交換事件。在分離關注原則之後,框架分為三層:一個I / O框架不可知層,用於在JVM上的任何I / O框架上運行Cettia應用程序; 傳輸層以提供可靠的全雙工消息信道; 和一個套接字層來提供優雅的模式,以在實時網絡中實現更好的用戶體驗。這種多層體系結構只允許專注於應用級實時事件處理,並且在技術堆棧上有更大的選擇自由度。

在本教程中,我們已經瞭解了Cettia團隊在Cettia中做出的關鍵設計決策背後的原因,以及構建實時導向應用程序所需的各種模式和功能,而不會影響Cettia,因此,我們已經構建了入門套件。


分享到:


相關文章: