
一 背景
JUC 工具包是 JAVA 併發程式設計的利器。
本文講述在沒有 JUC 工具包幫助下,藉助原生的 JAVA 同步原語, 如何實現一個公平有界的阻塞佇列。
希望你也能在文後體會到併發程式設計的複雜之處,以及 JUC 工具包的強大。
二 方法
本文使用到的基本工具:
-
同步監聽器 synchronized ,方法基本和程式碼塊級別; -
Object 基礎類的 wait, notify, notifyAll;
基於以上基礎工具,實現公平有界的阻塞佇列,此處:
-
將公平的定義限定為 FIFO ,也就是先阻塞等待的請求,先解除等待; -
並不保證解除等待後執行 Action 的先後順序; -
確保佇列的大小始終不超過設定的容量;但阻塞等待的請求數不做限制;
三 實現
1 基礎版本
首先,考慮在非併發場景下,藉助 ADT 實現一個基礎版本
interface Queue {
boolean offer(Object obj);
Object poll();
}
class FairnessBoundedBlockingQueue implements Queue {
// 當前大小
protected int size;
// 容量
protected final int capacity;
// 頭指標,empty: head.next == tail == null
protected Node head;
// 尾指標
protected Node tail;
public FairnessBoundedBlockingQueue(int capacity) {
this.capacity = capacity;
this.head = new Node(null);
this.tail = head;
this.size = 0;
}
// 如果佇列已滿,透過返回值標識
public boolean offer(Object obj) {
if (size < capacity) {
Node node = new Node(obj);
tail.next = node;
tail = node;
++size;
return true;
}
return false;
}
// 如果佇列為空,head.next == null;返回空元素
public Object poll() {
if (head.next != null) {
Object result = head.next.value;
head.next.value = null;
head = head.next; // 丟棄頭結點
--size;
return result;
}
return null;
}
class Node {
Object value;
Node next;
Node(Object obj) {
this.value = obj;
next = null;
}
}
}
以上
-
定義支援佇列的兩個基礎介面, poll 和 offer; -
佇列的實現,採用經典實現; -
考慮在佇列空的情況下, poll 返回為空,非阻塞; -
佇列在滿的情況下, offer 返回 false ,入隊不成功,無異常;
需要注意的一點:在出隊時,本文透過遷移頭結點的方式實現,避免修改尾結點。
在下文實現併發版本時,會看到此處的用意。
2 併發版本
如果在併發場景下,上述的實現面臨一些問題,同時未實現給定的一些需求。
透過新增 synchronized ,保證併發條件下的執行緒安全問題。
注意此處做同步的原因是為了保證類的不變式。
併發問題
在併發場景下,基礎版本的實現面臨的問題包括:原子性,可見性和指令重排的問題。
參考 JMM 的相關描述。
併發問題,最簡單的解決方法是:透過 synchronized 加鎖,一次性解決問題。
// 省略介面定義
class BoundedBlockingQueue implements Queue {
// 當前大小
protected int size;
// 容量
protected final int capacity;
// 頭指標,empty: head.next == tail == null
protected Node head;
// 尾指標
protected Node tail;
public BoundedBlockingQueue(int capacity) {
this.capacity = capacity;
this.head = new Node(null);
this.tail = head;
this.size = 0;
}
// 如果佇列已滿,透過返回值標識
public synchronized boolean offer(Object obj) {
if (size < capacity) {
Node node = new Node(obj);
tail.next = node;
tail = node;
++size;
return true;
}
return false;
}
// 如果佇列為空,head.next == null;返回空元素
public synchronized Object poll() {
if (head.next != null) {
Object result = head.next.value;
head.next.value = null;
head = head.next; // 丟棄頭結點
--size;
return result;
}
return null;
}
// 省略 Node 的定義
}
以上,簡單粗暴的加 synchronized 可以解決問題,但會引入新的問題:系統活性問題(此問題下文會解決)。
同時,簡單加 synchronized 同步是無法實現阻塞等待;即
-
如果佇列為空,那麼出隊的動作還是會立即返回,返回為空; -
如果佇列已滿,那麼入隊動作還是會立即返回,返回操作不成功;
實現阻塞等待,需要藉助 JAVA 中的 PV 原語:wait, notify, notifyAll
。
參考:JDK 中對 wait, notify, notifyAll 的相關描述。
衛式方法
阻塞等待,可以透過簡單的衛式方法來實現,此問題本質上可以抽象為:
-
任何一個方法都需要在滿足一定條件下才可以執行; -
執行方法前需要首先校驗不變式,然後執行變更; -
在執行完成後,校驗是否滿足後驗不變式;
WHEN(condition) Object action(Object arg) {
checkPreCondition();
doAction(arg);
checkPostCondition();
}
此種抽象 Ada 在語言層面上實現。在 JAVA 中,藉助 wait, notify, notifyAll 可以翻譯為:
// 當前執行緒
synchronized Object action(Object arg) {
while(!condition) {
wait();
}
// 前置條件,不變式
checkPreCondition();
doAction();
// 後置條件,不變式
checkPostCondition();
}
// 其他執行緒
synchronized Object notifyAction(Object arg) {
notifyAll();
}
需要注意:
-
通常會採用 notifyAll 傳送通知,而非 notify ;因為如果當前執行緒收到 notify 通知後被中斷,那麼系統將一直等待下去。
-
如果使用了 notifyAll 那麼衛式語句必須放在 while 迴圈中;因為執行緒喚醒後,執行條件已經不滿足,雖然當前執行緒持有互斥鎖。
-
衛式條件的所有變數,有任何變更都需要傳送 notifyAll 不然面臨系統活性問題
據此,不難實現簡單的阻塞版本的有界佇列,如下
interface Queue {
boolean offer(Object obj) throws InterruptedException;
Object poll() throws InterruptedException;
}
class FairnessBoundedBlockingQueue implements Queue {
// 當前大小
protected int size;
// 容量
protected final int capacity;
// 頭指標,empty: head.next == tail == null
protected Node head;
// 尾指標
protected Node tail;
public FairnessBoundedBlockingQueue(int capacity) {
this.capacity = capacity;
this.head = new Node(null);
this.tail = head;
this.size = 0;
}
// 如果佇列已滿,透過返回值標識
public synchronized boolean offer(Object obj) throws InterruptedException {
while (size < capacity) {
wait();
}
Node node = new Node(obj);
tail.next = node;
tail = node;
++size;
notifyAll(); // 可以出隊
return true;
}
// 如果佇列為空,阻塞等待
public synchronized Object poll() throws InterruptedException {
while (head.next == null) {
wait();
}
Object result = head.next.value;
head.next.value = null;
head = head.next; // 丟棄頭結點
--size;
notifyAll(); // 可以入隊
return result;
}
// 省略 Node 的定義
}
以上,實現了阻塞等待,但也引入了更大的效能問題
-
入隊和出隊動作阻塞等待同一把鎖,惡性競爭; -
當佇列變更時,所有阻塞執行緒被喚醒,大量的執行緒上下文切換,競爭同步鎖,最終可能只有一個執行緒能執行;
需要注意的點:
-
阻塞等待 wait 會丟擲中斷異常。關於異常的問題下文會處理; -
介面需要支援丟擲中斷異常; -
隊裡變更需要 notifyAll 避免執行緒中斷或異常,丟失訊息;
3 鎖拆分最佳化
以上第一個問題,可以透過鎖拆分來解決,即:定義兩把鎖,讀鎖和寫鎖;讀寫分離。
// 省略介面定義
class FairnessBoundedBlockingQueue implements Queue {
// 容量
protected final int capacity;
// 頭指標,empty: head.next == tail == null
protected Node head;
// 尾指標
protected Node tail;
// guard: canPollCount, head
protected final Object pollLock = new Object();
protected int canPollCount;
// guard: canOfferCount, tail
protected final Object offerLock = new Object();
protected int canOfferCount;
public FairnessBoundedBlockingQueue(int capacity) {
this.capacity = capacity;
this.canPollCount = 0;
this.canOfferCount = capacity;
this.head = new Node(null);
this.tail = head;
}
// 如果佇列已滿,透過返回值標識
public boolean offer(Object obj) throws InterruptedException {
synchronized(offerLock) {
while(canOfferCount <= 0) {
offerLock.wait();
}
Node node = new Node(obj);
tail.next = node;
tail = node;
canOfferCount--;
}
synchronized(pollLock) {
++canPollCount;
pollLock.notifyAll();
}
return true;
}
// 如果佇列為空,阻塞等待
public Object poll() throws InterruptedException {
Object result = null;
synchronized(pollLock) {
while(canPollCount <= 0) {
pollLock.wait();
}
result = head.next.value;
head.next.value = null;
head = head.next;
canPollCount--;
}
synchronized(offerLock) {
canOfferCount++;
offerLock.notifyAll();
}
return result;
}
// 省略 Node 定義
}
以上
-
定義了兩把鎖, pollLock 和 offerLock 拆分出隊和入隊競爭; -
入隊鎖同步的變數為:callOfferCount 和 tail; -
出隊鎖同步的變數為:canPollCount 和 head; -
出隊的動作:首先拿到 pollLock 衛式等待後,完成出隊動作;然後拿到 offerLock 傳送通知,解除入隊的等待執行緒。
-
入隊的動作:首先拿到 offerLock 衛式等待後,完成入隊的動作;然後拿到 pollLock 傳送通知,解除出隊的等待執行緒。
以上實現
-
確保透過入隊鎖和出隊鎖,分別保證入隊和出隊的原子性; -
出隊動作,透過特別的實現,確保出隊只會變更 head ,避免獲取 offerLock; -
透過 offerLock.notifyAll 和 pollLock.notifyAll 解決讀寫競爭的問題;
但上述實現還有未解決的問題:
當有多個入隊執行緒等待時,一次出隊的動作會觸發所有入隊執行緒競爭,大量的執行緒上下文切換,最終只有一個執行緒能執行。
即,還有 讀與讀 和 寫與寫 之間的競爭問題。
4 狀態追蹤解除競爭
此處可以透過狀態追蹤,解除讀與讀之間和寫與寫之間的競爭問題
class FairnessBoundedBlockingQueue implements Queue {
// 容量
protected final int capacity;
// 頭指標,empty: head.next == tail == null
protected Node head;
// 尾指標
protected Node tail;
// guard: canPollCount, head
protected final Object pollLock = new Object();
protected int canPollCount;
protected int waitPollCount;
// guard: canOfferCount, tail
protected final Object offerLock = new Object();
protected int canOfferCount;
protected int waitOfferCount;
public FairnessBoundedBlockingQueue(int capacity) {
this.capacity = capacity;
this.canPollCount = 0;
this.canOfferCount = capacity;
this.waitPollCount = 0;
this.waitOfferCount = 0;
this.head = new Node(null);
this.tail = head;
}
// 如果佇列已滿,透過返回值標識
public boolean offer(Object obj) throws InterruptedException {
synchronized(offerLock) {
while(canOfferCount <= 0) {
waitOfferCount++;
offerLock.wait();
waitOfferCount--;
}
Node node = new Node(obj);
tail.next = node;
tail = node;
canOfferCount--;
}
synchronized(pollLock) {
++canPollCount;
if (waitPollCount > 0) {
pollLock.notify();
}
}
return true;
}
// 如果佇列為空,阻塞等待
public Object poll() throws InterruptedException {
Object result;
synchronized(pollLock) {
while(canPollCount <= 0) {
waitPollCount++;
pollLock.wait();
waitPollCount--;
}
result = head.next.value;
head.next.value = null;
head = head.next;
canPollCount--;
}
synchronized(offerLock) {
canOfferCount++;
if (waitOfferCount > 0) {
offerLock.notify();
}
}
return result;
}
// 省略 Node 的定義
}
以上
-
透過 waitOfferCount 和 waitPollCount 的狀態追蹤解決 讀寫內部的競爭問題; -
當佇列變更時,根據追蹤的狀態,決定是否派發訊息,觸發執行緒阻塞狀態解除;
但,上述的實現在某些場景下會執行失敗,面臨活性問題,考慮
情況一:
-
初始狀態佇列為空 執行緒 A 執行出隊動作,被阻塞在 pollLock , 此時 waitPollCount==1; -
此時執行緒 A 在執行 wait 時被中斷,丟擲異常, waitPollCount==1 並未被重置; -
阻塞佇列為空,但 waitPollCount==1 類狀態異常;
情況二:
-
初始狀態佇列為空 執行緒 A B 執行出隊動作,被阻塞在 pollLock , 此時 waitPollCount==2; -
執行緒 C 執行入隊動作,可以立即執行,執行完成後,觸發 pollLock 解除一個執行緒等待 notify; -
觸發的執行緒在 JVM 實現中是隨機的,假設執行緒 A 被解除阻塞; -
假設執行緒 A 在阻塞過程中已被中斷,阻塞解除後 JVM 檢查 interrupted 狀態,丟擲 InterruptedException 異常; -
此時佇列中有一個元素,但執行緒 A 仍阻塞在 pollLock 中,且一直阻塞下去;
以上為解除阻塞訊息丟失的例子,問題的根源在與異常處理。
5 解決異常問題
解決執行緒中斷退出的問題,執行緒校驗中斷狀態的場景
-
JVM 通常只會在有限的幾個場景檢測執行緒的中斷狀態, wait, Thread.join, Thread.sleep; -
JVM 在檢測到執行緒中斷狀態 Thread.interrupted() 後,會清除中斷標誌,丟擲 InterruptedException; -
通常為了保證執行緒對中斷及時響應, run 方法中需要自主檢測中斷標誌,中斷執行緒,特別是對中斷比較敏感需要保持類的不變式的場景;
class FairnessBoundedBlockingQueue implements Queue {
// 容量
protected final int capacity;
// 頭指標,empty: head.next == tail == null
protected Node head;
// 尾指標
protected Node tail;
// guard: canPollCount, head, waitPollCount
protected final Object pollLock = new Object();
protected int canPollCount;
protected int waitPollCount;
// guard: canOfferCount, tail, waitOfferCount
protected final Object offerLock = new Object();
protected int canOfferCount;
protected int waitOfferCount;
public FairnessBoundedBlockingQueue(int capacity) {
this.capacity = capacity;
this.canPollCount = 0;
this.canOfferCount = capacity;
this.waitPollCount = 0;
this.waitOfferCount = 0;
this.head = new Node(null);
this.tail = head;
}
// 如果佇列已滿,透過返回值標識
public boolean offer(Object obj) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException(); // 執行緒已中斷,直接退出即可,防止中斷執行緒競爭鎖
}
synchronized(offerLock) {
while(canOfferCount <= 0) {
waitOfferCount++;
try {
offerLock.wait();
} catch (InterruptedException e) {
// 觸發其他執行緒
offerLock.notify();
throw e;
} finally {
waitOfferCount--;
}
}
Node node = new Node(obj);
tail.next = node;
tail = node;
canOfferCount--;
}
synchronized(pollLock) {
++canPollCount;
if (waitPollCount > 0) {
pollLock.notify();
}
}
return true;
}
// 如果佇列為空,阻塞等待
public Object poll() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
Object result = null;
synchronized(pollLock) {
while(canPollCount <= 0) {
waitPollCount++;
try {
pollLock.wait();
} catch (InterruptedException e) {
pollLock.notify();
throw e;
} finally {
waitPollCount--;
}
}
result = head.next.value;
head.next.value = 0;
// ignore head;
head = head.next;
canPollCount--;
}
synchronized(offerLock) {
canOfferCount++;
if (waitOfferCount > 0) {
offerLock.notify();
}
}
return result;
}
// 省略 Node 的定義
}
以上
-
當等待執行緒中斷退出時,捕獲中斷異常,透過 pollLock.notify 和 offerLock.notify 轉發訊息; -
透過在 finally 中恢復狀態追蹤變數;
透過狀態變數追蹤可以解決讀與讀之間和寫與寫之間的鎖競爭問題。
以下考慮如果解決讀與讀之間和寫與寫之間的公平性問題。
6 解決公平性
公平性的問題的解決需要將狀態變數的追蹤轉換為:請求監視器追蹤。
-
每個請求對應一個監視器; -
透過內部維護一個 FIFO 佇列,實現公平性; -
在佇列狀態變更時,釋放佇列中的監視器;
以上邏輯可以統一抽象為
boolean needToWait;
synchronized(this) {
needToWait = calculateNeedToWait();
if (needToWait) {
enqueue(monitor); // 請求對應的monitor
}
}
if (needToWait) {
monitor.doWait();
}
需要注意
-
monitor.doWait() 需要在 this 的衛式語句之外,因為如果在內部, monitor.doWait 並不會釋放 this鎖; -
calculateNeedToWait() 需要在 this 的守衛之內完成,避免同步問題; -
需要考慮中斷異常的問題;
基於以上的邏輯抽象,實現公平佇列
// 省略介面定義
class FairnessBoundedBlockingQueue implements Queue {
// 容量
protected final int capacity;
// 頭指標,empty: head.next == tail == null
protected Node head;
// 尾指標
protected Node tail;
// guard: canPollCount, head, pollQueue
protected final Object pollLock = new Object();
protected int canPollCount;
// guard: canOfferCount, tail, offerQueue
protected final Object offerLock = new Object();
protected int canOfferCount;
protected final WaitQueue pollQueue = new WaitQueue();
protected final WaitQueue offerQueue = new WaitQueue();
public FairnessBoundedBlockingQueue(int capacity) {
this.capacity = capacity;
this.canOfferCount = capacity;
this.canPollCount = 0;
this.head = new Node(null);
this.tail = head;
}
// 如果佇列已滿,透過返回值標識
public boolean offer(Object obj) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException(); // 執行緒已中斷,直接退出即可,防止中斷執行緒競爭鎖
}
WaitNode wait = null;
synchronized(offerLock) {
// 在有阻塞請求或者佇列為空時,阻塞等待
if (canOfferCount <= 0 || !offerQueue.isEmpty()) {
wait = new WaitNode();
offerQueue.enq(wait);
} else {
// continue.
}
}
try {
if (wait != null) {
wait.doWait();
}
if (Thread.interrupted()) {
throw new InterruptedException();
}
} catch (InterruptedException e) {
offerQueue.doNotify();
throw e;
}
// 確保此時執行緒狀態正常,以下不會校驗中斷
synchronized(offerLock) {
Node node = new Node(obj);
tail.next = node;
tail = node;
canOfferCount--;
}
synchronized(pollLock) {
++canPollCount;
pollQueue.doNotify();
}
return true;
}
// 如果佇列為空,阻塞等待
public Object poll() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
Object result = null;
WaitNode wait = null;
synchronized(pollLock) {
// 在有阻塞請求或者佇列為空時,阻塞等待
if (canPollCount <= 0 || !pollQueue.isEmpty()) {
wait = new WaitNode();
pollQueue.enq(wait);
} else {
// ignore
}
}
try {
if (wait != null) {
wait.doWait();
}
if (Thread.interrupted()) {
throw new InterruptedException();
}
} catch (InterruptedException e) {
// 傳遞訊息
pollQueue.doNotify();
throw e;
}
// 以下不會檢測執行緒中斷狀態
synchronized(pollLock) {
result = head.next.value;
head.next.value = 0;
// ignore head;
head = head.next;
canPollCount--;
}
synchronized(offerLock) {
canOfferCount++;
offerQueue.doNotify();
}
return result;
}
class WaitQueue {
WaitNode head;
WaitNode tail;
WaitQueue() {
head = new WaitNode();
tail = head;
}
synchronized void doNotify() {
for(;;) {
WaitNode node = deq();
if (node == null) {
break;
} else if (node.doNotify()) {
// 此處確保NOTIFY成功
break;
} else {
// ignore, and retry.
}
}
}
synchronized boolean isEmpty() {
return head.next == null;
}
synchronized void enq(WaitNode node) {
tail.next = node;
tail = tail.next;
}
synchronized WaitNode deq() {
if (head.next == null) {
return null;
}
WaitNode res = head.next;
head = head.next;
if (head.next == null) {
tail = head; // 為空,遷移tail節點
}
return res;
}
}
class WaitNode {
boolean released;
WaitNode next;
WaitNode() {
released = false;
next = null;
}
synchronized void doWait() throws InterruptedException {
try {
while (!released) {
wait();
}
} catch (InterruptedException e) {
if (!released) {
released = true;
throw e;
} else {
// 如果是NOTIFY之後收到中斷的訊號,不能丟擲異常;需要做RELAY處理
Thread.currentThread().interrupt();
}
}
}
synchronized boolean doNotify() {
if (!released) {
released = true;
notify();
// 明確釋放了一個執行緒,返回true
return true;
} else {
// 沒有釋放新的執行緒,返回false
return false;
}
}
}
// 省略 Node 的定義
}
以上
-
核心是替換狀態追蹤變數為同步節點, WaitNode; -
WaitNode 透過簡單的同步佇列組織實現 FIFO 協議,每個執行緒等待各自的 WaitNode 監視器; -
WaitNode 內部維持 released 狀態,標識執行緒阻塞狀態是否被釋放,主要是為了處理中斷的問題; -
WaitQueue 本身是全同步的,由於已解決了讀寫競爭已經讀寫內部競爭的問題, WaitQueue 同步並不會造成問題; -
WaitQueue 是無界佇列,是一個潛在的問題;但由於其只做同步的追蹤,而且追蹤的通常是執行緒,通常並不是問題; -
最終的公平有界佇列實現,無論是入隊還是出隊,首先衛式語句判定是否需要入隊等待,如果入隊等待,透過公平性協議等待;當訊號釋放時,藉助讀寫鎖同步更新佇列;最後同樣藉助讀寫鎖,觸發佇列更新訊息;
7 等待時間的問題
併發場景下,等待通常會設定為限時等待 TIMED_WAITING ,避免死鎖或損失系統活性;
實現同步佇列的限時等待,並沒想象的那麼困難
class TimeoutException extends InterruptedException {}
class WaitNode {
boolean released;
WaitNode next;
WaitNode() {
released = false;
next = null;
}
synchronized void doWait(long milliSeconds) throws InterruptedException {
try {
long startTime = System.currentTimeMillis();
long toWait = milliSeconds;
for (;;) {
wait(toWait);
if (released) {
return;
}
long now = System.currentTimeMillis();
toWait = toWait - (now - startTime);
if (toWait <= 0) {
throw new TimeoutException();
}
}
} catch (InterruptedException e) {
if (!released) {
released = true;
throw e;
} else {
// 如果已經釋放訊號量,此處不丟擲異常;但恢復中斷狀態
Thread.currentThread().interrupt();
}
}
}
synchronized boolean doNotify() {
if (!released) {
released = true;
notify();
return true;
} else {
return false;
}
}
由於所有的等待都阻塞在 WaitNode 監視器,以上
-
首先定義超時異常,此處只是為了方便異常處理,繼承 InterruptedException;
-
此處依賴於 wait(long timeout) 的超時等待實現,這通常不是問題;
最後,將 WaitNode 超時等待的邏輯,帶入到 FairnessBoundedBlockingQueue 實現中,即可。
四 總結
本文透過一步步迭代,最終藉助 JAVA 同步原語實現初版的公平有界佇列。迭代實現過程中可以看到以下幾點:
-
觀念的轉變,將呼叫一個類的方法思維轉換為:在滿足一定條件下方法才可以呼叫,在呼叫前需要滿足不變式,呼叫後滿足不變式; 由於併發的問題很難測試,通常要採用衛式表達證明併發的正確性; -
在迭代實現中會看到很多模式,比如,讀寫分離時,其實可以抽象為讀鎖和寫鎖;就得到了一個抽象的
Lock
的定義;比如,讀寫狀態追蹤,可以採用Exchanger
抽象表達; -
另外,本文的實現遠非完善,還需要考慮支援 Iterator 遍歷、狀態查詢及資料遷移等操作;
最後,相信大家再看 JUC 的工具包實現,定有不一樣的體會。
阿里雲資源編排ROS使用教程
資源編排(Resource Orchestration)是一種簡單易用的雲計算資源管理和自動化運維服務。使用者透過模板描述多個雲計算資源的依賴關係、配置等,並自動完成所有資源的建立和配置,以達到自動化部署、運維等目的。編排模板同時也是一種標準化的資源和應用交付方式,並且可以隨時編輯修改,使基礎設施即程式碼(Infrastructure as Code)成為可能。
點選閱讀原文檢視詳情!
關鍵詞
方法
佇列
問題
系統
為空