-
《專案實戰(影片)》:從書中學,往事上“練” -
《網際網路高頻面試題》:面朝簡歷學習,春暖花開 -
《架構 x 系統設計》:摧枯拉朽,掌控面試高頻場景題 -
《精進 Java 學習指南》:系統學習,網際網路主流技術棧 -
《必讀 Java 原始碼專欄》:知其然,知其所以然

-
Boot 倉庫:https://gitee.com/zhijiantianya/ruoyi-vue-pro -
Cloud 倉庫:https://gitee.com/zhijiantianya/yudao-cloud -
影片教程:https://doc.iocoder.cn
一、🌈前言
-
專案地址:https://github.com/YunaiV/ruoyi-vue-pro -
影片教程:https://doc.iocoder.cn/video/
二、非同步的八種實現方式
-
執行緒Thread -
Future -
非同步框架CompletableFuture -
Spring註解@Async -
Spring ApplicationEvent事件 -
訊息佇列 -
第三方非同步框架,比如Hutool的ThreadUtil -
Guava非同步
-
專案地址:https://github.com/YunaiV/yudao-cloud -
影片教程:https://doc.iocoder.cn/video/
三、什麼是非同步?

贈送積分
和傳送簡訊
這兩個操作能夠同時進行,比如:
四、非同步程式設計
4.1 執行緒非同步
publicclassAsyncThreadextendsThread{
@Override
publicvoidrun(){
System.out.println("Current thread name:" + Thread.currentThread().getName() + " Send email success!");
}
publicstaticvoidmain(String[] args){
AsyncThread asyncThread = new AsyncThread();
asyncThread.run();
}
}
Thread
執行緒,頻繁的建立、銷燬,浪費系統資源,我們可以採用執行緒池:private ExecutorService executorService = Executors.newCachedThreadPool();
publicvoidfun(){
executorService.submit(new Runnable() {
@Override
publicvoidrun(){
log.info("執行業務邏輯...");
}
});
}
Runnable
或Callable
中,交由執行緒池來執行。4.2 Future非同步
@Slf4j
publicclassFutureManager{
public String execute()throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(1);
Future<String> future = executor.submit(new Callable<String>() {
@Override
public String call()throws Exception {
System.out.println(" --- task start --- ");
Thread.sleep(3000);
System.out.println(" --- task finish ---");
return"this is future execute final result!!!";
}
});
//這裡需要返回值時會阻塞主執行緒
String result = future.get();
log.info("Future get result: {}", result);
return result;
}
@SneakyThrows
publicstaticvoidmain(String[] args){
FutureManager manager = new FutureManager();
manager.execute();
}
}
--- task start ---
--- task finish ---
Future get result: this is future execute final result!!!
4.2.1 Future的不足之處
4.3 CompletableFuture實現非同步
publicclassCompletableFutureCompose{
/**
* thenAccept子任務和父任務公用同一個執行緒
*/
@SneakyThrows
publicstaticvoidthenRunAsync(){
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread() + " cf1 do something....");
return1;
});
CompletableFuture<Void> cf2 = cf1.thenRunAsync(() -> {
System.out.println(Thread.currentThread() + " cf2 do something...");
});
//等待任務1執行完成
System.out.println("cf1結果->" + cf1.get());
//等待任務2執行完成
System.out.println("cf2結果->" + cf2.get());
}
publicstaticvoidmain(String[] args){
thenRunAsync();
}
}
ForkJoinPool
來處理非同步任務,如果在某些業務場景我們想自定義自己的非同步執行緒池也是可以的。4.4 Spring的@Async非同步
4.4.1 自定義非同步執行緒池
/**
* 執行緒池引數配置,多個執行緒池實現執行緒池隔離,@Async註解,預設使用系統自定義執行緒池,可在專案中設定多個執行緒池,在非同步呼叫的時候,指明需要呼叫的執行緒池名稱,比如:@Async("taskName")
@EnableAsync
@Configuration
public class TaskPoolConfig {
/**
* 自定義執行緒池
*
**/
@Bean("taskExecutor")
public Executor taskExecutor(){
//返回可用處理器的Java虛擬機器的數量 12
int i = Runtime.getRuntime().availableProcessors();
System.out.println("系統最大執行緒數 : " + i);
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心執行緒池大小
executor.setCorePoolSize(16);
//最大執行緒數
executor.setMaxPoolSize(20);
//配置佇列容量,預設值為Integer.MAX_VALUE
executor.setQueueCapacity(99999);
//活躍時間
executor.setKeepAliveSeconds(60);
//執行緒名字字首
executor.setThreadNamePrefix("asyncServiceExecutor -");
//設定此執行程式應該在關閉時阻止的最大秒數,以便在容器的其餘部分繼續關閉之前等待剩餘的任務完成他們的執行
executor.setAwaitTerminationSeconds(60);
//等待所有的任務結束後再關閉執行緒池
executor.setWaitForTasksToCompleteOnShutdown(true);
return executor;
}
}
4.4.2 AsyncService
publicinterfaceAsyncService{
MessageResult sendSms(String callPrefix, String mobile, String actionType, String content);
MessageResult sendEmail(String email, String subject, String content);
}
@Slf4j
@Service
publicclassAsyncServiceImplimplementsAsyncService{
@Autowired
private IMessageHandler mesageHandler;
@Override
@Async("taskExecutor")
public MessageResult sendSms(String callPrefix, String mobile, String actionType, String content){
try {
Thread.sleep(1000);
mesageHandler.sendSms(callPrefix, mobile, actionType, content);
} catch (Exception e) {
log.error("傳送簡訊異常 -> ", e)
}
}
@Override
@Async("taskExecutor")
publicsendEmail(String email, String subject, String content){
try {
Thread.sleep(1000);
mesageHandler.sendsendEmail(email, subject, content);
} catch (Exception e) {
log.error("傳送email異常 -> ", e)
}
}
}
@Async
呼叫執行緒池,推薦等方式是是使用自定義執行緒池的模式,不推薦直接使用@Async直接實現非同步。4.5 Spring ApplicationEvent事件實現非同步
4.5.1 定義事件
publicclassAsyncSendEmailEventextendsApplicationEvent{
/**
* 郵箱
**/
private String email;
/**
* 主題
**/
private String subject;
/**
* 內容
**/
private String content;
/**
* 接收者
**/
private String targetUserId;
}
4.5.2 定義事件處理器
@Slf4j
@Component
publicclassAsyncSendEmailEventHandlerimplementsApplicationListener<AsyncSendEmailEvent> {
@Autowired
private IMessageHandler mesageHandler;
@Async("taskExecutor")
@Override
publicvoidonApplicationEvent(AsyncSendEmailEvent event){
if (event == null) {
return;
}
String email = event.getEmail();
String subject = event.getSubject();
String content = event.getContent();
String targetUserId = event.getTargetUserId();
mesageHandler.sendsendEmailSms(email, subject, content, targerUserId);
}
}
4.6 訊息佇列
4.6.1 回撥事件訊息生產者
@Slf4j
@Component
publicclassCallbackProducer{
@Autowired
AmqpTemplate amqpTemplate;
publicvoidsendCallbackMessage(CallbackDTO allbackDTO, finallong delayTimes){
log.info("生產者傳送訊息,callbackDTO,{}", callbackDTO);
amqpTemplate.convertAndSend(CallbackQueueEnum.QUEUE_GENSEE_CALLBACK.getExchange(), CallbackQueueEnum.QUEUE_GENSEE_CALLBACK.getRoutingKey(), JsonMapper.getInstance().toJson(genseeCallbackDTO), new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message)throws AmqpException {
//給訊息設定延遲毫秒值,透過給訊息設定x-delay頭來設定訊息從交換機發送到佇列的延遲時間
message.getMessageProperties().setHeader("x-delay", delayTimes);
message.getMessageProperties().setCorrelationId(callbackDTO.getSdkId());
return message;
}
});
}
}
4.6.2 回撥事件訊息消費者
@Slf4j
@Component
@RabbitListener(queues = "message.callback", containerFactory = "rabbitListenerContainerFactory")
publicclassCallbackConsumer{
@Autowired
private IGlobalUserService globalUserService;
@RabbitHandler
publicvoidhandle(String json, Channel channel, @Headers Map<String, Object> map)throws Exception {
if (map.get("error") != null) {
//否認訊息
channel.basicNack((Long) map.get(AmqpHeaders.DELIVERY_TAG), false, true);
return;
}
try {
CallbackDTO callbackDTO = JsonMapper.getInstance().fromJson(json, CallbackDTO.class);
//執行業務邏輯
globalUserService.execute(callbackDTO);
//訊息訊息成功手動確認,對應訊息確認模式acknowledge-mode: manual
channel.basicAck((Long) map.get(AmqpHeaders.DELIVERY_TAG), false);
} catch (Exception e) {
log.error("回撥失敗 -> {}", e);
}
}
}
4.7 ThreadUtil非同步工具類
@Slf4j
publicclassThreadUtils{
publicstaticvoidmain(String[] args){
for (int i = 0; i < 3; i++) {
ThreadUtil.execAsync(() -> {
ThreadLocalRandom threadLocalRandom = ThreadLocalRandom.current();
int number = threadLocalRandom.nextInt(20) + 1;
System.out.println(number);
});
log.info("當前第:" + i + "個執行緒");
}
log.info("task finish!");
}
}
4.8 Guava非同步
Guava
的ListenableFuture
顧名思義就是可以監聽的Future
,是對java原生Future的擴充套件增強。我們知道Future表示一個非同步計算任務,當任務完成時可以得到計算結果。如果我們希望一旦計算完成就拿到結果展示給使用者或者做另外的計算,就必須使用另一個執行緒不斷的查詢計算狀態。這樣做,程式碼複雜,而且效率低下。使用「Guava ListenableFuture」 可以幫我們檢測Future是否完成了,不需要再透過get()方法苦苦等待非同步的計算結果,如果完成就自動呼叫回撥函式,這樣可以減少併發程式的複雜度。ListenableFuture
是一個介面,它從jdk
的Future
介面繼承,添加了void addListener(Runnable listener, Executor executor)
方法。 ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
final ListenableFuture<Integer> listenableFuture = executorService.submit(new Callable<Integer>() {
@Override
public Integer call()throws Exception {
log.info("callable execute...")
TimeUnit.SECONDS.sleep(1);
return1;
}
});
MoreExecutors
類的靜態方法listeningDecorator
方法初始化一個ListeningExecutorService
的方法,然後使用此例項的submit
方法即可初始化ListenableFuture
物件。ListenableFuture
要做的工作,在Callable介面的實現類中定義,這裡只是休眠了1秒鐘然後返回一個數字1,有了ListenableFuture例項,可以執行此Future並執行Future完成之後的回撥函式。 Futures.addCallback(listenableFuture, new FutureCallback<Integer>() {
@Override
publicvoidonSuccess(Integer result){
//成功執行...
System.out.println("Get listenable future's result with callback " + result);
}
@Override
publicvoidonFailure(Throwable t){
//異常情況處理...
t.printStackTrace();
}
});





