
[Flink/FlinkCDC] Practice Summary: Upgrading Flink 1.12.6 to Flink 1.15.4

2024-11-06 01:54:53

Flink DataStream/API

Important characteristics that have not changed

Although the official recommendation is to move off JDK 8 and use JDK 11+, JDK 8 is still supported.

Personal guess: the JDK 8 user base is too large, and a change there would ripple through everything, so the project avoids taking too big a step that could curb its own momentum.

Dependency module changes

Version changes

  • : 1.12.6 => 1.15.4
  • : 1.12.6 => 1.15.4
  • : 1.3.0 => 2.3.0

Flink CDC : since flink cdc 2.0.0, the groupId and package path have changed from com.alibaba.ververica to com.ververica

  • apache flink cdc 1.3.0
<dependency>
	<groupId>com.alibaba.ververica</groupId>
	<artifactId>flink-connector-mysql-cdc</artifactId>
	<version>1.3.0</version>
</dependency>
  • apache flink cdc 2.3.0
<dependency>
	<groupId>com.ververica</groupId>
	<artifactId>flink-connector-mysql-cdc</artifactId>
	<version>2.3.0</version>
</dependency>
  • For details, see:
  • Flink CDC official website: Flink CDC package && Flink && JDK && MYSQL version comparison - Blogspot/Children's World

Modules drop the Scala suffix

For details, see:

/apache/flink/blob/release-1.15.4/docs//release-notes/flink-1. [Recommended]
/flink/flink-docs-release-1.15/release-notes/flink-1.15/

  • :flink-clients:${}

  • flink-streaming-java:

  • :flink-table-api-java-bridge

:flink-table-api-java-bridge_${}:${}

  • :flink-connector-kafka:${}

  • :flink-runtime-web:${}

  • :flink-statebackend-rocksdb:${}

  • :flink-table-planner:${}

:flink-table-planner-blink_${}:${}

Support for Scala 2.11 has been dropped; Scala 2.12 is still supported.

= 2.12
flinkversion = 1.15.4

  • :flink-connector-hive_${}:${}

  • :flink-table-api-java-bridge:${}

Compared to flink 1.12.6: flink-table-api-java-bridge_${=2.11}:${=1.12.6}

table-*-blink modules become the default : flink-table-planner-blink / flink-table-runtime-blink => flink-table-planner, flink-table-runtime

  • Starting with Flink 1.15, the distribution includes two planners:
  • flink-table-planner_2.12-${}.jar : in /opt, contains the query planner
  • flink-table-planner-loader-${}.jar [Recommended] : loaded from /lib by default, contains the query planner hidden behind an isolated classpath

Note: these 2 planners cannot be on the classpath at the same time. If both are loaded into /lib, Table jobs will fail with the error: Could not instantiate the executor. Make sure a planner module is on the classpath

Exception in thread "main" : Could not instantiate the executor. Make sure a planner module is on the classpath
    at (:108)
    at (:100)
    at (:122)
    at (:94)
    at (:15)
Caused by: : Multiple factories for identifier 'default' that implement '' found in the classpath.

Ambiguous factory classes are:



    at (:553)
    at (:105)
    ... 4 more

Process finished with exit code 1
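
A quick pre-flight check can catch this conflict before a job is submitted. The sketch below is only an illustration, assuming the jar file names follow the two patterns listed above; PlannerJarCheck and its methods are hypothetical helpers, not part of Flink.

```java
import java.util.List;

/** Hypothetical helper (not part of Flink): detects both planner jars in one jar listing. */
final class PlannerJarCheck {

    /** Matches names such as flink-table-planner_2.12-1.15.4.jar (Scala-suffixed planner). */
    static boolean isPlannerJar(String fileName) {
        return fileName.startsWith("flink-table-planner_") && fileName.endsWith(".jar");
    }

    /** Matches names such as flink-table-planner-loader-1.15.4.jar. */
    static boolean isPlannerLoaderJar(String fileName) {
        return fileName.startsWith("flink-table-planner-loader-") && fileName.endsWith(".jar");
    }

    /** Returns true when the listing contains both kinds of planner jar. */
    static boolean hasConflict(List<String> jarFileNames) {
        boolean planner = false;
        boolean loader = false;
        for (String name : jarFileNames) {
            planner |= isPlannerJar(name);
            loader |= isPlannerLoaderJar(name);
        }
        return planner && loader;
    }
}
```

Feeding it the file names found in the /lib directory (or the names of the jars on the application classpath) is enough to tell whether both planners would be loaded together.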
  • Since flink 1.14, the former flink-table-*-blink-* modules have been renamed. So:
  • flink-table-planner-blink => flink-table-planner
  • flink-table-runtime-blink => flink-table-runtime

flink-shaded-guava module version changes and package conflicts

  • If the following error is reported, it is a package conflict caused by a version difference.

NoClassDefFoundError: org/apache/flink/shaded/guava30/com/google/common/collect/Lists

Reason: the flink-shaded-guava versions used by flink 1.16, 1.15, 1.12.6 and so on differ from each other and are not compatible, so you need to change the version of flink-shaded-guava used by cdc.

  • flink-shaded-guava module versions for different flink versions
  • flink 1.12.6 : flink-shaded-guava 18.0-12.0
  • flink 1.15.4 : flink-shaded-guava 30.1.1-jre-15.0
  • flink 1.16.0 : flink-shaded-guava 30.1.1-jre-16.0

  • If the project does not explicitly declare flink-shaded-guava itself, there is no need to worry about this issue: modules such as flink-core/flink-runtime/flink-clients bring in the correct version by default.
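
To see which shaded Guava is actually on the classpath, a small probe can help. This is a minimal sketch: ShadedGuavaProbe is a hypothetical helper, the guava30 class name is taken from the error above, and the guava18 class name is an assumption based on the 18.0-12.0 artifact using the same relocation pattern.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical diagnostic helper: reports which relocated Guava packages can be loaded. */
final class ShadedGuavaProbe {

    /** Returns true if the class can be found, without initializing it. */
    static boolean isPresent(String className) {
        try {
            Class.forName(className, false, ShadedGuavaProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Checks the relocation prefixes assumed for flink-shaded-guava 18.x and 30.x. */
    static List<String> presentShadedGuavaPackages() {
        String[] prefixes = {"guava18", "guava30"}; // guava18 is an assumption, see lead-in
        List<String> found = new ArrayList<>();
        for (String prefix : prefixes) {
            String className =
                    "org.apache.flink.shaded." + prefix + ".com.google.common.collect.Lists";
            if (isPresent(className)) {
                found.add(prefix);
            }
        }
        return found;
    }
}
```

If the result lists only guava18 while a connector expects guava30 (or the reverse), the NoClassDefFoundError above is the likely outcome.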

flink 1.15.4

flink 1.12.6

MySQL JDBC Version : ≥ 8.0.16 => ≥8.0.27

  • Version based on: Apache Flink CDC website
  • /apache/flink-cdc/tree/release-1.3.0 | ≥8.0.16
  • /apache/flink-cdc/tree/release-2.3.0 | ≥8.0.27

For more information, see: Flink CDC official website: Flink CDC MYSQL package && Flink && JDK && MYSQL version comparison - Blogspot/Children's World

Regarding the error: Caused by: : (Ljava/lang/String;)Ljava/lang/String;

If MySQL is 8.0, this is a problem caused by the debezium connector used by flink cdc 2.1 and later.

  • Recommended for uniform use: mysql jdbc 8.0.28
<dependency>
	<groupId>mysql</groupId>
	<artifactId>mysql-connector-java</artifactId>
	<version>8.0.28</version>
</dependency>

Application source code adjustments

Flink

KafkaRecordDeserializer : no longer exists / no longer supported (flink 1.13.0 and later); replace it with KafkaDeserializationSchema. The way this object is passed to KafkaSourceBuilder also changes slightly.

  • | flink-connector-kafka_2.11 : 1.12.6
  • flink 1.12.6
    /apache/flink/blob/release-1.12.6/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/
  • flink 1.12.7 : still exists/supports KafkaRecordDeserializer

/apache/flink/blob/release-1.12.7/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/

  • flink 1.13.0 : no longer exists/no longer supports KafkaRecordDeserializer

/apache/flink/tree/release-1.13.0/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer

  • flink 1.14.0

/apache/flink/tree/release-1.14.0/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer

  • flink 1.15.4

/apache/flink/tree/release-1.15.4/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/

  • flink-connector-kafka : 3.0.0 | For reference only; no need to spend time on this project for now.

/apache/flink-connector-kafka/blob/v3.0.0/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/

  • Reasons for and approach to the change

As of Apache Flink 1.13.0, KafkaRecordDeserializer has been removed.
If code written for an older Flink version still references KafkaRecordDeserializer, replace it with KafkaDeserializationSchema [Recommended] or KafkaDeserializer.
Compared with KafkaRecordDeserializer, KafkaDeserializationSchema has 2 more methods that must be implemented:

  • boolean isEndOfStream(T var1) : returning false is fine by default
  • T deserialize(ConsumerRecord<byte[], byte[]> var1) : the old method void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out) calls this method internally
// flink 1.15.4
//

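package org.apache.flink.streaming.connectors.kafka;

import java.io.Serializable;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

@PublicEvolving
public interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    default void open(DeserializationSchema.InitializationContext context) throws Exception {
    }

    boolean isEndOfStream(T var1);

    T deserialize(ConsumerRecord<byte[], byte[]> var1) throws Exception; // method 1

    default void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out) throws Exception { // method 2
        T deserialized = deserialize(message); // reuses/calls method 1
        if (deserialized != null) {
            out.collect(deserialized);
        }
    }
}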
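
The delegation pattern in this interface can be tried out without any Flink dependency. The following is a minimal sketch: MiniDeserializationSchema and Utf8Schema are hypothetical stand-ins that only mirror the two deserialize methods, with a plain byte[] in place of ConsumerRecord and java.util.function.Consumer in place of Collector.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

/** Hypothetical stand-in that mirrors the two deserialize methods; not the Flink interface. */
interface MiniDeserializationSchema<T> {

    /** Method 1: the only method an implementation has to provide. */
    T deserialize(byte[] record) throws Exception;

    /** Method 2: default implementation that delegates to method 1 and skips null results. */
    default void deserialize(byte[] record, Consumer<T> out) throws Exception {
        T value = deserialize(record);
        if (value != null) {
            out.accept(value);
        }
    }
}

/** Implements method 1 only; method 2 is inherited from the interface. */
final class Utf8Schema implements MiniDeserializationSchema<String> {
    @Override
    public String deserialize(byte[] record) {
        return record == null ? null : new String(record, StandardCharsets.UTF_8);
    }
}
```

Calling the two-argument method on Utf8Schema emits the decoded string for a non-null record and emits nothing for a null one, which is why overriding the single-argument method alone is sufficient in the adaptation that follows.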

Therefore, adapting to the new T deserialize(ConsumerRecord<byte[], byte[]> var1) method is easy:

import ;
import ;
import ;
import ;
import .Tuple2;
import ;
//import ;
import ;
import ;
import ;

//public class MyKafkaRecordDeserializer implements KafkaRecordDeserializer<Tuple2<String, String>> {
public class MyKafkaRecordDeserializer implements KafkaDeserializationSchema<Tuple2<String, String>> {
/* @Override
    public void open( context) throws Exception {
        (context);
    }*/

    @Override
    public boolean isEndOfStream(Tuple2<String, String> stringStringTuple2) {
        return false;
    }

    @Override
    public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {// adapts new method 1 | mandatory
        if(() == null){
            return new Tuple2<>("null", (()) );
        }
        return new Tuple2<>( new String(() ) , (() ) );
    }

// @Override
// public void deserialize(ConsumerRecord<byte[], byte[]> consumerRecord, Collector<Tuple2<String, String>> collector) throws Exception {// adapts old method 2 | optional
// (new Tuple2<>(() == null ? "null" : new String(()), (())));
// }

    @Override
    public TypeInformation<Tuple2<String, String>> getProducedType() {
        return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
    }
}

The way you use this class, and create objects of this class, has also changed slightly:

//  | flink-connector-kafka:1.15.4
KafkaSourceBuilder<Tuple2<String, String>> kafkaConsumerSourceBuilder = KafkaSource.<Tuple2<String, String>>builder()
	.setTopics(canTopic)
	.setProperties(kafkaConsumerProperties)
	.setClientIdPrefix(Constants.JOB_NAME + "#" + () + "")
	.setDeserializer( KafkaRecordDeserializationSchema.of(new MyKafkaRecordDeserializer()) ); // flink 1.15.4
	//.setDeserializer(new MyKafkaRecordDeserializer());// flink 1.12.6
  • Recommended Literature
  • Flink 1.14 New KafkaSource and KafkaSink Practical Use (Custom Deserializer, Topic Selector, Serializer, Partitioner) - CSDN
  • /flink/flink-docs-release-1.15/zh/docs/connectors/datastream/kafka/

Flink CDC : since flink cdc 2.0.0, the groupId and package path have changed from com.alibaba.ververica to com.ververica

MySQLSource : Package path adjusted (2.0.0 and later), class name case changed (flink cdc 2.0.0 and later), no longer recommended (flink cdc 2.1.0 and later).

  • | flink cdc 1.3.0

/apache/flink-cdc/blob/release-1.3.0/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/
The package path was adjusted and the capitalization of the class name changed (MySQLSource => MySqlSource).

/apache/flink-cdc/blob/release-2.0.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/
Since flink cdc 2.1.0, the MySqlSource under this package is deprecated; the one under com/ververica/cdc/connectors/mysql/source/ is recommended instead.
/apache/flink-cdc/blob/release-2.1.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/
Flink CDC this MySqlSource deprecated, is there another way? - aliyun [Recommended]

There are two MySqlSource classes with different package names: one is deprecated and the other is usable. The one under this package is usable.

  • | flink cdc 2.3.0

/apache/flink-cdc/blob/release-2.3.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/

serverId : if you choose the new MySqlSource class, the parameter of its serverId setter changes slightly
  • #serverId() | flink cdc 1.3.0

/apache/flink-cdc/blob/release-1.3.0/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/

  • | flink cdc 2.1.0, 2.3.0 [Recommended].

/apache/flink-cdc/blob/release-2.1.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/

MySqlSource: has no serverId method
/apache/flink-cdc/blob/release-2.1.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/
MySqlSourceBuilder: has the serverId method; it is obtained through MySqlSource.<String>builder()

/**
 * A numeric ID or a numeric ID range of this database client, The numeric ID syntax is like
 * '5400', the numeric ID range syntax is like '5400-5408', The numeric ID range syntax is
 * required when '' enabled. Every ID must be unique across all
 * currently-running database processes in the MySQL cluster. This connector joins the MySQL
 * cluster as another server (with this unique ID) so it can read the binlog. By default, a
 * random number is generated between 5400 and 6400, though we recommend setting an explicit
 * value."
 */
public MySqlSourceBuilder<T> serverId(String serverId) {
	(serverId);
	return this;
}
  • #serverId(int serverId) | flink cdc 2.1.0 [recommended for deprecation], flink cdc 2.3.0 [deprecated/unavailable].

/apache/flink-cdc/blob/release-2.1.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/

/**
 * A numeric ID of this database client, which must be unique across all currently-running
 * database processes in the MySQL cluster. This connector joins the MySQL database cluster
 * as another server (with this unique ID) so it can read the binlog. By default, a random
 * number is generated between 5400 and 6400, though we recommend setting an explicit value.
 */
public Builder<T> serverId(int serverId) {
	 = serverId;
	return this;
}
  • Modified Demo: flink cdc 1.3.0
SourceFunction<String> mySqlSource =
	MySqlSource.<String>builder()
	//Database address
	.hostname((""))
	//port number
	.port((("")))
	//username
	.username((""))
	//password
	.password((""))
	//Monitored databases
	.databaseList((""))
	//Table names to monitor, format: database.table
	.tableList((""))
	//Deserialization method
	.deserializer(new MySQLCdcMessageDeserializationSchema())
	//time zone
	.serverTimeZone("UTC")
	.serverId( randomServerId(5000, Constants.JOB_NAME + "#xxxConfig") )
	.startupOptions(())
	.build();


public static Integer randomServerId(int interval, String jobCdcConfigDescription){
	//serverId ∈ [ interval + 0, interval + interval)
	//int serverId = (interval) + interval; // (n) : generates a random integer in the interval [0, n)
	//serverId ∈ [ 7000 + 0, Integer.MAX_VALUE - interval)
	int serverId = (Integer.MAX_VALUE - interval - 7000) + 7000;
	("Success to generate random server id result! serverId : {}, interval : {}, jobCdcConfigDescription : {}"
			, serverId , interval , jobCdcConfigDescription );
	return serverId;
}
  • Modified Demo: flink cdc 2.3.0
MySqlSource<String> mySqlSource =
	MySqlSource.<String>builder()
	//Database address
	.hostname((""))
	//port number
	.port((("")))
	//username
	.username((""))
	//password
	.password((""))
	//Monitored databases
	.databaseList((""))
	//Table names to monitor, format: database.table
	.tableList((""))
	//Deserialization method
	.deserializer(new MySQLCdcMessageDeserializationSchema())
	//time zone
	.serverTimeZone("UTC")
	.serverId( randomServerIdRange(5000, Constants.JOB_NAME + "#xxxConfig") )
	.startupOptions(())
	.build();


//New mandatory requirements: interval >= Parallelism of this operator
public static String randomServerIdRange(int interval, String jobCdcConfigDescription){
	// generate 1 random starting number
	//startServerId ∈ [interval + 0, interval + interval )
	//int startServerId = (interval) + interval; // (n) : generates a random integer in the interval [0, n)
	//startServerId ∈ [ 7000 + 0, Integer.MAX_VALUE - interval)
	int startServerId = (Integer.MAX_VALUE - interval - 7000) + 7000;

	//endServerId ∈ [startServerId, startServerId + interval];
	int endServerId = startServerId + interval;
	("Success to generate random server id result! startServerId : {},endServerId : {}, interval : {}, jobCdcConfigDescription : {}"
			, startServerId, endServerId , interval , jobCdcConfigDescription );
	return ("%d-%d", startServerId, endServerId);
}
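
For reference, the range generation can also be written so that it runs on its own and cannot overflow. This is a minimal sketch under stated assumptions: ServerIdRanges is a hypothetical helper (not part of Flink CDC), the lower bound 7000 is taken from the demo above, the range is treated as inclusive on both ends as in the '5400-5408' example of the Javadoc, and it holds exactly as many ids as the source parallelism.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical helper (not part of Flink CDC): builds "start-end" server-id range strings. */
final class ServerIdRanges {

    private static final int MIN_SERVER_ID = 7000; // lower bound used in the demo above

    /** Formats an inclusive range that contains exactly {@code parallelism} ids. */
    static String format(int start, int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be >= 1");
        }
        if (start < 1 || start > Integer.MAX_VALUE - (parallelism - 1)) {
            throw new IllegalArgumentException("range would leave the valid int range");
        }
        int end = start + parallelism - 1; // inclusive upper bound
        return start + "-" + end;
    }

    /** Picks a random start so that the whole range stays below Integer.MAX_VALUE. */
    static String random(int parallelism) {
        int maxStartExclusive = Integer.MAX_VALUE - parallelism;
        int start = ThreadLocalRandom.current().nextInt(MIN_SERVER_ID, maxStartExclusive);
        return format(start, parallelism);
    }

    /** Number of ids in a "start-end" range string. */
    static int size(String range) {
        String[] parts = range.split("-");
        return Integer.parseInt(parts[1]) - Integer.parseInt(parts[0]) + 1;
    }
}
```

A random range reduces the chance of collisions but does not rule them out; an explicitly assigned range per job, as the connector Javadoc recommends, remains the safer choice.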
MySQLSourceBuilder#build method: the return type changes. SourceFunction/DebeziumSourceFunction<T> => MySqlSource<T>
  • =>
//#build | flink cdc 1.3.0
// returns:
// public class DebeziumSourceFunction<T> extends RichSourceFunction<T> implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<T>
//public abstract class <OUT> extends AbstractRichFunction implements SourceFunction<OUT>
public DebeziumSourceFunction<T> build() {
	Properties props = new Properties();
	("", ());
	("", "mysql_binlog_source");
	("", (String)());
	("", (String)());
	("", (String)());
	("", ());
	("", (true));
	if ( != null) {
		("", ());
	}
	...
}


//#build | flink cdc 2.3.0
// returns:
public MySqlSource<T> build() {
	return new MySqlSource(, (DebeziumDeserializationSchema)());
}
  • Usage Changes Demo: Flink cdc 1.3.0

mysqlSource Want to listen for mysql table structure changes (e.g. adding new fields), what should I do? Settings - aliyun

Properties properties = new Properties();
("", "localhost");
("", "3306");
("", "your_username");
("", "your_password");
("", "1"); // Set the unique server id
("", "mysql_source");

DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
    .hostname("localhost")
    .port(3306)
    .username("your_username")
    .password("your_password")
    .databaseList("your_database")
    .tableList("your_table")
    .includeSchemaChanges(true) // Enable listening for table structure changes
    .deserializer(new StringDebeziumDeserializationSchema())
    .build();

DataStreamSource<String> stream = (sourceFunction);//It is possible to use addSource

();
("MySQL CDC Job");
  • Usage Changes Demo: Flink cdc 2.3.0

/flink-cdc-connectors/release-2.3/content/connectors/mysql-cdc(ZH).html
StreamExecutionEnvironment#addSource(SourceFunction, String sourceName) can no longer be used; the only option is StreamExecutionEnvironment#fromSource(Source<OUT, ?, ?> source, WatermarkStrategy<OUT> timestampsAndWatermarks, String sourceName)

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class MySqlSourceExample {
  public static void main(String[] args) throws Exception {
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("yourHostname")
        .port(yourPort)
        .databaseList("yourDatabaseName") // set the captured database; to synchronize the entire database, set tableList to ".*"
        .tableList("yourDatabaseName.yourTableName") // set the captured table
        .username("yourUsername")
        .password("yourPassword")
        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to a JSON string
        .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // set a 3s checkpoint interval
    env.enableCheckpointing(3000);

    env
      .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
      // set the source node parallelism to 4
      .setParallelism(4)
      .print().setParallelism(1); // set the sink node parallelism to 1

    env.execute("Print MySQL Snapshot + Binlog");
  }
}

StartupOptions : Package paths have been adjusted (2.0.0 and later).

  • import | flink cdc 1.3.0

/apache/flink-cdc/blob/release-1.3.0/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/

  • | flink cdc 2.3.0

/apache/flink-cdc/blob/release-2.3.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/

DebeziumDeserializationSchema : Package paths were adjusted (flink-cdc2.0.0 and later).

  • | flink cdc 1.3.0

:flink-connector-debezium:1.3.0
/apache/flink-cdc/blob/release-1.3.0/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/

  • | flink cdc 2.3.0

:flink-connector-debezium:2.3.0
/apache/flink-cdc/blob/release-2.3.0/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/

FAQ & supplementary upgrade notes

Q: [Apache Flink CDC] FlinkCdcJob fails at startup because of the MySQL serverTimeZone: The MySQL server has a timezone offset (0 seconds ahead of UTC) which does not match the configured timezone Asia/Shanghai. Specify the right server-time-zone to avoid inconsistencies for time-related fields.

  • Problem version: flink-cdc >= 2.3.0

Conclusion based on: flink-cdc source code

Recommended Literature

  • [Flink SQL] FlinkSqlCdcJob fails at startup because of MYSQL serverTimeZone and reports an error: The MySQL server has a timezone offset (0 seconds ahead of UTC) which does not match the configured timezone Asia/Shanghai. - blogosphere/thousands and thousands of universes

  • /apache/flink-cdc/blob/release-2.4.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/

  • /apache/flink-cdc/blob/release-2.3.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/

  • /apache/flink-cdc/blob/release-1.3.0/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/

  • /apache/flink-cdc/blob/release-2.0.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/ [Recommended]

  • /apache/flink-cdc/blob/release-2.3.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/

  • MySqlValidator#checkTimeZone : compares the time zone offset of the "" configuration item with the time zone offset of the MySQL database server.
  • flink-cdc 1.3.0/1.4.0 : no MySqlValidator class
  • flink-cdc 2.0.0 : the MySqlValidator class appears for the first time, but has no checkTimeZone method
  • flink-cdc 2.0.0-2.2.1 : same as flink-cdc 2.0.0
  • flink-cdc 2.3.0 : has the MySqlValidator class, and the checkTimeZone method appears for the first time
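
The idea behind such an offset comparison can be sketched in plain Java. This is not the connector's implementation: TimeZoneOffsetCheck is a hypothetical class, and the server offset is passed in as a plain number of seconds instead of being queried from MySQL. The message text follows the error quoted above.

```java
import java.time.Instant;
import java.time.ZoneId;

/** Hypothetical sketch of an offset comparison; not the Flink CDC source code. */
final class TimeZoneOffsetCheck {

    /** Offset of the given zone from UTC, in seconds, at the given instant. */
    static int offsetSeconds(String zoneId, Instant at) {
        return ZoneId.of(zoneId).getRules().getOffset(at).getTotalSeconds();
    }

    /** Throws when the configured zone's offset differs from the offset reported by the server. */
    static void check(int serverOffsetSeconds, String configuredZone, Instant at) {
        int configuredOffset = offsetSeconds(configuredZone, at);
        if (configuredOffset != serverOffsetSeconds) {
            throw new IllegalStateException(String.format(
                    "The MySQL server has a timezone offset (%d seconds ahead of UTC) "
                            + "which does not match the configured timezone %s.",
                    serverOffsetSeconds, configuredZone));
        }
    }
}
```

With a server that is 28800 seconds ahead of UTC, a configured zone of Asia/Shanghai passes this check while UTC fails it, which matches the log shown in the problem description.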

Description of the problem

  • Key logs
...

$resetToCheckpoint$7(:149) ~[flink-dist-1.15..330.:1.15..330.r34]
	at (:774) ~[?:1.8.0_412]
	at $(:750) ~[?:1.8.0_412]
	at (:488) ~[?:1.8.0_412]
	at (:1975) ~[?:1.8.0_412]
	at $closeAsyncWithTimeout$0(:77) ~[flink-dist-1.15..330.:1.15..330.r34]
	at (:750) [?:1.8.0_412]
2024-10-31 20:43:06,216 ERROR  208 [Thread-16]  - Failed to create Source Enumerator for source Source: xxxDBCConfigCdc-VhrBackendMySQL-Source
: The MySQL server has a timezone offset (28800 seconds ahead of UTC) which does not match the configured timezone UTC. Specify the right server-time-zone to avoid inconsistencies for time-related fields.
	at (:191) ~[flink-sql-connector-mysql-cdc-2.4..330.:2.4..330.r1-SNAPSHOT]
	at (:81) ~[flink-sql-connector-mysql-cdc-2.4..330.:2.4..330.r1-SNAPSHOT]
	at (:170) ~[flink-sql-connector-mysql-cdc-2.4..330.:2.4..330.r1-SNAPSHOT]
	at (:205) ~[flink-dist-1.15..330.:1.15..330.r34]
	at $(:399) ~[flink-dist-1.15..330.:1.15..330.r34]
	at $resetToCheckpoint$7(:149) ~[flink-dist-1.15..330.:1.15..330.r34]
	at (:774) ~[?:1.8.0_412]
	at $(:750) ~[?:1.8.0_412]
	at (:488) ~[?:1.8.0_412]
	at (:1975) ~[?:1.8.0_412]
	at $closeAsyncWithTimeout$0(:77) ~[flink-dist-1.15..330.:1.15..330.r34]
	at (:750) [?:1.8.0_412]
2024-10-31 20:43:06,218 INFO                   320 [-dispatcher-17]  - Trying to recover from a global failure.
: Global failure triggered by OperatorCoordinator for 'Source: xxxDBCConfigCdc-VhrBackendMySQL-Source' (operator feca28aff5a3958840bee985ee7de4d3).
	at $(:556) ~[flink-dist-1.15..330.:1.15..330.r34]
	at $(:236) ~[flink-dist-1.15..330.:1.15..330.r34]
	at 

...

Problem analysis

  • View source: #createConfig
    public MySqlSourceConfig createConfig(int subtaskId) {
        ...
        ("", );// set via #serverTimeZone(...)
        ...
        if ( != null) {
            ("", );
        }
        if ( != null) {
            ();
        }
        ...
    }
  • Viewing the time zone settings of a database
> show variables like '%time_zone%';
Name         |Value           |
-------------+----------------+
Variable_name|system_time_zone|
Value        |+08             |
  • View timezone settings for flinkCdcJob jobs from Logs

Solution

  • step1 Add: DebeziumPropertiesConstants
public class DebeziumPropertiesConstants {
    /** configuration item **/
    public final static String DATABASE_SERVER_TIMEZONE = "";
    /** default value **/
    public final static String DATABASE_SERVER_TIMEZONE_DEFAULT = "UTC";
}
  • step2 Add New:
public class Constants {
    ...
        
    public static class XXXModule {
        public final static String CONFIG_PREFIX = "";

        public final static String DATABASE_SERVER_TIMEZONE = CONFIG_PREFIX + "." + DebeziumPropertiesConstants.DATABASE_SERVER_TIMEZONE;
    }
    
    ...
}
  • step3 Add the configuration item
=Asia/Shanghai
  • step4 Adjust MySqlSource

Add the lines marked "key line" from the following sample code

// public static SourceFunction<String> createxxxModuleCdcSourceFunction(ParameterTool parameterTool){
    public static MySqlSource<String> createxxxModuleCdcSourceFunction(ParameterTool jobParameterTool){
        // must be consistent with the time zone offset of the system_time_zone variable shown by `show variables like '%time_zone%';`
        String databaseServerTimeZone = ( .DATABASE_SERVER_TIMEZONE , DebeziumPropertiesConstants.DATABASE_SERVER_TIMEZONE_DEFAULT);//key line
        ("{}:{}", .DATABASE_SERVER_TIMEZONE, databaseServerTimeZone);//key line

        //return MySQLSource.<String>builder()
        return MySqlSource.<String>builder()
                //Database address
                .hostname((""))
                //port number
                .port((("")))
                //username
                .username((""))
                //password
                .password((""))
                //Monitored databases
                .databaseList((""))
                //Table names to monitor, format: database.table
                .tableList((""))
                //Deserialization method
                .deserializer(new DBCDeserializationSchema())
                //time zone
                .serverTimeZone( databaseServerTimeZone ) // (see source code) equivalent to the debezium configuration:  | key line
                .serverId( (5000, Constants.JOB_NAME + "#xxxModule") )
                //.debeziumProperties(debeziumProperties)// key line
                .startupOptions(())
                .build();
    }
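
The lookup-with-default step can be isolated and validated early, so that a typo in the configured zone fails at startup with a clear message. This is a minimal sketch using only java.util.Properties and java.time: ServerTimeZoneConfig and the key name example.cdc.server-time-zone are hypothetical, chosen for illustration because the real key is defined by the job's own constants.

```java
import java.time.DateTimeException;
import java.time.ZoneId;
import java.util.Properties;

/** Hypothetical helper: resolves the configured server time zone with a UTC fallback. */
final class ServerTimeZoneConfig {

    static final String KEY = "example.cdc.server-time-zone"; // hypothetical key name
    static final String DEFAULT_ZONE = "UTC";

    /** Reads the zone from the job properties and rejects ids that java.time cannot parse. */
    static ZoneId resolve(Properties jobProperties) {
        String raw = jobProperties.getProperty(KEY, DEFAULT_ZONE).trim();
        try {
            return ZoneId.of(raw);
        } catch (DateTimeException e) {
            throw new IllegalArgumentException("Invalid time zone for " + KEY + ": " + raw, e);
        }
    }
}
```

The id of the resolved zone is what would then be handed to serverTimeZone(...) on the builder.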

X References

  • apache flink cdc
  • /apache/flink-cdc
  • /apache/flink-cdc/blob/master/docs/content/docs/faq/
  • /apache/flink-cdc/tree/master/flink-cdc-connect/flink-cdc-source-connectors/flink-sql-connector-mysql-cdc
  • :flink-connector-mysql-cdc:1.3.0

/apache/flink-cdc/blob/release-1.3.0/flink-connector-mysql-cdc/ [Recommended] Flink 1.12.6

  • :flink-connector-mysql-cdc:2.0

MYSQL (Database: 5.7, 8. / JDBC Driver: 8.0.16 ) | Flink 1.12 + | JDK 8+
/apache/flink-cdc/tree/release-2.0
/apache/flink-cdc/blob/release-2.0/flink-connector-mysql-cdc/

  • :flink-connector-mysql-cdc:2.3.0

/apache/flink-cdc/blob/release-2.3.0/flink-connector-mysql-cdc/ [Recommended] Flink 1.15.4

  • :flink-connector-mysql-cdc:${}
  • /flink-cdc-connectors/master/content/ [Repealed]
  • apache flink
  • /apache/flink
  • /apache/flink/blob/release-1.15.4/docs//release-notes/flink-1. [Recommended]
  • /flink/flink-docs-release-1.15/release-notes/flink-1.15/ [Recommended]
  • /2023/03/15/apache-flink-1.15.4-release-announcement/ [Recommended]
  • /flink/flink-docs-release-1.15/zh/docs/ops/upgrading/
  • /apache/flink/blob/release-1.12.6/docs/release-notes/flink-1.
  • apache flink-connector-kafka
  • /apache/flink-connector-kafka
  • Recommended reading
  • Flink version 1.13.1 upgraded to 1.15.0 - CSDN [Recommended]
  • (C) Flink 1.15 release notes for the latest version - CSDN
  • Summary of dependency issues for Flink+Flink CDC version upgrades - CSDN
  • Apache Flink 1.15 Release Notes - Zhihu/Flink Chinese Community
  • Other
  • FlinkSql source code debugging environment & flink-table code structure - Zhihu [Not recommended]
  • Finally, someone explains addSource, fromSource in Flink - CSDN
  • [FlinkConsumer to KafkaSource of Flink Consumer Kafka] - CSDN
  • Flink Kafka-Source - CSDN