10種延遲任務的實現方式彙總!附詳細代碼(牆裂推薦)

推薦學習

今天的主題,本文的主要內容如下圖所示:

10種延遲任務的實現方式彙總!附詳細代碼(牆裂推薦)

什麼是延遲任務?

顧明思議,我們把需要延遲執行的任務叫做延遲任務

延遲任務的使用場景有以下這些:

  1. 紅包 24 小時未被查收,需要延遲執退還業務;
  2. 每個月賬單日,需要給用戶發送當月的對賬單;
  3. 訂單下單之後 30 分鐘後,用戶如果沒有付錢,系統需要自動取消訂單。

等事件都需要使用延遲任務。

延遲任務實現思路分析

延遲任務實現的關鍵是在某個時間節點執行某個任務。基於這個信息我們可以想到實現延遲任務的手段有以下兩個:

  1. 自己手寫一個“死循環”一直判斷當前時間節點有沒有要執行的任務;
  2. 藉助 JDK 或者第三方提供的工具類來實現延遲任務。

而通過 JDK 實現延遲任務我們能想到的關鍵詞是:DelayQueue、ScheduledExecutorService,而第三方提供的延遲任務執行方法就有很多了,例如:Redis、Netty、MQ 等手段。

延遲任務實現

下面我們將結合代碼來講解每種延遲任務的具體實現。

1.無限循環實現延遲任務

此方式我們需要開啟一個無限循環一直掃描任務,然後使用一個 Map 集合用來存儲任務和延遲執行的時間,實現代碼如下:

<code>

import

java.time.Instant;

import

java.time.LocalDateTime;

import

java.util.HashMap;

import

java.util.Iterator;

import

java.util.Map;

public

class

DelayTaskExample

{

private

static

Map

<

String

,

Long

> _TaskMap = new

HashMap

<>();

public

static

void main(

String

[] args) {

System

.out.

println

(

"程序啟動時間:"

+

LocalDateTime

.now()); _TaskMap.put(

"task-1"

,

Instant

.now().plusSeconds(

3

).toEpochMilli()); loopTask(); }

public

static

void loopTask() {

Long

itemLong = 0L;

while

(

true

) {

Iterator

it = _TaskMap.entrySet().iterator();

while

(it.hasNext()) {

Map

.

Entry

entry = (

Map

.

Entry

) it.next(); itemLong = (

Long

) entry.getValue();

if

(

Instant

.now().toEpochMilli() >= itemLong) {

System

.out.

println

(

"執行任務:"

+ entry.getKey() +

" ,執行時間:"

+

LocalDateTime

.now()); _TaskMap.remove(entry.getKey()); } } } } }/<code>

以上程序執行的結果為:

程序啟動時間:2020-04-12T18:51:28.188

執行任務:task-1 ,執行時間:2020-04-12T18:51:31.189

可以看出任務延遲了 3s 鍾執行了,符合我們的預期。

2.Java API 實現延遲任務

Java API 提供了兩種實現延遲任務的方法:DelayQueue 和 ScheduledExecutorService。

① ScheduledExecutorService 實現延遲任務

我們可以使用 ScheduledExecutorService 來以固定的頻率一直執行任務,實現代碼如下:

<code>

public

class

DelayTaskExample

{

public

static

void

main

(

String[] args

)

{ System.

out

.println(

"程序啟動時間:"

+ LocalDateTime.now()); scheduledExecutorServiceTask(); }

public

static

void

scheduledExecutorServiceTask

(

)

{ ScheduledExecutorService executor = Executors.newScheduledThreadPool(

1

); executor.scheduleWithFixedDelay(

new

Runnable() { @

Override

public

void

run

(

)

{ System.

out

.println(

"執行任務"

+

" ,執行時間:"

+ LocalDateTime.now()); } },

2

,

2

, TimeUnit.SECONDS); } }/<code>

以上程序執行的結果為:

程序啟動時間:2020-04-12T21:28:10.416

執行任務 ,執行時間:2020-04-12T21:28:12.421

執行任務 ,執行時間:2020-04-12T21:28:14.422

......

可以看出使用 ScheduledExecutorService#scheduleWithFixedDelay(...) 方法之後,會以某個頻率一直循環執行延遲任務。

② DelayQueue 實現延遲任務

DelayQueue 是一個支持延時獲取元素的無界阻塞隊列,隊列中的元素必須實現 Delayed 接口,並重寫 getDelay(TimeUnit) 和 compareTo(Delayed) 方法,DelayQueue 實現延遲隊列的完整代碼如下:

<code>

public

class

DelayTest

{

public

static

void

main

(String[] args)

throws

InterruptedException

{ DelayQueue delayQueue =

new

DelayQueue(); delayQueue.put(

new

DelayElement(

1000

)); delayQueue.put(

new

DelayElement(

3000

)); delayQueue.put(

new

DelayElement(

5000

)); System.out.println(

"開始時間:"

+ DateFormat.getDateTimeInstance().format(

new

Date()));

while

(!delayQueue.isEmpty()){ System.out.println(delayQueue.take()); } System.out.println(

"結束時間:"

+ DateFormat.getDateTimeInstance().format(

new

Date())); }

static

class

DelayElement

implements

Delayed

{

long

delayTime = System.currentTimeMillis();

public

DelayElement

(

long

delayTime)

{

this

.delayTime = (

this

.delayTime + delayTime); }

public

long

getDelay

(TimeUnit unit)

{

return

unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); }

public

int

compareTo

(Delayed o)

{

if

(

this

.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {

return

1

; }

else

if

(

this

.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {

return

-

1

; }

else

{

return

0

; } }

public

String

toString

()

{

return

DateFormat.getDateTimeInstance().format(

new

Date(delayTime)); } } }/<code>

以上程序執行的結果為:

開始時間:2020-4-12 20:40:38

2020-4-12 20:40:39

2020-4-12 20:40:41

2020-4-12 20:40:43

結束時間:2020-4-12 20:40:43

3.Redis 實現延遲任務

使用 Redis

實現延遲任務的方法大體可分為兩類:通過 zset 數據判斷的方式,和通過鍵空間通知的方式

① 通過數據判斷的方式

我們藉助 zset 數據類型,把延遲任務存儲在此數據集合中,然後在開啟一個無線循環查詢當前時間的所有任務進行消費,實現代碼如下(需要藉助 Jedis 框架):

<code>

import

redis.clients.jedis.Jedis;

import

utils.JedisUtils;

import

java.time.Instant;

import

java.util.Set;

public

class

DelayQueueExample

{

private

static

final

String _KEY =

"myDelayQueue"

;

public

static

void

main

(String[] args)

throws

InterruptedException

{ Jedis jedis = JedisUtils.getJedis();

long

delayTime = Instant.now().plusSeconds(

30

).getEpochSecond(); jedis.zadd(_KEY, delayTime,

"order_1"

); jedis.zadd(_KEY, Instant.now().plusSeconds(

2

).getEpochSecond(),

"order_2"

); jedis.zadd(_KEY, Instant.now().plusSeconds(

2

).getEpochSecond(),

"order_3"

); jedis.zadd(_KEY, Instant.now().plusSeconds(

7

).getEpochSecond(),

"order_4"

); jedis.zadd(_KEY, Instant.now().plusSeconds(

10

).getEpochSecond(),

"order_5"

); doDelayQueue(jedis); }

public

static

void

doDelayQueue

(Jedis jedis)

throws

InterruptedException

{

while

(

true

) { Instant nowInstant = Instant.now();

long

lastSecond = nowInstant.plusSeconds(-

1

).getEpochSecond();

long

nowSecond = nowInstant.getEpochSecond(); Set data = jedis.zrangeByScore(_KEY, lastSecond, nowSecond);

for

(String item : data) { System.out.println(

"消費:"

+ item); } jedis.zremrangeByScore(_KEY, lastSecond, nowSecond); Thread.sleep(

1000

); } } }/<code>

② 通過鍵空間通知

默認情況下 Redis 服務器端是不開啟鍵空間通知的,需要我們通過 config set notify-keyspace-events Ex 的命令手動開啟,開啟鍵空間通知後,我們就可以拿到每個鍵值過期的事件,我們利用這個機制實現了給每個人開啟一個定時任務的功能,實現代碼如下:

<code>

import

redis.clients.jedis.Jedis;

import

redis.clients.jedis.JedisPubSub;

import

utils.JedisUtils;

public

class

TaskExample

{

public

static

final

String _TOPIC =

"__keyevent@0__:expired"

;

public

static

void

main

(String[] args)

{ Jedis jedis = JedisUtils.getJedis(); doTask(jedis); }

public

static

void

doTask

(Jedis jedis)

{ jedis.psubscribe(

new

JedisPubSub() {

public

void

onPMessage

(String pattern, String channel, String message)

{ System.out.println(

"收到消息:"

+ message); } }, _TOPIC); } }/<code>

4.Netty 實現延遲任務

Netty 是由 JBOSS 提供的一個 Java 開源框架,它是一個基於 NIO 的客戶、服務器端的編程框架,使用 Netty 可以確保你快速和簡單的開發出一個網絡應用,例如實現了某種協議的客戶、服務端應用。Netty 相當於簡化和流線化了網絡應用的編程開發過程,例如:基於 TCP 和 UDP 的 socket 服務開發。

可以使用 Netty 提供的工具類 HashedWheelTimer 來實現延遲任務,實現代碼如下。

首先在項目中添加 Netty 引用,配置如下:

<code> 

<

dependency

>

<

groupId

>

io.netty

groupId

>

<

artifactId

>

netty-common

artifactId

>

<

version

>

4.1.48.Final

version

>

dependency

>

/<code>

Netty 實現的完整代碼如下:

<code>

public

class

DelayTaskExample

{

public

static

void

main

(

String[] args

)

{ System.

out

.println(

"程序啟動時間:"

+ LocalDateTime.now()); NettyTask(); }

private

static

void

NettyTask

(

)

{ HashedWheelTimer timer =

new

HashedWheelTimer(

3

, TimeUnit.SECONDS,

100

); TimerTask task =

new

TimerTask() { @

Override

public

void

run

(

Timeout timeout

) throws Exception

{ System.

out

.println(

"執行任務"

+

" ,執行時間:"

+ LocalDateTime.now()); } }; timer.newTimeout(task,

0

, TimeUnit.SECONDS); } }/<code>

以上程序執行的結果為:

程序啟動時間:2020-04-13T10:16:23.033

執行任務 ,執行時間:2020-04-13T10:16:26.118

HashedWheelTimer 是使用定時輪實現的,定時輪其實就是一種環型的數據結構,可以把它想象成一個時鐘,分成了許多格子,每個格子代表一定的時間,在這個格子上用一個鏈表來保存要執行的超時任務,同時有一個指針一格一格的走,走到那個格子時就執行格子對應的延遲任務,如下圖所示:

10種延遲任務的實現方式彙總!附詳細代碼(牆裂推薦)

以上的圖片可以理解為,時間輪大小為 8,某個時間轉一格(例如 1s),每格指向一個鏈表,保存著待執行的任務。

5.MQ 實現延遲任務

如果專門開啟一個 MQ 中間件來執行延遲任務,就有點殺雞用宰牛刀般的奢侈了,不過已經有了 MQ 環境的話,用它來實現延遲任務的話,還是可取的。

幾乎所有的 MQ 中間件都可以實現延遲任務,在這裡更準確的叫法應該叫延隊列。本文就使用 RabbitMQ 為例,來看它是如何實現延遲任務的。

RabbitMQ 實現延遲隊列的方式有兩種:

  • 通過消息過期後進入死信交換器,再由交換器轉發到延遲消費隊列,實現延遲功能;
  • 使用 rabbitmq-delayed-message-exchange 插件實現延遲功能。

注意: 延遲插件
rabbitmq-delayed-message-exchange 是在 RabbitMQ 3.5.7 及以上的版本才支持的,依賴 Erlang/OPT 18.0 及以上運行環境。

由於使用死信交換器比較麻煩,所以推薦使用第二種實現方式


rabbitmq-delayed-message-exchange 插件的方式實現延遲隊列的功能。

首先,我們需要下載並安裝
rabbitmq-delayed-message-exchange 插件,下載地址:
www.rabbitmq.com/community-p…

選擇相應的對應的版本進行下載,然後拷貝到 RabbitMQ 服務器目錄,使用命令 rabbitmq-plugins enable
rabbitmq_delayed_message_exchange 開啟插件,在使用命令 rabbitmq-plugins list 查詢安裝的所有插件,安裝成功如下圖所示:

10種延遲任務的實現方式彙總!附詳細代碼(牆裂推薦)

最後重啟 RabbitMQ 服務,使插件生效。

首先,我們先要配置消息隊列,實現代碼如下:

<code>

import

com.example.rabbitmq.mq.DirectConfig;

import

org.springframework.amqp.core.*;

import

org.springframework.context.annotation.Bean;

import

org.springframework.context.annotation.Configuration;

import

java.util.HashMap;

import

java.util.Map;

public

class

DelayedConfig

{

final

static

String QUEUE_NAME =

"delayed.goods.order"

;

final

static

String EXCHANGE_NAME =

"delayedec"

;

public

Queue

queue

()

{

return

new

Queue(DelayedConfig.QUEUE_NAME); }

CustomExchange

customExchange

()

{ Map args =

new

HashMap<>(); args.put(

"x-delayed-type"

,

"direct"

);

return

new

CustomExchange(DelayedConfig.EXCHANGE_NAME,

"x-delayed-message"

,

true

,

false

, args); }

Binding

binding

(Queue queue, CustomExchange exchange)

{

return

BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs(); } }/<code>

然後添加增加消息的代碼,具體實現如下:

<code>

import

org.springframework.amqp.AmqpException;

import

org.springframework.amqp.core.AmqpTemplate;

import

org.springframework.amqp.core.Message;

import

org.springframework.amqp.core.MessagePostProcessor;

import

org.springframework.beans.factory.annotation.Autowired;

import

org.springframework.stereotype.Component;

import

java.text.SimpleDateFormat;

import

java.util.Date;

public

class

DelayedSender

{

private

AmqpTemplate rabbitTemplate;

public

void

send

(String msg)

{ SimpleDateFormat sf =

new

SimpleDateFormat(

"yyyy-MM-dd HH:mm:ss"

); System.out.println(

"發送時間:"

+ sf.format(

new

Date())); rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg,

new

MessagePostProcessor() {

public

Message

postProcessMessage

(Message message)

throws

AmqpException

{ message.getMessageProperties().setHeader(

"x-delay"

,

3000

);

return

message; } }); } }/<code>

再添加消費消息的代碼:

<code>

import

org.springframework.amqp.rabbit.

annotation

.RabbitHandler;

import

org.springframework.amqp.rabbit.

annotation

.RabbitListener;

import

org.springframework.stereotype.Component;

import

java.text.SimpleDateFormat;

import

java.util.Date;

public

class

DelayedReceiver

{

public

void process(String msg) { SimpleDateFormat sdf = new SimpleDateFormat(

"yyyy-MM-dd HH:mm:ss"

); System.

out

.println(

"接收時間:"

+ sdf.format(new Date())); System.

out

.println(

"消息內容:"

+ msg); } }/<code>

最後,我們使用代碼測試一下:

<code>

import

com.example.rabbitmq.RabbitmqApplication;

import

com.example.rabbitmq.mq.delayed.DelayedSender;

import

org.junit.Test;

import

org.junit.runner.RunWith;

import

org.springframework.beans.factory.annotation.Autowired;

import

org.springframework.boot.test.context.SpringBootTest;

import

org.springframework.test.context.junit4.SpringRunner;

import

java.text.SimpleDateFormat;

import

java.util.Date; @

RunWith

(

SpringRunner

.

class

) @

SpringBootTest

public

class

DelayedTest

{ @

Autowired

private

DelayedSender

sender; @

Test

public

void

Test

()

throws

InterruptedException

{

SimpleDateFormat

sf = new

SimpleDateFormat

(

"yyyy-MM-dd"

); sender.send(

"Hi Admin."

);

Thread

.sleep(

5

*

1000

); } }/<code>

以上程序的執行結果如下:

發送時間:2020-04-13 20:47:51

接收時間:2020-04-13 20:47:54

消息內容:Hi Admin.

從結果可以看出,以上程序執行符合延遲任務的實現預期。

6.使用 Spring 定時任務

如果你使用的是 Spring 或 SpringBoot 的項目的話,可以使用藉助 Scheduled 來實現,本文將使用 SpringBoot 項目來演示 Scheduled 的實現,實現我們需要聲明開啟 Scheduled,實現代碼如下:

<code>

@SpringBootApplication

@EnableScheduling

public class Application {

public

static

void

main

(String[] args) {

SpringApplication

.run

(Application.class, args); } }/<code>

然後添加延遲任務,實現代碼如下:

<code> 

public

class

ScheduleJobs

{ (fixedDelay =

2

*

1000

)

public

void

fixedDelayJob

()

throws

InterruptedException

{ System.out.println(

"任務執行,時間:"

+ LocalDateTime.now()); } }/<code>

此時當我們啟動項目之後就可以看到任務以延遲了 2s 的形式一直循環執行,結果如下:

任務執行,時間:2020-04-13T14:07:53.349

任務執行,時間:2020-04-13T14:07:55.350

任務執行,時間:2020-04-13T14:07:57.351

...

我們也可以使用 Corn 表達式來定義任務執行的頻率,例如使用 @Scheduled(cron = "0/4 * * * * ?") 。

7.Quartz 實現延遲任務

Quartz 是一款功能強大的任務調度器,可以實現較為複雜的調度功能,它還支持分佈式的任務調度。

我們使用 Quartz 來實現一個延遲任務,首先定義一個執行任務代碼如下:

<code>

import

org.quartz.JobExecutionContext;

import

org.quartz.JobExecutionException;

import

org.springframework.scheduling.quartz.QuartzJobBean;

import

java.time.LocalDateTime;

public

class

SampleJob

extends

QuartzJobBean

{

protected

void

executeInternal

(JobExecutionContext jobExecutionContext)

throws

JobExecutionException

{ System.out.println(

"任務執行,時間:"

+ LocalDateTime.now()); } }/<code>

在定義一個 JobDetail 和 Trigger 實現代碼如下:

<code>

import

org.quartz.*;

import

org.springframework.context.

annotation

.Bean;

import

org.springframework.context.

annotation

.Configuration;

public

class

SampleScheduler

{

public

JobDetail sampleJobDetail() {

return

JobBuilder.newJob(SampleJob

.

class

).

withIdentity

(

"sampleJob"

) .storeDurably().build(); }

public

Trigger sampleJobTrigger() { SimpleScheduleBuilder scheduleBuilder = SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(

3

).withRepeatCount(

1

);

return

TriggerBuilder.newTrigger().forJob(sampleJobDetail()).withIdentity(

"sampleTrigger"

) .withSchedule(scheduleBuilder).build(); } }/<code>

最後在 SpringBoot 項目啟動之後開啟延遲任務,實現代碼如下:

<code>

import

org.springframework.beans.factory.annotation.Autowired;

import

org.springframework.boot.CommandLineRunner;

import

org.springframework.scheduling.quartz.SchedulerFactoryBean;

public

class

MyStartupRunner

implements

CommandLineRunner

{

private

SchedulerFactoryBean schedulerFactoryBean;

private

SampleScheduler sampleScheduler;

public

void

run

(String... args)

throws

Exception

{ schedulerFactoryBean.getScheduler().scheduleJob( sampleScheduler.sampleJobTrigger()); } }/<code>

以上程序的執行結果如下:

2020-04-13 19:02:12.331 INFO 17768 --- [ restartedMain] com.example.demo.DemoApplication : Started DemoApplication in 1.815 seconds (JVM running for 3.088)

任務執行,時間:2020-04-13T19:02:15.019

從結果可以看出在項目啟動 3s 之後執行了延遲任務。

總結

本文講了延遲任務的使用場景,以及延遲任務的 10 種實現方式:

  1. 手動無線循環;
  2. ScheduledExecutorService;
  3. DelayQueue;
  4. Redis zset 數據判斷的方式;
  5. Redis 鍵空間通知的方式;
  6. Netty 提供的 HashedWheelTimer 工具類;
  7. RabbitMQ 死信隊列;
  8. RabbitMQ 延遲消息插件 rabbitmq-delayed-message-exchange;
  9. Spring Scheduled;
  10. Quartz。

作者:Java中文社群

原文鏈接:
https://juejin.im/post/5e95248551882573725049dc


分享到:


相關文章: