alamb commented on code in PR #14413:
URL: https://github.com/apache/datafusion/pull/14413#discussion_r2009214238


##########
datafusion/sqllogictest/test_files/aggregate.slt:
##########
@@ -234,6 +253,16 @@ select column1, nth_value(column3, 2 order by column2, 
column4 desc) from array_
 b [4, 5, 6]
 w [9, 5, 2]
 
+query ?

Review Comment:
   Can you please add a test for the following?
   
   ```
   -- default ordering (and show that desc is respected)
   select array_agg(DISTINCT column2 order by column2) from array_agg_order_list_table;
   ```
   
   Please also add a query with a `GROUP BY`, as it goes through a different code path.
   
   Also please add the negative case (no order by but with distinct) to show that the
   error message is wired up correctly.
   



##########
datafusion/functions-aggregate/src/array_agg.rs:
##########
@@ -131,7 +133,32 @@ impl AggregateUDFImpl for ArrayAgg {
         let data_type = acc_args.exprs[0].data_type(acc_args.schema)?;
 
         if acc_args.is_distinct {
-            return 
Ok(Box::new(DistinctArrayAggAccumulator::try_new(&data_type)?));
+            // Limitation similar to Postgres. The aggregation function can 
only mix

Review Comment:
   Maybe we could add this description (about ORDER BY / DISTINCT in general) to
   https://datafusion.apache.org/user-guide/sql/aggregate_functions.html#aggregate-functions
   
   At the very least, we should add it to the documentation (in the `user_doc`
   macro on this UDF definition) so it appears in
   https://datafusion.apache.org/user-guide/sql/aggregate_functions.html#array-agg
   



##########
datafusion/functions-aggregate/src/array_agg.rs:
##########
@@ -598,146 +656,370 @@ impl OrderSensitiveArrayAggAccumulator {
 #[cfg(test)]
 mod tests {
     use super::*;
-
-    use std::collections::VecDeque;
+    use arrow::datatypes::{FieldRef, Schema};
+    use datafusion_common::cast::as_generic_string_array;
+    use datafusion_common::internal_err;
+    use datafusion_physical_expr::expressions::Column;
+    use datafusion_physical_expr_common::sort_expr::{LexOrdering, 
PhysicalSortExpr};
     use std::sync::Arc;
 
-    use arrow::array::Int64Array;
-    use arrow::compute::SortOptions;
+    #[test]
+    fn no_duplicates_no_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = 
ArrayAggAccumulatorBuilder::string().build_two()?;
+
+        acc1.update_batch(&[data(["a", "b", "c"])])?;
+        acc2.update_batch(&[data(["d", "e", "f"])])?;
+        acc1 = merge(acc1, acc2)?;
 
-    use datafusion_common::utils::get_row_at_idx;
-    use datafusion_common::{Result, ScalarValue};
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]);
+
+        Ok(())
+    }
 
     #[test]
-    fn test_merge_asc() -> Result<()> {
-        let lhs_arrays: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])),
-            Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])),
-        ];
-        let n_row = lhs_arrays[0].len();
-        let lhs_orderings = (0..n_row)
-            .map(|idx| get_row_at_idx(&lhs_arrays, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-
-        let rhs_arrays: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])),
-            Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])),
-        ];
-        let n_row = rhs_arrays[0].len();
-        let rhs_orderings = (0..n_row)
-            .map(|idx| get_row_at_idx(&rhs_arrays, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-        let sort_options = vec![
-            SortOptions {
-                descending: false,
-                nulls_first: false,
-            },
-            SortOptions {
-                descending: false,
-                nulls_first: false,
-            },
-        ];
-
-        let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as 
ArrayRef;
-        let lhs_vals = (0..lhs_vals_arr.len())
-            .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-
-        let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as 
ArrayRef;
-        let rhs_vals = (0..rhs_vals_arr.len())
-            .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-        let expected =
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as 
ArrayRef;
-        let expected_ts = vec![
-            Arc::new(Int64Array::from(vec![0, 0, 0, 0, 1, 1, 1, 1, 2, 2])) as 
ArrayRef,
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as 
ArrayRef,
-        ];
-
-        let (merged_vals, merged_ts) = merge_ordered_arrays(
-            &mut [lhs_vals, rhs_vals],
-            &mut [lhs_orderings, rhs_orderings],
-            &sort_options,
-        )?;
-        let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
-        let merged_ts = (0..merged_ts[0].len())
-            .map(|col_idx| {
-                ScalarValue::iter_to_array(
-                    (0..merged_ts.len())
-                        .map(|row_idx| merged_ts[row_idx][col_idx].clone()),
-                )
-            })
-            .collect::<Result<Vec<_>>>()?;
+    fn no_duplicates_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .build_two()?;
+
+        acc1.update_batch(&[data(["a", "b", "c"])])?;
+        acc2.update_batch(&[data(["d", "e", "f"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
+        result.sort();
 
-        assert_eq!(&merged_vals, &expected);
-        assert_eq!(&merged_ts, &expected_ts);
+        assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]);
 
         Ok(())
     }
 
     #[test]
-    fn test_merge_desc() -> Result<()> {
-        let lhs_arrays: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])),
-            Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])),
-        ];
-        let n_row = lhs_arrays[0].len();
-        let lhs_orderings = (0..n_row)
-            .map(|idx| get_row_at_idx(&lhs_arrays, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-
-        let rhs_arrays: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])),
-            Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])),
-        ];
-        let n_row = rhs_arrays[0].len();
-        let rhs_orderings = (0..n_row)
-            .map(|idx| get_row_at_idx(&rhs_arrays, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-        let sort_options = vec![
-            SortOptions {
-                descending: true,
-                nulls_first: false,
-            },
-            SortOptions {
-                descending: true,
-                nulls_first: false,
-            },
-        ];
-
-        // Values (which will be merged) doesn't have to be ordered.
-        let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as 
ArrayRef;
-        let lhs_vals = (0..lhs_vals_arr.len())
-            .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-
-        let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as 
ArrayRef;
-        let rhs_vals = (0..rhs_vals_arr.len())
-            .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-        let expected =
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 1, 1, 2, 2])) as 
ArrayRef;
-        let expected_ts = vec![
-            Arc::new(Int64Array::from(vec![2, 2, 1, 1, 1, 1, 0, 0, 0, 0])) as 
ArrayRef,
-            Arc::new(Int64Array::from(vec![4, 4, 3, 3, 2, 2, 1, 1, 0, 0])) as 
ArrayRef,
-        ];
-        let (merged_vals, merged_ts) = merge_ordered_arrays(
-            &mut [lhs_vals, rhs_vals],
-            &mut [lhs_orderings, rhs_orderings],
-            &sort_options,
-        )?;
-        let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
-        let merged_ts = (0..merged_ts[0].len())
-            .map(|col_idx| {
-                ScalarValue::iter_to_array(
-                    (0..merged_ts.len())
-                        .map(|row_idx| merged_ts[row_idx][col_idx].clone()),
-                )
-            })
-            .collect::<Result<Vec<_>>>()?;
+    fn duplicates_no_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = 
ArrayAggAccumulatorBuilder::string().build_two()?;
+
+        acc1.update_batch(&[data(["a", "b", "c"])])?;
+        acc2.update_batch(&[data(["a", "b", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["a", "b", "c", "a", "b", "c"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn duplicates_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .build_two()?;
+
+        acc1.update_batch(&[data(["a", "b", "c"])])?;
+        acc2.update_batch(&[data(["a", "b", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
+        result.sort();
+
+        assert_eq!(result, vec!["a", "b", "c"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn duplicates_on_second_batch_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .build_two()?;
+
+        acc1.update_batch(&[data(["a", "c"])])?;
+        acc2.update_batch(&[data(["d", "a", "b", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
+        result.sort();
+
+        assert_eq!(result, vec!["a", "b", "c", "d"]);
 
-        assert_eq!(&merged_vals, &expected);
-        assert_eq!(&merged_ts, &expected_ts);
         Ok(())
     }
+
+    #[test]
+    fn no_duplicates_distinct_sort_asc() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(false, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data(["e", "b", "d"])])?;
+        acc2.update_batch(&[data(["f", "a", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn no_duplicates_distinct_sort_desc() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(true, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data(["e", "b", "d"])])?;
+        acc2.update_batch(&[data(["f", "a", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["f", "e", "d", "c", "b", "a"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn duplicates_distinct_sort_asc() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(false, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data(["a", "c", "b"])])?;
+        acc2.update_batch(&[data(["b", "c", "a"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["a", "b", "c"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn duplicates_distinct_sort_desc() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(true, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data(["a", "c", "b"])])?;
+        acc2.update_batch(&[data(["b", "c", "a"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["c", "b", "a"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn no_duplicates_distinct_sort_asc_nulls_first() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(false, true))
+            .build_two()?;
+
+        acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
+        acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["NULL", "a", "b", "e", "f"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn no_duplicates_distinct_sort_asc_nulls_last() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(false, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
+        acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["a", "b", "e", "f", "NULL"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn no_duplicates_distinct_sort_desc_nulls_first() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(true, true))
+            .build_two()?;
+
+        acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
+        acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["NULL", "f", "e", "b", "a"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn no_duplicates_distinct_sort_desc_nulls_last() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(true, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
+        acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["f", "e", "b", "a", "NULL"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn all_nulls_on_first_batch_with_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .build_two()?;
+
+        acc1.update_batch(&[data::<Option<&str>, 3>([None, None, None])])?;
+        acc2.update_batch(&[data([Some("a"), None, None, None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
+        result.sort();
+        assert_eq!(result, vec!["NULL", "a"]);
+        Ok(())
+    }
+
+    #[test]
+    fn all_nulls_on_both_batches_with_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .build_two()?;
+
+        acc1.update_batch(&[data::<Option<&str>, 3>([None, None, None])])?;
+        acc2.update_batch(&[data::<Option<&str>, 4>([None, None, None, 
None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+        assert_eq!(result, vec!["NULL"]);
+        Ok(())
+    }
+
+    struct ArrayAggAccumulatorBuilder {

Review Comment:
   This looks very similar to
   https://docs.rs/datafusion/latest/datafusion/physical_expr/aggregate/struct.AggregateExprBuilder.html#method.distinct
   
   That structure doesn't seem to have a good example, though; perhaps you could
   make a PR to add a doc example showing how to use it.



##########
datafusion/functions-aggregate/src/array_agg.rs:
##########
@@ -598,146 +656,370 @@ impl OrderSensitiveArrayAggAccumulator {
 #[cfg(test)]
 mod tests {
     use super::*;
-
-    use std::collections::VecDeque;
+    use arrow::datatypes::{FieldRef, Schema};
+    use datafusion_common::cast::as_generic_string_array;
+    use datafusion_common::internal_err;
+    use datafusion_physical_expr::expressions::Column;
+    use datafusion_physical_expr_common::sort_expr::{LexOrdering, 
PhysicalSortExpr};
     use std::sync::Arc;
 
-    use arrow::array::Int64Array;
-    use arrow::compute::SortOptions;
+    #[test]
+    fn no_duplicates_no_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = 
ArrayAggAccumulatorBuilder::string().build_two()?;
+
+        acc1.update_batch(&[data(["a", "b", "c"])])?;
+        acc2.update_batch(&[data(["d", "e", "f"])])?;
+        acc1 = merge(acc1, acc2)?;
 
-    use datafusion_common::utils::get_row_at_idx;
-    use datafusion_common::{Result, ScalarValue};
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]);
+
+        Ok(())
+    }
 
     #[test]
-    fn test_merge_asc() -> Result<()> {
-        let lhs_arrays: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])),
-            Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])),
-        ];
-        let n_row = lhs_arrays[0].len();
-        let lhs_orderings = (0..n_row)
-            .map(|idx| get_row_at_idx(&lhs_arrays, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-
-        let rhs_arrays: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])),
-            Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])),
-        ];
-        let n_row = rhs_arrays[0].len();
-        let rhs_orderings = (0..n_row)
-            .map(|idx| get_row_at_idx(&rhs_arrays, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-        let sort_options = vec![
-            SortOptions {
-                descending: false,
-                nulls_first: false,
-            },
-            SortOptions {
-                descending: false,
-                nulls_first: false,
-            },
-        ];
-
-        let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as 
ArrayRef;
-        let lhs_vals = (0..lhs_vals_arr.len())
-            .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-
-        let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as 
ArrayRef;
-        let rhs_vals = (0..rhs_vals_arr.len())
-            .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-        let expected =
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as 
ArrayRef;
-        let expected_ts = vec![
-            Arc::new(Int64Array::from(vec![0, 0, 0, 0, 1, 1, 1, 1, 2, 2])) as 
ArrayRef,
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as 
ArrayRef,
-        ];
-
-        let (merged_vals, merged_ts) = merge_ordered_arrays(
-            &mut [lhs_vals, rhs_vals],
-            &mut [lhs_orderings, rhs_orderings],
-            &sort_options,
-        )?;
-        let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
-        let merged_ts = (0..merged_ts[0].len())
-            .map(|col_idx| {
-                ScalarValue::iter_to_array(
-                    (0..merged_ts.len())
-                        .map(|row_idx| merged_ts[row_idx][col_idx].clone()),
-                )
-            })
-            .collect::<Result<Vec<_>>>()?;
+    fn no_duplicates_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .build_two()?;
+
+        acc1.update_batch(&[data(["a", "b", "c"])])?;
+        acc2.update_batch(&[data(["d", "e", "f"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
+        result.sort();
 
-        assert_eq!(&merged_vals, &expected);
-        assert_eq!(&merged_ts, &expected_ts);
+        assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]);
 
         Ok(())
     }
 
     #[test]
-    fn test_merge_desc() -> Result<()> {
-        let lhs_arrays: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])),
-            Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])),
-        ];
-        let n_row = lhs_arrays[0].len();
-        let lhs_orderings = (0..n_row)
-            .map(|idx| get_row_at_idx(&lhs_arrays, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-
-        let rhs_arrays: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])),
-            Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])),
-        ];
-        let n_row = rhs_arrays[0].len();
-        let rhs_orderings = (0..n_row)
-            .map(|idx| get_row_at_idx(&rhs_arrays, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-        let sort_options = vec![
-            SortOptions {
-                descending: true,
-                nulls_first: false,
-            },
-            SortOptions {
-                descending: true,
-                nulls_first: false,
-            },
-        ];
-
-        // Values (which will be merged) doesn't have to be ordered.
-        let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as 
ArrayRef;
-        let lhs_vals = (0..lhs_vals_arr.len())
-            .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-
-        let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as 
ArrayRef;
-        let rhs_vals = (0..rhs_vals_arr.len())
-            .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-        let expected =
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 1, 1, 2, 2])) as 
ArrayRef;
-        let expected_ts = vec![
-            Arc::new(Int64Array::from(vec![2, 2, 1, 1, 1, 1, 0, 0, 0, 0])) as 
ArrayRef,
-            Arc::new(Int64Array::from(vec![4, 4, 3, 3, 2, 2, 1, 1, 0, 0])) as 
ArrayRef,
-        ];
-        let (merged_vals, merged_ts) = merge_ordered_arrays(
-            &mut [lhs_vals, rhs_vals],
-            &mut [lhs_orderings, rhs_orderings],
-            &sort_options,
-        )?;
-        let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
-        let merged_ts = (0..merged_ts[0].len())
-            .map(|col_idx| {
-                ScalarValue::iter_to_array(
-                    (0..merged_ts.len())
-                        .map(|row_idx| merged_ts[row_idx][col_idx].clone()),
-                )
-            })
-            .collect::<Result<Vec<_>>>()?;
+    fn duplicates_no_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = 
ArrayAggAccumulatorBuilder::string().build_two()?;
+
+        acc1.update_batch(&[data(["a", "b", "c"])])?;
+        acc2.update_batch(&[data(["a", "b", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["a", "b", "c", "a", "b", "c"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn duplicates_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .build_two()?;
+
+        acc1.update_batch(&[data(["a", "b", "c"])])?;
+        acc2.update_batch(&[data(["a", "b", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
+        result.sort();
+
+        assert_eq!(result, vec!["a", "b", "c"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn duplicates_on_second_batch_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .build_two()?;
+
+        acc1.update_batch(&[data(["a", "c"])])?;
+        acc2.update_batch(&[data(["d", "a", "b", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
+        result.sort();
+
+        assert_eq!(result, vec!["a", "b", "c", "d"]);
 
-        assert_eq!(&merged_vals, &expected);
-        assert_eq!(&merged_ts, &expected_ts);
         Ok(())
     }
+
+    #[test]
+    fn no_duplicates_distinct_sort_asc() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(false, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data(["e", "b", "d"])])?;
+        acc2.update_batch(&[data(["f", "a", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn no_duplicates_distinct_sort_desc() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(true, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data(["e", "b", "d"])])?;
+        acc2.update_batch(&[data(["f", "a", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["f", "e", "d", "c", "b", "a"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn duplicates_distinct_sort_asc() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(false, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data(["a", "c", "b"])])?;
+        acc2.update_batch(&[data(["b", "c", "a"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["a", "b", "c"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn duplicates_distinct_sort_desc() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(true, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data(["a", "c", "b"])])?;
+        acc2.update_batch(&[data(["b", "c", "a"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["c", "b", "a"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn no_duplicates_distinct_sort_asc_nulls_first() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(false, true))
+            .build_two()?;
+
+        acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
+        acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["NULL", "a", "b", "e", "f"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn no_duplicates_distinct_sort_asc_nulls_last() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(false, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
+        acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["a", "b", "e", "f", "NULL"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn no_duplicates_distinct_sort_desc_nulls_first() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(true, true))
+            .build_two()?;
+
+        acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
+        acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["NULL", "f", "e", "b", "a"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn no_duplicates_distinct_sort_desc_nulls_last() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(true, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
+        acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["f", "e", "b", "a", "NULL"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn all_nulls_on_first_batch_with_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .build_two()?;
+
+        acc1.update_batch(&[data::<Option<&str>, 3>([None, None, None])])?;
+        acc2.update_batch(&[data([Some("a"), None, None, None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
+        result.sort();
+        assert_eq!(result, vec!["NULL", "a"]);
+        Ok(())
+    }
+
+    #[test]
+    fn all_nulls_on_both_batches_with_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .build_two()?;
+
+        acc1.update_batch(&[data::<Option<&str>, 3>([None, None, None])])?;
+        acc2.update_batch(&[data::<Option<&str>, 4>([None, None, None, 
None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+        assert_eq!(result, vec!["NULL"]);
+        Ok(())
+    }
+
+    struct ArrayAggAccumulatorBuilder {
+        data_type: DataType,
+        distinct: bool,
+        ordering: LexOrdering,
+        schema: Schema,
+    }
+
+    impl ArrayAggAccumulatorBuilder {
+        fn string() -> Self {
+            Self::new(DataType::Utf8)
+        }
+
+        fn new(data_type: DataType) -> Self {
+            Self {
+                data_type: data_type.clone(),
+                distinct: Default::default(),
+                ordering: Default::default(),
+                schema: Schema {
+                    fields: Fields::from(vec![Field::new(
+                        "col",
+                        DataType::List(FieldRef::new(Field::new(
+                            "item", data_type, true,
+                        ))),
+                        true,
+                    )]),
+                    metadata: Default::default(),
+                },
+            }
+        }
+
+        fn distinct(mut self) -> Self {
+            self.distinct = true;
+            self
+        }
+
+        fn order_by_col(mut self, col: &str, sort_options: SortOptions) -> 
Self {
+            self.ordering.extend([PhysicalSortExpr::new(
+                Arc::new(
+                    Column::new_with_schema(col, &self.schema)
+                        .expect("column not available in schema"),
+                ),
+                sort_options,
+            )]);
+            self
+        }
+
+        fn build(&self) -> Result<Box<dyn Accumulator>> {
+            ArrayAgg::default().accumulator(AccumulatorArgs {
+                return_type: &self.data_type,
+                schema: &self.schema,
+                ignore_nulls: false,
+                ordering_req: &self.ordering,
+                is_reversed: false,
+                name: "",
+                is_distinct: self.distinct,
+                exprs: &[Arc::new(Column::new("col", 0))],
+            })
+        }
+
+        fn build_two(&self) -> Result<(Box<dyn Accumulator>, Box<dyn 
Accumulator>)> {
+            Ok((self.build()?, self.build()?))
+        }
+    }
+
+    fn str_arr(value: ScalarValue) -> Result<Vec<Option<String>>> {

Review Comment:
   I think we could make these tests easier to update by using the pre-existing
   formatter:
   https://docs.rs/arrow/latest/arrow/util/pretty/fn.pretty_format_batches.html
   
   Recently (after this PR was opened) we have been migrating to `insta`, which
   might be helpful for this kind of test -- see, for example,
   https://github.com/apache/datafusion/pull/15364
   
   You could create a `RecordBatch` and then do something like:
   
   ```rust
           assert_snapshot!(batches_to_sort_string(&batches), @r#"
               +----+----+----+
               | a1 | b1 | c1 |
               +----+----+----+
               | 5  | 5  | 50 |
               +----+----+----+
               "#);
   ```
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to