如何設計分佈式系統開關

背景

在分佈式系統中為什麼要使用開關?例如雙十一電商平臺需要做促銷活動,此時訂單量暴增,在下單環節,可能需要調用A、B、C三個接口來完成,但是其實A和B是必須的,C只是附加的功能(例如在下單的時候獲取用戶常用地址,或者發個推送消息之類的),可有可無,在平時系統沒有壓力,在容量充足的情況下,調用下沒問題,但是在特殊節日的大促環節,系統已經滿負荷了,這時候其實完全可以不去調用C接口,怎麼實現這個呢?改代碼重新發布?no,這樣不太敏捷,於是開關誕生了,開發人員只要簡單執行一下命令或者點一下頁面,就可以關掉對於C接口的調用,在請求高峰過去之後,再把開關恢復回去即可。類似的使用場景還有A/B Test、灰度發佈和數據的不停服切換等。

整體設計

高可用分佈式開關設計?

一、 需求分析

開關服務的核心需求主要有以下幾點:

 1. 支持開關的分佈式化管理

• 開關統一管理,發佈、更新等操作只需在中心服務器上進行,一次操作,處處可用。

• 開關更新自動化,當開關發生變更,訂閱該開關的客戶端會自動發現變更,進而同步新值。

 2. 具有容災機制,保證服務的高可用

• 服務集群的高可用,當集群中的一臺server不可用了,client發現後可以自動切換到其他server上進行訪問。

• 客戶端具備容災機制,當開關中心完全不可用,可以在客戶端對開關進行操作。

二、方案設計

針對此需求分析,我們抽象成了兩大塊分別是配置中心和SDK,整體架構如下:

如何設計分佈式系統開關

各個系統模塊介紹:

  1. 配置中心:
  2. 配置中心在此處提供開關的統一管理
  3. zookeeper:
  4. 分佈式開關統一註冊中心,主要提供變更通知服務,客戶端通過訂閱開關節點,實時獲取開關變更信息,從而同步更新到本地緩存
  5. SDK:
  6. client端獲取開關,以及監聽開關的變化從而更新本地緩存。

配置中心設計

配置中心在此處的作用有如下幾點:

  1. 提供開關統一管理,包含開關發佈、更新、查詢等基本服務
  2. 操作開關的日誌,比如誰在某一時刻將開關從關閉狀態修改為打開狀態。
  3. 系統權限控制,只有擁有相關權限的人才能操作開關。

配置中心整體設計如下:

如何設計分佈式系統開關

其中db主要保存了開關信息,以及日誌信息。

zk主要用來創建開關和監聽開關的變化。

配置中心的設計看似簡單,其實也需要注意以下幾點:

1. 開關的命名重複問題

在設計系統的時候,開關是否要共享給所有系統,還是其中某一個系統,如果共享給所有系統,那麼有權限的人對開關命名的時候難免會重複,針對此,我們設計了appid的概念,一個系統對應一個appid,在一個appid內開關名稱不允許有重複,只有該appid的owner才有權限對該appid的下的開關做操作。

2. 開關的分類

我們可以將開關分為三大類,分別是功能開關、降級開關、灰度開關:

  1. 功能開關
  2. 針對某一個功能是否打開,例如在訂單下單的時候需要獲取下單用戶的歷史換綁手機號信息,但是由於B系統只是提供了接口定義,實際業務還未開發完成,A系統可以先提前開發並上線,待B系統上線之後,A系統將該功能開關打開。
  3. 降級開關
  4. 典型的應用場景是電商做促銷的時候,比如雙十一電商做促銷,用戶下單的時候獲取用戶歷史常用地址,因為雙十一系統已經達到負荷,為了系統性能,將該業務邏輯降級。或者A系統調用B系統,由於B系統整體宕機,為了不影響A系統繼續運行,可以手動將B系統降級等。
  5. 灰度開關
  6. 針對某一功能做灰度,例如我們需要針對刷單用戶在下單過程中做攔截,為此我們在下單階段做了一套黑白名單處理,但是我們也無法知曉該套黑白名單的正確率多少,為了避免造成誤攔,我們需要對該功能做灰度採樣,以便及時調整我們的黑白名單邏輯。通常的灰度策略為 1% 灰度,10%灰度,30%灰度,50%。。。

3. zk中開關設計

zk中的設計結構為路徑格式,我們將/appid 設置為根路徑,例如appid為order的根路徑為/order,則在該appid下設置的user_open開關的路徑則為:

/order/user_open ,所以我們設計的路徑公式如下:

/appid/switch

部分頁面效果如下:

如何設計分佈式系統開關

SDK設計

SDK主要是以jar的形式嵌入在client端的,它的作用主要是在client端獲取開關,以及監聽開關的變化從而更新本地緩存。SDK整體設計如下所示:

如何設計分佈式系統開關

我們使用Curator來操作zk,因為它相比原生的zk 客戶端確實好用不少,這裡不做過多展開,為了提高系統性能我們將開關信息緩存在本地內存,這樣做的目的是提升系統的性能,所以獲取開關的流程圖如下:

如何設計分佈式系統開關

1. 監聽開關變化

如果開關發生改變,我們需要將開關變化的信息載入到本地,監聽代碼如下:

1 private void nodeListener(final String key) {

2 final NodeCache nodeCache = new NodeCache(client, basePath + "/" + key);

3 try {

4 nodeCache.start();

5 nodeCache.getListenable().addListener(new NodeCacheListener() {

6

7 public void nodeChanged() throws Exception {

8 String msg = new String(nodeCache.getCurrentData().getData());

9 System.out.println("監聽事件觸發");

10 System.out.println("重新獲得節點內容為:" + msg);

11 //加入到本地緩存

12 dataMap.put(key, msg);

13 }

14 });

15 } catch (Exception e) {

16 e.printStackTrace();

17 }

18

19

20 }

2. 降級開關

降級開關和功能開關在底層實現上是一樣的,就是從zk獲取value為true的時候,是打開狀態的,代碼如下:

1 /**

2 * 獲取開關,默認是打開的

3 *

4 * @param switchKey

5 * @return

6 */

7 public boolean getSwitch(String switchKey) {

8 try {

9 String dataMsg = getDataMsg(switchKey);

10 if (isEmpty(dataMsg)) {

11 return true;

12 }

13

14 return Boolean.parseBoolean(dataMsg);

15 } catch (Exception e) {

16 e.printStackTrace();

17 }

18

19 return true;

20 }

其中getDataMsg方法封裝了本地緩存的調用,具體代碼如下:

1 private String getDataMsg(String key) {

2 byte[] data;

3 try {

4 //先從本地緩存中找

5 String msg = dataMap.get(key);

6 if (!isEmpty(msg)) {

7 return msg;

8 }

9

10 //本地緩存沒有,則從zk中去查找

11 data = dataBuilder.forPath(basePath + "/" + key);

12 if (data != null) {

13 String dataMsg = new String(data);

14 //重新塞入緩存

15 dataMap.put(key, dataMsg);

16 nodeListener(key);

17 return dataMsg;

18 }

19 } catch (Exception e) {

20 e.printStackTrace();

21 }

22

23 return null;

24 }

降級開關和功能開關的代碼完成後,接下來是一段測試demo,測試開關是否可以正常使用,代碼如下:

1public class SwitchDemo {

2

3 public static void main(String[] args)throws Exception {

4

5 //user_open 開關 打開

6 if(SwitchHandler.config().getSwitch("user_open")){

7 System.out.println("exe user open switch 1");

8 }

9

10 Thread.sleep(10000);

11

12

13 //user_open 開關 關閉

14 if (SwitchHandler.config().getSwitch("user_open")){

15 System.out.println("exe user open switch 2");

16 }else{

17

18 System.out.println("exe user not open");

19 }

20

21

22 Thread.sleep(1000000000);

23 }

24}

接下來在本地啟動一個zk單機服務,進入到zk的安裝目錄 ,啟動命令如下:

1./zkServer.sh start

啟動一個客戶端,創建一個開關user_open value為true,假設我這個服務的appid叫sky,那麼我應該先創建/sky 這個路徑,接著創建,/sky/user_open這個路徑,命令如下:

1create /sky 1

2create /sky/user_open true

接下來我們啟動SwitchDemo測試類,在代碼走到第一次sleep階段,我們立馬將user_open 這個值修改為false,修改zk的命令為:

1set /sky/user_open false

最終打印結果如下:

如何設計分佈式系統開關

從結果可以看出,第一次執行的時候,由於user_open的value為true,所以

日誌 exe user open switch 1 打印出來了,其次監聽的日誌也打印出來了,當代碼執行到第十行的時候,我們將user_open的value修改為false,此時監聽的日誌監聽到開關發生了變化,並將本地內存的開關地址修改了false,最後執行第14行代碼的時候,由於開關是關閉狀態,所以走到了第18行的邏輯。

3. 灰度開關設計

灰度開關主要針對某一個功能來進行灰度,那麼就需要有一個灰度策略的概念,比如設置的是灰度10%,此時有1000個請求進來,應該只有100個左右的請求是命中這段邏輯,在微服務架構中,服務與服務之間的調用都會透傳一個requestId(請求id),因此將requestId 當做灰度的主體是最適合不過了,簡單的灰度算法可以將requestId 進行hash 取模100 然後跟設置的灰度值進行比較即可。代碼如下:

1 /**

2 * 灰度開關

3 *

4 * @param switchKey

5 * @param strategyId 灰度策略,可以傳入requestId,手機號來進行灰度

6 * @return

7 */

8 public boolean getGrayscaleSwitch(String switchKey, String strategyId) {

9

10 int value = getInt(switchKey, 100);

11

12 int hash = strategyId.hashCode();

13

14 return Math.abs(hash) % 100 <= value;

15 }

灰度不是一個精確值,請求量越大灰度的越精確,因此接下來我們的測試demo,會模擬10000條請求,如果命中了大約1000條左右,那麼說明我們的灰度算法沒啥問題,我們將當前時間戳當做requestId(當然實際不要這麼做,應該用微服務之間透傳的requestId,這裡只是為了測試)

首先設置一個灰度開關user_gary,value為10(代表灰度10%,最大為100)

1create /sky/user_gary 10

測試代碼如下:

1 //灰度10%開關

2 int grayCount=0;//

3 for (int i=0;i<10000;i++){

4 String requestId = System.currentTimeMillis()+"-"+i;

5 if (SwitchHandler.config().getGrayscaleSwitch("user_gary",requestId)){

6 grayCount++;

7 }

8 }

9

10 System.out.println("進入灰度開關的次數為:"+grayCount);

運行結果:

如何設計分佈式系統開關

我們看到10000次請求,命中了1176次,大約灰度10%,說明灰度起作用了。

SDK完整代碼

pom依賴:

1 <dependency>

2 <groupid>org.apache.curator/<groupid>

3 <artifactid>curator-recipes/<artifactid>

4 <version>4.0.1/<version>

5

SDK完整代碼:

1package com.wuzy.myswitch;

2

3import org.apache.curator.RetryPolicy;

4import org.apache.curator.framework.CuratorFramework;

5import org.apache.curator.framework.CuratorFrameworkFactory;

6import org.apache.curator.framework.api.GetDataBuilder;

7import org.apache.curator.framework.recipes.cache.NodeCache;

8import org.apache.curator.framework.recipes.cache.NodeCacheListener;

9import org.apache.curator.retry.ExponentialBackoffRetry;

10

11import java.util.Map;

12import java.util.concurrent.ConcurrentHashMap;

13

14public class SwitchHandler {

15

16 private String basePath = "/sky";

17 private GetDataBuilder dataBuilder;

18

19 private Map<string> dataMap = new ConcurrentHashMap<string>();/<string>/<string>

20

21 private SwitchHandler() {

22 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

23 client = CuratorFrameworkFactory.builder()

24 .connectString("127.0.0.1:2181")

25 .retryPolicy(retryPolicy)

26 .sessionTimeoutMs(6000)

27 .connectionTimeoutMs(3000)

28 .build();

29 client.start();

30

31 dataBuilder = client.getData();

32

33 //TODO 後期這裡最好將數據庫中的開關加載出來,從zk中找出放入本地緩存中,加快查詢速度

34 }

35

36 private static class SwitchHandlerHolder {

37 private static final SwitchHandler switchHandler = new SwitchHandler();

38 }

39

40 public static SwitchHandler config() {

41 return SwitchHandlerHolder.switchHandler;

42 }

43

44 private CuratorFramework client;

45

46

47 /**

48 * 獲取開關,默認是打開的

49 *

50 * @param switchKey

51 * @return

52 */

53 public boolean getSwitch(String switchKey) {

54 try {

55 String dataMsg = getDataMsg(switchKey);

56 if (isEmpty(dataMsg)) {

57 return true;

58 }

59

60 return Boolean.parseBoolean(dataMsg);

61 } catch (Exception e) {

62 e.printStackTrace();

63 }

64

65 return true;

66 }

67

68 /**

69 * 灰度開關

70 *

71 * @param switchKey

72 * @param strategyId 灰度策略,可以傳入requestId,手機號來進行灰度

73 * @return

74 */

75 public boolean getGrayscaleSwitch(String switchKey, String strategyId) {

76

77 int value = getInt(switchKey, 100);

78

79 int hash = strategyId.hashCode();

80

81 return Math.abs(hash) % 100 <= value;

82 }

83

84

85 public int getInt(String key, int defaultValue) {

86 try {

87 String dataMsg = getDataMsg(key);

88 if (isEmpty(dataMsg)) {

89 return defaultValue;

90 }

91 return Integer.parseInt(dataMsg);

92

93 } catch (Exception e) {

94 e.printStackTrace();

95 }

96

97 return defaultValue;

98 }

99

100

101 private boolean isEmpty(String msg) {

102 return msg == null || msg.trim().equals("");

103 }

104

105 private String getDataMsg(String key) {

106 byte[] data;

107 try {

108 //先從本地緩存中找

109 String msg = dataMap.get(key);

110 if (!isEmpty(msg)) {

111 return msg;

112 }

113

114 //本地緩存沒有,則從zk中去查找

115 data = dataBuilder.forPath(basePath + "/" + key);

116 if (data != null) {

117 String dataMsg = new String(data);

118 //重新塞入緩存

119 dataMap.put(key, dataMsg);

120 nodeListener(key);

121 return dataMsg;

122 }

123 } catch (Exception e) {

124 e.printStackTrace();

125 }

126

127 return null;

128 }

129

130

131 private void nodeListener(final String key) {

132 final NodeCache nodeCache = new NodeCache(client, basePath + "/" + key);

133 try {

134 nodeCache.start();

135 nodeCache.getListenable().addListener(new NodeCacheListener() {

136

137 public void nodeChanged() throws Exception {

138 String msg = new String(nodeCache.getCurrentData().getData());

139 System.out.println("監聽事件觸發");

140 System.out.println("重新獲得節點內容為:" + msg);

141 //加入到本地緩存

142 dataMap.put(key, msg);

143 }

144 });

145 } catch (Exception e) {

146 e.printStackTrace();

147 }

148

149

150 }

151}


分享到:


相關文章: