adriangb commented on code in PR #15561:
URL: https://github.com/apache/datafusion/pull/15561#discussion_r2027429964


##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -67,13 +67,13 @@ pub(crate) mod test_util {
             .into_iter()
             .zip(tmp_files.into_iter())
             .map(|(batch, mut output)| {
-                let builder = parquet::file::properties::WriterProperties::builder();
-                let props = if multi_page {
-                    builder.set_data_page_row_count_limit(ROWS_PER_PAGE)
-                } else {
-                    builder
+                let mut builder = parquet::file::properties::WriterProperties::builder();
+                if multi_page {
+                    builder = builder.set_data_page_row_count_limit(ROWS_PER_PAGE)
                 }
-                .build();
+                builder = builder.set_bloom_filter_enabled(true);

Review Comment:
   Since I'm now using the actual explain plan to check that bloom filters are used, it's easiest to just enable them and check that they pruned. It's also a more realistic test, in that it's what I as a user would do to check whether bloom filters are working.
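   
   A rough sketch of the kind of check this enables, reusing the `rt.explain` output and `assert_contains!` helper from the tests in this PR (the exact bloom filter metric name is an assumption on my part):
   
   ```rust
   // After enabling bloom filters on write, a user (or test) can verify they
   // actually pruned by inspecting the EXPLAIN ANALYZE output.
   let explain = rt.explain.unwrap();
   // Hypothetical metric name; whatever bloom filter pruning counter the
   // explain output reports should be non-zero here.
   assert_contains!(&explain, "row_groups_pruned_bloom_filter=1");
   ```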



##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -137,71 +139,109 @@ mod tests {
             self.round_trip(batches).await.batches
         }
 
-        /// run the test, returning the `RoundTripResult`
-        async fn round_trip(self, batches: Vec<RecordBatch>) -> RoundTripResult {
-            let Self {
-                projection,
-                schema,
-                predicate,
-                pushdown_predicate,
-                page_index_predicate,
-            } = self;
-
-            let file_schema = match schema {
-                Some(schema) => schema,
-                None => Arc::new(
-                    Schema::try_merge(
-                        batches.iter().map(|b| b.schema().as_ref().clone()),
-                    )
-                    .unwrap(),
-                ),
-            };
-            // If testing with page_index_predicate, write parquet
-            // files with multiple pages
-            let multi_page = page_index_predicate;
-            let (meta, _files) = store_parquet(batches, multi_page).await.unwrap();
-            let file_group = meta.into_iter().map(Into::into).collect();
-
+        fn build_file_source(&self, file_schema: SchemaRef) -> Arc<ParquetSource> {
             // set up predicate (this is normally done by a layer higher up)
-            let predicate = predicate.map(|p| logical2physical(&p, &file_schema));
+            let predicate = self
+                .predicate
+                .as_ref()
+                .map(|p| logical2physical(p, &file_schema));
 
             let mut source = ParquetSource::default();
             if let Some(predicate) = predicate {
                 source = source.with_predicate(Arc::clone(&file_schema), predicate);
             }
 
-            if pushdown_predicate {
+            if self.pushdown_predicate {
                 source = source
                     .with_pushdown_filters(true)
                     .with_reorder_filters(true);
             }
 
-            if page_index_predicate {
+            if self.page_index_predicate {
                 source = source.with_enable_page_index(true);
             }
 
+            Arc::new(source)
+        }
+
+        fn build_parquet_exec(
+            &self,
+            file_schema: SchemaRef,
+            file_group: FileGroup,
+            source: Arc<ParquetSource>,
+        ) -> Arc<DataSourceExec> {
             let base_config = FileScanConfigBuilder::new(
                 ObjectStoreUrl::local_filesystem(),
                 file_schema,
-                Arc::new(source.clone()),
+                source,
             )
             .with_file_group(file_group)
-            .with_projection(projection)
+            .with_projection(self.projection.clone())
             .build();
+            DataSourceExec::from_data_source(base_config)
+        }
+
+        /// run the test, returning the `RoundTripResult`
+        async fn round_trip(&self, batches: Vec<RecordBatch>) -> RoundTripResult {
+            let file_schema = match &self.schema {
+                Some(schema) => schema,
+                None => &Arc::new(
+                    Schema::try_merge(
+                        batches.iter().map(|b| b.schema().as_ref().clone()),
+                    )
+                    .unwrap(),
+                ),
+            };
+            let file_schema = Arc::clone(file_schema);
+            // If testing with page_index_predicate, write parquet
+            // files with multiple pages
+            let multi_page = self.page_index_predicate;
+            let (meta, _files) = store_parquet(batches, multi_page).await.unwrap();
+            let file_group: FileGroup = meta.into_iter().map(Into::into).collect();
+
+            // build a ParquetExec to return the results
+            let parquet_source = self.build_file_source(file_schema.clone());
+            let parquet_exec = self.build_parquet_exec(
+                file_schema.clone(),
+                file_group.clone(),
+                Arc::clone(&parquet_source),
+            );
+
+            let analyze_exec = Arc::new(AnalyzeExec::new(
+                false,
+                false,
+                // use a new ParquetSource to avoid sharing execution metrics
+                self.build_parquet_exec(
+                    file_schema.clone(),
+                    file_group.clone(),
+                    self.build_file_source(file_schema.clone()),
+                ),
+                Arc::new(Schema::new(vec![
+                    Field::new("plan_type", DataType::Utf8, true),
+                    Field::new("plan", DataType::Utf8, true),
+                ])),
+            ));

Review Comment:
   The reason I'm refactoring `RoundTrip` like this is that I need to create the file source twice: `ParquetSource` has internal `Metrics`, so if we run the query against the same `ParquetSource` twice (once for the data and once for the explain analyze plan) we end up with duplicate metrics. Cloning it doesn't help, because the metrics themselves are `Arc`ed.
   
   I think generally asserting against the explain plan, rather than against a handle to the `ParquetSource`, is more in line with how real-world users use DataFusion to debug whether the page index, stats, etc. are working / pruning.
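   
   A minimal, self-contained sketch of why cloning doesn't decouple the metrics (`SourceWithMetrics` is a hypothetical stand-in for `ParquetSource`, not the real type):
   
   ```rust
   use std::sync::atomic::{AtomicUsize, Ordering};
   use std::sync::Arc;
   
   // Stand-in for a source whose metrics live behind an Arc.
   #[derive(Clone)]
   struct SourceWithMetrics {
       rows_scanned: Arc<AtomicUsize>,
   }
   
   fn main() {
       let original = SourceWithMetrics {
           rows_scanned: Arc::new(AtomicUsize::new(0)),
       };
       // `derive(Clone)` clones the Arc handle, not the counter behind it...
       let cloned = original.clone();
       cloned.rows_scanned.fetch_add(100, Ordering::Relaxed);
       // ...so the "original" sees the clone's updates: metrics are shared,
       // which is why the test builds a fresh source for the AnalyzeExec run.
       assert_eq!(original.rows_scanned.load(Ordering::Relaxed), 100);
   }
   ```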



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -295,3 +315,84 @@ fn create_initial_plan(
     // default to scanning all row groups
     Ok(ParquetAccessPlan::new_all(row_group_count))
 }
+
+/// Build a pruning predicate from an optional predicate expression.
+/// If the predicate is None or the predicate cannot be converted to a pruning
+/// predicate, return None.
+/// If there is an error creating the pruning predicate it is recorded by incrementing
+/// the `predicate_creation_errors` counter.
+pub fn build_pruning_predicate(
+    predicate: Arc<dyn PhysicalExpr>,
+    file_schema: &SchemaRef,
+    predicate_creation_errors: &Count,
+) -> Option<Arc<PruningPredicate>> {
+    match PruningPredicate::try_new(predicate, Arc::clone(file_schema)) {
+        Ok(pruning_predicate) => {
+            if !pruning_predicate.always_true() {
+                return Some(Arc::new(pruning_predicate));
+            }
+        }
+        Err(e) => {
+            debug!("Could not create pruning predicate for: {e}");
+            predicate_creation_errors.add(1);
+        }
+    }
+    None
+}
+
+/// Build a page pruning predicate from an optional predicate expression.
+/// If the predicate is None or the predicate cannot be converted to a page pruning
+/// predicate, return None.
+pub fn build_page_pruning_predicate(
+    predicate: &Arc<dyn PhysicalExpr>,
+    file_schema: &SchemaRef,
+) -> Arc<PagePruningAccessPlanFilter> {
+    Arc::new(PagePruningAccessPlanFilter::new(
+        predicate,
+        Arc::clone(file_schema),
+    ))
+}
+
+fn build_pruning_predicates(
+    predicate: &Option<Arc<dyn PhysicalExpr>>,
+    file_schema: &SchemaRef,
+    predicate_creation_errors: &Count,
+) -> (
+    Option<Arc<PruningPredicate>>,
+    Option<Arc<PagePruningAccessPlanFilter>>,
+) {
+    let Some(predicate) = predicate.as_ref() else {
+        return (None, None);
+    };
+    let pruning_predicate = build_pruning_predicate(
+        Arc::clone(predicate),
+        file_schema,
+        predicate_creation_errors,
+    );
+    let page_pruning_predicate = build_page_pruning_predicate(predicate, file_schema);
+    (pruning_predicate, Some(page_pruning_predicate))
+}
+
+async fn load_page_index<T: AsyncFileReader>(
+    arrow_reader: ArrowReaderMetadata,
+    input: &mut T,
+    options: ArrowReaderOptions,
+) -> Result<ArrowReaderMetadata> {
+    let parquet_metadata = arrow_reader.metadata();
+    let missing_column_index = parquet_metadata.column_index().is_none();
+    let missing_offset_index = parquet_metadata.offset_index().is_none();
+    if missing_column_index || missing_offset_index {
+        let m = Arc::try_unwrap(Arc::clone(&parquet_metadata))
+            .unwrap_or_else(|e| e.as_ref().clone());
+        let mut reader =
+            ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true);
+        reader.load_page_index(input).await?;
+        let new_parquet_metadata = reader.finish()?;
+        let new_arrow_reader =
+            ArrowReaderMetadata::try_new(Arc::new(new_parquet_metadata), options)?;
+        Ok(new_arrow_reader)
+    } else {
+        // No page index, return the original metadata
+        Ok(arrow_reader)
+    }
+}

Review Comment:
   I feel like this should be in arrow alongside `load_async`, but putting it 
here for now



##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -498,6 +473,7 @@ impl FileSource for ParquetSource {
             reorder_filters: self.reorder_filters(),
             enable_page_index: self.enable_page_index(),
             enable_bloom_filter: self.bloom_filter_on_read(),
+            enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,

Review Comment:
   This is just for consistency with the global option: before this I don't think this option was being referenced anywhere, i.e. it was broken. Maybe it's being decided in `ListingTable`? I feel like there are a couple of options that you'd think apply to `ParquetSource` but only get handled via `ListingTable`.



##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -1581,16 +1508,19 @@ mod tests {
             .await;
 
         // Should not contain a pruning predicate (since nothing can be pruned)
-        let pruning_predicate = rt.parquet_source.pruning_predicate();
-        assert!(
-            pruning_predicate.is_none(),
-            "Still had pruning predicate: {pruning_predicate:?}"
-        );
+        let explain = rt.explain.unwrap();
 
-        // but does still has a pushdown down predicate
-        let predicate = rt.parquet_source.predicate();
-        let filter_phys = logical2physical(&filter, rt.parquet_exec.schema().as_ref());
-        assert_eq!(predicate.unwrap().to_string(), filter_phys.to_string());
+        // When both matched and pruned are 0, it means that the pruning predicate
+        // was not used at all.

Review Comment:
   Not super intuitive (I'd prefer it were `null` or `none`, or that the whole `row_groups_matched_statistics` entry were just not included), but I've left a comment to clarify for future readers.
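   
   For concreteness, a sketch of the assertions this comment is about, using the same metric names that appear in the explain output elsewhere in this PR (illustrative, not the exact test code):
   
   ```rust
   // Both counters at 0 means the pruning predicate was never evaluated,
   // not that it matched or pruned zero row groups.
   assert_contains!(&explain, "row_groups_matched_statistics=0");
   assert_contains!(&explain, "row_groups_pruned_statistics=0");
   ```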



##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -1445,119 +1465,26 @@ mod tests {
         // batch1: c1(string)
         let batch1 = string_batch();
 
-        // c1 != 'bar'
-        let filter = col("c1").not_eq(lit("bar"));
+        // c1 == 'aaa', should prune via stats
+        let filter = col("c1").eq(lit("aaa"));
 
         let rt = RoundTrip::new()
             .with_predicate(filter)
             .with_pushdown_predicate()
             .round_trip(vec![batch1])
             .await;
 
-        // should have a pruning predicate
-        let pruning_predicate = rt.parquet_source.pruning_predicate();
-        assert!(pruning_predicate.is_some());
+        let explain = rt.explain.unwrap();
 
-        // convert to explain plan form
-        let display = displayable(rt.parquet_exec.as_ref())
-            .indent(true)
-            .to_string();
+        // check that there was a pruning predicate -> row groups got pruned
+        assert_contains!(&explain, "predicate=c1@0 = aaa");
 
-        assert_contains!(
-            &display,
-            "pruning_predicate=c1_null_count@2 != row_count@3 AND (c1_min@0 != bar OR bar != c1_max@1)"
-        );
-
-        assert_contains!(&display, r#"predicate=c1@0 != bar"#);
+        // there's a single row group, but we can check that it matched
+        // if no pruning was done this would be 0 instead of 1
+        assert_contains!(&explain, "row_groups_matched_statistics=1");
 
-        assert_contains!(&display, "projection=[c1]");
-    }
-
-    #[tokio::test]
-    async fn parquet_exec_display_deterministic() {

Review Comment:
   I canned this whole test because it seems the point is to check that `required_guarantees=` doesn't change from run to run, but that is no longer part of the output, so I don't see what this test would be testing.



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -109,47 +108,66 @@ impl FileOpener for ParquetOpener {
             .schema_adapter_factory
             .create(projected_schema, Arc::clone(&self.table_schema));
         let predicate = self.predicate.clone();
-        let pruning_predicate = self.pruning_predicate.clone();
-        let page_pruning_predicate = self.page_pruning_predicate.clone();
         let table_schema = Arc::clone(&self.table_schema);
         let reorder_predicates = self.reorder_filters;
         let pushdown_filters = self.pushdown_filters;
-        let enable_page_index = should_enable_page_index(
-            self.enable_page_index,
-            &self.page_pruning_predicate,
-        );
         let enable_bloom_filter = self.enable_bloom_filter;
+        let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning;
         let limit = self.limit;
 
+        let predicate_creation_errors = MetricBuilder::new(&self.metrics)
+            .global_counter("num_predicate_creation_errors");
+
+        let enable_page_index = self.enable_page_index;
+
         Ok(Box::pin(async move {
-            let options = ArrowReaderOptions::new().with_page_index(enable_page_index);
+            // Don't load the page index yet - we will decide later if we need it
+            let options = ArrowReaderOptions::new().with_page_index(false);
 
             let mut metadata_timer = file_metrics.metadata_load_time.timer();
-            let metadata =
+            let mut metadata =
                 ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?;
-            let mut schema = Arc::clone(metadata.schema());
+            // Note about schemas: we are actually dealing with **3 different schemas** here:
+            // - The table schema as defined by the TableProvider. This is what the user sees, what they get when they `SELECT * FROM table`, etc.
+            // - The "virtual" file schema: this is the table schema minus any hive partition columns and projections. This is what the file schema is coerced to.
+            // - The physical file schema: this is the schema as defined by the parquet file. This is what the parquet file actually contains.

Review Comment:
   I'm not actually changing anything here, just documenting the current status 
quo because it's somewhat confusing.
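   
   To make the distinction concrete, a hypothetical example (the table, column names, and types here are made up purely for illustration):
   
   ```rust
   use arrow::datatypes::{DataType, Field, Schema};
   
   fn main() {
       // Table schema as defined by the TableProvider: what `SELECT * FROM t`
       // returns, including the hive partition column `date`.
       let table_schema = Schema::new(vec![
           Field::new("id", DataType::Int64, false),
           Field::new("name", DataType::Utf8, true),
           Field::new("date", DataType::Utf8, false), // hive partition column
       ]);
   
       // "Virtual" file schema: the table schema minus partition columns (and
       // any projection); this is what each file's schema gets coerced to.
       let virtual_file_schema = Schema::new(vec![
           Field::new("id", DataType::Int64, false),
           Field::new("name", DataType::Utf8, true),
       ]);
   
       // Physical file schema: what one particular parquet file actually contains,
       // e.g. an older file written before `name` existed and with a narrower `id`.
       let physical_file_schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
   
       println!("{table_schema:?}\n{virtual_file_schema:?}\n{physical_file_schema:?}");
   }
   ```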



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -295,3 +315,84 @@ fn create_initial_plan(
     // default to scanning all row groups
     Ok(ParquetAccessPlan::new_all(row_group_count))
 }
+
+/// Build a pruning predicate from an optional predicate expression.
+/// If the predicate is None or the predicate cannot be converted to a pruning
+/// predicate, return None.
+/// If there is an error creating the pruning predicate it is recorded by incrementing
+/// the `predicate_creation_errors` counter.
+pub fn build_pruning_predicate(
+    predicate: Arc<dyn PhysicalExpr>,
+    file_schema: &SchemaRef,
+    predicate_creation_errors: &Count,
+) -> Option<Arc<PruningPredicate>> {
+    match PruningPredicate::try_new(predicate, Arc::clone(file_schema)) {
+        Ok(pruning_predicate) => {
+            if !pruning_predicate.always_true() {
+                return Some(Arc::new(pruning_predicate));
+            }
+        }
+        Err(e) => {
+            debug!("Could not create pruning predicate for: {e}");
+            predicate_creation_errors.add(1);
+        }
+    }
+    None
+}
+
+/// Build a page pruning predicate from an optional predicate expression.
+/// If the predicate is None or the predicate cannot be converted to a page pruning
+/// predicate, return None.
+pub fn build_page_pruning_predicate(
+    predicate: &Arc<dyn PhysicalExpr>,
+    file_schema: &SchemaRef,
+) -> Arc<PagePruningAccessPlanFilter> {
+    Arc::new(PagePruningAccessPlanFilter::new(
+        predicate,
+        Arc::clone(file_schema),
+    ))
+}
+
+fn build_pruning_predicates(
+    predicate: &Option<Arc<dyn PhysicalExpr>>,
+    file_schema: &SchemaRef,
+    predicate_creation_errors: &Count,
+) -> (
+    Option<Arc<PruningPredicate>>,
+    Option<Arc<PagePruningAccessPlanFilter>>,
+) {
+    let Some(predicate) = predicate.as_ref() else {
+        return (None, None);
+    };
+    let pruning_predicate = build_pruning_predicate(
+        Arc::clone(predicate),
+        file_schema,
+        predicate_creation_errors,
+    );
+    let page_pruning_predicate = build_page_pruning_predicate(predicate, file_schema);
+    (pruning_predicate, Some(page_pruning_predicate))
+}
+
+async fn load_page_index<T: AsyncFileReader>(
+    arrow_reader: ArrowReaderMetadata,
+    input: &mut T,
+    options: ArrowReaderOptions,
+) -> Result<ArrowReaderMetadata> {
+    let parquet_metadata = arrow_reader.metadata();
+    let missing_column_index = parquet_metadata.column_index().is_none();
+    let missing_offset_index = parquet_metadata.offset_index().is_none();
+    if missing_column_index || missing_offset_index {

Review Comment:
   This is different from the original logic: the original was `if missing_column_index && missing_offset_index`, which is probably fine in practice (I think they always get loaded at the same time), but in theory you need to re-load if either one is missing.
   
   As to why we even have this check here when we explicitly said _not_ to load the page index in the caller: it's possible that a custom implementation of `AsyncFileReader` gives you the page index even if you didn't ask for it (e.g. because it's cached), so it's important to check for that here to avoid extra work.
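   
   A small sketch of the decision logic and the one case where the old and new conditions diverge (`needs_page_index_reload` is just an illustrative helper, not code from this PR):
   
   ```rust
   /// Reload the page index whenever either index is missing.
   fn needs_page_index_reload(has_column_index: bool, has_offset_index: bool) -> bool {
       // The old logic was effectively `!has_column_index && !has_offset_index`,
       // i.e. reload only when *both* were missing.
       !has_column_index || !has_offset_index
   }
   
   fn main() {
       // A caching `AsyncFileReader` might hand back metadata with only the
       // offset index populated; the `||` version still reloads, while the `&&`
       // version would have skipped the reload and left the column index missing.
       assert!(needs_page_index_reload(false, true));
       // If both indexes are already present (e.g. cached), skip the extra work.
       assert!(!needs_page_index_reload(true, true));
   }
   ```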


