原理 Spark是一個極為優秀的大數據框架,在大數據批處理上基本無人能敵,流處理上也有一席之地,機器學習則是當前正火熱AI人工智能的驅動引擎,在大數據場景下如何發揮AI技術成為優秀的大數據挖掘工程師必備技能。
本文采用的組件版本為:Ubuntu 19.10、Jdk 1.8.0_241、Scala 2.11.12、Hadoop 3.2.1、Spark 2.4.5,老規矩先開啟一系列Hadoop、Spark服務與Spark-shell窗口:
1.LDA原理介紹
隱式狄利克雷分佈(LDA) 是一種主題模型,可以從文本文檔集合中推斷出主題。可以將LDA視為一種聚類算法,如下所示:
- 主題對應於聚類中心,文檔對應於數據集中的示例(行)。
- 主題和文檔都存在於特徵空間中,其中特徵向量是字數(詞袋)的向量。
- LDA不會使用傳統的距離來估計聚類,而是使用基於文本文件生成方式的統計模型的功能。
DA涉及到的先驗知識有:二項分佈、Gamma函數、Beta分佈、多項分佈、Dirichlet分佈、馬爾科夫鏈、MCMC、Gibs Sampling、EM算法等。限於篇幅,本文不設計具體理論推導,具體可參考知乎深度文章:https://zhuanlan.zhihu.com/p/31470216
2.LDA參數
LDA通過setOptimizer功能支持不同的推理算法。EMLDAOptimizer使用似然函數的期望最大化學習聚類併產生綜合結果,同時 OnlineLDAOptimizer使用迭代小批量採樣進行在線變異推斷 ,並且通常對內存友好。
LDA將文檔集合作為單詞計數和以下參數(使用構建器模式設置)的向量:
- k:主題數(即群集中心)
- optimizer:用於學習LDA模型的優化程序, EMLDAOptimizer或者OnlineLDAOptimizer
- docConcentration:Dirichlet參數,用於優先於主題的文檔分佈。較大的值鼓勵更平滑的推斷分佈。
- topicConcentration:Dirichlet參數,用於表示主題(單詞)在先主題的分佈。較大的值鼓勵更平滑的推斷分佈。
- maxIterations:限制迭代次數。
- checkpointInterval:如果使用檢查點(在Spark配置中設置),則此參數指定創建檢查點的頻率。如果maxIterations很大,使用檢查點可以幫助減少磁盤上的隨機文件大小,並有助於故障恢復。
所有spark.mllib的LDA模型都支持:
- describeTopics:以最重要的術語和術語權重的數組形式返回主題
- topicsMatrix:返回一個vocabSize由k矩陣,其中各列是一個主題
期望最大化在EMLDAOptimizer和DistributedLDAModel中實現。對於提供給LDA的參數:
- docConcentration:僅支持對稱先驗,因此提供的k維向量中的所有值都必須相同。所有值也必須> 1.0。提供Vector(-1)會導致默認行為(值(50 / k)+1的統一k維向量)
- topicConcentration:僅支持對稱先驗。值必須> 1.0。提供-1會導致默認值為0.1 + 1。
- maxIterations:EM迭代的最大數量。
- 注意:進行足夠的迭代很重要。在早期的迭代中,EM通常沒有用的主題,但是經過更多的迭代後,這些主題會大大改善。根據您的數據集,通常至少合理使用20次甚至50-100次迭代。
- EMLDAOptimizer生成一個DistributedLDAModel,它不僅存儲推斷出的主題,還存儲完整的訓練語料庫和訓練語料庫中每個文檔的主題分佈。DistributedLDAModel支持:
- topTopicsPerDocument:訓練語料庫中每個文檔的主要主題及其權重
- topDocumentsPerTopic:每個主題的頂部文檔以及該主題在文檔中的相應權重。
- logPrior:給定超參數docConcentration和topicConcentration時,估計主題和文檔主題分佈的對數概率
- logLikelihood:給定推斷的主題和文檔主題分佈,訓練語料庫的對數可能性
3.Spark示例
在以下示例中,我們加載代表文檔語料庫的單詞計數向量。然後,我們使用LDA從文檔中推斷出三個主題。所需聚類的數量傳遞給算法。然後,我們輸出主題,表示為單詞上的概率分佈。
<code>import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA} import org.apache.spark.mllib.linalg.Vectors // 加載和解析數據 val data = sc.textFile("data/mllib/sample_lda_data.txt") val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble))) // 用唯一ID索引文章 val corpus = parsedData.zipWithIndex.map(_.swap).cache() // 使用LDA將文章聚類為3類主題 val ldaModel = new LDA().setK(3).run(corpus) // 輸出主題。每個主題都是單詞分佈(匹配單詞向量) println(s"Learned topics (as distributions over vocab of ${ldaModel.vocabSize} words):") val topics = ldaModel.topicsMatrix for (topic /<code>
4.源碼解析
以上代碼主要做了兩件事:加載和切分數據、訓練模型。在樣本數據中,每一行代表一篇文檔,經過處理後,corpus的類型為List((id,vector)*),一個(id,vector)代表一篇文檔。將處理後的數據傳給org.apache.spark.mllib.clustering.LDA類的run方法, 就可以開始訓練模型。run方法的代碼如下所示:
<code>def run(documents: RDD[(Long, Vector)]): LDAModel = { val state = ldaOptimizer.initialize(documents, this) var iter = 0 val iterationTimes = Array.fill[Double](maxIterations)(0) while (iter < maxIterations) { val start = System.nanoTime() state.next() val elapsedSeconds = (System.nanoTime() - start) / 1e9 iterationTimes(iter) = elapsedSeconds iter += 1 } state.getLDAModel(iterationTimes) }/<code>
這段代碼首先調用initialize方法初始化狀態信息,然後循環迭代調用next方法直到滿足最大的迭代次數。在我們沒有指定的情況下,迭代次數默認為20。需要注意的是, ldaOptimizer有兩個具體的實現類EMLDAOptimizer和OnlineLDAOptimizer,它們分別表示使用EM算法和在線學習算法實現參數估計。在未指定的情況下,默認使用EMLDAOptimizer。
Spark kmeans族的聚類算法的內容至此結束,有關Spark的基礎文章可參考前文:
阿里是怎麼做大數據的?淘寶怎麼能承載雙11?大數據之眸告訴你
Spark分佈式機器學習源碼分析:如何用分佈式集群構建線性模型?
高頻面經總結:最全大數據+AI方向面試100題(附答案詳解)
Spark分佈式機器學習系列:一文帶你理解並實戰樸素貝葉斯!
Spark分佈式機器學習系列:一文帶你理解並實戰決策樹模型!
Spark分佈式機器學習系列:一文帶你理解並實戰集成樹模型!
參考鏈接:
http://spark.apache.org/docs/latest/mllib-clustering.html
https://github.com/endymecy/spark-ml-source-analysis