vbarua commented on code in PR #14194:
URL: https://github.com/apache/datafusion/pull/14194#discussion_r1971967557


##########
datafusion/substrait/src/logical_plan/consumer.rs:
##########
@@ -1327,19 +1327,37 @@ pub async fn from_read_rel(
         table_ref: TableReference,
         schema: DFSchema,
         projection: &Option<MaskExpression>,
+        filter: &Option<Box<Expression>>,
+        best_effort_filter: &Option<Box<Expression>>,
     ) -> Result<LogicalPlan> {
         let schema = schema.replace_qualifier(table_ref.clone());
 
+        let filters = if let Some(f) = filter {
+            let filter_expr = consumer.consume_expression(&(f.clone()), &schema).await?;
+            split_conjunction_owned(filter_expr)
+        } else {
+            vec![]
+        };
+
+        let best_effort_filters = if let Some(bef) = best_effort_filter {
+            let best_effort_filter_expr =
+                consumer.consume_expression(&(bef.clone()), &schema).await?;

Review Comment:
   minor: same as above, we don't need to clone `bef`



##########
datafusion/substrait/src/logical_plan/consumer.rs:
##########
@@ -1327,19 +1327,37 @@ pub async fn from_read_rel(
         table_ref: TableReference,
         schema: DFSchema,
         projection: &Option<MaskExpression>,
+        filter: &Option<Box<Expression>>,
+        best_effort_filter: &Option<Box<Expression>>,
     ) -> Result<LogicalPlan> {
         let schema = schema.replace_qualifier(table_ref.clone());
 
+        let filters = if let Some(f) = filter {
+            let filter_expr = consumer.consume_expression(&(f.clone()), &schema).await?;

Review Comment:
   minor: `consume_expression` only requires a reference to the expression, so we can avoid the `clone()` and just use `f` directly.
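
   For illustration, a minimal sketch of the borrow pattern. `Expression`, `consume_expression` and `consume_optional` below are hypothetical stand-ins, not the real Substrait or DataFusion types. Matching on `&Option<Box<Expression>>` binds `f` as `&Box<Expression>`, and deref coercion lets it be passed where `&Expression` is expected.

   ```rust
   // Hypothetical stand-in for the Substrait `Expression` type; only the
   // borrow pattern matters here.
   #[derive(Debug, Clone, PartialEq)]
   pub struct Expression(pub String);

   // Hypothetical stand-in for `consume_expression`: it only needs a reference.
   pub fn consume_expression(expr: &Expression) -> String {
       format!("consumed({})", expr.0)
   }

   pub fn consume_optional(filter: &Option<Box<Expression>>) -> Vec<String> {
       match filter {
           // `f` is `&Box<Expression>`; deref coercion turns it into
           // `&Expression` at the call site, so no clone is needed.
           Some(f) => vec![consume_expression(f)],
           None => vec![],
       }
   }
   ```

   The caller keeps ownership of the boxed expression, and nothing is copied.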



##########
datafusion/substrait/src/logical_plan/consumer.rs:
##########
@@ -1327,19 +1327,37 @@ pub async fn from_read_rel(
         table_ref: TableReference,
         schema: DFSchema,
         projection: &Option<MaskExpression>,
+        filter: &Option<Box<Expression>>,
+        best_effort_filter: &Option<Box<Expression>>,
     ) -> Result<LogicalPlan> {
         let schema = schema.replace_qualifier(table_ref.clone());
 
+        let filters = if let Some(f) = filter {
+            let filter_expr = consumer.consume_expression(&(f.clone()), &schema).await?;
+            split_conjunction_owned(filter_expr)
+        } else {
+            vec![]
+        };
+
+        let best_effort_filters = if let Some(bef) = best_effort_filter {
+            let best_effort_filter_expr =
+                consumer.consume_expression(&(bef.clone()), &schema).await?;
+            split_conjunction_owned(best_effort_filter_expr)
+        } else {
+            vec![]
+        };
+
         let plan = {
             let provider = match consumer.resolve_table_ref(&table_ref).await? {
                 Some(ref provider) => Arc::clone(provider),
                 _ => return plan_err!("No table named '{table_ref}'"),
             };
 
-            LogicalPlanBuilder::scan(
+            LogicalPlanBuilder::scan_with_filters(
                 table_ref,
                 provider_as_source(Arc::clone(&provider)),
                 None,
+                [filters, best_effort_filters].concat(),

Review Comment:
   The Substrait `best_effort_filter` is described as
   > A boolean Substrait expression that describes a filter that _**may**_ be applied to the data.
   
   Including it as a filter on the Scan means that it will _**always**_ be applied, which should be safe in terms of plan semantics. This is likely the best interpretation for DataFusion at the moment, as there is no direct `best_effort_filter` equivalent.
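
   To make the consumer behavior concrete, here is a rough sketch of how both Substrait filters end up in one list of scan filters. The `Expr` enum and `scan_filters` are hypothetical, and `split_conjunction_owned` is a toy re-implementation for this example, not the DataFusion function of the same name.

   ```rust
   // Hypothetical, simplified expression type (not DataFusion's `Expr`).
   #[derive(Debug, Clone, PartialEq)]
   pub enum Expr {
       Pred(&'static str),
       And(Box<Expr>, Box<Expr>),
   }

   // Toy conjunction splitting: flattens nested ANDs into a list of predicates.
   pub fn split_conjunction_owned(expr: Expr) -> Vec<Expr> {
       match expr {
           Expr::And(l, r) => {
               let mut out = split_conjunction_owned(*l);
               out.extend(split_conjunction_owned(*r));
               out
           }
           other => vec![other],
       }
   }

   // Mirrors the shape of the consumer in the diff: both filters become scan
   // filters, so the best-effort predicates are always applied.
   pub fn scan_filters(filter: Option<Expr>, best_effort_filter: Option<Expr>) -> Vec<Expr> {
       let filters = filter.map(split_conjunction_owned).unwrap_or_default();
       let best_effort = best_effort_filter
           .map(split_conjunction_owned)
           .unwrap_or_default();
       [filters, best_effort].concat()
   }
   ```

   After the concatenation there is no way to tell which predicates were best-effort, which is why this interpretation treats them all as mandatory.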



##########
datafusion/substrait/src/logical_plan/producer.rs:
##########
@@ -559,12 +559,31 @@ pub fn from_table_scan(
     let table_schema = scan.source.schema().to_dfschema_ref()?;
     let base_schema = to_substrait_named_struct(&table_schema)?;
 
+    let best_effort_filter_option = if !scan.filters.is_empty() {
+        let table_schema_qualified = Arc::new(
+            DFSchema::try_from_qualified_schema(
+                scan.table_name.clone(),
+                &(scan.source.schema()),
+            )
+            .unwrap(),
+        );
+        let mut combined_expr = scan.filters[0].clone();
+        for i in 1..scan.filters.len() {
+            combined_expr = combined_expr.and(scan.filters[i].clone());
+        }
+        let best_effort_filter_expr =
+            producer.handle_expr(&combined_expr, &table_schema_qualified)?;
+        Some(Box::new(best_effort_filter_expr))
+    } else {
+        None
+    };
+
     Ok(Box::new(Rel {
         rel_type: Some(RelType::Read(Box::new(ReadRel {
             common: None,
             base_schema: Some(base_schema),
             filter: None,
-            best_effort_filter: None,
+            best_effort_filter: best_effort_filter_option,

Review Comment:
   Let's see if I can wrap my head around this. Based on my understanding, a DataFusion `TableScan`
   
https://github.com/apache/datafusion/blob/f51cd6e7893aa7221dac338bac0e6983ed8d6141/datafusion/expr/src/logical_plan/plan.rs#L2499-L2514
   
   can contain 0 or more filters, **_all_** of which must be applied. As an _optimization_, when DataFusion executes the query it can opt to push some of these filters into the TableSource itself. This is controlled by the `supports_filter_pushdown` function
   
https://github.com/apache/datafusion/blob/f51cd6e7893aa7221dac338bac0e6983ed8d6141/datafusion/expr/src/table_source.rs#L110-L115
   
   There are 3 types of `TableProviderFilterPushDown` (`Unsupported`, `Inexact` and `Exact`):
https://github.com/apache/datafusion/blob/f51cd6e7893aa7221dac338bac0e6983ed8d6141/datafusion/expr/src/table_source.rs#L37-L51
   
   I don't think there is a 1-1 mapping between a TableScan and the ReadRel with `filter` and `best_effort_filter` fields. Part of this is that if you consider the pushdown into the TableSource, the TableScan is almost like 2 nodes in 1.
   
   The _simplest_ mapping that works for this IMO is that all `TableScan#filters` become the `ReadRel#filter`.
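
   A minimal sketch of that simple mapping, assuming the scan filters are folded into one conjunction. `Expr`, `ReadRel`, `conjunction` and `simple_mapping` here are hypothetical, simplified stand-ins, not the DataFusion or Substrait definitions.

   ```rust
   // Hypothetical, simplified expression type (not DataFusion's `Expr`).
   #[derive(Debug, Clone, PartialEq)]
   pub enum Expr {
       Pred(&'static str),
       And(Box<Expr>, Box<Expr>),
   }

   // Hypothetical, pared-down ReadRel with only the two filter fields.
   #[derive(Debug, PartialEq)]
   pub struct ReadRel {
       pub filter: Option<Expr>,
       pub best_effort_filter: Option<Expr>,
   }

   // AND all filters together, left to right; `None` if there are no filters.
   pub fn conjunction(filters: Vec<Expr>) -> Option<Expr> {
       filters
           .into_iter()
           .reduce(|acc, e| Expr::And(Box::new(acc), Box::new(e)))
   }

   // Simple mapping: every scan filter is mandatory, so everything goes into
   // `filter` and `best_effort_filter` stays unset.
   pub fn simple_mapping(scan_filters: Vec<Expr>) -> ReadRel {
       ReadRel {
           filter: conjunction(scan_filters),
           best_effort_filter: None,
       }
   }
   ```

   Using `reduce` also covers the empty case without indexing into the filter list.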
   
   A more nuanced mapping might be something like TableScan to
   ```
   FilterRel: <Optional FilterRel if INEXACT or UNSUPPORTED filters>
     condition: <all INEXACT and UNSUPPORTED filters>
     ReadRel
       filter: <all EXACT filters>
       best_effort_filter: <all INEXACT filters>
   ```
   
   All `Exact` filters become a filter on the ReadRel, and all `Inexact` filters become the `best_effort_filter`. We then apply a `FilterRel` around the `ReadRel` to apply the `Inexact` or `Unsupported` filters.
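
   As a rough sketch of that partitioning (ignoring `projection` and `fetch`): `PushDown`, `NuancedMapping` and `nuanced_mapping` are hypothetical names, with `PushDown` only mirroring the three pushdown kinds, and predicates are plain strings to keep the example self-contained.

   ```rust
   // Hypothetical stand-in mirroring the three pushdown kinds.
   #[derive(Debug, Clone, Copy, PartialEq)]
   pub enum PushDown {
       Unsupported,
       Inexact,
       Exact,
   }

   // Where each predicate lands in the FilterRel-over-ReadRel shape above.
   #[derive(Debug, Default, PartialEq)]
   pub struct NuancedMapping {
       pub read_filter: Vec<&'static str>,      // ReadRel.filter
       pub read_best_effort: Vec<&'static str>, // ReadRel.best_effort_filter
       pub outer_condition: Vec<&'static str>,  // FilterRel.condition
   }

   pub fn nuanced_mapping(filters: &[(&'static str, PushDown)]) -> NuancedMapping {
       let mut m = NuancedMapping::default();
       for &(pred, kind) in filters {
           match kind {
               // Fully handled by the source: only on the ReadRel.
               PushDown::Exact => m.read_filter.push(pred),
               // The source may apply it, so it must be re-checked outside.
               PushDown::Inexact => {
                   m.read_best_effort.push(pred);
                   m.outer_condition.push(pred);
               }
               // Never pushed down: only in the outer FilterRel.
               PushDown::Unsupported => m.outer_condition.push(pred),
           }
       }
       m
   }
   ```

   If `outer_condition` comes back empty, the wrapping `FilterRel` can be omitted entirely.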
   
   Note that with the more nuanced mapping we also have to handle the `projection` and `fetch` fields on the TableScan.
   
   I would advocate for the simple approach at this point, and I might even argue that we shouldn't support reading or writing the `best_effort_filter` until the semantics are better pinned down.
   
   
   


