Flink CDC Issue Import created FLINK-34767:
----------------------------------------------
Summary: flink-sql-connector-mysql-cdc cannot read incremental data in initial mode
Key: FLINK-34767
URL: https://issues.apache.org/jira/browse/FLINK-34767
Project: Flink
Issue Type: Bug
Components: Flink CDC
Reporter: Flink CDC Issue Import

**Describe the bug (Please use English)**
When synchronizing all tables of a MySQL database with the connector in `initial` startup mode, the job completes the snapshot phase but never reads incremental (binlog) data.

**Environment:**
 - Flink version: flink-1.14.2
 - Flink CDC version: 2.3.0
 - Database and version: MySQL 5.7.37

**To Reproduce**
Steps to reproduce the behavior:
1. The test data: synchronize all tables in a database, about 7 tables.
2. The test code:
```java
public static MySqlSource<String> getMySqlSource() {
    return MySqlSource.<String>builder()
            .hostname(SOURCE_IP)
            .port(SOURCE_PORT)
            .databaseList(SOURCE_DB) // set captured database
            .tableList(SOURCE_TABLS) // set captured tables
            .username(SOURCE_USER)
            .password(SOURCE_PWD)
            .serverTimeZone("Asia/Shanghai") // controls how MySQL timestamp types are converted to strings
            .startupOptions(StartupOptions.initial())
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
            .build();
}
```
3. The error: there is no error message. I enabled the YARN debug log and found that the program never reaches this step:
```java
@Override
public void addSplits(List<MySqlSplit> splits) {
    // restore for finishedUnackedSplits
    List<MySqlSplit> unfinishedSplits = new ArrayList<>();
    for (MySqlSplit split : splits) {
        LOG.info("Add Split: " + split);
        if (split.isSnapshotSplit()) {
            MySqlSnapshotSplit snapshotSplit = split.asSnapshotSplit();
            if (snapshotSplit.isSnapshotReadFinished()) {
                finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);
            } else {
                unfinishedSplits.add(split);
            }
        } else {
            MySqlBinlogSplit binlogSplit = split.asBinlogSplit();
            // the binlog split is suspended
            if (binlogSplit.isSuspended()) {
                suspendedBinlogSplit = binlogSplit;
            } else if (!binlogSplit.isCompletedSplit()) {
                uncompletedBinlogSplits.put(split.splitId(), split.asBinlogSplit());
                requestBinlogSplitMetaIfNeeded(split.asBinlogSplit());
            } else {
                uncompletedBinlogSplits.remove(split.splitId());
                MySqlBinlogSplit mySqlBinlogSplit =
                        discoverTableSchemasForBinlogSplit(split.asBinlogSplit());
                unfinishedSplits.add(mySqlBinlogSplit);
            }
        }
    }
    // notify split enumerator again about the finished unacked snapshot splits
    reportFinishedSnapshotSplitsIfNeed();
    // add all un-finished splits (including binlog split) to SourceReaderBase
    if (!unfinishedSplits.isEmpty()) {
        super.addSplits(unfinishedSplits);
    }
}
```
The table-discovery log from `DiscoveryUtils` that a normal job prints never appears. The cluster's default task manager memory is 1.7 GB; if I increase the memory YARN allocates to the task manager to 4 GB, the problem is solved.

---------------- Imported from GitHub ----------------
Url: https://github.com/apache/flink-cdc/issues/1869
Created by: [red-cy|https://github.com/red-cy]
Labels: bug,
Created at: Wed Jan 11 19:15:30 CST 2023
State: open

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
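For reference, the memory workaround described in the report (raising the TaskManager allocation from the 1.7 GB default to 4 GB) can be expressed as a Flink configuration change. This is a minimal sketch, assuming a YARN deployment where a 4 GB container fits the cluster's limits; the exact sizing is specific to this reporter's environment:

```yaml
# flink-conf.yaml — hypothetical sizing taken from the report above;
# total process memory handed to each TaskManager container on YARN
taskmanager.memory.process.size: 4096m
```

The same setting can also be passed per job on the command line as a dynamic property, e.g. `flink run -t yarn-per-job -Dtaskmanager.memory.process.size=4096m ...`, which avoids changing the cluster-wide default.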