Jacky Wang created SPARK-53421:
----------------------------------

             Summary: [SDP] Investigate/Fix groupBy column attribute fails to resolve
                 Key: SPARK-53421
                 URL: https://issues.apache.org/jira/browse/SPARK-53421
             Project: Spark
          Issue Type: Sub-task
          Components: Declarative Pipelines
    Affects Versions: 4.1.0
            Reporter: Jacky Wang
In SDP, we have a simple pipeline that does a {{groupBy}} in the flow function ([test case|https://github.com/apache/spark/pull/52121/files#diff-0a1a8ac2ba56485e6e9153923c4c0d12767d5da8bfd75bc703a761469cfa6b7eR437]):

    from pyspark.sql.functions import col, sum, count

    @dp.materialized_view
    def groupby_result():
        return spark.read.table("src").groupBy("id").count()

It fails with an {{AnalysisException}}:

    [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like df1.select(df2.col("a")).

This is the logical plan initially created from the query in the flow function:

    'Aggregate ['id], ['id, 'count(1) AS count#7]
    +- 'UnresolvedRelation [src], [], false

SDP performs custom [analysis|https://github.com/apache/spark/blob/master/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala#L87] in FlowAnalysis before passing the logical plan to Spark: it takes a part of the logical plan,

    +- 'UnresolvedRelation [src], [], false

and analyzes it independently.
Below is the final full logical plan that's sent to Spark at this [location|https://github.com/apache/spark/blob/master/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala#L130]:

    Aggregate ['id], ['id, 'count(1) AS count#7]
    +- SubqueryAlias spark_catalog.default.src
       +- Relation spark_catalog.default.src[id#9L] parquet

These are the last few stack frames:

    org.apache.spark.sql.errors.QueryCompilationErrors$.cannotResolveDataFrameColumn(QueryCompilationErrors.scala:4190)
    org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.resolveDataFrameColumn(ColumnResolutionHelper.scala:599)
    org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.tryResolveDataFrameColumns(ColumnResolutionHelper.scala:574)
    org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.resolveExpressionByPlanChildren(ColumnResolutionHelper.scala:501)
    org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.resolveExpressionByPlanChildren$(ColumnResolutionHelper.scala:496)
    org.apache.spark.sql.catalyst.analysis.ResolveReferencesInAggregate.resolveExpressionByPlanChildren(ResolveReferencesInAggregate.scala:50)
    org.apache.spark.sql.catalyst.analysis.ResolveReferencesInAggregate.$anonfun$apply$1(ResolveReferencesInAggregate.scala:63)

What we know for sure: the test passes if we wrap the {{id}} field in {{col("id")}} in the groupBy. We suspect this issue is related to how we perform independent analysis on a portion of the logical plan, which may have some interesting interaction with the Spark Connect [LogicalPlan.PLAN_ID_TAG|https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala#L209] tagging mechanism, but we are not sure.