This is an automated email from the ASF dual-hosted git repository.

linxinyuan pushed a commit to branch xinyuan-cm-for-loop-mat-dcm
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-cm-for-loop-mat-dcm by 
this push:
     new acbb0b5b40 fix
acbb0b5b40 is described below

commit acbb0b5b402c76bd98be3b580eb9d0a4dc9336c0
Author: Xinyuan Lin <[email protected]>
AuthorDate: Wed Feb 4 11:17:41 2026 -0800

    fix
---
 .../engine/architecture/worker/DataProcessor.scala |  1 +
 .../promisehandlers/EndIterationHandler.scala      | 32 +++++++++++-----------
 2 files changed, 17 insertions(+), 16 deletions(-)

diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
index dedfc57802..adbc97bfe4 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
@@ -173,6 +173,7 @@ class DataProcessor(
           EndIterationRequest(worker)
         )
         outputManager.ECMWriterThreads(portId).putOne(new 
Tuple(ResultSchema.ecmSchema, Array(worker.name)))
+        outputManager.ECMWriterThreads(portId).close()
         outputManager.closeOutputStorageWriterIfNeeded(portId)
         
asyncRPCClient.controllerInterface.iterationCompleted(IterationCompletedRequest(portId),
 asyncRPCClient.mkContext(CONTROLLER))
         executor.reset()
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala
index fe76fb93bb..80869dc17f 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala
@@ -38,24 +38,24 @@ trait EndIterationHandler {
       case _: LoopEndOpExec =>
         workerInterface.nextIteration(EmptyRequest(), 
mkContext(request.worker))
       case _ =>
-        val channelId = dp.inputManager.currentChannelId
-        val portId = dp.inputGateway.getChannel(channelId).getPortId
-        dp.inputManager.getPort(portId).completed = true
-        dp.inputManager.initBatch(channelId, Array.empty)
-        dp.processOnFinish()
+    }
+    val channelId = dp.inputManager.currentChannelId
+    val portId = dp.inputGateway.getChannel(channelId).getPortId
+    dp.inputManager.getPort(portId).completed = true
+    dp.inputManager.initBatch(channelId, Array.empty)
+    dp.processOnFinish()
 
-        dp.outputManager.outputIterator.appendSpecialTupleToEnd(
-          FinalizePort(portId, input = true)
-        )
+    dp.outputManager.outputIterator.appendSpecialTupleToEnd(
+      FinalizePort(portId, input = true)
+    )
 
-        if (dp.inputManager.getAllPorts.forall(portId => 
dp.inputManager.isPortCompleted(portId))) {
-          // Need this check for handling input port dependency relationships.
-          // See documentation of isMissingOutputPort
-          if (!dp.outputManager.isMissingOutputPort) {
-            // assuming all the output ports finalize after all input ports 
are finalized.
-            dp.outputManager.finalizeIteration(request.worker)
-          }
-        }
+    if (dp.inputManager.getAllPorts.forall(portId => 
dp.inputManager.isPortCompleted(portId))) {
+      // Need this check for handling input port dependency relationships.
+      // See documentation of isMissingOutputPort
+      if (!dp.outputManager.isMissingOutputPort) {
+        // assuming all the output ports finalize after all input ports are 
finalized.
+        dp.outputManager.finalizeIteration(request.worker)
+      }
     }
     EmptyReturn()
   }

Reply via email to