hlteoh37 commented on code in PR #177:
URL: https://github.com/apache/flink-connector-aws/pull/177#discussion_r1810789785
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -75,6 +76,7 @@ public class DynamoDbStreamsSourceEnumerator
     private final DynamoDbStreamsShardAssigner shardAssigner;
     private final ShardAssignerContext shardAssignerContext;
     private final SplitTracker splitTracker;
+    private Instant startTimestamp;

Review Comment:
   this should be `final`



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java:
##########
@@ -89,13 +95,69 @@ public SplitTracker(
      * @param shardsToAdd collection of splits to add to tracking
      */
     public void addSplits(Collection<Shard> shardsToAdd) {
-        Set<String> discoveredShardIds =
-                shardsToAdd.stream().map(Shard::shardId).collect(Collectors.toSet());
+        if (TRIM_HORIZON.equals(initialPosition)) {
+            addSplitsForTrimHorizon(shardsToAdd);
+            return;
+        }
+        addSplitsForLatest(shardsToAdd);
+    }
+
+    private void addSplitsForLatest(Collection<Shard> shardsToAdd) {
+        List<Shard> openShards =
+                shardsToAdd.stream()
+                        .filter(shard -> shard.sequenceNumberRange().endingSequenceNumber() == null)
+                        .collect(Collectors.toList());
+        Map<String, Shard> shardIdToShardMap =
+                shardsToAdd.stream().collect(Collectors.toMap(Shard::shardId, shard -> shard));
+        for (Shard shard : openShards) {
+            String shardId = shard.shardId();
+            if (knownSplits.containsKey(shardId)) {
+                continue;
+            }
+            String firstShardIdBeforeTimestamp =
+                    findFirstShardIdBeforeTimestamp(shardId, shardIdToShardMap);
+            putAllAncestorShardsTillFirstShardId(
+                    shardId, firstShardIdBeforeTimestamp, shardIdToShardMap);
+        }
+    }
+
+    private void putAllAncestorShardsTillFirstShardId(
+            String shardId,
+            String firstShardIdBeforeTimestamp,
+            Map<String, Shard> shardIdToShardMap) {
+        String currentShardId = shardId;
+        while (currentShardId != null && shardIdToShardMap.containsKey(currentShardId)) {
+            Shard currentShard = shardIdToShardMap.get(currentShardId);
+            if (Objects.equals(currentShardId, firstShardIdBeforeTimestamp)) {
+                DynamoDbStreamsShardSplit newSplit = mapToSplit(currentShard, LATEST);
+                knownSplits.putIfAbsent(currentShardId, newSplit);
+                break;
+            } else {
+                DynamoDbStreamsShardSplit newSplit = mapToSplit(currentShard, TRIM_HORIZON);
+                knownSplits.putIfAbsent(currentShardId, newSplit);
+            }
+            currentShardId = currentShard.parentShardId();
+        }
+    }
+
+    private String findFirstShardIdBeforeTimestamp(
+            String shardIdToStartWith, Map<String, Shard> shardIdToShardMap) {
+        String shardId = shardIdToStartWith;
+        while (shardId != null && shardIdToShardMap.containsKey(shardId)) {
+            Shard shard = shardIdToShardMap.get(shardId);

Review Comment:
   nit: we could optimise this algorithm by doing a check if `split` is in `knownSplits` as a `break` condition of the while loop



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/util/ShardUtils.java:
##########
@@ -78,4 +78,14 @@ public static boolean isShardOlderThanInconsistencyDetectionRetentionPeriod(Stri
                         .plus(
                                 DDB_STREAMS_MAX_RETENTION_PERIOD_FOR_RESOLVING_INCONSISTENCIES));
     }
+
+    /**
+     * Returns true if shard ue if the shard was created before the given timestamp.

Review Comment:
   typo!
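   On the `findFirstShardIdBeforeTimestamp` nit above, one way the early exit could look — a sketch only, since the diff excerpt is truncated: the timestamp check, the `isShardCreatedBeforeTimestamp` helper name, and the use of a `startTimestamp` field are assumptions, not the actual code past the quoted lines:

   ```java
   private String findFirstShardIdBeforeTimestamp(
           String shardIdToStartWith, Map<String, Shard> shardIdToShardMap) {
       String shardId = shardIdToStartWith;
       // Sketch: also break once the shard is already in knownSplits, so a lineage
       // resolved on an earlier discovery run is not walked a second time.
       while (shardId != null
               && shardIdToShardMap.containsKey(shardId)
               && !knownSplits.containsKey(shardId)) {
           Shard shard = shardIdToShardMap.get(shardId);
           // Assumed helper name; the actual check is past the truncated excerpt.
           if (ShardUtils.isShardCreatedBeforeTimestamp(shard.shardId(), startTimestamp)) {
               return shardId;
           }
           shardId = shard.parentShardId();
       }
       return null;
   }
   ```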
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java:
##########
@@ -89,13 +95,69 @@ public SplitTracker(
      * @param shardsToAdd collection of splits to add to tracking
      */
     public void addSplits(Collection<Shard> shardsToAdd) {
-        Set<String> discoveredShardIds =
-                shardsToAdd.stream().map(Shard::shardId).collect(Collectors.toSet());
+        if (TRIM_HORIZON.equals(initialPosition)) {
+            addSplitsForTrimHorizon(shardsToAdd);
+            return;
+        }
+        addSplitsForLatest(shardsToAdd);
+    }
+
+    private void addSplitsForLatest(Collection<Shard> shardsToAdd) {
+        List<Shard> openShards =
+                shardsToAdd.stream()
+                        .filter(shard -> shard.sequenceNumberRange().endingSequenceNumber() == null)
+                        .collect(Collectors.toList());
+        Map<String, Shard> shardIdToShardMap =
+                shardsToAdd.stream().collect(Collectors.toMap(Shard::shardId, shard -> shard));
+        for (Shard shard : openShards) {
+            String shardId = shard.shardId();
+            if (knownSplits.containsKey(shardId)) {
+                continue;
+            }
+            String firstShardIdBeforeTimestamp =
+                    findFirstShardIdBeforeTimestamp(shardId, shardIdToShardMap);
+            putAllAncestorShardsTillFirstShardId(
+                    shardId, firstShardIdBeforeTimestamp, shardIdToShardMap);
+        }
+    }
+
+    private void putAllAncestorShardsTillFirstShardId(
+            String shardId,
+            String firstShardIdBeforeTimestamp,
+            Map<String, Shard> shardIdToShardMap) {
+        String currentShardId = shardId;
+        while (currentShardId != null && shardIdToShardMap.containsKey(currentShardId)) {
+            Shard currentShard = shardIdToShardMap.get(currentShardId);
+            if (Objects.equals(currentShardId, firstShardIdBeforeTimestamp)) {
+                DynamoDbStreamsShardSplit newSplit = mapToSplit(currentShard, LATEST);

Review Comment:
   Technically we still drop a bunch of records from the given timestamp onwards, but we just need to make this clear in the docs.
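   To make that explicit, a possible wording for a Javadoc note on `addSplitsForLatest` — a sketch of the docs point only, not existing code:

   ```java
   /**
    * Note (sketch): with a timestamp-based start, the first ancestor shard created
    * before the configured start timestamp is consumed from LATEST rather than from
    * the timestamp itself. Records written to that shard at or after the timestamp
    * but before the reader starts polling are therefore skipped; only descendant
    * shards created after the timestamp are consumed from TRIM_HORIZON.
    */
   ```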
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java:
##########
@@ -89,13 +95,69 @@ public SplitTracker(
      * @param shardsToAdd collection of splits to add to tracking
      */
     public void addSplits(Collection<Shard> shardsToAdd) {
-        Set<String> discoveredShardIds =
-                shardsToAdd.stream().map(Shard::shardId).collect(Collectors.toSet());
+        if (TRIM_HORIZON.equals(initialPosition)) {
+            addSplitsForTrimHorizon(shardsToAdd);
+            return;
+        }
+        addSplitsForLatest(shardsToAdd);
+    }
+
+    private void addSplitsForLatest(Collection<Shard> shardsToAdd) {
+        List<Shard> openShards =
+                shardsToAdd.stream()
+                        .filter(shard -> shard.sequenceNumberRange().endingSequenceNumber() == null)
+                        .collect(Collectors.toList());
+        Map<String, Shard> shardIdToShardMap =
+                shardsToAdd.stream().collect(Collectors.toMap(Shard::shardId, shard -> shard));
+        for (Shard shard : openShards) {
+            String shardId = shard.shardId();
+            if (knownSplits.containsKey(shardId)) {
+                continue;
+            }
+            String firstShardIdBeforeTimestamp =
+                    findFirstShardIdBeforeTimestamp(shardId, shardIdToShardMap);
+            putAllAncestorShardsTillFirstShardId(
+                    shardId, firstShardIdBeforeTimestamp, shardIdToShardMap);
+        }
+    }
+
+    private void putAllAncestorShardsTillFirstShardId(
+            String shardId,
+            String firstShardIdBeforeTimestamp,
+            Map<String, Shard> shardIdToShardMap) {
+        String currentShardId = shardId;
+        while (currentShardId != null && shardIdToShardMap.containsKey(currentShardId)) {
+            Shard currentShard = shardIdToShardMap.get(currentShardId);
+            if (Objects.equals(currentShardId, firstShardIdBeforeTimestamp)) {
+                DynamoDbStreamsShardSplit newSplit = mapToSplit(currentShard, LATEST);
+                knownSplits.putIfAbsent(currentShardId, newSplit);

Review Comment:
   nit: we could optimise this algorithm by doing this check as a `break` condition of the while loop
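   For illustration, the nit folded into the quoted loop could look like this — a sketch only; `putIfAbsent` becomes a plain `put` here on the assumption that the loop now exits before revisiting an already-tracked shard:

   ```java
   private void putAllAncestorShardsTillFirstShardId(
           String shardId,
           String firstShardIdBeforeTimestamp,
           Map<String, Shard> shardIdToShardMap) {
       String currentShardId = shardId;
       // Sketch: stop walking up the lineage once the shard is already tracked,
       // instead of relying on putIfAbsent to no-op on every revisited ancestor.
       while (currentShardId != null
               && shardIdToShardMap.containsKey(currentShardId)
               && !knownSplits.containsKey(currentShardId)) {
           Shard currentShard = shardIdToShardMap.get(currentShardId);
           if (Objects.equals(currentShardId, firstShardIdBeforeTimestamp)) {
               knownSplits.put(currentShardId, mapToSplit(currentShard, LATEST));
               break;
           }
           knownSplits.put(currentShardId, mapToSplit(currentShard, TRIM_HORIZON));
           currentShardId = currentShard.parentShardId();
       }
   }
   ```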