Shay - if I understand your question, you want to know whether Spark has an optimization that eliminates the shuffle from window functions under those conditions (when the window function's partition key is equal to the bucket key, after a join), and if so, why it does not apply...

Have you tried simpler variations, just for debugging? Is the shuffle eliminated in those cases? For example (see the sketch below):
1. The same as your query but without the join (just reading the large bucketed table and applying the window function).
2. The same as your query but with the window function - row_number() over (partition by key_col order by sort_col) - replaced by SELECT key_col, min(sort_col) ... GROUP BY key_col.
It might be a case where an optimization is possible but not implemented.
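A rough PySpark sketch of those two variations (table and column names here are hypothetical, and joined_df stands for the result of your existing join):

    from pyspark.sql import functions as F, Window

    bucketed = spark.table("large_bucketed_table")

    # Variation 1: the window function directly on the bucketed table, no join
    w = Window.partitionBy("key_col").orderBy("sort_col")
    v1 = bucketed.withColumn("rn", F.row_number().over(w)).filter("rn = 1")
    v1.explain()  # is there still an Exchange before the Window?

    # Variation 2: keep the join, but replace the window function with a plain aggregation
    v2 = joined_df.groupBy("key_col").agg(F.min("sort_col").alias("min_sort"))
    v2.explain()  # does bucketing satisfy this aggregation without a shuffle?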
Also - are your buckets sorted, by key_col + sort_col? If so, why does Spark need to sort for the ranking window function at all? (The same goes for the two sorts for the join.) Maybe experiment a bit with spark.sql.legacy.bucketedTableScan.outputOrdering - again, forcing Spark to drop that sort because the data on disk is already sorted should also eliminate the window sort, if that optimization is implemented (see the sketch below). I'm not sure, though, why that parameter is marked as legacy; maybe I'm missing something.
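One way to check the table layout and try that flag (again, the table name is hypothetical and result_df stands for your original query):

    # does the table metadata show both Bucket Columns and Sort Columns?
    spark.sql("DESCRIBE EXTENDED large_bucketed_table").show(100, truncate=False)

    # let the bucketed scan expose its on-disk sort order, then re-run the query
    spark.conf.set("spark.sql.legacy.bucketedTableScan.outputOrdering", "true")
    result_df.explain()  # do the Sort nodes before the join/window disappear?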
Just my two cents,
Ofir

________________________________
From: Mich Talebzadeh <mich.talebza...@gmail.com>
Sent: Friday, August 16, 2024 7:54 PM
To: Shay Elbaz <sel...@paypal.com>
Cc: Shay Elbaz <sel...@paypal.com.invalid>; user@spark.apache.org <user@spark.apache.org>
Subject: [External] Re: Redundant(?) shuffle after join

Hi Shay,

Let me address the points you raised using the STAR methodology. I apologize if it sounds a bit formal, but I find it effective for clarity.

Situation: You encountered an issue while working with a Spark DataFrame where a shuffle was unexpectedly triggered during the application of a window function. This happened even though the data was already partitioned and sorted by the key column (`key_col`). Specifically, the issue arose after joining a large, bucketed table with a smaller DataFrame on the same column used in the window function.

Task: Your objective, as evident from your question, was to understand why Spark introduced a shuffle for the window function despite the data being pre-partitioned and sorted. In summary, we needed to identify the underlying cause of this behavior and explore possible solutions to prevent the unnecessary shuffle.

Action: To investigate the issue and provide a reasonable explanation, I considered several possibilities:

1. Partitioning Requirements: I mentioned the possibility that Spark introduced the shuffle to meet its internal partitioning requirements for the window function. Although the data was already partitioned by `key_col`, Spark might still trigger a shuffle to ensure that the data distribution aligns perfectly with the window function's needs.
2. Locality and Ordering: I considered that Spark might have required a shuffle to enforce global sorting within partitions. Even though the data was locally sorted within each bucket, Spark could still introduce a shuffle to ensure the window function operates correctly across all partitions.
3. Adaptive Query Execution (AQE): You inquired whether AQE might have introduced the shuffle to optimize performance based on runtime statistics. This is indeed a possibility, as AQE can adjust the execution plan dynamically.
4. Compatibility and Partitioning Mismatch: There may be a mismatch in partitioning recognition between the join operation and the window function. This mismatch could lead Spark to introduce a shuffle, even when using the same `key_col`.

Recommendations: To address these potential causes, I recommend the following steps (a rough sketch follows the list):

- Check Spark's Understanding of Partitioning: Inspect the DataFrame's partitioning after the join operation to ensure it aligns with expectations.
- Disable AQE Temporarily: Turn off AQE to determine if it was influencing the shuffle.
- Force Specific Partitioning: Repartition the DataFrame explicitly by `key_col` before applying the window function to see if this prevents the shuffle.
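As a rough illustration of those steps (joined_df stands for the post-join DataFrame; the column names are hypothetical, not taken from the actual query):

    from pyspark.sql import functions as F, Window

    # 1. Inspect how Spark sees the partitioning after the join
    joined_df.explain(mode="formatted")  # look for Exchange nodes before the Window

    # 2. Temporarily disable AQE to see whether it is the one adding the shuffle
    spark.conf.set("spark.sql.adaptive.enabled", "false")

    # 3. Repartition explicitly by the key before applying the window function
    w = Window.partitionBy("key_col").orderBy("sort_col")
    out = (joined_df.repartition("key_col")
                    .withColumn("rn", F.row_number().over(w))
                    .filter("rn = 1"))
    out.explain()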
HTH

Mich Talebzadeh,
Architect | Data Engineer | Data Science | Financial Crime
PhD<https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College London<https://en.wikipedia.org/wiki/Imperial_College_London>
London, United Kingdom

view my Linkedin profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

Disclaimer: The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice, "one test result is worth one-thousand expert opinions" (Wernher von Braun<https://en.wikipedia.org/wiki/Wernher_von_Braun>).

On Fri, 16 Aug 2024 at 14:55, Shay Elbaz <sel...@paypal.com> wrote:

Hi Mich, thank you for answering - much appreciated.

"This can cause uneven distribution of data, triggering a shuffle for the window function."

Could you elaborate on the mechanism that can "trigger a shuffle for the window function"? I'm not familiar with it (or are you referring to AQE?). In any case, there is no skew - the keys are GUIDs of events. Even if the data were skewed, the shuffle would end up exactly the same way as before the shuffle - the DF was already partitioned (and locally sorted) by the same key.

Thanks again,
Shay

________________________________
From: Mich Talebzadeh <mich.talebza...@gmail.com>
Sent: Thursday, August 15, 2024 17:21
To: Shay Elbaz <sel...@paypal.com.invalid>
Cc: user@spark.apache.org <user@spark.apache.org>
Subject: Re: Redundant(?) shuffle after join

The actual code is not given, so I am going with the plan output and your explanation:

* You're joining a large, bucketed table with a smaller DataFrame on a common column (key_col).
* The subsequent window function also uses key_col.
* However, a shuffle occurs for the window function even though the data is already partitioned by key_col.

Potential data skew: though the table is bucketed, there might be significant data skew within the buckets. This can cause uneven distribution of data, triggering a shuffle for the window function.

    import pyspark.sql.functions as F

    df = spark.table("your_bucketed_table")
    # rows per key: a heavily skewed key stands out in these statistics
    key_counts = df.groupBy("key_col").count()
    key_counts.agg(F.min("count"), F.avg("count"), F.max("count")).show()
    key_counts.orderBy(F.desc("count")).show(20)

HTH

Mich Talebzadeh,
Architect | Data Engineer | Data Science | Financial Crime
PhD<https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College London<https://en.wikipedia.org/wiki/Imperial_College_London>
London, United Kingdom

view my Linkedin profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

Disclaimer: The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice, "one test result is worth one-thousand expert opinions" (Wernher von Braun<https://en.wikipedia.org/wiki/Wernher_von_Braun>).

On Thu, 15 Aug 2024 at 14:30, Shay Elbaz <sel...@paypal.com.invalid> wrote:

Hi Spark community,

Please review the cleansed plan below. It is the result of joining a large, bucketed table with a smaller DF, and then applying a window function. Both the join and the window function use the same column, which is also the bucket column of the table ("key_col" in the plan). The join results in a map-side join as expected, but then there is a shuffle for the window function, even though the data is already partitioned accordingly. Can anyone explain why?

Using Spark 3.5.0.

Thanks,
Shay

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project ...
   +- Filter (rn#5441 = 1)
      +- Window [row_number() windowspecdefinition(key_col#5394, _w0#5442 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#5441], [key_col#5394], [_w0#5442 ASC NULLS FIRST]
         +- WindowGroupLimit [key_col#5394], [_w0#5442 ASC NULLS FIRST], row_number(), 1, Final
            +- Sort [key_col#5394 ASC NULLS FIRST, _w0#5442 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(key_col#5394, 80000), ENSURE_REQUIREMENTS, [plan_id=592]
                  +- WindowGroupLimit [key_col#5394], [_w0#5442 ASC NULLS FIRST], row_number(), 1, Partial
                     +- Sort [key_col#5394 ASC NULLS FIRST, _w0#5442 ASC NULLS FIRST], false, 0
                        +- Project ... (key_col stays the same)
                           +- Project [coalesce(key_col#0, key_col#5009) AS key_col#5394, CASE WHEN ...
                              +- SortMergeJoin [key_col#0], [key_col#5009], FullOuter
                                 :- Sort [key_col#0 ASC NULLS FIRST], false, 0
                                 :  +- Project key_
                                 :     +- FileScan parquet bucketed table ...
                                 +- Sort [key_col#5009 ASC NULLS FIRST], false, 0
                                    +- Exchange hashpartitioning(key_col#5009, 80000), REPARTITION_BY_NUM, [plan_id=572]
                                       +- Project
                                          +- Filter
                                             +- Scan small table...
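For reference, a minimal sketch of the kind of query that produces a plan of this shape (all names are placeholders rather than the real schema; sort_col stands in for the _w0 ordering expression in the plan):

    from pyspark.sql import functions as F, Window

    big = spark.table("large_bucketed_table")            # bucketed by key_col
    small = (spark.read.parquet("/path/to/small_input")  # placeholder small input
                  .repartition(80000, "key_col"))

    joined = (big.join(small, big["key_col"] == small["key_col"], "full_outer")
                 .select(F.coalesce(big["key_col"], small["key_col"]).alias("key_col"),
                         big["sort_col"]))

    w = Window.partitionBy("key_col").orderBy("sort_col")
    out = joined.withColumn("rn", F.row_number().over(w)).filter("rn = 1")
    out.explain()  # Exchange hashpartitioning(key_col, ...) appears before the Window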