gguptp commented on code in PR #177:
URL: 
https://github.com/apache/flink-connector-aws/pull/177#discussion_r1811943331


##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java:
##########
@@ -89,13 +95,69 @@ public SplitTracker(
      * @param shardsToAdd collection of splits to add to tracking
      */
     public void addSplits(Collection<Shard> shardsToAdd) {
-        Set<String> discoveredShardIds =
-                
shardsToAdd.stream().map(Shard::shardId).collect(Collectors.toSet());
+        if (TRIM_HORIZON.equals(initialPosition)) {
+            addSplitsForTrimHorizon(shardsToAdd);
+            return;
+        }
+        addSplitsForLatest(shardsToAdd);
+    }
+
+    private void addSplitsForLatest(Collection<Shard> shardsToAdd) {
+        List<Shard> openShards =
+                shardsToAdd.stream()
+                        .filter(shard -> 
shard.sequenceNumberRange().endingSequenceNumber() == null)
+                        .collect(Collectors.toList());
+        Map<String, Shard> shardIdToShardMap =
+                shardsToAdd.stream().collect(Collectors.toMap(Shard::shardId, 
shard -> shard));
+        for (Shard shard : openShards) {
+            String shardId = shard.shardId();
+            if (knownSplits.containsKey(shardId)) {
+                continue;
+            }
+            String firstShardIdBeforeTimestamp =
+                    findFirstShardIdBeforeTimestamp(shardId, 
shardIdToShardMap);
+            putAllAncestorShardsTillFirstShardId(
+                    shardId, firstShardIdBeforeTimestamp, shardIdToShardMap);
+        }
+    }
+
+    private void putAllAncestorShardsTillFirstShardId(
+            String shardId,
+            String firstShardIdBeforeTimestamp,
+            Map<String, Shard> shardIdToShardMap) {
+        String currentShardId = shardId;
+        while (currentShardId != null && 
shardIdToShardMap.containsKey(currentShardId)) {
+            Shard currentShard = shardIdToShardMap.get(currentShardId);
+            if (Objects.equals(currentShardId, firstShardIdBeforeTimestamp)) {
+                DynamoDbStreamsShardSplit newSplit = mapToSplit(currentShard, 
LATEST);

Review Comment:
   yes correct we need to update this in docs



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/util/ShardUtils.java:
##########
@@ -78,4 +78,14 @@ public static boolean 
isShardOlderThanInconsistencyDetectionRetentionPeriod(Stri
                                 .plus(
                                         
DDB_STREAMS_MAX_RETENTION_PERIOD_FOR_RESOLVING_INCONSISTENCIES));
     }
+
+    /**
+     * Returns true if shard ue if the shard was created before the given 
timestamp.

Review Comment:
   will fix



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to