300秒到4秒,如何將MySQL批次寫入的耗時縮短99%?

👉 這是一個或許對你有用的社群
🐱 一對一交流/面試小冊/簡歷最佳化/求職解惑,歡迎加入芋道快速開發平臺知識星球。下面是星球提供的部分資料:
👉這是一個或許對你有用的開源專案
國產 Star 破 10w+ 的開源專案,前端包括管理後臺 + 微信小程式,後端支援單體和微服務架構。
功能涵蓋 RBAC 許可權、SaaS 多租戶、資料許可權、商城、支付、工作流、大屏報表、微信公眾號、ERPCRMAI 大模型等等功能:
  • Boot 多模組架構:https://gitee.com/zhijiantianya/ruoyi-vue-pro
  • Cloud 微服務架構:https://gitee.com/zhijiantianya/yudao-cloud
  • 影片教程:https://doc.iocoder.cn
【國內首批】支援 JDK 17/21 + SpringBoot 3.3、JDK 8/11 + Spring Boot 2.7 雙版本 

最近碰到一個場景,從 XML 檔案匯入 6 萬多條資料到 MySQL 中。需求並不複雜,基於 XML 檔案和 xlsx 檔案的相似性,其實這就是一個老生常談的資料匯入問題。
本文將介紹我如何將匯入操作耗時從 300 秒最佳化到 4 秒。

程式碼執行的環境

Java 程式碼在筆記本上執行,MySQL 在區域網內的虛擬機器上。
筆記本配置八核 i9 2.3 GHz,16 GB 記憶體,最近氣溫偏高,CPU 存在降頻現象,對執行耗時有一定影響。
MySQL 資料庫執行在 VirtualBox 虛擬機器內。虛擬機器分配了 4 核 4 GB 記憶體,但宿主機硬體效能比較羸弱,導致 MySQL 寫入耗時較長。
JDK 採用 21 版本,MySQL 採用 8.0 版本。
在這個環境配置下,從 XML 檔案中讀取一條資料耗時 0.08 秒,向 MySQL 匯入一條資料耗時 0.5 秒。
基於 Spring Boot + MyBatis Plus + Vue & Element 實現的後臺管理系統 + 使用者小程式,支援 RBAC 動態許可權、多租戶、資料許可權、工作流、三方登入、支付、簡訊、商城等功能
  • 專案地址:https://github.com/YunaiV/ruoyi-vue-pro
  • 影片教程:https://doc.iocoder.cn/video/

基礎實現的效能

基礎實現就是過程式處理方式。
voidimportData()

{

    Document doc = 解析 XML 檔案,獲取 Document 物件;

    List<Product> products = 從 Document 物件中提取資料,返回集合;

    遍歷 products 集合,逐條插入 MySQL;

}

基礎程式碼比較簡單,就不詳細展示了。本地測試整個流程需要 300 秒。
其中 parse-xml 和 build-producs 可以合併,統一看作解析 XML 檔案,相對於寫入 MySQL 的耗時,這部分簡直微不足道。
對應記憶體佔用為峰值 656.6 MB。
總之,300 秒和 656 MB,就是最初的起點。旅途就此開始。
基於 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 實現的後臺管理系統 + 使用者小程式,支援 RBAC 動態許可權、多租戶、資料許可權、工作流、三方登入、支付、簡訊、商城等功能
  • 專案地址:https://github.com/YunaiV/yudao-cloud
  • 影片教程:https://doc.iocoder.cn/video/

從哪個方向開始最佳化?

很明顯,最佳化 MySQL 寫入效能是目前最具價效比的方向,那長達 298.3 秒的耗時簡直就是一片尚未開採的富礦,蘊藏著極大的最佳化空間。
對於寫入的最佳化通常有兩個方向:寫聚合和非同步寫。單次寫入操作有一定成本,寫聚合是指在一次寫操作裡儘可能多地寫入資料,透過減少操作次數來降低成本。非同步寫是指非同步進行寫入過程的耗時操作,引入佇列作為中轉容器,透過減少單次操作的成本來降低總體的成本。
寫聚合是分批次寫入,單批次資料 b 越多,節約的時間成本也越多,但批次太大也會帶來記憶體和頻寬上的開銷,需要均衡取捨。同時,對於流式資料來源,寫聚合需要湊齊一批資料統一操作,即時性不如逐條寫入。
非同步寫是將耗時操作移出操作流程,從資料來源角度看,總時間成本有所降低。但從系統的角度看,時間成本 = N x (t' + t2),t' 與 t2 之和通常大於 t,總時間成本反而有所增加。
非同步寫的意義在於可以對流量削峰,透過生產者消費者模型,讓消費端可以平滑地處理資料。此外,增加消費者數量,也能透過併發處理的方式來縮短 t2,從而提升系統時間成本。
寫聚合和非同步寫可以組合使用,更進一步縮短時間,提升效能。

開啟 MySQL 批處理

對於資料庫寫入操作,最典型的寫聚合莫過於批次處理。單次寫入的成本包括網路傳輸的成本和資料庫程序寫資料的成本,透過批處理,可以節約大量網路傳輸成本。
MySQL 本身支援一次請求中包含多條 SQL 語句,JDBC 提供對應的批處理 Batch API。

Connection connection = ...;

// PreparedStatement 可以快取 SQL 語句,避免多次編譯,提高 SQL 語句執行效能

PreparedStatement statement = connection.prepareStatement(sql);

connection.setAutoCommit(

false

); 

// 關閉自動提交事務,以批為單位執行事務
for

 (

int

 i = 

0

; i < batchSize; i++) {

    statement.setString(...);

// ...
// 加入當前批次

    statement.addBatch();

}

// 統一執行命令

statement.executeBatch();

statement.clearBatch();

connection.commit();

// close 相關

主要涉及三個方法:
  • statement.addbatch() ,不立即執行 SQL,將資料新增進當前批,稍後一起執行。
  • statement.executeBatch() ,批次提交 SQL,交給資料庫統一執行。
  • statement.clearBatch() ,清空當前批快取的 SQL,回收記憶體。
透過多次 addBatch,然後統一 executeBatch,這就是 JDBC 提供的批處理方式。
但是,此處有一個坑,必須開啟 rewriteBatchedStatements=true 才能讓 JDBC 的 Batch API 生效,否則仍然是以逐條 SQL 的方式執行。對此在 JDBC 連線中新增 &rewriteBatchedStatements=true 選項即可。
rewriteBatchedStatements 用於將包含 INSERT 和 REPLACE 的 SQL 語句合併,比如將多條 insert into t1(c1, c2, c3) values (?, ?, ?) 語句合併到一條語句中,insert into t1(c1, c2, c3) values (?, ?, ?), (?, ?, ?), ...,透過將多條 SQL 合併為一條 SQL 可以提高效率。但我使用這個選項主要是為了開啟批處理,重寫只是附帶的功能。
另一個需要注意的地方是,MySQL 對單次請求的包大小有限制,注意 batchSize 不要太大導致包體積超過上限。透過 max_allowed_packet 可以調整包體積上限,具體可以參考官方文件。
開啟 MySQL 批處理後,立竿見影,MySQL 寫入耗時降到了 9 秒!
記憶體開銷比較穩定,相較於之前並沒有增加。
現在的成績是 12 秒 673 MB,顯著的進步!
為什麼一定要開啟 rewriteBatchedStatements?
答案在 JDBC 程式碼中。
我是用的 MySQL 驅動為 8.3.0 版本。在com.mysql.cj.jdbc.ClientPreparedStatement 類中定義了批處理邏輯。
程式碼中限制了只有開啟 rewriteBatchedStatements 才能使用批處理功能。
重複匯入的問題
由於是匯入資料的場景,可能遇到需要重新匯入的情況。重複匯入時,如何處理已經存在的資料,有不同做法。
可以在匯入前將目標表的資料刪除,然後以新匯入資料為準。ETL 流程中的臨時表常用這種方式。
可以增加一個步驟,區分出新增資料和更新資料,然後分別執行更新和新增,內部仍然可以批處理。
還可以使用 INSERT ... ON DUPLICATE KEY UPDATE 語句兼顧更新和新增兩種情況,保證操作的冪等性。
INSERTINTO

 t1 (a,b,c) 

VALUES

 (

1

,

2

,

3

),(

4

,

5

,

6

)

ONDUPLICATEKEYUPDATE

 a=

VALUES

(a), b=

VALUES

(b), c=

VALUES

(c);

MySQL 還有另一個 REPLACE INTO 語句,功能相同,特殊之處在於 ORACLE 和 PostgreSQL 也支援 REPLACE INTO,在涉及多種資料庫時有利於統一程式碼。

開啟多執行緒寫入

透過開啟資料庫批處理,我們已經大幅度縮短了耗時,其實已經適合大多數場景的需求了。但還有進一步提升的空間。
透過非同步寫的方式,可以縮短請求方的等待時間,且可以根據資料量動態調整消費者的數量,這樣能在效能和成本之間達到動態均衡。
我使用 Disruptor 併發佇列來實現非同步寫。解析 XML 檔案後,將資料入隊 Disruptor 提供的佇列,並開啟了 4 個消費者進行消費。
var

 disruptor = 

new

 Disruptor<>(

        ProductEvent::

new

,

16384

,

        DaemonThreadFactory.INSTANCE,

        ProducerType.SINGLE,

new

 BusySpinWaitStrategy());

var

 consumers = 

new

 SaveDbHandler[

4

];

for

 (

int

 i = 

0

; i < consumers.length; i++) {

    consumers[i] = 

new

 SaveDbHandler(i, consumers.length, shutdownLatch);

}

disruptor.handleEventsWith(

new

 SimpleBatchRewindStrategy(), consumers)

        .then(

new

 ClearingEventHandler());

var

 ringBuffer = disruptor.start();

// 透過 ringBuffer 佇列釋出資料
for

 (

var

 it = document.getRootElement().elementIterator(); it.hasNext(); ) {

var

 element = it.next();

if

 (!StringUtils.hasText(element.elementTextTrim(

"id"

))) {

continue

;

    }

var

 product = ObjectMapper.buildProduct(element);

    ringBuffer.publishEvent((event, sequence, buffer) -> event.setProduct(product));

}

Disruptor 4.0 之後,去掉了 worker 相關的 API,一切統一以 EventHandler 來處理。同一資料會被註冊到 Disruptor 的所有 EventHandler 獲取到,類似廣播模式。而我們的場景需要的是爭搶模式,為了保證每個消費者只消費自己相關的資料,可以在獲取資料時進行判斷。
具體做法是為每個 EventHandler 分配一個從 0 開始的序號,每條資料帶有一個序號,EventHandler 透過對序號取模的方式判斷是否應該丟棄。
@Override
publicvoidonEvent(ProductEvent event, long sequence, boolean endOfBatch)throws Exception, RewindableException 

{

if

 (sequence % numberOfConsumers != ordinal) {

return

;

    }

try

 {

// 設定 statement
// ...

        statement.addBatch();

if

 (endOfBatch) {

            statement.executeBatch();

            statement.clearBatch();

            connection.commit();

        }

    } 

catch

 (SQLException e) {

        log.error(

"[{}] handler error, sequence: {}, endOfBatch:{}"

, ordinal, sequence, endOfBatch, e);

try

 {

if

 (connection != 

null

)

                connection.rollback();

        } 

catch

 (SQLException se2) {

            log.error(

"rollback error"

, se2);

        }

    }

}

Disruptor 4.0 還提供了重新消費的功能。在 Disruptor 內部會對資料分批次,同一批次內的資料支援從頭開始重新消費。具體做法是實現 RewindableEventHandler 介面,在需要重新消費時,丟擲 RewindableException
對於批次寫資料庫的場景,rewind 機制可以用在資料庫出現異常的情況,先回滾事務,然後丟擲 RewindableException 通知 Disruptor 重傳當前批次資料。
透過開啟多執行緒非同步寫入,將耗時從 12 秒降到了 4.5 秒。雖然不如批處理的效果明顯,但以 12 秒為基準,也縮短了 60% 的時間。
記憶體方面,由於開啟了多執行緒,每個執行緒會帶來一定的開銷,因此峰值記憶體提升了不少,能否接受就看具體需求場景。
最終成績,4 秒 1 GB。

進一步的最佳化方向

最佳化 XML 解析
我測試的資料量並不大,解析 XML 檔案並不是瓶頸。大資料量時,存在一個性能隱患:大量物件帶來的記憶體壓力。解決方案也並不複雜,可以採用事件模型來解析 XML 檔案。SAX 解析器基於事件驅動,逐行讀取 XML,解析為多種事件,開發者可以實現事件處理介面來處理事件。
SAX 的優點在於記憶體消耗小,適合處理大型 XML 文件。我們可以將事件處理介面作為生產者,獲取 Element 並交給 Disruptor 分發。事件模型下每個事件物件存活週期很短,可以節約記憶體開銷。
最佳化 Disruptor 記憶體佔用
Disruptor 的 batchSize 越大,理論上效能也越高。我將示例程式碼的 batchSize 從 1024 提升到 16384 後,整體時間能縮短到 3.5 秒。但作為利刃的另一面,大 batch 會導致 Ring Buffer 的體積增大。
Ring Buffer 的大小 = Event 物件記憶體佔用 x batchSize。Disruptor 提供了提前釋放記憶體來最佳化記憶體使用的例子:Early release。
最佳化 MySQL 的寫效能
MySQL 的寫入最佳化是一個比較複雜的問題,涉及因素很多。通常的做法是增大 log_buffer(innodb-log-buffer-size)Buffer Pool(innodb_buffer_pool_size),調整 innodb_flush_log_at_trx_commit 來控制日誌刷盤時機。
這次批次寫入最佳化實踐的經歷對我而言挺有成就感,因此分享出來。我瞭解了 JDBC Batch 的概念,也學習了 Disruptor 的使用。這種最佳化方案比較適合批次匯入資料的場景。按照慣例,相關程式碼已經上傳 GitHub。
https://github.com/xioshe/xml-to-mysql

歡迎加入我的知識星球,全面提升技術能力。
👉 加入方式,長按”或“掃描”下方二維碼噢
星球的內容包括:專案實戰、面試招聘、原始碼解析、學習路線。
文章有幫助的話,在看,轉發吧。
謝謝支援喲 (*^__^*)

相關文章