This is an automated email from the ASF dual-hosted git repository.

linxinyuan pushed a commit to branch xinyuan-loop
in repository https://gitbox.apache.org/repos/asf/texera.git

commit a268872bc97e3c5a8ff97d71f4e1aacea2348f95
Author: Xinyuan Lin <[email protected]>
AuthorDate: Mon Sep 29 13:45:56 2025 -0700

    init
---
 core/gui/src/assets/operator_images/LoopEnd.png    | Bin 0 -> 5865 bytes
 core/gui/src/assets/operator_images/LoopStart.png  | Bin 0 -> 2138 bytes
 .../uci/ics/amber/core/workflow/PhysicalOp.scala   |   2 +-
 .../edu/uci/ics/amber/operator/LogicalOp.scala     |  52 +++---------------
 .../ics/amber/operator/loop/LoopEndOpDesc.scala    |  53 ++++++++++++++++++
 .../ics/amber/operator/loop/LoopEndOpExec.scala    |  27 +++++++++
 .../ics/amber/operator/loop/LoopStartOpDesc.scala  |  61 +++++++++++++++++++++
 .../ics/amber/operator/loop/LoopStartOpExec.scala  |  27 +++++++++
 8 files changed, 177 insertions(+), 45 deletions(-)

diff --git a/core/gui/src/assets/operator_images/LoopEnd.png 
b/core/gui/src/assets/operator_images/LoopEnd.png
new file mode 100644
index 0000000000..ee0f9ab6fa
Binary files /dev/null and b/core/gui/src/assets/operator_images/LoopEnd.png 
differ
diff --git a/core/gui/src/assets/operator_images/LoopStart.png 
b/core/gui/src/assets/operator_images/LoopStart.png
new file mode 100644
index 0000000000..7e5be023cd
Binary files /dev/null and b/core/gui/src/assets/operator_images/LoopStart.png 
differ
diff --git 
a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalOp.scala
 
b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalOp.scala
index 386c9e4beb..ceb07f2cb0 100644
--- 
a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalOp.scala
+++ 
b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalOp.scala
@@ -494,7 +494,7 @@ case class PhysicalOp(
     * outputs all its tuples.
     */
   def isOutputLinkBlocking(link: PhysicalLink): Boolean = {
-    this.outputPorts(link.fromPortId)._1.blocking
+    this.outputPorts(link.fromPortId)._1.blocking || 
link.toOpId.logicalOpId.id.contains("LoopStart") || 
link.fromOpId.logicalOpId.id.contains("LoopEnd")
   }
 
   /**
diff --git 
a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala
 
b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala
index 95bb2f87d6..c559dd7997 100644
--- 
a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala
+++ 
b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala
@@ -24,11 +24,7 @@ import com.fasterxml.jackson.annotation._
 import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
 import edu.uci.ics.amber.core.executor.OperatorExecutor
 import edu.uci.ics.amber.core.tuple.Schema
-import edu.uci.ics.amber.core.virtualidentity.{
-  ExecutionIdentity,
-  OperatorIdentity,
-  WorkflowIdentity
-}
+import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, 
OperatorIdentity, WorkflowIdentity}
 import edu.uci.ics.amber.core.workflow.WorkflowContext.{DEFAULT_EXECUTION_ID, 
DEFAULT_WORKFLOW_ID}
 import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, PortIdentity}
 import edu.uci.ics.amber.operator.aggregate.AggregateOpDesc
@@ -39,22 +35,15 @@ import edu.uci.ics.amber.operator.distinct.DistinctOpDesc
 import edu.uci.ics.amber.operator.dummy.DummyOpDesc
 import edu.uci.ics.amber.operator.filter.SpecializedFilterOpDesc
 import edu.uci.ics.amber.operator.hashJoin.HashJoinOpDesc
-import edu.uci.ics.amber.operator.huggingFace.{
-  HuggingFaceIrisLogisticRegressionOpDesc,
-  HuggingFaceSentimentAnalysisOpDesc,
-  HuggingFaceSpamSMSDetectionOpDesc,
-  HuggingFaceTextSummarizationOpDesc
-}
+import 
edu.uci.ics.amber.operator.huggingFace.{HuggingFaceIrisLogisticRegressionOpDesc,
 HuggingFaceSentimentAnalysisOpDesc, HuggingFaceSpamSMSDetectionOpDesc, 
HuggingFaceTextSummarizationOpDesc}
 import edu.uci.ics.amber.operator.ifStatement.IfOpDesc
 import edu.uci.ics.amber.operator.intersect.IntersectOpDesc
 import edu.uci.ics.amber.operator.intervalJoin.IntervalJoinOpDesc
 import edu.uci.ics.amber.operator.keywordSearch.KeywordSearchOpDesc
 import edu.uci.ics.amber.operator.limit.LimitOpDesc
+import edu.uci.ics.amber.operator.loop.{LoopEndOpDesc, LoopStartOpDesc}
 import 
edu.uci.ics.amber.operator.machineLearning.Scorer.MachineLearningScorerOpDesc
-import edu.uci.ics.amber.operator.machineLearning.sklearnAdvanced.KNNTrainer.{
-  SklearnAdvancedKNNClassifierTrainerOpDesc,
-  SklearnAdvancedKNNRegressorTrainerOpDesc
-}
+import 
edu.uci.ics.amber.operator.machineLearning.sklearnAdvanced.KNNTrainer.{SklearnAdvancedKNNClassifierTrainerOpDesc,
 SklearnAdvancedKNNRegressorTrainerOpDesc}
 import 
edu.uci.ics.amber.operator.machineLearning.sklearnAdvanced.SVCTrainer.SklearnAdvancedSVCTrainerOpDesc
 import 
edu.uci.ics.amber.operator.machineLearning.sklearnAdvanced.SVRTrainer.SklearnAdvancedSVRTrainerOpDesc
 import edu.uci.ics.amber.operator.metadata.{OPVersion, OperatorInfo, 
PropertyNameConstants}
@@ -64,38 +53,11 @@ import edu.uci.ics.amber.operator.regex.RegexOpDesc
 import edu.uci.ics.amber.operator.reservoirsampling.ReservoirSamplingOpDesc
 import edu.uci.ics.amber.operator.sklearn._
 import edu.uci.ics.amber.operator.sleep.SleepOpDesc
-import edu.uci.ics.amber.operator.sklearn.training.{
-  SklearnTrainingAdaptiveBoostingOpDesc,
-  SklearnTrainingBaggingOpDesc,
-  SklearnTrainingBernoulliNaiveBayesOpDesc,
-  SklearnTrainingComplementNaiveBayesOpDesc,
-  SklearnTrainingDecisionTreeOpDesc,
-  SklearnTrainingDummyClassifierOpDesc,
-  SklearnTrainingExtraTreeOpDesc,
-  SklearnTrainingExtraTreesOpDesc,
-  SklearnTrainingGaussianNaiveBayesOpDesc,
-  SklearnTrainingGradientBoostingOpDesc,
-  SklearnTrainingKNNOpDesc,
-  SklearnTrainingLinearSVMOpDesc,
-  SklearnTrainingMultiLayerPerceptronOpDesc,
-  SklearnTrainingMultinomialNaiveBayesOpDesc,
-  SklearnTrainingNearestCentroidOpDesc,
-  SklearnTrainingPassiveAggressiveOpDesc,
-  SklearnTrainingPerceptronOpDesc,
-  SklearnTrainingProbabilityCalibrationOpDesc,
-  SklearnTrainingRandomForestOpDesc,
-  SklearnTrainingRidgeCVOpDesc,
-  SklearnTrainingRidgeOpDesc,
-  SklearnTrainingSDGOpDesc,
-  SklearnTrainingSVMOpDesc
-}
+import 
edu.uci.ics.amber.operator.sklearn.training.{SklearnTrainingAdaptiveBoostingOpDesc,
 SklearnTrainingBaggingOpDesc, SklearnTrainingBernoulliNaiveBayesOpDesc, 
SklearnTrainingComplementNaiveBayesOpDesc, SklearnTrainingDecisionTreeOpDesc, 
SklearnTrainingDummyClassifierOpDesc, SklearnTrainingExtraTreeOpDesc, 
SklearnTrainingExtraTreesOpDesc, SklearnTrainingGaussianNaiveBayesOpDesc, 
SklearnTrainingGradientBoostingOpDesc, SklearnTrainingKNNOpDesc, 
SklearnTrainingLinearSVMOpDesc, SklearnTra [...]
 import edu.uci.ics.amber.operator.sort.SortOpDesc
 import edu.uci.ics.amber.operator.sortPartitions.SortPartitionsOpDesc
 import edu.uci.ics.amber.operator.source.apis.reddit.RedditSearchSourceOpDesc
-import edu.uci.ics.amber.operator.source.apis.twitter.v2.{
-  TwitterFullArchiveSearchSourceOpDesc,
-  TwitterSearchSourceOpDesc
-}
+import 
edu.uci.ics.amber.operator.source.apis.twitter.v2.{TwitterFullArchiveSearchSourceOpDesc,
 TwitterSearchSourceOpDesc}
 import edu.uci.ics.amber.operator.source.fetcher.URLFetcherOpDesc
 import edu.uci.ics.amber.operator.source.scan.FileScanSourceOpDesc
 import edu.uci.ics.amber.operator.source.scan.arrow.ArrowSourceOpDesc
@@ -216,6 +178,8 @@ trait StateTransferFunc
     new Type(value = classOf[AsterixDBSourceOpDesc], name = "AsterixDBSource"),
     new Type(value = classOf[TypeCastingOpDesc], name = "TypeCasting"),
     new Type(value = classOf[LimitOpDesc], name = "Limit"),
+    new Type(value = classOf[LoopStartOpDesc], name = "LoopStart"),
+    new Type(value = classOf[LoopEndOpDesc], name = "LoopEnd"),
     new Type(value = classOf[SleepOpDesc], name = "Sleep"),
     new Type(value = classOf[RandomKSamplingOpDesc], name = "RandomKSampling"),
     new Type(value = classOf[ReservoirSamplingOpDesc], name = 
"ReservoirSampling"),
diff --git 
a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopEndOpDesc.scala
 
b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopEndOpDesc.scala
new file mode 100644
index 0000000000..b7e743ead3
--- /dev/null
+++ 
b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopEndOpDesc.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package edu.uci.ics.amber.operator.loop
+
+import edu.uci.ics.amber.core.executor.OpExecWithClassName
+import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import edu.uci.ics.amber.core.workflow.{InputPort, OutputPort, PhysicalOp}
+import edu.uci.ics.amber.operator.metadata.{OperatorGroupConstants, 
OperatorInfo}
+import edu.uci.ics.amber.operator.LogicalOp
+
+class LoopEndOpDesc extends LogicalOp {
+  override def getPhysicalOp(
+                              workflowId: WorkflowIdentity,
+                              executionId: ExecutionIdentity
+                            ): PhysicalOp = {
+    PhysicalOp
+      .oneToOnePhysicalOp(
+        workflowId,
+        executionId,
+        operatorIdentifier,
+        OpExecWithClassName("edu.uci.ics.amber.operator.loop.LoopEndOpExec")
+      )
+      .withInputPorts(operatorInfo.inputPorts)
+      .withOutputPorts(operatorInfo.outputPorts)
+      .withSuggestedWorkerNum(1)
+  }
+
+  override def operatorInfo: OperatorInfo =
+    OperatorInfo(
+      "Loop End",
+      "Loop End",
+      OperatorGroupConstants.CONTROL_GROUP,
+      inputPorts = List(InputPort()),
+      outputPorts = List(OutputPort())
+    )
+}
diff --git 
a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopEndOpExec.scala
 
b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopEndOpExec.scala
new file mode 100644
index 0000000000..9eea8a1dfc
--- /dev/null
+++ 
b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopEndOpExec.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package edu.uci.ics.amber.operator.loop
+
+import edu.uci.ics.amber.core.executor.OperatorExecutor
+import edu.uci.ics.amber.core.tuple.{Tuple, TupleLike}
+
+class LoopEndOpExec extends OperatorExecutor {
+  override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = 
Iterator(tuple)
+}
\ No newline at end of file
diff --git 
a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopStartOpDesc.scala
 
b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopStartOpDesc.scala
new file mode 100644
index 0000000000..3baaba4465
--- /dev/null
+++ 
b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopStartOpDesc.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package edu.uci.ics.amber.operator.loop
+
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
+import edu.uci.ics.amber.core.executor.OpExecWithClassName
+import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, 
WorkflowIdentity}
+import edu.uci.ics.amber.core.workflow.{InputPort, OutputPort, PhysicalOp}
+import edu.uci.ics.amber.operator.LogicalOp
+import edu.uci.ics.amber.operator.metadata.{OperatorGroupConstants, 
OperatorInfo}
+import edu.uci.ics.amber.util.JSONUtils.objectMapper
+
+class LoopStartOpDesc extends LogicalOp {
+
+  override def getPhysicalOp(
+                              workflowId: WorkflowIdentity,
+                              executionId: ExecutionIdentity
+                            ): PhysicalOp = {
+    PhysicalOp
+      .oneToOnePhysicalOp(
+        workflowId,
+        executionId,
+        operatorIdentifier,
+        OpExecWithClassName(
+          "edu.uci.ics.amber.operator.loop.LoopStartOpExec",
+        )
+      )
+      .withInputPorts(operatorInfo.inputPorts)
+      .withOutputPorts(operatorInfo.outputPorts)
+      .withSuggestedWorkerNum(1)
+      .withParallelizable(false)
+  }
+
+  override def operatorInfo: OperatorInfo =
+    OperatorInfo(
+      "Loop Start",
+      "Loop Start",
+      OperatorGroupConstants.CONTROL_GROUP,
+      inputPorts = List(InputPort()),
+      outputPorts = List(OutputPort())
+    )
+
+}
diff --git 
a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopStartOpExec.scala
 
b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopStartOpExec.scala
new file mode 100644
index 0000000000..0f9b62e169
--- /dev/null
+++ 
b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopStartOpExec.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package edu.uci.ics.amber.operator.loop
+
+import edu.uci.ics.amber.core.executor.OperatorExecutor
+import edu.uci.ics.amber.core.tuple.{Tuple, TupleLike}
+
+class LoopStartOpExec extends OperatorExecutor {
+  override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = 
Iterator(tuple)
+}
\ No newline at end of file

Reply via email to