用redisson的分佈式鎖實現主從選舉(leader election)

問題

用戶數上升,服務要集群,如何實現主從機制,並且當主服務掛掉或停機維護時,其它任意從服務可自動變成主服務?

程序猿A:這還不簡單,用zookeeper就行了,配上Apache curator更方便,直接幫你實現好Leader Election了。

程序猿B:嗯...zookeeper又要安裝個服務,不想只為了這個主從又引入一個新東西,我們已經有redis了,能不能基於redis來實現?

答案是:yes。

思路

開始前,先說一下基本的實現思路:

  1. 先有一個redisson的分佈式鎖RLock,名稱為:leader-lock
  2. 所有服務在啟動的時候都去嘗試獲取leader鎖
  3. 獲取鎖成功的服務為主服務
  4. 未獲取鎖的其它服務為從服務
  5. 從服務每隔幾秒鐘一直去嘗試獲取leader鎖,當主服務掛掉或停機時,其中一個從服務就會獲取到鎖變成主服務

碼起來

分佈式鎖的初始化

RLock leaderLock = redissonClient.getLock(“leader-lock”);

ElectionThread

有了鎖後,我們需要一個專門的線程用於獲取鎖

class ElectionThread extends Thread {

 private boolean isMaster = false;

 public ElectionThread() {
 setName("leader-election");
 }

 @Override
 public void run() {
 while (!stop) {
 try {
 if (isMaster) {
 synchronized (masterLock) {
 //leader鎖獲取到了,就不需要再去獲取了,進入阻塞狀態
 masterLock.wait();
 }
 } else {
 //所有從服務嘗試獲取leader鎖,嘗試並等待一定時間,如果未獲取成功,就一直重試
 isMaster = leaderLock.tryLock(WAIT_SECONDS, TimeUnit.SECONDS);
 if (isMaster) {
 //leader鎖獲取成功,當前服務為主服務
 logger.info("got leadership");
 }
 }
 } catch (InterruptedException e) {

 } 
 }
 }

 //判斷leader鎖是否獲取成功
 public boolean isMaster() {
 return isMaster;
 }
}

tryHold

有了ElectionThread,需要提供一個方法啟動它去獲取鎖

public void tryHold(String leaderName) {
 //分佈式鎖的初始化
 leaderLock = redissonClient.getLock(leaderName);
 //啟動獲取鎖的線程
 electionThread.start();
}

鎖釋放

鎖獲取到了,如果要釋放怎麼釋放?ElectionThread需要加上釋放鎖的邏輯

class ElectionThread extends Thread {

 private boolean isMaster = false;

 public ElectionThread() {
 setName("leader-election");
 }

 @Override
 public void run() {
 while (!stop) {
 try {
 if (isMaster) {
 synchronized (masterLock) {
 //leader鎖獲取到了,就不需要再去獲取了,進入阻塞狀態
 masterLock.wait();
 }
 } else {
 //所有從服務嘗試獲取leader鎖,嘗試並等待一定時間,如果未獲取成功,就一直重試
 isMaster = leaderLock.tryLock(WAIT_SECONDS, TimeUnit.SECONDS);
 if (isMaster) {
 //leader鎖獲取成功,當前服務為主服務
 logger.info("got leadership");
 }
 }
 } catch (InterruptedException e) {

 } 
 }
 //如果leader鎖被當前線程佔用,就釋放鎖
 if (leaderLock.isLocked() && leaderLock.isHeldByCurrentThread()) {
 leaderLock.unlock();
 }

 if (isMaster) {
 isMaster = false;
 }
 }

 //判斷leader鎖是否獲取成功
 public boolean isMaster() {
 return isMaster;
 }
}

shutdown

等等,ElectionThread是加上了釋放鎖的邏輯了,但當ElectionThread得到鎖的時候,線程已經阻塞了,我們需要在外部喚醒ElectionThread線程並跳出while循環

public void shutdown() {
 //stop設為true,ElectionThread中的while循環即可退出
 stop = true;
 try {
 synchronized (masterLock) {
 //喚醒ElectionThread
 masterLock.notifyAll();
 }
 //等待ElectionThread死亡
 electionThread.join();
 } catch (InterruptedException e) {

 }
 logger.info("shutdown and give up leadership");
}

shutdown hook

有了shutdown方法後,我們再加個shutdownHook,就可以在jvm停止時調用shutdown方法,leader鎖就會被釋放

public void tryHold(String leaderName) {
 //分佈式鎖的初始化
 leaderLock = redissonClient.getLock(leaderName);
 //啟動獲取鎖的線程
 electionThread.start();
 //shutdown hook
 Runtime.getRuntime().addShutdownHook(new Thread(() -> shutdown()));
}

jvm的shutdownHook是在服務正常退出的情況下才會生效,如果服務異常退出,會怎樣?leader鎖會釋放嗎?放心,redisson有個lockWatchdogTimeout配置,這個配置會讓redisson客戶端處於正常狀態的時候,給那些不會自動釋放的鎖延長過期時間,如果服務異常了,那些不會自動釋放的鎖由於沒有延長過期時間,會被redis自動清除,所以leader鎖即使在服務異常退出的情況下,也會自動釋放。

lockWatchdogTimeout

lockWatchdogTimeout的配置可以用代碼方式設置,也可以用配置文件方式設置,一定要注意它的單位是毫秒(我測試的時候設成了10,結果找了半天問題),默認值是30000毫秒,即30秒。

import org.redisson.config.Config
//從yaml配置文件中讀取配置
Config config = Config.fromYAML(configFile.getInputStream());
//代碼方式設置,如果希望主從切換更快,時間可以設置成5秒
config.setLockWatchdogTimeout(5000)

應用

看起來差不多了,現在我們提供一個方法用於判斷當前服務是否是主服務

public boolean isMaster() {
 return electionThread.isMaster();
}

Ok,現在可以拿來用了,首先看下定時任務的情況,定時任務一般只想在主服務上運行,這時就可以這樣寫了

scheduler.scheduleAtFixedRate(() -> {
 if (!isMaster()) {
 //do nothing
 return;
 }
 //do something
}, 0, 60, TimeUnit.MINUTES);

初始化問題

這個判斷方法能不能在鎖狀態初始化完成之前阻塞,這樣在類似上面的定時任務裡(如1小時)做判斷時,主服務不至於因為沒初始化,然後就得等到下個小時才能執行,我們修改兩個地方。

public boolean isMaster() {
 //這裡加上一個初始化鎖,當沒有初始化時,阻塞當前線程
 synchronized (initLock) {
 if (!isInit) {
 try {
 initLock.wait();
 } catch (InterruptedException e) {

 }
 }
 }
 return electionThread.isMaster();
}

class ElectionThread extends Thread {

 private boolean isMaster = false;

 public ElectionThread() {
 setName("leader-election");
 }

 @Override
 public void run() {
 while (!stop) {
 try {
 if (isMaster) {
 synchronized (masterLock) {
 if (isInit) {
 //獲得鎖且已經初始化過就進入阻塞狀態
 masterLock.wait();
 } else {
 //獲得鎖,還未設置初始化狀態,就等待一會兒,給線程機會設置初始化狀態
 masterLock.wait(Duration.ofSeconds(WAIT_SECONDS).toMillis());
 }
 }
 } else {
 isMaster = leaderLock.tryLock(WAIT_SECONDS, TimeUnit.SECONDS);
 if (isMaster) {
 logger.info("got leadership");
 }
 }
 } catch (InterruptedException e) {

 } finally {
 //設置線程初始化狀態
 synchronized (initLock) {
 if (!isInit) {
 //初始化完成喚醒調用isMaster方法處於阻塞狀態的線程,並設置初始化狀態
 initLock.notifyAll();
 isInit = true;
 }
 }
 }
 }

 if (leaderLock.isLocked() && leaderLock.isHeldByCurrentThread()) {
 leaderLock.unlock();
 }

 if (isMaster) {
 isMaster = false;
 }
 }

 public boolean isMaster() {
 return isMaster;
 }
}

從服務變成主服務

還有一種情況,我們需要考慮,當主服務掛了,從服務變成主服務時,在上面的1小時定時任務已經過了執行時間,現在想在從服務變成主服務時,馬上就執行任務,要怎麼辦?我們可以加上一個從服務變成主服務的監聽器

private List listeners = new ArrayList<>();
public interface ElectionListener {
 void onElected();
}
public void addElectionListener(ElectionListener electionListener) {
 if (listeners.contains(electionListener)) {
 return;
 }
 listeners.add(electionListener);
}
private void notifyElected() {
 for (ElectionListener listener : listeners) {
 listener.onElected();
 }
}

//ElectionThread代碼片斷
isMaster = leaderLock.tryLock(WAIT_SECONDS, TimeUnit.SECONDS);
if (isMaster) {
 logger.info("got leadership");
 notifyElected();
}

總結

大功告成,最後看一下完整版的代碼

LeaderElection

import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class LeaderElection {

 private static final Logger logger = LoggerFactory.getLogger(LeaderElection.class);

 private static final int WAIT_SECONDS = 1;

 private RedissonClient redissonClient;

 private RLock leaderLock;

 private boolean stop = false;

 private boolean isInit = false;

 private Object masterLock = new Object();

 private Object initLock = new Object();

 private ElectionThread electionThread = new ElectionThread();

 private List listeners = new ArrayList<>();

 public void tryHold(String leaderName) {
 leaderLock = redissonClient.getLock(leaderName);
 electionThread.start();
 Runtime.getRuntime().addShutdownHook(new Thread(() -> shutdown()));
 }

 public boolean isMaster() {
 synchronized (initLock) {
 if (!isInit) {
 try {
 initLock.wait();
 } catch (InterruptedException e) {

 }
 }
 }
 return electionThread.isMaster();
 }

 public void addElectionListener(ElectionListener electionListener) {
 if (listeners.contains(electionListener)) {
 return;
 }
 listeners.add(electionListener);
 }

 public void shutdown() {
 stop = true;
 try {
 synchronized (masterLock) {
 masterLock.notifyAll();
 }
 electionThread.join();
 listeners.clear();
 } catch (InterruptedException e) {

 }
 logger.info("shutdown and give up leadership");
 }

 class ElectionThread extends Thread {

 private boolean isMaster = false;

 public ElectionThread() {
 setName("leader-election");
 }

 @Override
 public void run() {
 while (!stop) {
 try {
 if (isMaster) {
 synchronized (masterLock) {
 if (isInit) {
 masterLock.wait();
 } else {
 masterLock.wait(Duration.ofSeconds(WAIT_SECONDS).toMillis());
 }
 }
 } else {
 isMaster = leaderLock.tryLock(WAIT_SECONDS, TimeUnit.SECONDS);
 if (isMaster) {
 logger.info("got leadership");
 notifyElected();
 }
 }
 } catch (InterruptedException e) {

 } finally {
 synchronized (initLock) {
 if (!isInit) {
 initLock.notifyAll();
 isInit = true;
 }
 }
 }
 }

 if (leaderLock.isLocked() && leaderLock.isHeldByCurrentThread()) {
 leaderLock.unlock();
 }

 if (isMaster) {
 isMaster = false;
 }
 }

 public boolean isMaster() {
 return isMaster;
 }
 }

 private void notifyElected() {
 for (ElectionListener listener : listeners) {
 listener.onElected();
 }
 }

 public void setRedissonClient(RedissonClient redissonClient) {
 this.redissonClient = redissonClient;
 }

}

ElectionListener

public interface ElectionListener {
 void onElected();
}

在springboot中的配置應用

@Configuration
public class LeaderElectionConfig {

 @Value("${spring.application.name}")
 private String appName;

 @Bean(destroyMethod = "shutdown")
 public LeaderElection leaderElection(RedissonClient redissonClient) {
 LeaderElection leaderElection = new LeaderElection();
 leaderElection.setRedissonClient(redissonClient);
 leaderElection.tryHold("leader-lock-" + appName);
 return leaderElection;
 }

}

源碼

https://github.com/huangyemin/wheel
https://gitee.com/huangyemin/wheel


分享到:


相關文章: