👉 這是一個或許對你有用的社群
《專案實戰(影片)》:從書中學,往事上“練” 《網際網路高頻面試題》:面朝簡歷學習,春暖花開 《架構 x 系統設計》:摧枯拉朽,掌控面試高頻場景題 《精進 Java 學習指南》:系統學習,網際網路主流技術棧 《必讀 Java 原始碼專欄》:知其然,知其所以然

👉這是一個或許對你有用的開源專案國產 Star 破 10w+ 的開源專案,前端包括管理後臺 + 微信小程式,後端支援單體和微服務架構。功能涵蓋 RBAC 許可權、SaaS 多租戶、資料許可權、商城、支付、工作流、大屏報表、微信公眾號、ERP、CRM、AI 大模型等等功能:
Boot 多模組架構:https://gitee.com/zhijiantianya/ruoyi-vue-pro Cloud 微服務架構:https://gitee.com/zhijiantianya/yudao-cloud 影片教程:https://doc.iocoder.cn 【國內首批】支援 JDK 17/21 + SpringBoot 3.3、JDK 8/11 + Spring Boot 2.7 雙版本
專案背景
QuartZ
到xxl-job
,再到近幾年出現的PowerJob
,既然有這麼多的好的實現,為什麼還是選擇重寫一個定時任務框架呢?-
MQ的延時佇列無法動態調整任務引數; -
redis的過期策略需要儲存太久的key且可能會有BigKey -
xxljob沒有原生的openAPI,其基於資料庫鎖的排程只是實現server的高可用而不是高效能; -
powerjob的openAPI又是基於http的同步阻塞排程,並且對於server的負載均衡,由於其分組隔離設計,需要開發者手動配置,在高併發下的定時任務操作下,並不能很好的排程server叢集。
本文章主要介紹該專案相對於目前主流定時任務框架的特性,對於定時任務排程和發現的詳細可以見原始碼,文章末尾也給出了流程圖方便理解
基於 Spring Boot + MyBatis Plus + Vue & Element 實現的後臺管理系統 + 使用者小程式,支援 RBAC 動態許可權、多租戶、資料許可權、工作流、三方登入、支付、簡訊、商城等功能
專案地址:https://github.com/YunaiV/ruoyi-vue-pro 影片教程:https://doc.iocoder.cn/video/
定位
-
支援定時任務頻繁建立和任務引數頻繁動態變動的場景(提供輕量API,並使用內建訊息佇列非同步處理) -
支援大量定時任務併發執行的場景,實現負載均衡(分組隔離+應用級別的鎖實現) -
主要針對小型任務 ,無需過多配置,不對任務例項進行操作
基於 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 實現的後臺管理系統 + 使用者小程式,支援 RBAC 動態許可權、多租戶、資料許可權、工作流、三方登入、支付、簡訊、商城等功能
專案地址:https://github.com/YunaiV/yudao-cloud 影片教程:https://doc.iocoder.cn/video/
技術選型
通訊 : gRPC(基於netty的nio)
序列化 :Protobuf編碼格式編解碼
負載均衡 :自己實現的註冊中心NameServer
|___ 策略 : 服務端最小排程次數策略
|___ 互動 :pull+push
訊息佇列 : 自己實現的簡易訊息佇列
|___ 訊息傳送 : 非同步+超時重試
|___ 持久化 :mmap+同步刷盤策略
|___ 訊息重試 :多級延時佇列+死信佇列
定時排程 : 時間輪演算法
專案結構
├── LICENSE
├── k-job-common
// 各元件的公共依賴,開發者無需感知
├── k-job-nameServer
// server和worker的註冊中心,提供負載均衡
├── k-job-producer
//普通Jar包,提供 OpenAPI,內建訊息佇列的非同步傳送
├── k-job-server
// 基於SpringBoot實現的排程伺服器
├── k-job-worker-boot-starter
// kjob-worker 的 spring-boot-starter ,spring boot 應用可以通用引入該依賴一鍵接入 kjob-server
├── k-job-worker
// 普通Jar包,接入kjob-server的應用需要依賴該Jar包
└── pom.xml
特性
負載均衡(解決大量定時任務併發執行場景)
-
server一次排程從DB中獲取太多工,可能會OOM -
發起排程請求是由執行緒池負責,可能會有效能瓶頸,我們的系統對時間是敏感的,對時間精度高要求 -
我們的OpenAPI同樣也不希望大量請求落在同一個server上
nameServer
模組來負責負載均衡:
-
同一app分組下的 workerNum > threshold
-
該分組對應的server的 scheduleTimes > minServerScheduleTime x 2
考慮到server的地理位置,通訊效率等因素,後續可以考慮增加每個server的權重來更優分配
public ReBalanceInfo getServerAddressReBalanceList(String serverAddress, String appName)
{
// first req, serverAddress is empty
if
(serverAddress.isEmpty()){
ReBalanceInfo reBalanceInfo =
new
ReBalanceInfo();
reBalanceInfo.setSplit(
false
);
reBalanceInfo.setServerIpList(
new
ArrayList<String>(serverAddressSet));
reBalanceInfo.setSubAppName(
""
);
return
reBalanceInfo;
}
ReBalanceInfo reBalanceInfo =
new
ReBalanceInfo();
// get sorted scheduleTimes serverList
List<String> newServerIpList = serverAddress2ScheduleTimesMap.keySet().stream().sorted(
new
Comparator<String>() {
@Override
publicintcompare(String o1, String o2)
{
return
(
int
) (serverAddress2ScheduleTimesMap.get(o1) - serverAddress2ScheduleTimesMap.get(o2));
}
}).collect(Collectors.toList());
// see if split
if
(!appName2WorkerNumMap.isEmpty() && appName2WorkerNumMap.get(appName) > maxWorkerNum && appName2WorkerNumMap.get(appName) % maxWorkerNum ==
1
){
// return new serverIpList
reBalanceInfo.setSplit(
true
);
reBalanceInfo.setChangeServer(
false
);
reBalanceInfo.setServerIpList(newServerIpList);
reBalanceInfo.setSubAppName(appName +
":"
+ appName2WorkerNumMap.size());
return
reBalanceInfo;
}
// see if need change server
Long lestScheduleTimes = serverAddress2ScheduleTimesMap.get(newServerIpList.get(newServerIpList.size() -
1
));
Long comparedScheduleTimes = lestScheduleTimes ==
0
?
1
: lestScheduleTimes;
if
(serverAddress2ScheduleTimesMap.get(serverAddress) / comparedScheduleTimes >
2
){
reBalanceInfo.setSplit(
false
);
reBalanceInfo.setChangeServer(
true
);
// first server is target lest scheduleTimes server
reBalanceInfo.setServerIpList(newServerIpList);
reBalanceInfo.setSubAppName(
""
);
return
reBalanceInfo;
}
// return default list
reBalanceInfo.setSplit(
false
);
reBalanceInfo.setServerIpList(
new
ArrayList<String>(serverAddressSet));
reBalanceInfo.setSubAppName(
""
);
return
reBalanceInfo;
}
-
app組自動拆分: 可以為app設定組內worker數量閾值,超過閾值自動拆分subApp並分配負載均衡後的server -
worker動態分配: 對於每一個subApp,當觸發pull時,根據最小排程次數策略,可以分配至負載均衡後的server,開發者無需感知subApp
PowerJob
中同一worker
分組只能被一個server排程問題,且subApp分組可以根據server的負載,實現動態依附至不同server
,對於可能的重複排程問題,我們只需加上App級別的鎖,相對於xxl-job的全域性加鎖效能更好。訊息佇列(解決任務大量建立和頻繁更改場景)
powerjob
作為專案中的中介軟體,業務中的任務操作使用其openAPI。過程中感受最大的就是,我的業務只是根據任務id修改了任務引數,並不需要server的響應,為什麼要同步阻塞?可靠性應由server保證而不是客戶端的大量重試及等待。對於業務中頻繁建立定時任務和改動,更應是非同步操作。futureStub
進行非同步傳送,請求由Reactor
執行緒監聽事件,當事件可讀時分配給業務執行緒池進行處理(gRPC內部已經實現)。所以需要做的似乎只是做一個Producer
服務,並把stub全換成Future型別,對於jobId,我們用雪花演算法拿到一個全域性id就可以,無需server分配。
BlockingQueue
的請求無法ack,且server宕機存在訊息丟失的可能!這違背了訊息佇列的設計(入隊--ack--持久化--消費
),意味著只有被分配到執行緒(消費者)消費時,才能被ack,而活躍的執行緒數並不多。故不能僅僅依賴gRPC的內部實現,需要自己實現訊息佇列-
可靠訊息
producer
的訊息會先到達broker中的佇列後返回ack,consumer
再輪詢從broker中pull重平衡處理後的訊息消費。-
producer到server的訊息丟失——失敗或者超時則依次遍歷所有的server,一定能保證訊息抵達,不再闡述 -
server的佇列訊息丟失(機器宕機)——持久化,採用同步刷盤策略,百分之百的可靠
commitLog/consumerQueue
設計,將磁碟的檔案對映到記憶體進行讀寫,每次訊息進來先存到buffer後觸發刷盤,成功後執行寫響應的回撥;用consumerQueue
檔案作為佇列,server定時pull消費訊息,詳細見k-job-server.consumer.DefaultMessageStore
,有詳細註釋// 和rocketMQ一樣,讀寫都是用mmap,因為記憶體buffer就是檔案的對映,只是有刷盤機制
private
MappedByteBuffer commitLogBuffer;
// 對映到記憶體的commitlog檔案
private
MappedByteBuffer consumerQueueBuffer;
// 對映到記憶體的consumerQueue檔案
privatefinal
AtomicLong commitLogBufferPosition =
new
AtomicLong(
0
);
// consumerLog的buffer的位置,同步刷盤的情況下與consumerLog檔案的位置保持一致
privatefinal
AtomicLong commitLogCurPosition =
new
AtomicLong(
0
);
// consumerLog檔案的目前位置,每次刷盤後就等於buffer位置
privatefinal
AtomicLong lastProcessedOffset =
new
AtomicLong(
0
);
// consumerQueue的buffer拉取commitLog的位置,與commitLog相比,重啟時就是consumerQueue檔案最後一條訊息的索引位置
privatefinal
AtomicLong currentConsumerQueuePosition =
new
AtomicLong(
0
);
// consumerQueue檔案的目前位置
privatefinal
AtomicLong consumerPosition =
new
AtomicLong(
0
);
// 記錄消費者在consumerQueue中的消費位置,這個只在目前的系統中有,類似於rocketMQ透過pull遠端拉取
-
訊息重試
consumer(server)
,使用多級延時佇列,當某個訊息消費失敗後,投遞至下一級延遲更久的延時佇列,若全都消費失敗則進入死信佇列,需要人工干預privatestaticfinal
Deque<MqCausa.Message> deadMessageQueue =
new
ArrayDeque<>();
privatestaticfinal
List<DelayQueue<DelayedMessage>> delayQueueList =
new
ArrayList<>(
2
);
/**
* 逆序排序,因為重試次數到0則不再重試
*/
privatestatic
List<Long> delayTimes = Lists.newArrayList(
10000L
,
5000L
);
publicstaticvoidinit(Consumer consumer)
{
delayQueueList.add(
new
DelayQueue<>());
delayQueueList.add(
new
DelayQueue<>());
Thread consumerThread1 =
new
Thread(() -> {
try
{
while
(
true
) {
// 從延時佇列中取出訊息(會等待直到訊息到期)
DelayQueue<DelayedMessage> delayQueue = delayQueueList.get(
0
);
if
(!delayQueue.isEmpty()) {
DelayedMessage message = delayQueue.take();
consumer.consume(message.message);
delayQueue.remove(message);
System.out.println(
"Consumed: "
+ message.getMessage() +
" at "
+ System.currentTimeMillis());
}
}
}
catch
(InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println(
"Consumer thread interrupted"
);
}
});
// 其他等級的延時佇列
consumerThread1.start();
}
publicstaticvoidreConsume(MqCausa.Message msg)
{
if
(msg.getRetryTime() ==
0
) {
log.error(
"msg : {} is dead"
, msg);
deadMessageQueue.add(msg);
return
;
}
MqCausa.Message build = msg.toBuilder().setRetryTime(msg.getRetryTime() -
1
).build();
DelayedMessage delayedMessage =
new
DelayedMessage(build, delayTimes.get(build.getRetryTime()));
delayQueueList.get(msg.getRetryTime() -
1
).add(delayedMessage);
}
// 定義一個延時訊息類,實現 Delayed 介面
staticclassDelayedMessageimplementsDelayed
{
privatefinal
MqCausa.Message message;
privatefinallong
triggerTime;
// 到期時間
publicDelayedMessage(MqCausa.Message message, long delayTime)
{
this
.message = message;
// 當前時間加上延時時間,設定訊息的觸發時間
this
.triggerTime = System.currentTimeMillis() + delayTime;
}
// 獲取剩餘的延時時間
@Override
publiclonggetDelay(TimeUnit unit)
{
return
unit.convert(triggerTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
// 比較方法,用於確定訊息的順序
@Override
publicintcompareTo(Delayed other)
{
if
(
this
.triggerTime < ((DelayedMessage) other).triggerTime) {
return
-
1
;
}
elseif
(
this
.triggerTime > ((DelayedMessage) other).triggerTime) {
return1
;
}
return0
;
}
public
MqCausa.
Message getMessage()
{
return
message;
}
}

-
對於任務操作請求的非同步傳送 -
輪詢策略實現消費的負載均衡
其他
服務發現

排程

專案地址
https://github.com/karatttt/k-job





