Python 的異步 IO之Asyncio 簡介

所謂「異步 IO」,就是你發起一個 IO 操作,卻不用等它結束,你可以繼續做其他事情,當它結束時,你會得到通知。

Asyncio 是併發(concurrency)的一種方式。對 Python 來說,併發還可以通過線程(threading)和多進程(multiprocessing)來實現。

Asyncio 並不能帶來真正的並行(parallelism)。當然,因為 GIL(全局解釋器鎖)的存在,Python 的多線程也不能帶來真正的並行。

可交給 asyncio 執行的任務,稱為協程(coroutine)。一個協程可以放棄執行,把機會讓給其它協程(即 yield from 或 await)。

定義協程

協程的定義,需要使用 async def 語句。

async def do_some_work(x): pass

do_some_work 便是一個協程。

準確來說,do_some_work 是一個協程函數,可以通過 asyncio.iscoroutinefunction 來驗證:

print(asyncio.iscoroutinefunction(do_some_work)) # True

這個協程什麼都沒做,我們讓它睡眠幾秒,以模擬實際的工作量 :

async def do_some_work(x):

print("Waiting " + str(x))

await asyncio.sleep(x)

在解釋 await 之前,有必要說明一下協程可以做哪些事。協程可以:

* 等待一個 future 結束

* 等待另一個協程(產生一個結果,或引發一個異常)

* 產生一個結果給正在等它的協程

* 引發一個異常給正在等它的協程

asyncio.sleep 也是一個協程,所以 await asyncio.sleep(x) 就是等待另一個協程。可參見 asyncio.sleep 的文檔:

sleep(delay, result=None, *, loop=None)

Coroutine that completes after a given time (in seconds).

運行協程

調用協程函數,協程並不會開始運行,只是返回一個協程對象,可以通過 asyncio.iscoroutine 來驗證:

print(asyncio.iscoroutine(do_some_work(3))) # True

此處還會引發一條警告:

async1.py:16: RuntimeWarning: coroutine 'do_some_work' was never awaited

print(asyncio.iscoroutine(do_some_work(3)))

要讓這個協程對象運行的話,有兩種方式:

* 在另一個已經運行的協程中用 `await` 等待它

* 通過 `ensure_future` 函數計劃它的執行

簡單來說,只有 loop 運行了,協程才可能運行。

下面先拿到當前線程缺省的 loop ,然後把協程對象交給 loop.run_until_complete,協程對象隨後會在 loop 裡得到運行。

loop = asyncio.get_event_loop()

loop.run_until_complete(do_some_work(3))

run_until_complete 是一個阻塞(blocking)調用,直到協程運行結束,它才返回。這一點從函數名不難看出。

run_until_complete 的參數是一個 future,但是我們這裡傳給它的卻是協程對象,之所以能這樣,是因為它在內部做了檢查,通過 ensure_future 函數把協程對象包裝(wrap)成了 future。所以,我們可以寫得更明顯一些:

loop.run_until_complete(asyncio.ensure_future(do_some_work(3)))

完整代碼:

import asyncio

async def do_some_work(x):

print("Waiting " + str(x))

await asyncio.sleep(x)

loop = asyncio.get_event_loop()

loop.run_until_complete(do_some_work(3))

運行結果:

Waiting 3

回調

假如協程是一個 IO 的讀操作,等它讀完數據後,我們希望得到通知,以便下一步數據的處理。這一需求可以通過往 future 添加回調來實現。

def done_callback(futu):

print('Done')

futu = asyncio.ensure_future(do_some_work(3))

futu.add_done_callback(done_callback)

loop.run_until_complete(futu)

多個協程

實際項目中,往往有多個協程,同時在一個 loop 裡運行。為了把多個協程交給 loop,需要藉助 asyncio.gather 函數。

loop.run_until_complete(asyncio.gather(do_some_work(1), do_some_work(3)))

或者先把協程存在列表裡:

coros = [do_some_work(1), do_some_work(3)]

loop.run_until_complete(asyncio.gather(*coros))

運行結果:

Waiting 3

Waiting 1

Done

這兩個協程是併發運行的,所以等待的時間不是 1 + 3 = 4 秒,而是以耗時較長的那個協程為準。

參考函數 gather 的文檔:

gather(*coros_or_futures, loop=None, return_exceptions=False)

Return a future aggregating results from the given coroutines or futures.

發現也可以傳 futures 給它:

futus = [asyncio.ensure_future(do_some_work(1)),

asyncio.ensure_future(do_some_work(3))]

loop.run_until_complete(asyncio.gather(*futus))

gather 起聚合的作用,把多個 futures 包裝成單個 future,因為 loop.run_until_complete 只接受單個 future。

run_until_complete 和 run_forever

我們一直通過 run_until_complete 來運行 loop ,等到 future 完成,run_until_complete 也就返回了。

async def do_some_work(x):

print('Waiting ' + str(x))

await asyncio.sleep(x)

print('Done')

loop = asyncio.get_event_loop()

coro = do_some_work(3)

loop.run_until_complete(coro)

輸出:

Waiting 3

Done

現在改用 run_forever:

async def do_some_work(x):

print('Waiting ' + str(x))

await asyncio.sleep(x)

print('Done')

loop = asyncio.get_event_loop()

coro = do_some_work(3)

asyncio.ensure_future(coro)

loop.run_forever()

輸出:

Waiting 3

Done

三秒鐘過後,future 結束,但是程序並不會退出。run_forever 會一直運行,直到 stop 被調用,但是你不能像下面這樣調 stop:

loop.run_forever()

loop.stop()

run_forever 不返回,stop 永遠也不會被調用。所以,只能在協程中調 stop:

async def do_some_work(loop, x):

print('Waiting ' + str(x))

await asyncio.sleep(x)

print('Done')

loop.stop()

這樣並非沒有問題,假如有多個協程在 loop 裡運行:

asyncio.ensure_future(do_some_work(loop, 1))

asyncio.ensure_future(do_some_work(loop, 3))

loop.run_forever()

第二個協程沒結束,loop 就停止了——被先結束的那個協程給停掉的。

要解決這個問題,可以用 gather 把多個協程合併成一個 future,並添加回調,然後在回調裡再去停止 loop。

async def do_some_work(loop, x):

print('Waiting ' + str(x))

await asyncio.sleep(x)

print('Done')

def done_callback(loop, futu):

loop.stop()

loop = asyncio.get_event_loop()

futus = asyncio.gather(do_some_work(loop, 1), do_some_work(loop, 3))

futus.add_done_callback(functools.partial(done_callback, loop))

loop.run_forever()

其實這基本上就是 run_until_complete 的實現了,run_until_complete 在內部也是調用 run_forever。

Close Loop?

以上示例都沒有調用 loop.close,好像也沒有什麼問題。所以到底要不要調 loop.close 呢?

簡單來說,loop 只要不關閉,就還可以再運行:

loop.run_until_complete(do_some_work(loop, 1))

loop.run_until_complete(do_some_work(loop, 3))

loop.close()

但是如果關閉了,就不能再運行了:

loop.run_until_complete(do_some_work(loop, 1))

loop.close()

loop.run_until_complete(do_some_work(loop, 3)) # 此處異常

建議調用 loop.close,以徹底清理 loop 對象防止誤用。

gather vs. wait

asyncio.gather 和 asyncio.wait 功能相似。

coros = [do_some_work(loop, 1), do_some_work(loop, 3)]

loop.run_until_complete(asyncio.wait(coros))

具體差別可請參見 StackOverflow 的討論:Asyncio.gather vs asyncio.wait。

Timer

C++ Boost.Asio 提供了 IO 對象 timer,但是 Python 並沒有原生支持 timer,不過可以用 asyncio.sleep 模擬。

async def timer(x, cb):

futu = asyncio.ensure_future(asyncio.sleep(x))

futu.add_done_callback(cb)

await futu

t = timer(3, lambda futu: print('Done'))

loop.run_until_complete(t)


分享到:


相關文章: