防止超賣的七種實現

👉 這是一個或許對你有用的社群
🐱 一對一交流/面試小冊/簡歷最佳化/求職解惑,歡迎加入芋道快速開發平臺知識星球。下面是星球提供的部分資料:
👉這是一個或許對你有用的開源專案
國產 Star 破 10w+ 的開源專案,前端包括管理後臺 + 微信小程式,後端支援單體和微服務架構。
功能涵蓋 RBAC 許可權、SaaS 多租戶、資料許可權、商城、支付、工作流、大屏報表、微信公眾號、ERPCRMAI 大模型等等功能:
  • Boot 多模組架構:https://gitee.com/zhijiantianya/ruoyi-vue-pro
  • Cloud 微服務架構:https://gitee.com/zhijiantianya/yudao-cloud
  • 影片教程:https://doc.iocoder.cn
【國內首批】支援 JDK 17/21 + SpringBoot 3.3、JDK 8/11 + Spring Boot 2.7 雙版本 

1.引言

高併發場景在現場的日常工作中很常見,特別是在網際網路公司中,這篇文章就來透過秒殺商品來模擬高併發的場景。文章末尾會附上文章的所有程式碼、指令碼和測試用例。
  • 本文環境: SpringBoot 2.5.7 + MySQL 8.0 X + MybatisPlus + Swagger2.9.2
  • 模擬工具: Jmeter
  • 模擬場景: 減庫存->建立訂單->模擬支付
基於 Spring Boot + MyBatis Plus + Vue & Element 實現的後臺管理系統 + 使用者小程式,支援 RBAC 動態許可權、多租戶、資料許可權、工作流、三方登入、支付、簡訊、商城等功能
  • 專案地址:https://github.com/YunaiV/ruoyi-vue-pro
  • 影片教程:https://doc.iocoder.cn/video/

2.商品秒殺-超賣

在開發中,對於下面的程式碼,可能很熟悉:在Service裡面加上@Transactional事務註解和Lock鎖
控制層:Controller
@ApiOperation

(value=

"秒殺實現方式——Lock加鎖"

)

@PostMapping

(

"/start/lock"

)

public Result startLock(long skgId)

{

try

 {

        log.info(

"開始秒殺方式一..."

);

finallong

 userId = (

int

) (

new

 Random().nextDouble() * (

99999

 - 

10000

 + 

1

)) + 

10000

;

        Result result = secondKillService.startSecondKillByLock(skgId, userId);

if

(result != 

null

){

            log.info(

"使用者:{}--{}"

, userId, result.get(

"msg"

));

        }

else

{

            log.info(

"使用者:{}--{}"

, userId, 

"哎呦喂,人也太多了,請稍後!"

);

        }

    } 

catch

 (Exception e) {

        e.printStackTrace();

    } 

finally

 {
    }

return

 Result.ok();

}

業務層:Service
@Override
@Transactional

(rollbackFor = Exception

.

class

)

publicResultstartSecondKillByLock

(

longskgId

longuserId

{

    lock.lock();

try

 {

// 校驗庫存

        SecondKill secondKill = secondKillMapper.selectById(skgId);

        Integer number = secondKill.getNumber();

if

 (number > 

0

) {

// 扣庫存

            secondKill.setNumber(number - 

1

);

            secondKillMapper.updateById(secondKill);

// 建立訂單

            SuccessKilled killed = 

new

 SuccessKilled();

            killed.setSeckillId(skgId);

            killed.setUserId(userId);

            killed.setState((

short

0

);

            killed.setCreateTime(

new

 Timestamp(System.currentTimeMillis()));

            successKilledMapper.insert(killed);

// 模擬支付

            Payment payment = 

new

 Payment();

            payment.setSeckillId(skgId);

            payment.setSeckillId(skgId);

            payment.setUserId(userId);

            payment.setMoney(

40

);

            payment.setState((

short

1

);

            payment.setCreateTime(

new

 Timestamp(System.currentTimeMillis()));

            paymentMapper.insert(payment);

        } 

else

 {

return

 Result.error(SecondKillStateEnum.END);

        }

    } 

catch

 (Exception e) {

thrownew

 ScorpiosException(

"異常了個乖乖"

);

    } 

finally

 {

        lock.unlock();

    }

return

 Result.ok(SecondKillStateEnum.SUCCESS);

}

對於上面的程式碼應該沒啥問題吧,業務方法上加事務,在處理業務的時候加鎖。
但上面這樣寫法是有問題的,會出現超賣的情況,看下測試結果:模擬1000個併發,搶100商品
Jmeter不瞭解的,可以參考這篇文章:
  • https://blog.csdn.net/zxd1435513775/article/details/106372446
這裡在業務方法開始加了鎖,在業務方法結束後釋放了鎖。但這裡的事務提交卻不是這樣的,有可能在事務提交之前,就已經把鎖釋放了,這樣會導致商品超賣現象。所以加鎖的時機很重要!
基於 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 實現的後臺管理系統 + 使用者小程式,支援 RBAC 動態許可權、多租戶、資料許可權、工作流、三方登入、支付、簡訊、商城等功能
  • 專案地址:https://github.com/YunaiV/yudao-cloud
  • 影片教程:https://doc.iocoder.cn/video/

3. 解決商品超賣

對於上面超賣現象,主要問題出現在事務中鎖釋放的時機,事務未提交之前,鎖已經釋放。(事務提交是在整個方法執行完)。如何解決這個問題呢,就是把加鎖步驟提前
  • 可以在controller層進行加鎖
  • 可以使用Aop在業務方法執行之前進行加鎖

3.1 方式一(改進版加鎖)

@ApiOperation

(value=

"秒殺實現方式——Lock加鎖"

)

@PostMapping

(

"/start/lock"

)

public Result startLock(long skgId)

{

// 在此處加鎖

    lock.lock();

try

 {

        log.info(

"開始秒殺方式一..."

);

finallong

 userId = (

int

) (

new

 Random().nextDouble() * (

99999

 - 

10000

 + 

1

)) + 

10000

;

        Result result = secondKillService.startSecondKillByLock(skgId, userId);

if

(result != 

null

){

            log.info(

"使用者:{}--{}"

, userId, result.get(

"msg"

));

        }

else

{

            log.info(

"使用者:{}--{}"

, userId, 

"哎呦喂,人也太多了,請稍後!"

);

        }

    } 

catch

 (Exception e) {

        e.printStackTrace();

    } 

finally

 {

// 在此處釋放鎖

        lock.unlock();

    }

return

 Result.ok();

}

上面這樣的加鎖就可以解決事務未提交之前,鎖釋放的問題,可以分三種情況進行壓力測試:
  • 併發數1000,商品100
  • 併發數1000,商品1000
  • 併發數2000,商品1000
對於併發量大於商品數的情況,商品秒殺一般不會出現少賣的請況,但對於併發數小於等於商品數的時候可能會出現商品少賣情況,這也很好理解。
對於沒有問題的情況就不貼圖了,因為有很多種方式,貼圖會太多

3.2 方式二(AOP版加鎖)

對於上面在控制層進行加鎖的方式,可能顯得不優雅,那就還有另一種方式進行在事務之前加鎖,那就是AOP
自定義AOP註解
@Target

({ElementType.PARAMETER, ElementType.METHOD})

@Retention

(RetentionPolicy.RUNTIME)

@Documented
public@interface

 ServiceLock {

String description()default ""

;

}

定義切面類
@Slf

4j

@Component
@Scope
@Aspect
@Order

(

1

//order越小越是最先執行,但更重要的是最先執行的最後結束
publicclassLockAspect

{

/**

     * 思考:為什麼不用synchronized

     * service 預設是單例的,併發下lock只有一個例項

     */


privatestatic

  Lock lock = 

new

 ReentrantLock(

true

); 

// 互斥鎖 引數預設false,不公平鎖

// Service層切點     用於記錄錯誤日誌
@Pointcut

(

"@annotation(com.scorpios.secondkill.aop.ServiceLock)"

)

publicvoidlockAspect()

{
    }

@Around

(

"lockAspect()"

)

public  Object around(ProceedingJoinPoint joinPoint)

{

        lock.lock();

        Object obj = 

null

;

try

 {

            obj = joinPoint.proceed();

        } 

catch

 (Throwable e) {

            e.printStackTrace();

thrownew

 RuntimeException();

        } 

finally

{

            lock.unlock();

        }

return

 obj;

    }

}

在業務方法上新增AOP註解
@Override
@ServiceLock// 使用Aop進行加鎖
@Transactional

(rollbackFor = Exception

.

class

)

publicResultstartSecondKillByAop

(

longskgId

longuserId

{

try

 {

// 校驗庫存

        SecondKill secondKill = secondKillMapper.selectById(skgId);

        Integer number = secondKill.getNumber();

if

 (number > 

0

) {

//扣庫存

            secondKill.setNumber(number - 

1

);

            secondKillMapper.updateById(secondKill);

//建立訂單

            SuccessKilled killed = 

new

 SuccessKilled();

            killed.setSeckillId(skgId);

            killed.setUserId(userId);

            killed.setState((

short

0

);

            killed.setCreateTime(

new

 Timestamp(System.currentTimeMillis()));

            successKilledMapper.insert(killed);

//支付

            Payment payment = 

new

 Payment();

            payment.setSeckillId(skgId);

            payment.setSeckillId(skgId);

            payment.setUserId(userId);

            payment.setMoney(

40

);

            payment.setState((

short

1

);

            payment.setCreateTime(

new

 Timestamp(System.currentTimeMillis()));

            paymentMapper.insert(payment);

        } 

else

 {

return

 Result.error(SecondKillStateEnum.END);

        }

    } 

catch

 (Exception e) {

thrownew

 ScorpiosException(

"異常了個乖乖"

);

    }

return

 Result.ok(SecondKillStateEnum.SUCCESS);

}

控制層:
@ApiOperation

(value=

"秒殺實現方式二——Aop加鎖"

)

@PostMapping

(

"/start/aop"

)

public Result startAop(long skgId)

{

try

 {

        log.info(

"開始秒殺方式二..."

);

finallong

 userId = (

int

) (

new

 Random().nextDouble() * (

99999

 - 

10000

 + 

1

)) + 

10000

;

        Result result = secondKillService.startSecondKillByAop(skgId, userId);

if

(result != 

null

){

            log.info(

"使用者:{}--{}"

, userId, result.get(

"msg"

));

        }

else

{

            log.info(

"使用者:{}--{}"

, userId, 

"哎呦喂,人也太多了,請稍後!"

);

        }

    } 

catch

 (Exception e) {

        e.printStackTrace();

    }

return

 Result.ok();

}

這種方式在對鎖的使用上,更高階、更美觀!

3.3 方式三(悲觀鎖一)

除了上面在業務程式碼層面加鎖外,還可以使用資料庫自帶的鎖進行併發控制。
悲觀鎖,什麼是悲觀鎖呢?通俗的說,在做任何事情之前,都要進行加鎖確認。這種資料庫級加鎖操作效率較低。
使用for update一定要加上事務,當事務處理完後,for update才會將行級鎖解除
如果請求數和秒殺商品數量一致,會出現少賣
@ApiOperation

(value=

"秒殺實現方式三——悲觀鎖"

)

@PostMapping

(

"/start/pes/lock/one"

)

public Result startPesLockOne(long skgId)

{

try

 {

        log.info(

"開始秒殺方式三..."

);

finallong

 userId = (

int

) (

new

 Random().nextDouble() * (

99999

 - 

10000

 + 

1

)) + 

10000

;

        Result result = secondKillService.startSecondKillByUpdate(skgId, userId);

if

(result != 

null

){

            log.info(

"使用者:{}--{}"

, userId, result.get(

"msg"

));

        }

else

{

            log.info(

"使用者:{}--{}"

, userId, 

"哎呦喂,人也太多了,請稍後!"

);

        }

    } 

catch

 (Exception e) {

        e.printStackTrace();

    }

return

 Result.ok();

}

業務邏輯
@Override
@Transactional

(rollbackFor = Exception

.

class

)

publicResultstartSecondKillByUpdate

(

longskgId

longuserId

{

try

 {

// 校驗庫存-悲觀鎖

        SecondKill secondKill = secondKillMapper.querySecondKillForUpdate(skgId);

        Integer number = secondKill.getNumber();

if

 (number > 

0

) {

//扣庫存

            secondKill.setNumber(number - 

1

);

            secondKillMapper.updateById(secondKill);

//建立訂單

            SuccessKilled killed = 

new

 SuccessKilled();

            killed.setSeckillId(skgId);

            killed.setUserId(userId);

            killed.setState((

short

0

);

            killed.setCreateTime(

new

 Timestamp(System.currentTimeMillis()));

            successKilledMapper.insert(killed);

//支付

            Payment payment = 

new

 Payment();

            payment.setSeckillId(skgId);

            payment.setSeckillId(skgId);

            payment.setUserId(userId);

            payment.setMoney(

40

);

            payment.setState((

short

1

);

            payment.setCreateTime(

new

 Timestamp(System.currentTimeMillis()));

            paymentMapper.insert(payment);

        } 

else

 {

return

 Result.error(SecondKillStateEnum.END);

        }

    } 

catch

 (Exception e) {

thrownew

 ScorpiosException(

"異常了個乖乖"

);

    } 

finally

 {

    }

return

 Result.ok(SecondKillStateEnum.SUCCESS);

}

Dao層
@Repository
publicinterfaceSecondKillMapperextendsBaseMapper<SecondKill

{

/**

     * 將此行資料進行加鎖,當整個方法將事務提交後,才會解鎖

     * 

@param

 skgId

     * 

@return

     */


@Select

(value = 

"SELECT * FROM seckill WHERE seckill_id=#{skgId} FOR UPDATE"

)

SecondKill querySecondKillForUpdate(@Param("skgId") Long skgId)

;
}

上面是利用for update進行對查詢資料加鎖,加的是行鎖

3.4 方式四(悲觀鎖二)

悲觀鎖的第二種方式就是利用update更新命令來加表鎖

/**

 * UPDATE鎖表

 * 

@param

 skgId  商品id

 * 

@param

 userId    使用者id

 * 

@return

 */


@Override
@Transactional

(rollbackFor = Exception

.

class

)

publicResultstartSecondKillByUpdateTwo

(

longskgId

longuserId

{

try

 {

// 不校驗,直接扣庫存更新
int

 result = secondKillMapper.updateSecondKillById(skgId);

if

 (result > 

0

) {

//建立訂單

            SuccessKilled killed = 

new

 SuccessKilled();

            killed.setSeckillId(skgId);

            killed.setUserId(userId);

            killed.setState((

short

0

);

            killed.setCreateTime(

new

 Timestamp(System.currentTimeMillis()));

            successKilledMapper.insert(killed);

//支付

            Payment payment = 

new

 Payment();

            payment.setSeckillId(skgId);

            payment.setSeckillId(skgId);

            payment.setUserId(userId);

            payment.setMoney(

40

);

            payment.setState((

short

1

);

            payment.setCreateTime(

new

 Timestamp(System.currentTimeMillis()));

            paymentMapper.insert(payment);

        } 

else

 {

return

 Result.error(SecondKillStateEnum.END);

        }

    } 

catch

 (Exception e) {

thrownew

 ScorpiosException(

"異常了個乖乖"

);

    } 

finally

 {

    }

return

 Result.ok(SecondKillStateEnum.SUCCESS);

}

Dao層
@Repository
publicinterfaceSecondKillMapperextendsBaseMapper<SecondKill

{

/**

     * 將此行資料進行加鎖,當整個方法將事務提交後,才會解鎖

     * 

@param

 skgId

     * 

@return

     */


@Select

(value = 

"SELECT * FROM seckill WHERE seckill_id=#{skgId} FOR UPDATE"

)

SecondKill querySecondKillForUpdate(@Param("skgId") Long skgId)

;

@Update

(value = 

"UPDATE seckill SET number=number-1 WHERE seckill_id=#{skgId} AND number > 0"

)

intupdateSecondKillById(@Param("skgId")long skgId)

;

}

3.5 方式五(樂觀鎖)

樂觀鎖,顧名思義,就是對操作結果很樂觀,透過利用version欄位來判斷資料是否被修改
樂觀鎖,不進行庫存數量的校驗,直接做庫存扣減
這裡使用的樂觀鎖會出現大量的資料更新異常(拋異常就會導致購買失敗)、如果配置的搶購人數比較少、比如120:100(人數:商品) 會出現少買的情況,不推薦使用樂觀鎖。
@ApiOperation

(value=

"秒殺實現方式五——樂觀鎖"

)

@PostMapping

(

"/start/opt/lock"

)

public Result startOptLock(long skgId)

{

try

 {

        log.info(

"開始秒殺方式五..."

);

finallong

 userId = (

int

) (

new

 Random().nextDouble() * (

99999

 - 

10000

 + 

1

)) + 

10000

;

// 引數添加了購買數量

        Result result = secondKillService.startSecondKillByPesLock(skgId, userId,

1

);

if

(result != 

null

){

            log.info(

"使用者:{}--{}"

, userId, result.get(

"msg"

));

        }

else

{

            log.info(

"使用者:{}--{}"

, userId, 

"哎呦喂,人也太多了,請稍後!"

);

        }

    } 

catch

 (Exception e) {

        e.printStackTrace();

    }

return

 Result.ok();

}

@Override
@Transactional

(rollbackFor = Exception

.

class

)

publicResultstartSecondKillByPesLock

(

longskgId

longuserId

intnumber

{

// 樂觀鎖,不進行庫存數量的校驗,直接
try

 {

        SecondKill kill = secondKillMapper.selectById(skgId);

// 剩餘的數量應該要大於等於秒殺的數量
if

(kill.getNumber() >= number) {

int

 result = secondKillMapper.updateSecondKillByVersion(number,skgId,kill.getVersion());

if

 (result > 

0

) {

//建立訂單

                SuccessKilled killed = 

new

 SuccessKilled();

                killed.setSeckillId(skgId);

                killed.setUserId(userId);

                killed.setState((

short

0

);

                killed.setCreateTime(

new

 Timestamp(System.currentTimeMillis()));

                successKilledMapper.insert(killed);

//支付

                Payment payment = 

new

 Payment();

                payment.setSeckillId(skgId);

                payment.setSeckillId(skgId);

                payment.setUserId(userId);

                payment.setMoney(

40

);

                payment.setState((

short

1

);

                payment.setCreateTime(

new

 Timestamp(System.currentTimeMillis()));

                paymentMapper.insert(payment);

            } 

else

 {

return

 Result.error(SecondKillStateEnum.END);

            }

        }

    } 

catch

 (Exception e) {

thrownew

 ScorpiosException(

"異常了個乖乖"

);

    } 

finally

 {

    }

return

 Result.ok(SecondKillStateEnum.SUCCESS);

}

@Repository
publicinterfaceSecondKillMapperextendsBaseMapper<SecondKill

{

/**

     * 將此行資料進行加鎖,當整個方法將事務提交後,才會解鎖

     * 

@param

 skgId

     * 

@return

     */


@Select

(value = 

"SELECT * FROM seckill WHERE seckill_id=#{skgId} FOR UPDATE"

)

SecondKill querySecondKillForUpdate(@Param("skgId") Long skgId)

;

@Update

(value = 

"UPDATE seckill SET number=number-1 WHERE seckill_id=#{skgId} AND number > 0"

)

intupdateSecondKillById(@Param("skgId")long skgId)

;

@Update

(value = 

"UPDATE seckill  SET number=number-#{number},version=version+1 WHERE seckill_id=#{skgId} AND version = #{version}"

)

intupdateSecondKillByVersion(@Param("number")int number, @Param("skgId")long skgId, @Param("version")int version)

;

}

樂觀鎖會出現大量的資料更新異常(拋異常就會導致購買失敗),會出現少買的情況,不推薦使用樂觀鎖

3.6 方式六(阻塞佇列)

利用阻塞隊類,也可以解決高併發問題。其思想就是把接收到的請求按順序存放到佇列中,消費者執行緒逐一從佇列裡取資料進行處理,看下具體程式碼。
阻塞佇列:這裡使用靜態內部類的方式來實現單例模式,在併發條件下不會出現問題。
// 秒殺佇列(固定長度為100)
publicclassSecondKillQueue

{

// 佇列大小
staticfinalint

 QUEUE_MAX_SIZE = 

100

;

// 用於多執行緒間下單的佇列
static

 BlockingQueue<SuccessKilled> blockingQueue = 

new

 LinkedBlockingQueue<SuccessKilled>(QUEUE_MAX_SIZE);

// 使用靜態內部類,實現單例模式
privateSecondKillQueue()

{};

privatestaticclassSingletonHolder

{

// 靜態初始化器,由JVM來保證執行緒安全
privatestatic

 SecondKillQueue queue = 

new

 SecondKillQueue();

    }

/**

     * 單例佇列

     * 

@return

     */


publicstatic SecondKillQueue getSkillQueue()

{

return

 SingletonHolder.queue;

    }

/**

     * 生產入隊

     * 

@param

 kill

     * 

@throws

 InterruptedException

     * add(e) 佇列未滿時,返回true;佇列滿則丟擲IllegalStateException(“Queue full”)異常——AbstractQueue

     * put(e) 佇列未滿時,直接插入沒有返回值;佇列滿時會阻塞等待,一直等到佇列未滿時再插入。

     * offer(e) 佇列未滿時,返回true;佇列滿時返回false。非阻塞立即返回。

     * offer(e, time, unit) 設定等待的時間,如果在指定時間內還不能往佇列中插入資料則返回false,插入成功返回true。

     */


public  Boolean  produce(SuccessKilled kill)

{

return

 blockingQueue.offer(kill);

    }

/**

     * 消費出隊

     * poll() 獲取並移除隊首元素,在指定的時間內去輪詢佇列看有沒有首元素有則返回,否者超時後返回null

     * take() 與帶超時時間的poll類似不同在於take時候如果當前佇列空了它會一直等待其他執行緒呼叫notEmpty.signal()才會被喚醒

     */


public  SuccessKilled consume()throws InterruptedException 

{

return

 blockingQueue.take();

    }

/**

     * 獲取佇列大小

     * 

@return

     */


publicintsize()

{

return

 blockingQueue.size();

    }

}

消費秒殺佇列:實現ApplicationRunner介面
// 消費秒殺佇列
@Slf

4j

@Component
publicclassTaskRunnerimplementsApplicationRunner

{

@Autowired
private

 SecondKillService seckillService;

@Override
publicvoidrun(ApplicationArguments var)

{

new

 Thread(() -> {

            log.info(

"佇列啟動成功"

);

while

(

true

){

try

 {

// 程序內佇列

                    SuccessKilled kill = SecondKillQueue.getSkillQueue().consume();

if

(kill != 

null

){

                        Result result = seckillService.startSecondKillByAop(kill.getSeckillId(), kill.getUserId());

if

(result != 

null

 && result.equals(Result.ok(SecondKillStateEnum.SUCCESS))){

                            log.info(

"TaskRunner,result:{}"

,result);

                            log.info(

"TaskRunner從訊息佇列取出使用者,使用者:{}{}"

,kill.getUserId(),

"秒殺成功"

);

                        }

                    }

                } 

catch

 (InterruptedException e) {

                    e.printStackTrace();

                }

            }

        }).start();

    }

}

@ApiOperation

(value=

"秒殺實現方式六——訊息佇列"

)

@PostMapping

(

"/start/queue"

)

public Result startQueue(long skgId)

{

try

 {

        log.info(

"開始秒殺方式六..."

);

finallong

 userId = (

int

) (

new

 Random().nextDouble() * (

99999

 - 

10000

 + 

1

)) + 

10000

;

        SuccessKilled kill = 

new

 SuccessKilled();

        kill.setSeckillId(skgId);

        kill.setUserId(userId);

        Boolean flag = SecondKillQueue.getSkillQueue().produce(kill);

// 雖然進入了佇列,但是不一定能秒殺成功 進隊出隊有時間間隙
if

(flag){

            log.info(

"使用者:{}{}"

,kill.getUserId(),

"秒殺成功"

);

        }

else

{

            log.info(

"使用者:{}{}"

,userId,

"秒殺失敗"

);

        }

    } 

catch

 (Exception e) {

        e.printStackTrace();

    }

return

 Result.ok();

}

注意:在業務層和AOP方法中,不能丟擲任何異常, throw new RuntimeException()這些拋異常程式碼要註釋掉。因為一旦程式丟擲異常就會停止,導致消費秒殺佇列程序終止!
使用阻塞佇列來實現秒殺,有幾點要注意:
  • 消費秒殺佇列中呼叫業務方法加鎖與不加鎖情況一樣,也就是seckillService.startSecondKillByAop()seckillService.startSecondKillByLock()方法結果一樣,這也很好理解
  • 當佇列長度與商品數量一致時,會出現少賣的現象,可以調大數值
  • 下面是佇列長度1000,商品數量1000,併發數2000情況下出現的少賣

3.7.方式七(Disruptor佇列)

Disruptor是個高效能佇列,研發的初衷是解決記憶體佇列的延遲問題,在效能測試中發現竟然與I/O操作處於同樣的數量級,基於Disruptor開發的系統單執行緒能支撐每秒600萬訂單。
// 事件生成工廠(用來初始化預分配事件物件)
publicclassSecondKillEventFactoryimplementsEventFactory<SecondKillEvent

{

@Override
public SecondKillEvent newInstance()

{

returnnew

 SecondKillEvent();

    }

}

// 事件物件(秒殺事件)
publicclassSecondKillEventimplementsSerializable

{

privatestaticfinallong

 serialVersionUID = 

1L

;

privatelong

 seckillId;

privatelong

 userId;

// set/get方法略

}

// 使用translator方式生產者
publicclassSecondKillEventProducer

{

privatefinalstatic

 EventTranslatorVararg<SecondKillEvent> translator = (seckillEvent, seq, objs) -> {

        seckillEvent.setSeckillId((Long) objs[

0

]);

        seckillEvent.setUserId((Long) objs[

1

]);

    };

privatefinal

 RingBuffer<SecondKillEvent> ringBuffer;

publicSecondKillEventProducer(RingBuffer<SecondKillEvent> ringBuffer)

{

this

.ringBuffer = ringBuffer;

    }

publicvoidsecondKill(long seckillId, long userId)

{

this

.ringBuffer.publishEvent(translator, seckillId, userId);

    }

}

// 消費者(秒殺處理器)
@Slf

4j

publicclassSecondKillEventConsumerimplementsEventHandler<SecondKillEvent

{

private

 SecondKillService secondKillService = (SecondKillService) SpringUtil.getBean(

"secondKillService"

);

@Override
publicvoidonEvent(SecondKillEvent seckillEvent, long seq, boolean bool)

{

        Result result = secondKillService.startSecondKillByAop(seckillEvent.getSeckillId(), seckillEvent.getUserId());

if

(result.equals(Result.ok(SecondKillStateEnum.SUCCESS))){

            log.info(

"使用者:{}{}"

,seckillEvent.getUserId(),

"秒殺成功"

);

        }

    }

}

publicclassDisruptorUtil

{

static

 Disruptor<SecondKillEvent> disruptor;

static

{

        SecondKillEventFactory factory = 

new

 SecondKillEventFactory();

int

 ringBufferSize = 

1024

;

        ThreadFactory threadFactory = runnable -> 

new

 Thread(runnable);

        disruptor = 

new

 Disruptor<>(factory, ringBufferSize, threadFactory);

        disruptor.handleEventsWith(

new

 SecondKillEventConsumer());

        disruptor.start();

    }

publicstaticvoidproducer(SecondKillEvent kill)

{

        RingBuffer<SecondKillEvent> ringBuffer = disruptor.getRingBuffer();

        SecondKillEventProducer producer = 

new

 SecondKillEventProducer(ringBuffer);

        producer.secondKill(kill.getSeckillId(),kill.getUserId());

    }

}

@ApiOperation

(value=

"秒殺實現方式七——Disruptor佇列"

)

@PostMapping

(

"/start/disruptor"

)

public Result startDisruptor(long skgId)

{

try

 {

        log.info(

"開始秒殺方式七..."

);

finallong

 userId = (

int

) (

new

 Random().nextDouble() * (

99999

 - 

10000

 + 

1

)) + 

10000

;

        SecondKillEvent kill = 

new

 SecondKillEvent();

        kill.setSeckillId(skgId);

        kill.setUserId(userId);

        DisruptorUtil.producer(kill);

    } 

catch

 (Exception e) {

        e.printStackTrace();

    }

return

 Result.ok();

}

經過測試,發現使用Disruptor佇列佇列,與自定義佇列有著同樣的問題,也會出現超賣的情況,但效率有所提高。

4. 小結

對於上面七種實現併發的方式,做一下總結:
  • 一、二方式是在程式碼中利用鎖和事務的方式解決了併發問題,主要解決的是鎖要載入事務之前
  • 三、四、五方式主要是資料庫的鎖來解決併發問題,方式三是利用for upate對錶加行鎖,方式四是利用update來對錶加鎖,方式五是透過增加version欄位來控制資料庫的更新操作,方式五的效果最差
  • 六、七方式是透過佇列來解決併發問題,這裡需要特別注意的是,在程式碼中不能透過throw拋異常,否則消費執行緒會終止,而且由於進隊和出隊存在時間間隙,會導致商品少賣
上面所有的情況都經過程式碼測試,測試分一下三種情況:
  • 併發數1000,商品數100
  • 併發數1000,商品數1000
  • 併發數2000,商品數1000
思考:分散式情況下如何解決併發問題呢?下次繼續試驗。
原始碼地址:
  • https://github.com/Hofanking/springboot-second-skill-example

歡迎加入我的知識星球,全面提升技術能力。
👉 加入方式,長按”或“掃描”下方二維碼噢
星球的內容包括:專案實戰、面試招聘、原始碼解析、學習路線。
文章有幫助的話,在看,轉發吧。
謝謝支援喲 (*^__^*)

相關文章