01.22 Java實現一致性哈希算法,並搭建環境測試其負載均衡特性

實現負載均衡是後端領域一個重要的話題,一致性哈希算法是實現服務器負載均衡的方法之一,你很可能已在一些遠程服務框架中使用過它。下面我們嘗試一下自己實現一致性哈希算法。

一. 簡述一致性哈希算法

這裡不詳細介紹一致性哈希算法的起源了,網上能方便地搜到許多介紹一致性哈希算法的好文章。本文主要想動手實現一致性哈希算法,並搭建一個環境進行實戰測試。

在開始之前先整理一下算法的思路:

一致性哈希算法通過把每臺服務器的哈希值打在哈希環上,把哈希環分成不同的段,然後對到來的請求計算哈希值從而得知該請求所歸屬的服務器。這個辦法解決了傳統服務器增減機器時需要重新計算哈希的麻煩。

但如果服務器的數量較少,可能導致計算出的哈希值相差較小,在哈希環上分佈不均勻,導致某臺服務器過載。為了解決負載均衡問題,我們引入虛擬節點技術,為每臺服務器分配一定數量的節點,通過節點的哈希值在哈希環上進行劃分。這樣一來,我們就可以根據機器的性能為其分配節點,性能好就多分配一點,差就少一點,從而達到負載均衡。

二. 實現一致性哈希算法

奠定了整體思路後我們開始考慮實現的細節

哈希算法的選擇

選擇能散列出32位整數的 FNV 算法, 由於該哈希函數可能產生負數, 需要作取絕對值處理.

請求節點在哈希環上尋找對應服務器的策略

策略為:新節點尋找最近比且它大的節點, 比如說現在已經有環[0, 5, 7, 10], 來了個哈希值為6的節點, 那麼它應該由哈希值為7對應的服務器處理. 如果請求節點所計算的哈希值大於環上的所有節點, 那麼就取第一個節點. 比如來了個11, 將分配到0所對應的節點.

哈希環的組織結構

開始的時候想過用順序存儲的結構存放,但是在一致性哈希中,最頻繁的操作是在集合中查找最近且比目標大的數. 如果用順序存儲結構的話,時間複雜度是收斂於O(N)的,而樹形結構則為更優的O(logN)。

但凡事有兩面,採用樹形結構存儲的代價是數據初始化的效率較低,而且運行期間如果有節點插入刪除的話效率也比較低。但是在現實中,服務器在一開始註冊後基本上就不怎麼變了,期間增減機器,宕機,機器修復等事件的頻率相比起節點的查詢簡直是微不足道。所以本案例決定使用使用樹形結構存儲。

貼合上述要求,並且提供有序存儲的,首先想到的是紅黑樹,而且Java中提供了紅黑樹的實現TreeMap。

虛擬節點與真實節點的映射關係

如何確定一個虛擬節點對應的真實節點也是個問題。理論上應該維護一張表記錄真實節點與虛擬節點的映射關係。本案例為了演示,採用簡單的字符串處理。

比方說服務器192.168.0.1:8888分配了 1000 個虛擬節點, 那麼它的虛擬節點名稱從192.168.0.1:8888@1一直到192.168.0.1:8888@1000。通過這樣的處理,我們在通過虛擬節點找真實節點時只需要裁剪字符串即可。

計劃定製好後, 下面是具體代碼:

<code>public class ConsistentHashTest {
/**
* 服務器列表,一共有3臺服務器提供服務, 將根據性能分配虛擬節點
*/
public static String[] servers = {
"192.168.0.1#100", //服務器1: 性能指數100, 將獲得1000個虛擬節點
"192.168.0.2#100", //服務器2: 性能指數100, 將獲得1000個虛擬節點
"192.168.0.3#30" //服務器3: 性能指數30, 將獲得300個虛擬節點
};
/**

* 真實服務器列表, 由於增加與刪除的頻率比遍歷高, 用鏈表存儲比較划算
*/
private static List<string> realNodes = new LinkedList<>();
/**
* 虛擬節點列表
*/
private static TreeMap<integer> virtualNodes = new TreeMap<>();

static{
for(String s : servers){
//把服務器加入真實服務器列表中
realNodes.add(s);
String[] strs = s.split("#");
//服務器名稱, 省略端口號
String name = strs[0];
//根據服務器性能給每臺真實服務器分配虛擬節點, 並把虛擬節點放到虛擬節點列表中.
int virtualNodeNum = Integer.parseInt(strs[1]) * 10;
for(int i = 1; i <= virtualNodeNum; i++){
virtualNodes.put(FVNHash(name + "@" + i), name + "@" + i);
}
}
}

public static void main(String[] args) {
new Thread(new RequestProcess()).start();
}

static class RequestProcess implements Runnable{
@Override
public void run() {
String client = null;
while(true){
//模擬產生一個請求
client = getN() + "." + getN() + "." + getN() + "." + getN() + ":" + (1000 + (int)(Math.random() * 9000));
//計算請求的哈希值
int hash = FVNHash(client);
//判斷請求將由哪臺服務器處理
System.out.println(client + " 的請求將由 " + getServer(client) + " 處理");

try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}
}

private static String getServer(String client) {
//計算客戶端請求的哈希值
int hash = FVNHash(client);
//得到大於該哈希值的所有map集合
SortedMap<integer> subMap = virtualNodes.tailMap(hash);
//找到比該值大的第一個虛擬節點, 如果沒有比它大的虛擬節點, 根據哈希環, 則返回第一個節點.
Integer targetKey = subMap.size() == 0 ? virtualNodes.firstKey() : subMap.firstKey();
//通過該虛擬節點獲得真實節點的名稱
String virtualNodeName = virtualNodes.get(targetKey);
String realNodeName = virtualNodeName.split("@")[0];
return realNodeName;
}

public static int getN(){
return (int)(Math.random() * 128);
}

public static int FVNHash(String data){
final int p = 16777619;
int hash = (int)2166136261L;
for(int i = 0; i < data.length(); i++)
hash = (hash ^ data.charAt(i)) * p;
hash += hash << 13;
hash ^= hash >> 7;
hash += hash << 3;
hash ^= hash >> 17;
hash += hash << 5;
return hash < 0 ? Math.abs(hash) : hash;
}
}

/* 運行結果片段

55.1.13.47:6240 的請求將由 192.168.0.1 處理
5.49.56.126:1105 的請求將由 192.168.0.1 處理
90.41.8.88:6884 的請求將由 192.168.0.2 處理
26.107.104.81:2989 的請求將由 192.168.0.2 處理
114.66.6.56:8233 的請求將由 192.168.0.1 處理
123.74.52.94:5523 的請求將由 192.168.0.1 處理
104.59.60.2:7502 的請求將由 192.168.0.2 處理
4.94.30.79:1299 的請求將由 192.168.0.1 處理
10.44.37.73:9332 的請求將由 192.168.0.2 處理
115.93.93.82:6333 的請求將由 192.168.0.2 處理
15.24.97.66:9177 的請求將由 192.168.0.2 處理
100.39.98.10:1023 的請求將由 192.168.0.2 處理
61.118.87.26:5108 的請求將由 192.168.0.2 處理
17.79.104.35:3901 的請求將由 192.168.0.1 處理
95.36.5.25:8020 的請求將由 192.168.0.2 處理
126.74.56.71:7792 的請求將由 192.168.0.2 處理
14.63.56.45:8275 的請求將由 192.168.0.1 處理
58.53.44.71:2089 的請求將由 192.168.0.3 處理
80.64.57.43:6144 的請求將由 192.168.0.2 處理
46.65.4.18:7649 的請求將由 192.168.0.2 處理
57.35.27.62:9607 的請求將由 192.168.0.2 處理
81.114.72.3:3444 的請求將由 192.168.0.1 處理
38.18.61.26:6295 的請求將由 192.168.0.2 處理
71.75.18.82:9686 的請求將由 192.168.0.2 處理
26.11.98.111:3781 的請求將由 192.168.0.1 處理
62.86.23.37:8570 的請求將由 192.168.0.3 處理
*/
/<integer>/<integer>/<string>/<code>

經過上面的測試我們可以看到性能較好的服務器1和服務器2分擔了大部分的請求,只有少部分請求落到了性能較差的服務器3上,已經初步實現了負載均衡。

下面我們將結合zookeeper,搭建一個更加逼真的服務器集群,看看在部分服務器上線下線的過程中,一致性哈希算法是否仍能夠實現負載均衡。

三. 結合zookeeper搭建環境

環境介紹

首先會通過啟動多臺虛擬機模擬服務器集群,各臺服務器都提供一個相同的接口供消費者消費。

同時會有一個消費者線程不斷地向服務器集群發起請求,這些請求會經過一致性哈希算法均衡負載到各個服務器。

為了能夠模擬上述場景, 我們必須在客戶端維護一個服務器列表, 使得客戶端能夠通過一致性哈希算法選擇服務器發送。 (現實中可能會把一致性哈希算法實現在前端服務器, 客戶先訪問前端服務器, 再路由到後端服務器集群)。

但是我們的重點是模擬服務器的宕機和上線,看看一致性哈希算法是否仍能實現負載均衡。所以客戶端必須能夠感知服務器端的變化並動態地調整它的服務器列表。

為了完成這項工作, 我們引入zookeeper, zookeeper的數據一致性算法保證數據實時, 準確, 客戶端能夠通過zookeeper得知實時的服務器情況。

具體操作是這樣的: 服務器集群先以臨時節點的方式連接到zookeeper, 並在zookeeper上註冊自己的接口服務(註冊節點). 客戶端連接上zookeeper後, 把已註冊的節點(服務器)添加到自己的服務器列表中。

如果有服務器宕機的話, 由於當初註冊的是瞬時節點的原因, 該臺服務器節點會從zookeeper中註銷。客戶端監聽到服務器節點有變時, 也會動態調整自己的服務器列表, 把當宕機的服務器從服務器列表中刪除, 因此不會再向該服務器發送請求, 負載均衡的任務將交到剩餘的機器身上。

當有服務器從新連接上集群后, 客戶端的服務器列表也會更新, 哈希環也將做出相應的變化以提供負載均衡。

具體操作:

I. 搭建zookeeper集群環境:

  1. 創建3個zookeeper服務, 構成集群. 在各自的data文件夾中添加一個myid文件, 各個id分別為1, 2, 3.
Java實現一致性哈希算法,並搭建環境測試其負載均衡特性

  1. 重新複製一份配置文件, 在配置文件中配置各個zookeeper的端口號. 本案例中三臺zookeeper分別在2181, 2182, 2183端口
Java實現一致性哈希算法,並搭建環境測試其負載均衡特性

  1. 啟動zookeeper集群

由於zookeeper不是本案例的重點, 細節暫不展開講了.

II. 創建服務器集群, 提供RPC遠程調用服務

  1. 首先創建一個服務器項目(使用Maven), 添加zookeeper依賴
  2. 創建常量接口, 用於存儲連接zookeeper 的信息
<code>public interface Constant {
//zookeeper集群的地址
String ZK_HOST = "192.168.117.129:2181,192.168.117.129:2182,192.168.117.129:2183";
//連接zookeeper的超時時間
int ZK_TIME_OUT = 5000;
//服務器所發佈的遠程服務在zookeeper中的註冊地址, 也就是說這個節點中保存了各個服務器提供的接口
String ZK_REGISTRY = "/provider";
//zookeeper集群中註冊服務的url地址的瞬時節點
String ZK_RMI = ZK_REGISTRY + "/rmi";
}
/<code>

3.封裝操作zookeeper和發佈遠程服務的接口供自己調用, 本案例中發佈遠程服務使用Java自身提供的rmi包完成, 如果沒有了解過可以參考這篇

<code>public class ServiceProvider {

private CountDownLatch latch = new CountDownLatch(1);

/**
* 連接zookeeper集群
*/
public ZooKeeper connectToZK(){
ZooKeeper zk = null;
try {
zk = new ZooKeeper(Constant.ZK_HOST, Constant.ZK_TIME_OUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//如果連接上了就喚醒當前線程.
latch.countDown();
}
});
latch.await();//還沒連接上時當前線程等待
} catch (Exception e) {
e.printStackTrace();
}
return zk;
}

/**
* 創建znode節點
* @param zk
* @param url 節點中寫入的數據
*/
public void createNode(ZooKeeper zk, String url){
try{
//要把寫入的數據轉化為字節數組
byte[] data = url.getBytes();
zk.create(Constant.ZK_RMI, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 發佈rmi服務
*/
public String publishService(Remote remote, String host, int port){
String url = null;
try{
LocateRegistry.createRegistry(port);
url = "rmi://" + host + ":" + port + "/rmiService";

Naming.bind(url, remote);
} catch (Exception e) {
e.printStackTrace();
}
return url;
}

/**
* 發佈rmi服務, 並且將服務的url註冊到zookeeper集群中
*/
public void publish(Remote remote, String host, int port){
//調用publishService, 得到服務的url地址
String url = publishService(remote, host, port);
if(null != url){
ZooKeeper zk = connectToZK();//連接到zookeeper
if(null != zk){
createNode(zk, url);
}
}
}
}
/<code>
  1. 自定義遠程服務. 服務提供一個簡單的方法: 客戶端發來一個字符串, 服務器在字符串前面添加上Hello, 並返回字符串。
<code>//UserService
public interface UserService extends Remote {
public String helloRmi(String name) throws RemoteException;
}
//UserServiceImpl
public class UserServiceImpl implements UserService {

public UserServiceImpl() throws RemoteException{
super();
}

@Override
public String helloRmi(String name) throws RemoteException {
return "Hello " + name + "!";
}
}

/<code>
  1. 修改端口號, 啟動多個java虛擬機, 模擬服務器集群. 為了方便演示, 自定義7777, 8888, 9999端口開啟3個服務器進程, 到時會模擬7777端口的服務器宕機和修復重連。
<code>public static void main(String[] args) throws RemoteException {
//創建工具類對象
ServiceProvider sp = new ServiceProvider();
//創建遠程服務對象
UserService userService = new UserServiceImpl();
//完成發佈
sp.publish(userService, "localhost", 9999);
}
/<code>
Java實現一致性哈希算法,並搭建環境測試其負載均衡特性

III. 編寫客戶端程序(運用一致性哈希算法實現負載均衡

  1. 封裝客戶端接口: ```java public class ServiceConsumer { /**提供遠程服務的服務器列表, 只記錄遠程服務的url */ private volatile List urls = new LinkedList<>(); /**遠程服務對應的虛擬節點集合 */ private static TreeMap<integer> virtualNodes = new TreeMap<>();public ServiceConsumer(){ ZooKeeper zk = connectToZK();//客戶端連接到zookeeper if(null != zk){ //連接上後關注zookeeper中的節點變化(服務器變化) watchNode(zk); } }private void watchNode(final ZooKeeper zk) { try{ //觀察/provider節點下的子節點是否有變化(是否有服務器登入或登出) List nodeList = zk.getChildren(Constants.ZK_REGISTRY, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { //如果服務器節點有變化就重新獲取 if(watchedEvent.getType() == Event.EventType.NodeChildrenChanged){ System.out.println("服務器端有變化, 可能有舊服務器宕機或者新服務器加入集群..."); watchNode(zk); } } }); //將獲取到的服務器節點數據保存到集合中, 也就是獲得了遠程服務的訪問url地址 List dataList = new LinkedList<>(); TreeMap<integer> newVirtualNodesList = new TreeMap<>(); for(String nodeStr : nodeList){ byte[] data = zk.getData(Constants.ZK_REGISTRY + "/" + nodeStr, false, null); //放入服務器列表的url String url = new String(data); //為每個服務器分配虛擬節點, 為了方便模擬, 默認開啟在9999端口的服務器性能較差, 只分配300個虛擬節點, 其他分配1000個. if(url.contains("9999")){ for(int i = 1; i <= 300; i++){ newVirtualNodesList.put(FVNHash(url + "@" + i), url + "@" + i); } }else{ for(int i = 1; i <= 1000; i++){ newVirtualNodesList.put(FVNHash(url + "@" + i), url + "@" + i); } } dataList.add(url); } urls = dataList; virtualNodes = newVirtualNodesList; dataList = null;//好讓垃圾回收器儘快收集 newVirtualNodesList = null; } catch (Exception e) { e.printStackTrace(); } }/**根據url獲得遠程服務對象 */ public T lookUpService(String url){ T remote = null; try{ remote = (T)Naming.lookup(url); } catch (Exception e) { //如果該url連接不上, 很有可能是該服務器掛了, 這時使用服務器列表中的第一個服務器url重新獲取遠程對象. if(e instanceof ConnectException){ if (urls.size() != 0){ url = urls.get(0); return lookUpService(url); } } } return remote; }/**通過一致性哈希算法, 選取一個url, 最後返回一個遠程服務對象 */ public T lookUp(){ T service = null; //隨機計算一個哈希值 int hash = FVNHash(Math.random() * 10000 + ""); //得到大於該哈希值的所有map集合 SortedMap<integer> subMap = virtualNodes.tailMap(hash); //找到比該值大的第一個虛擬節點, 如果沒有比它大的虛擬節點, 根據哈希環, 則返回第一個節點. Integer targetKey = subMap.size() == 0 ? virtualNodes.firstKey() : subMap.firstKey(); //通過該虛擬節點獲得服務器url String virtualNodeName = virtualNodes.get(targetKey); String url = virtualNodeName.split("@")[0]; //根據服務器url獲取遠程服務對象 service = lookUpService(url); System.out.print("提供本次服務的地址為: " + url + ", 返回結果: "); return service; }private CountDownLatch latch = new CountDownLatch(1);public ZooKeeper connectToZK(){ ZooKeeper zk = null; try { zk = new ZooKeeper(Constants.ZK_HOST, Constants.ZK_TIME_OUT, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { //判斷是否連接zk集群 latch.countDown();//喚醒處於等待狀態的當前線程 } }); latch.await();//沒有連接上的時候當前線程處於等待狀態. } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return zk; }public static int FVNHash(String data){ final int p = 16777619; int hash = (int)2166136261L; for(int i = 0; i < data.length(); i++) hash = (hash ^ data.charAt(i)) * p; hash += hash « 13; hash ^= hash » 7; hash += hash « 3; hash ^= hash » 17; hash += hash « 5; return hash < 0 ? Math.abs(hash) : hash; } } ```/<integer>/<integer>/<integer>
  2. 啟動客戶端進行測試:
<code>public static void main(String[] args){
ServiceConsumer sc = new ServiceConsumer();//創建工具類對象
while(true){
//獲得rmi遠程服務對象
UserService userService = sc.lookUp();
try{
//調用遠程方法
String result = userService.helloRmi("炭燒生蠔");
System.out.println(result);
Thread.sleep(100);
}catch(Exception e){
e.printStackTrace();
}
}
}
/<code>
  1. 客戶端跑起來後, 在顯示臺不斷進行打印…下面將對數據進行統計。
Java實現一致性哈希算法,並搭建環境測試其負載均衡特性

Java實現一致性哈希算法,並搭建環境測試其負載均衡特性

IV. 對服務器調用數據進行統計分析

重溫一遍模擬的過程: 首先分別在7777, 8888, 9999端口啟動了3臺服務器. 然後啟動客戶端進行訪問. 7777, 8888端口的兩臺服務器設置性能指數為1000, 而9999端口的服務器性能指數設置為300。

在客戶端運行期間, 我手動關閉了8888端口的服務器, 客戶端正常打印出服務器變化信息。此時理論上不會有訪問被路由到8888端口的服務器。當我重新啟動8888端口服務器時, 客戶端打印出服務器變化信息, 訪問能正常到達8888端口服務器。

下面對各服務器的訪問量進行統計, 看是否實現了負載均衡。

測試程序如下:

<code>public class DataStatistics {
private static float ReqToPort7777 = 0;
private static float ReqToPort8888 = 0;
private static float ReqToPort9999 = 0;

public static void main(String[] args) {
BufferedReader br = null;
try {
br = new BufferedReader(new FileReader("C://test.txt"));
String line = null;
while(null != (line = br.readLine())){
if(line.contains("7777")){
ReqToPort7777++;
}else if(line.contains("8888")){
ReqToPort8888++;
}else if(line.contains("9999")){
ReqToPort9999++;
}else{
print(false);
}
}
print(true);
} catch (Exception e) {
e.printStackTrace();
}finally {
if(null != br){
try {
br.close();
} catch (IOException e) {
e.printStackTrace();
}
br = null;
}
}
}

private static void print(boolean isEnd){
if(!isEnd){
System.out.println("------------- 服務器集群發生變化 -------------");
}else{
System.out.println("------------- 最後一次統計 -------------");

}
System.out.println("截取自上次服務器變化到現在: ");
float total = ReqToPort7777 + ReqToPort8888 + ReqToPort9999;
System.out.println("7777端口服務器訪問量為: " + ReqToPort7777 + ", 佔比" + (ReqToPort7777 / total));
System.out.println("8888端口服務器訪問量為: " + ReqToPort8888 + ", 佔比" + (ReqToPort8888 / total));
System.out.println("9999端口服務器訪問量為: " + ReqToPort9999 + ", 佔比" + (ReqToPort9999 / total));
ReqToPort7777 = 0;
ReqToPort8888 = 0;
ReqToPort9999 = 0;
}
}

/* 以下是輸出結果
------------- 服務器集群發生變化 -------------
截取自上次服務器變化到現在:
7777端口服務器訪問量為: 198.0, 佔比0.4419643
8888端口服務器訪問量為: 184.0, 佔比0.4107143
9999端口服務器訪問量為: 66.0, 佔比0.14732143
------------- 服務器集群發生變化 -------------
截取自上次服務器變化到現在:
7777端口服務器訪問量為: 510.0, 佔比0.7589286
8888端口服務器訪問量為: 1.0, 佔比0.0014880953
9999端口服務器訪問量為: 161.0, 佔比0.23958333
------------- 最後一次統計 -------------
截取自上次服務器變化到現在:
7777端口服務器訪問量為: 410.0, 佔比0.43248945
8888端口服務器訪問量為: 398.0, 佔比0.41983122
9999端口服務器訪問量為: 140.0, 佔比0.14767933
*/

/<code>

V. 結果

從測試數據可以看出, 不管是8888端口服務器宕機之前, 還是宕機之後, 三臺服務器接收的訪問量和性能指數成正比,成功地驗證了一致性哈希算法的負載均衡作用。

四. 擴展思考

初識一致性哈希算法的時候, 對這種奇特的思路佩服得五體投地。但是一致性哈希算法除了能夠讓後端服務器實現負載均衡, 還有一個特點可能是其他負載均衡算法所不具備的。

這個特點是基於哈希函數的, 我們知道通過哈希函數, 固定的輸入能夠產生固定的輸出. 換句話說, 同樣的請求會路由到相同的服務器. 這點就很牛逼了, 我們可以結合一致性哈希算法和緩存機制提供後端服務器的性能。

比如說在一個分佈式系統中, 有一個服務器集群提供查詢用戶信息的方法, 每個請求將會帶著用戶的uid到達, 我們可以通過哈希函數進行處理(從上面的演示代碼可以看到, 這點是可以輕鬆實現的), 使同樣的uid路由到某個獨定的服務器. 這樣我們就可以在服務器上對該的uid背後的用戶信息進行緩存, 從而減少對數據庫或其他中間件的操作, 從而提高系統效率。

當然如果使用該策略的話, 你可能還要考慮緩存更新等操作, 但作為一種優良的策略, 我們可以考慮在適當的場合靈活運用。

以上思考受啟發於Dubbo框架中對其實現的四種負載均衡策略的描述。


分享到:


相關文章: