This is an automated email from the ASF dual-hosted git repository. xiaozhenliu pushed a commit to branch xiaozhen-fix-query-stats in repository https://gitbox.apache.org/repos/asf/texera.git
commit e39ce3f99e67955d744719705d8e96ce700ee2eb Author: Xiaozhen Liu <[email protected]> AuthorDate: Fri Feb 13 16:24:04 2026 -0800 fix(amber): fix getting uncreated region status in query statistics handler. --- .../controller/execution/WorkflowExecution.scala | 18 ++++++++++-- .../QueryWorkerStatisticsHandler.scala | 32 +++++++++++++--------- 2 files changed, 34 insertions(+), 16 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala index dea9b692a4..b806479b89 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala @@ -119,10 +119,22 @@ case class WorkflowExecution() { * @throws NoSuchElementException if no `OperatorExecution` is found for the specified operatorId. */ def getLatestOperatorExecution(physicalOpId: PhysicalOpIdentity): OperatorExecution = { - regionExecutions.values.toList + getLatestOperatorExecutionOption(physicalOpId).get + } + + /** + * Returns the latest `OperatorExecution` for a physical operator if it has been initialized. + * + * This is the safe counterpart of `getLatestOperatorExecution` for callers that may traverse + * operators before their region is launched (e.g., full-graph stats queries while execution is still + * progressing through schedule levels). + */ + def getLatestOperatorExecutionOption( + physicalOpId: PhysicalOpIdentity + ): Option[OperatorExecution] = { + regionExecutions.values.toSeq .findLast(regionExecution => regionExecution.hasOperatorExecution(physicalOpId)) - .get - .getOperatorExecution(physicalOpId) + .map(_.getOperatorExecution(physicalOpId)) } def isCompleted: Boolean = getState == WorkflowAggregatedState.COMPLETED diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala index fb725f850e..a7705170e3 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/QueryWorkerStatisticsHandler.scala @@ -105,20 +105,26 @@ trait QueryWorkerStatisticsHandler { if (opFilter.nonEmpty && !opFilter.contains(opId)) { Seq.empty } else { - val exec = cp.workflowExecution.getLatestOperatorExecution(opId) - // Skip completed operators - if (exec.getState == COMPLETED) { - Seq.empty - } else { - // Select all workers for this operator - val workerIds = exec.getWorkerIds - - // Send queryStatistics to each worker and update internal state on reply - workerIds.map { wid => - workerInterface.queryStatistics(EmptyRequest(), wid).map { resp => - collectedResults.addOne((exec.getWorkerExecution(wid), resp, System.nanoTime())) + cp.workflowExecution.getLatestOperatorExecutionOption(opId) match { + // Operator region has not been initialized yet; skip in this polling round. + case None => Seq.empty + case Some(exec) => + // Skip completed operators + if (exec.getState == COMPLETED) { + Seq.empty + } else { + // Select all workers for this operator + val workerIds = exec.getWorkerIds + + // Send queryStatistics to each worker and update internal state on reply + workerIds.map { wid => + workerInterface.queryStatistics(EmptyRequest(), wid).map { resp => + collectedResults.addOne( + (exec.getWorkerExecution(wid), resp, System.nanoTime()) + ) + } + } } - } } } }
