Airflow中的路徑選擇

Airflow 是 Airbnb 公司開源的任務調度系統, 通過使用 Python 開發 DAG, 非常方便的調度計算任務。

Airflow之前,當多個上下游的任務間存在依賴時,需要我們自己來寫一些調度的code,常用的有幾種:

  • 上游任務完成後置個done標記。需要下游隔段時間檢查上游任務有沒有完成。
  • 上游完成任務後通過某個api來call下游。你說call 失敗了?retry大法用起來呀​​。。。

模塊太多,依賴太複雜,記不住怎麼辦?寫個文檔吧。。。上游改了??改文檔吧。。。下游又變了???再改。。。哎呀,上線來不及了,過幾天更新吧。。。。那個文檔很久沒更新了,還是看code吧。。。

而Airflow使用DAG,可以很方便的通過定義Operator,及Operator間的關係(upstream, downstream)來解決任務之間的各種依賴。同時,它還提供了UI來查看任務依賴的DAG,任務運行狀態。簡單的說,它可以使我們更多的關注到業務的實現上而不是各種複雜的模塊調用及繁瑣的文檔維護。

當然,作為一條完整的業務流,根據上游的執行結果來決定下游的行為是最常見不過的事了。比如的例子:

有個淘氣的Sam小盆友,每天按時 完成作業 是個大難題。於是,媽媽想了個辦法:

Airflow中的路徑選擇

branch

真是個不錯的法子!這裡,8點前寫完在Airflow裡就是個判斷用的task,由它來決定整個DAG的下一步操作。那麼,怎麼實現呢?Airflow提供了BranchPythonOperator,用來支持分支, 通過函數返回要執行的分支。

<code>check_hour_op = BranchPythonOperator(
 task_id = "check_hour",
 python_callable = check_hour,
 dag = dag
)
def check_hour(){
 if current_hour > 8:
   return "sleep"
 else:
   return "watch_animation"
}
do_homework_op >> check_hour_op >> [watch_animation_op, sleep_op]
watch_animation_op >> sleep_op/<code>

Ok 簡直太簡單了!執行一下試試,然後你會發現,當check_hour返回watch_animation時,sleep這個task的狀態永遠是skip的???讓我們翻一下官方的解釋:

Note that using tasks with depends_on_past=True downstream from BranchPythonOperator is logically unsound as skipped status will invariably lead to block tasks that depend on their past successes. skipped states propagates where all directly upstream tasks are skipped.

就是說,sleep這個task在check_hour選擇了watch_animation的時候就被skip掉了,即便這裡它是watch_animation的下游任務,仍然不會被執行。原因呢?官方說是downstream設了depends_on_past=True。要知道,operator中還有個屬性叫trigger_rule,默認值是all_success。這兩個組合在一起就要上游的task狀態都為success才能繼續往下執行。而這裡sleep的上游任務watch_animation狀態是skipped, 所以sleep就沒有被執行到了。那麼,如果這裡我們把sleep的trigger_rule置成none_failed的話,整個過程就能按我們預期的執行下去了:

<code>sleep_op = PythonOperator(
 task_id = "sleep",
 trigger_rule = "none_failed",
 python_callable = "sleep",
 dag = dag
)/<code>

再試一下,果然可以啦

Airflow現在支持的Trigger Rules:

ALL_SUCCESS = 'all_success'

ALL_FAILED = 'all_failed'

ALL_DONE = 'all_done'

ONE_SUCCESS = 'one_success'

ONE_FAILED = 'one_failed'

NONE_FAILED = 'none_failed'

NONE_SKIPPED = 'none_skipped'

DUMMY = 'dummy'

除了上面這個方法,我們也可以利用DummyOperator來達到想要的效果。也就是在watch_animation平行的分支上加一個dummytask,它啥活都不幹:

<code>dummp_op = DummyOperator(
task_id = "dummy",
 dag = dag
)
do_homework_op >> check_hour_op >> [watch_animation_op, dummp_op]
watch_animation_op >> sleep_op
dummp_op >> sleep_op/<code>

這種方式更簡單易懂,代價就是會稍稍增加些代碼,喜歡哪個就仁者見仁了,歡迎留言討論。



今天的內容就到這了,如果覺得對你有幫助的話,請幫忙轉發~

更多技術文章,可關注我的微信號KiwiHome獲取


分享到:


相關文章: