ElasticJob's idempotency mechanism refers to the idempotency of a job's sharded (sliced) execution. It has to guarantee two things:
- The same sharding item is not executed repeatedly on the current job instance.
- A sharding item cannot execute on multiple job instances at the same time.
How idempotency is implemented
Scenario: task A has an execution cycle of 10 s. Under normal circumstances each run takes 3-5 s, but at some point a sudden increase in data volume or pressure on the database makes a run take more than 10 s. The scheduler still fires every 10 s, so without idempotency the same task would be running more than once at the same time, processing the same data.
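The overlap in this scenario can be checked with simple arithmetic. The sketch below is illustrative only and is not ElasticJob code: the class OverlapDemo, the method overlappingTriggers, and the sample durations are assumptions made for the example. It lists the trigger times at which a new run would start while an earlier run is still in progress, assuming nothing guards against overlap.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the scenario; not part of ElasticJob.
class OverlapDemo {

    // Returns the trigger times (in seconds) at which a new run would start
    // while an earlier run is still in progress, assuming no idempotency guard.
    static List<Integer> overlappingTriggers(int periodSeconds, int[] durationsSeconds) {
        List<Integer> overlaps = new ArrayList<>();
        int previousEnd = 0;
        for (int i = 0; i < durationsSeconds.length; i++) {
            int start = i * periodSeconds;
            if (i > 0 && start < previousEnd) {
                overlaps.add(start);
            }
            // Track the latest finish time of any run started so far.
            previousEnd = Math.max(previousEnd, start + durationsSeconds[i]);
        }
        return overlaps;
    }

    public static void main(String[] args) {
        // Runs normally take 3-5 s; the second run (started at t = 10 s) spikes to 14 s.
        int[] durations = {4, 14, 5, 3};
        System.out.println(overlappingTriggers(10, durations)); // prints [20]
    }
}
```

With these sample durations, the run started at t = 10 s is still active when the trigger at t = 20 s fires, which is exactly the situation the misfire mechanism has to handle.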
ElasticJob task execution: AbstractElasticJobExecutor#execute()
public final void execute() {
    ...
    // Get the sharding contexts for the current job server
    ShardingContexts shardingContexts = jobFacade.getShardingContexts();
    // If job events are allowed to be sent
    if (shardingContexts.isAllowSendJobEvent()) {
        // Post a job status trace event
        jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobName));
    }
    // If the trigger fires while the previous run is still executing, mark the sharding items as misfired to record that a run was missed
    if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
                    "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName,
                    shardingContexts.getShardingItemParameters().keySet()));
        }
        return;
    }
    ...
}
Next, look at the implementation logic of misfireIfHasRunningItems:
public boolean misfireIfHasRunningItems(Collection<Integer> items) {
    // If no item is running, return false and the task is scheduled normally; otherwise set misfire
    if (!hasRunningItems(items)) {
        return false;
    } else {
        setMisfire(items);
        return true;
    }
}
If any sharding item has not finished executing, the setMisfire(items) method is called. To see how unfinished sharding items are detected, look at the implementation logic of hasRunningItems(items):
public boolean hasRunningItems(Collection<Integer> items) {
    LiteJobConfiguration jobConfig = configService.load(true);
    // Running state is only tracked when monitorExecution is enabled
    if (null == jobConfig || !jobConfig.isMonitorExecution()) {
        return false;
    }
    for (int each : items) {
        // A sharding item is running if its running node exists
        if (jobNodeStorage.isJobNodeExisted(ShardingNode.getRunningNode(each))) {
            return true;
        }
    }
    return false;
    /*
    public static String getRunningNode(int item) {
        return String.format("sharding/%s/running", item);
    }*/
}
With ElasticJob's monitorExecution turned on, a sharding item creates its sharding/{item}/running node when it starts executing and deletes the node when it completes. So, as the code above shows, whether a sharding item is running can be determined by the presence or absence of its running node.
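The node-based check can be sketched with an in-memory set standing in for the registry center. This is a minimal sketch under stated assumptions, not ElasticJob's API: the class RunningNodeSketch and the methods markBegin and markCompleted are hypothetical names chosen for the example; only the sharding/%s/running path pattern comes from the snippet above.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical in-memory stand-in for the registry center; not ElasticJob's API.
class RunningNodeSketch {

    private final Set<String> nodes = new HashSet<>();

    // Same path pattern as the getRunningNode snippet in the text.
    static String runningNode(int item) {
        return String.format("sharding/%s/running", item);
    }

    // Assumed hook: called when a sharding item starts executing.
    void markBegin(int item) {
        nodes.add(runningNode(item));
    }

    // Assumed hook: called when a sharding item finishes executing.
    void markCompleted(int item) {
        nodes.remove(runningNode(item));
    }

    // An item counts as running exactly when its running node exists.
    boolean hasRunningItems(Collection<Integer> items) {
        for (int each : items) {
            if (nodes.contains(runningNode(each))) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        RunningNodeSketch registry = new RunningNodeSketch();
        Collection<Integer> items = Arrays.asList(0, 1, 2);
        registry.markBegin(1);
        System.out.println(registry.hasRunningItems(items)); // prints true
        registry.markCompleted(1);
        System.out.println(registry.hasRunningItems(items)); // prints false
    }
}
```

Keeping the running state in the registry rather than in local memory is what lets other parts of the system observe it; the in-memory set here only mimics that behavior for a single process.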
At the same time, when the code detects a running sharding item it calls the setMisfire(items) method, which creates a persistent sharding/{item}/misfire node for every sharding item assigned to this instance. As long as any one of the sharding items assigned to this instance has not finished executing, the misfire node is added to all of them. The current trigger is then ignored, and the job is executed again after the running execution has finished.
public void setMisfire(Collection<Integer> items) {
    for (int each : items) {
        // Create the misfire node for each sharding item if it does not exist yet
        jobNodeStorage.createJobNodeIfNeeded(ShardingNode.getMisfireNode(each));
    }
    /*
    static String getMisfireNode(int item) {
        return String.format("sharding/%s/misfire", item);
    }
    */
}
Back in the execution method (#execute()):
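// Execute the job
execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
// While a misfire node exists, run the missed execution
while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
    // Clear the misfire nodes
    jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
    execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
}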
To summarize: when the next scheduling cycle arrives, if any sharding item of this instance is found to be still executing, all sharding items of this instance are marked as misfired. The new trigger is skipped, and once the running execution finishes, the missed run is executed for all of them together.
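The whole flow can be simulated in a single thread. The following is a minimal sketch under stated assumptions, not ElasticJob's implementation: the class MisfireFlowSketch, its trigger and demo methods, and the in-memory node set are hypothetical, and the overlapping trigger is simulated by firing it from inside the first run's job body. Only the node path patterns and the skip-then-rerun order follow the code discussed above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical single-threaded simulation of the misfire flow; not ElasticJob code.
class MisfireFlowSketch {

    private final Set<String> nodes = new HashSet<>();
    final List<String> log = new ArrayList<>();

    private static String running(int item) { return String.format("sharding/%s/running", item); }
    private static String misfire(int item) { return String.format("sharding/%s/misfire", item); }

    boolean hasRunningItems(Collection<Integer> items) {
        for (int each : items) {
            if (nodes.contains(running(each))) return true;
        }
        return false;
    }

    // If any item is running, flag every item of this instance as misfired.
    boolean misfireIfHasRunningItems(Collection<Integer> items) {
        if (!hasRunningItems(items)) return false;
        for (int each : items) nodes.add(misfire(each));
        return true;
    }

    boolean isMisfired(Collection<Integer> items) {
        for (int each : items) {
            if (nodes.contains(misfire(each))) return true;
        }
        return false;
    }

    void clearMisfire(Collection<Integer> items) {
        for (int each : items) nodes.remove(misfire(each));
    }

    // Marks the items as running for the duration of the job body.
    private void run(Collection<Integer> items, Runnable body, String source) {
        log.add("run:" + source);
        for (int each : items) nodes.add(running(each));
        try {
            body.run();
        } finally {
            for (int each : items) nodes.remove(running(each));
        }
    }

    // Mirrors the order of steps in execute(): skip if running, else run and drain misfires.
    void trigger(Collection<Integer> items, Runnable body) {
        if (misfireIfHasRunningItems(items)) {
            log.add("skipped");
            return;
        }
        run(items, body, "NORMAL_TRIGGER");
        while (isMisfired(items)) {
            clearMisfire(items);
            run(items, body, "MISFIRE");
        }
    }

    static List<String> demo() {
        MisfireFlowSketch job = new MisfireFlowSketch();
        Collection<Integer> items = Arrays.asList(0, 1);
        int[] runs = {0};
        Runnable body = () -> {
            if (runs[0]++ == 0) {
                // The next scheduled trigger arrives while the first run is still active.
                job.trigger(items, () -> { });
            }
        };
        job.trigger(items, body);
        return job.log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [run:NORMAL_TRIGGER, skipped, run:MISFIRE]
    }
}
```

The overlapping trigger is skipped rather than queued, so however many triggers are missed during one long run, the misfire node only records that a run was missed and a single catch-up run follows.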