併發程式設計實踐之公平有界阻塞佇列實現

一  背景

JUC 工具包是 JAVA 併發程式設計的利器。

本文講述在沒有 
JUC 工具包幫助下,藉助原生的 JAVA 同步原語, 如何實現一個公平有界的阻塞佇列。

希望你也能在文後體會到併發程式設計的複雜之處,以及 
JUC 工具包的強大。

二  方法

本文使用到的基本工具:
  1. 同步監聽器 synchronized ,方法基本和程式碼塊級別;
  2. Object 基礎類的 waitnotifynotifyAll;
基於以上基礎工具,實現公平有界的阻塞佇列,此處:
  1. 將公平的定義限定為 FIFO ,也就是先阻塞等待的請求,先解除等待;
  2. 並不保證解除等待後執行 Action 的先後順序;
  3. 確保佇列的大小始終不超過設定的容量;但阻塞等待的請求數不做限制;

三  實現

1  基礎版本

首先,考慮在非併發場景下,藉助 ADT 實現一個基礎版本
interface Queue { boolean offer(Object obj); Object poll();}class FairnessBoundedBlockingQueue implements Queue { // 當前大小 protected int size; // 容量 protected final int capacity; // 頭指標,empty: head.next == tail == null protected Node head; // 尾指標 protected Node tail; public FairnessBoundedBlockingQueue(int capacity) { this.capacity = capacity; this.head = new Node(null); this.tail = head; this.size = 0; } // 如果佇列已滿,透過返回值標識 public boolean offer(Object obj) { if (size < capacity) { Node node = new Node(obj); tail.next = node; tail = node; ++size; return true; } return false; } // 如果佇列為空,head.next == null;返回空元素 public Object poll() { if (head.next != null) { Object result = head.next.value; head.next.value = null; head = head.next; // 丟棄頭結點 --size; return result; } return null; } class Node { Object value; Node next; Node(Object obj) { this.value = obj; next = null; } }}
以上
  1. 定義支援佇列的兩個基礎介面, poll 和 offer;
  2. 佇列的實現,採用經典實現;
  3. 考慮在佇列空的情況下, poll 返回為空,非阻塞;
  4. 佇列在滿的情況下, offer 返回 false ,入隊不成功,無異常;

需要注意的一點:在出隊時,本文透過遷移頭結點的方式實現,避免修改尾結點。

在下文實現併發版本時,會看到此處的用意。

2  併發版本

如果在併發場景下,上述的實現面臨一些問題,同時未實現給定的一些需求。

透過新增
synchronized ,保證併發條件下的執行緒安全問題。

注意此處做同步的原因是為了保證類的不變式。

併發問題

在併發場景下,基礎版本的實現面臨的問題包括:原子性,可見性和指令重排的問題。

參考
JMM 的相關描述。
併發問題,最簡單的解決方法是:透過 synchronized 加鎖,一次性解決問題。
// 省略介面定義class BoundedBlockingQueue implements Queue { // 當前大小 protected int size; // 容量 protected final int capacity; // 頭指標,empty: head.next == tail == null protected Node head; // 尾指標 protected Node tail; public BoundedBlockingQueue(int capacity) { this.capacity = capacity; this.head = new Node(null); this.tail = head; this.size = 0; } // 如果佇列已滿,透過返回值標識 public synchronized boolean offer(Object obj) { if (size < capacity) { Node node = new Node(obj); tail.next = node; tail = node; ++size; return true; } return false; } // 如果佇列為空,head.next == null;返回空元素 public synchronized Object poll() { if (head.next != null) { Object result = head.next.value; head.next.value = null; head = head.next; // 丟棄頭結點 --size; return result; } return null; } // 省略 Node 的定義}
以上,簡單粗暴的加 synchronized 可以解決問題,但會引入新的問題:系統活性問題(此問題下文會解決)。

同時,簡單加
synchronized 同步是無法實現阻塞等待;即
  1. 如果佇列為空,那麼出隊的動作還是會立即返回,返回為空;
  2. 如果佇列已滿,那麼入隊動作還是會立即返回,返回操作不成功;
實現阻塞等待,需要藉助 JAVA 中的 PV 原語:wait, notify, notifyAll

參考:JDK 中對 wait, notify, notifyAll 的相關描述。

衛式方法

阻塞等待,可以透過簡單的衛式方法來實現,此問題本質上可以抽象為:
  1. 任何一個方法都需要在滿足一定條件下才可以執行;
  2. 執行方法前需要首先校驗不變式,然後執行變更;
  3. 在執行完成後,校驗是否滿足後驗不變式;
WHEN(condition) Object action(Object arg) { checkPreCondition(); doAction(arg); checkPostCondition();}
此種抽象 Ada 在語言層面上實現。在 JAVA 中,藉助 wait, notify, notifyAll 可以翻譯為:
// 當前執行緒synchronized Object action(Object arg) { while(!condition) { wait(); } // 前置條件,不變式 checkPreCondition(); doAction(); // 後置條件,不變式 checkPostCondition();}// 其他執行緒synchronized Object notifyAction(Object arg) { notifyAll();}
需要注意:
  1. 通常會採用 notifyAll 傳送通知,而非 notify ;
    因為如果當前執行緒收到 notify 通知後被中斷,那麼系統將一直等待下去。
  2. 如果使用了 notifyAll 那麼衛式語句必須放在 while 迴圈中;
    因為執行緒喚醒後,執行條件已經不滿足,雖然當前執行緒持有互斥鎖。
  3. 衛式條件的所有變數,有任何變更都需要傳送 notifyAll 不然面臨系統活性問題
據此,不難實現簡單的阻塞版本的有界佇列,如下
interface Queue { boolean offer(Object obj) throws InterruptedException; Object poll() throws InterruptedException;}class FairnessBoundedBlockingQueue implements Queue { // 當前大小 protected int size; // 容量 protected final int capacity; // 頭指標,empty: head.next == tail == null protected Node head; // 尾指標 protected Node tail; public FairnessBoundedBlockingQueue(int capacity) { this.capacity = capacity; this.head = new Node(null); this.tail = head; this.size = 0; } // 如果佇列已滿,透過返回值標識 public synchronized boolean offer(Object obj) throws InterruptedException { while (size < capacity) { wait(); } Node node = new Node(obj); tail.next = node; tail = node; ++size; notifyAll(); // 可以出隊 return true; } // 如果佇列為空,阻塞等待 public synchronized Object poll() throws InterruptedException { while (head.next == null) { wait(); } Object result = head.next.value; head.next.value = null; head = head.next; // 丟棄頭結點 --size; notifyAll(); // 可以入隊 return result; } // 省略 Node 的定義}
以上,實現了阻塞等待,但也引入了更大的效能問題
  1. 入隊和出隊動作阻塞等待同一把鎖,惡性競爭;
  2. 當佇列變更時,所有阻塞執行緒被喚醒,大量的執行緒上下文切換,競爭同步鎖,最終可能只有一個執行緒能執行;
需要注意的點:
  1. 阻塞等待 wait 會丟擲中斷異常。關於異常的問題下文會處理;
  2. 介面需要支援丟擲中斷異常;
  3. 隊裡變更需要 notifyAll 避免執行緒中斷或異常,丟失訊息;

3  鎖拆分最佳化

以上第一個問題,可以透過鎖拆分來解決,即:定義兩把鎖,讀鎖和寫鎖;讀寫分離。
// 省略介面定義class FairnessBoundedBlockingQueue implements Queue { // 容量 protected final int capacity; // 頭指標,empty: head.next == tail == null protected Node head; // 尾指標 protected Node tail; // guard: canPollCount, head protected final Object pollLock = new Object(); protected int canPollCount; // guard: canOfferCount, tail protected final Object offerLock = new Object(); protected int canOfferCount; public FairnessBoundedBlockingQueue(int capacity) { this.capacity = capacity; this.canPollCount = 0; this.canOfferCount = capacity; this.head = new Node(null); this.tail = head; } // 如果佇列已滿,透過返回值標識 public boolean offer(Object obj) throws InterruptedException { synchronized(offerLock) { while(canOfferCount <= 0) { offerLock.wait(); } Node node = new Node(obj); tail.next = node; tail = node; canOfferCount--; } synchronized(pollLock) { ++canPollCount; pollLock.notifyAll(); } return true; } // 如果佇列為空,阻塞等待 public Object poll() throws InterruptedException { Object result = null; synchronized(pollLock) { while(canPollCount <= 0) { pollLock.wait(); } result = head.next.value; head.next.value = null; head = head.next; canPollCount--; } synchronized(offerLock) { canOfferCount++; offerLock.notifyAll(); } return result; } // 省略 Node 定義}
以上
  1. 定義了兩把鎖, pollLock 和 offerLock 拆分出隊和入隊競爭;
  2. 入隊鎖同步的變數為:callOfferCount 和 tail;
  3. 出隊鎖同步的變數為:canPollCount 和 head;
  4. 出隊的動作:首先拿到 pollLock 衛式等待後,完成出隊動作;
    然後拿到 offerLock 傳送通知,解除入隊的等待執行緒。
  5. 入隊的動作:首先拿到 offerLock 衛式等待後,完成入隊的動作;
    然後拿到 pollLock 傳送通知,解除出隊的等待執行緒。
以上實現
  1. 確保透過入隊鎖和出隊鎖,分別保證入隊和出隊的原子性;
  2. 出隊動作,透過特別的實現,確保出隊只會變更 head ,避免獲取 offerLock;
  3. 透過 offerLock.notifyAll 和 pollLock.notifyAll 解決讀寫競爭的問題;
但上述實現還有未解決的問題:
當有多個入隊執行緒等待時,一次出隊的動作會觸發所有入隊執行緒競爭,大量的執行緒上下文切換,最終只有一個執行緒能執行。

即,還有 讀與讀 和 寫與寫 之間的競爭問題。

4  狀態追蹤解除競爭

此處可以透過狀態追蹤,解除讀與讀之間和寫與寫之間的競爭問題
class FairnessBoundedBlockingQueue implements Queue { // 容量 protected final int capacity; // 頭指標,empty: head.next == tail == null protected Node head; // 尾指標 protected Node tail; // guard: canPollCount, head protected final Object pollLock = new Object(); protected int canPollCount; protected int waitPollCount; // guard: canOfferCount, tail protected final Object offerLock = new Object(); protected int canOfferCount; protected int waitOfferCount; public FairnessBoundedBlockingQueue(int capacity) { this.capacity = capacity; this.canPollCount = 0; this.canOfferCount = capacity; this.waitPollCount = 0; this.waitOfferCount = 0; this.head = new Node(null); this.tail = head; } // 如果佇列已滿,透過返回值標識 public boolean offer(Object obj) throws InterruptedException { synchronized(offerLock) { while(canOfferCount <= 0) { waitOfferCount++; offerLock.wait(); waitOfferCount--; } Node node = new Node(obj); tail.next = node; tail = node; canOfferCount--; } synchronized(pollLock) { ++canPollCount; if (waitPollCount > 0) { pollLock.notify(); } } return true; } // 如果佇列為空,阻塞等待 public Object poll() throws InterruptedException { Object result; synchronized(pollLock) { while(canPollCount <= 0) { waitPollCount++; pollLock.wait(); waitPollCount--; } result = head.next.value; head.next.value = null; head = head.next; canPollCount--; } synchronized(offerLock) { canOfferCount++; if (waitOfferCount > 0) { offerLock.notify(); } } return result; } // 省略 Node 的定義}
以上
  1. 透過 waitOfferCount 和 waitPollCount 的狀態追蹤解決 讀寫內部的競爭問題;
  2. 當佇列變更時,根據追蹤的狀態,決定是否派發訊息,觸發執行緒阻塞狀態解除;
但,上述的實現在某些場景下會執行失敗,面臨活性問題,考慮
情況一:
  1. 初始狀態佇列為空 執行緒 A 執行出隊動作,被阻塞在 pollLock , 此時 waitPollCount==1;
  2. 此時執行緒 A 在執行 wait 時被中斷,丟擲異常, waitPollCount==1 並未被重置;
  3. 阻塞佇列為空,但 waitPollCount==1 類狀態異常;
情況二:
  1. 初始狀態佇列為空 執行緒 A B 執行出隊動作,被阻塞在 pollLock , 此時 waitPollCount==2;
  2. 執行緒 C 執行入隊動作,可以立即執行,執行完成後,觸發 pollLock 解除一個執行緒等待 notify;
  3. 觸發的執行緒在 JVM 實現中是隨機的,假設執行緒 A 被解除阻塞;
  4. 假設執行緒 A 在阻塞過程中已被中斷,阻塞解除後 JVM 檢查 interrupted 狀態,丟擲 InterruptedException 異常;
  5. 此時佇列中有一個元素,但執行緒 A 仍阻塞在 pollLock 中,且一直阻塞下去;
以上為解除阻塞訊息丟失的例子,問題的根源在與異常處理。

5  解決異常問題

解決執行緒中斷退出的問題,執行緒校驗中斷狀態的場景
  1. JVM 通常只會在有限的幾個場景檢測執行緒的中斷狀態, wait, Thread.join, Thread.sleep;
  2. JVM 在檢測到執行緒中斷狀態 Thread.interrupted() 後,會清除中斷標誌,丟擲 InterruptedException;
  3. 通常為了保證執行緒對中斷及時響應, run 方法中需要自主檢測中斷標誌,中斷執行緒,特別是對中斷比較敏感需要保持類的不變式的場景;
class FairnessBoundedBlockingQueue implements Queue { // 容量 protected final int capacity; // 頭指標,empty: head.next == tail == null protected Node head; // 尾指標 protected Node tail; // guard: canPollCount, head, waitPollCount protected final Object pollLock = new Object(); protected int canPollCount; protected int waitPollCount; // guard: canOfferCount, tail, waitOfferCount protected final Object offerLock = new Object(); protected int canOfferCount; protected int waitOfferCount; public FairnessBoundedBlockingQueue(int capacity) { this.capacity = capacity; this.canPollCount = 0; this.canOfferCount = capacity; this.waitPollCount = 0; this.waitOfferCount = 0; this.head = new Node(null); this.tail = head; } // 如果佇列已滿,透過返回值標識 public boolean offer(Object obj) throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); // 執行緒已中斷,直接退出即可,防止中斷執行緒競爭鎖 } synchronized(offerLock) { while(canOfferCount <= 0) { waitOfferCount++; try { offerLock.wait(); } catch (InterruptedException e) { // 觸發其他執行緒 offerLock.notify(); throw e; } finally { waitOfferCount--; } } Node node = new Node(obj); tail.next = node; tail = node; canOfferCount--; } synchronized(pollLock) { ++canPollCount; if (waitPollCount > 0) { pollLock.notify(); } } return true; } // 如果佇列為空,阻塞等待 public Object poll() throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } Object result = null; synchronized(pollLock) { while(canPollCount <= 0) { waitPollCount++; try { pollLock.wait(); } catch (InterruptedException e) { pollLock.notify(); throw e; } finally { waitPollCount--; } } result = head.next.value; head.next.value = 0; // ignore head; head = head.next; canPollCount--; } synchronized(offerLock) { canOfferCount++; if (waitOfferCount > 0) { offerLock.notify(); } } return result; } // 省略 Node 的定義}
以上
  1. 當等待執行緒中斷退出時,捕獲中斷異常,透過 pollLock.notify 和 offerLock.notify 轉發訊息;
  2. 透過在 finally 中恢復狀態追蹤變數;
透過狀態變數追蹤可以解決讀與讀之間和寫與寫之間的鎖競爭問題。

以下考慮如果解決讀與讀之間和寫與寫之間的公平性問題。

6  解決公平性

公平性的問題的解決需要將狀態變數的追蹤轉換為:請求監視器追蹤。
  1. 每個請求對應一個監視器;
  2. 透過內部維護一個 FIFO 佇列,實現公平性;
  3. 在佇列狀態變更時,釋放佇列中的監視器;
以上邏輯可以統一抽象為
boolean needToWait;synchronized(this) { needToWait = calculateNeedToWait(); if (needToWait) { enqueue(monitor); // 請求對應的monitor }}if (needToWait) { monitor.doWait();}
需要注意
  1. monitor.doWait() 需要在 this 的衛式語句之外,因為如果在內部, monitor.doWait 並不會釋放 this鎖;
  2. calculateNeedToWait() 需要在 this 的守衛之內完成,避免同步問題;
  3. 需要考慮中斷異常的問題;
基於以上的邏輯抽象,實現公平佇列
// 省略介面定義class FairnessBoundedBlockingQueue implements Queue { // 容量 protected final int capacity; // 頭指標,empty: head.next == tail == null protected Node head; // 尾指標 protected Node tail; // guard: canPollCount, head, pollQueue protected final Object pollLock = new Object(); protected int canPollCount; // guard: canOfferCount, tail, offerQueue protected final Object offerLock = new Object(); protected int canOfferCount; protected final WaitQueue pollQueue = new WaitQueue(); protected final WaitQueue offerQueue = new WaitQueue(); public FairnessBoundedBlockingQueue(int capacity) { this.capacity = capacity; this.canOfferCount = capacity; this.canPollCount = 0; this.head = new Node(null); this.tail = head; } // 如果佇列已滿,透過返回值標識 public boolean offer(Object obj) throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); // 執行緒已中斷,直接退出即可,防止中斷執行緒競爭鎖 } WaitNode wait = null; synchronized(offerLock) { // 在有阻塞請求或者佇列為空時,阻塞等待 if (canOfferCount <= 0 || !offerQueue.isEmpty()) { wait = new WaitNode(); offerQueue.enq(wait); } else { // continue. } } try { if (wait != null) { wait.doWait(); } if (Thread.interrupted()) { throw new InterruptedException(); } } catch (InterruptedException e) { offerQueue.doNotify(); throw e; } // 確保此時執行緒狀態正常,以下不會校驗中斷 synchronized(offerLock) { Node node = new Node(obj); tail.next = node; tail = node; canOfferCount--; } synchronized(pollLock) { ++canPollCount; pollQueue.doNotify(); } return true; } // 如果佇列為空,阻塞等待 public Object poll() throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } Object result = null; WaitNode wait = null; synchronized(pollLock) { // 在有阻塞請求或者佇列為空時,阻塞等待 if (canPollCount <= 0 || !pollQueue.isEmpty()) { wait = new WaitNode(); pollQueue.enq(wait); } else { // ignore } } try { if (wait != null) { wait.doWait(); } if (Thread.interrupted()) { throw new InterruptedException(); } } catch (InterruptedException e) { // 傳遞訊息 pollQueue.doNotify(); throw e; } // 以下不會檢測執行緒中斷狀態 synchronized(pollLock) { result = head.next.value; head.next.value = 0; // ignore head; head = head.next; canPollCount--; } synchronized(offerLock) { canOfferCount++; offerQueue.doNotify(); } return result; } class WaitQueue { WaitNode head; WaitNode tail; WaitQueue() { head = new WaitNode(); tail = head; } synchronized void doNotify() { for(;;) { WaitNode node = deq(); if (node == null) { break; } else if (node.doNotify()) { // 此處確保NOTIFY成功 break; } else { // ignore, and retry. } } } synchronized boolean isEmpty() { return head.next == null; } synchronized void enq(WaitNode node) { tail.next = node; tail = tail.next; } synchronized WaitNode deq() { if (head.next == null) { return null; } WaitNode res = head.next; head = head.next; if (head.next == null) { tail = head; // 為空,遷移tail節點 } return res; } } class WaitNode { boolean released; WaitNode next; WaitNode() { released = false; next = null; } synchronized void doWait() throws InterruptedException { try { while (!released) { wait(); } } catch (InterruptedException e) { if (!released) { released = true; throw e; } else { // 如果是NOTIFY之後收到中斷的訊號,不能丟擲異常;需要做RELAY處理 Thread.currentThread().interrupt(); } } } synchronized boolean doNotify() { if (!released) { released = true; notify(); // 明確釋放了一個執行緒,返回true return true; } else { // 沒有釋放新的執行緒,返回false return false; } } } // 省略 Node 的定義}
以上
  1. 核心是替換狀態追蹤變數為同步節點, WaitNode
  2. WaitNode 透過簡單的同步佇列組織實現 FIFO 協議,每個執行緒等待各自的 WaitNode 監視器;
  3. WaitNode 內部維持 released 狀態,標識執行緒阻塞狀態是否被釋放,主要是為了處理中斷的問題;
  4. WaitQueue 本身是全同步的,由於已解決了讀寫競爭已經讀寫內部競爭的問題, WaitQueue 同步並不會造成問題;
  5. WaitQueue 是無界佇列,是一個潛在的問題;但由於其只做同步的追蹤,而且追蹤的通常是執行緒,通常並不是問題;
  6. 最終的公平有界佇列實現,無論是入隊還是出隊,首先衛式語句判定是否需要入隊等待,如果入隊等待,透過公平性協議等待;
    當訊號釋放時,藉助讀寫鎖同步更新佇列;最後同樣藉助讀寫鎖,觸發佇列更新訊息;

7  等待時間的問題

併發場景下,等待通常會設定為限時等待 TIMED_WAITING ,避免死鎖或損失系統活性;

實現同步佇列的限時等待,並沒想象的那麼困難
class TimeoutException extends InterruptedException {}class WaitNode { boolean released; WaitNode next; WaitNode() { released = false; next = null; } synchronized void doWait(long milliSeconds) throws InterruptedException { try { long startTime = System.currentTimeMillis(); long toWait = milliSeconds; for (;;) { wait(toWait); if (released) { return; } long now = System.currentTimeMillis(); toWait = toWait - (now - startTime); if (toWait <= 0) { throw new TimeoutException(); } } } catch (InterruptedException e) { if (!released) { released = true; throw e; } else { // 如果已經釋放訊號量,此處不丟擲異常;但恢復中斷狀態 Thread.currentThread().interrupt(); } } } synchronized boolean doNotify() { if (!released) { released = true; notify(); return true; } else { return false; } }
由於所有的等待都阻塞在 WaitNode 監視器,以上
  • 首先定義超時異常,此處只是為了方便異常處理,繼承 InterruptedException
  • 此處依賴於 wait(long timeout) 的超時等待實現,這通常不是問題;
最後,將 WaitNode 超時等待的邏輯,帶入到 FairnessBoundedBlockingQueue 實現中,即可。

四  總結

本文透過一步步迭代,最終藉助 JAVA 同步原語實現初版的公平有界佇列。迭代實現過程中可以看到以下幾點:
  1. 觀念的轉變,將呼叫一個類的方法思維轉換為:在滿足一定條件下方法才可以呼叫,在呼叫前需要滿足不變式,呼叫後滿足不變式;
    由於併發的問題很難測試,通常要採用衛式表達證明併發的正確性;
  2. 在迭代實現中會看到很多模式,比如,讀寫分離時,其實可以抽象為讀鎖和寫鎖;就得到了一個抽象的 

    Lock

     的定義;
    比如,讀寫狀態追蹤,可以採用 

    Exchanger

     抽象表達;
  3. 另外,本文的實現遠非完善,還需要考慮支援 Iterator 遍歷、狀態查詢及資料遷移等操作;
最後,相信大家再看 JUC 的工具包實現,定有不一樣的體會。

阿里雲資源編排ROS使用教程

資源編排(Resource Orchestration)是一種簡單易用的雲計算資源管理和自動化運維服務。使用者透過模板描述多個雲計算資源的依賴關係、配置等,並自動完成所有資源的建立和配置,以達到自動化部署、運維等目的。編排模板同時也是一種標準化的資源和應用交付方式,並且可以隨時編輯修改,使基礎設施即程式碼(Infrastructure as Code)成為可能。
點選閱讀原文檢視詳情!


相關文章