[ https://issues.apache.org/jira/browse/SPARK-51118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17932163#comment-17932163 ]
Ruifeng Zheng edited comment on SPARK-51118 at 3/5/25 1:48 AM: --------------------------------------------------------------- In PySpark Classic, the minimal reproducer is
{code:python}
import operator
from typing import Iterable, List

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vector, Vectors, VectorUDT
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType

df = spark.createDataFrame(
    [(1.0, 1.0, Vectors.dense(1, 2, 3)), (0.0, 2.0, Vectors.dense(4, 5, 6))],
    ["label", "weight", "vector"],
)

initUDF = udf(lambda _: [], ArrayType(DoubleType()))
df2 = df.withColumn("array", initUDF("vector"))

updateUDF = udf(lambda arr, vec: arr + [vec.tolist()[1]], ArrayType(DoubleType()))
df3 = df2.withColumn("array", updateUDF("array", "vector"))
df4 = df3.withColumn("array", updateUDF("array", "vector"))

def func(predictions: Iterable[float]) -> Vector:
    predArray: List[float] = []
    for x in predictions:
        predArray.append(x)
    return Vectors.dense(predArray)

rawPredictionUDF = udf(func, VectorUDT())
df5 = df4.withColumn("raw", rawPredictionUDF("array"))

labelUDF = udf(
    lambda arr: float(max(enumerate(arr), key=operator.itemgetter(1))[0]),
    DoubleType(),
)
df6 = df5.withColumn("prediction", labelUDF("array"))

eva = MulticlassClassificationEvaluator()
eva.evaluate(df6)
{code}
Both df6.show() and df6.collect() work regardless of whether Arrow optimization is on or off. But eva.evaluate(df6) fails once spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", True) is set. It also looks like the fallen-back plan is still executed with the Arrow eval type.
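As a side note on the reproducer: labelUDF is just an argmax over the prediction-score array. A plain-Python sketch of that logic, runnable without Spark (the function name label_from_scores is mine, not part of the reproducer):

```python
import operator

def label_from_scores(arr):
    # enumerate pairs each score with its index; itemgetter(1) makes max
    # compare on the score itself, and [0] recovers the winning index,
    # which the UDF returns as a float label.
    return float(max(enumerate(arr), key=operator.itemgetter(1))[0])

label_from_scores([0.1, 0.7, 0.2])  # -> 1.0, index of the largest score
```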
> Use Arrow python UDF for ml internal code
> -----------------------------------------
>
>                 Key: SPARK-51118
>                 URL: https://issues.apache.org/jira/browse/SPARK-51118
>             Project: Spark
>          Issue Type: Sub-task
>          Components: ML, PySpark
>    Affects Versions: 4.0.0
>            Reporter: Xinrong Meng
>            Priority: Major
>
> When enabling Arrow optimization for Python UDF, the areas below error out with
> ValueError: 'tolist' is not in list.
> - CrossValidator fit: the issue occurs in test_save_load_nested_estimator, when executing cv.fit(dataset). Full stacktrace: [https://github.com/xinrong-meng/spark/actions/runs/13167027085/job/36749584932]
> - OneVsRestModel _transform: full stacktrace: [https://github.com/xinrong-meng/spark/actions/runs/13188569330/job/36819938746]
> - UnaryTransformer _transform
> Arrow optimization is currently disabled explicitly in those places, but we should enable it in the near future.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
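The exact wording ValueError: 'tolist' is not in list matches how pyspark.sql.Row resolves attributes: a missing attribute name is looked up in the row's field-name list via list.index, which raises ValueError rather than AttributeError. A stand-alone mimic of that lookup, under the assumption (mine, not confirmed in this ticket) that with Arrow enabled the UDT value reaches the UDF as a Row-like struct instead of a Vector; the RowLike class below is illustrative, not Spark code:

```python
class RowLike:
    """Mimics pyspark.sql.Row attribute resolution: unknown attribute
    names are searched in the field-name list, so a missing attribute
    surfaces as ValueError, not AttributeError."""

    def __init__(self, **fields):
        # Bypass __getattr__ recursion by writing into __dict__ directly.
        self.__dict__["_names"] = list(fields)
        self.__dict__["_values"] = list(fields.values())

    def __getattr__(self, item):
        # list.index raises ValueError("'tolist' is not in list") when the
        # requested name is absent -- the same message seen in the ticket.
        idx = self._names.index(item)
        return self._values[idx]

vec = RowLike(size=3, values=[1.0, 2.0, 3.0])
# vec.values works, but vec.tolist() raises ValueError: 'tolist' is not in list,
# which is what updateUDF's vec.tolist() call would hit on a Row-like input.
```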