This is an automated email from the ASF dual-hosted git repository.

linxinyuan pushed a commit to branch xinyuan-cm-for-loop-mat
in repository https://gitbox.apache.org/repos/asf/texera.git

commit d6edcfd23c842c003a91c4d887811c8fe11f07bc
Author: Xinyuan Lin <[email protected]>
AuthorDate: Tue Jan 27 23:53:36 2026 -0800

    init
---
 .../messaginglayer/OutputManager.scala             | 27 +++++++++++-----------
 .../engine/architecture/worker/DataProcessor.scala |  4 +++-
 .../InputPortMaterializationReaderThread.scala     | 10 ++++++++
 .../managers/OutputPortResultWriterThread.scala    | 18 +++++++++++----
 common/config/src/main/resources/storage.conf      |  3 +++
 .../apache/texera/amber/config/StorageConfig.scala |  2 ++
 .../amber/core/storage/DocumentFactory.scala       |  2 ++
 .../texera/amber/core/storage/VFSURIFactory.scala  |  1 +
 .../amber/core/storage/result/ResultSchema.scala   |  4 ++++
 9 files changed, 52 insertions(+), 19 deletions(-)

diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
index 9a5ff22439..1fa43cda4a 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
@@ -20,22 +20,16 @@
 package org.apache.texera.amber.engine.architecture.messaginglayer
 
 import org.apache.texera.amber.core.state.State
-import org.apache.texera.amber.core.storage.DocumentFactory
+import org.apache.texera.amber.core.storage.{DocumentFactory, VFSResourceType}
 import org.apache.texera.amber.core.storage.model.BufferedItemWriter
+import org.apache.texera.amber.core.storage.result.ResultSchema
 import org.apache.texera.amber.core.tuple._
 import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, 
ChannelIdentity}
 import org.apache.texera.amber.core.workflow.{PhysicalLink, PortIdentity}
-import 
org.apache.texera.amber.engine.architecture.messaginglayer.OutputManager.{
-  DPOutputIterator,
-  getBatchSize,
-  toPartitioner
-}
+import 
org.apache.texera.amber.engine.architecture.messaginglayer.OutputManager.{DPOutputIterator,
 getBatchSize, toPartitioner}
 import org.apache.texera.amber.engine.architecture.sendsemantics.partitioners._
 import 
org.apache.texera.amber.engine.architecture.sendsemantics.partitionings._
-import org.apache.texera.amber.engine.architecture.worker.managers.{
-  OutputPortResultWriterThread,
-  PortStorageWriterTerminateSignal
-}
+import 
org.apache.texera.amber.engine.architecture.worker.managers.{OutputPortResultWriterThread,
 PortStorageWriterTerminateSignal}
 import org.apache.texera.amber.engine.common.AmberLogging
 import org.apache.texera.amber.util.VirtualIdentityUtils
 
@@ -215,7 +209,7 @@ class OutputManager(
     * @param outputPortId If not specified, the tuple will be written to all 
output ports that need storage.
     */
   def saveTupleToStorageIfNeeded(
-      tuple: Tuple,
+      tuple: Either[Tuple, String],
       outputPortId: Option[PortIdentity] = None
   ): Unit = {
     (outputPortId match {
@@ -284,12 +278,19 @@ class OutputManager(
   }
 
   private def setupOutputStorageWriterThread(portId: PortIdentity, storageUri: 
URI): Unit = {
-    val bufferedItemWriter = DocumentFactory
+    val bufferedTupleWriter = DocumentFactory
       .openDocument(storageUri)
       ._1
       .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
       .asInstanceOf[BufferedItemWriter[Tuple]]
-    val writerThread = new OutputPortResultWriterThread(bufferedItemWriter)
+
+    val ecmUri = storageUri.resolve("ecm")
+    val bufferedECMWriter = DocumentFactory
+      .createDocument(ecmUri, ResultSchema.ecmSchema)
+      .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
+      .asInstanceOf[BufferedItemWriter[Tuple]]
+
+    val writerThread = new OutputPortResultWriterThread(bufferedTupleWriter, 
bufferedECMWriter)
     this.outputPortResultWriterThreads(portId) = writerThread
     writerThread.start()
   }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
index dae3446826..1ef9dd7d99 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala
@@ -188,13 +188,15 @@ class DataProcessor(
           PORT_ALIGNMENT,
           EndIterationRequest(worker)
         )
+
         executor.reset()
       case schemaEnforceable: SchemaEnforceable =>
         val portIdentity = 
outputPortOpt.getOrElse(outputManager.getSingleOutputPortIdentity)
         val tuple = 
schemaEnforceable.enforceSchema(outputManager.getPort(portIdentity).schema)
         statisticsManager.increaseOutputStatistics(portIdentity, 
tuple.inMemSize)
         outputManager.passTupleToDownstream(tuple, outputPortOpt)
-        outputManager.saveTupleToStorageIfNeeded(tuple, outputPortOpt)
+        outputManager.saveTupleToStorageIfNeeded(Right(actorId.toString), 
outputPortOpt)
+        outputManager.saveTupleToStorageIfNeeded(Left(tuple), outputPortOpt)
 
       case other => // skip for now
     }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
index 10fbbc44a2..953bc677af 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
@@ -84,6 +84,16 @@ class InputPortMaterializationReaderThread(
     // Notify the input port of start of input channel
     emitECM(METHOD_START_CHANNEL, NO_ALIGNMENT)
     try {
+      val ecm: VirtualDocument[Tuple] = DocumentFactory
+        .openDocument(uri.resolve("ecm"))
+        ._1
+        .asInstanceOf[VirtualDocument[Tuple]]
+      val ecmReadIterator = ecm.get()
+      if (ecmReadIterator.hasNext) {
+        val tuple = ecmReadIterator.next()
+        println("Received ECM tuple: " + tuple)
+      }
+
       val materialization: VirtualDocument[Tuple] = DocumentFactory
         .openDocument(uri)
         ._1
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala
index 28e5d2af66..f8d0710aa8 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/OutputPortResultWriterThread.scala
@@ -22,6 +22,7 @@ package 
org.apache.texera.amber.engine.architecture.worker.managers
 import com.google.common.collect.Queues
 import org.apache.texera.amber.core.storage.model.BufferedItemWriter
 import org.apache.texera.amber.core.tuple.Tuple
+import org.apache.texera.amber.core.storage.result.ResultSchema
 
 import java.util.concurrent.LinkedBlockingQueue
 
@@ -29,21 +30,28 @@ sealed trait TerminateSignal
 case object PortStorageWriterTerminateSignal extends TerminateSignal
 
 class OutputPortResultWriterThread(
-    bufferedItemWriter: BufferedItemWriter[Tuple]
+    bufferedTupleWriter: BufferedItemWriter[Tuple],
+    bufferedECMWriter: BufferedItemWriter[Tuple]
 ) extends Thread {
 
-  val queue: LinkedBlockingQueue[Either[Tuple, TerminateSignal]] =
-    Queues.newLinkedBlockingQueue[Either[Tuple, TerminateSignal]]()
+  val queue: LinkedBlockingQueue[Either[Either[Tuple, String], 
TerminateSignal]] =
+    Queues.newLinkedBlockingQueue[Either[Either[Tuple, String], 
TerminateSignal]]()
 
   override def run(): Unit = {
     var internalStop = false
     while (!internalStop) {
       val queueContent = queue.take()
       queueContent match {
-        case Left(tuple) => bufferedItemWriter.putOne(tuple)
+        case Left(item) => item match { // payload is either a data tuple or an ECM string
+          case Left(tuple) => bufferedTupleWriter.putOne(tuple)
+          case Right(ecm) => // wrap the string in a single-column tuple matching ecmSchema
+            val ecmTuple = new Tuple(ResultSchema.ecmSchema, Array(ecm))
+            bufferedECMWriter.putOne(ecmTuple)
+        }
         case Right(_)    => internalStop = true
       }
     }
-    bufferedItemWriter.close()
+    bufferedTupleWriter.close()
+    bufferedECMWriter.close()
   }
 }
diff --git a/common/config/src/main/resources/storage.conf 
b/common/config/src/main/resources/storage.conf
index 85a62b77a3..a158eb8753 100644
--- a/common/config/src/main/resources/storage.conf
+++ b/common/config/src/main/resources/storage.conf
@@ -52,6 +52,9 @@ storage {
             runtime-statistics-namespace = "workflow-runtime-statistics"
             runtime-statistics-namespace = 
${?STORAGE_ICEBERG_TABLE_RUNTIME_STATISTICS_NAMESPACE}
 
+            ecm-namespace = "ecm"
+            ecm-namespace = ${?STORAGE_ICEBERG_TABLE_ECM_NAMESPACE}
+
             commit {
                 batch-size = 4096 # decide the buffer size of our 
IcebergTableWriter
                 batch-size = ${?STORAGE_ICEBERG_TABLE_COMMIT_BATCH_SIZE}
diff --git 
a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
 
b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
index c5bd330286..8a1aba73b7 100644
--- 
a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
+++ 
b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala
@@ -63,6 +63,7 @@ object StorageConfig {
     conf.getInt("storage.iceberg.table.commit.retry.min-wait-ms")
   val icebergTableCommitMaxRetryWaitMs: Int =
     conf.getInt("storage.iceberg.table.commit.retry.max-wait-ms")
+  val icebergTableECMNamespace: String = 
conf.getString("storage.iceberg.table.ecm-namespace")
 
   // LakeFS specifics
   // lakefsEndpoint is a var because in test we need to override it to point 
to the test container
@@ -116,6 +117,7 @@ object StorageConfig {
   val ENV_ICEBERG_TABLE_COMMIT_NUM_RETRIES = 
"STORAGE_ICEBERG_TABLE_COMMIT_NUM_RETRIES"
   val ENV_ICEBERG_TABLE_COMMIT_MIN_WAIT_MS = 
"STORAGE_ICEBERG_TABLE_COMMIT_MIN_WAIT_MS"
   val ENV_ICEBERG_TABLE_COMMIT_MAX_WAIT_MS = 
"STORAGE_ICEBERG_TABLE_COMMIT_MAX_WAIT_MS"
+  val ENV_ICEBERG_TABLE_ECM_NAMESPACE = "STORAGE_ICEBERG_TABLE_ECM_NAMESPACE"
 
   // LakeFS
   val ENV_LAKEFS_ENDPOINT = "STORAGE_LAKEFS_ENDPOINT"
diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
index 4c37c33bb2..5b27170560 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala
@@ -72,6 +72,7 @@ object DocumentFactory {
           case RESULT             => StorageConfig.icebergTableResultNamespace
           case CONSOLE_MESSAGES   => 
StorageConfig.icebergTableConsoleMessagesNamespace
           case RUNTIME_STATISTICS => 
StorageConfig.icebergTableRuntimeStatisticsNamespace
+          case ECM                => StorageConfig.icebergTableECMNamespace
           case _ =>
             throw new IllegalArgumentException(s"Resource type $resourceType 
is not supported")
         }
@@ -126,6 +127,7 @@ object DocumentFactory {
           case RESULT             => StorageConfig.icebergTableResultNamespace
           case CONSOLE_MESSAGES   => 
StorageConfig.icebergTableConsoleMessagesNamespace
           case RUNTIME_STATISTICS => 
StorageConfig.icebergTableRuntimeStatisticsNamespace
+          case ECM                => StorageConfig.icebergTableECMNamespace
           case _ =>
             throw new IllegalArgumentException(s"Resource type $resourceType 
is not supported")
         }
diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala
index 3513ac5ecd..f4f3a9e4bf 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala
@@ -34,6 +34,7 @@ object VFSResourceType extends Enumeration {
   val RESULT: Value = Value("result")
   val RUNTIME_STATISTICS: Value = Value("runtimeStatistics")
   val CONSOLE_MESSAGES: Value = Value("consoleMessages")
+  val ECM: Value = Value("ecm")
 }
 
 object VFSURIFactory {
diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/ResultSchema.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/ResultSchema.scala
index ade33283f7..a3978d0c71 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/ResultSchema.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/ResultSchema.scala
@@ -39,4 +39,8 @@ object ResultSchema {
   val consoleMessagesSchema: Schema = new Schema(
     new Attribute("message", AttributeType.STRING)
   )
+
+  val ecmSchema: Schema = new Schema(
+    new Attribute("workerId", AttributeType.STRING)
+  )
 }

Reply via email to