通過Apache Spark和Pandas輕鬆介紹Apache Arrow

這次,我將嘗試解釋如何將Apache Arrow與Apache Spark和Python結合使用。 首先,讓我分享有關此開源項目的一些基本概念。

通過Apache Spark和Pandas輕鬆介紹Apache Arrow

> [Apache Arrow]

Apache Arrow是用於內存數據的跨語言開發平臺。 它為平面和分層數據指定了一種與語言無關的標準化列式存儲格式,該格式組織用於在現代硬件上進行有效的分析操作。 [Apache箭頭頁面]

簡而言之,它促進了許多組件之間的通信,例如,使用Python(熊貓)讀取實木複合地板文件並轉換為Spark數據框,Falcon Data Visualization或Cassandra,而無需擔心轉換。

通過Apache Spark和Pandas輕鬆介紹Apache Arrow

> Overview Apache Arrow [Julien Le Dem, Spark Summit 2017]


一個好問題是問數據在內存中的外觀如何? 好吧,Apache Arrow利用列緩衝區來減少IO並加快分析處理性能。

通過Apache Spark和Pandas輕鬆介紹Apache Arrow

> Columnar In-memory [Apache Arrow page]

在我們的例子中,我們將使用pyarrow庫執行一些基本代碼並檢查一些功能。 為了安裝,我們有兩個使用conda或pip命令*的選項。

<code>

conda

install -c conda-forge pyarrow

pip

install pyarrow

/<code>

*建議在Python 3環境中使用conda。

帶有HDFS的Apache Arrow(遠程文件系統)

Apache Arrow附帶了到Hadoop File System的基於C ++的接口的綁定。 這意味著我們可以從HDFS讀取或下載所有文件,並直接使用Python進行解釋。

連接

主機是名稱節點,端口通常是RPC或WEBHDFS,允許使用更多參數,例如user,kerberos ticket。 強烈建議您閱讀所需的環境變量。

<code>

import

pyarrow as pa

host

=

'1970.x.x.x'

port

=

8022

fs

=

pa.hdfs.connect(host, port)

/<code>

· 如果您的連接位於數據或邊緣節點的前面,則可以選擇使用

<code>

fs

= pa.hdfs.connect()/<code>

將Parquet文件寫入HDFS

<code>

pq

.write_to_dataset

(table, root_path=

'dataset_name'

, partition_cols=[

'one'

,

'two'

], filesystem=fs)/<code>

從HDFS讀取CSV

<code>

import

pandas

as

pd

from

pyarrow

import

csv

import

pyarrow

as

pa fs = pa.hdfs.connect()

with

fs.open(

'iris.csv'

,

'rb'

)

as

f: df = pd.read_csv(f, nrows =

10

) df.head()/<code>
通過Apache Spark和Pandas輕鬆介紹Apache Arrow

> Reading CSV from HDFS

從HDFS讀取Parquet文件

有兩種形式可以從HDFS讀取實木複合地板文件

使用Pandas和Pyarrow引擎

<code>

import

pandas

as

pd pdIris = pd.read_parquet(

'hdfs:///iris/part-00000–27c8e2d3-fcc9–47ff-8fd1–6ef0b079f30e-c000.snappy.parquet'

, engine=

'pyarrow'

) pdTrain.head()/<code>

Parquet

<code>import pyarrow.parquet as pq

path

=

'hdfs:///iris/part-00000–71c8h2d3-fcc9–47ff-8fd1–6ef0b079f30e-c000.snappy.parquet'

table

= pq.read_table(

path

)

table

.schema df =

table

.to_pandas() df.head()/<code>

其他文件擴展名

由於我們可以存儲任何類型的文件(SAS,STATA,Excel,JSON或對象),因此Python可以輕鬆解釋其中的大多數文件。 為此,我們將使用open函數,該函數返回一個緩衝區對象,許多pandas函數(如read_sas,read_json)都可以接收該緩衝區對象作為輸入,而不是字符串URL。

SAS

<code>

import

pandas

as

pd

import

pyarrow

as

pa fs = pa.hdfs.connect()

with

fs.open(

'/datalake/airplane.sas7bdat'

,

'rb'

)

as

f: sas_df = pd.read_sas(f, format=

'sas7bdat'

) sas_df.head()/<code>

電子表格

<code>

import

pandas

as

pd

import

pyarrow

as

pa fs = pa.hdfs.connect()

with

fs.open(

'/datalake/airplane.xlsx'

,

'rb'

)

as

f: g.download(

'airplane.xlsx'

) ex_df = pd.read_excel(

'airplane.xlsx'

)/<code>

JSON格式

<code> 

import

pandas

as

pd

import

pyarrow

as

pa fs = pa.hdfs.connect()

with

fs.open(

'/datalake/airplane.json'

,

'rb'

)

as

f: g.download(

'airplane.json'

) js_df = pd.read_json(

'airplane.json'

)/<code>

從HDFS下載文件

如果我們只需要下載文件,Pyarrow為我們提供了下載功能,可以將文件保存在本地。

<code>

import

pandas

as

pd

import

pyarrow

as

pa fs = pa.hdfs.connect()

with

fs.open(

'/datalake/airplane.cs'

,

'rb'

)

as

f: g.download(

'airplane.cs'

)/<code>

上傳文件到HDFS

如果我們只需要下載文件,Pyarrow為我們提供了下載功能,可以將文件保存在本地。

<code>

import

pyarrow

as

pa fs = pa.hdfs.connect()

with

open(

'settings.xml'

)

as

f: pa.hdfs.HadoopFileSystem.upload(fs,

'/datalake/settings.xml'

, f)/<code>

Apache Arrow with Pandas(本地文件系統)

將Pandas Dataframe轉換為Apache Arrow Table

<code>

import

numpy

as

np

import

pandas

as

pd

import

pyarrow

as

pa df = pd.DataFrame({

'one'

: [

20

, np.nan,

2.5

],

'two'

: [

'january'

,

'february'

,

'march'

],

'three'

: [

True

,

False

,

True

]},index=list(

'abc'

)) table = pa.Table.from_pandas(df)/<code>

Pyarrow表到Pandas數據框

<code>

df_new

= table.to_pandas()/<code>

讀取CSV

<code>

from

pyarrow

import

csv fn =

'data/demo.csv'

table = csv.read_csv(fn) Ω/<code>

從Apache Arrow編寫Parquet文件

<code>

import

pyarrow.parquet

as

pq pq.write_table(table,

'example.parquet'

)/<code>

讀取Parquet文件

<code>

table2

= pq.read_table(

'example.parquet'

) table2/<code>

從parquet文件中讀取一些列

<code>

table2

= pq.read_table(

'example.parquet'

, columns=[

'one'

,

'three'

])/<code>

從分區數據集讀取

<code>dataset = pq.ParquetDataset(

'dataset_name_directory/'

)

table

= dataset.

read

()

table

/<code>

將Parquet文件轉換為Pandas DataFrame

<code>

pdf

= pq.read_pandas(

'example.parquet'

, columns=[

'two'

]).to_pandas() pdf/<code>

避免Pandas指數

<code>

table

= pa.Table.from_pandas(df, preserve_index=False) pq.write_table(

table

,

'example_noindex.parquet'

) t = pq.read_table(

'example_noindex.parquet'

) t.to_pandas()/<code>

檢查元數據

<code>

parquet_file

= pq.ParquetFile(

'example.parquet'

) parquet_file.metadata/<code>

查看數據模式

<code>

parquet_file

.schema

/<code>

時間戳記

請記住,Pandas使用納秒,因此您可以以毫秒為單位截斷兼容性。

<code>pq.write_table(table, 

where

, coerce_timestamps=

'ms'

) pq.write_table(table,

where

, coerce_timestamps=

'ms'

, allow_truncated_timestamps=True)/<code>

壓縮

默認情況下,儘管允許其他編解碼器,但Apache arrow使用快速壓縮(壓縮程度不高,但更易於訪問)。

<code>pq.write_table(table, 

where

, compression=

'snappy'

) pq.write_table(table,

where

, compression=

'gzip'

) pq.write_table(table,

where

, compression=

'brotli'

) pq.write_table(table,

where

, compression=

'none'

)/<code>

另外,在一個表中可以使用多個壓縮

<code>pq.write_table(

table

,

'example_diffcompr.parquet'

, compression={b

'one'

:

'snappy'

, b

'two'

:

'gzip'

})/<code>

編寫分區的Parquet表

<code>df = pd.DataFrame({

'one'

: [

1

,

2.5

,

3

],

'two'

: [

'Peru'

,

'Brasil'

,

'Canada'

],

'three'

: [

True

,

False

,

True

]}, index=

list

(

'abc'

)) table = pa.Table.from_pandas(df) pq.write_to_dataset(table, root_path=

'dataset_name'

,partition_cols=[

'one'

,

'two'

])/<code>

· 兼容性說明:如果您使用pq.write_to_dataset創建一個供HIVE使用的表,則分區列值必須與您正在運行的HIVE版本的允許字符集兼容。


帶有Apache Spark的Apache Arrow

Apache Arrow自2.3版本以來已與Spark集成在一起,它很好地演示瞭如何優化時間以避免序列化和反序列化過程,並與其他庫進行了集成,例如Holden Karau上關於在Spark上加速Tensorflow Apache Arrow的演示。

存在其他有用的文章,例如Brian Cutler發表的文章以及Spark官方文檔中的非常好的示例

Apache Arrow的一些有趣用法是:

· 加快從Pandas數據框到Spark數據框的轉換

· 加快從Spark數據框到Pandas數據框的轉換

· 與Pandas UDF(也稱為矢量化UDF)一起使用

· 使用Apache Spark優化R

第三項是下一篇文章的一部分,因為這是一個非常有趣的主題,目的是在不損失性能的情況下擴展Pandas和Spark之間的集成,對於第四項,我建議您閱讀該文章(於2019年發佈!)以獲得 瞭解更多。

讓我們先測試Pandas和Spark之間的轉換,而不進行任何修改,然後再使用Arrow。

<code>from pyspark.sql import SparkSession
warehouseLocation = 

"/antonio"

spark = SparkSession\ .builder.appName(

"demoMedium"

)\ .config(

"spark.sql.warehouse.dir"

, warehouseLocation)\ .enableHiveSupport()\ .getOrCreate() from pyspark.sql.functions import rand df = spark.range(1 << 22).toDF(

"id"

).withColumn(

"x"

, rand()) df.printSchema() pdf = df.toPandas()spark.conf.set(

"spark.sql.execution.arrow.enabled"

,

"true"

) %time pdf = df.toPandas() pdf.describe()/<code>

結果顯然是使用Arrow減少時間轉換更方便。

通過Apache Spark和Pandas輕鬆介紹Apache Arrow

> Optimizing transformation from Spark Data Frame to Pandas

如果我們需要測試相反的情況(Pandas來激發df),那麼我們也會及時發現優化。

<code>%

time

df = spark.createDataFrame(pdf) spark.conf.set(

"spark.sql.execution.arrow.enabled"

,

"false"

) %

time

df = spark.createDataFrame(pdf) df.describe().show()/<code>
通過Apache Spark和Pandas輕鬆介紹Apache Arrow

結論

本文的目的是發現並瞭解Apache Arrow以及它如何與Apache Spark和Pandas一起使用,我也建議您查看It的官方頁面,以進一步瞭解CUDA或C ++等其他可能的集成,如果您想更深入地瞭解它, 並瞭解有關Apache Spark的更多信息,我認為Spark:權威指南是一本很好的書。

附註:如果您有任何疑問,或者想澄清一些問題,可以在Twitter和LinkedIn上找到我。 我最近發表了Apache Druid的簡要介紹,這是一個新的Apache項目,非常適合分析數十億行。


(本文翻譯自Antonio Cachuan的文章《A gentle introduction to Apache Arrow with Apache Spark and Pandas》,參考:
https://towardsdatascience.com/a-gentle-introduction-to-apache-arrow-with-apache-spark-and-pandas-bb19ffe0ddae)


分享到:


相關文章: