hlteoh37 commented on code in PR #151: URL: https://github.com/apache/flink-connector-aws/pull/151#discussion_r1736097740
########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java: ########## @@ -116,24 +118,96 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname } @Override - public void addSplitsBack(List<DynamoDbStreamsShardSplit> splits, int subtaskId) { - if (!splitAssignment.containsKey(subtaskId)) { - LOG.warn( - "Unable to add splits back for subtask {} since it is not assigned any splits. Splits: {}", - subtaskId, - splits); + public void addSplitsBack(List<DynamoDbStreamsShardSplit> list, int i) { + throw new UnsupportedOperationException("Partial recovery is not supported"); + } + + @Override + public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { + if (sourceEvent instanceof SplitsFinishedEvent) { + handleFinishedSplits(subtaskId, (SplitsFinishedEvent) sourceEvent); + } + } + + private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) { + splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds()); + splitAssignment + .get(subtaskId) + .removeIf( + split -> + splitsFinishedEvent + .getFinishedSplitIds() + .contains(split.splitId())); + assignSplits(); + } + + private void processDiscoveredSplits(ListShardsResult discoveredSplits, Throwable throwable) { + if (throwable != null) { + throw new DynamoDbStreamsSourceException("Failed to list shards.", throwable); + } + + SplitGraphInconsistencyTracker splitGraphInconsistencyTracker = + trackSplitsAndResolveInconsistencies(discoveredSplits); + + if (splitGraphInconsistencyTracker.inconsistencyDetected()) { + LOG.error( + "There are inconsistencies in DescribeStream which we were not able to resolve. First leaf node on which inconsistency was detected:" + + splitGraphInconsistencyTracker.getEarliestClosedLeafNode()); + return; + } + + splitTracker.addSplits(splitGraphInconsistencyTracker.getNodes()); + splitTracker.removeSplits( + splitGraphInconsistencyTracker.getNodes().stream() + .map(Shard::shardId) + .collect(Collectors.toSet())); + if (context.registeredReaders().size() < context.currentParallelism()) { + LOG.info( + "Insufficient registered readers, skipping assignment of discovered splits until all readers are registered. Required number of readers: {}, registered readers: {}", + context.currentParallelism(), + context.registeredReaders().size()); return; } + assignSplits(); + } - for (DynamoDbStreamsShardSplit split : splits) { - splitAssignment.get(subtaskId).remove(split); - assignedSplitIds.remove(split.splitId()); - unassignedSplits.add(split); + /** + * This method tracks the discovered splits in a graph and if the graph has inconsistencies, it + * tries to resolve them using DescribeStream calls using the first inconsistent node found in + * the split graph. + * + * @param discoveredSplits splits discovered after calling DescribeStream at the start of the + * application or periodically. 
+ */ + private SplitGraphInconsistencyTracker trackSplitsAndResolveInconsistencies( + ListShardsResult discoveredSplits) { + SplitGraphInconsistencyTracker splitGraphInconsistencyTracker = + new SplitGraphInconsistencyTracker(); + splitGraphInconsistencyTracker.addNodes(discoveredSplits.getShards()); + + boolean streamDisabled = discoveredSplits.getStreamStatus().equals(StreamStatus.DISABLED); + int describeStreamInconsistencyResolutionCount = + sourceConfig.get(DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT); + for (int i = 0; + i < describeStreamInconsistencyResolutionCount + && !streamDisabled + && splitGraphInconsistencyTracker.inconsistencyDetected(); + i++) { + ListShardsResult shardsToResolveInconsistencies = Review Comment: Can we add a logline for this to indicate we are retrying? For log level, thinking `WARN` might be sufficient ########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java: ########## @@ -116,24 +118,96 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname } @Override - public void addSplitsBack(List<DynamoDbStreamsShardSplit> splits, int subtaskId) { - if (!splitAssignment.containsKey(subtaskId)) { - LOG.warn( - "Unable to add splits back for subtask {} since it is not assigned any splits. Splits: {}", - subtaskId, - splits); + public void addSplitsBack(List<DynamoDbStreamsShardSplit> list, int i) { + throw new UnsupportedOperationException("Partial recovery is not supported"); + } + + @Override + public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { + if (sourceEvent instanceof SplitsFinishedEvent) { + handleFinishedSplits(subtaskId, (SplitsFinishedEvent) sourceEvent); + } + } + + private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) { + splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds()); + splitAssignment + .get(subtaskId) + .removeIf( + split -> + splitsFinishedEvent + .getFinishedSplitIds() + .contains(split.splitId())); + assignSplits(); + } + + private void processDiscoveredSplits(ListShardsResult discoveredSplits, Throwable throwable) { + if (throwable != null) { + throw new DynamoDbStreamsSourceException("Failed to list shards.", throwable); + } + + SplitGraphInconsistencyTracker splitGraphInconsistencyTracker = + trackSplitsAndResolveInconsistencies(discoveredSplits); + + if (splitGraphInconsistencyTracker.inconsistencyDetected()) { + LOG.error( + "There are inconsistencies in DescribeStream which we were not able to resolve. First leaf node on which inconsistency was detected:" + + splitGraphInconsistencyTracker.getEarliestClosedLeafNode()); + return; + } + + splitTracker.addSplits(splitGraphInconsistencyTracker.getNodes()); + splitTracker.removeSplits( + splitGraphInconsistencyTracker.getNodes().stream() + .map(Shard::shardId) + .collect(Collectors.toSet())); Review Comment: This reads weird - is there a reason we `addSplits` then `removeSplits` for the same splits immediately after? 
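A sketch of the retry logline requested in the first comment on this hunk, meant to sit at the top of the PR's inconsistency-resolution loop (all identifiers mirror the diff; the exact wording is illustrative):

```java
// Hypothetical WARN-level retry logline; i and
// describeStreamInconsistencyResolutionCount are the loop variables of
// trackSplitsAndResolveInconsistencies() in this PR.
LOG.warn(
        "Inconsistency detected in the shard graph, retrying DescribeStream from earliest closed leaf node {}. Attempt {} of {}.",
        splitGraphInconsistencyTracker.getEarliestClosedLeafNode(),
        i + 1,
        describeStreamInconsistencyResolutionCount);
```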
########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java: ########## @@ -162,76 +237,16 @@ private List<DynamoDbStreamsShardSplit> initialDiscoverSplits() { * * @return list of discovered splits */ - private List<DynamoDbStreamsShardSplit> periodicallyDiscoverSplits() { - List<Shard> shards = streamProxy.listShards(streamArn, lastSeenShardId); - - // Any shard discovered after the initial startup should be read from the start, since they - // come from resharding - return mapToSplits(shards, InitialPosition.TRIM_HORIZON); - } - - private List<DynamoDbStreamsShardSplit> mapToSplits( - List<Shard> shards, InitialPosition initialPosition) { - StartingPosition startingPosition; - switch (initialPosition) { - case LATEST: - startingPosition = StartingPosition.latest(); - break; - case TRIM_HORIZON: - default: - startingPosition = StartingPosition.fromStart(); - } - - List<DynamoDbStreamsShardSplit> splits = new ArrayList<>(); - for (Shard shard : shards) { - splits.add(new DynamoDbStreamsShardSplit(streamArn, shard.shardId(), startingPosition)); - } - - return splits; - } - - /** - * This method assigns a given set of DynamoDb Streams splits to the readers currently - * registered on the cluster. This assignment is done via a side-effect on the {@link - * SplitEnumeratorContext} object. - * - * @param discoveredSplits list of discovered splits - * @param throwable thrown when discovering splits. Will be null if no throwable thrown. - */ - private void assignSplits( - List<DynamoDbStreamsShardSplit> discoveredSplits, Throwable throwable) { - if (throwable != null) { - throw new DynamoDbStreamsSourceException("Failed to list shards.", throwable); - } - - if (context.registeredReaders().size() < context.currentParallelism()) { - LOG.info( - "Insufficient registered readers, skipping assignment of discovered splits until all readers are registered. Required number of readers: {}, Registered readers: {}", - context.currentParallelism(), - context.registeredReaders().size()); - unassignedSplits.addAll(discoveredSplits); - return; - } - - Map<Integer, List<DynamoDbStreamsShardSplit>> newSplitAssignments = new HashMap<>(); - for (DynamoDbStreamsShardSplit split : unassignedSplits) { - assignSplitToSubtask(split, newSplitAssignments); - } - unassignedSplits.clear(); - for (DynamoDbStreamsShardSplit split : discoveredSplits) { - assignSplitToSubtask(split, newSplitAssignments); - } - - updateLastSeenShardId(discoveredSplits); - updateSplitAssignment(newSplitAssignments); - context.assignSplits(new SplitsAssignment<>(newSplitAssignments)); + private ListShardsResult periodicallyDiscoverSplits() { Review Comment: This method is now identical to `initialDiscoverSplits()`. Should we just combine the two methods? ########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java: ########## @@ -116,24 +118,96 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname } @Override - public void addSplitsBack(List<DynamoDbStreamsShardSplit> splits, int subtaskId) { - if (!splitAssignment.containsKey(subtaskId)) { - LOG.warn( - "Unable to add splits back for subtask {} since it is not assigned any splits. 
Splits: {}", - subtaskId, - splits); + public void addSplitsBack(List<DynamoDbStreamsShardSplit> list, int i) { + throw new UnsupportedOperationException("Partial recovery is not supported"); + } + + @Override + public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { + if (sourceEvent instanceof SplitsFinishedEvent) { + handleFinishedSplits(subtaskId, (SplitsFinishedEvent) sourceEvent); + } + } + + private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) { + splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds()); + splitAssignment + .get(subtaskId) + .removeIf( + split -> + splitsFinishedEvent + .getFinishedSplitIds() + .contains(split.splitId())); + assignSplits(); + } + + private void processDiscoveredSplits(ListShardsResult discoveredSplits, Throwable throwable) { + if (throwable != null) { + throw new DynamoDbStreamsSourceException("Failed to list shards.", throwable); + } + + SplitGraphInconsistencyTracker splitGraphInconsistencyTracker = + trackSplitsAndResolveInconsistencies(discoveredSplits); + + if (splitGraphInconsistencyTracker.inconsistencyDetected()) { + LOG.error( + "There are inconsistencies in DescribeStream which we were not able to resolve. First leaf node on which inconsistency was detected:" + + splitGraphInconsistencyTracker.getEarliestClosedLeafNode()); + return; + } + + splitTracker.addSplits(splitGraphInconsistencyTracker.getNodes()); + splitTracker.removeSplits( + splitGraphInconsistencyTracker.getNodes().stream() + .map(Shard::shardId) + .collect(Collectors.toSet())); + if (context.registeredReaders().size() < context.currentParallelism()) { + LOG.info( + "Insufficient registered readers, skipping assignment of discovered splits until all readers are registered. Required number of readers: {}, registered readers: {}", + context.currentParallelism(), + context.registeredReaders().size()); return; } + assignSplits(); + } - for (DynamoDbStreamsShardSplit split : splits) { - splitAssignment.get(subtaskId).remove(split); - assignedSplitIds.remove(split.splitId()); - unassignedSplits.add(split); + /** + * This method tracks the discovered splits in a graph and if the graph has inconsistencies, it + * tries to resolve them using DescribeStream calls using the first inconsistent node found in + * the split graph. + * + * @param discoveredSplits splits discovered after calling DescribeStream at the start of the + * application or periodically. + */ + private SplitGraphInconsistencyTracker trackSplitsAndResolveInconsistencies( + ListShardsResult discoveredSplits) { Review Comment: Can we move this logic to resolve inconsistencies into the `periodicallyDiscoverSplits()` thread instead? This would move it into a non-blocking thread pool instead of doing the resolution here in the main thread. 
########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java: ########## @@ -143,17 +217,18 @@ public void addReader(int subtaskId) { @Override public DynamoDbStreamsSourceEnumeratorState snapshotState(long checkpointId) throws Exception { Review Comment: This method is not covered in unit tests ########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java: ########## @@ -62,7 +61,7 @@ public DynamoDbStreamsProxy( } @Override - public List<Shard> listShards(String streamArn, @Nullable String lastSeenShardId) { + public ListShardsResult listShards(String streamArn, @Nullable String lastSeenShardId) { Review Comment: can we remove the lastSeenShardId field here? ########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitGraphInconsistencyTracker.java: ########## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.source.enumerator.tracker; + +import software.amazon.awssdk.services.dynamodb.model.Shard; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; + +/** + * Class to track the state of shard graph created as part of describestream operation. This will + * track for inconsistent shards returned due to describestream operation. Caller will have to call + * describestream again with the chronologically first leaf node to resolve inconsistencies. + */ +public class SplitGraphInconsistencyTracker { + private final TreeSet<String> closedLeafNodes; + private final Map<String, Shard> nodes; + + public SplitGraphInconsistencyTracker() { + nodes = new HashMap<>(); + closedLeafNodes = new TreeSet<>(); + } + + public void addNodes(List<Shard> shards) { + for (Shard shard : shards) { + addNode(shard); + } + } + + private void addNode(Shard shard) { + nodes.put(shard.shardId(), shard); + if (shard.sequenceNumberRange().endingSequenceNumber() != null) { + closedLeafNodes.add(shard.shardId()); + } + if (shard.parentShardId() != null) { + closedLeafNodes.remove(shard.parentShardId()); + } Review Comment: Doesn't this presuppose an order in which the shards are read? For example, if we have a tree like below: ``` 0 / \ 1 2 ``` We must read in order of `[0,1,2]`. If we read in order of `[1,2,0]`, `0` would be flagged as a `closedLeafNode`. Could we instead track two sets, `closedNodes` and `parentNodes`, and do the resolution after all nodes have been added? That way we remove the dependency on the read order of nodes.
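One way to remove the order dependency, as suggested above — a minimal sketch, not the PR's implementation: accumulate `closedNodes` and `parentNodes` separately and derive the closed leaves only after all nodes have been added.

```java
import software.amazon.awssdk.services.dynamodb.model.Shard;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/**
 * Order-independent variant: leaves are derived lazily, so no node is ever
 * flagged prematurely regardless of the order in which shards arrive.
 */
public class OrderIndependentShardGraphTracker {
    private final Map<String, Shard> nodes = new HashMap<>();
    private final Set<String> closedNodes = new HashSet<>();
    private final Set<String> parentNodes = new HashSet<>();

    public void addNodes(List<Shard> shards) {
        for (Shard shard : shards) {
            nodes.put(shard.shardId(), shard);
            if (shard.sequenceNumberRange().endingSequenceNumber() != null) {
                closedNodes.add(shard.shardId());
            }
            if (shard.parentShardId() != null) {
                parentNodes.add(shard.parentShardId());
            }
        }
    }

    /** Closed shards that no discovered shard references as a parent. */
    public TreeSet<String> closedLeafNodes() {
        TreeSet<String> leaves = new TreeSet<>(closedNodes);
        leaves.removeAll(parentNodes);
        return leaves;
    }

    public boolean inconsistencyDetected() {
        return !closedLeafNodes().isEmpty();
    }
}
```

With this shape, reading the example tree in order `[1,2,0]` no longer flags `0`: it lands in both `closedNodes` and `parentNodes`, so it drops out of `closedLeafNodes()`.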
########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/DynamoDbStreamsSource.java: ########## @@ -203,6 +218,50 @@ private DynamoDbStreamsProxy createDynamoDbStreamsProxy(Configuration consumerCo return new DynamoDbStreamsProxy(dynamoDbStreamsClient, httpClient); } + private DynamoDbStreamsProxy createDynamoDbStreamsProxyWithRetries( + Configuration consumerConfig) { + SdkHttpClient httpClient = + AWSGeneralUtil.createSyncHttpClient( + AttributeMap.builder().build(), ApacheHttpClient.builder()); + + Properties dynamoDbStreamsClientProperties = new Properties(); + String region = + AWSGeneralUtil.getRegionFromArn(streamArn) + .orElseThrow( + () -> + new IllegalStateException( + "Unable to determine region from stream arn")); + dynamoDbStreamsClientProperties.put(AWSConfigConstants.AWS_REGION, region); + consumerConfig.addAllToProperties(dynamoDbStreamsClientProperties); + + AWSGeneralUtil.validateAwsCredentials(dynamoDbStreamsClientProperties); + int maxDescribeStreamCallAttempts = + sourceConfig.getInteger(DESCRIBE_STREAM_RETRY_CALL_COUNT); + Duration minDescribeStreamDelay = sourceConfig.get(DESCRIBE_STREAM_EXPONENTIAL_DELAY_MIN); + Duration maxDescribeStreamDelay = sourceConfig.get(DESCRIBE_STREAM_EXPONENTIAL_DELAY_MAX); + BackoffStrategy backoffStrategy = + BackoffStrategy.exponentialDelay(minDescribeStreamDelay, maxDescribeStreamDelay); + AdaptiveRetryStrategy adaptiveRetryStrategy = + AdaptiveRetryStrategy.builder() + .maxAttempts(maxDescribeStreamCallAttempts) + .backoffStrategy(backoffStrategy) + .throttlingBackoffStrategy(backoffStrategy) + .build(); Review Comment: Can we create a JIRA to track migration of this to `flink-connector-aws-base`? ########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/DynamoDbStreamsSource.java: ########## @@ -203,6 +218,50 @@ private DynamoDbStreamsProxy createDynamoDbStreamsProxy(Configuration consumerCo return new DynamoDbStreamsProxy(dynamoDbStreamsClient, httpClient); } + private DynamoDbStreamsProxy createDynamoDbStreamsProxyWithRetries( + Configuration consumerConfig) { + SdkHttpClient httpClient = + AWSGeneralUtil.createSyncHttpClient( + AttributeMap.builder().build(), ApacheHttpClient.builder()); + + Properties dynamoDbStreamsClientProperties = new Properties(); + String region = + AWSGeneralUtil.getRegionFromArn(streamArn) + .orElseThrow( + () -> + new IllegalStateException( + "Unable to determine region from stream arn")); + dynamoDbStreamsClientProperties.put(AWSConfigConstants.AWS_REGION, region); + consumerConfig.addAllToProperties(dynamoDbStreamsClientProperties); + + AWSGeneralUtil.validateAwsCredentials(dynamoDbStreamsClientProperties); + int maxDescribeStreamCallAttempts = + sourceConfig.getInteger(DESCRIBE_STREAM_RETRY_CALL_COUNT); Review Comment: you can just do `sourceConfig.get()` here. ########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java: ########## @@ -116,24 +118,96 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname } @Override - public void addSplitsBack(List<DynamoDbStreamsShardSplit> splits, int subtaskId) { - if (!splitAssignment.containsKey(subtaskId)) { - LOG.warn( - "Unable to add splits back for subtask {} since it is not assigned any splits. 
Splits: {}", - subtaskId, - splits); + public void addSplitsBack(List<DynamoDbStreamsShardSplit> list, int i) { + throw new UnsupportedOperationException("Partial recovery is not supported"); + } + + @Override + public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { + if (sourceEvent instanceof SplitsFinishedEvent) { + handleFinishedSplits(subtaskId, (SplitsFinishedEvent) sourceEvent); + } + } + + private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) { + splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds()); + splitAssignment + .get(subtaskId) + .removeIf( + split -> + splitsFinishedEvent + .getFinishedSplitIds() + .contains(split.splitId())); + assignSplits(); + } + + private void processDiscoveredSplits(ListShardsResult discoveredSplits, Throwable throwable) { + if (throwable != null) { + throw new DynamoDbStreamsSourceException("Failed to list shards.", throwable); + } + + SplitGraphInconsistencyTracker splitGraphInconsistencyTracker = + trackSplitsAndResolveInconsistencies(discoveredSplits); + + if (splitGraphInconsistencyTracker.inconsistencyDetected()) { + LOG.error( + "There are inconsistencies in DescribeStream which we were not able to resolve. First leaf node on which inconsistency was detected:" + + splitGraphInconsistencyTracker.getEarliestClosedLeafNode()); + return; + } + + splitTracker.addSplits(splitGraphInconsistencyTracker.getNodes()); + splitTracker.removeSplits( + splitGraphInconsistencyTracker.getNodes().stream() + .map(Shard::shardId) + .collect(Collectors.toSet())); + if (context.registeredReaders().size() < context.currentParallelism()) { + LOG.info( + "Insufficient registered readers, skipping assignment of discovered splits until all readers are registered. Required number of readers: {}, registered readers: {}", + context.currentParallelism(), + context.registeredReaders().size()); return; } + assignSplits(); + } - for (DynamoDbStreamsShardSplit split : splits) { - splitAssignment.get(subtaskId).remove(split); - assignedSplitIds.remove(split.splitId()); - unassignedSplits.add(split); + /** + * This method tracks the discovered splits in a graph and if the graph has inconsistencies, it + * tries to resolve them using DescribeStream calls using the first inconsistent node found in + * the split graph. + * + * @param discoveredSplits splits discovered after calling DescribeStream at the start of the + * application or periodically. + */ + private SplitGraphInconsistencyTracker trackSplitsAndResolveInconsistencies( + ListShardsResult discoveredSplits) { + SplitGraphInconsistencyTracker splitGraphInconsistencyTracker = + new SplitGraphInconsistencyTracker(); + splitGraphInconsistencyTracker.addNodes(discoveredSplits.getShards()); + + boolean streamDisabled = discoveredSplits.getStreamStatus().equals(StreamStatus.DISABLED); + int describeStreamInconsistencyResolutionCount = + sourceConfig.get(DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT); + for (int i = 0; + i < describeStreamInconsistencyResolutionCount + && !streamDisabled Review Comment: Who do we skip inconsistency resolution if the stream is disabled? Could there be a situation when the stream has been disabled but user is using it for backfill? 
########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/DynamoDbStreamsSource.java: ########## @@ -203,6 +218,50 @@ private DynamoDbStreamsProxy createDynamoDbStreamsProxy(Configuration consumerCo return new DynamoDbStreamsProxy(dynamoDbStreamsClient, httpClient); } + private DynamoDbStreamsProxy createDynamoDbStreamsProxyWithRetries( + Configuration consumerConfig) { + SdkHttpClient httpClient = + AWSGeneralUtil.createSyncHttpClient( + AttributeMap.builder().build(), ApacheHttpClient.builder()); + + Properties dynamoDbStreamsClientProperties = new Properties(); + String region = + AWSGeneralUtil.getRegionFromArn(streamArn) + .orElseThrow( + () -> + new IllegalStateException( + "Unable to determine region from stream arn")); + dynamoDbStreamsClientProperties.put(AWSConfigConstants.AWS_REGION, region); + consumerConfig.addAllToProperties(dynamoDbStreamsClientProperties); + + AWSGeneralUtil.validateAwsCredentials(dynamoDbStreamsClientProperties); + int maxDescribeStreamCallAttempts = + sourceConfig.getInteger(DESCRIBE_STREAM_RETRY_CALL_COUNT); + Duration minDescribeStreamDelay = sourceConfig.get(DESCRIBE_STREAM_EXPONENTIAL_DELAY_MIN); + Duration maxDescribeStreamDelay = sourceConfig.get(DESCRIBE_STREAM_EXPONENTIAL_DELAY_MAX); + BackoffStrategy backoffStrategy = + BackoffStrategy.exponentialDelay(minDescribeStreamDelay, maxDescribeStreamDelay); + AdaptiveRetryStrategy adaptiveRetryStrategy = + AdaptiveRetryStrategy.builder() + .maxAttempts(maxDescribeStreamCallAttempts) + .backoffStrategy(backoffStrategy) + .throttlingBackoffStrategy(backoffStrategy) + .build(); Review Comment: Let's create a followup JIRA to track moving this to `flink-connector-aws-base` ########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorStateSerializer.java: ########## @@ -93,30 +86,25 @@ public DynamoDbStreamsSourceEnumeratorState deserialize( + ". Serializer version is " + getVersion()); } + final int numKnownSplits = in.readInt(); + final int splitSerializerVersion = in.readInt(); - String lastSeenShardId = null; - - final boolean hasLastSeenShardId = in.readBoolean(); - if (hasLastSeenShardId) { - lastSeenShardId = in.readUTF(); - } + List<DynamoDBStreamsShardSplitWithAssignmentStatus> knownSplits = + new ArrayList<>(numKnownSplits); - final int numUnassignedSplits = in.readInt(); - final int splitSerializerVersion = in.readInt(); - if (splitSerializerVersion != splitSerializer.getVersion()) { - throw new VersionMismatchException( - "Trying to deserialize DynamoDbStreamsShardSplit serialized with unsupported version " - + splitSerializerVersion - + ". 
Serializer version is " - + splitSerializer.getVersion()); - } - Set<DynamoDbStreamsShardSplit> unassignedSplits = new HashSet<>(numUnassignedSplits); - for (int i = 0; i < numUnassignedSplits; i++) { + for (int i = 0; i < numKnownSplits; i++) { int serializedLength = in.readInt(); byte[] serializedSplit = new byte[serializedLength]; if (in.read(serializedSplit) != -1) { - unassignedSplits.add( - splitSerializer.deserialize(splitSerializerVersion, serializedSplit)); + DynamoDbStreamsShardSplit deserializedSplit = + splitSerializer.deserialize(splitSerializerVersion, serializedSplit); + SplitAssignmentStatus assignmentStatus = SplitAssignmentStatus.UNASSIGNED; + if (version == CURRENT_VERSION) { + assignmentStatus = SplitAssignmentStatus.fromStatusCode(in.readInt()); + } Review Comment: We don't need this check for `CURRENT_VERSION` because we only have 1 version as of now. Let's simplify this code. ########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java: ########## @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.source.enumerator.tracker; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants; +import org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition; +import org.apache.flink.connector.dynamodb.source.enumerator.DynamoDBStreamsShardSplitWithAssignmentStatus; +import org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus; +import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit; +import org.apache.flink.connector.dynamodb.source.split.StartingPosition; + +import software.amazon.awssdk.services.dynamodb.model.Shard; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON; +import static org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.ASSIGNED; +import static org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.FINISHED; + +/** This class is used to track splits and will be used to assign any unassigned splits. 
*/ +@Internal +public class SplitTracker { + private final Map<String, DynamoDbStreamsShardSplit> knownSplits = new ConcurrentHashMap<>(); + private final Set<String> assignedSplits = new HashSet<>(); + private final Set<String> finishedSplits = new HashSet<>(); + private final String streamArn; + private final InitialPosition initialPosition; + + public SplitTracker(String streamArn, InitialPosition initialPosition) { + this(Collections.emptyList(), streamArn, initialPosition); + } + + public SplitTracker( + List<DynamoDBStreamsShardSplitWithAssignmentStatus> initialState, + String streamArn, + DynamodbStreamsSourceConfigConstants.InitialPosition initialPosition) { + this.streamArn = streamArn; + this.initialPosition = initialPosition; + initialState.forEach( + splitWithStatus -> { + DynamoDbStreamsShardSplit currentSplit = splitWithStatus.split(); + knownSplits.put(currentSplit.splitId(), currentSplit); + + if (ASSIGNED.equals(splitWithStatus.assignmentStatus())) { + assignedSplits.add(splitWithStatus.split().splitId()); + } + if (FINISHED.equals(splitWithStatus.assignmentStatus())) { + finishedSplits.add(splitWithStatus.split().splitId()); + } + }); + } + + /** + * Add newly discovered splits to tracker. + * + * @param shardsToAdd collection of splits to add to tracking + */ + public void addSplits(Collection<Shard> shardsToAdd) { + List<DynamoDbStreamsShardSplit> newSplitsToAdd = + determineNewShardsToBePutForAssignment(shardsToAdd); + newSplitsToAdd.forEach( + split -> { + knownSplits.put(split.splitId(), split); + }); + } + + private List<Shard> getOpenShards(Collection<Shard> shards) { + List<Shard> openShards = new ArrayList<>(); + for (Shard shard : shards) { + if (shard.sequenceNumberRange().endingSequenceNumber() == null) { + openShards.add(shard); + } + } + return openShards; + } + + private Map<String, Shard> getShardIdToShardMap(Collection<Shard> shards) { + Map<String, Shard> shardIdToShardMap = new HashMap<>(); + for (Shard shard : shards) { + shardIdToShardMap.put(shard.shardId(), shard); + } + return shardIdToShardMap; + } + + /** + * This function finds the open shards returned from describeStream operation and adds them + * along with their parents if parents are not already tracked to the tracked splits + * + * <p>This is needed because describestream has an inconsistency where for example if a shard s + * is split into s1 and s2, in one describestream operation, its possible that only one of s1 + * and s2 is returned. + * + * <p>We will go up the shard lineage until we find a parent shard which is not yet tracked by + * SplitTracker If no ancestor is tracked, the first ancestor will be read from the initial + * position configured and all its descendants will be read from TRIM_HORIZON + * + * @param shards the shards returned from DescribeStream operation + * @return list of {@link DynamoDbStreamsShardSplit} which will be put to tracked splits + */ + private List<DynamoDbStreamsShardSplit> determineNewShardsToBePutForAssignment( Review Comment: This is pretty complicated. Let's first try to understand how the DescribeStream API can fail, and see if we can come up with something more elegant ########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java: ########## @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.source.enumerator.tracker; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants; +import org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition; +import org.apache.flink.connector.dynamodb.source.enumerator.DynamoDBStreamsShardSplitWithAssignmentStatus; +import org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus; +import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit; +import org.apache.flink.connector.dynamodb.source.split.StartingPosition; + +import software.amazon.awssdk.services.dynamodb.model.Shard; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON; +import static org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.ASSIGNED; +import static org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.FINISHED; + +/** This class is used to track splits and will be used to assign any unassigned splits. */ Review Comment: nit: Let's add some comments here to indicate that this class is here to maintain parent-child shard ordering ########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java: ########## @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.dynamodb.source.enumerator.tracker; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants; +import org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition; +import org.apache.flink.connector.dynamodb.source.enumerator.DynamoDBStreamsShardSplitWithAssignmentStatus; +import org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus; +import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit; +import org.apache.flink.connector.dynamodb.source.split.StartingPosition; + +import software.amazon.awssdk.services.dynamodb.model.Shard; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON; +import static org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.ASSIGNED; +import static org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.FINISHED; + +/** This class is used to track splits and will be used to assign any unassigned splits. */ +@Internal +public class SplitTracker { + private final Map<String, DynamoDbStreamsShardSplit> knownSplits = new ConcurrentHashMap<>(); + private final Set<String> assignedSplits = new HashSet<>(); + private final Set<String> finishedSplits = new HashSet<>(); + private final String streamArn; + private final InitialPosition initialPosition; + + public SplitTracker(String streamArn, InitialPosition initialPosition) { + this(Collections.emptyList(), streamArn, initialPosition); + } + + public SplitTracker( + List<DynamoDBStreamsShardSplitWithAssignmentStatus> initialState, + String streamArn, + DynamodbStreamsSourceConfigConstants.InitialPosition initialPosition) { + this.streamArn = streamArn; + this.initialPosition = initialPosition; + initialState.forEach( + splitWithStatus -> { + DynamoDbStreamsShardSplit currentSplit = splitWithStatus.split(); + knownSplits.put(currentSplit.splitId(), currentSplit); + + if (ASSIGNED.equals(splitWithStatus.assignmentStatus())) { + assignedSplits.add(splitWithStatus.split().splitId()); + } + if (FINISHED.equals(splitWithStatus.assignmentStatus())) { + finishedSplits.add(splitWithStatus.split().splitId()); + } + }); + } + + /** + * Add newly discovered splits to tracker. 
+ * + * @param shardsToAdd collection of splits to add to tracking + */ + public void addSplits(Collection<Shard> shardsToAdd) { + List<DynamoDbStreamsShardSplit> newSplitsToAdd = + determineNewShardsToBePutForAssignment(shardsToAdd); + newSplitsToAdd.forEach( + split -> { + knownSplits.put(split.splitId(), split); + }); + } + + private List<Shard> getOpenShards(Collection<Shard> shards) { + List<Shard> openShards = new ArrayList<>(); + for (Shard shard : shards) { + if (shard.sequenceNumberRange().endingSequenceNumber() == null) { + openShards.add(shard); + } + } + return openShards; + } + + private Map<String, Shard> getShardIdToShardMap(Collection<Shard> shards) { + Map<String, Shard> shardIdToShardMap = new HashMap<>(); + for (Shard shard : shards) { + shardIdToShardMap.put(shard.shardId(), shard); + } + return shardIdToShardMap; + } + + /** + * This function finds the open shards returned from describeStream operation and adds them + * along with their parents if parents are not already tracked to the tracked splits + * + * <p>This is needed because describestream has an inconsistency where for example if a shard s + * is split into s1 and s2, in one describestream operation, its possible that only one of s1 + * and s2 is returned. + * + * <p>We will go up the shard lineage until we find a parent shard which is not yet tracked by + * SplitTracker If no ancestor is tracked, the first ancestor will be read from the initial + * position configured and all its descendants will be read from TRIM_HORIZON Review Comment: > If no ancestor is tracked, the first ancestor will be read from the initial position configured and all its descendants will be read from TRIM_HORIZON I think we don't need to care about this, as we either use `LATEST`/`TRIM_HORIZON`. If we are reading from `TRIM_HORIZON`, that is our starting position. If we are reading from `LATEST`, then we just find all the open shards and consume from `LATEST`. It doesn't make sense to consume from parent shards (which are closed) starting from `LATEST`. ########## flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/ShardGraphTrackerTest.java: ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.dynamodb.source.enumerator.tracker; + +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.dynamodb.model.Shard; + +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.connector.dynamodb.source.util.TestUtil.generateShard; +import static org.apache.flink.connector.dynamodb.source.util.TestUtil.generateShardId; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests the {@link SplitGraphInconsistencyTracker} class to verify that it correctly discovers + * inconsistencies within the shard graph. + */ +public class ShardGraphTrackerTest { + @Test + public void testShardGraphTrackerHappyCase() { + List<Shard> shards = + Arrays.asList( + // shards which don't have a parent + generateShard(0, "1400", "1700", null), + generateShard(1, "1500", "1800", null), + // shards produced by rotation of parents + generateShard(2, "1710", null, generateShardId(0)), + generateShard(3, "1520", null, generateShardId(1))); + SplitGraphInconsistencyTracker shardGraphTracker = new SplitGraphInconsistencyTracker(); + shardGraphTracker.addNodes(shards); + assertThat(shardGraphTracker.inconsistencyDetected()).isFalse(); + assertThat(shardGraphTracker.getNodes()).containsExactlyInAnyOrderElementsOf(shards); Review Comment: Can we validate the behavior of other public APIs like `getEarliestClosedLeafNode()`? ########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java: ########## @@ -174,6 +184,13 @@ private String getShardIterator( shardId, streamArn); return null; + } catch (TrimmedDataAccessException e) { + LOG.info( + "Received TrimmedDataAccessException. " + + "Shard {} of stream {} is no longer valid, marking it as complete.", + shardId, + streamArn); + return null; Review Comment: Is this true? Do we know if the record can be expired even though the shard has not been trimmed? What distinguishes `TrimmedDataAccessException` from `ResourceNotFoundException`? Why do we handle this case separately from the `GetRecords` case for `TrimmedDataAccessException`? ########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/metrics/DynamoDbStreamsShardMetrics.java: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.dynamodb.source.metrics; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.arns.Arn; + +/** A utility class for handling dynamodb streams shard metrics. */ +@Internal +public class DynamoDbStreamsShardMetrics { + private static final Logger log = LoggerFactory.getLogger(DynamoDbStreamsShardMetrics.class); + private final MetricGroup metricGroup; + private final DynamoDbStreamsShardSplit shardInfo; + private volatile long millisBehindLatest = -1; + + public DynamoDbStreamsShardMetrics( + DynamoDbStreamsShardSplit shardInfo, MetricGroup rootMetricGroup) { + this.shardInfo = shardInfo; + Arn streamArn = Arn.fromString(shardInfo.getStreamArn()); + this.metricGroup = + rootMetricGroup + .addGroup(MetricConstants.DYNAMODB_STREAMS_SOURCE_METRIC_GROUP) + .addGroup(MetricConstants.KINEIS_ANALYTICS_METRIC_GROUP) Review Comment: `KINESIS_ANALYTICS_METRIC_GROUP` should not be added to the public source. ########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java: ########## @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.dynamodb.source.enumerator.tracker; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants; +import org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition; +import org.apache.flink.connector.dynamodb.source.enumerator.DynamoDBStreamsShardSplitWithAssignmentStatus; +import org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus; +import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit; +import org.apache.flink.connector.dynamodb.source.split.StartingPosition; + +import software.amazon.awssdk.services.dynamodb.model.Shard; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON; +import static org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.ASSIGNED; +import static org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.FINISHED; + +/** This class is used to track splits and will be used to assign any unassigned splits. */ +@Internal +public class SplitTracker { + private final Map<String, DynamoDbStreamsShardSplit> knownSplits = new ConcurrentHashMap<>(); + private final Set<String> assignedSplits = new HashSet<>(); + private final Set<String> finishedSplits = new HashSet<>(); + private final String streamArn; + private final InitialPosition initialPosition; + + public SplitTracker(String streamArn, InitialPosition initialPosition) { + this(Collections.emptyList(), streamArn, initialPosition); + } + + public SplitTracker( + List<DynamoDBStreamsShardSplitWithAssignmentStatus> initialState, + String streamArn, + DynamodbStreamsSourceConfigConstants.InitialPosition initialPosition) { + this.streamArn = streamArn; + this.initialPosition = initialPosition; + initialState.forEach( + splitWithStatus -> { + DynamoDbStreamsShardSplit currentSplit = splitWithStatus.split(); + knownSplits.put(currentSplit.splitId(), currentSplit); + + if (ASSIGNED.equals(splitWithStatus.assignmentStatus())) { + assignedSplits.add(splitWithStatus.split().splitId()); + } + if (FINISHED.equals(splitWithStatus.assignmentStatus())) { + finishedSplits.add(splitWithStatus.split().splitId()); + } + }); + } + + /** + * Add newly discovered splits to tracker. 
+ * + * @param shardsToAdd collection of splits to add to tracking + */ + public void addSplits(Collection<Shard> shardsToAdd) { + List<DynamoDbStreamsShardSplit> newSplitsToAdd = + determineNewShardsToBePutForAssignment(shardsToAdd); + newSplitsToAdd.forEach( + split -> { + knownSplits.put(split.splitId(), split); + }); + } + + private List<Shard> getOpenShards(Collection<Shard> shards) { + List<Shard> openShards = new ArrayList<>(); + for (Shard shard : shards) { + if (shard.sequenceNumberRange().endingSequenceNumber() == null) { + openShards.add(shard); + } + } + return openShards; + } + + private Map<String, Shard> getShardIdToShardMap(Collection<Shard> shards) { + Map<String, Shard> shardIdToShardMap = new HashMap<>(); + for (Shard shard : shards) { + shardIdToShardMap.put(shard.shardId(), shard); + } + return shardIdToShardMap; + } + + /** + * This function finds the open shards returned from describeStream operation and adds them + * along with their parents if parents are not already tracked to the tracked splits + * + * <p>This is needed because describestream has an inconsistency where for example if a shard s + * is split into s1 and s2, in one describestream operation, its possible that only one of s1 + * and s2 is returned. + * + * <p>We will go up the shard lineage until we find a parent shard which is not yet tracked by + * SplitTracker If no ancestor is tracked, the first ancestor will be read from the initial + * position configured and all its descendants will be read from TRIM_HORIZON + * + * @param shards the shards returned from DescribeStream operation + * @return list of {@link DynamoDbStreamsShardSplit} which will be put to tracked splits + */ + private List<DynamoDbStreamsShardSplit> determineNewShardsToBePutForAssignment( + Collection<Shard> shards) { + Map<String, Shard> shardIdToShardMap = getShardIdToShardMap(shards); + List<Shard> openShards = getOpenShards(shards); + List<DynamoDbStreamsShardSplit> newSplitsToBeTracked = new ArrayList<>(); + Map<String, Boolean> memoizationContext = new HashMap<>(); + for (Shard openShard : openShards) { + String shardId = openShard.shardId(); + if (!knownSplits.containsKey(shardId)) { + boolean isDescendant = + checkIfShardIsDescendantAndAddAncestorsToBeTracked( + openShard.shardId(), + shardIdToShardMap, + newSplitsToBeTracked, + memoizationContext); + if (isDescendant) { + newSplitsToBeTracked.add(mapToSplit(openShard, TRIM_HORIZON)); + } else { + newSplitsToBeTracked.add(mapToSplit(openShard, initialPosition)); + } + } + } + return newSplitsToBeTracked; + } + + /** + * Check if any ancestor shard of the current shard has not been tracked yet. Take this example: + * 0->3->8, 0->4->9, 1->5, 1->6, 2->7 + * + * <p>At epoch 1, the lineage looked like this due to describestream inconsistency 0->3, 1->5, + * 1->6, 2->7 knownSplits = 0,1,2,3,5,6,7 After a few describestream calls, at epoch 2, after + * the whole lineage got discovered, since 4 was not tracked, we should start tracking 4 also. + * knownSplits = 0,1,2,3,4,5,6,7,8,9. 
+ */ + private boolean checkIfShardIsDescendantAndAddAncestorsToBeTracked( + String shardId, + Map<String, Shard> shardIdToShardMap, + List<DynamoDbStreamsShardSplit> newSplitsToBeTracked, + Map<String, Boolean> memoizationContext) { + Boolean previousValue = memoizationContext.get(shardId); + if (previousValue != null) { + return previousValue; + } + + if (shardId != null && shardIdToShardMap.containsKey(shardId)) { + if (knownSplits.containsKey(shardId)) { + return true; + } else { + Shard shard = shardIdToShardMap.get(shardId); + String parentShardId = shard.parentShardId(); + boolean isParentShardDescendant = + checkIfShardIsDescendantAndAddAncestorsToBeTracked( + parentShardId, + shardIdToShardMap, + newSplitsToBeTracked, + memoizationContext); + if (shardIdToShardMap.containsKey(parentShardId)) { + if (!knownSplits.containsKey(parentShardId)) { + Shard parentShard = shardIdToShardMap.get(parentShardId); + if (isParentShardDescendant) { + newSplitsToBeTracked.add(mapToSplit(parentShard, TRIM_HORIZON)); + } else { + newSplitsToBeTracked.add(mapToSplit(parentShard, initialPosition)); + } + return true; + } + } + } + } + + return false; + } + + private DynamoDbStreamsShardSplit mapToSplit( + Shard shard, DynamodbStreamsSourceConfigConstants.InitialPosition initialPosition) { + StartingPosition startingPosition; + switch (initialPosition) { + case LATEST: + startingPosition = StartingPosition.latest(); + break; + case TRIM_HORIZON: + default: + startingPosition = StartingPosition.fromStart(); + } + + Set<String> parentShardIds = new HashSet<>(); + if (shard.parentShardId() != null) { + parentShardIds.add(shard.parentShardId()); + } + return new DynamoDbStreamsShardSplit( + streamArn, shard.shardId(), startingPosition, parentShardIds); + } + + /** + * Mark splits as assigned. Assigned splits will no longer be returned as pending splits. + * + * @param splitsToAssign collection of splits to mark as assigned + */ + public void markAsAssigned(Collection<DynamoDbStreamsShardSplit> splitsToAssign) { + splitsToAssign.forEach(split -> assignedSplits.add(split.splitId())); + } + + /** + * Mark splits as finished. Assigned splits will no longer be returned as pending splits. + * + * @param splitsToFinish collection of splits to mark as assigned + */ + public void markAsFinished(Collection<String> splitsToFinish) { + splitsToFinish.forEach( + splitId -> { + finishedSplits.add(splitId); + assignedSplits.remove(splitId); + }); + } + + public boolean isAssigned(String splitId) { + return assignedSplits.contains(splitId); + } + + /** + * Since we never put an inconsistent shard lineage to splitTracker, so if a shard's parent is + * not there, that means that that should already be cleaned up. + */ + public List<DynamoDbStreamsShardSplit> splitsAvailableForAssignment() { + return knownSplits.values().stream() + .filter( + split -> { + boolean splitIsNotAssigned = !isAssigned(split.splitId()); + return splitIsNotAssigned + && !isFinished(split.splitId()) + && (verifyAllParentSplitsAreFinished(split) + || verifyAllParentSplitsAreCleanedUp(split)); Review Comment: Can we push down this logic into a single check? What if one parent is finished and one is cleanedup? ########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java: ########## @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.dynamodb.source.enumerator.tracker; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants; +import org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition; +import org.apache.flink.connector.dynamodb.source.enumerator.DynamoDBStreamsShardSplitWithAssignmentStatus; +import org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus; +import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit; +import org.apache.flink.connector.dynamodb.source.split.StartingPosition; + +import software.amazon.awssdk.services.dynamodb.model.Shard; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON; +import static org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.ASSIGNED; +import static org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.FINISHED; + +/** This class is used to track splits and will be used to assign any unassigned splits. */ +@Internal +public class SplitTracker { + private final Map<String, DynamoDbStreamsShardSplit> knownSplits = new ConcurrentHashMap<>(); + private final Set<String> assignedSplits = new HashSet<>(); + private final Set<String> finishedSplits = new HashSet<>(); Review Comment: Why do we need to track finished splits at all? ########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java: ########## @@ -93,6 +92,13 @@ public GetRecordsResponse getRecords( shardIdToIteratorStore.put(shardId, getRecordsResponse.nextShardIterator()); } return getRecordsResponse; + } catch (TrimmedDataAccessException e) { + shardIterator = getShardIterator(streamArn, shardId, StartingPosition.fromStart()); + GetRecordsResponse getRecordsResponse = getRecords(shardIterator); + if (getRecordsResponse.nextShardIterator() != null) { + shardIdToIteratorStore.put(shardId, getRecordsResponse.nextShardIterator()); + } + return getRecordsResponse; Review Comment: Let's add a comment to explain that `TrimmedDataAccessException` means that the requested iterator points to a record that has expired.
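For reference, a commented version of that catch block might look like the sketch below (it mirrors the PR's `DynamoDbStreamsProxy#getRecords` apart from the added comments; the 24-hour figure is DynamoDB Streams' documented retention period):

```java
try {
    GetRecordsResponse getRecordsResponse = getRecords(shardIterator);
    if (getRecordsResponse.nextShardIterator() != null) {
        shardIdToIteratorStore.put(shardId, getRecordsResponse.nextShardIterator());
    }
    return getRecordsResponse;
} catch (TrimmedDataAccessException e) {
    // TrimmedDataAccessException means the requested iterator points to a
    // record that has already expired (DynamoDB Streams retains records for
    // 24 hours), so fall back to re-reading the shard from TRIM_HORIZON.
    shardIterator = getShardIterator(streamArn, shardId, StartingPosition.fromStart());
    GetRecordsResponse getRecordsResponse = getRecords(shardIterator);
    if (getRecordsResponse.nextShardIterator() != null) {
        shardIdToIteratorStore.put(shardId, getRecordsResponse.nextShardIterator());
    }
    return getRecordsResponse;
}
```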
########## flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/ShardGraphTrackerTest.java: ##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.source.enumerator.tracker;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.connector.dynamodb.source.util.TestUtil.generateShard;
+import static org.apache.flink.connector.dynamodb.source.util.TestUtil.generateShardId;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests the {@link SplitGraphInconsistencyTracker} class to verify that it correctly discovers
+ * inconsistencies within the shard graph.
+ */
+public class ShardGraphTrackerTest {
+    @Test
+    public void testShardGraphTrackerHappyCase() {
+        List<Shard> shards =
+                Arrays.asList(
+                        // shards which don't have a parent
+                        generateShard(0, "1400", "1700", null),
+                        generateShard(1, "1500", "1800", null),
+                        // shards produced by rotation of parents
+                        generateShard(2, "1710", null, generateShardId(0)),
+                        generateShard(3, "1520", null, generateShardId(1)));
+        SplitGraphInconsistencyTracker shardGraphTracker = new SplitGraphInconsistencyTracker();
+        shardGraphTracker.addNodes(shards);
+        assertThat(shardGraphTracker.inconsistencyDetected()).isFalse();
+        assertThat(shardGraphTracker.getNodes()).containsExactlyInAnyOrderElementsOf(shards);
+    }
+
+    @Test
+    public void testShardGraphTrackerDetectsInconsistencies() {
+        List<Shard> shards =
+                Arrays.asList(
+                        // shards which don't have a parent
+                        generateShard(0, "1400", "1700", null),
+                        generateShard(1, "1500", "1800", null),
+                        // shards produced by rotation of parents
+                        generateShard(2, "1710", null, generateShardId(0)));

Review Comment:
   Can we put in a later `closedLeafNode` so that we can test that the earlier one is detected? Best in a new test.
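   A possible shape for that new test, reusing the existing `generateShard`/`generateShardId` helpers; `getEarliestClosedLeafNode()` is assumed to be the tracker's accessor for the first unresolved node:

```java
@Test
public void testShardGraphTrackerReportsEarliestClosedLeafNode() {
    List<Shard> shards =
            Arrays.asList(
                    // two closed shards with no children -> two closed leaf nodes
                    generateShard(0, "1400", "1700", null),
                    generateShard(1, "1500", "1800", null),
                    // a consistent parent/child pair that must not be flagged
                    generateShard(2, "1600", "1900", null),
                    generateShard(3, "1910", null, generateShardId(2)));
    SplitGraphInconsistencyTracker shardGraphTracker = new SplitGraphInconsistencyTracker();
    shardGraphTracker.addNodes(shards);
    assertThat(shardGraphTracker.inconsistencyDetected()).isTrue();
    // the chronologically first closed leaf node is the one to resolve first
    assertThat(shardGraphTracker.getEarliestClosedLeafNode()).isEqualTo(generateShardId(0));
}
```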
########## flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsProxyProvider.java: ##########
@@ -56,17 +58,19 @@ public static class TestDynamoDbStreamsProxy implements StreamProxy {
 
         // List shards configuration
         private final List<Shard> shards = new ArrayList<>();
+        private final Instant creationTimestamp = Instant.now();
         private Supplier<Exception> listShardsExceptionSupplier;
-        private boolean shouldRespectLastSeenShardId = true;
+        private boolean shouldRespectLastSeenShardId = false;

Review Comment:
   We can just remove support for this.

########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java: ##########
@@ -116,24 +118,96 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
     }
 
     @Override
-    public void addSplitsBack(List<DynamoDbStreamsShardSplit> splits, int subtaskId) {
-        if (!splitAssignment.containsKey(subtaskId)) {
-            LOG.warn(
-                    "Unable to add splits back for subtask {} since it is not assigned any splits. Splits: {}",
-                    subtaskId,
-                    splits);
+    public void addSplitsBack(List<DynamoDbStreamsShardSplit> list, int i) {
+        throw new UnsupportedOperationException("Partial recovery is not supported");
+    }
+
+    @Override
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        if (sourceEvent instanceof SplitsFinishedEvent) {
+            handleFinishedSplits(subtaskId, (SplitsFinishedEvent) sourceEvent);
+        }

Review Comment:
   This is not unit tested! (A possible test sketch follows the serializer hunk below.)

########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializer.java: ##########
@@ -93,8 +100,19 @@ public DynamoDbStreamsShardSplit deserialize(int version, byte[] serialized)
             }
         }
 
+        Set<String> parentShardIds = new HashSet<>();
+        if (version == CURRENT_VERSION) {

Review Comment:
   This check is already done on line 83.
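   Following up on the `handleSourceEvent` comment above, a unit test could look roughly like this — `createEnumeratorWithAssignedSplit`, `SUBTASK_ID`, `SHARD_ID`, and `getPendingSplits` are hypothetical stand-ins for whatever harness the existing enumerator tests use, and `SplitsFinishedEvent` is assumed to take the set of finished split ids:

```java
@Test
public void testSplitsFinishedEventMarksSplitsAsFinished() {
    // hypothetical helper: enumerator with SHARD_ID already assigned to SUBTASK_ID
    DynamoDbStreamsSourceEnumerator enumerator = createEnumeratorWithAssignedSplit(SHARD_ID);

    enumerator.handleSourceEvent(
            SUBTASK_ID, new SplitsFinishedEvent(Collections.singleton(SHARD_ID)));

    // a finished split must never be offered for assignment again
    assertThat(getPendingSplits(enumerator))
            .noneMatch(split -> split.splitId().equals(SHARD_ID));
}
```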
########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitGraphInconsistencyTracker.java: ##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.source.enumerator.tracker;
+
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+/**
+ * Tracks the state of the shard graph built from DescribeStream results and flags any
+ * inconsistent shards that are returned. The caller has to call DescribeStream again, starting
+ * from the chronologically first closed leaf node, to resolve the inconsistencies.
+ */
+public class SplitGraphInconsistencyTracker {
+    private final TreeSet<String> closedLeafNodes;
+    private final Map<String, Shard> nodes;
+
+    public SplitGraphInconsistencyTracker() {
+        nodes = new HashMap<>();
+        closedLeafNodes = new TreeSet<>();
+    }
+
+    public void addNodes(List<Shard> shards) {
+        for (Shard shard : shards) {
+            addNode(shard);
+        }
+    }
+
+    private void addNode(Shard shard) {
+        nodes.put(shard.shardId(), shard);
+        if (shard.sequenceNumberRange().endingSequenceNumber() != null) {
+            closedLeafNodes.add(shard.shardId());
+        }
+        if (shard.parentShardId() != null) {
+            closedLeafNodes.remove(shard.parentShardId());
+        }

Review Comment:
   Alternatively, if we know that this return order is a guarantee of `DDBStreams`, let's make this explicit with some form of unit test.
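   If parents-before-children is indeed the guaranteed DescribeStream return order, a test along these lines would pin that assumption down — it presumes `inconsistencyDetected()` reports true while a closed shard has no known child:

```java
@Test
public void testChildArrivingAfterClosedParentClearsInconsistency() {
    SplitGraphInconsistencyTracker tracker = new SplitGraphInconsistencyTracker();
    // a closed parent with no child yet is tracked as a closed leaf node
    tracker.addNodes(Collections.singletonList(generateShard(0, "1400", "1700", null)));
    assertThat(tracker.inconsistencyDetected()).isTrue();
    // the child arriving later (parents returned before children) clears it
    tracker.addNodes(
            Collections.singletonList(generateShard(1, "1710", null, generateShardId(0))));
    assertThat(tracker.inconsistencyDetected()).isFalse();
}
```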