09.18 使用scikit-learn和PySpark Pandas UDF進行大規模預測

使用scikit-learn和PySpark Pandas UDF進行大規模預測

scikit- learning是Python中機器學習的一個很好的工具,在實現管道和運行實驗方面具有很大的靈活性,但它並不是真正為“大數據”(例如數億條記錄或更多)上的分佈式計算而設計的。

一個常見的預測建模場景是有少量或中等數量的標記數據來估計一個模型(例如,10,000條記錄),但是需要一個大得多的未標記機器學習數據集來進行預測。在這個場景中,您可能希望使用scikit-learn在筆記本電腦或單臺服務器上對機器學習模型進行訓練,以獲得易用性和靈活性,但隨後通過使用PySpark來分發計算,可以更快地將該機器學習模型應用到大型的未標記數據集中。如果您的ETL任務已經使用PySpark實現了,那麼使用PySpark進行分佈式預測也是有意義的,這對於數據轉換和ETL非常有用。

PySpark具有pickle python對象(包括函數)的功能,並將其應用於跨進程、機器等分佈的數據。此外,它具有類似於pandas的語法,但將計算的定義與執行分離開來,類似於TensorFlow。

問題是,

  • a)基於java的Spark執行進程之間傳遞數據(在機器之間發送數據並能高效執行轉換)
  • b) Python進程(例如,用於用scikit-learn進行預測)由於序列化和進程間通信而帶來一些開銷。

一個解決方案是PySpark DataFrame API中的用戶定義函數(udf)。您可以使用DataFrame API在Java中高效地執行大多數操作(不需要編寫Java或Scala!),但是隻有在必要時才調用Python udf,這會導致Java-Python通信開銷。

正常的PySpark udf每次只執行一個值,這會導致大量Java-Python通信開銷。最近,PySpark添加了panda udf,它通過Apache Arrow有效地將DataFrame的數據塊轉換成panda系列對象,以避免常規udf的大量開銷。讓UDF預期panda系列還可以節省Python和NumPy浮點表示法之間的轉換,就像常規UDF那樣。

下面的Python示例代碼可能會幫助那些有在PySpark中部署scikit-learn機器學習模型用於預測的特定用例的人。

最後需要注意的是,我要提到將PySpark作為Pandas替代數據幀實現和/或Python的concurrent.futures進行並行化是值得的。PySpark只需要額外的努力就可以處理小型數據集,但如果需要可以更容易地擴展。

設置並生成一些數據

import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV
import pandas as pd
import pyspark
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType, StringType, ArrayType

# Make some fake data and train a model.
n_samples_test = 100000

n_samples_train = 1000
n_samples_all = n_samples_train + n_samples_test
n_features = 50

X, y = make_classification(n_samples=n_samples_all, n_features=n_features, random_state=123)
X_train, X_test, y_train, y_test = \\
train_test_split(X, y, test_size=n_samples_test, random_state=45)

# Use pandas to put the test data in parquet format to illustrate how to load it up later.
# In real usage, the data might be on S3, Azure Blog Storage, HDFS, etc.
column_names = [f'feature{i}' for i in range(n_features)]
(
pd.DataFrame(X_test, columns=column_names)
.reset_index()
.rename(columns={'index': 'id'})
.to_parquet('unlabeled_data')
)

用scikit-learn來訓練一個機器學習模型

param_grid = {'n_estimators': [100], 'max_depth': [2, 4, None]}
gs_rf = GridSearchCV(
RandomForestClassifier(random_state=42),
param_grid=param_grid,
scoring='roc_auc'
).fit(X_train, y_train)
print('ROC AUC: %.3f' % gs_rf.best_score_)
ROC AUC: 0.959

建立一個spark環境

sc = pyspark.SparkContext(appName="foo")
sqlContext = pyspark.SQLContext(sc)

現在加載數據並做出預測

在實際使用中,在讀取原始數據之後,我們可能會做一些ETL,但是在這裡,我們只是加載它。

df_unlabeled = sqlContext.read.parquet('unlabeled_data')
df_unlabeled

DataFrame[id: bigint, feature0: double, feature1: double, feature2: double, feature3: double, feature4: double, feature5: double, feature6: double, feature7: double, feature8: double, feature9: double, feature10: double, feature11: double, feature12: double, feature13: double, feature14: double, feature15: double, feature16: double, feature17: double, feature18: double, feature19: double, feature20: double, feature21: double, feature22: double, feature23: double, feature24: double, feature25: double, feature26: double, feature27: double, feature28: double, feature29: double, feature30: double, feature31: double, feature32: double, feature33: double, feature34: double, feature35: double, feature36: double, feature37: double, feature38: double, feature39: double, feature40: double, feature41: double, feature42: double, feature43: double, feature44: double, feature45: double, feature46: double, feature47: double, feature48: double, feature49: double, __index_level_0__: bigint]DataFra

用常規UDF進行預測

首先,我們將嘗試普通UDF。這將反序列化一行(即,實例,樣本,記錄),一次做一個預測,然後返回一個預測,這個預測將被序列化併發送回Spark以與所有其他預測相結合。

@F.udf(returnType=DoubleType())
def predict_udf(*cols):
# cols will be a tuple of floats here.
return float(gs_rf.predict_proba((cols,))[0, 1])

df_pred_a = df_unlabeled.select(
F.col('id'),
predict_udf(*column_names).alias('prediction')
)
df_pred_a.take(5)

[Row(id=0, prediction=0.96),

Row(id=1, prediction=0.13),

Row(id=2, prediction=0.95),

Row(id=3, prediction=0.43),

Row(id=4, prediction=0.95)]

用Pandas UDF做預測

現在我們將使用Pandas UDF(即矢量化UDF)。在這種情況下,Spark將一次發送多行Pandas序列對象的元組。元組每個列/特性將有一個Series ,按照它們被傳遞給UDF的順序。請注意,這些Series 對象中的一個不會一次包含所有行的特徵,因為Spark將數據集劃分為多個workers。分區大小可以調優,但這裡我們只使用默認值。

@F.pandas_udf(returnType=DoubleType())
def predict_pandas_udf(*cols):
# cols will be a tuple of pandas.Series here.
X = pd.concat(cols, axis=1)
return pd.Series(gs_rf.predict_proba(X)[:, 1])

df_pred_b = df_unlabeled.select(
F.col('id'),
predict_pandas_udf(*column_names).alias('prediction')
)
df_pred_b.take(5)

[Row(id=0, prediction=0.96),

Row(id=1, prediction=0.13),

Row(id=2, prediction=0.95),

Row(id=3, prediction=0.43),

Row(id=4, prediction=0.95)]

多類預測

上面,我們只是返回了一個正類的預測序列,它適用於單個binary或因變量。還可以在 Pandas udf中放置多類或多標籤模型。

@F.pandas_udf(returnType=ArrayType(DoubleType()))
def predict_pandas_udf(*cols):
X = pd.concat(cols, axis=1)
return pd.Series(row.tolist() for row in gs_rf.predict_proba(X))

df_pred_multi = (
df_unlabeled.select(
F.col('id'),
predict_pandas_udf(*column_names).alias('predictions')
)
# Select each item of the prediction array into its own column.
.select(
F.col('id'),
*[F.col('predictions')[i].alias(f'prediction_{c}')
for i, c in enumerate(gs_rf.classes_)]
)
)

df_pred_multi.take(5)

[Row(id=0, prediction_0=0.04, prediction_1=0.96),

Row(id=1, prediction_0=0.87, prediction_1=0.13),

Row(id=2, prediction_0=0.05, prediction_1=0.95),

Row(id=3, prediction_0=0.57, prediction_1=0.43),

Row(id=4, prediction_0=0.05, prediction_1=0.95)]


分享到:


相關文章: