開源微服務編排框架:NetflixConductor

本文主要介紹netflix conductor的基本概念和主要執行機制。

一  簡介

netflix conductor是基於JAVA語言編寫的開源流程引擎,用於架構基於微服務的流程。它具備如下特性:
  • 允許建立複雜的業務流程,流程中每個獨立的任務都是由一個微服務所實現。
  • 基於JSON DSL 建立工作流,對任務的執行進行編排。
  • 工作流在執行的過程中可見、可追溯。
  • 提供暫停、恢復、重啟等多種控制模型。
  • 提供一種簡單的方式來最大限度重用微服務。
  • 擁有擴充套件到百萬流程併發執行的服務能力。
  • 透過佇列服務實現客戶端與服務端的分離。
  • 支援 HTTP 或其他RPC協議進行資料傳送

二  基本概念

1  Task

Task是最小執行單元,承載了一段執行邏輯,如傳送HTTP請求等。
  • System Task:被conductor服務執行,這些任務的執行與引擎在同一個JVM中。
  • Worker Task:被worker服務執行,執行與引擎隔離開,worker透過佇列獲取任務後,執行並更新結果狀態到引擎。Worker的實現是跨語言的,其使用Http協議與Server通訊。
conductor提供了若干內建SystemTask:
  • 功能性Task:
    • HTTP:傳送http請求
    • JSON_JQ_TRANSFORM:jq命令執行,一般使用者json的轉換,具體可見jq官方文件
    • KAFKA_PUBLISH: 釋出kafka訊息
  • 流程控制Task:
    • SWITCH(原Decision):條件判斷分支,類似於程式碼中的switch case
    • FORK:啟動並行分支,用於排程並行任務
    • JOIN:彙總並行分支,用於彙總並行任務
    • DO_WHILE:迴圈,類似於程式碼中的do while
    • WAIT:一直在執行中,直到外部時間觸發更新節點狀態,可用於等待外部操作
    • SUB_WORKFLOW:子流程,執行其他的流程
    • TERMINATE:結束流程,以指定輸出提前結束流程,可以與SWITCH節點配合使用,類似程式碼中的提前return語句
自定義Task:
  • 對於System Task,Conductor提供了WorkflowSystemTask 抽象類,可以自定義擴充套件實現。
  • 對於Worker Task,可以實現conductor的client Worker介面實現執行邏輯。

2  Workflow

  • Workflow由一系列需要執行的Task組成,conductor採用json來描述Task的流轉關係。
  • 除基本的順序流程外,藉助內建的SWITCH、FORK、JOIN、DO_WIHLE、TERMINATE任務,還能實現分支、並行、迴圈、提前結束等流程控制。

3  Input&Output

Task的輸入是一種對映,其作為工作流例項化的一部分或某些其他Task的輸出。允許將來自工作流或其他Task的輸入/輸出作為隨後執行的Task的輸入。
  • Task有自己的輸入和輸出,輸入輸出都是jsonobject型別。
  • Task可以引用其他Task的輸入輸出,使用${taskxxx.output}的方式引用。引用語法為json-path,除最基礎的${taskxxx.output}的值解析方式外,還支援其他複雜操作,如過濾等,具體見json-path語法。
  • 啟動Workflow時可以傳入流程的輸入資料,Task可以透過${workflow.input}的方式引用。
Task實現原子操作的處理以及流程控制操作,Workflow定義描述Task的流轉關係,Task引用Workflow或者其它Task的輸入輸出。透過這些機制,conductor實現了JSON DSL對流程的描述。

三  整體架構

主要分為幾個部分:
  • Orchestrator: 負責流程的流轉排程工作;
  • Management/Execution Service: 提供流程、任務的管理更新等操作;
  • TaskQueues: 任務佇列,Orchestrator解析出來的待執行Task會放到佇列中;
  • Worker: 任務執行worker,從TaskQueues中獲取任務,透過Execution Service更新任務狀態與結果資料;
  • Database: 元資料&執行時資料庫,用於儲存執行時的Workflow、Task等狀態資訊,以及流程任務定義的等原資訊;
  • Index: 索引資料庫,用於儲存執行歷史;

四  執行模型

1  Task狀態轉移

  • SCHEDULED:待排程,task放到佇列中還沒有被poll出來執行時的狀態
  • IN_PROGRESS:執行中,被poll出來執行但還沒有完成時的狀態
  • COMPLETED:執行完成
  • FAILED:執行失敗
  • CANCELLED:被中止時為此狀態,一般出現在兩種情況:
    • 1.手動中止流程時,正在執行中的task會被置為此狀態;
    • 2.多個fork分支,當某個分支的task失敗時,其它分支中正在執行的task會被置為此狀態;

2  任務佇列

任務的執行(同步的系統任務除外)都會先新增到任務佇列中,是典型的生產者消費者模式。
  • 任務佇列,是一個帶有延遲、優先順序功能的佇列;
  • 每種型別的Task是一個單獨的佇列,此外,如果配置了domain、isolationGroup,還會拆分成多個佇列實現執行隔離;
  • decider service是生產者,其根據流程配置與當前執行情況,解析出可執行的task後,新增到佇列;
  • 任務執行器(SystemTaskWorker、Worker)是消費者,其長輪詢對應的佇列,從佇列中獲取任務執行;
佇列介面可插拔,conductor提供了Dynomite 、MySQL、PostgreSQL的實現。

3  核心功能實現機制

conductor排程的核心是decider service,其根據當前流程執行的狀態,解析出將要執行的任務列表,將任務入隊交給worker執行。
decide主要流程簡化如下,詳細程式碼見WorkflowExecutor.java的decide方法:
其中,排程任務處理流程簡化如下,詳細程式碼見WorkflowExecutor.java的scheduleTask方法:

decide的觸發時機

最主要的觸發時機:
  1. 新啟動執行時,會觸發decide操作
  2. 系統任務執行完成時,會觸發decide操作
  3. Workder任務透過ExecutionService更新任務狀態時,會觸發decide操作

流程控制節點的實現機制

1)Task & TaskMapper

對於每一個Task來說,都有Task和TaskMapper兩部分:
  • Task:任務的執行邏輯程式碼,它的作用是Task的執行
  • TaskMapper:任務的對映邏輯程式碼,它透過Task的定義配置、當前例項的執行狀態等資訊,返回實際需要執行的Task列表
對於一般的任務來說,TaskMapper返回的是就是Task本身,補充一些執行例項的狀態資訊。但是對於控制節點來說,會有不同的邏輯。

2)條件分支(SWITCH)的實現機制

SWITCH用於根據條件判斷,執行不同的分支。
實際上,該節點的Task不做任何操作,TaskMapper根據分支條件,判斷出要走的分之後,返回對應分支的第一個Task。
SwitchTaskMapper.java getMappedTasks方法關鍵程式碼:
// 待排程的Task list,最終返回結果List<Task> tasksToBeScheduled = new LinkedList<>();// evalResult是分支條件變數的值(case)// decisionCases是一個Map結構,key為分支的case值,value為對應分支的任務定義list(分支內的任務定義會有多個)// 根據分支變數的實際值,獲取對應分支的任務定義listList<WorkflowTask> selectedTasks = taskToSchedule.getDecisionCases().get(evalResult);// default的邏輯:如果獲取不到對應的分支或者分支為空,則用預設的分支if (selectedTasks == null || selectedTasks.isEmpty()) { selectedTasks = taskToSchedule.getDefaultCase();}if (selectedTasks != null && !selectedTasks.isEmpty()) { // 獲取分支的第一個(下標0)task,返回給decider service去做排程(decider會把任務新增到佇列裡,交給worker去執行) WorkflowTask selectedTask = selectedTasks.get(0); // 呼叫了deciderService的getTasksToBeScheduled方法,此方法裡又獲取到TaskMapper呼叫了getMappedTasks。這裡採用了遞迴呼叫的方式,解析巢狀的Task List<Task> caseTasks = taskMapperContext.getDeciderService() .getTasksToBeScheduled(workflowInstance, selectedTask, retryCount, taskMapperContext.getRetryTaskId()); tasksToBeScheduled.addAll(caseTasks); switchTask.getInputData().put("hasChildren", "true");}return tasksToBeScheduled;

3)並行(FORK)的實現機制

FORK用於開啟多個並行分支。
實際上,該節點的Task不做任何操作,TaskMapper返回所有並行分支的第一個Task。
ForkJoinTaskMapper.java getMappedTasks關鍵程式碼:
// 待排程的Task list,最終返回結果List<Task> tasksToBeScheduled = new LinkedList<>();// 配置中的所有fork分支List<List<WorkflowTask>> forkTasks = taskToSchedule.getForkTasks();for (List<WorkflowTask> wfts : forkTasks) { // 每個分支取第一個Task WorkflowTask wft = wfts.get(0); // 呼叫了deciderService的getTasksToBeScheduled方法,此方法裡又獲取到TaskMapper呼叫了getMappedTasks。這裡採用了遞迴呼叫的方式,解析巢狀的Task List<Task> tasks2 = taskMapperContext.getDeciderService() .getTasksToBeScheduled(workflowInstance, wft, retryCount); tasksToBeScheduled.addAll(tasks2);}return tasksToBeScheduled;
總的來說,分支(SWITCH)、並行(FORK)節點本身沒有執行邏輯,其透過TaskMapper返回到實際要執行的Task,然後交給Decider Service處理。

重試的實現機制

重試和其延遲時間設定,都是藉助任務佇列的功能實現的。
重試:將任務重新新增到任務佇列
重試的延遲時間:新增到任務佇列時設定延遲時間,延遲時間過後,任務才能在佇列中被poll出來執行

五  完整性保障機制

由於排程過程中可能會出現因機器重啟、網路異常、JVM崩潰等偶發情況,這些會導致的decide過程意外終止,流程執行不完整,展現出如流程一直執行中(實際已經沒有在排程),或者其它狀態錯誤等異常現象。

1  WorkflowReconciler

針對這種情況,conductor有一個WorkflowReconciler,會定期嘗試decide所有正在執行中的流程,修復流程執行的一致性。此外,它還有一個作用是校驗流程超時時間。

2  decideQueue

那麼WorkflowReconciler是如何獲取到當前執行中的流程呢,答案是decideQueue。
decideQueue和任務佇列相同,也是一個具有延遲功能的佇列,其存放的是正在執行中的流程的例項id。在任務開始執行時(包括新啟動執行、重試執行、恢復執行、重跑執行等),會將例項id push到decideQueue中;在執行結束(成功、失敗)時,會從decideQueue中刪除例項id。

3  ExecutionLockService

WorkflowReconciler會定期嘗試decide所有正在執行中的流程用於超時判斷、維護流程一致性。但是流程本身正常執行也會觸發decide,如果同一個執行同時觸發兩個decide,可能會導致狀態混亂,執行卡住等問題。
conductor採用了鎖來解決這個問題,其提供了單機LocalOnlyLock(基於訊號量實現)、redis分散式鎖(基於redission實現)、zookeeper分散式鎖三種實現。
decide方法中最開始會嘗試獲取鎖,如果獲取失敗則直接返回。透過鎖來保障不會對同一個流程例項併發執行decide。
if (!executionLockService.acquireLock(workflowId)) { return false;}
由於鎖是可配置的,可能會導致一個誤區:單臺機器的話不用配置鎖。其實單機也是需要配置鎖的,因為WorkflowReconciler和流程正常執行會產生衝突,可能會導致偶發的流程狀態混亂問題。
參考:
Github: https://github.com/Netflix/conductor
官方文件:https://netflix.github.io/conductor/
WorkflowReconciler:https://github.com/Netflix/conductor/blob/main/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowReconciler.java
WorkflowSystemTask:https://github.com/Netflix/conductor/blob/main/core/src/main/java/com/netflix/conductor/core/execution/tasks/WorkflowSystemTask.java?spm=ata.21736010.0.0.2b501a3cYnrSfT&file=WorkflowSystemTask.java

PostgreSQL實戰進階

PostgreSQL被譽為“世界上功能最強大的開源資料庫”,是以加州大學伯克利分校計算機系開發的POSTGRES 4.2為基礎的物件關係型資料庫管理系統。PostgreSQL支援大部分 SQL標準並且提供了許多其他現代特性:複雜查詢、外部索引鍵、觸發器、檢視、事務完整性、MVCC。同樣,PostgreSQL 可以用許多方法擴充套件,比如,透過增加新的資料型別、函式、運算子、聚集函式、索引。開發者可以免費使用、修改、和分發 PostgreSQL,不管是私用、商用、還是學術研究使用。
本課程由PostgreSQL社群核心成員出品,帶你快速從0-1深入PostgreSQL核心特性。點選閱讀原文檢視詳情。

相關文章