This is an automated email from the ASF dual-hosted git repository.

linxinyuan pushed a commit to branch xinyuan-cm-for-loop
in repository https://gitbox.apache.org/repos/asf/texera.git

commit fae4293eee4da118153ea4108708c2bcb72eb5a9
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sat Nov 8 21:00:57 2025 -0800

    init
---
 .../messaginglayer/OutputManager.scala             |   4 ++
 .../amber/core/executor/OperatorExecutor.scala     |   5 ++
 .../org/apache/amber/core/tuple/TupleLike.scala    |   2 +
 .../org/apache/amber/operator/LogicalOp.scala      |  20 ++-----
 .../apache/amber/operator/loop/LoopEndOpDesc.scala |  53 +++++++++++++++++
 .../apache/amber/operator/loop/LoopEndOpExec.scala |   8 +++
 .../amber/operator/loop/LoopStartOpDesc.scala      |  66 +++++++++++++++++++++
 .../amber/operator/loop/LoopStartOpExec.scala      |  47 +++++++++++++++
 frontend/src/assets/operator_images/LoopEnd.png    | Bin 0 -> 5865 bytes
 frontend/src/assets/operator_images/LoopStart.png  | Bin 0 -> 2138 bytes
 10 files changed, 191 insertions(+), 14 deletions(-)

diff --git 
a/amber/src/main/scala/org/apache/amber/engine/architecture/messaginglayer/OutputManager.scala
 
b/amber/src/main/scala/org/apache/amber/engine/architecture/messaginglayer/OutputManager.scala
index e786ea432e..ac8764c7be 100644
--- 
a/amber/src/main/scala/org/apache/amber/engine/architecture/messaginglayer/OutputManager.scala
+++ 
b/amber/src/main/scala/org/apache/amber/engine/architecture/messaginglayer/OutputManager.scala
@@ -260,6 +260,10 @@ class OutputManager(
     outputIterator.appendSpecialTupleToEnd(FinalizeExecutor())
   }
 
+  def finalizeIteration(worker: ActorVirtualIdentity): Unit = {
+    outputIterator.appendSpecialTupleToEnd(FinalizeIteration(worker))
+  }
+
   /**
     * This method is only used for ensuring correct region execution. Some 
operators may have input port dependency
     * relationships, for which we currently use a two-phase region execution 
scheme.  (See `RegionExecutionCoordinator`
diff --git 
a/common/workflow-core/src/main/scala/org/apache/amber/core/executor/OperatorExecutor.scala
 
b/common/workflow-core/src/main/scala/org/apache/amber/core/executor/OperatorExecutor.scala
index 69e62a8f30..c4839b584d 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/amber/core/executor/OperatorExecutor.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/amber/core/executor/OperatorExecutor.scala
@@ -56,4 +56,9 @@ trait OperatorExecutor {
 
   def close(): Unit = {}
 
+  def reset(): Unit = {
+    close()
+    open()
+  }
+
 }
diff --git 
a/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/TupleLike.scala
 
b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/TupleLike.scala
index a96da69d40..8e789fc2cc 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/TupleLike.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/TupleLike.scala
@@ -19,6 +19,7 @@
 
 package org.apache.amber.core.tuple
 
+import org.apache.amber.core.virtualidentity.ActorVirtualIdentity
 import org.apache.amber.core.workflow.PortIdentity
 
 import scala.jdk.CollectionConverters.CollectionHasAsScala
@@ -41,6 +42,7 @@ trait InternalMarker extends TupleLike {
 
 final case class FinalizePort(portId: PortIdentity, input: Boolean) extends 
InternalMarker
 final case class FinalizeExecutor() extends InternalMarker
+final case class FinalizeIteration(worker: ActorVirtualIdentity) extends 
InternalMarker
 
 trait SeqTupleLike extends TupleLike with SchemaEnforceable {
   override def inMemSize: Long = ???
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/amber/operator/LogicalOp.scala
 
b/common/workflow-operator/src/main/scala/org/apache/amber/operator/LogicalOp.scala
index 6fb27d92c3..05cf952610 100644
--- 
a/common/workflow-operator/src/main/scala/org/apache/amber/operator/LogicalOp.scala
+++ 
b/common/workflow-operator/src/main/scala/org/apache/amber/operator/LogicalOp.scala
@@ -35,22 +35,15 @@ import org.apache.amber.operator.distinct.DistinctOpDesc
 import org.apache.amber.operator.dummy.DummyOpDesc
 import org.apache.amber.operator.filter.SpecializedFilterOpDesc
 import org.apache.amber.operator.hashJoin.HashJoinOpDesc
-import org.apache.amber.operator.huggingFace.{
-  HuggingFaceIrisLogisticRegressionOpDesc,
-  HuggingFaceSentimentAnalysisOpDesc,
-  HuggingFaceSpamSMSDetectionOpDesc,
-  HuggingFaceTextSummarizationOpDesc
-}
+import 
org.apache.amber.operator.huggingFace.{HuggingFaceIrisLogisticRegressionOpDesc, 
HuggingFaceSentimentAnalysisOpDesc, HuggingFaceSpamSMSDetectionOpDesc, 
HuggingFaceTextSummarizationOpDesc}
 import org.apache.amber.operator.ifStatement.IfOpDesc
 import org.apache.amber.operator.intersect.IntersectOpDesc
 import org.apache.amber.operator.intervalJoin.IntervalJoinOpDesc
 import org.apache.amber.operator.keywordSearch.KeywordSearchOpDesc
 import org.apache.amber.operator.limit.LimitOpDesc
+import org.apache.amber.operator.loop.{LoopEndOpDesc, LoopStartOpDesc}
 import 
org.apache.amber.operator.machineLearning.Scorer.MachineLearningScorerOpDesc
-import org.apache.amber.operator.machineLearning.sklearnAdvanced.KNNTrainer.{
-  SklearnAdvancedKNNClassifierTrainerOpDesc,
-  SklearnAdvancedKNNRegressorTrainerOpDesc
-}
+import 
org.apache.amber.operator.machineLearning.sklearnAdvanced.KNNTrainer.{SklearnAdvancedKNNClassifierTrainerOpDesc,
 SklearnAdvancedKNNRegressorTrainerOpDesc}
 import 
org.apache.amber.operator.machineLearning.sklearnAdvanced.SVCTrainer.SklearnAdvancedSVCTrainerOpDesc
 import 
org.apache.amber.operator.machineLearning.sklearnAdvanced.SVRTrainer.SklearnAdvancedSVRTrainerOpDesc
 import org.apache.amber.operator.metadata.{OPVersion, OperatorInfo, 
PropertyNameConstants}
@@ -64,10 +57,7 @@ import org.apache.amber.operator.sleep.SleepOpDesc
 import org.apache.amber.operator.sort.{SortOpDesc, StableMergeSortOpDesc}
 import org.apache.amber.operator.sortPartitions.SortPartitionsOpDesc
 import org.apache.amber.operator.source.apis.reddit.RedditSearchSourceOpDesc
-import org.apache.amber.operator.source.apis.twitter.v2.{
-  TwitterFullArchiveSearchSourceOpDesc,
-  TwitterSearchSourceOpDesc
-}
+import 
org.apache.amber.operator.source.apis.twitter.v2.{TwitterFullArchiveSearchSourceOpDesc,
 TwitterSearchSourceOpDesc}
 import org.apache.amber.operator.source.fetcher.URLFetcherOpDesc
 import org.apache.amber.operator.source.scan.FileScanSourceOpDesc
 import org.apache.amber.operator.source.scan.arrow.ArrowSourceOpDesc
@@ -195,6 +185,8 @@ trait StateTransferFunc
     new Type(value = classOf[TypeCastingOpDesc], name = "TypeCasting"),
     new Type(value = classOf[LimitOpDesc], name = "Limit"),
     new Type(value = classOf[SleepOpDesc], name = "Sleep"),
+    new Type(value = classOf[LoopStartOpDesc], name = "LoopStart"),
+    new Type(value = classOf[LoopEndOpDesc], name = "LoopEnd"),
     new Type(value = classOf[RandomKSamplingOpDesc], name = "RandomKSampling"),
     new Type(value = classOf[ReservoirSamplingOpDesc], name = 
"ReservoirSampling"),
     new Type(value = classOf[HashJoinOpDesc[String]], name = "HashJoin"),
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpDesc.scala
 
b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpDesc.scala
new file mode 100644
index 0000000000..8ebe91483c
--- /dev/null
+++ 
b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpDesc.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.amber.operator.loop
+
+import org.apache.amber.core.executor.OpExecWithClassName
+import org.apache.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import org.apache.amber.core.workflow.{InputPort, OutputPort, PhysicalOp}
+import org.apache.amber.operator.LogicalOp
+import org.apache.amber.operator.metadata.{OperatorGroupConstants, 
OperatorInfo}
+
+class LoopEndOpDesc extends LogicalOp {
+  override def getPhysicalOp(
+                              workflowId: WorkflowIdentity,
+                              executionId: ExecutionIdentity
+                            ): PhysicalOp = {
+    PhysicalOp
+      .oneToOnePhysicalOp(
+        workflowId,
+        executionId,
+        operatorIdentifier,
+        OpExecWithClassName("edu.uci.ics.amber.operator.loop.LoopEndOpExec")
+      )
+      .withInputPorts(operatorInfo.inputPorts)
+      .withOutputPorts(operatorInfo.outputPorts)
+      .withSuggestedWorkerNum(1)
+  }
+
+  override def operatorInfo: OperatorInfo =
+    OperatorInfo(
+      "Loop End",
+      "Loop End",
+      OperatorGroupConstants.CONTROL_GROUP,
+      inputPorts = List(InputPort()),
+      outputPorts = List(OutputPort())
+    )
+}
\ No newline at end of file
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpExec.scala
 
b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpExec.scala
new file mode 100644
index 0000000000..a98081f181
--- /dev/null
+++ 
b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpExec.scala
@@ -0,0 +1,8 @@
+package org.apache.amber.operator.loop
+
+import org.apache.amber.core.executor.OperatorExecutor
+import org.apache.amber.core.tuple.{Tuple, TupleLike}
+
+class LoopEndOpExec extends OperatorExecutor {
+  override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = 
Iterator(tuple)
+}
\ No newline at end of file
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpDesc.scala
 
b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpDesc.scala
new file mode 100644
index 0000000000..67d7a503c3
--- /dev/null
+++ 
b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpDesc.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.amber.operator.loop
+
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
+import org.apache.amber.core.executor.OpExecWithClassName
+import org.apache.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import org.apache.amber.core.workflow.{InputPort, OutputPort, PhysicalOp}
+import org.apache.amber.operator.LogicalOp
+import org.apache.amber.operator.metadata.{OperatorGroupConstants, 
OperatorInfo}
+import org.apache.amber.util.JSONUtils.objectMapper
+
+class LoopStartOpDesc extends LogicalOp {
+
+  @JsonProperty(required = true)
+  @JsonSchemaTitle("Iteration Number")
+  var iteration: Int = _
+
+  override def getPhysicalOp(
+                              workflowId: WorkflowIdentity,
+                              executionId: ExecutionIdentity
+                            ): PhysicalOp = {
+    PhysicalOp
+      .oneToOnePhysicalOp(
+        workflowId,
+        executionId,
+        operatorIdentifier,
+        OpExecWithClassName(
+          "edu.uci.ics.amber.operator.loop.LoopStartOpExec",
+          objectMapper.writeValueAsString(this)
+        )
+      )
+      .withInputPorts(operatorInfo.inputPorts)
+      .withOutputPorts(operatorInfo.outputPorts)
+      .withSuggestedWorkerNum(1)
+      .withParallelizable(false)
+  }
+
+  override def operatorInfo: OperatorInfo =
+    OperatorInfo(
+      "Loop Start",
+      "Loop Start",
+      OperatorGroupConstants.CONTROL_GROUP,
+      inputPorts = List(InputPort()),
+      outputPorts = List(OutputPort())
+    )
+
+}
\ No newline at end of file
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpExec.scala
 
b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpExec.scala
new file mode 100644
index 0000000000..cd9698701e
--- /dev/null
+++ 
b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpExec.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.amber.operator.loop
+
+import org.apache.amber.core.executor.OperatorExecutor
+import org.apache.amber.core.tuple.{Tuple, TupleLike}
+import org.apache.amber.util.JSONUtils.objectMapper
+
+import scala.collection.mutable
+
+class LoopStartOpExec(descString: String) extends OperatorExecutor {
+  private val desc: LoopStartOpDesc = objectMapper.readValue(descString, 
classOf[LoopStartOpDesc])
+  private val data = new mutable.ArrayBuffer[Tuple]
+  private var currentIteration = 0
+
+  def checkCondition(): Boolean = {
+    desc.iteration > currentIteration
+  }
+
+  override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {
+    data.append(tuple)
+    Iterator.empty
+  }
+
+  override def onFinish(port: Int): Iterator[TupleLike] = {
+    currentIteration += 1
+    data.iterator
+  }
+
+}
\ No newline at end of file
diff --git a/frontend/src/assets/operator_images/LoopEnd.png 
b/frontend/src/assets/operator_images/LoopEnd.png
new file mode 100644
index 0000000000..ee0f9ab6fa
Binary files /dev/null and b/frontend/src/assets/operator_images/LoopEnd.png 
differ
diff --git a/frontend/src/assets/operator_images/LoopStart.png 
b/frontend/src/assets/operator_images/LoopStart.png
new file mode 100644
index 0000000000..7e5be023cd
Binary files /dev/null and b/frontend/src/assets/operator_images/LoopStart.png 
differ

Reply via email to