Shay - if I understand your question correctly, you want to know whether Spark 
has an optimization that eliminates the shuffle for window functions under 
those conditions (when the window function's partition key equals the bucket 
key, after a join), and if so, why it does not apply...

Have you tried simpler variations just for debugging? Is the shuffle eliminated there?

  1. Same as your query but without the join (just reading the large bucketed 
table and applying the window function)
  2. Same as your query but with the window function - row_number() over 
(partition by key_col order by sort_col) - replaced by SELECT key_col, 
min(sort_col) ... GROUP BY key_col

Might be a case where an optimization is possible but not implemented 🙁

Also - are your buckets sorted, by key_col + sort_col? If so, why does Spark 
need to sort for the ranking window function at all? (Same for the two sorts 
for the join.)
Maybe experiment a bit with spark.sql.legacy.bucketedTableScan.outputOrdering 
(again, forcing Spark to eliminate that sort, since the data on disk is 
already sorted, should also eliminate the window sort - if that was 
implemented).
Not sure though why that parameter is marked as legacy - maybe I'm missing 
something..
Just my two cents,
   Ofir

________________________________
From: Mich Talebzadeh <mich.talebza...@gmail.com>
Sent: Friday, August 16, 2024 7:54 PM
To: Shay Elbaz <sel...@paypal.com>
Cc: Shay Elbaz <sel...@paypal.com.invalid>; user@spark.apache.org 
<user@spark.apache.org>
Subject: [External] Re: Redundant(?) shuffle after join

Hi Shay,

Let me address the points you raised using the STAR methodology. I apologize if 
it sounds a bit formal, but I find it effective for clarity.

Situation:
You encountered an issue while working with a Spark DataFrame where a shuffle 
was unexpectedly triggered during the application of a window function. This 
happened even though the data was already partitioned and sorted by the key 
column (`key_col`). Specifically, the issue arose after joining a large, 
bucketed table with a smaller DataFrame on the same column used in the window 
function.

Task:
Your objective, as evident from your question, was to understand why Spark 
introduced a shuffle for the window function despite the data being 
pre-partitioned and sorted. In summary, we needed to identify the underlying 
cause of this behavior and explore possible solutions to prevent the 
unnecessary shuffle.

Action:
To investigate the issue and provide a reasonable explanation, I considered 
several possibilities:

1. Partitioning Requirements:
   I mentioned the possibility that Spark introduced the shuffle to meet its 
internal partitioning requirements for the window function. Although the data 
was already partitioned by `key_col`, Spark might still trigger a shuffle to 
ensure that the data distribution aligns perfectly with the window function's 
needs.

2. Locality and Ordering:
   I considered that Spark might have required a shuffle to enforce global 
sorting within partitions. Even though the data was locally sorted within each 
bucket, Spark could still introduce a shuffle to ensure the window function 
operates correctly across all partitions.

3. Adaptive Query Execution (AQE):
   You inquired whether AQE might have introduced the shuffle to optimize 
performance based on runtime statistics. This is indeed a possibility, as AQE 
can adjust the execution plan dynamically.

4. Compatibility and Partitioning Mismatch:
   There may be a mismatch in partitioning recognition between the join 
operation and the window function. This mismatch could lead Spark to introduce 
a shuffle, even when using the same `key_col`.

Recommendations:
To address these potential causes, I recommend the following steps:

- Check Spark's Understanding of Partitioning: Inspect the DataFrame’s 
partitioning after the join operation to ensure it aligns with expectations.
- Disable AQE Temporarily: Turn off AQE to determine if it was influencing the 
shuffle.
- Force Specific Partitioning: Repartition the DataFrame explicitly by `key_col` 
before applying the window function to see if this prevents the shuffle.

HTH


Mich Talebzadeh,

Architect | Data Engineer | Data Science | Financial Crime
PhD<https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College 
London<https://en.wikipedia.org/wiki/Imperial_College_London>
London, United Kingdom


   view my Linkedin 
profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge but 
of course cannot be guaranteed. It is essential to note that, as with any 
advice, "one test result is worth one-thousand expert opinions" (Wernher von 
Braun<https://en.wikipedia.org/wiki/Wernher_von_Braun>).


On Fri, 16 Aug 2024 at 14:55, Shay Elbaz 
<sel...@paypal.com<mailto:sel...@paypal.com>> wrote:
Hi Mich, thank you for answering - much appreciated.

This can cause uneven distribution of data, triggering a shuffle for the window 
function.
Could you elaborate on the mechanism that can "trigger a shuffle for the window 
function"? I'm not familiar with it. (or are you referring to AQE?)
In any case, there is no skew - the keys are GUIDs of events. Even if the data 
was skewed, the shuffle would end up exactly the same way as before the shuffle 
- the DF was already partitioned (and locally sorted) by the same key.

Thanks again,

Shay



________________________________
From: Mich Talebzadeh 
<mich.talebza...@gmail.com<mailto:mich.talebza...@gmail.com>>
Sent: Thursday, August 15, 2024 17:21
To: Shay Elbaz <sel...@paypal.com.invalid>
Cc: user@spark.apache.org<mailto:user@spark.apache.org> 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Re: Redundant(?) shuffle after join

The actual code is not given, so I am going with the plan output and your 
explanation.


  *   You're joining a large, bucketed table with a smaller DataFrame on a 
common column (key_col).
  *   The subsequent window function also uses key_col.
  *   However, a shuffle occurs for the window function even though the data is 
already partitioned by key_col.


Potential data skew: though the table is bucketed, there might be significant 
data skew within the buckets. This can cause uneven distribution of data, 
triggering a shuffle for the window function.

import pyspark.sql.functions as F

# Count rows per key; heavily skewed keys will dominate the top of the list.
df = spark.table("your_bucketed_table")
df.groupBy("key_col").count().orderBy(F.desc("count")).show(20)


HTH
Mich Talebzadeh,

Architect | Data Engineer | Data Science | Financial Crime
PhD<https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College 
London<https://en.wikipedia.org/wiki/Imperial_College_London>
London, United Kingdom




On Thu, 15 Aug 2024 at 14:30, Shay Elbaz <sel...@paypal.com.invalid> wrote:
Hi Spark community,

Please review the cleansed plan below. It is the result of joining a large, 
bucketed table with a smaller DF, and then applying a window function. Both the 
join and the window function use the same column, which is also the bucket 
column of the table ("key_col" in the plan).
The join results in a map-side-join as expected, but then there is a shuffle 
for the window function, even though the data is already partitioned 
accordingly.

Can anyone explain why?

Using Spark 3.5.0


Thanks,
Shay


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project ...
   +- Filter (rn#5441 = 1)
      +- Window [row_number() windowspecdefinition(key_col#5394, _w0#5442 ASC 
NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), 
currentrow$())) AS rn#5441], [key_col#5394], [_w0#5442 ASC NULLS FIRST]
         +- WindowGroupLimit [key_col#5394], [_w0#5442 ASC NULLS FIRST], 
row_number(), 1, Final
            +- Sort [key_col#5394 ASC NULLS FIRST, _w0#5442 ASC NULLS FIRST], 
false, 0
               +- Exchange hashpartitioning(key_col#5394, 80000), 
ENSURE_REQUIREMENTS, [plan_id=592]
                  +- WindowGroupLimit [key_col#5394], [_w0#5442 ASC NULLS 
FIRST], row_number(), 1, Partial
                     +- Sort [key_col#5394 ASC NULLS FIRST, _w0#5442 ASC NULLS 
FIRST], false, 0
                        +- Project ...  (key_col stays the same)
                           +- Project [coalesce(key_col#0, key_col#5009) AS 
key_col#5394, CASE WHEN ...
                              +- SortMergeJoin [key_col#0], [key_col#5009], 
FullOuter
                                 :- Sort [key_col#0 ASC NULLS FIRST], false, 0
                                 :  +- Project key_
                                 :     +- FileScan parquet bucketed table ...
                                 +- Sort [key_col#5009 ASC NULLS FIRST], false, 0
                                    +- Exchange hashpartitioning(key_col#5009, 
80000), REPARTITION_BY_NUM, [plan_id=572]
                                       +- Project
                                          +- Filter
                                             +- Scan small table...
