karubian commented on code in PR #151:
URL: 
https://github.com/apache/flink-connector-aws/pull/151#discussion_r1694985819


##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -116,24 +117,76 @@ public void handleSplitRequest(int subtaskId, @Nullable 
String requesterHostname
     }
 
     @Override
-    public void addSplitsBack(List<DynamoDbStreamsShardSplit> splits, int 
subtaskId) {
-        if (!splitAssignment.containsKey(subtaskId)) {
-            LOG.warn(
-                    "Unable to add splits back for subtask {} since it is not 
assigned any splits. Splits: {}",
-                    subtaskId,
-                    splits);
-            return;
+    public void addSplitsBack(List<DynamoDbStreamsShardSplit> list, int i) {
+        throw new UnsupportedOperationException("Partial recovery is not 
supported");
+    }
+
+    @Override
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        if (sourceEvent instanceof SplitsFinishedEvent) {
+            handleFinishedSplits(subtaskId, (SplitsFinishedEvent) sourceEvent);
+        }
+    }
+
+    private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent 
splitsFinishedEvent) {
+        splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
+        splitAssignment
+                .get(subtaskId)
+                .removeIf(
+                        split ->
+                                splitsFinishedEvent
+                                        .getFinishedSplitIds()
+                                        .contains(split.splitId()));
+        assignSplits();
+    }
+
+    private void processDiscoveredSplits(List<Shard> discoveredSplits, 
Throwable throwable) {
+        if (throwable != null) {
+            throw new DynamoDbStreamsSourceException("Failed to list shards.", 
throwable);
+        }
+
+        ShardGraphTracker shardGraphTracker = new ShardGraphTracker();
+        shardGraphTracker.addNodes(discoveredSplits);
+
+        StreamDescription streamDescription = 
streamProxy.getStreamDescription(streamArn, null);
+        boolean streamDisabled = 
streamDescription.streamStatus().equals(StreamStatus.DISABLED);
+
+        for (int i = 0;
+                i < 
sourceConfig.getInteger(DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT)
+                        && !streamDisabled

Review Comment:
   nit: It might be more readable to move or separate the conditions in this 
for loop into helper method(s).
   Also, we can assign 
`sourceConfig.getInteger(DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT)` 
to a local variable so that the getter is not called on every loop iteration.



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java:
##########
@@ -43,6 +45,15 @@ public enum InitialPosition {
                     .defaultValue(10000L)
                     .withDescription("The interval between each attempt to 
discover new shards.");
 
+    public static final ConfigOption<Integer> 
DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT =
+            
ConfigOptions.key("flink.describestream.inconsistencyresolution.retries")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "The number of times to retry build shard lineage 
if describestream returns inconsistent response");
+
+    public static final Duration MIN_DDB_STREAMS_SHARD_RETENTION = 
Duration.ofHours(6);

Review Comment:
   Should this be a config option? 



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -162,75 +216,14 @@ private List<DynamoDbStreamsShardSplit> 
initialDiscoverSplits() {
      *
      * @return list of discovered splits
      */
-    private List<DynamoDbStreamsShardSplit> periodicallyDiscoverSplits() {
-        List<Shard> shards = streamProxy.listShards(streamArn, 
lastSeenShardId);
-
-        // Any shard discovered after the initial startup should be read from 
the start, since they
-        // come from resharding
-        return mapToSplits(shards, InitialPosition.TRIM_HORIZON);
-    }
-
-    private List<DynamoDbStreamsShardSplit> mapToSplits(
-            List<Shard> shards, InitialPosition initialPosition) {
-        StartingPosition startingPosition;
-        switch (initialPosition) {
-            case LATEST:
-                startingPosition = StartingPosition.latest();
-                break;
-            case TRIM_HORIZON:
-            default:
-                startingPosition = StartingPosition.fromStart();
-        }
-
-        List<DynamoDbStreamsShardSplit> splits = new ArrayList<>();
-        for (Shard shard : shards) {
-            splits.add(new DynamoDbStreamsShardSplit(streamArn, 
shard.shardId(), startingPosition));
-        }
-
-        return splits;
-    }
-
-    /**
-     * This method assigns a given set of DynamoDb Streams splits to the 
readers currently
-     * registered on the cluster. This assignment is done via a side-effect on 
the {@link
-     * SplitEnumeratorContext} object.
-     *
-     * @param discoveredSplits list of discovered splits
-     * @param throwable thrown when discovering splits. Will be null if no 
throwable thrown.
-     */
-    private void assignSplits(
-            List<DynamoDbStreamsShardSplit> discoveredSplits, Throwable 
throwable) {
-        if (throwable != null) {
-            throw new DynamoDbStreamsSourceException("Failed to list shards.", 
throwable);
-        }
-
-        if (context.registeredReaders().size() < context.currentParallelism()) 
{
-            LOG.info(
-                    "Insufficient registered readers, skipping assignment of 
discovered splits until all readers are registered. Required number of readers: 
{}, Registered readers: {}",
-                    context.currentParallelism(),
-                    context.registeredReaders().size());
-            unassignedSplits.addAll(discoveredSplits);
-            return;
-        }
-
-        Map<Integer, List<DynamoDbStreamsShardSplit>> newSplitAssignments = 
new HashMap<>();
-        for (DynamoDbStreamsShardSplit split : unassignedSplits) {
-            assignSplitToSubtask(split, newSplitAssignments);
-        }
-        unassignedSplits.clear();
-        for (DynamoDbStreamsShardSplit split : discoveredSplits) {
-            assignSplitToSubtask(split, newSplitAssignments);
-        }
-
-        updateLastSeenShardId(discoveredSplits);
-        updateSplitAssignment(newSplitAssignments);
-        context.assignSplits(new SplitsAssignment<>(newSplitAssignments));
+    private List<Shard> periodicallyDiscoverSplits() {
+        return streamProxy.listShards(streamArn, null);
     }
 
     private void assignSplitToSubtask(
             DynamoDbStreamsShardSplit split,
             Map<Integer, List<DynamoDbStreamsShardSplit>> newSplitAssignments) 
{
-        if (assignedSplitIds.contains(split.splitId())) {
+        if (splitTracker.isAssigned(split.splitId())) {
             LOG.info(
                     "Skipping assignment of shard {} from stream {} because it 
is already assigned.",

Review Comment:
   Can this log be noisy at large scale? Should we consider logging it at 
DEBUG level instead?
   



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.source.enumerator.tracker;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import 
org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants;
+import 
org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition;
+import 
org.apache.flink.connector.dynamodb.source.enumerator.DynamoDBStreamsShardSplitWithAssignmentStatus;
+import 
org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus;
+import 
org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit;
+import org.apache.flink.connector.dynamodb.source.split.StartingPosition;
+
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON;
+import static 
org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.MIN_DDB_STREAMS_SHARD_RETENTION;
+import static 
org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.ASSIGNED;
+import static 
org.apache.flink.connector.dynamodb.source.enumerator.SplitAssignmentStatus.FINISHED;
+
+/** This class is used to track splits and will be used to assign any 
unassigned splits. */
+@Internal
+public class SplitTracker {
+    private final Map<String, DynamoDbStreamsShardSplit> knownSplits = new 
ConcurrentHashMap<>();
+    private final Set<String> assignedSplits = new HashSet<>();
+    private final Set<String> finishedSplits = new HashSet<>();
+    private final String streamArn;
+    private final InitialPosition initialPosition;
+
+    public SplitTracker(String streamArn, InitialPosition initialPosition) {
+        this(Collections.emptyList(), streamArn, initialPosition);
+    }
+
+    public SplitTracker(
+            List<DynamoDBStreamsShardSplitWithAssignmentStatus> initialState,
+            String streamArn,
+            DynamodbStreamsSourceConfigConstants.InitialPosition 
initialPosition) {
+        this.streamArn = streamArn;
+        this.initialPosition = initialPosition;
+        initialState.forEach(
+                splitWithStatus -> {
+                    DynamoDbStreamsShardSplit currentSplit = 
splitWithStatus.split();
+                    knownSplits.put(currentSplit.splitId(), currentSplit);
+
+                    if (ASSIGNED.equals(splitWithStatus.assignmentStatus())) {

Review Comment:
   nit: Is it possible to get a null value from `assignmentStatus`? If not, we 
can consider dropping the Yoda-style comparison here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to