Location>code7788 >text

[Asynchronous Programming Practice] How to implement the timeout function (taking CompletableFuture as an example)

Popularity:167 ℃/2025-01-26 22:19:27

[Asynchronous Programming Practice] How to implement the timeout function (taking CompletableFuture as an example)

Due to various problems such as network fluctuations or connecting nodes, the implementation of most internet asynchronous tasks usually performs timeout limits, which is a common problem in asynchronous programming. This article mainly discusses the basic idea of ​​implementing overtime functions and how the CompletableFuture (later referred to as CF) to achieve overtime function through code.

Basic idea

  1. Two tasks, two threads: original task, timeout task
  2. The original task is performed normally.
  3. The original task is canceled at timeout, and the writing result is abnormal timeout or the default value
  4. Guarantee the atomicity of the results under the competition conditions and write only once

Implementation of CompletableFuture

1. Basic implementation process

// New timeout method in JDK9
 public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit) {
     if (unit == null)
         throw new NullPointerException();
     if (result == null)
         whenComplete(new Canceller((new Timeout(this),
                                                  timeout, unit)));
     return this;
 }

 // CF inner class
     static final class Timeout implements Runnable {
         final CompletableFuture<?> f;
         Timeout(CompletableFuture<?> f) { = f; }
         public void run() {
             if (f != null && !())
                 (new TimeoutException());
         }
     }

The analysis code learns that the where time of the Whencomplete method adds a normal ending callback and cancel the timeout task.

A timeout task is created, and the Timeout::run method is executed when the timeout occurs, that is, the written result is a TimeoutException.

Let’s take a look at the specific implementation of Dalayer:

/**
  * Singleton delay scheduler, used only for starting and
  * canceling tasks.
  */
 static final class Delayer {
     static ScheduledFuture<?> delay(Runnable command, long delay,
                                     TimeUnit unit) {
         return (command, delay, unit);
     }

     static final class DaemonThreadFactory implements ThreadFactory {
         public Thread newThread(Runnable r) {
             Thread t = new Thread(r);
             // The daemon thread, when the main thread closes, it also closes itself
             (true);
             ("CompletableFutureDelayScheduler");
             return t;
         }
     }

     static final ScheduledThreadPoolExecutor delayer;
     static {
         (delayer = new ScheduledThreadPoolExecutor(
             1, new DaemonThreadFactory())).
             setRemoveOnCancelPolicy(true);
     }
 }

Delayer is a single -example object that is used to perform delay tasks and reduce memory occupation. Scheduledthreadpoolexecutor is configured as a single thread, settledremoveOnCancelPolicy, indicating that when a delayed task is canceled, the task is deleted from the delayed queue. The delay queue here is implemented for the default executor:

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue(), threadFactory);
}

ScheduledThreadPoolexecutor underlying use delay queueDelayedWorkQueueThe delayed queue layer depends on the index priority queue, and the time complexity of the deleting operation is O (LOGN).

Let's take a look at the specific implementation of Canceller:

static final class Canceller implements BiConsumer<Object, Throwable> {
    final Future<?> f;
    Canceller(Future<?> f) {  = f; }
    public void accept(Object ignore, Throwable ex) {
        if (f != null && !())
            (false);
    }
}

Canceller is actually a callback function. After the original task is completed, the relevant timeout task will be canceled.

2. Static condition analysis

The following is the implementation code fragment of CF:

//Out time end
         if (f! = Null &&! ())
             (New Timeoutexception ());
         // Cancel the task
         if (f! = Null &&! ())
             (FALSE);
 // The writing of the original task of CF cannot be controlled by the Ortimeout method. The following is an example
 (1000);
 (u);

The check for CF cannot actually guarantee atomicity, because this check-recalculation mode requires the protection of synchronized blocks, and the underlying layer of CF does not have this implementation. Therefore, if the if statement checks that the task is not completed, when the code is executed later, the task may have been completed. However, this check also has certain benefits, because CF guarantees that after the result is written, the isDone method must be true, thereby avoiding unnecessary code execution.

completeExceptionallyMethod andcompleteMethods may be executed at the same time. CF operates the atomicity of the results through CAS operations.

//Exception result implementation
 final boolean internalComplete(Object r) { // CAS from null to r
     return (this, null, r);
 }
 //Normal result implementation
 final boolean completeValue(T t) {
     return (this, null, (t == null) ? NIL : t);
 }

 public boolean isDone() {
     return result != null;
 }

3. Memory leak bug

In the CF implementation before JDK21, there is a memory leak bug. For detailed description, see/browse/jdk-8303742, the author is currently onlyThe code was found to be fixed in JDK21 (non-LTS versions are not considered). As a bug, this issue may be fixed in subsequent JDK subversions.

This bug is in the following code:

// Cancel the task, the realization of the previous realization of JDK21 will check the abnormal results
 if (ex == NULL && F! = NULL &&! ())
     (FALSE);

When the normal task is abnormal, the tasks in the delay queue will not be canceled, which will eventually lead to memory leakage. If there are multiple long timeout CF tasks in the project, memory leakage will be more obvious.

public class LeakDemo {
    public static void main(String[] args) {
        while (true) {
            new CompletableFuture<>().orTimeout(1, ).completeExceptionally(new Exception());
        }
    }
}

Executing the above code will report an OOM error. You can test it in your own programming environment.

4. How to implement timeout tasks in JDK8

CompletableFuture in JDK8 does not support timeout tasks. I recommend using itCFFU class libraryIt is the enhanced library of CF, which supports the use of high versions in the JDK8 environment. Another solution uses ListenableFuture provided by Guava. Of course, you can also refer to the code in JDK21 yourself.