SparkContext 初始化内部原理

刘彬同学准备写一系列spark实战系列,本文是第一篇,SparkContext初始化内部原理!赞!推荐给大家,

希望大家喜欢和支持!

如果编写Spark程序,那么第⼀⾏代码就是new SparkContext().setMaster(“”).setAppName(“xx”),可以说SparkContext是整个Spark 计算的启动器,只有将sparkContext 启动起来,后续的关于调 度,存储,计算相关的操作才能够运⾏,本⽂基于spark2.x源码概述关于SparkContext⾥⾯所包含的启动项都有哪些以及这些启动项的作⽤是什么,之后在说⼀下关于SparkEnv环境创建的过程。

阅读本⽂最好打开spark源码参考着看,可以在git上⾯打开spark关于sparkContext的代码,地址为:https://github.com/apache/spark/blob/c5f9b89dda40ffaa4622a7ba2b3d0605dbe815c0/core/src/main/scala/org/apache/spark/SparkContext.scala#L73

01 SparkContext内部组件:

如图为SparkContext内部的⼀些组件:

SparkContext 初始化内部原理

  • SparkEnv :Spark运⾏时环境,Spark 中任务执⾏是通过Executor,所有的Executor都有⾃⼰的执⾏环境SparkEnv,在Driver中也包含了SparkEnv,为了保证Local模式的运⾏,SparkEnv内部还提 供了不同的组件,来实现不同的功能

  • LiveListenerBus:SparkContext中的事件总线,可以接收各个使⽤者的事件,异步将SparkListenerevent传递给注册的SparkListener

  • Spark UI :Spark的⽤户界⾯,SparkUI间接依赖于计算引擎,调度引擎,存储引擎,Job,Stage,Executor等组件的监控都会以SparkListenerEvent的形式传递给LiveListenerBus,SparkUI将从各 个SparkListener中读取数据并显⽰在web界⾯

  • SparkStatusTracker:⽤于监控作业和Stage进度状态的低级API

  • ConsoleProgressBar :定期从sc.statusTracker获得active stage的状态信息,展⽰到进度条[在SparkUI上⾯可以看到进度条],会有⼀定的延时。内部有⼀个timer 500ms refresh⼀遍

  • DAGScheduler:DAG调度器,是Spark调度系统中重要的组件之⼀,负责创建Job,将DAG的RDD划分到不同的Stage,提交stage等,SparkUI中有关Job和Stage监控数据都来⾃DAGScheduer

  • TaskScheduler:Task调度器,是Spark调度系统中重要的组件之⼀,负责将任务发送到集群,运⾏,如果有失败的任务则重新执⾏,之后返回给DAGScheduler,TaskScheduler调度的Task是由 DAGScheduler创建的,所以DAGScheduler是TaskScheduler前置调度。

  • HeatbeatReceiver:⼼跳接收器,所有的Executor都会向HeatbeatReceiver发送⼼跳信息,HeatbeatReceiver接收到⼼跳之后,先更新Executor最后可⻅时间,然后将此信息交给TaskScheduler。

  • ContextCleaner:异步清理RDD、shuffle和⼲播状态信息

  • EventLoggingListener:将事件持久化到存储的监听器,是SparkContext的可选组件,当spark.eventLog.enable

  • ExecutorAllocationManager: Executor动态分配管理器,根据⼯作负载动态调整Executor数量,当在配置spark.dynamicAlloction.enabled属性为true的情况下,在⾮local模式下或者 spark.dynamicAllcation.testing属性为true时启⽤

  • ShutdownHookManager:设置关闭钩⼦的管理器,可以给应⽤设置钩⼦,这样当JVM退出的时候就会执⾏清理⼯作

除了以上这些SparkContext包含的内部组件,还包括如下⼀些属性:

  • creationSite:CallSite类型,保存着线程栈中最靠近栈顶的⽤户定义的类和最靠近栈底的Scala或者Spark核⼼类的信息,其中ShortForm属性保存着上述信息的间断描述,LongForm属性保存着上述 信息的完整描述,具体的信息可以参阅源码部分地址为:core/src/main/scala/org/apache/spark/util/Utils.scala/getCallSite

  • allowMulitipleContext : 是否允许多个SparkContext实例,默认为False,可以通过设置Spark.Driver.allowMulitipleContexts来控制

  • startTime:标记sparkContext的启动时间戳

  • stopped:标记sparkContext是否处于停⽌状态,采⽤原⼦类型AtomicBoolean

  • addedFiles:⽤于每个本地⽂件的URL与添加此⽂件到到addedFiles时的时间戳之间的映射缓存 new ConcurrentHashMap[String, Long]

  • addedJars:⽤于每个本地Jar⽂件的URL与添加此⽂件到addedJars时的时间戳之间的映射缓存 new ConcurrentHashMap[String, Long]

  • persistentRdds:⽤于对所有持久化的RDD保持跟踪

  • executorEnvs:⽤于存储环境变量,将⽤于Executor执⾏的时候使⽤

  • sparkUser:当前系统的登录⽤户,可以通过环境变量SPARK_USER来设置 通过Utils.getCurrentUserName()获取

  • checkpointDir:RDD计算过程中⽤于记录RDD检查点的⺫录

  • localProperties:InheritableThreadLocal保护的线程,其中的属性值可以沿着线程栈⼀直传递下去

  • _conf:SparkContext的配置,会先调⽤config的clone⽅法,在进⾏验证配置,是否设置了spark.master和spark.app.name

  • jars:⽤户提交的jar⽂件,当选择部署模式为yarn时,

_jars是由spark.jars属性指定的jar⽂件和spark.yarn.dist.jars属性指定的并集 _files:⽤户设置的⽂件,可以根据Spark.file属性指定

_eventLogDir:事件⽇志的路径,当spark.enabled属性为true时启⽤,默认为/tmp/spark-events,也可以通过spark.eventLog.dir来指定⺫录 _eventLogCoder:事件⽇志的压缩算法,当spark.eventLog.enabled属性与spark.eventLog.compress属性为true时,压缩算法默认为lz4,也可以通过spark.io.compression.codec属性指定,⺫前⽀持lzf,snappy和lz4

  • _hadoopConfiguration:Hadoop配置信息,如果系统属性SPARK_YARN_MODE为true或者环境变量SPARK_YARN_MODEL为true,那么将会是YARN的配置,否则为Hadoop的配置

  • _executorMemtory:Executor内存⼤⼩,默认为1024MB,可以通过设置环境变量(SPARK_MEM或者SPARK_EXECUTOR_MEMORY)或者Spark.executor.memory属性指定其中Spark.executor.memory优先级最⾼

  • _applicationId:当前应⽤的标识,TaskScheduler启动后会创建应⽤标识,通过调取TaskScheduler的ApplicationId获取的

  • _applicationAttempId:当前应⽤尝试执⾏的标识,SparkDriver在执⾏时会多次尝试,每次尝试都会⽣成⼀个标识来代表应⽤尝试执⾏的⾝份

  • _listenerBusStarted:LiveListenerBus是否已经启动的标记

  • nextShuffleId:类型为AtomicInteger,⽤于⽣成下⼀个shuffle标识

  • nextRddId:类型为atomicInteger,⽤于⽣成下⼀个rdd标识

02 初始化具体流程

  1. 创建SparkEnv

    在Spark中,需要执⾏任务的地⽅就需要SparkEnv,在⽣产环境中,Spark往往运⾏于不同节点的Execute中,SparkEnv中的createDriverEnv⽤于创建SparkEnv,之后sparkEnv的实例通过set 设置到SparkEnv伴⽣对象env属性中,然后在需要⽤到sparkEnv的地⽅直接通过伴⽣对象get获取SparkEnv

  2. 创建⼼跳接受器(HeatbeatReceiver)

    在Sparklocal运⾏模式中,driver和executor在同⼀个节点同⼀个进程中,所以driver和executor可以本地交互调⽤,但是在分布式的环境中,driver和executor往往运⾏在不同的节点不同 的进程中,driver就⽆法监控executor的信息了,所以driver端创建了⼼跳接收器,那么⼼跳接收器是如何创建的。 ⾸先通过SparkEnv的NettyRpcEnv(基于NettyRPC)的setupEndPoint⽅法,然后向Dispatcher注册HeartbeatReceiver,并返回HeartbeatReceiver的NettyRpcEndPointRef的引⽤

  3. .创建和启动调度系统

    Spark调度系统主要分为TaskScheduler和DAGScheduler,TaskScheduler负责请求集群管理器给应⽤程序分配并运⾏Executor并给Task分配Executor并执⾏,DAGScheduler主要⽤于在任务交 给TaskSchduler执⾏之前做⼀些准备⼯作,⽐如创建Job,将DAG的RDD划分到不同的Stage,提交Stage等,如代码:

    val (sched, ts) = SparkContext. createTaskScheduler( this, master, deployMode)

    _schedulerBackend = sched

    _taskScheduler = ts

    _dagScheduler = new DAGScheduler( this)

    _heartbeatReceiver. ask[ Boolean]( TaskSchedulerIsSet)

    SparkContext.createTaskScheduler⽅法⽤于创建和启动TaskScheduler,针对不同的部署模式创建调度器的⽅式也不同,在代码中,_schedulerBackend表⽰SchedulerBackend的引⽤, _taskScheduler表⽰TaskScheduler的引⽤,在TaskScheduler中还会创建DAGScheduler的实例,最后向_heartbeatReceiver发送TaskSchedulerSet的消息,HeartbeatReceiver接收到之后将获取 SparkContext的_taskScheduler属性设置到⾃⼰的Schduler属性中

  4. 创建Executor动态分配管理器

    ExecutorAllocationManager: Executor动态分配管理器,根据⼯作负载动态调整Executor数量,当在配置spark.dynamicAlloction.enabled属性为true的情况下,在⾮local模式下或者 spark.dynamicAllcation.testing属性为true时启⽤

    ExecutorAllocationManager内部会定期的根据负载计算所需的Executor数量,如果Executor需求数量⼤于之前向集群管理器申请的数量,那么向集群管理器申请添加executor数量,反之,如果 executor需求数量⼩于之前向集群管理器申请的数量,那么向集群管理器申请减少executor。此外,ExecutorAllocationManager还会定期向集群管理器申请移除已经过期的executor

  5. 创建和启动ContextCleaner

    ContextCleaner:异步清理RDD、shuffle和⼲播状态信息

    通过配置spark.cleaner.referenceTracking(默认为true)来决定是否启⽤ContextCleaner

    ContextCleaner的组成:

    referencesQueue:缓存顶级的AnyRef引⽤

    referencesBuffer:缓存AnyRef的虚引⽤

    listeners:缓存清理⼯作中的监听器数组

    cleaningThread:清理具体⼯作的线程,此线程为守护线程

    periodicGCService:⽤于执⾏GC的调度线程池

    periodicGCInterval:执⾏GC的时间间隔,可通过spark.cleaner.periodicGC.interval配置,默认30分钟

    blockOnCleanUpTasks:清理⾮shuffle数据是否是阻塞的,可通过配置spark.cleaner.referenceTracking.blocking配置,默认是true

    blockOnShuffleCleanUpTasks:清理shuffle数据是否是阻塞的,可通过配置spark.cleaner.referenceTracking.blocking.shuffle ,默认是false

    stoped:标记contextCleaner是否停⽌状态

以上可以在github上打开spark源码进⾏边看⽂章边看源码,你会受益良多。 在这⾥推荐⼀个github源码阅读插件Insight.io for Github 在chrome扩展程序里可以直接查询。


分享到:


相關文章: