At work, most of us have run into an interface that has to handle several things in one request, which makes its response very slow. There are usually two approaches, often combined, to speed it up:
- Optimize query SQL to improve query efficiency
- Enable multi-threaded concurrent processing of business data
This article discusses the second option: processing the business data concurrently on multiple threads and, once all processing has finished, assembling the results and returning them to the front end. Everyone implements this differently; over the past few years of work I have come across several ways to write it.
I. Several common ways to write parallel processing
Method 1: the Future approach
The code looks like this:
@Test
public void test1() {
    // Define a thread pool
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 30,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(10),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());
    // Asynchronous execution
    Future<String> getUserName = threadPoolExecutor.submit(() -> {
        // do something...
        return "kdyzm";
    });
    // Asynchronous execution
    Future<Integer> getUserAge = threadPoolExecutor.submit(() -> {
        // do something...
        return 12;
    });
    // Assemble the results; each get() blocks until its task has finished
    try {
        UserInfo user = new UserInfo();
        user.setName(getUserName.get());
        user.setAge(getUserAge.get());
        // Printed via the toString() that @Data generates
        System.out.println(user);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}
@Data
static class UserInfo {
    private String name;
    private Integer age;
}
Several submit calls are issued together, and the get calls are then grouped at the end to collect the final results.
Once there are more tasks, this style makes the code look cluttered, and the pile of variable names hurts readability.
Method 2: the CompletableFuture.allOf approach
The code looks like this:
@Test
public void test2() {
    try {
        UserInfo userInfo = new UserInfo();
        CompletableFuture.allOf(
                // Asynchronous execution
                CompletableFuture.runAsync(() -> {
                    userInfo.setName("kdyzm");
                }),
                // Asynchronous execution
                CompletableFuture.runAsync(() -> {
                    userInfo.setAge(12);
                })
        // Block until all tasks have finished
        ).get();
        System.out.println(userInfo);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}
@Data
static class UserInfo {
    private String name;
    private Integer age;
}
This approach uses the CompletableFuture API: it collects multiple asynchronous tasks, schedules them together, and finally synchronizes with the main thread through a single get call. It is a bit simpler than using Future directly.
Method 3: the CompletableFuture::join approach
The code looks like this:
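When the tasks return values instead of filling a shared object, the same allOf pattern can be sketched as below. This is only a minimal illustration, not the author's code: the class name AllOfSketch and the helper loadUser are hypothetical, UserInfo is redeclared without Lombok so the example is self-contained, and the tasks are assumed to run on the default executor.

```java
import java.util.concurrent.CompletableFuture;

class AllOfSketch {
    // Hypothetical stand-in for the article's UserInfo (plain fields, no Lombok)
    static class UserInfo {
        String name;
        Integer age;
    }

    static UserInfo loadUser() {
        // Start both tasks; with no executor argument they use the default one
        CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "kdyzm");
        CompletableFuture<Integer> age = CompletableFuture.supplyAsync(() -> 12);

        // Wait until both tasks have completed
        CompletableFuture.allOf(name, age).join();

        // Both futures are already done, so these join() calls return at once
        UserInfo user = new UserInfo();
        user.name = name.join();
        user.age = age.join();
        return user;
    }

    public static void main(String[] args) {
        UserInfo user = loadUser();
        System.out.println(user.name + ", " + user.age);
    }
}
```

Keeping a typed reference to each future is what lets the values be read back after allOf, since allOf itself only yields a CompletableFuture<Void>.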
@Test
public void test3() {
    UserInfo userInfo = new UserInfo();
    Arrays.asList(
            // Asynchronous execution
            CompletableFuture.supplyAsync(() -> {
                return "kdyzm";
            // Callback execution
            }).thenAccept(name -> {
                userInfo.setName(name);
            }),
            // Asynchronous execution
            CompletableFuture.supplyAsync(() -> {
                return 12;
            // Callback execution
            }).thenAccept(age -> {
                userInfo.setAge(age);
            })
    // Wait for all tasks to finish executing
    ).forEach(CompletableFuture::join);
    System.out.println(userInfo);
}
@Data
static class UserInfo {
    private String name;
    private Integer age;
}
This style is more readable than the previous ones, but it has a drawback: thenAccept receives only a single return value. If a callback needs to handle several values, this style does not fit, and you have to fall back to method 2.
Summary
Of these, methods 2 and 3 are the most common and the most convenient, and each has its strengths and weaknesses: method 2 can handle multiple return values, while method 3 is more readable. But with either one you always have to remember the relevant API, so using them never feels entirely smooth, and although method 3 is somewhat more readable, it still falls a little short. That gave me the idea of designing a simple parallel processing tool class that is both easy to use and highly readable.
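For the specific case of exactly two values, the JDK also provides CompletableFuture.thenAcceptBoth, whose callback receives the results of two futures. The following is a minimal sketch under that assumption; the class name AcceptBothSketch and the method describe are hypothetical and are not part of the article's code.

```java
import java.util.concurrent.CompletableFuture;

class AcceptBothSketch {
    static String describe() {
        CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "kdyzm");
        CompletableFuture<Integer> age = CompletableFuture.supplyAsync(() -> 12);

        StringBuilder result = new StringBuilder();
        // The callback runs once both futures have completed and receives both values
        name.thenAcceptBoth(age, (n, a) -> result.append(n).append(" is ").append(a))
                .join();
        return result.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

This does not scale beyond two values per callback, so for arbitrary combinations of values the style of method 2 remains the more general option.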
II. Parallel processing tool class design
1. Design pattern selection
Because I prefer APIs with chained calls, I wanted to implement this tool class with the builder pattern from the very start of the design. For more about the builder pattern, see my previous post: Design Patterns (VI): Builder Pattern. In practice the builder pattern is characterized by chained calls; both StringBuilder and Lombok's @Builder annotation follow it.
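As a small illustration of what a chained call looks like, here is a sketch using only StringBuilder from the JDK. The class name ChainedCallSketch and the method describe are hypothetical names chosen for this example.

```java
class ChainedCallSketch {
    static String describe(String name, int age) {
        // Each append() returns the same StringBuilder, so the calls can be chained
        return new StringBuilder()
                .append("name=").append(name)
                .append(", age=").append(age)
                .toString();
    }

    public static void main(String[] args) {
        System.out.println(describe("kdyzm", 12));
    }
}
```

The tool class designed in this section relies on the same trick: every configuration method returns this, so calls can be strung together until a final terminating call.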
2. First version of the code
Modeled on Method 3, I developed the first version of the code:
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;
/**
 * @author kdyzm
 */
@Slf4j
public class ConcurrentWorker {
    private List<Task> workers = new ArrayList<>();
    public static ConcurrentWorker runner() {
        return new ConcurrentWorker();
    }
    public <R> ConcurrentWorker addTask(Consumer<? super R> action, Supplier<R> value) {
        Task<R> worker = new Task<>(action, value);
        workers.add(worker);
        return this;
    }
    public void run() {
        // Start every task asynchronously
        workers.forEach(item -> {
            CompletableFuture completableFuture = CompletableFuture.supplyAsync(item.getValue());
            item.setCompletableFuture(completableFuture);
        });
        // Attach each callback, then wait for all tasks to finish
        workers
                .stream()
                .map(
                        item -> {
                            return item.getCompletableFuture().thenAccept(item.getAction());
                        }
                )
                .forEach(CompletableFuture::join);
    }
    @Data
    public static class Task<R> {
        private Consumer<? super R> action;
        private Supplier<R> value;
        private CompletableFuture<R> completableFuture;
        public Task(Consumer<? super R> action, Supplier<R> value) {
            this.action = action;
            this.value = value;
        }
    }
}
The class is fewer than 60 lines in total and wraps method 3 using lambda expressions and the functional-interface APIs. It is used as follows:
@Test
public void test() {
    UserInfo userInfo = new UserInfo();
    ConcurrentWorker.runner()
            // Add a task
            .addTask(userInfo::setName, () -> {
                // Sleep for 1000 milliseconds, then print the executing thread
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "-name");
                return "John Doe";
            })
            // Add a task
            .addTask(userInfo::setAge, () -> {
                // Sleep for 1000 milliseconds, then print the executing thread
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "-age");
                return 13;
            })
            // Run all tasks
            .run();
    System.out.println(userInfo);
}
@Data
static class UserInfo {
    private String name;
    private Integer age;
    private String sex;
}
The usage pattern is:
ConcurrentWorker.runner()
        .addTask(setter function, return_value function)
        .addTask(setter function, return_value function)
        .run()
As you can see, it is easy to use and very readable, but it shares method 3's drawback: each task can only receive a single value, since it is just a wrapper around method 3. Next, the code is reworked to support filling multiple values.
3. Second version of the code
The first version of the code already supports calls of the form
ConcurrentWorker.runner()
        .addTask(setter function, return_value function)
        .addTask(setter function, return_value function)
        .run()
Now I want to add an overloaded method of the form
.addTask(handle function)
That's right, just one parameter, and inside this method you can set any values on any object. The final usage looks like this:
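@Test
public void test() {
    UserInfo userInfo = new UserInfo();
    ConcurrentWorker.runner()
            .addTask(userInfo::setName, () -> {
                try {
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "-name");
                return "John Doe";
            })
            .addTask(userInfo::setAge, () -> {
                try {
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "-age");
                return 13;
            })
            // New form: the task itself fills in any number of properties
            .addTask(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "-sex");
                userInfo.setSex("male");
            })
            .run();
    System.out.println(userInfo);
}
@Data
static class UserInfo {
    private String name;
    private Integer age;
    private String sex;
}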
The full tool class is as follows:
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;
/**
 * @author kdyzm
 */
@Slf4j
public class ConcurrentWorker {
    private List<Task> workers = new ArrayList<>();
    public static ConcurrentWorker runner() {
        return new ConcurrentWorker();
    }
    public <R> ConcurrentWorker addTask(Consumer<? super R> action, Supplier<R> value) {
        Task<R> worker = new Task<>(action, value);
        workers.add(worker);
        return this;
    }
    public <R> ConcurrentWorker addTask(Runnable runnable) {
        Task<R> worker = new Task<>(runnable);
        workers.add(worker);
        return this;
    }
    public void run() {
        // Start every task asynchronously, according to its type
        workers.forEach(item -> {
            int taskType = item.getTaskType();
            CompletableFuture completableFuture = null;
            switch (taskType) {
                case TaskType.RETURN_VALUE:
                    completableFuture = CompletableFuture.supplyAsync(item.getValue());
                    break;
                case TaskType.VOID_RETURN:
                    completableFuture = CompletableFuture.runAsync(item.getRunnable());
                    break;
                default:
                    break;
            }
            item.setCompletableFuture(completableFuture);
        });
        // Attach each callback, then wait for all tasks to finish
        workers
                .stream()
                .map(
                        item -> {
                            int taskType = item.getTaskType();
                            switch (taskType) {
                                case TaskType.RETURN_VALUE:
                                    return item.getCompletableFuture().thenAccept(item.getAction());
                                default:
                                    return item.getCompletableFuture().thenAccept(temp -> {
                                        // Nothing to do: the task filled in the values itself
                                    });
                            }
                        }
                )
                .forEach(CompletableFuture::join);
    }
    @Data
    public static class Task<R> {
        private Consumer<? super R> action;
        private Supplier<R> value;
        private CompletableFuture<R> completableFuture;
        private Runnable runnable;
        private int taskType;
        public Task(Consumer<? super R> action, Supplier<R> value) {
            this.action = action;
            this.value = value;
            this.taskType = TaskType.RETURN_VALUE;
        }
        public Task(Runnable runnable) {
            this.runnable = runnable;
            this.taskType = TaskType.VOID_RETURN;
        }
    }
    public static class TaskType {
        /**
         * The task has a return value
         */
        public static final int RETURN_VALUE = 1;
        /**
         * The task has no return value
         */
        public static final int VOID_RETURN = 2;
    }
}
I split tasks into two types and encapsulated them as constants in the TaskType class: 1 means the task produces a return value that is passed to a callback; 2 means the task has no return value and fills in the properties itself while it runs, which is implemented with the Runnable interface.
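Both task types are awaited through CompletableFuture::join in run(), so it helps to know how join reports a failing task: it throws an unchecked CompletionException whose cause is the original exception. The sketch below shows this with the JDK alone; the class name JoinFailureSketch, the method runFailingTask, and the "lookup failed" message are hypothetical and not part of the tool class.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class JoinFailureSketch {
    static String runFailingTask() {
        // A task without a return value that fails while running
        CompletableFuture<Void> task = CompletableFuture.runAsync(() -> {
            throw new IllegalStateException("lookup failed");
        });
        try {
            task.join();
            return "no error";
        } catch (CompletionException e) {
            // The original exception is available as the cause
            Throwable cause = e.getCause();
            return cause.getClass().getSimpleName() + ": " + cause.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(runFailingTask());
    }
}
```

A caller of a join-based run() that wants to react to task failures could therefore wrap the call in a try/catch for CompletionException, assuming the exception is allowed to propagate.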
4. Tool jar package
I have packaged the code into a jar and uploaded it to the Maven central repository; you can add the following Maven dependency to use the ConcurrentWorker tool:
<dependency>
<groupId></groupId>
<artifactId>kdyzm-util</artifactId>
<version>0.0.2</version>
</dependency>