Java架構師之路-如何去實現一個分佈式定時任務?

尹家俊


雖說有現成的框架可以實現,不過還是一步一步地說一下思路。


需求

為方便大家的理解,先給大家講一個真實的需求,這是我在第二家公司的一個項目,定時任務每天凌晨執行,需求很簡單:把原始的業務數據,加工處理成待發送的短信。

原始數據:姓名-小明,所在地-北京,電話-13800000000,賬單最後還款日期-2018年4月30日。

加工後的數據是:親愛的小明,您的賬單最後還款日期為2018年4月30日,請提前繳費。然後把需要把這條短信發送到13800000000這個手機號上。


定時任務

定時任務框架裡面,最有名的就是quartz了,相信大部分Java程序員都用過。

我們項目最開始也用的是quartz,只有一個服務器跑定時任務。但是待處理的數據越來越多,定時服務執行的時間也越來越長,終於有一天,定時任務從晚上跑到了第二天白天也沒有跑完,耽誤了短信的發送。

改造後的定時任務

有人就有疑問了,能不能直接把定時服務部署多套不就行了。但是部署多套quartz的話,就會出現問題:待處理的任務有可能會被重複執行。

應對這種問題,我們當時有兩種處理方案:

方案一:定時服務只部署一套,但是定時任務的工作只是提取待處理的任務。

實際的業務處理服務集群化部署,然後由定式服務提取數據後,發送給業務處理服務器進行實際的處理。

方案二:這個是我當時自己想出的一個奇葩的方法,不過這個方案想明白了,對分佈式定式服務的理解很有幫助!

  • 定時任務程序部署多套,並且多套環境都是獨立的IP。每套程序定時將IP寫入到數據中(一分鐘對錶update一次,並更新時間戳)。
  • 多套服務選舉出一臺主服務器。
  • 主服務器把所有的待處理任務,儘可能平均分配給每一臺服務器。(IP和待處理任務對應上,也就是每一條待處理任務只能讓分配的IP處理)
  • 處理任務的時候,只處理自己IP對應的任務。
  • 一臺服務器掛了,主服務器負責把它的IP從數據庫中抹掉(三分鐘沒有對錶進行更新的IP,刪除掉),並重新分配這個IP對應的待處理任務。
  • 主服務器掛了,重新選舉出主服務器。

分佈式定時任務框架

我只用過Elastic-job,所以只給大家介紹一下這個框架。

任務分片:把一個任務拆分成幾個獨立的任務,然後由分佈式服務器分別執行一個或者多個子任務。比如還是上面那個需求,那麼可以按照【所在地】拆分任務,北京的待處理數據是一個子任務,天津的待處理數據是第二個子任務。

Elastic-Job並不直接提供數據處理的功能,實際的數據處理還是需要自己寫,Elastic-Job會將分片任務分配到各個運行中的作業服務器。

其實發現了沒有,Elastic-Job做的工作,就是我那個主服務器做的任務分配的工作,把所在地=北京的,分配給服務器1處理,把所在地=天津的,分配給服務器2處理;甚至包括監控每臺作業服務器是否存活,掛掉一臺重新分配待處理任務,也都是Elastic-Job來做的。

我將持續分享Java開發、架構設計、程序員職業發展等方面的見解,希望能得到你的關注。


會點代碼的大叔


一、在JAVA開發領域,目前可以通過以下幾種方式進行定時任務

1、單機部署模式

Timer:jdk中自帶的一個定時調度類,可以簡單的實現按某一頻度進行任務執行。提供的功能比較單一,無法實現複雜的調度任務。

ScheduledExecutorService:也是jdk自帶的一個基於線程池設計的定時任務類。其每個調度任務都會分配到線程池中的一個線程執行,所以其任務是併發執行的,互不影響。

Spring Task:Spring提供的一個任務調度工具,支持註解和配置文件形式,支持Cron表達式,使用簡單但功能強大。

Quartz:一款功能強大的任務調度器,可以實現較為複雜的調度功能,如每月一號執行、每天凌晨執行、每週五執行等等,還支持分佈式調度,就是配置稍顯複雜。

2、分佈式集群模式(不多介紹,簡單提一下)

問題:

I、如何解決定時任務的多次執行?

II、如何解決任務的單點問題,實現任務的故障轉移?

問題I的簡單思考:

1、固定執行定時任務的機器(可以有效避免多次執行的情況 ,缺點就是單點故障問題)。

2、藉助Redis的過期機制和分佈式鎖。

3、藉助mysql的鎖機制等。

成熟的解決方案:

1、Quartz:可以去看看這篇文章[Quartz分佈式]( https://www.cnblogs.com/jiafuwei/p/6145280.html)。

2、elastic-job:(https://github.com/elasticjob/elastic-job-lite)噹噹開發的彈性分佈式任務調度系統,採用zookeeper實現分佈式協調,實現任務高可用以及分片。

3、xxl-job:(https://github.com/xuxueli/xxl-job)是大眾點評員發佈的分佈式任務調度平臺,是一個輕量級分佈式任務調度框架。

4、saturn:(https://github.com/vipshop/Saturn) 是唯品會提供一個分佈式、容錯和高可用的作業調度服務框架。

二、SpringTask實現定時任務(這裡是基於springboot)

1、簡單的定時任務實現

使用方式:

使用@EnableScheduling註解開啟對定時任務的支持。

使用@Scheduled 註解即可,基於corn、fixedRate、fixedDelay等一些定時策略來實現定時任務。

使用缺點:

1、多個定時任務使用的是同一個調度線程,所以任務是阻塞執行的,執行效率不高。

2、其次如果出現任務阻塞,導致一些場景的定時計算沒有實際意義,比如每天12點的一個計算任務被阻塞到1點去執行,會導致結果並非我們想要的。

使用優點:

1、配置簡單

2、適用於單個後臺線程執行週期任務,並且保證順序一致執行的場景

源碼分析:

//默認使用的調度器

if(this.taskScheduler == null) {

this.localExecutor = Executors.newSingleThreadScheduledExecutor();

this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);

}

//可以看到SingleThreadScheduledExecutor指定的核心線程為1,說白了就是單線程執行

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {

return new DelegatedScheduledExecutorService

(new ScheduledThreadPoolExecutor(1));

}

//利用了DelayedWorkQueue延時隊列作為任務的存放隊列,這樣便可以實現任務延遲執行或者定時執行

public ScheduledThreadPoolExecutor(int corePoolSize) {

super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,

new DelayedWorkQueue());

}

  

2、實現併發的定時任務

使用方式:

方式一:由1中我們知道之所以定時任務是阻塞執行,是配置的線程池決定的,那就好辦了,換一個不就行了!直接上代碼:

@Configuration

public class ScheduledConfig implements SchedulingConfigurer {

@Autowired

private TaskScheduler myThreadPoolTaskScheduler;

@Override

public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {

//簡單粗暴的方式直接指定

//scheduledTaskRegistrar.setScheduler(Executors.newScheduledThreadPool(5));

//也可以自定義的線程池,方便線程的使用與維護,這裡不多說了

scheduledTaskRegistrar.setTaskScheduler(myThreadPoolTaskScheduler);

}

}

@Bean(name = "myThreadPoolTaskScheduler")

public TaskScheduler getMyThreadPoolTaskScheduler() {

ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();

taskScheduler.setPoolSize(10);

taskScheduler.setThreadNamePrefix("Haina-Scheduled-");

taskScheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

//調度器shutdown被調用時等待當前被調度的任務完成

taskScheduler.setWaitForTasksToCompleteOnShutdown(true);

//等待時長

taskScheduler.setAwaitTerminationSeconds(60);

return taskScheduler;

}

方式二:方式一的本質改變了任務調度器默認使用的線程池,接下來這種是不改變調度器的默認線程池,而是把當前任務交給一個異步線程池去執行

廢話太多,直接上代碼:

@Scheduled(fixedRate = 1000*10,initialDelay = 1000*20)

@Async("myThreadPoolTaskExecutor")

//@Async

public void scheduledTest02(){

System.out.println(Thread.currentThread().getName()+"--->xxxxx--->"+Thread.currentThread().getId());

}

//自定義線程池

@Bean(name = "myThreadPoolTaskExecutor")

public TaskExecutor getMyThreadPoolTaskExecutor() {

ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();

taskExecutor.setCorePoolSize(20);

taskExecutor.setMaxPoolSize(200);

taskExecutor.setQueueCapacity(25);

taskExecutor.setKeepAliveSeconds(200);

taskExecutor.setThreadNamePrefix("Haina-ThreadPool-");

// 線程池對拒絕任務(無線程可用)的處理策略,目前只支持AbortPolicy、CallerRunsPolicy;默認為後者

taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

//調度器shutdown被調用時等待當前被調度的任務完成

taskExecutor.setWaitForTasksToCompleteOnShutdown(true);

//等待時長

taskExecutor.setAwaitTerminationSeconds(60);

taskExecutor.initialize();

return taskExecutor;

}

首先使用@EnableAsync 啟用異步任務

然後在定時任務的方法加上@Async即可,默認使用的線程池為SimpleAsyncTaskExecutor(該線程池默認來一個任務創建一個線程,就會不斷創建大量線程,極有可能壓爆服務器內存。當然它有自己的限流機制,這裡就不多說了,有興趣的自己翻翻源碼~)

項目中為了更好的控制線程的使用,我們可以自定義我們自己的線程池,使用方式@Async("myThreadPool")

線程池的使用心得(後續有專門文章來探討)

java中提供了ThreadPoolExecutor和ScheduledThreadPoolExecutor,對應與spring中的ThreadPoolTaskExecutor和ThreadPoolTaskScheduler,但是在原有的基礎上增加了新的特性,在spring環境下更容易使用和控制。

使用自定義的線程池能夠避免一些默認線程池造成的內存溢出、阻塞等等問題,更貼合自己的服務特性

使用自定義的線程池便於對項目中線程的管理、維護以及監控。

即便在非spring環境下也不要使用java默認提供的那幾種線程池,坑很多,阿里代碼規約不說了嗎,得相信大廠!!!

三、動態定時任務的實現

問題:

使用@Scheduled註解來完成設置定時任務,但是有時候我們往往需要對週期性的時間的設置會做一些改變,或者要動態的啟停一個定時任務,那麼這個時候使用此註解就不太方便了,原因在於這個註解中配置的cron表達式必須是常量,那麼當我們修改定時參數的時候,就需要停止服務,重新部署。

解決辦法:

方式一:實現SchedulingConfigurer接口,重寫configureTasks方法,重新制定Trigger,核心方法就是addTriggerTask(Runnable task, Trigger trigger) ,不過需要注意的是,此種方式修改了配置值後,需要在下一次調度結束後,才會更新調度器,並不會在修改配置值時實時更新,實時更新需要在修改配置值時額外增加相關邏輯處理。

@Configuration

public class ScheduledConfig implements SchedulingConfigurer {

@Autowired

private TaskScheduler myThreadPoolTaskScheduler;

@Override

public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {

//scheduledTaskRegistrar.setScheduler(Executors.newScheduledThreadPool(5));

scheduledTaskRegistrar.setTaskScheduler(myThreadPoolTaskScheduler);

//可以實現動態調整定時任務的執行頻率

scheduledTaskRegistrar.addTriggerTask(

//1.添加任務內容(Runnable)

() -> System.out.println("cccccccccccccccc--->" + Thread.currentThread().getId()),

//2.設置執行週期(Trigger)

triggerContext -> {

//2.1 從數據庫動態獲取執行週期

String cron = "0/2 * * * * ? ";

//2.2 合法性校驗.

// if (StringUtils.isEmpty(cron)) {

// // Omitted Code ..

// }

//2.3 返回執行週期(Date)

return new CronTrigger(cron).nextExecutionTime(triggerContext);

}

);

}

}

方式二:使用threadPoolTaskScheduler類可實現動態添加刪除功能,當然也可實現執行頻率的調整

首先,我們要認識下這個調度類,它其實是對java中ScheduledThreadPoolExecutor的一個封裝改進後的產物,主要改進有以下幾點:

1、提供默認配置,因為是ScheduledThreadPoolExecutor,所以只有poolSize這一個默認參數。

2、支持自定義任務,通過傳入Trigger參數。

3、對任務出錯處理進行優化,如果是重複性的任務,不拋出異常,通過日誌記錄下來,不影響下次運行,如果是隻執行一次的任務,將異常往上拋。

順便說下ThreadPoolTaskExecutor相對於ThreadPoolExecutor的改進點:

1、提供默認配置,原生的ThreadPoolExecutor的除了ThreadFactory和RejectedExecutionHandler其他沒有默認配置

2、實現AsyncListenableTaskExecutor接口,支持對FutureTask添加success和fail的回調,任務成功或失敗的時候回執行對應回調方法。

3、因為是spring的工具類,所以拋出的RejectedExecutionException也會被轉換為spring框架的TaskRejectedException異常(這個無所謂)

4、提供默認ThreadFactory實現,直接通過參數重載配置

扯了這麼多,還是直接上代碼:

@Component

public class DynamicTimedTask {

private static final Logger logger = LoggerFactory.getLogger(DynamicTimedTask.class);

//利用創建好的調度類統一管理

//@Autowired

//@Qualifier("myThreadPoolTaskScheduler")

//private ThreadPoolTaskScheduler myThreadPoolTaskScheduler;

//接受任務的返回結果

private ScheduledFuture> future;

@Autowired

private ThreadPoolTaskScheduler threadPoolTaskScheduler;

//實例化一個線程池任務調度類,可以使用自定義的ThreadPoolTaskScheduler

@Bean

public ThreadPoolTaskScheduler threadPoolTaskScheduler() {

ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler();

return new ThreadPoolTaskScheduler();

}

/**

* 啟動定時任務

* @return

*/

public boolean startCron() {

boolean flag = false;

//從數據庫動態獲取執行週期

String cron = "0/2 * * * * ? ";

future = threadPoolTaskScheduler.schedule(new CheckModelFile(),cron);

if (future!=null){

flag = true;

logger.info("定時check訓練模型文件,任務啟動成功!!!");

}else {

logger.info("定時check訓練模型文件,任務啟動失敗!!!");

}

return flag;

}

/**

* 停止定時任務

* @return

*/

public boolean stopCron() {

boolean flag = false;

if (future != null) {

boolean cancel = future.cancel(true);

if (cancel){

flag = true;

logger.info("定時check訓練模型文件,任務停止成功!!!");

}else {

logger.info("定時check訓練模型文件,任務停止失敗!!!");

}

}else {

flag = true;

logger.info("定時check訓練模型文件,任務已經停止!!!");

}

return flag;

}

class CheckModelFile implements Runnable{

@Override

public void run() {

//編寫你自己的業務邏輯

System.out.print("模型文件檢查完畢!!!")

}

}

}

四、總結

到此基於springtask下的定時任務的簡單使用算是差不多了,其中不免有些錯誤的地方,或者理解有偏頗的地方歡迎大家提出來!

基於分佈式集群下的定時任務使用,後續有時間再繼續!!!


IT實戰聯盟


需要一個全局協調器,記master。接下來就是要幹活的應用服務器,記slave。定時器而且是分佈式的,就要求每個服務器在指定時間都執行相同任務,而且實現對任務的管理如暫停,啟動,停止等!甚至可以創建指定的定時器。還要保證任務執行的原子性!定時任務由master統一分發給每個slave!之後每個slave將處理結果返回給master。由master統一保存或是回滾!

實現的技術比較多:支持分佈式的如zookeeper,系統信息交互的如netty,支持定時器的如quartz。


tryetry


需要一個任務分發集群和一個任務執行集群,任務分發集群三臺機器即可,任務執行集群依賴你的業務量。任務分發集群執行分發任務,按照一定的策略將任務分發給執行集群各個機器。需要解決任務重複分發問題,以及執行任務的server和分發任務的server掛掉的問題,總之就是兩個集群的高可用。

解決重複分發問題,很簡單,採用主從模式,這涉及選主操作,分發集群只有一臺主server工作,其他server在主server掛掉後選主繼續分發,選主使用zk非常方便。

執行集群中server掛掉後,其上任務需要重新分發給其他server,這個問題使用zk也是很方便可以解決

手機打字 細節就不寫了


濺濺123321


分佈式任務本身涉及到分佈式構架,基於不同的任務處理量和反應時間可以考慮 hadoop spark flink 等。 定時最好用serverless的scheduler 觸發。


分享到:


相關文章: