Flink CDC Issue Import created FLINK-34772:
----------------------------------------------

             Summary: [Bug] [Critical] cdc-myql-connector snapshot/binlog phase 
MySQL -> MySQL temporal related field transfer WRONG with/or wrong epoch time 
across timezone 
                 Key: FLINK-34772
                 URL: https://issues.apache.org/jira/browse/FLINK-34772
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
            Reporter: Flink CDC Issue Import


### Search before asking

- [X] I searched in the [issues|https://github.com/ververica/flink-cdc-connectors/issues] and found nothing similar.


### Flink version

1.14.3

### Flink CDC version

2.3.0

### Database and its version

mysql 8.0.23

### Minimal reproduce step

source db DDL:
```
CREATE TABLE `t_cdc_bench` (
  `order_id` bigint NOT NULL,
  `order_date` date DEFAULT NULL,
  `order_time` datetime DEFAULT NULL,
  `dt_3` datetime(3) DEFAULT CURRENT_TIMESTAMP(3),
  `ts_3` timestamp(3) NULL DEFAULT CURRENT_TIMESTAMP(3),
  `quantity` int DEFAULT NULL,
  `product_id` int DEFAULT NULL,
  `purchaser` varchar(32) COLLATE utf8mb4_bin DEFAULT NULL,
  `acct` varchar(3) COLLATE utf8mb4_bin DEFAULT '911',
  PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
```
sink db DDL:
```
CREATE TABLE `t_cdc_bench_sink` (
  `order_id` bigint NOT NULL,
  `order_date` date DEFAULT NULL,
  `order_time` datetime DEFAULT NULL,
  `dt_3` datetime(3) DEFAULT NULL,
  `ts_3` timestamp(3) NULL DEFAULT NULL,
  `quantity` int DEFAULT NULL,
  `product_id` int DEFAULT NULL,
  `purchaser` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL,
  PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
```
source DML:
```
insert into t_cdc_bench values(1001, curdate(), now(), current_timestamp(3),
current_timestamp(3), 1, 33, 'scala', '911');
```
Run the Table API CDC job:
```
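import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// JSONObject (a JSON library class with a static parseObject method) and SourceConfigUtil
// come from the reporter's project; their imports are not shown in the original report.
public class MySqlToJdbcTableCDC {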
    public static void main(String[] args) throws Exception{
        JSONObject json = JSONObject.parseObject("{\"logConfDirection\": " +
                "\"\\/Users\\/nirvana.xu\\/IdeaProjects\\/Prod\\/realtime-ingestion" +
                "\\/flink-cdc-jobs\\/src\\/main\\/resources\\/log4j2.xml\" }");
        SourceConfigUtil.doSetupLogConfigDir(json, MySqlToJdbcTableCDC.class);
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        env.setParallelism(1);
        // note: incremental sync requires checkpointing to be enabled
        env.enableCheckpointing(3000);
        StreamTableEnvironment tableEnvironment = 
StreamTableEnvironment.create(env, envSettings);

        tableEnvironment.executeSql(" CREATE TABLE demoOrders (\n" +
                "         `order_id` INTEGER ,\n" +
                "          `order_date` DATE ,\n" +
                "          `order_time` TIMESTAMP(3),\n" +
                "          `dt_3` TIMESTAMP(3),\n" +
                "          `ts_3` TIMESTAMP(3),\n" +
                "          `quantity` INT ,\n" +
                "          `product_id` INT ,\n" +
                "          `purchaser` STRING,\n" +
                "           primary key(order_id)  NOT ENFORCED" +
                "         ) WITH (\n" +
                "          'connector' = 'mysql-cdc',\n" +
                "          'hostname' = '10.162.34.110',\n" +
                "          'port' = '6606',\n" +
                "          'username' = 'root',\n" +
                "          'password' = 'root',\n" +
                "          'database-name' = 'testdb',\n" +
                "          'table-name' = 't_cdc_bench', " +
                "          'server-time-zone' = 'UTC+7', " +
                // full snapshot + incremental sync
                "          'scan.startup.mode' = 'initial'      " +
                " )");

        tableEnvironment.executeSql("CREATE TABLE sink (\n" +
                "         `order_id` INTEGER ,\n" +
                "          `order_date` DATE ,\n" +
                "          `order_time` TIMESTAMP(3),\n" +
                "          `dt_3` TIMESTAMP(3),\n" +
                "          `ts_3` TIMESTAMP(3),\n" +
                "          `quantity` INT ,\n" +
                "          `product_id` INT ,\n" +
                "          `purchaser` STRING,\n" +
                "          primary key (order_id)  NOT ENFORCED " +
                ") WITH (\n" +
                " 'connector' = 'jdbc',\n" +
                " 'url' = 'jdbc:mysql://localhost:3306/testdb?useSSL=false&useUnicode=true" +
                "&characterEncoding=UTF-8&characterSetResults=UTF-8" +
                "&zeroDateTimeBehavior=CONVERT_TO_NULL&GMT%2b8',\n" +
                " 'username' = 'root',\n" +
                " 'password' = 'root',\n" +
                " 'table-name' = 't_cdc_bench_sink',\n" +
                " 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                " 'sink.buffer-flush.interval' = '3s',\n" +
                " 'sink.buffer-flush.max-rows' = '1',\n" +
                " 'sink.max-retries' = '5') ");

        tableEnvironment.executeSql("insert into sink select * from demoOrders");

    }
}
```


### What did you expect to see?

After the CDC job switches to the binlog phase, we expect to see:

**source**:
```
select * from t_cdc_bench where order_id = 1001 limit 1\G
*************************** 1. row ***************************
  order_id: 1001
order_date: 2023-02-10
order_time: 2023-02-10 17:15:33
      dt_3: 2023-02-10 17:15:33.312
      ts_3: 2023-02-10 17:15:33.312
  quantity: 1
product_id: 33
 purchaser: scala
      acct: 911
```

**sink**:
```
select * from t_cdc_bench_sink where order_id = 1001 limit 1\G
*************************** 1. row ***************************
  order_id: 1001
order_date: 2023-02-10
order_time: 2023-02-10 18:15:33
      dt_3: 2023-02-10 18:15:33.312
      ts_3: 2023-02-10 18:15:33.312
  quantity: 1
product_id: 33
 purchaser: scala
```


_**The datetime and timestamp fields are converted correctly according to the
configured source/sink server time zones.**_
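
The expected sink values follow from re-expressing the source wall-clock time in the sink's time zone. A minimal sketch using only `java.time`, assuming the source server is at UTC+7 (taken from `server-time-zone`) and the sink at GMT+8 (taken from the sink JDBC URL). The class and method names are hypothetical and are not part of Flink CDC:

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

/** Hypothetical helper, for illustration only; not part of Flink CDC. */
final class ExpectedTemporalConversion {

    /** Re-expresses a wall-clock time observed at offset {@code from} as the wall-clock time at offset {@code to}. */
    static LocalDateTime shiftWallClock(LocalDateTime value, ZoneOffset from, ZoneOffset to) {
        return value.atOffset(from).withOffsetSameInstant(to).toLocalDateTime();
    }

    public static void main(String[] args) {
        ZoneOffset source = ZoneOffset.ofHours(7); // assumed from 'server-time-zone' = 'UTC+7'
        ZoneOffset sink = ZoneOffset.ofHours(8);   // assumed from GMT%2b8 in the sink URL
        LocalDateTime sourceValue = LocalDateTime.parse("2023-02-10T17:15:33.312");
        // Prints the value the report expects in the sink for dt_3 and ts_3.
        System.out.println(shiftWallClock(sourceValue, source, sink));
    }
}
```

With the values from this report, the helper yields `2023-02-10T18:15:33.312`, matching the expected sink row.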

### What did you see instead?

Instead, after the CDC job switches to the binlog phase, querying the source
and sink tables shows the following for data synchronized during the snapshot
phase:

**source**:
```
select * from t_cdc_bench where order_id = 1001 limit 1\G
*************************** 1. row ***************************
  order_id: 1001
order_date: 2023-02-10
order_time: 2023-02-10 17:15:33
      dt_3: 2023-02-10 17:15:33.312
      ts_3: 2023-02-10 17:15:33.312
  quantity: 1
product_id: 33
 purchaser: scala
      acct: 911
```

**sink**:
```
select * from t_cdc_bench_sink where order_id = 1001 limit 1\G
*************************** 1. row ***************************
  order_id: 1001
order_date: 2023-02-10
order_time: 2023-02-10 17:15:33
      dt_3: 2023-02-10 17:15:33.312
      ts_3: 2023-02-11 00:15:33.312
  quantity: 1
product_id: 33
 purchaser: scala
```

**_Not only is the local time zone (LTZ) conversion not applied correctly, but
the dt_3 and ts_3 fields, which were written at the same moment, also
contradict each other._**
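
To make the contradiction concrete, the sketch below measures how far each sink value is from its source value, using only the values quoted in this report. The 7-hour shift on `ts_3` equals the configured source offset, which is consistent with (but does not prove) the local wall-clock value being treated as UTC and then shifted to UTC+7. The class and method names are hypothetical:

```java
import java.time.Duration;
import java.time.LocalDateTime;

/** Hypothetical helper, for illustration only. */
final class ObservedShift {

    /** Returns the signed difference between the sink and source wall-clock values. */
    static Duration shift(String sourceValue, String sinkValue) {
        return Duration.between(LocalDateTime.parse(sourceValue), LocalDateTime.parse(sinkValue));
    }

    public static void main(String[] args) {
        // dt_3: identical in source and sink, so no shift at all.
        System.out.println("dt_3 shift: " + shift("2023-02-10T17:15:33.312", "2023-02-10T17:15:33.312"));
        // ts_3: the sink is 7 hours ahead of the source.
        System.out.println("ts_3 shift: " + shift("2023-02-10T17:15:33.312", "2023-02-11T00:15:33.312"));
    }
}
```

Neither column shows the 1-hour difference expected between a UTC+7 source and a GMT+8 sink.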

### Anything else?

We first found this issue in an internal build of CDC 2.3-SNAPSHOT, using the
DataStream API in our production real-time computing services: the epoch time
of MySQL binlog datetime values read from the CDC source was wrong. We fixed it
there by patching the temporal deserializers. I then checked whether the issue
had already been fixed for mysql-cdc tables in 2.3.0, but it has not; the bug
is still present.

Note that when we update the `ts_3` and `dt_3` fields, so that the change is
read from the binlog, the contradiction between them goes away, but the local
time zone conversion is still not applied.

So the main issues are:

1. For historical data that is read only in the snapshot phase, the `ts_3`
and `dt_3` fields are wrong unless the row is updated afterwards;

2. When the source and sink are in different regions (time zones), the temporal
fields are not converted correctly.
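
One way to picture a deserializer-side remedy for the two issues above is sketched below. It is not the reporter's actual fix, which is not shown in this issue. It assumes the change event carries `DATETIME` as epoch milliseconds computed as if the wall-clock value were UTC, and `TIMESTAMP` as an ISO-8601 UTC string, which is how Debezium's MySQL connector is understood to encode them by default; treat both as assumptions. The class and method names are hypothetical:

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

/** Hypothetical sketch of time-zone-aware decoding; not Flink CDC's implementation. */
final class TemporalDecodeSketch {

    /**
     * DATETIME: the epoch millis are assumed to encode the source wall-clock as if it were UTC.
     * Recover the wall-clock, pin it to the source zone, then render it in the sink zone.
     */
    static LocalDateTime decodeDateTime(long epochMillis, ZoneId sourceZone, ZoneId sinkZone) {
        LocalDateTime wallClock = LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC);
        return wallClock.atZone(sourceZone).withZoneSameInstant(sinkZone).toLocalDateTime();
    }

    /** TIMESTAMP: the string is assumed to be an absolute instant; render it in the sink zone. */
    static LocalDateTime decodeTimestamp(String isoInstant, ZoneId sinkZone) {
        return OffsetDateTime.parse(isoInstant).atZoneSameInstant(sinkZone).toLocalDateTime();
    }

    public static void main(String[] args) {
        ZoneId source = ZoneOffset.ofHours(7); // assumed source server zone
        ZoneId sink = ZoneOffset.ofHours(8);   // assumed sink server zone
        long dt3 = LocalDateTime.parse("2023-02-10T17:15:33.312").toInstant(ZoneOffset.UTC).toEpochMilli();
        System.out.println(decodeDateTime(dt3, source, sink));
        // 17:15:33.312 at UTC+7 is 10:15:33.312 UTC.
        System.out.println(decodeTimestamp("2023-02-10T10:15:33.312Z", sink));
    }
}
```

Under these assumptions both columns decode to the same sink value, `2023-02-10T18:15:33.312`, regardless of whether the row arrives from the snapshot or the binlog phase, provided both phases hand the decoder the same encoding.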

### Are you willing to submit a PR?

- [X] I'm willing to submit a PR!

---------------- Imported from GitHub ----------------
Url: https://github.com/apache/flink-cdc/issues/1906
Created by: [Capri0110|https://github.com/Capri0110]
Labels: bug, 
Created at: Fri Feb 10 18:35:08 CST 2023
State: open




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
