Location>code7788 >text

How can I ensure that I don't lose data in the thread pool when the service is restarted?

Popularity:178 ℃/2024-08-30 11:39:38

Hi everyone, I'm Suzan and I'm back with you again.

preamble

Recently, a friend in my technical group, asked me a question: the service down, how to ensure that no data is lost in the thread pool?

This is quite an interesting question, and I'm taking it up with you today through this post.

1 What is a thread pool?

Previously, when there was no thread pool, in our code, there were two ways to create a thread:

  1. Inherit the Thread class
  2. Implementation of the Runnable interface

It's very easy to create a thread by either way, though.

But it also brings up the following question:

  1. Creating and destroying a thread, are more time-consuming, frequent creation and destruction of threads, very much affect the performance of the system.
  2. Creating unlimited threads can lead to a lack of memory.
  3. When a new task comes over, the thread must be created before it can be executed, you can't just reuse the thread.

To solve these problems above, Java introduced:thread pool

It is equivalent to a pool that holds threads.

Using a thread pool brings the following 3 benefits:

  1. Reduce resource consumption. Reduce consumption caused by thread creation and destruction by reusing created threads.
  2. Improve response time. When the task arrives, you can directly use the existing free thread, without waiting for the thread to be created can be immediately executed.
  3. Improve the manageability of threads. Threads are scarce resources, if they are created without limit, they will not only consume system resources, but also reduce the stability of the system. And if we use a thread pool, we can allocate, manage and monitor threads in a unified way.

2 Thread Pool Principles

Let's look at the thread pool constructor first:

public ThreadPoolExecutor(
    int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler)
  • corePoolSize: the number of core threads, the minimum number of threads maintained by the thread pool.
  • maximumPoolSize: maximum number of threads, the maximum number of threads allowed to be created by the thread pool.
  • keepAliveTime: thread alive time, when the number of threads exceeds the number of core threads, the survival time of the extra idle threads.
  • unit: unit of time.
  • workQueue: task queue, used to save the tasks waiting to be executed.
  • threadFactory: thread factory, used to create new threads.
  • handler: denial policy, the handling policy when the task cannot be executed.

The core flowchart of the thread pool is shown below:

The thread pool works as follows:

  1. Thread pool initialization: initialize core threads based on corePoolSize.
  2. Task submission: when a task is submitted to the thread pool, it is judged based on the current number of threads:
  • If the current number of threads is less than corePoolSize, create a new thread to execute the task.
  • If the current thread count is greater than or equal to corePoolSize, the task is added to the workQueue queue.
  1. Task processing: when there is a free thread, take the task from workQueue to execute.
  2. Thread Expansion: If the queue is full and the current number of threads is less than maximumPoolSize, create a new thread to process the task.
  3. Thread recycling: when thread idle time exceeds keepAliveTime, the extra threads will be recycled until the number of threads does not exceed corePoolSize.
  4. Rejection policy: If the queue is full and the current number of threads reaches maximumPoolSize, new tasks are processed according to the rejection policy.

To put it bluntly in the thread pool, extra tasks are put into the workQueue task queue.

The data for this task queue is kept in memory.

This can lead to some problems.

Next, see what's wrong with thread pools.

3 What are the problems with thread pooling?

In the JDK in order to facilitate the creation of thread pools, specifically provides Executors this tool class.

3.1 Excessive queue size

, which creates a thread pool with a fixed number of threads. The task queue uses LinkedBlockingQueue, and the default maximum capacity is Integer.MAX_VALUE.

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, 
                               nThreads,
                                     0L, 
                  ,
     new LinkedBlockingQueue<Runnable>(),
                          threadFactory);
}

If too many tasks are submitted to the newFixedThreadPool thread pool, it may cause the LinkedBlockingQueue to be very large, resulting in OOM problems.

3.2 Too many threads

, which creates bufferable thread pools, the maximum number of threads is Integer.MAX_VALUE, and the task queue uses SynchronousQueue.

public static ExecutorService newCachedThreadPool() {
  return new ThreadPoolExecutor(0, 
                Integer.MAX_VALUE,
                               60L, 
                  ,
    new SynchronousQueue<Runnable>());
}

If too many tasks are submitted to the newCachedThreadPool thread pool, this can lead to the creation of a large number of threads and also OOM issues.

3.3 Loss of data

If the service is suddenly restarted while the thread pool is executing, the data in the thread pool may be lost.

The above OOM problem can be solved by customizing the thread pool in our daily development.

For example, create a thread pool like this:

new ThreadPoolExecutor(8, 
                       10,
                       30L, 
     ,
    new ArrayBlockingQueue<Runnable>(300),
            threadFactory);

Customizes a thread pool where both the maximum number of threads and the task queue are under control.

There are essentially no OOM issues with this.

However, the problem of data loss in the thread pool is difficult to solve by its own function alone.

4 How do I ensure that I don't lose my data?

The data in the thread pool, which is stored in memory, will be lost if you encounter a server reboot.

The previous system flow looked like this:

After the user request comes through, business logic 1 is processed first, which is the core function of the system.

The task is then submitted to the thread pool, which handles the business logic.2 It is a non-core function of the system.

However, if the service goes down while the thread pool is processing, at that point, the data for business logic 2 is lost.

So, how do you ensure that data is not lost?

A: RequiredDo persistence ahead of time

Our optimized system flow is as follows:

After the user request comes through, business logic 1 is processed first, followed by writing a piece of task data to the DB with the status: pending execution.

Processing business logic 1 and writing task data to the DB can be in the same transaction for easy rollback in case of exceptions.

Then there is a dedicated timed task for each period of time, in ascending order of when it was added, and the paging query status is the pending task.

The earliest mission, the first to be found out.

The checked out tasks are then submitted to the thread pool, which handles the business logic 2.

After successful processing, modify the pending status of the task to: executed.

It is important to note: the processing of business logic 2, to do idempotence design, the same request is allowed to be executed multiple times, the results will not have an impact.

If, at this point, the thread pool is processing and the service goes down, the data for business logic 2 will be lost.

But at this point the data of the tasks is saved in the DB and the status of losing those tasks is still: Pending.

At the start of the next timed task cycle execution, those task data are queried again and resubmitted to the thread pool.

The data that Business Logic 2 lost, came back automatically.

To account for failures, you also need to add to the task table aNumber of failuresFields.

Execution of business logic 2 fails in the thread pool of a timed task, and can be automatically retried on the next timed task execution.

But it's impossible to keep retrying indefinitely.

When a certain number of failures have been exceeded, the mission status can be changed to: failed.

This allows for subsequent manual processing.

 

One final note (ask for attention, don't patronize me)
If this article is helpful to you, or if you are inspired, help scan the QR code below to pay attention to it, your support is my biggest motivation to keep writing.

Ask for a one-click trifecta: like, retweet, and watch at.
Concerned about the public number: [Su San said technology], in the public number reply: interviews, code magic, development manuals, time management have awesome fan benefits, in addition to the reply: add group, you can communicate with a lot of BAT factory seniors and learning.