gguptp commented on code in PR #177: URL: https://github.com/apache/flink-connector-aws/pull/177#discussion_r1803887401
########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java: ########## @@ -89,13 +91,77 @@ public SplitTracker( * @param shardsToAdd collection of splits to add to tracking */ public void addSplits(Collection<Shard> shardsToAdd) { - Set<String> discoveredShardIds = - shardsToAdd.stream().map(Shard::shardId).collect(Collectors.toSet()); + if (TRIM_HORIZON.equals(initialPosition)) { + addSplitsForTrimHorizon(shardsToAdd); + return; + } + addSplitsForLatest(shardsToAdd); + } + + /** + * If there is an ancestor of an open shard in knownSplits, we put all of its ancestors after + * the tracked ancestors as TRIM_HORIZON; otherwise, we ignore all the ancestors and track only + * the open shard with LATEST. There might be a case when restoring from expired snapshots that + * there is an ancestor in knownSplits, but we won't know that it is an ancestor, so in that case + * also, all the ancestors will not be tracked and rather only the current open shard will be + * tracked with LATEST. Review Comment: changed the algo to resolve this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org