This is an automated email from the ASF dual-hosted git repository.
linxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 768e349f65 feat(frontend): region state color (#3999)
768e349f65 is described below
commit 768e349f652bcd9e3175ad0f2c02f912af538f0a
Author: Xinyuan Lin <[email protected]>
AuthorDate: Wed Oct 29 20:00:31 2025 -0700
feat(frontend): region state color (#3999)
### What changes were proposed in this PR?
This PR introduces four colors to represent the region states on the
frontend, corresponding to the four existing region states:
| Grey | Blue | Yellow | Green|
| :---: | :---: | :---: | :---: |
| `Unexecuted` | `ExecutingDependeePortsPhase` |
`ExecutingNonDependeePortsPhase` | `Completed`|
| <img width="130" height="130"
src="https://github.com/user-attachments/assets/830012c2-d503-4c91-83ef-3173f88cb311"
/> | <img width="130" height="130"
src="https://github.com/user-attachments/assets/09204a34-236f-4aa4-8d6a-87c2e53d7f25"
/> |<img width="130" height="130"
src="https://github.com/user-attachments/assets/cc914d7f-074b-4443-8a48-e9cdd1ce76b4"
/> | <img width="130" height="130"
src="https://github.com/user-attachments/assets/abe5cd28-9dba-4c66-8e8c-19415a97af06"
/> |
Demo:

### Any related issues, documentation, discussions?
Fix issue #4000.
### How was this PR tested?
The PR is tested with existing test cases.
### Was this PR authored or co-authored using generative AI tooling?
No
---
.../architecture/controller/Controller.scala | 4 ++--
.../scheduling/RegionExecutionCoordinator.scala | 15 ++++++++++---
.../RegionStateEvent.scala} | 6 ++----
.../websocket/response/RegionUpdateEvent.scala | 2 +-
.../workflow-editor/workflow-editor.component.ts | 25 ++++++++++++++++------
.../execute-workflow/execute-workflow.service.ts | 16 +++++++++++---
.../types/workflow-websocket.interface.ts | 8 ++++++-
7 files changed, 56 insertions(+), 20 deletions(-)
diff --git
a/amber/src/main/scala/org/apache/amber/engine/architecture/controller/Controller.scala
b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/Controller.scala
index 67f94398d3..354601dbe3 100644
---
a/amber/src/main/scala/org/apache/amber/engine/architecture/controller/Controller.scala
+++
b/amber/src/main/scala/org/apache/amber/engine/architecture/controller/Controller.scala
@@ -114,9 +114,9 @@ class Controller(
attachRuntimeServicesToCPState()
cp.workflowScheduler.updateSchedule(physicalPlan)
- val regions: List[List[String]] =
+ val regions: List[(Long, List[String])] =
cp.workflowScheduler.getSchedule.getRegions.map { region =>
- region.physicalOps.map(_.id.logicalOpId.id).toList
+ (region.id.id, region.physicalOps.map(_.id.logicalOpId.id).toList)
}
SessionState.getAllSessionStates.foreach { state =>
diff --git
a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index bacc345a2c..2de7bbfa46 100644
---
a/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++
b/amber/src/main/scala/org/apache/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -54,6 +54,8 @@ import org.apache.amber.engine.common.AmberLogging
import org.apache.amber.engine.common.FutureBijection._
import org.apache.amber.engine.common.rpc.AsyncRPCClient
import org.apache.amber.engine.common.virtualidentity.util.CONTROLLER
+import org.apache.texera.web.SessionState
+import org.apache.texera.web.model.websocket.event.RegionStateEvent
import
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
import java.util.concurrent.TimeUnit
@@ -134,7 +136,7 @@ class RegionExecutionCoordinator(
// Set this coordinator's status to be completed so that subsequent
regions can be started by
// WorkflowExecutionCoordinator.
- currentPhaseRef.set(Completed)
+ setPhase(Completed)
// Terminate all the workers in this region.
terminateWorkers(regionExecution)
@@ -223,7 +225,7 @@ class RegionExecutionCoordinator(
}
private def executeDependeePortPhase(): Future[Unit] = {
- currentPhaseRef.set(ExecutingDependeePortsPhase)
+ setPhase(ExecutingDependeePortsPhase)
if (!region.getOperators.exists(_.dependeeInputs.nonEmpty)) {
// Skip to the next phase when there are no dependee input ports
return syncStatusAndTransitionRegionExecutionPhase()
@@ -239,7 +241,7 @@ class RegionExecutionCoordinator(
}
private def executeNonDependeePortPhase(): Future[Unit] = {
- currentPhaseRef.set(ExecutingNonDependeePortsPhase)
+ setPhase(ExecutingNonDependeePortsPhase)
// Allocate output port storage objects
region.resourceConfig.get.portConfigs
.collect {
@@ -541,5 +543,12 @@ class RegionExecutionCoordinator(
}
}
+ private def setPhase(phase: RegionExecutionPhase): Unit = {
+ currentPhaseRef.set(phase)
+ SessionState.getAllSessionStates.foreach { state =>
+ state.send(RegionStateEvent(region.id.id, phase.toString))
+ }
+ }
+
override def actorId: ActorVirtualIdentity = CONTROLLER
}
diff --git
a/amber/src/main/scala/org/apache/texera/web/model/websocket/response/RegionUpdateEvent.scala
b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/RegionStateEvent.scala
similarity index 79%
copy from
amber/src/main/scala/org/apache/texera/web/model/websocket/response/RegionUpdateEvent.scala
copy to
amber/src/main/scala/org/apache/texera/web/model/websocket/event/RegionStateEvent.scala
index 1cf644ecac..2e16ba9243 100644
---
a/amber/src/main/scala/org/apache/texera/web/model/websocket/response/RegionUpdateEvent.scala
+++
b/amber/src/main/scala/org/apache/texera/web/model/websocket/event/RegionStateEvent.scala
@@ -17,8 +17,6 @@
* under the License.
*/
-package org.apache.texera.web.model.websocket.response
+package org.apache.texera.web.model.websocket.event
-import org.apache.texera.web.model.websocket.event.TexeraWebSocketEvent
-
-case class RegionUpdateEvent(regions: List[List[String]]) extends
TexeraWebSocketEvent
+case class RegionStateEvent(id: Long, state: String) extends
TexeraWebSocketEvent
diff --git
a/amber/src/main/scala/org/apache/texera/web/model/websocket/response/RegionUpdateEvent.scala
b/amber/src/main/scala/org/apache/texera/web/model/websocket/response/RegionUpdateEvent.scala
index 1cf644ecac..9578b28b46 100644
---
a/amber/src/main/scala/org/apache/texera/web/model/websocket/response/RegionUpdateEvent.scala
+++
b/amber/src/main/scala/org/apache/texera/web/model/websocket/response/RegionUpdateEvent.scala
@@ -21,4 +21,4 @@ package org.apache.texera.web.model.websocket.response
import org.apache.texera.web.model.websocket.event.TexeraWebSocketEvent
-case class RegionUpdateEvent(regions: List[List[String]]) extends
TexeraWebSocketEvent
+case class RegionUpdateEvent(regions: List[(Long, List[String])]) extends
TexeraWebSocketEvent
diff --git
a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
index e7780e7aea..ca76a5d2f2 100644
---
a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
+++
b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
@@ -163,7 +163,7 @@ export class WorkflowEditorComponent implements OnInit,
AfterViewInit, OnDestroy
this.handlePortHighlightEvent();
this.registerPortDisplayNameChangeHandler();
this.handleOperatorStatisticsUpdate();
- this.handleRegionUpdate();
+ this.handleRegionEvents();
this.handleOperatorSuggestionHighlightEvent();
this.handleElementDelete();
this.handleElementSelectAll();
@@ -333,14 +333,14 @@ export class WorkflowEditorComponent implements OnInit,
AfterViewInit, OnDestroy
});
}
- private handleRegionUpdate(): void {
+ private handleRegionEvents(): void {
this.editor.classList.add("hide-region");
const Region = joint.dia.Element.define(
"region",
{
attrs: {
body: {
- fill: "rgba(255,213,79,0.2)",
+ fill: "rgba(158,158,158,0.2)",
pointerEvents: "none",
class: "region",
},
@@ -356,14 +356,14 @@ export class WorkflowEditorComponent implements OnInit,
AfterViewInit, OnDestroy
this.executeWorkflowService
.getRegionUpdateStream()
.pipe(untilDestroyed(this))
- .subscribe(regions => {
+ .subscribe(event => {
this.paper.model
.getCells()
.filter(element => element instanceof Region)
.forEach(element => element.remove());
- regionMap = regions.map(region => {
- const element = new Region();
+ regionMap = event.regions.map(([id, region]) => {
+ const element = new Region({ id: "region-" + id });
const ops = region.map(id => this.paper.getModelById(id));
this.paper.model.addCell(element);
this.updateRegionElement(element, ops);
@@ -376,6 +376,19 @@ export class WorkflowEditorComponent implements OnInit,
AfterViewInit, OnDestroy
.filter(region => region.operators.includes(operator))
.forEach(region => this.updateRegionElement(region.regionElement,
region.operators));
});
+
+ // update region element colors on execution
+ this.executeWorkflowService
+ .getRegionStateStream()
+ .pipe(untilDestroyed(this))
+ .subscribe(region => {
+ const colorMap: Record<string, string> = {
+ ExecutingDependeePortsPhase: "rgba(33,150,243,0.2)",
+ ExecutingNonDependeePortsPhase: "rgba(255,213,79,0.2)",
+ Completed: "rgba(76,175,80,0.2)",
+ };
+ this.paper.getModelById("region-" + region.id).attr("body/fill",
colorMap[region.state]);
+ });
}
private updateRegionElement(regionElement: joint.dia.Element, operators:
joint.dia.Cell[]) {
diff --git
a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts
b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts
index 3d9ca84b5a..cbddc01a66 100644
---
a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts
+++
b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts
@@ -31,6 +31,8 @@ import {
import { WorkflowWebsocketService } from
"../workflow-websocket/workflow-websocket.service";
import {
OperatorCurrentTuples,
+ RegionStateEvent,
+ RegionUpdateEvent,
ReplayExecutionInfo,
TexeraWebsocketEvent,
WorkflowFatalError,
@@ -85,7 +87,8 @@ export class ExecuteWorkflowService {
current: ExecutionStateInfo;
}>();
- private regionUpdateStream = new Subject<readonly string[][]>();
+ private regionUpdateStream = new Subject<RegionUpdateEvent>();
+ private regionStateStream = new Subject<RegionStateEvent>();
// TODO: move this to another service, or redesign how this
// information is stored on the frontend.
@@ -102,7 +105,10 @@ export class ExecuteWorkflowService {
workflowWebsocketService.websocketEvent().subscribe(event => {
switch (event.type) {
case "RegionUpdateEvent":
- this.regionUpdateStream.next(event.regions);
+ this.regionUpdateStream.next(event);
+ break;
+ case "RegionStateEvent":
+ this.regionStateStream.next(event);
break;
case "WorkerAssignmentUpdateEvent":
this.assignedWorkerIds.set(event.operatorId, event.workerIds);
@@ -334,10 +340,14 @@ export class ExecuteWorkflowService {
return this.executionStateStream.asObservable();
}
- public getRegionUpdateStream(): Observable<readonly string[][]> {
+ public getRegionUpdateStream(): Observable<RegionUpdateEvent> {
return this.regionUpdateStream.asObservable();
}
+ public getRegionStateStream(): Observable<RegionStateEvent> {
+ return this.regionStateStream.asObservable();
+ }
+
public resetExecutionState(): void {
this.currentState = {
state: ExecutionState.Uninitialized,
diff --git a/frontend/src/app/workspace/types/workflow-websocket.interface.ts
b/frontend/src/app/workspace/types/workflow-websocket.interface.ts
index 3ab8e34385..15e2a2809d 100644
--- a/frontend/src/app/workspace/types/workflow-websocket.interface.ts
+++ b/frontend/src/app/workspace/types/workflow-websocket.interface.ts
@@ -174,7 +174,12 @@ export type ClusterStatusUpdateEvent = Readonly<{
}>;
export type RegionUpdateEvent = Readonly<{
- regions: readonly string[][];
+ regions: readonly [number, string[]][];
+}>;
+
+export type RegionStateEvent = Readonly<{
+ id: number;
+ state: string;
}>;
export type ModifyLogicResponse = Readonly<{
@@ -234,6 +239,7 @@ export type TexeraWebsocketEventTypeMap = {
ExecutionDurationUpdateEvent: ExecutionDurationUpdateEvent;
ClusterStatusUpdateEvent: ClusterStatusUpdateEvent;
RegionUpdateEvent: RegionUpdateEvent;
+ RegionStateEvent: RegionStateEvent;
};
// helper type definitions to generate the request and event types