日常工作,MQ的8種常用使用場景

👉 這是一個或許對你有用的社群
🐱 一對一交流/面試小冊/簡歷最佳化/求職解惑,歡迎加入芋道快速開發平臺知識星球。下面是星球提供的部分資料:
👉這是一個或許對你有用的開源專案
國產 Star 破 10w+ 的開源專案,前端包括管理後臺 + 微信小程式,後端支援單體和微服務架構。
功能涵蓋 RBAC 許可權、SaaS 多租戶、資料許可權、商城、支付、工作流、大屏報表、微信公眾號、ERPCRMAI 大模型等等功能:
  • Boot 多模組架構:https://gitee.com/zhijiantianya/ruoyi-vue-pro
  • Cloud 微服務架構:https://gitee.com/zhijiantianya/yudao-cloud
  • 影片教程:https://doc.iocoder.cn
【國內首批】支援 JDK 17/21 + SpringBoot 3.3、JDK 8/11 + Spring Boot 2.7 雙版本 

前言

我們日常開發中,經常跟MQ(訊息佇列)打交道。本文梳理了MQ的8種使用場景。
基於 Spring Boot + MyBatis Plus + Vue & Element 實現的後臺管理系統 + 使用者小程式,支援 RBAC 動態許可權、多租戶、資料許可權、工作流、三方登入、支付、簡訊、商城等功能
  • 專案地址:https://github.com/YunaiV/ruoyi-vue-pro
  • 影片教程:https://doc.iocoder.cn/video/

1. 非同步處理

面試官在問我們MQ作用時,很多夥伴馬上想到非同步處理、解耦、流量削鋒 等等。
MQ 最常見的應用場景之一就是非同步處理
比如,在使用者註冊場景中,當用戶資訊儲存成功後,系統需要傳送一個簡訊、或者郵箱訊息,通知使用者註冊成功。如果這個簡訊或者郵箱訊息傳送比較耗時 ,則可能拖垮註冊介面 。又或者如果呼叫第三方簡訊、郵件傳送介面失敗,也會影響註冊介面。一般我們不希望一個通知類的功能,去影響註冊主流程,這時候,則可以使用MQ來實現非同步處理
簡要程式碼如下 :先儲存使用者資訊,然後傳送註冊成功的MQ訊息
// 使用者註冊方法
publicvoidregisterUser(String username, String email, String phoneNumber)

{

// 儲存使用者資訊(簡化版)

      userService.add(buildUser(username,email,phoneNumber))

// 傳送訊息

      String registrationMessage = 

"User "

 + username + 

" has registered successfully."

;

// 傳送訊息到佇列

      rabbitTemplate.convertAndSend(

"registrationQueue"

, registrationMessage);

  }

消費者從佇列中讀取訊息併發送簡訊或郵件
@Service
publicclassNotificationService

{

// 監聽訊息佇列中的訊息併發送簡訊/郵件
@RabbitListener

(queues = 

"registrationQueue"

)

publicvoidhandleRegistrationNotification(String message)

{

// 這裡可以進行簡訊或郵件的傳送操作

        System.out.println(

"Sending registration notification: "

 + message);

// 假設這裡是傳送簡訊的操作

        sendSms(message);

// 也可以做其他通知(比如發郵件等)

        sendEmail(message);

    }

  }

基於 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 實現的後臺管理系統 + 使用者小程式,支援 RBAC 動態許可權、多租戶、資料許可權、工作流、三方登入、支付、簡訊、商城等功能
  • 專案地址:https://github.com/YunaiV/yudao-cloud
  • 影片教程:https://doc.iocoder.cn/video/

2. 解耦

在微服務架構中,各個服務通常需要進行相互通訊 。使用 MQ 可以幫助解耦服務 ,避免直接呼叫導致的強耦合。
一個電商平臺的庫存服務和支付服務。支付服務在處理支付後,需要向庫存服務傳送扣庫存的請求,但不直接呼叫 API,而是透過 MQ 傳送訊息,讓庫存服務非同步處理。
支付服務在支付成功後將訊息傳送到 RocketMQ:
import

 org.apache.rocketmq.client.producer.DefaultMQProducer;

import

 org.apache.rocketmq.common.message.Message;

publicclassPaymentService

{

private

 DefaultMQProducer producer;

publicPaymentService()throws Exception 

{

        producer = 

new

 DefaultMQProducer(

"PaymentProducerGroup"

);

        producer.setNamesrvAddr(

"localhost:9876"

);  

// RocketMQ NameServer 地址

        producer.start();

    }

publicvoidprocessPayment(String orderId, int quantity)throws Exception 

{

// 1. 模擬呼叫支付介面(例如:支付寶、微信支付等)
boolean

 paymentSuccessful = callPayment(orderId, quantity);

if

 (paymentSuccessful) {

// 2. 支付成功後,建立支付訊息併發送到 RocketMQ

            String messageBody = 

"OrderId: "

 + orderId + 

", Quantity: "

 + quantity;

            Message message = 

new

 Message(

"paymentTopic"

"paymentTag"

, messageBody.getBytes());

            producer.send(message);    

        }

    }

}

庫存服務從 RocketMQ 中消費支付訊息,並處理扣庫存的邏輯:
publicclassInventoryService

{

private

 DefaultMQPushConsumer consumer;

publicInventoryService()throws Exception 

{

        consumer = 

new

 DefaultMQPushConsumer(

"InventoryConsumerGroup"

);

        consumer.setNamesrvAddr(

"localhost:9876"

);

        consumer.subscribe(

"paymentTopic"

"paymentTag"

);

// 訊息監聽器

        consumer.registerMessageListener((msgs, context) -> {

for

 (MessageExt msg : msgs) {

                String messageBody = 

new

 String(msg.getBody());

// 執行扣庫存操作

                reduceStock(messageBody);

            }

returnnull

// 返回消費成功

        });
        consumer.start();

        System.out.println(

"InventoryService started..."

);

    }

}

3. 流量削鋒

在高併發的情況下,有些請求可能會產生瞬時流量峰值,直接處理可能會導致服務過載。比如:
  • 春運快到了,12306的搶票就是這種案例。
  • 又或者雙12這種大促,訂單壓力會比較大。
  • 秒殺的時候,也需要避免流量暴漲,打垮應用系統的風險
這些場景,我們都可以使用MQ來進行流量的削峰填谷,確保系統平穩執行。
假設秒殺系統每秒最多可以處理2k個請求,每秒卻有5k的請求過來,可以引入訊息佇列,秒殺系統每秒從訊息佇列拉2k請求處理得了。

4. 延時任務

在電商平臺的訂單處理中,如果使用者下單後一定時間內未支付,需要自動取消訂單。透過MQ的延時佇列功能 ,可以設定訊息延遲消費的時間,當訊息到達延遲時間後,由消費者處理取消訂單的邏輯。
當用戶下單時,生成一個訂單併發送一條延遲訊息到RocketMQ。延遲時間可以根據訂單的超時時間設定:
@Service
publicclassOrderService

{

@Autowired
private

 RocketMQTemplate rocketMQTemplate;

publicvoidcreateOrder(Order order)

{

// 儲存訂單邏輯(省略)

// 計算延遲時間(單位:毫秒)
long

 delay = order.getTimeout();

// 傳送延遲訊息

  rocketMQTemplate.syncSend(

"orderCancelTopic:delay"

 + delay,

    MessageBuilder.withPayload(order).build(),

10000

// 訊息傳送超時時間(單位:毫秒)

    (

int

) (delay / 

1000

// RocketMQ的延遲級別是以秒為單位的,因此需要轉換為秒

  );

 }

}

注意:RocketMQ的延遲級別是固定的,如1s、5s、10s等。如果訂單的延遲時間不是RocketMQ支援的延遲級別的整數倍,那麼訊息將不會精確地在預期的延遲時間後被消費。為了解決這個問題,你可以選擇最接近的延遲級別,或者根據業務需求進行適當的調整。
建立一個用來消費延遲訊息的消費者,處理取消訂單的邏輯。例如:
@Component
@RocketMQMessageListener

(topic = 

"orderCancelTopic"

, consumerGroup = 

"order-cancel-consumer-group"

)

publicclassOrderCancelListenerimplementsRocketMQListener<Order

{

@Override
publicvoidonMessage(Order order)

{

// 取消訂單邏輯
// 檢查訂單狀態,如果訂單仍處於未支付狀態則進行取消

  System.out.println(

"Cancelling order: "

 + order.getOrderId());

// (省略實際的取消訂單邏輯)

 }

}

5. 日誌收集

訊息佇列常用於日誌系統中,將應用生成的日誌非同步地傳送到日誌處理系統,進行統一儲存和分析。
假設你有一個微服務架構,每個微服務都會生成日誌。你可以將這些日誌透過訊息佇列(如Kafka)傳送到一個集中式的日誌收集系統(如 ELK(Elasticsearch, Logstash, Kibana) 或 Fluentd),從而實現日誌的統一管理。
生產者(傳送日誌到 Kafka)
// 配置和傳送日誌到 Kafka 主題 "app-logs"

KafkaProducer<String, String> producer = 

new

 KafkaProducer<>(config);

String logMessage = 

"{\"level\": \"INFO\", \"message\": \"Application started\", \"timestamp\": \"2024-12-29T20:30:59\"}"

;

producer.send(

new

 ProducerRecord<>(

"app-logs"

"log-key"

, logMessage));

消費者(收集日誌資訊)
@Service
publicclassLogConsumer

{

// 使用 @KafkaListener 註解來消費 Kafka 中的日誌
@KafkaListener

(topics = 

"app-logs"

, groupId = 

"log-consumer-group"

)

publicvoidconsumeLog(String logMessage)

{

// 列印或處理收到的日誌

        System.out.println(

"Received log: "

 + logMessage);

    }

}

6. 分散式事務

業界經常使用MQ來實現分散式事務。
我舉個下訂單的場景,使用MQ實現分散式事務的例子吧。
我們先來看,一條普通的MQ訊息,從產生到被消費,大概流程如下:
  • 生產者產生訊息,傳送帶MQ伺服器
  • MQ收到訊息後,將訊息持久化到儲存系統。
  • MQ伺服器返回ACk到生產者。
  • MQ伺服器把訊息push給消費者
  • 消費者消費完訊息,響應ACK
  • MQ伺服器收到ACK,認為訊息消費成功,即在儲存中刪除訊息。
回到下訂單 這個例子,訂單系統建立完訂單後,再發送訊息給下游系統。如果訂單建立成功,然後訊息沒有成功傳送出去,下游系統就無法感知這個事情,出導致資料不一致。
這時候就可以使用MQ實現分散式事務訊息。大家看下這個流程:
  1. 生產者產生訊息,傳送一條半事務訊息到MQ伺服器
  2. MQ收到訊息後,將訊息持久化到儲存系統,這條訊息的狀態是待發送狀態。
  3. MQ伺服器返回ACK確認到生產者,此時MQ不會觸發訊息推送事件
  4. 生產者執行本地事務
  5. 如果本地事務執行成功,即commit執行結果到MQ伺服器;如果執行失敗,傳送rollback。
  6. 如果是正常的commit,MQ伺服器更新訊息狀態為可傳送;如果是rollback,即刪除訊息。
  7. 如果訊息狀態更新為可傳送,則MQ伺服器會push訊息給消費者。消費者消費完就回ACK。
  8. 如果MQ伺服器長時間沒有收到生產者的commit或者rollback,它會反查生產者,然後根據查詢到的結果執行最終狀態。

7. 遠端呼叫

我以前公司(微眾)基於MQ(RocketMQ),自研了遠端呼叫框架
RocketMQ 作為遠端呼叫框架,主要就是金融場景的適配性。
  • 訊息查詢功能 :RocketMQ提供了訊息查詢功能,方便微眾銀行在需要時進行訊息對賬或問題排查。
  • 金融級穩定性 :RocketMQ在金融領域的應用非常廣泛,得到了眾多金融機構的認可。其穩定性和可靠效能夠滿足微眾銀行對金融級訊息服務的需求。
還有可以基於RocketMQ的定製開發:多中心多活、灰度釋出、流量權重與訊息去重、背壓模式 (能夠根據後續服務的治理能力決定拉取的訊息數量,確保系統的穩定執行。)

8. 廣播通知:事件驅動的訊息通知

訊息佇列(MQ) 可以非常適合用於 廣播通知。在廣播通知場景中,訊息佇列可以將訊息推送到多個訂閱者,讓不同的服務或者應用接收到通知。
  • 系統通知 :向所有使用者廣播應用更新、系統維護、公告通知等。
  • 事件驅動的訊息通知 :如庫存更新、使用者狀態變化、訂單支付成功等事件通知,多個系統可以訂閱這個事件。
針對事件驅動的訊息通知,我們以 訂單支付成功 事件為例,假設多個系統(如庫存管理系統、使用者積分系統、財務系統等)都需要監聽這個事件來進行相應處理。
當訂單支付成功 事件發生時,系統會透過訊息佇列廣播一個事件通知(比如訊息內容是訂單ID、支付金額等),其他系統可以根據這個事件來執行相應的操作,如:
  • 庫存系統:根據訂單資訊減少庫存。
  • 使用者積分系統:增加使用者積分。
  • 財務系統:記錄支付流水。
傳送訂單支付成功事件:
// 建立訂單支付成功事件訊息

String orderEventData = 

"{\"orderId\": 12345, \"userId\": 67890, \"amount\": 100.0, \"event\": \"ORDER_PAYMENT_SUCCESS\"}"

;

Message msg = 

new

 Message(

"order_event_topic"

"order_payment_success"

, orderEventData.getBytes());

// 傳送訊息

producer.send(msg);

事件消費者(接收並處理訂單支付成功事件):
  • 庫存系統:
// 註冊訊息監聽器

        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {

for

 (Message msg : msgs) {

                String eventData = 

new

 String(msg.getBody());

                System.out.println(

"Inventory system received: "

 + eventData);

// 處理庫存減少邏輯
// 解析訊息(假設是 JSON 格式)
// updateInventory(eventData);  // 假設呼叫庫存更新方法

            }

return

 ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

        });

  • 積分系統:
// 註冊訊息監聽器

        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {

for

 (Message msg : msgs) {

                String eventData = 

new

 String(msg.getBody());

                System.out.println(

"Points system received: "

 + eventData);

// 處理使用者積分增加邏輯
// updateUserPoints(eventData);  // 假設呼叫積分更新方法

            }

return

 ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

        });

  • 財務系統:
// 註冊訊息監聽器

        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {

for

 (Message msg : msgs) {

                String eventData = 

new

 String(msg.getBody());

                System.out.println(

"Finance system received: "

 + eventData);

// 處理財務記錄邏輯
// recordPaymentTransaction(eventData);  // 假設呼叫財務記錄方法

            }

return

 ConsumeConcurrentlyStatus.CONSUME_SUCCESS;        });


歡迎加入我的知識星球,全面提升技術能力。
👉 加入方式,長按”或“掃描”下方二維碼噢
星球的內容包括:專案實戰、面試招聘、原始碼解析、學習路線。
文章有幫助的話,在看,轉發吧。
謝謝支援喲 (*^__^*)

相關文章