戰“疫”期,阿里云云效團隊在家高效開發實錄

背景

在數據科學世界,Python 是一個不可忽視的存在,且有愈演愈烈之勢。而其中主要的使用工具,包括 Numpy、Pandas 和 Scikit-learn 等。

Numpy

Numpy 是數值計算的基礎包,內部提供了多維數組(ndarray)這樣一個數據結構,用戶可以很方便地在任意維度上進行數值計算。

戰“疫”期,阿里云云效團隊在家高效開發實錄

我們舉一個蒙特卡洛方法求解 Pi 的例子。這背後的原理非常簡單,現在我們有個半徑為1的圓和邊長為2的正方形,他們的中心都在原點。現在我們生成大量的均勻分佈的點,讓這些點落在正方形內,通過簡單的推導,我們就可以知道,Pi 的值 = 落在圓內的點的個數 / 點的總數 * 4。

這裡要注意,就是隨機生成的點的個數越多,結果越精確。

用 Numpy 實現如下:

<code>

import

numpy

as

np

N

=

10

**

7

data

=

np.random.uniform(-1,

1

,

size=(N,

2

))

inside

=

(np.sqrt((data

**

2

).sum(axis=1))

<

1

).sum()

pi

=

4

*

inside

/

N

print('pi:

%.5f'

%

pi)

/<code>

可以看到,用 Numpy 來進行數值計算非常簡單,只要寥寥數行代碼,而如果讀者習慣了 Numpy 這種面相數組的思維方式之後,無論是代碼的可讀性還是執行效率都會有巨大提升。

pandas

pandas 是一個強大的數據分析和處理的工具,它其中包含了海量的 API 來幫助用戶在二維數據(DataFrame)上進行分析和處理。

pandas 中的一個核心數據結構就是 DataFrame,它可以簡單理解成表數據,但不同的是,它在行和列上都包含索引(Index),要注意這裡不同於數據庫的索引的概念,它的索引可以這麼理解:當從行看 DataFrame 時,我們可以把 DataFrame 看成行索引到行數據的這麼一個字典,通過行索引,可以很方便地選中一行數據;列也同理。

我們拿 movielens 的數據集 作為簡單的例子,來看 pandas 是如何使用的。這裡我們用的是 Movielens 20M Dataset.

<code>

import

pandas

as

pd ratings = pd.read_csv(

'ml-20m/ratings.csv'

) ratings.groupby(

'userId'

).agg({

'rating'

: [

'sum'

,

'mean'

,

'max'

,

'min'

]})/<code>

通過一行簡單的 pandas.read_csv 就可以讀取 CSV 數據,接著按 userId 做分組聚合,求 rating 這列在每組的總和、平均、最大、最小值。

“食用“ pandas 的最佳方式,還是在 Jupyter notebook 裡,以交互式的方式來分析數據,這種體驗會讓你不由感嘆:人生苦短,我用 xx()

scikit-learn

scikit-learn 是一個 Python 機器學習包,提供了大量機器學習算法,用戶不需要知道算法的細節,只要通過幾個簡單的 high-level 接口就可以完成機器學習任務。當然現在很多算法都使用深度學習,但 scikit-learn 依然能作為基礎機器學習庫來串聯整個流程。

我們以 K-最鄰近算法為例,來看看用 scikit-learn 如何完成這個任務。

<code>

import

pandas

as

pd

from

sklearn.neighbors

import

NearestNeighbors df = pd.read_csv(

'data.csv'

) nn = NearestNeighbors(n_neighbors=

10

) nn.fit(df) neighbors = nn.kneighbors(df)/<code>

fit接口就是 scikit-learn 裡最常用的用來學習的接口。可以看到整個過程非常簡單易懂。

Mars——Numpy、pandas 和 scikit-learn 的並行和分佈式加速器

Python 數據科學棧非常強大,但它們有如下幾個問題:

  1. 現在是多核時代,這幾個庫裡鮮有操作能利用得上多核的能力。
  2. 隨著深度學習的流行,用來加速數據科學的新的硬件層出不窮,這其中最常見的就是 GPU,在深度學習前序流程中進行數據處理,我們是不是也能用上 GPU 來加速呢?
  3. 這幾個庫的操作都是命令式的(imperative),和命令式相對應的就是聲明式(declarative)。命令式的更關心 how to do,每一個操作都會立即得到結果,方便對結果進行探索,優點是很靈活;缺點則是中間過程可能佔用大量內存,不能及時釋放,而且每個操作之間就被割裂了,沒有辦法做算子融合來提升性能;那相對應的聲明式就剛好相反,它更關心 what to do,它只關心結果是什麼,中間怎麼做並沒有這麼關心,典型的聲明式像 SQL、TensorFlow 1.x,聲明式可以等用戶真正需要結果的時候才去執行,也就是 lazy evaluation,這中間過程就可以做大量的優化,因此性能上也會有更好的表現,缺點自然也就是命令式的優點,它不夠靈活,調試起來比較困難。

為了解決這幾個問題,Mars 被我們開發出來,Mars 在 MaxCompute 團隊內部誕生,它的主要目標就是讓 Numpy、pandas 和 scikit-learn 等數據科學的庫能夠並行和分佈式執行,充分利用多核和新的硬件。

Mars 的開發過程中,我們核心關注的幾點包括:

  1. 我們希望 Mars 足夠簡單,只要會用 Numpy、pandas 或 scikit-learn 就會用 Mars。
  2. 避免重複造輪子,我們希望能利用到這些庫已有的成果,只需要能讓他們被調度到多核/多機上即可。
  3. 聲明式和命令式兼得,用戶可以在這兩者之間自由選擇,靈活度和性能兼而有之。
  4. 足夠健壯,生產可用,能應付各種 failover 的情況。

當然這些是我們的目標,也是我們一直努力的方向。

Mars tensor:Numpy 的並行和分佈式加速器

上面說過,我們的目標之一是,只要會用 Numpy 等數據科學包,就會用 Mars。我們直接來看代碼,還是以蒙特卡洛為例。變成 Mars 的代碼是什麼樣子呢?

<code>import mars.tensor as mt

N = 

10

**

10

data = mt.

random

.uniform(

-1

,

1

, size=(N,

2

)) inside = (mt.

sqrt

((data **

2

).sum(axis=

1

))

1

).sum()

pi

= (

4

* inside / N).

execute

()

print

(

'pi: %.5f'

%

pi

)/<code>

可以看到,區別就只有兩處:import numpy as np 變成 import mars.tensor as mt ,後續的 np. 都變成 mt. ;pi 在打印之前調用了一下 .execute() 方法。

也就是默認情況下,Mars 會按照聲明式的方式,代碼本身移植的代價極低,而在真正需要一個數據的時候,通過 .execute() 去觸發執行。這樣能最大限度得優化性能,以及減少中間過程內存消耗。

這裡,我們還將數據的規模擴大了 1000 倍,來到了 100 億個點。之前 1/1000 的數據量的時候,在我的筆記本上需要 757ms;而現在數據擴大一千倍,光 data 就需要 150G 的內存,這用 Numpy 本身根本無法完成。而使用 Mars,計算時間只需要 3min 44s,而峰值內存只需要 1G 左右。假設我們認為內存無限大,Numpy 需要的時間也就是之前的 1000 倍,大概是 12min 多,可以看到 Mars 充分利用了多核的能力,並且通過聲明式的方式,極大減少了中間內存佔用。

前面說到,我們試圖讓聲明式和命令式兼得,而使用命令式的風格,只需要在代碼的開始配置一個選項即可。

<code>

import

mars.tensor

as

mt

from

mars.config

import

options

options.eager_mode

=

True

N

=

10

**

7

data

=

mt.random.uniform(-1,

1

,

size=(N,

2

))

inside

=

(mt.linalg.norm(data,

axis=1)

<

1

).sum()

pi

=

4

*

inside

/

N

print('pi:

%.5f'

%

pi.fetch())

/<code>

Mars DataFrame:pandas 的並行和分佈式加速器

看過怎麼樣輕鬆把 Numpy 代碼遷移到 Mars tensor ,想必讀者也知道怎麼遷移 pandas 代碼了,同樣也只有兩個區別。我們還是以 movielens 的代碼為例。

<code>

import

mars.dataframe

as

md ratings = md.read_csv(

'ml-20m/ratings.csv'

) ratings.groupby(

'userId'

).agg({

'rating'

: [

'sum'

,

'mean'

,

'max'

,

'min'

]}).execute()/<code>

Mars Learn:scikit-learn 的並行和分佈式加速器

Mars Learn 也同理,這裡就不做過多闡述了。但目前 Mars learn 支持的 scikit-learn 算法還不多,我們也在努力移植的過程中,這需要大量的人力和時間,歡迎感興趣的同學一起參與。

<code>

import

mars.dataframe

as

md

from

mars.learn.neighbors

import

NearestNeighbors df = md.read_csv(

'data.csv'

) nn = NearestNeighbors(n_neighbors=

10

) nn.fit(df) neighbors = nn.kneighbors(df).fetch() /<code>

這裡要注意的是,對於機器學習的 fit、predict 等高層接口,Mars Learn 也會立即觸發執行,以保證語義的正確性。

RAPIDS:GPU 上的數據科學

相信細心的觀眾已經發現,GPU 好像沒有被提到。不要著急,這就要說到 RAPIDS。

在之前,雖然 CUDA 已經將 GPU 編程的門檻降到相當低的一個程度了,但對於數據科學家們來說,在 GPU 上處理 Numpy、pandas 等能處理的數據無異於天方夜譚。幸運的是,NVIDIA 開源了 RAPIDS 數據科學平臺,它和 Mars 的部分思想高度一致,即使用簡單的 import 替換,就可以將 Numpy、pandas 和 scikit-learn 的代碼移植到 GPU 上。

戰“疫”期,阿里云云效團隊在家高效開發實錄

其中,RAPIDS cuDF 用來加速 pandas,而 RAPIDS cuML 用來加速 scikit-learn。

對於 Numpy 來說,CuPy 已經很好地支持用 GPU 來加速了,這樣 RAPIDS 也得以把重心放在數據科學的其他部分。

CuPy:用 GPU 加速 Numpy

還是蒙特卡洛求解 Pi。

<code>

import

cupy

as

cp

N

=

10

**

7

data

=

cp.random.uniform(-1,

1

,

size=(N,

2

))

inside

=

(cp.sqrt((data

**

2

).sum(axis=1))

<

1

).sum()

pi

=

4

*

inside

/

N

print('pi:

%.5f'

%

pi)

/<code>

在我的測試中,它將 CPU 的 757ms,降到只有 36ms,提升超過 20 倍,可以說效果非常顯著。這正是得益於 GPU 非常適合計算密集型的任務。

RAPIDS cuDF:用 GPU 加速 pandas

將 import pandas as pd 替換成 import cudf,GPU 內部如何並行,CUDA 編程這些概念,用戶都不再需要關心。

<code>

import

cudf ratings = cudf.read_csv(

'ml-20m/ratings.csv'

) ratings.groupby(

'userId'

).agg({

'rating'

: [

'sum'

,

'mean'

,

'max'

,

'min'

]})/<code>

運行時間從 CPU 上的 18s 提升到 GPU 上的 1.66s,提升超過 10 倍。

RAPIDS cuML:用 GPU 加速 scikit-learn

同樣是 k-最鄰近問題。

<code>

import

cudf

from

cuml.neighbors import NearestNeighbors

df

=

cudf.read_csv('data.csv')

nn

=

NearestNeighbors(n_neighbors=10)

nn.fit(df)

neighbors

=

nn.kneighbors(df)

/<code>

運行時間從 CPU 上 1min52s,提升到 GPU 上 17.8s。

Mars 和 RAPIDS 結合能帶來什麼?

RAPIDS 將 Python 數據科學帶到了 GPU,極大地提升了數據科學的運行效率。它們和 Numpy 等一樣,是命令式的。通過和 Mars 結合,中間過程將會使用更少的內存,這使得數據處理量更大;Mars 也可以將計算分散到多機多卡,以提升數據規模和計算效率。

在 Mars 裡使用 GPU 也很簡單,只需要在對應函數上指定 gpu=True。例如創建 tensor、讀取 CSV 文件等都適用。

<code>

import

mars.tensor

as

mt

import

mars.dataframe

as

md a = mt.random.uniform(

-1

,

1

, size=(

1000

,

1000

), gpu=

True

) df = md.read_csv(

'ml-20m/ratings.csv'

, gpu=

True

)/<code>

下圖是用 Mars 分別在 Scale up 和 Scale out 兩個維度上加速蒙特卡洛計算 Pi 這個任務。一般來說,我們要加速一個數據科學任務,可以有這兩種方式,Scale up 是指可以使用更好的硬件,比如用更好的 CPU、更大的內存、使用 GPU 替代 CPU等;Scale out 就是指用更多的機器,用分佈式的方式提升效率。

戰“疫”期,阿里云云效團隊在家高效開發實錄

可以看到在一臺 24 核的機器上,Mars 計算需要 25.8s,而通過分佈式的方式,使用 4 臺 24 核的機器的機器幾乎以線性的時間提升。而通過使用一個 NVIDIA TESLA V100 顯卡,我們就能將單機的運行時間提升到 3.98s,這已經超越了4臺 CPU 機器的性能。通過再將單卡拓展到多卡,時間進一步降低,但這裡也可以看到,時間上很難再線性擴展了,這是因為 GPU 的運行速度提升巨大,這個時候網絡、數據拷貝等的開銷就變得明顯。

性能測試

我們使用了 https://github.com/h2oai/db-benchmark 的數據集,測試了三個數據規模的 groupby 和 一個數據規模的 join。而我們主要對比了 pandas 和 DASK。DASK 和 Mars 的初衷很類似,也是試圖並行和分佈式化 Python 數據科學,但它們的設計、實現、分佈式都存在較多差異,這個後續我們再撰文進行詳細對比。

測試機器配置是 500G 內存、96 核、NVIDIA V100 顯卡。Mars 和 DASK 在 GPU 上都使用 RAPIDS 執行計算。

Groupby

數據有三個規模,分別是 500M、5G 和 20G。

查詢也有三組。

查詢一

<code>df = read_csv(

'data.csv'

) df.groupby(

'id1'

).agg({

'v1'

:

'sum'

})/<code>

查詢二

<code>df = read_csv(

'data.csv'

) df.groupby([

'id1'

,

'id2'

]).agg({

'v1'

:

'sum'

})/<code>

查詢三

<code>df = read_csv(

'data.csv'

) df.gropuby([

'id6'

]).agg({

'v1'

:

'sum'

,

'v2'

:

'sum'

,

'v3'

:

'sum'

})/<code>

數據大小 500M,性能結果

戰“疫”期,阿里云云效團隊在家高效開發實錄

數據大小 5G,性能結果

戰“疫”期,阿里云云效團隊在家高效開發實錄

數據大小 20G,性能結果

戰“疫”期,阿里云云效團隊在家高效開發實錄

數據大小到 20G 時,pandas 在查詢2會內存溢出,得不出結果。

可以看到,隨著數據增加,Mars 的性能優勢會愈發明顯。

得益於 GPU 的計算能力,GPU 運算性能相比於 CPU 都有數倍的提升。如果單純使用 RAPIDS cuDF,由於顯存大小的限制,數據來到 5G 都難以完成,而由於 Mars 的聲明式的特點,中間過程對顯存的使用大幅得到優化,所以整組測試來到 20G 都能輕鬆完成。這正是 Mars + RAPIDS 所能發揮的威力。

Join

測試查詢:

<code>

x

= read_csv(

'x.csv'

) y = read_csv(

'y.csv'

) x.merge(y,

on

=

'id1'

)/<code>

測試數據 x 為500M,y 包含10行數據。

戰“疫”期,阿里云云效團隊在家高效開發實錄

總結

RAPIDS 將 Python 數據科學帶到了 GPU,極大提升了數據分析和處理的效率。Mars 的注意力更多放在並行和分佈式。相信這兩者的結合,在未來會有更多的想象空間。


分享到:


相關文章: