ruanhang1993 commented on code in PR #3619: URL: https://github.com/apache/flink-cdc/pull/3619#discussion_r1853188260
########## flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializerTest.java: ########## @@ -55,14 +55,14 @@ public void testPendingSplitsStateSerializerAndDeserialize() throws IOException new PendingSplitsStateSerializer(constructSourceSplitSerializer()); PendingSplitsState streamSplitsStateAfter = pendingSplitsStateSerializer.deserializePendingSplitsState( - 6, pendingSplitsStateSerializer.serialize(streamPendingSplitsStateBefore)); + 7, pendingSplitsStateSerializer.serialize(streamPendingSplitsStateBefore)); Assert.assertEquals(streamPendingSplitsStateBefore, streamSplitsStateAfter); SnapshotPendingSplitsState snapshotPendingSplitsStateBefore = constructSnapshotPendingSplitsState(AssignerStatus.NEWLY_ADDED_ASSIGNING); PendingSplitsState snapshotPendingSplitsStateAfter = pendingSplitsStateSerializer.deserializePendingSplitsState( - 6, Review Comment: Please add a test for version 6 to `testPendingSplitsStateSerializerCompatibility`. This change switches the round-trip here from version 6 to version 7, so version-6 deserialization would otherwise no longer be covered.
########## flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java: ########## @@ -397,6 +491,27 @@ && allSnapshotSplitsFinished()) { } LOG.info("Snapshot split assigner is turn into finished status."); } + + if (splitFinishedCheckpointIds != null && !splitFinishedCheckpointIds.isEmpty()) { + Iterator<Map.Entry<String, Long>> iterator = + splitFinishedCheckpointIds.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<String, Long> splitFinishedCheckpointId = iterator.next(); + String splitId = splitFinishedCheckpointId.getKey(); + Long splitCheckpointId = splitFinishedCheckpointId.getValue(); + if (splitCheckpointId != UNDEFINED_CHECKPOINT_ID + && checkpointId >= splitCheckpointId) { + // record table-level splits metrics + TableId tableId = SnapshotSplit.parseTableId(splitId); + enumeratorMetrics.getTableMetrics(tableId).addFinishedSplit(splitId); + iterator.remove(); + } + } + LOG.info( Review Comment: We probably do not need to log this when `splitFinishedCheckpointIds` is empty (size 0).
########## flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java: ########## @@ -359,11 +432,31 @@ public void addSplits(Collection<SourceSplitBase> splits) { // because they are failed assignedSplits.remove(split.splitId()); splitFinishedOffsets.remove(split.splitId()); + + enumeratorMetrics + .getTableMetrics(split.asSnapshotSplit().getTableId()) + .reprocessSplit(split.splitId()); + TableId tableId = split.asSnapshotSplit().getTableId(); + + enumeratorMetrics.getTableMetrics(tableId).removeFinishedSplit(split.splitId()); } } @Override public SnapshotPendingSplitsState snapshotState(long checkpointId) { + if (splitFinishedCheckpointIds != null && !splitFinishedCheckpointIds.isEmpty()) { + for (Map.Entry<String, Long> splitFinishedCheckpointId : + splitFinishedCheckpointIds.entrySet()) { + if (splitFinishedCheckpointId.getValue() == UNDEFINED_CHECKPOINT_ID) { + splitFinishedCheckpointId.setValue(checkpointId); + } + } + } + LOG.info( + "SnapshotSplitAssigner snapshotState on checkpoint {} with splitFinishedCheckpointIds size {}.", + checkpointId, + splitFinishedCheckpointIds == null ? 0 : splitFinishedCheckpointIds.size()); Review Comment: We probably do not need to log this when `splitFinishedCheckpointIds` is empty (size 0). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org