
Asynchronous programming: a detailed explanation of CompletableFuture

2025-03-06 22:28:56

Future

JDK 5 added the Future interface to represent the result of an asynchronous computation.

Although Future and its related methods make it possible to execute tasks asynchronously, obtaining the result is inconvenient. We must either call Future.get(), which blocks the calling thread, or poll with Future.isDone() to check whether the task has finished and only then fetch the result.
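The two options described above can be sketched as follows. This is a minimal illustration, not code from the original article: the class name FutureBlockingDemo, the 200 ms delay and the 10 ms polling interval are all assumptions chosen for the example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class: shows polling with isDone() followed by get().
class FutureBlockingDemo {

    static String pollThenGet() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = pool.submit(() -> {
                Thread.sleep(200); // simulated slow query (assumed duration)
                return "User A";
            });
            // Option 1: polling. The caller keeps checking the completion flag.
            while (!future.isDone()) {
                Thread.sleep(10);
            }
            // Option 2: get(). Called on an unfinished task it would block here;
            // after the polling loop it returns immediately.
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(pollThenGet());
    }
}
```

Either way the caller has to stop and wait; there is no way to say "run this code when the result arrives", which is the gap CompletableFuture fills.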

Moreover, Future does not handle the case where multiple asynchronous tasks depend on each other. Put simply, the main thread has to wait until the child-thread tasks have finished before it can continue. At this point you may think of "CountDownLatch". That's right, it can solve the problem. The code is as follows.

Two Futures are defined here. The first one obtains user information through user id, and the second one obtains product information through product id.

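// Requires java.util.concurrent.* (ExecutorService, Executors, Future, CountDownLatch, ExecutionException)
public void testCountDownLatch() throws InterruptedException, ExecutionException {
    ExecutorService executorService = Executors.newFixedThreadPool(5);

    CountDownLatch downLatch = new CountDownLatch(2);

    long startTime = System.currentTimeMillis();
    Future<String> userFuture = executorService.submit(() -> {
        // Simulated user query takes 500 milliseconds
        Thread.sleep(500);
        downLatch.countDown();
        return "User A";
    });

    Future<String> goodsFuture = executorService.submit(() -> {
        // Simulated product query takes 400 milliseconds
        Thread.sleep(400);
        downLatch.countDown();
        return "Product A";
    });

    // Wait until both tasks have counted down
    downLatch.await();
    // Simulate the main program's own work
    Thread.sleep(600);
    System.out.println("Get user information:" + userFuture.get());
    System.out.println("Get product information:" + goodsFuture.get());
    System.out.println("Total Time" + (System.currentTimeMillis() - startTime) + "ms");

    // Release the pool threads once the test is done
    executorService.shutdown();
}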

Since Java 8, this is no longer the most elegant solution. Let's look at how to use CompletableFuture.

CompletableFuture

// Requires java.util.concurrent.CompletableFuture and java.util.concurrent.ExecutionException
@Test
public void testCompletableInfo() throws InterruptedException, ExecutionException {
    long startTime = System.currentTimeMillis();

    // Call the user service to obtain basic user information
    CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() -> {
        // Simulated user query takes 500 milliseconds
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "User A";
    });

    // Call the product service to obtain basic product information
    CompletableFuture<String> goodsFuture = CompletableFuture.supplyAsync(() -> {
        // Simulated product query takes 400 milliseconds
        try {
            Thread.sleep(400);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Product A";
    });

    System.out.println("Get user information:" + userFuture.get());
    System.out.println("Get product information:" + goodsFuture.get());

    // Simulate the main program's own work
    Thread.sleep(600);
    System.out.println("Total Time" + (System.currentTimeMillis() - startTime) + "ms");
}

CompletableFuture creation methods

"supplyAsync": executes a task and supports a return value.
"runAsync": executes a task with no return value.
Parameters: if an Executor is passed, that custom thread pool is used. If not, the default built-in thread pool, ForkJoinPool.commonPool(), runs the task built from the supplier. (Note: the parallelism of the default pool is the number of CPU cores minus one. When that parallelism is not greater than 1, which is the case on a machine with 2 cores or fewer, a new thread is created for each task instead. A custom thread pool is recommended in high-concurrency scenarios.)

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..}
public static CompletableFuture<Void> runAsync(Runnable runnable){..}
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor){..}
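As a minimal sketch of the two factory methods, the following example runs one task with a return value on a custom pool and one side-effect-only task on the default pool. The class name CreationDemo and the pool size of 2 are assumptions made for the illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class for supplyAsync and runAsync.
class CreationDemo {

    // supplyAsync with an explicit Executor: the task produces a value.
    static String supplyOnCustomPool() {
        ExecutorService pool = Executors.newFixedThreadPool(2); // assumed size
        try {
            CompletableFuture<String> future =
                    CompletableFuture.supplyAsync(() -> "User A", pool);
            return future.join();
        } finally {
            pool.shutdown();
        }
    }

    // runAsync without an Executor: the task only has a side effect,
    // and the future completes with null.
    static boolean runOnDefaultPool() {
        AtomicBoolean ran = new AtomicBoolean(false);
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> ran.set(true));
        future.join(); // waits for completion; the value itself is always null
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println(supplyOnCustomPool());
        System.out.println(runOnDefaultPool());
    }
}
```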

CompletableFuture result retrieval methods

// Method 1
public T get()

// Method 2
public T get(long timeout, TimeUnit unit)

// Method 3
public T getNow(T valueIfAbsent)

// Method 4
public T join()

Notes:

"get() and get(long timeout, TimeUnit unit)" => Both are declared by the Future interface. get() blocks until the result is available. The timed version waits at most the given time and throws a TimeoutException if the result is not available by then.
"getNow" => Returns immediately without blocking. If the computation has completed, it returns the result or throws the exception raised during the computation. If it has not completed, it returns the supplied valueIfAbsent.
"join" => Blocks like get(), but declares no checked exceptions. If the task failed, the original exception is not thrown directly; it is wrapped in an unchecked CompletionException.
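The differences between these methods can be seen in a small sketch. The class name RetrievalDemo, the 100 ms timeout and the "fallback" value are assumptions for the example; a CompletableFuture that is never completed stands in for a task that hangs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class comparing get(timeout), getNow and join.
class RetrievalDemo {

    // get(timeout, unit) gives up with a TimeoutException.
    static String getWithTimeout() {
        CompletableFuture<String> never = new CompletableFuture<>(); // never completed
        try {
            return never.get(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timed out";
        } catch (InterruptedException | ExecutionException e) {
            return "failed: " + e;
        }
    }

    // getNow does not block; it returns the fallback for an unfinished future.
    static String getNowOnPending() {
        CompletableFuture<String> pending = new CompletableFuture<>();
        return pending.getNow("fallback");
    }

    // join wraps the task's exception in an unchecked CompletionException.
    static String joinOnFailure() {
        CompletableFuture<String> failed = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
        try {
            return failed.join();
        } catch (CompletionException e) {
            Throwable cause = e.getCause();
            return cause.getClass().getSimpleName() + ": " + cause.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(getWithTimeout());
        System.out.println(getNowOnPending());
        System.out.println(joinOnFailure());
    }
}
```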

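Asynchronous callback methods

1. thenRun/thenRunAsync
In simple terms: after the first task completes, the second task runs, and the second task has no return value.

About the Async suffix: suppose the first task was started on a thread pool you passed in. With the Async variant (and no executor argument), the second task runs on the default ForkJoin thread pool. Without the suffix, the second task shares the first task's thread pool: it normally runs on the thread that completed the first task, or on the calling thread if the first task has already finished when the callback is attached.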
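A small sketch can make the ordering and the thread choice visible. The class name ThenRunDemo and the step labels are assumptions; the thread names that get printed vary from run to run, so only the order of the steps is fixed.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class for thenRun and thenRunAsync.
class ThenRunDemo {

    // Records a step and prints which thread executed it.
    static void step(List<String> steps, String label) {
        System.out.println(label + " ran on " + Thread.currentThread().getName());
        steps.add(label);
    }

    static List<String> runChain() {
        ExecutorService pool = Executors.newFixedThreadPool(2); // assumed size
        List<String> steps = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture<Void> chain = CompletableFuture
                    .runAsync(() -> step(steps, "first"), pool)       // custom pool
                    .thenRun(() -> step(steps, "second"))             // completing thread or caller
                    .thenRunAsync(() -> step(steps, "third"))         // default pool
                    .thenRunAsync(() -> step(steps, "fourth"), pool); // custom pool again
            chain.join();
            return steps;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runChain());
    }
}
```

Passing the executor explicitly to the Async variant, as in the last step, is one way to keep every stage on the custom pool.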

2. thenAccept/thenAcceptAsync

After the first task completes, the second (callback) task runs. The result of the first task is passed to the callback as its argument, but the callback has no return value.

3. thenApply/thenApplyAsync

After the first task completes, the second (callback) task runs. The result of the first task is passed to the callback as its argument, and the callback returns a value.
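The difference between the two can be sketched in one chain: thenApply transforms the value and passes it on, while thenAccept consumes it and ends the chain with a CompletableFuture<Void>. The class name ThenApplyAcceptDemo and the sample values are assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class for thenApply and thenAccept.
class ThenApplyAcceptDemo {

    static String applyThenAccept() {
        AtomicReference<String> sink = new AtomicReference<>();
        CompletableFuture<Void> chain = CompletableFuture
                .supplyAsync(() -> "User A")
                .thenApply(name -> name.length())   // receives "User A", returns 6
                .thenApply(len -> "length=" + len)  // receives 6, returns a String
                .thenAccept(sink::set);             // receives the String, returns nothing
        chain.join();
        return sink.get();
    }

    public static void main(String[] args) {
        System.out.println(applyThenAccept()); // prints "length=6"
    }
}
```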

Exception callback

whenComplete + exceptionally

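public void testWhenCompleteExceptionally() throws ExecutionException, InterruptedException {
    CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
        // Fail roughly half of the time
        if (Math.random() < 0.5) {
            throw new RuntimeException("Error");
        }
        System.out.println("Normal End");
        return 0.11;

    }).whenComplete((aDouble, throwable) -> {
        // Called in both cases: exactly one of the two arguments is non-null here
        if (aDouble == null) {
            System.out.println("whenComplete aDouble is null");
        } else {
            System.out.println("whenComplete aDouble is " + aDouble);
        }
        if (throwable == null) {
            System.out.println("whenComplete throwable is null");
        } else {
            System.out.println("whenComplete throwable is " + throwable.getMessage());
        }
    }).exceptionally((throwable) -> {
        // Called only on failure; supplies the default value
        System.out.println("exceptionally:" + throwable.getMessage());
        return 0.0;
    });

    System.out.println("The final result returned = " + future.get());
}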

When an exception occurs, it is caught in exceptionally, which supplies the default return value of 0.0.

As for the callback function "whenComplete":

"Normal completion": whenComplete receives the result of the previous task, and the exception is null;
"Exception occurs": whenComplete receives null as the result, and the exception is the one raised by the upstream task.

Result when the exception is thrown:

whenComplete aDouble is null
whenComplete throwable is java.lang.RuntimeException: Error
exceptionally:java.lang.RuntimeException: Error
The final result returned = 0.0

Notes

1. The result must be retrieved to see exception information

A CompletableFuture only reports an exception when its result is retrieved. If get()/join() is never called, the exception information is never seen. To capture it, either wrap the get()/join() call in try...catch or attach an exceptionally callback.
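The two ways of surfacing the exception can be sketched as follows. The class name SilentFailureDemo, the failing parse and the fallback value -1 are assumptions chosen for the illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical demo class: a failed task stays silent until someone asks for the result.
class SilentFailureDemo {

    // Option 1: try...catch around join().
    static String observeWithJoin() {
        // The task fails with a NumberFormatException, but nothing is printed or thrown here.
        CompletableFuture<Integer> failed =
                CompletableFuture.supplyAsync(() -> Integer.parseInt("not a number"));
        try {
            failed.join();
            return "no exception";
        } catch (CompletionException e) {
            return "join saw " + e.getCause().getClass().getSimpleName();
        }
    }

    // Option 2: exceptionally() handles the failure inside the chain.
    static int observeWithExceptionally() {
        return CompletableFuture
                .supplyAsync(() -> Integer.parseInt("not a number"))
                .exceptionally(t -> -1) // fallback value (assumed)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(observeWithJoin());
        System.out.println(observeWithExceptionally());
    }
}
```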

2. The get() method of CompletableFuture is blocking

The get() method of CompletableFuture blocks the calling thread. If you use it to obtain the return value of an asynchronous call, pass a timeout.

3. It is not recommended to use the default thread pool

CompletableFuture uses the default "ForkJoin thread pool" unless told otherwise, and its number of worker threads is the machine's "CPU core count - 1". When a large number of requests arrive and the processing logic is complex, responses become very slow. It is generally recommended to use a custom thread pool and tune its configuration parameters.

4. When customizing the thread pool, pay attention to the rejection policy

If the thread pool's rejection policy is DiscardPolicy or DiscardOldestPolicy, a saturated pool silently drops the task and throws no exception. The CompletableFuture for a dropped task then never completes, so a get() without a timeout blocks forever. It is therefore recommended to pair CompletableFuture with AbortPolicy (throws a rejection exception) or CallerRunsPolicy (the submitting thread runs the task itself).
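A custom pool with a bounded queue and CallerRunsPolicy might look like the sketch below. The class name CustomPoolDemo and the sizes (2 core threads, 4 maximum, queue capacity 8) are illustrative assumptions, not tuned values. Submitting 50 tasks overflows the pool, and the overflow runs on the submitting thread instead of being dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: bounded pool plus CallerRunsPolicy.
class CustomPoolDemo {

    // Sizes are assumptions for the example only.
    static ThreadPoolExecutor newBoundedPool() {
        return new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(8),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Simulated short piece of work that returns its input.
    static int slowIdentity(int n) {
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return n;
    }

    // Submits more tasks than the pool can hold; none are lost.
    static int sumWithSaturatedPool(int taskCount) {
        ThreadPoolExecutor pool = newBoundedPool();
        try {
            List<CompletableFuture<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                final int n = i;
                futures.add(CompletableFuture.supplyAsync(() -> slowIdentity(n), pool));
            }
            int sum = 0;
            for (CompletableFuture<Integer> f : futures) {
                sum += f.join();
            }
            return sum;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumWithSaturatedPool(50)); // 0 + 1 + ... + 49 = 1225
    }
}
```

Because every task completes, the sum is always the same. With DiscardPolicy in place of CallerRunsPolicy, some of the join() calls would never return.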

Business code example

Utility class

public class CompletableFutureUtil {

    private CompletableFutureUtil(){}

    public static <R> CompletableFuture<R>  executeWithFallbackAndContextPropagation(@Nonnull Supplier<R> normalFunction,
                                                 @Nonnull Supplier<R> exceptionFunction,
                                                 @Nonnull ThreadPoolTaskExecutor taskExecutor,
                                                 @Nonnull String exceptionMsg){
        Thread mainThread = Thread.currentThread();
        return CompletableFuture
                .supplyAsync(normalFunction,taskExecutor)
                .exceptionally(e -> {
                    (exceptionMsg, e);
                    return exceptionFunction.get();
                })
                .whenComplete((data,e)->{
                    if (!mainThread.equals(Thread.currentThread())) {
                        ();
                    }
                });
    }
    
}

Creating a task with the utility class

private CompletableFuture<Boolean> asyncQueryCommentPic(ProductDetailInfoNewDto detailInfoDto, ProductInfoQueryDTO productInfoQuery) {
         ThreadPoolTaskExecutor taskExecutor = (BIZ_THREAD_POOL_NAME);
         // Degrade when the thread pool cannot be obtained
         if (taskExecutor == null) {
             ();
             return null;
         }
         return CompletableFutureUtil.executeWithFallbackAndContextPropagation(
                 () -> queryShowPrimaryPic(detailInfoDto, productInfoQuery),
                 () -> ,
                 taskExecutor,
                 "Async task execution exception");
     }

Retrieving the task result

private void handShowPrimaryPic(ProductDetailInfoNewDto detailInfoDto, CompletableFuture<Boolean> commentPicFuture) {
         ();
         if (commentPicFuture != null) {
             try {
                 Boolean showPrimaryPic = (asyncGetCommentPrimaryPicTimeout, );
                 (showPrimaryPic);
             } catch (Exception e) {
                 ("Task waiting for result exception: future={}", (commentPicFuture), e);
             }
         }
     }