
一 簡介
-
允許建立複雜的業務流程,流程中每個獨立的任務都是由一個微服務所實現。 -
基於JSON DSL 建立工作流,對任務的執行進行編排。 -
工作流在執行的過程中可見、可追溯。 -
提供暫停、恢復、重啟等多種控制模型。 -
提供一種簡單的方式來最大限度重用微服務。 -
擁有擴充套件到百萬流程併發執行的服務能力。 -
透過佇列服務實現客戶端與服務端的分離。 -
支援 HTTP 或其他RPC協議進行資料傳送
二 基本概念
1 Task
-
System Task:被conductor服務執行,這些任務的執行與引擎在同一個JVM中。
-
Worker Task:被worker服務執行,執行與引擎隔離開,worker透過佇列獲取任務後,執行並更新結果狀態到引擎。Worker的實現是跨語言的,其使用Http協議與Server通訊。
-
功能性Task:
-
HTTP:傳送http請求
-
JSON_JQ_TRANSFORM:jq命令執行,一般使用者json的轉換,具體可見jq官方文件
-
KAFKA_PUBLISH: 釋出kafka訊息
-
流程控制Task:
-
SWITCH(原Decision):條件判斷分支,類似於程式碼中的switch case
-
FORK:啟動並行分支,用於排程並行任務
-
JOIN:彙總並行分支,用於彙總並行任務
-
DO_WHILE:迴圈,類似於程式碼中的do while
-
WAIT:一直在執行中,直到外部時間觸發更新節點狀態,可用於等待外部操作
-
SUB_WORKFLOW:子流程,執行其他的流程
-
TERMINATE:結束流程,以指定輸出提前結束流程,可以與SWITCH節點配合使用,類似程式碼中的提前return語句
-
對於System Task,Conductor提供了WorkflowSystemTask 抽象類,可以自定義擴充套件實現。
-
對於Worker Task,可以實現conductor的client Worker介面實現執行邏輯。
2 Workflow
-
Workflow由一系列需要執行的Task組成,conductor採用json來描述Task的流轉關係。
-
除基本的順序流程外,藉助內建的SWITCH、FORK、JOIN、DO_WIHLE、TERMINATE任務,還能實現分支、並行、迴圈、提前結束等流程控制。
3 Input&Output
-
Task有自己的輸入和輸出,輸入輸出都是jsonobject型別。
-
Task可以引用其他Task的輸入輸出,使用${taskxxx.output}的方式引用。引用語法為json-path,除最基礎的${taskxxx.output}的值解析方式外,還支援其他複雜操作,如過濾等,具體見json-path語法。
-
啟動Workflow時可以傳入流程的輸入資料,Task可以透過${workflow.input}的方式引用。
三 整體架構

-
Orchestrator: 負責流程的流轉排程工作;
-
Management/Execution Service: 提供流程、任務的管理更新等操作;
-
TaskQueues: 任務佇列,Orchestrator解析出來的待執行Task會放到佇列中;
-
Worker: 任務執行worker,從TaskQueues中獲取任務,透過Execution Service更新任務狀態與結果資料;
-
Database: 元資料&執行時資料庫,用於儲存執行時的Workflow、Task等狀態資訊,以及流程任務定義的等原資訊;
-
Index: 索引資料庫,用於儲存執行歷史;
四 執行模型
1 Task狀態轉移
-
SCHEDULED:待排程,task放到佇列中還沒有被poll出來執行時的狀態 -
IN_PROGRESS:執行中,被poll出來執行但還沒有完成時的狀態 -
COMPLETED:執行完成 -
FAILED:執行失敗 -
CANCELLED:被中止時為此狀態,一般出現在兩種情況:
-
1.手動中止流程時,正在執行中的task會被置為此狀態;
-
2.多個fork分支,當某個分支的task失敗時,其它分支中正在執行的task會被置為此狀態;

2 任務佇列
-
任務佇列,是一個帶有延遲、優先順序功能的佇列;
-
每種型別的Task是一個單獨的佇列,此外,如果配置了domain、isolationGroup,還會拆分成多個佇列實現執行隔離;
-
decider service是生產者,其根據流程配置與當前執行情況,解析出可執行的task後,新增到佇列;
-
任務執行器(SystemTaskWorker、Worker)是消費者,其長輪詢對應的佇列,從佇列中獲取任務執行;
3 核心功能實現機制


decide的觸發時機
-
新啟動執行時,會觸發decide操作 -
系統任務執行完成時,會觸發decide操作 -
Workder任務透過ExecutionService更新任務狀態時,會觸發decide操作
流程控制節點的實現機制
1)Task & TaskMapper
-
Task:任務的執行邏輯程式碼,它的作用是Task的執行
-
TaskMapper:任務的對映邏輯程式碼,它透過Task的定義配置、當前例項的執行狀態等資訊,返回實際需要執行的Task列表
2)條件分支(SWITCH)的實現機制
// 待排程的Task list,最終返回結果
List<Task> tasksToBeScheduled = new LinkedList<>();
// evalResult是分支條件變數的值(case)
// decisionCases是一個Map結構,key為分支的case值,value為對應分支的任務定義list(分支內的任務定義會有多個)
// 根據分支變數的實際值,獲取對應分支的任務定義list
List<WorkflowTask> selectedTasks = taskToSchedule.getDecisionCases().get(evalResult);
// default的邏輯:如果獲取不到對應的分支或者分支為空,則用預設的分支
if (selectedTasks == null || selectedTasks.isEmpty()) {
selectedTasks = taskToSchedule.getDefaultCase();
}
if (selectedTasks != null && !selectedTasks.isEmpty()) {
// 獲取分支的第一個(下標0)task,返回給decider service去做排程(decider會把任務新增到佇列裡,交給worker去執行)
WorkflowTask selectedTask = selectedTasks.get(0);
// 呼叫了deciderService的getTasksToBeScheduled方法,此方法裡又獲取到TaskMapper呼叫了getMappedTasks。這裡採用了遞迴呼叫的方式,解析巢狀的Task
List<Task> caseTasks = taskMapperContext.getDeciderService()
.getTasksToBeScheduled(workflowInstance, selectedTask, retryCount, taskMapperContext.getRetryTaskId());
tasksToBeScheduled.addAll(caseTasks);
switchTask.getInputData().put("hasChildren", "true");
}
return tasksToBeScheduled;
3)並行(FORK)的實現機制
// 待排程的Task list,最終返回結果
List<Task> tasksToBeScheduled = new LinkedList<>();
// 配置中的所有fork分支
List<List<WorkflowTask>> forkTasks = taskToSchedule.getForkTasks();
for (List<WorkflowTask> wfts : forkTasks) {
// 每個分支取第一個Task
WorkflowTask wft = wfts.get(0);
// 呼叫了deciderService的getTasksToBeScheduled方法,此方法裡又獲取到TaskMapper呼叫了getMappedTasks。這裡採用了遞迴呼叫的方式,解析巢狀的Task
List<Task> tasks2 = taskMapperContext.getDeciderService()
.getTasksToBeScheduled(workflowInstance, wft, retryCount);
tasksToBeScheduled.addAll(tasks2);
}
return tasksToBeScheduled;
重試的實現機制
五 完整性保障機制
1 WorkflowReconciler
2 decideQueue
3 ExecutionLockService
if (!executionLockService.acquireLock(workflowId)) {
return false;
}
PostgreSQL實戰進階
PostgreSQL被譽為“世界上功能最強大的開源資料庫”,是以加州大學伯克利分校計算機系開發的POSTGRES 4.2為基礎的物件關係型資料庫管理系統。PostgreSQL支援大部分 SQL標準並且提供了許多其他現代特性:複雜查詢、外部索引鍵、觸發器、檢視、事務完整性、MVCC。同樣,PostgreSQL 可以用許多方法擴充套件,比如,透過增加新的資料型別、函式、運算子、聚集函式、索引。開發者可以免費使用、修改、和分發 PostgreSQL,不管是私用、商用、還是學術研究使用。
本課程由PostgreSQL社群核心成員出品,帶你快速從0-1深入PostgreSQL核心特性。點選閱讀原文檢視詳情。
關鍵詞
佇列中
任務佇列
方法
狀態
框架