karubian commented on code in PR #151:
URL: https://github.com/apache/flink-connector-aws/pull/151#discussion_r1694985819
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -116,24 +117,76 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
     }

     @Override
-    public void addSplitsBack(List<DynamoDbStreamsShardSplit> splits, int subtaskId) {
-        if (!splitAssignment.containsKey(subtaskId)) {
-            LOG.warn(
-                    "Unable to add splits back for subtask {} since it is not assigned any splits. Splits: {}",
-                    subtaskId,
-                    splits);
-            return;
+    public void addSplitsBack(List<DynamoDbStreamsShardSplit> list, int i) {
+        throw new UnsupportedOperationException("Partial recovery is not supported");
+    }
+
+    @Override
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        if (sourceEvent instanceof SplitsFinishedEvent) {
+            handleFinishedSplits(subtaskId, (SplitsFinishedEvent) sourceEvent);
+        }
+    }
+
+    private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
+        splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
+        splitAssignment
+                .get(subtaskId)
+                .removeIf(
+                        split ->
+                                splitsFinishedEvent
+                                        .getFinishedSplitIds()
+                                        .contains(split.splitId()));
+        assignSplits();
+    }
+
+    private void processDiscoveredSplits(List<Shard> discoveredSplits, Throwable throwable) {
+        if (throwable != null) {
+            throw new DynamoDbStreamsSourceException("Failed to list shards.", throwable);
+        }
+
+        ShardGraphTracker shardGraphTracker = new ShardGraphTracker();
+        shardGraphTracker.addNodes(discoveredSplits);
+
+        StreamDescription streamDescription = streamProxy.getStreamDescription(streamArn, null);
+        boolean streamDisabled = streamDescription.streamStatus().equals(StreamStatus.DISABLED);
+
+        for (int i = 0;
+                i < sourceConfig.getInteger(DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT)
+                        && !streamDisabled

Review Comment:
   nit: It might be more readable to move these for-loop conditions into helper method(s). Also, we could store `sourceConfig.getInteger(DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT)` in a local variable so the getter is not re-evaluated on every loop iteration (a rough sketch follows below).



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java:
##########
@@ -43,6 +45,15 @@ public enum InitialPosition {
                     .defaultValue(10000L)
                     .withDescription("The interval between each attempt to discover new shards.");

+    public static final ConfigOption<Integer> DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT =
+            ConfigOptions.key("flink.describestream.inconsistencyresolution.retries")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "The number of times to retry build shard lineage if describestream returns inconsistent response");
+
+    public static final Duration MIN_DDB_STREAMS_SHARD_RETENTION = Duration.ofHours(6);

Review Comment:
   Should this be a config option?
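   If it were promoted to a config option, a minimal sketch following the `ConfigOptions` pattern already used in this file might look like the following. The key name and description here are invented for illustration, and the existing `java.time.Duration` / `org.apache.flink.configuration.ConfigOptions` imports are assumed:

   ```java
   // Hypothetical replacement for the hard-coded constant; key name is illustrative only.
   public static final ConfigOption<Duration> MIN_DDB_STREAMS_SHARD_RETENTION =
           ConfigOptions.key("flink.dynamodbstreams.shard.retention.min")
                   .durationType()
                   .defaultValue(Duration.ofHours(6))
                   .withDescription(
                           "Minimum retention period to assume for DynamoDB Streams shards.");
   ```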
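   Regarding the earlier nit about the for-loop conditions in `DynamoDbStreamsSourceEnumerator`, a rough sketch of the suggested extraction. The helper name is invented, and since the diff is truncated after `!streamDisabled`, any remaining conditions would move into the helper as well:

   ```java
   // Hypothetical helper; bundles the loop-continuation conditions in one readable place.
   private static boolean shouldRetryLineageResolution(
           int attempt, int maxRetries, boolean streamDisabled) {
       // Retry while the attempt budget is not exhausted and the stream is still enabled.
       return attempt < maxRetries && !streamDisabled;
   }

   // Call site, with the retry budget hoisted out of the loop header:
   int maxRetries = sourceConfig.getInteger(DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT);
   for (int i = 0; shouldRetryLineageResolution(i, maxRetries, streamDisabled); i++) {
       // ... shard lineage resolution (loop body elided in the diff above)
   }
   ```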
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -162,75 +216,14 @@ private List<DynamoDbStreamsShardSplit> initialDiscoverSplits() {
      *
      * @return list of discovered splits
      */
-    private List<DynamoDbStreamsShardSplit> periodicallyDiscoverSplits() {
-        List<Shard> shards = streamProxy.listShards(streamArn, lastSeenShardId);
-
-        // Any shard discovered after the initial startup should be read from the start, since they
-        // come from resharding
-        return mapToSplits(shards, InitialPosition.TRIM_HORIZON);
-    }
-
-    private List<DynamoDbStreamsShardSplit> mapToSplits(
-            List<Shard> shards, InitialPosition initialPosition) {
-        StartingPosition startingPosition;
-        switch (initialPosition) {
-            case LATEST:
-                startingPosition = StartingPosition.latest();
-                break;
-            case TRIM_HORIZON:
-            default:
-                startingPosition = StartingPosition.fromStart();
-        }
-
-        List<DynamoDbStreamsShardSplit> splits = new ArrayList<>();
-        for (Shard shard : shards) {
-            splits.add(new DynamoDbStreamsShardSplit(streamArn, shard.shardId(), startingPosition));
-        }
-
-        return splits;
-    }
-
-    /**
-     * This method assigns a given set of DynamoDb Streams splits to the readers currently
-     * registered on the cluster. This assignment is done via a side-effect on the {@link
-     * SplitEnumeratorContext} object.
-     *
-     * @param discoveredSplits list of discovered splits
-     * @param throwable thrown when discovering splits. Will be null if no throwable thrown.
-     */
-    private void assignSplits(
-            List<DynamoDbStreamsShardSplit> discoveredSplits, Throwable throwable) {
-        if (throwable != null) {
-            throw new DynamoDbStreamsSourceException("Failed to list shards.", throwable);
-        }
-
-        if (context.registeredReaders().size() < context.currentParallelism()) {
-            LOG.info(
-                    "Insufficient registered readers, skipping assignment of discovered splits until all readers are registered. Required number of readers: {}, Registered readers: {}",
-                    context.currentParallelism(),
-                    context.registeredReaders().size());
-            unassignedSplits.addAll(discoveredSplits);
-            return;
-        }
-
-        Map<Integer, List<DynamoDbStreamsShardSplit>> newSplitAssignments = new HashMap<>();
-        for (DynamoDbStreamsShardSplit split : unassignedSplits) {
-            assignSplitToSubtask(split, newSplitAssignments);
-        }
-        unassignedSplits.clear();
-        for (DynamoDbStreamsShardSplit split : discoveredSplits) {
-            assignSplitToSubtask(split, newSplitAssignments);
-        }
-
-        updateLastSeenShardId(discoveredSplits);
-        updateSplitAssignment(newSplitAssignments);
-        context.assignSplits(new SplitsAssignment<>(newSplitAssignments));
+    private List<Shard> periodicallyDiscoverSplits() {
+        return streamProxy.listShards(streamArn, null);
     }

     private void assignSplitToSubtask(
             DynamoDbStreamsShardSplit split,
             Map<Integer, List<DynamoDbStreamsShardSplit>> newSplitAssignments) {
-        if (assignedSplitIds.contains(split.splitId())) {
+        if (splitTracker.isAssigned(split.splitId())) {
             LOG.info(
                     "Skipping assignment of shard {} from stream {} because it is already assigned.",

Review Comment:
   Can this log be noisy at large scale? Should we consider making it DEBUG level instead?
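   For reference, that would be a one-line switch; the arguments after the message string are assumed here (`splitId()` and the enumerator's `streamArn` field), since the diff truncates at the message:

   ```java
   // DEBUG keeps per-shard skip messages out of the enumerator log at INFO level.
   LOG.debug(
           "Skipping assignment of shard {} from stream {} because it is already assigned.",
           split.splitId(),
           streamArn);
   ```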
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.source.enumerator.tracker;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants;
+import org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition;
+import org.apache.flink.connector.dynamodb.source.enumerator.DynamoDBStreamsShardSplitWithAssignmentStatus;
+import org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus;
+import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit;
+import org.apache.flink.connector.dynamodb.source.split.StartingPosition;
+
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON;
+import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.MIN_DDB_STREAMS_SHARD_RETENTION;
+import static org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.ASSIGNED;
+import static org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.FINISHED;
+
+/** This class is used to track splits and will be used to assign any unassigned splits. */
+@Internal
+public class SplitTracker {
+    private final Map<String, DynamoDbStreamsShardSplit> knownSplits = new ConcurrentHashMap<>();
+    private final Set<String> assignedSplits = new HashSet<>();
+    private final Set<String> finishedSplits = new HashSet<>();
+    private final String streamArn;
+    private final InitialPosition initialPosition;
+
+    public SplitTracker(String streamArn, InitialPosition initialPosition) {
+        this(Collections.emptyList(), streamArn, initialPosition);
+    }
+
+    public SplitTracker(
+            List<DynamoDBStreamsShardSplitWithAssignmentStatus> initialState,
+            String streamArn,
+            DynamodbStreamsSourceConfigConstants.InitialPosition initialPosition) {
+        this.streamArn = streamArn;
+        this.initialPosition = initialPosition;
+        initialState.forEach(
+                splitWithStatus -> {
+                    DynamoDbStreamsShardSplit currentSplit = splitWithStatus.split();
+                    knownSplits.put(currentSplit.splitId(), currentSplit);
+
+                    if (ASSIGNED.equals(splitWithStatus.assignmentStatus())) {

Review Comment:
   nit: Is it possible to get a null value from `assignmentStatus`? If not, we could consider converting the Yoda-style `equals` here to the natural order.
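   A small sketch of the trade-off (the `if` body is elided in the diff above, so it is left as a comment here):

   ```java
   // Yoda style, as in the PR: null-safe, because ASSIGNED is never null.
   if (ASSIGNED.equals(splitWithStatus.assignmentStatus())) {
       // ... handle an assigned split
   }

   // Natural order: reads more directly, but throws NullPointerException
   // if assignmentStatus() can ever return null.
   if (splitWithStatus.assignmentStatus().equals(ASSIGNED)) {
       // ... handle an assigned split
   }

   // Null-safe in either order, via java.util.Objects:
   if (Objects.equals(splitWithStatus.assignmentStatus(), ASSIGNED)) {
       // ... handle an assigned split
   }
   ```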
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org