Hi Spark community,
Please review the sanitized plan below. It comes from joining a large,
bucketed table with a smaller DataFrame and then applying a window function.
Both the join and the window function partition by the same column, which is
also the table's bucket column ("key_col" in the plan).
The join itself avoids shuffling the bucketed side, as expected, but the
window function then introduces an extra shuffle (the ENSURE_REQUIREMENTS
Exchange), even though the data is already partitioned on key_col.
Can anyone explain why?
Using Spark 3.5.0
Thanks,
Shay
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project ...
   +- Filter (rn#5441 = 1)
      +- Window [row_number() windowspecdefinition(key_col#5394, _w0#5442 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#5441], [key_col#5394], [_w0#5442 ASC NULLS FIRST]
         +- WindowGroupLimit [key_col#5394], [_w0#5442 ASC NULLS FIRST], row_number(), 1, Final
            +- Sort [key_col#5394 ASC NULLS FIRST, _w0#5442 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(key_col#5394, 80000), ENSURE_REQUIREMENTS, [plan_id=592]
                  +- WindowGroupLimit [key_col#5394], [_w0#5442 ASC NULLS FIRST], row_number(), 1, Partial
                     +- Sort [key_col#5394 ASC NULLS FIRST, _w0#5442 ASC NULLS FIRST], false, 0
                        +- Project ... (key_col stays the same)
                           +- Project [coalesce(key_col#0, key_col#5009) AS key_col#5394, CASE WHEN ...
                              +- SortMergeJoin [key_col#0], [key_col#5009], FullOuter
                                 :- Sort [key_col#0 ASC NULLS FIRST], false, 0
                                 :  +- Project key_
                                 :     +- FileScan parquet bucketed table ...
                                 +- Sort [key_col#5009 ASC NULLS FIRST], false, 0
                                    +- Exchange hashpartitioning(key_col#5009, 80000), REPARTITION_BY_NUM, [plan_id=572]
                                       +- Project
                                          +- Filter
                                             +- Scan small table...