adriangb commented on code in PR #17242: URL: https://github.com/apache/datafusion/pull/17242#discussion_r2301428037
########## datafusion/core/src/datasource/physical_plan/arrow_file.rs: ########## @@ -49,23 +62,37 @@ impl From<ArrowSource> for Arc<dyn FileSource> { } impl FileSource for ArrowSource { + fn config(&self) -> &FileScanConfig { + &self.config + } + + fn with_config(&self, config: FileScanConfig) -> Arc<dyn FileSource> { + let mut this = self.clone(); + this.config = config; + + Arc::new(this) + } + + fn as_data_source(&self) -> Arc<dyn DataSource> { + Arc::new(self.clone()) + } + fn create_file_opener( &self, object_store: Arc<dyn ObjectStore>, - base_config: &FileScanConfig, _partition: usize, ) -> Arc<dyn FileOpener> { Arc::new(ArrowOpener { object_store, - projection: base_config.file_column_projection_indices(), + projection: self.config().file_column_projection_indices(), Review Comment: Use `self.config` instead; there seems to be no point in calling the public method? ########## datafusion/core/src/datasource/physical_plan/arrow_file.rs: ########## @@ -49,23 +62,37 @@ impl From<ArrowSource> for Arc<dyn FileSource> { } impl FileSource for ArrowSource { + fn config(&self) -> &FileScanConfig { + &self.config + } + + fn with_config(&self, config: FileScanConfig) -> Arc<dyn FileSource> { + let mut this = self.clone(); + this.config = config; + + Arc::new(this) + } + + fn as_data_source(&self) -> Arc<dyn DataSource> { + Arc::new(self.clone()) + } + fn create_file_opener( &self, object_store: Arc<dyn ObjectStore>, - base_config: &FileScanConfig, _partition: usize, ) -> Arc<dyn FileOpener> { Arc::new(ArrowOpener { object_store, - projection: base_config.file_column_projection_indices(), + projection: self.config().file_column_projection_indices(), }) } fn as_any(&self) -> &dyn Any { self } - fn metrics(&self) -> &ExecutionPlanMetricsSet { + fn metrics_inner(&self) -> &ExecutionPlanMetricsSet { Review Comment: Hmm is `metrics_inner` now a method on `FileSource`? 
########## datafusion/datasource-parquet/src/source.rs: ########## @@ -580,12 +616,12 @@ impl FileSource for ParquetSource { self } - fn metrics(&self) -> &ExecutionPlanMetricsSet { + fn metrics_inner(&self) -> &ExecutionPlanMetricsSet { &self.metrics } - fn file_source_statistics(&self, config: &FileScanConfig) -> Statistics { - let statistics = config.file_source_projected_statistics.clone(); + fn file_source_statistics(&self) -> Statistics { + let statistics = self.config.file_source_projected_statistics.clone(); Review Comment: Hmm also seems a bit strange that a FileSource calls into a field on FileScanConfig called `file_source...`; seems circular? ########## datafusion-examples/examples/parquet_exec_visitor.rs: ########## @@ -98,9 +98,11 @@ impl ExecutionPlanVisitor for ParquetExecVisitor { fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> { // If needed match on a specific `ExecutionPlan` node type if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() { - if let Some((file_config, _)) = + if let Some(parquet_source) = data_source_exec.downcast_to_file_source::<ParquetSource>() { + let file_config = parquet_source.config(); + self.file_groups = Some(file_config.file_groups.clone()); Review Comment: This seems like a valid usage. Only question I have is if we should make `FileScanConfig` an implementation detail and have a `file_groups()` method on DataSource. We could even have a `config()` on `FileSource` so that `impl<T: FileSource> DataSource for &T { fn file_groups(&self) -> ? { self.config().file_groups.clone() }` or something like that. 
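To make the `file_groups()` suggestion above concrete, here is a minimal sketch. It assumes heavily simplified, hypothetical stand-ins for `FileGroup`, `FileScanConfig`, `FileSource`, `DataSource` and `ParquetSource` (none of these are the real DataFusion definitions), and it uses a blanket impl for `T` as in the PR diff rather than for `&T`. The `example_source` helper is also hypothetical.

```rust
// Simplified, hypothetical stand-ins for the types discussed in this review;
// the real DataFusion traits carry many more methods and bounds.
#[derive(Clone, Debug, PartialEq)]
struct FileGroup(Vec<String>);

#[derive(Clone, Debug)]
struct FileScanConfig {
    file_groups: Vec<FileGroup>,
}

// A file source only has to hand out its config.
trait FileSource {
    fn config(&self) -> &FileScanConfig;
}

// Callers talk to `DataSource` and never see `FileScanConfig`.
trait DataSource {
    fn file_groups(&self) -> Vec<FileGroup>;
}

// Blanket impl: every `FileSource` gets `file_groups()` for free.
impl<T: FileSource> DataSource for T {
    fn file_groups(&self) -> Vec<FileGroup> {
        self.config().file_groups.clone()
    }
}

struct ParquetSource {
    config: FileScanConfig,
}

impl FileSource for ParquetSource {
    fn config(&self) -> &FileScanConfig {
        &self.config
    }
}

// Hypothetical helper that builds a source with a single file group.
fn example_source() -> ParquetSource {
    ParquetSource {
        config: FileScanConfig {
            file_groups: vec![FileGroup(vec!["a.parquet".to_string()])],
        },
    }
}

fn main() {
    let source = example_source();
    // The visitor from the example would read the groups through the trait
    // instead of reaching into the config directly.
    let groups = source.file_groups();
    println!("{} file group(s)", groups.len());
}
```

With this shape, the visitor in `parquet_exec_visitor.rs` would not need `config()` at all, which is the sense in which `FileScanConfig` could become an implementation detail.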
########## datafusion/datasource/src/file.rs: ########## @@ -150,3 +165,193 @@ pub trait FileSource: Send + Sync { None } } + +impl<T: FileSource + 'static> DataSource for T { + fn open( + &self, + partition: usize, + context: Arc<TaskContext>, + ) -> Result<SendableRecordBatchStream> { + let object_store = context + .runtime_env() + .object_store(&self.config().object_store_url)?; + + let batch_size = self + .config() + .batch_size + .unwrap_or_else(|| context.session_config().batch_size()); + + let config = FileScanConfigBuilder::from(self.config().clone()) + .with_batch_size(Some(batch_size)) + .build(); + + let source = self.with_config(config); + + let opener = source.create_file_opener(object_store, partition); + + let stream = + FileStream::new(source.config(), partition, opener, source.metrics_inner())?; + Ok(Box::pin(cooperative(stream))) + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + let schema = self.config().projected_schema(); + let orderings = get_projected_output_ordering(self.config(), &schema); + + write!(f, "file_groups=")?; + FileGroupsDisplay(&self.config().file_groups).fmt_as(t, f)?; + + if !schema.fields().is_empty() { + write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?; + } + + if let Some(limit) = self.config().limit { + write!(f, ", limit={limit}")?; + } + + display_orderings(f, &orderings)?; + + if !self.config().constraints.is_empty() { + write!(f, ", {}", self.config().constraints)?; + } + + write!(f, ", file_type={}", self.file_type())?; + self.fmt_extra(t, f) + } + DisplayFormatType::TreeRender => { + writeln!(f, "format={}", self.file_type())?; + self.fmt_extra(t, f)?; + let num_files = self + .config() + .file_groups + .iter() + .map(|fg| fg.len()) + .sum::<usize>(); + writeln!(f, "files={num_files}")?; + Ok(()) + } + } + } + + /// If supported by the underlying 
[`FileSource`], redistribute files across partitions according to their size. + fn repartitioned( Review Comment: Okay I see now why the rename (name clash between the traits). I wonder if we could do something better here? Maybe even explicitly calling `FileSource::repartitioned` will at least avoid some public API churn for all implementers of `FileSource`. ########## datafusion/datasource/src/file.rs: ########## @@ -150,3 +165,193 @@ pub trait FileSource: Send + Sync { None } } + +impl<T: FileSource + 'static> DataSource for T { + fn open( + &self, + partition: usize, + context: Arc<TaskContext>, + ) -> Result<SendableRecordBatchStream> { + let object_store = context + .runtime_env() + .object_store(&self.config().object_store_url)?; + + let batch_size = self + .config() + .batch_size + .unwrap_or_else(|| context.session_config().batch_size()); + + let config = FileScanConfigBuilder::from(self.config().clone()) + .with_batch_size(Some(batch_size)) + .build(); + + let source = self.with_config(config); + + let opener = source.create_file_opener(object_store, partition); + + let stream = + FileStream::new(source.config(), partition, opener, source.metrics_inner())?; + Ok(Box::pin(cooperative(stream))) + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + let schema = self.config().projected_schema(); + let orderings = get_projected_output_ordering(self.config(), &schema); + + write!(f, "file_groups=")?; + FileGroupsDisplay(&self.config().file_groups).fmt_as(t, f)?; + + if !schema.fields().is_empty() { + write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?; + } + + if let Some(limit) = self.config().limit { + write!(f, ", limit={limit}")?; + } + + display_orderings(f, &orderings)?; + + if !self.config().constraints.is_empty() { + write!(f, ", {}", self.config().constraints)?; + } + + write!(f, ", 
file_type={}", self.file_type())?; + self.fmt_extra(t, f) + } + DisplayFormatType::TreeRender => { + writeln!(f, "format={}", self.file_type())?; + self.fmt_extra(t, f)?; + let num_files = self + .config() + .file_groups + .iter() + .map(|fg| fg.len()) + .sum::<usize>(); + writeln!(f, "files={num_files}")?; + Ok(()) + } + } + } + + /// If supported by the underlying [`FileSource`], redistribute files across partitions according to their size. + fn repartitioned( + &self, + target_partitions: usize, + repartition_file_min_size: usize, + output_ordering: Option<LexOrdering>, + ) -> Result<Option<Arc<dyn DataSource>>> { + let source = self.repartitioned_inner( + target_partitions, + repartition_file_min_size, + output_ordering, + )?; + + Ok(source.map(|s| s.as_data_source())) + } + + fn output_partitioning(&self) -> Partitioning { + Partitioning::UnknownPartitioning(self.config().file_groups.len()) + } + + fn eq_properties(&self) -> EquivalenceProperties { + let (schema, constraints, _, orderings) = + self.config().project(self.file_source_statistics()); + EquivalenceProperties::new_with_orderings(schema, orderings) + .with_constraints(constraints) + } + + fn scheduling_type(&self) -> SchedulingType { + SchedulingType::Cooperative + } + + fn data_source_statistics(&self) -> Result<Statistics> { + Ok(self.config().projected_stats(self.file_source_statistics())) + } + + fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> { + let config = FileScanConfigBuilder::from(self.config().clone()) + .with_limit(limit) + .build(); + + Some(self.with_config(config).as_data_source()) + } Review Comment: I wonder if we should invert this: ``` fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> { Some(Arc::new(FileSource::with_fetch(self, limit)) as Arc<dyn DataSource>) } ``` Where `FileSource::with_fetch` is something along the lines of: ``` self.config = self.config.with_fetch(limit) ``` Basically can we make FileScanConfig an 
implementation detail one level lower. ########## datafusion/core/src/datasource/physical_plan/parquet.rs: ########## @@ -185,8 +192,6 @@ mod tests { source = source.with_bloom_filter_on_read(false); } - // source.with_schema(Arc::clone(&table_schema)) Review Comment: Remove commented out code? ########## datafusion/core/src/datasource/physical_plan/arrow_file.rs: ########## @@ -49,23 +62,37 @@ impl From<ArrowSource> for Arc<dyn FileSource> { } impl FileSource for ArrowSource { + fn config(&self) -> &FileScanConfig { + &self.config + } + + fn with_config(&self, config: FileScanConfig) -> Arc<dyn FileSource> { + let mut this = self.clone(); + this.config = config; + + Arc::new(this) + } Review Comment: These are the APIs I'm not so sure about ########## datafusion/core/src/datasource/physical_plan/arrow_file.rs: ########## @@ -30,16 +30,29 @@ use datafusion_datasource::file_scan_config::FileScanConfig; use datafusion_datasource::PartitionedFile; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; +use datafusion_datasource::source::DataSource; use futures::StreamExt; use itertools::Itertools; use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore}; /// Arrow configuration struct that is given to DataSourceExec /// Does not hold anything special, since [`FileScanConfig`] is sufficient for arrow -#[derive(Clone, Default)] +#[derive(Clone)] pub struct ArrowSource { metrics: ExecutionPlanMetricsSet, schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>, + config: FileScanConfig, +} + +impl ArrowSource { + /// Returns a [`ArrowSource`] + pub fn new(config: FileScanConfig) -> Self { + Self { + metrics: Default::default(), + schema_adapter_factory: None, + config, + } + } } Review Comment: This makes sense - all of the `FileSource` implementations now need a `FileScanConfig`, so they can't impl `Default` anymore ########## datafusion/datasource-parquet/src/source.rs: ########## @@ -603,12 +639,7 @@ impl FileSource for ParquetSource { "parquet" } - fn 
fmt_extra( - &self, - t: DisplayFormatType, - f: &mut Formatter, - config: &FileScanConfig, - ) -> std::fmt::Result { Review Comment: Nice, I think this is emblematic of how this PR cleans up the current APIs ########## datafusion/datasource-json/src/file_format.rs: ########## @@ -251,16 +251,16 @@ impl FileFormat for JsonFormat { async fn create_physical_plan( &self, _state: &dyn Session, - conf: FileScanConfig, + source: Arc<dyn FileSource>, ) -> Result<Arc<dyn ExecutionPlan>> { - let source = Arc::new(JsonSource::new()); - let conf = FileScanConfigBuilder::from(conf) + let conf = FileScanConfigBuilder::from(source.config().to_owned()) .with_file_compression_type(FileCompressionType::from( self.options.compression, )) - .with_source(source) .build(); - Ok(DataSourceExec::from_data_source(conf)) Review Comment: This is a bit wonky, pulling out the config, adding some stuff to it, then rebuilding it ########## datafusion/datasource/src/file.rs: ########## @@ -50,31 +65,31 @@ pub fn as_file_source<T: FileSource + 'static>(source: T) -> Arc<dyn FileSource> /// * [`ParquetSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.ParquetSource.html) /// /// [`DataSource`]: crate::source::DataSource -pub trait FileSource: Send + Sync { +pub trait FileSource: fmt::Debug + Send + Sync { + fn config(&self) -> &FileScanConfig; + + fn with_config(&self, config: FileScanConfig) -> Arc<dyn FileSource>; + + fn as_data_source(&self) -> Arc<dyn DataSource>; + /// Creates a `dyn FileOpener` based on given parameters fn create_file_opener( &self, object_store: Arc<dyn ObjectStore>, - base_config: &FileScanConfig, partition: usize, ) -> Arc<dyn FileOpener>; /// Any fn as_any(&self) -> &dyn Any; /// Return execution plan metrics - fn metrics(&self) -> &ExecutionPlanMetricsSet; + fn metrics_inner(&self) -> &ExecutionPlanMetricsSet; /// Return projected statistics - fn file_source_statistics(&self, config: &FileScanConfig) -> Statistics { - 
config.file_source_projected_statistics.clone() + fn file_source_statistics(&self) -> Statistics { + self.config().file_source_projected_statistics.clone() Review Comment: Hmm I guess this method already existed but it seems fishy to me. ########## datafusion/datasource/src/display.rs: ########## @@ -27,7 +27,7 @@ use std::fmt::{Debug, Formatter, Result as FmtResult}; /// {NUM_GROUPS groups: [[file1, file2,...], [fileN, fileM, ...], ...]} /// ``` #[derive(Debug)] -pub(crate) struct FileGroupsDisplay<'a>(pub(crate) &'a [FileGroup]); +pub struct FileGroupsDisplay<'a>(pub(crate) &'a [FileGroup]); Review Comment: Do we *need* to make this public? ########## datafusion/datasource/src/file.rs: ########## @@ -50,31 +65,31 @@ pub fn as_file_source<T: FileSource + 'static>(source: T) -> Arc<dyn FileSource> /// * [`ParquetSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.ParquetSource.html) /// /// [`DataSource`]: crate::source::DataSource -pub trait FileSource: Send + Sync { +pub trait FileSource: fmt::Debug + Send + Sync { + fn config(&self) -> &FileScanConfig; + + fn with_config(&self, config: FileScanConfig) -> Arc<dyn FileSource>; + + fn as_data_source(&self) -> Arc<dyn DataSource>; + /// Creates a `dyn FileOpener` based on given parameters fn create_file_opener( &self, object_store: Arc<dyn ObjectStore>, - base_config: &FileScanConfig, partition: usize, ) -> Arc<dyn FileOpener>; /// Any fn as_any(&self) -> &dyn Any; /// Return execution plan metrics - fn metrics(&self) -> &ExecutionPlanMetricsSet; + fn metrics_inner(&self) -> &ExecutionPlanMetricsSet; Review Comment: This seems like a rename. Curious why? A public method with the `_inner` suffix seems weird. ########## datafusion/datasource/src/file.rs: ########## @@ -84,13 +99,14 @@ pub trait FileSource: Send + Sync { /// /// The default implementation uses [`FileGroupPartitioner`]. See that /// struct for more details. 
- fn repartitioned( + fn repartitioned_inner( Review Comment: Also wondering why the rename / why the `_inner` ########## datafusion/datasource/src/file.rs: ########## @@ -150,3 +165,193 @@ pub trait FileSource: Send + Sync { None } } + +impl<T: FileSource + 'static> DataSource for T { + fn open( + &self, + partition: usize, + context: Arc<TaskContext>, + ) -> Result<SendableRecordBatchStream> { + let object_store = context + .runtime_env() + .object_store(&self.config().object_store_url)?; + + let batch_size = self + .config() + .batch_size + .unwrap_or_else(|| context.session_config().batch_size()); + + let config = FileScanConfigBuilder::from(self.config().clone()) + .with_batch_size(Some(batch_size)) + .build(); + + let source = self.with_config(config); + + let opener = source.create_file_opener(object_store, partition); + + let stream = + FileStream::new(source.config(), partition, opener, source.metrics_inner())?; + Ok(Box::pin(cooperative(stream))) + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + let schema = self.config().projected_schema(); + let orderings = get_projected_output_ordering(self.config(), &schema); + + write!(f, "file_groups=")?; + FileGroupsDisplay(&self.config().file_groups).fmt_as(t, f)?; + + if !schema.fields().is_empty() { + write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?; + } + + if let Some(limit) = self.config().limit { + write!(f, ", limit={limit}")?; + } + + display_orderings(f, &orderings)?; + + if !self.config().constraints.is_empty() { + write!(f, ", {}", self.config().constraints)?; + } + + write!(f, ", file_type={}", self.file_type())?; + self.fmt_extra(t, f) + } + DisplayFormatType::TreeRender => { + writeln!(f, "format={}", self.file_type())?; + self.fmt_extra(t, f)?; + let num_files = self + .config() + .file_groups + .iter() + .map(|fg| fg.len()) + 
.sum::<usize>(); + writeln!(f, "files={num_files}")?; + Ok(()) + } + } + } + + /// If supported by the underlying [`FileSource`], redistribute files across partitions according to their size. + fn repartitioned( + &self, + target_partitions: usize, + repartition_file_min_size: usize, + output_ordering: Option<LexOrdering>, + ) -> Result<Option<Arc<dyn DataSource>>> { + let source = self.repartitioned_inner( + target_partitions, + repartition_file_min_size, + output_ordering, + )?; + + Ok(source.map(|s| s.as_data_source())) + } + + fn output_partitioning(&self) -> Partitioning { + Partitioning::UnknownPartitioning(self.config().file_groups.len()) + } + + fn eq_properties(&self) -> EquivalenceProperties { + let (schema, constraints, _, orderings) = + self.config().project(self.file_source_statistics()); + EquivalenceProperties::new_with_orderings(schema, orderings) + .with_constraints(constraints) + } + + fn scheduling_type(&self) -> SchedulingType { + SchedulingType::Cooperative + } + + fn data_source_statistics(&self) -> Result<Statistics> { + Ok(self.config().projected_stats(self.file_source_statistics())) + } Review Comment: I'd like to do something about these methods. Should we move the projection logic from `FileScanConfig` into this method? 
Unpacking it, it seems like we have: ``` self.config().projected_stats(self.config.file_source_projected_statistics.clone()) ``` Which seems weird ########## datafusion/core/tests/physical_optimizer/enforce_distribution.rs: ########## @@ -184,36 +185,32 @@ fn parquet_exec_multiple() -> Arc<DataSourceExec> { fn parquet_exec_multiple_sorted( output_ordering: Vec<LexOrdering>, ) -> Arc<DataSourceExec> { Review Comment: Not a problem for today but again the fact that optimizer rules have to special case execution node implementations via downcasting is IMO a smell ########## datafusion/datasource/src/source.rs: ########## @@ -179,6 +179,10 @@ pub trait DataSource: Send + Sync + Debug { vec![PushedDown::No; filters.len()], )) } + + fn as_file_source(&self) -> Option<Arc<dyn FileSource>> { + None + } Review Comment: Wondering / questioning this method ########## datafusion/datasource/src/source.rs: ########## @@ -465,18 +468,9 @@ impl DataSourceExec { /// /// Returns `None` if /// 1. the datasource is not scanning files (`FileScanConfig`) - /// 2. The [`FileScanConfig::file_source`] is not of type `T` - pub fn downcast_to_file_source<T: 'static>(&self) -> Option<(&FileScanConfig, &T)> { - self.data_source() - .as_any() - .downcast_ref::<FileScanConfig>() - .and_then(|file_scan_conf| { - file_scan_conf - .file_source() - .as_any() - .downcast_ref::<T>() - .map(|source| (file_scan_conf, source)) - }) + /// 2. the file source is not of type `T` + pub fn downcast_to_file_source<T: FileSource + 'static>(&self) -> Option<&T> { + self.data_source().as_any().downcast_ref::<T>() } Review Comment: Same about this method, wonder where it is used. 
It's a nice convenience but to me this sort of downcasting is always a smell of missing design on the trait / leaky abstraction ########## datafusion/datasource/src/source.rs: ########## @@ -291,10 +295,9 @@ impl ExecutionPlan for DataSourceExec { fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> { if let Some(partition) = partition { let mut statistics = Statistics::new_unknown(&self.schema()); - if let Some(file_config) = - self.data_source.as_any().downcast_ref::<FileScanConfig>() - { - if let Some(file_group) = file_config.file_groups.get(partition) { + if let Some(file_source) = self.data_source.as_file_source() { + if let Some(file_group) = file_source.config().file_groups.get(partition) + { Review Comment: I guess the point is that not all data sources are file sources, and for the ones that aren't partitioning doesn't necessarily apply. Can't we make this a method on the `DataSource` trait so that we can implement it as `Ok(Statistics::new_unknown())` for `MemoryDataSource` and copy this code into `impl <T: FileScan> DataSource for &T`?
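As a rough illustration of the last suggestion (per-partition statistics as a `DataSource` trait method instead of a downcast), the sketch below uses simplified, hypothetical types: `Statistics` is reduced to an enum, `rows_per_file_group` is an invented accessor, and `MemorySource` / `CsvSource` are placeholders. It also assumes everything lives in one crate, which is what lets the blanket impl coexist with the direct `MemorySource` impl.

```rust
// Hypothetical, heavily simplified stand-in for DataFusion's `Statistics`.
#[derive(Debug, PartialEq)]
enum Statistics {
    Unknown,
    Rows(usize),
}

trait DataSource {
    // Default: a source with no notion of file groups reports unknown stats.
    fn partition_statistics(&self, _partition: usize) -> Statistics {
        Statistics::Unknown
    }
}

trait FileSource {
    // Invented accessor: row counts, one entry per file group.
    fn rows_per_file_group(&self) -> &[usize];
}

// All file sources share one implementation, so `DataSourceExec` would not
// need to downcast to find out whether it is scanning files.
impl<T: FileSource> DataSource for T {
    fn partition_statistics(&self, partition: usize) -> Statistics {
        match self.rows_per_file_group().get(partition) {
            Some(rows) => Statistics::Rows(*rows),
            None => Statistics::Unknown,
        }
    }
}

// Not a `FileSource`, so it simply keeps the default method.
struct MemorySource;

impl DataSource for MemorySource {}

struct CsvSource {
    rows: Vec<usize>,
}

impl FileSource for CsvSource {
    fn rows_per_file_group(&self) -> &[usize] {
        &self.rows
    }
}

fn main() {
    // The caller only sees `dyn DataSource`; no downcasting is involved.
    let sources: Vec<Box<dyn DataSource>> = vec![
        Box::new(MemorySource),
        Box::new(CsvSource { rows: vec![10, 20] }),
    ];
    for source in &sources {
        println!("{:?}", source.partition_statistics(1));
    }
}
```

Whether this carries over to the real crates depends on where the traits and `MemoryDataSource` are defined, since coherence rules are stricter across crate boundaries.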