Just thought I'd do a quick bump and add the dev mailing list, in case there is some insight there. Feels like this should be categorized as a bug for Spark 3.2.0.
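To make the use case concrete, here is a minimal sketch of the kind of plan inspection this change affects for us. It is simplified rather than our actual code: plan_text() and scan_columns() are just illustrative helpers that scrape the explain() text, and the exprId numbers will differ from session to session.

import contextlib
import io
import re

import pyspark

spark = pyspark.sql.SparkSession.builder.getOrCreate()

def plan_text(df):
    # df.explain() prints to stdout, so capture what it prints.
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        df.explain()
    return buf.getvalue()

def scan_columns(df):
    # Collect the column lists of the "Scan ExistingRDD[...]" leaves in the physical plan.
    return re.findall(r"Scan \w+ ?\[([^\]]*)\]", plan_text(df))

df1 = spark.createDataFrame([[1, 10], [2, 20]], ['id', 'col1'])
df2 = spark.createDataFrame([[1, 11], [2, 22], [2, 222]], ['id', 'col2'])
df3 = df1.join(df2, df1['id'] == df2['id']).drop(df2['id'])
df4 = df2.join(df3, df1['id'] == df2['id'])
print(scan_columns(df4))
# On 3.1.x every scan we see maps back to df1 or df2 (e.g. id#0L, id#4L).
# On 3.2.0 an extra scan such as id#19L,col2#20L appears that we cannot map back.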
On Wed, Dec 29, 2021 at 5:25 PM Abdeali Kothari <abdealikoth...@gmail.com> wrote:

> Hi,
> I am using pyspark for some projects. And one of the things we are doing
> is trying to find the tables/columns being used by Spark using the
> execution plan.
>
> When we upgrade to spark 3.2 - the spark plan seems to be different from
> previous versions - mainly when we are doing joins.
> Below is a reproducible example (you could run the same in versions 2.3 to
> 3.1 to see the difference)
>
> My original data frames have the columns: id#0 and id#4
> But after doing the joins we are seeing new columns id#34 and id#19 which
> are not created from the original dataframes I was working with.
> In previous versions of spark, this used to use a ReusedExchange step
> (shown below)
>
> I was trying to understand if this is expected in spark 3.2 where the
> execution plan seems to be creating a new data source which does not
> originate from df1 and df2 which I provided.
> NOTE: The same happens even if I read from parquet files
>
> In spark 3.2:
> In [1]: import pyspark
>    ...: spark = pyspark.sql.SparkSession.builder.getOrCreate()
>
> In [2]: df1 = spark.createDataFrame([[1, 10], [2, 20]], ['id', 'col1'])
>    ...: df2 = spark.createDataFrame([[1, 11], [2, 22], [2, 222]], ['id', 'col2'])
>    ...: df1.explain()
>    ...: df2.explain()
> == Physical Plan ==
> *(1) Scan ExistingRDD[id#0L,col1#1L]
>
> == Physical Plan ==
> *(1) Scan ExistingRDD[id#4L,col2#5L]
>
> In [3]: df3 = df1.join(df2, df1['id'] == df2['id']).drop(df2['id'])
>    ...: df4 = df2.join(df3, df1['id'] == df2['id'])
>    ...: df4.explain()
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- SortMergeJoin [id#4L], [id#0L], Inner
>    :- Sort [id#4L ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#53]
>    :     +- Filter isnotnull(id#4L)
>    :        +- Scan ExistingRDD[id#4L,col2#5L]
>    +- Project [id#0L, col1#1L, col2#20L]
>       +- SortMergeJoin [id#0L], [id#19L], Inner
>          :- Sort [id#0L ASC NULLS FIRST], false, 0
>          :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS, [id=#45]
>          :     +- Filter isnotnull(id#0L)
>          :        +- Scan ExistingRDD[id#0L,col1#1L]
>          +- Sort [id#19L ASC NULLS FIRST], false, 0
>             +- Exchange hashpartitioning(id#19L, 200), ENSURE_REQUIREMENTS, [id=#46]
>                +- Filter isnotnull(id#19L)
>                   +- Scan ExistingRDD[id#19L,col2#20L]
>
> In [4]: df1.createOrReplaceTempView('df1')
>    ...: df2.createOrReplaceTempView('df2')
>    ...: df3 = spark.sql("""
>    ...:     SELECT df1.id, df1.col1, df2.col2
>    ...:     FROM df1 JOIN df2 ON df1.id = df2.id
>    ...: """)
>    ...: df3.createOrReplaceTempView('df3')
>    ...: df4 = spark.sql("""
>    ...:     SELECT df2.*, df3.*
>    ...:     FROM df2 JOIN df3 ON df2.id = df3.id
>    ...: """)
>    ...: df4.explain()
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- SortMergeJoin [id#4L], [id#0L], Inner
>    :- Sort [id#4L ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#110]
>    :     +- Filter isnotnull(id#4L)
>    :        +- Scan ExistingRDD[id#4L,col2#5L]
>    +- Project [id#0L, col1#1L, col2#35L]
>       +- SortMergeJoin [id#0L], [id#34L], Inner
>          :- Sort [id#0L ASC NULLS FIRST], false, 0
>          :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS, [id=#102]
>          :     +- Filter isnotnull(id#0L)
>          :        +- Scan ExistingRDD[id#0L,col1#1L]
>          +- Sort [id#34L ASC NULLS FIRST], false, 0
>             +- Exchange hashpartitioning(id#34L, 200), ENSURE_REQUIREMENTS, [id=#103]
>                +- Filter isnotnull(id#34L)
>                   +- Scan ExistingRDD[id#34L,col2#35L]
>
> Doing this in spark 3.1.1 - the plan is:
>
> *(8) SortMergeJoin [id#4L], [id#0L], Inner
> :- *(2) Sort [id#4L ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#56]
> :     +- *(1) Filter isnotnull(id#4L)
> :        +- *(1) Scan ExistingRDD[id#4L,col2#5L]
> +- *(7) Project [id#0L, col1#1L, col2#20L]
>    +- *(7) SortMergeJoin [id#0L], [id#19L], Inner
>       :- *(4) Sort [id#0L ASC NULLS FIRST], false, 0
>       :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS, [id=#62]
>       :     +- *(3) Filter isnotnull(id#0L)
>       :        +- *(3) Scan ExistingRDD[id#0L,col1#1L]
>       +- *(6) Sort [id#19L ASC NULLS FIRST], false, 0
>          +- ReusedExchange [id#19L, col2#20L], Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#56]
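For anyone who wants to check this quickly on their own build: reusing plan_text() and re from the sketch at the top of this mail, the difference shows up like this. It is only a rough check against the explain text, and note the 3.2.0 plan above is the pre-execution isFinalPlan=false one.

text = plan_text(df4)
print("ReusedExchange" in text)                    # True in the 3.1.1 output above, False in the 3.2.0 one
print(len(re.findall(r"Scan ExistingRDD", text)))  # 2 scans of the original data in 3.1.1 vs 3 in 3.2.0

So on 3.2.0 the second copy of df2 shows up as a brand new scan with new exprIds instead of a ReusedExchange, which is exactly what breaks the mapping back to the original dataframes.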