gguptp commented on code in PR #177:
URL: https://github.com/apache/flink-connector-aws/pull/177#discussion_r1811947253


##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java:
##########
@@ -89,13 +95,69 @@ public SplitTracker(
      * @param shardsToAdd collection of splits to add to tracking
      */
     public void addSplits(Collection<Shard> shardsToAdd) {
-        Set<String> discoveredShardIds =
-                shardsToAdd.stream().map(Shard::shardId).collect(Collectors.toSet());
+        if (TRIM_HORIZON.equals(initialPosition)) {
+            addSplitsForTrimHorizon(shardsToAdd);
+            return;
+        }
+        addSplitsForLatest(shardsToAdd);
+    }
+
+    private void addSplitsForLatest(Collection<Shard> shardsToAdd) {
+        List<Shard> openShards =
+                shardsToAdd.stream()
+                        .filter(shard -> shard.sequenceNumberRange().endingSequenceNumber() == null)
+                        .collect(Collectors.toList());
+        Map<String, Shard> shardIdToShardMap =
+                shardsToAdd.stream().collect(Collectors.toMap(Shard::shardId, shard -> shard));
+        for (Shard shard : openShards) {
+            String shardId = shard.shardId();
+            if (knownSplits.containsKey(shardId)) {
+                continue;
+            }
+            String firstShardIdBeforeTimestamp =
+                    findFirstShardIdBeforeTimestamp(shardId, shardIdToShardMap);
+            putAllAncestorShardsTillFirstShardId(
+                    shardId, firstShardIdBeforeTimestamp, shardIdToShardMap);
+        }
+    }
+
+    private void putAllAncestorShardsTillFirstShardId(
+            String shardId,
+            String firstShardIdBeforeTimestamp,
+            Map<String, Shard> shardIdToShardMap) {
+        String currentShardId = shardId;
+        while (currentShardId != null && shardIdToShardMap.containsKey(currentShardId)) {
+            Shard currentShard = shardIdToShardMap.get(currentShardId);
+            if (Objects.equals(currentShardId, firstShardIdBeforeTimestamp)) {
+                DynamoDbStreamsShardSplit newSplit = mapToSplit(currentShard, LATEST);
+                knownSplits.putIfAbsent(currentShardId, newSplit);
+                break;
+            } else {
+                DynamoDbStreamsShardSplit newSplit = mapToSplit(currentShard, TRIM_HORIZON);
+                knownSplits.putIfAbsent(currentShardId, newSplit);
+            }
+            currentShardId = currentShard.parentShardId();
+        }
+    }
+
+    private String findFirstShardIdBeforeTimestamp(
+            String shardIdToStartWith, Map<String, Shard> shardIdToShardMap) {
+        String shardId = shardIdToStartWith;
+        while (shardId != null && shardIdToShardMap.containsKey(shardId)) {
+            Shard shard = shardIdToShardMap.get(shardId);

Review Comment:
   Makes sense, let me do this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to