任務調度應用場景
所謂的任務調度是指安排任務的執行計劃,即何時執行,怎麼執行等。在現實項目中經常出現它們的身影;特別是數據類項目,比如實時統計每5分鐘網站的訪問量,就需要每5分鐘定時從日誌數據分析訪問量。
總結下任務調度應用場景:
- 離線作業調度:按時間粒度執行某項任務
- 共享緩存更新:定時刷新緩存,如redis緩存;不同進程間的共享數據
任務調度工具
- linux的crontab, 支持按照分鐘/小時/天/月/周粒度,執行任務
- java的Quartz
- windows的任務計劃
本文介紹的是python中的任務調度庫,APScheduler(advance python scheduler)。如果你瞭解Quartz的話,可以看出APScheduler是Quartz的python實現;APScheduler提供了基於時間,固定時間點和crontab方式的任務調用方案, 可以當作一個跨平臺的調度工具來使用。
APScheduler
組件介紹
APScheduler由5個部分組成:觸發器、調度器、任務存儲器、執行器和任務事件。
- 任務job:任務id和任務執行func
- 觸發器triggers:確定任務何時開始執行
- 任務存儲器job stores: 保存任務的狀態
- 執行器executors:確定任務怎麼執行
- 任務事件event:監控任務執行異常情況
- 調度器schedulers:串聯任務的整個生命週期,添加編輯任務到任務存儲器,在任務的執行時間到來時,把任務交給執行器執行返回結果;同時發出事件監聽,監控任務事件 。
安裝
<code>pip install apscheduler
/<code>
簡單例子
<code>from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import logging
import datetime
# 任務執行函數
def job_func(job_id):
print('job %s is runed at %s' % (job_id, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
# 事件監聽
def job_exception_listener(event):
if event.exception:
# todo:異常處理, 告警等
print('The job crashed :(')
else:
print('The job worked :)')
# 日誌
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
# 定義一個後臺任務非阻塞調度器
scheduler = BackgroundScheduler()
# 添加一個任務到內存中
# 觸發器:trigger='interval' seconds=10 每10s觸發執行一次
# 執行器:executor='default' 線程執行
# 任務存儲器:jobstore='default' 默認內存存儲
# 最大併發數:max_instances
scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10)
# 設置任務監聽
scheduler.add_listener(job_exception_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
# 啟動調度器
scheduler.start()
/<code>
運行情況:
<code>job 1 is runed at 2020-03-21 20:00:38
The job worked :)
job 1 is runed at 2020-03-21 20:00:48
The job worked :)
job 1 is runed at 2020-03-21 20:00:58
The job worked :)
/<code>
觸發器
觸發器決定何時執行任務,APScheduler支持的觸發器有3種
- trigger='interval':按固定時間週期執行,支持weeks,days,hours,minutes, seconds, 還可指定時間範圍
<code>sched.add_job(job_function, 'interval', hours=2, start_date='2010-10-10 09:30:00', end_date='2014-06-15 11:00:00')
/<code>
- trigger='date': 固定時間,執行一次
<code>sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])
/<code>
- trigger='cron': 支持crontab方式,執行任務
- 參數:分鐘/小時/天/月/周粒度,也可指定時間範圍
<code>year (int|str) – 4-digit year
month (int|str) – month (1-12)
day (int|str) – day of the (1-31)
week (int|str) – ISO week (1-53)
day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
hour (int|str) – hour (0-23)
minute (int|str) – minute (0-59)
second (int|str) – second (0-59)
start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)
end_date (datetime|str) – latest possible date/time to trigger on (inclusive)
/<code>
- 例子
<code># 星期一到星期五,5點30執行任務job_function,直到2014-05-30 00:00:00
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')
# 按照crontab格式執行, 格式為:分鐘 小時 天 月 周,*表示所有
# 5月到8月的1號到15號,0點0分執行任務job_function
sched.add_job(job_function, CronTrigger.from_crontab('0 0 1-15 may-aug *'))
/<code>
執行器
執行器決定如何執行任務;APScheduler支持4種不同執行器,常用的有pool(線程/進程)和gevent(io多路複用,支持高併發),默認為pool中線程池, 不同的執行器可以在調度器的配置中進行配置(見調度器)
- apscheduler.executors.asyncio:同步io,阻塞
- apscheduler.executors.gevent:io多路複用,非阻塞
- apscheduler.executors.pool: 線程ThreadPoolExecutor和進程ProcessPoolExecutor
- apscheduler.executors.twisted:基於事件驅動
任務存儲器
任務存儲器決定任務的保存方式, 默認存儲在內存中(MemoryJobStore),重啟後就沒有了。APScheduler支持的任務存儲器有:
- apscheduler.jobstores.memory:內存
- apscheduler.jobstores.mongodb:存儲在mongodb
- apscheduler.jobstores.redis:存儲在redis
- apscheduler.jobstores.rethinkdb:存儲在rethinkdb
- apscheduler.jobstores.sqlalchemy:支持sqlalchemy的數據庫如mysql,sqlite等
- apscheduler.jobstores.zookeeper:zookeeper
不同的任務存儲器可以在調度器的配置中進行配置(見調度器)
調度器
APScheduler支持的調度器方式如下,比較常用的為BlockingScheduler和BackgroundScheduler
- BlockingScheduler:適用於調度程序是進程中唯一運行的進程,調用start函數會阻塞當前線程,不能立即返回。
- BackgroundScheduler:適用於調度程序在應用程序的後臺運行,調用start後主線程不會阻塞。
- AsyncIOScheduler:適用於使用了asyncio模塊的應用程序。
- GeventScheduler:適用於使用gevent模塊的應用程序。
- TwistedScheduler:適用於構建Twisted的應用程序。
- QtScheduler:適用於構建Qt的應用程序。
從前面的例子,我們可以看到,調度器可以操作任務(併為任務指定觸發器、任務存儲器和執行器)和監控任務。
<code>scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10)
/<code>
我們來詳細看下各個部分
- 調度器配置:在add_job我們看到jobstore和executor都是default,APScheduler在定義調度器時可以指定不同的任務存儲和執行器,以及初始的參數
<code>from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
# 通過dict方式執行不同的jobstores、executors和默認的參數
jobstores = {
'mongo': MongoDBJobStore(),
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
# 定義調度器
scheduler = BackgroundScheduler(jobstoresjobstores=jobstores, executorsexecutors=executors, job_defaultsjob_defaults=job_defaults, timezone=utc)
def job_func(job_id):
print('job %s is runed at %s' % (job_id, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
# 添加任務
scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', jobstore='default', executor='processpool', seconds=10)
# 啟動調度器
scheduler.start()
/<code>
- 操作任務:調度器可以增加,刪除,暫停,恢復和修改任務。需要注意的是這裡的操作只是對未執行的任務起作用,已經執行和正在執行的任務不受這些操作的影響。
- add_job
<code>scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10)
/<code>
- remove_job: 通過任務唯一的id,刪除的時候對應的任務存儲器裡記錄也會刪除
<code>scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')
/<code>
- Pausing and resuming jobs:暫停和重啟任務
<code>scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.pause_job('my_job_id')
scheduler.resume_job('my_job_id')
/<code>
- Modifying jobs:修改任務的配置
<code>job = scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id', max_instances=10)
# 修改任務的屬性
job.modify(max_instances=6, name='Alternate name')
# 修改任務的觸發器
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
/<code>
- 監控任務事件類型,比較常用的類型有: EVENT_JOB_ERROR: 表示任務在執行過程的出現異常觸發 EVENT_JOB_EXECUTED:任務執行成功時 EVENT_JOB_MAX_INSTANCES:調度器上執行的任務超過配置的參數時
<code>scheduler.add_listener(job_exception_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
/<code>
閱讀更多 sandag 的文章