liuxiao2shf commented on code in PR #3619:
URL: https://github.com/apache/flink-cdc/pull/3619#discussion_r1853230011


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java:
##########
@@ -397,6 +491,27 @@ && allSnapshotSplitsFinished()) {
             }
             LOG.info("Snapshot split assigner is turn into finished status.");
         }
+
+        if (splitFinishedCheckpointIds != null && 
!splitFinishedCheckpointIds.isEmpty()) {
+            Iterator<Map.Entry<String, Long>> iterator =
+                    splitFinishedCheckpointIds.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<String, Long> splitFinishedCheckpointId = 
iterator.next();
+                String splitId = splitFinishedCheckpointId.getKey();
+                Long splitCheckpointId = splitFinishedCheckpointId.getValue();
+                if (splitCheckpointId != UNDEFINED_CHECKPOINT_ID
+                        && checkpointId >= splitCheckpointId) {
+                    // record table-level splits metrics
+                    TableId tableId = SnapshotSplit.parseTableId(splitId);
+                    
enumeratorMetrics.getTableMetrics(tableId).addFinishedSplit(splitId);
+                    iterator.remove();
+                }
+            }
+            LOG.info(

Review Comment:
   Adjustment completed



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java:
##########
@@ -359,11 +432,31 @@ public void addSplits(Collection<SourceSplitBase> splits) 
{
             // because they are failed
             assignedSplits.remove(split.splitId());
             splitFinishedOffsets.remove(split.splitId());
+
+            enumeratorMetrics
+                    .getTableMetrics(split.asSnapshotSplit().getTableId())
+                    .reprocessSplit(split.splitId());
+            TableId tableId = split.asSnapshotSplit().getTableId();
+
+            
enumeratorMetrics.getTableMetrics(tableId).removeFinishedSplit(split.splitId());
         }
     }
 
     @Override
     public SnapshotPendingSplitsState snapshotState(long checkpointId) {
+        if (splitFinishedCheckpointIds != null && 
!splitFinishedCheckpointIds.isEmpty()) {
+            for (Map.Entry<String, Long> splitFinishedCheckpointId :
+                    splitFinishedCheckpointIds.entrySet()) {
+                if (splitFinishedCheckpointId.getValue() == 
UNDEFINED_CHECKPOINT_ID) {
+                    splitFinishedCheckpointId.setValue(checkpointId);
+                }
+            }
+        }
+        LOG.info(
+                "SnapshotSplitAssigner snapshotState on checkpoint {} with 
splitFinishedCheckpointIds size {}.",
+                checkpointId,
+                splitFinishedCheckpointIds == null ? 0 : 
splitFinishedCheckpointIds.size());

Review Comment:
   Adjustment completed



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializerTest.java:
##########
@@ -55,14 +55,14 @@ public void 
testPendingSplitsStateSerializerAndDeserialize() throws IOException
                 new 
PendingSplitsStateSerializer(constructSourceSplitSerializer());
         PendingSplitsState streamSplitsStateAfter =
                 pendingSplitsStateSerializer.deserializePendingSplitsState(
-                        6, 
pendingSplitsStateSerializer.serialize(streamPendingSplitsStateBefore));
+                        7, 
pendingSplitsStateSerializer.serialize(streamPendingSplitsStateBefore));
         Assert.assertEquals(streamPendingSplitsStateBefore, 
streamSplitsStateAfter);
 
         SnapshotPendingSplitsState snapshotPendingSplitsStateBefore =
                 
constructSnapshotPendingSplitsState(AssignerStatus.NEWLY_ADDED_ASSIGNING);
         PendingSplitsState snapshotPendingSplitsStateAfter =
                 pendingSplitsStateSerializer.deserializePendingSplitsState(
-                        6,

Review Comment:
   Adjustment completed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to