[ https://issues.apache.org/jira/browse/SPARK-48311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17932926#comment-17932926 ]
Sumit Singh commented on SPARK-48311:
-------------------------------------

Fix and unit test added: [https://github.com/apache/spark/pull/50183]

cc [~itskals] [~wenchen]

Nested pythonUDF in groupBy and aggregate result in Binding Exception
----------------------------------------------------------------------

                Key: SPARK-48311
                URL: https://issues.apache.org/jira/browse/SPARK-48311
            Project: Spark
         Issue Type: Bug
         Components: PySpark, SQL
   Affects Versions: 3.3.2
           Reporter: Sumit Singh
           Priority: Major
             Labels: pull-request-available

Steps to Reproduce

1. Data creation
{code:python}
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, TimestampType, StringType

spark = SparkSession.builder.getOrCreate()

# Define the schema
schema = StructType([
    StructField("col1", LongType(), nullable=True),
    StructField("col2", TimestampType(), nullable=True),
    StructField("col3", StringType(), nullable=True)
])

# Define the data
data = [
    (1, datetime(2023, 5, 15, 12, 30), "Discount"),
    (2, datetime(2023, 5, 16, 16, 45), "Promotion"),
    (3, datetime(2023, 5, 17, 9, 15), "Coupon")
]

# Create the DataFrame and register it as a temporary view
df = spark.createDataFrame(data, schema)
df.createOrReplaceTempView("temp_offers")

# Query the temporary view using SQL.
# DISTINCT is required to reproduce the issue.
testDf = spark.sql("""
    SELECT DISTINCT
        col1,
        col2,
        col3
    FROM temp_offers
""")
{code}

2. UDF registration
{code:python}
import pyspark.sql.functions as F
import pyspark.sql.types as T

# Create the UDF functions
def udf1(d):
    return d

def udf2(d):
    if d.isoweekday() in (1, 2, 3, 4):
        return 'WEEKDAY'
    else:
        return 'WEEKEND'

udf1_name = F.udf(udf1, T.TimestampType())
udf2_name = F.udf(udf2, T.StringType())
{code}

3. Adding the UDF columns to the grouping and aggregation (udf2 is applied to the output of udf1, so the Python UDFs are nested)
{code:python}
groupBy_cols = ['col1', 'col4', 'col5', 'col3']

temp = testDf \
    .select('*', udf1_name(F.col('col2')).alias('col4')) \
    .select('*', udf2_name('col4').alias('col5'))

result = temp.groupBy(*groupBy_cols).agg(F.countDistinct('col5').alias('col6'))
{code}

4. Result
{code:python}
result.show(5, False)
{code}

*We get the error below*
{code:java}
An error was encountered:
An error occurred while calling o1079.showString.
: java.lang.IllegalStateException: Couldn't find pythonUDF0#1108 in [col1#978L,groupingPythonUDF#1104,groupingPythonUDF#1105,col3#980,count(pythonUDF0#1108)#1080L]
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:80)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:73)
{code}
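For contrast with step 3, the sketch below applies udf2 directly to the original col2 column instead of to the udf1 output, so the second Python UDF no longer depends on the result of the first. This is only an illustrative rewrite of the repro (the temp_non_nested and result_non_nested names are introduced here for the sketch); it is not claimed or verified to be a workaround for this bug, it merely isolates the nesting that the issue summary points to.
{code:python}
import pyspark.sql.functions as F

# Illustrative, non-nested variant: udf2 reads the original timestamp column
# directly, so neither Python UDF is evaluated on top of another Python UDF's
# output. Variable names here are hypothetical and only used for this sketch.
temp_non_nested = testDf \
    .select('*', udf1_name(F.col('col2')).alias('col4')) \
    .select('*', udf2_name(F.col('col2')).alias('col5'))

result_non_nested = temp_non_nested \
    .groupBy('col1', 'col4', 'col5', 'col3') \
    .agg(F.countDistinct('col5').alias('col6'))
{code}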