Spark任務優化-checkpoint使用

Spark任務優化-checkpoint使用

下面結合persist來討論checkpoint。

1.什麼樣的 RDD 需要 checkpoint

運算時間很長或運算量太大才能得到的 RDD,computing chain 過長或依賴其他 RDD 很多的 RDD。 實際上,將 ShuffleMapTask 的輸出結果存放到本地磁盤也算是 checkpoint,只不過這個 checkpoint 的主要目的是去 partition 輸出數據。

2.什麼時候 checkpoint

cache 機制是每計算出一個要 cache 的 partition 就直接將其 cache 到內存了。但 checkpoint 沒有使用這種第一次計算得到就存儲的方法,而是等到 job 結束後另外啟動專門的 job 去完成 checkpoint 。也就是說需要 checkpoint 的 RDD 會被計算兩次。因此,在使用 rdd.checkpoint() 的時候,建議加上 rdd.cache(),這樣第二次運行的 job 就不用再去計算該 rdd 了,直接讀取 cache 寫磁盤。其實 Spark 提供了 rdd.persist(StorageLevel.DISK_ONLY) 這樣的方法,相當於 cache 到磁盤上,這樣可以做到 rdd 第一次被計算得到時就存儲到磁盤上,但這個 persist 和 checkpoint 有很多不同,之後會討論。

3.checkpoint 怎麼實現?

RDD 需要經過 [ Initialized --> marked for checkpointing --> checkpointing in progress --> checkpointed ] 這幾個階段才能被 checkpoint。

Initialized: 首先 driver program 需要使用 rdd.checkpoint() 去設定哪些 rdd 需要 checkpoint,設定後,該 rdd 就接受 RDDCheckpointData 管理。用戶還要設定 checkpoint 的存儲路徑,一般在 HDFS 上。

marked for checkpointing:初始化後,RDDCheckpointData 會將 rdd 標記為 MarkedForCheckpoint。

checkpointing in progress:每個 job 運行結束後會調用 finalRdd.doCheckpoint(),finalRdd 會順著 computing chain 回溯掃描,碰到要 checkpoint 的 RDD 就將其標記為 CheckpointingInProgress,然後將寫磁盤(比如寫 HDFS)需要的配置文件(如 core-site.xml 等)broadcast 到其他 worker 節點上的 blockManager。完成以後,啟動一個 job 來完成 checkpoint(使用 rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf)))。

checkpointed:job 完成 checkpoint 後,將該 rdd 的 dependency 全部清掉,並設定該 rdd 狀態為 checkpointed。然後,為該 rdd 強加一個依賴,設置該 rdd 的 parent rdd 為 CheckpointRDD,該 CheckpointRDD 負責以後讀取在文件系統上的 checkpoint 文件,生成該 rdd 的 partition。

有意思的是我在 driver program 裡 checkpoint 了兩個 rdd,結果只有一個(下面的 result)被 checkpoint 成功,pairs2 沒有被 checkpoint,也不知道是 bug 還是故意只 checkpoint 下游的 RDD:

val data1 = Array[(Int, Char)]((1, 'a'), (2, 'b'), (3, 'c'),

(4, 'd'), (5, 'e'), (3, 'f'), (2, 'g'), (1, 'h'))

val pairs1 = sc.parallelize(data1, 3)

val data2 = Array[(Int, Char)]((1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'))

val pairs2 = sc.parallelize(data2, 2)

pairs2.checkpoint

val result = pairs1.join(pairs2)

result.checkpoint

4.怎麼讀取 checkpoint 過的 RDD

在 runJob() 的時候會先調用 finalRDD 的 partitions() 來確定最後會有多個 task。rdd.partitions() 會去檢查(通過 RDDCheckpointData 去檢查,因為它負責管理被 checkpoint 過的 rdd)該 rdd 是會否被 checkpoint 過了,如果該 rdd 已經被 checkpoint 過了,直接返回該 rdd 的 partitions 也就是 Array[Partition]。

當調用 rdd.iterator() 去計算該 rdd 的 partition 的時候,會調用 computeOrReadCheckpoint(split: Partition) 去查看該 rdd 是否被 checkpoint 過了,如果是,就調用該 rdd 的 parent rdd 的 iterator() 也就是 CheckpointRDD.iterator(),CheckpointRDD 負責讀取文件系統上的文件,生成該 rdd 的 partition。這就解釋了為什麼那麼 trickly 地為 checkpointed rdd 添加一個 parent CheckpointRDD。

5.cache 與 checkpoint 的區別

There is a significant difference between cache and checkpoint. Cache materializes the RDD and keeps it in memory and/or disk(其實只有 memory). But the lineage(也就是 computing chain) of RDD (that is, seq of operations that generated the RDD) will be remembered, so that if there are node failures and parts of the cached RDDs are lost, they can be regenerated. However, checkpoint saves the RDD to an HDFS file and actually forgets the lineage completely. This is allows long lineages to be truncated and the data to be saved reliably in HDFS (which is naturally fault tolerant by replication).


分享到:


相關文章: