This is an automated email from the ASF dual-hosted git repository. linxinyuan pushed a commit to branch xinyuan-region-color in repository https://gitbox.apache.org/repos/asf/texera.git
commit a0188a385a63e9b9c5f993d32b02c1a4783443a3 Author: Xinyuan Lin <[email protected]> AuthorDate: Thu Oct 23 20:35:16 2025 -0700 init --- .../engine/architecture/controller/Controller.scala | 4 ++-- .../scheduling/RegionExecutionCoordinator.scala | 15 ++++++++++++--- .../RegionStateEvent.scala} | 6 ++---- .../model/websocket/response/RegionUpdateEvent.scala | 2 +- .../workflow-editor/workflow-editor.component.ts | 19 +++++++++++++++---- .../execute-workflow/execute-workflow.service.ts | 16 +++++++++++++--- .../workspace/types/workflow-websocket.interface.ts | 8 +++++++- 7 files changed, 52 insertions(+), 18 deletions(-) diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/controller/Controller.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/Controller.scala index 67f94398d3..354601dbe3 100644 --- a/amber/src/main/scala/org/apache/amber/engine/architecture/controller/Controller.scala +++ b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/Controller.scala @@ -114,9 +114,9 @@ class Controller( attachRuntimeServicesToCPState() cp.workflowScheduler.updateSchedule(physicalPlan) - val regions: List[List[String]] = + val regions: List[(Long, List[String])] = cp.workflowScheduler.getSchedule.getRegions.map { region => - region.physicalOps.map(_.id.logicalOpId.id).toList + (region.id.id, region.physicalOps.map(_.id.logicalOpId.id).toList) } SessionState.getAllSessionStates.foreach { state => diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index bacc345a2c..2de7bbfa46 100644 --- a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -54,6 +54,8 @@ import org.apache.amber.engine.common.AmberLogging import org.apache.amber.engine.common.FutureBijection._ import org.apache.amber.engine.common.rpc.AsyncRPCClient import org.apache.amber.engine.common.virtualidentity.util.CONTROLLER +import org.apache.texera.web.SessionState +import org.apache.texera.web.model.websocket.event.RegionStateEvent import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource import java.util.concurrent.TimeUnit @@ -134,7 +136,7 @@ class RegionExecutionCoordinator( // Set this coordinator's status to be completed so that subsequent regions can be started by // WorkflowExecutionCoordinator. - currentPhaseRef.set(Completed) + setPhase(Completed) // Terminate all the workers in this region. terminateWorkers(regionExecution) @@ -223,7 +225,7 @@ class RegionExecutionCoordinator( } private def executeDependeePortPhase(): Future[Unit] = { - currentPhaseRef.set(ExecutingDependeePortsPhase) + setPhase(ExecutingDependeePortsPhase) if (!region.getOperators.exists(_.dependeeInputs.nonEmpty)) { // Skip to the next phase when there are no dependee input ports return syncStatusAndTransitionRegionExecutionPhase() @@ -239,7 +241,7 @@ class RegionExecutionCoordinator( } private def executeNonDependeePortPhase(): Future[Unit] = { - currentPhaseRef.set(ExecutingNonDependeePortsPhase) + setPhase(ExecutingNonDependeePortsPhase) // Allocate output port storage objects region.resourceConfig.get.portConfigs .collect { @@ -541,5 +543,12 @@ class RegionExecutionCoordinator( } } + private def setPhase(phase: RegionExecutionPhase): Unit = { + currentPhaseRef.set(phase) + SessionState.getAllSessionStates.foreach { state => + state.send(RegionStateEvent(region.id.id, phase.toString)) + } + } + override def actorId: ActorVirtualIdentity = CONTROLLER } diff --git a/amber/src/main/scala/org/apache/texera/web/model/websocket/response/RegionUpdateEvent.scala b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/RegionStateEvent.scala similarity index 79% copy from amber/src/main/scala/org/apache/texera/web/model/websocket/response/RegionUpdateEvent.scala copy to amber/src/main/scala/org/apache/texera/web/model/websocket/event/RegionStateEvent.scala index 1cf644ecac..2e16ba9243 100644 --- a/amber/src/main/scala/org/apache/texera/web/model/websocket/response/RegionUpdateEvent.scala +++ b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/RegionStateEvent.scala @@ -17,8 +17,6 @@ * under the License. */ -package org.apache.texera.web.model.websocket.response +package org.apache.texera.web.model.websocket.event -import org.apache.texera.web.model.websocket.event.TexeraWebSocketEvent - -case class RegionUpdateEvent(regions: List[List[String]]) extends TexeraWebSocketEvent +case class RegionStateEvent(id: Long, state: String) extends TexeraWebSocketEvent diff --git a/amber/src/main/scala/org/apache/texera/web/model/websocket/response/RegionUpdateEvent.scala b/amber/src/main/scala/org/apache/texera/web/model/websocket/response/RegionUpdateEvent.scala index 1cf644ecac..9578b28b46 100644 --- a/amber/src/main/scala/org/apache/texera/web/model/websocket/response/RegionUpdateEvent.scala +++ b/amber/src/main/scala/org/apache/texera/web/model/websocket/response/RegionUpdateEvent.scala @@ -21,4 +21,4 @@ package org.apache.texera.web.model.websocket.response import org.apache.texera.web.model.websocket.event.TexeraWebSocketEvent -case class RegionUpdateEvent(regions: List[List[String]]) extends TexeraWebSocketEvent +case class RegionUpdateEvent(regions: List[(Long, List[String])]) extends TexeraWebSocketEvent diff --git a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts index e7780e7aea..a1776bac79 100644 --- a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts +++ b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts @@ -340,7 +340,6 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy { attrs: { body: { - fill: "rgba(255,213,79,0.2)", pointerEvents: "none", class: "region", }, @@ -356,14 +355,14 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy this.executeWorkflowService .getRegionUpdateStream() .pipe(untilDestroyed(this)) - .subscribe(regions => { + .subscribe(event => { this.paper.model .getCells() .filter(element => element instanceof Region) .forEach(element => element.remove()); - regionMap = regions.map(region => { - const element = new Region(); + regionMap = event.regions.map(([id, region]) => { + const element = new Region({ id: "region-" + id }); const ops = region.map(id => this.paper.getModelById(id)); this.paper.model.addCell(element); this.updateRegionElement(element, ops); @@ -376,6 +375,18 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy .filter(region => region.operators.includes(operator)) .forEach(region => this.updateRegionElement(region.regionElement, region.operators)); }); + + this.executeWorkflowService + .getRegionStateStream() + .pipe(untilDestroyed(this)) + .subscribe(region => { + const colorMap: Record<string, string> = { + ExecutingDependeePortsPhase: "rgba(244,67,54,0.2)", // soft red + ExecutingNonDependeePortsPhase: "rgba(255,213,79,0.2)", // warm amber yellow + Completed: "rgba(76,175,80,0.2)", // soft green + }; + this.paper.getModelById("region-" + region.id).attr("body/fill", colorMap[region.state]); + }); } private updateRegionElement(regionElement: joint.dia.Element, operators: joint.dia.Cell[]) { diff --git a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts index 3d9ca84b5a..cbddc01a66 100644 --- a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts +++ b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts @@ -31,6 +31,8 @@ import { import { WorkflowWebsocketService } from "../workflow-websocket/workflow-websocket.service"; import { OperatorCurrentTuples, + RegionStateEvent, + RegionUpdateEvent, ReplayExecutionInfo, TexeraWebsocketEvent, WorkflowFatalError, @@ -85,7 +87,8 @@ export class ExecuteWorkflowService { current: ExecutionStateInfo; }>(); - private regionUpdateStream = new Subject<readonly string[][]>(); + private regionUpdateStream = new Subject<RegionUpdateEvent>(); + private regionStateStream = new Subject<RegionStateEvent>(); // TODO: move this to another service, or redesign how this // information is stored on the frontend. @@ -102,7 +105,10 @@ export class ExecuteWorkflowService { workflowWebsocketService.websocketEvent().subscribe(event => { switch (event.type) { case "RegionUpdateEvent": - this.regionUpdateStream.next(event.regions); + this.regionUpdateStream.next(event); + break; + case "RegionStateEvent": + this.regionStateStream.next(event); break; case "WorkerAssignmentUpdateEvent": this.assignedWorkerIds.set(event.operatorId, event.workerIds); @@ -334,10 +340,14 @@ export class ExecuteWorkflowService { return this.executionStateStream.asObservable(); } - public getRegionUpdateStream(): Observable<readonly string[][]> { + public getRegionUpdateStream(): Observable<RegionUpdateEvent> { return this.regionUpdateStream.asObservable(); } + public getRegionStateStream(): Observable<RegionStateEvent> { + return this.regionStateStream.asObservable(); + } + public resetExecutionState(): void { this.currentState = { state: ExecutionState.Uninitialized, diff --git a/frontend/src/app/workspace/types/workflow-websocket.interface.ts b/frontend/src/app/workspace/types/workflow-websocket.interface.ts index 3ab8e34385..15e2a2809d 100644 --- a/frontend/src/app/workspace/types/workflow-websocket.interface.ts +++ b/frontend/src/app/workspace/types/workflow-websocket.interface.ts @@ -174,7 +174,12 @@ export type ClusterStatusUpdateEvent = Readonly<{ }>; export type RegionUpdateEvent = Readonly<{ - regions: readonly string[][]; + regions: readonly [number, string[]][]; +}>; + +export type RegionStateEvent = Readonly<{ + id: number; + state: string; }>; export type ModifyLogicResponse = Readonly<{ @@ -234,6 +239,7 @@ export type TexeraWebsocketEventTypeMap = { ExecutionDurationUpdateEvent: ExecutionDurationUpdateEvent; ClusterStatusUpdateEvent: ClusterStatusUpdateEvent; RegionUpdateEvent: RegionUpdateEvent; + RegionStateEvent: RegionStateEvent; }; // helper type definitions to generate the request and event types
