andreashgk opened a new issue, #20031:
URL: https://github.com/apache/datafusion/issues/20031
### Describe the bug
Directly calling an async UDF on the output of another async UDF, as in
`async_example(async_example(1))`, produces an internal error:
```
Internal error: async functions should not be called directly
```
### To Reproduce
The following code is a minimal example, loosely adapted from the async UDF
example in this repo:
```rs
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::{
    arrow::{
        array::{ArrayRef, Int64Array},
        datatypes::DataType,
    },
    common::{cast::as_int64_array, not_impl_err, utils::take_function_args},
    error::Result,
    logical_expr::{
        ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature,
        async_udf::{AsyncScalarUDF, AsyncScalarUDFImpl},
    },
    prelude::SessionContext,
};

#[derive(Debug, PartialEq, Eq, Hash)]
pub struct AsyncExample {
    signature: Signature,
}

impl ScalarUDFImpl for AsyncExample {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn name(&self) -> &str {
        "async_example"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Int64)
    }

    fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        not_impl_err!("async_udf was called without async")
    }
}

#[async_trait]
impl AsyncScalarUDFImpl for AsyncExample {
    async fn invoke_async_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        let args = ColumnarValue::values_to_arrays(&args.args)?;
        let [input_column] = take_function_args(self.name(), args)?;
        let input_column = as_int64_array(&input_column)?;
        let results: Int64Array = input_column.iter().map(|i| i.map(|i| i + 1)).collect();
        Ok(ColumnarValue::Array(Arc::new(results) as ArrayRef))
    }
}

#[tokio::main]
async fn main() {
    let ctx = SessionContext::new();
    ctx.register_udf(
        AsyncScalarUDF::new(Arc::new(AsyncExample {
            signature: Signature::exact(
                vec![DataType::Int64],
                datafusion::logical_expr::Volatility::Immutable,
            ),
        }))
        .into_scalar_udf(),
    );

    // Works
    let df = ctx.sql("SELECT async_example(1)").await.unwrap();
    df.collect().await.unwrap();

    // Does not work
    let df = ctx
        .sql("SELECT async_example(async_example(1))")
        .await
        .unwrap();
    if let Err(err) = df.collect().await {
        eprintln!("{err}");
    }
}
```
A backtrace for this example can be found below.
### Expected behavior
Both UDFs should simply be evaluated in succession. If this is not yet
supported, a more specific error message would be useful.
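
To make the expected result concrete, here is a minimal sketch in plain Rust that does not use DataFusion at all. `add_one` and `nested` are hypothetical names introduced only for illustration: `add_one` mirrors the per-row logic of `async_example` in the reproduction (add 1, keep nulls), and `nested` shows the evaluation order I would expect for `async_example(async_example(x))`. It says nothing about how DataFusion plans async UDFs internally.

```rust
/// Hypothetical stand-in for a single `async_example` call:
/// adds 1 to every non-null value and leaves nulls untouched.
fn add_one(column: &[Option<i64>]) -> Vec<Option<i64>> {
    column.iter().map(|v| v.map(|v| v + 1)).collect()
}

/// Expected semantics of `async_example(async_example(x))`:
/// evaluate the inner call first, then pass its output to the outer call.
fn nested(column: &[Option<i64>]) -> Vec<Option<i64>> {
    let inner = add_one(column);
    add_one(&inner)
}

fn main() {
    // `SELECT async_example(async_example(1))` would then be expected to yield 3.
    let result = nested(&[Some(1), None]);
    assert_eq!(result, vec![Some(3), None]);
    println!("{result:?}");
}
```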
### Additional context
The above code produces the following output (with the backtrace feature of
this crate enabled):
[backtrace.txt](https://github.com/user-attachments/files/24888116/backtrace.txt)
I'm using datafusion 52.1.0