Copilot commented on code in PR #20481:
URL: https://github.com/apache/datafusion/pull/20481#discussion_r2848943242
##########
datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs:
##########
@@ -227,8 +227,58 @@ impl RunQueryResult {
format!("{}", pretty_format_batches(&self.result).unwrap())
}
+ /// Extract ORDER BY column names from the query.
+ /// The query format is always:
+ /// `SELECT * FROM test_table ORDER BY <col> <dir> <nulls>, ... LIMIT
<n>`
+ fn sort_columns(&self) -> Vec<String> {
+ let order_by_start = self.query.find("ORDER BY").unwrap() + "ORDER
BY".len();
+ let limit_start = self.query.rfind(" LIMIT").unwrap();
+ self.query[order_by_start..limit_start]
+ .trim()
+ .split(',')
+ .map(|part| part.split_whitespace().next().unwrap().to_string())
+ .collect()
+ }
+
+ /// Project `batches` to only include the named columns.
+ fn project_columns(batches: &[RecordBatch], cols: &[String]) ->
Vec<RecordBatch> {
+ batches
+ .iter()
+ .map(|b| {
+ let schema = b.schema();
+ let indices: Vec<usize> = cols
+ .iter()
+ .filter_map(|c| schema.index_of(c).ok())
+ .collect();
+ let columns: Vec<_> =
+ indices.iter().map(|&i| Arc::clone(b.column(i))).collect();
+ let fields: Vec<_> =
+ indices.iter().map(|&i| schema.field(i).clone()).collect();
+ let new_schema = Arc::new(Schema::new(fields));
+ RecordBatch::try_new(new_schema, columns).unwrap()
+ })
+ .collect()
+ }
+
fn is_ok(&self) -> bool {
- self.expected_formatted() == self.result_formatted()
+ if self.expected_formatted() == self.result_formatted() {
+ return true;
+ }
+ // If the full results differ, compare only the ORDER BY column values.
+ //
+ // For queries with ORDER BY <col> LIMIT k, multiple rows may tie on
the
+ // sort key (e.g. two rows with id=27 for ORDER BY id DESC LIMIT 1).
+ // SQL permits returning any of the tied rows, so with vs without
dynamic
+ // filter pushdown may legitimately return different tied rows.
+ //
+ // The dynamic filter must not change the *sort-key values* of the
top-k
+ // result. We verify correctness by projecting both results down to
only
+ // the ORDER BY columns and comparing those.
+ let sort_cols = self.sort_columns();
+ let expected_keys = Self::project_columns(&self.expected, &sort_cols);
+ let result_keys = Self::project_columns(&self.result, &sort_cols);
+ format!("{}", pretty_format_batches(&expected_keys).unwrap())
+ == format!("{}", pretty_format_batches(&result_keys).unwrap())
}
Review Comment:
The new comparison logic for TOP-K queries with ties is well-designed and
handles the case where morsel-driven execution can return different tied rows.
The implementation correctly projects down to only the ORDER BY columns and
compares those values. However, consider adding a comment in the code
explaining that this is specifically needed to handle non-deterministic row
selection in the presence of ties, which can vary with morsel-driven execution.
This will help future maintainers understand the special handling.
##########
datafusion/datasource-parquet/src/opener.rs:
##########
Review Comment:
The row group statistics pruning is correctly skipped for morsels (line 736:
`&& !is_morsel`) since this pruning was already performed during the
morselization phase. However, bloom filter pruning at line 755 is still
performed for morsels. This could be inefficient if bloom filter checks are
expensive, since the morselization phase already determined which row groups to
scan. Consider whether bloom filter pruning should also be skipped for morsels
to avoid redundant work, especially since the morsel's access_plan already
encodes which row group to read.
##########
datafusion/sqllogictest/test_files/limit_pruning.slt:
##########
@@ -72,7 +72,7 @@ explain analyze select * from tracking_data where species >
'M' AND s >= 50 orde
----
Plan with Metrics
01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST],
preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep],
metrics=[output_rows=3, elapsed_compute=<slt:ignore>, output_bytes=<slt:ignore>]
-02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]},
projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >=
50 AND DynamicFilter [ species@0 < Nlpine Sheep ],
pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND
s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 !=
row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[],
metrics=[output_rows=3, elapsed_compute=<slt:ignore>,
output_bytes=<slt:ignore>, files_ranges_pruned_statistics=1 total → 1 matched,
row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched,
row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=6
total → 6 matched, limit_pruned_row_groups=0 total → 0 matched,
bytes_scanned=<slt:ignore>, metadata_load_time=<slt:ignore>,
scan_efficiency_ratio=<slt:ignore> (521/2.35 K)]
+02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]},
projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >=
50 AND DynamicFilter [ species@0 < Nlpine Sheep ],
pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND
s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 !=
row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[],
metrics=[output_rows=3, elapsed_compute=<slt:ignore>,
output_bytes=<slt:ignore>, files_ranges_pruned_statistics=3 total → 3 matched,
row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched,
row_groups_pruned_bloom_filter=3 total → 3 matched, page_index_pages_pruned=7
total → 7 matched, limit_pruned_row_groups=0 total → 0 matched,
bytes_scanned=<slt:ignore>, metadata_load_time=<slt:ignore>,
scan_efficiency_ratio=<slt:ignore> (521/2.35 K)]
Review Comment:
The test expectation changes show different pruning metrics behavior. In the
first query, bloom filter pruning changed from "3 total → 3 matched" to "1
total → 1 matched", and in the second query, files_ranges_pruned_statistics
changed from "1 total → 1 matched" to "3 total → 3 matched".
These metric changes suggest that morsel-driven execution is running
file-level or row-group-level pruning multiple times (once per morsel), which
could lead to inflated or deflated metrics. The metrics should accurately
reflect the actual pruning work done. Consider whether these metrics should be
aggregated differently or whether morsels should skip re-running certain
pruning steps that were already done during morselization.
##########
datafusion/datasource/src/source.rs:
##########
@@ -300,7 +311,46 @@ impl ExecutionPlan for DataSourceExec {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
- let stream = self.data_source.open(partition, Arc::clone(&context))?;
+ let shared_morsel_queue = if let Some(config) =
+ self.data_source.as_any().downcast_ref::<FileScanConfig>()
+ {
+ if config.morsel_driven {
+ let mut state = self.morsel_state.lock().unwrap();
+
+ // Start a new cycle once all expected partition streams for
the
+ // previous cycle have been opened.
+ if state.expected_streams > 0
+ && state.streams_opened >= state.expected_streams
+ {
+ state.queue = None;
+ state.streams_opened = 0;
+ state.expected_streams = 0;
+ }
+
+ if state.queue.is_none() {
+ let all_files = config
+ .file_groups
+ .iter()
+ .flat_map(|g| g.files().to_vec())
+ .collect();
+ state.queue = Some(Arc::new(WorkQueue::new(all_files)));
+ state.expected_streams = config.file_groups.len();
+ }
+
+ state.streams_opened += 1;
+ state.queue.as_ref().cloned()
Review Comment:
The morsel state reset logic at lines 322-328 could cause issues with
concurrent execution. If one partition calls execute() after some partitions
have already started but before all expected_streams have opened, it will get
the old queue. Then when the last partition opens, it will reset the queue, but
the earlier partitions are still using references to the old queue. This could
lead to work being distributed across two separate queues.
Consider tracking whether any streams are currently active (beyond just
counting opens) and only resetting when all previous execution has completed.
Alternatively, use a generation number or similar mechanism to ensure all
partitions within an execution cycle use the same queue.
##########
datafusion-examples/examples/data_io/json_shredding.rs:
##########
@@ -93,6 +93,7 @@ pub async fn json_shredding() -> Result<()> {
// Set up query execution
let mut cfg = SessionConfig::new();
cfg.options_mut().execution.parquet.pushdown_filters = true;
Review Comment:
The json_shredding example disables morsel_driven execution at line 96,
likely because the example relies on specific row group pruning metrics for
assertions. However, there's no comment explaining why this is necessary.
Consider adding a comment explaining that morsel_driven execution changes the
metrics behavior and thus is disabled for this example's assertions to pass.
This will help future maintainers understand why this configuration is needed.
```suggestion
cfg.options_mut().execution.parquet.pushdown_filters = true;
// Disable morsel-driven execution because it changes how parquet pruning
// metrics are reported, and this example asserts on specific row group
// pruning statistics from EXPLAIN ANALYZE.
```
##########
datafusion/datasource/src/source.rs:
##########
@@ -124,6 +125,7 @@ pub trait DataSource: Send + Sync + Debug {
&self,
partition: usize,
context: Arc<TaskContext>,
+ shared_morsel_queue: Option<Arc<WorkQueue>>,
) -> Result<SendableRecordBatchStream>;
Review Comment:
The DataSource::open() method signature has been changed to include a new
`shared_morsel_queue` parameter. This is a breaking API change that will affect
all implementations of the DataSource trait outside of this codebase. While the
PR description mentions this as a user-facing change, consider providing a
migration guide or deprecation path for external implementors. At minimum, the
PR description should clearly highlight this as a breaking change requiring a
major version bump.
##########
datafusion/physical-expr/src/simplifier/mod.rs:
##########
@@ -74,11 +77,14 @@ impl<'a> PhysicalExprSimplifier<'a> {
})?;
#[cfg(debug_assertions)]
- assert_eq!(
- rewritten.data.data_type(schema).unwrap(),
- original_type,
- "Simplified expression should have the same data type as
the original"
- );
+ if let Some(original_type) = original_type
+ && let Ok(rewritten_type) = rewritten.data.data_type(schema) {
+ assert_eq!(
+ rewritten_type,
+ original_type,
+ "Simplified expression should have the same data type
as the original"
+ );
+ }
Review Comment:
The change skips the data_type assertion whenever either the original type was unavailable or the data_type() call on the rewritten expression fails — documented as being needed for DynamicFilterPhysicalExpr when the filter has been updated concurrently. However, this silently suppresses type-checking errors that could indicate real bugs in expression simplification.
A better approach would be to only skip the assertion specifically for
DynamicFilterPhysicalExpr (by checking the expression type), or to handle the
concurrent update case more explicitly. The current implementation makes
debugging harder because type mismatches in simplification will be silently
ignored for any expression whose data_type() call errors.
##########
datafusion/datasource/src/file_stream.rs:
##########
@@ -276,6 +394,89 @@ impl RecordBatchStream for FileStream {
}
}
+/// Result of pulling work from the queue
+#[derive(Debug)]
+pub enum WorkStatus {
+ /// A morsel is available
+ Work(Box<PartitionedFile>),
+ /// No morsel available now, but others are morselizing
+ Wait,
+ /// No more work available
+ Done,
+}
+
+/// A shared queue of [`PartitionedFile`] morsels for morsel-driven execution.
+#[derive(Debug)]
+pub struct WorkQueue {
+ queue: Mutex<VecDeque<PartitionedFile>>,
+ /// Number of workers currently morselizing a file.
+ morselizing_count: AtomicUsize,
+ /// Notify waiters when work is added or morselizing finishes.
+ notify: Notify,
+}
+
+impl WorkQueue {
+ /// Create a new `WorkQueue` with the given initial files
+ pub fn new(initial_files: Vec<PartitionedFile>) -> Self {
+ Self {
+ queue: Mutex::new(VecDeque::from(initial_files)),
+ morselizing_count: AtomicUsize::new(0),
+ notify: Notify::new(),
+ }
+ }
+
+ /// Pull a file from the queue.
+ pub fn pull(&self) -> WorkStatus {
+ let mut queue = self.queue.lock().unwrap();
+ if let Some(file) = queue.pop_front() {
+ self.morselizing_count.fetch_add(1, Ordering::SeqCst);
+ WorkStatus::Work(Box::new(file))
Review Comment:
The increment of morselizing_count at line 432 is claimed to happen after the
mutex lock is released, but in the quoted code the `queue` MutexGuard is still
in scope when `fetch_add` is called, so the increment executes while the lock
is held and the check-and-increment is effectively atomic — the race described
below would not occur as stated. It would only arise if the guard is dropped
before the increment somewhere in the full function (not visible in this hunk):
1. Queue has 1 file, morselizing_count=0
2. Thread A acquires lock, pops file, releases lock (line 430-431)
3. Thread B acquires lock, sees empty queue, checks morselizing_count (still
0), returns Done (line 434-437)
4. Thread A increments morselizing_count (line 432)
In that scenario Thread B incorrectly returns Done while Thread A is about to
morselize the file. To rule this out, keep the increment under the lock (as the
quoted code already does), or make the entire check-and-increment atomic.
##########
datafusion/datasource/src/file_stream.rs:
##########
@@ -113,15 +144,90 @@ impl FileStream {
FileStreamState::Idle => {
self.file_stream_metrics.time_opening.start();
- match self.start_next_file().transpose() {
- Ok(Some(future)) => self.state = FileStreamState::Open
{ future },
- Ok(None) => return Poll::Ready(None),
+ if self.morsel_driven {
+ let queue = self.shared_queue.as_ref().expect("shared
queue");
+ match queue.pull() {
+ WorkStatus::Work(part_file) => {
+ self.morsel_guard = Some(MorselizingGuard {
+ queue: Arc::clone(queue),
+ });
+ self.state = FileStreamState::Morselizing {
+ future:
self.file_opener.morselize(*part_file),
+ };
+ }
+ WorkStatus::Wait => {
+ self.file_stream_metrics.time_opening.stop();
+ let queue_captured = Arc::clone(queue);
+ self.state = FileStreamState::Waiting {
+ future: Box::pin(async move {
+ let notified =
queue_captured.notify.notified();
+ if !queue_captured.has_work_or_done() {
+ notified.await;
+ }
+ }),
+ };
+ }
+ WorkStatus::Done => {
+ self.file_stream_metrics.time_opening.stop();
+ return Poll::Ready(None);
+ }
+ }
+ } else {
+ match self.start_next_file().transpose() {
+ Ok(Some(future)) => {
+ self.state = FileStreamState::Open { future }
+ }
+ Ok(None) => return Poll::Ready(None),
+ Err(e) => {
+ self.state = FileStreamState::Error;
+ return Poll::Ready(Some(Err(e)));
+ }
+ }
+ }
+ }
+ FileStreamState::Morselizing { future } => {
+ match ready!(future.poll_unpin(cx)) {
+ Ok(morsels) => {
+ let queue =
self.shared_queue.as_ref().expect("shared queue");
+ // Take the guard to decrement morselizing_count
+ let _guard = self.morsel_guard.take();
+
+ if morsels.len() > 1 {
+ self.file_stream_metrics.time_opening.stop();
+ // Expanded into multiple morsels. Put all
back and pull again.
+ queue.push_many(morsels);
+ self.state = FileStreamState::Idle;
+ } else if morsels.len() == 1 {
+ // No further expansion possible. Proceed to
open.
+ let morsel =
morsels.into_iter().next().unwrap();
+ match self.file_opener.open(morsel) {
+ Ok(future) => {
+ self.state = FileStreamState::Open {
future }
+ }
+ Err(e) => {
+
self.file_stream_metrics.time_opening.stop();
+ self.state = FileStreamState::Error;
+ return Poll::Ready(Some(Err(e)));
+ }
+ }
+ } else {
+ self.file_stream_metrics.time_opening.stop();
+ // No morsels returned, skip this file
+ self.state = FileStreamState::Idle;
+ }
Review Comment:
When morselize() returns an empty vector (0 morsels — line 213-216), the state
transitions back to Idle without an explicit notification to other waiting
workers, even though morselizing_count has already been decremented by the
guard. If this was the last morselizing worker and the queue is empty, workers
parked in the Waiting state must still be woken so they can observe the Done
condition. The guard's Drop does call notify_waiters via stop_morselizing,
which should cover this case — but please verify that the guard is actually
dropped (not merely taken and held) on this path, so the notification fires
before waiters re-check; otherwise add an explicit notify_waiters() call here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]