gguptp commented on code in PR #151: URL: https://github.com/apache/flink-connector-aws/pull/151#discussion_r1698925946
########## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java: ########## @@ -162,75 +216,14 @@ private List<DynamoDbStreamsShardSplit> initialDiscoverSplits() { * * @return list of discovered splits */ - private List<DynamoDbStreamsShardSplit> periodicallyDiscoverSplits() { - List<Shard> shards = streamProxy.listShards(streamArn, lastSeenShardId); - - // Any shard discovered after the initial startup should be read from the start, since they - // come from resharding - return mapToSplits(shards, InitialPosition.TRIM_HORIZON); - } - - private List<DynamoDbStreamsShardSplit> mapToSplits( - List<Shard> shards, InitialPosition initialPosition) { - StartingPosition startingPosition; - switch (initialPosition) { - case LATEST: - startingPosition = StartingPosition.latest(); - break; - case TRIM_HORIZON: - default: - startingPosition = StartingPosition.fromStart(); - } - - List<DynamoDbStreamsShardSplit> splits = new ArrayList<>(); - for (Shard shard : shards) { - splits.add(new DynamoDbStreamsShardSplit(streamArn, shard.shardId(), startingPosition)); - } - - return splits; - } - - /** - * This method assigns a given set of DynamoDb Streams splits to the readers currently - * registered on the cluster. This assignment is done via a side-effect on the {@link - * SplitEnumeratorContext} object. - * - * @param discoveredSplits list of discovered splits - * @param throwable thrown when discovering splits. Will be null if no throwable thrown. - */ - private void assignSplits( - List<DynamoDbStreamsShardSplit> discoveredSplits, Throwable throwable) { - if (throwable != null) { - throw new DynamoDbStreamsSourceException("Failed to list shards.", throwable); - } - - if (context.registeredReaders().size() < context.currentParallelism()) { - LOG.info( - "Insufficient registered readers, skipping assignment of discovered splits until all readers are registered. 
Required number of readers: {}, Registered readers: {}", - context.currentParallelism(), - context.registeredReaders().size()); - unassignedSplits.addAll(discoveredSplits); - return; - } - - Map<Integer, List<DynamoDbStreamsShardSplit>> newSplitAssignments = new HashMap<>(); - for (DynamoDbStreamsShardSplit split : unassignedSplits) { - assignSplitToSubtask(split, newSplitAssignments); - } - unassignedSplits.clear(); - for (DynamoDbStreamsShardSplit split : discoveredSplits) { - assignSplitToSubtask(split, newSplitAssignments); - } - - updateLastSeenShardId(discoveredSplits); - updateSplitAssignment(newSplitAssignments); - context.assignSplits(new SplitsAssignment<>(newSplitAssignments)); + private List<Shard> periodicallyDiscoverSplits() { + return streamProxy.listShards(streamArn, null); } private void assignSplitToSubtask( DynamoDbStreamsShardSplit split, Map<Integer, List<DynamoDbStreamsShardSplit>> newSplitAssignments) { - if (assignedSplitIds.contains(split.splitId())) { + if (splitTracker.isAssigned(split.splitId())) { LOG.info( "Skipping assignment of shard {} from stream {} because it is already assigned.", Review Comment: Changed this log statement from INFO to DEBUG level. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org