alamb commented on code in PR #18627:
URL: https://github.com/apache/datafusion/pull/18627#discussion_r2561304187
##########
datafusion/core/tests/physical_optimizer/projection_pushdown.rs:
##########
@@ -703,10 +705,7 @@ fn test_projection_after_projection() -> Result<()> {
assert_snapshot!(
actual,
- @r"
- ProjectionExec: expr=[b@1 as new_b, c@2 + e@4 as binary, b@1 as newest_b]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=csv, has_header=false
- "
+ @"DataSourceExec: file_groups={1 group: [[x]]}, projection=[b@1 as
new_b, c@2 + e@4 as binary, b@1 as newest_b], file_type=csv, has_header=false"
Review Comment:
it is pretty neat to see the expressions pushed down as projections here
##########
datafusion/datasource/src/file.rs:
##########
@@ -126,6 +129,13 @@ pub trait FileSource: Send + Sync {
))
}
+ fn try_pushdown_projection(
Review Comment:
I think we need to add doc strings to this method to help people who want to
implement it -- specifically:
1. What the return value is (aka if None, the projection isn't pushed down)
2. If Some is returned, what is the expectation on the returned plan (e.g.
the output schema changes, etc )
This would also be a good place to add hints / links to structures like
`SplitProjection` that might help implementing such a thing
##########
datafusion/datasource-arrow/src/source.rs:
##########
@@ -66,6 +71,8 @@ enum ArrowFormat {
pub(crate) struct ArrowStreamFileOpener {
object_store: Arc<dyn ObjectStore>,
projection: Option<Vec<usize>>,
+ projected_schema: Option<SchemaRef>,
+ schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
Review Comment:
this seems like a nice improvement (the Arrow readers now get a
SchemaAdapterFactory, so they should now also handle schema evolution)
Do we need to add tests for this anywhere? Perhaps a slt showing we can read
multiple arrow files with different schemas?
##########
datafusion-examples/examples/custom_data_source/csv_json_opener.rs:
##########
@@ -64,22 +64,22 @@ async fn csv_opener() -> Result<()> {
..Default::default()
};
- let scan_config = FileScanConfigBuilder::new(
- ObjectStoreUrl::local_filesystem(),
-
Arc::new(CsvSource::new(Arc::clone(&schema)).with_csv_options(options.clone())),
- )
- .with_projection_indices(Some(vec![12, 0]))
- .with_limit(Some(5))
- .with_file(PartitionedFile::new(path.display().to_string(), 10))
- .build();
-
- let config = CsvSource::new(Arc::clone(&schema))
+ let source = CsvSource::new(Arc::clone(&schema))
.with_csv_options(options)
.with_comment(Some(b'#'))
- .with_batch_size(8192)
- .with_projection(&scan_config);
+ .with_batch_size(8192);
+
+ let scan_config =
+ FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
+ .with_projection_indices(Some(vec![12, 0]))?
+ .with_limit(Some(5))
+ .with_file(PartitionedFile::new(path.display().to_string(), 10))
+ .build();
- let opener = config.create_file_opener(object_store, &scan_config, 0);
+ let opener =
+ scan_config
+ .file_source()
Review Comment:
This one is still kind of weird -- look like we might be able to replace it
entirely with
https://github.com/apache/datafusion/blob/3c21b546a9acf9922229220d3ceca91a945cbf46/datafusion/datasource/src/file_scan_config.rs#L501-L517
But then we would have to change the example to setup the runtime env
##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -239,6 +223,7 @@ pub struct FileScanConfig {
/// .with_limit(Some(1000))
/// // Project only the first column
/// .with_projection_indices(Some(vec![0]))
+/// .expect("Failed to push down projection")
Review Comment:
nit -- it would be nice to make example this return a real error rather than
expect
##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -1114,252 +1092,6 @@ impl DisplayAs for FileScanConfig {
}
}
-/// A helper that projects partition columns into the file record batches.
-///
-/// One interesting trick is the usage of a cache for the key buffers of the
partition column
-/// dictionaries. Indeed, the partition columns are constant, so the
dictionaries that represent them
-/// have all their keys equal to 0. This enables us to re-use the same
"all-zero" buffer across batches,
-/// which makes the space consumption of the partition columns O(batch_size)
instead of O(record_count).
-pub struct PartitionColumnProjector {
Review Comment:
This might be a good thing to document in the upgrade guide -- what to do in
this case
##########
datafusion/datasource/src/projection.rs:
##########
@@ -0,0 +1,634 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow::datatypes::Schema;
+use arrow_schema::SchemaRef;
+use datafusion_common::{
+ tree_node::{Transformed, TransformedResult, TreeNode},
+ Result, ScalarValue,
+};
+use datafusion_physical_expr::{
+ expressions::{Column, Literal},
+ projection::{ProjectionExpr, ProjectionExprs},
+};
+use futures::{FutureExt, StreamExt};
+use itertools::Itertools;
+
+use crate::{
+ file_stream::{FileOpenFuture, FileOpener},
+ PartitionedFile, TableSchema,
+};
+
+/// A file opener that handles applying a projection on top of an inner opener.
+///
+/// This includes handling partition columns.
+///
+/// Any projection pushed down will be split up into:
+/// - Simple column indices / column selection
+/// - A remainder projection that this opener applies on top of it
+///
+/// This is meant to simplify projection pushdown for sources like CSV
+/// that can only handle "simple" column selection.
+pub struct ProjectionOpener {
+ inner: Arc<dyn FileOpener>,
+ projection: ProjectionExprs,
+ input_schema: SchemaRef,
+ partition_columns: Vec<PartitionColumnIndex>,
+}
+
+impl ProjectionOpener {
+ pub fn try_new(
+ projection: SplitProjection,
+ inner: Arc<dyn FileOpener>,
+ file_schema: &Schema,
+ ) -> Result<Arc<dyn FileOpener>> {
+ Ok(Arc::new(ProjectionOpener {
+ inner,
+ projection: projection.remapped_projection,
+ input_schema:
Arc::new(file_schema.project(&projection.file_indices)?),
+ partition_columns: projection.partition_columns,
+ }))
+ }
+}
+
+impl FileOpener for ProjectionOpener {
+ fn open(&self, partitioned_file: PartitionedFile) ->
Result<FileOpenFuture> {
+ let partition_values = partitioned_file.partition_values.clone();
+ // Modify any references to partition columns in the projection
expressions
+ // and substitute them with literal values from
PartitionedFile.partition_values
+ let projection = if self.partition_columns.is_empty() {
+ self.projection.clone()
+ } else {
+ inject_partition_columns_into_projection(
+ &self.projection,
+ &self.partition_columns,
+ partition_values,
+ )
+ };
+ let projector = projection.make_projector(&self.input_schema)?;
+
+ let inner = self.inner.open(partitioned_file)?;
+
+ Ok(async move {
+ let stream = inner.await?;
+ let stream = stream.map(move |batch| {
+ let batch = batch?;
+ let batch = projector.project_batch(&batch)?;
+ Ok(batch)
+ });
+ Ok(stream.boxed())
+ }
+ .boxed())
+ }
+}
+
+#[derive(Debug, Clone, Copy)]
+pub struct PartitionColumnIndex {
+ /// The index of this partition column in the remainder projection (>=
num_file_columns)
+ pub in_remainder_projection: usize,
+ /// The index of this partition column in the partition_values array
+ pub in_partition_values: usize,
+}
+
+fn inject_partition_columns_into_projection(
+ projection: &ProjectionExprs,
+ partition_columns: &[PartitionColumnIndex],
+ partition_values: Vec<ScalarValue>,
+) -> ProjectionExprs {
+ // Pre-create all literals for partition columns to avoid cloning
ScalarValues multiple times.
+ let partition_literals: Vec<Arc<Literal>> = partition_values
+ .into_iter()
+ .map(|value| Arc::new(Literal::new(value)))
+ .collect();
+
+ let projections = projection
+ .iter()
+ .map(|projection| {
+ let expr = Arc::clone(&projection.expr)
+ .transform(|expr| {
+ let original_expr = Arc::clone(&expr);
+ if let Some(column) =
expr.as_any().downcast_ref::<Column>() {
+ // Check if this column index corresponds to a
partition column
+ if let Some(pci) = partition_columns
+ .iter()
+ .find(|pci| pci.in_remainder_projection ==
column.index())
+ {
+ let literal =
+
Arc::clone(&partition_literals[pci.in_partition_values]);
+ return Ok(Transformed::yes(literal));
+ }
+ }
+ Ok(Transformed::no(original_expr))
+ })
+ .data()
+ .expect("infallible transform");
+ ProjectionExpr::new(expr, projection.alias.clone())
+ })
+ .collect_vec();
+ ProjectionExprs::new(projections)
+}
+
+/// At a high level the goal of SplitProjection is to take a ProjectionExprs
meant to be applied to the table schema
+/// and split that into:
+/// - The projection indices into the file schema (file_indices)
+/// - The projection indices into the partition values
(partition_value_indices), which pre-compute both the index into the table
schema
+/// and the index into the partition values array
+/// - A remapped projection that can be applied after the file projection is
applied
+/// This remapped projection has the following properties:
+/// - Column indices referring to file columns are remapped to
[0..file_indices.len())
+/// - Column indices referring to partition columns are remapped to
[file_indices.len()..)
+///
+/// This allows the ProjectionOpener to easily identify which columns in the
remapped projection
+/// refer to partition columns and substitute them with literals from the
partition values.
+#[derive(Debug, Clone)]
+pub struct SplitProjection {
+ /// The original projection this [`SplitProjection`] was derived from
+ pub source: ProjectionExprs,
+ /// Column indices to read from file (public for file sources)
+ pub file_indices: Vec<usize>,
+ /// Pre-computed partition column mappings (internal, used by
ProjectionOpener)
+ pub(crate) partition_columns: Vec<PartitionColumnIndex>,
+ /// The remapped projection (internal, used by ProjectionOpener)
+ pub(crate) remapped_projection: ProjectionExprs,
+}
+
+impl SplitProjection {
+ pub fn unprojected(table_schema: &TableSchema) -> Self {
+ let projection = ProjectionExprs::from_indices(
+ &(0..table_schema.table_schema().fields().len()).collect_vec(),
+ table_schema.table_schema(),
+ );
+ Self::new(table_schema.file_schema(), &projection)
+ }
+
+ /// Creates a new `SplitProjection` by splitting columns into file and
partition columns.
+ ///
+ /// # Algorithm
+ /// Single-pass approach that combines extraction, classification, and
remapping:
+ /// 1. Extract all unique column references from projection expressions
+ /// 2. Sort columns by original table index
+ /// 3. Classify each column as either file or partition based on
file_schema length
+ /// 4. Assign final indices: file columns → [0..n), partition columns →
[n..)
+ /// 5. Transform expressions once to remap all column references
+ ///
+ /// This replaces the previous three-pass approach:
Review Comment:
I don't think the comment here about old vs new approach is helpful / adds
value
Could you also update this comment to clarify:
1. What is contained in the file_schema
2. What assumptions are made
Specifically, I am confused as this code seems to assumes any columns
referenced in `projection` that are greater than file_schema are partition
columns. But what about columns that are in the table schema but not in the
file schema (aka "schema evolution " columns that will be filled in as nulls)
##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -321,15 +304,46 @@ impl FileScanConfigBuilder {
/// Use [`Self::with_projection_indices`] instead. This method will be
removed in a future release.
#[deprecated(since = "51.0.0", note = "Use with_projection_indices
instead")]
pub fn with_projection(self, indices: Option<Vec<usize>>) -> Self {
- self.with_projection_indices(indices)
+ match self.clone().with_projection_indices(indices) {
+ Ok(builder) => builder,
+ Err(e) => {
+ warn!("Failed to push down projection in
FileScanConfigBuilder::with_projection: {e}");
+ self
+ }
+ }
}
/// Set the columns on which to project the data using column indices.
///
/// Indexes that are higher than the number of columns of `file_schema`
refer to `table_partition_cols`.
- pub fn with_projection_indices(mut self, indices: Option<Vec<usize>>) ->
Self {
- self.projection_indices = indices;
- self
+ pub fn with_projection_indices(
+ mut self,
+ indices: Option<Vec<usize>>,
+ ) -> Result<Self> {
+ let projection_exprs = indices.map(|indices| {
Review Comment:
nit: You could probably reduce the level of indenting here by chevking or
non immediately:
```rust
let Some(indices) = indicies else {
return Ok(self)
}
```
##########
datafusion/core/tests/physical_optimizer/partition_statistics.rs:
##########
@@ -620,7 +620,7 @@ mod test {
let plan_string =
get_plan_string(&aggregate_exec_partial).swap_remove(0);
assert_snapshot!(
plan_string,
- @"AggregateExec: mode=Partial, gby=[id@0 as id, 1 + id@0 as expr],
aggr=[COUNT(c)]"
+ @"AggregateExec: mode=Partial, gby=[id@0 as id, 1 + id@0 as expr],
aggr=[COUNT(c)], ordering_mode=Sorted"
Review Comment:
The table is created with `ORDER BY id` so I think this plan is correct:
https://github.com/apache/datafusion/blob/3c21b546a9acf9922229220d3ceca91a945cbf46/datafusion/core/tests/physical_optimizer/partition_statistics.rs#L89-L88
(I don't really know why it started appearing either)
##########
datafusion/datasource-arrow/src/file_format.rs:
##########
@@ -208,14 +208,21 @@ impl FileFormat for ArrowFormat {
conf.table_partition_cols().clone(),
);
- let source: Arc<dyn FileSource> =
+ let mut source: Arc<dyn FileSource> =
match is_object_in_arrow_ipc_file_format(object_store,
object_location).await
{
Ok(true) =>
Arc::new(ArrowSource::new_file_source(table_schema)),
Ok(false) =>
Arc::new(ArrowSource::new_stream_file_source(table_schema)),
Err(e) => Err(e)?,
};
+ // Preserve projection from the original file source
+ if let Some(projection) = conf.file_source.projection() {
+ if let Some(new_source) =
source.try_pushdown_projection(projection)? {
+ source = new_source;
+ }
+ }
Review Comment:
I think @corasaurus-hex was recently working in this area, perhaps they can
help review this change
##########
datafusion/datasource/Cargo.toml:
##########
@@ -36,6 +36,7 @@ default = ["compression"]
[dependencies]
arrow = { workspace = true }
+arrow-schema = { workspace = true }
Review Comment:
Since we already have arrow as a dependency, the arrow-schema dependency
seems unecessary
##########
datafusion/datasource-parquet/src/file_format.rs:
##########
@@ -460,12 +460,13 @@ impl FileFormat for ParquetFormat {
metadata_size_hint = Some(metadata);
}
- let table_schema = TableSchema::new(
- Arc::clone(conf.file_schema()),
- conf.table_partition_cols().clone(),
- );
- let mut source = ParquetSource::new(table_schema)
- .with_table_parquet_options(self.options.clone());
+ let mut source = conf
+ .file_source()
+ .as_any()
+ .downcast_ref::<ParquetSource>()
+ .cloned()
+ .expect("should be a parquet source");
Review Comment:
it seems like this would be better if it were an internal error rather than
panic
The idea of using the existing source rather than creating a new one makes a
lot of sense though
##########
datafusion/datasource-json/src/file_format.rs:
##########
@@ -254,16 +254,10 @@ impl FileFormat for JsonFormat {
_state: &dyn Session,
conf: FileScanConfig,
) -> Result<Arc<dyn ExecutionPlan>> {
- let table_schema = TableSchema::new(
Review Comment:
I believe you had some open questions about this -- why was it re-creating
an opener?
But given all the tests pass I am going to assume it was some left over
unecessary code
##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -595,6 +617,24 @@ impl DataSource for FileScanConfig {
}
}
}
+
+ if let Some(projection) = self.file_source.projection() {
+ match (
+ projection.project_schema(schema),
+ projection.projection_mapping(schema),
+ ) {
+ (Ok(output_schema), Ok(mapping)) => {
+ eq_properties =
+ eq_properties.project(&mapping,
Arc::new(output_schema));
+ }
+ (Err(e), _) | (_, Err(e)) => {
+ warn!("Failed to project equivalence properties: {e}");
Review Comment:
this seems reasonable to me (panic in debug, warn in release0
##########
datafusion-examples/examples/custom_data_source/csv_json_opener.rs:
##########
@@ -62,22 +62,22 @@ async fn csv_opener() -> Result<()> {
..Default::default()
};
- let scan_config = FileScanConfigBuilder::new(
- ObjectStoreUrl::local_filesystem(),
-
Arc::new(CsvSource::new(Arc::clone(&schema)).with_csv_options(options.clone())),
- )
- .with_projection_indices(Some(vec![12, 0]))
- .with_limit(Some(5))
- .with_file(PartitionedFile::new(path.display().to_string(), 10))
- .build();
-
- let config = CsvSource::new(Arc::clone(&schema))
+ let source = CsvSource::new(Arc::clone(&schema))
.with_csv_options(options)
.with_comment(Some(b'#'))
- .with_batch_size(8192)
- .with_projection(&scan_config);
-
- let opener = config.create_file_opener(object_store, &scan_config, 0);
+ .with_batch_size(8192);
+
+ let scan_config =
+ FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
Review Comment:
Yeah I agree it is really nice to not have this strange circular dependency
between the source and the config
##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -656,57 +696,65 @@ impl DataSource for FileScanConfig {
&self,
projection: &[ProjectionExpr],
) -> Result<Option<Arc<dyn DataSource>>> {
- // This process can be moved into CsvExec, but it would be an overlap
of their responsibility.
-
- // Must be all column references, with no table partition columns
(which can not be projected)
- let partitioned_columns_in_proj = projection.iter().any(|proj_expr| {
- proj_expr
- .expr
- .as_any()
- .downcast_ref::<Column>()
- .map(|expr| expr.index() >= self.file_schema().fields().len())
- .unwrap_or(false)
- });
-
- // If there is any non-column or alias-carrier expression, Projection
should not be removed.
- let no_aliases = all_alias_free_columns(projection);
-
- Ok((no_aliases && !partitioned_columns_in_proj).then(|| {
- let file_scan = self.clone();
- let source = Arc::clone(&file_scan.file_source);
- let new_projections = new_projections_for_columns(
- projection,
- &file_scan
- .projection_exprs
- .as_ref()
- .map(|p| p.ordered_column_indices())
- .unwrap_or_else(||
(0..self.file_schema().fields().len()).collect()),
- );
-
- Arc::new(
- FileScanConfigBuilder::from(file_scan)
- // Assign projected statistics to source
- .with_projection_indices(Some(new_projections))
- .with_source(source)
- .build(),
- ) as _
- }))
+ let projection = ProjectionExprs::new(projection.to_vec()); // TODO:
update signature of this method
Review Comment:
What method does this TODO refer to? I think we should file a ticket and
leave a reference here or just remove the comment -- as is the comment seems
non actionable to me
##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -1114,252 +1092,6 @@ impl DisplayAs for FileScanConfig {
}
}
-/// A helper that projects partition columns into the file record batches.
-///
-/// One interesting trick is the usage of a cache for the key buffers of the
partition column
-/// dictionaries. Indeed, the partition columns are constant, so the
dictionaries that represent them
-/// have all their keys equal to 0. This enables us to re-use the same
"all-zero" buffer across batches,
-/// which makes the space consumption of the partition columns O(batch_size)
instead of O(record_count).
-pub struct PartitionColumnProjector {
Review Comment:
This code also seems to have a bunch of optimizations -- should we move them
to the schema adapter code perhaps?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]