Linxu磁盘 个人收款码 merge playframework bluetooth background vue入门 vue配置 网页后台模板 seo计费系统 jq选择子元素 java反射方法 cos图像和sin图像 mser算法 linux查看防火墙 mysql卸载工具 mysql函数返回结果集 java高级特性 kubernetes实战 mysql新建数据库 random函数用法 python打开文件夹 python可视化编程 java正则匹配 java数据库 java时间戳转时间 java替换字符 java字符串格式化 python 教程 猫爪 电脑cmd命令大全 算法笔记 文章查重软件 枪林弹雨辅助 爱奇艺视频下载到电脑 西门子触摸屏编程软件 qq浏览器全屏 cad拉伸命令 python编译 sai怎么导入图片
当前位置: 首页 > 学习教程  > 编程语言

Spark Driver Program剖析

2020/9/19 16:17:50 文章标签: 测试文章如有侵权请发送至邮箱809451989@qq.com投诉后文章立即删除

  SparkContext是通往Spark集群的唯一入口,是整个Application运行调度的核心。

一、Spark Driver Program

  Spark Driver Program(以下简称Driver)是运行Application的main函数并且新建SparkContext实例的程序。其实,初始化SparkContext是为了准备Spark应用程序的运行环境,在Spark中,由SparkContext负责与集群进行通信、资源的申请、任务的分配和监控等。当Worker节点中的Executor运行完毕Task后,Driver同时负责将SparkContext关闭。通常也可以使用SparkContext来代表驱动程序(Driver)。

  Driver(SparkContext)整体架构图如下所示。

   

二、SparkContext深度剖析

  SparkContext是通往Spark集群的唯一入口,可以用来在Spark集群中创建RDDs、累加器(Accumulators)和广播变量(Broadcast Variables)。SparkContext也是整个Spark应用程序(Application)中至关重要的一个对象,可以说是整个Application运行调度的核心(不是指资源调度)。

  SparkContext的核心作用是初始化Spark应用程序运行所需要的核心组件,包括高层调度器(DAGScheduler)、底层调度器(TaskScheduler)和调度器的通信终端(SchedulerBackend),同时还会负责Spark程序向Master注册程序等。

  一般而言,通常为了测试或者学习Spark开发一个Application,在Application的main方法中,最开始几行编写的代码一般是这样的:

  首先,创建SparkConf实例,设置SparkConf实例的属性,以便覆盖Spark默认配置文件spark-env.sh,spark-default.sh和log4j.properties中的参数;

  然后,SparkConf实例作为SparkContext类的唯一构造参数来实例化SparkContext实例对象。SparkContext在实例化的过程中会初始化DAGScheduler、TaskScheduler和SchedulerBackend,而当RDD的action触发了作业(Job)后,SparkContext会调用DAGScheduler将整个Job划分成几个小的阶段(Stage),TaskScheduler会调度每个Stage的任务(Task)进行处理。

  还有,SchedulerBackend管理整个集群中为这个当前的Application分配的计算资源,即Executor。

  如果用一个车来比喻Spark Application,那么SparkContext就是车的引擎,而SparkConf是关于引擎的配置参数。说明:只可以有一个SparkContext实例运行在一个JVM内存中,所以在创建新的SparkContext实例前,必须调用stop方法停止当前JVM唯一运行的SparkContext实例。

  Spark程序在运行时分为Driver和Executor两部分:Spark程序编写是基于SparkContext的,具体包含两方面。

  Spark编程的核心基础RDD是由SparkContext最初创建的(第一个RDD一定是由SparkContext创建的)。

  Spark程序的调度优化也是基于SparkContext,首先进行调度优化。

  Spark程序的注册是通过SparkContext实例化时生产的对象来完成的(其实是SchedulerBackend来注册程序)。

  Spark程序在运行时要通过Cluster Manager获取具体的计算资源,计算资源获取也是通过SparkContext产生的对象来申请的(其实是SchedulerBackend来获取计算资源的)。

  SparkContext崩溃或者结束的时候,整个Spark程序也结束。

三、SparkContext源码解析

  SparkContext是Spark应用程序的核心。我们运行WordCount程序,通过日志来深入了解SparkContext。

  WordCount.scala的代码如下。

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.HadoopRDD
import org.apache.log4j.Logger
import org.apache.log4j.Level

object wordcount {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ALL);
    
    // 第1步:创建Spark的配置对象SparkConf,设置Spark程序运行时的配置信息,
    val conf = new SparkConf().setAppName("My First Spark APP").setMaster("local")
    
    // 第2步:创建SparkContext对象
    val sc = new SparkContext(conf)
    
    // 第3步:根据具体的数据来源来创建RDD
    val lines = sc.textFile("helloSpark.txt", 1)
    
    // 第4步:对初始的RDD进行Transformation级别的处理,如通过map、filter等
    val words = lines.flatMap{line=>line.split(" ")}
    val pairs = words.map{word=>(word,1)}
    pairs.cache()
    val wordCountsOdered = pairs.reduceByKey(_+_).saveAsTextFile("wordCountResult.log")
//    val wordCountsOdered = pairs.reduceByKey(_+_).map(
//      pair=>(pair._2,pair._1)    
//    ).sortByKey(false).map(pair=>(pair._2,pair._1))
//    wordCountsOdered.collect.foreach(wordNumberPair=>println(wordNumberPair._1+" : "+wordNumberPair._2))
//    while(true){
//      
//    }
    sc.stop()
    
    
  }
}

  在Eclipse中运行wordcount代码,日志显示如下:

   

  程序一开始,日志里显示的是:INFO SparkContext: Running Spark version 2.0.1,日志中间部分是一些随着SparkContext创建而创建的对象,另一条比较重要的日志信息,作业启动了并正在运行:INFO SparkContext: Starting job: saveAsTextFile at WordCountJobRuntime.scala:58。

  在程序运行的过程中会创建TaskScheduler、DAGScheduler和SchedulerBackend,它们有各自的功能。DAGScheduler是面向Job的Stage的高层调度器;TaskScheduler是底层调度器。SchedulerBackend是一个接口,根据具体的ClusterManager的不同会有不同的实现。程序打印结果后便开始结束。日志显示:INFO SparkContext: Successfully stopped SparkContext。

   

  通过这个例子可以感受到Spark程序的运行到处都可以看到SparkContext的存在,我们将SparkContext作为Spark源码阅读的入口,来理解Spark的所有内部机制。

  我们从一个整体去看SparkContext创建的实例对象。首先,SparkContext构建的顶级三大核心为DAGScheduler、TaskScheduler、SchedulerBackend,其中,DAGScheduler是面向Job的Stage的高层调度器;TaskScheduler是一个接口,是底层调度器,根据具体的ClusterManager的不同会有不同的实现,Standalone模式下具体的实现是TaskSchedulerImpl。SchedulerBackend是一个接口,根据具体的ClusterManager的不同会有不同的实现。Standalone模式下具体的实现是StandaloneSchedulerBackend。下图为SparkContext整体运行图

   

  从整个程序运行的角度讲,SparkContext包含四大核心对象:DAGScheduler、TaskScheduler、SchedulerBackend、MapOutputTrackerMaster。StandaloneSchedulerBackend有三大核心功能:负责与Master连接,注册当前程序RegisterWithMaster;接收集群中为当前应用程序分配的计算资源Executor的注册并管理Executors;负责发送Task到具体的Executor执行。

  第一步:程序一开始运行时会实例化SparkContext里的对象,所有不在方法里的成员都会被实例化!一开始实例化时第一个关键的代码是createTaskScheduler,它位于SparkContext的PrimaryConstructor中,当它实例化时会直接被调用,这个方法返回的是taskScheduler和dagScheduler的实例,然后基于这个内容又构建了DAGScheduler,最后调用taskScheduler的start()方法。要先创建taskScheduler,然后再创建dagScheduler,因为taskScheduler是受dagScheduler管理的。

  SparkContext.scala的源码如下。

    // Create and start the scheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    // constructor
    _taskScheduler.start()

  第二步:调用createTaskScheduler,这个方法创建了TaskSchedulerImpl和StandaloneSchedulerBackend,createTaskScheduler方法的第一个入参是SparkContext,传入的this对象是在应用程序中创建的sc,第二个入参是master的地址。

  以下是wordcount.scala创建SparkConf和SparkContext的上下文信息

    // 第1步:创建Spark的配置对象SparkConf,设置Spark程序运行时的配置信息,
    val conf = new SparkConf().setAppName("My First Spark APP").setMaster("local")
    
    // 第2步:创建SparkContext对象
    val sc = new SparkContext(conf)

  当SparkContext调用createTaskScheduler方法时,根据集群的条件创建不同的调度器,例如,createTaskScheduler第二个入参master如传入local参数,SparkContext将创建TaskSchedulerImpl实例及LocalSchedulerBackend实例,在测试代码的时候,可以尝试传入local[*]或者是local[2]的参数,然后跟踪代码,看看创建了什么样的实例对象。

  SparkContext中的SparkMasterRegex对象定义不同的正则表达式,从master字符串中根据正则表达式适配master信息。

  SparkContext.scala的源码如下。

 

/**
 * A collection of regexes for extracting information from the master string.
 */
private object SparkMasterRegex {
  // Regular expression used for local[N] and local[*] master formats
  val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
  // Regular expression for local[N, maxRetries], used in tests with failing tasks
  val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
  // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
  val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
  // Regular expression for connecting to Spark deploy clusters
  val SPARK_REGEX = """spark://(.*)""".r
  // Regular expression for connection to Mesos cluster by mesos:// or mesos://zk:// url
  val MESOS_REGEX = """mesos://(.*)""".r
}

  这是设计模式中的策略模式,它会根据实际需要创建出不同的SchedulerBackend的子类。

  SparkContext.scala的createTaskScheduler方法的源码如下。

 

  /**
   * Create a task scheduler based on a given master URL.
   * Return a 2-tuple of the scheduler backend and the task scheduler.
   */
  private def createTaskScheduler(
      sc: SparkContext,
      master: String,
      deployMode: String): (SchedulerBackend, TaskScheduler) = {
    import SparkMasterRegex._

    // When running locally, don't try to re-execute tasks on failure.
    val MAX_LOCAL_TASK_FAILURES = 1

    master match {
      case "local" =>
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
        scheduler.initialize(backend)
        (backend, scheduler)

  在实际生产环境下,我们都是用集群模式,即以spark://开头,此时在程序运行时,框架会创建一个TaskSchedulerImpl和StandaloneSchedulerBackend的实例,在这个过程中也会初始化taskscheduler,把StandaloneSchedulerBackend的实例对象作为参数传入。StandaloneSchedulerBackend被TaskSchedulerImpl管理,最后返回TaskScheduler和StandaloneSchdeulerBackend。

  SparkContext.scala的源码如下。

      case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)

  createTaskScheduler方法执行完毕后,调用了taskscheduler.start()方法来正式启动taskscheduler,这里虽然调用了taskscheduler.start方法,但实际上是调用了taskSchedulerImpl的start方法,因为taskSchedulerImpl是taskScheduler的子类。

  Task默认失败重试次数是4次,如果任务不容许失败,就可以调大这个参数。调大spark.task.maxFailures参数有助于确保重要的任务失败后可以重试多次。

  初始化TaskSchedulerImpl:调用createTaskScheduler方法时会初始化TaskSchedulerImpl,然后把StandaloneSchedulerBackend当作参数传进去,初始化TaskSchedulerImpl时首先是创建一个Pool来初定义资源分布的模式Scheduling Mode,默认是先进先出(FIFO)的模式。

  回到taskScheduler start方法,taskScheduler.start方法调用时会再调用schedulerbackend的start方法。

  SchedulerBackend包含多个子类,分别是LocalSchedulerBackend、CoarseGrainedScheduler-Backend和StandaloneSchedulerBackend、MesosCoarseGrainedSchedulerBackend、YarnScheduler-Backend。

  StandaloneSchedulerBackend的start方法调用了CoarseGraninedSchedulerBackend的start方法,通过StandaloneSchedulerBackend注册程序把command提交给Master:Command ("org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)来创建一个StandaloneAppClient的实例。

  Master发指令给Worker去启动Executor所有的进程时加载的Main方法所在的入口类就是command中的CoarseGrainedExecutorBackend,在CoarseGrainedExecutorBackend中启动Executor(Executor是先注册,再实例化),Executor通过线程池并发执行Task,然后再调用它的run方法。

  回到StandaloneSchedulerBackend.scala的start方法:其中创建了一个很重要的对象,即StandaloneAppClient对象,然后调用它的client.start()方法。

  在start方法中创建一个ClientEndpoint对象。

  StandaloneAppClient.scala的star方法的源码如下。

  def start() {
    // Just launch an rpcEndpoint; it will call back into the listener.
    endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
  }

  ClientEndpoint是一个RpcEndPoint,首先调用自己的onStart方法,接下来向Master注册。

  调用registerWithMaster方法,从registerWithMaster调用tryRegisterAllMasters,开一条新的线程来注册,然后发送一条信息(RegisterApplication的case class)给Master。

  Master收到RegisterApplication信息后便开始注册,注册后再次调用schedule()方法。

四、总结

  从SparkContext创建taskSchedulerImpl初始化不同的实例对象来完成最终向Master注册的任务,中间包括调用scheduler的start方法和创建StandaloneAppClient来间接创建ClientEndPoint完成注册工作。

  我们把SparkContext称为天堂之门,SparkContext开启天堂之门:Spark程序是通过SparkContext发布到Spark集群的;SparkContext导演天堂世界:Spark程序的运行都是在SparkContext为核心的调度器的指挥下进行的;SparkContext关闭天堂之门:SparkContext崩溃或者结束的时候整个Spark程序也结束。 


本文链接: http://www.dtmao.cc/news_show_200397.shtml

附件下载

相关教程

    暂无相关的数据...

共有条评论 网友评论

验证码: 看不清楚?