alamb commented on code in PR #15504:
URL: https://github.com/apache/datafusion/pull/15504#discussion_r2023080660
##########
datafusion/physical-expr/src/aggregate.rs:
##########
@@ -97,6 +97,167 @@ impl AggregateExprBuilder {
/// Constructs an `AggregateFunctionExpr` from the builder
///
/// Note that an [`Self::alias`] must be provided before calling this method.
+ ///
+ /// # Example: Create an `AggregateUDF`
+ ///
+ /// In the following example, `AggregateFunctionExpr` will be built using `AggregateExprBuilder`
Review Comment:
If you add `[]` to the names, rustdoc will make them links. For example:
```suggestion
/// In the following example, [`AggregateFunctionExpr`] will be built using [`AggregateExprBuilder`]
```
You may have to provide the full path with a separate definition, like:
```rust
/// Link to [MyStruct]
///
/// [MyStruct]: core::my_struct::MyStruct
```
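For the two names in this doc comment, a minimal sketch of what that could look like (the explicit definitions are probably unnecessary here since both types live in this same module, and the paths shown are my guess):
```rust
/// In the following example, [`AggregateFunctionExpr`] will be built using [`AggregateExprBuilder`]
///
/// [`AggregateFunctionExpr`]: crate::aggregate::AggregateFunctionExpr
/// [`AggregateExprBuilder`]: crate::aggregate::AggregateExprBuilder
```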
##########
datafusion/physical-expr/src/aggregate.rs:
##########
@@ -97,6 +97,167 @@ impl AggregateExprBuilder {
/// Constructs an `AggregateFunctionExpr` from the builder
///
/// Note that an [`Self::alias`] must be provided before calling this method.
+ ///
+ /// # Example: Create an `AggregateUDF`
+ ///
+ /// In the following example, `AggregateFunctionExpr` will be built using `AggregateExprBuilder`
+ /// which provides a build function.
+ ///
+ /// First we will create an `Accumulator`, which is then used to implement `AggregateUDFImpl`.
+ /// The `AggregateUDFImpl` can be wrapped in an `AggregateUDF` and passed as a parameter
+ /// to create an `AggregateExprBuilder`. The builder can then be used to generate an
+ /// `AggregateFunctionExpr` by chaining its methods on top of each other.
+ ///
+ /// ```
Review Comment:
I think this is not the right place to demonstrate creating a user-defined aggregate, so I suggest removing this particular example (notes below on how to fold it into the other example).
##########
datafusion/physical-expr/src/aggregate.rs:
##########
@@ -97,6 +97,167 @@ impl AggregateExprBuilder {
/// Constructs an `AggregateFunctionExpr` from the builder
///
/// Note that an [`Self::alias`] must be provided before calling this method.
+ ///
+ /// # Example: Create an `AggregateUDF`
+ ///
+ /// In the following example, `AggregateFunctionExpr` will be built using `AggregateExprBuilder`
+ /// which provides a build function.
+ ///
+ /// First we will create an `Accumulator`, which is then used to implement `AggregateUDFImpl`.
+ /// The `AggregateUDFImpl` can be wrapped in an `AggregateUDF` and passed as a parameter
+ /// to create an `AggregateExprBuilder`. The builder can then be used to generate an
+ /// `AggregateFunctionExpr` by chaining its methods on top of each other.
+ ///
+ /// ```
+ /// use std::any::Any;
+ /// use std::sync::OnceLock;
+ /// use std::sync::Arc;
+ /// use arrow::datatypes::DataType;
+ /// use datafusion_common::{DataFusionError, plan_err, Result, ScalarValue};
+ /// use datafusion_expr::{col, ColumnarValue, Signature, Volatility, Expr, Documentation};
+ /// use datafusion_expr::{AggregateUDFImpl, AggregateUDF, Accumulator, function::{AccumulatorArgs, StateFieldsArgs}};
+ /// use datafusion_expr::window_doc_sections::DOC_SECTION_AGGREGATE;
+ /// use arrow::datatypes::Schema;
+ /// use arrow::datatypes::Field;
+ /// use arrow::array::Array;
+ ///
+ /// #[derive(Debug)]
+ /// struct FirstValueAccumulator {
+ ///     value: Option<ScalarValue>,
+ ///     data_type: DataType,
+ /// }
+ ///
+ /// impl Accumulator for FirstValueAccumulator {
+ ///     fn update_batch(&mut self, values: &[Arc<dyn Array>]) -> Result<()> {
+ ///         if self.value.is_none() && !values.is_empty() {
+ ///             let first_array = &values[0];
+ ///             for i in 0..first_array.len() {
+ ///                 if !first_array.is_null(i) {
+ ///                     self.value = Some(ScalarValue::try_from_array(first_array, i)?);
+ ///                     break;
+ ///                 }
+ ///             }
+ ///         }
+ ///         Ok(())
+ ///     }
+ ///
+ ///     fn merge_batch(&mut self, states: &[Arc<dyn Array>]) -> Result<()> {
+ ///         if self.value.is_none() && !states.is_empty() {
+ ///             let first_array = &states[0];
+ ///             for i in 0..first_array.len() {
+ ///                 if !first_array.is_null(i) {
+ ///                     self.value = Some(ScalarValue::try_from_array(first_array, i)?);
+ ///                     break;
+ ///                 }
+ ///             }
+ ///         }
+ ///         Ok(())
+ ///     }
+ ///
+ ///     fn evaluate(&mut self) -> Result<ScalarValue> {
+ ///         match &self.value {
+ ///             Some(value) => Ok(value.clone()),
+ ///             None => ScalarValue::try_from(&self.data_type),
+ ///         }
+ ///     }
+ ///
+ ///     fn size(&self) -> usize {
+ ///         std::mem::size_of_val(self)
+ ///     }
+ ///
+ ///     fn state(&mut self) -> Result<Vec<ScalarValue>> {
+ ///         match &self.value {
+ ///             Some(value) => Ok(vec![value.clone()]),
+ ///             None => ScalarValue::try_from(&self.data_type).map(|v| vec![v]),
+ ///         }
+ ///     }
+ /// }
+ ///
+ /// #[derive(Debug, Clone)]
+ /// struct FirstValueUdf {
+ ///     signature: Signature,
+ /// }
+ ///
+ /// impl FirstValueUdf {
+ ///     fn new() -> Self {
+ ///         Self {
+ ///             signature: Signature::any(1, Volatility::Immutable),
+ ///         }
+ ///     }
+ /// }
+ ///
+ /// static DOCUMENTATION: OnceLock<Documentation> = OnceLock::new();
+ ///
+ /// fn get_doc() -> &'static Documentation {
+ ///     DOCUMENTATION.get_or_init(|| {
+ ///         Documentation::builder(
+ ///             DOC_SECTION_AGGREGATE,
+ ///             "returns the first value in a set of values",
+ ///             "first_value(column)"
+ ///         )
+ ///         .with_argument("arg1", "The column to get the first value from")
+ ///         .build()
+ ///     })
+ /// }
+ ///
+ /// impl AggregateUDFImpl for FirstValueUdf {
+ ///     fn as_any(&self) -> &dyn Any { self }
+ ///     fn name(&self) -> &str { "first_value" }
+ ///     fn signature(&self) -> &Signature { &self.signature }
+ ///     fn return_type(&self, args: &[DataType]) -> Result<DataType> {
+ ///         Ok(args[0].clone())
+ ///     }
+ ///
+ ///     fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
+ ///         let input_type = acc_args.schema.field(0).data_type().clone();
+ ///
+ ///         Ok(Box::new(FirstValueAccumulator {
+ ///             value: None,
+ ///             data_type: input_type,
+ ///         }))
+ ///     }
+ ///
+ ///     fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
+ ///         Ok(vec![
+ ///             Field::new("value", args.return_type.clone(), true)
+ ///         ])
+ ///     }
+ ///
+ ///     fn documentation(&self) -> Option<&Documentation> {
+ ///         Some(get_doc())
+ ///     }
+ /// }
+ ///
+ /// let first_value = AggregateUDF::from(FirstValueUdf::new());
+ /// let expr = first_value.call(vec![col("a")]);
+ /// ```
+ ///
+ /// # Example: Creating an ordered aggregate expression
+ ///
+ /// This example shows how to use `AggregateExprBuilder` to create a physical aggregate
+ /// expression with ordering:
+ ///
Review Comment:
Thanks @Shreyaskr1409 -- the dependencies are a bit tricky here.
What I suggest doing is making a "stub" implementation of `first_value` for this documentation example. Here is a similar example:
https://github.com/apache/datafusion/blob/d7205bb696a0574b5a376254b5e453296a6923d9/datafusion/physical-expr-common/src/physical_expr.rs#L404-L425
Note that lines that start with `#` are not displayed in the documentation but are included.
So in this case, the example might look something like:
```rust
/// ```
/// # // define stub with boilerplate needed to create a `PhysicalExpr` for the example
/// # // without depending on datafusion-aggregate crate
/// # struct FirstValueStub {}
/// # impl AggregateUDFImpl for FirstValueStub {
/// #     fn as_any(&self) -> &dyn Any { self }
/// #     fn name(&self) -> &str { "first_value" }
/// #     fn signature(&self) -> &Signature { unimplemented!() }
/// ...
/// # }
/// # // this has the same signature as first_value in datafusion core crate
/// # fn first_value() -> Arc<AggregateUDF> { Arc::new(AggregateUDF::from(FirstValueStub {})) }
/// let args = vec![Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>];
/// let order_by = vec![Arc::new(Column::new("x", 1)) as Arc<dyn PhysicalExpr>];
/// // Creates a physical expression equivalent to SQL:
/// // `first_value(a ORDER BY x) IGNORE NULLS AS first_a_by_x`
/// let aggregate_expr = AggregateExprBuilder::new(first_value(), args)
///     .order_by(order_by)
///     .alias("first_a_by_x")
///     .ignore_nulls()
///     .build()
```
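To make that doctest self-contained, it would likely also need a few hidden `use` lines for the items shown; a rough sketch (the exact paths are assumptions and may need adjusting) could be:
```rust
/// # // hidden imports for the stub example (paths are assumptions, adjust as needed)
/// # use std::any::Any;
/// # use std::sync::Arc;
/// # use datafusion_expr::{AggregateUDF, AggregateUDFImpl, Signature};
/// # use datafusion_physical_expr::aggregate::AggregateExprBuilder;
/// # use datafusion_physical_expr::expressions::Column;
/// # use datafusion_physical_expr::PhysicalExpr;
```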