自從系統引入Disruptor後,效能起飛!

👉 這是一個或許對你有用的社群
🐱 一對一交流/面試小冊/簡歷最佳化/求職解惑,歡迎加入芋道快速開發平臺知識星球。下面是星球提供的部分資料:
👉這是一個或許對你有用的開源專案
國產 Star 破 10w+ 的開源專案,前端包括管理後臺 + 微信小程式,後端支援單體和微服務架構。
功能涵蓋 RBAC 許可權、SaaS 多租戶、資料許可權、商城、支付、工作流、大屏報表、微信公眾號、ERPCRMAI 大模型等等功能:
  • Boot 多模組架構:https://gitee.com/zhijiantianya/ruoyi-vue-pro
  • Cloud 微服務架構:https://gitee.com/zhijiantianya/yudao-cloud
  • 影片教程:https://doc.iocoder.cn
【國內首批】支援 JDK 17/21 + SpringBoot 3.3、JDK 8/11 + Spring Boot 2.7 雙版本 

Disruptor 是一個很受歡迎的記憶體訊息佇列,它源於 LMAX 對併發、效能和非阻塞演算法的研究。今天一起來學習一下這個訊息佇列。

簡介

對於主流的分散式訊息佇列來說,一般會包含 Producer、Broker、Consumer、註冊中心等模組。比如 RocketMQ 架構如下:
Disruptor 並不是分散式訊息佇列,它是一款記憶體訊息佇列,因此架構上跟分散式訊息佇列有很大差別。下面是一張 LMAX 使用 Disruptor 的案例圖:
我們介紹一下 Disruptor 架構中的核心概念。

1.1 Ring Buffer

Ring Buffer 通常被認為是 Disruptor 的最主要的設計,但是從 3.0 版本開始,Ring Buffer 只負責儲存和更新經過 Disruptor 的資料。在一些高階的使用場景,它甚至完全可以被使用者替換。

1.2 Sequence

Disruptor 使用 Sequence 來識別特定元件的位置。每個 Consumer(也就是事件處理器)都像 Disruptor 一樣持有一個 Sequence。併發相關的核心程式碼依賴 Sequence 的自增值,因此 Sequence 具有跟 AtomicLong 相似的特性,事實上唯一的不同就是不同的 Sequence 之間不存在偽共享問題。
偽共享:CPU 快取是以快取行為單位進行載入和儲存,CPU 每次從主存中拉取資料時,會把相鄰的資料也存入同一個快取行。即使多個執行緒操作的是同一快取行中不同的變數,只要有一個執行緒修改了快取行中的某一個變數值,該快取行就會被標記為無效,需要重新從主從中載入。在多執行緒環境下,頻繁地重新載入快取行,會嚴重影響了程式執行效率。

1.3 Sequencer

Sequencer 是 Disrupter 的真正核心,有單個生產者和多個生產者兩種實現(SingleProducerSequencer 和 MultiProducerSequencer)。為了讓資料在生產者和消費者之間快速、準確地傳輸,它們都實現了所有併發演算法。

1.4 Sequence Barrier

Sequencer 生成一個 Sequence Barrier,它包含由 Sequencer 生成的 Sequence 和消費者擁有的 Sequence 的引用。Sequence Barrier 決定是否有事件給消費者處理。

1.5 Wait Strategy

消費者怎樣等待事件的到來。

1.6 Event Processor

主要負責迴圈處理來自 Disruptor 事件,它擁有消費者 Sequence 的所有權。有一個單獨的實現類 BatchEventProcessor,這個類擁有高效的事件迴圈處理能力並且處理完成後可以回撥實現 EventHandler 介面的使用者。

1.7 Event Handler

由使用者來實現並且代表 Disruptor 消費者的介面。
基於 Spring Boot + MyBatis Plus + Vue & Element 實現的後臺管理系統 + 使用者小程式,支援 RBAC 動態許可權、多租戶、資料許可權、工作流、三方登入、支付、簡訊、商城等功能
  • 專案地址:https://github.com/YunaiV/ruoyi-vue-pro
  • 影片教程:https://doc.iocoder.cn/video/

2 Disruptor 特性

2.1 多播事件

多播事件是 Disruptor 區別於其他佇列的最大差異。其他佇列都是一個事件訊息只能被單個消費者消費,而 Disruptor 如果有多個消費者監聽,則可以將所有事件訊息傳送給所有消費者。
在前面 LMAX 使用 Disruptor 的案例圖中,有 JournalConsumer、ReplicationConsumer 和 ApplicationConsumer 三個消費者監聽了 Disruptor,這三個消費者將收到來了 Disruptor 的所有訊息。

2.2 消費者依賴關係圖

為了支援併發處理在實際業務場景中的需要,有時消費者直接需要做協調。再回到前面 LMAX 使用 Disruptor 的案例,在 journalling 和 replication 這兩個消費者處理完成之前,有必要阻止業務邏輯消費者開始處理。我們稱這個特徵為“gating”(或者更準確地說,該特徵是“gating”的一種形式)。
首先,確保生產者數量不會超過消費者。這透過呼叫 RingBuffer.addGatingConsumers()來將相關消費者新增到 Disruptor。其次,消費者依賴關係的實現是透過構建一個 SequenceBarrier,SequenceBarrier 擁有需要在它前面完成處理邏輯的消費者的 Sequence。
就拿前面 LMAX 使用 Disruptor 的案例來說,ApplicationConsumer 的 SequenceBarrier 擁有 JournalConsumer 和 ReplicationConsumer 這 2 個消費者的 Sequence,所以 ApplicationConsumer 對 JournalConsumer 和 ReplicationConsumer 的依賴關係可以從 SequenceBarrier 到 Sequence 的連線中看到。
Sequencer 和下游消費者的關係也需要注意。Sequencer 的一個角色就是釋出的事件訊息不能超出 Ring Buffer。這就要求下游消費者的 Sequence 不能小於 Ring Buffer 的 Sequence,也不能小於 Ring Buffer 的大小。
上面圖中,因為 ApplicationConsumer 的 Sequence 必須要保證小於等於 JournalConsumer 和 ReplicationConsumer 的 Sequence,因此 Sequencer 只需要關心 ApplicationConsumer 的 Sequence。

2.3 記憶體預分配

Disruptor 的目標是低延遲,因此減少或者去除記憶體分配是必要的。在基於 Java 的系統中,目標是減少 STW 次數。
為了支援這一點,使用者可以在 Disruptor 中預分配事件所需的記憶體。在預分配記憶體時,使用者提供的 EventFactory 將對 Ring Buffer 的所有元素進行呼叫。當生產者向 Disruptor 傳送新的事件訊息時,Disruptor 的 API 允許使用者使用構造好的物件,他們可以呼叫物件的方法或者更新物件的欄位。Disruptor 需要確保併發安全。

2.4 無鎖併發

Disruptor 實現低延遲的另一個關鍵方法時使用無鎖演算法,透過使用記憶體屏障和 CAS 來實現記憶體可見性和正確性。Disruptor 唯一使用鎖的地方就是在 BlockingWaitStrategy。
基於 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 實現的後臺管理系統 + 使用者小程式,支援 RBAC 動態許可權、多租戶、資料許可權、工作流、三方登入、支付、簡訊、商城等功能
  • 專案地址:https://github.com/YunaiV/yudao-cloud
  • 影片教程:https://doc.iocoder.cn/video/

3 調優選項

雖然大多數場景下 Disruptor 可以表現出優秀的效能,但是仍然有一些調優引數可以改進 Disruptor 的效能。

3.1 單個/多個生產者

Disruptor<LongEvent> disruptor = 

new

 Disruptor(

 factory,

 bufferSize,

 DaemonThreadFactory.INSTANCE,

 ProducerType.SINGLE, 

new

 BlockingWaitStrategy() 

);

上面是 disruptor 的建構函式,ProducerType.SINGLE 表示建立單生產者的 Sequencer,ProducerType.MULTI  表示建立多生產者的 Sequencer。
在併發系統中提高系統性能的最好方式是遵循單寫原則。下面是官方的一個 disruptor 吞吐量測試結果,測試環境是 i7 Sandy Bridge MacBook Air。
單生產者:
多生產者:

3.2 等待策略

  1. BlockingWaitStrategy
disruptor 的預設等待策略是 BlockingWaitStrategy,這種策略使用鎖和喚醒鎖的 Condition 變數。
  1. SleepingWaitStrategy
跟 BlockingWaitStrategy 策略類似,他是透過 LockSupport.parkNanos(1) 方法來實現等待,不需要給 Condition 變數傳送訊號來喚醒等待。
主要適用於對延時要求不高的場景,比如非同步列印日誌。
  1. YieldingWaitStrategy
YieldingWaitStrategy 策略使用 Busy spin‌(不釋放 CPU 資源,透過迴圈檢查條件直到條件滿足為止)技術來等待 sequence 增長到一個合適的值。在迴圈內部會呼叫 Thread#yield() 方法允許其他排隊執行緒去執行。
這種策略主要用於透過消耗 CPU 來實現低延遲的場景。當 EventHandler 數量訊息邏輯 CPU 核數並且對延遲要求較高時,可以考慮這種等待策略。
  1. BusySpinWaitStrategy
BusySpinWaitStrategy 是效能最高的等待策略,它適用於低延遲系統,但是對部署環境要求很高。
這種等待策略的唯一適用場景是當 EventHandler 數量訊息邏輯 CPU 核數並且超執行緒被停用。

4 官方示例

下面是一個官方示例。這個例子比較簡單,就是生產者向消費者傳送一個 long 型別的值。
  1. 首先定義一個 Event。
publicclassLongEvent

{

privatelong

 value;

publicvoidset(long value)

{

this

.value = value;

    }

@Override
public

 String 

toString()

{

return"LongEvent{"

 + 

"value="

 + value + 

'}'

;

    }

}

  1. 為了能讓 Disruptor 預分配記憶體,這裡定義一個 LongEventFactory。
publicclassLongEventFactoryimplementsEventFactory

<

LongEvent

>

{

@Override
public

 LongEvent 

newInstance()

{

returnnew

 LongEvent();

    }

}

  1. 建立一個消費者來處理事件
publicclassLongEventHandlerimplementsEventHandler

<

LongEvent

>

{

@Override
publicvoidonEvent(LongEvent event, long sequence, boolean endOfBatch)

{

        System.out.println(

"Event: "

 + event);

    }

}

  1. 編寫傳送事件訊息的邏輯
import

 com.lmax.disruptor.dsl.Disruptor;

import

 com.lmax.disruptor.RingBuffer;

import

 com.lmax.disruptor.examples.longevent.LongEvent;

import

 com.lmax.disruptor.util.DaemonThreadFactory;

import

 java.nio.ByteBuffer;

publicclassLongEventMain

{

publicstaticvoidmain(String[] args)throws

 Exception

{

int

 bufferSize = 

1024

        Disruptor<LongEvent> disruptor = 

new

 Disruptor<>(LongEvent::

new

, bufferSize, DaemonThreadFactory.INSTANCE);
        disruptor.handleEventsWith((event, sequence, endOfBatch) ->

                System.out.println(

"Event: "

 + event)); 

        disruptor.start(); 

        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); 

        ByteBuffer bb = ByteBuffer.allocate(

8

);

for

 (

long

 l = 

0

true

; l++)

        {

            bb.putLong(

0

, l);

            ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(

0

)), bb);

            Thread.sleep(

1000

);

        }

    }

}

5 總結

作為一款高效能的記憶體佇列,Disruptor 有不少優秀的設計思想值得我們學習,比如記憶體預分配、無鎖併發。同時它的使用非常簡單,推薦大家使用。

歡迎加入我的知識星球,全面提升技術能力。
👉 加入方式,長按”或“掃描”下方二維碼噢
星球的內容包括:專案實戰、面試招聘、原始碼解析、學習路線。
文章有幫助的話,在看,轉發吧。
謝謝支援喲 (*^__^*)

相關文章