This is an automated email from the ASF dual-hosted git repository.

linxinyuan pushed a commit to branch xinyuan-cm-for-loop-mat-dcm
in repository https://gitbox.apache.org/repos/asf/texera.git
commit 4fb2a6b463b5352b61178e3dcd58daa266d59ee6
Author: Xinyuan Lin <[email protected]>
AuthorDate: Wed Feb 4 08:51:18 2026 -0800

    init
---
 .../engine/architecture/rpc/controlcommands.proto  |  6 ++
 .../architecture/rpc/controllerservice.proto       |  1 +
 .../engine/architecture/rpc/controlreturns.proto   |  1 +
 .../ControllerAsyncRPCHandlerInitializer.scala     |  1 +
 .../IterationCompletedHandler.scala                | 78 ++++++++++++++++++++++
 .../messaginglayer/OutputManager.scala             | 20 +++---
 .../scheduling/RegionExecutionCoordinator.scala    | 14 +++-
 .../scheduling/WorkflowExecutionCoordinator.scala  | 24 ++++++-
 .../engine/architecture/worker/DataProcessor.scala |  7 +-
 .../managers/OutputPortResultWriterThread.scala    | 15 ++---
 .../promisehandlers/EndIterationHandler.scala      |  1 +
 .../promisehandlers/NextIterationHandler.scala     | 12 +---
 .../apache/texera/amber/engine/common/Utils.scala  |  2 +
 .../workflow-editor/workflow-editor.component.ts   |  1 +
 .../workspace/service/joint-ui/joint-ui.service.ts |  3 +
 .../workspace/types/execute-workflow.interface.ts  |  1 +
 16 files changed, 153 insertions(+), 34 deletions(-)

diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
index c2816408d1..0943e459ee 100644
--- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
+++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
@@ -46,6 +46,7 @@ message ControlRequest {
     PortCompletedRequest portCompletedRequest = 9;
     WorkerStateUpdatedRequest workerStateUpdatedRequest = 10;
     LinkWorkersRequest linkWorkersRequest = 11;
+    IterationCompletedRequest iterationCompletedRequest = 12;
 
     // request for worker
     AddInputChannelRequest addInputChannelRequest = 50;
@@ -169,6 +170,11 @@ message PortCompletedRequest {
   bool input = 2;
 }
 
+// Notify controller that an output port has finished one iteration (used by loop operators).
+message IterationCompletedRequest {
+  core.PortIdentity portId = 1 [(scalapb.field).no_box = true];
+}
+
 message WorkerStateUpdatedRequest {
   worker.WorkerState state = 1 [(scalapb.field).no_box = true];
 }
diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto
index 70d189a341..35d88dcf0f 100644
--- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto
+++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto
@@ -37,6 +37,7 @@ service ControllerService {
   rpc EvaluatePythonExpression(EvaluatePythonExpressionRequest) returns (EvaluatePythonExpressionResponse);
   rpc ConsoleMessageTriggered(ConsoleMessageTriggeredRequest) returns (EmptyReturn);
   rpc PortCompleted(PortCompletedRequest) returns (EmptyReturn);
+  rpc IterationCompleted(IterationCompletedRequest) returns (EmptyReturn);
   rpc StartWorkflow(EmptyRequest) returns (StartWorkflowResponse);
   rpc ResumeWorkflow(EmptyRequest) returns (EmptyReturn);
   rpc PauseWorkflow(EmptyRequest) returns (EmptyReturn);
diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto
index 43613b5cfd..98405c91b7 100644
--- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto
+++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto
@@ -122,6 +122,7 @@ enum WorkflowAggregatedState {
   PAUSED = 4;
   RESUMING = 5;
   COMPLETED = 6;
+  ITERATION_COMPLETED = 11;
   FAILED = 7;
   UNKNOWN = 8;
   KILLED = 9;
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
index 4d9a36bab4..d0add4789b 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
@@ -40,6 +40,7 @@ class ControllerAsyncRPCHandlerInitializer(
     with ResumeHandler
     with StartWorkflowHandler
     with PortCompletedHandler
+    with IterationCompletedHandler
     with ConsoleMessageHandler
     with RetryWorkflowHandler
     with EvaluatePythonExpressionHandler
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/IterationCompletedHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/IterationCompletedHandler.scala
new file mode 100644
index 0000000000..2aab1e4be0
--- /dev/null
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/IterationCompletedHandler.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.controller.promisehandlers
+
+import com.twitter.util.Future
+import org.apache.texera.amber.core.WorkflowRuntimeException
+import org.apache.texera.amber.core.workflow.GlobalPortIdentity
+import org.apache.texera.amber.engine.architecture.controller.{
+  ControllerAsyncRPCHandlerInitializer,
+  FatalError
+}
+import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
+  AsyncRPCContext,
+  IterationCompletedRequest,
+  QueryStatisticsRequest
+}
+import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
+import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
+import org.apache.texera.amber.util.VirtualIdentityUtils
+
+/** Notify controller that a worker has completed an iteration on an output port.
+  *
+  * This is different from [[PortCompletedHandler]]: a port can have multiple iterations
+  * (e.g., loop execution) before the whole port is fully completed.
+  */
+trait IterationCompletedHandler {
+  this: ControllerAsyncRPCHandlerInitializer =>
+
+  override def iterationCompleted(
+      msg: IterationCompletedRequest,
+      ctx: AsyncRPCContext
+  ): Future[EmptyReturn] = {
+    controllerInterface
+      .controllerInitiateQueryStatistics(QueryStatisticsRequest(scala.Seq(ctx.sender)), CONTROLLER)
+      .map { _ =>
+        val globalPortId = GlobalPortIdentity(
+          VirtualIdentityUtils.getPhysicalOpId(ctx.sender),
+          msg.portId
+        )
+
+        cp.workflowExecutionCoordinator.getRegionOfPortId(globalPortId) match {
+          case Some(region) =>
+            // Emit UI-only IterationCompleted phase for this region.
+            cp.workflowExecutionCoordinator.markRegionIterationCompletedIfNeeded(region)
+
+            // Keep scheduler running
+            cp.workflowExecutionCoordinator
+              .coordinateRegionExecutors(cp.actorService)
+              .onFailure {
+                case err: WorkflowRuntimeException =>
+                  sendToClient(FatalError(err, err.relatedWorkerId))
+                case other =>
+                  sendToClient(FatalError(other, None))
+              }
+          case None =>
+        }
+
+        EmptyReturn()
+      }
+  }
+}
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
index 0bc826c42d..bb8843ff1c 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
@@ -118,6 +118,10 @@ class OutputManager(
       : mutable.HashMap[PortIdentity, OutputPortResultWriterThread] =
     mutable.HashMap()
 
+  val ECMWriterThreads
+      : mutable.HashMap[PortIdentity, BufferedItemWriter[Tuple]] =
+    mutable.HashMap()
+
   /**
     * Add down stream operator and its corresponding Partitioner.
     *
@@ -209,7 +213,7 @@ class OutputManager(
     * @param outputPortId If not specified, the tuple will be written to all output ports that need storage.
     */
   def saveTupleToStorageIfNeeded(
-      tuple: Either[Tuple, String],
+      tuple: Tuple,
       outputPortId: Option[PortIdentity] = None
   ): Unit = {
     (outputPortId match {
@@ -239,7 +243,6 @@ class OutputManager(
         writerThread.join()
       case None =>
     }
-
   }
 
   def getPort(portId: PortIdentity): WorkerPort = ports(portId)
@@ -281,19 +284,18 @@ class OutputManager(
   }
 
   private def setupOutputStorageWriterThread(portId: PortIdentity, storageUri: URI): Unit = {
-    val bufferedTupleWriter = DocumentFactory
-      .openDocument(storageUri)
-      ._1
+    this.ECMWriterThreads(portId) = DocumentFactory
+      .createDocument(storageUri.resolve("ecm"), ResultSchema.ecmSchema)
       .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
       .asInstanceOf[BufferedItemWriter[Tuple]]
-    val ecmUri = storageUri.resolve("ecm")
-    val bufferedECMWriter = DocumentFactory
-      .createDocument(ecmUri, ResultSchema.ecmSchema)
+    val bufferedTupleWriter = DocumentFactory
+      .openDocument(storageUri)
+      ._1
       .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
       .asInstanceOf[BufferedItemWriter[Tuple]]
-    val writerThread = new OutputPortResultWriterThread(bufferedTupleWriter, bufferedECMWriter)
+    val writerThread = new OutputPortResultWriterThread(bufferedTupleWriter)
     this.outputPortResultWriterThreads(portId) = writerThread
     writerThread.start()
   }
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 7e5b228801..84b18a48b5 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -103,6 +103,7 @@ class RegionExecutionCoordinator(
   private case object Unexecuted extends RegionExecutionPhase
   private case object ExecutingDependeePortsPhase extends RegionExecutionPhase
   private case object ExecutingNonDependeePortsPhase extends RegionExecutionPhase
+  private case object IterationCompleted extends RegionExecutionPhase
   private case object Completed extends RegionExecutionPhase
 
   private val currentPhaseRef: AtomicReference[RegionExecutionPhase] = new AtomicReference(
@@ -190,7 +191,8 @@ class RegionExecutionCoordinator(
     }
   }
 
-  def isCompleted: Boolean = currentPhaseRef.get == Completed
+  // Treat IterationCompleted as not completed from scheduler perspective.
+  def isCompleted: Boolean = currentPhaseRef.get == Completed || currentPhaseRef.get == IterationCompleted
 
   /**
    * This will sync and transition the region execution phase from one to another depending on its current phase:
@@ -219,6 +221,9 @@ class RegionExecutionCoordinator(
       }
       case ExecutingNonDependeePortsPhase =>
         tryCompleteRegionExecution()
+      case IterationCompleted =>
+        // IterationCompleted is a UI/observability phase; scheduling doesn't advance on it.
+        Future.Unit
       case Completed =>
         // Already completed, no further action needed.
        Future.Unit
@@ -543,6 +548,13 @@ class RegionExecutionCoordinator(
     }
   }
 
+  /** Emit IterationCompleted region phase to frontend. This does not affect scheduling semantics.
+    */
+  def setIterationCompletedPhase(): Unit = {
+    if (currentPhaseRef.get != Completed) {
+      setPhase(IterationCompleted)
+    }
+  }
+
   private def setPhase(phase: RegionExecutionPhase): Unit = {
     currentPhaseRef.set(phase)
     SessionState.getAllSessionStates.foreach { state =>
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
index 05585f88d8..8939c2f4f3 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
@@ -27,7 +27,8 @@ import org.apache.texera.amber.engine.architecture.common.{
   AkkaActorService
 }
 import org.apache.texera.amber.engine.architecture.controller.ControllerConfig
-import org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution
+import org.apache.texera.amber.engine.architecture.controller.execution.{OperatorExecution, RegionExecution, WorkflowExecution}
+import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState
 import org.apache.texera.amber.engine.common.rpc.AsyncRPCClient
 
 import scala.collection.mutable
@@ -102,6 +103,27 @@ class WorkflowExecutionCoordinator(
       .unit
   }
 
+  /**
+    * Mark the given operator as iteration-completed.
+    *
+    * This is a UI/observability state only; it doesn't change the worker-level state machine.
+    */
+  def markOperatorIterationCompleted(operatorExecution: OperatorExecution): Unit = {
+    // No-op placeholder: operator aggregated state is computed from WorkerState.
+    // IterationCompleted is currently an observability/UI state driven by region phase events.
+    ()
+  }
+
+  /**
+    * If all operators in the region have reached ITERATION_COMPLETED (or COMPLETED), mark the region as
+    * ITERATION_COMPLETED (region is still considered not completed, so scheduling remains unchanged).
+    */
+  def markRegionIterationCompletedIfNeeded(region: Region): Unit = {
+    // Best-effort: tell region coordinator to emit an IterationCompleted phase event.
+    // This doesn't affect scheduling/completion semantics.
+    regionExecutionCoordinators.get(region.id).foreach(_.setIterationCompletedPhase())
+  }
+
   def getRegionOfLink(link: PhysicalLink): Region = {
     getExecutingRegions.find(region => region.getLinks.contains(link)).get
   }
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
index 72d9932bcc..dedfc57802 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
@@ -24,6 +24,7 @@ import com.softwaremill.macwire.wire
 import io.grpc.MethodDescriptor
 import org.apache.texera.amber.core.executor.OperatorExecutor
 import org.apache.texera.amber.core.state.State
+import org.apache.texera.amber.core.storage.result.ResultSchema
 import org.apache.texera.amber.core.tuple._
 import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity, EmbeddedControlMessageIdentity}
 import org.apache.texera.amber.core.workflow.PortIdentity
@@ -171,15 +172,15 @@ class DataProcessor(
             PORT_ALIGNMENT,
             EndIterationRequest(worker)
           )
-          outputManager.saveTupleToStorageIfNeeded(Right("Iteration number = 0, " + worker.name), outputPortOpt)
+          outputManager.ECMWriterThreads(portId).putOne(new Tuple(ResultSchema.ecmSchema, Array(worker.name)))
           outputManager.closeOutputStorageWriterIfNeeded(portId)
-          asyncRPCClient.controllerInterface.portCompleted(PortCompletedRequest(portId, input = false), asyncRPCClient.mkContext(CONTROLLER)) // fix this line, add iteration completed rpc
+          asyncRPCClient.controllerInterface.iterationCompleted(IterationCompletedRequest(portId), asyncRPCClient.mkContext(CONTROLLER))
          executor.reset()
         case schemaEnforceable: SchemaEnforceable =>
           val portIdentity = outputPortOpt.getOrElse(outputManager.getSingleOutputPortIdentity)
           val tuple = schemaEnforceable.enforceSchema(outputManager.getPort(portIdentity).schema)
           statisticsManager.increaseOutputStatistics(portIdentity, tuple.inMemSize)
-          outputManager.saveTupleToStorageIfNeeded(Left(tuple), outputPortOpt)
+          outputManager.saveTupleToStorageIfNeeded(tuple, outputPortOpt)
         case other => // skip for now
       }
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala
index f8d0710aa8..188a08d28e 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala
@@ -30,28 +30,21 @@ sealed trait TerminateSignal
 case object PortStorageWriterTerminateSignal extends TerminateSignal
 
 class OutputPortResultWriterThread(
-    bufferedTupleWriter: BufferedItemWriter[Tuple],
-    bufferedECMWriter: BufferedItemWriter[Tuple]
+    bufferedTupleWriter: BufferedItemWriter[Tuple]
 ) extends Thread {
 
-  val queue: LinkedBlockingQueue[Either[Either[Tuple, String], TerminateSignal]] =
-    Queues.newLinkedBlockingQueue[Either[Either[Tuple, String], TerminateSignal]]()
+  val queue: LinkedBlockingQueue[Either[Tuple, TerminateSignal]] =
+    Queues.newLinkedBlockingQueue[Either[Tuple, TerminateSignal]]()
 
   override def run(): Unit = {
     var internalStop = false
     while (!internalStop) {
       val queueContent = queue.take()
       queueContent match {
-        case Left(item) => item match {
-          case Left(tuple) => bufferedTupleWriter.putOne(tuple)
-          case Right(ecm) =>
-            val ecmTuple = new Tuple(ResultSchema.ecmSchema, Array(ecm))
-            bufferedECMWriter.putOne(ecmTuple)
-        }
+        case Left(tuple) => bufferedTupleWriter.putOne(tuple)
         case Right(_) => internalStop = true
       }
     }
     bufferedTupleWriter.close()
-    bufferedECMWriter.close()
   }
 }
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala
index d38cf9854f..fe76fb93bb 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala
@@ -23,6 +23,7 @@ import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{AsyncRPC
 import org.apache.texera.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer
 import com.twitter.util.Future
 import org.apache.texera.amber.core.tuple.FinalizePort
+import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
 import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
 import org.apache.texera.amber.operator.loop.{LoopEndOpExec, LoopStartOpExec}
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala
index ed8cc475d6..0bf1691ed5 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala
@@ -20,10 +20,8 @@ package org.apache.texera.amber.engine.architecture.worker.promisehandlers
 
 import com.twitter.util.Future
-import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
-  AsyncRPCContext,
-  EmptyRequest
-}
+import org.apache.texera.amber.core.tuple.FinalizeIteration
+import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContext, EmptyRequest}
 import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
 import org.apache.texera.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer
 import org.apache.texera.amber.operator.loop.LoopStartOpExec
@@ -36,11 +34,7 @@ trait NextIterationHandler {
       ctx: AsyncRPCContext
   ): Future[EmptyReturn] = {
     dp.processOnFinish()
-    if (dp.executor.asInstanceOf[LoopStartOpExec].checkCondition()) {
-      dp.outputManager.finalizeIteration(dp.actorId)
-    } else {
-      dp.outputManager.finalizeOutput()
-    }
+    dp.outputManager.finalizeIteration(dp.actorId)
     EmptyReturn()
   }
 }
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/common/Utils.scala b/amber/src/main/scala/org/apache/texera/amber/engine/common/Utils.scala
index dc074c1094..488b5e0205 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/common/Utils.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/common/Utils.scala
@@ -98,6 +98,7 @@ object Utils extends LazyLogging {
       case WorkflowAggregatedState.PAUSING => "Pausing"
       case WorkflowAggregatedState.PAUSED => "Paused"
      case WorkflowAggregatedState.RESUMING => "Resuming"
+      case WorkflowAggregatedState.ITERATION_COMPLETED => "IterationCompleted"
       case WorkflowAggregatedState.COMPLETED => "Completed"
       case WorkflowAggregatedState.TERMINATED => "Terminated"
       case WorkflowAggregatedState.FAILED => "Failed"
@@ -117,6 +118,7 @@ object Utils extends LazyLogging {
       case "pausing" => WorkflowAggregatedState.PAUSING
       case "paused" => WorkflowAggregatedState.PAUSED
       case "resuming" => WorkflowAggregatedState.RESUMING
+      case "iterationcompleted" => WorkflowAggregatedState.ITERATION_COMPLETED
       case "completed" => WorkflowAggregatedState.COMPLETED
       case "failed" => WorkflowAggregatedState.FAILED
       case "killed" => WorkflowAggregatedState.KILLED
diff --git a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
index b23f92caf3..08bc601c4a 100644
--- a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
+++ b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
@@ -384,6 +384,7 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy
     const colorMap: Record<string, string> = {
       ExecutingDependeePortsPhase: "rgba(33,150,243,0.2)",
       ExecutingNonDependeePortsPhase: "rgba(255,213,79,0.2)",
+      IterationCompleted: "rgba(156,39,176,0.2)",
       Completed: "rgba(76,175,80,0.2)",
     };
     this.paper.getModelById("region-" + region.id).attr("body/fill", colorMap[region.state]);
diff --git a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts
index 77458947cd..f54068ad63 100644
--- a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts
+++ b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts
@@ -401,6 +401,9 @@ export class JointUIService {
       case OperatorState.Ready:
         fillColor = "#a6bd37";
         break;
+      case OperatorState.IterationCompleted:
+        fillColor = "#9C27B0";
+        break;
       case OperatorState.Completed:
         fillColor = "green";
         break;
diff --git a/frontend/src/app/workspace/types/execute-workflow.interface.ts b/frontend/src/app/workspace/types/execute-workflow.interface.ts
index 23ade23199..0033ac39c3 100644
--- a/frontend/src/app/workspace/types/execute-workflow.interface.ts
+++ b/frontend/src/app/workspace/types/execute-workflow.interface.ts
@@ -74,6 +74,7 @@ export enum OperatorState {
   Pausing = "Pausing",
   Paused = "Paused",
   Resuming = "Resuming",
+  IterationCompleted = "IterationCompleted",
   Completed = "Completed",
   Recovering = "Recovering",
 }
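
Editor's note: the snippet below is a standalone, illustrative sketch of the new region-phase guard added in RegionExecutionCoordinator.scala above; it uses stand-in types (IterationPhaseSketch is not a Texera class) and only mirrors the names from the diff to show that IterationCompleted is emitted for UI/observability and never overwrites a region that has already reached Completed.

```scala
import java.util.concurrent.atomic.AtomicReference

// Stand-in sketch, not Texera code: models the guard in setIterationCompletedPhase().
object IterationPhaseSketch {

  sealed trait RegionExecutionPhase
  case object ExecutingNonDependeePortsPhase extends RegionExecutionPhase
  case object IterationCompleted extends RegionExecutionPhase // UI/observability phase
  case object Completed extends RegionExecutionPhase          // terminal phase

  private val currentPhaseRef =
    new AtomicReference[RegionExecutionPhase](ExecutingNonDependeePortsPhase)

  // Same guard as in the diff: only emit IterationCompleted while the region is not yet Completed.
  def setIterationCompletedPhase(): Unit =
    if (currentPhaseRef.get != Completed) currentPhaseRef.set(IterationCompleted)

  def main(args: Array[String]): Unit = {
    setIterationCompletedPhase()
    println(currentPhaseRef.get) // IterationCompleted

    currentPhaseRef.set(Completed)
    setIterationCompletedPhase() // no-op once the region is Completed
    println(currentPhaseRef.get) // still Completed
  }
}
```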
