This is an automated email from the ASF dual-hosted git repository.
linxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 641fb857b5 fix(amber): fix getting uncreated region status in query
statistics handler (#4213)
641fb857b5 is described below
commit 641fb857b5fcabf8c48a90a2408453de7ebfd9a4
Author: Xiaozhen Liu <[email protected]>
AuthorDate: Fri Feb 13 17:53:35 2026 -0800
fix(amber): fix getting uncreated region status in query statistics handler
(#4213)
### What changes were proposed in this PR?
This PR fixes a bug in `QueryWorkerStatisticsHandler` that frequently
throws the following error during any workflow execution:
```
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:627)
at scala.None$.get(Option.scala:626)
at org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution.getLatestOperatorExecution(WorkflowExecution.scala:123)
at org.apache.texera.amber.engine.architecture.controller.promisehandlers.QueryWorkerStatisticsHandler.$anonfun$controllerInitiateQueryStatistics$2(QueryWorkerStatisticsHandler.scala:108)
at scala.collection.immutable.List.flatMap(List.scala:293)
at scala.collection.immutable.List.flatMap(List.scala:79)
at org.apache.texera.amber.engine.architecture.controller.promisehandlers.QueryWorkerStatisticsHandler.processLayers$1(QueryWorkerStatisticsHandler.scala:103)
at org.apache.texera.amber.engine.architecture.controller.promisehandlers.QueryWorkerStatisticsHandler.controllerInitiateQueryStatistics(QueryWorkerStatisticsHandler.scala:131)
```
This error is triggered because `QueryWorkerStatisticsHandler` tries to
retrieve the status of all operators in the physical plan, including
those in regions that have not been scheduled to run.
The fix is to only retrieve statuses for operators that have been
created (i.e., have an existing status in `WorkflowExecution`).
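For reviewers, the pattern can be sketched in isolation as follows. This is a minimal illustration and not the engine code: `OpId`, `OpExecution`, `RegionExecutionSketch`, `WorkflowExecutionSketch`, and `workersToQuery` are hypothetical stand-ins, operator state is simplified to a string, and the insertion-ordered map is an assumption made so that "latest" is well defined. Only the shape of `getLatestOperatorExecutionOption` and the `None` / `Some` match mirror the change in this PR.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the engine's types; NOT the real Texera classes.
final case class OpId(name: String)
final case class OpExecution(state: String, workerIds: Seq[String])

final class RegionExecutionSketch(ops: Map[OpId, OpExecution]) {
  def hasOperatorExecution(id: OpId): Boolean = ops.contains(id)
  def getOperatorExecution(id: OpId): OpExecution = ops(id)
}

final class WorkflowExecutionSketch {
  // Assumption: insertion order stands in for the order in which regions are launched.
  private val regionExecutions =
    mutable.LinkedHashMap.empty[Int, RegionExecutionSketch]

  def addRegion(id: Int, region: RegionExecutionSketch): Unit = {
    regionExecutions.update(id, region)
  }

  // Safe lookup: None when no launched region contains the operator yet.
  def getLatestOperatorExecutionOption(id: OpId): Option[OpExecution] =
    regionExecutions.values.toSeq
      .findLast(_.hasOperatorExecution(id))
      .map(_.getOperatorExecution(id))

  // Throwing lookup: fails with NoSuchElementException for uncreated operators.
  def getLatestOperatorExecution(id: OpId): OpExecution =
    getLatestOperatorExecutionOption(id).get
}

object StatsPollSketch {
  // Collects the workers to query, skipping operators that are not created
  // yet and operators that have already completed.
  def workersToQuery(exec: WorkflowExecutionSketch, plan: Seq[OpId]): Seq[String] =
    plan.flatMap { opId =>
      exec.getLatestOperatorExecutionOption(opId) match {
        case None                                => Seq.empty
        case Some(op) if op.state == "COMPLETED" => Seq.empty
        case Some(op)                            => op.workerIds
      }
    }

  def main(args: Array[String]): Unit = {
    val exec = new WorkflowExecutionSketch
    val scan = OpId("scan")
    val join = OpId("join") // belongs to a region that has not been launched
    exec.addRegion(1, new RegionExecutionSketch(Map(scan -> OpExecution("RUNNING", Seq("w1", "w2")))))
    println(workersToQuery(exec, Seq(scan, join)))
  }
}
```

In this sketch, polling a plan that contains the unlaunched `join` operator returns only the workers of `scan`, whereas calling `getLatestOperatorExecution(join)` would still throw, matching the documented contract of the throwing variant.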
### Any related issues, documentation, discussions?
No.
### How was this PR tested?
Tested manually.
### Was this PR authored or co-authored using generative AI tooling?
No.
---
.../controller/execution/WorkflowExecution.scala | 18 ++++++++++--
.../QueryWorkerStatisticsHandler.scala | 32 +++++++++++++---------
2 files changed, 34 insertions(+), 16 deletions(-)
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala
index dea9b692a4..b806479b89 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala
@@ -119,10 +119,22 @@ case class WorkflowExecution() {
* @throws NoSuchElementException if no `OperatorExecution` is found for
the specified operatorId.
*/
def getLatestOperatorExecution(physicalOpId: PhysicalOpIdentity):
OperatorExecution = {
- regionExecutions.values.toList
+ getLatestOperatorExecutionOption(physicalOpId).get
+ }
+
+ /**
+ * Returns the latest `OperatorExecution` for a physical operator if it has
been initialized.
+ *
+ * This is the safe counterpart of `getLatestOperatorExecution` for callers
that may traverse
+ * operators before their region is launched (e.g., full-graph stats
queries while execution is still
+ * progressing through schedule levels).
+ */
+ def getLatestOperatorExecutionOption(
+ physicalOpId: PhysicalOpIdentity
+ ): Option[OperatorExecution] = {
+ regionExecutions.values.toSeq
.findLast(regionExecution =>
regionExecution.hasOperatorExecution(physicalOpId))
- .get
- .getOperatorExecution(physicalOpId)
+ .map(_.getOperatorExecution(physicalOpId))
}
def isCompleted: Boolean = getState == WorkflowAggregatedState.COMPLETED
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala
index fb725f850e..a7705170e3 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala
@@ -105,20 +105,26 @@ trait QueryWorkerStatisticsHandler {
if (opFilter.nonEmpty && !opFilter.contains(opId)) {
Seq.empty
} else {
- val exec = cp.workflowExecution.getLatestOperatorExecution(opId)
- // Skip completed operators
- if (exec.getState == COMPLETED) {
- Seq.empty
- } else {
- // Select all workers for this operator
- val workerIds = exec.getWorkerIds
-
- // Send queryStatistics to each worker and update internal
state on reply
- workerIds.map { wid =>
- workerInterface.queryStatistics(EmptyRequest(), wid).map {
resp =>
- collectedResults.addOne((exec.getWorkerExecution(wid),
resp, System.nanoTime()))
+ cp.workflowExecution.getLatestOperatorExecutionOption(opId)
match {
+ // Operator region has not been initialized yet; skip in this
polling round.
+ case None => Seq.empty
+ case Some(exec) =>
+ // Skip completed operators
+ if (exec.getState == COMPLETED) {
+ Seq.empty
+ } else {
+ // Select all workers for this operator
+ val workerIds = exec.getWorkerIds
+
+ // Send queryStatistics to each worker and update internal
state on reply
+ workerIds.map { wid =>
+ workerInterface.queryStatistics(EmptyRequest(), wid).map
{ resp =>
+ collectedResults.addOne(
+ (exec.getWorkerExecution(wid), resp,
System.nanoTime())
+ )
+ }
+ }
}
- }
}
}
}