Spark 2.0 Structured Streaming 分析
作者:祝威廉
本文由 簡書 祝威廉 授權發布,版權所有歸作者,轉載請聯系作者!
前言
Spark 2.0 將流式計算也統一到DataFrame里去了,提出了Structured Streaming的概念,將數據源映射為一張無線長度的表,同時將流式計算的結果映射為另外一張表,完全以結構化的方式去操作流式數據,復用了其對象的Catalyst引擎。
Spark 2.0 之前
作為Spark平臺的流式實現,Spark Streaming 是有單獨一套抽象和API的,大體如下

上面都是套路,基本都得照著這么寫。
Spark 2.0 時代
概念上,所謂流式,無非就是無限大的表,官方給出的圖一目了然:


- 沒有定時器么,我怎么設置duration?
- 在哪里設置awaitTermination呢?
- 如果我要寫入到其他引擎,而其他引擎沒有適配咋辦?
這些疑問其實歸結起來就是:
Structured Streaming 的完整套路是啥?
我們來看看代碼(例子來源于Spark源碼,我稍微做了些修改):
這個就是Structured Streaming 的完整套路了。
Structured Streaming 目前Source源只支持File 和 Socket 兩種。輸出則是四種,前面已經提到。foreach則是可以無限擴展的。我舉個例子:
我把數據最后寫到各個節點的臨時目錄里。當然,這只是個例子,不過其他類似于寫入Redis的,則是類似的。
Structured Streaming 不僅僅在于API的變化
如果Structured Streaming 僅僅是換個API,或者能夠支持DataFrame操作,那么我只能感到遺憾了,因為2.0之前通過某些封裝也能夠很好的支持DataFrame的操作。那么 Structured Streaming 的意義到底何在?
- 重新抽象了流式計算
- 易于實現數據的exactly-once
我們知道,2.0之前的Spark Streaming 只能做到at-least once,框架層次很難幫你做到exactly-once,參考我以前寫的文章 Spark Streaming Crash 如何保證Exactly Once Semantics 。 現在通過重新設計了流式計算框架,使得實現exactly-once 變得容易了。
可能你會注意到,在Structured Streaming 里,多出了outputMode,現在有complete,append,update 三種,現在的版本只實現了前面兩種。
complete,每次計算完成后,你都能拿到全量的計算結果。
append,每次計算完成后,你能拿到增量的計算結果。
但是,這里有個但是,使用了聚合類函數才能用complete模式,只是簡單的使用了map,filter等才能使用append模式。 不知道大家明白了這里的含義么?
complete 就是我們前面提到的mapWithState實現。 append 模式則是標準的對數據做解析處理,不做復雜聚合統計功能。
官方給出了complete 模式的圖:

前面我們說到,現在的設計很簡單,其實就是 無限大的 Source Table 映射到一張無限大的 Result Table上,每個周期完成后,都會更新Result Table。我們看到,Structured Streaming 已經接管了端到端了,可以通過內部機制保證數據的完整性,可靠性。
offset 概念,流式計算一定有offset的概念。
對于無法回溯的數據源則采用了WAL日志
state概念,對result table 的每個分區都進行狀態包裝,分區的的每個ADD,PUT,UPDATE,DELETE操作,都會寫入到HDFS上,方便系統恢復。
其中第三點是只有在2.0才有的概念。不過比較遺憾的是,result table 和ForeachWriter 并沒有什么結合,系統只是保證result table的完整性,通過HDFSBackedStateStoreProvider將result table 保存到HDFS。
以前的API就是給你個partition的iterator,你愛怎么玩怎么玩,但是到了現在,以ForeachWriter為例,
override def process(value: Row): Unit = {
數據你只能一條一條處理了。理論上如果假設正好在process的過程中,系統掛掉了,那么數據就會丟了,但因為 Structured Streaming 如果是complete模式,因為是全量數據,所以其實做好覆蓋就行,也就說是冪等的。
如果是append 模式,則可能只能保證at-least once ,而對于其內部,也就是result table 是可以保證exactly-once 的。對于比如數據庫,本身是可以支持事物的,可以在foreachWrite close的時候commit下,有任何失敗的時候則在close的時候,rollback 就行。但是對于其他的,比如HBase,Redis 則較為困難。
另外在ForeachWriter提供的初始化函數,
override def open(partitionId: Long, version: Long): Boolean = {
返回值是Boolean,通過檢測版本號,是否跳過這個分區的數據處理。返回true是為不跳過,否則為跳過。當你打開的時候,可以通過某種手段保存version,再系統恢復的時候,則可以讀取該版本號,低于該版本的則返回false,當前的則繼續處理。
原文>>>
End.