This is an automated email from the ASF dual-hosted git repository.

linxinyuan pushed a commit to branch xinyuan-sk-testing-1
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-sk-testing-1 by this push:
     new 6dc1ae2ce7 update
6dc1ae2ce7 is described below

commit 6dc1ae2ce710008215c7df942c6329ff7e895a94
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri Feb 13 20:43:05 2026 -0800

    update
---
 .../input_port_materialization_reader_runnable.py  |   3 +-
 .../apache/texera/amber/operator/LogicalOp.scala   |   2 +-
 .../SklearnTestingOpDesc.scala                     | 100 ---------------------
 3 files changed, 3 insertions(+), 102 deletions(-)

diff --git a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
index c82926a60a..e49c0316cc 100644
--- a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
+++ b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
@@ -17,7 +17,7 @@
 
 import typing
 from loguru import logger
-from pyarrow.lib import Table
+from pyarrow import Table
 from typing import Union
 
 from core.architecture.sendsemantics.broad_cast_partitioner import (
@@ -146,6 +146,7 @@ class InputPortMaterializationReaderRunnable(Runnable, Stoppable):
                     break
                 # Each tuple is sent to the partitioner and converted to
                 # a batch-based iterator.
+                tup.cast_to_schema(self.tuple_schema)
                 for data_frame in self.tuple_to_batch_with_filter(tup):
                     self.emit_payload(data_frame)
             self.emit_ecm("EndChannel", EmbeddedControlMessageType.PORT_ALIGNMENT)
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
index ed6e33c3ba..a575d5b018 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
@@ -137,7 +137,7 @@ import org.apache.texera.amber.operator.visualization.volcanoPlot.VolcanoPlotOpD
 import org.apache.texera.amber.operator.visualization.waterfallChart.WaterfallChartOpDesc
 import org.apache.texera.amber.operator.visualization.wordCloud.WordCloudOpDesc
 import org.apache.commons.lang3.builder.{EqualsBuilder, HashCodeBuilder, ToStringBuilder}
-import org.apache.texera.amber.operator.sklearn.testing.SklearnTestingOpDesc.SklearnTestingOpDesc
+import org.apache.texera.amber.operator.sklearn.testing.SklearnTestingOpDesc
 import org.apache.texera.amber.operator.visualization.stripChart.StripChartOpDesc
 
 import java.util.UUID
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/sklearn/testing/SklearnTestingOpDesc/SklearnTestingOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/sklearn/testing/SklearnTestingOpDesc/SklearnTestingOpDesc.scala
deleted file mode 100644
index d81e6d423a..0000000000
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/sklearn/testing/SklearnTestingOpDesc/SklearnTestingOpDesc.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.texera.amber.operator.sklearn.testing.SklearnTestingOpDesc
-
-import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription}
-import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
-import org.apache.texera.amber.core.tuple.{AttributeType, Schema}
-import org.apache.texera.amber.core.workflow.{InputPort, OutputPort, PortIdentity}
-import org.apache.texera.amber.operator.PythonOperatorDescriptor
-import org.apache.texera.amber.operator.metadata.annotations.{
-  AutofillAttributeName,
-  AutofillAttributeNameOnPort1
-}
-import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo}
-import org.apache.texera.amber.pybuilder.PyStringTypes.EncodableString
-import org.apache.texera.amber.pybuilder.PythonTemplateBuilder.PythonTemplateBuilderStringContext
-
-class SklearnTestingOpDesc extends PythonOperatorDescriptor {
-  @JsonSchemaTitle("Model Attribute")
-  @JsonProperty(required = true, defaultValue = "model")
-  @JsonPropertyDescription("Attribute corresponding to ML model")
-  @AutofillAttributeNameOnPort1
-  var model: EncodableString = _
-
-  @JsonSchemaTitle("Target Attribute")
-  @JsonPropertyDescription("Attribute in your dataset corresponding to target.")
-  @JsonProperty(required = true)
-  @AutofillAttributeName
-  var target: EncodableString = _
-
-  override def generatePythonCode(): String =
-    pyb"""from pytexera import *
-       |from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
-       |class ProcessTupleOperator(UDFOperatorV2):
-       |    @overrides
-       |    def open(self) -> None:
-       |        self.data = []
-       |    @overrides
-       |    def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
-       |        if port == 0:
-       |            self.data.append(tuple_)
-       |        else:
-       |            model = tuple_[$model]
-       |            table = Table(self.data)
-       |            Y = table[$target]
-       |            X = table.drop($target, axis=1)
-       |            predictions = model.predict(X)
-       |            tuple_["accuracy"] = round(accuracy_score(Y, predictions), 4)
-       |            tuple_["f1"] = f1_score(Y, predictions, average = "weighted")
-       |            tuple_["precision"] = precision_score(Y, predictions)
-       |            tuple_["recall"] = recall_score(Y, predictions)
-       |            yield tuple_""".encode
-
-  override def operatorInfo: OperatorInfo =
-    OperatorInfo(
-      "Sklearn Testing",
-      "It will generate F1, precision, accuracy and recall for a trained Sklearn model",
-      OperatorGroupConstants.SKLEARN_GROUP,
-      inputPorts = List(
-        InputPort(PortIdentity(), "data"),
-        InputPort(
-          PortIdentity(1),
-          "model",
-          dependencies = List(PortIdentity()),
-          allowMultiLinks = true
-        )
-      ),
-      outputPorts = List(OutputPort())
-    )
-
-  override def getOutputSchemas(
-      inputSchemas: Map[PortIdentity, Schema]
-  ): Map[PortIdentity, Schema] = {
-    val inputSchema = inputSchemas(operatorInfo.inputPorts(1).id)
-    Map(
-      operatorInfo.outputPorts.head.id -> inputSchema
-        .add("accuracy", AttributeType.DOUBLE)
-        .add("f1", AttributeType.DOUBLE)
-        .add("precision", AttributeType.DOUBLE)
-        .add("recall", AttributeType.DOUBLE)
-    )
-  }
-}

Reply via email to