在以如此惊人的速度生成数据的世界中,在正确的时间对数据进行正确分析非常有用。实时处理大数据并执行分析的最令人惊奇的框架之一是Apache Spark,如果我们谈论现在用于处理复杂数据分析和数据修改任务的编程语言,我相信Python会超越这个图表。所以在这个
PySpark教程中,我将讨论以下主题:什么是PySpark?
PySpark在业界
为什么选择Python?
Spark RDDs
使用PySpark进行机器学习
PySpark教程:什么是PySpark?
Apache Spark是一个快速的集群计算框架,用于处理,查询和分析大数据。基于内存计算,它具有优于其他几个大数据框架的优势。
开源社区最初是用Scala编程语言编写的,它开发了一个支持Apache Spark的神奇工具。PySpark通过其库Py4j帮助数据科学家与Apache Spark和Python中的RDD进行交互。有许多功能使PySpark成为比其他更好的框架:
- 速度:比传统的大规模数据处理框架快100倍。
- 强大的缓存:简单的编程层提供强大的缓存和磁盘持久性功能。
- 部署:可以通过Mesos,Hadoop通过Yarn或Spark自己的集群管理器进行部署。
- 实时:由于内存计算,实时计算和低延迟。
- Polyglot: 支持Scala,Java,Python和R编程。
PySpark在业界
让我们继续我们的PySpark教程,看看Spark在业界的使用位置。
每个行业都围绕大数据展开,而大数据则涉及分析。那么让我们来看看使用Apache Spark的各个行业。
Media是向在线流媒体发展的最大行业之一。Netflix使用Apache Spark进行实时流处理,为其客户提供个性化的在线推荐。它每天处理4500亿个事件,流向服务器端应用程序。
财务是Apache Spark的实时处理发挥重要作用的另一个领域。银行正在使用Spark访问和分析社交媒体资料,以获取洞察力,从而帮助他们为
信用风险评估,有针对性的广告和客户细分做出正确的业务决策。使用Spark还可以减少客户流失。欺诈检测是涉及Spark的最广泛使用的机器学习领域之一。医疗保健提供商正在使用Apache Spark来分析患者记录以及过去的临床数据,以确定哪些患者在从诊所出院后可能面临健康问题。Apache Spark用于基因组测序,以减少处理基因组数据所需的时间。
零售和电子商务是一个人们无法想象它在没有使用分析和有针对性的广告的情况下运行的行业。作为当今最大的电子商务平台之一,Alibabaruns是世界上一些最大的Spark职位,用于分析数PB的数据。阿里巴巴在图像数据中执行特征提取。易趣使用Apache Spark提供有针对性的优惠,增强客户体验并优化整体性能。
旅游业也使用Apache Spark。TripAdvisor是一家帮助用户计划完美旅行的领先旅游网站,它正在使用Apache Spark来加速其个性化的客户推荐。TripAdvisor使用Apache Spark通过比较数百个网站为数百万旅客提供建议,以便为其客户找到最佳的酒店价格。
这个PySpark教程的一个重要方面是理解为什么我们需要使用Python。为什么不使用Java,Scala或R?
易于学习: 对于程序员来说,Python因其语法和标准库而相对容易学习。而且,它是一种动态类型语言,这意味着RDD可以保存多种类型的对象。
大量的库: Scala没有足够的数据科学工具和Python,如机器学习和自然语言处理。此外,Scala缺乏良好的可视化和本地数据转换。
巨大的社区支持: Python拥有一个全球社区,拥有数百万开发人员,可在数千个虚拟和物理位置进行在线和离线交互。
Spark RDDs
当涉及到迭代分布式计算,即在计算中处理多个作业的数据时,我们需要在多个作业之间重用或共享数据。像Hadoop这样的早期框架在处理多个操作/作业时遇到了问题:
- 将数据存储在HDFS等中间存储中。
- 多个I / O作业使计算变慢。
- 复制和序列化反过来使进程更慢。
RDD尝试通过启用容错分布式内存计算来解决所有问题。RDD是弹性分布式数据集的缩写。RDD是一种分布式内存抽象,它允许程序员以容错的方式在大型集群上执行内存计算。它们是在一组计算机上分区的对象的只读集合,如果分区丢失,可以重建这些对象。在RDD上执行了几个操作:
- 转换:转换从现有数据集创建新数据集。懒惰的评价。
- 操作:仅当在RDD上调用操作时, Spark才会强制执行计算。
让我们理解一些转换,动作和函数。
读取文件并显示前n个元素:
rdd = sc.textFile("file:///home/edureka/Desktop/Sample")
rdd.take(n)
转换为小写和拆分:(降低和拆分)
def Func(lines):
lines = lines.lower()
lines = lines.split()
return lines
rdd1 = rdd.map(Func)
rdd1.take(5)
删除StopWords :(过滤器)
stop_words = ['a','all','the','as','is','am','an','and','be','been','from','had','I','I'd','why','with']
rdd2 = rdd1.filter(lambda z: z not in stop_words)
rdd2.take(10)
数字总和从1到500 :(减少)
sum_rdd = sc.parallelize(range(1,500))
sum_rdd.reduce(lambda x,y: x+y)
124750
使用PySpark进行机器学习
继续我们的PySpark教程,让我们分析一些篮球数据并进行一些预测。所以,在这里我们将使用自1980年以来NBA所有球员的数据[引入3指针的年份]。
df = spark.read.option('header','true')\
.option('inferSchema','true')
.csv("file:///home/edureka/Downloads/season_totals.csv")
print(df.columns)
排序玩家(OrderBy)和 toPandas:
在这里,我们根据一个赛季得分来排序球员。
df.orderBy('pts',ascending = False).limit(10).toPandas()[['yr','player','age','pts','fg3']]
使用DSL和matplotlib:
在这里,我们分析了每个赛季
3次尝试的平均次数,在36分钟 的时间限制内[对应于足够休息的近似完整的NBA比赛的间隔]。我们使用3点射门次数(fg3a)和分钟数(mp)来计算此指标,然后使用matlplotlib。来自 pyspark。sql。函数 import col
from pyspark.sql.functions import col
fga_py = df.groupBy('yr')\
.agg({'mp' : 'sum', 'fg3a' : 'sum'})
.select(col('yr'), (36*col('sum(fg3a)')/col('sum(mp)')).alias('fg3a_p36m'))\
.orderBy('yr')
from matplotlib import pyplot as plt
import seaborn as sns
plt.style.use('fivethirtyeight')
_df = fga_py.toPandas()
plt.plot(_df.yr,_df.fg3a_p36m, color = '#CD5C5C')
plt.xlabel('Year')
_=plt.title('Player average 3-point attempts (per 36 minutes)')
plt.annotate('3 pointer introduced', xy=(1980, .5), xytext=(1981, 1.1), fontsize = 9,
arrowprops=dict(facecolor='grey', shrink=0, linewidth = 2))
plt.annotate('NBA moved in 3-point line', xy=(1996, 2.4), xytext=(1991.5, 2.7), fontsize = 9,
arrowprops=dict(facecolor='grey', shrink=0, linewidth = 2))
plt.annotate('NBA moved back\n3-point line', xy=(1998, 2.), xytext=(1998.5, 2.4), fontsize = 9, arrowprops=dict(facecolor='grey', shrink=0, linewidth = 2))
线性回归和VectorAssembler:
我们可以在此曲线上拟合线性回归模型,以模拟未来5年的射击次数。我们必须使用VectorAssembler 函数将数据转换 为单个列。这是一个必要条件为在MLlib线性回归API。
来自 pyspark。毫升。功能 导入 VectorAssembler
from pyspark.ml.feature import VectorAssembler
t = VectorAssembler(inputCols=['yr'], outputCol = 'features')
training = t.transform(fga_py)\
.withColumn('yr',fga_py.yr)\
.withColumn('label',fga_py.fg3a_p36m)
training.toPandas().head()
然后,我们使用转换后的数据构建线性回归模型对象。
来自 pyspark。毫升。回归 导入 LinearRegression
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(maxIter=10)
model = lr.fit(training)
将训练模型应用于数据集:
我们将训练有素的模型对象模型应用于我们的原始训练集以及5年的未来数据:
from pyspark.sql.types import Row
# apply model for the 1979-80 season thru 2020-21 season
training_yrs = training.select('yr').rdd.map(lambda x: x[0]).collect()
training_y = training.select('fg3a_p36m').rdd.map(lambda x: x[0]).collect()
prediction_yrs = [2017, 2018, 2019, 2020, 2021]
all_yrs = training_yrs + prediction_yrs
# built testing DataFrame
test_rdd = sc.parallelize(all_yrs)
row = Row('yr')<
all_years_features = t.transform(test_rdd.map(row).toDF())
# apply linear regression model
df_results = model.transform(all_years_features).toPandas()
绘制最终预测:
然后,我们可以绘制结果并将图表保存在指定位置。
plt.plot(df_results.yr,df_results.prediction, linewidth = 2, linestyle = '--',color = '#224df7', label = 'L2 Fit')
plt.plot(training_yrs, training_y, color = '#f08080', label = None)
plt.xlabel('Year')
plt.ylabel('Number of attempts')
plt.legend(loc = 4)
_=plt.title('Player average 3-point attempts (per 36 minutes)')
plt.tight_layout()
plt.savefig("/home/edureka/Downloads/Images/REGRESSION.png")
而且,通过这个图,我们来到这个PySpark教程的末尾。
伙计们,这就是它!
我希望你们知道PySpark是什么,为什么Python最适合Spark,RDD和Pyspark机器学习的一瞥。恭喜,您不再是PySpark的新手了。
閱讀更多 故事你真的zai聽嗎 的文章