實時數倉不用愁,StarRocks+Flink來解憂!
2022年1月9日,StarRocks亮相Flink Forward Asia 2021大會開源解決方案專場,StarRocks解決方案架構師謝寅做了題為“雙劍合璧:Flink+StarRocks構建實時數倉解決方案”的主題演講。本文以主講嘉賓從技術方案的角度,為社區的小伙伴帶來最全、最詳細的文字版實錄回顧!
本文從以下5個方面介紹:
第一部分,實時數倉技術的發展趨勢和技術挑戰,以及為什么Flink+StarRocks能夠提供端到端的極速實時數倉體驗。
第二部分,介紹什么是StarRocks,它有哪些技術特點,擅長的場景是什么,以及為什么作為OLAP層的極速分析引擎,它能夠很好與Flink技術進行整合。
第三部分,重點介紹聯合Flink和StarRocks兩大技術棧構建實時數倉的方法論。
第四部分,介紹一些利用Flink和StarRocks構建實時數倉的最佳實踐案例。
第五部分,展望了StarRocks在實時數倉方向以及Flink社區貢獻等方面的后續規劃。
1.實時數倉概述
隨著各行各業對數據越來越重視,實時計算技術也在不斷的演進。從時效性上來講,對于小時級或者分鐘級的計算已經不能滿足客戶業務的需要,需求逐漸從時窗驅動,升級到事件驅動,甚至每產生一條數據,都想盡快看到數據。ETL過程也從離線或者微批的ETL,變為Flink擅長的實時流式處理。
數據源上,早先只能支持單一的數據源,整體的數據表現力較差。而當下,人們不僅希望能對單一數據流進行分析計算,還希望能聯合多個數據源進行多流計算,為此不惜想盡一切辦法,來讓數據的表現力更加豐富。
從工程效率的角度上看,技術團隊也逐漸意識到,工程代碼開發的成本高企不下,更希望能構建自己的平臺化IDE工具,讓業務人員能基于其上直接進行FlinkSQL的開發。在這些演進的過程也逐漸浮現出一些技術難點亟待解決,比如:
·亂序數據怎么更好的處理?
·通過Watermark之類的手段,是讓過去的數據隨即失效,還是希望所有的明細數據都能入庫?
·多流Join到底應該怎么做合適?
·維表是一次性加載進來,還是放到外存儲做熱查詢,除此之外還有沒有其他的技術選擇?
·數據處理作業一旦重啟,怎么保證在恢復之后還能做到不丟不重的續接消費?
·怎么才能提高整體的業務開發效率,保證業務上線時沒有業務中斷,更優雅快捷的進行業務邏輯迭代?
在此之外,還有一件事也是業務人員或平臺架構師最關注的,那就是通過Flink這么強大的實時計算引擎,費勁千辛萬苦好不容易把計算層效率從小時級或者分鐘級的延遲提升到了秒級,結果現有的OLAP產品拖了后腿,查詢耗費了好幾分鐘,辜負了計算團隊的大量心血。
以上種種,充分證明了極速OLAP+實時計算的重要性,以此我們就可以打造一套端到端的極速實時數倉解決方案,即所謂“雙劍合璧”!
談到數倉,目前業界落地較多的還是Lambda架構,也就是離線數倉和實時數倉分開構建。邏輯分層的形式,也基本形成了業界的共識。業務數據有的是RDBMS采集上來的,有的是日志采集上來的,有的是批量抽取上來的,有的是CDC或者流式寫上來的。原始操作層(ODS)基本都是保持數據原貌,然后經過維度擴展、清洗過濾、轉換,構建成明細層(DWD)。再往上層走,數據開始做輕度聚合,并有原子指標出現。最后按照主題或者應用的需要產出ADS層里的派生指標或者衍生指標。
企業構建實時數倉,為了讓整體的邏輯清晰,通常情況下也會沿用這種分層模式,只不過受限于實時數據到達的先后情況以及業務需要,可能會有些層次的裁剪,不像離線數倉里那么豐富。中間的一些維度信息,可能會同時被離線數倉和實時數倉共享使用。最后將數據送入OLAP產品,供報表系統、接口或者Adhoc查詢所調用。
基于前面對數倉典型邏輯分層的探討,問題也隨之而來:
是否有一款OLAP產品能夠很好的和Flink結合,滿足持續的秒級的數據攝入和極速分析查詢能力?
答案是一定的,StarRocks的定位正是要提供極速分析查詢能力,來適應各種各樣的OLAP場景。
2.StarRocks是什么
這是StarRocks的宏觀架構圖。
從左邊我們可以看到常見的Kafka、分布式文件系統、傳統關系型數據庫,都可以作為StarRocks的數據源。
StarRocks提供了4種模型:
·如果業務場景只涉及數據的持續Append,可以選擇Duplicate明細模型,在其上可以實時構建物化視圖加速DWS層查詢;
·如果業務場景不關注明細的下鉆,StarRocks還有Aggregate聚合模型表,相當于數據直接秒級打入DWS層,滿足高并發的聚合指標查詢;
·對于ODS層做業務庫數據還原時,若涉及到數據更新的場合,可以采用Unique模型,利用Flink的Append流Sink數據進來,完成ODS數據去重和更新;
·另外,StarRocks最新2.0版本提供的PrimaryKey主鍵模型,比Unique模型查詢性能快3倍以上,內置了OP字段來標記Upsert/Delete操作,并且能夠很好的吻合Flink的Retract回撤流語義,聚合計算不必非要開窗轉為Append流來Sink,進一步增強了FlinkSQL的表現力。
StarRocks還提供了邏輯View和物化視圖,提供了更豐富的建模能力。
在上圖的右側是StarRocks的物理架構,整體還是非常簡潔的,主要就是兩種角色:FE前端節點和BE后端節點。
·FE負責查詢規劃、元數據管理、集群高可用,并包含CBO優化器,為分布式多表關聯和復雜Adhoc查詢提供最優的執行規劃。
·BE節點承載了列式存儲引擎和全面向量化的執行引擎,保障在OLAP分析場景中提供極速查詢體驗。
·對上層應用提供MySQL連接協議,可以用MySQL客戶端輕松連入進行開發和查詢,和主流BI工具有很好的兼容性,也可以服務于OLAP報表和API封裝。
3.StarRocks擅長哪些場景
基于StarRocks的4種模型,可以提供明細查詢和聚合查詢,能夠應對OLAP報表的上卷和下鉆,比如在廣告主報表場景應對高并發點查詢。
StarRocks基于Roaring Bitmap提供了Bitmap數據結構,并配套有集合計算函數,可以用于精確去重計算和用戶畫像的客群圈選業務。在實時方面,StarRocks可以用于支撐實時大屏看板、實時數倉,秒級延遲的呈現業務原貌和數倉指標。
最后,基于CBO優化器,StarRorcks在OLAP場景下有很好的多表關聯、子查詢嵌套等復雜查詢的性能,可以用于自助BI平臺、自助指標平臺和即席數據探查等自助分析場景。
StarRocks能夠用于構建實時數倉,得益于他的三種實時數據攝入能力:
·可以直接消費Kafka的消息。
·可以借助Flink-connecor實現Exactly-once語義的流式數據攝入。
·另外,結合Flink-CDC和PrimaryKey模型,可以實現從TP庫Binlog實時同步Upsert和Delete等操作,更好的服務于ODS層業務庫還原。
利用Flink-Connector-StarRocks插件,可以實現從TP庫Binlog實時同步Upsert和Delete等操作,更好的服務于ODS層業務庫還原。配套的SMT(StarRocks Migration Tool)工具,可以自動映射Flink中的TP庫Source和StarRocks庫的Sink建表語句,使得基于FlinkSQL的開發工作變得簡單便捷。
另外,Flink-Connector更重要的功能是提供了通用Sink能力,開發者把依賴加入后,無論是工程編碼還是FlinkSQL都可以輕松Add Sink,保障數據秒級導入時效性。
結合Flink的Checkpoint機制和StarRocks的導入事務標簽,還可以保障不丟不重的精準一次導入。
StarRocks的實時物化視圖構建能力,結合Flink-Connector的持續增量數據導入,可以在流量類指標計算的建模中,實現DWD明細數據導入完成的同時,DWS聚合指標也同步增量構建完成,極大提升聚合指標產出效率,縮短分層ETL的旅程。
StarRocks提供的Replace_if_not_null能力比較有意思,正如語義所述,只要插入的數據不是null,那么就可以去替換數據。
如圖所示,右側是個建表示例,里面維度列為日期和Uid,其余3列中SRC表示數據源,另外帶了v1,v2兩個Metric;
通過2個Insert語句我們可以看到,來自2個Kafka主題的數據源的數據,輕松的實現了同時寫入一張表的不同列。因此,這個功能提供了兩種實時數倉能力:
1)Join on Load,也就是在導入的過程中,基于StarRocks來實現流式Join。
2)部分列更新能力。
StarRocks為了支持更好的Upsert/Delete,提供了PrimaryKey表模型。
如上圖所示,最左側是經典的LSM模型,也就是Merge-on-Read的形式。這種模型寫入時不用去判斷既有鍵位,對寫友好,但讀取時需要Merge合并,所以對讀取數據不友好。
而最右側是Copy-on-Write的模型,典型的產品就是DeltaLake。這種模型和LSM正好相反,有比較好的讀效率,但是對于寫入不是很友好。
比較平衡讀取和寫入的,就是上圖中間的兩種Record級別沖突檢查的模型,Kudu的Write Delta和StarRocks的Delete+Insert模型。
由于維護了內存表,PrimaryKey模型更適合冷熱特征明顯的場合,對熱數據頻繁的更新和刪除更友好;
另外非常適合PrimaryKey較少的表(如用戶畫像的寬表),雖然列很多,但是主鍵其實只有UUID這種字段。
StarRocks早期的Unique模型就是采用了最左邊的LSM模型,因此查詢效率較差,并且對于Delete不友好,結合Flink開發應用時,只能使用Append流進行Sink。
StarRocks 2.0版本中新增加的PrimaryKey模型,提供了軟刪除字段,通過在內存中維護最新數據,使得查詢時避免了Merge的過程,從而極大提升了查詢性能,并且既可以使用Append流也可以使用Retract流進行Sink,豐富了與Flink結合時的應用場景。
4.構建實時數倉的具體方法
眾所周知,在按照邏輯分層自下而上的構建實時數倉時,多流Join是有一定的技術門檻的。傳統的實時計算引擎如Storm、Spark Streaming在這方面做的都不是很好。而Flink其實提供了很多通用的解決方法,如:
·基于MapStat做狀態計算,或者BroadcastStat將維度緩存廣播出去;
·用Flink關聯外部熱存儲,如HBase/Redis等;
·一些相對穩定、更新頻率低的維度數據或者碼表數據,可以利用RichFlatMapFunc的Open方法,在啟動時就全部加裝到內存里;
不限于以上這些,其實Flink已經在維度擴展上,給了開發者很多可以落地的選擇。然而有了StarRocks,我們會有更多的想象空間。
比如利用前面介紹的Replace_if_not_null的能力,開發者可以實現多個數據源稀疏寫入寬表的不同列,來實現Join-on-Load的效果。
另外StarRocks強悍的CBO優化器在多表關聯查詢能力方面也表現優異,如果數據量不大或者在查詢并發不高的場景,甚至可以把Join的邏輯下推到OLAP層來做,這樣可以釋放掉Flink上的一些構建負荷,讓Flink專注于清洗和穩定的數據導入,而多表關聯和復雜查詢等業務邏輯在StarRocks上進行。
不僅如此,還可以結合Join-on-Load和Join on StarRocks的兩種形式,也就是稀疏寫入有限張表,通過表之間做Colocation join策略,保證有限的表之間數據分布一致,做Join的時候沒有節點間Shuffle,在上層構建邏輯View面向查詢。
雙劍方案1.微批調度
Flink清洗導入Kafka的日志或者用Flink-CDC-StarRocks讀取MySQL Binlog導入StarRocks,ETL過程中埋入批次處理時間,采用外圍調度系統,基于批次處理時間篩選數據,做分鐘級微批調度,向上構建邏輯分層。
這種方案的主要特點是:StarRocks作為ETL的Source和Sink,計算邏輯在StarRocks側,適用于分鐘級延遲,數據體量不大的場景。
雙劍方案2.Flink增量構建
實時消息流通過Kafka接?,采用Flink進?流式ETL、多流Join、增量聚合等,在內存中完成分層構建,然后將相應的數據,層對層的通過Flink-connector寫出到StarRocks對應表內。各層按需面向下游提供OLAP查詢能力。
該方案的主要特點是:計算邏輯在Flink側,適用于需要前導做較重ETL的場景,StarRocks不參與ETL,只承載OLAP查詢,應對較高QPS查詢負荷。
雙劍方案3.StarRocksView視圖
Flink清洗導入Kafka的日志或者用Flink-CDC-StarRocks工具讀取MySQL Binlog導入StarRocks;根據需要選用明細、聚合、更新、主鍵各種模型,只物理落地ODS和DIM層,向上采用View視圖;利用StarRocks向量化極速查詢和CBO優化器滿足多表關聯、嵌套子查詢等復雜SQL,查詢時現場計算指標結果,保證指標上卷和下鉆高度同源一致。
該方案主要特點是:計算邏輯在StarRocks側(現場查詢),適用于業務庫高頻數據更新的場景,實體數據只在ODS或DWD存儲(未來StarRocks提供多表Materialized Views,將會進一步提升查詢性能)。
5.最佳實踐案例
前面我們介紹了一些聯合Flink和StarRocks構建實時數倉的幾種方法論,下面我們來看4個實際的客戶案例。
汽車 之家目前在智能推薦的效果分析、物料點擊、曝光、計算點擊率、流量寬表等場景,對實時分析的需求日益強烈。經過多輪的探索,最終選定StarRocks作為實時OLAP分析引擎,實現了對數據的秒級實時分析。
在數據處理流程上,SQLServer、MySQL、TiDB等數據源,通過CDC打入多個Topic主題,用FlinkSQL進行ETL清洗和聚合計算,然后通過Flink-Connector導入StarRocks。早期選擇的Unique表模型,由于業務有很多Delete操作,而Merge-on-Read的模型對Delete支持不好,如果只做Update而不做Delete,會造成結果數據比業務庫多的問題。
最新的PrimaryKey模型支持了OP字段(更新/刪除操作),改為PrimaryKey模型后,數據結果與上游業務完全一致。
上圖右側是在硬件配置6x 48c 256G、數據量3500W+、有持續寫入情況下,22個SQL用例的測試情況,查詢性能也比Unique模型有大幅提升。
在合理的選型和建模之后,汽車之家在實時平臺IDE上也做了很多工作,開發運維人員可以在頁面里進行DDL建表,FlinkSQL開發,作業的起停、上線管理等工作。結合Flink-Connecotor,可以直接通過FlinkSQL將加工后的數據導入StarRocks,完成端到端的實時平臺集成。
另外,利用StarRocks提供的200多個監控Metric,汽車之家用Prometheus和Grafana等組件做了充分的可視化監控,即時查看集群的統計指標,把握集群的健康狀態。
第2個案例,順豐 科技 的運單分析場景實踐。在2021年雙11大促活動中,運單分析場景應對了15w TPS消息體量的實時數據導入和更新。整體的處理流程如圖所示,多個業務系統中的數據源打到幾個Source Kafka,用Flink來對數據進行加工、字段補充、重新組織,然后整理后的數據打到若干個Sink Kafka主題,最后利用前面介紹的Join-on-Load的形式,將多個數據源的數據,稀疏的寫入寬表的不同列,以此來實現寬表拼齊的過程。
在具體使用上,順豐科技將運單的數據根據更新的頻度,劃分為了2張寬表,按照相同的數據分布做成Colocation組,保證Join的時候沒有額外的節點Shuffle。一張表涉及的更新較少,命名為公表。另一張表涉及的更新較多,命名為私表。
每個子表都利用了Replace_if_not_null的部分列更新的能力,合理的設計了維度和聚合指標,并引入了Bloom Filter索引加速篩選的效率,用日期做范圍分區,用訂單號做數據分布,配置了動態分區,自動淘汰冷數據。對外通過邏輯View的形式關聯成一張寬表,底層是以現場Join的形式,整體面向業務提供查詢服務。
第3個案例是來自多點DMALL的實時數倉實踐。實時更新場景主要對實時監控經營的各項指標進行分析,如當前時間段內的GMV、下單數量、妥投數量、指標達成、對比、環比等指標分析,為客戶的經營決策提供更具時效性的參考依據。
早期,針對數據為實時(秒級)更新的場景,主要使用Impala on Kudu引擎,采用Lambda架構,基于相同的主鍵,將流式的預計算的結果數據、批計算的結果數據,基于相同的主鍵進行Merge。
這個Case早期的架構如左圖所示,ODS、DWD、DWS等分層在Kafka里承載,ADS層在Kudu/MySQL里,維表放在HBase里,采用Flink查詢外表熱存儲的形式實現維度數據和事實消息的關聯。如右圖所示,經過梳理和改造,順豐科技將DWD到DWS的聚合處理從Flink下沉到OLAP層,用StarRocks替換了Kudu,簡化了預聚合鏈路,提升了開發效率。
第4個案例是來自一個某車聯網企業的Fusion數倉建設。隨著新能源汽車的普及,車聯網IOT數據的實時接入分析的需求也越來越多。
業務邏輯如左圖所示,傳感器上報的儀表、空調、發動機、整車控制器、電池電壓、電池溫度等1000+傳感器Metric要通過Flink做實時ETL清洗,同時要完成功能主題實時分揀、數據質量實時報告,最終滿足于時序數據綜合分析和可視化展示。技術上,大量采用Flink.Jar的工程代碼開發,對于某些碼值還涉及到Flink多流Join及狀態計算。流量類的主題,采用StarRocks的增量聚合模型出聚合指標。也利用FlinkSQL對于運營分析類業務進行了實時數倉構建,將ADS層結果導入StarRocks供統一接口查詢。
整體上也是按照Lambda模型設計的,FLink清洗整合后的合規數據,會通過落盤程序沉降到HDFS,用于持久存儲、離線數倉進行跑批及更復雜的模型訓練,最終Hive的結果數據也會送到StarRocks供接口查詢使用。
數據邏輯設計如右圖所示,上面為離線數倉,下面為實時數倉邏輯分層。
可以看到實時清洗后的DWD層數據會成為離線數倉的ODS層,而離線數倉構建好的一些相對固定的維表數據,也會用于實時數倉的流式維度擴展。實時數倉的邏輯分層相較于離線數倉更為簡約,DWD明細層會存在于獨立的Kafka或者在Flink內存中,DWS層在FlinkSQL聚合完成后就直接下沉到StarRocks了。
這里其實是進行了兩次聚合,在Flink里進行了秒級的聚合,而StarRocks里的時間信息相關的維度列是到分鐘或者15分鐘的,利用StarRocks的聚合模型,將Flink匯聚的5-10s的聚合結果,再次匯聚到分鐘級鍵位。這樣設計有兩個好處,第一,能夠減少LSM模型的Version版本,提升查詢性能;第二,抽稀到分鐘級后,更便于可視化展示,降低了前端取數的壓力。
6.實時即未來,StarRocks后續規劃
關于PrimaryKey模型,后續版本即將支持部分列更新,進一步豐富TP業務庫還原的能力;并在PrimaryKey模型上支持Bloom Filter、Bitmap等索引能力,進一步提升數據查詢性能。
資源隔離方面,后續StarRocks會發布自適應內存、CPU分配能力,客戶不再需要手動調整配置參數;未來也會支持多租戶資源隔離的Feature。
對于Apache Flink項目的貢獻方面,當前Flink-Connector-StarRocks還只具備Sink能力,后續會在Source方面提供支撐,屆時用戶可以通過Flink分布式讀取StarRocks數據,用FlinkSQL做跑批任務。
另外,在CDC適配上,后續也會提供Oracle/PostgreSQL等更豐富的TP庫的DDL自動映射能力,適應更多CDC應用。
在云原生時代,StarRocks已經開始了積極探索和實踐,很快就會提供存儲計算分離、異地容災等能力,為客戶提供彈性、可靠的OLAP層查詢分析體驗。
以上就是本次分享的全部內容。實時即未來,歡迎大家一起加入到Apache Flink和StarRocks社區建設,共同探索出更多實時數倉的最佳實踐。