aho135 commented on code in PR #19372:
URL: https://github.com/apache/druid/pull/19372#discussion_r3164978799
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -4347,6 +4478,179 @@ && checkSourceMetadataMatch(dataSourceMetadata)) {
return Collections.emptyMap();
}
+ /**
+ * Check if all partitions in a task group have reached their bounded end
offsets.
+ * Used to determine if the task group completed successfully vs failed
midway.
+ *
+ * @param groupId The task group ID to check
+ * @return true if all partitions in the group have reached their end
offsets, false otherwise
+ */
+ private boolean hasTaskGroupReachedBoundedEnd(int groupId)
+ {
+ BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig();
+ Map<PartitionIdType, SequenceOffsetType> endOffsets =
+ convertBoundedConfigMap(boundedConfig.getEndSequenceNumbers());
+ Map<PartitionIdType, SequenceOffsetType> currentOffsets =
getOffsetsFromMetadataStorage();
+
+ log.info(
+ "Bounded mode: checking completion for taskGroup[%d]. Current offsets
from metadata: %s, End offsets: %s",
+ groupId,
+ currentOffsets,
+ endOffsets
+ );
+
+ if (currentOffsets == null || currentOffsets.isEmpty()) {
+ log.debug("No checkpointed offsets found, taskGroup[%d] has not
completed", groupId);
+ return false; // No progress yet, task hasn't completed
+ }
+
+ Set<PartitionIdType> partitionsInGroup = partitionGroups.get(groupId);
+ if (partitionsInGroup == null || partitionsInGroup.isEmpty()) {
+ return false;
+ }
+
+ // Check if ALL partitions in this group have reached their end offsets
+ for (PartitionIdType partition : partitionsInGroup) {
+ SequenceOffsetType endOffset = endOffsets.get(partition);
+ SequenceOffsetType currentOffset = currentOffsets.get(partition);
+
+ if (currentOffset == null) {
+ log.debug(
+ "Partition[%s] in taskGroup[%d] has no checkpointed offset, not
complete",
+ partition,
+ groupId
+ );
+ return false; // Partition hasn't started processing
+ }
+
+ if (!isOffsetAtOrBeyond(currentOffset, endOffset)) {
+ log.debug(
+ "Partition[%s] in taskGroup[%d] at offset[%s], has not reached
end[%s]",
+ partition,
+ groupId,
+ currentOffset,
+ endOffset
+ );
+ return false; // This partition hasn't reached its end
+ }
+ }
+
+ log.info(
+ "All partitions in taskGroup[%d] have reached their end offsets",
+ groupId
+ );
+ return true; // All partitions have reached their end offsets
+ }
+
+ /**
+ * Get current offsets for all partitions in a task group from metadata
storage.
+ */
+ private Map<PartitionIdType, SequenceOffsetType>
getCurrentOffsetsForGroup(int groupId)
+ {
+ Map<PartitionIdType, SequenceOffsetType> allOffsets =
getOffsetsFromMetadataStorage();
+ if (allOffsets == null || allOffsets.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
+ Set<PartitionIdType> partitionsInGroup = partitionGroups.get(groupId);
+ if (partitionsInGroup == null) {
+ return Collections.emptyMap();
+ }
+
+ return partitionsInGroup.stream()
+ .filter(allOffsets::containsKey)
+ .collect(Collectors.toMap(
+ p -> p,
+ allOffsets::get
+ ));
+ }
+
+ /**
+ * Get end offsets for all partitions in a task group from bounded config.
+ */
+ private Map<PartitionIdType, SequenceOffsetType> getEndOffsetsForGroup(int
groupId)
+ {
+ BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig();
+ Map<PartitionIdType, SequenceOffsetType> endOffsets =
+ convertBoundedConfigMap(boundedConfig.getEndSequenceNumbers());
+ Set<PartitionIdType> partitionsInGroup = partitionGroups.get(groupId);
+
+ if (partitionsInGroup == null) {
+ return Collections.emptyMap();
+ }
+
+ return partitionsInGroup.stream()
+ .filter(endOffsets::containsKey)
+ .collect(Collectors.toMap(
+ p -> p,
+ endOffsets::get
+ ));
+ }
+
+ /**
+ * Check if all bounded tasks have completed.
+ * Called after createNewTasks() in runInternal to ensure tasks have been
created first.
+ *
+ * For bounded supervisors, we determine completion by checking if new tasks
would be created.
+ * In createNewTasks(), bounded mode checks hasTaskGroupReachedBoundedEnd()
before creating tasks.
+ * If that returns true (offsets reached), no new tasks are created.
+ *
+ * So completion is: no active tasks, no pending tasks, and createNewTasks()
chose not to create any.
+ * This is indicated by empty task groups after createNewTasks() has run.
+ *
+ * We do NOT separately check metadata storage here because:
+ * 1. Metadata may contain stale offsets from previous supervisor runs
+ * 2. createNewTasks() already does the offset checking logic
+ * 3. If tasks were killed/failed and work is incomplete, createNewTasks()
will recreate them
+ *
+ * @return true if all bounded work is complete, false otherwise
+ */
+ private boolean isBoundedWorkComplete()
+ {
+ if (!ioConfig.isBounded()) {
+ return false;
+ }
+
+ // Check if task groups are empty (no tasks active or pending)
+ boolean noActiveTasks = activelyReadingTaskGroups.isEmpty();
+ boolean noPendingTasks =
pendingCompletionTaskGroups.values().stream().allMatch(List::isEmpty);
+
+ if (!noActiveTasks || !noPendingTasks) {
+ return false;
+ }
+
+ // At this point, no tasks are running. Since createNewTasks() already ran,
+ // if tasks aren't running it means either:
+ // A) Tasks completed successfully and offset targets were reached (don't
recreate)
+ // B) Tasks failed/killed and haven't reached targets (will recreate next
run)
+ //
+ // To distinguish, we check if createNewTasks() would create new tasks.
+ // If hasTaskGroupReachedBoundedEnd() returns false for any group,
createNewTasks()
+ // will create tasks next iteration, so we're not complete.
+ for (Integer groupId : partitionGroups.keySet()) {
+ if (!hasTaskGroupReachedBoundedEnd(groupId)) {
+ log.debug("TaskGroup[%d] has not reached bounded end, tasks will be
recreated", groupId);
+ return false;
+ }
+ }
+
+ // All groups have reached their end offsets and no tasks are running.
+ // Work is complete!
+ log.info("All bounded tasks completed for supervisor[%s]", supervisorId);
+ return true;
+ }
+
+ /**
+ * Handle bounded processing completion by shutting down the supervisor.
+ * At this point, all task groups are already empty (verified by
isBoundedWorkComplete),
+ * so we just need to mark the supervisor as completed.
+ */
+ private void handleBoundedCompletion()
+ {
+ log.info("Bounded processing complete for supervisor[%s]. Marking as
COMPLETED.", supervisorId);
+ stateManager.maybeSetState(SupervisorStateManager.BasicState.COMPLETED);
Review Comment:
One workflow I was testing was to submit a bounded supervisor and let it run to
completion. I then adjusted the start/end offsets, re-submitted the spec, and did
a hard reset to clear the metadata so that the supervisor could ingest the new
offset range. For this kind of workflow, the executor needs to keep running even
though the supervisor is in the COMPLETED state, so that it can pick up the reset
and create tasks for the new offset range.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -4347,6 +4478,179 @@ && checkSourceMetadataMatch(dataSourceMetadata)) {
return Collections.emptyMap();
}
+ /**
+ * Check if all partitions in a task group have reached their bounded end
offsets.
+ * Used to determine if the task group completed successfully vs failed
midway.
+ *
+ * @param groupId The task group ID to check
+ * @return true if all partitions in the group have reached their end
offsets, false otherwise
+ */
+ private boolean hasTaskGroupReachedBoundedEnd(int groupId)
+ {
+ BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig();
+ Map<PartitionIdType, SequenceOffsetType> endOffsets =
+ convertBoundedConfigMap(boundedConfig.getEndSequenceNumbers());
+ Map<PartitionIdType, SequenceOffsetType> currentOffsets =
getOffsetsFromMetadataStorage();
+
+ log.info(
+ "Bounded mode: checking completion for taskGroup[%d]. Current offsets
from metadata: %s, End offsets: %s",
+ groupId,
+ currentOffsets,
+ endOffsets
+ );
+
+ if (currentOffsets == null || currentOffsets.isEmpty()) {
+ log.debug("No checkpointed offsets found, taskGroup[%d] has not
completed", groupId);
+ return false; // No progress yet, task hasn't completed
+ }
+
+ Set<PartitionIdType> partitionsInGroup = partitionGroups.get(groupId);
+ if (partitionsInGroup == null || partitionsInGroup.isEmpty()) {
+ return false;
+ }
+
+ // Check if ALL partitions in this group have reached their end offsets
+ for (PartitionIdType partition : partitionsInGroup) {
+ SequenceOffsetType endOffset = endOffsets.get(partition);
+ SequenceOffsetType currentOffset = currentOffsets.get(partition);
+
+ if (currentOffset == null) {
+ log.debug(
+ "Partition[%s] in taskGroup[%d] has no checkpointed offset, not
complete",
+ partition,
+ groupId
+ );
+ return false; // Partition hasn't started processing
+ }
+
+ if (!isOffsetAtOrBeyond(currentOffset, endOffset)) {
+ log.debug(
+ "Partition[%s] in taskGroup[%d] at offset[%s], has not reached
end[%s]",
+ partition,
+ groupId,
+ currentOffset,
+ endOffset
+ );
+ return false; // This partition hasn't reached its end
+ }
+ }
+
+ log.info(
+ "All partitions in taskGroup[%d] have reached their end offsets",
+ groupId
+ );
+ return true; // All partitions have reached their end offsets
+ }
+
+ /**
+ * Get current offsets for all partitions in a task group from metadata
storage.
+ */
+ private Map<PartitionIdType, SequenceOffsetType>
getCurrentOffsetsForGroup(int groupId)
+ {
+ Map<PartitionIdType, SequenceOffsetType> allOffsets =
getOffsetsFromMetadataStorage();
+ if (allOffsets == null || allOffsets.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
+ Set<PartitionIdType> partitionsInGroup = partitionGroups.get(groupId);
+ if (partitionsInGroup == null) {
+ return Collections.emptyMap();
+ }
+
+ return partitionsInGroup.stream()
+ .filter(allOffsets::containsKey)
+ .collect(Collectors.toMap(
+ p -> p,
+ allOffsets::get
+ ));
+ }
+
+ /**
+ * Get end offsets for all partitions in a task group from bounded config.
+ */
+ private Map<PartitionIdType, SequenceOffsetType> getEndOffsetsForGroup(int
groupId)
+ {
+ BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig();
+ Map<PartitionIdType, SequenceOffsetType> endOffsets =
+ convertBoundedConfigMap(boundedConfig.getEndSequenceNumbers());
+ Set<PartitionIdType> partitionsInGroup = partitionGroups.get(groupId);
+
+ if (partitionsInGroup == null) {
+ return Collections.emptyMap();
+ }
+
+ return partitionsInGroup.stream()
+ .filter(endOffsets::containsKey)
+ .collect(Collectors.toMap(
+ p -> p,
+ endOffsets::get
+ ));
+ }
+
+ /**
+ * Check if all bounded tasks have completed.
+ * Called after createNewTasks() in runInternal to ensure tasks have been
created first.
+ *
+ * For bounded supervisors, we determine completion by checking if new tasks
would be created.
+ * In createNewTasks(), bounded mode checks hasTaskGroupReachedBoundedEnd()
before creating tasks.
+ * If that returns true (offsets reached), no new tasks are created.
+ *
+ * So completion is: no active tasks, no pending tasks, and createNewTasks()
chose not to create any.
+ * This is indicated by empty task groups after createNewTasks() has run.
+ *
+ * We do NOT separately check metadata storage here because:
+ * 1. Metadata may contain stale offsets from previous supervisor runs
+ * 2. createNewTasks() already does the offset checking logic
+ * 3. If tasks were killed/failed and work is incomplete, createNewTasks()
will recreate them
+ *
+ * @return true if all bounded work is complete, false otherwise
+ */
+ private boolean isBoundedWorkComplete()
+ {
+ if (!ioConfig.isBounded()) {
+ return false;
+ }
+
+ // Check if task groups are empty (no tasks active or pending)
+ boolean noActiveTasks = activelyReadingTaskGroups.isEmpty();
+ boolean noPendingTasks =
pendingCompletionTaskGroups.values().stream().allMatch(List::isEmpty);
+
+ if (!noActiveTasks || !noPendingTasks) {
+ return false;
+ }
+
+ // At this point, no tasks are running. Since createNewTasks() already ran,
+ // if tasks aren't running it means either:
+ // A) Tasks completed successfully and offset targets were reached (don't
recreate)
+ // B) Tasks failed/killed and haven't reached targets (will recreate next
run)
+ //
+ // To distinguish, we check if createNewTasks() would create new tasks.
+ // If hasTaskGroupReachedBoundedEnd() returns false for any group,
createNewTasks()
+ // will create tasks next iteration, so we're not complete.
+ for (Integer groupId : partitionGroups.keySet()) {
+ if (!hasTaskGroupReachedBoundedEnd(groupId)) {
+ log.debug("TaskGroup[%d] has not reached bounded end, tasks will be
recreated", groupId);
+ return false;
+ }
+ }
+
+ // All groups have reached their end offsets and no tasks are running.
+ // Work is complete!
+ log.info("All bounded tasks completed for supervisor[%s]", supervisorId);
+ return true;
+ }
+
+ /**
+ * Handle bounded processing completion by shutting down the supervisor.
+ * At this point, all task groups are already empty (verified by
isBoundedWorkComplete),
+ * so we just need to mark the supervisor as completed.
+ */
+ private void handleBoundedCompletion()
+ {
+ log.info("Bounded processing complete for supervisor[%s]. Marking as
COMPLETED.", supervisorId);
+ stateManager.maybeSetState(SupervisorStateManager.BasicState.COMPLETED);
Review Comment:
One workflow I was testing was to submit a bounded supervisor and let it run to
completion. I then adjusted the start/end offsets, re-submitted the spec, and did
a hard reset to clear the metadata so that the supervisor could ingest the new
offset range. For this kind of workflow, the executor needs to keep running even
after the initial completion, so that it can pick up the reset and create tasks
for the new offset range.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]