APScheduler 簡介

一、簡介

APScheduler是類似Quartz的一個Python定時任務框架,實現了Quartz的所有功能,使用起來十分方便。提供了基於日期、固定時間間隔以及crontab類型的任務,並且可以持久化任務。如果將任務持久化到數據庫中,那麼它們也將在調度程序在重新啟動後繼續運行並保持其狀態。重啟啟動調度程序後,它將運行它在脫機時運行的所有任務。

除此之外,APScheduler還可以用作特定於平臺的調度程序(如cron守護程序或Windows任務調度程序)的跨平臺,特定於應用程序的替代程序。但請注意,APScheduler本身不是守護程序或服務,也不附帶任何命令行工具。它主要用於在現有應用程序中運行。也就是說,APScheduler確實為我們提供了一些構建塊來構建調度程序服務或運行專用的調度程序進程。

APScheduler 簡介

官方地址:https://github.com/agronholm/apscheduler

二、使用

2.1、簡單的例子

#!usr/bin/env python3
# -*- coding:utf-8 _*-
from datetime import datetime
import time
import os
from apscheduler.schedulers.background import BackgroundScheduler

def tick():
print('Tick! The time is: %s' % datetime.now())

if __name__ == '__main__':
# 1、創建後臺執行的 schedulers
scheduler = BackgroundScheduler()
# 2、添加調度任務,觸發器選擇 interval(間隔),每隔3秒執行一次
scheduler.add_job(tick, 'interval', seconds=3)
# 3、啟動調度任務
scheduler.start()
print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
try:
# This is here to simulate application activity (which keeps the main thread alive).
while True:
time.sleep(2)
except (KeyboardInterrupt, SystemExit):
# Not strictly necessary if daemonic mode is enabled but should be done if possible
scheduler.shutdown()

jitter: 振動參數,給每次觸發添加一個隨機浮動秒數,一般適用於多服務器,避免同時運行造成服務擁堵。

# 每小時(上下浮動120秒區間內)運行`tick` 

sched.add_job(tick, 'interval', hours=1, jitter=120)

2.2、自定義配置

方法一:

#!usr/bin/env python3
# -*- coding:utf-8 _*-
from pytz import utc
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
def tick():
print('Tick! The time is: %s' % datetime.now())
# 選擇MongoDB作為任務存儲數據庫
jobstores = {
'mongo': MongoDBJobStore(),
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
# 默認使用線程池
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
# 默認參數配置
job_defaults = {
'coalesce': False, # 積攢的任務是否只跑一次,是否合併所有錯過的Job
'max_instances': 3, # 默認同一時刻只能有一個實例運行,通過max_instances=3修改為3個。
'misfire_grace_time': 30 # 30秒的任務超時容錯
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
scheduler.add_job(tick, 'interval', seconds=3)
scheduler.start()

方法二:

from apscheduler.schedulers.background import BackgroundScheduler 

# The "apscheduler." prefix is hard coded
scheduler = BackgroundScheduler({
'apscheduler.jobstores.mongo': {
'type': 'mongodb'
},
'apscheduler.jobstores.default': {
'type': 'sqlalchemy',
'url': 'sqlite:///jobs.sqlite'
},
'apscheduler.executors.default': {
'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
'max_workers': '20'
},
'apscheduler.executors.processpool': {
'type': 'processpool',
'max_workers': '5'
},
'apscheduler.job_defaults.coalesce': 'false',
'apscheduler.job_defaults.max_instances': '3',
'apscheduler.timezone': 'UTC',
})

方法三:

from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor
jobstores = {
'mongo': {'type': 'mongodb'},
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
'default': {'type': 'threadpool', 'max_workers': 20},
'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
scheduler = BackgroundScheduler()
# ..這裡可以添加任務
scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

misfire_grace_time:如果一個job本來14:00有一次執行,但是由於某種原因沒有被調度上,現在14:01了,這個14:00的運行實例被提交時,會檢查它預訂運行的時間和當下時間的差值(這裡是1分鐘),大於我們設置的30秒限制,那麼這個運行實例不會被執行。

合併:最常見的情形是scheduler被shutdown後重啟,某個任務會積攢了好幾次沒執行如5次,下次這個job被submit給executor時,執行5次。將coalesce=True後,只會執行一次

replace_existing:如果在程序初始化時,是從數據庫讀取任務的,那麼必須為每個任務定義一個明確的ID,並且使用replace_existing=True,否則每次重啟程序,你都會得到一份新的任務拷貝,也就意味著任務的狀態不會保存。

2.3、調度器的操作

scheduler.add_job() 
scheduler.modify_job()
scheduler.remove_job()
scheduler.pause_job()
scheduler.resume_job()
scheduler.reschedule_job()
scheduler.start()
scheduler.shutdown()
scheduler.get_jobs()
scheduler.remove_all_jobs()

添加job,所需要的屬性
add_job(func, trigger=None, args=None, kwargs=None, id=None, name=None,
misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined,
next_run_time=undefined, jobstore='default', executor='default',

replace_existing=False, **trigger_args):
job_kwargs = {
'trigger': self._create_trigger(trigger, trigger_args),
'executor': executor,
'func': func,
'args': tuple(args) if args is not None else (),
'kwargs': dict(kwargs) if kwargs is not None else {},
'id': id,
'name': name,
'misfire_grace_time': misfire_grace_time,
'coalesce': coalesce,
'max_instances': max_instances,
'next_run_time': next_run_time
}
job_kwargs = dict((key, value) for key, value in six.iteritems(job_kwargs) if
value is not undefined)
job = Job(self, **job_kwargs)

2.4、調度監聽

def my_listener(event):
if event.exception:
print('The job() crashed :('.format(event.job_id)) # or logger.fatal('The job crashed :(')
else:
print('The job() worked :)'.format(event.job_id))
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

2.5、夏令時問題

有些`timezone`時區可能會有夏令時的問題。這個可能導致令時切換時,任務不執行或任務執行兩次。避免這個問題,可以使用UTC時間,或提前預知並規劃好執行的問題。

# 在Europe/Helsinki時區, 在三月最後一個週一就不會觸發;在十月最後一個週一會觸發兩次
sched.add_job(job_function, 'cron', hour=3, minute=30)


分享到:


相關文章: