gguptp commented on code in PR #177:
URL: 
https://github.com/apache/flink-connector-aws/pull/177#discussion_r1811945191


##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java:
##########
@@ -89,13 +95,69 @@ public SplitTracker(
      * @param shardsToAdd collection of splits to add to tracking
      */
     public void addSplits(Collection<Shard> shardsToAdd) {
-        Set<String> discoveredShardIds =
-                
shardsToAdd.stream().map(Shard::shardId).collect(Collectors.toSet());
+        if (TRIM_HORIZON.equals(initialPosition)) {
+            addSplitsForTrimHorizon(shardsToAdd);
+            return;
+        }
+        addSplitsForLatest(shardsToAdd);
+    }
+
+    private void addSplitsForLatest(Collection<Shard> shardsToAdd) {
+        List<Shard> openShards =
+                shardsToAdd.stream()
+                        .filter(shard -> 
shard.sequenceNumberRange().endingSequenceNumber() == null)
+                        .collect(Collectors.toList());
+        Map<String, Shard> shardIdToShardMap =
+                shardsToAdd.stream().collect(Collectors.toMap(Shard::shardId, 
shard -> shard));
+        for (Shard shard : openShards) {
+            String shardId = shard.shardId();
+            if (knownSplits.containsKey(shardId)) {
+                continue;
+            }
+            String firstShardIdBeforeTimestamp =
+                    findFirstShardIdBeforeTimestamp(shardId, 
shardIdToShardMap);
+            putAllAncestorShardsTillFirstShardId(
+                    shardId, firstShardIdBeforeTimestamp, shardIdToShardMap);
+        }
+    }
+
+    private void putAllAncestorShardsTillFirstShardId(
+            String shardId,
+            String firstShardIdBeforeTimestamp,
+            Map<String, Shard> shardIdToShardMap) {
+        String currentShardId = shardId;
+        while (currentShardId != null && 
shardIdToShardMap.containsKey(currentShardId)) {
+            Shard currentShard = shardIdToShardMap.get(currentShardId);
+            if (Objects.equals(currentShardId, firstShardIdBeforeTimestamp)) {
+                DynamoDbStreamsShardSplit newSplit = mapToSplit(currentShard, 
LATEST);
+                knownSplits.putIfAbsent(currentShardId, newSplit);

Review Comment:
   I think that if we made this the break condition of the while loop, the loop would exit
before this shard is added, so it would not be assigned the LATEST shard iterator type. Hence I prefer to keep it as-is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to