99偷拍视频精品区一区二,口述久久久久久久久久久久,国产精品夫妇激情啪发布,成人永久免费网站在线观看,国产精品高清免费在线,青青草在线观看视频观看,久久久久久国产一区,天天婷婷久久18禁,日韩动漫av在线播放直播

提交stage

  //提交stage,為stage創(chuàng)建一批task,task數(shù)量和partition數(shù)量相同

站在用戶的角度思考問題,與客戶深入溝通,找到立山網(wǎng)站設(shè)計(jì)與立山網(wǎng)站推廣的解決方案,憑借多年的經(jīng)驗(yàn),讓設(shè)計(jì)與互聯(lián)網(wǎng)技術(shù)結(jié)合,創(chuàng)造個性化、用戶體驗(yàn)好的作品,建站類型包括:成都網(wǎng)站制作、成都做網(wǎng)站、外貿(mào)營銷網(wǎng)站建設(shè)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣、國際域名空間、雅安服務(wù)器托管、企業(yè)郵箱。業(yè)務(wù)覆蓋立山地區(qū)。

  private def submitMissingTasks(stage: Stage, jobId: Int) {

    logDebug("submitMissingTasks(" + stage + ")")

    // Get our pending tasks and remember them in our pendingTasks entry

    stage.pendingTasks.clear()

    // First figure out the indexes of partition ids to compute.

//獲取要創(chuàng)建的task的數(shù)量

    val partitionsToCompute: Seq[Int] = {

      if (stage.isShuffleMap) {

        (0 until stage.numPartitions).filter(id => stage.outputLocs(id) == Nil)

      } else {

        val job = stage.resultOfJob.get

        (0 until job.numPartitions).filter(id => !job.finished(id))

      }

    }

    val properties = if (jobIdToActiveJob.contains(jobId)) {

      jobIdToActiveJob(stage.jobId).properties

    } else {

      // this stage will be assigned to "default" pool

      null

    }

//將stage加入runningstage隊(duì)列

    runningStages += stage

    // SparkListenerStageSubmitted should be posted before testing whether tasks are

    // serializable. If tasks are not serializable, a SparkListenerStageCompleted event

    // will be posted, which should always come after a corresponding SparkListenerStageSubmitted

    // event.

    stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size))

    outputCommitCoordinator.stageStart(stage.id)

    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

    // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.

    // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast

    // the serialized copy of the RDD and for each task we will deserialize it, which means each

    // task gets a different copy of the RDD. This provides stronger isolation between tasks that

    // might modify state of objects referenced in their closures. This is necessary in Hadoop

    // where the JobConf/Configuration object is not thread-safe.

    var taskBinary: Broadcast[Array[Byte]] = null

    try {

      // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).

      // For ResultTask, serialize and broadcast (rdd, func).

      val taskBinaryBytes: Array[Byte] =

        if (stage.isShuffleMap) {

          closureSerializer.serialize((stage.rdd, stage.shuffleDep.get) : AnyRef).array()

        } else {

          closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func) : AnyRef).array()

        }

      taskBinary = sc.broadcast(taskBinaryBytes)

    } catch {

      // In the case of a failure during serialization, abort the stage.

      case e: NotSerializableException =>

        abortStage(stage, "Task not serializable: " + e.toString)

        runningStages -= stage

        return

      case NonFatal(e) =>

        abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}")

        runningStages -= stage

        return

    }

//為stage創(chuàng)建指定數(shù)量的task

    val tasks: Seq[Task[_]] = if (stage.isShuffleMap) {

      partitionsToCompute.map { id =>

//給每個partition創(chuàng)建一個task

//給每個task計(jì)算最佳位置

        val locs = getPreferredLocs(stage.rdd, id)

        val part = stage.rdd.partitions(id)

//對于finalstage之外的stage的isShuffleMap都是true

//所以會創(chuàng)建ShuffleMapTask

        new ShuffleMapTask(stage.id, taskBinary, part, locs)

      }

    } else {

//如果不是ShuffleMap,就會創(chuàng)建finalstage

//finalstage是穿件resultTask

      val job = stage.resultOfJob.get

      partitionsToCompute.map { id =>

        val p: Int = job.partitions(id)

        val part = stage.rdd.partitions(p)

//獲取task計(jì)算的最佳位置的方法 getPreferredLocs

        val locs = getPreferredLocs(stage.rdd, p)

        new ResultTask(stage.id, taskBinary, part, locs, id)

      }

    }

    if (tasks.size > 0) {

      logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")

      stage.pendingTasks ++= tasks

      logDebug("New pending tasks: " + stage.pendingTasks)

      taskScheduler.submitTasks(

        new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))

      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())

    } else {

      // Because we posted SparkListenerStageSubmitted earlier, we should post

      // SparkListenerStageCompleted here in case there are no tasks to run.

      outputCommitCoordinator.stageEnd(stage.id)

      listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))

      logDebug("Stage " + stage + " is actually done; %b %d %d".format(

        stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))

      runningStages -= stage

    }

  }

  def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {

    getPreferredLocsInternal(rdd, partition, new HashSet)

  }

//task對應(yīng)partition的最佳位置

//就是從stage的最后一個RDD開始,找哪個RDD是被持久化了或者checkpoint

//那么task的最佳位置就是緩存的/checkpoint 的 partition的位置

//因?yàn)檫@樣的話,task就在那個節(jié)點(diǎn)上執(zhí)行,不需要計(jì)算之前的RDD

  private def getPreferredLocsInternal(

      rdd: RDD[_],

      partition: Int,

      visited: HashSet[(RDD[_],Int)])

    : Seq[TaskLocation] =

  {

    // If the partition has already been visited, no need to re-visit.

    // This avoids exponential path exploration.  SPARK-695

    if (!visited.add((rdd,partition))) {

      // Nil has already been returned for previously visited partitions.

      return Nil

    }

    // If the partition is cached, return the cache locations

//尋找當(dāng)前RDD是否緩存了

    val cached = getCacheLocs(rdd)(partition)

    if (!cached.isEmpty) {

      return cached

    }

    // If the RDD has some placement preferences (as is the case for input RDDs), get those

//尋找當(dāng)前RDD是否checkpoint了

    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList

    if (!rddPrefs.isEmpty) {

      return rddPrefs.map(TaskLocation(_))

    }

    // If the RDD has narrow dependencies, pick the first partition of the first narrow dep

    // that has any placement preferences. Ideally we would choose based on transfer sizes,

    // but this will do for now.

//遞歸調(diào)用,看看父RDD是否緩存或者checkpoint

    rdd.dependencies.foreach {

      case n: NarrowDependency[_] =>

        for (inPart <- n.getParents(partition)) {

          val locs = getPreferredLocsInternal(n.rdd, inPart, visited)

          if (locs != Nil) {

            return locs

          }

        }

      case _ =>

    }

//如果從第一個RDD到最后一個RDD都沒有緩存或者checkpoint,那最佳位置就是Nil,也就是沒有最佳位置

//那他的位置就要由taskscheduler來分配

    Nil

  }

文章題目:提交stage
URL鏈接:http://www.yijiale78.com/article26/pcphcg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站建設(shè)品牌網(wǎng)站制作網(wǎng)站營銷商城網(wǎng)站關(guān)鍵詞優(yōu)化電子商務(wù)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都seo排名網(wǎng)站優(yōu)化