gguptp commented on code in PR #177: URL: https://github.com/apache/flink-connector-aws/pull/177#discussion_r1811943331
########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java: ########## @@ -89,13 +95,69 @@ public SplitTracker( * @param shardsToAdd collection of splits to add to tracking */ public void addSplits(Collection<Shard> shardsToAdd) { - Set<String> discoveredShardIds = - shardsToAdd.stream().map(Shard::shardId).collect(Collectors.toSet()); + if (TRIM_HORIZON.equals(initialPosition)) { + addSplitsForTrimHorizon(shardsToAdd); + return; + } + addSplitsForLatest(shardsToAdd); + } + + private void addSplitsForLatest(Collection<Shard> shardsToAdd) { + List<Shard> openShards = + shardsToAdd.stream() + .filter(shard -> shard.sequenceNumberRange().endingSequenceNumber() == null) + .collect(Collectors.toList()); + Map<String, Shard> shardIdToShardMap = + shardsToAdd.stream().collect(Collectors.toMap(Shard::shardId, shard -> shard)); + for (Shard shard : openShards) { + String shardId = shard.shardId(); + if (knownSplits.containsKey(shardId)) { + continue; + } + String firstShardIdBeforeTimestamp = + findFirstShardIdBeforeTimestamp(shardId, shardIdToShardMap); + putAllAncestorShardsTillFirstShardId( + shardId, firstShardIdBeforeTimestamp, shardIdToShardMap); + } + } + + private void putAllAncestorShardsTillFirstShardId( + String shardId, + String firstShardIdBeforeTimestamp, + Map<String, Shard> shardIdToShardMap) { + String currentShardId = shardId; + while (currentShardId != null && shardIdToShardMap.containsKey(currentShardId)) { + Shard currentShard = shardIdToShardMap.get(currentShardId); + if (Objects.equals(currentShardId, firstShardIdBeforeTimestamp)) { + DynamoDbStreamsShardSplit newSplit = mapToSplit(currentShard, LATEST); Review Comment: Yes, correct — we need to update the docs to describe this LATEST behavior. ########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/util/ShardUtils.java: ########## @@ -78,4 +78,14 @@ public static boolean 
isShardOlderThanInconsistencyDetectionRetentionPeriod(Stri .plus( DDB_STREAMS_MAX_RETENTION_PERIOD_FOR_RESOLVING_INCONSISTENCIES)); } + + /** + * Returns true if shard ue if the shard was created before the given timestamp. Review Comment: will fix -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org