CQRS:是什麼? 為什麼? 怎麼做?

CQRS是推理特定域活動的有用模式。 但這肯定帶有陡峭的學習曲線。 讀取,寫入,DDD,事件源,最終一致性,我們為什麼要關心?

這是我在自己空間上的原始文章的交叉發佈:CQRS:為什麼? 以及所有需要考慮的事情。 隨意看看並查看我的其他文章。

CQRS:是什麼? 為什麼? 怎麼做?

Photo by Franck V. on Unsplash

用例

我正在研究一個新項目,該項目旨在替換會產生後果並在整個公司中引入耦合的大型(太大)整體。

為了適應不斷變化的業務需求,縮短產品上市時間,改善團隊之間的可追溯性和溝通,我們決定使用CQRS異步體系結構(但不提供事件源功能,稍後再介紹)。

長話短說,將所有人"轉換"為新的思維方式並強加於人並非易事。 人們認為必須重新質疑,挑戰和產生新的約束(例如最終的一致性,或者知道我們是否應該擁有某些數據)會受到質疑。 我曾經是這個傢伙,總是說"你做不到",提醒了為什麼。

CQRS(與DDD結合使用)迫使我們施加強大的約束,並具有我們大多數人以前沒有的限制(使用經典後端+ DB)。 這並不總是一件壞事:它揭示了一切。 我們對域本身瞭解很多,總是質疑域的分離和責任。 它使我們瞭解業務。

在這裡,我將大致討論CQRS及其與DDD和事件源的鏈接。 所有這些都始於一個用例,在該用例中,我們需要向其他服務公開最終不一致(也就是高度一致)的數據。 CQRS不一定要最終保持一致,但在我們的情況下,它是(因為異步且高度可用)。 我看到有些人皺著眉頭,因為我們應該接受最終的一致性,這是生活(和分佈式系統)的一部分。 同意 但..



我們為什麼需要它?

CQRS(命令查詢響應隔離)是非典型的年前,但是如今,我們可以找到很多有關它的內容,許多公司正在使用它並進行溝通。 我們甚至可以在不知道自己做的情況下進行CQRS!

模式很簡單:

CQRS:是什麼? 為什麼? 怎麼做?

CQRS並不意味著實際上可以使用微服務,消息傳遞基礎結構或事件,也不能進行DDD。 出於某種原因,它經常與這些技術一起使用。

一個寫模型,多個讀模型

CQRS的想法是使一個應用程序(廣義上)能夠與不同的模型一起工作:

· 它使用的一種內部模型:寫模型,由Commands修改(稍後會詳細介紹)

· 它和其他應用讀取的一個或幾個讀取模型(我們不能讓其他人讀取寫入模型)

讀取模型可以由前端或API讀取,沒關係。

例如:如果我們有一個在某些資源上採用POST / PUT的API,將狀態保存到PostgreSQL,並有一個將其同步到Elasticsearch上以在我們的網頁上進行智能查找的過程,那麼我們幾乎在做CQRS(我們可能還會公開我們的 PostgreSQL ..)。

可擴展性

CQRS允許我們獨立擴展系統:我們通常必須處理更多的讀取而不是寫入的操作,因此具有不同的可伸縮性。

我們希望在O(1)中訪問讀取:我們不希望它們花更多的時間來獲取實體,因為它具有與其他實體的更多鏈接:我們想避免爆炸性的JOIN。 解決方案是在任何人請求之前,在數據更改時預先計算結果。 這樣做還會在請求發生時使用更少的資源,從而減少延遲並使其在p999上可預測且穩定。 我們在爭取空間。 稍後再詳細介紹。

當我們實現Redis緩存以避免主數據庫過載(進行寫操作)時,這就是CQRS的精神。

業務需求

將寫入模型與讀取模型分開可以幫助我們分離域的複雜方面(誰需要什麼,誰負責什麼),並增加解決方案的靈活性。 我們可以更簡單地適應不斷變化的業務需求。

這是因為我們更多地考慮了責任:誰在變異數據? 用例是什麼? 我們應該真正擁有這些數據嗎? 我們會採取行動嗎? 這不是其他應用程序的責任嗎? 誰只需要讀取數據? 應該是高度一致的嗎? 由於這種思維方式,CQRS通常與DDD相關聯。

併發

從技術上講,CQRS還可以通過顯示併發和鎖定(事務數據庫)來簡化它們。

當我們使用異步CQRS模式工作時,我們經常談論數據的最終一致性,數據生命週期,數據所有權,業務需求以及許多關於建模的問題:實體的事務邊界以及我們應始終擁有的不變式。 同樣,這就是為什麼CQRS通常是面向DDD的原因:必須非常仔細地定義數據形式的集合。 (稍後會詳細介紹)

沒有"讀自己寫"的語義

必須明確處理陳舊數據。 如果我們更改資源(通過發送命令)並立即讀取相同的資源,我們將看不到更改。 異步CQRS不提供"讀取自己的寫入"語義。

前端可以通過進行樂觀併發來模擬它:它可以嵌入一些知識,並假設它要求的突變會很好,因此它在獲得真正的突變之前會顯示它認為將是答案的東西。 如果有差異,它會適應。

當它是同步CQRS時,我們可以具有以下語義:我們將這兩個模型在同一事務中寫入不同的表中。 它始終是同步的。 CQRS很少是同步的,因為我們想使用不同的資源,數據庫類型,使用消息傳遞基礎結構來進行工作/擴展:我們很少能夠使分佈式事務跨資源(我們可以談論XA或Sagas,但是……現在不行!)。

CQRS是一種模式,嚴格將處理命令(變異)的職責與處理無副作用查詢/讀取訪問的職責區分開來。

這不是萬能的解決方案

我們僅應在某些情況下考慮實施CQRS:

· 我們有很多不斷變化的業務需求

· 公司不知道確切的去向

· 我們有可擴展性問題

· 我們與其他團隊合作(即:其他受限環境)

· 多種服務競爭改變相同的資源

· 我們正在安排我們周圍的其他服務

· 我們域中發生的事情會影響他們,反之亦然

· 我們的領域是面向寫入的,我們不會讀取您自己的數據,其他應用程序會

對於單獨工作的簡單API或明確定義範圍的CQRS的開銷可能是巨大且不必要的。

即使我們要實現CQRS架構,團隊(開發人員和業務人員)也將需要繞過對變更的恐懼,遵循學習曲線,彎曲思路,然後適應過去的工作方式。

AKF規模立方

如果我們遵循Write + Reads邏輯,則意味著我們將以某種方式在系統中複製數據。 "相同"數據將出現在寫入模型和讀取模型下。

我遇到了一些真的很害怕重複數據的人,或者覺得這是一種反模式。 數據只有一個主機,它應該是控制誰有權訪問數據的唯一主機,沒有人可以在自己的數據庫或消息傳遞基礎結構中複製它。 您必須始終調用我的API! 瓦德復古!

是的,有時可能需要始終調用API而不復制數據(例如GDPR下的用戶數據)。 但是它很少是強制性的,它本身也可以成為反模式,因為它導致更多的複雜性,更多的依賴關係,不良的性能,服務中斷,SLA減少。

我喜歡AKF規模多維數據集,以瞭解為什麼組織中的重複是橫向的:

CQRS:是什麼? 為什麼? 怎麼做?

簡而言之,三軸:

· X:低級技術:我們應該複製,複製,緩存,負載均衡數據

· Y:組織:我們應該擁有獨立的服務來處理不同的領域:它們負責自己的數據(按DDD表示)

· Z:高級分片:我們應該根據用例將相似的內容(按resource_id,按地理位置)細分為自己的基礎結構

命令/書寫:副作用

因此,導致對寫模型進行寫操作的事物稱為命令。 這是設計我們始終使用的工具的通用術語:改變系統狀態的工具(任何形式的更新)。

· 命令可以被同步或異步處理。

· 命令是否可以通過消息總線。

· 命令可以是簡單的API調用。

· 如果不執行OOP,則Command可以是超類。

· 命令可以是簡單的函數調用。

所有這些概念都與命令是正交的。 我們命令某些狀態以某種方式改變。 命令是一種意圖(不是事實),並且會導致副作用(對資源)。 它指向特定的目的地(不廣播),並根據消費者域(共域)進行定義。

通常定義為VerbSomething:CreateOrder,ShipProduct,ComputePrice,GiveMoney。 稍後我們將看到定義為OrderCreated的事件的對稱性(同樣,事件不在CQRS範圍內,但它們運行得很好)。

命令以行為為中心,而不是以數據為中心。 它是關於更改內容的意圖,它並不映射到資源的格式(例如API中的DTO)。 它可以包含有助於處理意圖的數據,僅此而已。

我們還將討論基於任務的系統,而不是基於資源的系統。 寫入部分不接受新資源或現有資源的補丁:它們接受任務(也稱為命令)。 它可能已經被命名為TQRS :-)。

處理命令的流程始終相同:

· 如果是異步的,它將保存到消息傳遞基礎結構中,並向呼叫者返回"確定"(不能做更多的事情)

· 在處理它時-同步還是異步-根據其形式(API,消息,函數調用),調用正確的命令處理程序(函數)

· 該處理程序必須確定是否有可能對其進行處理:

· ➡如果必須採取行動(從數據庫或使用事件源),它將檢索當前狀態

· ➡它使用一些業務規則來知道是否可以在狀態上授予或拒絕Command

· 如果被授予,它將在狀態上應用命令(可以生成或不生成事件)

· ➡如果未授予,則返回錯誤(同步或異步)

· 保存新狀態

· 如果同步,它將返回OK到調用者-或最少的信息,例如ID,但不返回整個狀態(即寫模型)

· ➡如果異步,則將消息提交到消息傳遞基礎結構。

CQRS:是什麼? 為什麼? 怎麼做?

指揮部應遵循一勞永逸的原則:除"確定"外,我們不應期望指揮部有任何結果。 命令只是一個要求。 處理可以是同步的或異步的,我們不應該儘快獲得結果。 結果將稍後來自讀取的部分。

命令處理程序必須包含業務邏輯,以便能夠立即拒絕或不拒絕命令。 例如,要處理

<code>class OrderCommandHandler {
...
fun cancelOrder(cmd: CancelOrderCommand) {
val order = repo.fetch(cmd.orderId) // the only place where we can fetch the write model
if (order.inTransit()) error("Too late, we can't cancel an order in transit")
// ...
val newState = order.cancel()
repo.persist(newState) // OR persist only events, OR both atomically (Outbox Pattern)
}/<code>

關於命令還有很多要了解的內容,在這裡我將不做介紹,因為這不是本文的重點。

· 命令處理應該是冪等的

· 命令不能使用讀取端來獲取數據

· 僅應出於技術原因重試命令,而不得出於業務原因重播命令:結果可能會有所不同(例如:此後更改了增值稅)

· 可以將命令保存到命令總線中以進行進一步處理

命令是關於管理系統中的副作用的。

查詢/讀取/服務層:無副作用

如前所述,我們可以從相同的原始數據構建不同的讀取模型。

當我們需要查詢數據來回答不同的用例時,我們通常都會這樣做。 我們在某些地方(PostgreSQL,MongoDB,Cassandra,S3…)有我們的真理源,在這裡我們編寫/更新我們的東西,但是我們想使用專門的數據庫來不同地解釋它們:

· 緩存數據

· 快速文本搜索

· 使用圖形語言查詢

· 處理時間序列數據

· 在一組維度內預先計算聚合

· 使用實時數據庫(例如Google Cloud Firestore)向客戶端發送實時更新

非規範化/合併

通常的做法是將我們的數據從關係數據庫存儲到Elasticsearch中,以享受快速搜索,自動完成,令牌生成器,令牌過濾器,評分,排名,穩定的延遲,而原始數據庫通常無法獲得這些功能。

如果我們在廣告領域工作,並且想要有關廣告商和發佈者的實時儀表板,我們希望在時間序列數據庫中預先彙總數據,因為我們關心按時間段和亞秒級響應進行彙總。 因此,我們將數據存儲到Apache Druid或ClickHouse中,它們將在其中預聚合傳入的數據(內部使用CQRS方法,啊!)。

我們正在對原始模型進行非規範化處理以適合其他模型:

· 以不同的方式顯示數據(SQL到NoSQL)

· 到外部模型(公共),其他應用程序將使用(我們要隱藏實現)

· 不需要所有數據的"更輕便"模型

· 到"較重"的模型中,我們將添加許多我們不擁有的數據

最後一點很重要。 當我們更新讀取數據庫(由讀取服務提供)時,通常會合並數據。 我們可以聯接我們擁有的其他表或查詢其他應用程序以獲取我們不擁有/無法計算的數據(客戶名稱,產品說明,增值稅等)。 我們不想在查詢時執行此操作:我們想要可預測的穩定延遲,因此需要訪問O(1)。

這種合併/轉換是在我們需要更新數據庫時(發生更改時)完成的,而不是在查詢時進行的。 它的意思是:

· 我們可以更新永遠不會讀取的實體

· 我們可以更新將讀取一百萬次的實體

· 我喜歡的一個縮寫是WORM:多次寫入。 這是使用CQRS的最佳選擇。

· 我們必須有一種方法來檢測原始數據中的更改以觸發讀取數據庫上的更新。 這是事件有用的地方。

此外,Reads數據庫可能會令人恐懼,因為:

· 它可以包含多個域的數據

· 這意味著Reads Services應該由消費團隊而非生產團隊管理。 稍後再詳細介紹。

· 可能會很大

· 可以在某種程度上從零開始銷燬和重建它:假定其他服務具有冪等性。

· ➡如果沒有,重建可能會與以前有所不同。

· 我們可以升級我們的Writes服務,而不會同時關閉Reads Services(獨立的生命週期)。

CQRS:是什麼? 為什麼? 怎麼做?

傳統堆棧和新系統

在製作新產品時,通常希望將其集成到現有堆棧中(如果有的話)(例如,舊式堆棧)。

舊版不希望(或根本無法)適應我們的新模式或交流方式。 它只希望經典API重用它瞭解的現有格式來獲取數據。 這是專用閱讀服務("傳統閱讀服務")的理想用例。 它可能需要不同的服務集來構建遺留視圖,並需要不同的業務規則來匹配遺留模型。

傳統模型傾向於具有更多數據(由於採用整體方法),可以是跨域的(現代堆棧由於DDD方法而被拆分),並且可能繼承了更舊的堆棧約束。 大規模發佈是不可行的,因此我們必須確保服務的連續性和重疊性。

CQRS:是什麼? 為什麼? 怎麼做?

沒有副作用

具有多個數據庫強加了一個約束:"讀取服務"必須僅處理讀取,而不處理寫入。 它們不是真理的源頭。

它們和Writes數據庫之間沒有同步。 讀取數據庫不能在寫入數據庫上產生副作用。 由於非規範化,甚至不可能找回原始記錄(如果我們不保留PK之類的所有必要信息,因為我們不需要它們)

當我們查詢API(GET或某些GraphQL查詢)時:我們不希望對數據進行突變,這是相同的。

最終一致性

如前所述,使用Reads Service的用戶可以處理過時的數據。

當我們發送要處理的命令時,它的處理可以異步完成(無結果,即焚即忘),並且讀取數據庫的更新將是異步的(除非它是同一數據庫,但很少見?)。

例:

· T0:我們發送有關訂單1234(v1)的AddProductToOrderCommand。

· T1:Writes Service更新1234號訂單(v2)的項目。

· T2:通知Reads Service,並查詢產品服務以將其視圖與名稱和描述合併(進行中)。

· T3:還收到通知的外部電子郵件服務,通過查詢Reads Service(v1)來請求訂單的詳細信息。

· T4:產品服務響應讀取服務,該服務最終可以更新其讀取數據庫(v2)。

在T3,儘管先前的更新發生在T2之前(絕對時間),但Reads Service發送了Order(v1)的舊版本,因為它不是最新的。 我們不知道系統收斂需要多長時間。 這就是為什麼CQRS沒有"讀自己寫"的語義的原因,也是為什麼當外部系統與我們交談時我們必須始終考慮最終的一致性。

我們剛介紹的版本在該系統中必須具有。 這表示一個邏輯時間,一個單調的計數器,可以幫助我們瞭解流程的進展情況並制定決策。 它是Writes模型的一部分,並傳播到整個系統(如果有事件,則讀取模型…)。

版本不是我們在此類系統中可以找到的唯一人工製品。 我們還可以使用向量時鐘和CRDTs結構來處理事件排序因果關係。 這些通常是外部系統不想處理的事情,這是我們內部的混亂,因此需要一個獨特的讀取模型來消除噪聲。

有一些技術可以幫助外部系統獲取他們期望的版本,稍後再進行介紹。

如果我們希望在讀寫數據庫之間保持高度一致性,則需要對它們進行原子更新(由於所使用的異構系統,這種情況很少出現)。 即使這樣,如果這樣做,我們將失去依靠不同系統提供的可用性(CAP定理,我們可能是處於故障狀態的CP或AP,而不是同時處於一致狀態和可用狀態)。

通過將兩個系統清楚地分開,讀取服務獨立於寫入服務工作:不同的生命週期,不同的部署,不同的約束,不同的SLA。

一套不同的功能

讀取服務可以提供比寫入服務更多的功能。 雙方的參與者和用例都不相同。 這些功能可以是:

· 認證方式

· 快取

· 加密

· 限速

· 保留期有限

· 彈性縮放

· 複製和LB

· 分片

· SSE或Websocket用於實時更新(推送)

當我們具有用於讀寫的相同數據庫時,我們會將不同的關注點混在一起。

· 假設我們有一個電子商務網站,它使用一項閱讀服務向客戶顯示購物車和訂單。 通信量大(面向客戶),讀取服務被複制或分片,依靠緩存,進行網絡套接字。

· 另一方面,我們公司的員工擁有自己的管理應用程序,他們需要查看和更新訂單以準備訂單,打包訂單,更新交貨狀態等。他們不需要花哨的功能,流量 不重,過程很慢。 他們只是需要99.999%的正常運行時間,因為這是他們工作的一部分。

兩種Reads Service都有不同的SLA(一個可以"降級",而另一個不可以),功能和模型(admin Reads Service將提供更多內部詳細信息,對客戶Reads Service而言是隱藏的)。

如何建立讀服務

作為開發人員,我們喜歡測試新技術,新數據庫,新系統。 拆分我們的寫作方式和閱讀方式的奇妙想法為我們提供了實現這種愛所需要的東西。

創建新的Reads Service(具有其獨特的Reads Database)並在現有流程中進行並行測試,沒有比這容易的了。 如果我們處理事件,則只需訂閱事件並構建新模型。 如果我們沒有任何事件,那麼我們可能有一個來自數據庫的同步程序,我們可以更改或複製它以寫入新系統。

而且,我們可以有多種語言。 每個閱讀服務都可以用任何語言編寫,以更準確地回答讀者使用的用例和技術。

這是構建此類服務的技術列表:

雙重寫入

· 在代碼中,當我們使用(可愛的)ORM(或不使用)寫入數據庫X時,我們添加代碼或抽象以也寫入Y

· 我們必須使用交易或適當的交易來確保它們之間的原子性/一致性

數據庫同步

· 每N分鐘複製和轉換數據的一批

· 一種"幾乎實時"的後臺服務,每N分鐘輪詢一次更改

更改數據捕獲(CDC)

· 它將數據庫更改導出到事件流中

· 它依靠複製日誌充當數據庫發出的事件; 然後我們可以對它們進行一些流處理

· 這是Kafka Connect與Debezium(PostgreSQL,MySQL)的絕佳用法之一

大事記

· 我們訂閱現有事件(在原始狀態更改時發佈),並根據它們建立我們的讀取狀態(我們可以從過去重放它們)到任何數據庫

· 我們還可以使用Kafka Streams及其交互式查詢來創建分佈式自動分片數據庫。

隱藏Lambda架構

· 與批處理視圖合併的實時視圖

最好的方法是依靠pub/sub系統的事件。 從本質上講,它們已經使發佈者與消費者脫鉤。 這是CQRS通常與事件相關聯的另一個原因。

DDD在哪裡?

到目前為止,我們幾乎沒有談論過DDD,它經常與CQRS關聯(但不是強制性的)。 為什麼?

我們說我們有一些命令,這些命令的處理程序會檢查業務需求,如果可以,請更新"狀態"並將其保存到Writes數據庫中。

在DDD中,此狀態包含在我們所謂的聚合中。 它是一棵實體樹(具有實體根),該樹是獨立的,自治的,並且從外部和業務的角度來看始終保持一致。

CQRS:是什麼? 為什麼? 怎麼做?

集合必須確保其實體內的交易邊界:從外部看,我們永遠看不到集合"半形成","半變換","半有效"。 我們無法直接訪問子實體:所有內容都只能引用集合(以控制一致性)。 始終遵守業務不變性規則(例如:"如果尚未付款,則訂單不可轉讓"(好的,這要取決於我們的想法了!))。

在DDD中,實體具有特殊含義。 這是一個由其唯一標識而不是由其屬性(例如User(userId))定義的對象。 Color(r,g,b)不是實體,其屬性定義了它,它是一個Value Object,它是不可變的,因此可以共享。

聚合通常是OOP中具有所有與業務相關的代碼的不可變且具有副作用的免費類(與DTO相反,DTO通常是貧乏的,即:沒有業務邏輯)。 它不依賴於任何序列化框架(沒有糟糕的JSON註釋,沒有Avro生成的類..),它沒有任何註釋等。

這是企業應該理解的簡單,可重複使用的代碼,因為它使用的是他們的話:這是無所不在的語言。 使用相同的語言可以減少混淆和誤解,因為我們不會在"技術"和"業務"方面進行隱式(或寬鬆地)翻譯。

<code>// our aggregate with a private constructor
case class Order private(id: OrderId, items: List[LineItem], state: OrderStatus) {
def addItem(item: LineItem): Order = ???
def removeItem(item: LineItem): Order = ???
def canBeCancelled(): Boolean = ???
def startProcessing(): Order = ???
def addCoupon(): Order = ???
def cancel(): Order = ???
// Optional: events generated by the previous methods to be persisted separately
private val events: List[OrderEvents] = List()
}
object Order {
// the public constructor ("factory method")
def apply(): Order = Order(randomOrderId(), List(), OrderStatus.CREATED)
}/<code>


聚合由存儲庫(具有自己的持久化方法,已隱藏)檢索並持久化。 聚合不代表基礎數據庫或序列化格式的確切映射。 它是面向業務的,而不是面向技術的。

<code>class OrderRepository {
def fetch(id: OrderId): IO[Option[Order]] = ???
def save(order: Order): IO[Unit] = ???
}/<code>


回到命令,我們在處理聚合時真正發生了什麼:

· 代碼將其定向到命令處理程序以對其進行處理

· CH從存儲庫中獲取聚合(在命令中引用)

· Command命令可以與現有資源有關,也可以請求創建一個資源,例如:CreateOrderCommand

· CH會檢查一些業務規則,以查看命令是否符合業務規則

· 是的,它在聚合上調用必要的函數。

· these這些函數中的每一個都返回聚合的新狀態。

· 聚合的結束狀態將保留在Writes數據庫中(或將事件存儲到事件存儲中)。

在複雜的域中,DDD使我們能夠以商業方式構造命令和事件,每個理解該域的人都可以理解。 DDD有助於發現界限,限制責任並使系統的不同部分可維護,因為它們具有"合理性"。 這樣的系統並非僅憑開發人員的有機發展。

查找命令和聚合以及更多

查找集合及其命令(以及事件等)的一種流行做法是與開發人員,業務人員和專家進行事件風暴研討會。

通過查找我們的域必須處理的所有可能事件,我們可以將它們重新分組並形成包含相關事件的聚合。 由此,我們形成了具有凝聚力的子域(事物屬於一起),我們形成了實體,集合並同意了泛在語言。

CQRS:是什麼? 為什麼? 怎麼做?

另一種技術是域故事講述。 我們想到了一個用戶場景。 我們繪製它來顯示流程中的人工製品和人員(從何處,何人,往何處去,由誰,何處,誰做出反應,誰發送東西等進行驗證)。 有4個項目:演員,工作項目(文檔,交流),活動(動詞),(評論)。

CQRS:是什麼? 為什麼? 怎麼做?

如果您感到好奇,也可以檢查業務模型畫布。

大事記

使用CQRS廣播事件不是強制性的。 這是一種自然的工作方式。 更新聚合時會發出事件。 將它們廣播給"誰想聽"(其他人和自己)。 使系統具有反應性和獨立性非常有用。

這是事件列表:

<code>case class OrderCreated(orderId: OrderId, items: List[LineItem], customer: CustomerId)
case class OrderLineAdded(orderId: OrderId, item: LineItem)
case class OrderCancelled(orderId: OrderId, reason: Option[String])
.../<code>


現在,我們可以看看依賴於CQRS,聚合,事件的更完整的圖片:

CQRS:是什麼? 為什麼? 怎麼做?

Raison d'être

當我們發生事件時,他們通常會成為一等公民。 這意味著一切都圍繞它們構建:業務邏輯,依賴關係("在執行Z之前我們需要X和Y")。

作為命令,事件是一個通用術語,它不定義其實現,而是更多其行為和起源。

事件是過去的事實。 它本質上是一成不變的(我們不能改變過去的權利嗎?)。 與具有固定目的地的命令不同,它們可以包含其起源,是有目的的,而事件則相反:事件只是以一種"忘卻"的方式向世界廣播,是無目的的,並且可以是匿名的。 因為我們不知道誰在聽,所以我們無法對誰需要它進行硬編碼(我們僅依靠傳輸事件的技術總線):這減少了系統之間的耦合。 我們可以創建新的監聽系統,而無需發出系統來了解和關心。

CQRS:是什麼? 為什麼? 怎麼做?

這就是像Kafka這樣的發佈/訂閱系統有用的地方,因為它們是這種中間總線,可以保持事件,處理可以動態添加或刪除的分發給消費者的東西等等。

重要的是,事件在生產者域中定義。 在使用者域中定義了一個命令(或更通用的"消息")(我們說目標的語言,給它一個"命令")。

投影事件

事件可以由任何事物創建。 在執行CQRS和DDD時,它們主要由聚合創建。 更改彙總後,它將發出一個或多個與更改相對應的事件。 事件還可以由諸如調度程序之類的外部事物產生,這取決於時間:"在小時結束時發送此事件X"。

在這裡,我們可以選擇是否進行事件採購。

· 我們可以決定獨立於聚合事件的生成來更新其狀態:

<code>def increment(state: State): (State, Event) = {
val newState = state + 1
(newState, NumberWasIncremented(1))
}
val (newState, event) = increment(5)/<code>


· 我們可以決定通過"播放"當前狀態的事件來創建聚合的新狀態:

<code>def increment(): Event = NumberWasIncremented(1)
def apply(state: State, e: Event): State = e match {
case NumberWasIncremented(x) => state + x
}
val newState = apply(5, increment())/<code>

Event-Sourcing方式可能看起來過度設計。 我們還需要一個功能應用和模式匹配。 我們已經將事件創建的邏輯與事件應用程序分開了。 這樣做比較容易。 申請活動時,我們不必知道活動的來源。

一個事件是輕量級的,它僅與變化有關。 通過重播其生命中的所有事件,我們可以從頭開始創建聚合。 我們對發生的事情有更多的瞭解,我們不僅保留最新狀態,也不保留更改之間的快照。 我們自己保留更改。 狀態是一個彙總,是事件的投影。 我們經常具有一次重播所有內容的功能,有時稱為計算器,reduce,replayer:

<code>def applyAll(initialState: State, events: List[Event]): State = {
events.foldLeft(initialState) { case (s, e) => apply(s, e) }
}/<code>


進行事件採購的優勢之一就是我們還可以以不同的方式重放事件:這將形成不同的狀態。 當我們的模型經常變化時,這是完美的:事件沒有變化,但我們對它們的解釋有所不同。 當我們的模型因業務模式不斷變化而動態變化時,這很有用。 諸如Kafka之類的技術使消費者可以重播過去的事件(以重建其狀態/聚合)並趕上當前的事件(通過重置偏移量)。

CQRS:是什麼? 為什麼? 怎麼做?

與CQRS和DDD一起使用時,我們傾向於發出事件,因為它們代表了業務現實:業務中發生了某些事情! 過去不會改變,因為我們想添加新功能。 我們只能解釋過去。

我們可能有現有的用例來處理事件,但是將來我們可能會發現新的用例。 這就是為什麼我們不想丟棄任何東西,而我們希望存儲所有事件的原因:以後以未知的新方式(出於可追溯性和更多原因)處理它們。 事件是黃金。 不要失去他們。

事件並不意味著事件源

不是因為我們要處理事件,而是必須進行事件源。 如果我們不從事件中構建彙總:我們不是從事件中進行採購,因此不是事件中的採購。 QED。

(或沒有)事件來源的邏輯隱藏在AggregateRepository的實現中。 這只是獲取集合的一種特定方式。

當我們進行事件採購時,我們不是直接從數據庫中獲取最新狀態,而是要求事件存儲獲取有關特定aggregationId的所有事件(因此,對於事件存儲,Kafka不是一個好的選擇,但是Kafka Streams可能是 ,以及"互動式查詢"),我們會重播它們以建立最新狀態。 這也意味著我們必須將所有事件保留在歷史記錄中以進行事件搜索。

有一些策略可以每X個事件對狀態進行快照,從而避免在大量更新聚合時每次重播10000個事件。 但這是另一篇文章。

事件源是CQRS的正交概念。 我們可以一無所有。

智能端點和啞管

馬丁·福勒(Martin Fowler)介紹了"智能端點,啞管"的概念。 簡而言之:不要在傳輸機制中加入業務邏輯。 我們必須控制我們的業務邏輯。 我們不希望某些管道成為瓶頸(因為ESB成為大公司)。 運輸機制應該保持愚蠢:它在技術方面就在那裡,僅此而已。

這讓我想起了ReactJS,我們在這裡談論:

-愚蠢的組件:純淨,呈現一段UI,無邏輯(按鈕,日曆,菜單)-智能組件:處理數據,具有狀態和一些業務邏輯。

智能組件依賴於啞組件。 我們可以更改智能組件的實現方式,只需更改其使用的啞巴組件即可:我們不會更改其邏輯。

發佈或使用的系統很聰明:它們處理映射,域轉換等(使用反腐敗層,當不同的有界上下文相互交談時使用DDD概念)。 它具有更高的可擴展性,通常更易於維護,因為每個人都有自己的邏輯並提出要求。 自己動手

讓某些州居住在ESB中也很常見,因為我們想根據過去或某些動態情況來更改行為。 智能終端將創建並維護自己的私有狀態。 這不是"管道"的作用。

格式和語義

我們知道為什麼發出事件很好,但是應該遵循什麼格式? 有標準嗎? 我們應該存入他們什麼? 那元數據呢? 如果我不進行事件採購但想生成事件該怎麼辦?有什麼區別? 事件是一個簡單的詞,但具有巨大的語義。

沒有什麼是"官方的",但是我們可以依靠或從中獲得啟發:

· 這提供了與JSON-LD兼容的基於JSON的語法。

· 語境

· 一種活動

· 演員

· 一個東西

· 潛在目標

· CloudEvents更有用,更完整,它提供了不同語言,不同序列化格式(Json,Avro,Protobuf等)的規範和SDK,並且可能最終落入CNCF(雲原生計算基金會)。 它是面向無服務器的,但是事件是事件吧?

<code>{
// CloudEvents metadata data goes here
"eventType": "order.line.added",
"eventID": "C1234-1234-1234",
"eventTime": "2019-08-08T14:48:09.769Z",
"eventTypeVersion": "1.0",
"source": "/orders",
"extensions": {},
"contentType": "application/json",
"cloudEventsVersion": "0.1",
// Custom event data goes here
"data": {
"orderId": "abc",
"item": { "reference": "prod123", "quantity": 12 }
}
}/<code>


關於語義,Mathias Verraes(DDD專家)在列出並解釋事件可以擁有的不同語義方面做得很好。 以下是簡短摘錄:

· 摘要事件:將多個事件合併為一個(將多個項目添加到項目收藏夾中)

· 時間流逝事件:調度程序發出事件而不是命令DayHasPassed

· 隔離事件:將事件從一個域轉換為另一個域

· 節流事件:每個時間單位僅發出最新事件

· 變更檢測事件:僅當事件流中的值更改時,才產生新事件

· 胖事件:向事件添加冗餘信息以減少使用者的複雜性

· …

注意最後一個事件:"胖事件"。 我們將很快使用此策略將聚合的完整狀態嵌入事件本身。

內部VS外部事件

有關語義和DDD的更多信息:事件可以是內部的或外部的。

· 內部事件是由我們的域(受限制的上下文)產生和使用的,它是私有的。

· 外部事件將由我們無法控制的其他域(公開)使用。

· ➡我們自己的應用程序將使用來自其他方的外部事件。

· ➡外部事件具有使人們知道如何使用它們的架構。 它被共享到元數據服務或某些註冊表(或Excel ..)中。

兩種類型都有不同的目標,因此約束和用法也不同。

內部事件傾向於規範化,並且主要包含引用(ID)。 它與內部模型一起使用。 我們不想添加無用的信息或重複我們系統其他部分已經擁有的信息。 參考文獻越多,對未來的發展越好。 我們將能夠重構模型而無需更改事件。

與外部模型一起使用的外部事件是供公眾使用的。 它們經常被非規範化(包含名稱,地址,沒有版本,更簡單),以避免消費者理解依賴關係。

將內部事件暴露給外部會導致複雜的體系結構,困難的開發,缺乏可見性並模糊域邊界。 如果我們這樣做,卻沒有注意到,就將內部事件轉換為外部事件。 因此,這也將我們的內部模型轉換為外部模型。 我們正在與其他服務之間引入強大的耦合。

這就是本文的重點:為什麼以及如何保護我們免受此侵害。

從內部到外部

一種解決方案是進行一些流處理,以轉換和合並事件(通過調用其他服務)以使事件成為外部事件。 既包含胖事件又包含隔離事件層。 這是一個從私有(有點複雜,看到明顯的區別)到公共轉換的示例:

CQRS:是什麼? 為什麼? 怎麼做?

事件進行狀態轉移

有時,出於性能和簡化的目的,我們希望將聚合的狀態與更新時生成的自身事件一起包括在內。

確切知道要放入事件中的內容可能很困難。 如果我們忘記了某些東西,或者對於消費者(內部或外部)來說不夠用,我們可以引入網絡/請求開銷,並使他們的工作複雜化。

如果事件是普通事件/裸體事件(只是發生了什麼變化,沒有最新狀態),則他們可能需要更多信息,並且需要根據過去的所有事件建立自己的狀態(即:充當讀取服務;它可以是KTable (如果使用Kafka Streams)或請求現有的Reads Service進行工作。

這給消費者帶來了一些問題:

· 他們不想做有狀態的並複製一個已經存在的狀態

· 如果他們只關心一種類型的事件(例如OrderShipped發送電子郵件),則他們不希望監聽所有事件來建立狀態(OrderCreated,OrderItem Transactions,OrderShipped等)來構建狀態,他們需要 訂單,但OrderShipped只能具有orderId)

· 他們想查詢一些讀取服務以獲取缺少的內容

· ➡這會引入最終一致性:他們可能無法接受。

CQRS:是什麼? 為什麼? 怎麼做?

因此,一種解決方案是將狀態嵌入事件本身。 這是事件進行的狀態轉移。

想象3個簡單事件:

<code>{ type: "OrderCreated", orderId: 1337, customer: "john" }
{ type: "OrderLineAdded", orderId: 1337, item: { ref: "prodA", qty: 2 } }
{ type: "OrderShipped", orderId: 1337 }/<code>


如果我們的電子郵件服務僅偵聽OrderShipped事件,則其中沒有商品,因此需要查詢可能尚未處理OrderLineAdded(滯後,網絡問題)的讀取服務。 發送的電子郵件中不會包含任何內容,或更糟的是,閱讀服務甚至可能不知道orderId 1337。

一種解決方案是在事件上添加一個版本(每個事件都會增加其表示的聚合的版本),然後我們可以在Reads Service中請求特定版本:

<code>{ type: "OrderCreated", orderId: 1337, v: 1, customer: "john" }
{ type: "OrderLineAdded", orderId: 1337, v: 2, item: { ref: "prodA", qty: 2 } }
{ type: "OrderShipped", orderId: 1337, v: 3 }
GET /reads/orders/1337?v=3/<code>


然後,讀取服務可能會阻塞,直到它獲得預期的版本或超時(如果從未發生過)。

避免依賴關係,網絡調用和阻止請求的另一種解決方案是使用此事件承載狀態轉移策略(我們將狀態放入事件中):

<code>{ type: "OrderCreated", orderId: 1337, customer: "john",
state: { id: 1337, customer: "john", items: [], state: "CREATED" }
}
{ type: "OrderLineAdded", orderId: 1337, item: { ref: "prodA", qty: 2 },
state: { id: 1337, customer: "john", items: { ref: "prodA", qty: 2 } }, state: "CREATED" }
{ type: "OrderShipped", orderId: 1337,
state: { id: 1337, customer: "john", items: { ref: "prodA", qty: 2 }, state: "SHIPPED" } }
}/<code>


CQRS:是什麼? 為什麼? 怎麼做?

對於消費者而言,這是一項毫無頭腦的工作。 他們只需要閱讀"狀態"字段,該字段就始終使用相同的模型(這裡是"訂單寫入模型")來完成工作。

開銷現在在消息傳遞基礎結構上(消息更大,因為事件現在包含整個狀態)。

保護寫模型

上面的示例僅適用於內部使用,不適用於外部服務(例如此處的市場營銷服務),因為我們不想公開我們的寫入模型。

私人_wm:WriteModel;

Write模型是Writes端的模型。 它是我們域負責的彙總模型。

如果我們碰巧像以前那樣使用事件狀態策略來公開它,則第三方服務可以將其邏輯基於狀態,這就是我們的Write模型。 因此,在維持消費者兼容性(最大程度上)的同時,我們將無法發展自己的模型。

如果我們正在執行CQRS,則不需要這樣做。 如果您還記得的話,當我們有不斷變化的業務需求時,CQRS會很有用,因此變化會發生很多。 我們不想通知消費者我們已經更改了格式,不贊成使用當前主題,進行了重大更改等等。 我們想隔離我們的內部模式和我們在事件中公開的內容。

如果我們需要公開它怎麼辦?

商界人士很少會考慮最終一致性,這對他們來說毫無意義("這是技術性的,我不在乎")。 他們希望使用反應式架構,微服務,可在全球範圍內擴展,具有可用的彈性一致服務(使用最新狀態)。 不幸的是,在這些約束條件下,我們通常最終會遇到一個最終一致性系統(AP)。

從技術上講,我們可以快速考慮在Write Service上添加/ get路由並完成此操作。 "為什麼要麻煩讀服務使數據最終保持一致,卻沒有任何意義"。 它將起作用,但是由於本文列舉的所有原因以及CQRS體系結構中的反模式,這將是一個嚴重的錯誤。 我們決定使用CQRS,我們必須尊重它的工作方式。

· 除了寫模型的系統之外,沒人應該知道寫模型。

· 由於可伸縮性原因,我們選擇CQRS

· 如果我們向讀者展示我們的Write Model,那麼它就不是任何人的Write Model,而是Write / Read模型,沒有區別了,我們無法以不同的方式縮放它們。

· 我們無法公開發布包含寫入模型(事件承載狀態)的事件

如果另一個服務必須絕對擁有我們的最新書面數據,我們應該問自己:

· 為什麼,它的業務案例是什麼?

· 為什麼它不能處理最終一致性? 有事實和數字支持的真正原因嗎?

· 為什麼我們的事件未涵蓋其用例?

· 我們應該發送命令而不是被動地發送命令嗎?

· 我們是否有錯誤的域分隔? (應該在使用Commands而不是在外面)

ACL搶救

解決我們的Write模型的私密性(而不是解決可伸縮性問題)的最後一種解決方案是在Writes Service中編寫一個ACL(反腐敗層)。 我們可以將寫入模型轉換為讀取模型。

· 即時運行,但將我們的Writes Service與讀取結合起來:

CQRS:是什麼? 為什麼? 怎麼做?

· 或將其保存在數據庫中,以避免即時轉換(仍然耦合):

CQRS:是什麼? 為什麼? 怎麼做?

在後一種情況下,必須在同一事務中以原子方式保存寫模型(或事件存儲中的事件)和此讀模型(在發生故障時不會不一致)。 另外,我們也無法調用外部API:如果它們失敗了,我們將無法完成轉換,因此不會提交事務,因此所有命令都將失敗。 SLA =最小值(取決於SLA)。 這將破壞CQRS的目的。




結論

對企業

當我們習慣使用CQRS時,它是推理整個系統的好模式。 肯定存在陡峭的學習曲線,以瞭解分離和約束。 而不是全局考慮,我們必須本地考慮(命令,事件)。 很難預測總體行為。 這些功能分為幾部分,我們可以有不同的參與者和不同的時間表來完成它們(更多情況下是失敗的)。 最終的一致性還可以增加我們以前採用整體式設計所沒有的問題。 我們對可擴展性和靈活性(CQRS)進行控制貿易。

通常,CQRS會事先結合良好的DDD思想,以避免混淆不同的域(或我們不應該使用的反向拆分域),或者創建畢竟不適合業務的奇怪聚合。 寫和讀模型之間的區別有很大幫助。

我經常問:"為什麼在寫入模型中有此屬性,我們對此有業務規則嗎?"。 如果答案是"否,但這是針對下游內部服務的。"那麼我知道它不屬於這裡。 Write Model上的每個屬性都應在至少一個CommandHandler中具有一條業務規則。

關於Read模型,完全是YOLO,FFA,因為這無關緊要,沒有影響。 它的目的是根據前端或消費者的需求進行大量改進。 我只想保護寫模型。

CQRS還可以與事件一起使用。 他們是關於生意的。 它們提供了可追溯性,並在其他方面解耦了系統。 Event Sourcing方法本質上非常有用且非常安全,但是它附帶了自己的約束(性能,快照,存儲..)。 處理事件意味著我們擁有一個事件驅動系統。 這樣,一項功能可能需要通過消息總線進行20跳才能完成(一系列邊界事件處理程序/命令處理程序)。 如果沒有良好的性能調整和良好的系統可觀察性(日誌/指標/跟蹤,以及對事物之間如何關聯的良好理解),它的性能可能會很差。 即使可以擴展,也不會有效。

..高級內容!

此外,必須注意一些技術上的預防措施,例如冪等和一次處理(由於瞬態網絡問題,我們不想對一個事件進行兩次處理)。 給定聚合的事件必須按順序處理(您不希望在OrderCreated之前添加OrderItemAdded)。 事件還意味著我們將討論事件流處理。

此外,當在處理事件的中間發生錯誤(反序列化問題,意外值,網絡錯誤)時,我們的處理可能會停留在處理大型功能的過程中(拆分為多個事件):我們該怎麼辦? 結果是什麼? 我們如何發出錯誤,誰來消耗它? 我們如何聯繫用戶以告訴他(命令已經消失了一段時間)? 我們如何確保狀態一致? 我們甚至沒有談論Sagas和錯誤管理,因為這是一個完整的故事。

我們還必須考慮對聚合和事件進行版本控制,必要時管理因果關係(向量時鐘或矩陣時鐘)。

所有這些問題不是由CQRS本身引起的,而是因為我們正在使用分佈式系統。 如果您的公司希望從整體架構過渡到CQRS架構,則需要考慮所有方面:這是您需要堅強的肩膀(..要哭泣)的地方。

這是我在自己空間上的原始文章的交叉發佈:CQRS:為什麼? 以及所有需要考慮的事情。 隨意看看並查看我的其他文章。

謝謝閱讀!


(本文翻譯自Stéphane Derosiaux的文章《CQRS: What? Why? How?》,參考:https://medium.com/@sderosiaux/cqrs-what-why-how-945543482313)


分享到:


相關文章: