古詩詞大全網 - 成語故事 - 揭秘Spark_checkpoint

揭秘Spark_checkpoint

checkpoint是什麽

(1)、Spark 在生產環境下經常會面臨transformation的RDD非常多(例如壹個Job中包含1萬個RDD)或者具體transformation的RDD本身計算特別復雜或者耗時(例如計算時長超過1個小時),這個時候就要考慮對計算結果數據持久化保存;

(2)、Spark是擅長多步驟叠代的,同時擅長基於Job的復用,這個時候如果能夠對曾經計算的過程產生的數據進行復用,就可以極大的提升效率;

(3)、如果采用persist把數據放在內存中,雖然是快速的,但是也是最不可靠的;如果把數據放在磁盤上,也不是完全可靠的!例如磁盤會損壞,系統管理員可能清空磁盤。

(4)、Checkpoint的產生就是為了相對而言更加可靠的持久化數據,在Checkpoint的時候可以指定把數據放在本地,並且是多副本的方式,但是在生產環境下是放在HDFS上,這就天然的借助了HDFS高容錯、高可靠的特征來完成了最大化的可靠的持久化數據的方式;

假如進行壹個1萬個算子操作,在9000個算子的時候persist,數據還是有可能丟失的,但是如果checkpoint,數據丟失的概率幾乎為0。

checkpoint原理機制

1.當RDD使用cache機制從內存中讀取數據,如果數據沒有讀到,會使用checkpoint機制讀取數據。此時如果沒有checkpoint機制,那麽就需要找到父RDD重新計算數據了,因此checkpoint是個很重要的容錯機制。checkpoint就是對於壹個RDD chain(鏈)如果後面需要反復使用某些中間結果RDD,可能因為壹些故障導致該中間數據丟失,那麽就可以針對該RDD啟動checkpoint機制,使用checkpoint首先需要調用sparkContext的setCheckpoint方法,設置壹個容錯文件系統目錄,比如hdfs,然後對RDD調用checkpoint方法。之後在RDD所處的job運行結束後,會啟動壹個單獨的job來將checkpoint過的數據寫入之前設置的文件系統持久化,進行高可用。所以後面的計算在使用該RDD時,如果數據丟失了,但是還是可以從它的checkpoint中讀取數據,不需要重新計算。

2.persist或者cache與checkpoint的區別在於,前者持久化只是將數據保存在BlockManager中但是其lineage是不變的,但是後者checkpoint執行完後,rdd已經沒有依賴RDD,只有壹個checkpointRDD,checkpoint之後,RDD的lineage就改變了。persist或者cache持久化的數據丟失的可能性更大,因為可能磁盤或內存被清理,但是checkpoint的數據通常保存到hdfs上,放在了高容錯文件系統。

問題:哪些 RDD 需要 cache?

會被重復使用的(但不能太大)。

問題:用戶怎麽設定哪些 RDD 要 cache?

因為用戶只與 driver program 打交道,因此只能用 rdd.cache() 去 cache 用戶能看到的 RDD。所謂能看到指的是調用 transformation() 後生成的 RDD,而某些在 transformation() 中 Spark 自己生成的 RDD 是不能被用戶直接 cache 的,比如 reduceByKey() 中會生成的 ShuffledRDD、MapPartitionsRDD 是不能被用戶直接 cache 的。

運算時間很長或運算量太大才能得到的 RDD,computing chain 過長或依賴其他 RDD 很多的 RDD。 實際上,將 ShuffleMapTask 的輸出結果存放到本地磁盤也算是 checkpoint,只不過這個 checkpoint 的主要目的是去 partition 輸出數據。

問題:什麽時候 checkpoint?

cache 機制是每計算出壹個要 cache 的 partition 就直接將其 cache 到內存了。但 checkpoint 沒有使用這種第壹次計算得到就存儲的方法,而是等到 job 結束後另外啟動專門的 job 去完成 checkpoint 。 也就是說需要 checkpoint 的 RDD 會被計算兩次。因此,在使用 rdd.checkpoint() 的時候,建議加上 rdd.cache(), 這樣第二次運行的 job 就不用再去計算該 rdd 了,直接讀取 cache 寫磁盤。其實 Spark 提供了 rdd.persist(StorageLevel.DISK_ONLY) 這樣的方法,相當於 cache 到磁盤上,這樣可以做到 rdd 第壹次被計算得到時就存儲到磁盤上,但這個 persist 和 checkpoint 有很多不同,之後會討論。

RDD 需要經過 [ Initialized --> marked for checkpointing --> checkpointing in progress --> checkpointed ] 這幾個階段才能被 checkpoint。

Initialized: 首先 driver program 需要使用 rdd.checkpoint() 去設定哪些 rdd 需要 checkpoint,設定後,該 rdd 就接受 RDDCheckpointData 管理。用戶還要設定 checkpoint 的存儲路徑,壹般在 HDFS 上。

marked for checkpointing: 初始化後,RDDCheckpointData 會將 rdd 標記為 MarkedForCheckpoint。

checkpointing in progress: 每個 job 運行結束後會調用 finalRdd.doCheckpoint(),finalRdd 會順著 computing chain 回溯掃描,碰到要 checkpoint 的 RDD 就將其標記為 CheckpointingInProgress,然後將寫磁盤(比如寫 HDFS)需要的配置文件(如 core-site.xml 等)broadcast 到其他 worker 節點上的 blockManager。完成以後,啟動壹個 job 來完成 checkpoint(使用 rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf)) )。

checkpointed: job 完成 checkpoint 後,將該 rdd 的 dependency 全部清掉,並設定該 rdd 狀態為 checkpointed。然後, 為該 rdd 強加壹個依賴,設置該 rdd 的 parent rdd 為 CheckpointRDD, 該 CheckpointRDD 負責以後讀取在文件系統上的 checkpoint 文件,生成該 rdd 的 partition。

有意思的是我在 driver program 裏 checkpoint 了兩個 rdd,結果只有壹個(下面的 result)被 checkpoint 成功,pairs2 沒有被 checkpoint,也不知道是 bug 還是故意只 checkpoint 下遊的 RDD:

checkPoint是壹種容錯機制,當我們的程序需要很多transformation操作的時候,如果我們擔心中間某些關鍵的後面會反復幾次使用的RDD,可能會因為節點的故障,導致持久化數據的丟失,那麽就可以針對該RDD額外啟動checkpoint機制, 實現容錯和高可用。

首先要調用 SparkContext的setCheckPointDir()方法,設置壹個容錯的文件系統的目錄,比如HDFS; 然後對RDD調用checkpoint()方法。之後,在RDD所處的job運行結束之後,會啟動壹個單獨的job,來將checkPoint的RDD的數據寫入之前設置的文件系統,進行高可用、容錯的類持久化操作。

此時就算在後面使用RDD時,它的持久化的數據,不小心丟失了,但是還是可以從它的checkpoint文件中直接讀取其數據,從而不需要重新計算。 (CacheManager)

答:首先使用SparkContext.setCheckpointDir() ,設置checkpoint的目錄,然後使用RDD.checkpoin進行checkpoint。

剖析,當我們使用了checkpoint之後,發生的壹系列操作:

1、 對RDD調用了checkpoint()方法之後,它就接受RDDCheckpointData對象的管理。

2、 RDDCheckpointData對象,會負責將調用了checkpoint()方法的RDD的狀態,設置為MarkedForCheckpoint。

3、 在RDD所在的那個job運行結束之後,會調用job中,最後壹個RDD的doCheckPoint()方法,該方法沿著finalRDD的lineage向上查找,標記為MarkedForCheckpoint的RDD,並將其標記為CheckpointingInProgress。

4、 然後啟動壹個單獨的job,來將lineage中,標記為CheckpointingInProgress的RDD,進行checkpoint的操作,也就是將這個RDD寫入到Sparkcontext.setCheckpointDir()方法設置的文件系統中。

答:最主要的區別:

在於持久化,只是將數據保存在BlockManager中,但是rdd的lineage沒有發生改變。

但是checkpoint執行完以後,rdd已經沒有之前所謂的依賴rdd了,而只有壹個強行為其設置的checkpointRDD,也就是說,checkpoint之後,rdd的lineage就改變了。

其次,持久化的數據丟失的可能性更大,無論是磁盤、或者是內存,都有可能丟失;但是checkpoint的數據,通常是保存在容錯、高可用的文件系統中的,比如HDFS,依賴於這種高容錯的文件西永,所以checkpoint的數據丟失可能性非常低。

答:如果壹個RDD沒有緩存,而且還設置了checkpoint,這樣的操作是很悲劇的,細想,本來當前RDD的這個job都執行結束了,但是由於中間的rdd沒有持久化,那麽checkpoint job想要將rdd 的數據寫入到外部文件系統中的話,還得從rdd之前所有的rdd,全部重新計算壹次,然後才能計算出rdd的數據,再將其checkpoint到外部的文件系統中,

所以我們通常建議,對要checkpoint的rdd使用persisit(StorageLevel_DISK_OMLY),該rdd計算之後,就直接將其持久化到磁盤上去,然後後面進行checkpoint操作是,直接從磁盤上讀取rdd的數據,並checkpoint到外部文件系統即可,不需要重新計算壹次rdd。