
Heterogeneous data source synchronization → How do I manually terminate a DataX synchronization after it has started?

2024-11-06 02:53:40

Happy Moment

I had just had a fight with my wife and was so angry that I wanted a divorce.
Our daughter suddenly stepped in to talk us out of it: "Can't you just spank a kid to let off steam? Do you really have to get divorced?"
My wife and I both looked at our daughter and rolled up our sleeves.
She added: "My little brother is so small, he won't hold a grudge if you hit him."


Requirements background

The project uses DataX for offline data synchronization between heterogeneous data sources, and I have made a number of modifications to DataX along the way:

Heterogeneous data source synchronization → DataX modifications, getting interesting
Heterogeneous data source synchronization → DataX re-engineering, starting to touch the source code
Heterogeneous data source synchronization → DataX usage details
Heterogeneous data source synchronization → Analyzing the encryption and decryption of DataX sensitive information from the source code
Heterogeneous data source synchronization → Why should DataX support Kafka?
Heterogeneous data source synchronization → How to get the amount of data that DataX has synchronized?

I thought offline synchronization was finished and no new requirements would come, but I was proven wrong very quickly. The product manager soon came to me with the following:

Yesterday I tried the offline sync feature in the test environment, and it met the requirements I raised very well. Kudos to you!
However, I ran into a problem with one table. At first I did not pay attention to its size, so I configured a full synchronization. When the synchronization still had not finished long after it started, I checked and found that the table holds more than 200 million rows. I wanted to terminate the synchronization, but found that there is no way to do so.
That's why we need to add a feature: a task that is synchronizing can be terminated.

The product manager clearly knows this tactic well: praise me first, then point out a pain point found in use and propose a new feature for it, leaving me no room to object. As reasonable developers, we are happy to accept a perfectly reasonable requirement, don't you think?

Once the requirement is accepted, the question arises

How to terminate synchronization

Before we think about that, let's review how DataX is started. Remember how we integrated DataX? In "Heterogeneous data source synchronization → DataX re-engineering, starting to touch the source code" we added the qsl-datax-hook module, which starts DataX with the command

Process process = Runtime.getRuntime().exec(realCommand);
The realCommand is the java command that starts DataX, similar to

java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -Ddatax.home=/datax -classpath /datax/lib/* com.alibaba.datax.core.Engine -mode standalone -jobid -1 -job 

In other words, DataX is started as a separate java process. So how do we stop DataX? Doesn't the idea suggest itself? The problem becomes

How to terminate a java process
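For reference, the launch step reviewed above can be sketched as follows. This is a minimal sketch, not the qsl-datax-hook code: it uses ProcessBuilder instead of Runtime.exec so that each argument is passed as its own list element, and it merges stderr into stdout so a single reader drains both. The class name LaunchSketch and the `java -version` stand-in command are assumptions made for illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: starts a child process and collects its output. */
public class LaunchSketch {

    /** Path of the java executable that runs the current JVM. */
    public static String currentJavaBinary() {
        return Paths.get(System.getProperty("java.home"), "bin", "java").toString();
    }

    /** Starts the command with stderr merged into stdout. */
    public static Process start(List<String> command) throws IOException {
        return new ProcessBuilder(command).redirectErrorStream(true).start();
    }

    /** Reads the child's output to the end; an undrained pipe can block the child. */
    public static List<String> drain(Process process) throws IOException {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream(), Charset.defaultCharset()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }

    public static void main(String[] args) throws Exception {
        // "java -version" stands in for the real DataX start command
        Process process = start(Arrays.asList(currentJavaBinary(), "-version"));
        drain(process).forEach(System.out::println);
        System.out.println("exit code = " + process.waitFor());
    }
}
```

Runtime.exec(String) splits the command on whitespace, so a path containing spaces breaks it; passing the arguments as a list avoids that problem.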

Terminating a process

How to terminate a process is something I'm sure you all know

Linux: kill -9 pid
Win: cmd.exe /c taskkill /PID pid /F /T
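The two commands can be selected at runtime from the `os.name` system property. The following is a minimal sketch under that assumption; the class name KillCommandSketch and its methods are hypothetical and not part of qsl-datax-hook.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

/** Hypothetical helper that picks the force-kill command for an operating system. */
public class KillCommandSketch {

    /** True when the given os.name value denotes Windows. */
    public static boolean isWindows(String osName) {
        return osName.toLowerCase(Locale.ROOT).startsWith("windows");
    }

    /** Builds the force-kill command as an argument list, one token per element. */
    public static List<String> buildKillCommand(String osName, long pid) {
        if (isWindows(osName)) {
            // /F forces termination, /T also terminates the child processes
            return Arrays.asList("cmd.exe", "/c", "taskkill", "/PID", String.valueOf(pid), "/F", "/T");
        }
        // SIGKILL cannot be caught, so the target gets no chance to clean up
        return Arrays.asList("kill", "-9", String.valueOf(pid));
    }

    public static void main(String[] args) {
        String osName = System.getProperty("os.name");
        // 1488 is only an example pid; nothing is executed here
        System.out.println(String.join(" ", buildKillCommand(osName, 1488)));
    }
}
```

The returned list can be handed to `new ProcessBuilder(command)` as it is.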

However, there is a prerequisite: we need to know the pid of the DataX java process, and in JDK 8 Process offers only the following methods

getOutputStream(), getInputStream(), getErrorStream(), waitFor(), waitFor(long, TimeUnit), exitValue(), destroy(), destroyForcibly(), isAlive()

None of them returns the pid. So how do we get the pid of the DataX process without changing the JDK version? The approach differs by operating system, so let's handle Linux and Win separately

  1. Linux

    The implementation is relatively simple and is based on the JDK alone.

    Field field = process.getClass().getDeclaredField("pid");
    field.setAccessible(true);
    int pid = field.getInt(process);
    

    This reads the value of the pid member variable of the Process implementation class (java.lang.UNIXProcess in JDK 8) via reflection; you should all be able to read this code

  2. Win

    On Win, we need the help of the third-party library oshi

    <dependency>
        <groupId>com.github.oshi</groupId>
        <artifactId>oshi-core</artifactId>
        <version>6.6.5</version>
    </dependency>
    

    Getting the pid is accomplished as follows

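    Field field = process.getClass().getDeclaredField("handle");
    field.setAccessible(true);
    long handle = field.getLong(process);
    WinNT.HANDLE winntHandle = new WinNT.HANDLE();
    winntHandle.setPointer(Pointer.createConstant(handle));
    int pid = Kernel32.INSTANCE.GetProcessId(winntHandle);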
    

    Reflection is also used, as well as methods provided by oshi

Combine them together and you get the method to get the pid

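/**
 * Gets the process ID
 * @param process the process
 * @return the process ID, -1 if it could not be obtained
 * @author greenstone road
 */
public static int getProcessId(Process process) {
    // Platform, Pointer, WinNT and Kernel32 are assumed to be the JNA classes; log is an SLF4J-style logger
    int pid = NULL_PROCESS_ID;
    Field field;
    if (Platform.isWindows()) {
        try {
            // on Win the implementation class keeps the native process handle in "handle"
            field = process.getClass().getDeclaredField("handle");
            field.setAccessible(true);
            long handle = field.getLong(process);
            WinNT.HANDLE winntHandle = new WinNT.HANDLE();
            winntHandle.setPointer(Pointer.createConstant(handle));
            pid = Kernel32.INSTANCE.GetProcessId(winntHandle);
        } catch (Exception e) {
            log.error("Failed to get the process ID, exception:", e);
        }
    } else if (Platform.isLinux() || Platform.isAIX()) {
        try {
            // on Linux the implementation class keeps the pid in "pid"
            field = process.getClass().getDeclaredField("pid");
            field.setAccessible(true);
            pid = field.getInt(process);
        } catch (Exception e) {
            log.error("Failed to get the process ID, exception:", e);
        }
    }
    log.info("process id={}", pid);
    return pid;
}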
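If raising the JDK version ever becomes an option, the reflection can be avoided: since Java 9, Process has a pid() method. The sketch below assumes Java 9 or later, so it does not apply to the JDK 8 setup described here. The class name PidSketch is hypothetical, `java -version` is only a stand-in for the DataX command, and -1 mirrors the "could not be obtained" value documented above.

```java
import java.io.IOException;
import java.nio.file.Paths;

/** Hypothetical example: reading a child's pid through the Java 9+ API. */
public class PidSketch {

    /** Starts a short-lived child JVM ("java -version") as a stand-in for DataX. */
    public static Process startChild() throws IOException {
        String javaBin = Paths.get(System.getProperty("java.home"), "bin", "java").toString();
        return new ProcessBuilder(javaBin, "-version")
                .redirectErrorStream(true)
                .redirectOutput(ProcessBuilder.Redirect.DISCARD)
                .start();
    }

    /** Returns the native pid, or -1 if the platform does not support it. */
    public static long pidOf(Process process) {
        try {
            return process.pid();
        } catch (UnsupportedOperationException e) {
            return -1L;
        }
    }

    public static void main(String[] args) throws Exception {
        Process child = startChild();
        System.out.println("child pid = " + pidOf(child));
        System.out.println("own pid = " + ProcessHandle.current().pid());
        child.waitFor();
    }
}
```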

Is the pid we get correct? That needs verifying, so let's write a main class

/**
 * Main class
 * @author greenstone road
 */
public class HookMain {

    public static void main(String[] args) throws Exception {
        String command = "";
        if (Platform.isWindows()) {
            command = "ping -n 1000 localhost";
        } else if (Platform.isLinux() || Platform.isAIX()) {
            command = "ping -c 1000 localhost";
        }
        Process process = Runtime.getRuntime().exec(command);
        int processId = ProcessUtil.getProcessId(process);
        System.out.println("ping process id = " + processId);
        new Thread(() -> {
            // decode the ping output with the platform default charset
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(process.getInputStream(), Charset.defaultCharset()))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line);
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }).start();
    }
}

Using maven to package an executable jar

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <configuration>
                <archive>
                    <manifest>
                        <addClasspath>true</addClasspath>
                        <classpathPrefix>lib/</classpathPrefix>
                        <mainClass></mainClass>
                    </manifest>
                </archive>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-dependency-plugin</artifactId>
            <executions>
                <execution>
                    <id>copy-dependencies</id>
                    <phase>package</phase>
                    <goals>
                        <goal>copy-dependencies</goal>
                    </goals>
                    <configuration>
                        <outputDirectory>target/lib</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Then execute the jar

java -jar qsl-datax-hook-0.0.

Let's look at the output

  1. Linux

    The jar output log is as follows

    [Screenshot: Linux output]

    Let's check the process with ps

    ps -ef|grep ping

    [Screenshot: Linux verification]

  2. Win

    The jar output log is as follows

    [Screenshot: Win output]

    Let's take a look at the ping process in Task Manager

    [Screenshot: Win verification]

As you can see, the pid is correct for both Linux and Win; once you have the pid, terminating the process is easy!

/**
 * Terminates a process
 * @param pid the process ID
 * @return true: success, false: failure
 */
public static boolean killProcessByPid(int pid) {
    if (NULL_PROCESS_ID == pid) {
        log.error("pid[{}] is invalid", pid);
        return false;
    }
    String command = "kill -9 " + pid;
    boolean result;
    if (Platform.isWindows()) {
        command = "cmd.exe /c taskkill /PID " + pid + " /F /T ";
    }
    Process process = null;
    try {
        process = Runtime.getRuntime().exec(command);
    } catch (IOException e) {
        log.error("Failed to terminate process[pid={}], exception:", pid, e);
        return false;
    }
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
        // log whatever the kill command prints
        String line;
        while ((line = reader.readLine()) != null) {
            log.info(line);
        }
        // report success only if the kill command itself exited with 0
        result = process.waitFor() == 0;
    } catch (Exception e) {
        log.error("Failed to terminate process[pid={}], exception:", pid, e);
        result = false;
    } finally {
        if (!Objects.isNull(process)) {
            process.destroy();
        }
    }
    return result;
}
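As long as the JVM that started the process still holds the Process object, the JDK's own destroy() and destroyForcibly() can do the same job without going through a pid and a shell command. The following is a minimal sketch assuming Java 8 or later; the class name DestroySketch and the grace period parameter are hypothetical, and this is an alternative shown for comparison, not what qsl-datax-hook does.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical example: stopping a child process through the Process API. */
public class DestroySketch {

    /**
     * Asks the process to stop, then forces it if it is still alive after the grace period.
     * @return true if the process is no longer running when the method returns
     */
    public static boolean terminate(Process process, long graceMillis) throws InterruptedException {
        if (!process.isAlive()) {
            return true; // already finished, nothing to do
        }
        // termination request; whether it is graceful depends on the platform
        process.destroy();
        if (process.waitFor(graceMillis, TimeUnit.MILLISECONDS)) {
            return true;
        }
        // forced termination, comparable to kill -9
        process.destroyForcibly();
        return process.waitFor(graceMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            System.out.println("usage: java DestroySketch <command> [args...]");
            return;
        }
        // args hold the command to start, for example the DataX java command
        Process process = new ProcessBuilder(args).inheritIO().start();
        Thread.sleep(1000);
        System.out.println("terminated = " + terminate(process, 5000));
    }
}
```

This only works inside the JVM that started the process; after a restart of that JVM, or from another node, the Process object is gone and a pid-based kill is what remains.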

The complete process should be

  1. Use Runtime.getRuntime().exec(java command) to start DataX and obtain the Process

    The java command is the one that starts DataX, for example

    java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -Ddatax.home=/datax -classpath /datax/lib/* com.alibaba.datax.core.Engine -mode standalone -jobid -1 -job 
    
  2. Use ProcessUtil#getProcessId to get the pid of the Process, bind it to the synchronization task information, and persist it

    The corresponding pid can be queried from the task id.

  3. When a task termination is triggered, look up the pid by task id and terminate the process via ProcessUtil#killProcessByPid

    Terminating the process also terminates the synchronization task

If qsl-datax-hook runs as a single node, the approach above works fine. In production, however, qsl-datax-hook cannot be a single node; it must be deployed as a cluster, and then the approach above no longer works. Why? Let me give you an example.

Suppose qsl-datax-hook has 2 nodes: A and B. A DataX synchronization task (taskId = 666) is started on node A and gets the corresponding pid = 1488, and the request to terminate task 666 is load balanced to node B. What happens?

  1. If node B has no process with pid = 1488, the termination fails, and neither node A nor node B is affected.
  2. If node B does have a process with pid = 1488, it may be another DataX synchronization task or some unrelated process, and killing it causes an incident that may be minor or major!

However, the synchronization task that needs to be terminated is still executing peacefully on node A.

So in cluster mode, binding the pid to the task is not enough; we also need to bind the node on which the task runs. This can be a node ID or a node IP, as long as it uniquely identifies the node. The concrete implementation has to be designed around the load balancing component in use: that component must route the terminate request to the correct node, and a conventional load balancing strategy cannot be used for this request. Because it depends on the load balancing component, there is no one-size-fits-all design; you need to implement it in the context of your own project, which I believe will be easy for you.
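The binding described above can be sketched in a few lines. This is a minimal sketch under stated assumptions: the class TaskBindingSketch, its Binding and Action types, and the in-memory map (a stand-in for whatever persistent store the project uses) are all hypothetical, and how a request is actually forwarded to the owning node is left out because it depends on the load balancing component.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical example: binding a task to the node and pid of its DataX process. */
public class TaskBindingSketch {

    /** Where a task's process runs: the node identifier plus the pid on that node. */
    public static final class Binding {
        public final String nodeId;
        public final int pid;

        public Binding(String nodeId, int pid) {
            this.nodeId = nodeId;
            this.pid = pid;
        }
    }

    /** What the node that received a terminate request should do. */
    public enum Action { KILL_LOCALLY, FORWARD_TO_OWNER, UNKNOWN_TASK }

    // in-memory stand-in for the persistent store
    private final Map<Long, Binding> bindings = new ConcurrentHashMap<>();

    /** Records the node and pid when the task is started. */
    public void bind(long taskId, String nodeId, int pid) {
        bindings.put(taskId, new Binding(nodeId, pid));
    }

    /** A pid is only meaningful on the node that started the process. */
    public Action decide(long taskId, String localNodeId) {
        Binding binding = bindings.get(taskId);
        if (binding == null) {
            return Action.UNKNOWN_TASK;
        }
        return binding.nodeId.equals(localNodeId) ? Action.KILL_LOCALLY : Action.FORWARD_TO_OWNER;
    }

    public static void main(String[] args) {
        TaskBindingSketch sketch = new TaskBindingSketch();
        sketch.bind(666L, "A", 1488);               // task 666 was started on node A
        System.out.println(sketch.decide(666L, "B")); // prints FORWARD_TO_OWNER
        System.out.println(sketch.decide(666L, "A")); // prints KILL_LOCALLY
    }
}
```

With this check in place, node B in the example never runs a kill for pid 1488; it either forwards the request to node A or rejects it.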


Summary

  1. How a task is terminated depends on how it was started, and terminating gracefully is a key point to consider
  2. Killing the process directly is simple and brutal but not elegant; if the wrong process is killed, the consequences can range from minor to severe, so if another way is available, this one is not recommended
  3. A termination method that works on a single node does not necessarily work in a cluster; consider every deployment scenario when designing a solution
  4. Sample code: qsl-datax-hook