adriangb commented on code in PR #18627:
URL: https://github.com/apache/datafusion/pull/18627#discussion_r2562851908


##########
datafusion/datasource/src/projection.rs:
##########
@@ -0,0 +1,634 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow::datatypes::Schema;
+use arrow_schema::SchemaRef;
+use datafusion_common::{
+    tree_node::{Transformed, TransformedResult, TreeNode},
+    Result, ScalarValue,
+};
+use datafusion_physical_expr::{
+    expressions::{Column, Literal},
+    projection::{ProjectionExpr, ProjectionExprs},
+};
+use futures::{FutureExt, StreamExt};
+use itertools::Itertools;
+
+use crate::{
+    file_stream::{FileOpenFuture, FileOpener},
+    PartitionedFile, TableSchema,
+};
+
+/// A file opener that handles applying a projection on top of an inner opener.
+///
+/// This includes handling partition columns.
+///
+/// Any projection pushed down will be split up into:
+/// - Simple column indices / column selection
+/// - A remainder projection that this opener applies on top of it
+///
+/// This is meant to simplify projection pushdown for sources like CSV
+/// that can only handle "simple" column selection.
+pub struct ProjectionOpener {
+    inner: Arc<dyn FileOpener>,
+    projection: ProjectionExprs,
+    input_schema: SchemaRef,
+    partition_columns: Vec<PartitionColumnIndex>,
+}
+
+impl ProjectionOpener {
+    pub fn try_new(
+        projection: SplitProjection,
+        inner: Arc<dyn FileOpener>,
+        file_schema: &Schema,
+    ) -> Result<Arc<dyn FileOpener>> {
+        Ok(Arc::new(ProjectionOpener {
+            inner,
+            projection: projection.remapped_projection,
+            input_schema: 
Arc::new(file_schema.project(&projection.file_indices)?),
+            partition_columns: projection.partition_columns,
+        }))
+    }
+}
+
+impl FileOpener for ProjectionOpener {
+    fn open(&self, partitioned_file: PartitionedFile) -> 
Result<FileOpenFuture> {
+        let partition_values = partitioned_file.partition_values.clone();
+        // Modify any references to partition columns in the projection expressions
+        // and substitute them with literal values from PartitionedFile.partition_values
+        let projection = if self.partition_columns.is_empty() {
+            self.projection.clone()
+        } else {
+            inject_partition_columns_into_projection(
+                &self.projection,
+                &self.partition_columns,
+                partition_values,
+            )
+        };
+        let projector = projection.make_projector(&self.input_schema)?;
+
+        let inner = self.inner.open(partitioned_file)?;
+
+        Ok(async move {
+            let stream = inner.await?;
+            let stream = stream.map(move |batch| {
+                let batch = batch?;
+                let batch = projector.project_batch(&batch)?;
+                Ok(batch)
+            });
+            Ok(stream.boxed())
+        }
+        .boxed())
+    }
+}
+
+#[derive(Debug, Clone, Copy)]
+pub struct PartitionColumnIndex {
+    /// The index of this partition column in the remainder projection (>= num_file_columns)
+    pub in_remainder_projection: usize,
+    /// The index of this partition column in the partition_values array
+    pub in_partition_values: usize,
+}
+
+fn inject_partition_columns_into_projection(
+    projection: &ProjectionExprs,
+    partition_columns: &[PartitionColumnIndex],
+    partition_values: Vec<ScalarValue>,
+) -> ProjectionExprs {
+    // Pre-create all literals for partition columns to avoid cloning ScalarValues multiple times.
+    let partition_literals: Vec<Arc<Literal>> = partition_values
+        .into_iter()
+        .map(|value| Arc::new(Literal::new(value)))
+        .collect();
+
+    let projections = projection
+        .iter()
+        .map(|projection| {
+            let expr = Arc::clone(&projection.expr)
+                .transform(|expr| {
+                    let original_expr = Arc::clone(&expr);
+                    if let Some(column) = 
expr.as_any().downcast_ref::<Column>() {
+                        // Check if this column index corresponds to a 
partition column
+                        if let Some(pci) = partition_columns
+                            .iter()
+                            .find(|pci| pci.in_remainder_projection == 
column.index())
+                        {
+                            let literal =
+                                
Arc::clone(&partition_literals[pci.in_partition_values]);
+                            return Ok(Transformed::yes(literal));
+                        }
+                    }
+                    Ok(Transformed::no(original_expr))
+                })
+                .data()
+                .expect("infallible transform");
+            ProjectionExpr::new(expr, projection.alias.clone())
+        })
+        .collect_vec();
+    ProjectionExprs::new(projections)
+}
+
+/// At a high level the goal of SplitProjection is to take a ProjectionExprs meant to be applied to the table schema
+/// and split that into:
+/// - The projection indices into the file schema (file_indices)
+/// - The projection indices into the partition values (partition_columns), which pre-compute both the index into the table schema
+///   and the index into the partition values array
+/// - A remapped projection that can be applied after the file projection is applied
+///   This remapped projection has the following properties:
+///     - Column indices referring to file columns are remapped to [0..file_indices.len())
+///     - Column indices referring to partition columns are remapped to [file_indices.len()..)
+///
+///   This allows the ProjectionOpener to easily identify which columns in the remapped projection
+///   refer to partition columns and substitute them with literals from the partition values.
+#[derive(Debug, Clone)]
+pub struct SplitProjection {
+    /// The original projection this [`SplitProjection`] was derived from
+    pub source: ProjectionExprs,
+    /// Column indices to read from file (public for file sources)
+    pub file_indices: Vec<usize>,
+    /// Pre-computed partition column mappings (internal, used by 
ProjectionOpener)
+    pub(crate) partition_columns: Vec<PartitionColumnIndex>,
+    /// The remapped projection (internal, used by ProjectionOpener)
+    pub(crate) remapped_projection: ProjectionExprs,
+}
+
+impl SplitProjection {
+    pub fn unprojected(table_schema: &TableSchema) -> Self {
+        let projection = ProjectionExprs::from_indices(
+            &(0..table_schema.table_schema().fields().len()).collect_vec(),
+            table_schema.table_schema(),
+        );
+        Self::new(table_schema.file_schema(), &projection)
+    }
+
+    /// Creates a new `SplitProjection` by splitting columns into file and partition columns.
+    ///
+    /// # Algorithm
+    /// Single-pass approach that combines extraction, classification, and remapping:
+    /// 1. Extract all unique column references from projection expressions
+    /// 2. Sort columns by original table index
+    /// 3. Classify each column as either file or partition based on file_schema length
+    /// 4. Assign final indices: file columns → [0..n), partition columns → [n..)
+    /// 5. Transform expressions once to remap all column references
+    ///
+    /// This replaces the previous three-pass approach:

Review Comment:
   I've added docstrings and updated the parameter name to be 
`logical_file_schema`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to