06.22 Python 模塊 多進程 Multiprocessing 通訊

Python 模塊 多進程 Multiprocessing 通訊

有時,在多進程之間需要彼此通訊,例如把他們的結果聚合起來。最簡單的方式是使用 Queue 傳遞消息,任何可以由 pickle 序列化的對象都可以使用 Queue 傳遞

Python 模塊 多進程 Multiprocessing 通訊

執行:

Python 模塊 多進程 Multiprocessing 通訊

這個例子把一個對象 A() 放到 Queue 裡,然後在進程中,通過 Queue 的 get() 方法獲取到對象,執行 do_work() 方法。

下面的例子比較複雜,工作進程從 JoinableQueue 獲取數據,然後返回數據給父進程。

Python 模塊 多進程 Multiprocessing 通訊

執行:

Python 模塊 多進程 Multiprocessing 通訊

這個例子中個,創建了 num_consumers 消費者進程,消費者進程開啟之後,創建了10個任務工作任務,消費者進程處理這10個任務。

最後,又創建了和消費者數量相等的任務 None,用於標識每個消費者進程結束的標誌。run() 方法中,判斷下一個任務為 None 時,結束消費者進程。

進程間發送信號


Event 類提供了一個簡單的方式,在進程間發送信號。它內部包含兩種狀態,已設置和未設置。使用 Event 對象,可以一直等待或者等待特定的秒,直到 Event 內部狀態變為已設置。

Python 模塊 多進程 Multiprocessing 通訊

執行:

Python 模塊 多進程 Multiprocessing 通訊

wait() 默認是阻塞的,可以傳遞一個超時的秒數,變為非阻塞。本例中,非阻塞的進程休眠了2秒,因為主進程休眠3秒,Event 才設置,所以非阻塞的進程 Event.is_set() 返回 False。

控制資源訪問


有時候,資源是在多個進程中共享的。Lock 用來控制訪問共享的資源,同一時間只能有一個進程訪問。

Python 模塊 多進程 Multiprocessing 通訊

執行:

Python 模塊 多進程 Multiprocessing 通訊

這個例子中,sys.stdout 同一時間只能有一個進程訪問。使用 Lock 進行訪問控制。因為 Lock 實現了上下文接口API,所以使用 with 和 try finally 效果一樣,進程結束都會確保釋放鎖。

同步操作


可以使用 Condition 對象同步併發操作的一部分,這樣既可以利用併發的優勢,又可以使規定的一部分按順序執行。

Python 模塊 多進程 Multiprocessing 通訊

執行:

Python 模塊 多進程 Multiprocessing 通訊

wait() 函數一直等待,直到進程 p1開始運行,調用 nofity_all() 方法後,3個客戶進程才開始運行。

控制對資源的訪問


有時候,允許多個進程同時訪問某個資源,只是限制進程的數量。例如一個連接池支持多個進程同時連接,或者一個網絡應用支持一定數量的客戶端併發(concurrent)下載。使用 Semaphore 可以實現。

Python 模塊 多進程 Multiprocessing 通訊

執行:

Python 模塊 多進程 Multiprocessing 通訊

這個例子中, ActivePool 只是跟蹤當前同時運行的進程。可以看到,由於實例化 Semaphore(2) 時,只限制2個進程可以同時訪問,所以當前運行的進程從來沒有超過2個。

最後,遍歷每個進程,使用的是非阻塞的 join() 方法,用 alive 變量跟蹤活動的進程數,如果等於0時,進程都執行完畢,退出 while 無限循環。

管理共享狀態


上面的例子中,使用了 Manager 的 list 來管理某一時間點活動的所有進程名稱。Manager 的目的就是管理進程間共享的數據(shared information data)

Python 模塊 多進程 Multiprocessing 通訊

執行:

Python 模塊 多進程 Multiprocessing 通訊

本例中,用了字典 dict 存儲共享數據。

共享命名空間


Manager 除了創建列表 list 和 字段 dict 外,還可以創建命名空間 Namespace

Python 模塊 多進程 Multiprocessing 通訊

執行:

Python 模塊 多進程 Multiprocessing 通訊

進程 p 的 Event 設置完之後,進程 p2 就會獲取到命名空間的 value 值了。

如果修改命名空間的可更改(mutable)的數據結構的時候(例如列表 list),是不會自動傳遞給其他進程的。

Python 模塊 多進程 Multiprocessing 通訊

執行:

Python 模塊 多進程 Multiprocessing 通訊

調用 ns.data.append() 方法並不會傳遞更改後的值給其他進程,如果想更改的話,需要直接設置:

Python 模塊 多進程 Multiprocessing 通訊

進程池(Process Pool)


類 Process 可以管理一定數量的進程,這樣可以使工作分配給各個進程,然後在一起返回結果。

Python 模塊 多進程 Multiprocessing 通訊

執行:

Python 模塊 多進程 Multiprocessing 通訊

在這個例子中,使用內置的 map() 函數和使用 Pool 的 map() 方法結果是一致的。初始化進程池 Pool 的時候,傳遞了2個參數,processes 指定了進程數量,initializer 指定了每個進程首先調用的初始化函數。


分享到:


相關文章: