This is an automated email from the ASF dual-hosted git repository. linxinyuan pushed a commit to branch xinyuan-loop in repository https://gitbox.apache.org/repos/asf/texera.git
commit a268872bc97e3c5a8ff97d71f4e1aacea2348f95 Author: Xinyuan Lin <[email protected]> AuthorDate: Mon Sep 29 13:45:56 2025 -0700 init --- core/gui/src/assets/operator_images/LoopEnd.png | Bin 0 -> 5865 bytes core/gui/src/assets/operator_images/LoopStart.png | Bin 0 -> 2138 bytes .../uci/ics/amber/core/workflow/PhysicalOp.scala | 2 +- .../edu/uci/ics/amber/operator/LogicalOp.scala | 52 +++--------------- .../ics/amber/operator/loop/LoopEndOpDesc.scala | 53 ++++++++++++++++++ .../ics/amber/operator/loop/LoopEndOpExec.scala | 27 +++++++++ .../ics/amber/operator/loop/LoopStartOpDesc.scala | 61 +++++++++++++++++++++ .../ics/amber/operator/loop/LoopStartOpExec.scala | 27 +++++++++ 8 files changed, 177 insertions(+), 45 deletions(-) diff --git a/core/gui/src/assets/operator_images/LoopEnd.png b/core/gui/src/assets/operator_images/LoopEnd.png new file mode 100644 index 0000000000..ee0f9ab6fa Binary files /dev/null and b/core/gui/src/assets/operator_images/LoopEnd.png differ diff --git a/core/gui/src/assets/operator_images/LoopStart.png b/core/gui/src/assets/operator_images/LoopStart.png new file mode 100644 index 0000000000..7e5be023cd Binary files /dev/null and b/core/gui/src/assets/operator_images/LoopStart.png differ diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalOp.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalOp.scala index 386c9e4beb..ceb07f2cb0 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalOp.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/workflow/PhysicalOp.scala @@ -494,7 +494,7 @@ case class PhysicalOp( * outputs all its tuples. */ def isOutputLinkBlocking(link: PhysicalLink): Boolean = { - this.outputPorts(link.fromPortId)._1.blocking + this.outputPorts(link.fromPortId)._1.blocking || link.toOpId.logicalOpId.id.contains("LoopStart") || link.fromOpId.logicalOpId.id.contains("LoopEnd") } /** diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala index 95bb2f87d6..c559dd7997 100644 --- a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/LogicalOp.scala @@ -24,11 +24,7 @@ import com.fasterxml.jackson.annotation._ import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle import edu.uci.ics.amber.core.executor.OperatorExecutor import edu.uci.ics.amber.core.tuple.Schema -import edu.uci.ics.amber.core.virtualidentity.{ - ExecutionIdentity, - OperatorIdentity, - WorkflowIdentity -} +import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, OperatorIdentity, WorkflowIdentity} import edu.uci.ics.amber.core.workflow.WorkflowContext.{DEFAULT_EXECUTION_ID, DEFAULT_WORKFLOW_ID} import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, PortIdentity} import edu.uci.ics.amber.operator.aggregate.AggregateOpDesc @@ -39,22 +35,15 @@ import edu.uci.ics.amber.operator.distinct.DistinctOpDesc import edu.uci.ics.amber.operator.dummy.DummyOpDesc import edu.uci.ics.amber.operator.filter.SpecializedFilterOpDesc import edu.uci.ics.amber.operator.hashJoin.HashJoinOpDesc -import edu.uci.ics.amber.operator.huggingFace.{ - HuggingFaceIrisLogisticRegressionOpDesc, - HuggingFaceSentimentAnalysisOpDesc, - HuggingFaceSpamSMSDetectionOpDesc, - HuggingFaceTextSummarizationOpDesc -} +import edu.uci.ics.amber.operator.huggingFace.{HuggingFaceIrisLogisticRegressionOpDesc, HuggingFaceSentimentAnalysisOpDesc, HuggingFaceSpamSMSDetectionOpDesc, HuggingFaceTextSummarizationOpDesc} import edu.uci.ics.amber.operator.ifStatement.IfOpDesc import edu.uci.ics.amber.operator.intersect.IntersectOpDesc import edu.uci.ics.amber.operator.intervalJoin.IntervalJoinOpDesc import edu.uci.ics.amber.operator.keywordSearch.KeywordSearchOpDesc import edu.uci.ics.amber.operator.limit.LimitOpDesc +import edu.uci.ics.amber.operator.loop.{LoopEndOpDesc, LoopStartOpDesc} import edu.uci.ics.amber.operator.machineLearning.Scorer.MachineLearningScorerOpDesc -import edu.uci.ics.amber.operator.machineLearning.sklearnAdvanced.KNNTrainer.{ - SklearnAdvancedKNNClassifierTrainerOpDesc, - SklearnAdvancedKNNRegressorTrainerOpDesc -} +import edu.uci.ics.amber.operator.machineLearning.sklearnAdvanced.KNNTrainer.{SklearnAdvancedKNNClassifierTrainerOpDesc, SklearnAdvancedKNNRegressorTrainerOpDesc} import edu.uci.ics.amber.operator.machineLearning.sklearnAdvanced.SVCTrainer.SklearnAdvancedSVCTrainerOpDesc import edu.uci.ics.amber.operator.machineLearning.sklearnAdvanced.SVRTrainer.SklearnAdvancedSVRTrainerOpDesc import edu.uci.ics.amber.operator.metadata.{OPVersion, OperatorInfo, PropertyNameConstants} @@ -64,38 +53,11 @@ import edu.uci.ics.amber.operator.regex.RegexOpDesc import edu.uci.ics.amber.operator.reservoirsampling.ReservoirSamplingOpDesc import edu.uci.ics.amber.operator.sklearn._ import edu.uci.ics.amber.operator.sleep.SleepOpDesc -import edu.uci.ics.amber.operator.sklearn.training.{ - SklearnTrainingAdaptiveBoostingOpDesc, - SklearnTrainingBaggingOpDesc, - SklearnTrainingBernoulliNaiveBayesOpDesc, - SklearnTrainingComplementNaiveBayesOpDesc, - SklearnTrainingDecisionTreeOpDesc, - SklearnTrainingDummyClassifierOpDesc, - SklearnTrainingExtraTreeOpDesc, - SklearnTrainingExtraTreesOpDesc, - SklearnTrainingGaussianNaiveBayesOpDesc, - SklearnTrainingGradientBoostingOpDesc, - SklearnTrainingKNNOpDesc, - SklearnTrainingLinearSVMOpDesc, - SklearnTrainingMultiLayerPerceptronOpDesc, - SklearnTrainingMultinomialNaiveBayesOpDesc, - SklearnTrainingNearestCentroidOpDesc, - SklearnTrainingPassiveAggressiveOpDesc, - SklearnTrainingPerceptronOpDesc, - SklearnTrainingProbabilityCalibrationOpDesc, - SklearnTrainingRandomForestOpDesc, - SklearnTrainingRidgeCVOpDesc, - SklearnTrainingRidgeOpDesc, - SklearnTrainingSDGOpDesc, - SklearnTrainingSVMOpDesc -} +import edu.uci.ics.amber.operator.sklearn.training.{SklearnTrainingAdaptiveBoostingOpDesc, SklearnTrainingBaggingOpDesc, SklearnTrainingBernoulliNaiveBayesOpDesc, SklearnTrainingComplementNaiveBayesOpDesc, SklearnTrainingDecisionTreeOpDesc, SklearnTrainingDummyClassifierOpDesc, SklearnTrainingExtraTreeOpDesc, SklearnTrainingExtraTreesOpDesc, SklearnTrainingGaussianNaiveBayesOpDesc, SklearnTrainingGradientBoostingOpDesc, SklearnTrainingKNNOpDesc, SklearnTrainingLinearSVMOpDesc, SklearnTra [...] import edu.uci.ics.amber.operator.sort.SortOpDesc import edu.uci.ics.amber.operator.sortPartitions.SortPartitionsOpDesc import edu.uci.ics.amber.operator.source.apis.reddit.RedditSearchSourceOpDesc -import edu.uci.ics.amber.operator.source.apis.twitter.v2.{ - TwitterFullArchiveSearchSourceOpDesc, - TwitterSearchSourceOpDesc -} +import edu.uci.ics.amber.operator.source.apis.twitter.v2.{TwitterFullArchiveSearchSourceOpDesc, TwitterSearchSourceOpDesc} import edu.uci.ics.amber.operator.source.fetcher.URLFetcherOpDesc import edu.uci.ics.amber.operator.source.scan.FileScanSourceOpDesc import edu.uci.ics.amber.operator.source.scan.arrow.ArrowSourceOpDesc @@ -216,6 +178,8 @@ trait StateTransferFunc new Type(value = classOf[AsterixDBSourceOpDesc], name = "AsterixDBSource"), new Type(value = classOf[TypeCastingOpDesc], name = "TypeCasting"), new Type(value = classOf[LimitOpDesc], name = "Limit"), + new Type(value = classOf[LoopStartOpDesc], name = "LoopStart"), + new Type(value = classOf[LoopEndOpDesc], name = "LoopEnd"), new Type(value = classOf[SleepOpDesc], name = "Sleep"), new Type(value = classOf[RandomKSamplingOpDesc], name = "RandomKSampling"), new Type(value = classOf[ReservoirSamplingOpDesc], name = "ReservoirSampling"), diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopEndOpDesc.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopEndOpDesc.scala new file mode 100644 index 0000000000..b7e743ead3 --- /dev/null +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopEndOpDesc.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package edu.uci.ics.amber.operator.loop + +import edu.uci.ics.amber.core.executor.OpExecWithClassName +import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import edu.uci.ics.amber.core.workflow.{InputPort, OutputPort, PhysicalOp} +import edu.uci.ics.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} +import edu.uci.ics.amber.operator.LogicalOp + +class LoopEndOpDesc extends LogicalOp { + override def getPhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity + ): PhysicalOp = { + PhysicalOp + .oneToOnePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithClassName("edu.uci.ics.amber.operator.loop.LoopEndOpExec") + ) + .withInputPorts(operatorInfo.inputPorts) + .withOutputPorts(operatorInfo.outputPorts) + .withSuggestedWorkerNum(1) + } + + override def operatorInfo: OperatorInfo = + OperatorInfo( + "Loop End", + "Loop End", + OperatorGroupConstants.CONTROL_GROUP, + inputPorts = List(InputPort()), + outputPorts = List(OutputPort()) + ) +} diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopEndOpExec.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopEndOpExec.scala new file mode 100644 index 0000000000..9eea8a1dfc --- /dev/null +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopEndOpExec.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package edu.uci.ics.amber.operator.loop + +import edu.uci.ics.amber.core.executor.OperatorExecutor +import edu.uci.ics.amber.core.tuple.{Tuple, TupleLike} + +class LoopEndOpExec extends OperatorExecutor { + override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = Iterator(tuple) +} \ No newline at end of file diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopStartOpDesc.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopStartOpDesc.scala new file mode 100644 index 0000000000..3baaba4465 --- /dev/null +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopStartOpDesc.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package edu.uci.ics.amber.operator.loop + +import com.fasterxml.jackson.annotation.JsonProperty +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import edu.uci.ics.amber.core.executor.OpExecWithClassName +import edu.uci.ics.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import edu.uci.ics.amber.core.workflow.{InputPort, OutputPort, PhysicalOp} +import edu.uci.ics.amber.operator.LogicalOp +import edu.uci.ics.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} +import edu.uci.ics.amber.util.JSONUtils.objectMapper + +class LoopStartOpDesc extends LogicalOp { + + override def getPhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity + ): PhysicalOp = { + PhysicalOp + .oneToOnePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithClassName( + "edu.uci.ics.amber.operator.loop.LoopStartOpExec", + ) + ) + .withInputPorts(operatorInfo.inputPorts) + .withOutputPorts(operatorInfo.outputPorts) + .withSuggestedWorkerNum(1) + .withParallelizable(false) + } + + override def operatorInfo: OperatorInfo = + OperatorInfo( + "Loop Start", + "Loop Start", + OperatorGroupConstants.CONTROL_GROUP, + inputPorts = List(InputPort()), + outputPorts = List(OutputPort()) + ) + +} diff --git a/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopStartOpExec.scala b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopStartOpExec.scala new file mode 100644 index 0000000000..0f9b62e169 --- /dev/null +++ b/core/workflow-operator/src/main/scala/edu/uci/ics/amber/operator/loop/LoopStartOpExec.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package edu.uci.ics.amber.operator.loop + +import edu.uci.ics.amber.core.executor.OperatorExecutor +import edu.uci.ics.amber.core.tuple.{Tuple, TupleLike} + +class LoopStartOpExec extends OperatorExecutor { + override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = Iterator(tuple) +} \ No newline at end of file
