alamb commented on code in PR #17336: URL: https://github.com/apache/datafusion/pull/17336#discussion_r2344142167
########## datafusion/catalog/src/table.rs: ########## @@ -299,6 +334,119 @@ pub trait TableProvider: Debug + Sync + Send { } } +/// Arguments for scanning a table with [`TableProvider::scan_with_args`]. +/// +/// `ScanArgs` provides a structured way to pass scan parameters to table providers, +/// replacing the multiple individual parameters used by [`TableProvider::scan`]. +/// This struct uses the builder pattern for convenient construction. +/// +/// # Examples +/// +/// ``` +/// # use datafusion_catalog::ScanArgs; +/// # use datafusion_expr::Expr; +/// let args = ScanArgs::default() +/// .with_projection(Some(vec![0, 2, 4])) +/// .with_limit(Some(1000)); +/// ``` +#[derive(Debug, Clone, Default)] +pub struct ScanArgs { + filters: Option<Vec<Expr>>, + projection: Option<Vec<usize>>, + limit: Option<usize>, +} + +impl ScanArgs { + /// Set the column projection for the scan. + /// + /// The projection is a list of column indices from [`TableProvider::schema`] + /// that should be included in the scan results. If `None`, all columns are included. + /// + /// # Arguments + /// * `projection` - Optional list of column indices to project + pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self { + self.projection = projection; + self + } + + /// Get the column projection for the scan. + /// + /// Returns a cloned copy of the projection column indices, or `None` if + /// no projection was specified (meaning all columns should be included). + pub fn projection(&self) -> Option<Vec<usize>> { Review Comment: ```suggestion pub fn projection(&self) -> Option<&Vec<usize>> { ``` ########## datafusion/catalog/src/table.rs: ########## @@ -299,6 +334,119 @@ pub trait TableProvider: Debug + Sync + Send { } } +/// Arguments for scanning a table with [`TableProvider::scan_with_args`]. +/// +/// `ScanArgs` provides a structured way to pass scan parameters to table providers, +/// replacing the multiple individual parameters used by [`TableProvider::scan`]. 
+/// This struct uses the builder pattern for convenient construction. +/// +/// # Examples +/// +/// ``` +/// # use datafusion_catalog::ScanArgs; +/// # use datafusion_expr::Expr; +/// let args = ScanArgs::default() +/// .with_projection(Some(vec![0, 2, 4])) +/// .with_limit(Some(1000)); +/// ``` +#[derive(Debug, Clone, Default)] +pub struct ScanArgs { + filters: Option<Vec<Expr>>, + projection: Option<Vec<usize>>, + limit: Option<usize>, +} + +impl ScanArgs { + /// Set the column projection for the scan. + /// + /// The projection is a list of column indices from [`TableProvider::schema`] + /// that should be included in the scan results. If `None`, all columns are included. + /// + /// # Arguments + /// * `projection` - Optional list of column indices to project + pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self { + self.projection = projection; + self + } + + /// Get the column projection for the scan. + /// + /// Returns a cloned copy of the projection column indices, or `None` if + /// no projection was specified (meaning all columns should be included). + pub fn projection(&self) -> Option<Vec<usize>> { Review Comment: I think this should return a reference (`Option<&Vec<usize>>`) to avoid forcing clones even if the caller doesn't need a copy ########## datafusion/core/src/physical_planner.rs: ########## @@ -459,9 +460,12 @@ impl DefaultPhysicalPlanner { // doesn't know (nor should care) how the relation was // referred to in the query let filters = unnormalize_cols(filters.iter().cloned()); - source - .scan(session_state, projection.as_ref(), &filters, *fetch) - .await? + let opts = ScanArgs::default() + .with_projection(projection.clone()) Review Comment: is there a reason we need to clone the projection here?
I think it would be better to avoid the clone and pass in the arguments via reference (so something like `ScanArgs<'a>`) ########## datafusion/catalog/src/table.rs: ########## @@ -299,6 +334,119 @@ pub trait TableProvider: Debug + Sync + Send { } } +/// Arguments for scanning a table with [`TableProvider::scan_with_args`]. +/// +/// `ScanArgs` provides a structured way to pass scan parameters to table providers, +/// replacing the multiple individual parameters used by [`TableProvider::scan`]. +/// This struct uses the builder pattern for convenient construction. +/// +/// # Examples +/// +/// ``` +/// # use datafusion_catalog::ScanArgs; +/// # use datafusion_expr::Expr; +/// let args = ScanArgs::default() +/// .with_projection(Some(vec![0, 2, 4])) +/// .with_limit(Some(1000)); +/// ``` +#[derive(Debug, Clone, Default)] +pub struct ScanArgs { + filters: Option<Vec<Expr>>, + projection: Option<Vec<usize>>, + limit: Option<usize>, +} + +impl ScanArgs { + /// Set the column projection for the scan. + /// + /// The projection is a list of column indices from [`TableProvider::schema`] + /// that should be included in the scan results. If `None`, all columns are included. + /// + /// # Arguments + /// * `projection` - Optional list of column indices to project + pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self { + self.projection = projection; + self + } + + /// Get the column projection for the scan. + /// + /// Returns a cloned copy of the projection column indices, or `None` if + /// no projection was specified (meaning all columns should be included). + pub fn projection(&self) -> Option<Vec<usize>> { + self.projection.clone() + } + + /// Set the filter expressions for the scan. + /// + /// Filters are boolean expressions that should be evaluated during the scan + /// to reduce the number of rows returned. All expressions are combined with AND logic.
+ /// Whether filters are actually pushed down depends on [`TableProvider::supports_filters_pushdown`]. + /// + /// # Arguments + /// * `filters` - Optional list of filter expressions + pub fn with_filters(mut self, filters: Option<Vec<Expr>>) -> Self { + self.filters = filters; + self + } + + /// Get the filter expressions for the scan. + /// + /// Returns a reference to the filter expressions, or `None` if no filters were specified. + pub fn filters(&self) -> Option<&[Expr]> { + self.filters.as_deref() + } + + /// Set the maximum number of rows to return from the scan. + /// + /// If specified, the scan should return at most this many rows. This is typically + /// used to optimize queries with `LIMIT` clauses. + /// + /// # Arguments + /// * `limit` - Optional maximum number of rows to return + pub fn with_limit(mut self, limit: Option<usize>) -> Self { + self.limit = limit; + self + } + + /// Get the maximum number of rows to return from the scan. + /// + /// Returns the row limit, or `None` if no limit was specified. + pub fn limit(&self) -> Option<usize> { + self.limit + } +} + +/// Result of a table scan operation from [`TableProvider::scan_with_args`]. +/// +/// `ScanResult` encapsulates the [`ExecutionPlan`] produced by a table scan, +/// providing a typed return value instead of returning the plan directly. +/// This allows for future extensibility of scan results without breaking +/// the API. Review Comment: How much value does this comment add? 
While accurate I don't think it provides much value and just makes the code harder to read (sounds like an LLM wrote it lol) ########## datafusion/core/src/datasource/listing/table.rs: ########## @@ -1166,6 +1166,22 @@ impl TableProvider for ListingTable { filters: &[Expr], limit: Option<usize>, ) -> Result<Arc<dyn ExecutionPlan>> { + let options = ScanArgs::default() + .with_projection(projection.cloned()) + .with_filters(Some(filters.to_vec())) + .with_limit(limit); + Ok(self.scan_with_args(state, options).await?.plan()) + } Review Comment: I think the recursion error is unfortunate -- maybe we could avoid problems with good enough documentation but it seems like a rough edge on one of the core experiences (TableProvider) Here is an alternate proposal: 1. leave `scan` required (and leave `scan_with_args` in terms of `scan`) Once we are happy with `scan_with_args` we can then properly deprecate (maybe even remove `scan`) in favor of `scan_with_args` as a follow on PR (in some future PR) This approach has the downside that it is more complicated to implement `scan_with_args` as you will be forced to also provide a `not implemented error` or something. The upside is that there is no change needed to existing code and those who aren't yet interested in scan_with_args won't need a change > I also chose not to deprecate scan() to (1) avoid a bunch of code churn mostly for us and (2) until we are confident that scan_with_args is a workable evolution This seems like a good idea to me ########## datafusion/catalog/src/table.rs: ########## @@ -299,6 +334,119 @@ pub trait TableProvider: Debug + Sync + Send { } } +/// Arguments for scanning a table with [`TableProvider::scan_with_args`]. +/// +/// `ScanArgs` provides a structured way to pass scan parameters to table providers, +/// replacing the multiple individual parameters used by [`TableProvider::scan`]. +/// This struct uses the builder pattern for convenient construction. 
+/// +/// # Examples +/// +/// ``` +/// # use datafusion_catalog::ScanArgs; +/// # use datafusion_expr::Expr; +/// let args = ScanArgs::default() +/// .with_projection(Some(vec![0, 2, 4])) +/// .with_limit(Some(1000)); +/// ``` +#[derive(Debug, Clone, Default)] +pub struct ScanArgs { + filters: Option<Vec<Expr>>, + projection: Option<Vec<usize>>, + limit: Option<usize>, +} + +impl ScanArgs { + /// Set the column projection for the scan. + /// + /// The projection is a list of column indices from [`TableProvider::schema`] + /// that should be included in the scan results. If `None`, all columns are included. + /// + /// # Arguments + /// * `projection` - Optional list of column indices to project + pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self { + self.projection = projection; + self + } + + /// Get the column projection for the scan. + /// + /// Returns a cloned copy of the projection column indices, or `None` if + /// no projection was specified (meaning all columns should be included). + pub fn projection(&self) -> Option<Vec<usize>> { + self.projection.clone() + } + + /// Set the filter expressions for the scan. + /// + /// Filters are boolean expressions that should be evaluated during the scan + /// to reduce the number of rows returned. All expressions are combined with AND logic. + /// Whether filters are actually pushed down depends on [`TableProvider::supports_filters_pushdown`]. + /// + /// # Arguments + /// * `filters` - Optional list of filter expressions + pub fn with_filters(mut self, filters: Option<Vec<Expr>>) -> Self { + self.filters = filters; + self + } + + /// Get the filter expressions for the scan. + /// + /// Returns a reference to the filter expressions, or `None` if no filters were specified. + pub fn filters(&self) -> Option<&[Expr]> { + self.filters.as_deref() + } + + /// Set the maximum number of rows to return from the scan. + /// + /// If specified, the scan should return at most this many rows. 
This is typically + /// used to optimize queries with `LIMIT` clauses. + /// + /// # Arguments + /// * `limit` - Optional maximum number of rows to return + pub fn with_limit(mut self, limit: Option<usize>) -> Self { + self.limit = limit; + self + } + + /// Get the maximum number of rows to return from the scan. + /// + /// Returns the row limit, or `None` if no limit was specified. + pub fn limit(&self) -> Option<usize> { + self.limit + } +} + +/// Result of a table scan operation from [`TableProvider::scan_with_args`]. +/// +/// `ScanResult` encapsulates the [`ExecutionPlan`] produced by a table scan, +/// providing a typed return value instead of returning the plan directly. +/// This allows for future extensibility of scan results without breaking +/// the API. +#[derive(Debug, Clone)] +pub struct ScanResult { + /// The ExecutionPlan to run. + plan: Arc<dyn ExecutionPlan>, +} + +impl ScanResult { + /// Create a new `ScanResult` with the given execution plan. + /// + /// # Arguments + /// * `plan` - The execution plan that will perform the table scan + pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self { + Self { plan } + } + + /// Get the execution plan for this scan result. + /// + /// Returns a cloned reference to the [`ExecutionPlan`] that will perform + /// the actual table scanning and data retrieval. + pub fn plan(&self) -> Arc<dyn ExecutionPlan> { Review Comment: I also recommend here providing this as a reference as well as providing a way to get the plan without a clone ```suggestion pub fn plan(&self) -> &Arc<dyn ExecutionPlan> { ``` And then also add a function like `ScanResult::into_inner` to return the inner plan ########## datafusion/catalog/src/table.rs: ########## @@ -299,6 +334,119 @@ pub trait TableProvider: Debug + Sync + Send { } } +/// Arguments for scanning a table with [`TableProvider::scan_with_args`]. 
+/// +/// `ScanArgs` provides a structured way to pass scan parameters to table providers, +/// replacing the multiple individual parameters used by [`TableProvider::scan`]. +/// This struct uses the builder pattern for convenient construction. +/// +/// # Examples +/// +/// ``` +/// # use datafusion_catalog::ScanArgs; +/// # use datafusion_expr::Expr; +/// let args = ScanArgs::default() +/// .with_projection(Some(vec![0, 2, 4])) +/// .with_limit(Some(1000)); +/// ``` +#[derive(Debug, Clone, Default)] +pub struct ScanArgs { + filters: Option<Vec<Expr>>, + projection: Option<Vec<usize>>, + limit: Option<usize>, +} + +impl ScanArgs { + /// Set the column projection for the scan. + /// + /// The projection is a list of column indices from [`TableProvider::schema`] + /// that should be included in the scan results. If `None`, all columns are included. + /// + /// # Arguments + /// * `projection` - Optional list of column indices to project + pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self { + self.projection = projection; + self + } + + /// Get the column projection for the scan. + /// + /// Returns a cloned copy of the projection column indices, or `None` if + /// no projection was specified (meaning all columns should be included). + pub fn projection(&self) -> Option<Vec<usize>> { + self.projection.clone() + } + + /// Set the filter expressions for the scan. + /// + /// Filters are boolean expressions that should be evaluated during the scan + /// to reduce the number of rows returned. All expressions are combined with AND logic. + /// Whether filters are actually pushed down depends on [`TableProvider::supports_filters_pushdown`]. + /// + /// # Arguments + /// * `filters` - Optional list of filter expressions + pub fn with_filters(mut self, filters: Option<Vec<Expr>>) -> Self { + self.filters = filters; + self + } + + /// Get the filter expressions for the scan. 
+ /// + /// Returns a reference to the filter expressions, or `None` if no filters were specified. + pub fn filters(&self) -> Option<&[Expr]> { Review Comment: (minor) I recommend returning the raw Vec reference so the caller can use Vec-specific functions ```suggestion pub fn filters(&self) -> Option<&Vec<Expr>> { ``` ########## datafusion/catalog/src/table.rs: ########## @@ -299,6 +334,119 @@ pub trait TableProvider: Debug + Sync + Send { } } +/// Arguments for scanning a table with [`TableProvider::scan_with_args`]. +/// +/// `ScanArgs` provides a structured way to pass scan parameters to table providers, +/// replacing the multiple individual parameters used by [`TableProvider::scan`]. +/// This struct uses the builder pattern for convenient construction. +/// +/// # Examples +/// +/// ``` +/// # use datafusion_catalog::ScanArgs; +/// # use datafusion_expr::Expr; +/// let args = ScanArgs::default() +/// .with_projection(Some(vec![0, 2, 4])) +/// .with_limit(Some(1000)); +/// ``` +#[derive(Debug, Clone, Default)] +pub struct ScanArgs { + filters: Option<Vec<Expr>>, + projection: Option<Vec<usize>>, + limit: Option<usize>, +} + +impl ScanArgs { + /// Set the column projection for the scan. + /// + /// The projection is a list of column indices from [`TableProvider::schema`] + /// that should be included in the scan results. If `None`, all columns are included. + /// + /// # Arguments + /// * `projection` - Optional list of column indices to project + pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self { + self.projection = projection; + self + } + + /// Get the column projection for the scan. + /// + /// Returns a cloned copy of the projection column indices, or `None` if + /// no projection was specified (meaning all columns should be included). + pub fn projection(&self) -> Option<Vec<usize>> { + self.projection.clone() + } + + /// Set the filter expressions for the scan.
+ /// + /// Filters are boolean expressions that should be evaluated during the scan + /// to reduce the number of rows returned. All expressions are combined with AND logic. + /// Whether filters are actually pushed down depends on [`TableProvider::supports_filters_pushdown`]. + /// + /// # Arguments + /// * `filters` - Optional list of filter expressions + pub fn with_filters(mut self, filters: Option<Vec<Expr>>) -> Self { + self.filters = filters; + self + } + + /// Get the filter expressions for the scan. + /// + /// Returns a reference to the filter expressions, or `None` if no filters were specified. + pub fn filters(&self) -> Option<&[Expr]> { + self.filters.as_deref() + } + + /// Set the maximum number of rows to return from the scan. + /// + /// If specified, the scan should return at most this many rows. This is typically + /// used to optimize queries with `LIMIT` clauses. + /// + /// # Arguments + /// * `limit` - Optional maximum number of rows to return + pub fn with_limit(mut self, limit: Option<usize>) -> Self { + self.limit = limit; + self + } + + /// Get the maximum number of rows to return from the scan. + /// + /// Returns the row limit, or `None` if no limit was specified. + pub fn limit(&self) -> Option<usize> { + self.limit + } +} + +/// Result of a table scan operation from [`TableProvider::scan_with_args`]. +/// +/// `ScanResult` encapsulates the [`ExecutionPlan`] produced by a table scan, +/// providing a typed return value instead of returning the plan directly. +/// This allows for future extensibility of scan results without breaking +/// the API. +#[derive(Debug, Clone)] +pub struct ScanResult { + /// The ExecutionPlan to run. + plan: Arc<dyn ExecutionPlan>, +} + +impl ScanResult { + /// Create a new `ScanResult` with the given execution plan. 
+ /// + /// # Arguments + /// * `plan` - The execution plan that will perform the table scan + pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self { + Self { plan } + } + + /// Get the execution plan for this scan result. + /// + /// Returns a cloned reference to the [`ExecutionPlan`] that will perform + /// the actual table scanning and data retrieval. + pub fn plan(&self) -> Arc<dyn ExecutionPlan> { Review Comment: Another nice usability thing would be to provide a `From` impl so we can make these like `ScanResult::from(plan)` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org