Quartz Cluster Enhanced Edition 01: Cluster and Misfire Handler (ClusterMisfireHandler)
Reprinted with attribution: /funnyzpc/p/18542452
Primary purpose
- Application (app) and node (node) state synchronization
Whether it is a node or an app, each can be started and stopped through its corresponding state. This is an important feature. The locking for cluster/misfire handling is also based on the app: the lock is placed on the app, and it controls concurrent operations across all applications and the cluster, which is equally important.
- Task status and execution status updates
Task scanning mainly operates on execution items (execute), and what changes is the execution item's state (state), so the state of the task (job) must be updated accordingly.
- Recovery of misfired tasks
During task scan scheduling, a GC pause or a DB disconnect may occur. In that case next_fire_time must be corrected promptly so that the task is scanned and executed normally once the exception is recovered.
- History cleanup
Cleanup runs very infrequently. If possible, it is recommended that your admin backend integrate the SDK and trigger cleanup manually; the automatic cleanup here is only a fallback. Because task concurrency relies on database locks, in theory the less data there is in the tables, the better the performance. The automatic cleanup has two main tasks:
  1. Clean up invalid applications and nodes that are no longer executing
  2. Clean up tasks and their execution configurations
- Create applications and execution nodes
This is a necessary step: applications and nodes are created up front to make later management easier, and execution scheduling also depends on the state of nodes and applications.
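The start/stop control through app and node state from the first item can be pictured with a minimal sketch. It assumes a simple two-value state per app and per node; `SwitchState`, `AppNodeGate` and `canSchedule` are hypothetical names, not the project's API, and the real state columns may hold more values.

```java
import java.util.Map;

// Hypothetical on/off state; the real project stores its own state values for apps and nodes.
enum SwitchState { ENABLED, DISABLED }

final class AppNodeGate {
    // A node may take part in scheduling only if both its application and the node itself are enabled.
    static boolean canSchedule(SwitchState appState, SwitchState nodeState) {
        return appState == SwitchState.ENABLED && nodeState == SwitchState.ENABLED;
    }

    public static void main(String[] args) {
        SwitchState app = SwitchState.ENABLED;
        Map<String, SwitchState> nodes = Map.of("node1", SwitchState.ENABLED, "node2", SwitchState.DISABLED);
        for (Map.Entry<String, SwitchState> e : nodes.entrySet()) {
            // Stopping a single node, or the whole app, is just a state change.
            System.out.println(e.getKey() + " can schedule: " + canSchedule(app, e.getValue()));
        }
    }
}
```

Disabling the app stops every node at once, while disabling one node leaves the others running.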
Pre-processing
Pre-processing is the maintenance that Quartz must perform at startup. It consists of three main parts:
- 01. Write the application (app) and node (node) records. This is very important.
- 02. Restore/update application status
Fetch the tasks (job) that are executing or in an abnormal state, then update each task's state from the states of its associated execution items (execute). When a task has several execution items in different states, the states are prioritized (from highest to lowest): ERROR -> EXECUTING -> PAUSED -> COMPLETE
In code:
// Loop body for each EXECUTING/ERROR job (the enclosing loop over jobs is omitted);
// the getter/setter names on QrtzJob/QrtzExecute are assumed
List<QrtzExecute> executes = getDelegate().getExecuteByJobId(conn, job.getId());
boolean hasExecuting = false;
boolean hasPaused = false;
boolean hasError = false;
boolean hasComplete = false;
for (QrtzExecute execute : executes) {
    final String state = execute.getState();
    if ("EXECUTING".equals(state)) {
        hasExecuting = true;
    } else if ("PAUSED".equals(state)) {
        hasPaused = true;
    } else if ("ERROR".equals(state)) {
        hasError = true;
    } else if ("COMPLETE".equals(state)) {
        hasComplete = true;
    } else {
        continue; // typically INIT: ignored
    }
}
// When several states are present, apply the priority ERROR > EXECUTING > PAUSED > COMPLETE
String beforeState = job.getState();
if (hasError) {
    job.setState("ERROR");
} else if (hasExecuting) {
    job.setState("EXECUTING");
} else if (hasPaused) {
    job.setState("PAUSED");
} else if (hasComplete) {
    job.setState("COMPLETE");
} else {
    continue; // only INIT execution items: leave this job unprocessed
}
// Skip the database write if the state did not change
if (!job.getState().equals(beforeState)) {
    job.setUpdateTime(now);
    getDelegate().updateRecoverJob(conn, job);
}
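The same priority rule can be written as a small pure function, which makes it easy to test without a database. This is only a sketch: `JobStateResolver` is a hypothetical helper that takes the execution item states as plain strings rather than `QrtzExecute` objects.

```java
import java.util.List;
import java.util.Optional;

final class JobStateResolver {
    // Priority from highest to lowest; INIT (or any other value) never determines the job state.
    private static final List<String> PRIORITY = List.of("ERROR", "EXECUTING", "PAUSED", "COMPLETE");

    // Returns the job state implied by its execution items, or empty if only INIT/unknown states exist.
    static Optional<String> resolve(List<String> executeStates) {
        for (String candidate : PRIORITY) {
            if (executeStates.contains(candidate)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(resolve(List.of("COMPLETE", "PAUSED", "EXECUTING"))); // Optional[EXECUTING]
        System.out.println(resolve(List.of("INIT")));                            // Optional.empty
    }
}
```

An empty result plays the role of the `continue` branch: a job whose execution items are all INIT is left untouched.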
- 03. Restore/update execution item status
Get all tasks (job) under the current application that are executing or in error, then, task by task, restore the execution items under each task that are executing (EXECUTING) or in error (ERROR), mainly by recalculating their next_fire_time.
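The recalculation of next_fire_time can be sketched for the simplest case, a fixed-interval schedule. The sketch assumes epoch-millisecond timestamps and a "skip the missed runs" policy; cron-based tasks would instead ask a cron parser for the next time after now, and the project may handle both cases differently. `NextFireTimeRecovery.recover` is a hypothetical name.

```java
final class NextFireTimeRecovery {
    /**
     * Recomputes next_fire_time for a fixed-interval schedule after a stall (GC pause, DB outage).
     * Missed slots are skipped: the result is the first slot strictly after now.
     * All values are epoch milliseconds; intervalMs must be positive.
     */
    static long recover(long staleNextFireTime, long intervalMs, long now) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("intervalMs must be positive");
        }
        if (staleNextFireTime > now) {
            return staleNextFireTime; // still in the future: nothing was missed
        }
        long missed = (now - staleNextFireTime) / intervalMs + 1; // slots at or before now
        return staleNextFireTime + missed * intervalMs;
    }

    public static void main(String[] args) {
        // Scheduled for 0s with a 15s period, but the scanner only recovered at 40s.
        System.out.println(recover(0L, 15_000L, 40_000L)); // 45000
    }
}
```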
Post-processing
- 01. Post-processing includes everything in pre-processing and additionally takes the cluster concurrency lock (this is very important and is discussed in the next section)
- 02. Synchronize node state and application state where they are inconsistent
- 03. Update the check flag. The check flag mainly makes later cleanup easier, and the app's check time (time_next) is also used as the basis for determining the lock cycle
About concurrent lock handling
To elaborate: one loop generally takes 15s (TIME_CHECK_INTERVAL). In a clustered environment multiple nodes run at the same time, so cluster and misfire processing can be executed more than once.
At first my thinking was along the lines of optimistic locking, and the code looked roughly like this:
// 5. Only the node that obtains the app lock clears and recovers, which reduces reads and writes
int ct = getDelegate().updateQrtzAppByApp(conn, app);
if (ct > 0) {
    // Process after the lock is acquired
}
However, this approach still leads to duplicate execution, as shown in the figure:
In the figure, node1 and node2 started 5s apart, so the times at which they acquire the lock also differ by 5s. Because of this 5s gap, almost every node gets to execute this update statement and acquire the lock, so the logic that follows is inevitably executed more than once!
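To see why the optimistic lock alone does not prevent this, the race can be replayed in memory. The sketch assumes that `updateQrtzAppByApp` behaves like a compare-and-set against a value the node has just read (the usual optimistic-lock pattern); an `AtomicLong` stands in for the app row, and all names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

final class OptimisticLockRace {
    static final long INTERVAL = 15_000L; // TIME_CHECK_INTERVAL

    // Stands in for the app row's time_next column (hypothetical in-memory model).
    static final AtomicLong timeNext = new AtomicLong(0L);

    // One loop iteration of a node at time `now`: read the current value, then compare-and-set it.
    static boolean tryLock(long now) {
        long seen = timeNext.get();                          // like SELECT time_next ...
        return timeNext.compareAndSet(seen, now + INTERVAL); // like UPDATE ... WHERE time_next = seen
    }

    // Replays loop start times in chronological order and counts successful lock acquisitions.
    static int countAcquisitions(long[] sortedLoopTimes) {
        timeNext.set(0L);
        int acquired = 0;
        for (long now : sortedLoopTimes) {
            if (tryLock(now)) {
                acquired++;
            }
        }
        return acquired;
    }

    public static void main(String[] args) {
        // node1 loops at 0s, 15s, 30s; node2 started 5s later and loops at 5s, 20s, 35s.
        long[] loops = {0L, 5_000L, 15_000L, 20_000L, 30_000L, 35_000L};
        System.out.println(countAcquisitions(loops)); // 6
    }
}
```

Because the two nodes never update at the same instant, each one reads a fresh value and its update succeeds, so all six loops win the lock.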
The task scheduling scan (QuartzSchedulerThread) makes all nodes wait for the same next_fire_time moment before contending for the lock, whereas cluster/misfire processing (ClusterMisfireHandler) runs inside a big while loop with a period of 15s (TIME_CHECK_INTERVAL). Each node therefore runs it every 15s, and the lock competition happens at whatever moment each node executes the update.
If we borrow the idea from the task scan (QuartzSchedulerThread) and add another while or sleep that waits until the next check time (time_next), the code becomes:
long t = 0;
// check_time is the application's check time (time_next); loop_time is the start time of the current loop
if ((t = check_time - loop_time) > 0) {
    Thread.sleep(t); // wait until the shared check time
}
int ct = getDelegate().updateQrtzAppByApp(conn, app);
// 5. Clear and recover only when the app lock is obtained, to minimize reads and writes
if (ct > 0) {
    // Process after the lock is acquired
}
This basically guarantees that multiple nodes compete for the same lock at the same moment. An added benefit is that the cycle time of each node's ClusterMisfireHandler stays basically the same, while the sleep also makes it possible to break up the loop timing randomly (by adding offsets) so that the ClusterMisfireHandler loop processing is spread out and executed on other nodes.
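Aligning the contention point can be sketched the same way. Assuming every node sleeps until the shared check time and then performs a compare-and-set against that same expected value, only the first update in each cycle can succeed. `AlignedLockSketch`, `waitMillis` and `tryLock` are hypothetical names, and the sleep is represented by its computed duration rather than a real `Thread.sleep`.

```java
import java.util.concurrent.atomic.AtomicLong;

final class AlignedLockSketch {
    static final long INTERVAL = 15_000L; // TIME_CHECK_INTERVAL

    // How long a node should sleep before contending: until the app's check time (time_next).
    static long waitMillis(long checkTime, long loopTime) {
        return Math.max(0L, checkTime - loopTime);
    }

    // All nodes wake at the same time_next and compare against that same expected value,
    // so only the first compare-and-set of the cycle can succeed.
    static boolean tryLock(AtomicLong timeNext, long expected) {
        return timeNext.compareAndSet(expected, expected + INTERVAL);
    }

    public static void main(String[] args) {
        AtomicLong timeNext = new AtomicLong(15_000L);
        // node1 started its loop at 0s, node2 at 5s: both wait until 15s.
        System.out.println(waitMillis(timeNext.get(), 0L));     // 15000
        System.out.println(waitMillis(timeNext.get(), 5_000L)); // 10000
        long checkTime = timeNext.get();
        System.out.println(tryLock(timeNext, checkTime)); // true  (node1 wins)
        System.out.println(tryLock(timeNext, checkTime)); // false (node2 loses)
    }
}
```

Every node still issues the update, though; only the outcome is serialized.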
But, but oh: with sleep + update, every node still executes the update, so the overhead of contending for the lock remains. Inspired by the open-source project shedlock, I wondered whether a node could check the lock time before contending for the lock: after acquiring the lock, the holder adds a lock time 😂. Nodes that arrive within the lock time no longer compete for the lock, and those outside it do, roughly as shown in the figure:
Looking at the figure, assume node1 runs before node2. After node1 acquires the lock at 14:15, its next execution is expected at 14:30. If a 10s lock time is added (the blue line in the figure), no node may compete for the lock at or before 14:25. So when node2 tries to acquire the lock at 14:20, it finds that the current lock lasts until 14:25, automatically gives up contention (it does not execute the update), and moves on to its next time point, 14:35, where it checks the lock time again. Of course this doesn't come without a price, so you'll have to weigh it yourself 😂.
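The modified code is as follows:
// TIME_CHECK_INTERVAL is the loop period, fixed at 15 seconds
long tw = TIME_CHECK_INTERVAL / 10 * 3; // 30% of the period, i.e. about 70% less lock contention
// app.getTimeNext() (assumed accessor) is the app's next check time (time_next); _start is the loop start time
if ((app.getTimeNext() - _start) > tw) {
    continue; // still inside the current holder's lock time: skip the update
}
int ct = getDelegate().updateQrtzAppByApp(conn, app);
// 5. Clear and recover only when the app lock is obtained, to reduce reads and writes
if (ct > 0) {
    // Process after the lock is acquired
}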
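The lock-time check can be isolated as a single predicate. This sketch assumes the condition compares the app's next check time (time_next) with the loop start time, that the lock holder sets time_next to its acquisition time plus TIME_CHECK_INTERVAL, and that TIME_CHECK_INTERVAL is 15s; `LockTimeSketch` and `shouldContend` are hypothetical names.

```java
final class LockTimeSketch {
    static final long TIME_CHECK_INTERVAL = 15_000L;     // loop period (15s)
    static final long TW = TIME_CHECK_INTERVAL / 10 * 3; // 4_500 ms = 30% of the period

    // A node contends only when the holder's next check time is at most TW away.
    // Since the holder set timeNext = acquiredAt + TIME_CHECK_INTERVAL, the first ~70%
    // of the period after an acquisition acts as the lock time.
    static boolean shouldContend(long timeNext, long loopStart) {
        return timeNext - loopStart <= TW;
    }

    public static void main(String[] args) {
        long timeNext = 30_000L; // node1 took the lock at 15s, so its next check is at 30s
        System.out.println(shouldContend(timeNext, 20_000L)); // false: node2 at 20s skips the update
        System.out.println(shouldContend(timeNext, 26_000L)); // true: the lock time ended at 25.5s
    }
}
```

With these numbers, a holder that takes the lock at 15s blocks contention until 25.5s, a window of about 10s, which roughly matches the 10s lock time in the figure example.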