This is an automated email from the ASF dual-hosted git repository.

linxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 768e349f65 feat(frontend): region state color (#3999)
768e349f65 is described below

commit 768e349f652bcd9e3175ad0f2c02f912af538f0a
Author: Xinyuan Lin <[email protected]>
AuthorDate: Wed Oct 29 20:00:31 2025 -0700

    feat(frontend): region state color (#3999)
    
    ### What changes were proposed in this PR?
    
    This PR introduces four colors to represent the region states on the
    frontend, corresponding to the four existing region states:
    
    | Grey  | Blue | Yellow | Green|
    | :---: | :---: | :---: | :---: |
    | `Unexecuted` | `ExecutingDependeePortsPhase` | `ExecutingNonDependeePortsPhase` | `Completed` |
    | <img width="130" height="130" src="https://github.com/user-attachments/assets/830012c2-d503-4c91-83ef-3173f88cb311" /> | <img width="130" height="130" src="https://github.com/user-attachments/assets/09204a34-236f-4aa4-8d6a-87c2e53d7f25" /> | <img width="130" height="130" src="https://github.com/user-attachments/assets/cc914d7f-074b-4443-8a48-e9cdd1ce76b4" /> | <img width="130" height="130" src="https://github.com/user-attachments/assets/abe5cd28-9dba-4c66-8e8c-19415a97af06" /> |
    
    
    
    
    Demo:
    
    
![505681949-d43629f7-5276-431b-9666-1bac06b065aa](https://github.com/user-attachments/assets/b223d1bb-ef3f-4d83-bacb-aa0884ca71eb)
    
    
    ### Any related issues, documentation, discussions?
    Fix issue #4000.
    
    ### How was this PR tested?
    This PR was tested with the existing test cases.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    No
---
 .../architecture/controller/Controller.scala       |  4 ++--
 .../scheduling/RegionExecutionCoordinator.scala    | 15 ++++++++++---
 .../RegionStateEvent.scala}                        |  6 ++----
 .../websocket/response/RegionUpdateEvent.scala     |  2 +-
 .../workflow-editor/workflow-editor.component.ts   | 25 ++++++++++++++++------
 .../execute-workflow/execute-workflow.service.ts   | 16 +++++++++++---
 .../types/workflow-websocket.interface.ts          |  8 ++++++-
 7 files changed, 56 insertions(+), 20 deletions(-)

diff --git 
a/amber/src/main/scala/org/apache/amber/engine/architecture/controller/Controller.scala
 
b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/Controller.scala
index 67f94398d3..354601dbe3 100644
--- 
a/amber/src/main/scala/org/apache/amber/engine/architecture/controller/Controller.scala
+++ 
b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/Controller.scala
@@ -114,9 +114,9 @@ class Controller(
     attachRuntimeServicesToCPState()
     cp.workflowScheduler.updateSchedule(physicalPlan)
 
-    val regions: List[List[String]] =
+    val regions: List[(Long, List[String])] =
       cp.workflowScheduler.getSchedule.getRegions.map { region =>
-        region.physicalOps.map(_.id.logicalOpId.id).toList
+        (region.id.id, region.physicalOps.map(_.id.logicalOpId.id).toList)
       }
 
     SessionState.getAllSessionStates.foreach { state =>
diff --git 
a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
 
b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index bacc345a2c..2de7bbfa46 100644
--- 
a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ 
b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -54,6 +54,8 @@ import org.apache.amber.engine.common.AmberLogging
 import org.apache.amber.engine.common.FutureBijection._
 import org.apache.amber.engine.common.rpc.AsyncRPCClient
 import org.apache.amber.engine.common.virtualidentity.util.CONTROLLER
+import org.apache.texera.web.SessionState
+import org.apache.texera.web.model.websocket.event.RegionStateEvent
 import 
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
 
 import java.util.concurrent.TimeUnit
@@ -134,7 +136,7 @@ class RegionExecutionCoordinator(
 
     // Set this coordinator's status to be completed so that subsequent 
regions can be started by
     // WorkflowExecutionCoordinator.
-    currentPhaseRef.set(Completed)
+    setPhase(Completed)
 
     // Terminate all the workers in this region.
     terminateWorkers(regionExecution)
@@ -223,7 +225,7 @@ class RegionExecutionCoordinator(
     }
 
   private def executeDependeePortPhase(): Future[Unit] = {
-    currentPhaseRef.set(ExecutingDependeePortsPhase)
+    setPhase(ExecutingDependeePortsPhase)
     if (!region.getOperators.exists(_.dependeeInputs.nonEmpty)) {
       // Skip to the next phase when there are no dependee input ports
       return syncStatusAndTransitionRegionExecutionPhase()
@@ -239,7 +241,7 @@ class RegionExecutionCoordinator(
   }
 
   private def executeNonDependeePortPhase(): Future[Unit] = {
-    currentPhaseRef.set(ExecutingNonDependeePortsPhase)
+    setPhase(ExecutingNonDependeePortsPhase)
     // Allocate output port storage objects
     region.resourceConfig.get.portConfigs
       .collect {
@@ -541,5 +543,12 @@ class RegionExecutionCoordinator(
     }
   }
 
+  private def setPhase(phase: RegionExecutionPhase): Unit = {
+    currentPhaseRef.set(phase)
+    SessionState.getAllSessionStates.foreach { state =>
+      state.send(RegionStateEvent(region.id.id, phase.toString))
+    }
+  }
+
   override def actorId: ActorVirtualIdentity = CONTROLLER
 }
diff --git 
a/amber/src/main/scala/org/apache/texera/web/model/websocket/response/RegionUpdateEvent.scala
 
b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/RegionStateEvent.scala
similarity index 79%
copy from 
amber/src/main/scala/org/apache/texera/web/model/websocket/response/RegionUpdateEvent.scala
copy to 
amber/src/main/scala/org/apache/texera/web/model/websocket/event/RegionStateEvent.scala
index 1cf644ecac..2e16ba9243 100644
--- 
a/amber/src/main/scala/org/apache/texera/web/model/websocket/response/RegionUpdateEvent.scala
+++ 
b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/RegionStateEvent.scala
@@ -17,8 +17,6 @@
  * under the License.
  */
 
-package org.apache.texera.web.model.websocket.response
+package org.apache.texera.web.model.websocket.event
 
-import org.apache.texera.web.model.websocket.event.TexeraWebSocketEvent
-
-case class RegionUpdateEvent(regions: List[List[String]]) extends 
TexeraWebSocketEvent
+case class RegionStateEvent(id: Long, state: String) extends 
TexeraWebSocketEvent
diff --git 
a/amber/src/main/scala/org/apache/texera/web/model/websocket/response/RegionUpdateEvent.scala
 
b/amber/src/main/scala/org/apache/texera/web/model/websocket/response/RegionUpdateEvent.scala
index 1cf644ecac..9578b28b46 100644
--- 
a/amber/src/main/scala/org/apache/texera/web/model/websocket/response/RegionUpdateEvent.scala
+++ 
b/amber/src/main/scala/org/apache/texera/web/model/websocket/response/RegionUpdateEvent.scala
@@ -21,4 +21,4 @@ package org.apache.texera.web.model.websocket.response
 
 import org.apache.texera.web.model.websocket.event.TexeraWebSocketEvent
 
-case class RegionUpdateEvent(regions: List[List[String]]) extends 
TexeraWebSocketEvent
+case class RegionUpdateEvent(regions: List[(Long, List[String])]) extends 
TexeraWebSocketEvent
diff --git 
a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
 
b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
index e7780e7aea..ca76a5d2f2 100644
--- 
a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
+++ 
b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
@@ -163,7 +163,7 @@ export class WorkflowEditorComponent implements OnInit, 
AfterViewInit, OnDestroy
     this.handlePortHighlightEvent();
     this.registerPortDisplayNameChangeHandler();
     this.handleOperatorStatisticsUpdate();
-    this.handleRegionUpdate();
+    this.handleRegionEvents();
     this.handleOperatorSuggestionHighlightEvent();
     this.handleElementDelete();
     this.handleElementSelectAll();
@@ -333,14 +333,14 @@ export class WorkflowEditorComponent implements OnInit, 
AfterViewInit, OnDestroy
       });
   }
 
-  private handleRegionUpdate(): void {
+  private handleRegionEvents(): void {
     this.editor.classList.add("hide-region");
     const Region = joint.dia.Element.define(
       "region",
       {
         attrs: {
           body: {
-            fill: "rgba(255,213,79,0.2)",
+            fill: "rgba(158,158,158,0.2)",
             pointerEvents: "none",
             class: "region",
           },
@@ -356,14 +356,14 @@ export class WorkflowEditorComponent implements OnInit, 
AfterViewInit, OnDestroy
     this.executeWorkflowService
       .getRegionUpdateStream()
       .pipe(untilDestroyed(this))
-      .subscribe(regions => {
+      .subscribe(event => {
         this.paper.model
           .getCells()
           .filter(element => element instanceof Region)
           .forEach(element => element.remove());
 
-        regionMap = regions.map(region => {
-          const element = new Region();
+        regionMap = event.regions.map(([id, region]) => {
+          const element = new Region({ id: "region-" + id });
           const ops = region.map(id => this.paper.getModelById(id));
           this.paper.model.addCell(element);
           this.updateRegionElement(element, ops);
@@ -376,6 +376,19 @@ export class WorkflowEditorComponent implements OnInit, 
AfterViewInit, OnDestroy
         .filter(region => region.operators.includes(operator))
         .forEach(region => this.updateRegionElement(region.regionElement, 
region.operators));
     });
+
+    // update region element colors on execution
+    this.executeWorkflowService
+      .getRegionStateStream()
+      .pipe(untilDestroyed(this))
+      .subscribe(region => {
+        const colorMap: Record<string, string> = {
+          ExecutingDependeePortsPhase: "rgba(33,150,243,0.2)",
+          ExecutingNonDependeePortsPhase: "rgba(255,213,79,0.2)",
+          Completed: "rgba(76,175,80,0.2)",
+        };
+        this.paper.getModelById("region-" + region.id).attr("body/fill", 
colorMap[region.state]);
+      });
   }
 
   private updateRegionElement(regionElement: joint.dia.Element, operators: 
joint.dia.Cell[]) {
diff --git 
a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts
 
b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts
index 3d9ca84b5a..cbddc01a66 100644
--- 
a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts
+++ 
b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts
@@ -31,6 +31,8 @@ import {
 import { WorkflowWebsocketService } from 
"../workflow-websocket/workflow-websocket.service";
 import {
   OperatorCurrentTuples,
+  RegionStateEvent,
+  RegionUpdateEvent,
   ReplayExecutionInfo,
   TexeraWebsocketEvent,
   WorkflowFatalError,
@@ -85,7 +87,8 @@ export class ExecuteWorkflowService {
     current: ExecutionStateInfo;
   }>();
 
-  private regionUpdateStream = new Subject<readonly string[][]>();
+  private regionUpdateStream = new Subject<RegionUpdateEvent>();
+  private regionStateStream = new Subject<RegionStateEvent>();
 
   // TODO: move this to another service, or redesign how this
   //   information is stored on the frontend.
@@ -102,7 +105,10 @@ export class ExecuteWorkflowService {
     workflowWebsocketService.websocketEvent().subscribe(event => {
       switch (event.type) {
         case "RegionUpdateEvent":
-          this.regionUpdateStream.next(event.regions);
+          this.regionUpdateStream.next(event);
+          break;
+        case "RegionStateEvent":
+          this.regionStateStream.next(event);
           break;
         case "WorkerAssignmentUpdateEvent":
           this.assignedWorkerIds.set(event.operatorId, event.workerIds);
@@ -334,10 +340,14 @@ export class ExecuteWorkflowService {
     return this.executionStateStream.asObservable();
   }
 
-  public getRegionUpdateStream(): Observable<readonly string[][]> {
+  public getRegionUpdateStream(): Observable<RegionUpdateEvent> {
     return this.regionUpdateStream.asObservable();
   }
 
+  public getRegionStateStream(): Observable<RegionStateEvent> {
+    return this.regionStateStream.asObservable();
+  }
+
   public resetExecutionState(): void {
     this.currentState = {
       state: ExecutionState.Uninitialized,
diff --git a/frontend/src/app/workspace/types/workflow-websocket.interface.ts 
b/frontend/src/app/workspace/types/workflow-websocket.interface.ts
index 3ab8e34385..15e2a2809d 100644
--- a/frontend/src/app/workspace/types/workflow-websocket.interface.ts
+++ b/frontend/src/app/workspace/types/workflow-websocket.interface.ts
@@ -174,7 +174,12 @@ export type ClusterStatusUpdateEvent = Readonly<{
 }>;
 
 export type RegionUpdateEvent = Readonly<{
-  regions: readonly string[][];
+  regions: readonly [number, string[]][];
+}>;
+
+export type RegionStateEvent = Readonly<{
+  id: number;
+  state: string;
 }>;
 
 export type ModifyLogicResponse = Readonly<{
@@ -234,6 +239,7 @@ export type TexeraWebsocketEventTypeMap = {
   ExecutionDurationUpdateEvent: ExecutionDurationUpdateEvent;
   ClusterStatusUpdateEvent: ClusterStatusUpdateEvent;
   RegionUpdateEvent: RegionUpdateEvent;
+  RegionStateEvent: RegionStateEvent;
 };
 
 // helper type definitions to generate the request and event types

Reply via email to