This is an automated email from the ASF dual-hosted git repository.
linxinyuan pushed a commit to branch xinyuan-sk-testing-1
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-sk-testing-1 by this
push:
new 6dc1ae2ce7 update
6dc1ae2ce7 is described below
commit 6dc1ae2ce710008215c7df942c6329ff7e895a94
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri Feb 13 20:43:05 2026 -0800
update
---
.../input_port_materialization_reader_runnable.py | 3 +-
.../apache/texera/amber/operator/LogicalOp.scala | 2 +-
.../SklearnTestingOpDesc.scala | 100 ---------------------
3 files changed, 3 insertions(+), 102 deletions(-)
diff --git a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
index c82926a60a..e49c0316cc 100644
--- a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
+++ b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
@@ -17,7 +17,7 @@
import typing
from loguru import logger
-from pyarrow.lib import Table
+from pyarrow import Table
from typing import Union
from core.architecture.sendsemantics.broad_cast_partitioner import (
@@ -146,6 +146,7 @@ class InputPortMaterializationReaderRunnable(Runnable, Stoppable):
break
# Each tuple is sent to the partitioner and converted to
# a batch-based iterator.
+ tup.cast_to_schema(self.tuple_schema)
for data_frame in self.tuple_to_batch_with_filter(tup):
self.emit_payload(data_frame)
self.emit_ecm("EndChannel", EmbeddedControlMessageType.PORT_ALIGNMENT)
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
index ed6e33c3ba..a575d5b018 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
@@ -137,7 +137,7 @@ import org.apache.texera.amber.operator.visualization.volcanoPlot.VolcanoPlotOpD
import org.apache.texera.amber.operator.visualization.waterfallChart.WaterfallChartOpDesc
import org.apache.texera.amber.operator.visualization.wordCloud.WordCloudOpDesc
import org.apache.commons.lang3.builder.{EqualsBuilder, HashCodeBuilder, ToStringBuilder}
-import org.apache.texera.amber.operator.sklearn.testing.SklearnTestingOpDesc.SklearnTestingOpDesc
+import org.apache.texera.amber.operator.sklearn.testing.SklearnTestingOpDesc
import org.apache.texera.amber.operator.visualization.stripChart.StripChartOpDesc
import java.util.UUID
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/sklearn/testing/SklearnTestingOpDesc/SklearnTestingOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/sklearn/testing/SklearnTestingOpDesc/SklearnTestingOpDesc.scala
deleted file mode 100644
index d81e6d423a..0000000000
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/sklearn/testing/SklearnTestingOpDesc/SklearnTestingOpDesc.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.texera.amber.operator.sklearn.testing.SklearnTestingOpDesc
-
-import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription}
-import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
-import org.apache.texera.amber.core.tuple.{AttributeType, Schema}
-import org.apache.texera.amber.core.workflow.{InputPort, OutputPort, PortIdentity}
-import org.apache.texera.amber.operator.PythonOperatorDescriptor
-import org.apache.texera.amber.operator.metadata.annotations.{
- AutofillAttributeName,
- AutofillAttributeNameOnPort1
-}
-import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo}
-import org.apache.texera.amber.pybuilder.PyStringTypes.EncodableString
-import org.apache.texera.amber.pybuilder.PythonTemplateBuilder.PythonTemplateBuilderStringContext
-
-class SklearnTestingOpDesc extends PythonOperatorDescriptor {
- @JsonSchemaTitle("Model Attribute")
- @JsonProperty(required = true, defaultValue = "model")
- @JsonPropertyDescription("Attribute corresponding to ML model")
- @AutofillAttributeNameOnPort1
- var model: EncodableString = _
-
- @JsonSchemaTitle("Target Attribute")
- @JsonPropertyDescription("Attribute in your dataset corresponding to target.")
- @JsonProperty(required = true)
- @AutofillAttributeName
- var target: EncodableString = _
-
- override def generatePythonCode(): String =
- pyb"""from pytexera import *
- |from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
- |class ProcessTupleOperator(UDFOperatorV2):
- | @overrides
- | def open(self) -> None:
- | self.data = []
- | @overrides
- | def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
- | if port == 0:
- | self.data.append(tuple_)
- | else:
- | model = tuple_[$model]
- | table = Table(self.data)
- | Y = table[$target]
- | X = table.drop($target, axis=1)
- | predictions = model.predict(X)
- | tuple_["accuracy"] = round(accuracy_score(Y, predictions), 4)
- | tuple_["f1"] = f1_score(Y, predictions, average = "weighted")
- | tuple_["precision"] = precision_score(Y, predictions)
- | tuple_["recall"] = recall_score(Y, predictions)
- | yield tuple_""".encode
-
- override def operatorInfo: OperatorInfo =
- OperatorInfo(
- "Sklearn Testing",
- "It will generate F1, precision, accuracy and recall for a trained Sklearn model",
- OperatorGroupConstants.SKLEARN_GROUP,
- inputPorts = List(
- InputPort(PortIdentity(), "data"),
- InputPort(
- PortIdentity(1),
- "model",
- dependencies = List(PortIdentity()),
- allowMultiLinks = true
- )
- ),
- outputPorts = List(OutputPort())
- )
-
- override def getOutputSchemas(
- inputSchemas: Map[PortIdentity, Schema]
- ): Map[PortIdentity, Schema] = {
- val inputSchema = inputSchemas(operatorInfo.inputPorts(1).id)
- Map(
- operatorInfo.outputPorts.head.id -> inputSchema
- .add("accuracy", AttributeType.DOUBLE)
- .add("f1", AttributeType.DOUBLE)
- .add("precision", AttributeType.DOUBLE)
- .add("recall", AttributeType.DOUBLE)
- )
- }
-}