This is an automated email from the ASF dual-hosted git repository. linxinyuan pushed a commit to branch xinyuan-cm-for-loop in repository https://gitbox.apache.org/repos/asf/texera.git
commit fae4293eee4da118153ea4108708c2bcb72eb5a9 Author: Xinyuan Lin <[email protected]> AuthorDate: Sat Nov 8 21:00:57 2025 -0800 init --- .../messaginglayer/OutputManager.scala | 4 ++ .../amber/core/executor/OperatorExecutor.scala | 5 ++ .../org/apache/amber/core/tuple/TupleLike.scala | 2 + .../org/apache/amber/operator/LogicalOp.scala | 20 ++----- .../apache/amber/operator/loop/LoopEndOpDesc.scala | 53 +++++++++++++++++ .../apache/amber/operator/loop/LoopEndOpExec.scala | 8 +++ .../amber/operator/loop/LoopStartOpDesc.scala | 66 +++++++++++++++++++++ .../amber/operator/loop/LoopStartOpExec.scala | 47 +++++++++++++++ frontend/src/assets/operator_images/LoopEnd.png | Bin 0 -> 5865 bytes frontend/src/assets/operator_images/LoopStart.png | Bin 0 -> 2138 bytes 10 files changed, 191 insertions(+), 14 deletions(-) diff --git a/amber/src/main/scala/org/apache/amber/engine/architecture/messaginglayer/OutputManager.scala b/amber/src/main/scala/org/apache/amber/engine/architecture/messaginglayer/OutputManager.scala index e786ea432e..ac8764c7be 100644 --- a/amber/src/main/scala/org/apache/amber/engine/architecture/messaginglayer/OutputManager.scala +++ b/amber/src/main/scala/org/apache/amber/engine/architecture/messaginglayer/OutputManager.scala @@ -260,6 +260,10 @@ class OutputManager( outputIterator.appendSpecialTupleToEnd(FinalizeExecutor()) } + def finalizeIteration(worker: ActorVirtualIdentity): Unit = { + outputIterator.appendSpecialTupleToEnd(FinalizeIteration(worker)) + } + /** * This method is only used for ensuring correct region execution. Some operators may have input port dependency * relationships, for which we currently use a two-phase region execution scheme. 
(See `RegionExecutionCoordinator` diff --git a/common/workflow-core/src/main/scala/org/apache/amber/core/executor/OperatorExecutor.scala b/common/workflow-core/src/main/scala/org/apache/amber/core/executor/OperatorExecutor.scala index 69e62a8f30..c4839b584d 100644 --- a/common/workflow-core/src/main/scala/org/apache/amber/core/executor/OperatorExecutor.scala +++ b/common/workflow-core/src/main/scala/org/apache/amber/core/executor/OperatorExecutor.scala @@ -56,4 +56,9 @@ trait OperatorExecutor { def close(): Unit = {} + def reset(): Unit = { + close() + open() + } + } diff --git a/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/TupleLike.scala b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/TupleLike.scala index a96da69d40..8e789fc2cc 100644 --- a/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/TupleLike.scala +++ b/common/workflow-core/src/main/scala/org/apache/amber/core/tuple/TupleLike.scala @@ -19,6 +19,7 @@ package org.apache.amber.core.tuple +import org.apache.amber.core.virtualidentity.ActorVirtualIdentity import org.apache.amber.core.workflow.PortIdentity import scala.jdk.CollectionConverters.CollectionHasAsScala @@ -41,6 +42,7 @@ trait InternalMarker extends TupleLike { final case class FinalizePort(portId: PortIdentity, input: Boolean) extends InternalMarker final case class FinalizeExecutor() extends InternalMarker +final case class FinalizeIteration(worker: ActorVirtualIdentity) extends InternalMarker trait SeqTupleLike extends TupleLike with SchemaEnforceable { override def inMemSize: Long = ??? 
diff --git a/common/workflow-operator/src/main/scala/org/apache/amber/operator/LogicalOp.scala b/common/workflow-operator/src/main/scala/org/apache/amber/operator/LogicalOp.scala index 6fb27d92c3..05cf952610 100644 --- a/common/workflow-operator/src/main/scala/org/apache/amber/operator/LogicalOp.scala +++ b/common/workflow-operator/src/main/scala/org/apache/amber/operator/LogicalOp.scala @@ -35,22 +35,15 @@ import org.apache.amber.operator.distinct.DistinctOpDesc import org.apache.amber.operator.dummy.DummyOpDesc import org.apache.amber.operator.filter.SpecializedFilterOpDesc import org.apache.amber.operator.hashJoin.HashJoinOpDesc -import org.apache.amber.operator.huggingFace.{ - HuggingFaceIrisLogisticRegressionOpDesc, - HuggingFaceSentimentAnalysisOpDesc, - HuggingFaceSpamSMSDetectionOpDesc, - HuggingFaceTextSummarizationOpDesc -} +import org.apache.amber.operator.huggingFace.{HuggingFaceIrisLogisticRegressionOpDesc, HuggingFaceSentimentAnalysisOpDesc, HuggingFaceSpamSMSDetectionOpDesc, HuggingFaceTextSummarizationOpDesc} import org.apache.amber.operator.ifStatement.IfOpDesc import org.apache.amber.operator.intersect.IntersectOpDesc import org.apache.amber.operator.intervalJoin.IntervalJoinOpDesc import org.apache.amber.operator.keywordSearch.KeywordSearchOpDesc import org.apache.amber.operator.limit.LimitOpDesc +import org.apache.amber.operator.loop.{LoopEndOpDesc, LoopStartOpDesc} import org.apache.amber.operator.machineLearning.Scorer.MachineLearningScorerOpDesc -import org.apache.amber.operator.machineLearning.sklearnAdvanced.KNNTrainer.{ - SklearnAdvancedKNNClassifierTrainerOpDesc, - SklearnAdvancedKNNRegressorTrainerOpDesc -} +import org.apache.amber.operator.machineLearning.sklearnAdvanced.KNNTrainer.{SklearnAdvancedKNNClassifierTrainerOpDesc, SklearnAdvancedKNNRegressorTrainerOpDesc} import org.apache.amber.operator.machineLearning.sklearnAdvanced.SVCTrainer.SklearnAdvancedSVCTrainerOpDesc import 
org.apache.amber.operator.machineLearning.sklearnAdvanced.SVRTrainer.SklearnAdvancedSVRTrainerOpDesc import org.apache.amber.operator.metadata.{OPVersion, OperatorInfo, PropertyNameConstants} @@ -64,10 +57,7 @@ import org.apache.amber.operator.sleep.SleepOpDesc import org.apache.amber.operator.sort.{SortOpDesc, StableMergeSortOpDesc} import org.apache.amber.operator.sortPartitions.SortPartitionsOpDesc import org.apache.amber.operator.source.apis.reddit.RedditSearchSourceOpDesc -import org.apache.amber.operator.source.apis.twitter.v2.{ - TwitterFullArchiveSearchSourceOpDesc, - TwitterSearchSourceOpDesc -} +import org.apache.amber.operator.source.apis.twitter.v2.{TwitterFullArchiveSearchSourceOpDesc, TwitterSearchSourceOpDesc} import org.apache.amber.operator.source.fetcher.URLFetcherOpDesc import org.apache.amber.operator.source.scan.FileScanSourceOpDesc import org.apache.amber.operator.source.scan.arrow.ArrowSourceOpDesc @@ -195,6 +185,8 @@ trait StateTransferFunc new Type(value = classOf[TypeCastingOpDesc], name = "TypeCasting"), new Type(value = classOf[LimitOpDesc], name = "Limit"), new Type(value = classOf[SleepOpDesc], name = "Sleep"), + new Type(value = classOf[LoopStartOpDesc], name = "LoopStart"), + new Type(value = classOf[LoopEndOpDesc], name = "LoopEnd"), new Type(value = classOf[RandomKSamplingOpDesc], name = "RandomKSampling"), new Type(value = classOf[ReservoirSamplingOpDesc], name = "ReservoirSampling"), new Type(value = classOf[HashJoinOpDesc[String]], name = "HashJoin"), diff --git a/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpDesc.scala new file mode 100644 index 0000000000..8ebe91483c --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpDesc.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.amber.operator.loop + +import org.apache.amber.core.executor.OpExecWithClassName +import org.apache.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.amber.core.workflow.{InputPort, OutputPort, PhysicalOp} +import org.apache.amber.operator.LogicalOp +import org.apache.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} + +class LoopEndOpDesc extends LogicalOp { + override def getPhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity + ): PhysicalOp = { + PhysicalOp + .oneToOnePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithClassName("org.apache.amber.operator.loop.LoopEndOpExec") + ) + .withInputPorts(operatorInfo.inputPorts) + .withOutputPorts(operatorInfo.outputPorts) + .withSuggestedWorkerNum(1) + } + + override def operatorInfo: OperatorInfo = + OperatorInfo( + "Loop End", + "Loop End", + OperatorGroupConstants.CONTROL_GROUP, + inputPorts = List(InputPort()), + outputPorts = List(OutputPort()) + ) +} diff --git a/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpExec.scala new file mode 100644 index
0000000000..a98081f181 --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopEndOpExec.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.amber.operator.loop + +import org.apache.amber.core.executor.OperatorExecutor +import org.apache.amber.core.tuple.{Tuple, TupleLike} + +/** Executor for the Loop End operator: forwards every input tuple unchanged. */ +class LoopEndOpExec extends OperatorExecutor { + override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = Iterator(tuple) +} diff --git a/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpDesc.scala new file mode 100644 index 0000000000..67d7a503c3 --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpDesc.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.amber.operator.loop + +import com.fasterxml.jackson.annotation.JsonProperty +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import org.apache.amber.core.executor.OpExecWithClassName +import org.apache.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.amber.core.workflow.{InputPort, OutputPort, PhysicalOp} +import org.apache.amber.operator.LogicalOp +import org.apache.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} +import org.apache.amber.util.JSONUtils.objectMapper + +class LoopStartOpDesc extends LogicalOp { + + @JsonProperty(required = true) + @JsonSchemaTitle("Iteration Number") + var iteration: Int = _ + + override def getPhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity + ): PhysicalOp = { + PhysicalOp + .oneToOnePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithClassName( + "org.apache.amber.operator.loop.LoopStartOpExec", + objectMapper.writeValueAsString(this) + ) + ) + .withInputPorts(operatorInfo.inputPorts) + .withOutputPorts(operatorInfo.outputPorts) + .withSuggestedWorkerNum(1) + .withParallelizable(false) + } + + override def operatorInfo: OperatorInfo = + OperatorInfo( + "Loop Start", + "Loop Start", + OperatorGroupConstants.CONTROL_GROUP, + inputPorts = List(InputPort()), + outputPorts = List(OutputPort()) + ) + +} diff --git a/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpExec.scala new file mode 100644 index 0000000000..cd9698701e --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/amber/operator/loop/LoopStartOpExec.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.amber.operator.loop + +import org.apache.amber.core.executor.OperatorExecutor +import org.apache.amber.core.tuple.{Tuple, TupleLike} +import org.apache.amber.util.JSONUtils.objectMapper + +import scala.collection.mutable + +class LoopStartOpExec(descString: String) extends OperatorExecutor { + private val desc: LoopStartOpDesc = objectMapper.readValue(descString, classOf[LoopStartOpDesc]) + private val data = new mutable.ArrayBuffer[Tuple] + private var currentIteration = 0 + + def checkCondition(): Boolean = { + desc.iteration > currentIteration + } + + override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = { + data.append(tuple) + Iterator.empty + } + + override def onFinish(port: Int): Iterator[TupleLike] = { + currentIteration += 1 + data.iterator + } + +} diff --git a/frontend/src/assets/operator_images/LoopEnd.png b/frontend/src/assets/operator_images/LoopEnd.png new file mode 100644 index 0000000000..ee0f9ab6fa Binary files /dev/null and b/frontend/src/assets/operator_images/LoopEnd.png differ diff --git a/frontend/src/assets/operator_images/LoopStart.png b/frontend/src/assets/operator_images/LoopStart.png new file mode 100644 index 0000000000..7e5be023cd Binary files /dev/null and
b/frontend/src/assets/operator_images/LoopStart.png differ
