hlteoh37 commented on code in PR #177:
URL: https://github.com/apache/flink-connector-aws/pull/177#discussion_r1799315333


##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java:
##########
@@ -89,13 +91,77 @@ public SplitTracker(
      * @param shardsToAdd collection of splits to add to tracking
      */
     public void addSplits(Collection<Shard> shardsToAdd) {
-        Set<String> discoveredShardIds =
-                shardsToAdd.stream().map(Shard::shardId).collect(Collectors.toSet());
+        if (TRIM_HORIZON.equals(initialPosition)) {
+            addSplitsForTrimHorizon(shardsToAdd);
+            return;
+        }
+        addSplitsForLatest(shardsToAdd);
+    }
+
+    /**
+     * If there is an ancestor of an open shard in knownSplits, we put all of its ancestors after
+     * the tracked ancestors as TRIM_HORIZON otherwise, we ignore all the ancestors and track only
+     * the open shard with LATEST. There might be a case when restoring from expired snapshots that
+     * there is an ancestor in knownSplits, but we wont know that it is an ancestor, so in that case
+     * also, all the ancestors will not be tracked and rather only the current open shard will be
+     * tracked with LATEST.

Review Comment:
   We should align on the expected behavior when using `LATEST`:
   
   1. When starting without a snapshot:
      - All shards are read from `LATEST`. If we want, we can focus on just the open shards.
   2. When reading with a snapshot:
      - All shards from the time the application started are read from `TRIM_HORIZON`.
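
To make the two cases above concrete, here is a minimal sketch of one possible reading of the rule. It is not the connector's implementation: `Position`, `ShardInfo`, `assignPositions`, and the `restoredFromSnapshot` and `openShardsOnly` flags are all hypothetical names introduced for illustration, and the sketch assumes it is only applied to shards that are not already tracked in restored state.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LatestPositionSketch {
    /** Hypothetical stand-in for the configured starting positions. */
    enum Position { LATEST, TRIM_HORIZON }

    /** Hypothetical minimal shard view: an id plus whether the shard is still open. */
    static final class ShardInfo {
        final String shardId;
        final boolean open;

        ShardInfo(String shardId, boolean open) {
            this.shardId = shardId;
            this.open = open;
        }
    }

    /**
     * Chooses a starting position for each discovered shard when the source is
     * configured with LATEST. Shards that are skipped do not appear in the result.
     */
    static Map<String, Position> assignPositions(
            List<ShardInfo> discovered, boolean restoredFromSnapshot, boolean openShardsOnly) {
        Map<String, Position> positions = new LinkedHashMap<>();
        for (ShardInfo shard : discovered) {
            if (restoredFromSnapshot) {
                // Case 2: the application has already been running, so shards seen
                // from now on are read from the beginning to avoid skipping records.
                positions.put(shard.shardId, Position.TRIM_HORIZON);
            } else if (shard.open || !openShardsOnly) {
                // Case 1: fresh start, read from LATEST (optionally open shards only).
                positions.put(shard.shardId, Position.LATEST);
            }
        }
        return positions;
    }
}
```

With `openShardsOnly` set, a fresh start tracks only the open shards at `LATEST`; after a restore, every shard passed in is assigned `TRIM_HORIZON`.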



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
