👉 這是一個或許對你有用的社群
《專案實戰(影片)》:從書中學,往事上“練” 《網際網路高頻面試題》:面朝簡歷學習,春暖花開 《架構 x 系統設計》:摧枯拉朽,掌控面試高頻場景題 《精進 Java 學習指南》:系統學習,網際網路主流技術棧 《必讀 Java 原始碼專欄》:知其然,知其所以然

👉這是一個或許對你有用的開源專案國產 Star 破 10w+ 的開源專案,前端包括管理後臺 + 微信小程式,後端支援單體和微服務架構。功能涵蓋 RBAC 許可權、SaaS 多租戶、資料許可權、商城、支付、工作流、大屏報表、微信公眾號、ERP、CRM、AI 大模型等等功能:
Boot 多模組架構:https://gitee.com/zhijiantianya/ruoyi-vue-pro Cloud 微服務架構:https://gitee.com/zhijiantianya/yudao-cloud 影片教程:https://doc.iocoder.cn 【國內首批】支援 JDK 17/21 + SpringBoot 3.3、JDK 8/11 + Spring Boot 2.7 雙版本
前言

基於 Spring Boot + MyBatis Plus + Vue & Element 實現的後臺管理系統 + 使用者小程式,支援 RBAC 動態許可權、多租戶、資料許可權、工作流、三方登入、支付、簡訊、商城等功能
專案地址:https://github.com/YunaiV/ruoyi-vue-pro 影片教程:https://doc.iocoder.cn/video/
1. 非同步處理
比如,在使用者註冊場景中,當用戶資訊儲存成功後,系統需要傳送一個簡訊、或者郵箱訊息,通知使用者註冊成功。如果這個簡訊或者郵箱訊息傳送比較耗時 ,則可能拖垮註冊介面 。又或者如果呼叫第三方簡訊、郵件傳送介面失敗,也會影響註冊介面。一般我們不希望一個通知類的功能,去影響註冊主流程,這時候,則可以使用MQ來實現非同步處理 。
// 使用者註冊方法
publicvoidregisterUser(String username, String email, String phoneNumber)
{
// 儲存使用者資訊(簡化版)
userService.add(buildUser(username,email,phoneNumber))
// 傳送訊息
String registrationMessage =
"User "
+ username +
" has registered successfully."
;
// 傳送訊息到佇列
rabbitTemplate.convertAndSend(
"registrationQueue"
, registrationMessage);
}
@Service
publicclassNotificationService
{
// 監聽訊息佇列中的訊息併發送簡訊/郵件
@RabbitListener
(queues =
"registrationQueue"
)
publicvoidhandleRegistrationNotification(String message)
{
// 這裡可以進行簡訊或郵件的傳送操作
System.out.println(
"Sending registration notification: "
+ message);
// 假設這裡是傳送簡訊的操作
sendSms(message);
// 也可以做其他通知(比如發郵件等)
sendEmail(message);
}
}
基於 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 實現的後臺管理系統 + 使用者小程式,支援 RBAC 動態許可權、多租戶、資料許可權、工作流、三方登入、支付、簡訊、商城等功能
專案地址:https://github.com/YunaiV/yudao-cloud 影片教程:https://doc.iocoder.cn/video/
2. 解耦

一個電商平臺的庫存服務和支付服務。支付服務在處理支付後,需要向庫存服務傳送扣庫存的請求,但不直接呼叫 API,而是透過 MQ 傳送訊息,讓庫存服務非同步處理。
import
org.apache.rocketmq.client.producer.DefaultMQProducer;
import
org.apache.rocketmq.common.message.Message;
publicclassPaymentService
{
private
DefaultMQProducer producer;
publicPaymentService()throws Exception
{
producer =
new
DefaultMQProducer(
"PaymentProducerGroup"
);
producer.setNamesrvAddr(
"localhost:9876"
);
// RocketMQ NameServer 地址
producer.start();
}
publicvoidprocessPayment(String orderId, int quantity)throws Exception
{
// 1. 模擬呼叫支付介面(例如:支付寶、微信支付等)
boolean
paymentSuccessful = callPayment(orderId, quantity);
if
(paymentSuccessful) {
// 2. 支付成功後,建立支付訊息併發送到 RocketMQ
String messageBody =
"OrderId: "
+ orderId +
", Quantity: "
+ quantity;
Message message =
new
Message(
"paymentTopic"
,
"paymentTag"
, messageBody.getBytes());
producer.send(message);
}
}
}
publicclassInventoryService
{
private
DefaultMQPushConsumer consumer;
publicInventoryService()throws Exception
{
consumer =
new
DefaultMQPushConsumer(
"InventoryConsumerGroup"
);
consumer.setNamesrvAddr(
"localhost:9876"
);
consumer.subscribe(
"paymentTopic"
,
"paymentTag"
);
// 訊息監聽器
consumer.registerMessageListener((msgs, context) -> {
for
(MessageExt msg : msgs) {
String messageBody =
new
String(msg.getBody());
// 執行扣庫存操作
reduceStock(messageBody);
}
returnnull
;
// 返回消費成功
});
consumer.start();
System.out.println(
"InventoryService started..."
);
}
}
3. 流量削鋒
-
春運快到了,12306的搶票就是這種案例。 -
又或者雙12這種大促,訂單壓力會比較大。 -
秒殺的時候,也需要避免流量暴漲,打垮應用系統的風險

4. 延時任務
在電商平臺的訂單處理中,如果使用者下單後一定時間內未支付,需要自動取消訂單。透過MQ的延時佇列功能 ,可以設定訊息延遲消費的時間,當訊息到達延遲時間後,由消費者處理取消訂單的邏輯。
@Service
publicclassOrderService
{
@Autowired
private
RocketMQTemplate rocketMQTemplate;
publicvoidcreateOrder(Order order)
{
// 儲存訂單邏輯(省略)
// 計算延遲時間(單位:毫秒)
long
delay = order.getTimeout();
// 傳送延遲訊息
rocketMQTemplate.syncSend(
"orderCancelTopic:delay"
+ delay,
MessageBuilder.withPayload(order).build(),
10000
,
// 訊息傳送超時時間(單位:毫秒)
(
int
) (delay /
1000
)
// RocketMQ的延遲級別是以秒為單位的,因此需要轉換為秒
);
}
}
注意:RocketMQ的延遲級別是固定的,如1s、5s、10s等。如果訂單的延遲時間不是RocketMQ支援的延遲級別的整數倍,那麼訊息將不會精確地在預期的延遲時間後被消費。為了解決這個問題,你可以選擇最接近的延遲級別,或者根據業務需求進行適當的調整。
@Component
@RocketMQMessageListener
(topic =
"orderCancelTopic"
, consumerGroup =
"order-cancel-consumer-group"
)
publicclassOrderCancelListenerimplementsRocketMQListener<Order>
{
@Override
publicvoidonMessage(Order order)
{
// 取消訂單邏輯
// 檢查訂單狀態,如果訂單仍處於未支付狀態則進行取消
System.out.println(
"Cancelling order: "
+ order.getOrderId());
// (省略實際的取消訂單邏輯)
}
}
5. 日誌收集
// 配置和傳送日誌到 Kafka 主題 "app-logs"
KafkaProducer<String, String> producer =
new
KafkaProducer<>(config);
String logMessage =
"{\"level\": \"INFO\", \"message\": \"Application started\", \"timestamp\": \"2024-12-29T20:30:59\"}"
;
producer.send(
new
ProducerRecord<>(
"app-logs"
,
"log-key"
, logMessage));
@Service
publicclassLogConsumer
{
// 使用 @KafkaListener 註解來消費 Kafka 中的日誌
@KafkaListener
(topics =
"app-logs"
, groupId =
"log-consumer-group"
)
publicvoidconsumeLog(String logMessage)
{
// 列印或處理收到的日誌
System.out.println(
"Received log: "
+ logMessage);
}
}
6. 分散式事務
MQ
來實現分散式事務。
-
生產者產生訊息,傳送帶MQ伺服器 -
MQ收到訊息後,將訊息持久化到儲存系統。 -
MQ伺服器返回ACk到生產者。 -
MQ伺服器把訊息push給消費者 -
消費者消費完訊息,響應ACK -
MQ伺服器收到ACK,認為訊息消費成功,即在儲存中刪除訊息。

-
生產者產生訊息,傳送一條半事務訊息到MQ伺服器 -
MQ收到訊息後,將訊息持久化到儲存系統,這條訊息的狀態是待發送狀態。 -
MQ伺服器返回ACK確認到生產者,此時MQ不會觸發訊息推送事件 -
生產者執行本地事務 -
如果本地事務執行成功,即commit執行結果到MQ伺服器;如果執行失敗,傳送rollback。 -
如果是正常的commit,MQ伺服器更新訊息狀態為可傳送;如果是rollback,即刪除訊息。 -
如果訊息狀態更新為可傳送,則MQ伺服器會push訊息給消費者。消費者消費完就回ACK。 -
如果MQ伺服器長時間沒有收到生產者的commit或者rollback,它會反查生產者,然後根據查詢到的結果執行最終狀態。
7. 遠端呼叫
-
訊息查詢功能 :RocketMQ提供了訊息查詢功能,方便微眾銀行在需要時進行訊息對賬或問題排查。 -
金融級穩定性 :RocketMQ在金融領域的應用非常廣泛,得到了眾多金融機構的認可。其穩定性和可靠效能夠滿足微眾銀行對金融級訊息服務的需求。
8. 廣播通知:事件驅動的訊息通知
系統通知 :向所有使用者廣播應用更新、系統維護、公告通知等。 事件驅動的訊息通知 :如庫存更新、使用者狀態變化、訂單支付成功等事件通知,多個系統可以訂閱這個事件。

-
庫存系統:根據訂單資訊減少庫存。 -
使用者積分系統:增加使用者積分。 -
財務系統:記錄支付流水。
// 建立訂單支付成功事件訊息
String orderEventData =
"{\"orderId\": 12345, \"userId\": 67890, \"amount\": 100.0, \"event\": \"ORDER_PAYMENT_SUCCESS\"}"
;
Message msg =
new
Message(
"order_event_topic"
,
"order_payment_success"
, orderEventData.getBytes());
// 傳送訊息
producer.send(msg);
-
庫存系統:
// 註冊訊息監聽器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for
(Message msg : msgs) {
String eventData =
new
String(msg.getBody());
System.out.println(
"Inventory system received: "
+ eventData);
// 處理庫存減少邏輯
// 解析訊息(假設是 JSON 格式)
// updateInventory(eventData); // 假設呼叫庫存更新方法
}
return
ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
-
積分系統:
// 註冊訊息監聽器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for
(Message msg : msgs) {
String eventData =
new
String(msg.getBody());
System.out.println(
"Points system received: "
+ eventData);
// 處理使用者積分增加邏輯
// updateUserPoints(eventData); // 假設呼叫積分更新方法
}
return
ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
-
財務系統:
// 註冊訊息監聽器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for
(Message msg : msgs) {
String eventData =
new
String(msg.getBody());
System.out.println(
"Finance system received: "
+ eventData);
// 處理財務記錄邏輯
// recordPaymentTransaction(eventData); // 假設呼叫財務記錄方法
}
return
ConsumeConcurrentlyStatus.CONSUME_SUCCESS; });





