
Java: using multithreading to handle an unknown number of tasks

Popularity:338 ℃/2025-03-21 13:02:18

When the number of tasks is known, you can set a rule for how many threads to use and start that many threads to run them.

Code description:

Virtual thread pool:

  • Use Executors.newVirtualThreadPerTaskExecutor() to create a virtual thread executor; each submitted task runs on its own virtual thread.

Submit the task and return the result:

  • Each task is submitted through CompletableFuture.supplyAsync() and returns a result (here a string that simulates the task's output).

  • Each CompletableFuture holds the return value of its task.

Wait for all tasks to complete:

  • Use CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) to combine all the CompletableFutures into one. allOf.join() then blocks the current thread until all tasks are completed.

Collect results:

  • Use the Java 8 stream() method with Collectors.toList() to collect the results of all tasks into a List.
  • CompletableFuture::join returns the result of each task; if the task failed, it throws a CompletionException, so you can handle exceptions as needed.
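
The point about CompletionException can be illustrated with a small sketch. The class name JoinErrorHandling and the fallback string are made up for this example and are not part of the original code; the idea is only that join() wraps the task's exception, and getCause() gives it back.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class JoinErrorHandling {

    // Joins every future; a failed task contributes a fallback string
    // instead of aborting the whole merge. (Hypothetical helper.)
    static List<String> collect(List<CompletableFuture<String>> futures) {
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> future : futures) {
            try {
                results.add(future.join());
            } catch (CompletionException e) {
                // join() wraps the task's original exception; getCause() returns it
                results.add("failed: " + e.getCause().getMessage());
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> futures = List.of(
                CompletableFuture.<String>supplyAsync(() -> "ok"),
                CompletableFuture.<String>supplyAsync(() -> {
                    throw new IllegalStateException("boom");
                }));
        System.out.println(collect(futures)); // prints [ok, failed: boom]
    }
}
```

Whether a failed task should produce a placeholder, be skipped, or abort the merge depends on the application; the placeholder here is just one option.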

Close the virtual thread pool:

Finally, close the thread pool with executorService.shutdown() to release its resources.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class VirtualThreadDemo {

    public static void main(String[] args) throws InterruptedException {
        // Create an executor that starts a new virtual thread for each task (Java 21+)
        ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();

        // Suppose we have 10 tasks, each task returns a string
        int numTasks = 10;
        List<CompletableFuture<String>> futures = new ArrayList<>(numTasks);

        // Submit the tasks to the virtual thread executor
        for (int i = 0; i < numTasks; i++) {
            int taskId = i;
            // Wrap the result of each task in a CompletableFuture
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                try {
                    // Simulate work
                    System.out.println("Task " + taskId + " started on " + Thread.currentThread());
                    Thread.sleep(1000); // Simulated delay
                    String result = "Result of task " + taskId;
                    System.out.println("Task " + taskId + " completed on " + Thread.currentThread());
                    return result;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return "Task " + taskId + " was interrupted";
                }
            }, executorService);

            futures.add(future); // Add each future to the list
        }

        // Wait for all tasks to complete
        CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        allOf.join(); // Block until all tasks are completed

        // Merge the results of all tasks into a list
        List<String> results = futures.stream()
                .map(CompletableFuture::join) // Get the result of each task
                .collect(Collectors.toList()); // Merge into a list

        // Print the results
        System.out.println("All results: " + results);

        // Shut down the virtual thread executor
        executorService.shutdown();
    }
}
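
As a variation on the closing step, the executor can be managed with try-with-resources, because ExecutorService implements AutoCloseable since Java 19 and its close() waits for submitted tasks to finish. The sketch below assumes Java 21 (needed for virtual threads); the class and method names are invented for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

class VirtualThreadScope {

    // Runs numTasks trivial tasks on virtual threads and returns their results in task order.
    static List<String> runTasks(int numTasks) {
        // close() is called automatically at the end of the block and waits for submitted tasks
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<CompletableFuture<String>> futures = IntStream.range(0, numTasks)
                    .mapToObj(i -> CompletableFuture.supplyAsync(() -> "Result of task " + i, executor))
                    .toList();
            // join() each future in submission order, so the result list keeps that order
            return futures.stream().map(CompletableFuture::join).toList();
        }
    }

    public static void main(String[] args) {
        System.out.println(runTasks(3));
    }
}
```

With this form there is no separate shutdown() call to forget, including when an exception escapes the block.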

When the number of threads is not known in advance: run the tasks asynchronously on multiple threads, wait for all of them to finish, then collect and merge the results.

Explanation:

Tasks: we create a List<Callable<Integer>> that holds all the asynchronous tasks to run; each task returns an Integer result.

Create a thread pool: use Executors.newFixedThreadPool(5) to create a pool of 5 threads, so at most 5 tasks run concurrently. Choose the pool size according to actual needs.
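
To make the effect of the pool size concrete, here is a minimal sketch that counts how many tasks run at the same moment. PoolSizeDemo, peakConcurrency and the 100 ms sleep are assumptions made for this illustration, not part of the original example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class PoolSizeDemo {

    // Runs taskCount short tasks on a fixed pool and returns the highest
    // number of tasks observed running at the same moment.
    static int peakConcurrency(int poolSize, int taskCount) {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            tasks.add(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max); // remember the maximum seen so far
                try {
                    Thread.sleep(100); // simulated work
                } finally {
                    running.decrementAndGet();
                }
                return now;
            });
        }
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        try {
            executor.invokeAll(tasks); // blocks until every task has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // With 10 tasks and 5 threads the peak can never exceed 5
        System.out.println("peak concurrency: " + peakConcurrency(5, 10));
    }
}
```

The remaining tasks wait in the pool's queue until a thread becomes free, which is why 10 one-second tasks on 5 threads take roughly two seconds in total.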

Submit tasks and get the Future list: executorService.invokeAll(tasks) submits all tasks and returns a List<Future<Integer>>. Each Future represents the result of one asynchronous task.

Wait for the tasks to complete and merge the results: future.get() blocks the current thread until the task has completed and returns its result. We accumulate the results of all tasks in sum.

Close the thread pool: finally, call executorService.shutdown() so that the pool's threads are released once the tasks have completed.

Notes

  • invokeAll(): blocks the current thread until all tasks have completed, then returns the list of Futures. It is a good fit when task execution times are uncertain, because it waits for every task.
  • Future.get(): blocks the current thread until the task has completed. If the task threw an exception, get() throws an ExecutionException that wraps it.
  • Thread pool management: an ExecutorService makes it easy to control the pool size and avoids the performance cost of frequently creating and destroying threads.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllDemo {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // Suppose we have some tasks that need to be executed concurrently
        List<Callable<Integer>> tasks = new ArrayList<>();

        // Create the tasks
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            tasks.add(() -> {
                // Simulate the task execution and return a result
                Thread.sleep(1000); // Simulate a time-consuming task
                return taskId * 2; // Assume that the task returns 2 times the taskId
            });
        }

        // Create a fixed-size thread pool
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        try {
            // Submit all tasks and get back a list of Futures
            List<Future<Integer>> futures = executorService.invokeAll(tasks);

            // Wait for all tasks to complete and merge the results
            int sum = 0;
            for (Future<Integer> future : futures) {
                sum += future.get(); // Get the task result and add it to the total
            }

            // Output the merged result of all tasks
            System.out.println("Total sum: " + sum);

        } finally {
            // Shut down the thread pool
            executorService.shutdown();
        }
    }
}
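
The note that get() throws when a task fails can be sketched as follows. Here a failed task is counted rather than allowed to abort the loop; the class name, the two-thread pool and the returned int[] pair are choices made only for this illustration.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllErrors {

    // Sums the results of the tasks that succeeded and counts the ones that failed.
    // Returns {sum, failureCount}.
    static int[] sumSuccessful(List<Callable<Integer>> tasks) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        int sum = 0;
        int failures = 0;
        try {
            for (Future<Integer> future : executor.invokeAll(tasks)) {
                try {
                    sum += future.get();
                } catch (ExecutionException e) {
                    // The task's own exception is available through e.getCause()
                    failures++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return new int[] {sum, failures};
    }

    public static void main(String[] args) {
        List<Callable<Integer>> tasks = List.of(
                () -> 1,
                () -> { throw new IllegalStateException("boom"); },
                () -> 3);
        int[] outcome = sumSuccessful(tasks);
        System.out.println("sum=" + outcome[0] + ", failures=" + outcome[1]); // prints sum=4, failures=1
    }
}
```

Catching ExecutionException per future keeps one failing task from discarding the results of the others.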

Real-world case: call several APIs on multiple threads, merge the responses, and return the result to the front end

Declare the task list

/* The variable value corresponds to Map */
List<VarResultDto> results = new ArrayList<>();
// Suppose we have some tasks that need to be executed concurrently
List<Callable<Map<String, Object>>> tasks = new ArrayList<>();

Create a task and add it to the task list

tasks.add(() -> {
    Map<String, Object> respTask = new HashMap<>();
    List<VarResultDto> listTaskResp = new ArrayList<>();
    List<String> listTaskError = new ArrayList<>();
    try {
        ("Execute API request {} apiId:[{}]", (), (), ());
        /* Request the API to get the result */
        R<Object> objectR = (vo);
        // Parse the result
        JSONObject apiResp = (objectR);
        if (("code") == 200 || ("code") == 0) {
            apiResp = ("data");
        }
        // JavaScript data processing
        if ((())) {
            try {
                String newJson = ((apiResp), ());
                apiResp = (newJson);
                ("JavaScript data processing is completed");
            } catch (Exception e) {
                ("JavaScript data processing exception: {}", (apiVarInfoDto));
            }
        }

        final JSONObject tempData = apiResp;
        (relation -> {
            String value = (tempData, (), "");
            if ((value)) {
                // Set the variable and its actual value
                VarResultDto resultDto = new VarResultDto();
                (());
                (());
                (value);
                listTaskResp.add(resultDto);
            } else {
                String error = "API interface:[" + () + "] cannot obtain variables:[" + () + "] valid data, reason:[" + "API address:" + () + " -> returned error:" + () + "]";
                listTaskError.add(error);
            }
        });
        respTask.put("results", listTaskResp);
        respTask.put("errorLogs", listTaskError);
    } catch (Exception e) {
        ("Request API->{} failed!{}", (), (), e);
        boolean contains = ().contains("TIMEOUT");
        /* Record the error results */
        (relation -> {
            String error = "API interface:[" + () + "] cannot obtain variables:[" + () + "] valid data, reason:[" + (contains ? "Data interface acquisition timeout" : ()) + "]";
            listTaskError.add(error);
        });
        respTask.put("errorLogs", listTaskError);
    }
    return respTask;
});

Submit the tasks for execution, get the results of all tasks, and merge them

String defaultThreadPoolSize = ("api_fork_join_size", "5");
// Create a fixed-size thread pool; try-with-resources closes it automatically (Java 19+)
try (ExecutorService executorService = Executors.newFixedThreadPool(Integer.parseInt(defaultThreadPoolSize))) {
    try {
        // Submit all tasks and get back a list of Futures
        List<Future<Map<String, Object>>> futures = executorService.invokeAll(tasks);

        // Wait for all tasks to complete and collect their results
        List<Map<String, Object>> sum = new ArrayList<>();
        for (Future<Map<String, Object>> future : futures) {
            // Get the result of each task
            sum.add(future.get());
        }
        // Merge the results of all tasks
        for (Map<String, Object> stringObjectMap : sum) {
            Object results1 = stringObjectMap.get("results");
            if (results1 != null) {
                results.addAll((List<VarResultDto>) results1);
            }
            Object errorLogs1 = stringObjectMap.get("errorLogs");
            if (errorLogs1 != null) {
                // errorLogs is assumed to be a List<String> declared outside this snippet
                errorLogs.addAll((List<String>) errorLogs1);
            }
        }
    } catch (Exception e) {
        ("Multi-threading--parallel processing-Error {}", (), e);
    } finally {
        // Shut down the thread pool (redundant with try-with-resources, but harmless)
        executorService.shutdown();
    }
}
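
The merge step above depends on unchecked casts from Map<String, Object>. As a possible alternative, the same pattern can be sketched with a small typed holder. Everything named here (ApiMergeSketch, TaskOutcome, the String stand-in for VarResultDto, the sample data) is hypothetical and only illustrates the shape of the approach; it is not the article's production code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ApiMergeSketch {

    // Hypothetical typed replacement for the Map<String, Object> with "results" and "errorLogs" keys
    record TaskOutcome(List<String> results, List<String> errorLogs) {}

    // Runs all tasks on a fixed pool and merges their results and error logs.
    static TaskOutcome runAndMerge(List<Callable<TaskOutcome>> tasks, int poolSize) {
        List<String> results = new ArrayList<>();
        List<String> errorLogs = new ArrayList<>();
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        try {
            for (Future<TaskOutcome> future : executor.invokeAll(tasks)) {
                try {
                    TaskOutcome outcome = future.get();
                    results.addAll(outcome.results());
                    errorLogs.addAll(outcome.errorLogs());
                } catch (ExecutionException e) {
                    // A task that threw is recorded as an error instead of aborting the merge
                    errorLogs.add("task failed: " + e.getCause());
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return new TaskOutcome(results, errorLogs);
    }

    public static void main(String[] args) {
        List<Callable<TaskOutcome>> tasks = List.of(
                () -> new TaskOutcome(List.of("var1=42"), List.of()),
                () -> new TaskOutcome(List.of(), List.of("API [demo] returned no data")));
        System.out.println(runAndMerge(tasks, 5));
    }
}
```

A record (Java 16+) lets the compiler check the element types, so the null checks and casts in the merge loop are no longer needed.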