[ https://issues.apache.org/jira/browse/FLINK-35600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Di Wu updated FLINK-35600:
--------------------------
    Description:
Assume that the table has been split into 3 chunks.

Timeline:
t1: chunk1 is read.
t2: a row, data A, belonging to chunk2 is inserted in MySQL.
t3: chunk2 is read, and data A is sent downstream.
t4: chunk3 is read. At this point, startOffset is set to the low watermark.
t5: *BinlogSplitReader.pollSplitRecords* receives data A and calls *shouldEmit* to decide whether the record is sent downstream.

Inside that method, the following check is used:
{code:java}
private boolean hasEnterPureBinlogPhase(TableId tableId, BinlogOffset position) {
    if (pureBinlogPhaseTables.contains(tableId)) {
        return true;
    }
    // the existed tables those have finished snapshot reading
    if (maxSplitHighWatermarkMap.containsKey(tableId)
            && position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) {
        pureBinlogPhaseTables.add(tableId);
        return true;
    }
    // (rest of the method is not quoted in this report)
}
{code}
*maxSplitHighWatermarkMap.get(tableId)* returns a high watermark that carries no ts_sec value, so ts_sec defaults to 0.

As a result, *position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))* evaluates to true even though data A was already covered by the snapshot read of chunk2.

*Data A is sent downstream a second time, so the record is duplicated.*

> Data read duplication during the full-to-incremental conversion phase
> ---------------------------------------------------------------------
>
>                 Key: FLINK-35600
>                 URL: https://issues.apache.org/jira/browse/FLINK-35600
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>   Affects Versions: cdc-3.1.0
>            Reporter: Di Wu
>            Priority: Major
>              Labels: pull-request-available
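
The effect of the missing ts_sec can be reproduced with a small toy model. This is only a sketch, not the Flink CDC code: the class PureBinlogPhaseSketch, the simplified Offset type, the String table ids, the finishSnapshot helper, and the sample positions and timestamps are all hypothetical. In particular, it assumes that the offset comparison ends up being decided by the timestamp when the two timestamps differ, which is the behaviour the description implies but does not spell out.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model of the check in the report; NOT the Flink CDC classes. */
public class PureBinlogPhaseSketch {

    /** Hypothetical, simplified offset: a binlog position plus an event timestamp in seconds. */
    static final class Offset {
        final long position;
        final long tsSec; // 0 means "never set", as for the high watermark in the report

        Offset(long position, long tsSec) {
            this.position = position;
            this.tsSec = tsSec;
        }

        // Assumption for illustration only: timestamps decide the order when they differ.
        boolean isAtOrAfter(Offset other) {
            if (tsSec != other.tsSec) {
                return tsSec > other.tsSec;
            }
            return position >= other.position;
        }
    }

    private final Map<String, Offset> maxSplitHighWatermarkMap = new HashMap<>();
    private final Set<String> pureBinlogPhaseTables = new HashSet<>();

    // Hypothetical helper: records the high watermark once a table's snapshot is done.
    void finishSnapshot(String tableId, Offset highWatermark) {
        maxSplitHighWatermarkMap.put(tableId, highWatermark);
    }

    // Same shape as the check quoted in the report, using String table ids.
    boolean hasEnterPureBinlogPhase(String tableId, Offset position) {
        if (pureBinlogPhaseTables.contains(tableId)) {
            return true;
        }
        Offset highWatermark = maxSplitHighWatermarkMap.get(tableId);
        if (highWatermark != null && position.isAtOrAfter(highWatermark)) {
            pureBinlogPhaseTables.add(tableId);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        // Data A was written at position 300, before the snapshot high watermark at 500.
        Offset dataA = new Offset(300L, 1_718_000_000L);

        PureBinlogPhaseSketch missingTs = new PureBinlogPhaseSketch();
        missingTs.finishSnapshot("db.t", new Offset(500L, 0L));
        // prints true: data A is treated as pure binlog data and emitted again
        System.out.println(missingTs.hasEnterPureBinlogPhase("db.t", dataA));

        PureBinlogPhaseSketch withTs = new PureBinlogPhaseSketch();
        withTs.finishSnapshot("db.t", new Offset(500L, 1_718_000_100L));
        // prints false: data A is recognised as older than the high watermark
        System.out.println(withTs.hasEnterPureBinlogPhase("db.t", dataA));
    }
}
```

In this model, once the high watermark carries a real timestamp, the same record is no longer classified as pure binlog data. Whether the actual fix populates ts_sec or changes the comparison is not stated in this report.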