第五章 ZooKeeper Java案例

為了介紹 Zookeeper Java API的基本用法,本文將一步一步的從配置ZooKeeper的Java開發環境開始介紹,實現幾個功能簡單的Zookeeper小程序。

一、Eclipse環境搭建

1.1、創建一個Maven工程

1.2、添加pom文件

<code>    <dependencies>
       <dependency>
           <groupid>junit/<groupid>
           <artifactid>junit/<artifactid>
           <version>RELEASE/<version>
       /<dependency>
       <dependency>
               <groupid>org.apache.logging.log4j/<groupid>
               <artifactid>log4j-core/<artifactid>
               <version>2.8.2/<version>
       /<dependency>
       <dependency>
               <groupid>org.apache.zookeeper/<groupid>
               <artifactid>zookeeper/<artifactid>
               <version>3.4.10/<version>
       /<dependency>
   /<dependencies>/<code>

1.3、拷貝log4j.properties文件到項目根目錄

需要在項目的 src/main/resources 目錄下,新建一個文件,命名為“log4j.properties”,在文件中填入。

<code>log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c]- %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c]- %m%n/<code>


二、創建ZooKeeper客戶端測試程序

<code>package com.qinghe.zookeeper;

import java.io.IOException;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;

public class BaseOperateZK {

private static int sessionTimeout = 2000;
private String connectString = "bigdata101:2181,bigdata102:2181,bigdata103:2181";
private ZooKeeper zkClient = null;

@Before
public void init() throws IOException {

zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {

@Override
public void process(WatchedEvent event) {

// 收到事件通知後回調函數(真正的監聽業務邏輯)
System.out.println(event.getType() + "------" + event.getPath());

// 再次啟動監聽程序
try {
zkClient.getChildren("/", true);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {

e.printStackTrace();
}
}
});
}

// 1 創建節點
@Test
public void createNode() throws KeeperException, InterruptedException {
String path = zkClient.create("/server",
"server".getBytes(),
Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL);
System.out.println(path);
}


// 2.1 獲取節點信息
@Test
public void getDataNode() throws KeeperException, InterruptedException {
List<string> children = zkClient.getChildren("/", false);

for (String child : children) {
System.out.println(child);
}
}


// 2.2 獲取節點信息並監控節點變化
@Test
public void getDataNodeWatch() throws KeeperException, InterruptedException {
List<string> children = zkClient.getChildren("/", true);

for (String child : children) {
System.out.println(child);
}


// 延時阻塞,就是為了觀察監聽節點變化情況
Thread.sleep(Long.MAX_VALUE);
}

// 3 判斷節點是否存在
@Test
public void existNode() throws KeeperException, InterruptedException {

Stat stat = zkClient.exists("/liangshan", true);

System.out.println(stat == null ? "not exist" : "exist");


}

}/<string>/<string>/<code>


三、監聽服務器節點動態上下線案例

在實際的生產環境中,大型的程序都是部署在集群之中的。為了保證高可用性,我們會將同一套程序部署在多臺服務器上面,然後通過負載均衡服務器去調度,但是我們不能快速定位是哪臺服務器掛掉了,這時需要使用ZooKeeper來解決這個問題。

3.1ZooKeeper的動態感知

動態感知就是使用ZooKeeper的watch監聽功能,首先展現一下我們的Web生產服務器的樣子。

第五章 ZooKeeper Java案例

如果我們藉助與ZooKeeper的註冊功能,將每天服務器採用臨時節點的方式動態註冊到ZooKeeper服務器,Nginx讀取ZooKeeper的動態節點,這樣如果是正常的服務,Nginx就能繼續分配給相應的服務器任務。

第五章 ZooKeeper Java案例

3.2具體代碼實現

1)現在集群上面創建/servers節點

<code>[zk: localhost:2181(CONNECTED) 0] create /servers "servers"
Created /servers/<code>

2)服務器端註冊代碼

<code>package com.qinghe.zookeeper;

import java.io.IOException;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class DistributeServerDemo {

private static final String connectString = "bigdata101:2181,bigdata102:2181,bigdata103:2181";
private static final int sessionTimeout = 2000;
private ZooKeeper zooKeeper = null;
private String parentNode = "/servers";

public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
DistributeServerDemo serverDemo = new DistributeServerDemo();

// 1 連接ZooKeeper集群
serverDemo.getConnect();

// 2 註冊節點
serverDemo.registerServer(args[0]);

// 3 業務邏輯處理
serverDemo.business(args[0]);
}

/**

- 業務邏輯
- @param string

- @throws InterruptedException
 */
 private void business(String hostName) throws InterruptedException {
 System.out.println(hostName + "is working ......");
 Thread.sleep(Long.MAX_VALUE);
}

/**

- 向zk集群註冊服務器信息

- @param hostName

- @throws KeeperException

- @throws InterruptedException
 */
 private void registerServer(String hostName) throws KeeperException, InterruptedException {

 String zkCreate = zooKeeper.create(parentNode + "/server", hostName.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
 System.out.println(hostName + "is online " + zkCreate);
}

/**

- 創建到zk的客戶端連接

- @throws IOException
 */
 private void getConnect() throws IOException {

 zooKeeper  = new ZooKeeper(connectString, sessionTimeout, new Watcher() {

@Override
public void process(WatchedEvent event) {
// 收到事件通知後的回調函數(應該是我們自己的事件處理邏輯)
System.out.println(event.getType() + "---" + event.getPath());
}
});
}


}/<code>


3)客戶端代碼

<code>package com.qinghe.zookeeper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class DistributeClientDemo {

private static final String connectString = "bigdata101:2181,bigdata102:2181,bigdata103:2181";
private static final int sessionTimeout = 2000;
private ZooKeeper zooKeeper = null;
private static final String parentNode = "/servers";

private volatile List<string> serverList = null;

public static void main(String[] args) throws IOException, KeeperException, InterruptedException {

// 1 獲取zk連接
DistributeClientDemo clientDemo = new DistributeClientDemo();
clientDemo.getConnect();

// 2 獲取servers的子節點並監聽
clientDemo.getServerList();

// 3 業務邏輯
clientDemo.business();
}

/**

- 業務邏輯處理

-

- @throws InterruptedException
 */
 private void business() throws InterruptedException {

 System.out.println("client is working......");
 Thread.sleep(Long.MAX_VALUE);

}

/**

- 獲取服務器列表

- @throws KeeperException

- @throws InterruptedException
 */
 private void getServerList() throws KeeperException, InterruptedException {

 // 獲取服務器子節點信息,並且對父節點進行監聽
 List<string> children = zooKeeper.getChildren(parentNode , true);

 // 創建一個局部的list來存服務器信息
 List<string> servers = new ArrayList<string>();
 for (String child : children) {

// child 是節點名
byte[] data = zooKeeper.getData(parentNode + "/" + child, false, null);
servers.add(new String(data));
}

 // 把servers賦值給成員變量serverList,以提供給各業務線程使用
 serverList = servers;

 System.out.println(serverList);

}

/**

- 創建到zk的客戶端連接

- @throws IOException
 */
 private void getConnect() throws IOException {

 zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {

@Override
public void process(WatchedEvent event) {


// 收到事件通知後的回調函數(應該是我們自己的事件處理邏輯)
System.out.println(event.getType() + "---" + event.getPath());

try {
getServerList();
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
});
}

}/<string>/<string>/<string>/<string>/<code>


4)測試

啟動三個服務端帶參數程序DistributeServerDemo.java

第五章 ZooKeeper Java案例

第五章 ZooKeeper Java案例

再啟動另外兩個服務器連接DistributeServerDemo.java

第五章 ZooKeeper Java案例

啟動客戶端程序DistributeClientDemo.java

第五章 ZooKeeper Java案例

順序關閉兩個服務器程序,然後在客戶端上查看監控信息

第五章 ZooKeeper Java案例



分享到:


相關文章: