自己寫一個分散式定時任務框架+負載均衡+OpenAPI非同步呼叫!!

👉 這是一個或許對你有用的社群
🐱 一對一交流/面試小冊/簡歷最佳化/求職解惑,歡迎加入芋道快速開發平臺知識星球。下面是星球提供的部分資料:
👉這是一個或許對你有用的開源專案
國產 Star 破 10w+ 的開源專案,前端包括管理後臺 + 微信小程式,後端支援單體和微服務架構。
功能涵蓋 RBAC 許可權、SaaS 多租戶、資料許可權、商城、支付、工作流、大屏報表、微信公眾號、ERPCRMAI 大模型等等功能:
  • Boot 多模組架構:https://gitee.com/zhijiantianya/ruoyi-vue-pro
  • Cloud 微服務架構:https://gitee.com/zhijiantianya/yudao-cloud
  • 影片教程:https://doc.iocoder.cn
【國內首批】支援 JDK 17/21 + SpringBoot 3.3、JDK 8/11 + Spring Boot 2.7 雙版本 

專案背景

目前的定時任務框架已經很成熟,從QuartZxxl-job,再到近幾年出現的PowerJob,既然有這麼多的好的實現,為什麼還是選擇重寫一個定時任務框架呢?
開發中遇到這樣的場景,業務層面需要頻繁的建立修改定時任務,在考慮分散式的架構下,對於目前可以實現該功能的框架中:
  • MQ的延時佇列無法動態調整任務引數;
  • redis的過期策略需要儲存太久的key且可能會有BigKey
  • xxljob沒有原生的openAPI,其基於資料庫鎖的排程只是實現server的高可用而不是高效能;
  • powerjob的openAPI又是基於http的同步阻塞排程,並且對於server的負載均衡,由於其分組隔離設計,需要開發者手動配置,在高併發下的定時任務操作下,並不能很好的排程server叢集。
主流框架往往為了適配更多的場景,支援足夠多的功能,往往體積大,且不易動態擴充套件,為了對專案有最大的控制,在解決以上業務場景的前提下,進行部分功能的修剪,也希望能更好的從中學習主流框架的設計思想,於是決定重寫一個定時任務框架。
本文章主要介紹該專案相對於目前主流定時任務框架的特性,對於定時任務排程和發現的詳細可以見原始碼,文章末尾也給出了流程圖方便理解
基於 Spring Boot + MyBatis Plus + Vue & Element 實現的後臺管理系統 + 使用者小程式,支援 RBAC 動態許可權、多租戶、資料許可權、工作流、三方登入、支付、簡訊、商城等功能
  • 專案地址:https://github.com/YunaiV/ruoyi-vue-pro
  • 影片教程:https://doc.iocoder.cn/video/

定位

這是一個基於 PowerJob 的重寫和重構版本,修改和擴充套件了原始專案的功能,以更好地適配業務需求。
  • 支援定時任務頻繁建立和任務引數頻繁動態變動的場景(提供輕量API,並使用內建訊息佇列非同步處理)
  • 支援大量定時任務併發執行的場景,實現負載均衡(分組隔離+應用級別的鎖實現)
  • 主要針對小型任務 ,無需過多配置,不對任務例項進行操作
基於 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 實現的後臺管理系統 + 使用者小程式,支援 RBAC 動態許可權、多租戶、資料許可權、工作流、三方登入、支付、簡訊、商城等功能
  • 專案地址:https://github.com/YunaiV/yudao-cloud
  • 影片教程:https://doc.iocoder.cn/video/

技術選型

通訊 : gRPC(基於netty的nio)

序列化 :Protobuf編碼格式編解碼

負載均衡 :自己實現的註冊中心NameServer

    |___ 策略 : 服務端最小排程次數策略

    |___ 互動 :pull+push

訊息佇列 : 自己實現的簡易訊息佇列

    |___ 訊息傳送 : 非同步+超時重試

    |___ 持久化 :mmap+同步刷盤策略

    |___ 訊息重試 :多級延時佇列+死信佇列

定時排程 : 時間輪演算法

專案結構

├── LICENSE

├── k-job-common 

// 各元件的公共依賴,開發者無需感知

├── k-job-nameServer 

// server和worker的註冊中心,提供負載均衡

├── k-job-producer 

//普通Jar包,提供 OpenAPI,內建訊息佇列的非同步傳送

├── k-job-server 

// 基於SpringBoot實現的排程伺服器

├── k-job-worker-boot-starter 

// kjob-worker 的 spring-boot-starter ,spring boot 應用可以通用引入該依賴一鍵接入 kjob-server 

├── k-job-worker 

// 普通Jar包,接入kjob-server的應用需要依賴該Jar包

└── pom.xml

特性

負載均衡(解決大量定時任務併發執行場景)

對於worker的負載均衡策略有許多且已經由較好的解決(輪詢,健康值等),但是,我們目前的系統存在大量的定時任務,考慮server層面,可能會存在以下情況:
  • server一次排程從DB中獲取太多工,可能會OOM
  • 發起排程請求是由執行緒池負責,可能會有效能瓶頸,我們的系統對時間是敏感的,對時間精度高要求
  • 我們的OpenAPI同樣也不希望大量請求落在同一個server上
在分散式系統下,解決定時任務併發執行往往考慮server叢集的負載均衡(這裡的負載均衡特指server叢集能夠根據自身負載,動態排程worker叢集),但是對於定時任務框架,需要關注叢集下的任務重複排程問題,目前的定時任務框架大都為了解決該問題而不能較好實現負載均衡。
透過檢視原始碼,xxljob的排程,在每次查詢資料庫獲取任務前,透過資料庫行鎖進行了全域性加鎖,保證同一時刻只有一個server在進行排程來避免重複排程,但是無法發揮叢集server的排程能力
對於powerjob的排程,透過分組隔離機制(詳細可以看官方文件)避免了重複排程,但是同樣帶來了問題:同一app下的worker叢集只能被一臺server排程,如果該server的任務太多了呢?如果只有一個業務對應的app,如何用server叢集來負載均衡呢?
基於以上問題,增加了一個註冊中心nameServer模組來負責負載均衡:
最小排程次數策略:NameServer記錄server叢集狀態並維護各個server的分配任務次數,由於server是否排程某個worker由表中資料決定,worker會在每次pull判斷是否發起請求更新server中的排程關係表,並將目前分組交由最小排程次數的server來排程,當且僅當以下發生:
  • 同一app分組下的workerNum > threshold
  • 該分組對應的server的scheduleTimes > minServerScheduleTime x 2
考慮到server的地理位置,通訊效率等因素,後續可以考慮增加每個server的權重來更優分配
關鍵程式碼如下:
public ReBalanceInfo getServerAddressReBalanceList(String serverAddress, String appName)

{

// first req, serverAddress is empty
if

(serverAddress.isEmpty()){

            ReBalanceInfo reBalanceInfo = 

new

 ReBalanceInfo();

            reBalanceInfo.setSplit(

false

);

            reBalanceInfo.setServerIpList(

new

 ArrayList<String>(serverAddressSet));

            reBalanceInfo.setSubAppName(

""

);

return

 reBalanceInfo;

        }

        ReBalanceInfo reBalanceInfo = 

new

 ReBalanceInfo();

// get sorted scheduleTimes serverList

        List<String> newServerIpList = serverAddress2ScheduleTimesMap.keySet().stream().sorted(

new

 Comparator<String>() {

@Override
publicintcompare(String o1, String o2)

{

return

 (

int

) (serverAddress2ScheduleTimesMap.get(o1) - serverAddress2ScheduleTimesMap.get(o2));

            }

        }).collect(Collectors.toList());

// see if split
if

(!appName2WorkerNumMap.isEmpty() && appName2WorkerNumMap.get(appName) > maxWorkerNum && appName2WorkerNumMap.get(appName) % maxWorkerNum == 

1

){

// return new serverIpList

            reBalanceInfo.setSplit(

true

);

            reBalanceInfo.setChangeServer(

false

);

            reBalanceInfo.setServerIpList(newServerIpList);

            reBalanceInfo.setSubAppName(appName + 

":"

 + appName2WorkerNumMap.size());

return

 reBalanceInfo;

        }

// see if need change server

        Long lestScheduleTimes = serverAddress2ScheduleTimesMap.get(newServerIpList.get(newServerIpList.size() - 

1

));

        Long comparedScheduleTimes = lestScheduleTimes == 

0

 ? 

1

 : lestScheduleTimes;

if

(serverAddress2ScheduleTimesMap.get(serverAddress) / comparedScheduleTimes > 

2

){

            reBalanceInfo.setSplit(

false

);

            reBalanceInfo.setChangeServer(

true

);

// first server is target lest scheduleTimes server

            reBalanceInfo.setServerIpList(newServerIpList);

            reBalanceInfo.setSubAppName(

""

);

return

 reBalanceInfo;

        }

// return default list

        reBalanceInfo.setSplit(

false

);

        reBalanceInfo.setServerIpList(

new

 ArrayList<String>(serverAddressSet));

        reBalanceInfo.setSubAppName(

""

);

return

 reBalanceInfo;
    }

實現功能:
  • app組自動拆分: 可以為app設定組內worker數量閾值,超過閾值自動拆分subApp並分配負載均衡後的server
  • worker動態分配: 對於每一個subApp,當觸發pull時,根據最小排程次數策略,可以分配至負載均衡後的server,開發者無需感知subApp
以上,解決PowerJob中同一worker分組只能被一個server排程問題,且subApp分組可以根據server的負載,實現動態依附至不同server,對於可能的重複排程問題,我們只需加上App級別的鎖,相對於xxl-job的全域性加鎖效能更好。

訊息佇列(解決任務大量建立和頻繁更改場景)

其實一開始用powerjob作為專案中的中介軟體,業務中的任務操作使用其openAPI。過程中感受最大的就是,我的業務只是根據任務id修改了任務引數,並不需要server的響應,為什麼要同步阻塞?可靠性應由server保證而不是客戶端的大量重試及等待。對於業務中頻繁建立定時任務和改動,更應是非同步操作。
一開始的想法是,使用grpc的futureStub進行非同步傳送,請求由Reactor執行緒監聽事件,當事件可讀時分配給業務執行緒池進行處理(gRPC內部已經實現)。所以需要做的似乎只是做一個Producer服務,並把stub全換成Future型別,對於jobId,我們用雪花演算法拿到一個全域性id就可以,無需server分配。
但是以上設計有一個致命的問題——阻塞在BlockingQueue的請求無法ack,且server宕機存在訊息丟失的可能!這違背了訊息佇列的設計(入隊--ack--持久化--消費),意味著只有被分配到執行緒(消費者)消費時,才能被ack,而活躍的執行緒數並不多。故不能僅僅依賴gRPC的內部實現,需要自己實現訊息佇列
  • 可靠訊息
以rocketMQ為例,producer的訊息會先到達broker中的佇列後返回ack,consumer再輪詢從broker中pull重平衡處理後的訊息消費。
考慮到本專案的設計無需路由,所有的server都可以接受訊息,於是不再設計broker,將server和broker結合,每個server維護自己的佇列,且消費自己佇列的訊息,這樣還能減少一次通訊。
這樣可靠訊息的解決就變成了:
  • producer到server的訊息丟失——失敗或者超時則依次遍歷所有的server,一定能保證訊息抵達,不再闡述
  • server的佇列訊息丟失(機器宕機)——持久化,採用同步刷盤策略,百分之百的可靠
持久化:同步刷盤機制借鑑了rocketMQ的mmap和commitLog/consumerQueue設計,將磁碟的檔案對映到記憶體進行讀寫,每次訊息進來先存到buffer後觸發刷盤,成功後執行寫響應的回撥;用consumerQueue檔案作為佇列,server定時pull消費訊息,詳細見k-job-server.consumer.DefaultMessageStore,有詳細註釋
// 和rocketMQ一樣,讀寫都是用mmap,因為記憶體buffer就是檔案的對映,只是有刷盤機制
private

 MappedByteBuffer commitLogBuffer;  

// 對映到記憶體的commitlog檔案
private

 MappedByteBuffer consumerQueueBuffer; 

// 對映到記憶體的consumerQueue檔案
privatefinal

 AtomicLong commitLogBufferPosition = 

new

 AtomicLong(

0

);

// consumerLog的buffer的位置,同步刷盤的情況下與consumerLog檔案的位置保持一致
privatefinal

 AtomicLong commitLogCurPosition = 

new

 AtomicLong(

0

);

// consumerLog檔案的目前位置,每次刷盤後就等於buffer位置
privatefinal

 AtomicLong lastProcessedOffset = 

new

 AtomicLong(

0

);

// consumerQueue的buffer拉取commitLog的位置,與commitLog相比,重啟時就是consumerQueue檔案最後一條訊息的索引位置
privatefinal

 AtomicLong currentConsumerQueuePosition = 

new

 AtomicLong(

0

); 

// consumerQueue檔案的目前位置
privatefinal

 AtomicLong consumerPosition = 

new

 AtomicLong(

0

); 

// 記錄消費者在consumerQueue中的消費位置,這個只在目前的系統中有,類似於rocketMQ透過pull遠端拉取

  • 訊息重試
對於producer,前面提到,為了應對大量定時任務的場景,對於任務的操作,應全部是非同步的,我們引入超時機制即可,當超過一定的時間未收到ack,或者返回錯誤響應,選擇下一個server發起重試
對於consumer(server),使用多級延時佇列,當某個訊息消費失敗後,投遞至下一級延遲更久的延時佇列,若全都消費失敗則進入死信佇列,需要人工干預
privatestaticfinal

 Deque<MqCausa.Message> deadMessageQueue = 

new

 ArrayDeque<>();

privatestaticfinal

 List<DelayQueue<DelayedMessage>> delayQueueList = 

new

 ArrayList<>(

2

);

/**

     * 逆序排序,因為重試次數到0則不再重試

     */


privatestatic

 List<Long> delayTimes = Lists.newArrayList(

10000L

5000L

);

publicstaticvoidinit(Consumer consumer)

{

        delayQueueList.add(

new

 DelayQueue<>());

        delayQueueList.add(

new

 DelayQueue<>());

        Thread consumerThread1 = 

new

 Thread(() -> {

try

 {

while

 (

true

) {

// 從延時佇列中取出訊息(會等待直到訊息到期)

                    DelayQueue<DelayedMessage> delayQueue = delayQueueList.get(

0

);

if

(!delayQueue.isEmpty()) {

                        DelayedMessage message = delayQueue.take();

                        consumer.consume(message.message);

                        delayQueue.remove(message);

                        System.out.println(

"Consumed: "

 + message.getMessage() + 

" at "

 + System.currentTimeMillis());

                    }

                }

            } 

catch

 (InterruptedException e) {

                Thread.currentThread().interrupt();

                System.out.println(

"Consumer thread interrupted"

);

            }

        });

//  其他等級的延時佇列

        consumerThread1.start();

    }

publicstaticvoidreConsume(MqCausa.Message msg)

{

if

 (msg.getRetryTime() == 

0

) {

            log.error(

"msg : {} is dead"

, msg);

            deadMessageQueue.add(msg);

return

;

        }

        MqCausa.Message build = msg.toBuilder().setRetryTime(msg.getRetryTime() - 

1

).build();

        DelayedMessage delayedMessage = 

new

 DelayedMessage(build, delayTimes.get(build.getRetryTime()));

        delayQueueList.get(msg.getRetryTime() - 

1

).add(delayedMessage);

    }

// 定義一個延時訊息類,實現 Delayed 介面
staticclassDelayedMessageimplementsDelayed

{

privatefinal

 MqCausa.Message message;

privatefinallong

 triggerTime; 

// 到期時間

publicDelayedMessage(MqCausa.Message message, long delayTime)

{

this

.message = message;

// 當前時間加上延時時間,設定訊息的觸發時間
this

.triggerTime = System.currentTimeMillis() + delayTime;

    }

// 獲取剩餘的延時時間
@Override
publiclonggetDelay(TimeUnit unit)

{

return

 unit.convert(triggerTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);

    }

// 比較方法,用於確定訊息的順序
@Override
publicintcompareTo(Delayed other)

{

if

 (

this

.triggerTime < ((DelayedMessage) other).triggerTime) {

return

 -

1

;

        } 

elseif

 (

this

.triggerTime > ((DelayedMessage) other).triggerTime) {

return1

;

        }

return0

;

    }

public

 MqCausa.

Message getMessage()

{

return

 message;

    }

}

最終實現如圖所示:
實現功能:
  • 對於任務操作請求的非同步傳送
  • 輪詢策略實現消費的負載均衡

其他

附上個人總結的對於worker和server之間服務發現以及排程的流程圖

服務發現

排程

專案地址

https://github.com/karatttt/k-job

歡迎加入我的知識星球,全面提升技術能力。
👉 加入方式,長按”或“掃描”下方二維碼噢
星球的內容包括:專案實戰、面試招聘、原始碼解析、學習路線。
文章有幫助的話,在看,轉發吧。
謝謝支援喲 (*^__^*)

相關文章