Copilot commented on code in PR #20481:
URL: https://github.com/apache/datafusion/pull/20481#discussion_r2848943242


##########
datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs:
##########
@@ -227,8 +227,58 @@ impl RunQueryResult {
         format!("{}", pretty_format_batches(&self.result).unwrap())
     }
 
+    /// Extract ORDER BY column names from the query.
+    /// The query format is always:
+    ///   `SELECT * FROM test_table ORDER BY <col> <dir> <nulls>, ... LIMIT <n>`
+    fn sort_columns(&self) -> Vec<String> {
+        let order_by_start = self.query.find("ORDER BY").unwrap() + "ORDER BY".len();
+        let limit_start = self.query.rfind(" LIMIT").unwrap();
+        self.query[order_by_start..limit_start]
+            .trim()
+            .split(',')
+            .map(|part| part.split_whitespace().next().unwrap().to_string())
+            .collect()
+    }
+
+    /// Project `batches` to only include the named columns.
+    fn project_columns(batches: &[RecordBatch], cols: &[String]) -> Vec<RecordBatch> {
+        batches
+            .iter()
+            .map(|b| {
+                let schema = b.schema();
+                let indices: Vec<usize> = cols
+                    .iter()
+                    .filter_map(|c| schema.index_of(c).ok())
+                    .collect();
+                let columns: Vec<_> =
+                    indices.iter().map(|&i| Arc::clone(b.column(i))).collect();
+                let fields: Vec<_> =
+                    indices.iter().map(|&i| schema.field(i).clone()).collect();
+                let new_schema = Arc::new(Schema::new(fields));
+                RecordBatch::try_new(new_schema, columns).unwrap()
+            })
+            .collect()
+    }
+
     fn is_ok(&self) -> bool {
-        self.expected_formatted() == self.result_formatted()
+        if self.expected_formatted() == self.result_formatted() {
+            return true;
+        }
+        // If the full results differ, compare only the ORDER BY column values.
+        //
+        // For queries with ORDER BY <col> LIMIT k, multiple rows may tie on the
+        // sort key (e.g. two rows with id=27 for ORDER BY id DESC LIMIT 1).
+        // SQL permits returning any of the tied rows, so with vs without dynamic
+        // filter pushdown may legitimately return different tied rows.
+        //
+        // The dynamic filter must not change the *sort-key values* of the top-k
+        // result. We verify correctness by projecting both results down to only
+        // the ORDER BY columns and comparing those.
+        let sort_cols = self.sort_columns();
+        let expected_keys = Self::project_columns(&self.expected, &sort_cols);
+        let result_keys = Self::project_columns(&self.result, &sort_cols);
+        format!("{}", pretty_format_batches(&expected_keys).unwrap())
+            == format!("{}", pretty_format_batches(&result_keys).unwrap())
     }

Review Comment:
   The new comparison logic for TOP-K queries with ties is well-designed and 
handles the case where morsel-driven execution can return different tied rows. 
The implementation correctly projects down to only the ORDER BY columns and 
compares those values. However, consider adding a comment in the code 
explaining that this is specifically needed to handle non-deterministic row 
selection in the presence of ties, which can vary with morsel-driven execution. 
This will help future maintainers understand the special handling.
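
   For example, a hedged sketch of such a comment (wording illustrative), placed above the key-only fallback in `is_ok`:
   ```rust
   // Tie-breaking note: for ORDER BY <col> LIMIT k, several rows can share the
   // same sort-key value, and SQL permits returning any of them. Morsel-driven
   // execution consumes files in a work-stealing order, so it may surface a
   // different (equally valid) tied row than a plain scan. We therefore fall
   // back to comparing only the projected sort-key values below.
   ```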



##########
datafusion/datasource-parquet/src/opener.rs:
##########


Review Comment:
   The row group statistics pruning is correctly skipped for morsels (line 736: 
`&& !is_morsel`) since this pruning was already performed during the 
morselization phase. However, bloom filter pruning at line 755 is still 
performed for morsels. This could be inefficient if bloom filter checks are 
expensive, since the morselization phase already determined which row groups to 
scan. Consider whether bloom filter pruning should also be skipped for morsels 
to avoid redundant work, especially since the morsel's access_plan already 
encodes which row group to read.
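
   A minimal sketch of the suggested gating, mirroring the `&& !is_morsel` condition already used for statistics pruning (variable names follow the review; the surrounding call is elided and illustrative):
   ```rust
   // Sketch: skip bloom-filter probes for morsels, since the morsel's
   // access_plan was already pruned during the morselization phase.
   if enable_bloom_filter && !is_morsel {
       // ... existing bloom-filter pruning of the access plan ...
   }
   ```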



##########
datafusion/sqllogictest/test_files/limit_pruning.slt:
##########
@@ -72,7 +72,7 @@ explain analyze select * from tracking_data where species > 'M' AND s >= 50 orde
 ----
 Plan with Metrics
01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST], preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep], metrics=[output_rows=3, elapsed_compute=<slt:ignore>, output_bytes=<slt:ignore>]
-02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=<slt:ignore>, output_bytes=<slt:ignore>, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=6 total → 6 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=<slt:ignore>, metadata_load_time=<slt:ignore>, scan_efficiency_ratio=<slt:ignore> (521/2.35 K)]
+02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]}, projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 50 AND DynamicFilter [ species@0 < Nlpine Sheep ], pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=<slt:ignore>, output_bytes=<slt:ignore>, files_ranges_pruned_statistics=3 total → 3 matched, row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched, row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=7 total → 7 matched, limit_pruned_row_groups=0 total → 0 matched, bytes_scanned=<slt:ignore>, metadata_load_time=<slt:ignore>, scan_efficiency_ratio=<slt:ignore> (521/2.35 K)]

Review Comment:
   The test expectation changes show different pruning metrics behavior. In the 
first query, bloom filter pruning changed from "3 total → 3 matched" to "1 
total → 1 matched", and in the second query, files_ranges_pruned_statistics 
changed from "1 total → 1 matched" to "3 total → 3 matched". 
   
   These metric changes suggest that morsel-driven execution is running 
file-level or row-group-level pruning multiple times (once per morsel), which 
could lead to inflated or deflated metrics. The metrics should accurately 
reflect the actual pruning work done. Consider whether these metrics should be 
aggregated differently or whether morsels should skip re-running certain 
pruning steps that were already done during morselization.
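
   One hedged sketch of the aggregation fix (the metrics handle and method names are hypothetical stand-ins, not the opener's actual API):
   ```rust
   // Hypothetical guard: the morselization phase already counted this pruning
   // work once, so per-morsel re-pruning should not add to the reported totals.
   if !is_morsel {
       metrics.record_files_ranges_pruned(total_ranges, matched_ranges);
       metrics.record_row_groups_pruned(total_row_groups, matched_row_groups);
   }
   ```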



##########
datafusion/datasource/src/source.rs:
##########
@@ -300,7 +311,46 @@ impl ExecutionPlan for DataSourceExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        let stream = self.data_source.open(partition, Arc::clone(&context))?;
+        let shared_morsel_queue = if let Some(config) =
+            self.data_source.as_any().downcast_ref::<FileScanConfig>()
+        {
+            if config.morsel_driven {
+                let mut state = self.morsel_state.lock().unwrap();
+
+                // Start a new cycle once all expected partition streams for the
+                // previous cycle have been opened.
+                if state.expected_streams > 0
+                    && state.streams_opened >= state.expected_streams
+                {
+                    state.queue = None;
+                    state.streams_opened = 0;
+                    state.expected_streams = 0;
+                }
+
+                if state.queue.is_none() {
+                    let all_files = config
+                        .file_groups
+                        .iter()
+                        .flat_map(|g| g.files().to_vec())
+                        .collect();
+                    state.queue = Some(Arc::new(WorkQueue::new(all_files)));
+                    state.expected_streams = config.file_groups.len();
+                }
+
+                state.streams_opened += 1;
+                state.queue.as_ref().cloned()

Review Comment:
   The morsel state reset logic at lines 322-328 could cause issues with 
concurrent execution. If one partition calls execute() after some partitions 
have already started but before all expected_streams have opened, it will get 
the old queue. Then when the last partition opens, it will reset the queue, but 
the earlier partitions are still using references to the old queue. This could 
lead to work being distributed across two separate queues.
   
   Consider tracking whether any streams are currently active (beyond just 
counting opens) and only resetting when all previous execution has completed. 
Alternatively, use a generation number or similar mechanism to ensure all 
partitions within an execution cycle use the same queue.
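
   A minimal, self-contained sketch of the generation-number idea (all names hypothetical; `WorkQueue` stands in for the PR's type):
   ```rust
   use std::sync::Arc;

   /// Stand-in for the PR's WorkQueue.
   struct WorkQueue;

   /// Hypothetical state: tagging the queue with the cycle that created it
   /// lets every partition in one execution cycle share the same queue, and
   /// makes a stale reference detectable instead of silently splitting work
   /// across two queues.
   #[derive(Default)]
   struct MorselState {
       queue: Option<Arc<WorkQueue>>,
       generation: u64,
       streams_opened: usize,
       expected_streams: usize,
   }

   impl MorselState {
       /// Return the queue for the current cycle plus its generation. A new
       /// cycle creates a fresh queue and bumps the generation, so a stream
       /// still holding (queue, old_generation) belongs to a previous cycle.
       fn queue_for_cycle(
           &mut self,
           expected_streams: usize,
           make_queue: impl FnOnce() -> Arc<WorkQueue>,
       ) -> (Arc<WorkQueue>, u64) {
           // Reset only once the previous cycle opened all expected streams.
           if self.expected_streams > 0 && self.streams_opened >= self.expected_streams {
               self.queue = None;
               self.streams_opened = 0;
           }
           if self.queue.is_none() {
               self.queue = Some(make_queue());
               self.generation += 1;
               self.expected_streams = expected_streams;
           }
           self.streams_opened += 1;
           (Arc::clone(self.queue.as_ref().unwrap()), self.generation)
       }
   }
   ```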



##########
datafusion-examples/examples/data_io/json_shredding.rs:
##########
@@ -93,6 +93,7 @@ pub async fn json_shredding() -> Result<()> {
     // Set up query execution
     let mut cfg = SessionConfig::new();
     cfg.options_mut().execution.parquet.pushdown_filters = true;

Review Comment:
   The json_shredding example disables morsel_driven execution at line 96, 
likely because the example relies on specific row group pruning metrics for 
assertions. However, there's no comment explaining why this is necessary. 
Consider adding a comment explaining that morsel_driven execution changes the 
metrics behavior and thus is disabled for this example's assertions to pass. 
This will help future maintainers understand why this configuration is needed.
   ```suggestion
       cfg.options_mut().execution.parquet.pushdown_filters = true;
       // Disable morsel-driven execution because it changes how parquet pruning
       // metrics are reported, and this example asserts on specific row group
       // pruning statistics from EXPLAIN ANALYZE.
   ```



##########
datafusion/datasource/src/source.rs:
##########
@@ -124,6 +125,7 @@ pub trait DataSource: Send + Sync + Debug {
         &self,
         partition: usize,
         context: Arc<TaskContext>,
+        shared_morsel_queue: Option<Arc<WorkQueue>>,
     ) -> Result<SendableRecordBatchStream>;

Review Comment:
   The DataSource::open() method signature has been changed to include a new 
`shared_morsel_queue` parameter. This is a breaking API change that will affect 
all implementations of the DataSource trait outside of this codebase. While the 
PR description mentions this as a user-facing change, consider providing a 
migration guide or deprecation path for external implementors. At minimum, the 
PR description should clearly highlight this as a breaking change requiring a 
major version bump.
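
   One conventional, hedged migration sketch (the method name `open_with_morsels` is hypothetical; the types are those from the hunk above): keep `open` unchanged and add a defaulted method, so existing external implementors keep compiling and only morsel-capable sources override it:
   ```rust
   pub trait DataSource: Send + Sync + std::fmt::Debug {
       /// Unchanged signature: external implementations compile as before.
       fn open(
           &self,
           partition: usize,
           context: Arc<TaskContext>,
       ) -> Result<SendableRecordBatchStream>;

       /// Morsel-aware variant; by default it ignores the queue and
       /// delegates to the original `open`.
       fn open_with_morsels(
           &self,
           partition: usize,
           context: Arc<TaskContext>,
           _shared_morsel_queue: Option<Arc<WorkQueue>>,
       ) -> Result<SendableRecordBatchStream> {
           self.open(partition, context)
       }
   }
   ```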



##########
datafusion/physical-expr/src/simplifier/mod.rs:
##########
@@ -74,11 +77,14 @@ impl<'a> PhysicalExprSimplifier<'a> {
                     })?;
 
                 #[cfg(debug_assertions)]
-                assert_eq!(
-                    rewritten.data.data_type(schema).unwrap(),
-                    original_type,
-                    "Simplified expression should have the same data type as 
the original"
-                );
+                if let Some(original_type) = original_type
+                 && let Ok(rewritten_type) = rewritten.data.data_type(schema) {
+                    assert_eq!(
+                        rewritten_type,
+                        original_type,
+                        "Simplified expression should have the same data type 
as the original"
+                    );
+                }

Review Comment:
   Skipping the data_type assertion whenever the data_type call fails is documented as being needed for DynamicFilterPhysicalExpr when the filter has been updated concurrently. However, this silently suppresses type-checking errors that could indicate real bugs in expression simplification. 
   
   A better approach would be to only skip the assertion specifically for 
DynamicFilterPhysicalExpr (by checking the expression type), or to handle the 
concurrent update case more explicitly. The current implementation makes 
debugging harder because type mismatches in simplification will be silently 
ignored for all expressions when they fail the data_type call.
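
   A hedged sketch of the narrower skip, assuming `DynamicFilterPhysicalExpr` is importable here and that checking the rewritten root expression suffices (nested dynamic filters would need a tree walk):
   ```rust
   #[cfg(debug_assertions)]
   {
       // Only tolerate a type mismatch for dynamic filters, which may be
       // updated concurrently; all other expressions keep the strict check.
       let is_dynamic_filter = rewritten
           .data
           .as_any()
           .downcast_ref::<DynamicFilterPhysicalExpr>()
           .is_some();
       if !is_dynamic_filter
           && let Some(original_type) = original_type
           && let Ok(rewritten_type) = rewritten.data.data_type(schema)
       {
           assert_eq!(
               rewritten_type, original_type,
               "Simplified expression should have the same data type as the original"
           );
       }
   }
   ```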



##########
datafusion/datasource/src/file_stream.rs:
##########
@@ -276,6 +394,89 @@ impl RecordBatchStream for FileStream {
     }
 }
 
+/// Result of pulling work from the queue
+#[derive(Debug)]
+pub enum WorkStatus {
+    /// A morsel is available
+    Work(Box<PartitionedFile>),
+    /// No morsel available now, but others are morselizing
+    Wait,
+    /// No more work available
+    Done,
+}
+
+/// A shared queue of [`PartitionedFile`] morsels for morsel-driven execution.
+#[derive(Debug)]
+pub struct WorkQueue {
+    queue: Mutex<VecDeque<PartitionedFile>>,
+    /// Number of workers currently morselizing a file.
+    morselizing_count: AtomicUsize,
+    /// Notify waiters when work is added or morselizing finishes.
+    notify: Notify,
+}
+
+impl WorkQueue {
+    /// Create a new `WorkQueue` with the given initial files
+    pub fn new(initial_files: Vec<PartitionedFile>) -> Self {
+        Self {
+            queue: Mutex::new(VecDeque::from(initial_files)),
+            morselizing_count: AtomicUsize::new(0),
+            notify: Notify::new(),
+        }
+    }
+
+    /// Pull a file from the queue.
+    pub fn pull(&self) -> WorkStatus {
+        let mut queue = self.queue.lock().unwrap();
+        if let Some(file) = queue.pop_front() {
+            self.morselizing_count.fetch_add(1, Ordering::SeqCst);
+            WorkStatus::Work(Box::new(file))

Review Comment:
   The increment of morselizing_count at line 432 happens after releasing the 
mutex lock. This creates a race condition where multiple threads can observe an 
empty queue and morselizing_count=0 simultaneously, all returning 
WorkStatus::Done prematurely. Consider this sequence:
   1. Queue has 1 file, morselizing_count=0
   2. Thread A acquires lock, pops file, releases lock (line 430-431)
   3. Thread B acquires lock, sees empty queue, checks morselizing_count (still 
0), returns Done (line 434-437)
   4. Thread A increments morselizing_count (line 432)
   
   Thread B incorrectly returns Done while Thread A is about to morselize the 
file. The increment should happen before releasing the lock, or the entire 
check-and-increment should be atomic.
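
   A hedged sketch of the suggested fix (fields as in the `WorkQueue` shown above): keep the pop and the increment inside one critical section so the Done check cannot interleave between them:
   ```rust
   pub fn pull(&self) -> WorkStatus {
       // The lock is held across both the pop and the counter update, so no
       // other thread can observe "queue empty AND morselizing_count == 0"
       // in between.
       let mut queue = self.queue.lock().unwrap();
       if let Some(file) = queue.pop_front() {
           self.morselizing_count.fetch_add(1, Ordering::SeqCst);
           return WorkStatus::Work(Box::new(file));
       }
       if self.morselizing_count.load(Ordering::SeqCst) > 0 {
           WorkStatus::Wait
       } else {
           WorkStatus::Done
       }
   }
   ```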



##########
datafusion/datasource/src/file_stream.rs:
##########
@@ -113,15 +144,90 @@ impl FileStream {
                 FileStreamState::Idle => {
                     self.file_stream_metrics.time_opening.start();
 
-                    match self.start_next_file().transpose() {
-                        Ok(Some(future)) => self.state = FileStreamState::Open { future },
-                        Ok(None) => return Poll::Ready(None),
+                    if self.morsel_driven {
+                        let queue = self.shared_queue.as_ref().expect("shared queue");
+                        match queue.pull() {
+                            WorkStatus::Work(part_file) => {
+                                self.morsel_guard = Some(MorselizingGuard {
+                                    queue: Arc::clone(queue),
+                                });
+                                self.state = FileStreamState::Morselizing {
+                                    future: self.file_opener.morselize(*part_file),
+                                };
+                            }
+                            WorkStatus::Wait => {
+                                self.file_stream_metrics.time_opening.stop();
+                                let queue_captured = Arc::clone(queue);
+                                self.state = FileStreamState::Waiting {
+                                    future: Box::pin(async move {
+                                        let notified = queue_captured.notify.notified();
+                                        if !queue_captured.has_work_or_done() {
+                                            notified.await;
+                                        }
+                                    }),
+                                };
+                            }
+                            WorkStatus::Done => {
+                                self.file_stream_metrics.time_opening.stop();
+                                return Poll::Ready(None);
+                            }
+                        }
+                    } else {
+                        match self.start_next_file().transpose() {
+                            Ok(Some(future)) => {
+                                self.state = FileStreamState::Open { future }
+                            }
+                            Ok(None) => return Poll::Ready(None),
+                            Err(e) => {
+                                self.state = FileStreamState::Error;
+                                return Poll::Ready(Some(Err(e)));
+                            }
+                        }
+                    }
+                }
+                FileStreamState::Morselizing { future } => {
+                    match ready!(future.poll_unpin(cx)) {
+                        Ok(morsels) => {
+                            let queue = self.shared_queue.as_ref().expect("shared queue");
+                            // Take the guard to decrement morselizing_count
+                            let _guard = self.morsel_guard.take();
+
+                            if morsels.len() > 1 {
+                                self.file_stream_metrics.time_opening.stop();
+                                // Expanded into multiple morsels. Put all back and pull again.
+                                queue.push_many(morsels);
+                                self.state = FileStreamState::Idle;
+                            } else if morsels.len() == 1 {
+                                // No further expansion possible. Proceed to open.
+                                let morsel = morsels.into_iter().next().unwrap();
+                                match self.file_opener.open(morsel) {
+                                    Ok(future) => {
+                                        self.state = FileStreamState::Open { future }
+                                    }
+                                    Err(e) => {
+                                        self.file_stream_metrics.time_opening.stop();
+                                        self.state = FileStreamState::Error;
+                                        return Poll::Ready(Some(Err(e)));
+                                    }
+                                }
+                            } else {
+                                self.file_stream_metrics.time_opening.stop();
+                                // No morsels returned, skip this file
+                                self.state = FileStreamState::Idle;
+                            }

Review Comment:
   When morselize() returns an empty vector (0 morsels, lines 213-216), the 
state transitions back to Idle without notifying other waiting workers. 
However, the morselizing_count has already been decremented by the guard. If 
this was the last morselizing worker and the queue is empty, other workers 
waiting in the Waiting state may never be notified to check for the Done 
condition. Consider adding a notify_waiters() call here, or ensure that the 
guard's drop notification is sufficient (it currently does call notify_waiters 
via stop_morselizing).
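
   A hedged sketch of the explicit wake-up in the zero-morsel branch (whether it is redundant with the guard's `stop_morselizing` notification should be confirmed before adding it):
   ```rust
   } else {
       self.file_stream_metrics.time_opening.stop();
       // No morsels returned, skip this file. Explicitly wake any workers
       // parked in Waiting so they re-check the queue and can observe Done
       // once morselizing_count drops to zero.
       queue.notify.notify_waiters();
       self.state = FileStreamState::Idle;
   }
   ```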


