背景
在數據科學世界,Python 是一個不可忽視的存在,且有愈演愈烈之勢。而其中主要的使用工具,包括 Numpy、Pandas 和 Scikit-learn 等。
Numpy
Numpy 是數值計算的基礎包,內部提供了多維數組(ndarray)這樣一個數據結構,用戶可以很方便地在任意維度上進行數值計算。
我們舉一個蒙特卡洛方法求解 Pi 的例子。這背後的原理非常簡單,現在我們有個半徑為1的圓和邊長為2的正方形,他們的中心都在原點。現在我們生成大量的均勻分佈的點,讓這些點落在正方形內,通過簡單的推導,我們就可以知道,Pi 的值 = 落在圓內的點的個數 / 點的總數 * 4。
這裡要注意,就是隨機生成的點的個數越多,結果越精確。
用 Numpy 實現如下:
<code>import
numpy
as
np
N
=
10
**
7
data
=
np.random.uniform(-1,
1
,
size=(N,
2
))
inside
=
(np.sqrt((data
**
2
).sum(axis=1))
<
1
).sum()
pi
=
4
*
inside
/
N
print('pi:
%.5f'
%
pi)
/<code>
可以看到,用 Numpy 來進行數值計算非常簡單,只要寥寥數行代碼,而如果讀者習慣了 Numpy 這種面相數組的思維方式之後,無論是代碼的可讀性還是執行效率都會有巨大提升。
pandas
pandas 是一個強大的數據分析和處理的工具,它其中包含了海量的 API 來幫助用戶在二維數據(DataFrame)上進行分析和處理。
pandas 中的一個核心數據結構就是 DataFrame,它可以簡單理解成表數據,但不同的是,它在行和列上都包含索引(Index),要注意這裡不同於數據庫的索引的概念,它的索引可以這麼理解:當從行看 DataFrame 時,我們可以把 DataFrame 看成行索引到行數據的這麼一個字典,通過行索引,可以很方便地選中一行數據;列也同理。
我們拿 movielens 的數據集 作為簡單的例子,來看 pandas 是如何使用的。這裡我們用的是 Movielens 20M Dataset.
<code>import
pandasas
pd ratings = pd.read_csv('ml-20m/ratings.csv'
) ratings.groupby('userId'
).agg({'rating'
: ['sum'
,'mean'
,'max'
,'min'
]})/<code>
通過一行簡單的 pandas.read_csv 就可以讀取 CSV 數據,接著按 userId 做分組聚合,求 rating 這列在每組的總和、平均、最大、最小值。
“食用“ pandas 的最佳方式,還是在 Jupyter notebook 裡,以交互式的方式來分析數據,這種體驗會讓你不由感嘆:人生苦短,我用 xx()
scikit-learn
scikit-learn 是一個 Python 機器學習包,提供了大量機器學習算法,用戶不需要知道算法的細節,只要通過幾個簡單的 high-level 接口就可以完成機器學習任務。當然現在很多算法都使用深度學習,但 scikit-learn 依然能作為基礎機器學習庫來串聯整個流程。
我們以 K-最鄰近算法為例,來看看用 scikit-learn 如何完成這個任務。
<code>import
pandasas
pdfrom
sklearn.neighborsimport
NearestNeighbors df = pd.read_csv('data.csv'
) nn = NearestNeighbors(n_neighbors=10
) nn.fit(df) neighbors = nn.kneighbors(df)/<code>
fit接口就是 scikit-learn 裡最常用的用來學習的接口。可以看到整個過程非常簡單易懂。
Mars——Numpy、pandas 和 scikit-learn 的並行和分佈式加速器
Python 數據科學棧非常強大,但它們有如下幾個問題:
- 現在是多核時代,這幾個庫裡鮮有操作能利用得上多核的能力。
- 隨著深度學習的流行,用來加速數據科學的新的硬件層出不窮,這其中最常見的就是 GPU,在深度學習前序流程中進行數據處理,我們是不是也能用上 GPU 來加速呢?
- 這幾個庫的操作都是命令式的(imperative),和命令式相對應的就是聲明式(declarative)。命令式的更關心 how to do,每一個操作都會立即得到結果,方便對結果進行探索,優點是很靈活;缺點則是中間過程可能佔用大量內存,不能及時釋放,而且每個操作之間就被割裂了,沒有辦法做算子融合來提升性能;那相對應的聲明式就剛好相反,它更關心 what to do,它只關心結果是什麼,中間怎麼做並沒有這麼關心,典型的聲明式像 SQL、TensorFlow 1.x,聲明式可以等用戶真正需要結果的時候才去執行,也就是 lazy evaluation,這中間過程就可以做大量的優化,因此性能上也會有更好的表現,缺點自然也就是命令式的優點,它不夠靈活,調試起來比較困難。
為了解決這幾個問題,Mars 被我們開發出來,Mars 在 MaxCompute 團隊內部誕生,它的主要目標就是讓 Numpy、pandas 和 scikit-learn 等數據科學的庫能夠並行和分佈式執行,充分利用多核和新的硬件。
Mars 的開發過程中,我們核心關注的幾點包括:
- 我們希望 Mars 足夠簡單,只要會用 Numpy、pandas 或 scikit-learn 就會用 Mars。
- 避免重複造輪子,我們希望能利用到這些庫已有的成果,只需要能讓他們被調度到多核/多機上即可。
- 聲明式和命令式兼得,用戶可以在這兩者之間自由選擇,靈活度和性能兼而有之。
- 足夠健壯,生產可用,能應付各種 failover 的情況。
當然這些是我們的目標,也是我們一直努力的方向。
Mars tensor:Numpy 的並行和分佈式加速器
上面說過,我們的目標之一是,只要會用 Numpy 等數據科學包,就會用 Mars。我們直接來看代碼,還是以蒙特卡洛為例。變成 Mars 的代碼是什麼樣子呢?
<code>import mars.tensor as mt N =10
**10
data = mt.random
.uniform(-1
,1
, size=(N,2
)) inside = (mt.sqrt
((data **2
).sum(axis=1
))1
).sum()pi
= (4
* inside / N).execute
()'pi: %.5f'
%pi
)/<code>
可以看到,區別就只有兩處:import numpy as np 變成 import mars.tensor as mt ,後續的 np. 都變成 mt. ;pi 在打印之前調用了一下 .execute() 方法。
也就是默認情況下,Mars 會按照聲明式的方式,代碼本身移植的代價極低,而在真正需要一個數據的時候,通過 .execute() 去觸發執行。這樣能最大限度得優化性能,以及減少中間過程內存消耗。
這裡,我們還將數據的規模擴大了 1000 倍,來到了 100 億個點。之前 1/1000 的數據量的時候,在我的筆記本上需要 757ms;而現在數據擴大一千倍,光 data 就需要 150G 的內存,這用 Numpy 本身根本無法完成。而使用 Mars,計算時間只需要 3min 44s,而峰值內存只需要 1G 左右。假設我們認為內存無限大,Numpy 需要的時間也就是之前的 1000 倍,大概是 12min 多,可以看到 Mars 充分利用了多核的能力,並且通過聲明式的方式,極大減少了中間內存佔用。
前面說到,我們試圖讓聲明式和命令式兼得,而使用命令式的風格,只需要在代碼的開始配置一個選項即可。
<code>import
mars.tensor
as
mt
from
mars.config
import
options
options.eager_mode
=
True
N
=
10
**
7
data
=
mt.random.uniform(-1,
1
,
size=(N,
2
))
inside
=
(mt.linalg.norm(data,
axis=1)
<
1
).sum()
pi
=
4
*
inside
/
N
print('pi:
%.5f'
%
pi.fetch())
/<code>
Mars DataFrame:pandas 的並行和分佈式加速器
看過怎麼樣輕鬆把 Numpy 代碼遷移到 Mars tensor ,想必讀者也知道怎麼遷移 pandas 代碼了,同樣也只有兩個區別。我們還是以 movielens 的代碼為例。
<code>import
mars.dataframeas
md ratings = md.read_csv('ml-20m/ratings.csv'
) ratings.groupby('userId'
).agg({'rating'
: ['sum'
,'mean'
,'max'
,'min'
]}).execute()/<code>
Mars Learn:scikit-learn 的並行和分佈式加速器
Mars Learn 也同理,這裡就不做過多闡述了。但目前 Mars learn 支持的 scikit-learn 算法還不多,我們也在努力移植的過程中,這需要大量的人力和時間,歡迎感興趣的同學一起參與。
<code>import
mars.dataframeas
mdfrom
mars.learn.neighborsimport
NearestNeighbors df = md.read_csv('data.csv'
) nn = NearestNeighbors(n_neighbors=10
) nn.fit(df) neighbors = nn.kneighbors(df).fetch() /<code>
這裡要注意的是,對於機器學習的 fit、predict 等高層接口,Mars Learn 也會立即觸發執行,以保證語義的正確性。
RAPIDS:GPU 上的數據科學
相信細心的觀眾已經發現,GPU 好像沒有被提到。不要著急,這就要說到 RAPIDS。
在之前,雖然 CUDA 已經將 GPU 編程的門檻降到相當低的一個程度了,但對於數據科學家們來說,在 GPU 上處理 Numpy、pandas 等能處理的數據無異於天方夜譚。幸運的是,NVIDIA 開源了 RAPIDS 數據科學平臺,它和 Mars 的部分思想高度一致,即使用簡單的 import 替換,就可以將 Numpy、pandas 和 scikit-learn 的代碼移植到 GPU 上。
其中,RAPIDS cuDF 用來加速 pandas,而 RAPIDS cuML 用來加速 scikit-learn。
對於 Numpy 來說,CuPy 已經很好地支持用 GPU 來加速了,這樣 RAPIDS 也得以把重心放在數據科學的其他部分。
CuPy:用 GPU 加速 Numpy
還是蒙特卡洛求解 Pi。
<code>import
cupy
as
cp
N
=
10
**
7
data
=
cp.random.uniform(-1,
1
,
size=(N,
2
))
inside
=
(cp.sqrt((data
**
2
).sum(axis=1))
<
1
).sum()
pi
=
4
*
inside
/
N
print('pi:
%.5f'
%
pi)
/<code>
在我的測試中,它將 CPU 的 757ms,降到只有 36ms,提升超過 20 倍,可以說效果非常顯著。這正是得益於 GPU 非常適合計算密集型的任務。
RAPIDS cuDF:用 GPU 加速 pandas
將 import pandas as pd 替換成 import cudf,GPU 內部如何並行,CUDA 編程這些概念,用戶都不再需要關心。
<code>import
cudf ratings = cudf.read_csv('ml-20m/ratings.csv'
) ratings.groupby('userId'
).agg({'rating'
: ['sum'
,'mean'
,'max'
,'min'
]})/<code>
運行時間從 CPU 上的 18s 提升到 GPU 上的 1.66s,提升超過 10 倍。
RAPIDS cuML:用 GPU 加速 scikit-learn
同樣是 k-最鄰近問題。
<code>import
cudf
from
cuml.neighbors import NearestNeighbors
df
=cudf.read_csv('data.csv')
nn
=NearestNeighbors(n_neighbors=10)
nn.fit(df)
neighbors
=nn.kneighbors(df)
/<code>
運行時間從 CPU 上 1min52s,提升到 GPU 上 17.8s。
Mars 和 RAPIDS 結合能帶來什麼?
RAPIDS 將 Python 數據科學帶到了 GPU,極大地提升了數據科學的運行效率。它們和 Numpy 等一樣,是命令式的。通過和 Mars 結合,中間過程將會使用更少的內存,這使得數據處理量更大;Mars 也可以將計算分散到多機多卡,以提升數據規模和計算效率。
在 Mars 裡使用 GPU 也很簡單,只需要在對應函數上指定 gpu=True。例如創建 tensor、讀取 CSV 文件等都適用。
<code>import
mars.tensoras
mtimport
mars.dataframeas
md a = mt.random.uniform(-1
,1
, size=(1000
,1000
), gpu=True
) df = md.read_csv('ml-20m/ratings.csv'
, gpu=True
)/<code>
下圖是用 Mars 分別在 Scale up 和 Scale out 兩個維度上加速蒙特卡洛計算 Pi 這個任務。一般來說,我們要加速一個數據科學任務,可以有這兩種方式,Scale up 是指可以使用更好的硬件,比如用更好的 CPU、更大的內存、使用 GPU 替代 CPU等;Scale out 就是指用更多的機器,用分佈式的方式提升效率。
可以看到在一臺 24 核的機器上,Mars 計算需要 25.8s,而通過分佈式的方式,使用 4 臺 24 核的機器的機器幾乎以線性的時間提升。而通過使用一個 NVIDIA TESLA V100 顯卡,我們就能將單機的運行時間提升到 3.98s,這已經超越了4臺 CPU 機器的性能。通過再將單卡拓展到多卡,時間進一步降低,但這裡也可以看到,時間上很難再線性擴展了,這是因為 GPU 的運行速度提升巨大,這個時候網絡、數據拷貝等的開銷就變得明顯。
性能測試
我們使用了 https://github.com/h2oai/db-benchmark 的數據集,測試了三個數據規模的 groupby 和 一個數據規模的 join。而我們主要對比了 pandas 和 DASK。DASK 和 Mars 的初衷很類似,也是試圖並行和分佈式化 Python 數據科學,但它們的設計、實現、分佈式都存在較多差異,這個後續我們再撰文進行詳細對比。
測試機器配置是 500G 內存、96 核、NVIDIA V100 顯卡。Mars 和 DASK 在 GPU 上都使用 RAPIDS 執行計算。
Groupby
數據有三個規模,分別是 500M、5G 和 20G。
查詢也有三組。
查詢一
<code>df = read_csv('data.csv'
) df.groupby('id1'
).agg({'v1'
:'sum'
})/<code>
查詢二
<code>df = read_csv('data.csv'
) df.groupby(['id1'
,'id2'
]).agg({'v1'
:'sum'
})/<code>
查詢三
<code>df = read_csv('data.csv'
) df.gropuby(['id6'
]).agg({'v1'
:'sum'
,'v2'
:'sum'
,'v3'
:'sum'
})/<code>
數據大小 500M,性能結果
數據大小 5G,性能結果
數據大小 20G,性能結果
數據大小到 20G 時,pandas 在查詢2會內存溢出,得不出結果。
可以看到,隨著數據增加,Mars 的性能優勢會愈發明顯。
得益於 GPU 的計算能力,GPU 運算性能相比於 CPU 都有數倍的提升。如果單純使用 RAPIDS cuDF,由於顯存大小的限制,數據來到 5G 都難以完成,而由於 Mars 的聲明式的特點,中間過程對顯存的使用大幅得到優化,所以整組測試來到 20G 都能輕鬆完成。這正是 Mars + RAPIDS 所能發揮的威力。
Join
測試查詢:
<code>x
= read_csv('x.csv'
) y = read_csv('y.csv'
) x.merge(y,on
='id1'
)/<code>
測試數據 x 為500M,y 包含10行數據。
總結
RAPIDS 將 Python 數據科學帶到了 GPU,極大提升了數據分析和處理的效率。Mars 的注意力更多放在並行和分佈式。相信這兩者的結合,在未來會有更多的想象空間。