abhishekrb19 commented on code in PR #19372:
URL: https://github.com/apache/druid/pull/19372#discussion_r3156717978


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -4347,6 +4478,179 @@ && checkSourceMetadataMatch(dataSourceMetadata)) {
     return Collections.emptyMap();
   }
 
+  /**
+   * Check if all partitions in a task group have reached their bounded end 
offsets.
+   * Used to determine if the task group completed successfully vs failed 
midway.
+   *
+   * @param groupId The task group ID to check
+   * @return true if all partitions in the group have reached their end 
offsets, false otherwise
+   */
+  private boolean hasTaskGroupReachedBoundedEnd(int groupId)
+  {
+    BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig();
+    Map<PartitionIdType, SequenceOffsetType> endOffsets =
+        convertBoundedConfigMap(boundedConfig.getEndSequenceNumbers());
+    Map<PartitionIdType, SequenceOffsetType> currentOffsets = 
getOffsetsFromMetadataStorage();
+
+    log.info(
+        "Bounded mode: checking completion for taskGroup[%d]. Current offsets 
from metadata: %s, End offsets: %s",
+        groupId,
+        currentOffsets,
+        endOffsets
+    );
+
+    if (currentOffsets == null || currentOffsets.isEmpty()) {
+      log.debug("No checkpointed offsets found, taskGroup[%d] has not 
completed", groupId);
+      return false; // No progress yet, task hasn't completed
+    }
+
+    Set<PartitionIdType> partitionsInGroup = partitionGroups.get(groupId);
+    if (partitionsInGroup == null || partitionsInGroup.isEmpty()) {
+      return false;
+    }
+
+    // Check if ALL partitions in this group have reached their end offsets
+    for (PartitionIdType partition : partitionsInGroup) {
+      SequenceOffsetType endOffset = endOffsets.get(partition);
+      SequenceOffsetType currentOffset = currentOffsets.get(partition);
+
+      if (currentOffset == null) {
+        log.debug(
+            "Partition[%s] in taskGroup[%d] has no checkpointed offset, not 
complete",
+            partition,
+            groupId
+        );
+        return false; // Partition hasn't started processing
+      }
+
+      if (!isOffsetAtOrBeyond(currentOffset, endOffset)) {
+        log.debug(
+            "Partition[%s] in taskGroup[%d] at offset[%s], has not reached 
end[%s]",
+            partition,
+            groupId,
+            currentOffset,
+            endOffset
+        );
+        return false; // This partition hasn't reached its end
+      }
+    }
+
+    log.info(
+        "All partitions in taskGroup[%d] have reached their end offsets",
+        groupId
+    );
+    return true; // All partitions have reached their end offsets
+  }
+
+  /**
+   * Get current offsets for all partitions in a task group from metadata 
storage.
+   */
+  private Map<PartitionIdType, SequenceOffsetType> 
getCurrentOffsetsForGroup(int groupId)
+  {
+    Map<PartitionIdType, SequenceOffsetType> allOffsets = 
getOffsetsFromMetadataStorage();
+    if (allOffsets == null || allOffsets.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    Set<PartitionIdType> partitionsInGroup = partitionGroups.get(groupId);
+    if (partitionsInGroup == null) {
+      return Collections.emptyMap();
+    }
+
+    return partitionsInGroup.stream()
+        .filter(allOffsets::containsKey)
+        .collect(Collectors.toMap(
+            p -> p,
+            allOffsets::get
+        ));
+  }
+
+  /**
+   * Get end offsets for all partitions in a task group from bounded config.
+   */
+  private Map<PartitionIdType, SequenceOffsetType> getEndOffsetsForGroup(int 
groupId)
+  {
+    BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig();
+    Map<PartitionIdType, SequenceOffsetType> endOffsets =
+        convertBoundedConfigMap(boundedConfig.getEndSequenceNumbers());
+    Set<PartitionIdType> partitionsInGroup = partitionGroups.get(groupId);
+
+    if (partitionsInGroup == null) {
+      return Collections.emptyMap();
+    }
+
+    return partitionsInGroup.stream()
+        .filter(endOffsets::containsKey)
+        .collect(Collectors.toMap(
+            p -> p,
+            endOffsets::get
+        ));
+  }
+
+  /**
+   * Check if all bounded tasks have completed.
+   * Called after createNewTasks() in runInternal to ensure tasks have been 
created first.
+   *
+   * For bounded supervisors, we determine completion by checking if new tasks 
would be created.
+   * In createNewTasks(), bounded mode checks hasTaskGroupReachedBoundedEnd() 
before creating tasks.
+   * If that returns true (offsets reached), no new tasks are created.
+   *
+   * So completion is: no active tasks, no pending tasks, and createNewTasks() 
chose not to create any.
+   * This is indicated by empty task groups after createNewTasks() has run.
+   *
+   * We do NOT separately check metadata storage here because:
+   * 1. Metadata may contain stale offsets from previous supervisor runs
+   * 2. createNewTasks() already does the offset checking logic
+   * 3. If tasks were killed/failed and work is incomplete, createNewTasks() 
will recreate them
+   *
+   * @return true if all bounded work is complete, false otherwise
+   */
+  private boolean isBoundedWorkComplete()
+  {
+    if (!ioConfig.isBounded()) {
+      return false;
+    }
+
+    // Check if task groups are empty (no tasks active or pending)
+    boolean noActiveTasks = activelyReadingTaskGroups.isEmpty();
+    boolean noPendingTasks = 
pendingCompletionTaskGroups.values().stream().allMatch(List::isEmpty);
+
+    if (!noActiveTasks || !noPendingTasks) {
+      return false;
+    }
+
+    // At this point, no tasks are running. Since createNewTasks() already ran,
+    // if tasks aren't running it means either:
+    // A) Tasks completed successfully and offset targets were reached (don't 
recreate)
+    // B) Tasks failed/killed and haven't reached targets (will recreate next 
run)
+    //
+    // To distinguish, we check if createNewTasks() would create new tasks.
+    // If hasTaskGroupReachedBoundedEnd() returns false for any group, 
createNewTasks()
+    // will create tasks next iteration, so we're not complete.
+    for (Integer groupId : partitionGroups.keySet()) {
+      if (!hasTaskGroupReachedBoundedEnd(groupId)) {
+        log.debug("TaskGroup[%d] has not reached bounded end, tasks will be 
recreated", groupId);
+        return false;
+      }
+    }
+
+    // All groups have reached their end offsets and no tasks are running.
+    // Work is complete!
+    log.info("All bounded tasks completed for supervisor[%s]", supervisorId);
+    return true;
+  }
+
+  /**
+   * Handle bounded processing completion by shutting down the supervisor.
+   * At this point, all task groups are already empty (verified by 
isBoundedWorkComplete),
+   * so we just need to mark the supervisor as completed.
+   */
+  private void handleBoundedCompletion()
+  {
+    log.info("Bounded processing complete for supervisor[%s]. Marking as 
COMPLETED.", supervisorId);
+    stateManager.maybeSetState(SupervisorStateManager.BasicState.COMPLETED);

Review Comment:
   Should this call `stop()` with this COMPLETED state so things get 
unregistered and the executor is removed?



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java:
##########
@@ -259,6 +262,42 @@ public void validateSpecUpdateTo(SupervisorSpec 
proposedSpec) throws DruidExcept
     if (!this.getSource().equals(other.getSource())) {
       throw InvalidInput.exception(ILLEGAL_INPUT_SOURCE_UPDATE_ERROR_MESSAGE, 
this.getSource(), other.getSource());
     }
+
+    // Validate bounded stream configuration
+    validateBoundedStreamConfig(other);

Review Comment:
   Looks like this is only invoked on an update. Should we validate this during 
creation time as well?



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java:
##########
@@ -259,6 +262,42 @@ public void validateSpecUpdateTo(SupervisorSpec 
proposedSpec) throws DruidExcept
     if (!this.getSource().equals(other.getSource())) {
       throw InvalidInput.exception(ILLEGAL_INPUT_SOURCE_UPDATE_ERROR_MESSAGE, 
this.getSource(), other.getSource());
     }
+
+    // Validate bounded stream configuration
+    validateBoundedStreamConfig(other);
+  }
+
+  /**
+   * Validates bounded stream configuration for the supervisor spec.
+   *
+   * @param spec the supervisor spec to validate
+   * @throws DruidException if the bounded stream configuration is invalid
+   */
+  protected void validateBoundedStreamConfig(SeekableStreamSupervisorSpec 
spec) throws DruidException
+  {
+    SeekableStreamSupervisorIOConfig ioConfig = spec.getIoConfig();
+
+    if (ioConfig.isBounded()) {
+      // Validate partition consistency
+      BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig();
+      if 
(!boundedConfig.getStartSequenceNumbers().keySet().equals(boundedConfig.getEndSequenceNumbers().keySet()))
 {
+        throw InvalidInput.exception(
+            "Bounded stream config has mismatched partitions. Start: %s, End: 
%s",
+            boundedConfig.getStartSequenceNumbers().keySet(),
+            boundedConfig.getEndSequenceNumbers().keySet()
+        );
+      }

Review Comment:
   I wonder if some of these checks during create & update flows can be shared. 
Perhaps add a static helper inside `BoundedStreamConfig` that can be validated 
by both create & `validateSpecUpdateTo`?



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -4347,6 +4478,179 @@ && checkSourceMetadataMatch(dataSourceMetadata)) {
     return Collections.emptyMap();
   }
 
+  /**
+   * Check if all partitions in a task group have reached their bounded end 
offsets.
+   * Used to determine if the task group completed successfully vs failed 
midway.
+   *
+   * @param groupId The task group ID to check
+   * @return true if all partitions in the group have reached their end 
offsets, false otherwise
+   */
+  private boolean hasTaskGroupReachedBoundedEnd(int groupId)
+  {
+    BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig();
+    Map<PartitionIdType, SequenceOffsetType> endOffsets =
+        convertBoundedConfigMap(boundedConfig.getEndSequenceNumbers());
+    Map<PartitionIdType, SequenceOffsetType> currentOffsets = 
getOffsetsFromMetadataStorage();
+
+    log.info(
+        "Bounded mode: checking completion for taskGroup[%d]. Current offsets 
from metadata: %s, End offsets: %s",
+        groupId,
+        currentOffsets,
+        endOffsets
+    );
+
+    if (currentOffsets == null || currentOffsets.isEmpty()) {
+      log.debug("No checkpointed offsets found, taskGroup[%d] has not 
completed", groupId);
+      return false; // No progress yet, task hasn't completed
+    }
+
+    Set<PartitionIdType> partitionsInGroup = partitionGroups.get(groupId);
+    if (partitionsInGroup == null || partitionsInGroup.isEmpty()) {
+      return false;
+    }
+
+    // Check if ALL partitions in this group have reached their end offsets
+    for (PartitionIdType partition : partitionsInGroup) {
+      SequenceOffsetType endOffset = endOffsets.get(partition);
+      SequenceOffsetType currentOffset = currentOffsets.get(partition);
+
+      if (currentOffset == null) {
+        log.debug(
+            "Partition[%s] in taskGroup[%d] has no checkpointed offset, not 
complete",
+            partition,
+            groupId
+        );
+        return false; // Partition hasn't started processing
+      }
+
+      if (!isOffsetAtOrBeyond(currentOffset, endOffset)) {
+        log.debug(
+            "Partition[%s] in taskGroup[%d] at offset[%s], has not reached 
end[%s]",
+            partition,
+            groupId,
+            currentOffset,
+            endOffset
+        );
+        return false; // This partition hasn't reached its end
+      }
+    }
+
+    log.info(
+        "All partitions in taskGroup[%d] have reached their end offsets",
+        groupId
+    );
+    return true; // All partitions have reached their end offsets
+  }
+
+  /**
+   * Get current offsets for all partitions in a task group from metadata 
storage.
+   */
+  private Map<PartitionIdType, SequenceOffsetType> 
getCurrentOffsetsForGroup(int groupId)
+  {
+    Map<PartitionIdType, SequenceOffsetType> allOffsets = 
getOffsetsFromMetadataStorage();
+    if (allOffsets == null || allOffsets.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    Set<PartitionIdType> partitionsInGroup = partitionGroups.get(groupId);
+    if (partitionsInGroup == null) {
+      return Collections.emptyMap();
+    }
+
+    return partitionsInGroup.stream()
+        .filter(allOffsets::containsKey)
+        .collect(Collectors.toMap(
+            p -> p,
+            allOffsets::get
+        ));
+  }
+
+  /**
+   * Get end offsets for all partitions in a task group from bounded config.
+   */
+  private Map<PartitionIdType, SequenceOffsetType> getEndOffsetsForGroup(int 
groupId)
+  {
+    BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig();
+    Map<PartitionIdType, SequenceOffsetType> endOffsets =
+        convertBoundedConfigMap(boundedConfig.getEndSequenceNumbers());
+    Set<PartitionIdType> partitionsInGroup = partitionGroups.get(groupId);
+
+    if (partitionsInGroup == null) {
+      return Collections.emptyMap();
+    }
+
+    return partitionsInGroup.stream()
+        .filter(endOffsets::containsKey)
+        .collect(Collectors.toMap(
+            p -> p,
+            endOffsets::get
+        ));
+  }
+
+  /**
+   * Check if all bounded tasks have completed.
+   * Called after createNewTasks() in runInternal to ensure tasks have been 
created first.
+   *
+   * For bounded supervisors, we determine completion by checking if new tasks 
would be created.
+   * In createNewTasks(), bounded mode checks hasTaskGroupReachedBoundedEnd() 
before creating tasks.
+   * If that returns true (offsets reached), no new tasks are created.
+   *
+   * So completion is: no active tasks, no pending tasks, and createNewTasks() 
chose not to create any.
+   * This is indicated by empty task groups after createNewTasks() has run.
+   *
+   * We do NOT separately check metadata storage here because:
+   * 1. Metadata may contain stale offsets from previous supervisor runs
+   * 2. createNewTasks() already does the offset checking logic
+   * 3. If tasks were killed/failed and work is incomplete, createNewTasks() 
will recreate them
+   *
+   * @return true if all bounded work is complete, false otherwise

Review Comment:
   Hmm, indirectly inferring bounded work completion from the absence of tasks 
seems error-prone since these things can race and function independently in 
some ways.
   For example, if all tasks failed or if tasks were queued but haven’t started 
yet as mentioned in point 3, would the supervisor transition to a completed 
state prematurely?
   
   I think a more foolproof way to check completion would be to directly check 
metadata storage to track offsets for work completion and have the design 
reflect that with appropriate guardrails. I believe this also relates to the 
discussion here: https://github.com/apache/druid/pull/19372/changes#r3141594283



##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/BoundedStreamConfigTest.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class BoundedStreamConfigTest
+{
+  private final ObjectMapper mapper = new ObjectMapper();
+
+  @Test
+  public void testConstructorWithValidMaps()
+  {
+    Map<String, Long> startOffsets = new HashMap<>();
+    startOffsets.put("0", 100L);
+    startOffsets.put("1", 200L);
+
+    Map<String, Long> endOffsets = new HashMap<>();
+    endOffsets.put("0", 500L);
+    endOffsets.put("1", 600L);
+
+    BoundedStreamConfig config = new BoundedStreamConfig(startOffsets, 
endOffsets);
+
+    Assert.assertEquals(startOffsets, config.getStartSequenceNumbers());
+    Assert.assertEquals(endOffsets, config.getEndSequenceNumbers());
+  }
+
+  @Test
+  public void testConstructorWithNullStartSequenceNumbers()
+  {
+    Map<String, Long> endOffsets = new HashMap<>();
+    endOffsets.put("0", 500L);
+
+    NullPointerException ex = Assert.assertThrows(
+        NullPointerException.class,
+        () -> new BoundedStreamConfig(null, endOffsets)
+    );
+
+    Assert.assertTrue(ex.getMessage().contains("startSequenceNumbers"));
+  }
+
+  @Test
+  public void testConstructorWithNullEndSequenceNumbers()
+  {
+    Map<String, Long> startOffsets = new HashMap<>();
+    startOffsets.put("0", 100L);
+
+    NullPointerException ex = Assert.assertThrows(
+        NullPointerException.class,
+        () -> new BoundedStreamConfig(startOffsets, null)
+    );

Review Comment:
   Seems strange to assert on NPEs. Should we have defensive code that throws 
DruidExceptions instead of NPEs wtih preconditions?



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java:
##########
@@ -259,6 +262,42 @@ public void validateSpecUpdateTo(SupervisorSpec 
proposedSpec) throws DruidExcept
     if (!this.getSource().equals(other.getSource())) {
       throw InvalidInput.exception(ILLEGAL_INPUT_SOURCE_UPDATE_ERROR_MESSAGE, 
this.getSource(), other.getSource());
     }
+
+    // Validate bounded stream configuration
+    validateBoundedStreamConfig(other);
+  }
+
+  /**
+   * Validates bounded stream configuration for the supervisor spec.
+   *
+   * @param spec the supervisor spec to validate
+   * @throws DruidException if the bounded stream configuration is invalid
+   */
+  protected void validateBoundedStreamConfig(SeekableStreamSupervisorSpec 
spec) throws DruidException
+  {
+    SeekableStreamSupervisorIOConfig ioConfig = spec.getIoConfig();
+
+    if (ioConfig.isBounded()) {
+      // Validate partition consistency
+      BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig();
+      if 
(!boundedConfig.getStartSequenceNumbers().keySet().equals(boundedConfig.getEndSequenceNumbers().keySet()))
 {
+        throw InvalidInput.exception(
+            "Bounded stream config has mismatched partitions. Start: %s, End: 
%s",
+            boundedConfig.getStartSequenceNumbers().keySet(),
+            boundedConfig.getEndSequenceNumbers().keySet()
+        );
+      }
+
+      // Warn if useConcurrentLocks is not enabled
+      Map<String, Object> context = spec.getContext();
+      if (context == null || 
!Boolean.TRUE.equals(context.get("useConcurrentLocks"))) {
+        log.warn(
+            "Bounded stream processing without 'useConcurrentLocks=true' may 
fail " +
+            "if other supervisors are running or segments already exist for 
these intervals. " +
+            "Consider setting useConcurrentLocks=true in the supervisor 
context."
+        );

Review Comment:
   Is this requirement needed in the general case when an operator is 
submitting a supervisor with these bounds? I’d expect `useConcurrentLocks` to 
be required only when a backfill supervisor is submitted in parallel with 
resetting the currently running supervisor. If that’s the case, would it make 
sense to enforce this at the reset and backfill API layer instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to