DAGScheduler-作业提交

作业提交

以count算子为例,count算子是一个Action,会触发Job的运行。count算子源码,位于org.apache.spark.rdd.RDD#count

在RDD的源码中,count方法出发了SparkContext的runJob方法来提交作业,这个提交在内部隐性调用runJob方法进行的,对用户来说不用显式地取提交作业。

对于RDD来说,会根据批次之间的依赖关系形成一个有向无环图(DAG),然后把这个图交给DAGSchedler处理。从源码来看,SparkContext的runJob方法经过几次调用后,进入DAGScheduler的runJob方法,位于org.apache.spark.SparkContext#runJob

在DAGScheduler的runJob方法中,调用submitJob方法继续提交作业,这里会发生阻塞,知道作业完成或失败的结果;在submitJob方法里,创建一个JobWriter对象,并借助内部消息处理把这个对象发送给DAGScheduler的内嵌类DAGSchedulerEventProcessLoop进行处理;最后在DAGSchedulerEventProcessLoop消息接收onReceive方法中,接收到JobSubmitted样例类完成匹配后,继续调用DAGScheduler的handleJobSubmitted方法提交作业,在该方法中进行划分Stage。

DAGScheduler的runJob方法,调用submitJob方法继续提交作业,返回JobWriter对象,并等待任务处理成功或者失败。位于org.apache.spark.scheduler.DAGScheduler#runJob

JobSubmitted

DAGScheduler的submitJob方法中,submitJob首先获取rdd.partitions.length,校验运行的时候partitions是否存在。submitJob的关键代码是向DAGSchedulerEventProcessLoop发送JobSubmitted消息,JobSubmitted是一个case class,而不是一个case object。JobSubmitted的成员finalRDD是最后一个RDD。创建JobWaiter对象,对返回。

DAGScheduler的submitJob方法源码,位于org.apache.spark.scheduler.DAGScheduler#submitJob

由Action导致SparkContext.runJob的执行,最终导致DAGScheduler中的submitJob的执行,其核心是通过发送case class JobSubmitted对象给DAGSchedulerEventProcessLoop。

JobSubmitted 源码,位于org.apache.spark.scheduler.JobSubmitted

JobSubmitted是private[scheduler]级别的,用户不能直接调用。JobSubmitted封装了jobId、最后一个finalRDD、具体对RDD操作的函数func、哪些partitions要进行计算、作业监听器、状态等内容。

DAGSchedulerEventProcessLoop

DAGSchedulerEventProcessLoop继承于EventLoop

EventLoop中开启一个线程eventThread,线程设置成Daemon后台运行的方式;run方法中调用onReceive(event)方法。其中post方法是向事件队列eventQueue中放入一个元素。

EventLoop源码,位于org.apache.spark.util.EventLoop

DAGSchedulerEventProcessLoop接收到JobSubmitted消息后,调用DAGScheduler的handleJobSubmitted方法提交作业,进行阶段划分。

DAGSchedulerEventProcessLoop源码,位于org.apache.spark.scheduler.DAGSchedulerEventProcessLoop

EventLoop里面开启一个线程,线程里面不断循环一个队列,post的时候就是将消息放到队列中,由于消息放到消息队列中,在不断循环,所以可以拿到这个消息,转过来调用onReceive(event),在onReceive处理的时候就调用了doOnReceive方法。


分享到:


相關文章: