ozankabak commented on code in PR #16196: URL: https://github.com/apache/datafusion/pull/16196#discussion_r2128238867
########## datafusion/datasource/src/source.rs: ########## @@ -179,12 +180,17 @@ pub trait DataSource: Send + Sync + Debug { /// the [`FileSource`] trait. /// /// [`FileSource`]: crate::file::FileSource +/// We now add a `cooperative` flag to +/// let it optionally yield back to the runtime periodically. +/// Default is `true`, meaning it will yield back to the runtime for cooperative scheduling. #[derive(Clone, Debug)] pub struct DataSourceExec { /// The source of the data -- for example, `FileScanConfig` or `MemorySourceConfig` data_source: Arc<dyn DataSource>, /// Cached plan properties such as sort order cache: PlanProperties, + /// Indicates whether to enable cooperative yielding mode. + cooperative: bool, Review Comment: Should this flag come from the `data_source` object? That is what gives us the stream (which may or may not yield to the runtime). ########## datafusion/datasource/src/source.rs: ########## @@ -256,7 +262,39 @@ impl ExecutionPlan for DataSourceExec { partition: usize, context: Arc<TaskContext>, ) -> Result<SendableRecordBatchStream> { - self.data_source.open(partition, context) + // 1. Get the “base” stream exactly as before, without yielding. + let stream = self.data_source.open(partition, Arc::clone(&context)); + + // 2. If cooperative == false, return base_stream immediately. + if !self.cooperative { + return stream; + } + + let frequency = context + .session_config() + .options() + .optimizer + .yield_frequency_for_pipeline_break; + + // 3. If cooperative == true, wrap the stream into a YieldStream. + let yielding_stream = YieldStream::new(stream?, frequency); + Ok(Box::pin(yielding_stream)) + } + + /// Override: this operator *does* support cooperative yielding when `cooperative == true`. + fn yields_cooperatively(&self) -> bool { + self.cooperative + } + + /// If `cooperative == true`, return `Some(self.clone())` so the optimizer knows + /// we can replace a plain DataSourceExec with this same node (it already yields). 
+ /// Otherwise, return None. + fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> { + if self.cooperative { + Some(self) + } else { + None + } Review Comment: ```suggestion self.cooperative.then_some(self) ``` ########## datafusion/sqllogictest/test_files/joins.slt: ########## @@ -4702,7 +4702,8 @@ physical_plan 01)CrossJoinExec 02)--DataSourceExec: partitions=1, partition_sizes=[0] 03)--ProjectionExec: expr=[1 as Int64(1)] -04)----PlaceholderRowExec +04)----YieldStreamExec frequency=64 +05)------PlaceholderRowExec Review Comment: Maybe we can, but let's first finalize the design and implementation for others. ########## datafusion/physical-plan/src/memory.rs: ########## @@ -139,13 +140,18 @@ pub trait LazyBatchGenerator: Send + Sync + fmt::Debug + fmt::Display { /// /// This plan generates output batches lazily, it doesn't have to buffer all batches /// in memory up front (compared to `MemorySourceConfig`), thus consuming constant memory. +/// We now add a `cooperative` flag to +/// let it optionally yield back to the runtime periodically. +/// Default is `true`, meaning it will yield back to the runtime for cooperative scheduling. Review Comment: Unnecessary ########## datafusion/core/tests/physical_optimizer/enforce_distribution.rs: ########## @@ -3534,6 +3534,7 @@ async fn test_distribute_sort_memtable() -> Result<()> { let session_config = SessionConfig::new() .with_repartition_file_min_size(1000) .with_target_partitions(3); + Review Comment: ```suggestion ``` ########## datafusion/common/src/config.rs: ########## @@ -722,6 +722,19 @@ config_namespace! { /// then the output will be coerced to a non-view. /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. 
pub expand_views_at_output: bool, default = false + + /// When true, the optimizer will insert a Yield operator at the leaf nodes of any pipeline + /// that contains a pipeline-breaking operator, allowing the Tokio scheduler to switch to + /// other tasks while waiting. + /// Default: true (enabled). + pub enable_add_yield_for_pipeline_break: bool, default = true + + /// Yield frequency in batches, it represents how many batches to process before yielding + /// to the Tokio scheduler. The default value is 64, which means that after processing + /// 64 batches, the execution will yield control back to the Tokio scheduler. + /// This setting is only effective when `enable_add_yield_for_pipeline_break` is set to true. + /// This value should be greater than 0. + pub yield_frequency_for_pipeline_break: usize, default = 64 Review Comment: I think a single configuration is enough; a yield frequency of zero already implies there is no yielding. ########## datafusion/physical-plan/src/execution_plan.rs: ########## @@ -546,6 +558,23 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { child_pushdown_result, )) } + + /// Whether this operator supports cooperative yielding. Default is false. + fn yields_cooperatively(&self) -> bool { + false Review Comment: Shouldn't this use the result of `emission_type` to return the default value instead of just returning `false`? ########## datafusion/physical-plan/src/execution_plan.rs: ########## @@ -75,7 +75,19 @@ use futures::stream::{StreamExt, TryStreamExt}; /// [`execute`]: ExecutionPlan::execute /// [`required_input_distribution`]: ExecutionPlan::required_input_distribution /// [`required_input_ordering`]: ExecutionPlan::required_input_ordering -pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { +/// The core trait for a physical execution plan node. Every operator +/// implements this trait. We have extended it by adding two new methods +/// for “cooperative yielding” support: +/// +/// 1. 
`yields_cooperatively()` indicates whether this operator already +/// supports async/yield behavior internally (default: false). +/// +/// 2. `with_cooperative_yields(self: Arc<Self>)` returns an alternate +/// plan node that has built-in yielding; if not available, returns None. +/// +/// Because `with_cooperative_yields` moves `Arc<Self>` into `Arc<dyn ExecutionPlan>`, +/// we must ensure `Self: 'static`. Therefore, we add `+ 'static` here. +pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync + 'static { Review Comment: I think we can make this work without adding the `'static` lifetime. ########## datafusion/physical-plan/src/execution_plan.rs: ########## @@ -75,7 +75,19 @@ use futures::stream::{StreamExt, TryStreamExt}; /// [`execute`]: ExecutionPlan::execute /// [`required_input_distribution`]: ExecutionPlan::required_input_distribution /// [`required_input_ordering`]: ExecutionPlan::required_input_ordering -pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { +/// The core trait for a physical execution plan node. Every operator +/// implements this trait. We have extended it by adding two new methods +/// for “cooperative yielding” support: +/// +/// 1. `yields_cooperatively()` indicates whether this operator already +/// supports async/yield behavior internally (default: false). +/// +/// 2. `with_cooperative_yields(self: Arc<Self>)` returns an alternate +/// plan node that has built-in yielding; if not available, returns None. +/// +/// Because `with_cooperative_yields` moves `Arc<Self>` into `Arc<dyn ExecutionPlan>`, +/// we must ensure `Self: 'static`. Therefore, we add `+ 'static` here. Review Comment: The comments here are not necessary, having good docstrings for the methods themselves is enough. 
########## datafusion/physical-plan/src/memory.rs: ########## @@ -254,10 +269,36 @@ impl ExecutionPlan for LazyMemoryExec { ); } - Ok(Box::pin(LazyMemoryStream { + let stream = Box::pin(LazyMemoryStream { schema: Arc::clone(&self.schema), generator: Arc::clone(&self.batch_generators[partition]), - })) + }); + + // 2. If cooperative == false, return base_stream immediately. + if !self.cooperative { + return Ok(stream); + } + + let frequency = context + .session_config() + .options() + .optimizer + .yield_frequency_for_pipeline_break; + + // 3. If cooperative == true, wrap the stream into a YieldStream. + let yielding_stream = YieldStream::new(stream, frequency); + Ok(Box::pin(yielding_stream)) + } + + /// If `cooperative == true`, return `Some(self.clone())` so the optimizer knows + /// we can replace a plain DataSourceExec with this same node (it already yields). + /// Otherwise, return None. + fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> { + if self.cooperative { + Some(self) + } else { + None + } Review Comment: ```suggestion self.cooperative.then_some(self) ``` ########## datafusion/physical-optimizer/src/optimizer.rs: ########## @@ -35,6 +35,7 @@ use crate::sanity_checker::SanityCheckPlan; use crate::topk_aggregation::TopKAggregation; use crate::update_aggr_exprs::OptimizeAggregateOrder; +use crate::wrap_leaves_cancellation::WrapLeaves; Review Comment: ```suggestion use crate::update_aggr_exprs::OptimizeAggregateOrder; use crate::wrap_leaves_cancellation::WrapLeaves; ``` ########## datafusion/proto/src/physical_plan/mod.rs: ########## @@ -89,6 +89,7 @@ use datafusion_common::config::TableParquetOptions; use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result}; use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF}; +use datafusion::physical_plan::yield_stream::YieldStreamExec; Review Comment: ```suggestion use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF}; use 
datafusion::physical_plan::yield_stream::YieldStreamExec; ``` ########## datafusion/physical-plan/src/memory.rs: ########## @@ -139,13 +140,18 @@ pub trait LazyBatchGenerator: Send + Sync + fmt::Debug + fmt::Display { /// /// This plan generates output batches lazily, it doesn't have to buffer all batches /// in memory up front (compared to `MemorySourceConfig`), thus consuming constant memory. +/// We now add a `cooperative` flag to +/// let it optionally yield back to the runtime periodically. +/// Default is `true`, meaning it will yield back to the runtime for cooperative scheduling. pub struct LazyMemoryExec { /// Schema representing the data schema: SchemaRef, /// Functions to generate batches for each partition batch_generators: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>, /// Plan properties cache storing equivalence properties, partitioning, and execution mode cache: PlanProperties, + /// Indicates whether to enable cooperative yielding mode. Review Comment: ```suggestion /// Indicates whether to enable cooperative yielding mode (defaults to `true`). ``` ########## datafusion/sqllogictest/test_files/group_by.slt: ########## @@ -4113,7 +4113,7 @@ EXPLAIN SELECT lhs.c, rhs.c, lhs.sum1, rhs.sum1 ---- logical_plan 01)Projection: lhs.c, rhs.c, lhs.sum1, rhs.sum1 -02)--Cross Join: +02)--Cross Join: Review Comment: Accidental whitespace? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org