Dandandan opened a new pull request, #21718:
URL: https://github.com/apache/datafusion/pull/21718
## Summary
Adds an experimental workspace crate, `datafusion-morsel-scheduler`, that
executes an `ExecutionPlan` on a pool of per-core worker threads, each owning its own
Tokio `current_thread` runtime, instead of the default multi-threaded runtime. The ClickBench
benchmark runner defaults to the new scheduler; existing paths remain available
via `--no-morsel-scheduler`.
This is a **draft / prototype** exploring the morsel-driven +
thread-per-core direction on top of today's `ExecutionPlan` abstraction. It is
not intended to merge in its current shape; it is opened for discussion of the
architecture.
## Architecture
Three layers in the new crate:
### 1. `runtime::WorkerPool`
N OS threads, each owning a dedicated `current_thread` runtime plus a
`LocalSet`. Work is shipped as a `Send` closure that constructs the real
(possibly `!Send`) future on the worker and `spawn_local`s it, so the future never
migrates across threads. `spawn_on(worker_id, ..)` provides partition affinity;
`spawn_any(..)` round-robins work across the pool.
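To make that ownership model concrete, here is a minimal stand-alone sketch of the pattern (not the crate's actual code): a `Send` job closure is shipped over a channel to a worker thread that owns a `current_thread` runtime plus a `LocalSet`; the closure builds its possibly-`!Send` future there and hands it to `spawn_local`. The `Job` alias, `spawn_worker`, and the channel plumbing are illustrative assumptions, not the real `WorkerPool` API.

```rust
use tokio::sync::mpsc;
use tokio::task::LocalSet;

/// A unit of work: a `Send` closure that, once it is on the worker thread,
/// builds the real (possibly `!Send`) future and `spawn_local`s it.
type Job = Box<dyn FnOnce() + Send + 'static>;

/// One worker: an OS thread owning a dedicated `current_thread` runtime plus
/// a `LocalSet`, so the futures it runs never migrate across threads.
fn spawn_worker(mut rx: mpsc::UnboundedReceiver<Job>) -> std::thread::JoinHandle<()> {
    std::thread::spawn(move || {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("build current_thread runtime");
        let local = LocalSet::new();
        // `LocalSet::block_on` drives both the job loop and every task the
        // jobs have `spawn_local`ed.
        local.block_on(&rt, async move {
            while let Some(job) = rx.recv().await {
                job();
            }
        });
    })
}

fn main() {
    let (tx, rx) = mpsc::unbounded_channel::<Job>();
    let worker = spawn_worker(rx);

    // Caller side (roughly what `spawn_on(worker_id, ..)` does): ship a
    // closure, not a future, so only the closure has to be `Send`.
    let (done_tx, done_rx) = tokio::sync::oneshot::channel();
    tx.send(Box::new(move || {
        // `Rc` makes the spawned future `!Send`; that is fine because it is
        // built and polled entirely on the worker thread.
        let local_state = std::rc::Rc::new(42);
        tokio::task::spawn_local(async move {
            println!("running on the pinned worker, state = {local_state}");
            let _ = done_tx.send(());
        });
    }))
    .expect("worker is alive");

    done_rx.blocking_recv().expect("job ran to completion");
    drop(tx); // closing the channel lets the worker's job loop exit
    worker.join().unwrap();
}
```

Because only the closure crosses threads, nothing inside the pinned future ever has to be `Send`, which is what lets per-partition state stay on one core.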
### 2. Inboxes + pipeline splitting
A **Pipeline** is a breaker-free `ExecutionPlan` subgraph. The planner walks
the plan and cuts at v1 breakers — `CoalescePartitionsExec` and
`SortPreservingMergeExec` — replacing each breaker's children with
`InboxSourceExec` stubs and recording the detached subtrees as upstream
pipelines. Morsels flow through bounded mpsc inboxes between pipelines (natural
backpressure via the bounded capacity).
```
Pipeline A (final):                       Pipeline B (upstream):
  Root                                      OriginalChild
  └─ SPM (N partitions)
      └─ InboxSourceExec ◄──── one sender per partition
```
Breakers explicitly **not** cut in v1: `RepartitionExec` (would require
reimplementing partitioning), `SortExec`, `HashJoinExec` build-side.
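As a rough illustration of the inbox mechanics (not the crate's actual types), the sketch below models the channel between an upstream and a downstream pipeline with a bounded `tokio::sync::mpsc` channel; `Morsel` and the capacity of 4 are made-up placeholders. Once the channel fills, the upstream `send().await` parks until the consumer drains a slot, which is the backpressure described above.

```rust
use tokio::sync::mpsc;

/// Placeholder for a batch of rows handed from one pipeline to the next.
struct Morsel(Vec<u64>);

#[tokio::main(flavor = "current_thread")]
async fn main() {
    // One bounded inbox per downstream partition; the capacity bound is the
    // backpressure mechanism between pipelines.
    let (tx, mut rx) = mpsc::channel::<Morsel>(4);

    // Upstream pipeline (Pipeline B in the diagram): produces morsels and
    // parks on `send` once 4 are queued and not yet consumed.
    let producer = tokio::spawn(async move {
        for i in 0..16u64 {
            tx.send(Morsel(vec![i])).await.expect("receiver still alive");
        }
    });

    // Downstream pipeline (Pipeline A): an `InboxSourceExec`-like leaf simply
    // drains the inbox and feeds the rest of its breaker-free subgraph.
    while let Some(Morsel(rows)) = rx.recv().await {
        println!("received a morsel of {} values", rows.len());
    }
    producer.await.unwrap();
}
```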
### 3. `WorkerDispatchExec` + executor
Before scheduling, each pipeline's leaves are wrapped with
`WorkerDispatchExec` so that `RepartitionExec`'s lazy-spawned input fetchers
spread across the pool instead of piling onto whichever worker wins the
first-poll race. The top-level `execute()` splits the plan into pipelines,
schedules each partition on `worker = partition % N`, and bridges the final
pipeline's output back to the caller as a normal `SendableRecordBatchStream`.
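The affinity rule itself is just modular arithmetic; the stand-alone sketch below (with a dummy `WorkerPool`, not the real one from layer 1) shows the `worker = partition % N` mapping the executor applies to each pipeline partition.

```rust
/// Dummy stand-in for the real pool from layer 1, kept inline so the
/// sketch compiles on its own.
struct WorkerPool {
    n_workers: usize,
}

impl WorkerPool {
    /// The real pool ships `job` to worker `worker`'s LocalSet; here it just
    /// runs inline to keep the example self-contained.
    fn spawn_on(&self, worker: usize, job: impl FnOnce(usize)) {
        job(worker);
    }
}

fn main() {
    let pool = WorkerPool { n_workers: 4 };
    let output_partitions = 10;

    // Each partition of a pipeline is pinned to worker `partition % N`, so a
    // given partition's state stays on one worker for the whole query.
    for partition in 0..output_partitions {
        let worker = partition % pool.n_workers;
        pool.spawn_on(worker, move |w| {
            println!("partition {partition} scheduled on worker {w}");
        });
    }
}
```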
### ClickBench wiring
The benchmark runner now builds a `WorkerPool` by default and routes query
execution through `morsel_scheduler::execute`. New flags:
- `--no-morsel-scheduler` — fall back to the ambient multi-threaded runtime.
- `--morsel-workers N` — override the worker count (default: `std::thread::available_parallelism()`).
## What's intentionally out of scope for v1
- **Cutting at RepartitionExec / SortExec / hash-join build** — hooks exist
in the planner but these would need per-breaker partitioning logic.
- **NUMA / CPU pinning** — workers are OS threads, not core-pinned.
- **Explicit work-stealing across workers** — partitions are pinned to
`partition % N`; a skewed pipeline will stall its worker.
- **Async I/O on workers** — no `tokio-uring` integration in this crate;
the `tokio-uring-benchmark-demo` branch is a separate experiment that could
eventually layer on top.
## Test plan
- [x] `cargo fmt --all`
- [x] `cargo clippy -p datafusion-morsel-scheduler -p datafusion-benchmarks --all-targets -- -D warnings`
- [x] `cargo test -p datafusion-morsel-scheduler --lib` — 7 unit tests covering `WorkerPool`, `Inbox`/`InboxSourceExec`, planner cuts, and an end-to-end plan run
- [ ] End-to-end ClickBench comparison (morsel scheduler vs. baseline)
- [ ] Sanity test on larger queries (joins, aggregations with
RepartitionExec)
🤖 Generated with [Claude Code](https://claude.com/claude-code)