Jacky Wang created SPARK-53421:
----------------------------------

             Summary: [SDP] Investigate/fix groupBy column attribute failing to resolve
                 Key: SPARK-53421
                 URL: https://issues.apache.org/jira/browse/SPARK-53421
             Project: Spark
          Issue Type: Sub-task
          Components: Declarative Pipelines
    Affects Versions: 4.1.0
            Reporter: Jacky Wang


In SDP, we have a simple pipeline that does a {{groupBy}} in the flow function 
([test 
case|https://github.com/apache/spark/pull/52121/files#diff-0a1a8ac2ba56485e6e9153923c4c0d12767d5da8bfd75bc703a761469cfa6b7eR437]):
{code:python}
from pyspark.sql.functions import col, sum, count

@dp.materialized_view
def groupby_result():
    return spark.read.table("src").groupBy("id").count()
{code}
 
It fails with an {{AnalysisException}}:
{code}
[CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like df1.select(df2.col("a"))
{code}
This is the logical plan initially created from the query in the flow 
function:
{code}
'Aggregate ['id], ['id, 'count(1) AS count#7]
+- 'UnresolvedRelation [src], [], false
{code}
 
SDP performs custom 
[analysis|https://github.com/apache/spark/blob/master/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala#L87]
 in {{FlowAnalysis}} before passing the logical plan to Spark. We take the 
following part of the logical plan:
{code}
+- 'UnresolvedRelation [src], [], false
{code}
and analyze it independently. Below is the final full logical plan that is 
sent to Spark at this 
[location|https://github.com/apache/spark/blob/master/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala#L130]:
{code}
Aggregate ['id], ['id, 'count(1) AS count#7]
+- SubqueryAlias spark_catalog.default.src
   +- Relation spark_catalog.default.src[id#9L] parquet
{code}

These are the last few frames of the stack trace:
{code}
org.apache.spark.sql.errors.QueryCompilationErrors$.cannotResolveDataFrameColumn(QueryCompilationErrors.scala:4190)
org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.resolveDataFrameColumn(ColumnResolutionHelper.scala:599)
org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.tryResolveDataFrameColumns(ColumnResolutionHelper.scala:574)
org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.resolveExpressionByPlanChildren(ColumnResolutionHelper.scala:501)
org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.resolveExpressionByPlanChildren$(ColumnResolutionHelper.scala:496)
org.apache.spark.sql.catalyst.analysis.ResolveReferencesInAggregate.resolveExpressionByPlanChildren(ResolveReferencesInAggregate.scala:50)
org.apache.spark.sql.catalyst.analysis.ResolveReferencesInAggregate.$anonfun$apply$1(ResolveReferencesInAggregate.scala:63)
{code}
 
One thing we know for sure: the test passes if we wrap the {{id}} field in 
{{col("id")}} in the {{groupBy}}.
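As a sketch, the working variant of the flow function would look like the following (assuming the same {{dp}} decorator module and {{spark}} session that are in scope in the test case; this is the pipeline definition as written, not a standalone script):
{code:python}
# Hedged sketch of the known workaround: pass a Column object rather than a
# bare string to groupBy(). "dp" and "spark" are assumed to come from the
# SDP test harness, as in the failing test case above.
from pyspark.sql.functions import col

@dp.materialized_view
def groupby_result():
    # Wrapping the field in col("id") resolves, while the string "id" fails
    # with CANNOT_RESOLVE_DATAFRAME_COLUMN under SDP's custom analysis.
    return spark.read.table("src").groupBy(col("id")).count()
{code}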

We suspect this issue is related to how we perform independent analysis on a 
portion of the logical plan, which may interact in interesting ways with the 
Spark Connect 
[LogicalPlan.PLAN_ID_TAG|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala#L209]
 tagging mechanism, but we are not sure.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
