02.21 取代 Python 多進程!伯克利開源分佈式框架 Ray

取代 Python 多進程!伯克利開源分佈式框架 Ray

Ray 由伯克利開源,是一個用於並行計算和分佈式 Python 開發的開源項目。本文將介紹如何使用 Ray 輕鬆構建可從筆記本電腦擴展到大型集群的應用程序。

並行和分佈式計算是現代應用程序的主要內容。我們需要利用多個核心或多臺機器來加速應用程序或大規模運行它們。網絡爬蟲和搜索所使用的基礎設施並不是在某人筆記本電腦上運行的單線程程序,而是相互通信和交互的服務的集合。

取代 Python 多進程!伯克利開源分佈式框架 Ray


雲計算承諾在所有維度上(內存、計算、存儲等)實現無限的可擴展性。實現這一承諾需要新的工具進行雲編程和構建分佈式應用程序。

為什麼要使用 Ray?

很多教程解釋瞭如何使用 Python 的多進程模塊(https://docs.python.org/2/library/multiprocessing.html)。遺憾的是,多進程模塊在處理現代應用程序的要求方面存在嚴重的短板。這些要求包括以下這些內容:

  • 在多臺計算機上運行相同的代碼。
  • 構建有狀態且可以與之通信的微服務和 actor。
  • 優雅地處理機器故障。
  • 有效處理大對象和數值數據。

Ray(https://github.com/ray-project/ray)解決了所有這些問題,在保持簡單性的同時讓複雜的行為成為可能。

取代 Python 多進程!伯克利開源分佈式框架 Ray


必要的概念

傳統編程依賴於兩個核心概念:函數和類。使用這些構建塊就可以構建出無數的應用程序。

但是,當我們將應用程序遷移到分佈式環境時,這些概念通常會發生變化。

一方面,OpenMPI、Python 多進程和 ZeroMQ 等工具提供了用於發送和接收消息的低級原語。這些工具非常強大,但它們提供了不同的抽象,因此要使用它們就必須從頭開始重寫單線程應用程序。

另一方面,我們也有一些特定領域的工具,例如用於模型訓練的 TensorFlow、用於數據處理且支持 SQL 的 Spark,以及用於流式處理的 Flink。這些工具提供了更高級別的抽象,如神經網絡、數據集和流。但是,因為它們與用於串行編程的抽象不同,所以要使用它們也必須從頭開始重寫應用程序。

取代 Python 多進程!伯克利開源分佈式框架 Ray


用於分佈式計算的工具

Ray 佔據了一個獨特的中間地帶。它並沒有引入新的概念,而是採用了函數和類的概念,並將它們轉換為分佈式的任務和 actor。Ray 可以在不做出重大修改的情況下對串行應用程序進行並行化。

開始使用 Ray

ray.init() 命令將啟動所有相關的 Ray 進程。在切換到集群時,這是唯一需要更改的行(我們需要傳入集群地址)。這些過程包括:

  • 有很多 worker 進程並行執行 Python 函數(大概是每個 CPU 核心對應一個 worker)。
  • 用於將“任務”分配給 worker(以及其他計算機)的調度程序進程。任務是 Ray 調度的工作單元,對應於一個函數調用或方法調用。
  • 共享內存對象存儲庫,用於在 worker 之間有效地共享對象(無需創建副本)。
  • 內存數據庫,用於存儲在發生機器故障時重新運行任務所需的元數據。

Ray worker 是獨立的進程,而不是線程,因為在 Python 中存在全局解釋器鎖,所以對多線程的支持非常有限。

並行任務

要將 Python 函數 f 轉換為一個“遠程函數”(可以遠程和異步執行的函數),可以使用 @ray.remote 裝飾器來聲明這個函數。然後函數調用 f.remote() 將立即返回一個 future(future 是對最終輸出的引用),實際的函數執行將在後臺進行(我們將這個函數執行稱為任務)。

複製代碼

import ray
import time

# Start Ray.
ray.init()

@ray.remote
def f(x):
time.sleep(1)
return x

# Start 4 tasks in parallel.
result_ids = []
for i in range(4):
result_ids.append(f.remote(i))

# Wait for the tasks to complete and retrieve the results.
# With at least 4 cores, this will take 1 second.
results = ray.get(result_ids) # [0, 1, 2, 3]

在 Python 中運行並行任務的代碼

因為對 f.remote(i) 的調用會立即返回,所以運行這行代碼四次就可以並行執行 f 的四個副本。

任務依賴

一個任務還可以依賴於其他任務。在下面的代碼中,multiply_matrices 任務依賴兩個 create_matrix 任務的輸出,因此在執行前兩個任務之前它不會先執行。前兩個任務的輸出將自動作為參數傳給第三個任務,future 將被替換為相應的值。通過這種方式,任務可以按照任意的 DAG 依賴關係組合在一起。

複製代碼

import numpy as np

@ray.remote
def create_matrix(size):
return np.random.normal(size=size)

@ray.remote
def multiply_matrices(x, y):
return np.dot(x, y)

x_id = create_matrix.remote([1000, 1000])
y_id = create_matrix.remote([1000, 1000])
z_id = multiply_matrices.remote(x_id, y_id)

# Get the results.
z = ray.get(z_id)

這裡有三個任務,其中第三個任務依賴前兩個任務的輸出

有效地對值進行聚合

我們可以以更復雜的方式使用任務依賴。例如,假設我們希望將 8 個值聚合在一起。在我們的示例中,我們將進行整數加法,但在很多應用程序中,跨多臺計算機聚合大型向量可能會造成性能瓶頸。在這個時候,只要修改一行代碼就可以將聚合的運行時間從線性降為對數級別,即聚合值的數量。

取代 Python 多進程!伯克利開源分佈式框架 Ray


左側的依賴圖深度為 7,右側的依賴圖深度為 3。計算產生相同的結果,但右側的依賴圖執行得更快。

如上所述,要將一個任務的輸出作為輸入提供給後續任務,只需將第一個任務返回的 future 作為參數傳給第二個任務。Ray 的調度程序會自動考慮任務依賴關係。在第一個任務完成之前不會執行第二個任務,第一個任務的輸出將自動被髮送給執行第二個任務的機器。

複製代碼

import time

@ray.remote
def add(x, y):
time.sleep(1)
return x + y

# Aggregate the values slowly. This approach takes O(n) where n is the
# number of values being aggregated. In this case, 7 seconds.
id1 = add.remote(1, 2)
id2 = add.remote(id1, 3)
id3 = add.remote(id2, 4)
id4 = add.remote(id3, 5)
id5 = add.remote(id4, 6)
id6 = add.remote(id5, 7)
id7 = add.remote(id6, 8)
result = ray.get(id7)

# Aggregate the values in a tree-structured pattern. This approach
# takes O(log(n)). In this case, 3 seconds.
id1 = add.remote(1, 2)
id2 = add.remote(3, 4)
id3 = add.remote(5, 6)
id4 = add.remote(7, 8)
id5 = add.remote(id1, id2)
id6 = add.remote(id3, id4)
id7 = add.remote(id5, id6)
result = ray.get(id7)

以線性方式聚合值與以樹形結構方式聚合值的對比

上面的代碼非常清晰,但請注意,這兩種方法都可以使用 while 循環來實現,這種方式更為簡潔。

複製代碼

# Slow approach.
values = [1, 2, 3, 4, 5, 6, 7, 8]
while len(values) > 1:
values = [add.remote(values[0], values[1])] + values[2:]

result = ray.get(values[0])

# Fast approach.
values = [1, 2, 3, 4, 5, 6, 7, 8]
while len(values) > 1:
values = values[2:] + [add.remote(values[0], values[1])]
result = ray.get(values[0])

更簡潔的聚合實現方案。兩個代碼塊之間的唯一區別是“add.remote”的輸出是放在列表的前面還是後面。

從類到 actor

在不使用類的情況下開發有趣的應用程序很具挑戰性,在分佈式環境中也是如此。

你可以使用 @ray.remote 裝飾器聲明一個 Python 類。在實例化類時,Ray 會創建一個新的“actor”,這是一個運行在集群中並持有類對象副本的進程。對這個 actor 的方法調用轉變為在 actor 進程上運行的任務,並且可以訪問和改變 actor 的狀態。通過這種方式,可以在多個任務之間共享可變狀態,這是遠程函數無法做到的。

各個 actor 按順序執行方法(每個方法都是原子方法),因此不存在競態條件。可以通過創建多個 actor 來實現並行性。

複製代碼

@ray.remote
class Counter(object):
def __init__(self):
self.x = 0


def inc(self):
self.x += 1

def get_value(self):
return self.x

# Create an actor process.
c = Counter.remote()

# Check the actor's counter value.
print(ray.get(c.get_value.remote())) # 0

# Increment the counter twice and check the value again.
c.inc.remote()
c.inc.remote()
print(ray.get(c.get_value.remote())) # 2

將 Python 類實例化為 actor

上面的例子是 actor 最簡單的用法。Counter.remote() 創建一個新的 actor 進程,它持有一個 Counter 對象副本。對 c.get_value.remote() 和 c.inc.remote() 的調用會在遠程 actor 進程上執行任務並改變 actor 的狀態。

actor 句柄

在上面的示例中,我們只在主 Python 腳本中調用 actor 的方法。actor 的一個最強大的地方在於我們可以將句柄傳給它,讓其他 actor 或其他任務都調用同一 actor 的方法。

以下示例創建了一個可以保存消息的 actor。幾個 worker 任務反覆將消息推送給 actor,主 Python 腳本定期讀取消息。

複製代碼

import time

@ray.remote

class MessageActor(object):
def __init__(self):
self.messages = []

def add_message(self, message):
self.messages.append(message)

def get_and_clear_messages(self):
messages = self.messages
self.messages = []
return messages

# Define a remote function which loops around and pushes
# messages to the actor.
@ray.remote
def worker(message_actor, j):
for i in range(100):
time.sleep(1)
message_actor.add_message.remote(
"Message {} from actor {}.".format(i, j))

# Create a message actor.
message_actor = MessageActor.remote()

# Start 3 tasks that push messages to the actor.
[worker.remote(message_actor, j) for j in range(3)]

# Periodically get the messages and print them.
for _ in range(100):
new_messages = ray.get(message_actor.get_and_clear_messages.remote())
print("New messages:", new_messages)
time.sleep(1)

# This/># New messages: []
# New messages: ['Message 0 from actor 1.', 'Message 0 from actor 0.']
# New messages: ['Message 0 from actor 2.', 'Message 1 from actor 1.', 'Message 1 from actor 0.', 'Message 1 from actor 2.']
# New messages: ['Message 2 from actor 1.', 'Message 2 from actor 0.', 'Message 2 from actor 2.']
# New messages: ['Message 3 from actor 2.', 'Message 3 from actor 1.', 'Message 3 from actor 0.']
# New messages: ['Message 4 from actor 2.', 'Message 4 from actor 0.', 'Message 4 from actor 1.']
# New messages: ['Message 5 from actor 2.', 'Message 5 from actor 0.', 'Message 5 from actor 1.']

在多個併發任務中調用 actor 的方法

actor 非常強大。你可以通過它將 Python 類實例化為微服務,可以從其他 actor 和任務(甚至其他應用程序中)查詢這個微服務。

任務和 actor 是 Ray 提供的核心抽象。這兩個概念非常通用,可用於實現複雜的應用程序,包括用於強化學習、超參數調整、加速 Pandas等 Ray 內置庫。

https://medium.com/@robertnishihara/modern-parallel-and-distributed-python-a-quick-tutorial-on-ray-99f8d70369b8


分享到:


相關文章: