實現非同步程式設計,我有八種方式!

👉 這是一個或許對你有用的社群
🐱 一對一交流/面試小冊/簡歷最佳化/求職解惑,歡迎加入芋道快速開發平臺知識星球。下面是星球提供的部分資料:
👉這是一個或許對你有用的開源專案
國產 Star 破 10w+ 的開源專案,前端包括管理後臺 + 微信小程式,後端支援單體和微服務架構。
功能涵蓋 RBAC 許可權、SaaS 多租戶、資料許可權、商城、支付、工作流、大屏報表、微信公眾號、CRM 等等功能:
  • Boot 倉庫:https://gitee.com/zhijiantianya/ruoyi-vue-pro
  • Cloud 倉庫:https://gitee.com/zhijiantianya/yudao-cloud
  • 影片教程:https://doc.iocoder.cn
【國內首批】支援 JDK 21 + SpringBoot 3.2.2、JDK 8 + Spring Boot 2.7.18 雙版本 

一、🌈前言

非同步執行對於開發者來說並不陌生,在實際的開發過程中,很多場景多會使用到非同步,相比同步執行,非同步可以大大縮短請求鏈路耗時時間,比如:「傳送簡訊、郵件、非同步更新等」 ,這些都是典型的可以透過非同步實現的場景。
基於 Spring Boot + MyBatis Plus + Vue & Element 實現的後臺管理系統 + 使用者小程式,支援 RBAC 動態許可權、多租戶、資料許可權、工作流、三方登入、支付、簡訊、商城等功能
  • 專案地址:https://github.com/YunaiV/ruoyi-vue-pro
  • 影片教程:https://doc.iocoder.cn/video/

二、非同步的八種實現方式

  1. 執行緒Thread
  2. Future
  3. 非同步框架CompletableFuture
  4. Spring註解@Async
  5. Spring ApplicationEvent事件
  6. 訊息佇列
  7. 第三方非同步框架,比如Hutool的ThreadUtil
  8. Guava非同步
基於 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 實現的後臺管理系統 + 使用者小程式,支援 RBAC 動態許可權、多租戶、資料許可權、工作流、三方登入、支付、簡訊、商城等功能
  • 專案地址:https://github.com/YunaiV/yudao-cloud
  • 影片教程:https://doc.iocoder.cn/video/

三、什麼是非同步?

首先我們先看一個常見的使用者下單的場景:

什麼是非同步
在同步操作中,我們執行到 「傳送簡訊」 的時候,我們必須等待這個方法徹底執行完才能執行 「贈送積分」 這個操作,如果 「贈送積分」 這個動作執行時間較長,傳送簡訊需要等待,這就是典型的同步場景。
實際上,傳送簡訊和贈送積分沒有任何的依賴關係,透過非同步,我們可以實現贈送積分傳送簡訊這兩個操作能夠同時進行,比如:

非同步
這就是所謂的非同步,是不是非常簡單,下面就說說非同步的幾種實現方式吧。

四、非同步程式設計

4.1 執行緒非同步

publicclassAsyncThreadextendsThread{

@Override
publicvoidrun(){
        System.out.println("Current thread name:" + Thread.currentThread().getName() + " Send email success!");
    }

publicstaticvoidmain(String[] args){
        AsyncThread asyncThread = new AsyncThread();
        asyncThread.run();
    }
}

當然如果每次都建立一個Thread執行緒,頻繁的建立、銷燬,浪費系統資源,我們可以採用執行緒池:
private ExecutorService executorService = Executors.newCachedThreadPool();

publicvoidfun(){
    executorService.submit(new Runnable() {
@Override
publicvoidrun(){
            log.info("執行業務邏輯...");
        }
    });
}

可以將業務邏輯封裝到RunnableCallable中,交由執行緒池來執行。

4.2 Future非同步

@Slf4j
publicclassFutureManager{

public String execute()throws Exception {

        ExecutorService executor = Executors.newFixedThreadPool(1);
        Future<String> future = executor.submit(new Callable<String>() {
@Override
public String call()throws Exception {

                System.out.println(" --- task start --- ");
                Thread.sleep(3000);
                System.out.println(" --- task finish ---");
return"this is future execute final result!!!";
            }
        });

//這裡需要返回值時會阻塞主執行緒
        String result = future.get();
        log.info("Future get result: {}", result);
return result;
    }

@SneakyThrows
publicstaticvoidmain(String[] args){
        FutureManager manager = new FutureManager();
        manager.execute();
    }
}

輸出結果:
 --- task start --- 
 --- task finish ---
 Future get result: this is future execute final result!!!

4.2.1 Future的不足之處

Future的不足之處的包括以下幾點:
1️⃣ 無法被動接收非同步任務的計算結果:雖然我們可以主動將非同步任務提交給執行緒池中的執行緒來執行,但是待非同步任務執行結束之後,主執行緒無法得到任務完成與否的通知,它需要透過get方法主動獲取任務執行的結果。2️⃣ Future件彼此孤立:有時某一個耗時很長的非同步任務執行結束之後,你想利用它返回的結果再做進一步的運算,該運算也會是一個非同步任務,兩者之間的關係需要程式開發人員手動進行繫結賦予,Future並不能將其形成一個任務流(pipeline),每一個Future都是彼此之間都是孤立的,所以才有了後面的CompletableFuture,CompletableFuture就可以將多個Future串聯起來形成任務流。3️⃣ Futrue沒有很好的錯誤處理機制:截止目前,如果某個非同步任務在執行發的過程中發生了異常,呼叫者無法被動感知,必須透過捕獲get方法的異常才知曉非同步任務執行是否出現了錯誤,從而在做進一步的判斷處理。

4.3 CompletableFuture實現非同步

publicclassCompletableFutureCompose{

/**
     * thenAccept子任務和父任務公用同一個執行緒
     */

@SneakyThrows
publicstaticvoidthenRunAsync(){
        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread() + " cf1 do something....");
return1;
        });
        CompletableFuture<Void> cf2 = cf1.thenRunAsync(() -> {
            System.out.println(Thread.currentThread() + " cf2 do something...");
        });
//等待任務1執行完成
        System.out.println("cf1結果->" + cf1.get());
//等待任務2執行完成
        System.out.println("cf2結果->" + cf2.get());
    }

publicstaticvoidmain(String[] args){
        thenRunAsync();
    }
}

我們不需要顯式使用ExecutorService,CompletableFuture 內部使用了ForkJoinPool來處理非同步任務,如果在某些業務場景我們想自定義自己的非同步執行緒池也是可以的。

4.4 Spring的@Async非同步

4.4.1 自定義非同步執行緒池

/**
 * 執行緒池引數配置,多個執行緒池實現執行緒池隔離,@Async註解,預設使用系統自定義執行緒池,可在專案中設定多個執行緒池,在非同步呼叫的時候,指明需要呼叫的執行緒池名稱,比如:@Async("taskName")
@EnableAsync
@Configuration
public class TaskPoolConfig {
    /**
     * 自定義執行緒池
     *
     **/

@Bean("taskExecutor")
public Executor taskExecutor(){
//返回可用處理器的Java虛擬機器的數量 12
int i = Runtime.getRuntime().availableProcessors();
        System.out.println("系統最大執行緒數  : " + i);
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心執行緒池大小
        executor.setCorePoolSize(16);
//最大執行緒數
        executor.setMaxPoolSize(20);
//配置佇列容量,預設值為Integer.MAX_VALUE
        executor.setQueueCapacity(99999);
//活躍時間
        executor.setKeepAliveSeconds(60);
//執行緒名字字首
        executor.setThreadNamePrefix("asyncServiceExecutor -");
//設定此執行程式應該在關閉時阻止的最大秒數,以便在容器的其餘部分繼續關閉之前等待剩餘的任務完成他們的執行
        executor.setAwaitTerminationSeconds(60);
//等待所有的任務結束後再關閉執行緒池
        executor.setWaitForTasksToCompleteOnShutdown(true);
return executor;
    }
}

4.4.2 AsyncService

publicinterfaceAsyncService{

MessageResult sendSms(String callPrefix, String mobile, String actionType, String content);

MessageResult sendEmail(String email, String subject, String content);
}

@Slf4j
@Service
publicclassAsyncServiceImplimplementsAsyncService{

@Autowired
private IMessageHandler mesageHandler;

@Override
@Async("taskExecutor")
public MessageResult sendSms(String callPrefix, String mobile, String actionType, String content){
try {

            Thread.sleep(1000);
            mesageHandler.sendSms(callPrefix, mobile, actionType, content);

        } catch (Exception e) {
            log.error("傳送簡訊異常 -> ", e)
        }
    }

@Override
@Async("taskExecutor")
publicsendEmail(String email, String subject, String content){
try {

            Thread.sleep(1000);
            mesageHandler.sendsendEmail(email, subject, content);

        } catch (Exception e) {
            log.error("傳送email異常 -> ", e)
        }
    }
}

在實際專案中, 使用@Async呼叫執行緒池,推薦等方式是是使用自定義執行緒池的模式,不推薦直接使用@Async直接實現非同步。

4.5 Spring ApplicationEvent事件實現非同步

4.5.1 定義事件

publicclassAsyncSendEmailEventextendsApplicationEvent{

/**
     * 郵箱
     **/

private String email;

/**
     * 主題
     **/

private String subject;

/**
     * 內容
     **/

private String content;

/**
     * 接收者
     **/

private String targetUserId;

}

4.5.2 定義事件處理器

@Slf4j
@Component
publicclassAsyncSendEmailEventHandlerimplementsApplicationListener<AsyncSendEmailEvent{

@Autowired
private IMessageHandler mesageHandler;

@Async("taskExecutor")
@Override
publicvoidonApplicationEvent(AsyncSendEmailEvent event){
if (event == null) {
return;
        }

        String email = event.getEmail();
        String subject = event.getSubject();
        String content = event.getContent();
        String targetUserId = event.getTargetUserId();
        mesageHandler.sendsendEmailSms(email, subject, content, targerUserId);
      }
}

另外,可能有些時候採用ApplicationEvent實現非同步的使用,當程式出現異常錯誤的時候,需要考慮補償機制,那麼這時候可以結合Spring Retry重試來幫助我們避免這種異常造成資料不一致問題。

4.6 訊息佇列

4.6.1 回撥事件訊息生產者

@Slf4j
@Component
publicclassCallbackProducer{

@Autowired
    AmqpTemplate amqpTemplate;

publicvoidsendCallbackMessage(CallbackDTO allbackDTO, finallong delayTimes){

        log.info("生產者傳送訊息,callbackDTO,{}", callbackDTO);

        amqpTemplate.convertAndSend(CallbackQueueEnum.QUEUE_GENSEE_CALLBACK.getExchange(), CallbackQueueEnum.QUEUE_GENSEE_CALLBACK.getRoutingKey(), JsonMapper.getInstance().toJson(genseeCallbackDTO), new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message)throws AmqpException {
//給訊息設定延遲毫秒值,透過給訊息設定x-delay頭來設定訊息從交換機發送到佇列的延遲時間
                message.getMessageProperties().setHeader("x-delay", delayTimes);
                message.getMessageProperties().setCorrelationId(callbackDTO.getSdkId());
return message;
            }
        });
    }
}

4.6.2 回撥事件訊息消費者

@Slf4j
@Component
@RabbitListener(queues = "message.callback", containerFactory = "rabbitListenerContainerFactory")
publicclassCallbackConsumer{

@Autowired
private IGlobalUserService globalUserService;

@RabbitHandler
publicvoidhandle(String json, Channel channel, @Headers Map<String, Object> map)throws Exception {

if (map.get("error") != null) {
//否認訊息
            channel.basicNack((Long) map.get(AmqpHeaders.DELIVERY_TAG), falsetrue);
return;
        }

try {

            CallbackDTO callbackDTO = JsonMapper.getInstance().fromJson(json, CallbackDTO.class);
//執行業務邏輯
            globalUserService.execute(callbackDTO);
//訊息訊息成功手動確認,對應訊息確認模式acknowledge-mode: manual
            channel.basicAck((Long) map.get(AmqpHeaders.DELIVERY_TAG), false);

        } catch (Exception e) {
            log.error("回撥失敗 -> {}", e);
        }
    }
}

4.7 ThreadUtil非同步工具類

@Slf4j
publicclassThreadUtils{

publicstaticvoidmain(String[] args){
for (int i = 0; i < 3; i++) {
            ThreadUtil.execAsync(() -> {
                ThreadLocalRandom threadLocalRandom = ThreadLocalRandom.current();
int number = threadLocalRandom.nextInt(20) + 1;
                System.out.println(number);
            });
            log.info("當前第:" + i + "個執行緒");
        }

        log.info("task finish!");
    }
}

4.8 Guava非同步

GuavaListenableFuture顧名思義就是可以監聽的Future,是對java原生Future的擴充套件增強。我們知道Future表示一個非同步計算任務,當任務完成時可以得到計算結果。如果我們希望一旦計算完成就拿到結果展示給使用者或者做另外的計算,就必須使用另一個執行緒不斷的查詢計算狀態。這樣做,程式碼複雜,而且效率低下。使用「Guava ListenableFuture」 可以幫我們檢測Future是否完成了,不需要再透過get()方法苦苦等待非同步的計算結果,如果完成就自動呼叫回撥函式,這樣可以減少併發程式的複雜度。
ListenableFuture是一個介面,它從jdkFuture介面繼承,添加了void addListener(Runnable listener, Executor executor)方法。
我們看下如何使用ListenableFuture。首先需要定義ListenableFuture的例項:
 ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
final ListenableFuture<Integer> listenableFuture = executorService.submit(new Callable<Integer>() {
@Override
public Integer call()throws Exception {
                log.info("callable execute...")
                TimeUnit.SECONDS.sleep(1);
return1;
            }
        });
首先透過MoreExecutors類的靜態方法listeningDecorator方法初始化一個ListeningExecutorService的方法,然後使用此例項的submit方法即可初始化ListenableFuture物件。
ListenableFuture要做的工作,在Callable介面的實現類中定義,這裡只是休眠了1秒鐘然後返回一個數字1,有了ListenableFuture例項,可以執行此Future並執行Future完成之後的回撥函式。
 Futures.addCallback(listenableFuture, new FutureCallback<Integer>() {
@Override
publicvoidonSuccess(Integer result){
//成功執行...
        System.out.println("Get listenable future's result with callback " + result);
    }

@Override
publicvoidonFailure(Throwable t){
//異常情況處理...
        t.printStackTrace();
    }
});

那麼,以上就是本期介紹的實現非同步的8種方式了。

歡迎加入我的知識星球,全面提升技術能力。
👉 加入方式,長按”或“掃描”下方二維碼噢
星球的內容包括:專案實戰、面試招聘、原始碼解析、學習路線。

文章有幫助的話,在看,轉發吧。
謝謝支援喲 (*^__^*)

相關文章