推薦學習
今天的主題,本文的主要內容如下圖所示:
什麼是延遲任務?
顧明思議,我們把需要延遲執行的任務叫做延遲任務。
延遲任務的使用場景有以下這些:
- 紅包 24 小時未被查收,需要延遲執退還業務;
- 每個月賬單日,需要給用戶發送當月的對賬單;
- 訂單下單之後 30 分鐘後,用戶如果沒有付錢,系統需要自動取消訂單。
等事件都需要使用延遲任務。
延遲任務實現思路分析
延遲任務實現的關鍵是在某個時間節點執行某個任務。基於這個信息我們可以想到實現延遲任務的手段有以下兩個:
- 自己手寫一個“死循環”一直判斷當前時間節點有沒有要執行的任務;
- 藉助 JDK 或者第三方提供的工具類來實現延遲任務。
而通過 JDK 實現延遲任務我們能想到的關鍵詞是:DelayQueue、ScheduledExecutorService,而第三方提供的延遲任務執行方法就有很多了,例如:Redis、Netty、MQ 等手段。
延遲任務實現
下面我們將結合代碼來講解每種延遲任務的具體實現。
1.無限循環實現延遲任務
此方式我們需要開啟一個無限循環一直掃描任務,然後使用一個 Map 集合用來存儲任務和延遲執行的時間,實現代碼如下:
<code>import
java.time.Instant;import
java.time.LocalDateTime;import
java.util.HashMap;import
java.util.Iterator;import
java.util.Map;public
class
DelayTaskExample
{private
static
Map
<String
,Long
> _TaskMap = newHashMap
<>();public
static
void main(String
[] args) {System
.out.println
("程序啟動時間:"
+LocalDateTime
.now()); _TaskMap.put("task-1"
,Instant
.now().plusSeconds(3
).toEpochMilli()); loopTask(); }public
static
void loopTask() {Long
itemLong = 0L;while
(true
) {Iterator
it = _TaskMap.entrySet().iterator();while
(it.hasNext()) {Map
.Entry
entry = (Map
.Entry
) it.next(); itemLong = (Long
) entry.getValue();if
(Instant
.now().toEpochMilli() >= itemLong) {System
.out.println
("執行任務:"
+ entry.getKey() +" ,執行時間:"
+LocalDateTime
.now()); _TaskMap.remove(entry.getKey()); } } } } }/<code>
以上程序執行的結果為:
程序啟動時間:2020-04-12T18:51:28.188
執行任務:task-1 ,執行時間:2020-04-12T18:51:31.189
可以看出任務延遲了 3s 鍾執行了,符合我們的預期。
2.Java API 實現延遲任務
Java API 提供了兩種實現延遲任務的方法:DelayQueue 和 ScheduledExecutorService。
① ScheduledExecutorService 實現延遲任務
我們可以使用 ScheduledExecutorService 來以固定的頻率一直執行任務,實現代碼如下:
<code>public
class
DelayTaskExample
{public
static
void
main
(String[] args
) { System.out
.println("程序啟動時間:"
+ LocalDateTime.now()); scheduledExecutorServiceTask(); }public
static
void
scheduledExecutorServiceTask
() { ScheduledExecutorService executor = Executors.newScheduledThreadPool(1
); executor.scheduleWithFixedDelay(new
Runnable() { @Override
public
void
run
() { System.out
.println("執行任務"
+" ,執行時間:"
+ LocalDateTime.now()); } },2
,2
, TimeUnit.SECONDS); } }/<code>
以上程序執行的結果為:
程序啟動時間:2020-04-12T21:28:10.416
執行任務 ,執行時間:2020-04-12T21:28:12.421
執行任務 ,執行時間:2020-04-12T21:28:14.422
......
可以看出使用 ScheduledExecutorService#scheduleWithFixedDelay(...) 方法之後,會以某個頻率一直循環執行延遲任務。
② DelayQueue 實現延遲任務
DelayQueue 是一個支持延時獲取元素的無界阻塞隊列,隊列中的元素必須實現 Delayed 接口,並重寫 getDelay(TimeUnit) 和 compareTo(Delayed) 方法,DelayQueue 實現延遲隊列的完整代碼如下:
<code>public
class
DelayTest
{public
static
void
main
(String[] args)
throws
InterruptedException { DelayQueue delayQueue =new
DelayQueue(); delayQueue.put(new
DelayElement(1000
)); delayQueue.put(new
DelayElement(3000
)); delayQueue.put(new
DelayElement(5000
)); System.out.println("開始時間:"
+ DateFormat.getDateTimeInstance().format(new
Date()));while
(!delayQueue.isEmpty()){ System.out.println(delayQueue.take()); } System.out.println("結束時間:"
+ DateFormat.getDateTimeInstance().format(new
Date())); }static
class
DelayElement
implements
Delayed
{long
delayTime = System.currentTimeMillis();public
DelayElement
(
long
delayTime) {this
.delayTime = (this
.delayTime + delayTime); }public
long
getDelay
(TimeUnit unit)
{return
unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); }public
int
compareTo
(Delayed o)
{if
(this
.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {return
1
; }else
if
(this
.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {return
-1
; }else
{return
0
; } }public
StringtoString
()
{return
DateFormat.getDateTimeInstance().format(new
Date(delayTime)); } } }/<code>
以上程序執行的結果為:
開始時間:2020-4-12 20:40:38
2020-4-12 20:40:39
2020-4-12 20:40:41
2020-4-12 20:40:43
結束時間:2020-4-12 20:40:43
3.Redis 實現延遲任務
使用 Redis
實現延遲任務的方法大體可分為兩類:通過 zset 數據判斷的方式,和通過鍵空間通知的方式。① 通過數據判斷的方式
我們藉助 zset 數據類型,把延遲任務存儲在此數據集合中,然後在開啟一個無線循環查詢當前時間的所有任務進行消費,實現代碼如下(需要藉助 Jedis 框架):
<code>import
redis.clients.jedis.Jedis;import
utils.JedisUtils;import
java.time.Instant;import
java.util.Set;public
class
DelayQueueExample
{private
static
final
String _KEY ="myDelayQueue"
;public
static
void
main
(String[] args)
throws
InterruptedException { Jedis jedis = JedisUtils.getJedis();long
delayTime = Instant.now().plusSeconds(30
).getEpochSecond(); jedis.zadd(_KEY, delayTime,"order_1"
); jedis.zadd(_KEY, Instant.now().plusSeconds(2
).getEpochSecond(),"order_2"
); jedis.zadd(_KEY, Instant.now().plusSeconds(2
).getEpochSecond(),"order_3"
); jedis.zadd(_KEY, Instant.now().plusSeconds(7
).getEpochSecond(),"order_4"
); jedis.zadd(_KEY, Instant.now().plusSeconds(10
).getEpochSecond(),"order_5"
); doDelayQueue(jedis); }public
static
void
doDelayQueue
(Jedis jedis)
throws
InterruptedException {while
(true
) { Instant nowInstant = Instant.now();long
lastSecond = nowInstant.plusSeconds(-1
).getEpochSecond();long
nowSecond = nowInstant.getEpochSecond(); Set data = jedis.zrangeByScore(_KEY, lastSecond, nowSecond);for
(String item : data) { System.out.println("消費:"
+ item); } jedis.zremrangeByScore(_KEY, lastSecond, nowSecond); Thread.sleep(1000
); } } }/<code>
② 通過鍵空間通知
默認情況下 Redis 服務器端是不開啟鍵空間通知的,需要我們通過 config set notify-keyspace-events Ex 的命令手動開啟,開啟鍵空間通知後,我們就可以拿到每個鍵值過期的事件,我們利用這個機制實現了給每個人開啟一個定時任務的功能,實現代碼如下:
<code>import
redis.clients.jedis.Jedis;import
redis.clients.jedis.JedisPubSub;import
utils.JedisUtils;public
class
TaskExample
{public
static
final
String _TOPIC ="__keyevent@0__:expired"
;public
static
void
main
(String[] args)
{ Jedis jedis = JedisUtils.getJedis(); doTask(jedis); }public
static
void
doTask
(Jedis jedis)
{ jedis.psubscribe(new
JedisPubSub() {public
void
onPMessage
(String pattern, String channel, String message)
{ System.out.println("收到消息:"
+ message); } }, _TOPIC); } }/<code>
4.Netty 實現延遲任務
Netty 是由 JBOSS 提供的一個 Java 開源框架,它是一個基於 NIO 的客戶、服務器端的編程框架,使用 Netty 可以確保你快速和簡單的開發出一個網絡應用,例如實現了某種協議的客戶、服務端應用。Netty 相當於簡化和流線化了網絡應用的編程開發過程,例如:基於 TCP 和 UDP 的 socket 服務開發。
可以使用 Netty 提供的工具類 HashedWheelTimer 來實現延遲任務,實現代碼如下。
首先在項目中添加 Netty 引用,配置如下:
<code><
dependency
><
groupId
>io.nettygroupId
><
artifactId
>netty-commonartifactId
><
version
>4.1.48.Finalversion
>dependency
>/<code>
Netty 實現的完整代碼如下:
<code>public
class
DelayTaskExample
{public
static
void
main
(String[] args
) { System.out
.println("程序啟動時間:"
+ LocalDateTime.now()); NettyTask(); }private
static
void
NettyTask
() { HashedWheelTimer timer =new
HashedWheelTimer(3
, TimeUnit.SECONDS,100
); TimerTask task =new
TimerTask() { @Override
public
void
run
(Timeout timeout
) throws Exception { System.out
.println("執行任務"
+" ,執行時間:"
+ LocalDateTime.now()); } }; timer.newTimeout(task,0
, TimeUnit.SECONDS); } }/<code>
以上程序執行的結果為:
程序啟動時間:2020-04-13T10:16:23.033
執行任務 ,執行時間:2020-04-13T10:16:26.118
HashedWheelTimer 是使用定時輪實現的,定時輪其實就是一種環型的數據結構,可以把它想象成一個時鐘,分成了許多格子,每個格子代表一定的時間,在這個格子上用一個鏈表來保存要執行的超時任務,同時有一個指針一格一格的走,走到那個格子時就執行格子對應的延遲任務,如下圖所示:
以上的圖片可以理解為,時間輪大小為 8,某個時間轉一格(例如 1s),每格指向一個鏈表,保存著待執行的任務。
5.MQ 實現延遲任務
如果專門開啟一個 MQ 中間件來執行延遲任務,就有點殺雞用宰牛刀般的奢侈了,不過已經有了 MQ 環境的話,用它來實現延遲任務的話,還是可取的。
幾乎所有的 MQ 中間件都可以實現延遲任務,在這裡更準確的叫法應該叫延隊列。本文就使用 RabbitMQ 為例,來看它是如何實現延遲任務的。
RabbitMQ 實現延遲隊列的方式有兩種:
- 通過消息過期後進入死信交換器,再由交換器轉發到延遲消費隊列,實現延遲功能;
- 使用 rabbitmq-delayed-message-exchange 插件實現延遲功能。
注意: 延遲插件
rabbitmq-delayed-message-exchange 是在 RabbitMQ 3.5.7 及以上的版本才支持的,依賴 Erlang/OPT 18.0 及以上運行環境。
由於使用死信交換器比較麻煩,所以推薦使用第二種實現方式
rabbitmq-delayed-message-exchange 插件的方式實現延遲隊列的功能。
首先,我們需要下載並安裝
rabbitmq-delayed-message-exchange 插件,下載地址:
www.rabbitmq.com/community-p…
選擇相應的對應的版本進行下載,然後拷貝到 RabbitMQ 服務器目錄,使用命令 rabbitmq-plugins enable
rabbitmq_delayed_message_exchange 開啟插件,在使用命令 rabbitmq-plugins list 查詢安裝的所有插件,安裝成功如下圖所示:
最後重啟 RabbitMQ 服務,使插件生效。
首先,我們先要配置消息隊列,實現代碼如下:
<code>import
com.example.rabbitmq.mq.DirectConfig;import
org.springframework.amqp.core.*;import
org.springframework.context.annotation.Bean;import
org.springframework.context.annotation.Configuration;import
java.util.HashMap;import
java.util.Map;public
class
DelayedConfig
{final
static
String QUEUE_NAME ="delayed.goods.order"
;final
static
String EXCHANGE_NAME ="delayedec"
;public
Queuequeue
()
{return
new
Queue(DelayedConfig.QUEUE_NAME); }CustomExchange
customExchange
()
{ Map args =new
HashMap<>(); args.put("x-delayed-type"
,"direct"
);return
new
CustomExchange(DelayedConfig.EXCHANGE_NAME,"x-delayed-message"
,true
,false
, args); }Binding
binding
(Queue queue, CustomExchange exchange)
{return
BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs(); } }/<code>
然後添加增加消息的代碼,具體實現如下:
<code>import
org.springframework.amqp.AmqpException;import
org.springframework.amqp.core.AmqpTemplate;import
org.springframework.amqp.core.Message;import
org.springframework.amqp.core.MessagePostProcessor;import
org.springframework.beans.factory.annotation.Autowired;import
org.springframework.stereotype.Component;import
java.text.SimpleDateFormat;import
java.util.Date;public
class
DelayedSender
{private
AmqpTemplate rabbitTemplate;public
void
send
(String msg)
{ SimpleDateFormat sf =new
SimpleDateFormat("yyyy-MM-dd HH:mm:ss"
); System.out.println("發送時間:"
+ sf.format(new
Date())); rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg,new
MessagePostProcessor() {public
MessagepostProcessMessage
(Message message)
throws
AmqpException { message.getMessageProperties().setHeader("x-delay"
,3000
);return
message; } }); } }/<code>
再添加消費消息的代碼:
<code>import
org.springframework.amqp.rabbit.annotation
.RabbitHandler;import
org.springframework.amqp.rabbit.annotation
.RabbitListener;import
org.springframework.stereotype.Component;import
java.text.SimpleDateFormat;import
java.util.Date;public
class
DelayedReceiver
{public
void process(String msg) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"
); System.out
.println("接收時間:"
+ sdf.format(new Date())); System.out
.println("消息內容:"
+ msg); } }/<code>
最後,我們使用代碼測試一下:
<code>import
com.example.rabbitmq.RabbitmqApplication;import
com.example.rabbitmq.mq.delayed.DelayedSender;import
org.junit.Test;import
org.junit.runner.RunWith;import
org.springframework.beans.factory.annotation.Autowired;import
org.springframework.boot.test.context.SpringBootTest;import
org.springframework.test.context.junit4.SpringRunner;import
java.text.SimpleDateFormat;import
java.util.Date; @RunWith
(SpringRunner
.
class
) @SpringBootTest
public
class
DelayedTest
{ @Autowired
private
DelayedSender
sender; @Test
public
voidTest
()throws
InterruptedException
{SimpleDateFormat
sf = newSimpleDateFormat
("yyyy-MM-dd"
); sender.send("Hi Admin."
);Thread
.sleep(5
*1000
); } }/<code>
以上程序的執行結果如下:
發送時間:2020-04-13 20:47:51
接收時間:2020-04-13 20:47:54
消息內容:Hi Admin.
從結果可以看出,以上程序執行符合延遲任務的實現預期。
6.使用 Spring 定時任務
如果你使用的是 Spring 或 SpringBoot 的項目的話,可以使用藉助 Scheduled 來實現,本文將使用 SpringBoot 項目來演示 Scheduled 的實現,實現我們需要聲明開啟 Scheduled,實現代碼如下:
<code>@SpringBootApplication
@EnableScheduling
public class Application {public
static
void
main
(String[] args) {SpringApplication
.run
(Application.class, args); } }/<code>
然後添加延遲任務,實現代碼如下:
<code>public
class
ScheduleJobs
{ (fixedDelay =2
*1000
)public
void
fixedDelayJob
()
throws
InterruptedException { System.out.println("任務執行,時間:"
+ LocalDateTime.now()); } }/<code>
此時當我們啟動項目之後就可以看到任務以延遲了 2s 的形式一直循環執行,結果如下:
任務執行,時間:2020-04-13T14:07:53.349
任務執行,時間:2020-04-13T14:07:55.350
任務執行,時間:2020-04-13T14:07:57.351
...
我們也可以使用 Corn 表達式來定義任務執行的頻率,例如使用 @Scheduled(cron = "0/4 * * * * ?") 。
7.Quartz 實現延遲任務
Quartz 是一款功能強大的任務調度器,可以實現較為複雜的調度功能,它還支持分佈式的任務調度。
我們使用 Quartz 來實現一個延遲任務,首先定義一個執行任務代碼如下:
<code>import
org.quartz.JobExecutionContext;import
org.quartz.JobExecutionException;import
org.springframework.scheduling.quartz.QuartzJobBean;import
java.time.LocalDateTime;public
class
SampleJob
extends
QuartzJobBean
{protected
void
executeInternal
(JobExecutionContext jobExecutionContext)
throws
JobExecutionException { System.out.println("任務執行,時間:"
+ LocalDateTime.now()); } }/<code>
在定義一個 JobDetail 和 Trigger 實現代碼如下:
<code>import
org.quartz.*;import
org.springframework.context.annotation
.Bean;import
org.springframework.context.annotation
.Configuration;public
class
SampleScheduler
{public
JobDetail sampleJobDetail() {return
JobBuilder.newJob(SampleJob.
class
).withIdentity
("sampleJob"
) .storeDurably().build(); }public
Trigger sampleJobTrigger() { SimpleScheduleBuilder scheduleBuilder = SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(3
).withRepeatCount(1
);return
TriggerBuilder.newTrigger().forJob(sampleJobDetail()).withIdentity("sampleTrigger"
) .withSchedule(scheduleBuilder).build(); } }/<code>
最後在 SpringBoot 項目啟動之後開啟延遲任務,實現代碼如下:
<code>import
org.springframework.beans.factory.annotation.Autowired;import
org.springframework.boot.CommandLineRunner;import
org.springframework.scheduling.quartz.SchedulerFactoryBean;public
class
MyStartupRunner
implements
CommandLineRunner
{private
SchedulerFactoryBean schedulerFactoryBean;private
SampleScheduler sampleScheduler;public
void
run
(String... args)
throws
Exception { schedulerFactoryBean.getScheduler().scheduleJob( sampleScheduler.sampleJobTrigger()); } }/<code>
以上程序的執行結果如下:
2020-04-13 19:02:12.331 INFO 17768 --- [ restartedMain] com.example.demo.DemoApplication : Started DemoApplication in 1.815 seconds (JVM running for 3.088)
任務執行,時間:2020-04-13T19:02:15.019
從結果可以看出在項目啟動 3s 之後執行了延遲任務。
總結
本文講了延遲任務的使用場景,以及延遲任務的 10 種實現方式:
- 手動無線循環;
- ScheduledExecutorService;
- DelayQueue;
- Redis zset 數據判斷的方式;
- Redis 鍵空間通知的方式;
- Netty 提供的 HashedWheelTimer 工具類;
- RabbitMQ 死信隊列;
- RabbitMQ 延遲消息插件 rabbitmq-delayed-message-exchange;
- Spring Scheduled;
- Quartz。
作者:Java中文社群
原文鏈接:
https://juejin.im/post/5e95248551882573725049dc
關鍵字: springframework org void