gguptp commented on code in PR #151:
URL: https://github.com/apache/flink-connector-aws/pull/151#discussion_r1698925946


##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java:
##########
@@ -162,75 +216,14 @@ private List<DynamoDbStreamsShardSplit> 
initialDiscoverSplits() {
      *
      * @return list of discovered splits
      */
-    private List<DynamoDbStreamsShardSplit> periodicallyDiscoverSplits() {
-        List<Shard> shards = streamProxy.listShards(streamArn, 
lastSeenShardId);
-
-        // Any shard discovered after the initial startup should be read from 
the start, since they
-        // come from resharding
-        return mapToSplits(shards, InitialPosition.TRIM_HORIZON);
-    }
-
-    private List<DynamoDbStreamsShardSplit> mapToSplits(
-            List<Shard> shards, InitialPosition initialPosition) {
-        StartingPosition startingPosition;
-        switch (initialPosition) {
-            case LATEST:
-                startingPosition = StartingPosition.latest();
-                break;
-            case TRIM_HORIZON:
-            default:
-                startingPosition = StartingPosition.fromStart();
-        }
-
-        List<DynamoDbStreamsShardSplit> splits = new ArrayList<>();
-        for (Shard shard : shards) {
-            splits.add(new DynamoDbStreamsShardSplit(streamArn, 
shard.shardId(), startingPosition));
-        }
-
-        return splits;
-    }
-
-    /**
-     * This method assigns a given set of DynamoDb Streams splits to the 
readers currently
-     * registered on the cluster. This assignment is done via a side-effect on 
the {@link
-     * SplitEnumeratorContext} object.
-     *
-     * @param discoveredSplits list of discovered splits
-     * @param throwable thrown when discovering splits. Will be null if no 
throwable thrown.
-     */
-    private void assignSplits(
-            List<DynamoDbStreamsShardSplit> discoveredSplits, Throwable 
throwable) {
-        if (throwable != null) {
-            throw new DynamoDbStreamsSourceException("Failed to list shards.", 
throwable);
-        }
-
-        if (context.registeredReaders().size() < context.currentParallelism()) 
{
-            LOG.info(
-                    "Insufficient registered readers, skipping assignment of 
discovered splits until all readers are registered. Required number of readers: 
{}, Registered readers: {}",
-                    context.currentParallelism(),
-                    context.registeredReaders().size());
-            unassignedSplits.addAll(discoveredSplits);
-            return;
-        }
-
-        Map<Integer, List<DynamoDbStreamsShardSplit>> newSplitAssignments = 
new HashMap<>();
-        for (DynamoDbStreamsShardSplit split : unassignedSplits) {
-            assignSplitToSubtask(split, newSplitAssignments);
-        }
-        unassignedSplits.clear();
-        for (DynamoDbStreamsShardSplit split : discoveredSplits) {
-            assignSplitToSubtask(split, newSplitAssignments);
-        }
-
-        updateLastSeenShardId(discoveredSplits);
-        updateSplitAssignment(newSplitAssignments);
-        context.assignSplits(new SplitsAssignment<>(newSplitAssignments));
+    private List<Shard> periodicallyDiscoverSplits() {
+        return streamProxy.listShards(streamArn, null);
     }
 
     private void assignSplitToSubtask(
             DynamoDbStreamsShardSplit split,
             Map<Integer, List<DynamoDbStreamsShardSplit>> newSplitAssignments) 
{
-        if (assignedSplitIds.contains(split.splitId())) {
+        if (splitTracker.isAssigned(split.splitId())) {
             LOG.info(
                     "Skipping assignment of shard {} from stream {} because it 
is already assigned.",

Review Comment:
   Changed this log statement to debug level.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to