This is an automated email from the ASF dual-hosted git repository.

linxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 641fb857b5 fix(amber): fix getting uncreated region status in query statistics handler (#4213)
641fb857b5 is described below

commit 641fb857b5fcabf8c48a90a2408453de7ebfd9a4
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Fri Feb 13 17:53:35 2026 -0800

    fix(amber): fix getting uncreated region status in query statistics handler (#4213)
    
    ### What changes were proposed in this PR?
    
    This PR fixes a bug in `QueryWorkerStatisticsHandler` that throws the
    following error frequently during any workflow execution:
    
    ```
    java.util.NoSuchElementException: None.get
            at scala.None$.get(Option.scala:627)
            at scala.None$.get(Option.scala:626)
            at org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution.getLatestOperatorExecution(WorkflowExecution.scala:123)
            at org.apache.texera.amber.engine.architecture.controller.promisehandlers.QueryWorkerStatisticsHandler.$anonfun$controllerInitiateQueryStatistics$2(QueryWorkerStatisticsHandler.scala:108)
            at scala.collection.immutable.List.flatMap(List.scala:293)
            at scala.collection.immutable.List.flatMap(List.scala:79)
            at org.apache.texera.amber.engine.architecture.controller.promisehandlers.QueryWorkerStatisticsHandler.processLayers$1(QueryWorkerStatisticsHandler.scala:103)
            at org.apache.texera.amber.engine.architecture.controller.promisehandlers.QueryWorkerStatisticsHandler.controllerInitiateQueryStatistics(QueryWorkerStatisticsHandler.scala:131)
    ```
    
    This error is triggered because `QueryWorkerStatisticsHandler` tries to
    retrieve the status of all operators in the physical plan, including
    those in regions that have not been scheduled to run.
    
    The fix is to only retrieve statuses for operators that have been
    created (i.e., have an existing status in `WorkflowExecution`).
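    
    A minimal, self-contained sketch of the idea, not the actual Texera
    code: `OperatorExecution`, `RegionExecution`, and `StatsSketch` below are
    simplified, hypothetical stand-ins (operator ids are plain strings and
    states are plain strings). It only illustrates how an `Option`-returning
    lookup lets a caller skip operators whose region has not been created,
    instead of failing on `None.get`.
    
    ```scala
    // Hypothetical, simplified stand-ins for the real Texera classes.
    final case class OperatorExecution(state: String)
    
    final case class RegionExecution(operators: Map[String, OperatorExecution]) {
      def hasOperatorExecution(opId: String): Boolean = operators.contains(opId)
      def getOperatorExecution(opId: String): OperatorExecution = operators(opId)
    }
    
    object StatsSketch {
      // Safe lookup: returns None when no created region contains the operator.
      def latestOperatorExecutionOption(
          regions: Seq[RegionExecution],
          opId: String
      ): Option[OperatorExecution] =
        regions
          .findLast(_.hasOperatorExecution(opId))
          .map(_.getOperatorExecution(opId))
    
      // Operators worth polling: already created and not yet completed.
      def operatorsToQuery(regions: Seq[RegionExecution], plan: Seq[String]): Seq[String] =
        plan.filter { opId =>
          latestOperatorExecutionOption(regions, opId).exists(_.state != "COMPLETED")
        }
    }
    ```
    
    In this sketch, an operator that appears in the plan but in no created
    region is simply left out of the polling round.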
    
    ### Any related issues, documentation, discussions?
    
    No.
    
    ### How was this PR tested?
    
    Tested manually.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    No.
---
 .../controller/execution/WorkflowExecution.scala   | 18 ++++++++++--
 .../QueryWorkerStatisticsHandler.scala             | 32 +++++++++++++---------
 2 files changed, 34 insertions(+), 16 deletions(-)

diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala
index dea9b692a4..b806479b89 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala
@@ -119,10 +119,22 @@ case class WorkflowExecution() {
     * @throws NoSuchElementException if no `OperatorExecution` is found for the specified operatorId.
     */
   def getLatestOperatorExecution(physicalOpId: PhysicalOpIdentity): OperatorExecution = {
-    regionExecutions.values.toList
+    getLatestOperatorExecutionOption(physicalOpId).get
+  }
+
+  /**
+    * Returns the latest `OperatorExecution` for a physical operator if it has been initialized.
+    *
+    * This is the safe counterpart of `getLatestOperatorExecution` for callers that may traverse
+    * operators before their region is launched (e.g., full-graph stats queries while execution is still
+    * progressing through schedule levels).
+    */
+  def getLatestOperatorExecutionOption(
+      physicalOpId: PhysicalOpIdentity
+  ): Option[OperatorExecution] = {
+    regionExecutions.values.toSeq
       .findLast(regionExecution => regionExecution.hasOperatorExecution(physicalOpId))
-      .get
-      .getOperatorExecution(physicalOpId)
+      .map(_.getOperatorExecution(physicalOpId))
   }
 
   def isCompleted: Boolean = getState == WorkflowAggregatedState.COMPLETED
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala
index fb725f850e..a7705170e3 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala
@@ -105,20 +105,26 @@ trait QueryWorkerStatisticsHandler {
             if (opFilter.nonEmpty && !opFilter.contains(opId)) {
               Seq.empty
             } else {
-              val exec = cp.workflowExecution.getLatestOperatorExecution(opId)
-              // Skip completed operators
-              if (exec.getState == COMPLETED) {
-                Seq.empty
-              } else {
-                // Select all workers for this operator
-                val workerIds = exec.getWorkerIds
-
-                // Send queryStatistics to each worker and update internal state on reply
-                workerIds.map { wid =>
-                  workerInterface.queryStatistics(EmptyRequest(), wid).map { resp =>
-                    collectedResults.addOne((exec.getWorkerExecution(wid), resp, System.nanoTime()))
+              cp.workflowExecution.getLatestOperatorExecutionOption(opId) match {
+                // Operator region has not been initialized yet; skip in this polling round.
+                case None       => Seq.empty
+                case Some(exec) =>
+                  // Skip completed operators
+                  if (exec.getState == COMPLETED) {
+                    Seq.empty
+                  } else {
+                    // Select all workers for this operator
+                    val workerIds = exec.getWorkerIds
+
+                    // Send queryStatistics to each worker and update internal state on reply
+                    workerIds.map { wid =>
+                      workerInterface.queryStatistics(EmptyRequest(), wid).map { resp =>
+                        collectedResults.addOne(
+                          (exec.getWorkerExecution(wid), resp, System.nanoTime())
+                        )
+                      }
+                    }
                   }
-                }
               }
             }
           }
