RELATEED CONSULTING
相关咨询
选择下列产品马上在线沟通
服务时间:8:30-17:00
你可能遇到了下面的问题
关闭右侧工具栏

新闻中心

这里有您想知道的互联网营销解决方案
如何进行JobScheduler内幕实现和深度思考

本篇文章为大家展示了如何进行JobScheduler内幕实现和深度思考,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

创新互联专业为企业提供五华网站建设、五华做网站、五华网站设计、五华网站制作等企业网站建设、网页设计与制作、五华企业网站模板建站服务,十多年五华做网站经验,不只是建网站,更提供有价值的思路和整体网络服务。

DStream的foreachRDD方法,实例化ForEachDStream对象,并将用户定义的函数foreachFunc传入到该对象中。foreachRDD方法是输出操作,foreachFunc方法会作用到这个DStream中的每个RDD。

/**
 * Apply a function to each RDD in this DStream. This is an output operator, so
 * 'this' DStream will be registered as an output stream and therefore materialized.
 * @param foreachFuncforeachRDD function
 * @param displayInnerRDDOpsWhether the detailed callsites and scopes of the RDDs generated
 *                           in the `foreachFuncto be displayed in the UI. If `false`, then
 *                           only the scopes and callsites of `foreachRDDwill override those
 *                           of the RDDs on the display.
 */
private defforeachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = {
  newForEachDStream(this,
    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}

ForEachDStream对象中重写了generateJob方法,调用父DStream的getOrCompute方法来生成RDD并封装Job,传入对该RDD的操作函数foreachFunc和time。dependencies方法定义为父DStream的集合。

/**
 * An internal DStream used to represent output operations like DStream.foreachRDD.
 * @param parent        Parent DStream
 * @param foreachFunc   Function to apply on each RDD generated by the parent DStream
 * @param displayInnerRDDOpsWhether the detailed callsites and scopes of the RDDs generated
 *                           by `foreachFuncwill be displayed in the UI; only the scope and
 *                           callsite of `DStream.foreachRDDwill be displayed.
 */
private[streaming]
classForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extendsDStream[Unit](parent.ssc) {

  override defdependencies: List[DStream[_]] = List(parent)

  override defslideDuration: Duration = parent.slideDuration

  override defcompute(validTime: Time): Option[RDD[Unit]] = None

  override defgenerateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match{
      caseSome(rdd) =>
        valjobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(newJob(time, jobFunc))
      caseNone => None
    }
  }
}

DStreamGraph的generateJobs方法中会调用outputStream的generateJob方法,就是调用ForEachDStream的generateJob方法。

defgenerateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time "+ time)
  valjobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      valjobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated "+ jobs.length + " jobs for time "+ time)
  jobs
}

DStream的generateJob定义如下,其子类中只有ForEachDStream重写了generateJob方法。

/**
 * Generate a SparkStreaming job for the given time. This is an internal method that
 * should not be called directly. This default implementation creates a job
 * that materializes the corresponding RDD. Subclasses of DStream may override this
 * to generate their own jobs.
 */
private[streaming] defgenerateJob(time: Time): Option[Job] = {
  getOrCompute(time) match{
    caseSome(rdd) => {
      valjobFunc = () => {
        valemptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(newJob(time, jobFunc))
    }
    caseNone => None
  }
}

DStream的print方法内部还是调用foreachRDD来实现,传入了内部方法foreachFunc,来取出num+1个数后打印输出。

/**
 * Print the first num elements of each RDD generated in this DStream. This is an output
 * operator, so this DStream will be registered as an output stream and there materialized.
 */
defprint(num: Int): Unit = ssc.withScope {
  defforeachFunc: (RDD[T], Time) => Unit = {
    (rdd: RDD[T], time: Time) => {
      val firstNum = rdd.take(num + 1)
      // scalastyle:off println
      println("-------------------------------------------")
      println("Time: "+ time)
      println("-------------------------------------------")
      firstNum.take(num).foreach(println)
      if(firstNum.length > num) println("...")
      println()
      // scalastyle:on println
    }
  }
  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}

总结:JobScheduler是SparkStreaming 所有Job调度的中心,内部有两个重要的成员:

JobGenerator负责Job的生成,ReceiverTracker负责记录输入的数据源信息。

JobScheduler的启动会导致ReceiverTracker和JobGenerator的启动。ReceiverTracker的启动导致运行在Executor端的Receiver启动并且接收数据,ReceiverTracker会记录Receiver接收到的数据meta信息。JobGenerator的启动导致每隔BatchDuration,就调用DStreamGraph生成RDD Graph,并生成Job。JobScheduler中的线程池来提交封装的JobSet对象(时间值,Job,数据源的meta)。Job中封装了业务逻辑,导致最后一个RDD的action被触发,被DAGScheduler真正调度在Spark集群上执行该Job。

上述内容就是如何进行JobScheduler内幕实现和深度思考,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注创新互联行业资讯频道。


网站名称:如何进行JobScheduler内幕实现和深度思考
链接分享:http://scyingshan.cn/article/ispgjg.html