JNSimba commented on code in PR #3415: URL: https://github.com/apache/flink-cdc/pull/3415#discussion_r1778269742
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java:
##########
@@ -187,6 +192,9 @@ protected SnapshotResult<MySqlOffsetContext> doExecute(
 } else {
 // Get the current binlog offset as HW
 highWatermark = DebeziumUtils.currentBinlogOffset(jdbcConnection);
+ long epochSecond = clock.currentTime().getEpochSecond();
+ highWatermark.getOffset().put(BinlogOffset.TIMESTAMP_KEY, String.valueOf(epochSecond));
+ highWatermark.getOffset().put(BinlogOffset.SERVER_ID_KEY, String.valueOf(epochSecond));

Review Comment:
In the configureFilter method of BinlogSplitReader, the highWatermarks of all chunks are compared to obtain the largest one. Suppose a table has multiple chunks:

chunk-1: timestamp=1727423957, binlogPosition=1001
chunk-2: timestamp=1727423958, binlogPosition=1002
chunk-3: timestamp=1727423959, binlogPosition=1002
chunk-4: timestamp=1727423960, binlogPosition=1002

At this point the server ID of every chunk is 0. The binlog positions may still differ, because new data was written while the chunks were being read. Under the current logic of **BinlogOffset.compare**, when the server IDs are equal only the filename and position are compared. For chunk-2, chunk-3 and chunk-4, everything except the timestamp is identical, so the computed highWatermark ends up being chunk-2. This will lead to duplicate data.
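The tie described in the comment can be reproduced with a small, self-contained Java sketch. It models only the comparison rules stated above. If server IDs differ, the timestamps are compared. If they match, only the filename and then the position are compared.

`HighWatermarkDemo`, `Offset` and `maxHighWatermark` are hypothetical names. They are not the real `BinlogOffset` API, which also considers GTID sets and skip-event/skip-row counters. The binlog filename is a made-up placeholder. The selection loop assumes that configureFilter replaces the current maximum only when a new high watermark is strictly after it, so ties keep the chunk seen first.

```java
import java.util.List;

/**
 * Hypothetical, simplified model of the offset comparison described in the review comment.
 * Not the real BinlogOffset: GTID sets and skip-event/skip-row counters are left out.
 */
public class HighWatermarkDemo {

    /** Hypothetical stand-in for a chunk's high watermark. */
    public record Offset(String chunk, long serverId, String filename, long position, long timestampSec)
            implements Comparable<Offset> {

        @Override
        public int compareTo(Offset that) {
            // Different servers: binlog coordinates are unrelated, so fall back to timestamps.
            if (serverId != that.serverId) {
                return Long.compare(timestampSec, that.timestampSec);
            }
            // Same server: compare filename first, then position. The timestamp is never consulted.
            int byFile = filename.compareToIgnoreCase(that.filename);
            if (byFile != 0) {
                return byFile;
            }
            return Long.compare(position, that.position);
        }

        public boolean isAfter(Offset that) {
            return compareTo(that) > 0;
        }
    }

    /** Assumed selection rule: replace the maximum only when strictly after it, so ties keep the earlier chunk. */
    public static Offset maxHighWatermark(List<Offset> highWatermarks) {
        Offset max = null;
        for (Offset hw : highWatermarks) {
            if (max == null || hw.isAfter(max)) {
                max = hw;
            }
        }
        return max;
    }

    public static void main(String[] args) {
        // Timestamps and positions from the review comment; server ID 0 for every chunk.
        // "mysql-bin.000001" is a placeholder filename, not taken from the PR.
        List<Offset> chunks = List.of(
                new Offset("chunk-1", 0, "mysql-bin.000001", 1001, 1727423957L),
                new Offset("chunk-2", 0, "mysql-bin.000001", 1002, 1727423958L),
                new Offset("chunk-3", 0, "mysql-bin.000001", 1002, 1727423959L),
                new Offset("chunk-4", 0, "mysql-bin.000001", 1002, 1727423960L));
        System.out.println(maxHighWatermark(chunks).chunk()); // prints "chunk-2"
    }
}
```

With these inputs the maximum advances from chunk-1 to chunk-2 and then stops. chunk-3 and chunk-4 compare equal to chunk-2, even though their timestamps are later, so the newer high watermarks are never selected.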