古詩詞大全網 - 四字成語 - Flink | Checkpoint 機制詳解

Flink | Checkpoint 機制詳解

壹、Checkpoint 簡介

Flink 的 Checkpoint 機制是其 可靠性 的基石。當壹個任務在運行過程中出現故障時,可以根據 Checkpoint 的信息恢復到故障之前的某壹狀態,然後從該狀態恢復任務的運行。 在 Flink 中,Checkpoint 機制采用的是 chandy-lamport (分布式快照)算法,通過 Checkpoint 機制,保證了 Flink 程序內部的 Exactly Once 語義。

二、Checkpoint 機制流程詳解

1. 任務啟動

我們假設任務從 Kafka 的某個 Topic 中讀取數據,該Topic 有 2 個 Partition,故任務的並行度為 2。根據讀取到數據的奇偶性,將數據分發到兩個 task 進行求和。

某壹時刻,狀態如下:

2.啟動Checkpoint

JobManager 根據 Checkpoint 間隔時間,啟動 Checkpoint。此時會給每個 Source 發送壹個 barrier 消息,消息中的數值表示 Checkpoint 的序號,每次啟動新的 Checkpoint 該值都會遞增。

3. Source啟動Checkpoint

當Source接收到barrier消息,會將當前的狀態(Partition、Offset)保存到 StateBackend,然後向 JobManager 報告Checkpoint 完成。之後Source會將barrier消息廣播給下遊的每壹個 task:

4.task 接收 barrier

當task接收到某個上遊(如這裏的Source1)發送來的barrier,會將該上遊barrier之前的數據繼續進行處理,而barrier之後發送來的消息不會進行處理,會被緩存起來。

之前對barrier的理解比較模糊,直到看到了下面這幅圖。barrier的作用和這裏 "歡迎光臨" 牌子的作用類似,用於區分流中的數據屬於哪壹個 Checkpoint:

我們可以理解為:barrier之前的數據屬於本次Checkpoint,barrier之後的數據屬於下壹次Checkpoint,所以下次Checkpoint的數據是不應該在本次Checkpoint過程中被計算的,因此會將數據進行緩存。

5.barrier對齊

如果某個task有多個上遊輸入,如這裏的 sum_even 有兩個 Source 源,當接收到其中壹個 Source 的barrier後,會等待其他 Source 的 barrier 到來。在此期間,接收到 barrier 的 Source 發來的數據不會處理,只會緩存(如下圖中的數據4)。而未接收到 barrier 的 Source 發來的數據依然會進行處理,直到接收到該Source 發來的 barrier,這個過程稱為 barrier的對齊

barrier是否對齊決定了程序實現的是 Exactly Once 還是 At Least Once:

如果不進行barrier對齊,那麽這裏 sum_even 在接收 Source2 的 barrier 之前,對於接收到 Source1的 數據4 ,不會進行緩存,而是直接進行計算,sum_even 的狀態改為12,當接收到 Source2 的barrier,會將 sum_even 的狀態 sum=12 進行持久化。如果本次Checkpoint成功,在進行下次 Checkpoint 前任務崩潰,會根據本次Checkpoint進行恢復。此時狀態如下:

從這裏我們就可以看出, Source1的數據4被計算了兩次 。因此,Exactly Once語義下,必須進行barrier的對齊,而 At Least Once語義下 barrier 可以不對齊。

註意:barrier對齊只會發生在多對壹的Operator(如 join)或者壹對多的Operator(如 reparation/shuffle)。如果是壹對壹的Operator,如map、flatMap 或 filter 等,則沒有對齊這個概念,都會實現Exactly Once語義,即使程序中配置了At Least Once 。

6.處理緩存數據

當task接收到所有上遊發送來的barrier,即可以認為當前task收到了本次 Checkpoint 的所有數據。之後 task 會將 barrier 繼續發送給下遊,然後處理緩存的數據,比如這裏 sum_even 會處理 Source1 發送來的數據4. 而且,在這個過程中 Source 會 繼續讀取數據 發送給下遊,並不會中斷。

7.上報Checkpoint完成

當sink收到barrier後,會向JobManager上報本次Checkpoint完成。至此,本次Checkpoint結束,各階段的狀態均進行了持久化,可以用於後續的故障恢復。