[ 
https://issues.apache.org/jira/browse/SPARK-48311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17932926#comment-17932926
 ] 

Sumit Singh commented on SPARK-48311:
-------------------------------------

Fix and UT added: [https://github.com/apache/spark/pull/50183] 
cc [~itskals] [~wenchen] 

> Nested pythonUDF in groupBy and aggregate result in Binding Exception 
> ----------------------------------------------------------------------
>
>                 Key: SPARK-48311
>                 URL: https://issues.apache.org/jira/browse/SPARK-48311
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, SQL
>    Affects Versions: 3.3.2
>            Reporter: Sumit Singh
>            Priority: Major
>              Labels: pull-request-available
>
> Steps to Reproduce 
> 1. Data creation
> {code:java}
> from pyspark.sql import SparkSession
> from pyspark.sql.types import StructType, StructField, LongType, 
> TimestampType, StringType
> from datetime import datetime
> # Define the schema
> schema = StructType([
>     StructField("col1", LongType(), nullable=True),
>     StructField("col2", TimestampType(), nullable=True),
>     StructField("col3", StringType(), nullable=True)
> ])
> # Define the data
> data = [
>     (1, datetime(2023, 5, 15, 12, 30), "Discount"),
>     (2, datetime(2023, 5, 16, 16, 45), "Promotion"),
>     (3, datetime(2023, 5, 17, 9, 15), "Coupon")
> ]
> # Create the DataFrame
> df = spark.createDataFrame(data, schema)
> df.createOrReplaceTempView("temp_offers")
> # Query the temporary table using SQL
> # DISTINCT required to reproduce the issue. 
> testDf = spark.sql("""
>                     SELECT DISTINCT 
>                     col1,
>                     col2,
>                     col3 FROM temp_offers
>                     """) {code}
> 2. UDF registration 
> {code:java}
> import pyspark.sql.functions as F 
> import pyspark.sql.types as T
> #Creating udf functions 
> def udf1(d):
>     return d
> def udf2(d):
>     if d.isoweekday() in (1, 2, 3, 4):
>         return 'WEEKDAY'
>     else:
>         return 'WEEKEND'
> udf1_name = F.udf(udf1, T.TimestampType())
> udf2_name = F.udf(udf2, T.StringType()) {code}
> 3. Adding UDF in grouping and agg
> {code:java}
> groupBy_cols = ['col1', 'col4', 'col5', 'col3']
> temp = testDf \
>   .select('*', udf1_name(F.col('col2')).alias('col4')).select('*', 
> udf2_name('col4').alias('col5')) 
> result = 
> (temp.groupBy(*groupBy_cols).agg(F.countDistinct('col5').alias('col6'))){code}
> 4. Result
> {code:java}
> result.show(5, False) {code}
> *We get below error*
> {code:java}
> An error was encountered:
> An error occurred while calling o1079.showString.
> : java.lang.IllegalStateException: Couldn't find pythonUDF0#1108 in 
> [col1#978L,groupingPythonUDF#1104,groupingPythonUDF#1105,col3#980,count(pythonUDF0#1108)#1080L]
>       at 
> org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:80)
>       at 
> org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:73)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to