wirybeaver opened a new pull request, #1718:
URL: https://github.com/apache/datafusion-ballista/pull/1718

   Adds **`SplitPartitionsRule`** — the inverse of #1684's 
`CoalescePartitionsRule`. When upstream stats show one shuffle partition is far 
larger than the median, the rule fans that partition out across multiple reader 
tasks via round-robin assignment over its file list, instead of folding small 
partitions together. Same per-stage invocation, same alignment-group leaf walk, 
same carrier-slot-on-`ExchangeExec` pattern as #1684 — strict architectural 
mirror.
   
   > **Stacks on #1684.** Until that lands, this PR's diff shows the 5 coalesce 
commits + 1 split commit. Once #1684 merges I'll rebase and the diff will 
reduce to the single split commit.
   
   Part of the AQE epic #1359. Motivating bug: #1643 (TPC-H Q2 SF1000, one 
partition 8670× larger than the median). **v1 does NOT close #1643** — see "v1 
scope" below.
   
   ## Mechanism — file-list sharding (v1)
   
   The shuffle reader side already lists multiple `PartitionLocation`s per 
output partition (one per upstream map task). Splitting just means handing 
those locations to several reader tasks via round-robin assignment by file 
index. No row-range reads, no protobuf changes, no executor data-path changes — 
pure scheduler/adapter work.
   
   Tradeoff: a partition backed by only one file can't be split this way. The 
rule bails on that idx (factor stays at 1). Row-range reads to lift that 
restriction live in a v2 task doc (linked at the bottom).
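
   As a rough illustration of the round-robin assignment described above, the sketch below shards one output partition's file list over several reader tasks. `FileLoc` and `shard_round_robin` are hypothetical stand-ins invented for this example, not the PR's types; the real code works on `PartitionLocation` values.

```rust
/// Hypothetical stand-in for Ballista's `PartitionLocation`: one shuffle file
/// written by one upstream map task.
#[derive(Debug, Clone, PartialEq)]
struct FileLoc {
    map_task: usize,
}

/// Distribute the files of ONE output partition over `factor` reader tasks.
/// File `i` goes to shard `i % factor`. A partition with fewer than two files
/// (or a factor below 2) cannot be split and comes back as a single shard.
fn shard_round_robin(files: &[FileLoc], factor: usize) -> Vec<Vec<FileLoc>> {
    if factor < 2 || files.len() < 2 {
        return vec![files.to_vec()];
    }
    let mut shards = vec![Vec::new(); factor];
    for (file_idx, loc) in files.iter().enumerate() {
        shards[file_idx % factor].push(loc.clone());
    }
    shards
}
```

   In this sketch a factor larger than the file count leaves some shards empty; whether the actual rule clamps the factor to the file count is not stated here.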
   
   ## v1 scope — and what it does NOT cover
   
   File-list sharding turns the M upstream hash partitions into K' reader 
partitions and produces `UnknownPartitioning(K')` output. Rows that used to 
land in the same hash bucket on the M-side end up scattered across multiple 
K'-side partitions, because the round-robin assignment is file-keyed, not 
row-keyed. That breaks any downstream operator with a hash or single-partition 
input requirement:
   
   - `HashJoinExec(Partitioned)` and `SortMergeJoinExec` — both legs must agree 
on hash buckets.
   - `AggregateExec(FinalPartitioned)` — assumes each downstream partition 
holds a closed set of group keys.
   - Any future operator whose `required_input_distribution()` returns 
`HashPartitioned(_)` or `SinglePartition`.
   
   Rather than enumerate operator types, the rule walks the stage subtree and 
inspects `required_input_distribution()` directly. If any node above the leaves 
demands hash or single-partition input, the rule bails on the whole stage. This 
is conservative but strictly correct, and it stays correct when DataFusion adds 
new operators.
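
   The guard can be pictured with a toy plan tree. Everything in the sketch below (`Requirement`, `Node`, `first_blocking_operator`, and the helper constructors) is a hypothetical model made up for illustration; the actual rule calls DataFusion's `required_input_distribution()` on real `ExecutionPlan` nodes.

```rust
/// Toy model of an operator's input requirement (hypothetical; DataFusion's
/// real `Distribution::HashPartitioned` also carries the hash expressions).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Requirement {
    Unspecified,
    Hash,
    Single,
}

/// Toy plan node: a name, one requirement per child, and the children.
struct Node {
    name: &'static str,
    required_input: Vec<Requirement>,
    children: Vec<Node>,
    is_exchange_leaf: bool,
}

/// Leaf exchange: the boundary of the stage being inspected.
fn exchange() -> Node {
    Node { name: "ExchangeExec", required_input: vec![], children: vec![], is_exchange_leaf: true }
}

/// Interior operator that imposes the same requirement on every child.
fn op(name: &'static str, req: Requirement, children: Vec<Node>) -> Node {
    let required_input = vec![req; children.len()];
    Node { name, required_input, children, is_exchange_leaf: false }
}

/// Returns the first operator above the exchange leaves that demands hash or
/// single-partition input, or `None` when the stage is safe to split.
fn first_blocking_operator(node: &Node) -> Option<&'static str> {
    if node.is_exchange_leaf {
        // Stop at the exchange: whatever sits below it is another stage.
        return None;
    }
    let demands_layout = node
        .required_input
        .iter()
        .any(|r| matches!(r, Requirement::Hash | Requirement::Single));
    if demands_layout {
        return Some(node.name);
    }
    node.children.iter().find_map(first_blocking_operator)
}
```

   Because the check keys on the declared requirement rather than on the operator's type, a new operator that requires hash input is rejected without any change to the walk.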
   
   **TPC-H Q2's skew sits behind a `FinalPartitioned` aggregate**, so v1 cannot 
help it. v1 helps the narrower set of stages where the consumer is 
distribution-agnostic (`FilterExec`, `ProjectionExec`, `LocalLimitExec`, 
single-input scans into `UnknownPartitioning` sinks). The infrastructure is the 
win; v2 (row-range reads + aggregate-aware plan rewriting) is where Q2 lands.
   
   ## Surface
   
   Opt-in via a single boolean. **`ballista.planner.split.enabled=false`** is 
the default: the rule short-circuits and the plan flows through untouched. 
Users who want skew handling set it to `true`.
   
   | key | default | meaning |
   |---|---|---|
   | `ballista.planner.split.enabled` | `false` | master gate |
   | `ballista.planner.split.skew_factor` | `5.0` | Spark `OptimizeSkewedJoin` default; a partition is "skewed" if `bytes > skew_factor × median` |
   | `ballista.planner.split.min_split_bytes` | 64 MiB | absolute floor; don't fan out trivially small partitions |
   | `ballista.planner.split.max_split_factor` | `8` | per-partition fan-out cap to limit executor pressure |
   
   ## Algorithm
   
   For each per-stage call (mirrors the coalesce rule line-for-line):
   
   1. Bail if disabled.
   2. Single subtree walk that does two things: bail if any operator demands 
`HashPartitioned`/`SinglePartition` input, AND collect every leaf 
`ExchangeExec` (`Jump` after each hit, same convention as #1684).
   3. Conflict guard: bail if any leaf already has `coalesce()` or `split()` 
set.
   4. Alignment-group invariant: bail unless every leaf shares the same M (the 
Q22 guard, the same one #1684 added in `fix(coalesce): bail on heterogeneous M`).
   5. Sum byte sizes element-wise across the alignment group; capture leaf-0's 
file-count vector.
   6. `decide_split_factors(summed_bytes, file_counts, skew, min_bytes, 
max_factor)` — three guards (single-file → factor 1; below min-bytes → factor 
1; below skew ratio → factor 1) then `ceil(bytes / median)` capped at 
`max_factor`, floored at 2.
   7. If every factor is 1, bail.
   8. Build one `SplitPlan` with `shards = factors_to_shards(&factors)` and 
attach uniformly to every leaf via `set_split` — sharing the plan (not just K') 
keeps per-idx fan-out identical across leaves, which matters for non-join 
multi-leaf shapes (UNION).
   
   Sharing the plan across the alignment group is the same invariant #1684 
enforces for coalesce — preserves uniform K' downstream.
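
   Steps 6 to 8 can be sketched as follows. The function names and parameter order come from the description above, but the bodies are a guess at the behaviour, not the PR's code. In particular, the `Shard` struct, the upper-median choice for even-length input, and treating a zero median as "not skewed" are assumptions made for this example.

```rust
/// Hypothetical shard descriptor: which upstream partition it reads, its
/// position among that partition's shards, and the partition's fan-out.
#[derive(Debug, Clone, PartialEq)]
struct Shard {
    upstream_idx: usize,
    shard_idx: usize,
    factor: usize,
}

/// Upper median; returns 0 for empty input (assumption).
fn median(values: &[u64]) -> u64 {
    let mut sorted = values.to_vec();
    sorted.sort_unstable();
    sorted.get(sorted.len() / 2).copied().unwrap_or(0)
}

/// Per-partition fan-out factor; 1 means "leave this partition alone".
fn decide_split_factors(
    bytes: &[u64],
    file_counts: &[usize],
    skew_factor: f64,
    min_split_bytes: u64,
    max_split_factor: usize,
) -> Vec<usize> {
    if max_split_factor < 2 {
        return vec![1; bytes.len()];
    }
    let med = median(bytes);
    bytes
        .iter()
        .zip(file_counts)
        .map(|(&b, &files)| {
            // A zero median would divide by zero below, so it counts as "not skewed".
            let skewed = med > 0 && (b as f64) > skew_factor * (med as f64);
            // Guards: single file, below the byte floor, below the skew ratio.
            if files < 2 || b < min_split_bytes || !skewed {
                return 1;
            }
            // ceil(bytes / median), floored at 2 and capped at max_split_factor.
            let wanted = (b / med + u64::from(b % med != 0)) as usize;
            wanted.clamp(2, max_split_factor)
        })
        .collect()
}

/// Expand per-partition factors into the flat K'-long shard list.
fn factors_to_shards(factors: &[usize]) -> Vec<Shard> {
    factors
        .iter()
        .enumerate()
        .flat_map(|(upstream_idx, &factor)| {
            (0..factor).map(move |shard_idx| Shard { upstream_idx, shard_idx, factor })
        })
        .collect()
}
```

   With four partitions of 1, 1, 1 and 4096 MiB, four files each, and the default settings, this sketch yields factors `[1, 1, 1, 8]` and therefore 3 + 8 = 11 shards, matching the K' quoted for the happy-path test in the test plan.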
   
   ## Components
   
   | Area | Change |
   |---|---|
   | **Rule** | `SplitPartitionsRule` in `state/aqe/optimizer_rule/split_partitions.rs`. Unit struct. Invoked per-stage in `AdaptivePlanner::actionable_stages()` right after `CoalescePartitionsRule`. |
   | **Carrier** | `ExchangeExec` gains an `Arc<Mutex<Option<Arc<SplitPlan>>>>` slot next to the existing `coalesce` slot, with `set_split` / `split` accessors. `split=K' of M` annotation appears in `DisplayAs` output only when attached. Mutually exclusive with `coalesce`; the rule's conflict guard enforces it. |
   | **Adapter** | `BallistaAdapter::transform_children` checks `exchange.split()`; on `Some(sp)` it pre-shards the M-shape upstream `Vec<Vec<PartitionLocation>>` into K'-shape via `SplitShard::owns_file(file_idx)` and builds the reader via `ShuffleReaderExec::try_new_split` with `UnknownPartitioning(K')`. |
   | **Reader** | `ShuffleReaderExec::try_new_split` constructor + `split: Option<SplitPlan>` field. Threaded through `with_work_dir`, `with_client_pool`, `with_new_children`, `partition_statistics`, `DisplayAs`. `execute()` needs no changes: `self.partition[idx]` already returns the per-output-partition `Vec<PartitionLocation>`, which the adapter has pre-sharded. The reader is oblivious to whether sharding happened. |
   | **Algorithm** | Pure-CPU helpers in `state/aqe/split/algorithm.rs`: `decide_split_factors` (median-based skew detection mirroring Spark's `OptimizeSkewedJoin`) + `factors_to_shards` (per-idx factor → flat shard list). |
   | **Config** | Four new keys (table above). |
   
   `SplitPlan` is NOT round-tripped through proto in v1 — the rule attaches it 
on the scheduler side, the adapter consumes it inline, and the resulting 
`ShuffleReaderExec` ships already-sharded `Vec<Vec<PartitionLocation>>`. A 
protobuf round-trip would only be needed if the rule outcome had to survive 
serialization to the executor, which it doesn't.
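
   For readers unfamiliar with the carrier-slot pattern from the Components table, here is a minimal sketch of a shared, interior-mutable slot with a conflict guard. `Exchange`, `attach_shared`, and the field layout of `SplitPlan` are simplified, hypothetical stand-ins; only the slot type `Arc<Mutex<Option<Arc<SplitPlan>>>>` and the accessor names `set_split` / `split` are taken from the description.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical, heavily simplified plan payload.
#[derive(Debug, PartialEq)]
struct SplitPlan {
    k_prime: usize,
}

/// Toy stand-in for `ExchangeExec` holding the two mutually exclusive slots.
#[derive(Default)]
struct Exchange {
    coalesce: Arc<Mutex<Option<usize>>>,
    split: Arc<Mutex<Option<Arc<SplitPlan>>>>,
}

impl Exchange {
    /// Read the attached split plan, if any (cheap: clones the `Arc`).
    fn split(&self) -> Option<Arc<SplitPlan>> {
        self.split.lock().unwrap().clone()
    }

    /// Read the attached coalesce decision, if any.
    fn coalesce(&self) -> Option<usize> {
        *self.coalesce.lock().unwrap()
    }

    /// Attach a plan through a shared reference; no `&mut self` is needed.
    fn set_split(&self, plan: Arc<SplitPlan>) {
        *self.split.lock().unwrap() = Some(plan);
    }
}

/// Attach ONE plan to every leaf of an alignment group. Returns false and
/// changes nothing if any leaf already carries a coalesce or split decision.
fn attach_shared(leaves: &[Exchange], plan: SplitPlan) -> bool {
    let conflict = leaves
        .iter()
        .any(|leaf| leaf.split().is_some() || leaf.coalesce().is_some());
    if conflict {
        return false;
    }
    let shared = Arc::new(plan);
    for leaf in leaves {
        leaf.set_split(Arc::clone(&shared));
    }
    true
}
```

   Handing every leaf a clone of the same `Arc` is what makes the per-index fan-out identical across leaves, and the conflict guard is what makes a second pass a no-op in this model.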
   
   ## Test plan
   
   - [x] `cargo test --workspace --no-fail-fast` — workspace tests pass, 0 
failures (includes the 7 coalesce regressions from #1684 plus the 8 new split 
tests).
   - [x] `cargo clippy --workspace --all-targets --tests` — 0 warnings.
   - [x] `cargo fmt --all` — clean.
   - [x] 9 unit tests in `state/aqe/split/algorithm.rs` covering the decision 
function: no-skew, single-outlier, below-min-bytes, single-file guard, 
max-factor cap, zero-median (div-by-zero guard), empty input, shard expansion 
mixed factors, all-passthrough.
   - [x] 8 functional tests in `state::aqe::test::split_rule`:
     1. `bails_on_final_partitioned_aggregate` — heavy skew, GROUP BY plan, 
snapshot confirms no `split=` annotation on the leaf.
     2. `bails_on_partitioned_hash_join` — `HashJoinExec(Partitioned)`, both 
legs.
     3. `bails_on_sort_merge_join` — `SortMergeJoinExec`, both legs.
     4. `skips_when_disabled` — synthetic safe shape, rule off, no slot 
attached.
     5. `attaches_split_when_skewed_partition_has_multiple_files` — happy path, 
M=4, idx 3 with 4 files and 4096× the median → leaf's `SplitPlan` holds K' = 3 
passthroughs + 8 split shards = 11 (max-factor cap).
     6. `skips_when_single_file_in_skewed_idx` — v1 single-file guard.
     7. `idempotent_on_second_pass` — two consecutive `optimize` calls produce 
the same `Arc`, slot not overwritten.
     8. `default_off_returns_input_arc_verbatim` — `Arc::ptr_eq(input, output)` 
when disabled.
   - [ ] **TPC-H SF=100 sanity sweep, 22 queries × 2 join variants, 
`split.enabled=true`**: deferred to PR validation env. Expected outcome is 
mostly non-regression — most queries hit a hash or `FinalPartitioned` consumer 
and the rule bails. If any query *does* fire the rule (a stage ending in 
`Filter` / `Projection` / `LocalLimit` over a hash exchange), I'll record the K 
→ K' increase in a comment.
   
   ## v2 follow-up
   
   v1 has two scope limitations: file-list sharding requires ≥2 files per 
partition, and the rule bails on hash/single consumers. v2 lifts both, with 
row-range reads (`PartitionLocation::row_range: Option<(u64, u64)>`, 
batch-count IPC reader on the executor) and aggregate-aware splitting (rewrite 
`AggregateExec(FinalPartitioned)` into per-shard partial + reshuffle + final). 
That's the path #1643 will actually land on. Task doc written and cross-linked.
   
   🤖 Generated with [Claude Code](https://claude.com/claude-code)
   


