ozankabak commented on code in PR #16196:
URL: https://github.com/apache/datafusion/pull/16196#discussion_r2128238867


##########
datafusion/datasource/src/source.rs:
##########
@@ -179,12 +180,17 @@ pub trait DataSource: Send + Sync + Debug {
 /// the [`FileSource`] trait.
 ///
 /// [`FileSource`]: crate::file::FileSource
+/// We now add a `cooperative` flag to
+/// let it optionally yield back to the runtime periodically.
+/// Default is `true`, meaning it will yield back to the runtime for cooperative scheduling.
 #[derive(Clone, Debug)]
 pub struct DataSourceExec {
     /// The source of the data -- for example, `FileScanConfig` or `MemorySourceConfig`
     data_source: Arc<dyn DataSource>,
     /// Cached plan properties such as sort order
     cache: PlanProperties,
+    /// Indicates whether to enable cooperative yielding mode.
+    cooperative: bool,

Review Comment:
   Should this flag come from the `data_source` object? That is what 
gives us the stream (which may or may not yield to the runtime).
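
One way to read this question, as a minimal sketch only: the exec node keeps no flag of its own and asks its source instead. `SketchSource`, `SketchExec`, `SourceA`, and `SourceB` are hypothetical stand-ins invented for illustration; they are not DataFusion's `DataSource` or `DataSourceExec`, and which source yields is chosen arbitrarily here.

```rust
use std::sync::Arc;

/// Stand-in for a data source. This is NOT DataFusion's `DataSource` trait.
trait SketchSource {
    /// Hypothetical hook: does the stream opened by this source already
    /// yield to the runtime? Defaults to `false`.
    fn yields_cooperatively(&self) -> bool {
        false
    }
}

struct SourceA;
struct SourceB;

// `SourceA` keeps the default (no cooperative yielding).
impl SketchSource for SourceA {}

// `SourceB` overrides the hook (arbitrary choice for the sketch).
impl SketchSource for SourceB {
    fn yields_cooperatively(&self) -> bool {
        true
    }
}

/// Stand-in for the exec node: it stores no flag and delegates to the source.
struct SketchExec {
    source: Arc<dyn SketchSource>,
}

impl SketchExec {
    fn yields_cooperatively(&self) -> bool {
        self.source.yields_cooperatively()
    }
}

fn main() {
    let a = SketchExec { source: Arc::new(SourceA) };
    let b = SketchExec { source: Arc::new(SourceB) };
    println!("A yields: {}", a.yields_cooperatively());
    println!("B yields: {}", b.yields_cooperatively());
}
```

With this shape the answer lives next to the code that builds the stream, so the node and the source cannot disagree.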



##########
datafusion/datasource/src/source.rs:
##########
@@ -256,7 +262,39 @@ impl ExecutionPlan for DataSourceExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        self.data_source.open(partition, context)
+        // 1. Get the “base” stream exactly as before, without yielding.
+        let stream = self.data_source.open(partition, Arc::clone(&context));
+
+        // 2. If cooperative == false, return base_stream immediately.
+        if !self.cooperative {
+            return stream;
+        }
+
+        let frequency = context
+            .session_config()
+            .options()
+            .optimizer
+            .yield_frequency_for_pipeline_break;
+
+        // 3. If cooperative == true, wrap the stream into a YieldStream.
+        let yielding_stream = YieldStream::new(stream?, frequency);
+        Ok(Box::pin(yielding_stream))
+    }
+
+    /// Override: this operator *does* support cooperative yielding when `cooperative == true`.
+    fn yields_cooperatively(&self) -> bool {
+        self.cooperative
+    }
+
+    /// If `cooperative == true`, return `Some(self.clone())` so the optimizer knows
+    /// we can replace a plain DataSourceExec with this same node (it already yields).
+    /// Otherwise, return None.
+    fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> {
+        if self.cooperative {
+            Some(self)
+        } else {
+            None
+        }

Review Comment:
   ```suggestion
           self.cooperative.then_some(self)
   ```
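
For context, a small sketch of why the two forms are interchangeable here. `Node` and both function names are hypothetical stand-ins, not types from the PR; only `bool::then_some` is a real standard-library method.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a plan node carrying a `cooperative` flag.
#[derive(Debug)]
struct Node {
    cooperative: bool,
}

/// The shape used in the diff: an explicit if/else.
fn with_yields_if_else(node: Arc<Node>) -> Option<Arc<Node>> {
    if node.cooperative {
        Some(node)
    } else {
        None
    }
}

/// The suggested shape: `bool::then_some` returns `Some(value)` when the
/// bool is `true` and `None` otherwise.
fn with_yields_then_some(node: Arc<Node>) -> Option<Arc<Node>> {
    node.cooperative.then_some(node)
}

fn main() {
    for flag in [true, false] {
        let a = with_yields_if_else(Arc::new(Node { cooperative: flag }));
        let b = with_yields_then_some(Arc::new(Node { cooperative: flag }));
        // Both forms agree on whether a node is returned.
        assert_eq!(a.is_some(), b.is_some());
        println!("cooperative = {flag}: returned = {}", b.is_some());
    }
}
```

Note that `then_some` evaluates its argument eagerly, which is harmless here because the argument is just a move of the `Arc`; `bool::then` takes a closure when the value is expensive to build.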



##########
datafusion/sqllogictest/test_files/joins.slt:
##########
@@ -4702,7 +4702,8 @@ physical_plan
 01)CrossJoinExec
 02)--DataSourceExec: partitions=1, partition_sizes=[0]
 03)--ProjectionExec: expr=[1 as Int64(1)]
-04)----PlaceholderRowExec
+04)----YieldStreamExec frequency=64
+05)------PlaceholderRowExec

Review Comment:
   Maybe we can, but let's first finalize the design and implementation for 
others.



##########
datafusion/physical-plan/src/memory.rs:
##########
@@ -139,13 +140,18 @@ pub trait LazyBatchGenerator: Send + Sync + fmt::Debug + fmt::Display {
 ///
 /// This plan generates output batches lazily, it doesn't have to buffer all batches
 /// in memory up front (compared to `MemorySourceConfig`), thus consuming constant memory.
+/// We now add a `cooperative` flag to
+/// let it optionally yield back to the runtime periodically.
+/// Default is `true`, meaning it will yield back to the runtime for cooperative scheduling.

Review Comment:
   Unnecessary



##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -3534,6 +3534,7 @@ async fn test_distribute_sort_memtable() -> Result<()> {
     let session_config = SessionConfig::new()
         .with_repartition_file_min_size(1000)
         .with_target_partitions(3);
+

Review Comment:
   ```suggestion
   ```



##########
datafusion/common/src/config.rs:
##########
@@ -722,6 +722,19 @@ config_namespace! {
         /// then the output will be coerced to a non-view.
         /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
         pub expand_views_at_output: bool, default = false
+
+        /// When true, the optimizer will insert a Yield operator at the leaf nodes of any pipeline
+        /// that contains a pipeline-breaking operator, allowing the Tokio scheduler to switch to
+        /// other tasks while waiting.
+        /// Default: true (enabled).
+        pub enable_add_yield_for_pipeline_break: bool, default = true
+
+        /// Yield frequency in batches, it represents how many batches to process before yielding
+        /// to the Tokio scheduler. The default value is 64, which means that after processing
+        /// 64 batches, the execution will yield control back to the Tokio scheduler.
+        /// This setting is only effective when `enable_add_yield_for_pipeline_break` is set to true.
+        /// This value should be greater than 0.
+        pub yield_frequency_for_pipeline_break: usize, default = 64

Review Comment:
   I think a single configuration option is enough: a yield frequency of zero 
already implies there is no yielding.
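
A minimal sketch of that single-setting idea, assuming a hypothetical `YieldCounter` helper (it is not part of the PR or of DataFusion): a frequency of `0` means "never yield", and any positive value means "yield after that many batches".

```rust
/// Hypothetical helper that decides when a stream should yield.
struct YieldCounter {
    /// Batches to process between yields; 0 disables yielding entirely.
    frequency: usize,
    /// Batches seen since the last yield.
    seen: usize,
}

impl YieldCounter {
    fn new(frequency: usize) -> Self {
        Self { frequency, seen: 0 }
    }

    /// Records one processed batch and returns `true` if the caller
    /// should yield to the scheduler now.
    fn on_batch(&mut self) -> bool {
        if self.frequency == 0 {
            // Zero frequency: yielding is switched off, no second flag needed.
            return false;
        }
        self.seen += 1;
        if self.seen >= self.frequency {
            self.seen = 0;
            true
        } else {
            false
        }
    }
}

/// Counts how many times a run of `batches` batches would yield.
fn count_yields(frequency: usize, batches: usize) -> usize {
    let mut counter = YieldCounter::new(frequency);
    (0..batches).filter(|_| counter.on_batch()).count()
}

fn main() {
    for frequency in [0, 1, 64] {
        println!(
            "frequency = {frequency}: {} yields over 128 batches",
            count_yields(frequency, 128)
        );
    }
}
```

Folding the on/off switch into the numeric value also removes the "should be greater than 0" caveat from the doc comment, since zero becomes a meaningful setting.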



##########
datafusion/physical-plan/src/execution_plan.rs:
##########
@@ -546,6 +558,23 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
             child_pushdown_result,
         ))
     }
+
+    /// Whether this operator supports cooperative yielding. Default is false.
+    fn yields_cooperatively(&self) -> bool {
+        false

Review Comment:
   Shouldn't this use the result of `emission_type` to return the default value 
instead of just returning `false`?



##########
datafusion/physical-plan/src/execution_plan.rs:
##########
@@ -75,7 +75,19 @@ use futures::stream::{StreamExt, TryStreamExt};
 /// [`execute`]: ExecutionPlan::execute
 /// [`required_input_distribution`]: ExecutionPlan::required_input_distribution
 /// [`required_input_ordering`]: ExecutionPlan::required_input_ordering
-pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
+/// The core trait for a physical execution plan node. Every operator
+/// implements this trait. We have extended it by adding two new methods
+/// for “cooperative yielding” support:
+///
+/// 1. `yields_cooperatively()` indicates whether this operator already
+///    supports async/yield behavior internally (default: false).
+///
+/// 2. `with_cooperative_yields(self: Arc<Self>)` returns an alternate
+///    plan node that has built-in yielding; if not available, returns None.
+///
+/// Because `with_cooperative_yields` moves `Arc<Self>` into `Arc<dyn ExecutionPlan>`,
+/// we must ensure `Self: 'static`. Therefore, we add `+ 'static` here.
+pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync + 'static {

Review Comment:
   I think we can make this work without adding the `'static` lifetime.



##########
datafusion/physical-plan/src/execution_plan.rs:
##########
@@ -75,7 +75,19 @@ use futures::stream::{StreamExt, TryStreamExt};
 /// [`execute`]: ExecutionPlan::execute
 /// [`required_input_distribution`]: ExecutionPlan::required_input_distribution
 /// [`required_input_ordering`]: ExecutionPlan::required_input_ordering
-pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
+/// The core trait for a physical execution plan node. Every operator
+/// implements this trait. We have extended it by adding two new methods
+/// for “cooperative yielding” support:
+///
+/// 1. `yields_cooperatively()` indicates whether this operator already
+///    supports async/yield behavior internally (default: false).
+///
+/// 2. `with_cooperative_yields(self: Arc<Self>)` returns an alternate
+///    plan node that has built-in yielding; if not available, returns None.
+///
+/// Because `with_cooperative_yields` moves `Arc<Self>` into `Arc<dyn ExecutionPlan>`,
+/// we must ensure `Self: 'static`. Therefore, we add `+ 'static` here.

Review Comment:
   The comments here are not necessary; having good docstrings for the methods 
themselves is enough.



##########
datafusion/physical-plan/src/memory.rs:
##########
@@ -254,10 +269,36 @@ impl ExecutionPlan for LazyMemoryExec {
             );
         }
 
-        Ok(Box::pin(LazyMemoryStream {
+        let stream = Box::pin(LazyMemoryStream {
             schema: Arc::clone(&self.schema),
             generator: Arc::clone(&self.batch_generators[partition]),
-        }))
+        });
+
+        // 2. If cooperative == false, return base_stream immediately.
+        if !self.cooperative {
+            return Ok(stream);
+        }
+
+        let frequency = context
+            .session_config()
+            .options()
+            .optimizer
+            .yield_frequency_for_pipeline_break;
+
+        // 3. If cooperative == true, wrap the stream into a YieldStream.
+        let yielding_stream = YieldStream::new(stream, frequency);
+        Ok(Box::pin(yielding_stream))
+    }
+
+    /// If `cooperative == true`, return `Some(self.clone())` so the optimizer knows
+    /// we can replace a plain DataSourceExec with this same node (it already yields).
+    /// Otherwise, return None.
+    fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> {
+        if self.cooperative {
+            Some(self)
+        } else {
+            None
+        }

Review Comment:
   ```suggestion
           self.cooperative.then_some(self)
   ```



##########
datafusion/physical-optimizer/src/optimizer.rs:
##########
@@ -35,6 +35,7 @@ use crate::sanity_checker::SanityCheckPlan;
 use crate::topk_aggregation::TopKAggregation;
 use crate::update_aggr_exprs::OptimizeAggregateOrder;
 
+use crate::wrap_leaves_cancellation::WrapLeaves;

Review Comment:
   ```suggestion
   use crate::update_aggr_exprs::OptimizeAggregateOrder;
   use crate::wrap_leaves_cancellation::WrapLeaves;
   
   ```



##########
datafusion/proto/src/physical_plan/mod.rs:
##########
@@ -89,6 +89,7 @@ use datafusion_common::config::TableParquetOptions;
 use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
 use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
 
+use datafusion::physical_plan::yield_stream::YieldStreamExec;

Review Comment:
   ```suggestion
   use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
   use datafusion::physical_plan::yield_stream::YieldStreamExec;
   
   ```



##########
datafusion/physical-plan/src/memory.rs:
##########
@@ -139,13 +140,18 @@ pub trait LazyBatchGenerator: Send + Sync + fmt::Debug + fmt::Display {
 ///
 /// This plan generates output batches lazily, it doesn't have to buffer all batches
 /// in memory up front (compared to `MemorySourceConfig`), thus consuming constant memory.
+/// We now add a `cooperative` flag to
+/// let it optionally yield back to the runtime periodically.
+/// Default is `true`, meaning it will yield back to the runtime for cooperative scheduling.
 pub struct LazyMemoryExec {
     /// Schema representing the data
     schema: SchemaRef,
     /// Functions to generate batches for each partition
     batch_generators: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
     /// Plan properties cache storing equivalence properties, partitioning, and execution mode
     cache: PlanProperties,
+    /// Indicates whether to enable cooperative yielding mode.

Review Comment:
   ```suggestion
       /// Indicates whether to enable cooperative yielding mode (defaults to `true`).
   ```



##########
datafusion/sqllogictest/test_files/group_by.slt:
##########
@@ -4113,7 +4113,7 @@ EXPLAIN SELECT lhs.c, rhs.c, lhs.sum1, rhs.sum1
 ----
 logical_plan
 01)Projection: lhs.c, rhs.c, lhs.sum1, rhs.sum1
-02)--Cross Join:
+02)--Cross Join: 

Review Comment:
   Accidental whitespace?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

