為了介紹 Zookeeper Java API的基本用法,本文將一步一步的從配置ZooKeeper的Java開發環境開始介紹,實現幾個功能簡單的Zookeeper小程序。
一、Eclipse環境搭建
1.1、創建一個Maven工程
1.2、添加pom文件
<code> <dependencies>
<dependency>
<groupid>junit/<groupid>
<artifactid>junit/<artifactid>
<version>RELEASE/<version>
/<dependency>
<dependency>
<groupid>org.apache.logging.log4j/<groupid>
<artifactid>log4j-core/<artifactid>
<version>2.8.2/<version>
/<dependency>
<dependency>
<groupid>org.apache.zookeeper/<groupid>
<artifactid>zookeeper/<artifactid>
<version>3.4.10/<version>
/<dependency>
/<dependencies>/<code>
1.3、拷貝log4j.properties文件到項目根目錄
需要在項目的 src/main/resources 目錄下,新建一個文件,命名為“log4j.properties”,在文件中填入。
<code>log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c]- %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c]- %m%n/<code>
二、創建ZooKeeper客戶端測試程序
<code>package com.qinghe.zookeeper;
import java.io.IOException;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;
public class BaseOperateZK {
private static int sessionTimeout = 2000;
private String connectString = "bigdata101:2181,bigdata102:2181,bigdata103:2181";
private ZooKeeper zkClient = null;
@Before
public void init() throws IOException {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 收到事件通知後回調函數(真正的監聽業務邏輯)
System.out.println(event.getType() + "------" + event.getPath());
// 再次啟動監聽程序
try {
zkClient.getChildren("/", true);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
// 1 創建節點
@Test
public void createNode() throws KeeperException, InterruptedException {
String path = zkClient.create("/server",
"server".getBytes(),
Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL);
System.out.println(path);
}
// 2.1 獲取節點信息
@Test
public void getDataNode() throws KeeperException, InterruptedException {
List<string> children = zkClient.getChildren("/", false);
for (String child : children) {
System.out.println(child);
}
}
// 2.2 獲取節點信息並監控節點變化
@Test
public void getDataNodeWatch() throws KeeperException, InterruptedException {
List<string> children = zkClient.getChildren("/", true);
for (String child : children) {
System.out.println(child);
}
// 延時阻塞,就是為了觀察監聽節點變化情況
Thread.sleep(Long.MAX_VALUE);
}
// 3 判斷節點是否存在
@Test
public void existNode() throws KeeperException, InterruptedException {
Stat stat = zkClient.exists("/liangshan", true);
System.out.println(stat == null ? "not exist" : "exist");
}
}/<string>/<string>/<code>
三、監聽服務器節點動態上下線案例
在實際的生產環境中,大型的程序都是部署在集群之中的。為了保證高可用性,我們會將同一套程序部署在多臺服務器上面,然後通過負載均衡服務器去調度,但是我們不能快速定位是哪臺服務器掛掉了,這時需要使用ZooKeeper來解決這個問題。
3.1ZooKeeper的動態感知
動態感知就是使用ZooKeeper的watch監聽功能,首先展現一下我們的Web生產服務器的樣子。
如果我們藉助與ZooKeeper的註冊功能,將每天服務器採用臨時節點的方式動態註冊到ZooKeeper服務器,Nginx讀取ZooKeeper的動態節點,這樣如果是正常的服務,Nginx就能繼續分配給相應的服務器任務。
3.2具體代碼實現
1)現在集群上面創建/servers節點
<code>[zk: localhost:2181(CONNECTED) 0] create /servers "servers"
Created /servers/<code>
2)服務器端註冊代碼
<code>package com.qinghe.zookeeper;
import java.io.IOException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class DistributeServerDemo {
private static final String connectString = "bigdata101:2181,bigdata102:2181,bigdata103:2181";
private static final int sessionTimeout = 2000;
private ZooKeeper zooKeeper = null;
private String parentNode = "/servers";
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
DistributeServerDemo serverDemo = new DistributeServerDemo();
// 1 連接ZooKeeper集群
serverDemo.getConnect();
// 2 註冊節點
serverDemo.registerServer(args[0]);
// 3 業務邏輯處理
serverDemo.business(args[0]);
}
/**
- 業務邏輯
- @param string
- @throws InterruptedException
*/
private void business(String hostName) throws InterruptedException {
System.out.println(hostName + "is working ......");
Thread.sleep(Long.MAX_VALUE);
}
/**
- 向zk集群註冊服務器信息
- @param hostName
- @throws KeeperException
- @throws InterruptedException
*/
private void registerServer(String hostName) throws KeeperException, InterruptedException {
String zkCreate = zooKeeper.create(parentNode + "/server", hostName.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostName + "is online " + zkCreate);
}
/**
- 創建到zk的客戶端連接
- @throws IOException
*/
private void getConnect() throws IOException {
zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 收到事件通知後的回調函數(應該是我們自己的事件處理邏輯)
System.out.println(event.getType() + "---" + event.getPath());
}
});
}
}/<code>
3)客戶端代碼
<code>package com.qinghe.zookeeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
public class DistributeClientDemo {
private static final String connectString = "bigdata101:2181,bigdata102:2181,bigdata103:2181";
private static final int sessionTimeout = 2000;
private ZooKeeper zooKeeper = null;
private static final String parentNode = "/servers";
private volatile List<string> serverList = null;
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
// 1 獲取zk連接
DistributeClientDemo clientDemo = new DistributeClientDemo();
clientDemo.getConnect();
// 2 獲取servers的子節點並監聽
clientDemo.getServerList();
// 3 業務邏輯
clientDemo.business();
}
/**
- 業務邏輯處理
-
- @throws InterruptedException
*/
private void business() throws InterruptedException {
System.out.println("client is working......");
Thread.sleep(Long.MAX_VALUE);
}
/**
- 獲取服務器列表
- @throws KeeperException
- @throws InterruptedException
*/
private void getServerList() throws KeeperException, InterruptedException {
// 獲取服務器子節點信息,並且對父節點進行監聽
List<string> children = zooKeeper.getChildren(parentNode , true);
// 創建一個局部的list來存服務器信息
List<string> servers = new ArrayList<string>();
for (String child : children) {
// child 是節點名
byte[] data = zooKeeper.getData(parentNode + "/" + child, false, null);
servers.add(new String(data));
}
// 把servers賦值給成員變量serverList,以提供給各業務線程使用
serverList = servers;
System.out.println(serverList);
}
/**
- 創建到zk的客戶端連接
- @throws IOException
*/
private void getConnect() throws IOException {
zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 收到事件通知後的回調函數(應該是我們自己的事件處理邏輯)
System.out.println(event.getType() + "---" + event.getPath());
try {
getServerList();
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
});
}
}/<string>/<string>/<string>/<string>/<code>
4)測試
啟動三個服務端帶參數程序DistributeServerDemo.java
再啟動另外兩個服務器連接DistributeServerDemo.java
啟動客戶端程序DistributeClientDemo.java
順序關閉兩個服務器程序,然後在客戶端上查看監控信息
閱讀更多 大數據漫路求索 的文章