Happy Moment
I just had a fight with my wife, and I'm so angry I want a divorce.
Our daughter suddenly stepped in to talk us down: "Can't you just beat the kid to let off steam? Do you really have to get divorced?"
My wife and I looked at our daughter at the same time, and we both rolled up our sleeves.
My daughter quickly added: "My little brother is so small, he won't hold a grudge if you hit him!"
Requirements background
The project is based on DataX.
To achieve offline data synchronization between heterogeneous data sources, I have made a number of modifications to DataX, covered in these earlier articles:
Heterogeneous Data Source Synchronization → DataX Re-engineering, a Bit Interesting
Heterogeneous Data Source Synchronization → DataX Re-engineering Again, Starting to Touch the Source Code
Heterogeneous Data Source Synchronization → DataX Usage Details
Heterogeneous Data Source Synchronization → Encryption and Decryption of DataX Sensitive Information, from Source Code Analysis
Heterogeneous Data Source Synchronization → Why Should DataX Support Kafka?
Heterogeneous Data Source Synchronization → How to Get the Amount of Data DataX Has Synchronized?
I thought offline synchronization was finished and no new requirements would come, but the slap in the face came very quickly: the product manager soon reached out to me with the following:
Yesterday I tried the offline sync feature in the test environment, and it fulfilled the requirements I mentioned very well. Kudos to you!
However, I ran into a problem with a table that holds a large amount of data. At first I didn't pay attention to the data volume, so I configured a full synchronization. After it started, the sync kept running without finishing, and only then did I check and find that the table has more than 200 million rows. I wanted to terminate the synchronization, but found there was nowhere to do it.
So we need to add a feature: a running synchronization task can be terminated.
I'm starting to see through the product manager's tactics: first praise me, then point out the pain point in using the new feature and ask for a feature that addresses it, leaving me no room to argue. As a reasonable developer faced with a very reasonable requirement, I'm still happy to accept it, don't you think?
As soon as the requirement is accepted, the problem arises:
How to terminate synchronization
Before thinking about that, let's review how DataX is started. Remember how we integrated DataX? In Heterogeneous Data Source Synchronization → DataX Re-engineering Again, Starting to Touch the Source Code, we added the qsl-datax-hook module, which starts DataX with the following command:
Process process = Runtime.getRuntime().exec(realCommand);
Here realCommand is the java command that starts DataX, similar to:
java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -=/datax -classpath /datax/lib/* -mode standalone -jobid -1 -job
Starting DataX means starting a separate Java process for it. So how do we stop DataX? Isn't the idea clear now? The problem turns into:
How to terminate a Java process
Terminating a process
I'm sure you all know how to terminate a process:
Linux: kill -9 pid
Win: cmd.exe /c taskkill /PID pid /F /T
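The two commands above can be sketched as a small helper. It is hypothetical: KillCommands is not part of qsl-datax-hook. The helper builds each command as an argument list for ProcessBuilder, which avoids the whitespace splitting that Runtime.exec(String) performs. It also rejects pid values of 0 or below. For kill, -1 means every process the caller is allowed to signal and 0 means the caller's own process group, so a failed pid lookup must never reach this command.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper: builds the OS-specific force-kill command for a pid. */
final class KillCommands {

    private KillCommands() {}

    /**
     * @param pid     native process id (must be > 0)
     * @param windows true to build the taskkill command, false for Linux/macOS kill
     * @return an unmodifiable argument list, ready for new ProcessBuilder(list)
     */
    static List<String> forceKill(int pid, boolean windows) {
        if (pid <= 0) {
            // -1 and 0 have special meanings for kill, never pass them through
            throw new IllegalArgumentException("invalid pid: " + pid);
        }
        String p = String.valueOf(pid);
        List<String> cmd = windows
                // /F forces termination, /T also terminates the child processes
                ? Arrays.asList("cmd.exe", "/c", "taskkill", "/PID", p, "/F", "/T")
                // -9 is SIGKILL, which the target process cannot catch or ignore
                : Arrays.asList("kill", "-9", p);
        return Collections.unmodifiableList(cmd);
    }
}
```

For example, new ProcessBuilder(KillCommands.forceKill(1488, false)).start() runs kill -9 1488.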
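However, there is a prerequisite: you need to know the pid of the DataX Java process. In JDK 8, Process only provides the following methods: getOutputStream(), getInputStream(), getErrorStream(), waitFor(), waitFor(long, TimeUnit), exitValue(), destroy(), destroyForcibly() and isAlive(). None of them returns the pid.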
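This gap is specific to JDK 8 and is shown here for contrast only. Since JDK 9, Process#pid() returns the native pid and ProcessHandle can check whether a pid is alive, so the reflection below is not needed on newer JDKs. The sketch uses a hypothetical class name, ModernPid. It does not compile on JDK 8, which is why this article takes the reflection route.

```java
import java.util.Optional;

/** Hypothetical JDK 9+ sketch; not usable on JDK 8, where these APIs do not exist. */
final class ModernPid {

    private ModernPid() {}

    /** Native pid of a started subprocess (Process#pid, added in JDK 9). */
    static long pidOf(Process process) {
        return process.pid();
    }

    /** True if a process with this pid currently exists and is alive (ProcessHandle, JDK 9). */
    static boolean isAlive(long pid) {
        Optional<ProcessHandle> handle = ProcessHandle.of(pid);
        return handle.map(ProcessHandle::isAlive).orElse(false);
    }
}
```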
So how do we get the pid of the DataX process without upgrading the JDK? The approach differs by operating system, so Linux and Win are implemented separately:
- Linux
The implementation is relatively simple and relies on the JDK alone:
Field field = process.getClass().getDeclaredField("pid");
field.setAccessible(true);
int pid = field.getInt(process);
Reflection is used to read the value of the pid member variable of the Process implementation class (java.lang.UNIXProcess in JDK 8). You should all be able to read this code.
- Win
On Win systems you need to rely on a third-party library, oshi:
<dependency>
    <groupId>com.github.oshi</groupId>
    <artifactId>oshi-core</artifactId>
    <version>6.6.5</version>
</dependency>
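The pid is obtained as follows:
Field field = process.getClass().getDeclaredField("handle");
field.setAccessible(true);
long handle = field.getLong(process);
WinNT.HANDLE winntHandle = new WinNT.HANDLE();
winntHandle.setPointer(Pointer.createConstant(handle));
int pid = Kernel32.INSTANCE.GetProcessId(winntHandle);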
Reflection is used here as well, together with the JNA Windows bindings (WinNT, Kernel32) that oshi brings in.
Combining the two gives us the method for getting the pid:
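import java.lang.reflect.Field;

import com.sun.jna.Platform;
import com.sun.jna.Pointer;
import com.sun.jna.platform.win32.Kernel32;
import com.sun.jna.platform.win32.WinNT;

// Fragment of ProcessUtil: NULL_PROCESS_ID (-1) and an SLF4J LOGGER are assumed class members.
// The JNA classes (Platform, Pointer, Kernel32, WinNT) come in transitively with oshi-core.

/**
 * Gets the process ID
 * @param process the process
 * @return the process id; -1 indicates a failure to acquire it
 * @author greenstone road
 */
public static int getProcessId(Process process) {
    int pid = NULL_PROCESS_ID;
    Field field;
    if (Platform.isWindows()) {
        try {
            // On Windows the Process implementation stores the native process handle
            field = process.getClass().getDeclaredField("handle");
            field.setAccessible(true);
            long handle = field.getLong(process);
            WinNT.HANDLE winntHandle = new WinNT.HANDLE();
            winntHandle.setPointer(Pointer.createConstant(handle));
            // Ask Windows for the pid behind the handle
            pid = Kernel32.INSTANCE.GetProcessId(winntHandle);
        } catch (Exception e) {
            LOGGER.error("Failed to get the process id, exception information:", e);
        }
    } else if (Platform.isLinux() || Platform.isMac()) {
        try {
            // java.lang.UNIXProcess (JDK 8) stores the pid in a private int field
            field = process.getClass().getDeclaredField("pid");
            field.setAccessible(true);
            pid = field.getInt(process);
        } catch (Exception e) {
            LOGGER.error("Failed to get the process id, exception information:", e);
        }
    }
    LOGGER.info("process id={}", pid);
    return pid;
}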
Is the pid we get correct? We need to verify it, so let's write a main class:
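import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;

import com.sun.jna.Platform;

/**
 * Main class
 * @author greenstone road
 */
public class HookMain {
    public static void main(String[] args) throws Exception {
        String command = "";
        if (Platform.isWindows()) {
            command = "ping -n 1000 localhost";
        } else if (Platform.isLinux() || Platform.isMac()) {
            command = "ping -c 1000 localhost";
        }
        Process process = Runtime.getRuntime().exec(command);
        int processId = ProcessUtil.getProcessId(process);
        System.out.println("ping process id = " + processId);
        // Print ping's output on a separate thread so the main thread is not blocked
        new Thread(() -> {
            // Platform default charset assumed (e.g. GBK on a Chinese Windows console)
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(process.getInputStream(), Charset.defaultCharset()))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line);
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }).start();
    }
}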
Package an executable jar with Maven:
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <configuration>
                <archive>
                    <manifest>
                        <addClasspath>true</addClasspath>
                        <classpathPrefix>lib/</classpathPrefix>
                        <!-- fully qualified class name of HookMain -->
                        <mainClass></mainClass>
                    </manifest>
                </archive>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-dependency-plugin</artifactId>
            <executions>
                <execution>
                    <id>copy-dependencies</id>
                    <phase>package</phase>
                    <goals>
                        <goal>copy-dependencies</goal>
                    </goals>
                    <configuration>
                        <outputDirectory>target/lib</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Then run the jar:
java -jar qsl-datax-hook-0.0.
Let's look at the output.
- Linux
The jar's output log is as follows:
Let's check the process with ps:
ps -ef|grep ping
- Win
The jar's output log is as follows:
Let's look at the ping process in Task Manager:
As you can see, the pid is correct on both Linux and Win. Once we have the pid, terminating the process is easy:
/**
 * Terminates a process
 * @param pid process PID
 * @return true: success, false: failure
 */
public static boolean killProcessByPid(int pid) {
    // Reject -1 (failed lookup) and 0, which kill would treat as "whole process group"
    if (NULL_PROCESS_ID == pid || pid <= 0) {
        LOGGER.error("pid[{}] is invalid", pid);
        return false;
    }
    String command = "kill -9 " + pid;
    boolean result;
    if (Platform.isWindows()) {
        command = "cmd.exe /c taskkill /PID " + pid + " /F /T";
    }
    Process process = null;
    try {
        process = Runtime.getRuntime().exec(command);
    } catch (IOException e) {
        LOGGER.error("Failed to terminate process [pid={}]:", pid, e);
        return false;
    }
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
        // Drain and log the output of the kill/taskkill command
        String line;
        while ((line = reader.readLine()) != null) {
            LOGGER.info(line);
        }
        // Exit code 0 means the kill command itself succeeded (e.g. the pid existed)
        result = process.waitFor() == 0;
    } catch (Exception e) {
        LOGGER.error("Failed to terminate process [pid={}]:", pid, e);
        result = false;
    } finally {
        if (process != null) {
            process.destroy();
        }
    }
    return result;
}
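Killing by pid is the only option when all you have is a persisted pid. If the kill is issued in the same JVM that started DataX and that JVM still holds the Process object, JDK 8 already supports a gentler two-step stop: ask the process to stop first, and force it only if it is still alive after a grace period. Keeping the Process object by task id is an assumption beyond what this article does. GracefulStop and the grace period are hypothetical.

On Linux/macOS, OpenJDK's destroy() sends SIGTERM and destroyForcibly() sends SIGKILL. On Windows both end up calling TerminateProcess, so step 1 is not actually gentle there. Unlike taskkill /T, destroy() targets only that one process.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: stop politely first, escalate to a forced kill only if needed. */
final class GracefulStop {

    private GracefulStop() {}

    /**
     * @param process     a child process started by this JVM (e.g. DataX)
     * @param graceMillis how long to wait after each step
     * @return true if the process has exited when this method returns
     */
    static boolean stop(Process process, long graceMillis) throws InterruptedException {
        if (!process.isAlive()) {
            return true; // already finished, nothing to do
        }
        // Step 1: polite request (SIGTERM on Linux/macOS with OpenJDK)
        process.destroy();
        if (process.waitFor(graceMillis, TimeUnit.MILLISECONDS)) {
            return true;
        }
        // Step 2: escalate (SIGKILL on Linux/macOS with OpenJDK)
        process.destroyForcibly();
        return process.waitFor(graceMillis, TimeUnit.MILLISECONDS);
    }
}
```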
The complete flow is:
- Start DataX with Runtime.getRuntime().exec(java command) and obtain the Process.
  The java command is the command that starts DataX, for example:
  java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -=/datax -classpath /datax/lib/* -mode standalone -jobid -1 -job
- Get the Process's pid via ProcessUtil#getProcessId, then bind it to the synchronization task information and persist it, so that the pid can be looked up by task id.
- When termination of a task is triggered, look up the pid by task id and terminate the process via ProcessUtil#killProcessByPid. Terminating the process also terminates the synchronization task.
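The bind-and-look-up steps of this flow can be sketched with an in-memory map. The article persists the binding with the task information; here a ConcurrentHashMap stands in for that store. TaskPidRegistry and its methods are hypothetical names.

```java
import java.util.Map;
import java.util.OptionalInt;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical in-memory stand-in for the persisted "task id -> pid" binding.
 * A real deployment would store this with the task record in a database.
 */
final class TaskPidRegistry {

    private final Map<Long, Integer> pidByTaskId = new ConcurrentHashMap<>();

    /** Called right after ProcessUtil#getProcessId; a failed lookup (-1) is rejected. */
    void bind(long taskId, int pid) {
        if (pid <= 0) {
            throw new IllegalArgumentException("refusing to bind invalid pid " + pid + " to task " + taskId);
        }
        pidByTaskId.put(taskId, pid);
    }

    /** Used by a termination request to find which pid to kill. */
    OptionalInt lookup(long taskId) {
        Integer pid = pidByTaskId.get(taskId);
        return pid == null ? OptionalInt.empty() : OptionalInt.of(pid);
    }

    /** Called when the task finishes or is killed, so a stale pid is never targeted. */
    OptionalInt unbind(long taskId) {
        Integer pid = pidByTaskId.remove(taskId);
        return pid == null ? OptionalInt.empty() : OptionalInt.of(pid);
    }
}
```

Unbinding when a sync finishes on its own matters. The OS can reuse the pid of an exited DataX process, and a binding left in place could later point a termination request at an unrelated process.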
If qsl-datax-hook runs as a single node, this approach works fine. In production, however, qsl-datax-hook can't be a single node; it must be deployed as a cluster, and then the approach above no longer works. Why? Let me give you an example.
Suppose qsl-datax-hook has 2 nodes, A and B. A DataX synchronization task (taskId = 666) is started on node A and gets pid = 1488, and the request to terminate task 666 is load balanced to node B. What happens?
- If there is no process with pid = 1488 on node B, the termination fails, and neither node A nor node B is affected.
- If there is a process with pid = 1488 on node B, it could be a DataX synchronization process or any other process, and killing it could cause anything from a minor accident to a major incident!
Either way, the synchronization task that actually needs to be terminated keeps running peacefully on node A.
So in cluster mode we need to bind not only the pid to the task but also the node that executes it. The node can be identified by a node ID, a node IP, or anything else that uniquely identifies it. The concrete implementation has to be designed together with your load balancing component: it must route the termination request to the correct node instead of distributing it with a conventional load balancing strategy. Because load balancing components differ, the solution can't be designed uniformly and needs to be implemented for your own project, which I'm sure will be easy for you.
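The load-balancer routing described above can be backed by a defensive check on the node that receives the request. In this hypothetical sketch, TaskRouting and its Binding are illustrative names and not part of qsl-datax-hook. The persisted binding carries the node identity alongside the pid, and a node kills only when it owns the task. Otherwise it forwards the request instead of killing, so node B in the example above can never hit an unrelated process 1488.

```java
import java.util.Objects;

/**
 * Hypothetical sketch: in cluster mode the kill target is (nodeId, pid), not just pid.
 */
final class TaskRouting {

    enum Decision { KILL_LOCALLY, FORWARD_TO_OWNER }

    /** Persisted binding: which node runs the task, and the DataX pid on that node. */
    static final class Binding {
        final long taskId;
        final String nodeId; // node ID or node IP; must be unique in the cluster
        final int pid;       // only meaningful on nodeId

        Binding(long taskId, String nodeId, int pid) {
            this.taskId = taskId;
            this.nodeId = Objects.requireNonNull(nodeId, "nodeId");
            this.pid = pid;
        }
    }

    private TaskRouting() {}

    /** Decides what the node that received the termination request should do. */
    static Decision decide(Binding binding, String localNodeId) {
        return binding.nodeId.equals(localNodeId)
                ? Decision.KILL_LOCALLY       // this node started the task, its pid is valid here
                : Decision.FORWARD_TO_OWNER;  // the pid belongs to another node, never kill it here
    }
}
```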
Summary
- Tasks started in different ways must be terminated in different ways, and how to terminate them gracefully is a key point to consider.
- Killing the process directly is simple and brutal but not elegant. If the wrong process is killed, the consequences can range from minor to severe. If another way exists, this approach is not recommended.
- A termination approach that works for a single node doesn't necessarily work for a cluster, so consider every angle when designing a solution.
- Sample code: qsl-datax-hook