作者:牛肉圓粉不加蔥
本文由 簡書 牛肉圓粉不加蔥 授權發布,版權所有歸作者,轉載請聯系作者!
Streaming job 的調度與執行
結合文章 揭開 Spark Streaming神秘面紗④ – job 的提交與執行 我們畫出了如下 job 調度執行流程圖:

為什么很難保證 exactly once
上面這張流程圖最主要想說明的就是,job 的提交執行是異步的,與 checkpoint 操作并不是原子操作。這樣的機制會引起數據重復消費問題:
為了簡化問題容易理解,我們假設一個 batch 只生成一個 job,并且 spark.streaming.concurrentJobs 值為1,該值代表 jobExecutor 線程池中線程的個數,也即可以同時執行的 job 的個數。
假設,batch duration 為2s,一個 batch 的總共處理時間為1s,此時,一個 batch 開始了,第一步生成了一個 job,假設花了0.1s,然后把該 job 丟到了 jobExecutor 線程池中等待調度執行,由于 checkpoint 操作和 job 在線程池中執行是異步的,在0.2s 的時候,checkpoint 操作完成并且此時開始了 job 的執行。
注意,這個時候 checkpoint 完成了并且該 job 在 checkpoint 中的狀態是未完成的,隨后在第1s 的時候 job 完成了,那么在這個 batch 結束的時候 job 已經完成了但該 job 在 checkpoint 中的狀態是未完成的(要了解 checkpoint 都保存了哪些數據請移步 Spark Streaming的還原藥水——Checkpoint )。
在下一個 batch 運行到 checkpoint 之前就掛了(比如在拉取數據的時候掛了、OOM 掛了等等異常情況),driver 隨后從 checkpoint 中恢復,那么上述的 job 依然是未執行的,根據使用的 api 不同,對于這個 job 會再次拉取數據或從 wal 中恢復數據重新執行該 job,那么這種情況下該 job 的數據就就會被重復處理。比如這時記次的操作,那么次數就會比真實的多。
如果一個 batch 有多個 job 并且spark.streaming.concurrentJobs大于1,那么這種情況就會更加嚴重,因為這種情況下就會有多個 job 已經完成但在 checkpoint 中還是未完成狀態,在 driver 重啟后這些 job 對應的數據會被重復消費處理。
另一種會導致數據重復消費的情況主要是由于 Spark 處理的數據單位是 partition 引起的。比如在處理某 partition 的數據到一半的時候,由于數據內容或格式會引起拋異常,此時 task 失敗,Spark 會調度另一個同樣的 task 執行,那么此時引起 task 失敗的那條數據之前的該 partition 數據就會被重復處理,雖然這個 task 被再次調度依然會失敗。若是失敗還好,如果某些特殊的情況,新的 task 執行成功了,那么我們就很難發現數據被重復消費處理了。
如何保證 exactly once
至于如何才能保證 exactly once,其實要根據具體情況而定(廢話)??傮w來說,可以考慮以下幾點:
- 業務是否不能容忍即使是極少量的數據差錯,如果是那么考慮 exactly once。如果可以容忍,那就沒必要非實現 exactly once 不可
- 即使重復處理極小部分數據會不會對最終結果產生影響。若不會,那重復處理就重復吧,比如排重統計
- 若一定要保證 exactly once,應該考慮將對 partition 處理和 checkpoint或自己實現類似 checkpoint 功能的操作做成原子的操作;并且對 partition 整批數據進行類似事物的處理
End.