寫在前面:本文討論的冪等問題,均為併發場景下的冪等問題。即系統本存在冪等設計,但是在併發場景下失效了。
一 摘要
本文從釘釘實人認證場景的一例資料重複問題出發,分析了其原因是因為併發導致冪等失效,引出冪等的概念。
針對併發場景下的冪等問題,提出了一種實現冪等可行的方法論,結合通訊錄加人業務場景對資料庫冪等問題進行了簡單分析,就分散式鎖實現冪等方法展開了詳細討論。
分析了鎖在分散式場景下存在的問題,包括單點故障、網路超時、錯誤釋放他人鎖、提前釋放鎖以及分散式鎖單點故障等,提出了對應的解決方案,介紹了對應方案的具體實現。
二 問題
釘釘實人認證業務存在資料重複的問題。
1 問題現象
正常情況下,資料庫中應該只有一條實人認證成功記錄,但是實際上某使用者有多條。

2 問題原因
併發導致了不冪等。
我們先來回顧一下冪等的概念:
冪等(idempotent、idempotence)是一個數學與計算機學概念,常見於抽象代數中。在程式設計中一個冪等操作的特點是其任意多次執行所產生的影響均與一次執行的影響相同。–來自百度百科
實人認證在業務上有冪等設計,其一般流程為:
1)使用者選擇實人認證後會在服務端初始化一條記錄;
2)使用者在釘釘移動端按照指示完成人臉比對;
3)比對完成後訪問服務端修改資料庫狀態。
在第3步中,在修改資料庫狀態之前,會判斷「是否已經初始化」、「是否已經實人認證」以及「智科是否返回認證成功」以保證冪等。僅當請求首次訪問服務端嘗試修改資料庫狀態時,才能滿足冪等的判斷條件並修改資料庫狀態。其餘任意次請求將直接返回,對資料庫狀態無影響。請求多次訪問服務端所產生的結果,和請求首次訪問服務端一致。因此,在實人認證成功的前提下,資料庫應當有且僅有一條認證成功的記錄。

但是在實際過程中我們發現,同一個請求會多次修改資料庫狀態,系統並未按照我們預期的那樣實現冪等。究其原因,是因為請求併發訪問,在首次請求完成修改服務端狀態前,併發的其他請求和首次請求都通過了冪等判斷,對資料庫狀態進行了多次修改。
併發導致了原冪等設計失效。
併發導致了不冪等。
三 解決方案
解決併發場景下冪等問題的關鍵,是找到唯一性約束,執行唯一性檢查,相同的資料儲存一次,相同的請求操作一次。
一次訪問服務端的請求,可能產生以下幾種互動:
-
與資料來源互動,例如資料庫狀態變更等; -
與其他業務系統互動,例如呼叫下游服務或傳送訊息等;
一次請求可以只包含一次互動,也可以包含多次互動。例如一次請求可以僅僅修改一次資料庫狀態,也可以在修改資料庫狀態後再發送一條資料庫狀態修改成功的訊息。
於是我們可以得出一個結論:併發場景下,如果一個系統依賴的元件冪等,那麼該系統在天然冪等。
以資料庫為例,如果一個請求對資料造成的影響是新增一條資料,那麼唯一索引可以是冪等問題的解法。資料庫會幫助我們執行唯一性檢查,相同資料不會重複落庫。
釘釘通訊錄加人就是透過資料庫的唯一索引解決了冪等問題。以釘釘通訊錄加人為例,在向資料庫寫資料之前,會先判斷資料是否已經存在於資料庫之中,如果不存在,加人請求最終會向資料庫的員工表插入一條資料。大量相同的併發的通訊錄加人請求讓系統的冪等設計失效成為可能。在一次加人請求中,(組織ID,工號)可以唯一標記一個請求,在資料庫中,也存在(組織ID,工號)的唯一索引。因此我們可以保證,多次相同的加人請求,只會修改一次資料庫狀態,即新增一條記錄。
如果所依賴的元件天然冪等,那麼問題就簡單了,但是實際情況往往更加複雜。併發場景下,如果系統依賴的元件無法冪等,我們就需要使用額外的手段實現冪等。
一個常用的手段就是使用分散式鎖。分散式鎖的實現方式有很多,比較常用的是快取式分散式鎖。
四 分散式鎖
在What is a Java distributed lock?中有這樣幾段話:
In computer science, locks are mechanisms in a multithreaded environment to prevent different threads from operating on the same resource. When using locking, a resource is "locked" for access by a specific thread, and can only be accessed by a different thread once the resource has been released. Locks have several benefits: they stop two threads from doing the same work, and they prevent errors and data corruption when two threads try to use the same resource simultaneously.
Distributed locks in Java are locks that can work with not only multiple threads running on the same machine, but also threads running on clients on different machines in a distributed system. The threads on these separate machines must communicate and coordinate to make sure that none of them try to access a resource that has been locked up by another.
這幾段話告訴我們,鎖的本質是共享資源的互斥訪問,分散式鎖解決了分散式系統中共享資源的互斥訪問的問題。
java.util.concurrent.locks包提供了豐富的鎖實現,包括公平鎖/非公平鎖,阻塞鎖/非阻塞鎖,讀寫鎖以及可重入鎖等。
我們要如何實現一個分散式鎖呢?
方案一
分散式系統中常見有兩個問題:
1)單點故障問題,即當持有鎖的應用發生單點故障時,鎖將被長期無效佔有;
2)網路超時問題,即當客戶端發生網路超時但實際上鎖成功時,我們無法再次正確的
獲取鎖。
要解決問題1,一個簡單的方案是引入過期時間(lease time),對鎖的持有將是有時效的,當應用發生單點故障時,被其持有的鎖可以自動釋放。
要解決問題2,一個簡單的方案是支援可重入,我們為每個獲取鎖的客戶端都配置一個不會重複的身份標識(通常是UUID),上鎖成功後鎖將帶有該客戶端的身份標識。當實際上鎖成功而客戶端超時重試時,我們可以判斷鎖已被該客戶端持有而返回成功。
綜上我們給出了一個lease-based distribute lock方案。出於效能考量,使用快取作為鎖的儲存介質,利用MVCC(Multiversion concurrency control)機制解決共享資源互斥訪問問題,具體實現可見附錄程式碼。
分散式鎖的一般使用方式如下
● 初始化分散式鎖的工廠
● 利用工廠生成一個分散式鎖例項
● 使用該分散式例項上鎖和解鎖操作
@Test
public void testTryLock() {
//初始化工廠
MdbDistributeLockFactory mdbDistributeLockFactory = new MdbDistributeLockFactory();
mdbDistributeLockFactory.setNamespace(603);
mdbDistributeLockFactory.setMtairManager(new MultiClusterTairManager());
//獲得鎖
DistributeLock lock = mdbDistributeLockFactory.getLock("TestLock");
//上鎖解鎖操作
boolean locked = lock.tryLock();
if (!locked) {
return;
}
try {
//do something
} finally {
lock.unlock();
}
}
該方案簡單易用,但是問題也很明顯。例如,釋放鎖的時候只是簡單的將快取中的key失效,所以存在錯誤釋放他人已持有鎖問題。所幸只要鎖的租期設定的足夠長,該問題出現機率就足夠小。
我們借用Martin Kleppmann在文章How to do distributed locking中的一張圖說明該問題。

設想一種情況,當佔有鎖的Client 1在釋放鎖之前,鎖就已經到期了,Client 2將獲取鎖,此時鎖被Client 2持有,但是Client 1可能會錯誤的將其釋放。一個更優秀的方案,我們給每個鎖都設定一個身份標識,在釋放鎖的時候,1)首先查詢鎖是否是自己的,2)如果是自己的則釋放鎖。受限於實現方式,步驟1和步驟2不是原子操作,在步驟1和步驟2之間,如果鎖到期被其他客戶端獲取,此時也會錯誤的釋放他人的鎖。
方案二
藉助Redis的Lua指令碼,可以完美的解決存在錯誤釋放他人已持有鎖問題的。在Distributed locks with Redis這篇文章的 Correct implementation with a single instance 這一節中,我們可以得到我們想要的答案——如何實現一個分散式鎖。
當我們想要獲取鎖時,我們可以執行如下方法
SET resource_name my_random_value NX PX 30000
當我們想要釋放鎖時,我們可以執行如下的Lua指令碼
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
方案三
在方案一和方案二的討論過程中,有一個問題被我們反覆提及:鎖的自動釋放。
這是一把雙刃劍:
1)一方面它很好的解決了持有鎖的客戶端單點故障的問題
2)另一方面,如果鎖提前釋放,就會出現鎖的錯誤持有狀態
這個時候,我們可以引入Watch Dog自動續租機制,我們可以參考以下Redisson是如何實現的。
在上鎖成功後,Redisson會呼叫renewExpiration()方法開啟一個Watch Dog執行緒,為鎖自動續期。每過1/3時間續一次,成功則繼續下一次續期,失敗取消續期操作。
我們可以再看看Redisson是如何續期的。renewExpiration()方法的第17行renewExpirationAsync()方法是執行鎖續期的關鍵操作,我們進入到方法內部,可以看到Redisson也是使用Lua指令碼進行鎖續租的:1)判斷鎖是否存在;2)如果存在則重置過期時間。
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(timeout -> {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// reschedule itself
renewExpiration();
} else {
cancelExpirationRenewal(null);
}
});
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}
藉助Redisson的自動續期機制,我們無需再擔心鎖的自動釋放。但是討論到這裡,我還是不得不面對一個問題:分散式鎖本身不是一個分散式應用。當Redis伺服器故障無法正常工作時,整個分散式鎖也就無法提供服務。
更進一步,我們可以看看Distributed locks with Redis這篇文章中提到的Redlock演算法及其實現。
Redlock演算法不是銀彈,關於它的好與壞,也有很多爭論:
How to do distributed locking:
https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html
Is Redlock safe?:
http://antirez.com/news/101
Martin Kleppmann和Antirez關於Redlock的爭辯:
https://news.ycombinator.com/item
參考資料
What is a Java distributed lock?
https://redisson.org/glossary/java-distributed-lock.html
Distributed locks and synchronizers:
https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers
Distributed locks with Redis:
https://redis.io/topics/distlock?spm=ata.21736010.0.0.31f77e3aFs96rz
附錄
分散式鎖
public class MdbDistributeLock implements DistributeLock {
/**
* 鎖的名稱空間
*/
private final int namespace;
/**
* 鎖對應的快取key
*/
private final String lockName;
/**
* 鎖的唯一標識,保證可重入,以應對put成功,但是返回超時的情況
*/
private final String lockId;
/**
* 是否持有鎖。true:是
*/
private boolean locked;
/**
* 快取例項
*/
private final TairManager tairManager;
public MdbDistributeLock(TairManager tairManager, int namespace, String lockCacheKey) {
this.tairManager = tairManager;
this.namespace = namespace;
this.lockName = lockCacheKey;
this.lockId = UUID.randomUUID().toString();
}
@Override
public boolean tryLock() {
try {
//獲取鎖狀態
Result<DataEntry> getResult = null;
ResultCode getResultCode = null;
for (int cnt = 0; cnt < DEFAULT_RETRY_TIMES; cnt++) {
getResult = tairManager.get(namespace, lockName);
getResultCode = getResult == null ? null : getResult.getRc();
if (noNeedRetry(getResultCode)) {
break;
}
}
//重入,已持有鎖,返回成功
if (ResultCode.SUCCESS.equals(getResultCode)
&& getResult.getValue() != null && lockId.equals(getResult.getValue().getValue())) {
locked = true;
return true;
}
//不可獲取鎖,返回失敗
if (!ResultCode.DATANOTEXSITS.equals(getResultCode)) {
log.error("tryLock fail code={} lock={} traceId={}", getResultCode, this, EagleEye.getTraceId());
return false;
}
//嘗試獲取鎖
ResultCode putResultCode = null;
for (int cnt = 0; cnt < DEFAULT_RETRY_TIMES; cnt++) {
putResultCode = tairManager.put(namespace, lockName, lockId, MDB_CACHE_VERSION,
DEFAULT_EXPIRE_TIME_SEC);
if (noNeedRetry(putResultCode)) {
break;
}
}
if (!ResultCode.SUCCESS.equals(putResultCode)) {
log.error("tryLock fail code={} lock={} traceId={}", getResultCode, this, EagleEye.getTraceId());
return false;
}
locked = true;
return true;
} catch (Exception e) {
log.error("DistributedLock.tryLock fail lock={}", this, e);
}
return false;
}
@Override
public void unlock() {
if (!locked) {
return;
}
ResultCode resultCode = tairManager.invalid(namespace, lockName);
if (!resultCode.isSuccess()) {
log.error("DistributedLock.unlock fail lock={} resultCode={} traceId={}", this, resultCode,
EagleEye.getTraceId());
}
locked = false;
}
/**
* 判斷是否需要重試
*
* @param resultCode 快取的返回碼
* @return true:不用重試
*/
private boolean noNeedRetry(ResultCode resultCode) {
return resultCode != null && !ResultCode.CONNERROR.equals(resultCode) && !ResultCode.TIMEOUT.equals(
resultCode) && !ResultCode.UNKNOW.equals(resultCode);
}
}
分散式鎖工廠
public class MdbDistributeLockFactory implements DistributeLockFactory {
/**
* 快取的名稱空間
*/
@Setter
private int namespace;
@Setter
private MultiClusterTairManager mtairManager;
@Override
public DistributeLock getLock(String lockName) {
return new MdbDistributeLock(mtairManager, namespace, lockName);
}
}
資料庫安全
點選閱讀原文檢視詳情!
關鍵詞
分散式鎖
問題
獲取鎖
redis.call
資料