
一 背景
Flink 和 ClickHouse 分別是即時流式計算和 OLAP 領域的翹楚,很多網際網路、廣告、遊戲等客戶都將兩者聯合使用於構建使用者畫像、即時 BI 報表、應用監控指標查詢、監控等業務,形成了即時數倉解決方案(如圖-1)。這些業務對資料的準確性要求都十分嚴格,所以即時數倉整個鏈路需要保證端到端的 Exactly-Once。通常來說 Flink 的上游是可以重複讀取或者消費的 pull-based 持久化儲存(例如Kafka),要實現 Source 端的 Exactly-Once 只需要回溯 Source 端的讀取進度即可。Sink 端的 Exactly-Once 則比較複雜,因為 Sink 是 push-based 的,需要依賴目標輸出系統的事務保證,但社群 ClickHouse 對事務並不支援,所以針對此情況阿里雲 EMR ClickHouse 與 Flink 團隊一起深度研發,支援了 Flink 到 ClickHouse 的 Exactly-Once寫入來保證整個即時數倉資料的準確性。本文將分別介紹下現有機制以及實現方案。

圖-1 即時數倉架構
二 機制梳理
1 ClickHouse 寫入機制
ClickHouse 是一個 MPP 架構的列式 OLAP 系統(如圖-2),各個節點是對等的,透過 Zookeeper 協同資料,可以透過併發對各個節點寫本地表的方式進行大批次的資料匯入。
ClickHouse 的 data part 是資料儲存的最小單元,ClickHouse 接收到的資料 Block 在寫入時,會按照 partition 粒度進行拆分,形成一個或多個 data part。data part 在寫入磁碟後,會通過後臺merge執行緒不斷的合併,將小塊的 data part 合併成大塊的 data part,以此降低儲存和讀取的開銷。
在向本地表寫入資料時,ClickHouse 首先會寫入一個臨時的 data part,這個臨時 data part 的資料對客戶端不可見,之後會直接進行 rename 操作,使這個臨時 data part 成為正式 data part,此時資料對客戶端可見。幾乎所有的臨時 data part 都會快速地成功被 rename 成正式 data part,沒有被 rename 成功的臨時 data part 最終將被 ClickHouse 清理策略從磁碟上刪除。
透過上述分析,可以看出 ClickHouse 的資料寫入有一個從臨時 data part 轉為正式 data part 的機制,加以修改可以符合兩階段提交協議,這是實現分散式系統中事務提交一致性的重要協議。

圖-2 Flink作業寫入ClickHouse
注:多個 Flink Task 可以寫入同一個 shard 或 replica
2 Flink 寫機制
Flink 作為一個分散式處理引擎,提供了基於事務的 Sink 機制,該機制可以保障寫入的 Exactly-Once,相應的資料接收方需要提供遵守 XA 規範的 JDBC 。由於完整的 XA 規範相當複雜,因此,我們先對 Flink 的處理機制進行梳理,結合 ClickHouse 的實際情況,確定需要實現的介面範圍。
為了實現分散式寫入時的事務提交統一,Flink 藉助了 checkpoint 機制。該機制能夠週期性地將各個 Operator 中的狀態生成快照並進行持久化儲存。在 checkpoint 機制中,有一個 Coordinator 角色,用來協調所有 Operator 的行為。從 Operator 的角度來看,一次 checkpoint 有三個階段,初始化–>生成快照–>完成/廢棄 checkpoint。從Coordinator的角度來看,需要定時觸發 checkpoint,以及在所有 Operator 完成快照後,觸發 complete 通知。(參考附錄1)
接下來介紹 Flink 中的 Operator 是如何藉助事務和 checkpoint 機制來保障 Exactly-Once,Operator 的完整執行需要經過 initial、writeData、snapshot、commit 和 close 階段。
initial階段:
-
從快照中取出上次任務執行時持久化的 xid 記錄。快照中主要儲存兩種 xid,一組是未完成 snapshot 階段的 xid,一組是已經完成了 snapshot 的 xid。
-
接下來對上次未完成 snapshot 的 xid 進行 rollback 操作;對上次已經完成了 snapshot 但 commit 未成功的 xid 進行 commit 重試操作。
-
若上述操作失敗,則任務初始化失敗,任務中止,進入 close 階段;若上述操作成功,則繼續。
-
建立一個新的唯一的 xid,作為本次事務ID,將其記錄到快照中。
-
使用新生成的 xid,呼叫 JDBC 提供的 start() 介面。
writeData階段:
-
事務開啟後,進入寫資料的階段,Operator 的大部分時間都會處於這個階段。在與 ClickHouse 的互動中,此階段為呼叫 JDBC 提供的 preparedStatement 的 addBatch() 和 executeBatch() 介面,每次寫資料時都會在報文中攜帶當前 xid。
-
在寫資料階段,首先將資料寫到 Operator 記憶體中,向 ClickHouse 提交記憶體中的批次資料有三種觸發方式:記憶體中的資料條數達到batchsize的閾值;後臺定時執行緒每隔一段時間觸發自動flush;在 snapshot 階段呼叫end() 和 prepare() 介面之前會呼叫flush清空快取。
snapshot階段:
-
當前事務會呼叫 end() 和 prepare() 介面,等待 commit,並更新快照中的狀態。
-
接下來,會開啟一個新的事務,作為本 Task 的下一次 xid,將新事務記錄到快照中,並呼叫 JDBC 提供的start() 介面開啟新事務。
-
將快照持久化儲存。
complete階段:
在所有 Operator 的 snapshot 階段全部正常完成後,Coordinator 會通知所有 Operator 對已經成功的checkpoint 進行 complete 操作,在與 ClickHouse 的互動中,此階段為 Operator 呼叫 JDBC 提供的 commit() 介面對事務進行提交。
close階段:
-
若當前事務尚未進行到 snapshot 階段,則對當前事務進行 rollback 操作。
-
關閉所有資源。
從上述流程可以總結出,Flink 透過 checkpoint 和事務機制,將上游資料按 checkpoint 週期分割成批,保障每一批資料在全部寫入完成後,再由 Coordinator 通知所有 Operator 共同完成 commit 操作。當有 Operator 寫入失敗時,將會退回到上次成功的 checkpoint 的狀態,並根據快照記錄的 xid 對這一批 checkpoint 的所有 xid 進行 rollback 操作。在有 commit 操作失敗時,將會重試 commit 操作,仍然失敗將會交由人工介入處理。
三 技術方案
1 整體方案
根據 Flink 和 ClickHouse 的寫入機制,可以描繪出一個Flink 到 ClickHouse 的事務寫入的時序圖(如圖-3)。由於寫的是 ClickHouse 的本地表,並且事務的統一提交由 Coordinator 保障,因此 ClickHouse 無需實現 XA 規範中標準的分散式事務,只需實現兩階段提交協議中的少數關鍵介面,其他介面在 JDBC 側進行預設即可。

圖-3 Flink到ClickHouse事務寫入的時序圖
2 ClickHouse-Server
狀態機
為了實現 ClickHouse 的事務,我們首先定義一下所要實現的事務允許的幾種操作:
-
Begin:開啟一個事務。 -
Write Data:在一個事務內寫資料。 -
Commit:提交一個事務。 -
Rollback:回滾一個未提交的事務。 事務狀態: -
Unknown:事務未開啟,此時執行任何操作都是非法的。 -
Initialized:事務已開啟,此時允許所有操作。 -
Committing:事務正在被提交,不再允許 Begin/Write Data 兩種操作。 -
Committed:事務已經被提交,不再允許任何操作。 -
Aborting:事務正在被回滾,不再允許任何操作。 -
Aborted:事務已經被回滾,不再允許任何操作。
完整的狀態機如下圖-4所示:

圖-4 ClickHouse Server支援事務的狀態機
圖中所有操作均是冪等的。其中,Committing 到 Committed 和 Aborting 到 Aborted 是不需要執行任何操作的,在開始執行 Commit 或 Rollback 時,事務的狀態即轉成 Committing 或 Aborting;在執行完 Commit 或 Rollback 之後,事務的狀態會被設定成 Committed 或 Aborted。
事務處理
Client 透過 HTTP Restful API 訪問 ClickHouse Server,Client 與 ClickHouse Server 間一次完整事務的互動過程如圖-5所示:

圖-5 Clickhouse事務處理的時序圖
正常流程:
-
Client 向 ClickHouse 叢集任意一個 ClickHouse Server 傳送 Begin Transaction 請求,並攜帶由 Client 生成的全域性唯一的 Transaction ID。ClickHouse Server 收到 Begin Transaction 請求時,會向 Zookeeper 註冊該Transaction ID(包括建立 Transaction ID 及子 Znode 節點),並初始化該 Transaction 的狀態為 Initialized。
-
Client 接收到 Begin Transaction 成功響應時,可以開始寫入資料。當 ClickHouse Server 收到來自 Client 傳送的資料時,會生成臨時 data part,但不會將其轉為正式 data part,ClickHouse Server 會將寫入的臨時 data part 資訊,以 JSON 的形式,記錄到 Zookeeper 上該 Transaction 的資訊中。
-
Client 完成資料的寫入後,會向 ClickHouse Server 傳送 Commit Transaction 請求。ClickHouse Server 在收到 Commit Transaction 請求後,根據 ZooKeeper 上對應的Transaction的 data part 資訊,將 ClickHouse Server 本地臨時 data part 資料轉為正式的 data part 資料,並更新Transaction 狀態為Committed。Rollback 的過程與 Commit 類似。
異常處理:
-
如果建立 Transaction ID 過程中發現 Zookeeper 中已經存在相同 Transaction ID,根據 Zookeeper 中記錄的 Transaction 狀態進行處理:如果狀態是 Unknown 則繼續進行處理;如果狀態是 Initialized則直接返回;否則會拋異常。
-
目前實現的事務還不支援分散式事務,只支援單機事務,所以 Client 只能往記錄該 Transaction ID 的 ClickHouse Server 節點寫資料,如果 ClickHouse Server 接收到到非該節點事務的資料,ClickHouse Server 會直接返回錯誤資訊。
-
與寫入資料不同,如果 Commit 階段 Client 向未記錄該 Transaction ID 的 ClickHouse Server 傳送了 Commit Transaction 請求,ClickHouse Server 不會返回錯誤資訊,而是返回記錄該 Transaction ID 的 ClickHouse Server 地址給 Client,讓 Client 端重定向到正確的 ClickHouse Server。Rollback 的過程與 Commit 類似。
3 ClickHouse-JDBC
根據 XA 規範,完整的分散式事務機制需要實現大量的標準介面(參考附錄2)。在本設計中,實際上只需要實現少量關鍵介面,因此,採用了基於組合的介面卡模式,向 Flink 提供基於標準 XA 介面的 XAResource 實現,同時對 ClickHouse Server 遮蔽了不需要支援的介面。
對於 XADataSource 的實現,採用了基於繼承的介面卡模式,並針對 Exactly-Once 的特性,修改了部分預設配置,如傳送失敗的重試次數等引數。
另外,在生產環境中,通常不會透過分散式表,而是透過 SLB 進行資料寫入時的負載均衡。在 Exactly-Once 場景中,Flink 側的 Task 需要保持針對某一 ClickHouse Server 節點的連線,因此不能使用 SLB 的方式進行負載均衡。針對這一問題,我們借鑑了 BalanceClickHouseDataSource 的思路,透過在 URL 中配置多個IP,並在 properties 配置中將 write_mode 設定為 Random ,可以使 XADataSource 在保障 Exactly-Once 的同時,具有負載均衡的能力。
4 Flink-Connector-ClickHouse
Flink 作為一個流式資料處理引擎,支援向多種資料接收端寫入的能力,每種接收端都需要實現特定的Connector。針對 Exactly-Once,ClickHouse Connector 增加了對於 XADataSource 的選項配置,根據客戶端的配置提供 Exactly-Once 功能。
四 測試結果
1 ClickHouse事務效能測試
-
寫入 ClickHouse 單批次資料量和總批次相同,Client端併發寫執行緒不同效能比較。
由圖-6可以看出,無論ClickHouse 是否開啟事務, ClickHouse 的吞吐量都與 Client 端併發寫的執行緒數成正比。開啟事務時,ClickHouse中臨時 data part 不會立刻被轉為正式 data part,所以在事務完成前大量臨時 data part 不會參與 ClickHouse merge 過程,降低磁碟IO對寫效能的影響,所以開啟事務寫效能較未開啟事務寫效能更好;但事務內包含的批次變多,臨時 data part 在磁碟上的增多導致了合併時 CPU 的壓力增大,從而影響了寫入的效能,開啟事務的寫效能也會降低。

圖-6 ClickHouse寫入效能壓測(一)
-
寫入 ClickHouse 總批次 和 Client 端併發寫執行緒相同,單批次寫入 ClickHouse 資料量不同效能比較。
由圖-7可以看出,無論ClickHouse 是否開啟事務, ClickHouse 的吞吐量都與單批次資料量大小成正比。開啟事務時,每批次資料越小,ClickHouse 的吞吐量受事務是否開啟的影響就越大,這是因為每批次寫入的時間在事務處理的佔比較小,事務會對此產生一定的影響,因此,一次事務包含的批次數量越多,越能夠減少事務對寫入效能的影響;當事務包含批次的增大,事務處理時間在寫入中的佔比逐漸降低,ClickHouse merge 產生的影響越來越大,從而影響了寫入的效能,開啟事務較不開啟事務寫效能更好。

圖-7 ClickHouse寫入效能壓測(二)
-
總體來說,開啟事務對寫入效能幾乎沒有影響,這個結論是符合我們預期的。
2 Flink寫入ClickHouse效能比較
-
對於相同資料量和不同 checkpoint 週期,Flink 寫入 ClickHouse 總耗時如圖-8所示。可以看出,checkpoint 週期對於不開啟 Exactly-Once 的任務耗時沒有影響。對於開啟 Exactly-Once 的任務,在5s 到60s的範圍內,耗時呈現一個先降低後增長的趨勢。原因是在 checkpoint 週期較短時,開啟 Exactly-Once 的 Operator 與 Clickhouse 之間有關事務的互動過於頻繁;在 checkpoint 週期較長時,開啟 Exactly-Once 的 Operator 需要等待 checkpoint 週期結束才能提交最後一次事務,使資料可見。在本測試中,checkpoint週期數據僅作為一個參考,生產環境中,需要根據機器規格和資料寫入速度進行調整。
-
總體來說,Flink寫入Clickhouse時開啟 Exactly-Once 特性,效能會稍有影響,這個結論是符合我們預期的。

圖-8 Flink寫入ClickHouse測試
五 未來規劃
該版本 EMR ClickHouse 實現的事務還不是很完善,只支援單機事務,不支援分散式事務。分散式系統一般都是透過 Meta Server 來做統一元資料管理來支援分散式事務機制。當前我們也正在規劃設計 ClickHouse MetaServer 來支援分散式事務,同時可以移除 ClickHouse 對 ZooKeeper 的依賴。
附錄
https://flink.apache.org/2020/10/15/from-aligned-to-unaligned-checkpoints-part-1.html
https://pubs.opengroup.org/onlinepubs/009680699/toc.pdf
《SaaS模式雲原生資料倉庫應用場景實踐》電子書重磅來襲!
啟用資料生產力,讓分析產生價值!
點選閱讀原文檢視詳情!
關鍵詞
系統
階段
分散式事務
機制
磁碟