Blizzara commented on code in PR #14194:
URL: https://github.com/apache/datafusion/pull/14194#discussion_r1954777419


##########
datafusion/substrait/src/logical_plan/producer.rs:
##########
@@ -559,12 +562,62 @@ pub fn from_table_scan(
     let table_schema = scan.source.schema().to_dfschema_ref()?;
     let base_schema = to_substrait_named_struct(&table_schema)?;
 
+    let mut filter_option = None;
+    let mut best_effort_filter_option = None;
+
+    if !scan.filters.is_empty() {
+        let mut full_filters = vec![];
+        let mut partial_filters = vec![];
+        let mut unsupported_filters = vec![];
+        let filter_refs: Vec<&Expr> = scan.filters.iter().collect();
+
+        if let Ok(results) = 
scan.source.supports_filters_pushdown(&filter_refs) {
+            scan.filters
+                .iter()
+                .zip(results.iter())
+                .for_each(|(x, res)| match res {
+                    TableProviderFilterPushDown::Exact => 
full_filters.push(x.clone()),
+                    TableProviderFilterPushDown::Inexact => {
+                        partial_filters.push(x.clone())
+                    }
+                    TableProviderFilterPushDown::Unsupported => {
+                        unsupported_filters.push(x.clone())

Review Comment:
   `unsupported_filters` isn't used anywhere, I think, so there's no need to add to it.



##########
datafusion/substrait/src/logical_plan/producer.rs:
##########
@@ -559,12 +562,62 @@ pub fn from_table_scan(
     let table_schema = scan.source.schema().to_dfschema_ref()?;
     let base_schema = to_substrait_named_struct(&table_schema)?;
 
+    let mut filter_option = None;
+    let mut best_effort_filter_option = None;
+
+    if !scan.filters.is_empty() {
+        let mut full_filters = vec![];
+        let mut partial_filters = vec![];
+        let mut unsupported_filters = vec![];
+        let filter_refs: Vec<&Expr> = scan.filters.iter().collect();
+
+        if let Ok(results) = 
scan.source.supports_filters_pushdown(&filter_refs) {
+            scan.filters
+                .iter()
+                .zip(results.iter())
+                .for_each(|(x, res)| match res {
+                    TableProviderFilterPushDown::Exact => 
full_filters.push(x.clone()),
+                    TableProviderFilterPushDown::Inexact => {
+                        partial_filters.push(x.clone())
+                    }
+                    TableProviderFilterPushDown::Unsupported => {
+                        unsupported_filters.push(x.clone())
+                    }
+                });
+        }
+
+        let table_schema_qualified =

Review Comment:
   Can we not use the `table_schema` from above here?



##########
datafusion/substrait/src/logical_plan/producer.rs:
##########
@@ -559,12 +562,62 @@ pub fn from_table_scan(
     let table_schema = scan.source.schema().to_dfschema_ref()?;
     let base_schema = to_substrait_named_struct(&table_schema)?;
 
+    let mut filter_option = None;
+    let mut best_effort_filter_option = None;
+
+    if !scan.filters.is_empty() {
+        let mut full_filters = vec![];
+        let mut partial_filters = vec![];
+        let mut unsupported_filters = vec![];
+        let filter_refs: Vec<&Expr> = scan.filters.iter().collect();
+
+        if let Ok(results) = 
scan.source.supports_filters_pushdown(&filter_refs) {
+            scan.filters
+                .iter()
+                .zip(results.iter())
+                .for_each(|(x, res)| match res {
+                    TableProviderFilterPushDown::Exact => 
full_filters.push(x.clone()),
+                    TableProviderFilterPushDown::Inexact => {
+                        partial_filters.push(x.clone())
+                    }
+                    TableProviderFilterPushDown::Unsupported => {
+                        unsupported_filters.push(x.clone())
+                    }
+                });
+        }
+
+        let table_schema_qualified =
+            Arc::new(if !full_filters.is_empty() || 
!partial_filters.is_empty() {
+                DFSchema::try_from_qualified_schema(
+                    scan.table_name.clone(),
+                    &(scan.source.schema()),
+                )
+                .unwrap()
+            } else {
+                DFSchema::empty()
+            });
+
+        if !full_filters.is_empty() {
+            let combined_expr = conjunction(full_filters).unwrap();
+            let filter_expr =
+                producer.handle_expr(&combined_expr, &table_schema_qualified)?;
+            filter_option = Some(Box::new(filter_expr));
+        }

Review Comment:
   Or actually, given that `conjunction` returns `None` if `filters` is empty, this
could also be written functionally, something like:
   
   ```
   let filter = conjunction(full_filters).map(|f| producer.handle_expr(&f, &schema)).transpose()?;
   ```



##########
datafusion/substrait/src/logical_plan/producer.rs:
##########
@@ -559,12 +559,31 @@ pub fn from_table_scan(
     let table_schema = scan.source.schema().to_dfschema_ref()?;
     let base_schema = to_substrait_named_struct(&table_schema)?;
 
+    let best_effort_filter_option = if !scan.filters.is_empty() {
+        let table_schema_qualified = Arc::new(
+            DFSchema::try_from_qualified_schema(
+                scan.table_name.clone(),
+                &(scan.source.schema()),
+            )
+            .unwrap(),
+        );
+        let mut combined_expr = scan.filters[0].clone();
+        for i in 1..scan.filters.len() {
+            combined_expr = combined_expr.and(scan.filters[i].clone());
+        }
+        let best_effort_filter_expr =
+            producer.handle_expr(&combined_expr, &table_schema_qualified)?;
+        Some(Box::new(best_effort_filter_expr))
+    } else {
+        None
+    };
+
     Ok(Box::new(Rel {
         rel_type: Some(RelType::Read(Box::new(ReadRel {
             common: None,
             base_schema: Some(base_schema),
             filter: None,
-            best_effort_filter: None,
+            best_effort_filter: best_effort_filter_option,

Review Comment:
   I _think_ that makes sense, though I still find this whole thing a bit 
difficult to wrap my head around.



##########
datafusion/substrait/tests/cases/roundtrip_logical_plan.rs:
##########
@@ -1143,6 +1143,15 @@ async fn roundtrip_repartition_hash() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn roundtrip_read_filter() -> Result<()> {
+    roundtrip_verify_read_filter_counts(
+        "SELECT data.a FROM data JOIN data2 ON data.a = data2.a where data.a < 
5 AND data2.e > 1",

Review Comment:
   You could probably replace the `JOIN` with a `WHERE` and then simplify
`count_read_filters` quite a lot, so that it handles only the rel types you actually need
here. This works too, but fewer lines of code usually means fewer lines to
maintain :)



##########
datafusion/substrait/src/logical_plan/producer.rs:
##########
@@ -559,12 +562,62 @@ pub fn from_table_scan(
     let table_schema = scan.source.schema().to_dfschema_ref()?;
     let base_schema = to_substrait_named_struct(&table_schema)?;
 
+    let mut filter_option = None;
+    let mut best_effort_filter_option = None;
+
+    if !scan.filters.is_empty() {
+        let mut full_filters = vec![];
+        let mut partial_filters = vec![];
+        let mut unsupported_filters = vec![];
+        let filter_refs: Vec<&Expr> = scan.filters.iter().collect();
+
+        if let Ok(results) = 
scan.source.supports_filters_pushdown(&filter_refs) {
+            scan.filters
+                .iter()
+                .zip(results.iter())
+                .for_each(|(x, res)| match res {
+                    TableProviderFilterPushDown::Exact => 
full_filters.push(x.clone()),
+                    TableProviderFilterPushDown::Inexact => {
+                        partial_filters.push(x.clone())
+                    }
+                    TableProviderFilterPushDown::Unsupported => {
+                        unsupported_filters.push(x.clone())
+                    }
+                });
+        }
+
+        let table_schema_qualified =
+            Arc::new(if !full_filters.is_empty() || 
!partial_filters.is_empty() {
+                DFSchema::try_from_qualified_schema(
+                    scan.table_name.clone(),
+                    &(scan.source.schema()),
+                )
+                .unwrap()
+            } else {
+                DFSchema::empty()
+            });
+
+        if !full_filters.is_empty() {
+            let combined_expr = conjunction(full_filters).unwrap();
+            let filter_expr =
+                producer.handle_expr(&combined_expr, &table_schema_qualified)?;
+            filter_option = Some(Box::new(filter_expr));
+        }

Review Comment:
   nit: I'd do something like
   ```suggestion
           let filter = if !full_filters.is_empty() {
               let combined_expr = conjunction(full_filters).unwrap();
               let filter_expr =
               producer.handle_expr(&combined_expr, &table_schema_qualified)?;
               Some(Box::new(filter_expr))
           } else {
                None
           };
   ```
   to avoid the mutable var. Same below.



##########
datafusion/substrait/src/logical_plan/consumer.rs:
##########
@@ -1326,19 +1326,46 @@ pub async fn from_read_rel(
         table_ref: TableReference,
         schema: DFSchema,
         projection: &Option<MaskExpression>,
+        filter: &Option<Box<Expression>>,
+        best_effort_filter: &Option<Box<Expression>>,
     ) -> Result<LogicalPlan> {
         let schema = schema.replace_qualifier(table_ref.clone());
 
+        let mut filters = vec![];
+        if filter.is_some() {
+            let filter_expr = consumer
+                .consume_expression(&(filter.clone().unwrap()), &schema)
+                .await?;
+            filters.append(
+                &mut split_conjunction(&filter_expr)

Review Comment:
   nit: you can use `split_conjunction_owned` instead. 
   
   And maybe using `if let Some(f) = filter {..}` can simplify the rest. I'd do 
something like:
   ```
           let filters = if let Some(f) = filter {
               let filter_expr = consumer.consume_expression(f, &schema).await?;
               split_conjunction_owned(filter_expr)
           } else {
               vec![]
           };
   ```
   Then do the same for `best_effort_filters`, and then in `scan_with_filters`
use `[filters, best_effort_filters].concat()` or similar.



##########
datafusion/substrait/src/logical_plan/producer.rs:
##########
@@ -559,12 +562,62 @@ pub fn from_table_scan(
     let table_schema = scan.source.schema().to_dfschema_ref()?;
     let base_schema = to_substrait_named_struct(&table_schema)?;
 
+    let mut filter_option = None;
+    let mut best_effort_filter_option = None;
+
+    if !scan.filters.is_empty() {
+        let mut full_filters = vec![];

Review Comment:
   nit: can we call these `filters` and `best_effort_filters`, or
`exact_filters` and `inexact_filters`, so as not to introduce a third naming option?
😅



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to