vbarua commented on code in PR #13803:
URL: https://github.com/apache/datafusion/pull/13803#discussion_r1889320195


##########
datafusion/substrait/src/logical_plan/consumer.rs:
##########
@@ -1503,370 +1858,416 @@ pub async fn from_substrait_func_args(
 
 /// Convert Substrait AggregateFunction to DataFusion Expr
 pub async fn from_substrait_agg_func(
-    state: &dyn SubstraitPlanningState,
+    consumer: &impl SubstraitConsumer,
     f: &AggregateFunction,
     input_schema: &DFSchema,
-    extensions: &Extensions,
     filter: Option<Box<Expr>>,
     order_by: Option<Vec<SortExpr>>,
     distinct: bool,
 ) -> Result<Arc<Expr>> {
-    let args =
-        from_substrait_func_args(state, &f.arguments, input_schema, 
extensions).await?;
-
-    let Some(function_name) = extensions.functions.get(&f.function_reference) 
else {
+    let Some(fn_signature) = consumer
+        .get_extensions()
+        .functions
+        .get(&f.function_reference)
+    else {
         return plan_err!(
             "Aggregate function not registered: function anchor = {:?}",
             f.function_reference
         );
     };
 
-    let function_name = substrait_fun_name(function_name);
-    // try udaf first, then built-in aggr fn.
-    if let Ok(fun) = state.udaf(function_name) {
-        // deal with situation that count(*) got no arguments
-        let args = if fun.name() == "count" && args.is_empty() {
-            vec![Expr::Literal(ScalarValue::Int64(Some(1)))]
-        } else {
-            args
-        };
-
-        Ok(Arc::new(Expr::AggregateFunction(
-            expr::AggregateFunction::new_udf(fun, args, distinct, filter, 
order_by, None),
-        )))
-    } else {
-        not_impl_err!(
+    let fn_name = substrait_fun_name(fn_signature);
+    let udaf = consumer.get_state().udaf(fn_name);
+    let udaf = udaf.map_err(|_| {
+        not_impl_datafusion_err!(
             "Aggregate function {} is not supported: function anchor = {:?}",
-            function_name,
+            fn_signature,
             f.function_reference
         )
-    }
+    })?;
+
+    let args = from_substrait_func_args(consumer, &f.arguments, 
input_schema).await?;
+
+    // deal with situation that count(*) got no arguments
+    let args = if udaf.name() == "count" && args.is_empty() {
+        vec![Expr::Literal(ScalarValue::Int64(Some(1)))]
+    } else {
+        args
+    };
+
+    Ok(Arc::new(Expr::AggregateFunction(
+        expr::AggregateFunction::new_udf(udaf, args, distinct, filter, 
order_by, None),
+    )))
 }
 
 /// Convert Substrait Rex to DataFusion Expr
-#[async_recursion]
 pub async fn from_substrait_rex(
-    state: &dyn SubstraitPlanningState,
-    e: &Expression,
+    consumer: &impl SubstraitConsumer,
+    expression: &Expression,
     input_schema: &DFSchema,
-    extensions: &Extensions,
 ) -> Result<Expr> {
-    match &e.rex_type {
-        Some(RexType::SingularOrList(s)) => {
-            let substrait_expr = s.value.as_ref().unwrap();
-            let substrait_list = s.options.as_ref();
-            Ok(Expr::InList(InList {
-                expr: Box::new(
-                    from_substrait_rex(state, substrait_expr, input_schema, 
extensions)
-                        .await?,
-                ),
-                list: from_substrait_rex_vec(
-                    state,
-                    substrait_list,
-                    input_schema,
-                    extensions,
-                )
-                .await?,
-                negated: false,
-            }))
-        }
-        Some(RexType::Selection(field_ref)) => {
-            Ok(from_substrait_field_reference(field_ref, input_schema)?)
-        }
-        Some(RexType::IfThen(if_then)) => {
-            // Parse `ifs`
-            // If the first element does not have a `then` part, then we can 
assume it's a base expression
-            let mut when_then_expr: Vec<(Box<Expr>, Box<Expr>)> = vec![];
-            let mut expr = None;
-            for (i, if_expr) in if_then.ifs.iter().enumerate() {
-                if i == 0 {
-                    // Check if the first element is type base expression
-                    if if_expr.then.is_none() {
-                        expr = Some(Box::new(
-                            from_substrait_rex(
-                                state,
-                                if_expr.r#if.as_ref().unwrap(),
-                                input_schema,
-                                extensions,
-                            )
-                            .await?,
-                        ));
-                        continue;
-                    }
-                }
-                when_then_expr.push((
-                    Box::new(
-                        from_substrait_rex(
-                            state,
-                            if_expr.r#if.as_ref().unwrap(),
-                            input_schema,
-                            extensions,
-                        )
-                        .await?,
-                    ),
-                    Box::new(
-                        from_substrait_rex(
-                            state,
-                            if_expr.then.as_ref().unwrap(),
-                            input_schema,
-                            extensions,
-                        )
-                        .await?,
-                    ),
-                ));
+    match &expression.rex_type {
+        Some(t) => match t {
+            RexType::Literal(expr) => consumer.consume_literal(expr).await,
+            RexType::Selection(expr) => {
+                consumer.consume_selection(expr, input_schema).await
+            }
+            RexType::ScalarFunction(expr) => {
+                consumer.consume_scalar_function(expr, input_schema).await
+            }
+            RexType::WindowFunction(expr) => {
+                consumer.consume_window_function(expr, input_schema).await
+            }
+            RexType::IfThen(expr) => consumer.consume_if_then(expr, 
input_schema).await,
+            RexType::SwitchExpression(expr) => {
+                consumer.consume_switch(expr, input_schema).await
+            }
+            RexType::SingularOrList(expr) => {
+                consumer.consume_singular_or_list(expr, input_schema).await
             }
-            // Parse `else`
-            let else_expr = match &if_then.r#else {
-                Some(e) => Some(Box::new(
-                    from_substrait_rex(state, e, input_schema, 
extensions).await?,
-                )),
-                None => None,
-            };
-            Ok(Expr::Case(Case {
-                expr,
-                when_then_expr,
-                else_expr,
-            }))
-        }
-        Some(RexType::ScalarFunction(f)) => {
-            let Some(fn_name) = 
extensions.functions.get(&f.function_reference) else {
-                return plan_err!(
-                    "Scalar function not found: function reference = {:?}",
-                    f.function_reference
-                );
-            };
-            let fn_name = substrait_fun_name(fn_name);
 
-            let args =
-                from_substrait_func_args(state, &f.arguments, input_schema, 
extensions)
-                    .await?;
+            RexType::MultiOrList(expr) => {
+                consumer.consume_multi_or_list(expr, input_schema).await
+            }
 
-            // try to first match the requested function into registered udfs, 
then built-in ops
-            // and finally built-in expressions
-            if let Ok(func) = state.udf(fn_name) {
-                Ok(Expr::ScalarFunction(expr::ScalarFunction::new_udf(
-                    func.to_owned(),
-                    args,
-                )))
-            } else if let Some(op) = name_to_op(fn_name) {
-                if f.arguments.len() < 2 {
-                    return not_impl_err!(
-                        "Expect at least two arguments for binary operator 
{op:?}, the provided number of operators is {:?}",
-                        f.arguments.len()
-                    );
-                }
-                // Some expressions are binary in DataFusion but take in a 
variadic number of args in Substrait.
-                // In those cases we iterate through all the arguments, 
applying the binary expression against them all
-                let combined_expr = args
-                    .into_iter()
-                    .fold(None, |combined_expr: Option<Expr>, arg: Expr| {
-                        Some(match combined_expr {
-                            Some(expr) => Expr::BinaryExpr(BinaryExpr {
-                                left: Box::new(expr),
-                                op,
-                                right: Box::new(arg),
-                            }),
-                            None => arg,
-                        })
-                    })
-                    .unwrap();
+            RexType::Cast(expr) => {
+                consumer.consume_cast(expr.as_ref(), input_schema).await
+            }
 
-                Ok(combined_expr)
-            } else if let Some(builder) = 
BuiltinExprBuilder::try_from_name(fn_name) {
-                builder.build(state, f, input_schema, extensions).await
-            } else {
-                not_impl_err!("Unsupported function name: {fn_name:?}")
+            RexType::Subquery(expr) => {
+                consumer.consume_subquery(expr.as_ref(), input_schema).await
             }
-        }
-        Some(RexType::Literal(lit)) => {
-            let scalar_value = from_substrait_literal_without_names(lit, 
extensions)?;
-            Ok(Expr::Literal(scalar_value))
-        }
-        Some(RexType::Cast(cast)) => match cast.as_ref().r#type.as_ref() {
-            Some(output_type) => {
-                let input_expr = Box::new(
+            RexType::Nested(expr) => consumer.consume_nested(expr, 
input_schema).await,
+            RexType::Enum(expr) => consumer.consume_enum(expr, 
input_schema).await,
+        },
+        None => substrait_err!("Expression must set rex_type: {:?}", 
expression),
+    }
+}
+
+pub async fn from_singular_or_list(
+    consumer: &impl SubstraitConsumer,
+    expr: &SingularOrList,
+    input_schema: &DFSchema,
+) -> Result<Expr> {
+    let substrait_expr = expr.value.as_ref().unwrap();
+    let substrait_list = expr.options.as_ref();
+    Ok(Expr::InList(InList {
+        expr: Box::new(from_substrait_rex(consumer, substrait_expr, 
input_schema).await?),
+        list: from_substrait_rex_vec(consumer, substrait_list, 
input_schema).await?,
+        negated: false,
+    }))
+}
+
+pub async fn from_selection(

Review Comment:
   I think that's a better name. I've applied it to both `consume_selection` and 
`from_selection`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to