alamb commented on code in PR #21637:
URL: https://github.com/apache/datafusion/pull/21637#discussion_r3183489922


##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -1075,41 +1076,54 @@ impl RowGroupsPrunedParquetOpen {
         let file_metadata = Arc::clone(reader_metadata.metadata());
         let rg_metadata = file_metadata.row_groups();
 
-        // Filter pushdown: evaluate predicates during scan
-        let row_filter = if let Some(predicate) = prepared
+        // Filter pushdown: evaluate predicates during scan.
+        // Keep the predicate around so we can rebuild RowFilter per decoder run
+        // when fully matched row groups split the scan into multiple decoders.
+        let pushdown_predicate = prepared
             .pushdown_filters
             .then_some(prepared.predicate.clone())
-            .flatten()
-        {
-            let row_filter = row_filter::build_row_filter(
-                &predicate,
-                &prepared.physical_file_schema,
-                file_metadata.as_ref(),
-                prepared.reorder_predicates,
-                &prepared.file_metrics,
-            );
+            .flatten();
 
-            match row_filter {
-                Ok(Some(filter)) => Some(filter),
-                Ok(None) => None,
-                Err(e) => {
-                    debug!("Ignoring error building row filter for '{predicate:?}': {e}");
-                    None
+        let try_build_row_filter =
+            |predicate: &Arc<dyn PhysicalExpr>| -> Option<RowFilter> {
+                match row_filter::build_row_filter(
+                    predicate,
+                    &prepared.physical_file_schema,
+                    file_metadata.as_ref(),
+                    prepared.reorder_predicates,
+                    &prepared.file_metrics,
+                ) {
+                    Ok(Some(filter)) => Some(filter),
+                    Ok(None) => None,
+                    Err(e) => {
+                        debug!(
+                            "Ignoring error building row filter for '{predicate:?}': {e}"
+                        );
+                        None
+                    }
                 }
-            }
-        } else {
-            None
-        };
+            };
+
+        // Build the first RowFilter eagerly; it will be reused for the first

Review Comment:
   This is needed because the RowFilter must be owned, right? I think it might make this code easier to understand if you pulled the RowFilter generator logic into its own structure rather than using a closure and a manually tracked `Option`.
   
   like
   ```rust
   let row_filter_generator = RowFilterGenerator::new(predicate, &prepared.physical_file_schema, ...);
   ...
   ```
   
   Perhaps as a follow-on PR.
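   A minimal sketch of the suggested shape, assuming hypothetical stand-in types (`Predicate`, `RowFilter`, `build_row_filter`) in place of DataFusion's `Arc<dyn PhysicalExpr>`, the parquet crate's `RowFilter`, and `row_filter::build_row_filter`, so it compiles on its own. Only the `RowFilterGenerator` name comes from the comment above; the `generate()` method is illustrative. The idea is that each call hands back a fresh owned filter for one decoder run, so the caller no longer needs a closure plus a hand-tracked `Option`:

```rust
use std::sync::Arc;

/// Hypothetical stand-in for DataFusion's `Arc<dyn PhysicalExpr>` predicate.
#[derive(Debug)]
struct Predicate(String);

/// Hypothetical stand-in for parquet's `RowFilter`, which each decoder
/// builder takes by value (so every decoder run needs its own instance).
#[derive(Debug)]
struct RowFilter {
    predicate: Arc<Predicate>,
}

/// Hypothetical generator: owns everything needed to build a RowFilter.
/// The real version would also hold the file schema, file metadata,
/// the reorder flag, and metrics.
struct RowFilterGenerator {
    predicate: Option<Arc<Predicate>>,
}

impl RowFilterGenerator {
    fn new(predicate: Option<Arc<Predicate>>) -> Self {
        Self { predicate }
    }

    /// Build a new owned RowFilter, or `None` if pushdown is disabled,
    /// no filter is needed, or building it fails (the error is logged).
    fn generate(&self) -> Option<RowFilter> {
        let predicate = self.predicate.as_ref()?;
        match build_row_filter(predicate) {
            Ok(filter) => filter,
            Err(e) => {
                eprintln!("Ignoring error building row filter for '{predicate:?}': {e}");
                None
            }
        }
    }
}

/// Hypothetical stand-in for `row_filter::build_row_filter`.
fn build_row_filter(predicate: &Arc<Predicate>) -> Result<Option<RowFilter>, String> {
    if predicate.0.is_empty() {
        Ok(None)
    } else {
        Ok(Some(RowFilter { predicate: Arc::clone(predicate) }))
    }
}

fn main() {
    let generator = RowFilterGenerator::new(Some(Arc::new(Predicate("a > 5".to_string()))));
    // Each decoder run that still needs filtering asks for its own owned RowFilter.
    let first = generator.generate();
    let second = generator.generate();
    assert_eq!(first.as_ref().unwrap().predicate.0, "a > 5");
    assert!(second.is_some());

    // Pushdown disabled: no predicate, so no filter is ever built.
    let disabled = RowFilterGenerator::new(None);
    assert!(disabled.generate().is_none());
}
```

   Compared with the closure, the generator can be stored, passed to helper functions, and unit tested on its own.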



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -1139,33 +1155,69 @@ impl RowGroupsPrunedParquetOpen {
             reader_metadata.parquet_schema(),
         );
 
-        let mut decoder_builder =
-            ParquetPushDecoderBuilder::new_with_metadata(reader_metadata)
-                .with_projection(read_plan.projection_mask)
+        // Split into consecutive runs of row groups that share the same filter
+        // requirement. Fully matched row groups skip the RowFilter; others need it.
+        // Reverse the run order for reverse scans so the combined decoder stream
+        // preserves the requested global row group order.
+        let mut runs = access_plan.split_runs(has_row_filter);
+        if prepared.reverse_row_groups {
+            runs.reverse();
+        }
+        let run_count = runs.len();
+        let decoder_limit = prepared.limit.filter(|_| run_count == 1);
+        let remaining_limit = prepared.limit.filter(|_| run_count > 1);
+
+        // Helper: configure a decoder builder with shared options from
+        // the prepared plan.
+        let build_decoder = |prepared_access_plan: PreparedAccessPlan,

Review Comment:
   Likewise, it would be nice to see this as its own function rather than a closure, but I think we can do that as a follow-on PR.
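   For illustration, here is a rough sketch of the run splitting and limit handling in the hunk above, with `build_decoder` written as a plain function. All types here (`Run`, `DecoderConfig`, `SharedOptions`) and the functions `split_runs` and `plan_decoders` are hypothetical simplifications, not the PR's `PreparedAccessPlan` or `ParquetPushDecoderBuilder`. It also assumes that `split_runs(false)` yields a single unfiltered run. Consecutive row groups are grouped by whether they still need the RowFilter. Runs are reversed for reverse scans. The limit is pushed into the decoder only when there is exactly one run, and is otherwise tracked across the combined stream:

```rust
/// One run of consecutive row groups sharing the same filter requirement.
#[derive(Debug)]
struct Run {
    row_groups: Vec<usize>,
    needs_filter: bool,
}

/// Simplified decoder configuration produced by `build_decoder`.
#[derive(Debug)]
struct DecoderConfig {
    row_groups: Vec<usize>,
    with_row_filter: bool,
    limit: Option<usize>,
}

/// Options shared by every decoder (stand-in for the `prepared` plan).
struct SharedOptions {
    limit: Option<usize>,
    reverse_row_groups: bool,
}

/// `fully_matched[i]` is true when pruning proved every row in row group i
/// matches the predicate, so that row group can skip the RowFilter.
fn split_runs(fully_matched: &[bool], has_row_filter: bool) -> Vec<Run> {
    let mut runs: Vec<Run> = Vec::new();
    for (rg, &matched) in fully_matched.iter().enumerate() {
        let needs_filter = has_row_filter && !matched;
        match runs.last_mut() {
            // Same requirement as the previous row group: extend the run.
            Some(run) if run.needs_filter == needs_filter => run.row_groups.push(rg),
            // Requirement changed (or first row group): start a new run.
            _ => runs.push(Run { row_groups: vec![rg], needs_filter }),
        }
    }
    runs
}

/// A named function instead of a closure: everything it needs is explicit.
fn build_decoder(run: Run, decoder_limit: Option<usize>) -> DecoderConfig {
    DecoderConfig {
        row_groups: run.row_groups,
        with_row_filter: run.needs_filter,
        limit: decoder_limit,
    }
}

/// Returns one decoder per run plus the limit still to be enforced across runs.
fn plan_decoders(
    fully_matched: &[bool],
    has_row_filter: bool,
    opts: &SharedOptions,
) -> (Vec<DecoderConfig>, Option<usize>) {
    let mut runs = split_runs(fully_matched, has_row_filter);
    if opts.reverse_row_groups {
        runs.reverse();
    }
    let run_count = runs.len();
    // One decoder can apply the limit itself; several decoders cannot,
    // because the limit applies to their combined output.
    let decoder_limit = opts.limit.filter(|_| run_count == 1);
    let remaining_limit = opts.limit.filter(|_| run_count > 1);
    let decoders: Vec<DecoderConfig> = runs
        .into_iter()
        .map(|run| build_decoder(run, decoder_limit))
        .collect();
    (decoders, remaining_limit)
}

fn main() {
    // Row groups 0 and 1 are fully matched, 2 is not, 3 is fully matched.
    let opts = SharedOptions { limit: Some(10), reverse_row_groups: false };
    let (decoders, remaining) = plan_decoders(&[true, true, false, true], true, &opts);
    assert_eq!(decoders.len(), 3);
    assert_eq!(decoders[0].row_groups, vec![0, 1]);
    assert!(!decoders[0].with_row_filter && decoders[1].with_row_filter);
    assert!(decoders.iter().all(|d| d.limit.is_none()));
    assert_eq!(remaining, Some(10));
}
```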
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
