Varun Srinivas created SPARK-56145:
--------------------------------------
Summary: AQE: CoalesceShufflePartitions can eliminate join
parallelism after OptimizeSkewedJoin finds no skew — no post-coalesce re-check
Key: SPARK-56145
URL: https://issues.apache.org/jira/browse/SPARK-56145
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.4.0, 3.2.0
Reporter: Varun Srinivas
CoalesceShufflePartitions can coalesce shuffle partitions on join stages down
to 1, concentrating the entire shuffle dataset into a single reducer task. This
happens _after_ OptimizeSkewedJoin has already run and determined no skew
exists — a determination that becomes invalid once coalescing destroys the
partition layout.
SPARK-35447 (fixed in 3.2.0) addressed a related interaction by ensuring
OptimizeSkewedJoin runs before CoalesceShufflePartitions, preventing coalescing
from inflating the median used for skew detection. However, the reverse
interaction was not addressed: coalescing can _create_ a data concentration
problem that did not exist at the time skew detection ran.
h2. Root Cause
In AdaptiveSparkPlanExec, the two rules run in separate rule sets:
# OptimizeSkewedJoin runs in queryStagePreparationRules during the planning
phase. It reads raw MapOutputStatistics.bytesByPartitionId and computes the median.
If no individual partition exceeds max(skewedPartitionThresholdInBytes,
skewedPartitionFactor * median), it correctly determines no skew exists.
# CoalesceShufflePartitions runs in queryStageOptimizerRules during the
per-stage optimization phase. It can merge many small partitions into very few
partitions, or even a single one.
There is no post-coalesce re-evaluation. The skew assessment from step 1 is
treated as final, even though coalescing in step 2 fundamentally changes the
data distribution across tasks.
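
The check in step 1 can be sketched as follows. This is a simplified model of the rule described above, not Spark's actual code; the function name and the sample sizes are made up for illustration. It only ever sees the raw per-partition sizes, so a layout produced later by coalescing never passes through it.

```python
from statistics import median

MB = 1024 * 1024

def skewed_partitions(sizes, factor=5, threshold=256 * MB):
    """Return indices of partitions larger than max(threshold, factor * median).

    Simplified model of the skew test described in step 1; the defaults
    mirror the values used in the example in this report.
    """
    limit = max(threshold, factor * median(sizes))
    return [i for i, size in enumerate(sizes) if size > limit]

# Hypothetical raw layouts (per-reducer byte counts before coalescing).
uniform = [10 * MB] * 200
one_hot = [10 * MB] * 199 + [400 * MB]

print(skewed_partitions(uniform))  # []
print(skewed_partitions(one_hot))  # [199]
```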
h2. Example
Consider a shuffle with 200 partitions, each ~10 MB (2 GB total). The advisory
partition size is 64 MB (default).
# OptimizeSkewedJoin runs: median = 10 MB, skew threshold = max(256 MB, 5 x 10
MB) = 256 MB. No partition exceeds 256 MB. *No skew detected* — correct.
# CoalesceShufflePartitions runs: 2 GB total / 64 MB advisory = ~31
partitions. But with COALESCE_PARTITIONS_PARALLELISM_FIRST = true (default) and
low defaultParallelism, it may coalesce further, in extreme cases to *1 partition*.
# A single reducer task now processes 2 GB of join input. If the data contains
a hot join key, this task hits cardinality explosion and becomes a 1,000x+
straggler. Even without hot keys, concentrating all data into one task
eliminates parallelism.
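
The arithmetic in this example can be reproduced with a small simulation. The greedy packer below is a simplified stand-in for the coalescing logic, not Spark's implementation, and the worst case is modeled simply by setting the target to the total shuffle size. Packing whole 10 MB partitions gives 34 groups, slightly more than the ~31 obtained by plain division.

```python
from statistics import median

MB = 1024 * 1024

def coalesce(sizes, target_size):
    """Greedily merge adjacent partitions; start a new group when adding
    the next partition would push the current group past target_size."""
    groups, current = [], 0
    for size in sizes:
        if current > 0 and current + size > target_size:
            groups.append(current)
            current = 0
        current += size
    if current > 0:
        groups.append(current)
    return groups

sizes = [10 * MB] * 200  # 200 partitions of ~10 MB each

# Step 1: skew detection on the raw layout.
skew_threshold = max(256 * MB, 5 * median(sizes))
no_skew = all(size <= skew_threshold for size in sizes)

# Step 2: coalescing at the advisory size, and the extreme case.
at_advisory = coalesce(sizes, 64 * MB)
worst_case = coalesce(sizes, sum(sizes))

print(no_skew)                                 # True
print(len(at_advisory))                        # 34
print(len(worst_case), worst_case[0] // MB)    # 1 2000
```

In the worst case the single task reads 2000 MB, well above the 256 MB threshold that step 1 used to conclude there was no skew.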
h2. Production Evidence
Analysis of production Spark 3.4 jobs showed:
* Top 25 straggler jobs (1,000-3,000x task duration vs stage average) *all*
had num_output_partitions = 1 after coalescing.
* Shuffle read analysis confirmed data concentration, not compute skew:
|*Job*|*Straggler Shuffle Read*|*Normal Task Shuffle Read*|*Data Concentration Ratio*|
|Job A|0.44 GB (19M records)|~0 bytes (2.5K records)|*17,682x*|
|Job B|1.65 GB (5.4M records)|~0 bytes (9 records)|*552,948x*|
|Job C|1.43 GB (51.8M records)|0.4 MB (22.6K records)|*3,716x*|
The straggler task was the only task doing any real work — all other tasks in
the stage read effectively zero data.
h2. Suggested Fix
*Option A (minimal):* Enforce a minimum partition count in
CoalesceShufflePartitions when the stage feeds into a join. Coalescing to 1
partition on a join stage is never beneficial.
COALESCE_PARTITIONS_MIN_PARTITION_NUM exists but is optional and not join-aware.
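
Until something like Option A exists, setting that config explicitly may serve as a stopgap. The sketch below assumes the key behind COALESCE_PARTITIONS_MIN_PARTITION_NUM is spark.sql.adaptive.coalescePartitions.minPartitionNum; the value 8 is an arbitrary placeholder, and the setting applies to every coalesced shuffle, not just joins. It only builds the spark-submit arguments.

```python
# Hypothetical floor; choose a value that suits the cluster and workload.
min_partitions = 8

# Assumed config key for COALESCE_PARTITIONS_MIN_PARTITION_NUM.
conf = {
    "spark.sql.adaptive.coalescePartitions.minPartitionNum": str(min_partitions),
}

# Turn the settings into spark-submit arguments.
submit_args = [arg for key, value in conf.items()
               for arg in ("--conf", f"{key}={value}")]

print(" ".join(submit_args))
```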
*Option B (more robust):* Add a lightweight post-coalesce skew check in
queryStageOptimizerRules, after CoalesceShufflePartitions. This would evaluate
the coalesced partition layout and split any partitions that now exceed the
skew threshold.
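
A rough sketch of what such a re-check could look like, under these assumptions: each coalesced partition remembers the raw partition sizes it was built from, the threshold is the absolute one computed before coalescing, and oversized groups are simply re-packed to the advisory size. All names are hypothetical. An absolute threshold is used because a median-based test cannot flag a lone partition: its median is its own size.

```python
MB = 1024 * 1024

def pack(sizes, target_size):
    """Greedily group adjacent raw partitions up to target_size."""
    groups, current = [], []
    for size in sizes:
        if current and sum(current) + size > target_size:
            groups.append(current)
            current = []
        current.append(size)
    if current:
        groups.append(current)
    return groups

def recheck_after_coalesce(groups, skew_threshold, target_size):
    """Re-pack any coalesced group whose total size exceeds skew_threshold."""
    result = []
    for group in groups:
        if sum(group) > skew_threshold:
            result.extend(pack(group, target_size))
        else:
            result.append(group)
    return result

# The extreme case from the example: everything coalesced into one partition.
coalesced = [[10 * MB] * 200]
fixed = recheck_after_coalesce(coalesced, 256 * MB, 64 * MB)

print(len(fixed))                          # 34
print(max(sum(g) for g in fixed) // MB)    # 60
```

A real rule would also have to apply the same split to both sides of the join so that the partition specs stay aligned; the sketch ignores that.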
*Option C (targeted):* When CoalesceShufflePartitions would reduce a join stage
below N partitions (e.g., spark.default.parallelism or a new config), cap the
coalescing at that floor.
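
One way to picture the floor: shrink the coalescing target so that the total size divided by the target is at least N. The sketch below uses a simplified greedy packer and made-up numbers (200 partitions of 512 KB, floor of 8); the function names are hypothetical and this is not Spark's code.

```python
import math

MB = 1024 * 1024
KB = 1024

def capped_target_size(total_bytes, advisory_size, min_partitions):
    """Lower the target size so at least min_partitions groups are needed."""
    return min(advisory_size, max(1, math.ceil(total_bytes / min_partitions)))

def coalesce(sizes, target_size):
    """Greedily merge adjacent partitions up to target_size."""
    groups, current = [], 0
    for size in sizes:
        if current > 0 and current + size > target_size:
            groups.append(current)
            current = 0
        current += size
    if current > 0:
        groups.append(current)
    return groups

sizes = [512 * KB] * 200  # 100 MB of shuffle data in total
advisory = 64 * MB
floor = 8                 # hypothetical N

uncapped = coalesce(sizes, advisory)
capped = coalesce(sizes, capped_target_size(sum(sizes), advisory, floor))

print(len(uncapped))  # 2
print(len(capped))    # 8
```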
h2. Related Issues
* SPARK-35447 — Fixed the reverse interaction (coalescing inflating skew
detection median). Resolved in 3.2.0.
* SPARK-51872 — Open. CoalesceShufflePartitions creates excessively small
partition counts during SMJ-to-BHJ conversion.