alamb commented on code in PR #14224: URL: https://github.com/apache/datafusion/pull/14224#discussion_r1927115530
########## datafusion/core/src/datasource/data_source.rs: ########## @@ -0,0 +1,264 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! DataSource and FileSource trait implementations + +use std::any::Any; +use std::fmt; +use std::fmt::Formatter; +use std::sync::Arc; + +use crate::datasource::listing::PartitionedFile; +use crate::datasource::physical_plan::{ + FileGroupPartitioner, FileOpener, FileScanConfig, FileStream, +}; + +use arrow_schema::SchemaRef; +use datafusion_common::Statistics; +use datafusion_execution::{SendableRecordBatchStream, TaskContext}; +use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; +use datafusion_physical_expr_common::sort_expr::LexOrdering; +use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; +use datafusion_physical_plan::source::{DataSource, DataSourceExec}; +use datafusion_physical_plan::{DisplayAs, DisplayFormatType}; + +use object_store::ObjectStore; + +/// Common behaviors that every `FileSourceConfig` needs to implement. +pub trait FileSource: Send + Sync { + /// Creates a `dyn FileOpener` based on given parameters + fn create_file_opener( + &self, + object_store: datafusion_common::Result<Arc<dyn ObjectStore>>, + base_config: &FileScanConfig, + partition: usize, + ) -> datafusion_common::Result<Arc<dyn FileOpener>>; + /// Any + fn as_any(&self) -> &dyn Any; + /// Initialize new type with batch size configuration + fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource>; + /// Initialize new instance with a new schema + fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource>; + /// Initialize new instance with projection information + fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource>; + /// Initialize new instance with projected statistics + fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource>; + /// Return execution plan metrics + fn metrics(&self) -> &ExecutionPlanMetricsSet; + /// Return projected statistics + fn statistics(&self) -> datafusion_common::Result<Statistics>; + /// Returns the file type such as Arrow, Avro, Parquet, ... + fn file_type(&self) -> FileType; + /// Format FileType specific information + fn fmt_extra(&self, _t: DisplayFormatType, _f: &mut Formatter) -> fmt::Result { + Ok(()) + } +} + +/// Determines file types +pub enum FileType { + /// Arrow File + Arrow, + /// Avro File + Avro, + /// CSV File + Csv, + /// JSON File + Json, + /// Parquet File + Parquet, +} + +impl FileType { + fn to_str(&self) -> &str { + match self { + FileType::Arrow => "arrow", + FileType::Avro => "avro", + FileType::Csv => "csv", + FileType::Json => "json", + FileType::Parquet => "parquet", + } + } + + /// Is the file type avro? 
+ pub fn is_avro(&self) -> bool { + matches!(self, FileType::Avro) + } + + /// Is the file type csv? + pub fn is_csv(&self) -> bool { + matches!(self, FileType::Csv) + } + + /// Is the file type parquet? + pub fn is_parquet(&self) -> bool { + matches!(self, FileType::Parquet) + } +} + +/// Holds generic file configuration, and common behaviors for file sources. +/// Can be initialized with a `FileScanConfig` +/// and a `dyn FileSource` type such as `CsvConfig`, `ParquetConfig`, `AvroConfig`, etc. +#[derive(Clone)] +pub struct FileSourceConfig {
Review Comment: What is the reason for a separate `FileSourceConfig` (aka why not add a `source: Arc<dyn FileSource>,` field to the `FileScanConfig` itself)?
########## datafusion/core/src/datasource/physical_plan/arrow_file.rs: ########## @@ -223,6 +226,65 @@ impl ExecutionPlan for ArrowExec { } } + +/// Arrow configuration struct that is given to DataSourceExec +/// Does not hold anything special, since [`FileScanConfig`] is sufficient for arrow +#[derive(Clone, Default)] +pub struct ArrowConfig {
Review Comment: Since this implements `FileSource` I would personally suggest a name like `ArrowSource` (and `ParquetSource`, `AvroSource`, etc) for consistency. In my mind this is not really "config"; it is the runtime state needed to read arrow files. But I think I could be convinced that Config is ok too.
########## datafusion/core/src/datasource/data_source.rs: ########## @@ -0,0 +1,264 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! DataSource and FileSource trait implementations + +use std::any::Any; +use std::fmt; +use std::fmt::Formatter; +use std::sync::Arc; + +use crate::datasource::listing::PartitionedFile; +use crate::datasource::physical_plan::{ + FileGroupPartitioner, FileOpener, FileScanConfig, FileStream, +}; + +use arrow_schema::SchemaRef; +use datafusion_common::Statistics; +use datafusion_execution::{SendableRecordBatchStream, TaskContext}; +use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; +use datafusion_physical_expr_common::sort_expr::LexOrdering; +use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; +use datafusion_physical_plan::source::{DataSource, DataSourceExec}; +use datafusion_physical_plan::{DisplayAs, DisplayFormatType}; + +use object_store::ObjectStore; + +/// Common behaviors that every `FileSourceConfig` needs to implement.
+pub trait FileSource: Send + Sync { + /// Creates a `dyn FileOpener` based on given parameters + fn create_file_opener( + &self, + object_store: datafusion_common::Result<Arc<dyn ObjectStore>>, + base_config: &FileScanConfig, + partition: usize, + ) -> datafusion_common::Result<Arc<dyn FileOpener>>; + /// Any + fn as_any(&self) -> &dyn Any; + /// Initialize new type with batch size configuration + fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource>; + /// Initialize new instance with a new schema + fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource>; + /// Initialize new instance with projection information + fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource>; + /// Initialize new instance with projected statistics + fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource>; + /// Return execution plan metrics + fn metrics(&self) -> &ExecutionPlanMetricsSet; + /// Return projected statistics + fn statistics(&self) -> datafusion_common::Result<Statistics>; + /// Returns the file type such as Arrow, Avro, Parquet, ... + fn file_type(&self) -> FileType; + /// Format FileType specific information + fn fmt_extra(&self, _t: DisplayFormatType, _f: &mut Formatter) -> fmt::Result { + Ok(()) + } +} + +/// Determines file types +pub enum FileType { + /// Arrow File + Arrow, + /// Avro File + Avro, + /// CSV File + Csv, + /// JSON File + Json, + /// Parquet File + Parquet, +} + +impl FileType { + fn to_str(&self) -> &str { + match self { + FileType::Arrow => "arrow", + FileType::Avro => "avro", + FileType::Csv => "csv", + FileType::Json => "json", + FileType::Parquet => "parquet", + } + } + + /// Is the file type avro? + pub fn is_avro(&self) -> bool { + matches!(self, FileType::Avro) + } + + /// Is the file type csv? + pub fn is_csv(&self) -> bool { + matches!(self, FileType::Csv) + } + + /// Is the file type parquet? + pub fn is_parquet(&self) -> bool { + matches!(self, FileType::Parquet) + } +} + +/// Holds generic file configuration, and common behaviors for file sources. +/// Can be initialized with a `FileScanConfig` +/// and a `dyn FileSource` type such as `CsvConfig`, `ParquetConfig`, `AvroConfig`, etc. +#[derive(Clone)] +pub struct FileSourceConfig { + source: Arc<dyn FileSource>, + base_config: FileScanConfig, +} + +impl FileSourceConfig { + // TODO: This function should be moved into DataSourceExec once FileScanConfig and FileSourceConfig moved out of datafusion/core
Review Comment: Since both `DataSourceExec` and `FileSourceConfig` are in `core`, it is not clear to me why you can't do this already.
########## datafusion/core/src/datasource/data_source.rs: ########## @@ -0,0 +1,264 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
DataSource and FileSource trait implementations + +use std::any::Any; +use std::fmt; +use std::fmt::Formatter; +use std::sync::Arc; + +use crate::datasource::listing::PartitionedFile; +use crate::datasource::physical_plan::{ + FileGroupPartitioner, FileOpener, FileScanConfig, FileStream, +}; + +use arrow_schema::SchemaRef; +use datafusion_common::Statistics; +use datafusion_execution::{SendableRecordBatchStream, TaskContext}; +use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; +use datafusion_physical_expr_common::sort_expr::LexOrdering; +use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; +use datafusion_physical_plan::source::{DataSource, DataSourceExec}; +use datafusion_physical_plan::{DisplayAs, DisplayFormatType}; + +use object_store::ObjectStore; + +/// Common behaviors that every `FileSourceConfig` needs to implement. +pub trait FileSource: Send + Sync { + /// Creates a `dyn FileOpener` based on given parameters + fn create_file_opener( + &self, + object_store: datafusion_common::Result<Arc<dyn ObjectStore>>, + base_config: &FileScanConfig, + partition: usize, + ) -> datafusion_common::Result<Arc<dyn FileOpener>>; + /// Any + fn as_any(&self) -> &dyn Any; + /// Initialize new type with batch size configuration + fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource>; + /// Initialize new instance with a new schema + fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource>; + /// Initialize new instance with projection information + fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource>; + /// Initialize new instance with projected statistics + fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource>; + /// Return execution plan metrics + fn metrics(&self) -> &ExecutionPlanMetricsSet; + /// Return projected statistics + fn statistics(&self) -> datafusion_common::Result<Statistics>; + /// Returns the file type such as Arrow, Avro, Parquet, ... + fn file_type(&self) -> FileType; + /// Format FileType specific information + fn fmt_extra(&self, _t: DisplayFormatType, _f: &mut Formatter) -> fmt::Result { + Ok(()) + } +} + +/// Determines file types
Review Comment: Is there any fundamental reason the code has to use a hard-coded enum (e.g. could it use a `dyn Trait` for this instead)? I am thinking of two use cases: 1. Allowing users to reuse the DataSource with their own file formats if they wanted; 2. Breaking the implementations of the different data sources into their own crates in the future (something I have been dreaming about for a long time to improve compile speed). Using an enum like this will make both harder.
########## datafusion/core/src/datasource/physical_plan/file_scan_config.rs: ########## @@ -156,6 +161,8 @@ impl FileScanConfig { limit: None, table_partition_cols: vec![], output_ordering: vec![], + file_compression_type: FileCompressionType::UNCOMPRESSED, + new_lines_in_values: false,
Review Comment: Why are these added to FileScanConfig? Is it because they are common for JSON and CSV or something? I ask because I don't think they are relevant for AVRO and Parquet.
########## datafusion/core/src/datasource/physical_plan/avro.rs: ########## @@ -18,24 +18,31 @@ //! 
Execution plan for reading line-delimited Avro files use std::any::Any; +use std::fmt::Formatter; use std::sync::Arc; -use super::FileScanConfig; +use super::{FileOpener, FileScanConfig}; +#[cfg(feature = "avro")] +use crate::datasource::avro_to_arrow::Reader as AvroReader; +use crate::datasource::data_source::{FileSource, FileType}; use crate::error::Result; -use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; -use crate::physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, - SendableRecordBatchStream, Statistics, -}; use arrow::datatypes::SchemaRef; -use datafusion_common::Constraints; -use datafusion_execution::TaskContext; -use datafusion_physical_expr::{EquivalenceProperties, LexOrdering}; +use datafusion_common::{Constraints, Statistics}; +use datafusion_execution::{SendableRecordBatchStream, TaskContext}; +use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; +use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; +use datafusion_physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, +}; + +use object_store::ObjectStore; /// Execution plan for scanning Avro data source #[derive(Debug, Clone)] +#[deprecated(since = "46.0.0", note = "use DataSourceExec instead")]
Review Comment: Thank you for deprecating this. In order to avoid having to maintain 2 copies of the code (and avoid having them drift out of sync), I think we should consider leaving the interface the same but changing the internal implementation. So that would be something like:
```rust
pub struct AvroExec {
    inner: DataSourceExec, // wraps a DataSourceExec
}
```
########## datafusion/core/src/datasource/data_source.rs: ########## @@ -0,0 +1,264 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
DataSource and FileSource trait implementations + +use std::any::Any; +use std::fmt; +use std::fmt::Formatter; +use std::sync::Arc; + +use crate::datasource::listing::PartitionedFile; +use crate::datasource::physical_plan::{ + FileGroupPartitioner, FileOpener, FileScanConfig, FileStream, +}; + +use arrow_schema::SchemaRef; +use datafusion_common::Statistics; +use datafusion_execution::{SendableRecordBatchStream, TaskContext}; +use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; +use datafusion_physical_expr_common::sort_expr::LexOrdering; +use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; +use datafusion_physical_plan::source::{DataSource, DataSourceExec}; +use datafusion_physical_plan::{DisplayAs, DisplayFormatType}; + +use object_store::ObjectStore; + +/// Common behaviors that every `FileSourceConfig` needs to implement. +pub trait FileSource: Send + Sync { + /// Creates a `dyn FileOpener` based on given parameters + fn create_file_opener( + &self, + object_store: datafusion_common::Result<Arc<dyn ObjectStore>>, + base_config: &FileScanConfig, + partition: usize, + ) -> datafusion_common::Result<Arc<dyn FileOpener>>; + /// Any + fn as_any(&self) -> &dyn Any; + /// Initialize new type with batch size configuration + fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource>; + /// Initialize new instance with a new schema + fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource>; + /// Initialize new instance with projection information + fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource>; + /// Initialize new instance with projected statistics + fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource>; + /// Return execution plan metrics + fn metrics(&self) -> &ExecutionPlanMetricsSet; + /// Return projected statistics + fn statistics(&self) -> datafusion_common::Result<Statistics>; + /// Returns the file type such as Arrow, Avro, Parquet, ... + fn file_type(&self) -> FileType; + /// Format FileType specific information + fn fmt_extra(&self, _t: DisplayFormatType, _f: &mut Formatter) -> fmt::Result { + Ok(()) + } +} + +/// Determines file types +pub enum FileType { + /// Arrow File + Arrow, + /// Avro File + Avro, + /// CSV File + Csv, + /// JSON File + Json, + /// Parquet File + Parquet, +} + +impl FileType { + fn to_str(&self) -> &str { + match self { + FileType::Arrow => "arrow", + FileType::Avro => "avro", + FileType::Csv => "csv", + FileType::Json => "json", + FileType::Parquet => "parquet", + } + } + + /// Is the file type avro? + pub fn is_avro(&self) -> bool { + matches!(self, FileType::Avro) + } + + /// Is the file type csv? + pub fn is_csv(&self) -> bool { + matches!(self, FileType::Csv) + } + + /// Is the file type parquet? + pub fn is_parquet(&self) -> bool { + matches!(self, FileType::Parquet) + } +} + +/// Holds generic file configuration, and common behaviors for file sources. +/// Can be initialized with a `FileScanConfig` +/// and a `dyn FileSource` type such as `CsvConfig`, `ParquetConfig`, `AvroConfig`, etc. 
+#[derive(Clone)] +pub struct FileSourceConfig { + source: Arc<dyn FileSource>, + base_config: FileScanConfig, +} + +impl FileSourceConfig { + // TODO: This function should be moved into DataSourceExec once FileScanConfig and FileSourceConfig moved out of datafusion/core + /// Returns a new [`DataSourceExec`] from file configurations + pub fn new_exec( + base_config: FileScanConfig, + file_source: Arc<dyn FileSource>, + ) -> Arc<DataSourceExec> { + let source = Arc::new(Self::new(base_config, file_source)); + Arc::new(DataSourceExec::new(source)) + } + + /// Initialize a new `FileSourceConfig` instance. + pub fn new(base_config: FileScanConfig, file_source: Arc<dyn FileSource>) -> Self { + let ( + _projected_schema, + _constraints, + projected_statistics, + _projected_output_ordering, + ) = base_config.project(); + let file_source = file_source.with_statistics(projected_statistics); + + Self { + source: file_source, + base_config, + } + } + + /// Write the data_type based on file_source + fn fmt_file_source(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result { + write!(f, ", file_type={}", self.source.file_type().to_str())?; + self.source.fmt_extra(t, f) + } + + /// Returns the base_config + pub fn base_config(&self) -> &FileScanConfig { + &self.base_config + } + + /// Returns the file_source + pub fn file_source(&self) -> &Arc<dyn FileSource> { + &self.source + } + + fn with_file_groups(mut self, file_groups: Vec<Vec<PartitionedFile>>) -> Self { + self.base_config.file_groups = file_groups; + self + } + + fn supports_repartition(&self) -> bool { + !(self.base_config.file_compression_type.is_compressed() + || self.base_config.new_lines_in_values + || self.source.file_type().is_avro()) + } +} + +impl DataSource for FileSourceConfig {
Review Comment: I don't understand the use case for having both `FileSource` and `DataSource`. If the idea is to encapsulate all the format-specific behaviors in `FileSource`, and `DataSourceExec` then implements the common functionality of `ExecutionPlan`, what is the use case for splitting FileSource and DataSource? 🤔
########## datafusion/core/src/datasource/physical_plan/parquet/mod.rs: ########## @@ -532,10 +342,648 @@ impl ParquetExec { + schema_adapter_factory, + } + } + /// [`FileScanConfig`] that controls this scan (such as which files to read) + pub fn base_config(&self) -> &FileScanConfig { + &self.base_config + } + /// Options passed to the parquet reader for this scan + pub fn table_parquet_options(&self) -> &TableParquetOptions { + &self.table_parquet_options + } + /// Optional predicate. + pub fn predicate(&self) -> Option<&Arc<dyn PhysicalExpr>> { + self.predicate.as_ref() + } + /// Optional reference to this parquet scan's pruning predicate + pub fn pruning_predicate(&self) -> Option<&Arc<PruningPredicate>> { + self.pruning_predicate.as_ref() + } + /// return the optional file reader factory + pub fn parquet_file_reader_factory( + &self, + ) -> Option<&Arc<dyn ParquetFileReaderFactory>> { + self.parquet_file_reader_factory.as_ref() + } + /// Optional user defined parquet file reader factory. + pub fn with_parquet_file_reader_factory( + mut self, + parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>, + ) -> Self { + self.parquet_file_reader_factory = Some(parquet_file_reader_factory); + self + } + /// return the optional schema adapter factory + pub fn schema_adapter_factory(&self) -> Option<&Arc<dyn SchemaAdapterFactory>> { + self.schema_adapter_factory.as_ref() + } + /// Set optional schema adapter factory. 
+ /// + /// [`SchemaAdapterFactory`] allows user to specify how fields from the + /// parquet file get mapped to that of the table schema. The default schema + /// adapter uses arrow's cast library to map the parquet fields to the table + /// schema. + pub fn with_schema_adapter_factory( + mut self, + schema_adapter_factory: Arc<dyn SchemaAdapterFactory>, + ) -> Self { + self.schema_adapter_factory = Some(schema_adapter_factory); + self + } + /// If true, the predicate will be used during the parquet scan. + /// Defaults to false + /// + /// [`Expr`]: datafusion_expr::Expr + pub fn with_pushdown_filters(mut self, pushdown_filters: bool) -> Self { + self.table_parquet_options.global.pushdown_filters = pushdown_filters; + self + } + + /// Return the value described in [`Self::with_pushdown_filters`] + fn pushdown_filters(&self) -> bool { + self.table_parquet_options.global.pushdown_filters + } + /// If true, the `RowFilter` made by `pushdown_filters` may try to + /// minimize the cost of filter evaluation by reordering the + /// predicate [`Expr`]s. If false, the predicates are applied in + /// the same order as specified in the query. Defaults to false. + /// + /// [`Expr`]: datafusion_expr::Expr + pub fn with_reorder_filters(mut self, reorder_filters: bool) -> Self { + self.table_parquet_options.global.reorder_filters = reorder_filters; + self + } + /// Return the value described in [`Self::with_reorder_filters`] + fn reorder_filters(&self) -> bool { + self.table_parquet_options.global.reorder_filters + } + /// If enabled, the reader will read the page index + /// This is used to optimize filter pushdown + /// via `RowSelector` and `RowFilter` by + /// eliminating unnecessary IO and decoding + fn bloom_filter_on_read(&self) -> bool { + self.table_parquet_options.global.bloom_filter_on_read + } + /// Return the value described in [`ParquetConfig::with_enable_page_index`] + fn enable_page_index(&self) -> bool { + self.table_parquet_options.global.enable_page_index + } + + fn output_partitioning_helper(file_config: &FileScanConfig) -> Partitioning { + Partitioning::UnknownPartitioning(file_config.file_groups.len()) + } + + /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. + fn compute_properties( + schema: SchemaRef, + orderings: &[LexOrdering], + constraints: Constraints, + file_config: &FileScanConfig, + ) -> PlanProperties { + PlanProperties::new( + EquivalenceProperties::new_with_orderings(schema, orderings) + .with_constraints(constraints), + Self::output_partitioning_helper(file_config), // Output Partitioning + EmissionType::Incremental, + Boundedness::Bounded, + ) + } + + /// Updates the file groups to read and recalculates the output partitioning + /// + /// Note this function does not update statistics or other properties + /// that depend on the file groups. + fn with_file_groups_and_update_partitioning( + mut self, + file_groups: Vec<Vec<PartitionedFile>>, + ) -> Self { + self.base_config.file_groups = file_groups; + // Changing file groups may invalidate output partitioning. 
Update it also + let output_partitioning = Self::output_partitioning_helper(&self.base_config); + self.cache = self.cache.with_partitioning(output_partitioning); + self + } +} + +#[allow(unused, deprecated)] +impl DisplayAs for ParquetExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + let predicate_string = self + .predicate + .as_ref() + .map(|p| format!(", predicate={p}")) + .unwrap_or_default(); + + let pruning_predicate_string = self + .pruning_predicate + .as_ref() + .map(|pre| { + let mut guarantees = pre + .literal_guarantees() + .iter() + .map(|item| format!("{}", item)) + .collect_vec(); + guarantees.sort(); + format!( + ", pruning_predicate={}, required_guarantees=[{}]", + pre.predicate_expr(), + guarantees.join(", ") + ) + }) + .unwrap_or_default(); + + write!(f, "ParquetExec: ")?; + self.base_config.fmt_as(t, f)?; + write!(f, "{}{}", predicate_string, pruning_predicate_string,) + } + } + } +} + +#[allow(unused, deprecated)] +impl ExecutionPlan for ParquetExec { + fn name(&self) -> &'static str { + "ParquetExec" + } + + /// Return a reference to Any that can be used for downcasting + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.cache + } + + fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { + // this is a leaf node and has no children + vec![] + } + + fn with_new_children( + self: Arc<Self>, + _: Vec<Arc<dyn ExecutionPlan>>, + ) -> Result<Arc<dyn ExecutionPlan>> { + Ok(self) + } + + /// Redistribute files across partitions according to their size + /// See comments on [`FileGroupPartitioner`] for more detail. + fn repartitioned( + &self, + target_partitions: usize, + config: &ConfigOptions, + ) -> Result<Option<Arc<dyn ExecutionPlan>>> { + let repartition_file_min_size = config.optimizer.repartition_file_min_size; + let repartitioned_file_groups_option = FileGroupPartitioner::new() + .with_target_partitions(target_partitions) + .with_repartition_file_min_size(repartition_file_min_size) + .with_preserve_order_within_groups( + self.properties().output_ordering().is_some(), + ) + .repartition_file_groups(&self.base_config.file_groups); + + let mut new_plan = self.clone(); + if let Some(repartitioned_file_groups) = repartitioned_file_groups_option { + new_plan = new_plan + .with_file_groups_and_update_partitioning(repartitioned_file_groups); + } + Ok(Some(Arc::new(new_plan))) + } + + fn execute( + &self, + partition_index: usize, + ctx: Arc<TaskContext>, + ) -> Result<SendableRecordBatchStream> { + let projection = self + .base_config + .file_column_projection_indices() + .unwrap_or_else(|| { + (0..self.base_config.file_schema.fields().len()).collect() + }); + let parquet_file_reader_factory = self + .parquet_file_reader_factory + .as_ref() + .map(|f| Ok(Arc::clone(f))) + .unwrap_or_else(|| { + ctx.runtime_env() + .object_store(&self.base_config.object_store_url) + .map(|store| { + Arc::new(DefaultParquetFileReaderFactory::new(store)) as _ + }) + })?; + + let schema_adapter_factory = self + .schema_adapter_factory + .clone() + .unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory)); + + let opener = ParquetOpener { + partition_index, + projection: Arc::from(projection), + batch_size: ctx.session_config().batch_size(), + limit: self.base_config.limit, + predicate: self.predicate.clone(), + pruning_predicate: self.pruning_predicate.clone(), + page_pruning_predicate: self.page_pruning_predicate.clone(), + 
table_schema: Arc::clone(&self.base_config.file_schema), + metadata_size_hint: self.metadata_size_hint, + metrics: self.metrics.clone(), + parquet_file_reader_factory, + pushdown_filters: self.pushdown_filters(), + reorder_filters: self.reorder_filters(), + enable_page_index: self.enable_page_index(), + enable_bloom_filter: self.bloom_filter_on_read(), + schema_adapter_factory, + }; + let stream = FileStream::new( + &self.base_config, + partition_index, + Arc::new(opener), + &self.metrics, + )?; + Ok(Box::pin(stream)) + } + fn metrics(&self) -> Option<MetricsSet> { + Some(self.metrics.clone_inner()) + } + fn statistics(&self) -> Result<Statistics> { + // When filters are pushed down, we have no way of knowing the exact statistics. + // Note that pruning predicate is also a kind of filter pushdown. + // (bloom filters use `pruning_predicate` too) + let stats = if self.pruning_predicate.is_some() + || self.page_pruning_predicate.is_some() + || (self.predicate.is_some() && self.pushdown_filters()) + { + self.projected_statistics.clone().to_inexact() + } else { + self.projected_statistics.clone() + }; + Ok(stats) + } + fn fetch(&self) -> Option<usize> { + self.base_config.limit + } + + fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> { + let new_config = self.base_config.clone().with_limit(limit); + Some(Arc::new(Self { + base_config: new_config, + projected_statistics: self.projected_statistics.clone(), + metrics: self.metrics.clone(), + predicate: self.predicate.clone(), + pruning_predicate: self.pruning_predicate.clone(), + page_pruning_predicate: self.page_pruning_predicate.clone(), + metadata_size_hint: self.metadata_size_hint, + parquet_file_reader_factory: self.parquet_file_reader_factory.clone(), + cache: self.cache.clone(), + table_parquet_options: self.table_parquet_options.clone(), + schema_adapter_factory: self.schema_adapter_factory.clone(), + })) + } +} + +/// Execution plan for reading one or more Parquet files. +/// +/// ```text +/// ▲ +/// │ +/// │ Produce a stream of +/// │ RecordBatches +/// │ +/// ┌───────────────────────┐ +/// │ │ +/// │ DataSourceExec │ +/// │ │ +/// └───────────────────────┘ +/// ▲ +/// │ Asynchronously read from one +/// │ or more parquet files via +/// │ ObjectStore interface +/// │ +/// │ +/// .───────────────────. 
+/// │ ) +/// │`───────────────────'│ +/// │ ObjectStore │ +/// │.───────────────────.│ +/// │ ) +/// `───────────────────' +/// +/// ``` +/// +/// # Example: Create a `DataSourceExec` +/// ``` +/// # use std::sync::Arc; +/// # use arrow::datatypes::Schema; +/// # use datafusion::datasource::data_source::FileSourceConfig; +/// # use datafusion::datasource::physical_plan::{FileScanConfig, ParquetConfig}; +/// # use datafusion::datasource::listing::PartitionedFile; +/// # use datafusion_execution::object_store::ObjectStoreUrl; +/// # use datafusion_physical_expr::expressions::lit; +/// # use datafusion_physical_plan::source::DataSourceExec; +/// # use datafusion_common::config::TableParquetOptions; +/// +/// # let file_schema = Arc::new(Schema::empty()); +/// # let object_store_url = ObjectStoreUrl::local_filesystem(); +/// # let predicate = lit(true); +/// // Create a DataSourceExec for reading `file1.parquet` with a file size of 100MB +/// let file_scan_config = FileScanConfig::new(object_store_url, file_schema) +/// .with_file(PartitionedFile::new("file1.parquet", 100*1024*1024)); +/// let source_config = Arc::new( +/// ParquetConfig::new( +/// Arc::clone(&file_scan_config.file_schema), +/// Some(predicate), +/// None, +/// TableParquetOptions::default() +/// ) +/// ); +/// let exec = FileSourceConfig::new_exec(file_scan_config, source_config); +/// ``` +/// +/// # Features +/// +/// Supports the following optimizations: +/// +/// * Concurrent reads: reads from one or more files in parallel as multiple +/// partitions, including concurrently reading multiple row groups from a single +/// file. +/// +/// * Predicate push down: skips row groups, pages, rows based on metadata +/// and late materialization. See "Predicate Pushdown" below. +/// +/// * Projection pushdown: reads and decodes only the columns required. +/// +/// * Limit pushdown: stop execution early after some number of rows are read. +/// +/// * Custom readers: customize reading parquet files, e.g. to cache metadata, +/// coalesce I/O operations, etc. See [`ParquetFileReaderFactory`] for more +/// details. +/// +/// * Schema evolution: read parquet files with different schemas into a unified +/// table schema. See [`SchemaAdapterFactory`] for more details. +/// +/// * metadata_size_hint: controls the number of bytes read from the end of the +/// file in the initial I/O when the default [`ParquetFileReaderFactory`]. If a +/// custom reader is used, it supplies the metadata directly and this parameter +/// is ignored. [`ParquetConfig::with_metadata_size_hint`] for more details. +/// +/// * User provided [`ParquetAccessPlan`]s to skip row groups and/or pages +/// based on external information. See "Implementing External Indexes" below +/// +/// # Predicate Pushdown +/// +/// `DataSourceExec` uses the provided [`PhysicalExpr`] predicate as a filter to +/// skip reading unnecessary data and improve query performance using several techniques: +/// +/// * Row group pruning: skips entire row groups based on min/max statistics +/// found in [`ParquetMetaData`] and any Bloom filters that are present. +/// +/// * Page pruning: skips individual pages within a ColumnChunk using the +/// [Parquet PageIndex], if present. +/// +/// * Row filtering: skips rows within a page using a form of late +/// materialization. When possible, predicates are applied by the parquet +/// decoder *during* decode (see [`ArrowPredicate`] and [`RowFilter`] for more +/// details). This is only enabled if `ParquetScanOptions::pushdown_filters` is set to true. 
+/// +/// Note: If the predicate can not be used to accelerate the scan, it is ignored +/// (no error is raised on predicate evaluation errors). +/// +/// [`ArrowPredicate`]: parquet::arrow::arrow_reader::ArrowPredicate +/// [`RowFilter`]: parquet::arrow::arrow_reader::RowFilter +/// [Parquet PageIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md +/// +/// # Example: rewriting `DataSourceExec` +/// +/// You can modify a `DataSourceExec` using [`ParquetConfig`], for example +/// to change files or add a predicate. +/// +/// ```no_run +/// # use std::sync::Arc; +/// # use arrow::datatypes::Schema; +/// # use datafusion::datasource::data_source::FileSourceConfig; +/// # use datafusion::datasource::physical_plan::{FileScanConfig}; +/// # use datafusion::datasource::listing::PartitionedFile; +/// # use datafusion_physical_plan::source::DataSourceExec; +/// +/// # fn parquet_exec() -> DataSourceExec { unimplemented!() } +/// // Split a single DataSourceExec into multiple DataSourceExecs, one for each file +/// let source = parquet_exec().source(); +/// let data_source = source.as_any().downcast_ref::<FileSourceConfig>().unwrap(); +/// let base_config = data_source.base_config(); +/// let file_source = data_source.file_source(); +/// let existing_file_groups = &base_config.file_groups; +/// let new_execs = existing_file_groups +/// .iter() +/// .map(|file_group| { +/// // create a new exec by copying the existing exec's source config +/// let new_config = base_config +/// .clone() +/// .with_file_groups(vec![file_group.clone()]); +/// +/// FileSourceConfig::new_exec(new_config, file_source.clone()) +/// }) +/// .collect::<Vec<_>>(); +/// ``` +/// +/// # Implementing External Indexes +/// +/// It is possible to restrict the row groups and selections within those row +/// groups that the DataSourceExec will consider by providing an initial +/// [`ParquetAccessPlan`] as `extensions` on `PartitionedFile`. This can be +/// used to implement external indexes on top of parquet files and select only +/// portions of the files. +/// +/// The `DataSourceExec` will try and reduce any provided `ParquetAccessPlan` +/// further based on the contents of `ParquetMetadata` and other settings. 
+/// +/// ## Example of providing a ParquetAccessPlan +/// +/// ``` +/// # use std::sync::Arc; +/// # use arrow_schema::{Schema, SchemaRef}; +/// # use datafusion::datasource::listing::PartitionedFile; +/// # use datafusion::datasource::data_source::FileSourceConfig; +/// # use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan; +/// # use datafusion::datasource::physical_plan::{FileScanConfig, ParquetConfig}; +/// # use datafusion_execution::object_store::ObjectStoreUrl; +/// # use datafusion_physical_plan::source::DataSourceExec; +/// +/// # fn schema() -> SchemaRef { +/// # Arc::new(Schema::empty()) +/// # } +/// // create an access plan to scan row group 0, 1 and 3 and skip row groups 2 and 4 +/// let mut access_plan = ParquetAccessPlan::new_all(5); +/// access_plan.skip(2); +/// access_plan.skip(4); +/// // provide the plan as extension to the FileScanConfig +/// let partitioned_file = PartitionedFile::new("my_file.parquet", 1234) +/// .with_extensions(Arc::new(access_plan)); +/// // create a FileScanConfig to scan this file +/// let file_scan_config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema()) +/// .with_file(partitioned_file); +/// // create a ParguetConfig for file opener configurations +/// let source_config = Arc::new(ParquetConfig::default()); +/// // this parquet DataSourceExec will not even try to read row groups 2 and 4. Additional +/// // pruning based on predicates may also happen +/// let exec = FileSourceConfig::new_exec(file_scan_config, source_config); +/// ``` +/// +/// For a complete example, see the [`advanced_parquet_index` example]). +/// +/// [`parquet_index_advanced` example]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/advanced_parquet_index.rs +/// +/// # Execution Overview +/// +/// * Step 1: `DataSourceExec::execute` is called, returning a `FileStream` +/// configured to open parquet files with a `ParquetOpener`. +/// +/// * Step 2: When the stream is polled, the `ParquetOpener` is called to open +/// the file. +/// +/// * Step 3: The `ParquetOpener` gets the [`ParquetMetaData`] (file metadata) +/// via [`ParquetFileReaderFactory`], creating a [`ParquetAccessPlan`] by +/// applying predicates to metadata. The plan and projections are used to +/// determine what pages must be read. +/// +/// * Step 4: The stream begins reading data, fetching the required parquet +/// pages incrementally decoding them, and applying any row filters (see +/// [`Self::with_pushdown_filters`]). +/// +/// * Step 5: As each [`RecordBatch`] is read, it may be adapted by a +/// [`SchemaAdapter`] to match the table schema. By default missing columns are +/// filled with nulls, but this can be customized via [`SchemaAdapterFactory`]. +/// +/// [`RecordBatch`]: arrow::record_batch::RecordBatch +/// [`SchemaAdapter`]: crate::datasource::schema_adapter::SchemaAdapter +/// [`ParquetMetadata`]: parquet::file::metadata::ParquetMetaData +#[derive(Clone, Default, Debug)] Review Comment: if we are moving lots of code anyway, maybe this struct could go in its own module to break up the structure more -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
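As a concrete illustration of the first question above (why not put a `source: Arc<dyn FileSource>` field on `FileScanConfig` itself, rather than keeping a separate `FileSourceConfig`), here is a minimal, self-contained sketch of that layout. All of the types and fields below are simplified stand-ins invented for illustration; they are not DataFusion's real `FileScanConfig` or `FileSource` definitions.

```rust
use std::sync::Arc;

/// Stand-in for the format-specific behavior (the real trait has many more methods).
trait FileSource: Send + Sync {
    fn file_type(&self) -> &'static str;
}

struct ParquetSource;

impl FileSource for ParquetSource {
    fn file_type(&self) -> &'static str {
        "parquet"
    }
}

/// Alternative layout: the scan config owns the format-specific source directly,
/// instead of pairing a `FileScanConfig` with a separate `FileSourceConfig` wrapper.
struct FileScanConfig {
    files: Vec<String>,
    source: Arc<dyn FileSource>,
}

impl FileScanConfig {
    fn display(&self) -> String {
        format!(
            "file_type={}, files={}",
            self.source.file_type(),
            self.files.len()
        )
    }
}

fn main() {
    let config = FileScanConfig {
        files: vec!["file1.parquet".to_string()],
        source: Arc::new(ParquetSource),
    };
    println!("{}", config.display());
}
```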
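Likewise, a toy sketch of the `dyn Trait` alternative to the hard-coded `FileType` enum discussed above: with an open trait, an external crate can plug in its own format without modifying core. The `FileFormat` trait and the types below are hypothetical, not DataFusion's actual API.

```rust
use std::fmt;

/// An open trait in place of a hard-coded `enum FileType`: any crate can add a format.
trait FileFormat: fmt::Debug {
    /// Short name used when displaying a plan, e.g. "parquet".
    fn name(&self) -> &str;
    /// Capability queries replace `is_avro()`-style enum matches.
    fn supports_repartition(&self) -> bool;
}

#[derive(Debug)]
struct ParquetFormat;

impl FileFormat for ParquetFormat {
    fn name(&self) -> &str {
        "parquet"
    }
    fn supports_repartition(&self) -> bool {
        true
    }
}

/// A downstream crate could define its own format without touching the core enum.
#[derive(Debug)]
struct MyCustomFormat;

impl FileFormat for MyCustomFormat {
    fn name(&self) -> &str {
        "my-custom"
    }
    fn supports_repartition(&self) -> bool {
        false
    }
}

fn describe(format: &dyn FileFormat) {
    println!(
        "file_type={}, repartition={}",
        format.name(),
        format.supports_repartition()
    );
}

fn main() {
    describe(&ParquetFormat);
    describe(&MyCustomFormat);
}
```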
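Finally, a self-contained sketch of the deprecation approach suggested for `AvroExec` above (keep the old public type, delegate internally to `DataSourceExec`). The traits and signatures here are simplified stand-ins, not the real `ExecutionPlan`/`DataSourceExec` interfaces; only the wrapper-and-delegate shape is the point.

```rust
/// Toy stand-in for DataFusion's `ExecutionPlan` trait.
trait ExecutionPlan {
    fn name(&self) -> &'static str;
    fn execute(&self, partition: usize) -> String;
}

/// Toy stand-in for the new, format-agnostic `DataSourceExec`.
struct DataSourceExec {
    source_name: &'static str,
}

impl ExecutionPlan for DataSourceExec {
    fn name(&self) -> &'static str {
        "DataSourceExec"
    }
    fn execute(&self, partition: usize) -> String {
        format!("scanning {} partition {}", self.source_name, partition)
    }
}

/// Deprecated facade: the old public type stays, but every call is forwarded
/// to the inner `DataSourceExec`, so only one implementation is maintained.
#[deprecated(note = "use DataSourceExec instead")]
struct AvroExec {
    inner: DataSourceExec, // wraps a DataSourceExec
}

#[allow(deprecated)]
impl ExecutionPlan for AvroExec {
    fn name(&self) -> &'static str {
        "AvroExec"
    }
    fn execute(&self, partition: usize) -> String {
        // delegate to the shared implementation
        self.inner.execute(partition)
    }
}

#[allow(deprecated)]
fn main() {
    let exec = AvroExec {
        inner: DataSourceExec { source_name: "avro" },
    };
    println!("{}: {}", exec.name(), exec.execute(0));
}
```

Because the facade only forwards calls, the two types cannot drift out of sync, which is the maintenance concern raised in that comment.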
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org