This is an automated email from the ASF dual-hosted git repository.

linxinyuan pushed a commit to branch xinyuan-region-color
in repository https://gitbox.apache.org/repos/asf/texera.git

commit a0188a385a63e9b9c5f993d32b02c1a4783443a3
Author: Xinyuan Lin <[email protected]>
AuthorDate: Thu Oct 23 20:35:16 2025 -0700

    init
---
 .../engine/architecture/controller/Controller.scala   |  4 ++--
 .../scheduling/RegionExecutionCoordinator.scala       | 15 ++++++++++++---
 .../RegionStateEvent.scala}                           |  6 ++----
 .../model/websocket/response/RegionUpdateEvent.scala  |  2 +-
 .../workflow-editor/workflow-editor.component.ts      | 19 +++++++++++++++----
 .../execute-workflow/execute-workflow.service.ts      | 16 +++++++++++++---
 .../workspace/types/workflow-websocket.interface.ts   |  8 +++++++-
 7 files changed, 52 insertions(+), 18 deletions(-)

diff --git 
a/amber/src/main/scala/org/apache/amber/engine/architecture/controller/Controller.scala
 
b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/Controller.scala
index 67f94398d3..354601dbe3 100644
--- 
a/amber/src/main/scala/org/apache/amber/engine/architecture/controller/Controller.scala
+++ 
b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/Controller.scala
@@ -114,9 +114,9 @@ class Controller(
     attachRuntimeServicesToCPState()
     cp.workflowScheduler.updateSchedule(physicalPlan)
 
-    val regions: List[List[String]] =
+    val regions: List[(Long, List[String])] =
       cp.workflowScheduler.getSchedule.getRegions.map { region =>
-        region.physicalOps.map(_.id.logicalOpId.id).toList
+        (region.id.id, region.physicalOps.map(_.id.logicalOpId.id).toList)
       }
 
     SessionState.getAllSessionStates.foreach { state =>
diff --git 
a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
 
b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index bacc345a2c..2de7bbfa46 100644
--- 
a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ 
b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -54,6 +54,8 @@ import org.apache.amber.engine.common.AmberLogging
 import org.apache.amber.engine.common.FutureBijection._
 import org.apache.amber.engine.common.rpc.AsyncRPCClient
 import org.apache.amber.engine.common.virtualidentity.util.CONTROLLER
+import org.apache.texera.web.SessionState
+import org.apache.texera.web.model.websocket.event.RegionStateEvent
 import 
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
 
 import java.util.concurrent.TimeUnit
@@ -134,7 +136,7 @@ class RegionExecutionCoordinator(
 
     // Set this coordinator's status to be completed so that subsequent 
regions can be started by
     // WorkflowExecutionCoordinator.
-    currentPhaseRef.set(Completed)
+    setPhase(Completed)
 
     // Terminate all the workers in this region.
     terminateWorkers(regionExecution)
@@ -223,7 +225,7 @@ class RegionExecutionCoordinator(
     }
 
   private def executeDependeePortPhase(): Future[Unit] = {
-    currentPhaseRef.set(ExecutingDependeePortsPhase)
+    setPhase(ExecutingDependeePortsPhase)
     if (!region.getOperators.exists(_.dependeeInputs.nonEmpty)) {
       // Skip to the next phase when there are no dependee input ports
       return syncStatusAndTransitionRegionExecutionPhase()
@@ -239,7 +241,7 @@ class RegionExecutionCoordinator(
   }
 
   private def executeNonDependeePortPhase(): Future[Unit] = {
-    currentPhaseRef.set(ExecutingNonDependeePortsPhase)
+    setPhase(ExecutingNonDependeePortsPhase)
     // Allocate output port storage objects
     region.resourceConfig.get.portConfigs
       .collect {
@@ -541,5 +543,12 @@ class RegionExecutionCoordinator(
     }
   }
 
+  private def setPhase(phase: RegionExecutionPhase): Unit = {
+    currentPhaseRef.set(phase)
+    SessionState.getAllSessionStates.foreach { state =>
+      state.send(RegionStateEvent(region.id.id, phase.toString))
+    }
+  }
+
   override def actorId: ActorVirtualIdentity = CONTROLLER
 }
diff --git 
a/amber/src/main/scala/org/apache/texera/web/model/websocket/response/RegionUpdateEvent.scala
 
b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/RegionStateEvent.scala
similarity index 79%
copy from 
amber/src/main/scala/org/apache/texera/web/model/websocket/response/RegionUpdateEvent.scala
copy to 
amber/src/main/scala/org/apache/texera/web/model/websocket/event/RegionStateEvent.scala
index 1cf644ecac..2e16ba9243 100644
--- 
a/amber/src/main/scala/org/apache/texera/web/model/websocket/response/RegionUpdateEvent.scala
+++ 
b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/RegionStateEvent.scala
@@ -17,8 +17,6 @@
  * under the License.
  */
 
-package org.apache.texera.web.model.websocket.response
+package org.apache.texera.web.model.websocket.event
 
-import org.apache.texera.web.model.websocket.event.TexeraWebSocketEvent
-
-case class RegionUpdateEvent(regions: List[List[String]]) extends 
TexeraWebSocketEvent
+case class RegionStateEvent(id: Long, state: String) extends 
TexeraWebSocketEvent
diff --git 
a/amber/src/main/scala/org/apache/texera/web/model/websocket/response/RegionUpdateEvent.scala
 
b/amber/src/main/scala/org/apache/texera/web/model/websocket/response/RegionUpdateEvent.scala
index 1cf644ecac..9578b28b46 100644
--- 
a/amber/src/main/scala/org/apache/texera/web/model/websocket/response/RegionUpdateEvent.scala
+++ 
b/amber/src/main/scala/org/apache/texera/web/model/websocket/response/RegionUpdateEvent.scala
@@ -21,4 +21,4 @@ package org.apache.texera.web.model.websocket.response
 
 import org.apache.texera.web.model.websocket.event.TexeraWebSocketEvent
 
-case class RegionUpdateEvent(regions: List[List[String]]) extends 
TexeraWebSocketEvent
+case class RegionUpdateEvent(regions: List[(Long, List[String])]) extends 
TexeraWebSocketEvent
diff --git 
a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
 
b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
index e7780e7aea..a1776bac79 100644
--- 
a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
+++ 
b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
@@ -340,7 +340,6 @@ export class WorkflowEditorComponent implements OnInit, 
AfterViewInit, OnDestroy
       {
         attrs: {
           body: {
-            fill: "rgba(255,213,79,0.2)",
             pointerEvents: "none",
             class: "region",
           },
@@ -356,14 +355,14 @@ export class WorkflowEditorComponent implements OnInit, 
AfterViewInit, OnDestroy
     this.executeWorkflowService
       .getRegionUpdateStream()
       .pipe(untilDestroyed(this))
-      .subscribe(regions => {
+      .subscribe(event => {
         this.paper.model
           .getCells()
           .filter(element => element instanceof Region)
           .forEach(element => element.remove());
 
-        regionMap = regions.map(region => {
-          const element = new Region();
+        regionMap = event.regions.map(([id, region]) => {
+          const element = new Region({ id: "region-" + id });
           const ops = region.map(id => this.paper.getModelById(id));
           this.paper.model.addCell(element);
           this.updateRegionElement(element, ops);
@@ -376,6 +375,18 @@ export class WorkflowEditorComponent implements OnInit, 
AfterViewInit, OnDestroy
         .filter(region => region.operators.includes(operator))
         .forEach(region => this.updateRegionElement(region.regionElement, 
region.operators));
     });
+
+    this.executeWorkflowService
+      .getRegionStateStream()
+      .pipe(untilDestroyed(this))
+      .subscribe(region => {
+        const colorMap: Record<string, string> = {
+          ExecutingDependeePortsPhase: "rgba(244,67,54,0.2)", // soft red
+          ExecutingNonDependeePortsPhase: "rgba(255,213,79,0.2)", // warm 
amber yellow
+          Completed: "rgba(76,175,80,0.2)", // soft green
+        };
+        this.paper.getModelById("region-" + region.id).attr("body/fill", 
colorMap[region.state]);
+      });
   }
 
   private updateRegionElement(regionElement: joint.dia.Element, operators: 
joint.dia.Cell[]) {
diff --git 
a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts
 
b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts
index 3d9ca84b5a..cbddc01a66 100644
--- 
a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts
+++ 
b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts
@@ -31,6 +31,8 @@ import {
 import { WorkflowWebsocketService } from 
"../workflow-websocket/workflow-websocket.service";
 import {
   OperatorCurrentTuples,
+  RegionStateEvent,
+  RegionUpdateEvent,
   ReplayExecutionInfo,
   TexeraWebsocketEvent,
   WorkflowFatalError,
@@ -85,7 +87,8 @@ export class ExecuteWorkflowService {
     current: ExecutionStateInfo;
   }>();
 
-  private regionUpdateStream = new Subject<readonly string[][]>();
+  private regionUpdateStream = new Subject<RegionUpdateEvent>();
+  private regionStateStream = new Subject<RegionStateEvent>();
 
   // TODO: move this to another service, or redesign how this
   //   information is stored on the frontend.
@@ -102,7 +105,10 @@ export class ExecuteWorkflowService {
     workflowWebsocketService.websocketEvent().subscribe(event => {
       switch (event.type) {
         case "RegionUpdateEvent":
-          this.regionUpdateStream.next(event.regions);
+          this.regionUpdateStream.next(event);
+          break;
+        case "RegionStateEvent":
+          this.regionStateStream.next(event);
           break;
         case "WorkerAssignmentUpdateEvent":
           this.assignedWorkerIds.set(event.operatorId, event.workerIds);
@@ -334,10 +340,14 @@ export class ExecuteWorkflowService {
     return this.executionStateStream.asObservable();
   }
 
-  public getRegionUpdateStream(): Observable<readonly string[][]> {
+  public getRegionUpdateStream(): Observable<RegionUpdateEvent> {
     return this.regionUpdateStream.asObservable();
   }
 
+  public getRegionStateStream(): Observable<RegionStateEvent> {
+    return this.regionStateStream.asObservable();
+  }
+
   public resetExecutionState(): void {
     this.currentState = {
       state: ExecutionState.Uninitialized,
diff --git a/frontend/src/app/workspace/types/workflow-websocket.interface.ts 
b/frontend/src/app/workspace/types/workflow-websocket.interface.ts
index 3ab8e34385..15e2a2809d 100644
--- a/frontend/src/app/workspace/types/workflow-websocket.interface.ts
+++ b/frontend/src/app/workspace/types/workflow-websocket.interface.ts
@@ -174,7 +174,12 @@ export type ClusterStatusUpdateEvent = Readonly<{
 }>;
 
 export type RegionUpdateEvent = Readonly<{
-  regions: readonly string[][];
+  regions: readonly [number, string[]][];
+}>;
+
+export type RegionStateEvent = Readonly<{
+  id: number;
+  state: string;
 }>;
 
 export type ModifyLogicResponse = Readonly<{
@@ -234,6 +239,7 @@ export type TexeraWebsocketEventTypeMap = {
   ExecutionDurationUpdateEvent: ExecutionDurationUpdateEvent;
   ClusterStatusUpdateEvent: ClusterStatusUpdateEvent;
   RegionUpdateEvent: RegionUpdateEvent;
+  RegionStateEvent: RegionStateEvent;
 };
 
 // helper type definitions to generate the request and event types

Reply via email to