spark核心構件之Dependency(依賴)

之前的文章說一個spark任務其實就是一系列rdd構成的有向無環圖(dag),今天我們來看看,spark是如何表示rdd之間的依賴關係建立這個dag的。

一、rdd如何構成dag

上篇文章講到了Partition和Partitioner知道了rdd是由一系列分區(partition)組成的,rdd之間的關係主要的其實就是分區之間的關係,也就是子rdd的某個分區數據需要依賴哪些rdd的哪些分區計算得到。

spark將rdd之間的關係抽象成了Dependency這個類,用於連接父子rdd,子rdd持有Dependency對象,Dependency對象裡包含了父rdd。也就是dag的構成就像下面這樣rdd1是rdd2和rdd3的子rdd


spark核心構件之Dependency(依賴)

dag


二、Dependency的定義


spark核心構件之Dependency(依賴)

依賴類圖


abstract class Dependency[T] extends Serializable {
def rdd: RDD[T]
}

只包含了一個rdd,就是父rdd的對象了 。Dependency有兩個子類就是大家熟悉的款依賴和窄依賴了。

三、窄依賴 NarrowDependency

abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
def getParents(partitionId: Int): Seq[Int]
override def rdd: RDD[T] = _rdd
}

這是窄依賴的定義,一看就知道它想幹嘛了是吧,就一個getParents函數,給一個子rdd的partitionId輸出所依賴的父rdd的partitionId。我們還能知道代表partition的其實就是一個int值的partitionId。我們還能知道只有通過子rdd的partition才能知道依賴的父rdd的partition,而不能通過父rdd得到子rdd,這就說明rdd得計算方式只能是從子rdd向上遍歷進行計算。

窄依賴有兩個子類一個是OneToOneDependency

class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

很明顯就是一一對應,子rdd的一個partitionid就依賴一個父rdd同樣的partitionid

另一個窄依賴的子類RangeDependency只用於union的時候,子rdd會有多個依賴每一個依賴都指向一個父rdd,大家可以先想想如果是你,你會怎麼去實現多個rdd的union。

四、寬依賴 ShuffleDependency

class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
@transient private val _rdd: RDD[_ <: product2="" v=""> val partitioner: Partitioner,
val serializer: Serializer = SparkEnv.get.serializer,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false,
val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)
extends Dependency[Product2[K, V]] {
……
override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
val shuffleId: Int = _rdd.context.newShuffleId()
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.length, this)
}

上面就是ShuffleDependency的定義了,Dependency對象裡面包含父rdd的對象,DAGScheduler在進行stage劃分和task分配的時候就可以通過Dependency獲取shuffleWriter寫數據了。

子rdd持有這個Dependency對象,子rdd就可以通過它獲得shuffle信息拉取上個stage的數據。

五、總結

Dependency是rdd之間的連接,表達了子rdd在計算某個partition的時候應該去哪個rdd的哪個partitions取數據。Dependency又分寬窄依賴,而寬依賴包含了shuffle信息,父rdd通過它寫數據,子rdd通過它獲取數據。

相關文章:

https://www.toutiao.com/i6663784331472798222/

https://www.toutiao.com/i6652848789008679432/

https://www.toutiao.com/i6647711384006033934/

https://www.toutiao.com/i6647711384006033934/

關注眾公號(曾二爺) 和優秀的人一起學習。


分享到:


相關文章: