This is an automated email from the ASF dual-hosted git repository.

linxinyuan pushed a commit to branch xinyuan-cm-for-loop-mat-dcm
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 4fb2a6b463b5352b61178e3dcd58daa266d59ee6
Author: Xinyuan Lin <[email protected]>
AuthorDate: Wed Feb 4 08:51:18 2026 -0800

    init
---
 .../engine/architecture/rpc/controlcommands.proto  |  6 ++
 .../architecture/rpc/controllerservice.proto       |  1 +
 .../engine/architecture/rpc/controlreturns.proto   |  1 +
 .../ControllerAsyncRPCHandlerInitializer.scala     |  1 +
 .../IterationCompletedHandler.scala                | 78 ++++++++++++++++++++++
 .../messaginglayer/OutputManager.scala             | 20 +++---
 .../scheduling/RegionExecutionCoordinator.scala    | 14 +++-
 .../scheduling/WorkflowExecutionCoordinator.scala  | 24 ++++++-
 .../engine/architecture/worker/DataProcessor.scala |  7 +-
 .../managers/OutputPortResultWriterThread.scala    | 15 ++---
 .../promisehandlers/EndIterationHandler.scala      |  1 +
 .../promisehandlers/NextIterationHandler.scala     | 12 +---
 .../apache/texera/amber/engine/common/Utils.scala  |  2 +
 .../workflow-editor/workflow-editor.component.ts   |  1 +
 .../workspace/service/joint-ui/joint-ui.service.ts |  3 +
 .../workspace/types/execute-workflow.interface.ts  |  1 +
 16 files changed, 153 insertions(+), 34 deletions(-)

diff --git 
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
 
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
index c2816408d1..0943e459ee 100644
--- 
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
+++ 
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
@@ -46,6 +46,7 @@ message ControlRequest {
     PortCompletedRequest portCompletedRequest = 9;
     WorkerStateUpdatedRequest workerStateUpdatedRequest = 10;
     LinkWorkersRequest linkWorkersRequest = 11;
+    IterationCompletedRequest iterationCompletedRequest = 12;
 
     // request for worker
     AddInputChannelRequest addInputChannelRequest = 50;
@@ -169,6 +170,11 @@ message PortCompletedRequest {
   bool input = 2;
 }
 
+// Notify controller that an output port has finished one iteration (used by 
loop operators).
+message IterationCompletedRequest {
+  core.PortIdentity portId = 1 [(scalapb.field).no_box = true];
+}
+
 message WorkerStateUpdatedRequest {
   worker.WorkerState state = 1 [(scalapb.field).no_box = true];
 }
diff --git 
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto
 
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto
index 70d189a341..35d88dcf0f 100644
--- 
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto
+++ 
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto
@@ -37,6 +37,7 @@ service ControllerService {
   rpc EvaluatePythonExpression(EvaluatePythonExpressionRequest) returns 
(EvaluatePythonExpressionResponse);
   rpc ConsoleMessageTriggered(ConsoleMessageTriggeredRequest) returns 
(EmptyReturn);
   rpc PortCompleted(PortCompletedRequest) returns (EmptyReturn);
+  rpc IterationCompleted(IterationCompletedRequest) returns (EmptyReturn);
   rpc StartWorkflow(EmptyRequest) returns (StartWorkflowResponse);
   rpc ResumeWorkflow(EmptyRequest) returns (EmptyReturn);
   rpc PauseWorkflow(EmptyRequest) returns (EmptyReturn);
diff --git 
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto
 
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto
index 43613b5cfd..98405c91b7 100644
--- 
a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto
+++ 
b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlreturns.proto
@@ -122,6 +122,7 @@ enum WorkflowAggregatedState {
   PAUSED = 4;
   RESUMING = 5;
   COMPLETED = 6;
+  ITERATION_COMPLETED = 11;
   FAILED = 7;
   UNKNOWN = 8;
   KILLED = 9;
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
index 4d9a36bab4..d0add4789b 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala
@@ -40,6 +40,7 @@ class ControllerAsyncRPCHandlerInitializer(
     with ResumeHandler
     with StartWorkflowHandler
     with PortCompletedHandler
+    with IterationCompletedHandler
     with ConsoleMessageHandler
     with RetryWorkflowHandler
     with EvaluatePythonExpressionHandler
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/IterationCompletedHandler.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/IterationCompletedHandler.scala
new file mode 100644
index 0000000000..2aab1e4be0
--- /dev/null
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/IterationCompletedHandler.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.controller.promisehandlers
+
+import com.twitter.util.Future
+import org.apache.texera.amber.core.WorkflowRuntimeException
+import org.apache.texera.amber.core.workflow.GlobalPortIdentity
+import org.apache.texera.amber.engine.architecture.controller.{
+  ControllerAsyncRPCHandlerInitializer,
+  FatalError
+}
+import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
+  AsyncRPCContext,
+  IterationCompletedRequest,
+  QueryStatisticsRequest
+}
+import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
+import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
+import org.apache.texera.amber.util.VirtualIdentityUtils
+
+/** Notify controller that a worker has completed an iteration on an output 
port.
+  *
+  * This is different from [[PortCompletedHandler]]: a port can have multiple 
iterations
+  * (e.g., loop execution) before the whole port is fully completed.
+  */
+trait IterationCompletedHandler {
+  this: ControllerAsyncRPCHandlerInitializer =>
+
+  override def iterationCompleted(
+      msg: IterationCompletedRequest,
+      ctx: AsyncRPCContext
+  ): Future[EmptyReturn] = {
+    controllerInterface
+      
.controllerInitiateQueryStatistics(QueryStatisticsRequest(scala.Seq(ctx.sender)),
 CONTROLLER)
+      .map { _ =>
+        val globalPortId = GlobalPortIdentity(
+          VirtualIdentityUtils.getPhysicalOpId(ctx.sender),
+          msg.portId
+        )
+
+        cp.workflowExecutionCoordinator.getRegionOfPortId(globalPortId) match {
+          case Some(region) =>
+            // Emit UI-only IterationCompleted phase for this region.
+            
cp.workflowExecutionCoordinator.markRegionIterationCompletedIfNeeded(region)
+
+            // Keep scheduler running
+            cp.workflowExecutionCoordinator
+              .coordinateRegionExecutors(cp.actorService)
+              .onFailure {
+                case err: WorkflowRuntimeException =>
+                  sendToClient(FatalError(err, err.relatedWorkerId))
+                case other =>
+                  sendToClient(FatalError(other, None))
+              }
+          case None =>
+        }
+
+        EmptyReturn()
+      }
+  }
+}
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
index 0bc826c42d..bb8843ff1c 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
@@ -118,6 +118,10 @@ class OutputManager(
       : mutable.HashMap[PortIdentity, OutputPortResultWriterThread] =
     mutable.HashMap()
 
+  val ECMWriterThreads
+  : mutable.HashMap[PortIdentity, BufferedItemWriter[Tuple]] =
+    mutable.HashMap()
+
   /**
     * Add down stream operator and its corresponding Partitioner.
     *
@@ -209,7 +213,7 @@ class OutputManager(
     * @param outputPortId If not specified, the tuple will be written to all 
output ports that need storage.
     */
   def saveTupleToStorageIfNeeded(
-      tuple: Either[Tuple, String],
+      tuple: Tuple,
       outputPortId: Option[PortIdentity] = None
   ): Unit = {
     (outputPortId match {
@@ -239,7 +243,6 @@ class OutputManager(
         writerThread.join()
       case None =>
     }
-
   }
 
   def getPort(portId: PortIdentity): WorkerPort = ports(portId)
@@ -281,19 +284,18 @@ class OutputManager(
   }
 
   private def setupOutputStorageWriterThread(portId: PortIdentity, storageUri: 
URI): Unit = {
-    val bufferedTupleWriter = DocumentFactory
-      .openDocument(storageUri)
-      ._1
+    this.ECMWriterThreads(portId) = DocumentFactory
+      .createDocument(storageUri.resolve("ecm"), ResultSchema.ecmSchema)
       .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
       .asInstanceOf[BufferedItemWriter[Tuple]]
 
-    val ecmUri = storageUri.resolve("ecm")
-    val bufferedECMWriter = DocumentFactory
-      .createDocument(ecmUri, ResultSchema.ecmSchema)
+    val bufferedTupleWriter = DocumentFactory
+      .openDocument(storageUri)
+      ._1
       .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
       .asInstanceOf[BufferedItemWriter[Tuple]]
 
-    val writerThread = new OutputPortResultWriterThread(bufferedTupleWriter, 
bufferedECMWriter)
+    val writerThread = new OutputPortResultWriterThread(bufferedTupleWriter)
     this.outputPortResultWriterThreads(portId) = writerThread
     writerThread.start()
   }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 7e5b228801..84b18a48b5 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -103,6 +103,7 @@ class RegionExecutionCoordinator(
   private case object Unexecuted extends RegionExecutionPhase
   private case object ExecutingDependeePortsPhase extends RegionExecutionPhase
   private case object ExecutingNonDependeePortsPhase extends 
RegionExecutionPhase
+  private case object IterationCompleted extends RegionExecutionPhase
   private case object Completed extends RegionExecutionPhase
 
   private val currentPhaseRef: AtomicReference[RegionExecutionPhase] = new 
AtomicReference(
@@ -190,7 +191,8 @@ class RegionExecutionCoordinator(
     }
   }
 
-  def isCompleted: Boolean = currentPhaseRef.get == Completed
+  // NOTE(review): intended to treat IterationCompleted as not completed, but it is reported as completed below.
+  def isCompleted: Boolean = currentPhaseRef.get == Completed || 
currentPhaseRef.get == IterationCompleted
 
   /**
     * This will sync and transition the region execution phase from one to 
another depending on its current phase:
@@ -219,6 +221,9 @@ class RegionExecutionCoordinator(
         }
       case ExecutingNonDependeePortsPhase =>
         tryCompleteRegionExecution()
+      case IterationCompleted =>
+        // IterationCompleted is a UI/observability phase; scheduling doesn't 
advance on it.
+        Future.Unit
       case Completed =>
         // Already completed, no further action needed.
         Future.Unit
@@ -543,6 +548,13 @@ class RegionExecutionCoordinator(
     }
   }
 
+  /** Emit IterationCompleted region phase to frontend. This does not affect 
scheduling semantics. */
+  def setIterationCompletedPhase(): Unit = {
+    if (currentPhaseRef.get != Completed) {
+      setPhase(IterationCompleted)
+    }
+  }
+
   private def setPhase(phase: RegionExecutionPhase): Unit = {
     currentPhaseRef.set(phase)
     SessionState.getAllSessionStates.foreach { state =>
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
index 05585f88d8..8939c2f4f3 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/WorkflowExecutionCoordinator.scala
@@ -27,7 +27,8 @@ import org.apache.texera.amber.engine.architecture.common.{
   AkkaActorService
 }
 import org.apache.texera.amber.engine.architecture.controller.ControllerConfig
-import 
org.apache.texera.amber.engine.architecture.controller.execution.WorkflowExecution
+import 
org.apache.texera.amber.engine.architecture.controller.execution.{OperatorExecution,
 RegionExecution, WorkflowExecution}
+import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState
 import org.apache.texera.amber.engine.common.rpc.AsyncRPCClient
 
 import scala.collection.mutable
@@ -102,6 +103,27 @@ class WorkflowExecutionCoordinator(
       .unit
   }
 
+  /**
+    * Mark the given operator as iteration-completed.
+    *
+    * This is a UI/observability state only; it doesn't change the 
worker-level state machine.
+    */
+  def markOperatorIterationCompleted(operatorExecution: OperatorExecution): 
Unit = {
+    // No-op placeholder: operator aggregated state is computed from 
WorkerState.
+    // IterationCompleted is currently an observability/UI state driven by 
region phase events.
+    ()
+  }
+
+  /**
+    * Ask the region's coordinator, if one is registered, to emit the IterationCompleted phase.
+    * No operator state is checked here; the coordinator skips it once the region is Completed.
+    */
+  def markRegionIterationCompletedIfNeeded(region: Region): Unit = {
+    // Best-effort: tell region coordinator to emit an IterationCompleted 
phase event.
+    // This doesn't affect scheduling/completion semantics.
+    
regionExecutionCoordinators.get(region.id).foreach(_.setIterationCompletedPhase())
+  }
+
   def getRegionOfLink(link: PhysicalLink): Region = {
     getExecutingRegions.find(region => region.getLinks.contains(link)).get
   }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
index 72d9932bcc..dedfc57802 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
@@ -24,6 +24,7 @@ import com.softwaremill.macwire.wire
 import io.grpc.MethodDescriptor
 import org.apache.texera.amber.core.executor.OperatorExecutor
 import org.apache.texera.amber.core.state.State
+import org.apache.texera.amber.core.storage.result.ResultSchema
 import org.apache.texera.amber.core.tuple._
 import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, 
ChannelIdentity, EmbeddedControlMessageIdentity}
 import org.apache.texera.amber.core.workflow.PortIdentity
@@ -171,15 +172,15 @@ class DataProcessor(
           PORT_ALIGNMENT,
           EndIterationRequest(worker)
         )
-        outputManager.saveTupleToStorageIfNeeded(Right("Iteration number = 0, 
" + worker.name), outputPortOpt)
+        outputManager.ECMWriterThreads(portId).putOne(new 
Tuple(ResultSchema.ecmSchema, Array(worker.name)))
         outputManager.closeOutputStorageWriterIfNeeded(portId)
-        
asyncRPCClient.controllerInterface.portCompleted(PortCompletedRequest(portId, 
input = false), asyncRPCClient.mkContext(CONTROLLER)) // fix this line, add 
iteration completed rpc
+        
asyncRPCClient.controllerInterface.iterationCompleted(IterationCompletedRequest(portId),
 asyncRPCClient.mkContext(CONTROLLER))
         executor.reset()
       case schemaEnforceable: SchemaEnforceable =>
         val portIdentity = 
outputPortOpt.getOrElse(outputManager.getSingleOutputPortIdentity)
         val tuple = 
schemaEnforceable.enforceSchema(outputManager.getPort(portIdentity).schema)
         statisticsManager.increaseOutputStatistics(portIdentity, 
tuple.inMemSize)
-        outputManager.saveTupleToStorageIfNeeded(Left(tuple), outputPortOpt)
+        outputManager.saveTupleToStorageIfNeeded(tuple, outputPortOpt)
 
       case other => // skip for now
     }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala
index f8d0710aa8..188a08d28e 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala
@@ -30,28 +30,21 @@ sealed trait TerminateSignal
 case object PortStorageWriterTerminateSignal extends TerminateSignal
 
 class OutputPortResultWriterThread(
-    bufferedTupleWriter: BufferedItemWriter[Tuple],
-    bufferedECMWriter: BufferedItemWriter[Tuple]
+    bufferedTupleWriter: BufferedItemWriter[Tuple]
 ) extends Thread {
 
-  val queue: LinkedBlockingQueue[Either[Either[Tuple, String], 
TerminateSignal]] =
-    Queues.newLinkedBlockingQueue[Either[Either[Tuple, String], 
TerminateSignal]]()
+  val queue: LinkedBlockingQueue[Either[Tuple, TerminateSignal]] =
+    Queues.newLinkedBlockingQueue[Either[Tuple, TerminateSignal]]()
 
   override def run(): Unit = {
     var internalStop = false
     while (!internalStop) {
       val queueContent = queue.take()
       queueContent match {
-        case Left(item) => item match {
-          case Left(tuple) => bufferedTupleWriter.putOne(tuple)
-          case Right(ecm)    =>
-            val ecmTuple = new Tuple(ResultSchema.ecmSchema, Array(ecm))
-            bufferedECMWriter.putOne(ecmTuple)
-      }
+        case Left(tuple) => bufferedTupleWriter.putOne(tuple)
         case Right(_)    => internalStop = true
       }
     }
     bufferedTupleWriter.close()
-    bufferedECMWriter.close()
   }
 }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala
index d38cf9854f..fe76fb93bb 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala
@@ -23,6 +23,7 @@ import 
org.apache.texera.amber.engine.architecture.rpc.controlcommands.{AsyncRPC
 import 
org.apache.texera.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer
 import com.twitter.util.Future
 import org.apache.texera.amber.core.tuple.FinalizePort
+import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
 import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
 import org.apache.texera.amber.operator.loop.{LoopEndOpExec, LoopStartOpExec}
 
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala
index ed8cc475d6..0bf1691ed5 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/NextIterationHandler.scala
@@ -20,10 +20,8 @@
 package org.apache.texera.amber.engine.architecture.worker.promisehandlers
 
 import com.twitter.util.Future
-import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
-  AsyncRPCContext,
-  EmptyRequest
-}
+import org.apache.texera.amber.core.tuple.FinalizeIteration
+import 
org.apache.texera.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContext,
 EmptyRequest}
 import 
org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
 import 
org.apache.texera.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer
 import org.apache.texera.amber.operator.loop.LoopStartOpExec
@@ -36,11 +34,7 @@ trait NextIterationHandler {
       ctx: AsyncRPCContext
   ): Future[EmptyReturn] = {
     dp.processOnFinish()
-    if (dp.executor.asInstanceOf[LoopStartOpExec].checkCondition()) {
-      dp.outputManager.finalizeIteration(dp.actorId)
-    } else {
-      dp.outputManager.finalizeOutput()
-    }
+    dp.outputManager.finalizeIteration(dp.actorId)
     EmptyReturn()
   }
 }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/common/Utils.scala 
b/amber/src/main/scala/org/apache/texera/amber/engine/common/Utils.scala
index dc074c1094..488b5e0205 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/common/Utils.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/common/Utils.scala
@@ -98,6 +98,7 @@ object Utils extends LazyLogging {
       case WorkflowAggregatedState.PAUSING       => "Pausing"
       case WorkflowAggregatedState.PAUSED        => "Paused"
       case WorkflowAggregatedState.RESUMING      => "Resuming"
+      case WorkflowAggregatedState.ITERATION_COMPLETED => "IterationCompleted"
       case WorkflowAggregatedState.COMPLETED     => "Completed"
       case WorkflowAggregatedState.TERMINATED    => "Terminated"
       case WorkflowAggregatedState.FAILED        => "Failed"
@@ -117,6 +118,7 @@ object Utils extends LazyLogging {
       case "pausing"       => WorkflowAggregatedState.PAUSING
       case "paused"        => WorkflowAggregatedState.PAUSED
       case "resuming"      => WorkflowAggregatedState.RESUMING
+      case "iterationcompleted" => WorkflowAggregatedState.ITERATION_COMPLETED
       case "completed"     => WorkflowAggregatedState.COMPLETED
       case "failed"        => WorkflowAggregatedState.FAILED
       case "killed"        => WorkflowAggregatedState.KILLED
diff --git 
a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
 
b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
index b23f92caf3..08bc601c4a 100644
--- 
a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
+++ 
b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts
@@ -384,6 +384,7 @@ export class WorkflowEditorComponent implements OnInit, 
AfterViewInit, OnDestroy
         const colorMap: Record<string, string> = {
           ExecutingDependeePortsPhase: "rgba(33,150,243,0.2)",
           ExecutingNonDependeePortsPhase: "rgba(255,213,79,0.2)",
+          IterationCompleted: "rgba(156,39,176,0.2)",
           Completed: "rgba(76,175,80,0.2)",
         };
         this.paper.getModelById("region-" + region.id).attr("body/fill", 
colorMap[region.state]);
diff --git a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts 
b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts
index 77458947cd..f54068ad63 100644
--- a/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts
+++ b/frontend/src/app/workspace/service/joint-ui/joint-ui.service.ts
@@ -401,6 +401,9 @@ export class JointUIService {
       case OperatorState.Ready:
         fillColor = "#a6bd37";
         break;
+      case OperatorState.IterationCompleted:
+        fillColor = "#9C27B0";
+        break;
       case OperatorState.Completed:
         fillColor = "green";
         break;
diff --git a/frontend/src/app/workspace/types/execute-workflow.interface.ts 
b/frontend/src/app/workspace/types/execute-workflow.interface.ts
index 23ade23199..0033ac39c3 100644
--- a/frontend/src/app/workspace/types/execute-workflow.interface.ts
+++ b/frontend/src/app/workspace/types/execute-workflow.interface.ts
@@ -74,6 +74,7 @@ export enum OperatorState {
   Pausing = "Pausing",
   Paused = "Paused",
   Resuming = "Resuming",
+  IterationCompleted = "IterationCompleted",
   Completed = "Completed",
   Recovering = "Recovering",
 }

Reply via email to