Flink CDC Issue Import created FLINK-34772:
----------------------------------------------

             Summary: [Bug] [Critical] cdc-mysql-connector snapshot/binlog phase MySQL -> MySQL temporal related field transfer WRONG with/or wrong epoch time across timezone
                 Key: FLINK-34772
                 URL: https://issues.apache.org/jira/browse/FLINK-34772
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
            Reporter: Flink CDC Issue Import


### Search before asking

- [X] I searched in the [issues|https://github.com/ververica/flink-cdc-connectors/issues] and found nothing similar.

### Flink version

1.14.3

### Flink CDC version

2.3.0

### Database and its version

MySQL 8.0.23

### Minimal reproduce step

source db DDL:
```
CREATE TABLE `t_cdc_bench` (
  `order_id` bigint NOT NULL,
  `order_date` date DEFAULT NULL,
  `order_time` datetime DEFAULT NULL,
  `dt_3` datetime(3) DEFAULT CURRENT_TIMESTAMP(3),
  `ts_3` timestamp(3) NULL DEFAULT CURRENT_TIMESTAMP(3),
  `quantity` int DEFAULT NULL,
  `product_id` int DEFAULT NULL,
  `purchaser` varchar(32) COLLATE utf8mb4_bin DEFAULT NULL,
  `acct` varchar(3) COLLATE utf8mb4_bin DEFAULT '911',
  PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
```

sink db DDL:
```
CREATE TABLE `t_cdc_bench_sink` (
  `order_id` bigint NOT NULL,
  `order_date` date DEFAULT NULL,
  `order_time` datetime DEFAULT NULL,
  `dt_3` datetime(3) DEFAULT NULL,
  `ts_3` timestamp(3) NULL DEFAULT NULL,
  `quantity` int DEFAULT NULL,
  `product_id` int DEFAULT NULL,
  `purchaser` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL,
  PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
```

source DML:
```
insert into t_cdc_bench values(1001, curdate(), now(), current_timestamp(3), current_timestamp(3), 1, 33, 'scala', 911);
```

run table cdc process:
```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// JSONObject and SourceConfigUtil come from the reporter's project setup;
// their imports are not shown in the original report.
public class MySqlToJdbcTableCDC {

    public static void main(String[] args) throws Exception {
        JSONObject json = JSONObject.parseObject("{\"logConfDirection\": \"\\/Users\\/nirvana.xu\\/IdeaProjects\\/Prod\\/realtime-ingestion\\/flink-cdc-jobs\\/src\\/main\\/resources\\/log4j2.xml\" }");
        SourceConfigUtil.doSetupLogConfigDir(json, MySqlToJdbcTableCDC.class);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        env.setParallelism(1);
        // note: incremental sync requires checkpointing to be enabled
        env.enableCheckpointing(3000);

        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, envSettings);

        // Source table: MySQL CDC, server time zone declared as UTC+7
        tableEnvironment.executeSql(" CREATE TABLE demoOrders (\n" +
                " `order_id` INTEGER ,\n" +
                " `order_date` DATE ,\n" +
                " `order_time` TIMESTAMP(3),\n" +
                " `dt_3` TIMESTAMP(3),\n" +
                " `ts_3` TIMESTAMP(3),\n" +
                " `quantity` INT ,\n" +
                " `product_id` INT ,\n" +
                " `purchaser` STRING,\n" +
                " primary key(order_id) NOT ENFORCED" +
                " ) WITH (\n" +
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = '10.162.34.110',\n" +
                " 'port' = '6606',\n" +
                " 'username' = 'root',\n" +
                " 'password' = 'root',\n" +
                " 'database-name' = 'testdb',\n" +
                " 'table-name' = 't_cdc_bench', " +
                " 'server-time-zone' = 'UTC+7', " +
                // full snapshot + incremental sync
                " 'scan.startup.mode' = 'initial' " +
                " )");

        // Sink table: JDBC connector writing to the sink MySQL instance
        tableEnvironment.executeSql("CREATE TABLE sink (\n" +
                " `order_id` INTEGER ,\n" +
                " `order_date` DATE ,\n" +
                " `order_time` TIMESTAMP(3),\n" +
                " `dt_3` TIMESTAMP(3),\n" +
                " `ts_3` TIMESTAMP(3),\n" +
                " `quantity` INT ,\n" +
                " `product_id` INT ,\n" +
                " `purchaser` STRING,\n" +
                " primary key (order_id) NOT ENFORCED " +
                ") WITH (\n" +
                " 'connector' = 'jdbc',\n" +
                " 'url' = 'jdbc:mysql://localhost:3306/testdb?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&GMT%2b8',\n" +
                " 'username' = 'root',\n" +
                " 'password' = 'root',\n" +
                " 'table-name' = 't_cdc_bench_sink',\n" +
                " 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                " 'sink.buffer-flush.interval' = '3s',\n" +
                " 'sink.buffer-flush.max-rows' = '1',\n" +
                " 'sink.max-retries' = '5') ");

        tableEnvironment.executeSql("insert into sink select * from demoOrders");
    }
}
```

### What did you expect to see?

After the CDC job switches to the binlog phase, we expect to see:

**source**:
```
select * from t_cdc_bench where order_id = 1001 limit 1\G
```
*************************** 1. row ***************************
order_id: 1001
order_date: 2023-02-10
order_time: 2023-02-10 17:15:33
dt_3: 2023-02-10 17:15:33.312
ts_3: 2023-02-10 17:15:33.312
quantity: 1
product_id: 33
purchaser: scala
acct: 911

**sink**:
```
select * from t_cdc_bench_sink where order_id = 1001 limit 1\G
```
*************************** 1. row ***************************
order_id: 1001
order_date: 2023-02-10
order_time: 2023-02-10 18:15:33
dt_3: 2023-02-10 18:15:33.312
ts_3: 2023-02-10 18:15:33.312
quantity: 1
product_id: 33
purchaser: scala

_**The datetime and timestamp fields are converted correctly according to the configured source/sink server time zones.**_

### What did you see instead?

In fact, after the CDC job switches to the binlog phase, querying the source and sink tables shows the following for the row that was read in the snapshot phase:

**source**:
```
select * from t_cdc_bench where order_id = 1001 limit 1\G
```
*************************** 1. row ***************************
order_id: 1001
order_date: 2023-02-10
order_time: 2023-02-10 17:15:33
dt_3: 2023-02-10 17:15:33.312
ts_3: 2023-02-10 17:15:33.312
quantity: 1
product_id: 33
purchaser: scala
acct: 911

**sink**:
```
select * from t_cdc_bench_sink where order_id = 1001 limit 1\G
```
*************************** 1. row ***************************
order_id: 1001
order_date: 2023-02-10
order_time: 2023-02-10 17:15:33
dt_3: 2023-02-10 17:15:33.312
ts_3: 2023-02-11 00:15:33.312
quantity: 1
product_id: 33
purchaser: scala

**_Not only is the local time zone (LTZ) not converted correctly, but the `dt_3` and `ts_3` fields, which were set at the same time, also contradict each other._**

### Anything else?

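The offsets in the rows above can be checked with plain `java.time` arithmetic. The following is only a sketch under stated assumptions: the class `TimezoneShiftCheck` and the helper `convert` are hypothetical names introduced for illustration, the source offset is taken from `'server-time-zone' = 'UTC+7'`, and the sink offset is assumed to be GMT+8 based on the `GMT%2b8` fragment in the sink JDBC URL. It does not reproduce the connector's behaviour; it only shows that the expected sink value is one hour ahead of the source value, while the observed sink `ts_3` is seven hours ahead.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class TimezoneShiftCheck {

    /**
     * Hypothetical helper: takes a wall-clock value recorded at offset {@code from}
     * and returns the wall-clock value of the same instant at offset {@code to}.
     */
    public static LocalDateTime convert(LocalDateTime wallClock, ZoneOffset from, ZoneOffset to) {
        return wallClock.atOffset(from).withOffsetSameInstant(to).toLocalDateTime();
    }

    public static void main(String[] args) {
        ZoneOffset source = ZoneOffset.ofHours(7); // assumption: 'server-time-zone' = 'UTC+7'
        ZoneOffset sink = ZoneOffset.ofHours(8);   // assumption: GMT+8 from the sink JDBC URL

        // Values copied from the query results in this report.
        LocalDateTime sourceTs3 = LocalDateTime.parse("2023-02-10T17:15:33.312");
        LocalDateTime observedSinkTs3 = LocalDateTime.parse("2023-02-11T00:15:33.312");

        // Same instant, re-expressed in the sink offset.
        LocalDateTime expectedSinkTs3 = convert(sourceTs3, source, sink);

        System.out.println("expected sink ts_3:  " + expectedSinkTs3);
        System.out.println("expected - source:   " + Duration.between(sourceTs3, expectedSinkTs3));
        System.out.println("observed - source:   " + Duration.between(sourceTs3, observedSinkTs3));
    }
}
```

The seven-hour gap on `ts_3` equals the configured source offset, whereas `dt_3` in the same sink row shows no shift at all, which is the contradiction described above.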
We first found this issue in our production real-time computing services, which use the stream API of an internal build of CDC 2.3-SNAPSHOT: the epoch time of MySQL binlog datetime values coming from the CDC source was transferred incorrectly. We worked around it with a fix in the temporal-related deserializer. I then checked whether the issue had already been fixed for mysql-cdc tables in 2.3.0, but it is still present.

Note that once the `ts_3` and `dt_3` fields are updated, the contradiction between them is corrected, but the local time zone conversion still does not happen.

So the main issues are:

1. For historical data that is only read in the snapshot phase, the `ts_3` and `dt_3` fields are definitely WRONG unless the rows are updated afterwards;
2. When the source and sink are in different regions, the temporal fields are not converted correctly.

### Are you willing to submit a PR?

- [X] I'm willing to submit a PR!

---------------- Imported from GitHub ----------------
Url: https://github.com/apache/flink-cdc/issues/1906
Created by: [Capri0110|https://github.com/Capri0110]
Labels: bug
Created at: Fri Feb 10 18:35:08 CST 2023
State: open



--
This message was sent by Atlassian Jira
(v8.20.10#820010)