andygrove commented on PR #1900: URL: https://github.com/apache/datafusion-ballista/pull/1900#issuecomment-4847651420
The new AQE-on TPC-H CI job was hanging (the AQE-off job was not). Root-caused and fixed in 382b8e1e.

## Symptom

The job completed the AQE-off suite, then, under the adaptive (AQE) planner, hung on the first multi-join query (Q2) and never recovered. It reproduces locally on a 1-executor / 4-slot / 16-partition cluster and is independent of slot/partition counts (hangs at `-c 4`/4, `-c 8`/8, `-c 16`/16).

## Root cause: DataFusion 54 dynamic filter pushdown

DataFusion 54 populates dynamic filters at runtime from an upstream operator (a hash-join build side, a TopK heap, a partial aggregate) and reads them in a downstream scan **within the same plan**. Ballista splits a plan into stages at shuffle/broadcast boundaries, and those stages run as **independent tasks**. When the producing operator and the consuming scan land in different stages, the filter is never populated across the boundary and the consuming scan blocks forever.

This deadlocks every multi-join query under AQE, because broadcast (`CollectLeft`) hash joins create the cross-stage producer/consumer split (the probe-side scan carries `predicate=DynamicFilter [ empty ]`). The static planner escapes it only because it plans sort-merge joins, which carry no join dynamic filter. That is why the AQE-off suite passed and AQE-on hung.

The regression entered when this branch merged `main`: the DataFusion 54 upgrade (#1906) and the AQE-on CI (#1913) landed in the same merge, and AQE-on had never been exercised in CI before.

## How it was confirmed

- `main` (same DF54 + same AQE-on CI) passes the TPC-H job in ~24 min; only this branch hung → the un-zeroed thresholds (broadcast under AQE) are what trigger it.
- DF53-vs-DF54 bisect: on the pre-merge DF53 commit, Q2/Q5/Q8/Q9 all run fine (Q2 0.16 s, 100 rows). On DF54 they all hang.
- `enable_dynamic_filter_pushdown=false` makes Q2 complete and removes the `DynamicFilter` from the plan; disabling the per-type sub-flags (join/topk/aggregate) one at a time still leaves the hang.
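The cross-stage split described under the root cause can be modeled with plain threads and a channel. This is only an illustrative sketch using the Rust standard library, not DataFusion or Ballista code: `ScanOutcome`, `scan`, `same_plan` and `cross_stage` are hypothetical names, the "dynamic filter" is reduced to a single maximum join key, and a timeout stands in for the real scan so the example terminates instead of hanging.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical result of a probe-side scan that waits for a dynamic filter.
#[derive(Debug, PartialEq)]
enum ScanOutcome {
    /// The filter arrived and was applied to the probe rows.
    Filtered(Vec<i64>),
    /// No producer ever published the filter (modeled with a timeout).
    NeverPopulated,
}

/// Probe-side scan: waits for the filter (here, a max join key), then applies it.
fn scan(probe_rows: &[i64], filter_rx: mpsc::Receiver<i64>, wait: Duration) -> ScanOutcome {
    match filter_rx.recv_timeout(wait) {
        Ok(max_key) => ScanOutcome::Filtered(
            probe_rows.iter().copied().filter(|k| *k <= max_key).collect(),
        ),
        // Timeout or disconnected sender: the filter was never published.
        Err(_) => ScanOutcome::NeverPopulated,
    }
}

/// Producer and consumer in the same plan: the build side publishes the
/// filter, so the scan proceeds.
fn same_plan(build_rows: Vec<i64>, probe_rows: &[i64]) -> ScanOutcome {
    let (filter_tx, filter_rx) = mpsc::channel();
    let build_side = thread::spawn(move || {
        if let Some(max_key) = build_rows.iter().copied().max() {
            let _ = filter_tx.send(max_key);
        }
    });
    let outcome = scan(probe_rows, filter_rx, Duration::from_secs(1));
    build_side.join().unwrap();
    outcome
}

/// Producer in another stage: the scan's stage holds the filter handle, but
/// nothing in that stage ever writes to it.
fn cross_stage(probe_rows: &[i64]) -> ScanOutcome {
    let (filter_tx, filter_rx) = mpsc::channel::<i64>();
    // The sender stays alive for the whole scan but is never used.
    let outcome = scan(probe_rows, filter_rx, Duration::from_millis(50));
    drop(filter_tx);
    outcome
}
```

In this model, `same_plan(vec![1, 5, 3], &[2, 4, 6, 8])` yields the filtered probe rows, while `cross_stage(&[2, 4, 6, 8])` reports `NeverPopulated`; without the timeout, that second call would wait indefinitely, which corresponds to the hang observed on Q2.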
## Fix

Pin `datafusion.optimizer.enable_dynamic_filter_pushdown=false` in the Ballista session defaults, alongside the other DataFusion options Ballista already disables because they rely on in-process state that cannot cross stage boundaries (e.g. `enable_physical_uncorrelated_scalar_subquery`). Long-term, carrying dynamic filters across stage boundaries is tracked by #1375.

Verified locally: all 21 AQE-on queries (Q16 omitted, as in CI) complete with the default config at the CI topology.

Orthogonal note: Q11 and Q15 return 0 rows under **both** AQE on and off at SF10. This is a pre-existing correctness issue, not introduced here and not caught by CI (which doesn't validate row counts). Worth a separate issue.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
