[ 
https://issues.apache.org/jira/browse/SPARK-51118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17932163#comment-17932163
 ] 

Ruifeng Zheng edited comment on SPARK-51118 at 3/31/25 3:58 AM:
----------------------------------------------------------------

In pyspark classic, the minimum reproducer is 

 
{code:java}
import operator
from typing import *
from pyspark.ml.linalg import Vectors, Matrices, Matrix, Vector, Vectors, 
VectorUDT
from pyspark.sql import SparkSession, DataFrame, Row
from pyspark.ml.classification import *
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import *
from pyspark.sql.functions import udf


df = spark.createDataFrame([(1.0, 1.0, Vectors.dense(1,2,3)), (0.0, 2.0, 
Vectors.dense(4,5,6)),],   ["label", "weight", "vector"],)

initUDF = udf(lambda _: [], ArrayType(DoubleType()))
df2 = df.withColumn("array", initUDF("vector"))

updateUDF = udf(lambda arr, vec: arr + [vec.tolist()[1]], 
ArrayType(DoubleType()),)
df3 = df2.withColumn("array", updateUDF("array", "vector"))
df4 = df3.withColumn("array", updateUDF("array", "vector"))

def func(predictions: Iterable[float]) -> Vector:
    predArray: List[float] = []
    for x in predictions:
        predArray.append(x)
    return Vectors.dense(predArray)

rawPredictionUDF = udf(func, VectorUDT())
df5 = df4.withColumn("raw", rawPredictionUDF("array"))

labelUDF = udf(lambda arr: float(max(enumerate(arr), 
key=operator.itemgetter(1))[0]), DoubleType(),)

df6 = df5.withColumn("prediction", labelUDF("array"))

eva = MulticlassClassificationEvaluator()
eva.evaluate(df6) {code}
 

df6.show(), df6.collect() works well no matter whether arrow optimization is on 
or off.

But
eva.evaluate(df6) 
fails with spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", True)
 
And looks like the fallbacked plan is still executed with arrow eval type.


was (Author: podongfeng):
In pyspark classic, the minimum reproducer is 

 
{code:java}
import operatorfrom typing import *
from pyspark.ml.linalg import Vectors, Matrices, Matrix, Vector, Vectors, 
VectorUDT
from pyspark.sql import SparkSession, DataFrame, Row
from pyspark.ml.classification import *
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import *
from pyspark.sql.functions import udf


df = spark.createDataFrame([(1.0, 1.0, Vectors.dense(1,2,3)), (0.0, 2.0, 
Vectors.dense(4,5,6)),],   ["label", "weight", "vector"],)

initUDF = udf(lambda _: [], ArrayType(DoubleType()))
df2 = df.withColumn("array", initUDF("vector"))

updateUDF = udf(lambda arr, vec: arr + [vec.tolist()[1]], 
ArrayType(DoubleType()),)
df3 = df2.withColumn("array", updateUDF("array", "vector"))
df4 = df3.withColumn("array", updateUDF("array", "vector"))

def func(predictions: Iterable[float]) -> Vector:
    predArray: List[float] = []
    for x in predictions:
        predArray.append(x)
    return Vectors.dense(predArray)

rawPredictionUDF = udf(func, VectorUDT())
df5 = df4.withColumn("raw", rawPredictionUDF("array"))

labelUDF = udf(lambda arr: float(max(enumerate(arr), 
key=operator.itemgetter(1))[0]), DoubleType(),)

df6 = df5.withColumn("prediction", labelUDF("array"))

eva = MulticlassClassificationEvaluator()
eva.evaluate(df6) {code}
 

df6.show(), df6.collect() works well no matter whether arrow optimization is on 
or off.

But
eva.evaluate(df6) 
fails with spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", True)
 
And looks like the fallbacked plan is still executed with arrow eval type.

> Use Arrow python UDF for ml internal code
> -----------------------------------------
>
>                 Key: SPARK-51118
>                 URL: https://issues.apache.org/jira/browse/SPARK-51118
>             Project: Spark
>          Issue Type: Sub-task
>          Components: ML, PySpark
>    Affects Versions: 4.0.0
>            Reporter: Xinrong Meng
>            Assignee: Takuya Ueshin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 4.0.0
>
>
>  When enabling Arrow optimization for Python UDF, areas below error out with 
> ValueError: 'tolist' is not in list.
> - CrossValidator Fit 
> The issue occurs in test_save_load_nested_estimator, when executing 
> cv.fit(dataset). 
> Full stacktrace see 
> [https://github.com/xinrong-meng/spark/actions/runs/13167027085/job/36749584932]
> - OneVsRestModel _transform
> Full stacktrace see 
> [https://github.com/xinrong-meng/spark/actions/runs/13188569330/job/36819938746]
> - UnaryTransformer _transform
> Now Arrow optimization is disabled in those places explicitly, but we should 
> enable those in the near future.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to