AHeise commented on code in PR #195:
URL: https://github.com/apache/flink-connector-kafka/pull/195#discussion_r2527434098


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java:
##########
@@ -134,117 +161,59 @@ public KafkaSourceEnumerator(
         this.context = context;
         this.boundedness = boundedness;
 
-        Map<AssignmentStatus, List<KafkaPartitionSplit>> splits =
-                initializeMigratedSplits(kafkaSourceEnumState.splits());
-        this.assignedSplits = indexByPartition(splits.get(AssignmentStatus.ASSIGNED));
-        this.unassignedSplits = indexByPartition(splits.get(AssignmentStatus.UNASSIGNED));
-        this.pendingPartitionSplitAssignment = new HashMap<>();
         this.partitionDiscoveryIntervalMs =
                 KafkaSourceOptions.getOption(
                         properties,
                         KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS,
                         Long::parseLong);
         this.consumerGroupId = properties.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
         this.initialDiscoveryFinished = kafkaSourceEnumState.initialDiscoveryFinished();
+        this.assignedSplits = indexByPartition(kafkaSourceEnumState.assignedSplits());
+        this.unassignedSplits = indexByPartition(kafkaSourceEnumState.unassignedSplits());
     }
 
-    /**
-     * Initialize migrated splits to splits with concrete starting offsets. This method ensures that
-     * the costly offset resolution is performed only when there are splits that have been
-     * checkpointed with previous enumerator versions.
-     *
-     * <p>Note that this method is deliberately performed in the main thread to avoid a checkpoint
-     * of the splits without starting offset.
-     */
-    private Map<AssignmentStatus, List<KafkaPartitionSplit>> initializeMigratedSplits(
-            Set<SplitAndAssignmentStatus> splits) {
-        final Set<TopicPartition> migratedPartitions =
-                splits.stream()
-                        .filter(
-                                splitStatus ->
-                                        splitStatus.split().getStartingOffset()
-                                                == KafkaPartitionSplit.MIGRATED)
-                        .map(splitStatus -> splitStatus.split().getTopicPartition())
-                        .collect(Collectors.toSet());
-
-        if (migratedPartitions.isEmpty()) {
-            return splitByAssignmentStatus(splits.stream());
-        }
-
-        final Map<TopicPartition, Long> startOffsets =
-                startingOffsetInitializer.getPartitionOffsets(
-                        migratedPartitions, getOffsetsRetriever());
-        return splitByAssignmentStatus(
-                splits.stream()
-                        .map(splitStatus -> resolveMigratedSplit(splitStatus, startOffsets)));
-    }
-
-    private static Map<AssignmentStatus, List<KafkaPartitionSplit>> splitByAssignmentStatus(
-            Stream<SplitAndAssignmentStatus> stream) {
-        return stream.collect(
-                Collectors.groupingBy(
-                        SplitAndAssignmentStatus::assignmentStatus,
-                        Collectors.mapping(SplitAndAssignmentStatus::split, Collectors.toList())));
-    }
-
-    private static SplitAndAssignmentStatus resolveMigratedSplit(
-            SplitAndAssignmentStatus splitStatus, Map<TopicPartition, Long> startOffsets) {
-        final KafkaPartitionSplit split = splitStatus.split();
-        if (split.getStartingOffset() != KafkaPartitionSplit.MIGRATED) {
-            return splitStatus;
-        }
-        final Long startOffset = startOffsets.get(split.getTopicPartition());
-        checkState(
-                startOffset != null,
-                "Cannot find starting offset for migrated partition %s",
-                split.getTopicPartition());
-        return new SplitAndAssignmentStatus(
-                new KafkaPartitionSplit(split.getTopicPartition(), startOffset),
-                splitStatus.assignmentStatus());
-    }
-
-    private Map<TopicPartition, KafkaPartitionSplit> indexByPartition(
-            List<KafkaPartitionSplit> splits) {
-        if (splits == null) {
-            return new HashMap<>();
-        }
+    private static Map<TopicPartition, KafkaPartitionSplit> indexByPartition(
+            Collection<KafkaPartitionSplit> splits) {
         return splits.stream()
-                .collect(Collectors.toMap(KafkaPartitionSplit::getTopicPartition, split -> split));
+                .collect(Collectors.toMap(KafkaPartitionSplit::getTopicPartition, e -> e));
     }
 
     /**
      * Start the enumerator.
      *
      * <p>Depending on {@link #partitionDiscoveryIntervalMs}, the enumerator will trigger a one-time
      * partition discovery, or schedule a callable for discover partitions periodically.
-     *
-     * <p>The invoking chain of partition discovery would be:
-     *
-     * <ol>
-     *   <li>{@link #findNewPartitionSplits} in worker thread
-     *   <li>{@link #handlePartitionSplitChanges} in coordinator thread
-     * </ol>
      */
     @Override
     public void start() {
         adminClient = getKafkaAdminClient();
+
+        // find splits where the start offset has been initialized but not yet assigned to readers
+        final List<KafkaPartitionSplit> preinitializedSplits =
+                unassignedSplits.values().stream()
+                        .filter(split -> !split.isStartOffsetMigrated())

Review Comment:
   I added "These splits must not be reinitialized to keep offsets consistent with first discovery."
   Note that we are checking for splits that have a proper start offset (`!migrated`).
   I inverted the logic to `split.hasValidStartOffset` now to make it easier to read.
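   To make the intent concrete, here is a minimal sketch of the inverted predicate. `hasValidStartOffset()` is the name floated in this thread, not merged API; it is assumed to be simply the negation of the `MIGRATED` sentinel check that `isStartOffsetMigrated()` already performs:

   ```java
   // Hypothetical helper on KafkaPartitionSplit (name from this thread, not final API):
   // a split has a concrete start offset iff it does not carry the MIGRATED sentinel.
   public boolean hasValidStartOffset() {
       return getStartingOffset() != MIGRATED;
   }

   // The filter in start() then reads positively, without the negation:
   final List<KafkaPartitionSplit> preinitializedSplits =
           unassignedSplits.values().stream()
                   .filter(KafkaPartitionSplit::hasValidStartOffset)
                   .collect(Collectors.toList());
   ```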



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
