Quartz Cluster Enhancements_02. Task Polling and Optimization ❤️

Popularity:757 ℃/2024-11-19 23:05:57

Reproduced with attribution: /funnyzpc/p/18555665

Open source address: /funnyzpc/quartz

The main job of task polling is to go to the execution table at a fixed frequency (every 5s), fetch the tasks that are due within the next 5s, loop over them, and hand each one to the thread pool when its execution time arrives.
It sounds simple, but all kinds of problems hide in it; read on~
The main logic of task polling lives in QuartzSchedulerThread; interested readers can check out the source code~

Tasks in the Polling Window

Here is what it looks like; start with the picture.

Suppose there is a task, task1, that executes once every 2 seconds. The table stores only one next execution time (next_fire_time) for it, which in the diagram above falls at the 2s position, so polling once every 5 seconds would miss one execution (the one at the 4s position).
The solution to this problem is actually quite simple: every time task entries are fetched from the db, their fire times are recalculated so that every execution falling inside the current 5s window is produced, not only the stored one. In particular, note that if the same task fires more than once within the current window, those executions must be kept in order.
The executions then wait in a loop for their turn, but there is a special case. Using the diagram above: if other tasks in the same batch are delayed (by 2s or more), the 4s execution of this task may end up running before the 2s execution, while the reference time (pre_fire_time) of the 4s execution is the time of the 2s execution 😂 (this is probably hard to follow, so I suggest looking at the UPDATE statement further down).
At the moment it is about to be thrown into the thread pool, the 2s execution has not run yet, so the database still holds the task record from 0s. The 4s execution therefore will not run, because it cannot win the lock (the 2s execution is checked against the 0s record, and the 4s execution against the 2s record). This is a serious problem. If the executions are sorted so that the computed 4s execution always comes after the 2s one, then even when a task is delayed for a while, the executions for the following time points are still carried out in turn, which avoids lost executions to a large extent.
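
To make the window expansion concrete, here is a minimal sketch, assuming a fixed-interval task and a 5s polling window. The names `WindowExpansion` and `fireTimesInWindow` are hypothetical and are not taken from the Quartz Cluster Enhanced source; the sketch only shows the idea of producing every fire time inside the current window, in ascending order.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: the class and method names are hypothetical. */
final class WindowExpansion {

    /**
     * Returns every fire time in [windowStart, windowStart + windowMillis),
     * starting from the stored nextFireTime and stepping by intervalMillis.
     * The result is ascending, so earlier executions are always handled first.
     */
    static List<Long> fireTimesInWindow(long nextFireTime, long intervalMillis,
                                        long windowStart, long windowMillis) {
        if (intervalMillis <= 0) {
            throw new IllegalArgumentException("intervalMillis must be positive");
        }
        List<Long> times = new ArrayList<>();
        long windowEnd = windowStart + windowMillis;
        for (long t = nextFireTime; t < windowEnd; t += intervalMillis) {
            if (t >= windowStart) {
                times.add(t);
            }
        }
        return times;
    }

    public static void main(String[] args) {
        // task1 fires every 2s, the stored next_fire_time is 2s, the window is [0s, 5s).
        System.out.println(fireTimesInWindow(2000L, 2000L, 0L, 5000L)); // [2000, 4000]
    }
}
```

With the stored value alone only the 2s execution would be seen; the expansion also yields the 4s one, and the ascending order is what keeps the 4s execution behind the 2s execution.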

Obtaining execution privileges (acquiring a lock)

Because of cluster concurrency, a task must be executed by only one node at any given time, and the execution order must also be guaranteed. So before a task is handed to the thread pool, a competing UPDATE is issued against the database. The specific SQL statement is as follows:

UPDATE QRTZ_EXECUTE
SET
	PREV_FIRE_TIME = ?,
	NEXT_FIRE_TIME = ?,
	TIME_TRIGGERED = ?,
	STATE = ?,
	HOST_IP = ?,
	HOST_NAME = ?,
	END_TIME = ?
WHERE ID = ?
AND STATE = ? -- old STATE
AND PREV_FIRE_TIME = ? -- old PREV_FIRE_TIME
AND NEXT_FIRE_TIME = ? -- old NEXT_FIRE_TIME

As you can see, the update only goes through if the record still matches the old STATE, PREV_FIRE_TIME, and NEXT_FIRE_TIME values; a node that read stale values is not allowed to update it.
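
The effect of that WHERE clause can be sketched without a database. The class below is a hypothetical in-memory stand-in for one QRTZ_EXECUTE row (the name `FireLockSketch` and the state value "EXECUTING" are assumptions for illustration, not taken from the project); it shows that of two nodes holding the same snapshot only the first one wins.

```java
/** Illustrative only: an in-memory stand-in for a single QRTZ_EXECUTE row. */
final class FireLockSketch {
    private String state;
    private long prevFireTime;
    private long nextFireTime;

    FireLockSketch(String state, long prevFireTime, long nextFireTime) {
        this.state = state;
        this.prevFireTime = prevFireTime;
        this.nextFireTime = nextFireTime;
    }

    /**
     * Mimics the conditional UPDATE: the row changes only if STATE,
     * PREV_FIRE_TIME and NEXT_FIRE_TIME still hold the values the caller read.
     * Returns true for "1 row updated" and false for "0 rows updated".
     */
    synchronized boolean tryAcquire(String oldState, long oldPrev, long oldNext,
                                    String newState, long newPrev, long newNext) {
        boolean matches = state.equals(oldState)
                && prevFireTime == oldPrev
                && nextFireTime == oldNext;
        if (!matches) {
            return false;
        }
        state = newState;
        prevFireTime = newPrev;
        nextFireTime = newNext;
        return true;
    }

    public static void main(String[] args) {
        FireLockSketch row = new FireLockSketch("EXECUTING", 0L, 2000L);
        // Both nodes read the same snapshot (EXECUTING, 0, 2000) and race to update it.
        boolean nodeA = row.tryAcquire("EXECUTING", 0L, 2000L, "EXECUTING", 2000L, 4000L);
        boolean nodeB = row.tryAcquire("EXECUTING", 0L, 2000L, "EXECUTING", 2000L, 4000L);
        System.out.println(nodeA + " " + nodeB); // true false
    }
}
```

The same check explains the ordering problem described earlier: an execution that expects the row to be at (2000, 4000) cannot acquire it while the row is still at (0, 2000).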

Using Dynamic Thread Pools

Quartz generally uses SimpleThreadPool as the thread pool for its tasks. As the name says, it is simple, and it inevitably works with a fixed number of threads internally. At first I was only going to make a few changes to the source code, but then I realized it is not that simple: when acquiring a lock, Quartz uses a thread-local variable (ThreadLocal) to cache the executing thread for concurrency control, so I had to rewrite and refactor most of that logic, which was a big change. Quartz Cluster Enhanced no longer uses ThreadLocal, which leaves the thread pool free to focus on its own execution logic and configuration. This is where MeeThreadPool comes in: it has not only thread allocation control but also a queue. You can now use MeeThreadPool, or keep using SimpleThreadPool.

This is the main logic of MeeThreadPool:


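    // NOTE: several identifiers are missing from this listing. The names count, getThreadNamePrefix(),
    // poolExecutor, TimeUnit.SECONDS and the AtomicInteger calls in newThread() are assumed.
    protected void createWorkerThreads(final int createCount) {
        // Fall back to the CPU count when no positive thread count is configured
        int cct = this.count = createCount<1 ? Runtime.getRuntime().availableProcessors() : createCount;
        final MyThreadFactory myThreadFactory = new MyThreadFactory(getThreadNamePrefix(), this);
        // core = cct-2 (at least 2), max = cct+2, bounded queue of cct+2 entries
        this.poolExecutor = new ThreadPoolExecutor(cct<=4?2:cct-2, cct+2, 6L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(cct+2), myThreadFactory);
    }

    private final class MyThreadFactory implements ThreadFactory {
        final String threadPrefix ;//= schedulerInstanceName + "_QRTZ_";
        final MeeThreadPool meeThreadPool;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        public MyThreadFactory(final String threadPrefix,final MeeThreadPool meeThreadPool) {
            this.threadPrefix = threadPrefix;
            this.meeThreadPool = meeThreadPool;
        }

        @Override
        public Thread newThread(Runnable r) {
            WorkerThread wth = new WorkerThread(
                    meeThreadPool,
                    threadGroup,
                    // Thread number wraps back to 1 once it reaches count
                    threadPrefix + (threadNumber.get()==count ? threadNumber.getAndSet(1) : threadNumber.getAndIncrement()),
                    getThreadPriority(),
                    isMakeThreadsDaemons(),
                    r);
            if (isThreadsInheritContextClassLoaderOfInitializingThread()) {
                wth.setContextClassLoader(Thread.currentThread().getContextClassLoader());
            }
            return wth;
        }
    }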

Scalability and availability are greatly improved. It is important to mention that if you use ThreadPoolExecutor as the Quartz thread pool, you must pay attention to the following:

  • Once the core threads are all busy, new tasks go into the queue first.
  • Only when the queue is full are additional threads created, one by one, until the maximum number of threads is reached.
  • Be sure to check what the rejection policy is when both the threads and the queue are full. If you do not want rejection exceptions, consider checking whether the thread pool is full before submitting a task.
  • After development, extensive testing is a must to confirm the behavior meets expectations.
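
The ordering in the list above (core thread, then queue, then extra threads, then rejection) can be observed directly with a tiny pool. This is a minimal sketch with hypothetical names (`PoolSaturationSketch`, `isFull`, `demo`) and deliberately small sizes (1 core thread, 2 max threads, queue capacity 1); it is not MeeThreadPool itself. Note that a fullness check like `isFull` is only advisory when other threads submit concurrently.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Illustrative only: shows the submit order core -> queue -> max -> reject. */
final class PoolSaturationSketch {

    /** True when the pool is at its maximum size and the queue has no free slot. */
    static boolean isFull(ThreadPoolExecutor pool) {
        return pool.getPoolSize() >= pool.getMaximumPoolSize()
                && pool.getQueue().remainingCapacity() == 0;
    }

    /**
     * Returns {pool size after task 1, after task 2, after task 3,
     * 1 if the pool reports full, 1 if task 4 was rejected}.
     */
    static int[] demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 6L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] observed = new int[5];
        try {
            pool.execute(blocker);               // runs on the core thread
            observed[0] = pool.getPoolSize();
            pool.execute(blocker);               // core thread busy: goes into the queue
            observed[1] = pool.getPoolSize();
            pool.execute(blocker);               // queue full: a second thread is created
            observed[2] = pool.getPoolSize();
            observed[3] = isFull(pool) ? 1 : 0;
            try {
                pool.execute(blocker);           // threads and queue full: default AbortPolicy rejects
            } catch (RejectedExecutionException e) {
                observed[4] = 1;
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo())); // [1, 1, 2, 1, 1]
    }
}
```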

Polling timeout/execution timeout issues

When the JVM is performing GC, the DB or the network is malfunctioning, the host has a performance bottleneck, or the thread pool is full, and so on, timeouts will occur. For this kind of problem, Quartz Cluster Enhanced makes the following optimizations:

  • A tolerance offset was added so that a task may run slightly early instead of being held to a difference of a few milliseconds
   // NOTE: executeList, getNextFireTime() and Math.min are assumed; the original identifiers are missing from this listing
   //1. Time offset (6 milliseconds)
   long ww = executeList.size()-1000<0 ? 4L : ((executeList.size()-1000L)/2000L)+4L ;
   ww = Math.min(ww, 8L) ;
   while( !executeList.isEmpty() && (System.currentTimeMillis()-now)<=LOOP_INTERVAL ){
       long _et = System.currentTimeMillis();
       QrtzExecute ce = null;
       for( int i = 0;i< executeList.size();i++ ){
           QrtzExecute el = executeList.get(i);
           // This is the task to execute immediately
           if( el.getNextFireTime()-_et <= ww ){
               ce=el;
               break;
           }
           if(i==0){
               ce=el;
               continue; // If the list holds a single item, fall straight through to the sleep wait below
           }
           // Always keep the entry with the nearest fire time
           if( el.getNextFireTime() <= ce.getNextFireTime() ){
               ce = el;
           }
       }
       executeList.remove(ce); // Must be removed, otherwise the while loop never exits!
       // Delay
       long w = 0;
       if((w = (ce.getNextFireTime()-System.currentTimeMillis()-ww)) >0 ){
           try {
               Thread.sleep(w);
           }catch (Exception e){
           }
       }
       // Subsequent code omitted
   }
  • For task polling, the polling interval is kept constant and the offset is corrected as well
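     // NOTE: e.printStackTrace() and the logger name "log" are assumed; the original call targets are missing from this listing
     // Delay
     long st = 0;
     if((st = (LOOP_INTERVAL-(System.currentTimeMillis()-now)-2)) >0 ){
         try {
             Thread.sleep(st);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
     }
     if( st<-10 && st%5==0 ){
         log.error("Current subtask polling timeout: "+st);
     }
     // Necessary to prevent polling from timing out
     now = st<-1000?
             System.currentTimeMillis()/1000*1000 :
             System.currentTimeMillis()+(st<-10?st:0);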
  • Tasks that were actually delayed are corrected

    This fix relies heavily on the polling process of ClusterMisfireHandler to ensure that interrupted tasks can be resumed in a timely manner~
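
The selection rule from the first optimization can be isolated as two small pure functions. This is a sketch under assumptions: `DueTaskPicker`, `toleranceMillis`, and `pickNext` are hypothetical names, fire times are plain epoch milliseconds, and the cap at 8 ms is assumed to be a minimum-of-two operation.

```java
import java.util.List;

/** Illustrative only: tolerance window and "pick the next entry" rule as pure functions. */
final class DueTaskPicker {

    /** 4 ms for batches under 1000 entries, plus 1 ms per further 2000 entries, capped at 8 ms (assumed cap). */
    static long toleranceMillis(int batchSize) {
        long ww = batchSize < 1000 ? 4L : ((batchSize - 1000L) / 2000L) + 4L;
        return Math.min(ww, 8L);
    }

    /**
     * Returns the index of the first entry that is due within the tolerance;
     * if none is due yet, the index of the earliest entry; -1 for an empty list.
     */
    static int pickNext(List<Long> fireTimes, long now, long tolerance) {
        int earliest = -1;
        for (int i = 0; i < fireTimes.size(); i++) {
            long t = fireTimes.get(i);
            if (t - now <= tolerance) {
                return i; // due (or overdue): run it right away
            }
            if (earliest < 0 || t < fireTimes.get(earliest)) {
                earliest = i;
            }
        }
        return earliest;
    }
}
```

For example, with a tolerance of 4 ms and the current time at 1000, an entry scheduled for 1003 is treated as due immediately, while entries at 3000 and 5000 would have to wait.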

The offset needs some explanation: it applies to the entire loop, and one pass of the task loop is 5s. Writing to the table or submitting tasks may make the whole loop run over by a few milliseconds or tens of milliseconds, which is a backward offset. If the tasks finish early, the whole loop may come in under 5s, which is a forward offset~
Both forward and backward offsets are to be avoided~
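
The offset correction is plain arithmetic, so a worked sketch may help. The class below restates the sleep and baseline calculation from the polling snippet as pure functions; the names `LoopBaseline`, `remainingMillis`, and `nextLoopStart` are hypothetical, and `LOOP_INTERVAL` is assumed to be 5000 ms based on the 5s loop described in the text.

```java
/** Illustrative only: the loop offset correction as pure functions. */
final class LoopBaseline {
    static final long LOOP_INTERVAL = 5000L; // assumed: one polling pass is 5s

    /** Time left in this pass (minus a 2 ms margin); a negative value means the pass overran. */
    static long remainingMillis(long loopStart, long current) {
        return LOOP_INTERVAL - (current - loopStart) - 2;
    }

    /** Start timestamp to use for the next pass, given the remaining time st of this one. */
    static long nextLoopStart(long st, long current) {
        if (st < -1000) {
            return current / 1000 * 1000;      // overran by more than 1s: snap to the whole second
        }
        return current + (st < -10 ? st : 0);  // overran by more than 10 ms: pull the baseline back
    }

    public static void main(String[] args) {
        System.out.println(remainingMillis(10000L, 10100L)); // 4898
        System.out.println(nextLoopStart(-52L, 15050L));     // 14998
        System.out.println(nextLoopStart(-2000L, 17002L));   // 17000
    }
}
```

A pass that overruns by 52 ms starts the next pass with a baseline 52 ms in the past, so the overrun is absorbed instead of accumulating from loop to loop.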

Finally

To get a clearer picture of Quartz Cluster Enhanced, it is recommended to go over the structure chart: