This is an automated email from the ASF dual-hosted git repository.

xiaozhenliu pushed a commit to branch xiaozhen-fix-query-stats
in repository https://gitbox.apache.org/repos/asf/texera.git

commit e39ce3f99e67955d744719705d8e96ce700ee2eb
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Fri Feb 13 16:24:04 2026 -0800

    fix(amber): skip operators whose region has not been created yet in the query statistics handler.
---
 .../controller/execution/WorkflowExecution.scala   | 18 ++++++++++--
 .../QueryWorkerStatisticsHandler.scala             | 32 +++++++++++++---------
 2 files changed, 34 insertions(+), 16 deletions(-)

diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala
index dea9b692a4..b806479b89 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala
@@ -119,10 +119,22 @@ case class WorkflowExecution() {
     * @throws NoSuchElementException if no `OperatorExecution` is found for the specified operatorId.
     */
   def getLatestOperatorExecution(physicalOpId: PhysicalOpIdentity): OperatorExecution = {
-    regionExecutions.values.toList
+    getLatestOperatorExecutionOption(physicalOpId).get
+  }
+
+  /**
+    * Returns the latest `OperatorExecution` for a physical operator if it has been initialized.
+    *
+    * This is the safe counterpart of `getLatestOperatorExecution` for callers that may traverse
+    * operators before their region is launched (e.g., full-graph stats queries while execution is still
+    * progressing through schedule levels).
+    */
+  def getLatestOperatorExecutionOption(
+      physicalOpId: PhysicalOpIdentity
+  ): Option[OperatorExecution] = {
+    regionExecutions.values.toSeq
       .findLast(regionExecution => regionExecution.hasOperatorExecution(physicalOpId))
-      .get
-      .getOperatorExecution(physicalOpId)
+      .map(_.getOperatorExecution(physicalOpId))
   }
 
   def isCompleted: Boolean = getState == WorkflowAggregatedState.COMPLETED
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala
index fb725f850e..a7705170e3 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala
@@ -105,20 +105,26 @@ trait QueryWorkerStatisticsHandler {
             if (opFilter.nonEmpty && !opFilter.contains(opId)) {
               Seq.empty
             } else {
-              val exec = cp.workflowExecution.getLatestOperatorExecution(opId)
-              // Skip completed operators
-              if (exec.getState == COMPLETED) {
-                Seq.empty
-              } else {
-                // Select all workers for this operator
-                val workerIds = exec.getWorkerIds
-
-                // Send queryStatistics to each worker and update internal state on reply
-                workerIds.map { wid =>
-                  workerInterface.queryStatistics(EmptyRequest(), wid).map { resp =>
-                    collectedResults.addOne((exec.getWorkerExecution(wid), resp, System.nanoTime()))
+              cp.workflowExecution.getLatestOperatorExecutionOption(opId) match {
+                // Operator region has not been initialized yet; skip in this polling round.
+                case None       => Seq.empty
+                case Some(exec) =>
+                  // Skip completed operators
+                  if (exec.getState == COMPLETED) {
+                    Seq.empty
+                  } else {
+                    // Select all workers for this operator
+                    val workerIds = exec.getWorkerIds
+
+                    // Send queryStatistics to each worker and update internal state on reply
+                    workerIds.map { wid =>
+                      workerInterface.queryStatistics(EmptyRequest(), wid).map { resp =>
+                        collectedResults.addOne(
+                          (exec.getWorkerExecution(wid), resp, System.nanoTime())
+                        )
+                      }
+                    }
                   }
-                }
               }
             }
           }

Reply via email to