作业提交
以count算子为例,count算子是一个Action,会触发Job的运行。count算子源码,位于org.apache.spark.rdd.RDD#count
在RDD的源码中,count方法出发了SparkContext的runJob方法来提交作业,这个提交在内部隐性调用runJob方法进行的,对用户来说不用显式地取提交作业。
对于RDD来说,会根据批次之间的依赖关系形成一个有向无环图(DAG),然后把这个图交给DAGSchedler处理。从源码来看,SparkContext的runJob方法经过几次调用后,进入DAGScheduler的runJob方法,位于org.apache.spark.SparkContext#runJob
在DAGScheduler的runJob方法中,调用submitJob方法继续提交作业,这里会发生阻塞,知道作业完成或失败的结果;在submitJob方法里,创建一个JobWriter对象,并借助内部消息处理把这个对象发送给DAGScheduler的内嵌类DAGSchedulerEventProcessLoop进行处理;最后在DAGSchedulerEventProcessLoop消息接收onReceive方法中,接收到JobSubmitted样例类完成匹配后,继续调用DAGScheduler的handleJobSubmitted方法提交作业,在该方法中进行划分Stage。
DAGScheduler的runJob方法,调用submitJob方法继续提交作业,返回JobWriter对象,并等待任务处理成功或者失败。位于org.apache.spark.scheduler.DAGScheduler#runJob
JobSubmitted
DAGScheduler的submitJob方法中,submitJob首先获取rdd.partitions.length,校验运行的时候partitions是否存在。submitJob的关键代码是向DAGSchedulerEventProcessLoop发送JobSubmitted消息,JobSubmitted是一个case class,而不是一个case object。JobSubmitted的成员finalRDD是最后一个RDD。创建JobWaiter对象,对返回。
DAGScheduler的submitJob方法源码,位于org.apache.spark.scheduler.DAGScheduler#submitJob
由Action导致SparkContext.runJob的执行,最终导致DAGScheduler中的submitJob的执行,其核心是通过发送case class JobSubmitted对象给DAGSchedulerEventProcessLoop。
JobSubmitted 源码,位于org.apache.spark.scheduler.JobSubmitted
JobSubmitted是private[scheduler]级别的,用户不能直接调用。JobSubmitted封装了jobId、最后一个finalRDD、具体对RDD操作的函数func、哪些partitions要进行计算、作业监听器、状态等内容。
DAGSchedulerEventProcessLoop
DAGSchedulerEventProcessLoop继承于EventLoop
EventLoop中开启一个线程eventThread,线程设置成Daemon后台运行的方式;run方法中调用onReceive(event)方法。其中post方法是向事件队列eventQueue中放入一个元素。
EventLoop源码,位于org.apache.spark.util.EventLoop
DAGSchedulerEventProcessLoop接收到JobSubmitted消息后,调用DAGScheduler的handleJobSubmitted方法提交作业,进行阶段划分。
DAGSchedulerEventProcessLoop源码,位于org.apache.spark.scheduler.DAGSchedulerEventProcessLoop
EventLoop里面开启一个线程,线程里面不断循环一个队列,post的时候就是将消息放到队列中,由于消息放到消息队列中,在不断循环,所以可以拿到这个消息,转过来调用onReceive(event),在onReceive处理的时候就调用了doOnReceive方法。
閱讀更多 石頭渣渣 的文章