内射老阿姨1区2区3区4区_久久精品人人做人人爽电影蜜月_久久国产精品亚洲77777_99精品又大又爽又粗少妇毛片

Spark2.x中怎么實(shí)現(xiàn)CacheManager源碼深度剖析

這篇文章將為大家詳細(xì)講解有關(guān)Spark2.x中怎么實(shí)現(xiàn)CacheManager源碼深度剖析,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對(duì)相關(guān)知識(shí)有一定的了解。

為鎮(zhèn)巴等地區(qū)用戶提供了全套網(wǎng)頁(yè)設(shè)計(jì)制作服務(wù),及鎮(zhèn)巴網(wǎng)站建設(shè)行業(yè)解決方案。主營(yíng)業(yè)務(wù)為成都做網(wǎng)站、網(wǎng)站建設(shè)、鎮(zhèn)巴網(wǎng)站設(shè)計(jì),以傳統(tǒng)方式定制建設(shè)網(wǎng)站,并提供域名空間備案等一條龍服務(wù),秉承以專業(yè)、用心的態(tài)度為用戶提供真誠(chéng)的服務(wù)。我們深信只要達(dá)到每一位用戶的要求,就會(huì)得到認(rèn)可,從而選擇與我們長(zhǎng)期合作。這樣,我們也可以走得更遠(yuǎn)!

一、概述

    CacheManager主要發(fā)生在利用RDD的數(shù)據(jù)執(zhí)行算子的時(shí)候,之前我們講過在ShufffleWriter進(jìn)行數(shù)據(jù)寫時(shí),會(huì)調(diào)用RDD對(duì)應(yīng)的Iterator()方法,獲取RDD對(duì)應(yīng)的數(shù)據(jù),CacheManager主要干三件事:

    a. 管理Spark的緩存,可以基于內(nèi)存,也可以基于磁盤;

    b.底層是通過BlockManager進(jìn)行數(shù)據(jù)的讀寫操作;

    c.Task運(yùn)行會(huì)調(diào)用RDD中的iterator方法進(jìn)行數(shù)據(jù)的計(jì)算;

二、CacheManager源碼剖析

1.之前我們講解的ShuffleMapTask中的runTask方法時(shí),ShuffleWriter寫數(shù)據(jù)的參數(shù)傳入的是rdd.iterator()方法計(jì)算出來的那個(gè)partition數(shù)據(jù),代碼如下:

  var writer: ShuffleWriter[Any, Any] = null    try {      val manager = SparkEnv.get.shuffleManager      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)      //這里就是ShuffleMapTask類的runTask()方法中對(duì)應(yīng)的代碼調(diào)用      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])      writer.stop(success = true).get    } catch {      ...................    }

2.這里看RDD類中的iterator方法,代碼如下:

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {    //判斷下如果StorageLevel.NONE這說明RDD,之前肯定是進(jìn)行了持久化    //getOrCompute中會(huì)通過CacheManager獲取之前持久化的數(shù)據(jù)    if (storageLevel != StorageLevel.NONE) {      getOrCompute(split, context)      //如果沒有進(jìn)行過持久化,就需要通過父RDD定義的算子去獲取數(shù)據(jù)      //注意這里如果有CheckPoint,會(huì)通過CheckPoint獲取,checkPoint獲取不到才去重新計(jì)算      } else {      computeOrReadCheckpoint(split, context)    }  }

3.跟進(jìn)去看下持久化的RDD的處理,getOrCompute()函數(shù),代碼如下:

 private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {    val blockId = RDDBlockId(id, partition.index)    var readCachedBlock = true    //CacheManger這里是通過BlockManager獲取持久化數(shù)據(jù),    //如果獲取成功直接返回,如果獲取失敗,調(diào)用computeOrReadCheckpoint進(jìn)行計(jì)算     //內(nèi)存數(shù)據(jù)為啥會(huì)丟失? 之前我們知道內(nèi)存中的數(shù)據(jù)如果空間不夠的話,同時(shí)如果指定可以將數(shù)據(jù)緩存到磁盤,會(huì)溢寫到磁盤,   //如果未指定溢寫到磁盤,這些數(shù)據(jù)就會(huì)丟失掉 就需要重新計(jì)算    SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {      readCachedBlock = false      //獲取不到重新計(jì)算,這里要注意,代碼執(zhí)行到這里說明這個(gè)RDD肯定是經(jīng)過持久化的      //這里計(jì)算出數(shù)據(jù)后,會(huì)在getOrElseUpdate里面通過makeIterator參數(shù)對(duì)數(shù)據(jù)進(jìn)行重新持久化(這里理解的不太透徹)      computeOrReadCheckpoint(partition, context)    }) match {      case Left(blockResult) =>        if (readCachedBlock) {          val existingMetrics = context.taskMetrics().inputMetrics          existingMetrics.incBytesRead(blockResult.bytes)          new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {            override def next(): T = {              existingMetrics.incRecordsRead(1)              delegate.next()            }          }        } else {          new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])        }      case Right(iter) =>        new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])    }  }

4.這里繼續(xù)跟蹤getOrElseUpdate()獲取持久化的數(shù)據(jù) ,代碼如下:

//這里會(huì)調(diào)用get()方法從本地或者遠(yuǎn)程獲取block數(shù)據(jù),直接返回//如果沒有讀取到數(shù)據(jù)就需要重新計(jì)算數(shù)據(jù),由于代碼執(zhí)行到這里,rdd肯定是經(jīng)過持久化的//這里計(jì)算出數(shù)據(jù)后,通過makeIterator迭代器,重新進(jìn)行持久化(這里理解的不太透徹) def getOrElseUpdate[T](      blockId: BlockId,      level: StorageLevel,      classTag: ClassTag[T],      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {    // Attempt to read the block from local or remote storage. If it's present, then we don't need    // to go through the local-get-or-put path.    //這里會(huì)調(diào)用get()方法從本地或者遠(yuǎn)程獲取block數(shù)據(jù),直接返回    get[T](blockId)(classTag) match {      case Some(block) =>        return Left(block)      case _ =>        // Need to compute the block.    }    //這里的處理意思是:對(duì)于本地遠(yuǎn)程沒有獲取到數(shù)據(jù),然后computeOrReadCheckpoint重新計(jì)算的數(shù)據(jù)    //由于RDD是持久化的,原來的持久化數(shù)據(jù)可能丟了,這里根據(jù)持久化級(jí)別重新進(jìn)行數(shù)據(jù)的持久化    //這里代碼有點(diǎn)不太好理解 要結(jié)合上面2中第12-14行代碼 一起理解    doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {      case None =>        // doPut() didn't hand work back to us, so the block already existed or was successfully        // stored. Therefore, we now hold a read lock on the block.        val blockResult = getLocalValues(blockId).getOrElse {          // Since we held a read lock between the doPut() and get() calls, the block should not          // have been evicted, so get() not returning the block indicates some internal error.          releaseLock(blockId)          throw new SparkException(s"get() failed for block $blockId even though we held a lock")        }        // We already hold a read lock on the block from the doPut() call and getLocalValues()        // acquires the lock again, so we need to call releaseLock() here so that the net number        // of lock acquisitions is 1 (since the caller will only call release() once).        releaseLock(blockId)        Left(blockResult)      case Some(iter) =>        // The put failed, likely because the data was too large to fit in memory and could not be        // dropped to disk. Therefore, we need to pass the input iterator back to the caller so        // that they can decide what to do with the values (e.g. process them without caching).       Right(iter)    }  }

5.這里回過頭來看computeOrReadCheckpoint方法,如果計(jì)算數(shù)據(jù)的,代碼如下:

private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =  {  //如果設(shè)置了CheckPoint,從Checkpoint中獲取數(shù)據(jù)  //這里CheckPoint相關(guān)的知識(shí),先不講解,后面有篇文章單獨(dú)講解    if (isCheckpointedAndMaterialized) {          firstParent[T].iterator(split, context)    } else {    //如果數(shù)據(jù)沒有進(jìn)行過Checkpoint,這里只能重新計(jì)算一次    //這里就是根據(jù)自己的rdd算子重新計(jì)算      compute(split, context)    }  }

6.CacheManager數(shù)據(jù)計(jì)算的大體流程:

    1).如果RDD進(jìn)行過持久化,根據(jù)持久化級(jí)別通過BlockManager從本地或者遠(yuǎn)程獲取數(shù)據(jù),如果數(shù)據(jù)獲取不到,則需要重新計(jì)算,由于這里RDD進(jìn)行過持久化,只是由于某種原因丟失,還需要根據(jù)持久化級(jí)別重新進(jìn)行一次數(shù)據(jù)的持久化。

    2).如果RDD沒有進(jìn)行持久化,就需要重新計(jì)算,重新計(jì)算時(shí),這里如果RDD進(jìn)行了CheckPoint,則優(yōu)先獲取CheckPoint過的數(shù)據(jù),如果沒有,則需要從RDD的父RDD執(zhí)行我們定義的算子來重新計(jì)算Partition數(shù)據(jù)。

關(guān)于Spark2.x中怎么實(shí)現(xiàn)CacheManager源碼深度剖析就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。

網(wǎng)頁(yè)名稱:Spark2.x中怎么實(shí)現(xiàn)CacheManager源碼深度剖析
標(biāo)題網(wǎng)址:http://m.rwnh.cn/article44/gspoee.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站導(dǎo)航網(wǎng)站設(shè)計(jì)、商城網(wǎng)站外貿(mào)網(wǎng)站建設(shè)、品牌網(wǎng)站設(shè)計(jì)域名注冊(cè)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

營(yíng)銷型網(wǎng)站建設(shè)
应城市| 阳泉市| 曲周县| 定安县| 邢台县| 三穗县| 延津县| 北宁市| 侯马市| 甘谷县| 正镶白旗| 兴文县| 陵水| 马龙县| 从江县| 武清区| 台南县| 广宗县| 凌海市| 孟连| 航空| 林州市| 南雄市| 化隆| 疏附县| 安化县| 凤凰县| 安徽省| 溧阳市| 南京市| 千阳县| 金塔县| 大新县| 文昌市| 望谟县| 旌德县| 景谷| 南宁市| 平邑县| 西畴县| 道孚县|