背景
在分佈式系統中為什麼要使用開關?例如雙十一電商平臺需要做促銷活動,此時訂單量暴增,在下單環節,可能需要調用A、B、C三個接口來完成,但是其實A和B是必須的,C只是附加的功能(例如在下單的時候獲取用戶常用地址,或者發個推送消息之類的),可有可無,在平時系統沒有壓力,在容量充足的情況下,調用下沒問題,但是在特殊節日的大促環節,系統已經滿負荷了,這時候其實完全可以不去調用C接口,怎麼實現這個呢?改代碼重新發布?no,這樣不太敏捷,於是開關誕生了,開發人員只要簡單執行一下命令或者點一下頁面,就可以關掉對於C接口的調用,在請求高峰過去之後,再把開關恢復回去即可。類似的使用場景還有A/B Test、灰度發佈和數據的不停服切換等。
整體設計
高可用分佈式開關設計?
一、 需求分析
開關服務的核心需求主要有以下幾點:
1. 支持開關的分佈式化管理
• 開關統一管理,發佈、更新等操作只需在中心服務器上進行,一次操作,處處可用。
• 開關更新自動化,當開關發生變更,訂閱該開關的客戶端會自動發現變更,進而同步新值。
2. 具有容災機制,保證服務的高可用
• 服務集群的高可用,當集群中的一臺server不可用了,client發現後可以自動切換到其他server上進行訪問。
• 客戶端具備容災機制,當開關中心完全不可用,可以在客戶端對開關進行操作。
二、方案設計
針對此需求分析,我們抽象成了兩大塊分別是配置中心和SDK,整體架構如下:
各個系統模塊介紹:
- 配置中心:
- 配置中心在此處提供開關的統一管理
- zookeeper:
- 分佈式開關統一註冊中心,主要提供變更通知服務,客戶端通過訂閱開關節點,實時獲取開關變更信息,從而同步更新到本地緩存
- SDK:
- client端獲取開關,以及監聽開關的變化從而更新本地緩存。
配置中心設計
配置中心在此處的作用有如下幾點:
- 提供開關統一管理,包含開關發佈、更新、查詢等基本服務
- 操作開關的日誌,比如誰在某一時刻將開關從關閉狀態修改為打開狀態。
- 系統權限控制,只有擁有相關權限的人才能操作開關。
配置中心整體設計如下:
其中db主要保存了開關信息,以及日誌信息。
zk主要用來創建開關和監聽開關的變化。
配置中心的設計看似簡單,其實也需要注意以下幾點:
1. 開關的命名重複問題
在設計系統的時候,開關是否要共享給所有系統,還是其中某一個系統,如果共享給所有系統,那麼有權限的人對開關命名的時候難免會重複,針對此,我們設計了appid的概念,一個系統對應一個appid,在一個appid內開關名稱不允許有重複,只有該appid的owner才有權限對該appid的下的開關做操作。
2. 開關的分類
我們可以將開關分為三大類,分別是功能開關、降級開關、灰度開關:
- 功能開關
- 針對某一個功能是否打開,例如在訂單下單的時候需要獲取下單用戶的歷史換綁手機號信息,但是由於B系統只是提供了接口定義,實際業務還未開發完成,A系統可以先提前開發並上線,待B系統上線之後,A系統將該功能開關打開。
- 降級開關
- 典型的應用場景是電商做促銷的時候,比如雙十一電商做促銷,用戶下單的時候獲取用戶歷史常用地址,因為雙十一系統已經達到負荷,為了系統性能,將該業務邏輯降級。或者A系統調用B系統,由於B系統整體宕機,為了不影響A系統繼續運行,可以手動將B系統降級等。
- 灰度開關
- 針對某一功能做灰度,例如我們需要針對刷單用戶在下單過程中做攔截,為此我們在下單階段做了一套黑白名單處理,但是我們也無法知曉該套黑白名單的正確率多少,為了避免造成誤攔,我們需要對該功能做灰度採樣,以便及時調整我們的黑白名單邏輯。通常的灰度策略為 1% 灰度,10%灰度,30%灰度,50%。。。
3. zk中開關設計
zk中的設計結構為路徑格式,我們將/appid 設置為根路徑,例如appid為order的根路徑為/order,則在該appid下設置的user_open開關的路徑則為:
/order/user_open ,所以我們設計的路徑公式如下:
/appid/switch
部分頁面效果如下:
SDK設計
SDK主要是以jar的形式嵌入在client端的,它的作用主要是在client端獲取開關,以及監聽開關的變化從而更新本地緩存。SDK整體設計如下所示:
我們使用Curator來操作zk,因為它相比原生的zk 客戶端確實好用不少,這裡不做過多展開,為了提高系統性能我們將開關信息緩存在本地內存,這樣做的目的是提升系統的性能,所以獲取開關的流程圖如下:
1. 監聽開關變化
如果開關發生改變,我們需要將開關變化的信息載入到本地,監聽代碼如下:
1 private void nodeListener(final String key) {
2 final NodeCache nodeCache = new NodeCache(client, basePath + "/" + key);
3 try {
4 nodeCache.start();
5 nodeCache.getListenable().addListener(new NodeCacheListener() {
6
7 public void nodeChanged() throws Exception {
8 String msg = new String(nodeCache.getCurrentData().getData());
9 System.out.println("監聽事件觸發");
10 System.out.println("重新獲得節點內容為:" + msg);
11 //加入到本地緩存
12 dataMap.put(key, msg);
13 }
14 });
15 } catch (Exception e) {
16 e.printStackTrace();
17 }
18
19
20 }
2. 降級開關
降級開關和功能開關在底層實現上是一樣的,就是從zk獲取value為true的時候,是打開狀態的,代碼如下:
1 /**
2 * 獲取開關,默認是打開的
3 *
4 * @param switchKey
5 * @return
6 */
7 public boolean getSwitch(String switchKey) {
8 try {
9 String dataMsg = getDataMsg(switchKey);
10 if (isEmpty(dataMsg)) {
11 return true;
12 }
13
14 return Boolean.parseBoolean(dataMsg);
15 } catch (Exception e) {
16 e.printStackTrace();
17 }
18
19 return true;
20 }
其中getDataMsg方法封裝了本地緩存的調用,具體代碼如下:
1 private String getDataMsg(String key) {
2 byte[] data;
3 try {
4 //先從本地緩存中找
5 String msg = dataMap.get(key);
6 if (!isEmpty(msg)) {
7 return msg;
8 }
9
10 //本地緩存沒有,則從zk中去查找
11 data = dataBuilder.forPath(basePath + "/" + key);
12 if (data != null) {
13 String dataMsg = new String(data);
14 //重新塞入緩存
15 dataMap.put(key, dataMsg);
16 nodeListener(key);
17 return dataMsg;
18 }
19 } catch (Exception e) {
20 e.printStackTrace();
21 }
22
23 return null;
24 }
降級開關和功能開關的代碼完成後,接下來是一段測試demo,測試開關是否可以正常使用,代碼如下:
1public class SwitchDemo {
2
3 public static void main(String[] args)throws Exception {
4
5 //user_open 開關 打開
6 if(SwitchHandler.config().getSwitch("user_open")){
7 System.out.println("exe user open switch 1");
8 }
9
10 Thread.sleep(10000);
11
12
13 //user_open 開關 關閉
14 if (SwitchHandler.config().getSwitch("user_open")){
15 System.out.println("exe user open switch 2");
16 }else{
17
18 System.out.println("exe user not open");
19 }
20
21
22 Thread.sleep(1000000000);
23 }
24}
接下來在本地啟動一個zk單機服務,進入到zk的安裝目錄 ,啟動命令如下:
1./zkServer.sh start
啟動一個客戶端,創建一個開關user_open value為true,假設我這個服務的appid叫sky,那麼我應該先創建/sky 這個路徑,接著創建,/sky/user_open這個路徑,命令如下:
1create /sky 1
2create /sky/user_open true
接下來我們啟動SwitchDemo測試類,在代碼走到第一次sleep階段,我們立馬將user_open 這個值修改為false,修改zk的命令為:
1set /sky/user_open false
最終打印結果如下:
從結果可以看出,第一次執行的時候,由於user_open的value為true,所以
日誌 exe user open switch 1 打印出來了,其次監聽的日誌也打印出來了,當代碼執行到第十行的時候,我們將user_open的value修改為false,此時監聽的日誌監聽到開關發生了變化,並將本地內存的開關地址修改了false,最後執行第14行代碼的時候,由於開關是關閉狀態,所以走到了第18行的邏輯。
3. 灰度開關設計
灰度開關主要針對某一個功能來進行灰度,那麼就需要有一個灰度策略的概念,比如設置的是灰度10%,此時有1000個請求進來,應該只有100個左右的請求是命中這段邏輯,在微服務架構中,服務與服務之間的調用都會透傳一個requestId(請求id),因此將requestId 當做灰度的主體是最適合不過了,簡單的灰度算法可以將requestId 進行hash 取模100 然後跟設置的灰度值進行比較即可。代碼如下:
1 /**
2 * 灰度開關
3 *
4 * @param switchKey
5 * @param strategyId 灰度策略,可以傳入requestId,手機號來進行灰度
6 * @return
7 */
8 public boolean getGrayscaleSwitch(String switchKey, String strategyId) {
9
10 int value = getInt(switchKey, 100);
11
12 int hash = strategyId.hashCode();
13
14 return Math.abs(hash) % 100 <= value;
15 }
灰度不是一個精確值,請求量越大灰度的越精確,因此接下來我們的測試demo,會模擬10000條請求,如果命中了大約1000條左右,那麼說明我們的灰度算法沒啥問題,我們將當前時間戳當做requestId(當然實際不要這麼做,應該用微服務之間透傳的requestId,這裡只是為了測試)
首先設置一個灰度開關user_gary,value為10(代表灰度10%,最大為100)
1create /sky/user_gary 10
測試代碼如下:
1 //灰度10%開關
2 int grayCount=0;//
3 for (int i=0;i<10000;i++){
4 String requestId = System.currentTimeMillis()+"-"+i;
5 if (SwitchHandler.config().getGrayscaleSwitch("user_gary",requestId)){
6 grayCount++;
7 }
8 }
9
10 System.out.println("進入灰度開關的次數為:"+grayCount);
運行結果:
我們看到10000次請求,命中了1176次,大約灰度10%,說明灰度起作用了。
SDK完整代碼
pom依賴:
1 <dependency>
2 <groupid>org.apache.curator/<groupid>
3 <artifactid>curator-recipes/<artifactid>
4 <version>4.0.1/<version>
5
SDK完整代碼:
1package com.wuzy.myswitch;
2
3import org.apache.curator.RetryPolicy;
4import org.apache.curator.framework.CuratorFramework;
5import org.apache.curator.framework.CuratorFrameworkFactory;
6import org.apache.curator.framework.api.GetDataBuilder;
7import org.apache.curator.framework.recipes.cache.NodeCache;
8import org.apache.curator.framework.recipes.cache.NodeCacheListener;
9import org.apache.curator.retry.ExponentialBackoffRetry;
10
11import java.util.Map;
12import java.util.concurrent.ConcurrentHashMap;
13
14public class SwitchHandler {
15
16 private String basePath = "/sky";
17 private GetDataBuilder dataBuilder;
18
19 private Map<string> dataMap = new ConcurrentHashMap<string>();/<string>/<string>
20
21 private SwitchHandler() {
22 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
23 client = CuratorFrameworkFactory.builder()
24 .connectString("127.0.0.1:2181")
25 .retryPolicy(retryPolicy)
26 .sessionTimeoutMs(6000)
27 .connectionTimeoutMs(3000)
28 .build();
29 client.start();
30
31 dataBuilder = client.getData();
32
33 //TODO 後期這裡最好將數據庫中的開關加載出來,從zk中找出放入本地緩存中,加快查詢速度
34 }
35
36 private static class SwitchHandlerHolder {
37 private static final SwitchHandler switchHandler = new SwitchHandler();
38 }
39
40 public static SwitchHandler config() {
41 return SwitchHandlerHolder.switchHandler;
42 }
43
44 private CuratorFramework client;
45
46
47 /**
48 * 獲取開關,默認是打開的
49 *
50 * @param switchKey
51 * @return
52 */
53 public boolean getSwitch(String switchKey) {
54 try {
55 String dataMsg = getDataMsg(switchKey);
56 if (isEmpty(dataMsg)) {
57 return true;
58 }
59
60 return Boolean.parseBoolean(dataMsg);
61 } catch (Exception e) {
62 e.printStackTrace();
63 }
64
65 return true;
66 }
67
68 /**
69 * 灰度開關
70 *
71 * @param switchKey
72 * @param strategyId 灰度策略,可以傳入requestId,手機號來進行灰度
73 * @return
74 */
75 public boolean getGrayscaleSwitch(String switchKey, String strategyId) {
76
77 int value = getInt(switchKey, 100);
78
79 int hash = strategyId.hashCode();
80
81 return Math.abs(hash) % 100 <= value;
82 }
83
84
85 public int getInt(String key, int defaultValue) {
86 try {
87 String dataMsg = getDataMsg(key);
88 if (isEmpty(dataMsg)) {
89 return defaultValue;
90 }
91 return Integer.parseInt(dataMsg);
92
93 } catch (Exception e) {
94 e.printStackTrace();
95 }
96
97 return defaultValue;
98 }
99
100
101 private boolean isEmpty(String msg) {
102 return msg == null || msg.trim().equals("");
103 }
104
105 private String getDataMsg(String key) {
106 byte[] data;
107 try {
108 //先從本地緩存中找
109 String msg = dataMap.get(key);
110 if (!isEmpty(msg)) {
111 return msg;
112 }
113
114 //本地緩存沒有,則從zk中去查找
115 data = dataBuilder.forPath(basePath + "/" + key);
116 if (data != null) {
117 String dataMsg = new String(data);
118 //重新塞入緩存
119 dataMap.put(key, dataMsg);
120 nodeListener(key);
121 return dataMsg;
122 }
123 } catch (Exception e) {
124 e.printStackTrace();
125 }
126
127 return null;
128 }
129
130
131 private void nodeListener(final String key) {
132 final NodeCache nodeCache = new NodeCache(client, basePath + "/" + key);
133 try {
134 nodeCache.start();
135 nodeCache.getListenable().addListener(new NodeCacheListener() {
136
137 public void nodeChanged() throws Exception {
138 String msg = new String(nodeCache.getCurrentData().getData());
139 System.out.println("監聽事件觸發");
140 System.out.println("重新獲得節點內容為:" + msg);
141 //加入到本地緩存
142 dataMap.put(key, msg);
143 }
144 });
145 } catch (Exception e) {
146 e.printStackTrace();
147 }
148
149
150 }
151}
閱讀更多 科技伍小黑 的文章