Memory overflow when synchronizing MySQL data with Flink

Background: I needed to synchronize about 10 million rows of a certain type of data to another data source. The company's big data platform could handle this quickly while using only a very, very small amount of memory (the platform is Flink under the hood, but its connectors come from the open-source ChunJun project). Since I personally wanted to try Flink's native connector, I generated 10 million rows of test data, started a single-node Flink, and submitted the synchronization job via Flink SQL. The end result: memory overflow!

The following problem occurs with a MySQL data source; other data sources may not have it.

Here is the Flink code, written inside the main method:

import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.LoggerFactory;

import java.util.List;

public class Main2 {

    static {
        // Detach all logback appenders so the console output stays quiet
        LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
        List<Logger> loggerList = loggerContext.getLoggerList();
        loggerList.forEach(logger -> {
            logger.detachAndStopAllAppenders();
        });
    }

    public static void main(String[] args) throws Exception {

        Configuration configuration = new Configuration();

        StreamExecutionEnvironment streamExecutionEnvironment =
                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        streamExecutionEnvironment.setParallelism(1);

        StreamTableEnvironment streamTableEnvironment =
                StreamTableEnvironment.create(streamExecutionEnvironment);

        // Define the target table
        streamTableEnvironment.executeSql("CREATE TABLE `gsq_hsjcxx_pre_copy1` (\n" +
                " `reportid` BIGINT COMMENT 'reportid',\n" +
                " `sfzh` VARCHAR COMMENT 'sfzh',\n" +
                " `cjddh` VARCHAR COMMENT 'cjddh',\n" +
                " `cjsj` VARCHAR COMMENT 'cjsj',\n" +
                " PRIMARY KEY (`reportid`) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'jdbc',\n" +
                " 'url' = 'jdbc:mysql://127.0.0.1:3306/xxx?useSSL=false&useInformationSchema=true&nullCatalogMeansCurrent=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai',\n" +
                " 'table-name' = 'xxx',\n" +
                " 'username' = 'xxx',\n" +
                " 'password' = 'xxx',\n" +
                " 'sink.buffer-flush.max-rows' = '1024'\n" +
                ")");

        // Define the source table
        streamTableEnvironment.executeSql("CREATE TABLE `gsq_hsjcxx_pre` (\n" +
                " `reportid` BIGINT COMMENT 'reportid',\n" +
                " `sfzh` VARCHAR COMMENT 'sfzh',\n" +
                " `cjddh` VARCHAR COMMENT 'cjddh',\n" +
                " `cjsj` VARCHAR COMMENT 'cjsj'\n" +
                ") WITH (\n" +
                " 'connector' = 'jdbc',\n" +
                " 'url' = 'jdbc:mysql://127.0.0.1:3306/xxx?useSSL=false&useInformationSchema=true&nullCatalogMeansCurrent=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai',\n" +
                " 'table-name' = 'xxx',\n" +
                " 'username' = 'xxx',\n" +
                " 'password' = 'xxx',\n" +
                " 'scan.fetch-size' = '1024'\n" +
                ")");

        // Insert the source-table data into the target table
        TableResult tableResult = streamTableEnvironment.executeSql("INSERT INTO `gsq_hsjcxx_pre_copy1` (`reportid`,\n" +
                " `sfzh`,\n" +
                " `cjddh`,\n" +
                " `cjsj`)\n" +
                "(SELECT `reportid`,\n" +
                " `sfzh`,\n" +
                " `cjddh`,\n" +
                " `cjsj`\n" +
                " FROM `gsq_hsjcxx_pre`)");

        // Block until the synchronization job finishes
        tableResult.await();
    }
}

The above is a simple example. It defines three SQL statements: first the two table definitions, then a query-and-insert, and the Flink SQL job starts executing once the program runs.
If you cap the JVM's heap at startup with -Xms512m -Xmx1g, you will find the job does not start at all; it OOMs immediately.
If you don't cap the JVM memory, the program starts and memory usage climbs steadily, approaching 4 GB; run the same job on a Flink cluster and it is a straight OOM.
Now, Flink reads data in batches; it cannot possibly be reading all the rows at once. Yet the observed behavior is that when Flink reads the data, all of it ends up in memory, which is unreasonable.
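
For reference, the two runs above amount to launching the same class with and without a heap cap (a plain JVM invocation sketch; the classpath is omitted for brevity):

# Heap capped at 1 GB: the job dies with an OutOfMemoryError almost immediately
java -Xms512m -Xmx1g Main2

# No explicit cap: the job starts, but heap usage climbs toward roughly 4 GB
java Main2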

Analyzing the source code

How to analyze this? Step through the code in debug mode. After some debugging I found the following code (decompiled from Flink's JDBC input format):

public void openInputFormat() {
        try {
            // Establish the JDBC connection
            Connection dbConn = connectionProvider.getOrEstablishConnection();
            // Only touch autoCommit if it was explicitly configured
            if (autoCommit != null) {
                dbConn.setAutoCommit(autoCommit);
            }

            statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);
            // -2147483648 is Integer.MIN_VALUE; it and any positive value enable a fetch size
            if (fetchSize == -2147483648 || fetchSize > 0) {
                statement.setFetchSize(fetchSize);
            }

        } catch (SQLException var2) {
            throw new IllegalArgumentException("open() failed." + var2.getMessage(), var2);
        } catch (ClassNotFoundException var3) {
            throw new IllegalArgumentException("JDBC-Class not found. - " + var3.getMessage(), var3);
        }
    }

First of all, how does Flink pull data in batches? It uses the JDBC cursor: setFetchSize tells the driver how many rows to fetch per batch, and the -2147483648 in the decompiled check above is just Integer.MIN_VALUE. So the next thing to determine is whether the cursor is actually being used.
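
Some context on MySQL's driver (this is Connector/J behavior, not Flink code, and the connection details below are placeholders): a positive fetch size is honored only when useCursorFetch=true is present in the URL; without it, the only value the driver respects is Integer.MIN_VALUE, which switches to row-by-row streaming. A minimal sketch:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class FetchSizeDemo {
    public static void main(String[] args) throws Exception {
        // Without useCursorFetch=true, Connector/J materializes the WHOLE result
        // set in memory, no matter what positive fetch size is configured.
        try (Connection c = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:3306/xxx", "xxx", "xxx");
             Statement s = c.createStatement()) {
            s.setFetchSize(1024);              // silently ignored here
            s.setFetchSize(Integer.MIN_VALUE); // -2147483648: row-by-row streaming
        }

        // With &useCursorFetch=true, a server-side cursor is used and a positive
        // fetch size is honored: 1024 rows per round trip.
        try (Connection c = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:3306/xxx?useCursorFetch=true", "xxx", "xxx");
             Statement s = c.createStatement()) {
            s.setFetchSize(1024);              // now actually batches
        }
    }
}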

So I wrote a plain JDBC program to read the data (without limiting the JVM memory):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class Main3 {
    public static void main(String[] args) {
        Connection connection = null;
        Runtime runtime = Runtime.getRuntime();
        System.out.printf("Total memory before reading> %s MB, free memory before reading> %s MB, max memory before reading> %s MB%n",
                runtime.totalMemory() / 1024 / 1024, runtime.freeMemory() / 1024 / 1024, runtime.maxMemory() / 1024 / 1024);

        try {
            int i = 0;
            connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/xxx?useSSL=false&useInformationSchema=true&nullCatalogMeansCurrent=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useCursorFetch=true", "xxx", "xxx");
            connection.setAutoCommit(false);
            PreparedStatement preparedStatement = connection.prepareStatement("SELECT `reportid`,\n" +
                    " `sfzh`,\n" +
                    " `cjddh`,\n" +
                    " `cjsj`\n" +
                    " FROM `gsq_hsjcxx_pre`", ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            // Number of rows pulled per batch
            preparedStatement.setFetchSize(1024);
            ResultSet resultSet = preparedStatement.executeQuery();
            while (resultSet.next()) {
                i++;
            }
            System.out.printf("Total memory after reading> %s MB, free memory after reading> %s MB, max memory after reading> %s MB%n",
                    runtime.totalMemory() / 1024 / 1024, runtime.freeMemory() / 1024 / 1024, runtime.maxMemory() / 1024 / 1024);
            System.out.println("data volume> " + i);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

The final printout (screenshot omitted) showed that all of the rows had been read out in one go.

Clearly the program was not really using a cursor. After some checking it turned out that &useCursorFetch=true has to be added to the JDBC URL parameters for the cursor to take effect (the listing above already includes it).
After modifying the JDBC parameters, the problem was completely resolved.
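
Applied back to the Flink SQL example, the fix is a one-line change per table definition: append &useCursorFetch=true to the JDBC URL in the WITH clause. A sketch of the corrected source-table DDL (everything else is unchanged from the listing above):

        // Source table with cursor fetch enabled; only the URL differs
        streamTableEnvironment.executeSql("CREATE TABLE `gsq_hsjcxx_pre` (\n" +
                " `reportid` BIGINT COMMENT 'reportid',\n" +
                " `sfzh` VARCHAR COMMENT 'sfzh',\n" +
                " `cjddh` VARCHAR COMMENT 'cjddh',\n" +
                " `cjsj` VARCHAR COMMENT 'cjsj'\n" +
                ") WITH (\n" +
                " 'connector' = 'jdbc',\n" +
                " 'url' = 'jdbc:mysql://127.0.0.1:3306/xxx?useSSL=false&useInformationSchema=true&nullCatalogMeansCurrent=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useCursorFetch=true',\n" +
                " 'table-name' = 'xxx',\n" +
                " 'username' = 'xxx',\n" +
                " 'password' = 'xxx',\n" +
                " 'scan.fetch-size' = '1024'\n" +
                ")");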

Beyond that, I have also used Apache SeaTunnel; it is genuinely fast when synchronizing data, ridiculously fast. However, you may miss some JDBC-related parameters when using it (taking MySQL as an example):
SeaTunnel will not add the batching parameter "rewriteBatchedStatements" : "true" automatically; you need to add it manually, otherwise rows are inserted one by one. I stepped in that pit too.