This is an automated email from the ASF dual-hosted git repository. linxinyuan pushed a commit to branch xinyuan-sk-testing-1 in repository https://gitbox.apache.org/repos/asf/texera.git
commit 82111527731b17f4d5729c7ace5da9ac929ccbab Author: Xinyuan Lin <[email protected]> AuthorDate: Fri Feb 13 15:02:02 2026 -0800 init --- .../apache/texera/amber/operator/LogicalOp.scala | 34 +++----- .../SklearnTestingOpDesc.scala | 95 +++++++++++++++++++++ .../src/assets/operator_images/SklearnTesting.png | Bin 0 -> 843070 bytes 3 files changed, 105 insertions(+), 24 deletions(-) diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala index eb319a82d1..6d80335a19 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala @@ -24,15 +24,8 @@ import com.fasterxml.jackson.annotation._ import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle import org.apache.texera.amber.core.executor.OperatorExecutor import org.apache.texera.amber.core.tuple.Schema -import org.apache.texera.amber.core.virtualidentity.{ - ExecutionIdentity, - OperatorIdentity, - WorkflowIdentity -} -import org.apache.texera.amber.core.workflow.WorkflowContext.{ - DEFAULT_EXECUTION_ID, - DEFAULT_WORKFLOW_ID -} +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, OperatorIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.WorkflowContext.{DEFAULT_EXECUTION_ID, DEFAULT_WORKFLOW_ID} import org.apache.texera.amber.core.workflow.{PhysicalOp, PhysicalPlan, PortIdentity} import org.apache.texera.amber.operator.aggregate.AggregateOpDesc import org.apache.texera.amber.operator.cartesianProduct.CartesianProductOpDesc @@ -42,22 +35,14 @@ import org.apache.texera.amber.operator.distinct.DistinctOpDesc import org.apache.texera.amber.operator.dummy.DummyOpDesc import org.apache.texera.amber.operator.filter.SpecializedFilterOpDesc import org.apache.texera.amber.operator.hashJoin.HashJoinOpDesc 
-import org.apache.texera.amber.operator.huggingFace.{ - HuggingFaceIrisLogisticRegressionOpDesc, - HuggingFaceSentimentAnalysisOpDesc, - HuggingFaceSpamSMSDetectionOpDesc, - HuggingFaceTextSummarizationOpDesc -} +import org.apache.texera.amber.operator.huggingFace.{HuggingFaceIrisLogisticRegressionOpDesc, HuggingFaceSentimentAnalysisOpDesc, HuggingFaceSpamSMSDetectionOpDesc, HuggingFaceTextSummarizationOpDesc} import org.apache.texera.amber.operator.ifStatement.IfOpDesc import org.apache.texera.amber.operator.intersect.IntersectOpDesc import org.apache.texera.amber.operator.intervalJoin.IntervalJoinOpDesc import org.apache.texera.amber.operator.keywordSearch.KeywordSearchOpDesc import org.apache.texera.amber.operator.limit.LimitOpDesc import org.apache.texera.amber.operator.machineLearning.Scorer.MachineLearningScorerOpDesc -import org.apache.texera.amber.operator.machineLearning.sklearnAdvanced.KNNTrainer.{ - SklearnAdvancedKNNClassifierTrainerOpDesc, - SklearnAdvancedKNNRegressorTrainerOpDesc -} +import org.apache.texera.amber.operator.machineLearning.sklearnAdvanced.KNNTrainer.{SklearnAdvancedKNNClassifierTrainerOpDesc, SklearnAdvancedKNNRegressorTrainerOpDesc} import org.apache.texera.amber.operator.machineLearning.sklearnAdvanced.SVCTrainer.SklearnAdvancedSVCTrainerOpDesc import org.apache.texera.amber.operator.machineLearning.sklearnAdvanced.SVRTrainer.SklearnAdvancedSVRTrainerOpDesc import org.apache.texera.amber.operator.metadata.{OPVersion, OperatorInfo, PropertyNameConstants} @@ -71,10 +56,7 @@ import org.apache.texera.amber.operator.sleep.SleepOpDesc import org.apache.texera.amber.operator.sort.{SortOpDesc, StableMergeSortOpDesc} import org.apache.texera.amber.operator.sortPartitions.SortPartitionsOpDesc import org.apache.texera.amber.operator.source.apis.reddit.RedditSearchSourceOpDesc -import org.apache.texera.amber.operator.source.apis.twitter.v2.{ - TwitterFullArchiveSearchSourceOpDesc, - TwitterSearchSourceOpDesc -} +import 
org.apache.texera.amber.operator.source.apis.twitter.v2.{TwitterFullArchiveSearchSourceOpDesc, TwitterSearchSourceOpDesc} import org.apache.texera.amber.operator.source.fetcher.URLFetcherOpDesc import org.apache.texera.amber.operator.source.scan.FileScanSourceOpDesc import org.apache.texera.amber.operator.source.scan.arrow.ArrowSourceOpDesc @@ -137,6 +119,7 @@ import org.apache.texera.amber.operator.visualization.volcanoPlot.VolcanoPlotOpD import org.apache.texera.amber.operator.visualization.waterfallChart.WaterfallChartOpDesc import org.apache.texera.amber.operator.visualization.wordCloud.WordCloudOpDesc import org.apache.commons.lang3.builder.{EqualsBuilder, HashCodeBuilder, ToStringBuilder} +import org.apache.texera.amber.operator.sklearn.testing.SklearnTestingOpDesc.SklearnTestingOpDesc import org.apache.texera.amber.operator.visualization.stripChart.StripChartOpDesc import java.util.UUID @@ -407,7 +390,10 @@ trait StateTransferFunc new Type( value = classOf[SklearnAdvancedSVRTrainerOpDesc], name = "SVRTrainer" - ) + ), + new Type( + value = classOf[SklearnTestingOpDesc], + name = "SklearnTesting") ) ) abstract class LogicalOp extends PortDescriptor with Serializable { diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/sklearn/testing/SklearnTestingOpDesc/SklearnTestingOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/sklearn/testing/SklearnTestingOpDesc/SklearnTestingOpDesc.scala new file mode 100644 index 0000000000..2cae6b3e19 --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/sklearn/testing/SklearnTestingOpDesc/SklearnTestingOpDesc.scala @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.sklearn.testing.SklearnTestingOpDesc
+
+import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription}
+import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
+import org.apache.texera.amber.core.tuple.{AttributeType, Schema}
+import org.apache.texera.amber.core.workflow.{InputPort, OutputPort, PortIdentity}
+import org.apache.texera.amber.operator.PythonOperatorDescriptor
+import org.apache.texera.amber.operator.metadata.annotations.{AutofillAttributeName, AutofillAttributeNameOnPort1}
+import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo}
+
+/**
+  * Scores a trained scikit-learn model against a labeled dataset.
+  *
+  * Port 0 ("data") receives the labeled rows and port 1 ("model") receives tuples carrying a
+  * fitted model in the `model` attribute. For each model tuple, the buffered data is split into
+  * features (every column except `target`) and labels (`target`), the model's `predict` is
+  * called on the features, and the model tuple is emitted with four DOUBLE columns appended:
+  * accuracy, f1, precision and recall.
+  */
+class SklearnTestingOpDesc extends PythonOperatorDescriptor {
+  @JsonSchemaTitle("Model Attribute")
+  @JsonProperty(required = true, defaultValue = "model")
+  @JsonPropertyDescription("Attribute corresponding to ML model")
+  @AutofillAttributeNameOnPort1
+  var model: String = _
+
+  @JsonSchemaTitle("Target Attribute")
+  @JsonPropertyDescription("Attribute in your dataset corresponding to target.")
+  @JsonProperty(required = true)
+  @AutofillAttributeName
+  var target: String = _
+
+  /**
+    * Generates the Python UDF that buffers data tuples and scores every incoming model tuple.
+    *
+    * Precision, recall and F1 use `average="weighted"`: sklearn's default `average="binary"`
+    * raises ValueError for multiclass targets and for binary labels other than 0/1.
+    * `zero_division=0` yields 0 instead of a warning for classes that are never predicted.
+    * All four metrics are converted to Python floats and rounded to 4 decimals so they match
+    * the DOUBLE columns declared in [[getOutputSchemas]]. If no data tuple was received, the
+    * metrics are emitted as None rather than crashing on an empty table.
+    *
+    * Attribute names are embedded as escaped Python string literals so that names containing
+    * quotes, backslashes or newlines cannot break the generated code.
+    */
+  override def generatePythonCode(): String = {
+    val modelAttr = pythonStringLiteral(model)
+    val targetAttr = pythonStringLiteral(target)
+    s"""from pytexera import *
+       |from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
+       |
+       |class ProcessTupleOperator(UDFOperatorV2):
+       |    @overrides
+       |    def open(self) -> None:
+       |        self.data = []
+       |
+       |    @overrides
+       |    def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
+       |        if port == 0:
+       |            self.data.append(tuple_)
+       |            return
+       |        if not self.data:
+       |            for metric in ("accuracy", "f1", "precision", "recall"):
+       |                tuple_[metric] = None
+       |            yield tuple_
+       |            return
+       |        model = tuple_[$modelAttr]
+       |        table = Table(self.data)
+       |        Y = table[$targetAttr]
+       |        X = table.drop($targetAttr, axis=1)
+       |        predictions = model.predict(X)
+       |        averaging = {"average": "weighted", "zero_division": 0}
+       |        tuple_["accuracy"] = round(float(accuracy_score(Y, predictions)), 4)
+       |        tuple_["f1"] = round(float(f1_score(Y, predictions, **averaging)), 4)
+       |        tuple_["precision"] = round(float(precision_score(Y, predictions, **averaging)), 4)
+       |        tuple_["recall"] = round(float(recall_score(Y, predictions, **averaging)), 4)
+       |        yield tuple_""".stripMargin
+  }
+
+  /**
+    * Renders `value` as a double-quoted Python string literal, escaping the characters that
+    * would otherwise terminate the literal or split the generated line.
+    *
+    * @throws IllegalArgumentException if `value` is null (the attribute was never selected)
+    */
+  private def pythonStringLiteral(value: String): String = {
+    require(value != null, "attribute name must be set before generating Python code")
+    val sb = new StringBuilder("\"")
+    value.foreach {
+      case '\\' => sb.append("\\\\")
+      case '"'  => sb.append("\\\"")
+      case '\n' => sb.append("\\n")
+      case '\r' => sb.append("\\r")
+      case c    => sb.append(c)
+    }
+    sb.append('"').toString
+  }
+
+  /**
+    * Declares two inputs and one output. Port 0 ("data") carries the labeled rows. Port 1
+    * ("model") lists port 0 as a dependency and accepts multiple links; presumably the data
+    * port is fully consumed before model tuples are processed (TODO confirm against the
+    * scheduler's handling of `dependencies`).
+    */
+  override def operatorInfo: OperatorInfo =
+    OperatorInfo(
+      "Sklearn Testing",
+      "Sklearn Testing Operator",
+      OperatorGroupConstants.SKLEARN_GROUP,
+      inputPorts = List(
+        InputPort(PortIdentity(), "data"),
+        InputPort(
+          PortIdentity(1),
+          "model",
+          dependencies = List(PortIdentity()),
+          allowMultiLinks = true
+        )
+      ),
+      outputPorts = List(OutputPort())
+    )
+
+  /**
+    * The output schema is the model port's schema plus four DOUBLE metric columns. This
+    * mirrors the generated Python, which yields the incoming model tuple with the metrics set.
+    */
+  override def getOutputSchemas(
+      inputSchemas: Map[PortIdentity, Schema]
+  ): Map[PortIdentity, Schema] = {
+    val modelSchema = inputSchemas(operatorInfo.inputPorts(1).id)
+    Map(
+      operatorInfo.outputPorts.head.id -> modelSchema
+        .add("accuracy", AttributeType.DOUBLE)
+        .add("f1", AttributeType.DOUBLE)
+        .add("precision", AttributeType.DOUBLE)
+        .add("recall", AttributeType.DOUBLE)
+    )
+  }
+}
diff --git a/frontend/src/assets/operator_images/SklearnTesting.png b/frontend/src/assets/operator_images/SklearnTesting.png
new file mode 100644
index 0000000000..b90f8853fb
Binary files /dev/null and b/frontend/src/assets/operator_images/SklearnTesting.png differ
