This is an automated email from the ASF dual-hosted git repository. linxinyuan pushed a commit to branch xinyuan-region-gui in repository https://gitbox.apache.org/repos/asf/texera.git
commit 8eb1b9808e572d7b212eddec41058bd738c6e1c1 Author: Xinyuan Lin <[email protected]> AuthorDate: Sun Sep 21 20:23:38 2025 -0700 init --- .../engine/architecture/controller/ClientEvent.scala | 2 ++ .../amber/engine/architecture/controller/Controller.scala | 11 +++++++++++ .../architecture/controller/WorkflowScheduler.scala | 2 +- .../amber/engine/architecture/scheduling/Schedule.scala | 2 ++ .../ics/texera/web/service/WorkflowExecutionService.scala | 15 +++++++++++++++ 5 files changed, 31 insertions(+), 1 deletion(-) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/ClientEvent.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/ClientEvent.scala index d9cfe9c689..7cd0f91c2f 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/ClientEvent.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/ClientEvent.scala @@ -46,3 +46,5 @@ case class UpdateExecutorCompleted(id: ActorVirtualIdentity) extends ClientEvent final case class ReplayStatusUpdate(id: ActorVirtualIdentity, status: Boolean) extends ClientEvent final case class WorkflowRecoveryStatus(isRecovering: Boolean) extends ClientEvent + +final case class WorkflowRecoveryStatus(isRecovering: Boolean) extends ClientEvent diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/Controller.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/Controller.scala index bd54fd2613..8ce4a8eb4e 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/Controller.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/Controller.scala @@ -111,6 +111,17 @@ class Controller( override def initState(): Unit = { attachRuntimeServicesToCPState() cp.workflowScheduler.updateSchedule(physicalPlan) + + println("ewrfewf", cp.workflowScheduler.schedule.getRegions) + + + cp.asyncRPCClient.sendToClient( + ExecutionStatsUpdate( + cp.workflowExecution.getAllRegionExecutionsStats + ) + ) + + val controllerRestoreConf = controllerConfig.stateRestoreConfOpt if (controllerRestoreConf.isDefined) { globalReplayManager.markRecoveryStatus(CONTROLLER, isRecovering = true) diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/WorkflowScheduler.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/WorkflowScheduler.scala index 732b970f03..78053d056b 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/WorkflowScheduler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/controller/WorkflowScheduler.scala @@ -32,7 +32,7 @@ class WorkflowScheduler( actorId: ActorVirtualIdentity ) extends java.io.Serializable { var physicalPlan: PhysicalPlan = _ - private var schedule: Schedule = _ + var schedule: Schedule = _ /** * Update the schedule to be executed, based on the given physicalPlan. diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/Schedule.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/Schedule.scala index c3ecbc1ae7..edc4d5848c 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/Schedule.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/scheduling/Schedule.scala @@ -22,6 +22,8 @@ package edu.uci.ics.amber.engine.architecture.scheduling case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterator[Set[Region]] { private var currentLevel = levelSets.keys.minOption.getOrElse(0) + def getRegions: List[Region] = levelSets.values.flatten.toList + override def hasNext: Boolean = levelSets.isDefinedAt(currentLevel) override def next(): Set[Region] = { diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowExecutionService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowExecutionService.scala index b6c8cbb88e..7e09bc3513 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowExecutionService.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/WorkflowExecutionService.scala @@ -90,6 +90,21 @@ class WorkflowExecutionService( }) ) + addSubscription( + client + .registerCallback[ExecutionStatsUpdate]((evt: ExecutionStatsUpdate) => { + stateStore.statsStore.updateState { statsStore => + statsStore.withOperatorInfo(evt.operatorMetrics) + } + metricsPersistThread.foreach { thread => + thread.execute(() => { + storeRuntimeStatistics(computeStatsDiff(evt.operatorMetrics)) + lastPersistedMetrics = Some(evt.operatorMetrics) + }) + } + }) + ) + private def createStateEvent(state: ExecutionMetadataStore): WorkflowStateEvent = { if (state.isRecovering && state.state != COMPLETED) { WorkflowStateEvent("Recovering")
