Dandandan commented on code in PR #20820:
URL: https://github.com/apache/datafusion/pull/20820#discussion_r2969282090
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -204,329 +316,444 @@ impl FileOpener for ParquetOpener {
.transpose()?;
}
- let reorder_predicates = self.reorder_filters;
- let pushdown_filters = self.pushdown_filters;
- let force_filter_selections = self.force_filter_selections;
- let coerce_int96 = self.coerce_int96;
- let enable_bloom_filter = self.enable_bloom_filter;
- let enable_row_group_stats_pruning =
self.enable_row_group_stats_pruning;
- let limit = self.limit;
- let parquet_file_reader_factory =
Arc::clone(&self.parquet_file_reader_factory);
- let partition_index = self.partition_index;
- let metrics = self.metrics.clone();
-
let predicate_creation_errors = MetricBuilder::new(&self.metrics)
.global_counter("num_predicate_creation_errors");
- let expr_adapter_factory = Arc::clone(&self.expr_adapter_factory);
-
- let enable_page_index = self.enable_page_index;
- #[cfg(feature = "parquet_encryption")]
- let encryption_context = self.get_encryption_context();
- let max_predicate_cache_size = self.max_predicate_cache_size;
-
- let reverse_row_groups = self.reverse_row_groups;
- let preserve_order = self.preserve_order;
-
- Ok(Box::pin(async move {
+ let file_pruner = predicate
+ .as_ref()
+ .filter(|p| is_dynamic_physical_expr(p) ||
partitioned_file.has_statistics())
+ .and_then(|p| {
+ FilePruner::try_new(
+ Arc::clone(p),
+ &logical_file_schema,
+ &partitioned_file,
+ predicate_creation_errors.clone(),
+ )
+ });
+
+ Ok(PreparedParquetOpen {
+ state: Arc::clone(self),
+ partitioned_file,
+ file_range,
+ extensions,
+ file_metrics,
+ file_pruner,
+ metadata_size_hint,
+ async_file_reader,
+ logical_file_schema,
+ output_schema,
+ projection,
+ predicate,
#[cfg(feature = "parquet_encryption")]
- let file_decryption_properties = encryption_context
- .get_file_decryption_properties(&file_location)
- .await?;
-
- // ---------------------------------------------
- // Step: try to prune the current file partition
- // ---------------------------------------------
-
- // Prune this file using the file level statistics and partition
values.
- // Since dynamic filters may have been updated since planning it
is possible that we are able
- // to prune files now that we couldn't prune at planning time.
- // It is assumed that there is no point in doing pruning here if
the predicate is not dynamic,
- // as it would have been done at planning time.
- // We'll also check this after every record batch we read,
- // and if at some point we are able to prove we can prune the file
using just the file level statistics
- // we can end the stream early.
- let mut file_pruner = predicate
- .as_ref()
- .filter(|p| {
- // Make a FilePruner only if there is either
- // 1. a dynamic expr in the predicate
- // 2. the file has file-level statistics.
- //
- // File-level statistics may prune the file without loading
- // any row groups or metadata.
- //
- // Dynamic filters may prune the file after initial
- // planning, as the dynamic filter is updated during
- // execution.
- //
- // The case where there is a dynamic filter but no
- // statistics corresponds to a dynamic filter that
- // references partition columns. While rare, this is
possible
- // e.g. `select * from table order by partition_col limit
- // 10` could hit this condition.
- is_dynamic_physical_expr(p) ||
partitioned_file.has_statistics()
- })
- .and_then(|p| {
- FilePruner::try_new(
- Arc::clone(p),
- &logical_file_schema,
- &partitioned_file,
- predicate_creation_errors.clone(),
- )
- });
-
- if let Some(file_pruner) = &mut file_pruner
- && file_pruner.should_prune()?
- {
- // Return an empty stream immediately to skip the work of
setting up the actual stream
- file_metrics.files_ranges_pruned_statistics.add_pruned(1);
- return Ok(futures::stream::empty().boxed());
- }
-
- file_metrics.files_ranges_pruned_statistics.add_matched(1);
+ file_decryption_properties: None,
+ })
+ }
+}
- // --------------------------------------------------------
- // Step: fetch Parquet metadata (and optionally page index)
- // --------------------------------------------------------
+impl PreparedParquetOpen {
+ /// CPU-only file pruning performed before metadata I/O begins.
+ ///
+ /// Returns `None` if the file was completely pruned.
+ fn prune_file(mut self) -> Result<Option<Self>> {
+ if let Some(file_pruner) = &mut self.file_pruner
+ && file_pruner.should_prune()?
+ {
+ self.file_metrics
+ .files_ranges_pruned_statistics
+ .add_pruned(1);
+ return Ok(None);
+ }
- // Don't load the page index yet. Since it is not stored inline in
- // the footer, loading the page index if it is not needed will do
- // unnecessary I/O. We decide later if it is needed to evaluate the
- // pruning predicates. Thus default to not requesting it from the
- // underlying reader.
- let mut options =
-
ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Skip);
- #[cfg(feature = "parquet_encryption")]
- if let Some(fd_val) = file_decryption_properties {
- options =
options.with_file_decryption_properties(Arc::clone(&fd_val));
- }
- let mut metadata_timer = file_metrics.metadata_load_time.timer();
-
- // Begin by loading the metadata from the underlying reader (note
- // the returned metadata may actually include page indexes as some
- // readers may return page indexes even when not requested -- for
- // example when they are cached)
- let mut reader_metadata =
- ArrowReaderMetadata::load_async(&mut async_file_reader,
options.clone())
- .await?;
-
- // Note about schemas: we are actually dealing with **3 different
schemas** here:
- // - The table schema as defined by the TableProvider.
- // This is what the user sees, what they get when they `SELECT *
FROM table`, etc.
- // - The logical file schema: this is the table schema minus any
hive partition columns and projections.
- // This is what the physical file schema is coerced to.
- // - The physical file schema: this is the schema that the arrow-rs
- // parquet reader will actually produce.
- let mut physical_file_schema =
Arc::clone(reader_metadata.schema());
-
- // The schema loaded from the file may not be the same as the
- // desired schema (for example if we want to instruct the parquet
- // reader to read strings using Utf8View instead). Update if
necessary
- if let Some(merged) = apply_file_schema_type_coercions(
- &logical_file_schema,
- &physical_file_schema,
- ) {
- physical_file_schema = Arc::new(merged);
- options =
options.with_schema(Arc::clone(&physical_file_schema));
- reader_metadata = ArrowReaderMetadata::try_new(
- Arc::clone(reader_metadata.metadata()),
- options.clone(),
- )?;
- }
+ self.file_metrics
+ .files_ranges_pruned_statistics
+ .add_matched(1);
+ Ok(Some(self))
+ }
- if let Some(ref coerce) = coerce_int96
- && let Some(merged) = coerce_int96_to_resolution(
- reader_metadata.parquet_schema(),
- &physical_file_schema,
- coerce,
- )
- {
- physical_file_schema = Arc::new(merged);
- options =
options.with_schema(Arc::clone(&physical_file_schema));
- reader_metadata = ArrowReaderMetadata::try_new(
- Arc::clone(reader_metadata.metadata()),
- options.clone(),
- )?;
- }
+ /// Fetch parquet metadata once file-level pruning is complete.
+ async fn load(mut self) -> Result<MetadataLoadedParquetOpen> {
+ let options =
+
ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Skip);
+ #[cfg(feature = "parquet_encryption")]
+ let mut options = options;
+ #[cfg(feature = "parquet_encryption")]
+ if let Some(fd_val) = &self.file_decryption_properties {
+ options =
options.with_file_decryption_properties(Arc::clone(fd_val));
+ }
+ let reader_metadata = {
+ let mut metadata_timer =
self.file_metrics.metadata_load_time.timer();
+ let reader_metadata = ArrowReaderMetadata::load_async(
Review Comment:
Currently, as far as I know, this can do duplicated work (if the result is not already in the cache).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]