The actual code is not given, so I am going by the plan output and your explanation:
- You're joining a large, bucketed table with a smaller DataFrame on a common column (key_col).
- The subsequent window function also uses key_col.
- However, a shuffle occurs for the window function even though the data is already partitioned by key_col.

Potential data skew: although the table is bucketed, there might be significant data skew within the buckets. This can cause an uneven distribution of data and trigger a shuffle for the window function. A quick way to check how evenly key_col is distributed is to count rows per key; a handful of keys with far more rows than the rest indicates skew:

import pyspark.sql.functions as F

df = spark.table("your_bucketed_table")

# Rows per key, heaviest keys first; a few dominant keys indicate skew
df.groupBy("key_col").count().orderBy(F.desc("count")).show(20, truncate=False)

HTH

Mich Talebzadeh,
Architect | Data Engineer | Data Science | Financial Crime
PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College London <https://en.wikipedia.org/wiki/Imperial_College_London>
London, United Kingdom

view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Wernher von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".


On Thu, 15 Aug 2024 at 14:30, Shay Elbaz <sel...@paypal.com.invalid> wrote:

> Hi Spark community,
>
> Please review the cleansed plan below. It is the result of joining a
> large, bucketed table with a smaller DF, and then applying a window
> function. Both the join and the window function use the same column, which
> is also the bucket column of the table ("key_col" in the plan).
> The join results in a map-side-join as expected, but then there is a
> shuffle for the window function, even though the data is already
> partitioned accordingly.
>
> Can anyone explain why?
>
> Using Spark 3.5.0
>
>
> Thanks,
> Shay
>
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Project ...
>    +- Filter (rn#5441 = 1)
>       +- Window [row_number() windowspecdefinition(key_col#5394, _w0#5442 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#5441], [key_col#5394], [_w0#5442 ASC NULLS FIRST]
>          +- WindowGroupLimit [key_col#5394], [_w0#5442 ASC NULLS FIRST], row_number(), 1, Final
>             +- Sort [key_col#5394 ASC NULLS FIRST, _w0#5442 ASC NULLS FIRST], false, 0
>                +- Exchange hashpartitioning(key_col#5394, 80000), ENSURE_REQUIREMENTS, [plan_id=592]
>                   +- WindowGroupLimit [key_col#5394], [_w0#5442 ASC NULLS FIRST], row_number(), 1, Partial
>                      +- Sort [key_col#5394 ASC NULLS FIRST, _w0#5442 ASC NULLS FIRST], false, 0
>                         +- Project ... (key_col stays the same)
>                            +- Project [coalesce(key_col#0, key_col#5009) AS key_col#5394, CASE WHEN ...
>                               +- SortMergeJoin [key_col#0], [key_col#5009], FullOuter
>                                  :- Sort [key_col#0 ASC NULLS FIRST], false, 0
>                                  :  +- Project key_
>                                  :     +- FileScan parquet bucketed table ...
>                                  +- Sort [key_col#5009 ASC NULLS FIRST], false, 0
>                                     +- Exchange hashpartitioning(key_col#5009, 80000), REPARTITION_BY_NUM, [plan_id=572]
>                                        +- Project
>                                           +- Filter
>                                              +- Scan small table...
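For anyone who wants to reproduce the behaviour locally, below is a minimal sketch of the pattern described in this thread: a join on key_col followed by a row_number() window on the same column, with the physical plan printed so you can check whether an Exchange is inserted before the Window. The table names, the ordering column and the partition count are made up for illustration; the real job is not shown in the thread.

import pyspark.sql.functions as F
from pyspark.sql import SparkSession, Window

spark = SparkSession.builder.appName("window-after-join").getOrCreate()

# Hypothetical stand-ins for the large bucketed table and the smaller DataFrame
large_df = spark.table("your_bucketed_table")                        # bucketed by key_col
small_df = spark.table("your_small_table").repartition(80000, "key_col")

# Full outer join on the bucket column, as in the plan above
joined = large_df.join(small_df, on="key_col", how="full_outer")

# row_number() over a window partitioned by the same column,
# keeping only the first row per key ("some_order_col" is a placeholder)
w = Window.partitionBy("key_col").orderBy(F.col("some_order_col").asc())
result = (joined
          .withColumn("rn", F.row_number().over(w))
          .filter(F.col("rn") == 1))

# Look for "Exchange hashpartitioning(key_col, ...)" between the
# SortMergeJoin and the Window in the printed plan
result.explain(mode="formatted")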