
一 業務背景


二 離線 SQL 模式存在的問題
-
1.對賬問題定位困難,核算小二主要透過下載分錄及對應的業務單據彙總資料進行對賬,如果某一分錄和業務資料有出入,只能逐一業務要素分析,由於缺乏透過分錄精確追溯到關聯業務單據的下鑽能力,問題定位耗時較長,造成這一問題的主要原因在於透過離線SQL實現的原始加工邏輯無法精確的建立業務單據和原始憑證的關聯關係。
-
2.日常運維困難,隨著業務的不斷發展,業務接入離線任務在不斷的膨脹,最終成為一個橫跨4個專案空間,150+離線任務、100+離線表的工程,任一節點的錯誤都會造成月結資料出錯。
-
3.行業實施效率較低,每次新接入行業都需要開發小二新建一套離線表+離線任務,相應的也造成運維問題的持續惡化。
三 為什麼選擇Spark
1 核心訴求
-
1.核算規則(業務接入/記賬/拋賬)可配、可視,不存在黑盒的加工邏輯,加工流程對核算小二全透明(提升實施+對賬效率)
-
2.建立整個核算鏈路單據維度的關聯關係(業務單據<->原始憑證<->記賬憑證<->拋賬憑證),具備雙向的單據追溯能力(提升對賬效率)
2 Spark VS MapReduce
|
|
|
|
|
|
|
|
|
|
|
|

3 程式設計模式優勢: RDD + DataFrame 的程式設計模式
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")
// The schemais encoded in a string
val schemaString = "name age"
// Generate the schema based on the stringofschema
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
// Convertrecordsof the RDD (people) toRows
val rowRDD = peopleRDD
.map(_.split(","))
.map(attributes => Row(attributes(0), attributes(1).trim))
// Apply the schemato the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)
// Creates a temporaryviewusing the DataFrame
peopleDF.createOrReplaceTempView("people")
// SQL can be run over a temporaryview created using DataFrames
val results = spark.sql("SELECT name FROM people")
四 spark基礎介紹
1 基礎概念
-
Rdd(Resilient distributed dataset):不可變的彈性分散式資料集(不可變性似於docker中的只讀映象層),只能透過其他的transformation運算元建立新的RDD。 -
Operations:運算元,spark包括兩類運算元,transformation(轉換運算元,透過對前置rdd的處理生成新的rdd)/action(觸發spark job的拆分及執行,負責將rdd輸出)。
-
Task:執行器執行的任務單元,一般基於當前rdd的分割槽數量拆分。
-
Job:包含多個task的集合,基於Action運算元拆分。
-
Stage:基於當前rdd處理邏輯的寬窄依賴拆分,spark中非常重要的概念,stage的切換會涉及到IO。
-
Narrow/Wide dependencies:參考下圖,區分的重要依據在於父節點是否會被多個子節點使用。

2 Spark on MaxCompute(ODPS)
-
Spark 有很多的後端的 Runtime,例如其商業化公司的Databricks Runtime, 彈內我們使用的是 AliSpark,是集團的適配 MaxComputer,同時在離線互動是使用了 Cupid-SDK 的 Client模式,這個模式不是獨立叢集的模式,類Serveless 模式,整體的成本上比獨立叢集要低,當然資源保障上沒有獨立叢集好。
-
集團client模式將spark session作為服務提供,可以方便地與線上系統互動,包括任務的提交、關閉、例項的關閉等;
-
在使用集團提供的spark能力時,比較麻煩的在於如何方便的檢視日誌,從我們的實踐看主要有以下2個路徑;
-
申請odps對應專案空間的logview許可權,可以直接在https://logview.alibaba-inc.com/中基於sparkInstanceId定位到具體的日誌;
-
藉助odps client+提交spark任務時返回的例項ID獲取log地址,程式碼參考如下:
//instanceIdd對應odps client中的lookupName
Account account = new AliyunAccount(sparkSessionConfig.getAccessId(), sparkSessionConfig.getAccessKey());
Odps odps = new Odps(account);
odps.setEndpoint(sparkSessionConfig.getEndPoint());
odps.setDefaultProject(sparkSessionConfig.getNamespace());
//日誌地址目前設定有效期為7*24小時
try {
return odps.logview().generateLogView(odps.instances().get(sparkInstanceId), 7 * 24L);
} catch (OdpsException e) {
LOGGER.error("生成logView地址失敗,config:{},instanceId:{},e:{}", sparkSessionConfig, sparkInstanceId, e);
}
五 技術方案
1 整體方案


-
spark任務管理:負責spark任務相關生命週期的管理,承接核算任務和spark session之間的互動;
-
spark session管理:負責spark例項的建立、銷燬、job提交等,另外針對不同型別的session,支援自定義所需資源,包括例項worker數量、分割槽大小等,主要與spark on odps互動;
-
核算任務管理:負責業務接入、記賬、拋賬等核算任務的生命週期管理;
-
spark job版本管理:spark任務所需jar包會不斷的迭代,針對不同的核算場景可以定製所需的job版本;

|
|
|
|
|
|
|
|
|
|
2 資料處理流程

六 運維及調優
1 資料量評估
-
spark.hadoop.odps.input.split.size:用於設定spark讀取odps離線表的分割槽大小,預設為256M,在實踐過程中需要結合當前分割槽的大小進行調整,比如當前分割槽大小為1GB,那麼預設情況下會拆分為4個分割槽 ;
-
spark讀寫lindorm(類hbase)的分割槽數主要受到region數量的影響,在供應鏈核算系統的實踐中,由於初始region數量較少,導致分割槽數量很少,spark執行效率很差,,針對此問題我們實踐了兩種處理策略 ;
-
1.進行重分割槽(repartition運算元):針對資料傾斜進行重新分割槽,但是會拆分stage,觸發shuffle,增加額外的IO成本。
-
2.lindorm進行預分割槽,比如預分割槽為128個region,但此種實現方案需要結合rowkey的設計一起使用,會影響到scan的效率。
2 程式碼邏輯相關job/stage/task評估
-
慎用效率角度的運算元,比如groupBy -
儘量減少stage數量
3 計算儲存資源評估
-
spark.executor.instances executor:設定當前例項的worker數量 ;
-
spark.executor.cores:核數,每個Executor中的可同時執行的task數目 ;
-
spark.executor.memory:executor記憶體 ;
4 其他引數
5 Spark UI


6 互動式開發測試
Web-based notebook that enables data-driven, interactive data analytics and collaborative documents with SQL, Scala, Python, R and more.

7 效果
七 總結
參考文件
Elasticsearch實戰進階營
關鍵詞
任務
問題
問題
過程中
例項