gguptp commented on code in PR #177: URL: https://github.com/apache/flink-connector-aws/pull/177#discussion_r1811947253
########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java: ########## @@ -89,13 +95,69 @@ public SplitTracker( * @param shardsToAdd collection of splits to add to tracking */ public void addSplits(Collection<Shard> shardsToAdd) { - Set<String> discoveredShardIds = - shardsToAdd.stream().map(Shard::shardId).collect(Collectors.toSet()); + if (TRIM_HORIZON.equals(initialPosition)) { + addSplitsForTrimHorizon(shardsToAdd); + return; + } + addSplitsForLatest(shardsToAdd); + } + + private void addSplitsForLatest(Collection<Shard> shardsToAdd) { + List<Shard> openShards = + shardsToAdd.stream() + .filter(shard -> shard.sequenceNumberRange().endingSequenceNumber() == null) + .collect(Collectors.toList()); + Map<String, Shard> shardIdToShardMap = + shardsToAdd.stream().collect(Collectors.toMap(Shard::shardId, shard -> shard)); + for (Shard shard : openShards) { + String shardId = shard.shardId(); + if (knownSplits.containsKey(shardId)) { + continue; + } + String firstShardIdBeforeTimestamp = + findFirstShardIdBeforeTimestamp(shardId, shardIdToShardMap); + putAllAncestorShardsTillFirstShardId( + shardId, firstShardIdBeforeTimestamp, shardIdToShardMap); + } + } + + private void putAllAncestorShardsTillFirstShardId( + String shardId, + String firstShardIdBeforeTimestamp, + Map<String, Shard> shardIdToShardMap) { + String currentShardId = shardId; + while (currentShardId != null && shardIdToShardMap.containsKey(currentShardId)) { + Shard currentShard = shardIdToShardMap.get(currentShardId); + if (Objects.equals(currentShardId, firstShardIdBeforeTimestamp)) { + DynamoDbStreamsShardSplit newSplit = mapToSplit(currentShard, LATEST); + knownSplits.putIfAbsent(currentShardId, newSplit); + break; + } else { + DynamoDbStreamsShardSplit newSplit = mapToSplit(currentShard, TRIM_HORIZON); + knownSplits.putIfAbsent(currentShardId, newSplit); + } + currentShardId = currentShard.parentShardId(); + } 
+ } + + private String findFirstShardIdBeforeTimestamp( + String shardIdToStartWith, Map<String, Shard> shardIdToShardMap) { + String shardId = shardIdToStartWith; + while (shardId != null && shardIdToShardMap.containsKey(shardId)) { + Shard shard = shardIdToShardMap.get(shardId); Review Comment: Makes sense, let me do this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org