adriangb commented on code in PR #15561:
URL: https://github.com/apache/datafusion/pull/15561#discussion_r2027429964
########## datafusion/core/src/datasource/file_format/parquet.rs: ##########
@@ -67,13 +67,13 @@ pub(crate) mod test_util {
             .into_iter()
             .zip(tmp_files.into_iter())
             .map(|(batch, mut output)| {
-                let builder = parquet::file::properties::WriterProperties::builder();
-                let props = if multi_page {
-                    builder.set_data_page_row_count_limit(ROWS_PER_PAGE)
-                } else {
-                    builder
+                let mut builder = parquet::file::properties::WriterProperties::builder();
+                if multi_page {
+                    builder = builder.set_data_page_row_count_limit(ROWS_PER_PAGE)
                 }
-                .build();
+                builder = builder.set_bloom_filter_enabled(true);

Review Comment:
Since I'm now using the actual explain plan to check that bloom filters are used, it's easiest to just enable them and check that they pruned. It's also a more realistic test in that it's what I, as a user, would do to check whether bloom filters are working.
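A minimal sketch of that approach (not the PR's test code; the exact explain-analyze metric name, `row_groups_pruned_bloom_filter`, is an assumption here):

```rust
// Write parquet with bloom filters enabled, mirroring what the test util above
// now does unconditionally, then assert on the explain-analyze output.
use parquet::file::properties::WriterProperties;

fn bloom_filter_writer_props() -> WriterProperties {
    WriterProperties::builder()
        .set_bloom_filter_enabled(true)
        .build()
}

fn main() {
    let _props = bloom_filter_writer_props();
    // In the test itself the check then looks something like:
    // let explain = rt.explain.unwrap();
    // assert_contains!(&explain, "row_groups_pruned_bloom_filter=1");
}
```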
########## datafusion/core/src/datasource/physical_plan/parquet.rs: ##########
@@ -137,71 +139,109 @@ mod tests {
             self.round_trip(batches).await.batches
         }

-        /// run the test, returning the `RoundTripResult`
-        async fn round_trip(self, batches: Vec<RecordBatch>) -> RoundTripResult {
-            let Self {
-                projection,
-                schema,
-                predicate,
-                pushdown_predicate,
-                page_index_predicate,
-            } = self;
-
-            let file_schema = match schema {
-                Some(schema) => schema,
-                None => Arc::new(
-                    Schema::try_merge(
-                        batches.iter().map(|b| b.schema().as_ref().clone()),
-                    )
-                    .unwrap(),
-                ),
-            };
-            // If testing with page_index_predicate, write parquet
-            // files with multiple pages
-            let multi_page = page_index_predicate;
-            let (meta, _files) = store_parquet(batches, multi_page).await.unwrap();
-            let file_group = meta.into_iter().map(Into::into).collect();
-
+        fn build_file_source(&self, file_schema: SchemaRef) -> Arc<ParquetSource> {
             // set up predicate (this is normally done by a layer higher up)
-            let predicate = predicate.map(|p| logical2physical(&p, &file_schema));
+            let predicate = self
+                .predicate
+                .as_ref()
+                .map(|p| logical2physical(p, &file_schema));

             let mut source = ParquetSource::default();
             if let Some(predicate) = predicate {
                 source = source.with_predicate(Arc::clone(&file_schema), predicate);
             }
-            if pushdown_predicate {
+            if self.pushdown_predicate {
                 source = source
                     .with_pushdown_filters(true)
                     .with_reorder_filters(true);
             }
-            if page_index_predicate {
+            if self.page_index_predicate {
                 source = source.with_enable_page_index(true);
             }
+            Arc::new(source)
+        }
+
+        fn build_parquet_exec(
+            &self,
+            file_schema: SchemaRef,
+            file_group: FileGroup,
+            source: Arc<ParquetSource>,
+        ) -> Arc<DataSourceExec> {
             let base_config = FileScanConfigBuilder::new(
                 ObjectStoreUrl::local_filesystem(),
                 file_schema,
-                Arc::new(source.clone()),
+                source,
             )
             .with_file_group(file_group)
-            .with_projection(projection)
+            .with_projection(self.projection.clone())
             .build();
+            DataSourceExec::from_data_source(base_config)
+        }
+
+        /// run the test, returning the `RoundTripResult`
+        async fn round_trip(&self, batches: Vec<RecordBatch>) -> RoundTripResult {
+            let file_schema = match &self.schema {
+                Some(schema) => schema,
+                None => &Arc::new(
+                    Schema::try_merge(
+                        batches.iter().map(|b| b.schema().as_ref().clone()),
+                    )
+                    .unwrap(),
+                ),
+            };
+            let file_schema = Arc::clone(file_schema);
+            // If testing with page_index_predicate, write parquet
+            // files with multiple pages
+            let multi_page = self.page_index_predicate;
+            let (meta, _files) = store_parquet(batches, multi_page).await.unwrap();
+            let file_group: FileGroup = meta.into_iter().map(Into::into).collect();
+
+            // build a ParquetExec to return the results
+            let parquet_source = self.build_file_source(file_schema.clone());
+            let parquet_exec = self.build_parquet_exec(
+                file_schema.clone(),
+                file_group.clone(),
+                Arc::clone(&parquet_source),
+            );
+
+            let analyze_exec = Arc::new(AnalyzeExec::new(
+                false,
+                false,
+                // use a new ParquetSource to avoid sharing execution metrics
+                self.build_parquet_exec(
+                    file_schema.clone(),
+                    file_group.clone(),
+                    self.build_file_source(file_schema.clone()),
+                ),
+                Arc::new(Schema::new(vec![
+                    Field::new("plan_type", DataType::Utf8, true),
+                    Field::new("plan", DataType::Utf8, true),
+                ])),
+            ));

Review Comment:
The reason I'm refactoring `RoundTrip` like this is that I need to create the file source twice: `ParquetSource` has internal `Metrics`, so if we run the query against the same `ParquetSource` twice (once for the data and once for the explain analyze plan) we end up with duplicate metrics. Cloning it doesn't help because the metrics themselves are `Arc`ed.
I also think asserting against the explain plan, rather than against a handle to the `ParquetSource`, is more in line with how real-world users use DataFusion to debug whether the page index, stats, etc. are actually pruning.
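To illustrate why cloning doesn't decouple the metrics, here is a standalone sketch with made-up types (DataFusion's real `Metrics` are richer, but the sharing problem is the same):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

#[derive(Clone)]
struct Source {
    // Shared counter behind an Arc, like the metrics inside ParquetSource.
    rows_scanned: Arc<AtomicUsize>,
}

fn main() {
    let data_source = Source {
        rows_scanned: Arc::new(AtomicUsize::new(0)),
    };
    // Cloning copies the Arc handle, not the counter itself.
    let analyze_source = data_source.clone();

    data_source.rows_scanned.fetch_add(10, Ordering::Relaxed); // "data" run
    analyze_source.rows_scanned.fetch_add(10, Ordering::Relaxed); // "explain analyze" run

    // Both handles observe 20: the two runs double-count into one metric,
    // which is why the test builds a fresh source for each plan instead.
    assert_eq!(data_source.rows_scanned.load(Ordering::Relaxed), 20);
}
```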
########## datafusion/datasource-parquet/src/opener.rs: ##########
@@ -295,3 +315,84 @@ fn create_initial_plan(
     // default to scanning all row groups
     Ok(ParquetAccessPlan::new_all(row_group_count))
 }
+
+/// Build a pruning predicate from an optional predicate expression.
+/// If the predicate is None or the predicate cannot be converted to a pruning
+/// predicate, return None.
+/// If there is an error creating the pruning predicate it is recorded by incrementing
+/// the `predicate_creation_errors` counter.
+pub fn build_pruning_predicate(
+    predicate: Arc<dyn PhysicalExpr>,
+    file_schema: &SchemaRef,
+    predicate_creation_errors: &Count,
+) -> Option<Arc<PruningPredicate>> {
+    match PruningPredicate::try_new(predicate, Arc::clone(file_schema)) {
+        Ok(pruning_predicate) => {
+            if !pruning_predicate.always_true() {
+                return Some(Arc::new(pruning_predicate));
+            }
+        }
+        Err(e) => {
+            debug!("Could not create pruning predicate for: {e}");
+            predicate_creation_errors.add(1);
+        }
+    }
+    None
+}
+
+/// Build a page pruning predicate from an optional predicate expression.
+/// If the predicate is None or the predicate cannot be converted to a page pruning
+/// predicate, return None.
+pub fn build_page_pruning_predicate(
+    predicate: &Arc<dyn PhysicalExpr>,
+    file_schema: &SchemaRef,
+) -> Arc<PagePruningAccessPlanFilter> {
+    Arc::new(PagePruningAccessPlanFilter::new(
+        predicate,
+        Arc::clone(file_schema),
+    ))
+}
+
+fn build_pruning_predicates(
+    predicate: &Option<Arc<dyn PhysicalExpr>>,
+    file_schema: &SchemaRef,
+    predicate_creation_errors: &Count,
+) -> (
+    Option<Arc<PruningPredicate>>,
+    Option<Arc<PagePruningAccessPlanFilter>>,
+) {
+    let Some(predicate) = predicate.as_ref() else {
+        return (None, None);
+    };
+    let pruning_predicate = build_pruning_predicate(
+        Arc::clone(predicate),
+        file_schema,
+        predicate_creation_errors,
+    );
+    let page_pruning_predicate = build_page_pruning_predicate(predicate, file_schema);
+    (pruning_predicate, Some(page_pruning_predicate))
+}
+
+async fn load_page_index<T: AsyncFileReader>(
+    arrow_reader: ArrowReaderMetadata,
+    input: &mut T,
+    options: ArrowReaderOptions,
+) -> Result<ArrowReaderMetadata> {
+    let parquet_metadata = arrow_reader.metadata();
+    let missing_column_index = parquet_metadata.column_index().is_none();
+    let missing_offset_index = parquet_metadata.offset_index().is_none();
+    if missing_column_index || missing_offset_index {
+        let m = Arc::try_unwrap(Arc::clone(&parquet_metadata))
+            .unwrap_or_else(|e| e.as_ref().clone());
+        let mut reader =
+            ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true);
+        reader.load_page_index(input).await?;
+        let new_parquet_metadata = reader.finish()?;
+        let new_arrow_reader =
+            ArrowReaderMetadata::try_new(Arc::new(new_parquet_metadata), options)?;
+        Ok(new_arrow_reader)
+    } else {
+        // No page index, return the original metadata
+        Ok(arrow_reader)
+    }
+}

Review Comment:
I feel like this should live in arrow alongside `load_async`, but I'm putting it here for now.

########## datafusion/datasource-parquet/src/source.rs: ##########
@@ -498,6 +473,7 @@ impl FileSource for ParquetSource {
             reorder_filters: self.reorder_filters(),
             enable_page_index: self.enable_page_index(),
             enable_bloom_filter: self.bloom_filter_on_read(),
+            enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,

Review Comment:
This is just for consistency with the global option: before this I don't think the option was referenced anywhere, so it was effectively broken. Maybe it's being decided in `ListingTable`? I feel like there are a couple of options that you'd think apply to `ParquetSource` but only get handled via `ListingTable`.

########## datafusion/core/src/datasource/physical_plan/parquet.rs: ##########
@@ -1581,16 +1508,19 @@ mod tests {
             .await;

         // Should not contain a pruning predicate (since nothing can be pruned)
-        let pruning_predicate = rt.parquet_source.pruning_predicate();
-        assert!(
-            pruning_predicate.is_none(),
-            "Still had pruning predicate: {pruning_predicate:?}"
-        );
+        let explain = rt.explain.unwrap();

-        // but does still has a pushdown down predicate
-        let predicate = rt.parquet_source.predicate();
-        let filter_phys = logical2physical(&filter, rt.parquet_exec.schema().as_ref());
-        assert_eq!(predicate.unwrap().to_string(), filter_phys.to_string());
+        // When both matched and pruned are 0, it means that the pruning predicate
+        // was not used at all.

Review Comment:
Not super intuitive (I'd prefer it were `null` or `none`, or that the whole `row_groups_matched_statistics` entry were just not included), but I've left a comment to clarify for future readers.
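A hypothetical helper that spells out that interpretation (it assumes the metrics appear as `name=value` pairs in the explain-analyze text, as in the assertions elsewhere in this review):

```rust
/// Returns true if row group statistics pruning actually ran: a metric that is
/// missing or 0 for both "matched" and "pruned" means the pruning predicate
/// was never evaluated.
fn stats_pruning_was_used(explain: &str) -> bool {
    let metric = |name: &str| -> u64 {
        explain
            .split(name)
            .nth(1)
            .and_then(|rest| rest.split(|c: char| !c.is_ascii_digit()).next())
            .and_then(|digits| digits.parse().ok())
            .unwrap_or(0)
    };
    metric("row_groups_matched_statistics=") != 0
        || metric("row_groups_pruned_statistics=") != 0
}

fn main() {
    assert!(stats_pruning_was_used(
        "row_groups_matched_statistics=1, row_groups_pruned_statistics=0"
    ));
    assert!(!stats_pruning_was_used(
        "row_groups_matched_statistics=0, row_groups_pruned_statistics=0"
    ));
}
```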
########## datafusion/core/src/datasource/physical_plan/parquet.rs: ##########
@@ -1445,119 +1465,26 @@ mod tests {
         // batch1: c1(string)
         let batch1 = string_batch();

-        // c1 != 'bar'
-        let filter = col("c1").not_eq(lit("bar"));
+        // c1 == 'aaa', should prune via stats
+        let filter = col("c1").eq(lit("aaa"));

         let rt = RoundTrip::new()
             .with_predicate(filter)
             .with_pushdown_predicate()
             .round_trip(vec![batch1])
             .await;

-        // should have a pruning predicate
-        let pruning_predicate = rt.parquet_source.pruning_predicate();
-        assert!(pruning_predicate.is_some());
+        let explain = rt.explain.unwrap();

-        // convert to explain plan form
-        let display = displayable(rt.parquet_exec.as_ref())
-            .indent(true)
-            .to_string();
+        // check that there was a pruning predicate -> row groups got pruned
+        assert_contains!(&explain, "predicate=c1@0 = aaa");

-        assert_contains!(
-            &display,
-            "pruning_predicate=c1_null_count@2 != row_count@3 AND (c1_min@0 != bar OR bar != c1_max@1)"
-        );
-
-        assert_contains!(&display, r#"predicate=c1@0 != bar"#);
+        // there's a single row group, but we can check that it matched
+        // if no pruning was done this would be 0 instead of 1
+        assert_contains!(&explain, "row_groups_matched_statistics=1");

-        assert_contains!(&display, "projection=[c1]");
-    }
-
-    #[tokio::test]
-    async fn parquet_exec_display_deterministic() {

Review Comment:
I canned this whole test because it seems the point was to check that `required_guarantees=` doesn't change from run to run, but that is no longer part of the output, so I don't see what this test would be testing.

########## datafusion/datasource-parquet/src/opener.rs: ##########
@@ -109,47 +108,66 @@ impl FileOpener for ParquetOpener {
             .schema_adapter_factory
             .create(projected_schema, Arc::clone(&self.table_schema));
         let predicate = self.predicate.clone();
-        let pruning_predicate = self.pruning_predicate.clone();
-        let page_pruning_predicate = self.page_pruning_predicate.clone();
         let table_schema = Arc::clone(&self.table_schema);
         let reorder_predicates = self.reorder_filters;
         let pushdown_filters = self.pushdown_filters;
-        let enable_page_index = should_enable_page_index(
-            self.enable_page_index,
-            &self.page_pruning_predicate,
-        );
         let enable_bloom_filter = self.enable_bloom_filter;
+        let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning;
        let limit = self.limit;

+        let predicate_creation_errors = MetricBuilder::new(&self.metrics)
+            .global_counter("num_predicate_creation_errors");
+
+        let enable_page_index = self.enable_page_index;
+
         Ok(Box::pin(async move {
-            let options = ArrowReaderOptions::new().with_page_index(enable_page_index);
+            // Don't load the page index yet - we will decide later if we need it
+            let options = ArrowReaderOptions::new().with_page_index(false);

             let mut metadata_timer = file_metrics.metadata_load_time.timer();
-            let metadata =
+            let mut metadata =
                 ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?;
-            let mut schema = Arc::clone(metadata.schema());

+            // Note about schemas: we are actually dealing with **3 different schemas** here:
+            // - The table schema as defined by the TableProvider. This is what the user sees, what they get when they `SELECT * FROM table`, etc.
+            // - The "virtual" file schema: this is the table schema minus any hive partition columns and projections. This is what the file schema is coerced to.
+            // - The physical file schema: this is the schema as defined by the parquet file. This is what the parquet file actually contains.

Review Comment:
I'm not actually changing anything here, just documenting the current status quo because it's somewhat confusing.
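As a concrete (made-up) illustration of those three schemas, take a hive-partitioned table with a partition column `date` and an old file written before column `b` existed:

```rust
use arrow_schema::{DataType, Field, Schema};

fn main() {
    // 1. Table schema: what the TableProvider (and `SELECT *`) exposes.
    let table_schema = Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Utf8, true),
        Field::new("date", DataType::Utf8, false), // hive partition column
    ]);

    // 2. "Virtual" file schema: the table schema minus partition columns
    //    (and any projection); this is what file data gets coerced to.
    let file_schema = Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Utf8, true),
    ]);

    // 3. Physical file schema: what this particular parquet file actually
    //    contains (no `b`, and `a` happens to be stored as Int32).
    let physical_file_schema =
        Schema::new(vec![Field::new("a", DataType::Int32, true)]);

    println!("{table_schema:#?}\n{file_schema:#?}\n{physical_file_schema:#?}");
}
```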
########## datafusion/datasource-parquet/src/opener.rs: ##########
@@ -295,3 +315,84 @@ fn create_initial_plan(
+async fn load_page_index<T: AsyncFileReader>(
+    arrow_reader: ArrowReaderMetadata,
+    input: &mut T,
+    options: ArrowReaderOptions,
+) -> Result<ArrowReaderMetadata> {
+    let parquet_metadata = arrow_reader.metadata();
+    let missing_column_index = parquet_metadata.column_index().is_none();
+    let missing_offset_index = parquet_metadata.offset_index().is_none();
+    if missing_column_index || missing_offset_index {

Review Comment:
This is different from the original logic: the original was `if missing_column_index && missing_offset_index`, which is probably fine in practice (I think they always get loaded at the same time), but in theory you need to re-load if either one is missing.
As for why we even have this check when the caller explicitly said _not_ to load the page index: it's possible that a custom implementation of `AsyncFileReader` gives you the page index even if you didn't ask for it (e.g. because it's cached), so it's important to check here to avoid extra work.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.