This is an automated email from the ASF dual-hosted git repository.

capistrant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e7091485b4 fix: Stuck queries due to skipped "DoneReadingInput". (#19219)
0e7091485b4 is described below

commit 0e7091485b47c6dfb6837e27e1abef5bb653467f
Author: Gian Merlino <[email protected]>
AuthorDate: Mon Mar 30 08:00:11 2026 -0700

    fix: Stuck queries due to skipped "DoneReadingInput". (#19219)
    
    In MEMORY output mode, when workers sort locally, the controller knows
    to start the next stage once all workers have finished reading their
    input.
    
    Typically the controller learns this because workers send the
    DoneReadingInput message when they transition from READING_INPUT to
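    PRESHUFFLE_WRITING_OUTPUT. However, when a worker finishes reading
    input very quickly and is also able to fully buffer its output, it
    transitions directly from READING_INPUT to RESULTS_COMPLETE without
    ever sending DoneReadingInput. The controller then never learns that
    all workers are done reading input, never starts the next stage, and
    the query becomes stuck.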
    
    This patch fixes the problem in the controller by also checking,
    upon receiving a ResultsComplete message, whether all workers are
    done reading input.
---
 .../kernel/controller/ControllerStageTracker.java  | 12 +++++++
 .../controller/BaseControllerQueryKernelTest.java  |  9 +++++
 .../controller/ControllerQueryKernelTest.java      | 38 ++++++++++++++++++++++
 .../controller/MockQueryDefinitionBuilder.java     | 12 ++++++-
 4 files changed, 70 insertions(+), 1 deletion(-)

diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java
index 1a2a0dc7b79..0a084986a47 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java
@@ -904,6 +904,18 @@ class ControllerStageTracker
       );
     }
 
+    // When stages sort during shuffle and don't need to gather statistics, 
they should transition to POST_READING
+    // when all workers are done reading input. "Results complete" implies 
that input is done being read, so check
+    // here if we should transition. (In most cases the worker will call 
"setDoneReadingInputForWorker" first, but
+    // it can go straight to "setResultsCompleteForWorker" if it finishes 
reading inputs very quickly and can
+    // fully buffer its outputs.)
+    if (phase == ControllerStagePhase.READING_INPUT
+        && !stageDef.mustGatherResultKeyStatistics()
+        && stageDef.doesSortDuringShuffle()
+        && allWorkersDoneReadingInput()) {
+      transitionTo(ControllerStagePhase.POST_READING);
+    }
+
     if (allResultsPresent()) {
       transitionTo(ControllerStagePhase.RESULTS_READY);
       return true;
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/BaseControllerQueryKernelTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/BaseControllerQueryKernelTest.java
index 96312e293fa..bb802382045 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/BaseControllerQueryKernelTest.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/BaseControllerQueryKernelTest.java
@@ -303,6 +303,15 @@ public class BaseControllerQueryKernelTest extends InitializedNullHandlingTest
                            .forEach(n -> 
controllerQueryKernel.setDoneReadingInputForStageAndWorker(stageId, n));
     }
 
+    public void doneReadingInputForWorkers(int stageNumber, int... workers)
+    {
+      Preconditions.checkArgument(initialized);
+      final StageId stageId = new StageId(queryDefinition.getQueryId(), 
stageNumber);
+      for (int worker : workers) {
+        controllerQueryKernel.setDoneReadingInputForStageAndWorker(stageId, 
worker);
+      }
+    }
+
     public void finishStage(int stageNumber)
     {
       finishStage(stageNumber, true);
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelTest.java
index 03f963b133b..bd1f31ae23e 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelTest.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernelTest.java
@@ -547,6 +547,44 @@ public class ControllerQueryKernelTest extends BaseControllerQueryKernelTest
     controllerQueryKernelTester.assertStagePhase(2, 
ControllerStagePhase.FINISHED);
   }
 
+  /**
+   * Test that the controller handles the case where a worker reports 
ResultsComplete without first
+   * reporting DoneReadingInput, which can happen if the worker finishes 
reading inputs very quickly.
+   */
+  @Test
+  public void testResultsCompleteWithoutDoneReadingInput()
+  {
+    // Stage 0: GLOBAL_SORT with maxPartitions=1 
(mustGatherResultKeyStatistics = false), 2 workers
+    // Stage 1: reads from stage 0
+    final ControllerQueryKernelTester tester = testControllerQueryKernel(
+        configBuilder -> 
configBuilder.maxConcurrentStages(2).pipeline(true).build()
+    );
+
+    tester.queryDefinition(
+        new MockQueryDefinitionBuilder(2)
+            .addEdge(0, 1)
+            .defineStage(0, ShuffleKind.GLOBAL_SORT, 2, 1)
+            .getQueryDefinitionBuilder()
+            .build()
+    );
+    tester.init();
+
+    Assert.assertEquals(ImmutableSet.of(0), 
tester.createAndGetNewStageNumbers());
+
+    tester.startStage(0);
+    tester.sendWorkOrdersForWorkers(0, 0, 1);
+
+    // Worker 0 reports doneReadingInput normally
+    tester.doneReadingInputForWorkers(0, 0);
+
+    // Worker 1 skips doneReadingInput and reports resultsComplete directly 
(the race condition)
+    tester.setResultsCompleteForStageAndWorkers(0, 1);
+
+    // Stage 0 should reach POST_READING; stage 1 should be ready to run
+    tester.assertStagePhase(0, ControllerStagePhase.POST_READING);
+    Assert.assertEquals(ImmutableSet.of(1), 
tester.createAndGetNewStageNumbers());
+  }
+
   private static void 
transitionNewToResultsComplete(ControllerQueryKernelTester queryKernelTester, 
int stageNumber)
   {
     queryKernelTester.startStage(stageNumber);
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/MockQueryDefinitionBuilder.java b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/MockQueryDefinitionBuilder.java
index c0d5157f96e..bd1be760d27 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/MockQueryDefinitionBuilder.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/MockQueryDefinitionBuilder.java
@@ -132,6 +132,16 @@ public class MockQueryDefinitionBuilder
       @Nullable ShuffleKind shuffleKind,
       int maxWorkers
   )
+  {
+    return defineStage(stageNumber, shuffleKind, maxWorkers, 
MAX_NUM_PARTITIONS);
+  }
+
+  public MockQueryDefinitionBuilder defineStage(
+      int stageNumber,
+      @Nullable ShuffleKind shuffleKind,
+      int maxWorkers,
+      int maxPartitions
+  )
   {
     Preconditions.checkArgument(
         stageNumber < numStages,
@@ -155,7 +165,7 @@ public class MockQueryDefinitionBuilder
                   ),
                   0
               ),
-              MAX_NUM_PARTITIONS,
+              maxPartitions,
               false,
               ShuffleSpec.UNLIMITED
           );


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to