Dandandan opened a new pull request, #21718:
URL: https://github.com/apache/datafusion/pull/21718

   ## Summary
   
   Adds an experimental workspace crate, `datafusion-morsel-scheduler`, that 
executes an `ExecutionPlan` on a pool of pinned per-core Tokio `current_thread` 
runtimes instead of the default multi-threaded runtime. The ClickBench 
benchmark runner defaults to the new scheduler; existing paths remain available 
via `--no-morsel-scheduler`.
   
   This is a **draft / prototype** exploring the morsel-driven + 
thread-per-core direction on top of today's `ExecutionPlan` abstraction. It is 
not intended to merge in its current shape — opened for discussion of the 
architecture.
   
   ## Architecture
   
   Three layers in the new crate:
   
   ### 1. `runtime::WorkerPool`
   N OS threads, each owning a dedicated `current_thread` runtime plus a 
`LocalSet`. Work is shipped as a `Send` closure that constructs the real 
(possibly `!Send`) future on the worker and spawns it with `spawn_local`, so 
the future never migrates across threads. `spawn_on(worker_id, ..)` gives 
partition affinity; `spawn_any(..)` round-robins.
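   The closure-shipping trick can be sketched without Tokio at all. Below is a minimal, hypothetical `WorkerPool` (the struct and method names mirror the PR's description but the bodies are illustrative) using std threads and bounded channels: the shipped closure is `Send`, and it builds its `!Send` state (an `Rc`) only once it is running on its worker. The real crate drives a `current_thread` runtime plus a `LocalSet` per thread instead of this plain job loop.

   ```rust
   use std::rc::Rc;
   use std::sync::mpsc;
   use std::thread;

   // A job is a Send closure; it may construct !Send state (here an Rc)
   // once it is already running on its worker's thread.
   type Job = Box<dyn FnOnce() + Send>;

   struct WorkerPool {
       senders: Vec<mpsc::SyncSender<Job>>,
       handles: Vec<thread::JoinHandle<()>>,
       next: usize,
   }

   impl WorkerPool {
       fn new(n: usize) -> Self {
           let mut senders = Vec::new();
           let mut handles = Vec::new();
           for _ in 0..n {
               let (tx, rx) = mpsc::sync_channel::<Job>(16);
               senders.push(tx);
               handles.push(thread::spawn(move || {
                   // In the real crate this loop is a current_thread Tokio
                   // runtime driving a LocalSet; here we just run jobs.
                   for job in rx {
                       job();
                   }
               }));
           }
           WorkerPool { senders, handles, next: 0 }
       }

       /// Partition-affine dispatch: the same worker for a given id.
       fn spawn_on(&self, worker_id: usize, job: Job) {
           self.senders[worker_id % self.senders.len()].send(job).unwrap();
       }

       /// Round-robin dispatch.
       fn spawn_any(&mut self, job: Job) {
           let id = self.next;
           self.next = (self.next + 1) % self.senders.len();
           self.spawn_on(id, job);
       }

       fn shutdown(self) {
           drop(self.senders); // closing the channels ends each worker loop
           for h in self.handles {
               h.join().unwrap();
           }
       }
   }

   fn main() {
       let mut pool = WorkerPool::new(4);
       let (done_tx, done_rx) = mpsc::channel();
       for part in 0..8usize {
           let done = done_tx.clone();
           pool.spawn_any(Box::new(move || {
               // !Send state created *on* the worker; it never crosses threads.
               let local = Rc::new(part);
               done.send(*local).unwrap();
           }));
       }
       drop(done_tx);
       let mut got: Vec<usize> = done_rx.iter().collect();
       pool.shutdown();
       got.sort();
       println!("{got:?}");
   }
   ```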
   
   ### 2. Inboxes + pipeline splitting
   A **Pipeline** is a breaker-free `ExecutionPlan` subgraph. The planner walks 
the plan and cuts at v1 breakers — `CoalescePartitionsExec` and 
`SortPreservingMergeExec` — replacing each breaker's children with 
`InboxSourceExec` stubs and recording the detached subtrees as upstream 
pipelines. Morsels flow through bounded mpsc inboxes between pipelines (natural 
backpressure via the bounded capacity).
   
   ```
   Pipeline A (final):                   Pipeline B (upstream):
     Root                                  OriginalChild
      └─ SPM                               (N partitions)
          └─ InboxSourceExec    ◄──── one sender per partition
   ```
   
   Breakers explicitly **not** cut in v1: `RepartitionExec` (would require 
reimplementing partitioning), `SortExec`, `HashJoinExec` build-side.
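   The backpressure mechanism can be illustrated with a std `sync_channel` stand-in for the crate's bounded inbox (the real inboxes are async mpsc channels, but the block-when-full behavior is the same idea): the upstream pipeline's `send` stalls once the inbox holds its capacity of morsels, so a slow consumer throttles its producer.

   ```rust
   use std::sync::mpsc;
   use std::thread;

   fn main() {
       // A bounded "inbox" between two pipelines: capacity 2 morsels.
       // send() blocks once the inbox is full — natural backpressure.
       let (tx, rx) = mpsc::sync_channel::<Vec<u64>>(2);

       let producer = thread::spawn(move || {
           for i in 0..6u64 {
               // One "morsel" per send; blocks while 2 are already queued.
               tx.send(vec![i; 4]).unwrap();
           }
           // Dropping tx closes the inbox; the consumer's loop then ends.
       });

       let mut total = 0u64;
       for morsel in rx {
           total += morsel.iter().sum::<u64>();
       }
       producer.join().unwrap();
       println!("total = {total}");
   }
   ```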
   
   ### 3. `WorkerDispatchExec` + executor
   Before scheduling, each pipeline's leaves are wrapped with 
`WorkerDispatchExec` so that `RepartitionExec`'s lazy-spawned input fetchers 
spread across the pool instead of piling onto whichever worker wins the 
first-poll race. The top-level `execute()` splits the plan into pipelines, 
schedules each partition on `worker = partition % N`, and bridges the final 
pipeline's output back to the caller as a normal `SendableRecordBatchStream`.
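   The `worker = partition % N` pinning is trivial but worth seeing concretely, since it also explains the skew caveat in the out-of-scope list: with no work stealing, each worker's load is fixed at scheduling time.

   ```rust
   fn main() {
       let n_workers = 4usize;
       let n_partitions = 10usize;

       // partition -> worker assignment used by the top-level execute().
       let assignment: Vec<usize> =
           (0..n_partitions).map(|p| p % n_workers).collect();

       // Partitions each worker ends up driving. Without work stealing,
       // a skewed partition stalls only its own worker.
       let mut load = vec![0usize; n_workers];
       for &w in &assignment {
           load[w] += 1;
       }
       println!("assignment = {assignment:?}");
       println!("load       = {load:?}");
   }
   ```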
   
   ### ClickBench wiring
   The benchmark runner now builds a `WorkerPool` by default and routes query 
execution through `morsel_scheduler::execute`. New flags:
   
   - `--no-morsel-scheduler` — fall back to the ambient multi-threaded 
runtime.
   - `--morsel-workers N` — override worker count (default: 
`std::thread::available_parallelism()`).
   
   ## What's intentionally out of scope for v1
   
   - **Cutting at RepartitionExec / SortExec / hash-join build** — hooks exist 
in the planner but these would need per-breaker partitioning logic.
   - **NUMA / CPU pinning** — workers are OS threads, not core-pinned.
   - **Explicit work-stealing across workers** — partitions are pinned to 
`partition % N`; a skewed pipeline will stall its worker.
   - **Async I/O on workers** — no `tokio-uring` integration in this crate; 
the `tokio-uring-benchmark-demo` branch is a separate experiment that could 
eventually layer on top.
   
   ## Test plan
   
   - [x] `cargo fmt --all`
   - [x] `cargo clippy -p datafusion-morsel-scheduler -p datafusion-benchmarks 
--all-targets -- -D warnings`
   - [x] `cargo test -p datafusion-morsel-scheduler --lib` — 7 unit tests 
covering `WorkerPool`, `Inbox`/`InboxSourceExec`, planner cuts, and an 
end-to-end plan run
   - [ ] End-to-end ClickBench comparison (morsel scheduler vs. baseline)
   - [ ] Sanity test on larger queries (joins, aggregations with 
RepartitionExec)
   
   🤖 Generated with [Claude Code](https://claude.com/claude-code)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

