RDD彈性特性

RDD作為彈性分佈式數據集,彈性具體體現在

自動進行內存和磁盤數據存儲的切換

Spark會優先把數據放到內存中,如果內存放不下,會放到磁盤裡面。當應用程序內存不足時,Spark應用程序將數據自動從內存存儲切換到磁盤存儲,以保證其高效運行。

基於Lineage(血統)的高效容錯機制

Lineage是基於Spark RDD的依賴關係來完成的,每個操作只關聯其父操作,各個分片的數據之間互不影響,出現錯誤只需要恢復單個Split的特定部分即可。

常規容錯有兩種方式:

1、數據檢查點

2、記錄數據的更新操作

Spark的RDD通過記錄數據更新的方式進行容錯,主要原因有:RDD是不可變的且Lazy;RDD的寫操作是粗粒度的。但是RDD的讀既可以是粗粒度的,也可以是細粒度的。

Task如果失敗,會自動進行特定次數的重試

默認重試次數為4次。TaskSchedulerImpl是底層任務調度接口TaskScheduler的實現,這些Schedulers從每個Stage中的DAGSchedler中獲取TaskSet,運行它們,嘗試是否有故障。DAGSchedler是高層調度,計算每個Job的Stage的DAG,然後提交Stage,用TaskSets的形式啟動底層TaskScheduler調度在集群中運行。

Task默認重試次數,位於org.apache.spark.internal.config#MAX_TASK_FAILURES

 private[spark] val MAX_TASK_FAILURES =
ConfigBuilder("spark.task.maxFailures")
.intConf
.createWithDefault(4)

TaskSchedulerImpl源碼,位於org.apache.spark.scheduler.TaskSchedulerImpl

 private[spark] class TaskSchedulerImpl private[scheduler](
val sc: SparkContext,
val maxTaskFailures: Int,
private[scheduler] val blacklistTrackerOpt: Option[BlacklistTracker],
isLocal: Boolean = false)
extends TaskScheduler with Logging {

import TaskSchedulerImpl._

def this(sc: SparkContext) = {
this(
sc,
sc.conf.get(config.MAX_TASK_FAILURES),
TaskSchedulerImpl.maybeCreateBlacklistTracker(sc))
}

Stage如果失敗,會自動進行特定次數的重試

Stage對象可以跟蹤多個StageInfo。默認重試次數為4次,且可以直接運行計算失敗的階段,只計算失敗的數據分片。

Stage是Spark Job運行時具有相同邏輯功能和並行計算任務的一個基本單元。Stage中所有的任務都依賴同樣的Shuffle,每個DAG任務通過DAGScheduler在Stage的邊界處發生Shuffle形成Stage,然後DAGScheduler運行這些階段的拓撲排序。

每個Stage都可能是ShuffleMapStage,如果是ShuffleMapStage,則跟蹤每個輸出節點上輸出文件分區,任務結果會輸入其他的Stage,或者輸入一個ResultStage;如果是ResultStage,這個Stage的任務直接在這個RDD上運行計算這個Spark Action函數。

每個Stage會有firstJobId,度額定第一個提交Stage的Job,使用FIFO調度實,會使其前面的Job先計算或快速恢復。

ShuffleMapStage是DAG產生數據進行Shuffle的中間階段,發生在每次Shuffle操作之前,可能包含多個Pipelined操作;ResultStage階段捕獲函數在RDD分區上運行Action算子計算結果。

Stage源碼,位於org.apache.spark.scheduler.Stage

 private[scheduler] abstract class Stage(
val id: Int,
val rdd: RDD[_],
val numTasks: Int,
val parents: List[Stage],
val firstJobId: Int,
val callSite: CallSite)
extends Logging {

// partition的個數
val numPartitions = rdd.partitions.length

/** Set of jobs that this stage belongs to. */
/** 屬於這個工作集的Stage */
val jobIds = new HashSet[Int]

/** The ID to use for the next new attempt for this stage. */
/** 用於此Stage的下一個新attempt的標識ID */
private var nextAttemptId: Int = 0

val name: String = callSite.shortForm
val details: String = callSite.longForm

/**
* Pointer to the [[StageInfo]] object for the most recent attempt. This needs to be initialized
* here, before any attempts have actually been created, because the DAGScheduler uses this
* StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts
* have been created).
*/
/**
* 最新的[StageInfo] object指針,需要被初始化
* 任何attempts都是被創造出來的,因為DAGScheduler使用StageInfo
* 告訴SparkListeners工作何時開始
*/
private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId)

/**
* Set of stage attempt IDs that have failed with a FetchFailure. We keep track of these
* failures in order to avoid endless retries if a stage keeps failing with a FetchFailure.
* We keep track of each attempt ID that has failed to avoid recording duplicate failures if
* multiple tasks from the same stage attempt fail (SPARK-5945).
*/
/**
* 設置stage attempy IDs 當失敗是可以讀取失敗信息
* 跟蹤這些失敗,為了避免無休止地重複失敗
* 跟蹤每一次attempt,以便避免記錄重複故障
* 如果從同一stage窗體間多任務失敗
*/
val fetchFailedAttemptIds = new HashSet[Int]

private[scheduler] def clearFailures() : Unit = {
fetchFailedAttemptIds.clear()
}

/** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
/** 在stage中創建一個新的attempt */

def makeNewStageAttempt(
numPartitionsToCompute: Int,
taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
val metrics = new TaskMetrics
metrics.register(rdd.sparkContext)
_latestInfo = StageInfo.fromStage(
this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences)
nextAttemptId += 1
}

/** Returns the StageInfo for the most recent attempt for this stage. */
/** 放回當前stage中最新的StageInfo */
def latestInfo: StageInfo = _latestInfo

override final def hashCode(): Int = id

override final def equals(other: Any): Boolean = other match {
case stage: Stage => stage != null && stage.id == id
case _ => false
}

/** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */
/** 返回需要重新計算的分區標識的序列 */
def findMissingPartitions(): Seq[Int]
}

在Stage終止前允許Stage連續嘗試4次,位於org.apache.spark.scheduler.DAGScheduler#maxConsecutiveStageAttempts

 /** 在終止之前允許的連續嘗試次數 */
private[scheduler] val maxConsecutiveStageAttempts =
sc.getConf.getInt("spark.stage.maxConsecutiveAttempts",
DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS)
private[spark] object DAGScheduler {
// The time, in millis, to wait for fetch failure events to stop coming in after one is detected;
// this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one
// as more failure events come in
/**
* 在毫秒級別,等待讀取失敗事件後就停止;這是一個避免重新提交任務的簡單方法,非讀取任務的map中更多失敗事件的到來
*/

val RESUBMIT_TIMEOUT = 200

// Number of consecutive stage attempts allowed before a stage is aborted
/** 終止之前允許連續嘗試的次數 */
val DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS = 4
}

checkpoint和persist(檢查點和持久化),可主動或被動觸發

checkpoint是對RDD進行標記,會產生一些列的文件,且所有所有父依賴都會被刪除,是整個依賴的重點。checkpoint是Lazy級別的。persist後RDD工作室每個工作節點都會把計算的分片結果保存在內存或者磁盤上,下一次對相同的RDD進行其他Action計算。就可以重用。

當RDD.iterator()被調用的時候,也就是計算該RDD中某個Partition的時候,會先去cacheManager獲取一個blockId,然後去BlockManager裡匹配Partition是否被checkpoint了。如果是,就不用計算該Partition,直接產品能夠checkPoint中讀取該Partition的所有records放入ArrayBuffer裡面。如果沒有被checkPoint過,將Partition計算出來,然後將其所有records放入到cache中。

總體來說,當RDD會被重複使用時,RDD需要cache。Spark自動監控每個節點緩存的使用情況,利用最近最少使用原則刪除老舊的數據,如果想手動刪除RDD,可以使用RDD.unpersist()方法。

RDD.iterator源碼,位於org.apache.spark.rdd.RDD#iterator

 final def iterator(split: Partition, context: TaskContext): Iterator[T] = { 

// 判斷此RDD的持久戶登記是為為NONE,不進行持久化
if (storageLevel != StorageLevel.NONE) {
getOrCompute(split, context)
} else {
computeOrReadCheckpoint(split, context)
}
}

可以用不同的存儲級別存儲每一個被持久化的RDD。StorageLevel是控制存儲RDD的標誌,Spark的多個存儲級別意味著在內存利用率和CPU利用率間的不同平衡。推薦通過下面的過程選擇一個合適的存儲級別

  • 如果RDD適合默認的存儲級別(MEMORY_ONLY),就選擇默認的存儲級別。因為這是CPU利用率最高的選項,會使RDD上的操作儘可能地塊。
  • 如果不適合用默認級別,就選擇MEMOEY_ONLY_SER。選擇一個更快的序列化庫提高對象的空間使用率,任然能夠快速地訪問。
  • 除非算子計算RDD話費較大或者需要過濾大量的數據,不要將RDD存儲在磁盤上,否則重複計算一個分區,就會和從磁盤上讀取數據一樣慢。
  • 如果希望更快地恢復錯誤,可以利用replicated存儲機制,所有的存儲級別都可以通過replicated計算丟失的數據來支持完整的容錯。另外,replicated的數據能在RDD上繼續運行任務,而不需要重複計算丟失的數據。
  • 在擁有大量內存的環境中或者多應用程序的環境中,Off_Heap具有如下優勢:Off_Heap運行多個執行者共享的Alluxio中的相同的內存池,限制減少GC,如果當個Executor崩潰,緩存的數據也不會丟失。

StorageLevel源碼,位於org.apache.spark.storage.StorageLevel

 val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

數據彈性調度,DAGScheduler、TaskScheduler和資源管理無關

Spark將執行模型醜行為通過的有向無環圖(DAG),可以將多個Stage的任務串聯或並行執行,從而不需要將Stage中間結果輸出到HDFS上,當發生節點運行故障時,可有其他可用節點代替該故障節點運行。

數據分片的高度彈性。

Spark 進行數據分片時,默認將數據放在內存彙總,如果內存放不下,一部分會放在磁盤上保存。

RDD的coalesce算子源碼,位於org.apache.spark.rdd.RDD#coalesce

 def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
/** 從隨機分區開始,將元素均勻分佈在輸出分區上 */
val distributePartition = (index: Int, items: Iterator[T]) => {
var position = (new Random(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.
position = position + 1
(position, t)
}
} : Iterator[(Int, T)]

// include a shuffle step so that our upstream tasks are still distributed
// 包括一個Shuffle步驟,使上游任務仍然是分佈式的
new CoalescedRDD(
new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
new HashPartitioner(numPartitions)),
numPartitions,
partitionCoalescer).values
} else {
new CoalescedRDD(this, numPartitions, partitionCoalescer)
}
}

如果在計算過程中,產生很多的數據碎片,這是產生的Partition可能會非常小,如果一個Partition非常小,每次都會消耗一個線程取處理,這是可能降低它的處理效率。可以考慮把許多個小的Partition合併成一個較大的Partition處理,會提高效率。


分享到:


相關文章: