hlteoh37 commented on code in PR #151:
URL: https://github.com/apache/flink-connector-aws/pull/151#discussion_r1726540874
########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java: ##########
@@ -40,9 +42,39 @@ public enum InitialPosition {
    public static final ConfigOption<Long> SHARD_DISCOVERY_INTERVAL_MILLIS =
            ConfigOptions.key("flink.shard.discovery.intervalmillis")
                    .longType()
-                   .defaultValue(10000L)
+                   .defaultValue(60000L)
                    .withDescription("The interval between each attempt to discover new shards.");

+   public static final ConfigOption<Integer> DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT =
+           ConfigOptions.key("flink.describestream.inconsistencyresolution.retries")
+                   .intType()
+                   .defaultValue(5)
+                   .withDescription(
+                           "The number of times to retry build shard lineage if describestream returns inconsistent response");
+
+   public static final ConfigOption<Integer> DESCRIBE_STREAM_RETRY_CALL_COUNT =
+           ConfigOptions.key("flink.describestream.numretries")
+                   .intType()
+                   .defaultValue(5)
+                   .withDescription(
+                           "The number of times to retry describestream call if it returns a retriable exception");

Review Comment:
   nit: `retryable`

########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java: ##########
@@ -40,9 +42,39 @@ public enum InitialPosition {
    public static final ConfigOption<Long> SHARD_DISCOVERY_INTERVAL_MILLIS =
            ConfigOptions.key("flink.shard.discovery.intervalmillis")
                    .longType()
-                   .defaultValue(10000L)
+                   .defaultValue(60000L)
                    .withDescription("The interval between each attempt to discover new shards.");

+   public static final ConfigOption<Integer> DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT =
+           ConfigOptions.key("flink.describestream.inconsistencyresolution.retries")
+                   .intType()
+                   .defaultValue(5)
+                   .withDescription(
+                           "The number of times to retry build shard lineage if describestream returns inconsistent response");
+
+   public static final ConfigOption<Integer> DESCRIBE_STREAM_RETRY_CALL_COUNT =
+           ConfigOptions.key("flink.describestream.numretries")
+                   .intType()
+                   .defaultValue(5)
+                   .withDescription(
+                           "The number of times to retry describestream call if it returns a retriable exception");
+
+   public static final ConfigOption<Integer> DESCRIBE_STREAM_EXPONENTIAL_DELAY_MIN =

Review Comment:
   Can we just use the `Duration` type directly rather than use `min` and `intType`?
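For illustration, a `Duration`-based option could look roughly like this (a sketch, not the PR's code — `durationType()` is the standard `ConfigOptions` builder method, but the 100 ms default below is a placeholder, since the hunk cuts off before the actual value):

```java
import java.time.Duration;

public static final ConfigOption<Duration> DESCRIBE_STREAM_EXPONENTIAL_DELAY_MIN =
        ConfigOptions.key("flink.describestream.backoff.mindelay")
                // durationType() lets users write values like "100 ms" or "1 s" directly
                .durationType()
                // Placeholder default for illustration only; not taken from the PR.
                .defaultValue(Duration.ofMillis(100))
                .withDescription(
                        "The minimum delay for exponential backoff between describestream retries.");
```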
########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java: ##########
@@ -116,24 +117,80 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
    }

    @Override
-   public void addSplitsBack(List<DynamoDbStreamsShardSplit> splits, int subtaskId) {
-       if (!splitAssignment.containsKey(subtaskId)) {
-           LOG.warn(
-                   "Unable to add splits back for subtask {} since it is not assigned any splits. Splits: {}",
-                   subtaskId,
-                   splits);
+   public void addSplitsBack(List<DynamoDbStreamsShardSplit> list, int i) {
+       throw new UnsupportedOperationException("Partial recovery is not supported");
+   }
+
+   @Override
+   public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+       if (sourceEvent instanceof SplitsFinishedEvent) {
+           handleFinishedSplits(subtaskId, (SplitsFinishedEvent) sourceEvent);
+       }
+   }
+
+   private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
+       splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
+       splitAssignment
+               .get(subtaskId)
+               .removeIf(
+                       split ->
+                               splitsFinishedEvent
+                                       .getFinishedSplitIds()
+                                       .contains(split.splitId()));
+       assignSplits();
+   }
+
+   private void processDiscoveredSplits(ListShardsResult discoveredSplits, Throwable throwable) {
+       if (throwable != null) {
+           throw new DynamoDbStreamsSourceException("Failed to list shards.", throwable);
+       }
+
+       ShardGraphTracker shardGraphTracker = new ShardGraphTracker();
+       shardGraphTracker.addNodes(discoveredSplits.getShards());
+
+       boolean streamDisabled = discoveredSplits.getStreamStatus().equals(StreamStatus.DISABLED);
+       int describeStreamInconsistencyResolutionCount =
+               sourceConfig.getInteger(DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT);
+       for (int i = 0;
+               i < describeStreamInconsistencyResolutionCount
+                       && !streamDisabled
+                       && shardGraphTracker.inconsistencyDetected();
+               i++) {
+           ListShardsResult shardsToResolveInconsistencies =
+                   streamProxy.listShards(
+                           streamArn, shardGraphTracker.getEarliestClosedLeafNode());
+           shardGraphTracker.addNodes(shardsToResolveInconsistencies.getShards());
+       }
+
+       if (shardGraphTracker.inconsistencyDetected()) {
+           LOG.error(
+                   "There are inconsistencies in describestream which we were not able to resolve. First leaf node on which inconsistency was detected:"
+                           + shardGraphTracker.getEarliestClosedLeafNode());
            return;

Review Comment:
   Would be good to add a reason why we return here and not throw an exception. I think we should also mention it in the description of `DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT` and also in the connector overview!
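For illustration, the kind of comment being asked for could look like this (a sketch only — the stated rationale is an assumption that discovery re-runs every `SHARD_DISCOVERY_INTERVAL_MILLIS`, and would need to be confirmed by the authors):

```java
if (shardGraphTracker.inconsistencyDetected()) {
    // Assumed rationale, to be confirmed: we log and return instead of
    // throwing because shard discovery runs periodically, so the next
    // discovery cycle gets a fresh chance to resolve what may be a
    // transient describestream inconsistency, without failing the job.
    LOG.error(
            "There are inconsistencies in describestream which we were not able to resolve. "
                    + "First leaf node on which inconsistency was detected: {}",
            shardGraphTracker.getEarliestClosedLeafNode());
    return;
}
```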
########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/ShardGraphTracker.java: ##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.source.enumerator.tracker;
+
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+/**
+ * Class to track the state of shard graph created as part of describestream operation. This will
+ * track for inconsistent shards returned due to describestream operation. Caller will have to call
+ * describestream again with the chronologically first leaf node to resolve inconsistencies.

Review Comment:
   Q: Should we make this logic part of this class, e.g. rename it to `ShardGraphConsistencyDetector`? Not 100% sure of this suggestion. WDYT @gguptp @karubian

########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java: ##########
@@ -116,24 +117,80 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
    }

    @Override
-   public void addSplitsBack(List<DynamoDbStreamsShardSplit> splits, int subtaskId) {
-       if (!splitAssignment.containsKey(subtaskId)) {
-           LOG.warn(
-                   "Unable to add splits back for subtask {} since it is not assigned any splits. Splits: {}",
-                   subtaskId,
-                   splits);
+   public void addSplitsBack(List<DynamoDbStreamsShardSplit> list, int i) {
+       throw new UnsupportedOperationException("Partial recovery is not supported");
+   }
+
+   @Override
+   public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+       if (sourceEvent instanceof SplitsFinishedEvent) {
+           handleFinishedSplits(subtaskId, (SplitsFinishedEvent) sourceEvent);
+       }
+   }
+
+   private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
+       splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
+       splitAssignment
+               .get(subtaskId)
+               .removeIf(
+                       split ->
+                               splitsFinishedEvent
+                                       .getFinishedSplitIds()
+                                       .contains(split.splitId()));
+       assignSplits();
+   }
+
+   private void processDiscoveredSplits(ListShardsResult discoveredSplits, Throwable throwable) {
+       if (throwable != null) {
+           throw new DynamoDbStreamsSourceException("Failed to list shards.", throwable);
+       }
+
+       ShardGraphTracker shardGraphTracker = new ShardGraphTracker();
+       shardGraphTracker.addNodes(discoveredSplits.getShards());
+
+       boolean streamDisabled = discoveredSplits.getStreamStatus().equals(StreamStatus.DISABLED);
+       int describeStreamInconsistencyResolutionCount =
+               sourceConfig.getInteger(DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT);
+       for (int i = 0;
+               i < describeStreamInconsistencyResolutionCount
+                       && !streamDisabled
+                       && shardGraphTracker.inconsistencyDetected();
+               i++) {
+           ListShardsResult shardsToResolveInconsistencies =
+                   streamProxy.listShards(
+                           streamArn, shardGraphTracker.getEarliestClosedLeafNode());
+           shardGraphTracker.addNodes(shardsToResolveInconsistencies.getShards());
+       }
+
+       if (shardGraphTracker.inconsistencyDetected()) {
+           LOG.error(
+                   "There are inconsistencies in describestream which we were not able to resolve. First leaf node on which inconsistency was detected:"
+                           + shardGraphTracker.getEarliestClosedLeafNode());
            return;
        }

-       for (DynamoDbStreamsShardSplit split : splits) {
-           splitAssignment.get(subtaskId).remove(split);
-           assignedSplitIds.remove(split.splitId());
-           unassignedSplits.add(split);
+       splitTracker.addSplits(shardGraphTracker.getNodes());
+       splitTracker.cleanupGarbageSplits(
+               shardGraphTracker.getNodes().stream()
+                       .map(shard -> shard.shardId())
+                       .collect(Collectors.toSet()));

Review Comment:
   nit: should the method just be `removeSplits()`?
########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java: ##########
@@ -40,9 +42,39 @@ public enum InitialPosition {
    public static final ConfigOption<Long> SHARD_DISCOVERY_INTERVAL_MILLIS =
            ConfigOptions.key("flink.shard.discovery.intervalmillis")
                    .longType()
-                   .defaultValue(10000L)
+                   .defaultValue(60000L)
                    .withDescription("The interval between each attempt to discover new shards.");

+   public static final ConfigOption<Integer> DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT =
+           ConfigOptions.key("flink.describestream.inconsistencyresolution.retries")
+                   .intType()
+                   .defaultValue(5)
+                   .withDescription(
+                           "The number of times to retry build shard lineage if describestream returns inconsistent response");
+
+   public static final ConfigOption<Integer> DESCRIBE_STREAM_RETRY_CALL_COUNT =
+           ConfigOptions.key("flink.describestream.numretries")
+                   .intType()
+                   .defaultValue(5)
+                   .withDescription(
+                           "The number of times to retry describestream call if it returns a retriable exception");
+
+   public static final ConfigOption<Integer> DESCRIBE_STREAM_EXPONENTIAL_DELAY_MIN =
+           ConfigOptions.key("flink.describestream.backoff.mindelay")
+                   .intType()

Review Comment:
   let's use duration type here

########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorStateSerializer.java: ##########
@@ -29,15 +29,19 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;

 /** Used to serialize and deserialize the {@link DynamoDbStreamsSourceEnumeratorState}. */
 @Internal
 public class DynamoDbStreamsSourceEnumeratorStateSerializer
        implements SimpleVersionedSerializer<DynamoDbStreamsSourceEnumeratorState> {

-   private static final int CURRENT_VERSION = 0;
+   private static final Set<Integer> COMPATIBLE_VERSIONS = new HashSet<>(Arrays.asList(0, 1));

Review Comment:
   we have not actually released this connector - so we don't need to up the serializer version even though we are making backwards incompatible changes. This is different from the KDS connector. Can we keep the `CURRENT_VERSION` as 0?
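A minimal sketch of what keeping a single version could look like (assumes the standard `SimpleVersionedSerializer` contract; `deserializeVersion0` is a hypothetical helper for the existing wire format, not a method from the PR):

```java
private static final int CURRENT_VERSION = 0;

@Override
public int getVersion() {
    return CURRENT_VERSION;
}

@Override
public DynamoDbStreamsSourceEnumeratorState deserialize(int version, byte[] serialized)
        throws IOException {
    // With nothing released yet, there is only one known format.
    if (version != CURRENT_VERSION) {
        throw new IOException("Unrecognized serializer version: " + version);
    }
    return deserializeVersion0(serialized); // hypothetical helper
}
```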
########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java: ##########
@@ -116,24 +117,80 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
    }

    @Override
-   public void addSplitsBack(List<DynamoDbStreamsShardSplit> splits, int subtaskId) {
-       if (!splitAssignment.containsKey(subtaskId)) {
-           LOG.warn(
-                   "Unable to add splits back for subtask {} since it is not assigned any splits. Splits: {}",
-                   subtaskId,
-                   splits);
+   public void addSplitsBack(List<DynamoDbStreamsShardSplit> list, int i) {
+       throw new UnsupportedOperationException("Partial recovery is not supported");
+   }
+
+   @Override
+   public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+       if (sourceEvent instanceof SplitsFinishedEvent) {
+           handleFinishedSplits(subtaskId, (SplitsFinishedEvent) sourceEvent);
+       }
+   }
+
+   private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
+       splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
+       splitAssignment
+               .get(subtaskId)
+               .removeIf(
+                       split ->
+                               splitsFinishedEvent
+                                       .getFinishedSplitIds()
+                                       .contains(split.splitId()));
+       assignSplits();
+   }
+
+   private void processDiscoveredSplits(ListShardsResult discoveredSplits, Throwable throwable) {
+       if (throwable != null) {
+           throw new DynamoDbStreamsSourceException("Failed to list shards.", throwable);
+       }
+
+       ShardGraphTracker shardGraphTracker = new ShardGraphTracker();
+       shardGraphTracker.addNodes(discoveredSplits.getShards());
+
+       boolean streamDisabled = discoveredSplits.getStreamStatus().equals(StreamStatus.DISABLED);
+       int describeStreamInconsistencyResolutionCount =
+               sourceConfig.getInteger(DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT);
+       for (int i = 0;
+               i < describeStreamInconsistencyResolutionCount
+                       && !streamDisabled
+                       && shardGraphTracker.inconsistencyDetected();
+               i++) {
+           ListShardsResult shardsToResolveInconsistencies =
+                   streamProxy.listShards(
+                           streamArn, shardGraphTracker.getEarliestClosedLeafNode());
+           shardGraphTracker.addNodes(shardsToResolveInconsistencies.getShards());
+       }
+
+       if (shardGraphTracker.inconsistencyDetected()) {
+           LOG.error(
+                   "There are inconsistencies in describestream which we were not able to resolve. First leaf node on which inconsistency was detected:"
+                           + shardGraphTracker.getEarliestClosedLeafNode());
            return;
        }

-       for (DynamoDbStreamsShardSplit split : splits) {
-           splitAssignment.get(subtaskId).remove(split);
-           assignedSplitIds.remove(split.splitId());
-           unassignedSplits.add(split);
+       splitTracker.addSplits(shardGraphTracker.getNodes());

Review Comment:
   This is a little confusing - can we name the `shardGraphTracker` something like `shardGraphManager`?
########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java: ##########
@@ -116,24 +117,80 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
    }

    @Override
-   public void addSplitsBack(List<DynamoDbStreamsShardSplit> splits, int subtaskId) {
-       if (!splitAssignment.containsKey(subtaskId)) {
-           LOG.warn(
-                   "Unable to add splits back for subtask {} since it is not assigned any splits. Splits: {}",
-                   subtaskId,
-                   splits);
+   public void addSplitsBack(List<DynamoDbStreamsShardSplit> list, int i) {
+       throw new UnsupportedOperationException("Partial recovery is not supported");
+   }
+
+   @Override
+   public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+       if (sourceEvent instanceof SplitsFinishedEvent) {
+           handleFinishedSplits(subtaskId, (SplitsFinishedEvent) sourceEvent);
+       }
+   }
+
+   private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
+       splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
+       splitAssignment
+               .get(subtaskId)
+               .removeIf(
+                       split ->
+                               splitsFinishedEvent
+                                       .getFinishedSplitIds()
+                                       .contains(split.splitId()));
+       assignSplits();
+   }
+
+   private void processDiscoveredSplits(ListShardsResult discoveredSplits, Throwable throwable) {
+       if (throwable != null) {
+           throw new DynamoDbStreamsSourceException("Failed to list shards.", throwable);
+       }
+
+       ShardGraphTracker shardGraphTracker = new ShardGraphTracker();
+       shardGraphTracker.addNodes(discoveredSplits.getShards());
+
+       boolean streamDisabled = discoveredSplits.getStreamStatus().equals(StreamStatus.DISABLED);
+       int describeStreamInconsistencyResolutionCount =
+               sourceConfig.getInteger(DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT);

Review Comment:
   can we abstract this away to at least a `private` method in this class, or even better a separate class like `DDBStreamConsistencyTracker`? That would make the code for `processDiscoveredSplits` easier to read (and test, if it's in a separate class)
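For illustration, the `private`-method variant could be shaped like this (the method name is invented for this sketch; the loop body is lifted from the hunk above):

```java
/** Retries listShards from the earliest closed leaf node until the shard graph is consistent. */
private void resolveShardGraphInconsistencies(
        ShardGraphTracker shardGraphTracker, boolean streamDisabled) {
    int maxRetries =
            sourceConfig.getInteger(DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT);
    for (int i = 0;
            i < maxRetries && !streamDisabled && shardGraphTracker.inconsistencyDetected();
            i++) {
        // Re-describe the stream starting from the earliest closed leaf node
        // to fill in the missing parts of the shard lineage.
        ListShardsResult shardsToResolveInconsistencies =
                streamProxy.listShards(streamArn, shardGraphTracker.getEarliestClosedLeafNode());
        shardGraphTracker.addNodes(shardsToResolveInconsistencies.getShards());
    }
}
```

`processDiscoveredSplits` would then shrink to building the tracker, calling this method, and handling the result.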
########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java: ##########
@@ -162,75 +216,14 @@ private List<DynamoDbStreamsShardSplit> initialDiscoverSplits() {
     *
     * @return list of discovered splits
     */
-   private List<DynamoDbStreamsShardSplit> periodicallyDiscoverSplits() {
-       List<Shard> shards = streamProxy.listShards(streamArn, lastSeenShardId);
-
-       // Any shard discovered after the initial startup should be read from the start, since they
-       // come from resharding
-       return mapToSplits(shards, InitialPosition.TRIM_HORIZON);
-   }
-
-   private List<DynamoDbStreamsShardSplit> mapToSplits(
-           List<Shard> shards, InitialPosition initialPosition) {
-       StartingPosition startingPosition;
-       switch (initialPosition) {
-           case LATEST:
-               startingPosition = StartingPosition.latest();
-               break;
-           case TRIM_HORIZON:
-           default:
-               startingPosition = StartingPosition.fromStart();
-       }
-
-       List<DynamoDbStreamsShardSplit> splits = new ArrayList<>();
-       for (Shard shard : shards) {
-           splits.add(new DynamoDbStreamsShardSplit(streamArn, shard.shardId(), startingPosition));
-       }
-
-       return splits;
-   }
-
-   /**
-    * This method assigns a given set of DynamoDb Streams splits to the readers currently
-    * registered on the cluster. This assignment is done via a side-effect on the {@link
-    * SplitEnumeratorContext} object.
-    *
-    * @param discoveredSplits list of discovered splits
-    * @param throwable thrown when discovering splits. Will be null if no throwable thrown.
-    */
-   private void assignSplits(
-           List<DynamoDbStreamsShardSplit> discoveredSplits, Throwable throwable) {
-       if (throwable != null) {
-           throw new DynamoDbStreamsSourceException("Failed to list shards.", throwable);
-       }
-
-       if (context.registeredReaders().size() < context.currentParallelism()) {
-           LOG.info(
-                   "Insufficient registered readers, skipping assignment of discovered splits until all readers are registered. Required number of readers: {}, Registered readers: {}",
-                   context.currentParallelism(),
-                   context.registeredReaders().size());
-           unassignedSplits.addAll(discoveredSplits);
-           return;
-       }
-
-       Map<Integer, List<DynamoDbStreamsShardSplit>> newSplitAssignments = new HashMap<>();
-       for (DynamoDbStreamsShardSplit split : unassignedSplits) {
-           assignSplitToSubtask(split, newSplitAssignments);
-       }
-       unassignedSplits.clear();
-       for (DynamoDbStreamsShardSplit split : discoveredSplits) {
-           assignSplitToSubtask(split, newSplitAssignments);
-       }
-
-       updateLastSeenShardId(discoveredSplits);
-       updateSplitAssignment(newSplitAssignments);
-       context.assignSplits(new SplitsAssignment<>(newSplitAssignments));
+   private List<Shard> periodicallyDiscoverSplits() {
+       return streamProxy.listShards(streamArn, null);
    }

    private void assignSplitToSubtask(
            DynamoDbStreamsShardSplit split,
            Map<Integer, List<DynamoDbStreamsShardSplit>> newSplitAssignments) {
-       if (assignedSplitIds.contains(split.splitId())) {
+       if (splitTracker.isAssigned(split.splitId())) {
            LOG.info(
                    "Skipping assignment of shard {} from stream {} because it is already assigned.",

Review Comment:
   Do we expect this to be printed constantly, or just when there are data inconsistencies? If this is only printed during data inconsistencies, then we should keep it as `INFO`

########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java: ##########
@@ -43,6 +45,15 @@ public enum InitialPosition {
                    .defaultValue(10000L)
                    .withDescription("The interval between each attempt to discover new shards.");

+   public static final ConfigOption<Integer> DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT =
+           ConfigOptions.key("flink.describestream.inconsistencyresolution.retries")
+                   .intType()
+                   .defaultValue(5)
+                   .withDescription(
+                           "The number of times to retry build shard lineage if describestream returns inconsistent response");
+
+   public static final Duration MIN_DDB_STREAMS_SHARD_RETENTION = Duration.ofHours(6);

Review Comment:
   Let's add a comment on why this is 6 hours!
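For example, something along these lines (the wording is a placeholder — the actual justification for the 6-hour value, e.g. how it relates to DynamoDB Streams' 24-hour record retention, needs to come from the authors):

```java
/**
 * Minimum time a shard is retained by the split tracker before it becomes
 * eligible for cleanup.
 *
 * <p>Placeholder rationale, to be confirmed by the authors - e.g. how the
 * 6-hour value relates to DynamoDB Streams' 24-hour record retention.
 */
public static final Duration MIN_DDB_STREAMS_SHARD_RETENTION = Duration.ofHours(6);
```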