RDD弹性特性

RDD作为弹性分布式数据集,弹性具体体现在

自动进行内存和磁盘数据存储的切换

Spark会优先把数据放到内存中,如果内存放不下,会放到磁盘里面。当应用程序内存不足时,Spark应用程序将数据自动从内存存储切换到磁盘存储,以保证其高效运行。

基于Lineage(血统)的高效容错机制

Lineage是基于Spark RDD的依赖关系来完成的,每个操作只关联其父操作,各个分片的数据之间互不影响,出现错误只需要恢复单个Split的特定部分即可。

常规容错有两种方式:

1、数据检查点

2、记录数据的更新操作

Spark的RDD通过记录数据更新的方式进行容错,主要原因有:RDD是不可变的且Lazy;RDD的写操作是粗粒度的。但是RDD的读既可以是粗粒度的,也可以是细粒度的。

Task如果失败,会自动进行特定次数的重试

默认重试次数为4次。TaskSchedulerImpl是底层任务调度接口TaskScheduler的实现,这些Schedulers从每个Stage中的DAGSchedler中获取TaskSet,运行它们,尝试是否有故障。DAGSchedler是高层调度,计算每个Job的Stage的DAG,然后提交Stage,用TaskSets的形式启动底层TaskScheduler调度在集群中运行。

Task默认重试次数,位于org.apache.spark.internal.config#MAX_TASK_FAILURES

 private[spark] val MAX_TASK_FAILURES =
ConfigBuilder("spark.task.maxFailures")
.intConf
.createWithDefault(4)

TaskSchedulerImpl源码,位于org.apache.spark.scheduler.TaskSchedulerImpl

 private[spark] class TaskSchedulerImpl private[scheduler](
val sc: SparkContext,
val maxTaskFailures: Int,
private[scheduler] val blacklistTrackerOpt: Option[BlacklistTracker],
isLocal: Boolean = false)
extends TaskScheduler with Logging {

import TaskSchedulerImpl._

def this(sc: SparkContext) = {
this(
sc,
sc.conf.get(config.MAX_TASK_FAILURES),
TaskSchedulerImpl.maybeCreateBlacklistTracker(sc))
}

Stage如果失败,会自动进行特定次数的重试

Stage对象可以跟踪多个StageInfo。默认重试次数为4次,且可以直接运行计算失败的阶段,只计算失败的数据分片。

Stage是Spark Job运行时具有相同逻辑功能和并行计算任务的一个基本单元。Stage中所有的任务都依赖同样的Shuffle,每个DAG任务通过DAGScheduler在Stage的边界处发生Shuffle形成Stage,然后DAGScheduler运行这些阶段的拓扑排序。

每个Stage都可能是ShuffleMapStage,如果是ShuffleMapStage,则跟踪每个输出节点上输出文件分区,任务结果会输入其他的Stage,或者输入一个ResultStage;如果是ResultStage,这个Stage的任务直接在这个RDD上运行计算这个Spark Action函数。

每个Stage会有firstJobId,度额定第一个提交Stage的Job,使用FIFO调度实,会使其前面的Job先计算或快速恢复。

ShuffleMapStage是DAG产生数据进行Shuffle的中间阶段,发生在每次Shuffle操作之前,可能包含多个Pipelined操作;ResultStage阶段捕获函数在RDD分区上运行Action算子计算结果。

Stage源码,位于org.apache.spark.scheduler.Stage

 private[scheduler] abstract class Stage(
val id: Int,
val rdd: RDD[_],
val numTasks: Int,
val parents: List[Stage],
val firstJobId: Int,
val callSite: CallSite)
extends Logging {

// partition的个数
val numPartitions = rdd.partitions.length

/** Set of jobs that this stage belongs to. */
/** 属于这个工作集的Stage */
val jobIds = new HashSet[Int]

/** The ID to use for the next new attempt for this stage. */
/** 用于此Stage的下一个新attempt的标识ID */
private var nextAttemptId: Int = 0

val name: String = callSite.shortForm
val details: String = callSite.longForm

/**
* Pointer to the [[StageInfo]] object for the most recent attempt. This needs to be initialized
* here, before any attempts have actually been created, because the DAGScheduler uses this
* StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts
* have been created).
*/
/**
* 最新的[StageInfo] object指针,需要被初始化
* 任何attempts都是被创造出来的,因为DAGScheduler使用StageInfo
* 告诉SparkListeners工作何时开始
*/
private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId)

/**
* Set of stage attempt IDs that have failed with a FetchFailure. We keep track of these
* failures in order to avoid endless retries if a stage keeps failing with a FetchFailure.
* We keep track of each attempt ID that has failed to avoid recording duplicate failures if
* multiple tasks from the same stage attempt fail (SPARK-5945).
*/
/**
* 设置stage attempy IDs 当失败是可以读取失败信息
* 跟踪这些失败,为了避免无休止地重复失败
* 跟踪每一次attempt,以便避免记录重复故障
* 如果从同一stage窗体间多任务失败
*/
val fetchFailedAttemptIds = new HashSet[Int]

private[scheduler] def clearFailures() : Unit = {
fetchFailedAttemptIds.clear()
}

/** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
/** 在stage中创建一个新的attempt */

def makeNewStageAttempt(
numPartitionsToCompute: Int,
taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
val metrics = new TaskMetrics
metrics.register(rdd.sparkContext)
_latestInfo = StageInfo.fromStage(
this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences)
nextAttemptId += 1
}

/** Returns the StageInfo for the most recent attempt for this stage. */
/** 放回当前stage中最新的StageInfo */
def latestInfo: StageInfo = _latestInfo

override final def hashCode(): Int = id

override final def equals(other: Any): Boolean = other match {
case stage: Stage => stage != null && stage.id == id
case _ => false
}

/** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */
/** 返回需要重新计算的分区标识的序列 */
def findMissingPartitions(): Seq[Int]
}

在Stage终止前允许Stage连续尝试4次,位于org.apache.spark.scheduler.DAGScheduler#maxConsecutiveStageAttempts

 /** 在终止之前允许的连续尝试次数 */
private[scheduler] val maxConsecutiveStageAttempts =
sc.getConf.getInt("spark.stage.maxConsecutiveAttempts",
DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS)
private[spark] object DAGScheduler {
// The time, in millis, to wait for fetch failure events to stop coming in after one is detected;
// this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one
// as more failure events come in
/**
* 在毫秒级别,等待读取失败事件后就停止;这是一个避免重新提交任务的简单方法,非读取任务的map中更多失败事件的到来
*/

val RESUBMIT_TIMEOUT = 200

// Number of consecutive stage attempts allowed before a stage is aborted
/** 终止之前允许连续尝试的次数 */
val DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS = 4
}

checkpoint和persist(检查点和持久化),可主动或被动触发

checkpoint是对RDD进行标记,会产生一些列的文件,且所有所有父依赖都会被删除,是整个依赖的重点。checkpoint是Lazy级别的。persist后RDD工作室每个工作节点都会把计算的分片结果保存在内存或者磁盘上,下一次对相同的RDD进行其他Action计算。就可以重用。

当RDD.iterator()被调用的时候,也就是计算该RDD中某个Partition的时候,会先去cacheManager获取一个blockId,然后去BlockManager里匹配Partition是否被checkpoint了。如果是,就不用计算该Partition,直接产品能够checkPoint中读取该Partition的所有records放入ArrayBuffer里面。如果没有被checkPoint过,将Partition计算出来,然后将其所有records放入到cache中。

总体来说,当RDD会被重复使用时,RDD需要cache。Spark自动监控每个节点缓存的使用情况,利用最近最少使用原则删除老旧的数据,如果想手动删除RDD,可以使用RDD.unpersist()方法。

RDD.iterator源码,位于org.apache.spark.rdd.RDD#iterator

 final def iterator(split: Partition, context: TaskContext): Iterator[T] = { 

// 判断此RDD的持久户登记是为为NONE,不进行持久化
if (storageLevel != StorageLevel.NONE) {
getOrCompute(split, context)
} else {
computeOrReadCheckpoint(split, context)
}
}

可以用不同的存储级别存储每一个被持久化的RDD。StorageLevel是控制存储RDD的标志,Spark的多个存储级别意味着在内存利用率和CPU利用率间的不同平衡。推荐通过下面的过程选择一个合适的存储级别

  • 如果RDD适合默认的存储级别(MEMORY_ONLY),就选择默认的存储级别。因为这是CPU利用率最高的选项,会使RDD上的操作尽可能地块。
  • 如果不适合用默认级别,就选择MEMOEY_ONLY_SER。选择一个更快的序列化库提高对象的空间使用率,任然能够快速地访问。
  • 除非算子计算RDD话费较大或者需要过滤大量的数据,不要将RDD存储在磁盘上,否则重复计算一个分区,就会和从磁盘上读取数据一样慢。
  • 如果希望更快地恢复错误,可以利用replicated存储机制,所有的存储级别都可以通过replicated计算丢失的数据来支持完整的容错。另外,replicated的数据能在RDD上继续运行任务,而不需要重复计算丢失的数据。
  • 在拥有大量内存的环境中或者多应用程序的环境中,Off_Heap具有如下优势:Off_Heap运行多个执行者共享的Alluxio中的相同的内存池,限制减少GC,如果当个Executor崩溃,缓存的数据也不会丢失。

StorageLevel源码,位于org.apache.spark.storage.StorageLevel

 val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

数据弹性调度,DAGScheduler、TaskScheduler和资源管理无关

Spark将执行模型丑行为通过的有向无环图(DAG),可以将多个Stage的任务串联或并行执行,从而不需要将Stage中间结果输出到HDFS上,当发生节点运行故障时,可有其他可用节点代替该故障节点运行。

数据分片的高度弹性。

Spark 进行数据分片时,默认将数据放在内存汇总,如果内存放不下,一部分会放在磁盘上保存。

RDD的coalesce算子源码,位于org.apache.spark.rdd.RDD#coalesce

 def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
/** 从随机分区开始,将元素均匀分布在输出分区上 */
val distributePartition = (index: Int, items: Iterator[T]) => {
var position = (new Random(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.
position = position + 1
(position, t)
}
} : Iterator[(Int, T)]

// include a shuffle step so that our upstream tasks are still distributed
// 包括一个Shuffle步骤,使上游任务仍然是分布式的
new CoalescedRDD(
new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
new HashPartitioner(numPartitions)),
numPartitions,
partitionCoalescer).values
} else {
new CoalescedRDD(this, numPartitions, partitionCoalescer)
}
}

如果在计算过程中,产生很多的数据碎片,这是产生的Partition可能会非常小,如果一个Partition非常小,每次都会消耗一个线程取处理,这是可能降低它的处理效率。可以考虑把许多个小的Partition合并成一个较大的Partition处理,会提高效率。


分享到:


相關文章: