hlteoh37 commented on code in PR #151:
URL: https://github.com/apache/flink-connector-aws/pull/151#discussion_r1741916696


##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.source.enumerator.tracker;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants;
+import org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition;
+import org.apache.flink.connector.dynamodb.source.enumerator.DynamoDBStreamsShardSplitWithAssignmentStatus;
+import org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus;
+import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit;
+import org.apache.flink.connector.dynamodb.source.split.StartingPosition;
+import org.apache.flink.connector.dynamodb.source.util.ShardUtils;
+
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON;
+import static org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.ASSIGNED;
+import static org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.FINISHED;
+
+/**
+ * This class tracks splits and is used to assign any unassigned splits. It also ensures that
+ * the parent-child shard ordering is maintained.
+ */
+@Internal
+public class SplitTracker {
+    private final Map<String, DynamoDbStreamsShardSplit> knownSplits = new ConcurrentHashMap<>();
+    private final Set<String> assignedSplits = new HashSet<>();
+    private final Set<String> finishedSplits = new HashSet<>();
+    private final String streamArn;
+    private final InitialPosition initialPosition;
+
+    public SplitTracker(String streamArn, InitialPosition initialPosition) {
+        this(Collections.emptyList(), streamArn, initialPosition);
+    }
+
+    public SplitTracker(
+            List<DynamoDBStreamsShardSplitWithAssignmentStatus> initialState,
+            String streamArn,
+            DynamodbStreamsSourceConfigConstants.InitialPosition initialPosition) {
+        this.streamArn = streamArn;
+        this.initialPosition = initialPosition;
+        initialState.forEach(
+                splitWithStatus -> {
+                    DynamoDbStreamsShardSplit currentSplit = splitWithStatus.split();
+                    knownSplits.put(currentSplit.splitId(), currentSplit);
+
+                    if (ASSIGNED.equals(splitWithStatus.assignmentStatus())) {
+                        assignedSplits.add(splitWithStatus.split().splitId());
+                    }
+                    if (FINISHED.equals(splitWithStatus.assignmentStatus())) {
+                        finishedSplits.add(splitWithStatus.split().splitId());
+                    }
+                });
+    }
+
+    /**
+     * Add newly discovered splits to tracker.
+     *
+     * @param shardsToAdd collection of splits to add to tracking
+     */
+    public void addSplits(Collection<Shard> shardsToAdd) {
+        for (Shard shard : shardsToAdd) {
+            String shardId = shard.shardId();
+            if (!knownSplits.containsKey(shardId)) {
+                DynamoDbStreamsShardSplit newSplit = mapToSplit(shard, getStartingPosition(shard));
+                knownSplits.put(shardId, newSplit);
+            }
+        }
+    }
+
+    private InitialPosition getStartingPosition(Shard shard) {
+        if (shard.parentShardId() == null) {
+            return initialPosition;
+        }
+        if (!knownSplits.containsKey(shard.parentShardId())) {
+            return initialPosition;
+        }
+        return TRIM_HORIZON;
+    }
+
+    private DynamoDbStreamsShardSplit mapToSplit(
+            Shard shard, DynamodbStreamsSourceConfigConstants.InitialPosition initialPosition) {
+        StartingPosition startingPosition;
+        switch (initialPosition) {
+            case LATEST:
+                startingPosition = StartingPosition.latest();
+                break;
+            case TRIM_HORIZON:
+            default:
+                startingPosition = StartingPosition.fromStart();
+        }
+
+        Set<String> parentShardIds = new HashSet<>();
+        if (shard.parentShardId() != null) {
+            parentShardIds.add(shard.parentShardId());
+        }
+        return new DynamoDbStreamsShardSplit(
+                streamArn, shard.shardId(), startingPosition, parentShardIds);
+    }
+
+    /**
+     * Mark splits as assigned. Assigned splits will no longer be returned as pending splits.
+     *
+     * @param splitsToAssign collection of splits to mark as assigned
+     */
+    public void markAsAssigned(Collection<DynamoDbStreamsShardSplit> splitsToAssign) {
+        splitsToAssign.forEach(split -> assignedSplits.add(split.splitId()));
+    }
+
+    /**
+     * Mark splits as finished. Finished splits will no longer be returned as pending splits.
+     *
+     * @param splitsToFinish collection of split ids to mark as finished
+     */
+    public void markAsFinished(Collection<String> splitsToFinish) {
+        splitsToFinish.forEach(
+                splitId -> {
+                    finishedSplits.add(splitId);
+                    assignedSplits.remove(splitId);
+                });
+    }
+
+    public boolean isAssigned(String splitId) {
+        return assignedSplits.contains(splitId);
+    }
+
+    /**
+     * We never put an inconsistent shard lineage into the split tracker, so if a shard's parent
+     * is not present, it must already have been cleaned up.
+     */
+    public List<DynamoDbStreamsShardSplit> splitsAvailableForAssignment() {
+        return knownSplits.values().stream()
+                .filter(
+                        split -> {
+                            boolean splitIsNotAssigned = !isAssigned(split.splitId());
+                            return splitIsNotAssigned
+                                    && !isFinished(split.splitId())
+                                    && (verifyAllParentSplitsAreFinished(split)
+                                            || verifyAllParentSplitsAreCleanedUp(split));
+                        })
+                .collect(Collectors.toList());
+    }
+
+    public List<DynamoDBStreamsShardSplitWithAssignmentStatus> snapshotState(long checkpointId) {
+        return knownSplits.values().stream()
+                .map(
+                        split -> {
+                            SplitAssignmentStatus assignmentStatus =
+                                    SplitAssignmentStatus.UNASSIGNED;
+                            if (isAssigned(split.splitId())) {
+                                assignmentStatus = ASSIGNED;
+                            } else if (isFinished(split.splitId())) {
+                                assignmentStatus = SplitAssignmentStatus.FINISHED;
+                            }
+                            return new DynamoDBStreamsShardSplitWithAssignmentStatus(
+                                    split, assignmentStatus);
+                        })
+                .collect(Collectors.toList());
+    }
+
+    @VisibleForTesting
+    public Set<String> getKnownSplitIds() {
+        return knownSplits.keySet();
+    }
+
+    /**
+     * Cleans up finishedSplits. A finished split can be cleaned up once it has been fully read,
+     * its parents have been fully read, and it no longer appears in the DescribeStream response.
+     */
+    public void removeSplits(Set<String> discoveredSplitIds) {
+        Set<String> finishedSplitsSnapshot = new HashSet<>(finishedSplits);
+        for (String finishedSplitId : finishedSplitsSnapshot) {
+            DynamoDbStreamsShardSplit finishedSplit = knownSplits.get(finishedSplitId);
+            if (isSplitReadyToBeCleanedUp(finishedSplit, discoveredSplitIds)) {
+                finishedSplits.remove(finishedSplitId);
+                knownSplits.remove(finishedSplitId);
+            }
+        }
+    }
+
+    private boolean isSplitReadyToBeCleanedUp(
+            DynamoDbStreamsShardSplit finishedSplit, Set<String> discoveredSplitIds) {
+        String splitId = finishedSplit.splitId();
+        boolean parentSplitsAreFinished = verifyAllParentSplitsAreFinished(finishedSplit);
+        boolean parentSplitsAreCleanedUp = verifyAllParentSplitsAreCleanedUp(finishedSplit);

Review Comment:
   Why do we care about parents here? This might cause issues when we restore from a stale snapshot (all shards have expired and should be cleaned up).
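   
   A rough sketch of the stale-snapshot case, using only the constructors visible in this diff (`streamArn` and the shard ids below are made up for illustration):
   
   ```java
   // Assumes the same imports as SplitTracker, plus java.util.Arrays.
   // Every restored split is FINISHED and none of them show up in DescribeStream anymore.
   DynamoDbStreamsShardSplit parent =
           new DynamoDbStreamsShardSplit(
                   streamArn, "shardId-001", StartingPosition.fromStart(), Collections.emptySet());
   DynamoDbStreamsShardSplit child =
           new DynamoDbStreamsShardSplit(
                   streamArn,
                   "shardId-002",
                   StartingPosition.fromStart(),
                   Collections.singleton(parent.splitId()));
   
   SplitTracker tracker =
           new SplitTracker(
                   Arrays.asList(
                           new DynamoDBStreamsShardSplitWithAssignmentStatus(
                                   parent, SplitAssignmentStatus.FINISHED),
                           new DynamoDBStreamsShardSplitWithAssignmentStatus(
                                   child, SplitAssignmentStatus.FINISHED)),
                   streamArn,
                   InitialPosition.TRIM_HORIZON);
   
   // Nothing is discovered anymore, so we'd expect both splits to be removable in a
   // single pass here, regardless of what the parent checks conclude.
   tracker.removeSplits(Collections.emptySet());
   ```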



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
