ruanhang1993 commented on code in PR #3619:
URL: https://github.com/apache/flink-cdc/pull/3619#discussion_r1853188260


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializerTest.java:
##########
@@ -55,14 +55,14 @@ public void 
testPendingSplitsStateSerializerAndDeserialize() throws IOException
                 new 
PendingSplitsStateSerializer(constructSourceSplitSerializer());
         PendingSplitsState streamSplitsStateAfter =
                 pendingSplitsStateSerializer.deserializePendingSplitsState(
-                        6, 
pendingSplitsStateSerializer.serialize(streamPendingSplitsStateBefore));
+                        7, 
pendingSplitsStateSerializer.serialize(streamPendingSplitsStateBefore));
         Assert.assertEquals(streamPendingSplitsStateBefore, 
streamSplitsStateAfter);
 
         SnapshotPendingSplitsState snapshotPendingSplitsStateBefore =
                 
constructSnapshotPendingSplitsState(AssignerStatus.NEWLY_ADDED_ASSIGNING);
         PendingSplitsState snapshotPendingSplitsStateAfter =
                 pendingSplitsStateSerializer.deserializePendingSplitsState(
-                        6,

Review Comment:
   Please add the test for version 6 in 
`testPendingSplitsStateSerializerCompatibility`.



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java:
##########
@@ -397,6 +491,27 @@ && allSnapshotSplitsFinished()) {
             }
             LOG.info("Snapshot split assigner is turn into finished status.");
         }
+
+        if (splitFinishedCheckpointIds != null && 
!splitFinishedCheckpointIds.isEmpty()) {
+            Iterator<Map.Entry<String, Long>> iterator =
+                    splitFinishedCheckpointIds.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<String, Long> splitFinishedCheckpointId = 
iterator.next();
+                String splitId = splitFinishedCheckpointId.getKey();
+                Long splitCheckpointId = splitFinishedCheckpointId.getValue();
+                if (splitCheckpointId != UNDEFINED_CHECKPOINT_ID
+                        && checkpointId >= splitCheckpointId) {
+                    // record table-level splits metrics
+                    TableId tableId = SnapshotSplit.parseTableId(splitId);
+                    
enumeratorMetrics.getTableMetrics(tableId).addFinishedSplit(splitId);
+                    iterator.remove();
+                }
+            }
+            LOG.info(

Review Comment:
   Maybe we do not need this information when the size of 
splitFinishedCheckpointIds is 0.



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java:
##########
@@ -359,11 +432,31 @@ public void addSplits(Collection<SourceSplitBase> splits) 
{
             // because they are failed
             assignedSplits.remove(split.splitId());
             splitFinishedOffsets.remove(split.splitId());
+
+            enumeratorMetrics
+                    .getTableMetrics(split.asSnapshotSplit().getTableId())
+                    .reprocessSplit(split.splitId());
+            TableId tableId = split.asSnapshotSplit().getTableId();
+
+            
enumeratorMetrics.getTableMetrics(tableId).removeFinishedSplit(split.splitId());
         }
     }
 
     @Override
     public SnapshotPendingSplitsState snapshotState(long checkpointId) {
+        if (splitFinishedCheckpointIds != null && 
!splitFinishedCheckpointIds.isEmpty()) {
+            for (Map.Entry<String, Long> splitFinishedCheckpointId :
+                    splitFinishedCheckpointIds.entrySet()) {
+                if (splitFinishedCheckpointId.getValue() == 
UNDEFINED_CHECKPOINT_ID) {
+                    splitFinishedCheckpointId.setValue(checkpointId);
+                }
+            }
+        }
+        LOG.info(
+                "SnapshotSplitAssigner snapshotState on checkpoint {} with 
splitFinishedCheckpointIds size {}.",
+                checkpointId,
+                splitFinishedCheckpointIds == null ? 0 : 
splitFinishedCheckpointIds.size());

Review Comment:
   Maybe we do not need this information when the size of 
splitFinishedCheckpointIds is 0.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to