Quartz Cluster Enhancements_02. Task Polling and Optimization
Reproduced with attribution: /funnyzpc/p/18555665
Open-source address: /funnyzpc/quartz
The main job of task polling is to poll at a fixed interval (5s): each round goes to the execution table, fishes out the tasks that will fire within the next 5s, and throws each of them into the thread pool once its fire time is reached.
It looks very simple, but all kinds of problems hide inside it; please read on~
Also, the main logic of task polling is in QuartzSchedulerThread; interested readers can check out the source code~
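To frame the discussion, here is a minimal sketch of that 5s polling loop, assuming LOOP_INTERVAL = 5000ms. fetchExecutionsForWindow and submitToThreadPool are hypothetical helpers standing in for the real execution-table query and thread-pool hand-off, and the nested QrtzExecute interface is a simplified stand-in for the project's entity; none of this mirrors QuartzSchedulerThread line by line.

import java.util.Collections;
import java.util.List;

// Minimal sketch of the polling loop, not the actual QuartzSchedulerThread code.
public class PollingLoopSketch {
    static final long LOOP_INTERVAL = 5000L; // 5s polling window

    interface QrtzExecute { long getNextFireTime(); } // simplified stand-in

    public void run() throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
            long now = System.currentTimeMillis();
            // 1. fetch the executions whose fire time falls inside [now, now + 5s)
            List<QrtzExecute> batch = fetchExecutionsForWindow(now, now + LOOP_INTERVAL);
            // 2. wait for each fire time, then hand the execution to the thread pool
            for (QrtzExecute e : batch) {
                long wait = e.getNextFireTime() - System.currentTimeMillis();
                if (wait > 0) Thread.sleep(wait);
                submitToThreadPool(e);
            }
            // 3. sleep out the remainder of the 5s window before the next round
            long rest = LOOP_INTERVAL - (System.currentTimeMillis() - now);
            if (rest > 0) Thread.sleep(rest);
        }
    }

    List<QrtzExecute> fetchExecutionsForWindow(long from, long to) { return Collections.emptyList(); }

    void submitToThreadPool(QrtzExecute e) { /* acquire the row lock, then execute */ }
}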
Tasks in the Polling Window
Here is the situation; look at the picture first.
Suppose there is a task, task1, that fires every 2 seconds, but it has only one next fire time (next_fire_time), which in the diagram above falls at the 2s position. Polling once every 5 seconds therefore misses one execution (the 4s position).
The solution to this problem is actually quite simple: every time executions are fetched from the db, recompute all of their fire times that fall within the current 5s window. In particular, note that when the same item fires more than once within the current window, those firings must be kept in sequence!
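A small sketch of that recomputation, assuming a fixed repeat interval; the method and parameter names here are illustrative, not the project's actual API:

import java.util.ArrayList;
import java.util.List;

// Expand one fetched row (next_fire_time + repeat interval) into every fire time
// that falls inside the current 5s window, in order.
public class WindowExpansion {
    static List<Long> fireTimesInWindow(long nextFireTime, long intervalMs,
                                        long windowStart, long windowEnd) {
        List<Long> times = new ArrayList<>();
        long t = nextFireTime;
        // roll forward to the first fire time inside the window
        while (t < windowStart) {
            t += intervalMs;
        }
        // collect every fire time inside [windowStart, windowEnd); the order of
        // generation is also the execution order (the "sequence" insisted on above)
        while (t < windowEnd) {
            times.add(t);
            t += intervalMs;
        }
        return times;
    }

    public static void main(String[] args) {
        long now = 0L; // pretend the window starts at 0s
        // task1: every 2s, stored next_fire_time = 2s -> fires at 2s and 4s in this window
        System.out.println(fireTimesInWindow(2000L, 2000L, now, now + 5000L)); // [2000, 4000]
    }
}

For task1 above (interval 2s, stored next_fire_time at 2s) this yields the 2s and 4s firings in order, so the 4s position is no longer skipped.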
The loop that follows waits between firings, but consider a special case against the diagram above: because other tasks in the same batch can be delayed (say by 2s or more), the 4s firing of this task may reach the thread pool before its 2s firing does, while the 4s firing's reference time (prev_fire_time) is the time of the 2s firing 😂 (this is probably hard to follow, so I suggest looking at the update statement later).
At the moment the 4s firing is thrown into the thread pool, the 2s firing has not executed yet, so the database still holds the task state from 0s. The 4s firing therefore cannot execute, because it can never win the lock (the 2s firing references 0s, while the 4s firing references 2s). This is a serious problem. If the firings are kept ordered, so that the computed 4s firing always comes after the 2s firing, then even when one firing is delayed, the firings for the following time points are still executed in order after it, which largely avoids losing tasks.
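In code, keeping that order can be as simple as sorting the expanded batch by fire time before it enters the polling loop; the sort below is plain JDK usage, and the QrtzExecute interface is again a simplified stand-in:

import java.util.Comparator;
import java.util.List;

// Sketch of the ordering fix: sort the batch by fire time so a later firing (4s)
// can never be handed to the thread pool before an earlier one (2s) of the same task.
public class BatchOrdering {
    interface QrtzExecute { long getNextFireTime(); } // simplified stand-in

    static void sortBatch(List<QrtzExecute> batch) {
        batch.sort(Comparator.comparingLong(QrtzExecute::getNextFireTime));
    }
}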
Obtaining execution privileges (acquiring a lock)
Because of cluster concurrency, a task at a given time must be executed by only one node, and the execution sequence must also be guaranteed. So before a task is dropped into the thread pool, a competitive database UPDATE is performed; the specific SQL statement is as follows:
UPDATE QRTZ_EXECUTE SET
    PREV_FIRE_TIME = ?,
    NEXT_FIRE_TIME = ?,
    TIME_TRIGGERED = ?,
    STATE = ?,
    HOST_IP = ?,
    HOST_NAME = ?,
    END_TIME = ?
WHERE ID = ?
    AND STATE = ?            -- old STATE
    AND PREV_FIRE_TIME = ?   -- old PREV_FIRE_TIME
    AND NEXT_FIRE_TIME = ?   -- old NEXT_FIRE_TIME
As you can see, the record being updated must still match the old STATE, PREV_FIRE_TIME and NEXT_FIRE_TIME, otherwise the update is not allowed; only the node whose UPDATE actually changes the row wins the right to execute.
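A minimal JDBC sketch of this competitive UPDATE, using the SQL above; the ExecRow holder and the surrounding connection handling are illustrative, not the project's actual classes:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Compare-and-set style lock acquisition against QRTZ_EXECUTE.
public class AcquireExecuteLock {
    private static final String ACQUIRE_SQL =
            "UPDATE QRTZ_EXECUTE SET PREV_FIRE_TIME=?, NEXT_FIRE_TIME=?, TIME_TRIGGERED=?, " +
            "STATE=?, HOST_IP=?, HOST_NAME=?, END_TIME=? " +
            "WHERE ID=? AND STATE=? AND PREV_FIRE_TIME=? AND NEXT_FIRE_TIME=?";

    /** Returns true only if this node won the row, i.e. exactly one row was updated. */
    static boolean tryAcquire(Connection conn, ExecRow oldRow, ExecRow newRow) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(ACQUIRE_SQL)) {
            // new values
            ps.setLong(1, newRow.prevFireTime);
            ps.setLong(2, newRow.nextFireTime);
            ps.setLong(3, newRow.timeTriggered);
            ps.setString(4, newRow.state);
            ps.setString(5, newRow.hostIp);
            ps.setString(6, newRow.hostName);
            ps.setLong(7, newRow.endTime);
            // optimistic-lock conditions: the row must still look exactly like what we read
            ps.setLong(8, oldRow.id);
            ps.setString(9, oldRow.state);
            ps.setLong(10, oldRow.prevFireTime);
            ps.setLong(11, oldRow.nextFireTime);
            return ps.executeUpdate() == 1; // 0 means another node already took it
        }
    }

    // illustrative value holder
    static class ExecRow {
        long id, prevFireTime, nextFireTime, timeTriggered, endTime;
        String state, hostIp, hostName;
    }
}

Since the WHERE clause includes the old STATE, PREV_FIRE_TIME and NEXT_FIRE_TIME, at most one node's UPDATE can report an affected row for a given firing, and that node is the one allowed to run it.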
Using Dynamic Thread Pools
Quartz generally uses SimpleThreadPool as the thread pool for its tasks; being simple, it inevitably uses a fixed set of threads internally to process tasks.
At first I planned to make some changes to the source code, but it is not that simple: when acquiring a lock, Quartz uses thread-local variables (ThreadLocal) to cache the executing thread for concurrency control, so most of that logic would have to be overridden and refactored, which is a big change. Now, Quartz Cluster Enhanced no longer uses ThreadLocal and focuses on its own thread-pool implementation, MeeThreadPool, which has both thread allocation control and a queue. This is a big change, but you can now use MeeThreadPool or continue using SimpleThreadPool~
This is the main logic of MeeThreadPool:
protected void createWorkerThreads(final int createCount) {
    // NOTE: identifiers stripped in transcription are reconstructed below
    int cct = createCount < 1 ? Runtime.getRuntime().availableProcessors() : createCount;
    final MyThreadFactory myThreadFactory = new MyThreadFactory(schedulerInstanceName + "_QRTZ_", this);
    this.threadPoolExecutor = new ThreadPoolExecutor(
            cct <= 4 ? 2 : cct - 2,              // core pool size
            cct + 2,                             // maximum pool size
            6L, TimeUnit.SECONDS,                // keep-alive for idle non-core threads
            new LinkedBlockingDeque<>(cct + 2),  // bounded work queue
            myThreadFactory);
}

private final class MyThreadFactory implements ThreadFactory {
    final String threadPrefix; // = schedulerInstanceName + "_QRTZ_";
    final MeeThreadPool meeThreadPool;
    private final AtomicInteger threadNumber = new AtomicInteger(1);

    public MyThreadFactory(final String threadPrefix, final MeeThreadPool meeThreadPool) {
        this.threadPrefix = threadPrefix;
        this.meeThreadPool = meeThreadPool;
    }

    @Override
    public Thread newThread(Runnable r) {
        WorkerThread wth = new WorkerThread(
                meeThreadPool,
                threadGroup,
                threadPrefix + threadNumber.getAndIncrement(), // numbered thread name (the original wraps the counter)
                getThreadPriority(),
                isMakeThreadsDaemons(),
                r);
        if (isThreadsInheritContextClassLoaderOfInitializingThread()) {
            wth.setContextClassLoader(Thread.currentThread().getContextClassLoader());
        }
        return wth;
    }
}
Scalability and availability are greatly improved. It is also important to mention that if you use ThreadPoolExecutor as the Quartz thread pool, you must take care of the following:
- Once the core threads are all busy, new tasks go into the queue first.
- Additional threads are only created, one by one up to the maximum pool size, after the queue is full.
- Pay attention to the rejection policy that applies when the pool and queue are both full; if you do not want rejection exceptions, consider checking whether the thread pool is saturated before submitting the task (see the sketch after this list).
- Finish development with extensive testing to make sure the behavior meets expectations.
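As promised above, here is a generic JDK sketch of the "check before submit" idea together with a non-throwing rejection policy; it is not MeeThreadPool's actual code, and the pool sizes here are arbitrary:

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Generic JDK sketch of saturation-aware submission, not MeeThreadPool's actual code.
public class SaturationCheckSketch {
    private final ThreadPoolExecutor pool = new ThreadPoolExecutor(
            2, 8, 6L, TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(8),
            // if a task still slips through when saturated, run it on the caller
            // instead of throwing RejectedExecutionException
            new ThreadPoolExecutor.CallerRunsPolicy());

    /** Returns false instead of letting the pool reject the task. */
    public boolean trySubmit(Runnable task) {
        boolean saturated = pool.getActiveCount() >= pool.getMaximumPoolSize()
                && pool.getQueue().remainingCapacity() == 0;
        if (saturated) {
            return false; // let the scheduler retry this execution in a later loop
        }
        pool.execute(task);
        return true;
    }
}

Returning false keeps the execution in the scheduler's hands, so it can be retried in a later polling round instead of being lost to a rejection exception.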
Polling timeout/execution timeout issues
When the JVM is doing GC, or the DB or network has a fault, or the host hits a performance bottleneck, or the thread pool is full, and so on, timeout problems appear. For this kind of problem, Quartz Cluster Enhanced makes the following optimizations:
- A tolerance offset is applied so that a task may be executed slightly early instead of being held up over a difference of a few milliseconds:
// 1. Time offset (a few milliseconds of tolerance); identifiers stripped in transcription are reconstructed
long ww = time - 1000L < 0 ? 4L : ((time - 1000L) / 2000L) + 4L; // 'time' is the polling interval in ms
ww = Math.min(ww, 8L); // cap the offset at 8ms
while (!executeList.isEmpty() && (System.currentTimeMillis() - now) <= LOOP_INTERVAL) {
    long _et = System.currentTimeMillis();
    QrtzExecute ce = null; // the execution to run next
    for (int i = 0; i < executeList.size(); i++) {
        QrtzExecute el = executeList.get(i);
        // within the tolerance offset: this is the task to execute immediately
        if (el.getNextFireTime() - _et <= ww) {
            ce = el;
            break;
        }
        if (i == 0) {
            ce = el;
            continue; // if the execution list has only one item, it goes straight to the sleep below
        }
        // otherwise always keep the execution with the earliest fire time
        if (el.getNextFireTime() <= ce.getNextFireTime()) {
            ce = el;
        }
    }
    executeList.remove(ce); // must be removed, otherwise the while loop can never exit!!!!
    // delay until the (offset-adjusted) fire time
    long w = 0;
    if ((w = (ce.getNextFireTime() - System.currentTimeMillis() - ww)) > 0) {
        try {
            Thread.sleep(w);
        } catch (Exception e) {
        }
    }
    // subsequent code omitted
}
- For task polling, the polling interval is kept constant and offset correction is applied as well:
// Delay before the next polling round (identifiers reconstructed as above)
long st = 0;
if ((st = (LOOP_INTERVAL - (System.currentTimeMillis() - now) - 2)) > 0) {
    try {
        Thread.sleep(st);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
if (st < -10 && st % 5 == 0) {
    LOG.error("Current subtask polling timeout: " + st); // LOG: the scheduler's logger
}
// necessary to prevent polling from timing out: re-anchor 'now' for the next round
now = st < -1000
        ? System.currentTimeMillis() / 1000 * 1000
        : System.currentTimeMillis() + (st < -10 ? st : 0);
- Corrections are made for tasks that are actually delayed. This fix relies heavily on the polling done by ClusterMisfireHandler, which ensures that interrupted tasks are resumed in a timely manner~
A note on the offset: the offset is measured over the whole loop. One polling loop should take 5s, but table writes or task submission can push it a few milliseconds, or even tens of milliseconds, over that, which is a backward offset (for example, if a loop takes 5012ms, the next round is anchored 12ms earlier to compensate). If tasks finish early, the whole loop may take less than 5s, which is a forward offset~
Both forward and backward offsets are to be avoided~
Finally
To get a clearer picture of Quartz Cluster Enhanced, it is recommended to go over the architecture diagram: