hlteoh37 commented on code in PR #177: URL: https://github.com/apache/flink-connector-aws/pull/177#discussion_r1799315333
########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java: ########## @@ -89,13 +91,77 @@ public SplitTracker( * @param shardsToAdd collection of splits to add to tracking */ public void addSplits(Collection<Shard> shardsToAdd) { - Set<String> discoveredShardIds = - shardsToAdd.stream().map(Shard::shardId).collect(Collectors.toSet()); + if (TRIM_HORIZON.equals(initialPosition)) { + addSplitsForTrimHorizon(shardsToAdd); + return; + } + addSplitsForLatest(shardsToAdd); + } + + /** + * If there is an ancestor of an open shard in knownSplits, we put all of its ancestors after + * the tracked ancestors as TRIM_HORIZON otherwise, we ignore all the ancestors and track only + * the open shard with LATEST. There might be a case when restoring from expired snapshots that + * there is an ancestor in knownSplits, but we wont know that it is an ancestor, so in that case + * also, all the ancestors will not be tracked and rather only the current open shard will be + * tracked with LATEST. Review Comment: We should agree on the expected behavior when using `LATEST`: 1. When starting without a snapshot: - All shards are read from `LATEST`. If desired, we can limit this to just the open shards. 2.
When starting from a snapshot: - All shards discovered since the application started are read from `TRIM_HORIZON`. ########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java: ########## @@ -89,13 +91,77 @@ public SplitTracker( * @param shardsToAdd collection of splits to add to tracking */ public void addSplits(Collection<Shard> shardsToAdd) { - Set<String> discoveredShardIds = - shardsToAdd.stream().map(Shard::shardId).collect(Collectors.toSet()); + if (TRIM_HORIZON.equals(initialPosition)) { + addSplitsForTrimHorizon(shardsToAdd); + return; + } + addSplitsForLatest(shardsToAdd); + } + + /** + * If there is an ancestor of an open shard in knownSplits, we put all of its ancestors after + * the tracked ancestors as TRIM_HORIZON otherwise, we ignore all the ancestors and track only + * the open shard with LATEST. There might be a case when restoring from expired snapshots that + * there is an ancestor in knownSplits, but we wont know that it is an ancestor, so in that case + * also, all the ancestors will not be tracked and rather only the current open shard will be + * tracked with LATEST. Review Comment: We should agree on the expected behavior when using `LATEST`: 1. When starting without a snapshot: - All shards are read from `LATEST`. If desired, we can limit this to just the open shards. 2. When starting from a snapshot: - All shards discovered since the application started are read from `TRIM_HORIZON`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org