Flink DataStream/API
Important characteristics that have not changed
JDK 8 is still supported, even though the official recommendation is to move off JDK 8 and use JDK 11+.
Personal guess: the JDK 8 user base is too large, and dropping it would affect too many users at once, so the project avoids taking too big a step that could curb its own momentum.
Dependency module changes
Version changes
- : 1.12.6 => 1.15.4
- : 1.12.6 => 1.15.4
- : 1.3.0 => 2.3.0
Flink CDC : starting with flink cdc 2.0.0, the [groupId, package path] changes from com.alibaba.ververica to com.ververica
- apache flink cdc 1.3.0
<dependency>
<groupId></groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.3.0</version>
</dependency>
- apache flink cdc 2.3.0
<dependency>
<groupId></groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
</dependency>
- For details, see:
- Flink CDC official website: Flink CDC package && Flink && JDK && MYSQL version comparison - Blogspot/Children's World
Modules drop the Scala version suffix
For details, see:
/apache/flink/blob/release-1.15.4/docs//release-notes/flink-1. [Recommended]
/flink/flink-docs-release-1.15/release-notes/flink-1.15/
-
:flink-clients:${}
-
flink-streaming-java:
-
:flink-table-api-java-bridge
:flink-table-api-java-bridge_${}:${}
-
:flink-connector-kafka:${}
-
:flink-runtime-web:${}
-
:flink-statebackend-rocksdb:${}
-
:flink-table-planner:${}
:flink-table-planner-blink_${}:${}
Scala 2.11 is no longer supported; Scala 2.12 is still supported.
= 2.12
flinkversion = 1.15.4
-
:flink-connector-hive_${}:${}
-
:flink-table-api-java-bridge:${}
Compared to flink 1.12.6:
:flink-table-api-java-bridge_${=2.11}:${=1.12.6}
table-*-blink
Promoted to the default modules : flink-table-planner-blink / flink-table-runtime-blink => flink-table-planner, flink-table-runtime
- Starting with Flink 1.15, the distribution includes two planners:
flink-table-planner_2.12-${}.jar : in /opt, contains the query planner
flink-table-planner-loader-${}.jar [Recommended] : loaded by default from /lib, contains the query planner hidden behind an isolated classpath
Note: these 2 planners cannot be on the classpath at the same time. If both are loaded into /lib, Table jobs fail with the error: Could not instantiate the executor. Make sure a planner module is on the classpath.
Exception in thread "main" : Could not instantiate the executor. Make sure a planner module is on the classpath
at (:108)
at (:100)
at (:122)
at (:94)
at (:15)
Caused by: : Multiple factories for identifier 'default' that implement '' found in the classpath.
Ambiguous factory classes are:
at (:553)
at (:105)
... 4 more
Process finished with exit code 1
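The conflict described above can be caught before a job is submitted. The following is a minimal sketch, assuming the jar names follow the two patterns quoted above; PlannerJarCheck and hasPlannerConflict are hypothetical names and not part of Flink.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink: detects both planner jars in one jar listing.
final class PlannerJarCheck {

    // True when the listing contains a flink-table-planner_<scala> jar
    // and a flink-table-planner-loader jar at the same time.
    static boolean hasPlannerConflict(List<String> jarNames) {
        boolean planner = false;
        boolean loader = false;
        for (String name : jarNames) {
            if (name.startsWith("flink-table-planner-loader-")) {
                loader = true;
            } else if (name.startsWith("flink-table-planner_")) {
                planner = true;
            }
        }
        return planner && loader;
    }

    public static void main(String[] args) {
        // Example listing of a /lib directory (file names are illustrative).
        List<String> lib = Arrays.asList(
                "flink-dist-1.15.4.jar",
                "flink-table-planner-loader-1.15.4.jar",
                "flink-table-planner_2.12-1.15.4.jar");
        System.out.println("planner conflict: " + hasPlannerConflict(lib));
    }
}
```

In practice the listing would come from the /lib directory of the distribution or from the resolved dependency list of the job.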
- Since Flink 1.14, the former flink-table-*-blink-* modules have been renamed. So:
flink-table-planner-blink => flink-table-planner
flink-table-runtime-blink => flink-table-runtime
flink-shaded-guava : module version changes and package conflict issues
- The following error indicates a package conflict caused by a version difference:
NoClassDefFoundError: org/apache/flink/shaded/guava30/com/google/common/collect/Lists
Reason: the flink-shaded-guava versions used by flink 1.16, 1.15, 1.12.6 and other releases differ and are not compatible with each other, so the flink-shaded-guava version used by cdc has to be changed to match.
- flink-shaded-guava module version for each flink version
- flink 1.12.6 : flink-shaded-guava 18.0-12.0
- flink 1.15.4 : flink-shaded-guava 30.1.1-jre-15.0
- flink 1.16.0 : flink-shaded-guava 30.1.1-jre-16.0
- If the project does not explicitly declare flink-shaded-guava as a dependency, this issue can be ignored: modules such as flink-core, flink-runtime and flink-clients already bring in the correct version by default.
flink 1.15.4
flink 1.12.6
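To see which shaded Guava package is actually visible at runtime, a small classpath probe can help. This is a minimal sketch: the guava30 class name is taken from the error above, the guava18 name is an assumption derived from the 18.0-12.0 version listed for flink 1.12.6, and ShadedGuavaProbe is a hypothetical helper.

```java
// Hypothetical helper: reports whether a class can be located on the current classpath.
final class ShadedGuavaProbe {

    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: locate and load the class without running its static initializers
            Class.forName(className, false, ShadedGuavaProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String[] candidates = {
            // from the NoClassDefFoundError quoted above
            "org.apache.flink.shaded.guava30.com.google.common.collect.Lists",
            // assumption: package name used by flink-shaded-guava 18.0-12.0
            "org.apache.flink.shaded.guava18.com.google.common.collect.Lists"
        };
        for (String candidate : candidates) {
            System.out.println(candidate + " -> " + isOnClasspath(candidate));
        }
    }
}
```

Running it with the job's classpath shows which of the two packages the connector will find.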
MySQL JDBC Version : ≥ 8.0.16 => ≥ 8.0.27
- Version based on: Apache Flink CDC website
- /apache/flink-cdc/tree/release-1.3.0 | ≥ 8.0.16
- /apache/flink-cdc/tree/release-2.3.0 | ≥ 8.0.27
For more information, see: Flink CDC official website: Flink CDC MYSQL package && Flink && JDK && MYSQL version comparison - Blogspot/Children's World
This change addresses the following error:
Caused by: : (Ljava/lang/String;)Ljava/lang/String;
With MySQL 8.0, this problem is caused by the debezium connector used in flink cdc 2.1 and later.
- Recommended for uniform use: mysql jdbc 8.0.28
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.28</version>
</dependency>
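The minimum driver versions listed above can be checked mechanically, for example in a startup guard. This is a minimal sketch of a dotted-version comparison; DriverVersionCheck and isAtLeast are hypothetical names, and only purely numeric versions such as 8.0.28 are handled.

```java
// Hypothetical helper: compares purely numeric dotted versions such as "8.0.28".
final class DriverVersionCheck {

    // True when 'actual' is greater than or equal to 'minimum', compared segment by segment.
    static boolean isAtLeast(String actual, String minimum) {
        String[] a = actual.split("\\.");
        String[] m = minimum.split("\\.");
        int n = Math.max(a.length, m.length);
        for (int i = 0; i < n; i++) {
            // missing segments count as 0, so "8.1" is treated as "8.1.0"
            int x = i < a.length ? Integer.parseInt(a[i]) : 0;
            int y = i < m.length ? Integer.parseInt(m[i]) : 0;
            if (x != y) {
                return x > y;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("8.0.28 satisfies >= 8.0.27: " + isAtLeast("8.0.28", "8.0.27"));
        System.out.println("8.0.16 satisfies >= 8.0.27: " + isAtLeast("8.0.16", "8.0.27"));
    }
}
```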
Application source code changes
Flink
KafkaRecordDeserializer : no longer exists/no longer supported (flink 1.13.0 and later); replace it with KafkaDeserializationSchema
KafkaSourceBuilder : the syntax for creating this object changes slightly
-
| flink-connector-kafka_2.11 : 1.12.6
- flink 1.12.6
/apache/flink/blob/release-1.12.6/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/
- flink 1.12.7 : still exists/supports KafkaRecordDeserializer
/apache/flink/blob/release-1.12.7/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/
- flink 1.13.0 : no longer exists/no longer supports KafkaRecordDeserializer
/apache/flink/tree/release-1.13.0/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer
- flink 1.14.0
/apache/flink/tree/release-1.14.0/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer
- flink 1.15.4
/apache/flink/tree/release-1.15.4/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/
- flink-connector-kafka : 3.0.0 | For reference only; this project can be ignored for now.
/apache/flink-connector-kafka/blob/v3.0.0/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/
- Reason for the change and how to adapt
As of Apache Flink 1.13.0, KafkaRecordDeserializer has been deprecated and removed.
If you are using an older version of Flink and see a KafkaRecordDeserializer hint, replace it with KafkaDeserializationSchema [Recommended] or KafkaDeserializer.
Compared with KafkaRecordDeserializer, KafkaDeserializationSchema has 2 more methods that must be implemented:
boolean isEndOfStream(T var1) : returning false by default is fine
T deserialize(ConsumerRecord<byte[], byte[]> var1) : the old void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out) method calls this method internally
// flink 1.15.4
//
package ;
import ;
import ;
import ;
import ;
import ;
import ;
@PublicEvolving
public interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
default void open( context) throws Exception {
}
boolean isEndOfStream(T var1);
T deserialize(ConsumerRecord<byte[], byte[]> var1) throws Exception; // method 1
default void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out) throws Exception { // method 2
T deserialized = deserialize(message); // reuses/calls method 1
if (deserialized != null) {
out.collect(deserialized);
}
}
}
Therefore, adapting to the new T deserialize(ConsumerRecord<byte[], byte[]> var1) method is easy:
import ;
import ;
import ;
import ;
import .Tuple2;
import ;
//import ;
import ;
import ;
import ;
//public class MyKafkaRecordDeserializer implements KafkaRecordDeserializer<Tuple2<String, String>> {
public class MyKafkaRecordDeserializer implements KafkaDeserializationSchema<Tuple2<String, String>> {
/* @Override
public void open( context) throws Exception {
(context);
}*/
@Override
public boolean isEndOfStream(Tuple2<String, String> stringStringTuple2) {
return false;
}
@Override
public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception { // adapts new method 1 | mandatory
if(() == null){
return new Tuple2<>("null", (()) );
}
return new Tuple2<>( new String(() ) , (() ) );
}
// @Override
// public void deserialize(ConsumerRecord<byte[], byte[]> consumerRecord, Collector<Tuple2<String, String>> collector) throws Exception { // adapts old method 2 | not mandatory
// (new Tuple2<>(() == null ? "null" : new String(()), (())));
// }
@Override
public TypeInformation<Tuple2<String, String>> getProducedType() {
return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
}
}
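The null-key handling in the class above can be exercised without Kafka or Flink on the classpath. This is a minimal sketch under stated assumptions: the record is reduced to two byte arrays, both are decoded as UTF-8 (the decoding helper used in the original class is not shown in the source), and RecordDecoding is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the deserialize(...) body, using plain byte arrays.
final class RecordDecoding {

    // Returns {key, value}; a missing key is represented by the literal string "null".
    static String[] toKeyValue(byte[] key, byte[] value) {
        String k = key == null ? "null" : new String(key, StandardCharsets.UTF_8);
        // assumption: a null value (tombstone record) is passed through as null
        String v = value == null ? null : new String(value, StandardCharsets.UTF_8);
        return new String[] {k, v};
    }

    public static void main(String[] args) {
        byte[] value = "payload".getBytes(StandardCharsets.UTF_8);
        String[] withoutKey = toKeyValue(null, value);
        System.out.println(withoutKey[0] + " / " + withoutKey[1]);
    }
}
```

Keeping the decoding in a plain static method like this also makes the deserializer easier to unit test.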
The way you use this class, and create objects of this class, has also changed slightly:
// | flink-connector-kafka:1.15.4
KafkaSourceBuilder<Tuple2<String, String>> kafkaConsumerSourceBuilder = KafkaSource.<Tuple2<String, String>>builder()
.setTopics(canTopic)
.setProperties(kafkaConsumerProperties)
.setClientIdPrefix(Constants.JOB_NAME + "#" + () + "")
.setDeserializer( (new MyKafkaRecordDeserializer()) ); // flink 1.15.4
//.setDeserializer(new MyKafkaRecordDeserializer());// flink 1.12.6
- Recommended reading
- Flink 1.14 New KafkaSource and KafkaSink Practical Use (Custom Deserializer, Topic Selector, Serializer, Partitioner) - CSDN
- /flink/flink-docs-release-1.15/zh/docs/connectors/datastream/kafka/
Flink CDC : starting with flink cdc 2.0.0, the [groupId, package path] changes from com.alibaba.ververica to com.ververica
MySQLSource : Package path adjusted (2.0.0 and later), class name case changed (flink cdc 2.0.0 and later), no longer recommended (flink cdc 2.1.0 and later).
-
| flink cdc 1.3.0
/apache/flink-cdc/blob/release-1.3.0/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/
| flink cdc 2.0.0 : package path adjusted and class name capitalization changed
/apache/flink-cdc/blob/release-2.0.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/
| flink cdc 2.1.0 and later : the class in this package is deprecated; the class of the same name under the source sub-package is recommended instead
/apache/flink-cdc/blob/release-2.1.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/
Flink CDC: this MySqlSource is deprecated, is there another way? - aliyun [Recommended]
There are two MySqlSource classes with different package names: one is deprecated and the other is usable. The one under the source sub-package is usable.
| flink cdc 2.3.0
/apache/flink-cdc/blob/release-2.3.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/
serverId : if you choose the new MySqlSource class, the parameter type of its serverId setter changes slightly
-
#serverId()
| flink cdc 1.3.0
/apache/flink-cdc/blob/release-1.3.0/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/
-
| flink cdc 2.1.0, 2.3.0 [Recommended].
/apache/flink-cdc/blob/release-2.1.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/
has no serverId method
/apache/flink-cdc/blob/release-2.1.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/
has a serverId method; the MySqlSourceBuilder is obtained via MySqlSource.<String>builder()
/**
* A numeric ID or a numeric ID range of this database client, The numeric ID syntax is like
* '5400', the numeric ID range syntax is like '5400-5408', The numeric ID range syntax is
* required when '' enabled. Every ID must be unique across all
* currently-running database processes in the MySQL cluster. This connector joins the MySQL
* cluster as another server (with this unique ID) so it can read the binlog. By default, a
* random number is generated between 5400 and 6400, though we recommend setting an explicit
* value."
*/
public MySqlSourceBuilder<T> serverId(String serverId) {
(serverId);
return this;
}
-
#serverId(int serverId)
| flink cdc 2.1.0 [deprecated], flink cdc 2.3.0 [deprecated/unavailable]
/apache/flink-cdc/blob/release-2.1.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/
/**
* A numeric ID of this database client, which must be unique across all currently-running
* database processes in the MySQL cluster. This connector joins the MySQL database cluster
* as another server (with this unique ID) so it can read the binlog. By default, a random
* number is generated between 5400 and 6400, though we recommend setting an explicit value.
*/
public Builder<T> serverId(int serverId) {
= serverId;
return this;
}
- Modified Demo: flink cdc 1.3.0
SourceFunction<String> mySqlSource =
MySqlSource.<String>builder()
//Database address
.hostname((""))
//port number
.port((("")))
//username
.username((""))
//password
.password((""))
//Monitored databases
.databaseList((""))
//Tables to monitor, in the format database.table
.tableList((""))
//Deserializer
.deserializer(new MySQLCdcMessageDeserializationSchema())
//time zone
.serverTimeZone("UTC")
.serverId( randomServerId(5000, Constants.JOB_NAME + "#xxxConfig") )
.startupOptions(())
.build();
public static Integer randomServerId(int interval, String jobCdcConfigDescription){
//serverId ∈ [interval + 0, interval + interval)
//int serverId = (interval) + interval; // (n) : generates a random integer in the interval [0, n)
//serverId ∈ [7000 + 0, Integer.MAX_VALUE - interval)
int serverId = (Integer.MAX_VALUE - interval - 7000) + 7000;
("Success to generate random server id result! serverId : {}, interval : {}, jobCdcConfigDescription : {}"
, serverId , interval , jobCdcConfigDescription );
return serverId;
}
- Modified Demo: flink cdc 2.3.0
MySqlSource<String> mySqlSource =
MySqlSource.<String>builder()
//Database address
.hostname((""))
//port number
.port((("")))
//username
.username((""))
//password
.password((""))
//Monitored databases
.databaseList((""))
//Tables to monitor, in the format database.table
.tableList((""))
//Deserializer
.deserializer(new MySQLCdcMessageDeserializationSchema())
//time zone
.serverTimeZone("UTC")
.serverId( randomServerIdRange(5000, Constants.JOB_NAME + "#xxxConfig") )
.startupOptions(())
.build();
//New mandatory requirements: interval >= Parallelism of this operator
public static String randomServerIdRange(int interval, String jobCdcConfigDescription){
// generate 1 random starting number
//startServerId ∈ [interval + 0, interval + interval)
//int startServerId = (interval) + interval; // (n) : generates a random integer in the interval [0, n)
//startServerId ∈ [7000 + 0, Integer.MAX_VALUE - interval)
int startServerId = (Integer.MAX_VALUE - interval - 7000) + 7000;
//endServerId ∈ [startServerId, startServerId + interval];
int endServerId = startServerId + interval;
("Success to generate random server id result! startServerId : {},endServerId : {}, interval : {}, jobCdcConfigDescription : {}"
, startServerId, endServerId , interval , jobCdcConfigDescription );
return ("%d-%d", startServerId, endServerId);
}
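The range format and the "interval >= parallelism" requirement can be checked in isolation. This is a minimal sketch with hypothetical names (ServerIdRange, format, coversParallelism); it follows the start-end syntax such as '5400-5408' from the Javadoc above and assumes the range is inclusive on both ends.

```java
// Hypothetical helper: builds and validates a server id range string such as "5400-5408".
final class ServerIdRange {

    // Formats the range [start, start + interval] the same way randomServerIdRange does.
    static String format(int start, int interval) {
        if (start <= 0 || interval < 0 || start > Integer.MAX_VALUE - interval) {
            throw new IllegalArgumentException("invalid range: start=" + start + ", interval=" + interval);
        }
        return String.format("%d-%d", start, start + interval);
    }

    // True when the range holds at least one id per parallel source reader.
    // Assumption: both ends are inclusive, so "5400-5408" contains 9 ids.
    static boolean coversParallelism(String range, int parallelism) {
        String[] parts = range.split("-");
        long start = Long.parseLong(parts[0].trim());
        long end = parts.length > 1 ? Long.parseLong(parts[1].trim()) : start;
        return end - start + 1 >= parallelism;
    }

    public static void main(String[] args) {
        String range = format(7000, 5);
        System.out.println(range + " covers parallelism 4: " + coversParallelism(range, 4));
    }
}
```

A check like coversParallelism could run right before the source is built, so that a too-small range is reported at startup instead of later.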
MySQLSourceBuilder#build method : the return type changes from SourceFunction/DebeziumSourceFunction<T> to MySqlSource<T>
-
=>
//#build | flink cdc 1.3.0
// returns:
// public class DebeziumSourceFunction<T> extends RichSourceFunction<T> implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<T>
//public abstract class <OUT> extends AbstractRichFunction implements SourceFunction<OUT>
public DebeziumSourceFunction<T> build() {
Properties props = new Properties();
("", ());
("", "mysql_binlog_source");
("", (String)());
("", (String)());
("", (String)());
("", ());
("", (true));
if ( != null) {
("", ());
}
...
}
//#build | flink cdc 2.3.0
// returns:
public MySqlSource<T> build() {
return new MySqlSource(, (DebeziumDeserializationSchema)());
}
- Usage Changes Demo: Flink cdc 1.3.0
mysqlSource Want to listen for mysql table structure changes (e.g. adding new fields), what should I do? Settings - aliyun
Properties properties = new Properties();
("", "localhost");
("", "3306");
("", "your_username");
("", "your_password");
("", "1"); // Set the unique server id
("", "mysql_source");
DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
.hostname("localhost")
.port(3306)
.username("your_username")
.password("your_password")
.databaseList("your_database")
.tableList("your_table")
.includeSchemaChanges(true) // Enable listening for table structure changes
.deserializer(new StringDebeziumDeserializationSchema())
.build();
DataStreamSource<String> stream = (sourceFunction);//It is possible to use addSource
();
("MySQL CDC Job");
- Usage Changes Demo: Flink cdc 2.3.0
/flink-cdc-connectors/release-2.3/content/connectors/mysql-cdc(ZH).html
addSource(SourceFunction, String sourceName) can no longer be used
The only option is fromSource(Source<OUT, ?, ?> source, WatermarkStrategy<OUT> timestampsAndWatermarks, String sourceName)
import ;
import ;
import ;
import ;
public class MySqlSourceExample {
public static void main(String[] args) throws Exception {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("yourHostname")
.port(yourPort)
.databaseList("yourDatabaseName") // set the captured database; to synchronize the entire database, set tableList to ".*"
.tableList("") // set the captured table
.username("yourUsername")
.password("yourPassword")
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to a JSON String
.build();
StreamExecutionEnvironment env = ();
// set a 3s checkpoint interval
(3000);
env
.fromSource(mySqlSource, (), "MySQL Source")
// set the source parallelism to 4
.setParallelism(4)
.print().setParallelism(1); // use parallelism 1 for the sink
("Print MySQL Snapshot + Binlog");
}
}
StartupOptions : Package paths have been adjusted (2.0.0 and later).
-
import
| flink cdc 1.3.0
/apache/flink-cdc/blob/release-1.3.0/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/table/
| flink cdc 2.3.0
/apache/flink-cdc/blob/release-2.3.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/
DebeziumDeserializationSchema : Package paths were adjusted (flink-cdc2.0.0 and later).
-
| flink cdc 1.3.0
:flink-connector-debezium:1.3.0
/apache/flink-cdc/blob/release-1.3.0/flink-connector-debezium/src/main/java/com/alibaba/ververica/cdc/debezium/
| flink cdc 2.3.0
:flink-connector-debezium:2.3.0
/apache/flink-cdc/blob/release-2.3.0/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/
FAQ & Upgrade Supplementary Matters
Q: [Apache Flink CDC] FlinkCdcJob fails on startup with a MYSQL serverTimeZone error: The MySQL server has a timezone offset (0 seconds ahead of UTC) which does not match the configured timezone Asia/Shanghai. Specify the right server-time-zone to avoid inconsistencies for time-related fields.
- Problem version: flink-cdc >= 2.3.0
Conclusion based on: flink-cdc source code
Recommended reading
-
[Flink SQL] FlinkSqlCdcJob fails on startup due to MYSQL serverTimeZone and reports an error: The MySQL server has a timezone offset (0 seconds ahead of UTC) which does not match the configured timezone Asia/Shanghai. - blogosphere/thousands and thousands of universes
-
/apache/flink-cdc/blob/release-2.4.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/
-
/apache/flink-cdc/blob/release-2.3.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/
-
/apache/flink-cdc/blob/release-1.3.0/flink-connector-mysql-cdc/src/main/java/com/alibaba/ververica/cdc/connectors/mysql/
-
/apache/flink-cdc/blob/release-2.0.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/ [Recommended]
-
/apache/flink-cdc/blob/release-2.3.0/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/
MySqlValidator#checkTimeZone
: compares the time zone offset of the "" configuration item with the time zone offset of the mysql database server_time_zone.
- flink-cdc 1.3.0/1.4.0 : no MySqlValidator class
- flink-cdc 2.0.0 : the MySqlValidator class appears for the first time, but has no checkTimeZone method
- flink-cdc 2.0.0-2.2.1 : same as flink-cdc 2.0.0
- flink-cdc 2.3.0 : has the MySqlValidator class, and the checkTimeZone method appears for the first time
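The kind of comparison that checkTimeZone performs can be sketched with java.time alone. This is an illustration of the idea, not the connector's code; TimeZoneOffsetCheck and offsetMatches are hypothetical names, and the server offset is passed in as seconds, the unit used in the error message.

```java
import java.time.Instant;
import java.time.ZoneId;

// Hypothetical helper: compares a configured zone with a server offset given in seconds.
final class TimeZoneOffsetCheck {

    // True when the configured zone has exactly the given UTC offset at the given instant.
    static boolean offsetMatches(String configuredZone, int serverOffsetSeconds, Instant at) {
        int configuredOffsetSeconds =
                ZoneId.of(configuredZone).getRules().getOffset(at).getTotalSeconds();
        return configuredOffsetSeconds == serverOffsetSeconds;
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        // 28800 seconds = +08:00, the offset of a server whose system_time_zone is +08
        System.out.println("UTC vs 28800s: " + offsetMatches("UTC", 28800, now));
        System.out.println("Asia/Shanghai vs 28800s: " + offsetMatches("Asia/Shanghai", 28800, now));
    }
}
```

The instant is a parameter because zones with daylight saving time have different offsets at different times of the year.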
Description of the problem
- Key logs
...
$resetToCheckpoint$7(:149) ~[flink-dist-1.15..330.:1.15..330.r34]
at (:774) ~[?:1.8.0_412]
at $(:750) ~[?:1.8.0_412]
at (:488) ~[?:1.8.0_412]
at (:1975) ~[?:1.8.0_412]
at $closeAsyncWithTimeout$0(:77) ~[flink-dist-1.15..330.:1.15..330.r34]
at (:750) [?:1.8.0_412]
2024-10-31 20:43:06,216 ERROR 208 [Thread-16] - Failed to create Source Enumerator for source Source: xxxDBCConfigCdc-VhrBackendMySQL-Source
: The MySQL server has a timezone offset (28800 seconds ahead of UTC) which does not match the configured timezone UTC. Specify the right server-time-zone to avoid inconsistencies for time-related fields.
at (:191) ~[flink-sql-connector-mysql-cdc-2.4..330.:2.4..330.r1-SNAPSHOT]
at (:81) ~[flink-sql-connector-mysql-cdc-2.4..330.:2.4..330.r1-SNAPSHOT]
at (:170) ~[flink-sql-connector-mysql-cdc-2.4..330.:2.4..330.r1-SNAPSHOT]
at (:205) ~[flink-dist-1.15..330.:1.15..330.r34]
at $(:399) ~[flink-dist-1.15..330.:1.15..330.r34]
at $resetToCheckpoint$7(:149) ~[flink-dist-1.15..330.:1.15..330.r34]
at (:774) ~[?:1.8.0_412]
at $(:750) ~[?:1.8.0_412]
at (:488) ~[?:1.8.0_412]
at (:1975) ~[?:1.8.0_412]
at $closeAsyncWithTimeout$0(:77) ~[flink-dist-1.15..330.:1.15..330.r34]
at (:750) [?:1.8.0_412]
2024-10-31 20:43:06,218 INFO 320 [-dispatcher-17] - Trying to recover from a global failure.
: Global failure triggered by OperatorCoordinator for 'Source: xxxDBCConfigCdc-VhrBackendMySQL-Source' (operator feca28aff5a3958840bee985ee7de4d3).
at $(:556) ~[flink-dist-1.15..330.:1.15..330.r34]
at $(:236) ~[flink-dist-1.15..330.:1.15..330.r34]
at
...
Problem analysis
- View source: #createConfig
public MySqlSourceConfig createConfig(int subtaskId) {
...
("", );// set via #serverTimeZone(...)
...
if ( != null) {
("", );
}
if ( != null) {
();
}
...
}
- Viewing the time zone settings of a database
> show variables like '%time_zone%';
Name |Value |
-------------+----------------+
Variable_name|system_time_zone|
Value |+08 |
- View timezone settings for flinkCdcJob jobs from Logs
Solution
- step1 Add: DebeziumPropertiesConstants
public class DebeziumPropertiesConstants {
/** configuration item **/
public final static String DATABASE_SERVER_TIMEZONE = "";
/** default value **/
public final static String DATABASE_SERVER_TIMEZONE_DEFAULT = "UTC";
}
- step2 Add New:
public class Constants {
...
public static class XXXModule {
public final static String CONFIG_PREFIX = "";
public final static String DATABASE_SERVER_TIMEZONE = CONFIG_PREFIX + "." + DebeziumPropertiesConstants.DATABASE_SERVER_TIMEZONE;
}
...
}
- step3 Add configuration items
=Asia/Shanghai
- step4 Adjust MySqlSource
Add the lines marked "key line" from the following sample code
// public static SourceFunction<String> createxxxModuleCdcSourceFunction(ParameterTool parameterTool){
public static MySqlSource<String> createxxxModuleCdcSourceFunction(ParameterTool jobParameterTool){
// must match the time zone offset of the system_time_zone variable returned by `show variables like '%time_zone%';`
String databaseServerTimeZone = ( .DATABASE_SERVER_TIMEZONE , DebeziumPropertiesConstants.DATABASE_SERVER_TIMEZONE_DEFAULT);//key line
("{}:{}", .DATABASE_SERVER_TIMEZONE, databaseServerTimeZone);//key line
//return MySQLSource.<String>builder()
return MySqlSource.<String>builder()
//Database address
.hostname((""))
//port number
.port((("")))
//username
.username((""))
//password
.password((""))
//Monitored databases
.databaseList((""))
//Tables to monitor, in the format database.table
.tableList((""))
//Deserializer
.deserializer(new DBCDeserializationSchema())
//time zone
.serverTimeZone( databaseServerTimeZone ) // (see source code) equivalent to the debezium configuration item | key line
.serverId( (5000, Constants.JOB_NAME + "#xxxModule") )
//.debeziumProperties(debeziumProperties)// key line
.startupOptions(())
.build();
}
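The default-fallback lookup used in the method above can be tried out with java.util.Properties standing in for the job's parameter lookup. This is a minimal sketch; the key name demo.database.serverTimezone and the class ServerTimeZoneConfig are hypothetical, since the real configuration key is not shown in the source.

```java
import java.util.Properties;

// Hypothetical helper: resolves the server time zone from configuration with a UTC fallback.
final class ServerTimeZoneConfig {

    // Assumption: illustrative key name, not the key used by the original job.
    static final String KEY = "demo.database.serverTimezone";
    static final String DEFAULT = "UTC";

    static String resolve(Properties props) {
        String value = props.getProperty(KEY, DEFAULT).trim();
        // an empty value is treated like a missing one
        return value.isEmpty() ? DEFAULT : value;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println("not configured: " + resolve(props));
        props.setProperty(KEY, "Asia/Shanghai");
        System.out.println("configured: " + resolve(props));
    }
}
```

With the UTC fallback, a job whose configuration item is missing still starts against a UTC server, but fails the offset check against a +08 server, which is the situation shown in the key logs.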
X References
- apache flink cdc
- /apache/flink-cdc
- /apache/flink-cdc/blob/master/docs/content/docs/faq/
- /apache/flink-cdc/tree/master/flink-cdc-connect/flink-cdc-source-connectors/flink-sql-connector-mysql-cdc
:flink-connector-mysql-cdc:1.3.0
/apache/flink-cdc/blob/release-1.3.0/flink-connector-mysql-cdc/ [Recommended] Flink 1.12.6
:flink-connector-mysql-cdc:2.0
MYSQL (Database: 5.7, 8. / JDBC Driver: 8.0.16 ) | Flink 1.12 + | JDK 8+
/apache/flink-cdc/tree/release-2.0
/apache/flink-cdc/blob/release-2.0/flink-connector-mysql-cdc/
:flink-connector-mysql-cdc:2.3.0
/apache/flink-cdc/blob/release-2.3.0/flink-connector-mysql-cdc/ [Recommended] Flink 1.15.4
:flink-connector-mysql-cdc:${}
- /flink-cdc-connectors/master/content/ [Deprecated]
- apache flink
- /apache/flink
- /apache/flink/blob/release-1.15.4/docs//release-notes/flink-1. [Recommended]
- /flink/flink-docs-release-1.15/release-notes/flink-1.15/ [Recommended]
- /2023/03/15/apache-flink-1.15.4-release-announcement/ [Recommended]
- /flink/flink-docs-release-1.15/zh/docs/ops/upgrading/
- /apache/flink/blob/release-1.12.6/docs/release-notes/flink-1.
- apache flink-connector-kafka
- /apache/flink-connector-kafka
- testimonials
- Flink version 1.13.1 upgraded to 1.15.0 - CSDN [Recommended
- (C) Flink 1.15 release notes for the latest version - CSDN
- Summary of dependency issues for Flink+Flink CDC version upgrades - CSDN
- Apache Flink 1.15 Release Notes - Zhihu/Flink Chinese Community
- Other
- FlinkSql source code debugging environment & flink-table code structure - Zhihu [Not recommended]
- Finally, someone explains addSource, fromSource in Flink - CSDN
- [FlinkConsumer to KafkaSource of Flink Consumer Kafka] - CSDN
- Flink Kafka-Source - CSDN