pepijnve commented on code in PR #16398: URL: https://github.com/apache/datafusion/pull/16398#discussion_r2152869442
########## datafusion/physical-plan/src/coop.rs: ########## @@ -0,0 +1,325 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Utilities for improved cooperative scheduling. +//! +//! # Cooperative scheduling +//! +//! A single call to `poll_next` on a top-level `Stream` may potentially do a lot of work before it +//! returns a `Poll::Pending`. Think for instance of calculating an aggregation over a large dataset. +//! If an operator tree runs for a long period of time without yielding back to the Tokio executor, +//! it can starve other tasks waiting on that executor to execute them. +//! Additionally, this prevents the query execution from being cancelled. +//! +//! To ensure that `Stream` implementations yield regularly, operators can insert explicit yield +//! points using the utilities in this module. For most operators this is **not** necessary. The +//! built-in DataFusion operators that generate (rather than manipulate; for instance `DataSourceExec`) +//! or repartition `RecordBatch`es (for instance, `RepartitionExec`) contain yield points that will +//! make most operator trees yield as appropriate. +//! +//! There are a couple of types of operators that should insert yield points: +//! - New source operators that do not make use of Tokio resources +//! - Exchange like operators that do not use Tokio's `Channel` implementation to pass data between +//! tasks + +#[cfg(feature = "tokio_coop_fallback")] +use futures::Future; +use std::any::Any; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use crate::execution_plan::CardinalityEffect::{self, Equal}; +use crate::{ + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream, + SendableRecordBatchStream, +}; +use arrow::record_batch::RecordBatch; +use arrow_schema::Schema; +use datafusion_common::{internal_err, Result, Statistics}; +use datafusion_execution::TaskContext; + +use crate::execution_plan::SchedulingType; +use crate::stream::RecordBatchStreamAdapter; +use futures::{Stream, StreamExt}; + +/// A stream that passes record batches through unchanged while cooperating with the Tokio runtime. +/// It consumes cooperative scheduling budget for each returned [`RecordBatch`], +/// allowing other tasks to execute when the budget is exhausted. +pub struct CooperativeStream<T> +where + T: RecordBatchStream + Unpin, +{ + inner: T, + #[cfg(not(any(feature = "tokio_coop", feature = "tokio_coop_fallback")))] + budget: u8, +} + +#[cfg(not(any(feature = "tokio_coop", feature = "tokio_coop_fallback")))] +// Magic value that matches Tokio's task budget value +const YIELD_FREQUENCY: u8 = 128; + +impl<T> CooperativeStream<T> +where + T: RecordBatchStream + Unpin, +{ + /// Creates a new `CooperativeStream` that wraps the provided stream. + /// The resulting stream will cooperate with the Tokio scheduler by consuming a unit of + /// scheduling budget when the wrapped `Stream` returns a record batch. + pub fn new(inner: T) -> Self { + Self { + inner, + #[cfg(not(any(feature = "tokio_coop", feature = "tokio_coop_fallback")))] + budget: YIELD_FREQUENCY, + } + } +} + +impl<T> Stream for CooperativeStream<T> +where + T: RecordBatchStream + Unpin, +{ + type Item = Result<RecordBatch>; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<Self::Item>> { + #[cfg(all(feature = "tokio_coop", not(feature = "tokio_coop_fallback")))] + { + let coop = std::task::ready!(tokio::task::coop::poll_proceed(cx)); + let value = self.inner.poll_next_unpin(cx); + if value.is_ready() { + coop.made_progress(); + } + value + } + + #[cfg(feature = "tokio_coop_fallback")] + { + if !tokio::task::coop::has_budget_remaining() { + cx.waker().wake_by_ref(); + return Poll::Pending; + } + + let value = self.inner.poll_next_unpin(cx); + if value.is_ready() { + // This is a temporary placeholder implementation + let consume = tokio::task::consume_budget(); + let consume_ref = std::pin::pin!(consume); + let _ = consume_ref.poll(cx); + } + value + } + + #[cfg(not(any(feature = "tokio_coop", feature = "tokio_coop_fallback")))] + { + if self.budget == 0 { + self.budget = YIELD_FREQUENCY; + cx.waker().wake_by_ref(); + return Poll::Pending; + } + + let value = { self.inner.poll_next_unpin(cx) }; + + if value.is_ready() { + self.budget -= 1; + } else { + self.budget = YIELD_FREQUENCY; + } + value + } + } +} + +impl<T> RecordBatchStream for CooperativeStream<T> +where + T: RecordBatchStream + Unpin, +{ + fn schema(&self) -> Arc<Schema> { + self.inner.schema() + } +} + +/// An execution plan decorator that enables cooperative multitasking. +/// It wraps the streams produced by its input execution plan using the [`make_cooperative`] function, +/// which makes the stream participate in Tokio cooperative scheduling. +#[derive(Debug)] +pub struct CooperativeExec { + input: Arc<dyn ExecutionPlan>, + properties: PlanProperties, +} + +impl CooperativeExec { + /// Creates a new `CooperativeExec` operator that wraps the given input execution plan. + pub fn new(input: Arc<dyn ExecutionPlan>) -> Self { + let properties = input + .properties() + .clone() + .with_scheduling_type(SchedulingType::Cooperative); + + Self { input, properties } + } + + /// Returns a reference to the wrapped input execution plan. + pub fn input(&self) -> &Arc<dyn ExecutionPlan> { + &self.input + } +} + +impl DisplayAs for CooperativeExec { + fn fmt_as( + &self, + _t: DisplayFormatType, + f: &mut std::fmt::Formatter<'_>, + ) -> std::fmt::Result { + write!(f, "CooperativeExec") + } +} + +impl ExecutionPlan for CooperativeExec { + fn name(&self) -> &str { + "CooperativeExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> Arc<Schema> { + self.input.schema() + } + + fn properties(&self) -> &PlanProperties { + &self.properties + } + + fn maintains_input_order(&self) -> Vec<bool> { + self.input.maintains_input_order() + } + + fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { + vec![&self.input] + } + + fn with_new_children( + self: Arc<Self>, + mut children: Vec<Arc<dyn ExecutionPlan>>, + ) -> Result<Arc<dyn ExecutionPlan>> { + if children.len() != 1 { + return internal_err!("CooperativeExec requires exactly one child"); + } + Ok(Arc::new(CooperativeExec::new(children.swap_remove(0)))) + } + + fn execute( + &self, + partition: usize, + task_ctx: Arc<TaskContext>, + ) -> Result<SendableRecordBatchStream> { + let child_stream = self.input.execute(partition, task_ctx)?; + Ok(make_cooperative(child_stream)) + } + + fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> { + self.input.partition_statistics(partition) + } + + fn supports_limit_pushdown(&self) -> bool { + true + } + + fn cardinality_effect(&self) -> CardinalityEffect { + Equal + } +} + +/// Creates a [`CooperativeStream`] wrapper around the given [`RecordBatchStream`]. +/// This wrapper collaborates with the Tokio cooperative scheduler by consuming a unit of +/// scheduling budget for each returned record batch. Review Comment: I suggest we add hyperlinks to the `coop` module level docs to avoid repeating the same thing in multiple places. I've done that here already. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org