The actual code is not given, so I am going by the plan output and your
explanation:


   - You are joining a large, bucketed table with a smaller DataFrame on a
   common column (key_col), as in the sketch below.
   - The subsequent window function also partitions by key_col.
   - However, a shuffle occurs for the window function even though the data
   is already partitioned by key_col.
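
For reference, a minimal sketch of the pattern as I understand it. Table
names, the order column and the join type are my assumptions, reconstructed
from the plan (which shows a FullOuter join and a row_number filtered to 1):

from pyspark.sql import Window
import pyspark.sql.functions as F

# Names other than key_col are placeholders
large_df = spark.table("bucketed_table")   # bucketed on key_col
small_df = spark.table("small_table")

# Full outer join on the bucket column; Spark coalesces the two key columns
joined = large_df.join(small_df, on="key_col", how="full")

# Window on the same key_col, keeping the first row per key (rn = 1)
w = Window.partitionBy("key_col").orderBy("order_col")
result = joined.withColumn("rn", F.row_number().over(w)).filter(F.col("rn") == 1)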


One potential cause is data skew. Although the table is bucketed, there may
be significant skew within the buckets. That can lead to an uneven
distribution of data, which in turn can trigger a shuffle for the window
function. A quick way to check the distribution per key:

import pyspark.sql.functions as F

# Row counts per key_col value, heaviest keys first, to spot skew
df = spark.table("your_bucketed_table")
df.groupBy("key_col").count().orderBy(F.desc("count")).show(20)


HTH
Mich Talebzadeh,

Architect | Data Engineer | Data Science | Financial Crime
PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
London <https://en.wikipedia.org/wiki/Imperial_College_London>
London, United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note that,
as with any advice, "one test result is worth one-thousand expert opinions"
(Wernher von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>).


On Thu, 15 Aug 2024 at 14:30, Shay Elbaz <sel...@paypal.com.invalid> wrote:

> Hi Spark community,
>
> Please review the cleansed plan below. It is the result of joining a
> large, bucketed table with a smaller DF, and then applying a window
> function. Both the join and the window function use the same column, which
> is also the bucket column of the table ("key_col" in the plan).
> The join results in a map-side-join as expected, but then there is a
> shuffle for the window function, even though the data is already
> partitioned accordingly.
>
> Can anyone explain why?
>
> Using Spark 3.5.0
>
>
> Thanks,
> Shay
>
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Project ...
>    +- Filter (rn#5441 = 1)
>       +- Window [row_number() windowspecdefinition(key_col#5394, _w0#5442 ASC 
> NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), 
> currentrow$())) AS rn#5441], [key_col#5394], [_w0#5442 ASC NULLS FIRST]
>          +- WindowGroupLimit [key_col#5394], [_w0#5442 ASC NULLS FIRST], 
> row_number(), 1, Final
>             +- Sort [key_col#5394 ASC NULLS FIRST, _w0#5442 ASC NULLS FIRST], 
> false, 0
>                +- Exchange hashpartitioning(key_col#5394, 80000), 
> ENSURE_REQUIREMENTS, [plan_id=592]
>                   +- WindowGroupLimit [key_col#5394], [_w0#5442 ASC NULLS 
> FIRST], row_number(), 1, Partial
>                      +- Sort [key_col#5394 ASC NULLS FIRST, _w0#5442 ASC 
> NULLS FIRST], false, 0
>                         +- Project ...  (key_col stays the same)
>                            +- Project [coalesce(key_col#0, key_col#5009) AS 
> key_col#5394, CASE WHEN ...
>                               +- SortMergeJoin [key_col#0], [key_col#5009], 
> FullOuter
>                                  :- Sort [key_col#0 ASC NULLS FIRST], false, 0
>                                  :  +- Project key_
>                                  :     +- FileScan parquet bucketed table ...
>                                  +- Sort [key_col#5009 ASC NULLS FIRST], 
> false, 0
>                                     +- Exchange 
> hashpartitioning(key_col#5009, 80000), REPARTITION_BY_NUM, [plan_id=572]
>                                        +- Project
>                                           +- Filter
>                                              +- Scan small table...
>
>
>
