gabotechs commented on code in PR #14413:
URL: https://github.com/apache/datafusion/pull/14413#discussion_r2011460176


##########
datafusion/functions-aggregate/src/array_agg.rs:
##########
@@ -598,146 +656,370 @@ impl OrderSensitiveArrayAggAccumulator {
 #[cfg(test)]
 mod tests {
     use super::*;
-
-    use std::collections::VecDeque;
+    use arrow::datatypes::{FieldRef, Schema};
+    use datafusion_common::cast::as_generic_string_array;
+    use datafusion_common::internal_err;
+    use datafusion_physical_expr::expressions::Column;
+    use datafusion_physical_expr_common::sort_expr::{LexOrdering, 
PhysicalSortExpr};
     use std::sync::Arc;
 
-    use arrow::array::Int64Array;
-    use arrow::compute::SortOptions;
+    #[test]
+    fn no_duplicates_no_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = 
ArrayAggAccumulatorBuilder::string().build_two()?;
+
+        acc1.update_batch(&[data(["a", "b", "c"])])?;
+        acc2.update_batch(&[data(["d", "e", "f"])])?;
+        acc1 = merge(acc1, acc2)?;
 
-    use datafusion_common::utils::get_row_at_idx;
-    use datafusion_common::{Result, ScalarValue};
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]);
+
+        Ok(())
+    }
 
     #[test]
-    fn test_merge_asc() -> Result<()> {
-        let lhs_arrays: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])),
-            Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])),
-        ];
-        let n_row = lhs_arrays[0].len();
-        let lhs_orderings = (0..n_row)
-            .map(|idx| get_row_at_idx(&lhs_arrays, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-
-        let rhs_arrays: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])),
-            Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])),
-        ];
-        let n_row = rhs_arrays[0].len();
-        let rhs_orderings = (0..n_row)
-            .map(|idx| get_row_at_idx(&rhs_arrays, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-        let sort_options = vec![
-            SortOptions {
-                descending: false,
-                nulls_first: false,
-            },
-            SortOptions {
-                descending: false,
-                nulls_first: false,
-            },
-        ];
-
-        let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as 
ArrayRef;
-        let lhs_vals = (0..lhs_vals_arr.len())
-            .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-
-        let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as 
ArrayRef;
-        let rhs_vals = (0..rhs_vals_arr.len())
-            .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-        let expected =
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as 
ArrayRef;
-        let expected_ts = vec![
-            Arc::new(Int64Array::from(vec![0, 0, 0, 0, 1, 1, 1, 1, 2, 2])) as 
ArrayRef,
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as 
ArrayRef,
-        ];
-
-        let (merged_vals, merged_ts) = merge_ordered_arrays(
-            &mut [lhs_vals, rhs_vals],
-            &mut [lhs_orderings, rhs_orderings],
-            &sort_options,
-        )?;
-        let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
-        let merged_ts = (0..merged_ts[0].len())
-            .map(|col_idx| {
-                ScalarValue::iter_to_array(
-                    (0..merged_ts.len())
-                        .map(|row_idx| merged_ts[row_idx][col_idx].clone()),
-                )
-            })
-            .collect::<Result<Vec<_>>>()?;
+    fn no_duplicates_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .build_two()?;
+
+        acc1.update_batch(&[data(["a", "b", "c"])])?;
+        acc2.update_batch(&[data(["d", "e", "f"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
+        result.sort();
 
-        assert_eq!(&merged_vals, &expected);
-        assert_eq!(&merged_ts, &expected_ts);
+        assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]);
 
         Ok(())
     }
 
     #[test]
-    fn test_merge_desc() -> Result<()> {
-        let lhs_arrays: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])),
-            Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])),
-        ];
-        let n_row = lhs_arrays[0].len();
-        let lhs_orderings = (0..n_row)
-            .map(|idx| get_row_at_idx(&lhs_arrays, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-
-        let rhs_arrays: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])),
-            Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])),
-        ];
-        let n_row = rhs_arrays[0].len();
-        let rhs_orderings = (0..n_row)
-            .map(|idx| get_row_at_idx(&rhs_arrays, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-        let sort_options = vec![
-            SortOptions {
-                descending: true,
-                nulls_first: false,
-            },
-            SortOptions {
-                descending: true,
-                nulls_first: false,
-            },
-        ];
-
-        // Values (which will be merged) doesn't have to be ordered.
-        let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as 
ArrayRef;
-        let lhs_vals = (0..lhs_vals_arr.len())
-            .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-
-        let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as 
ArrayRef;
-        let rhs_vals = (0..rhs_vals_arr.len())
-            .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-        let expected =
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 1, 1, 2, 2])) as 
ArrayRef;
-        let expected_ts = vec![
-            Arc::new(Int64Array::from(vec![2, 2, 1, 1, 1, 1, 0, 0, 0, 0])) as 
ArrayRef,
-            Arc::new(Int64Array::from(vec![4, 4, 3, 3, 2, 2, 1, 1, 0, 0])) as 
ArrayRef,
-        ];
-        let (merged_vals, merged_ts) = merge_ordered_arrays(
-            &mut [lhs_vals, rhs_vals],
-            &mut [lhs_orderings, rhs_orderings],
-            &sort_options,
-        )?;
-        let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
-        let merged_ts = (0..merged_ts[0].len())
-            .map(|col_idx| {
-                ScalarValue::iter_to_array(
-                    (0..merged_ts.len())
-                        .map(|row_idx| merged_ts[row_idx][col_idx].clone()),
-                )
-            })
-            .collect::<Result<Vec<_>>>()?;
+    fn duplicates_no_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = 
ArrayAggAccumulatorBuilder::string().build_two()?;
+
+        acc1.update_batch(&[data(["a", "b", "c"])])?;
+        acc2.update_batch(&[data(["a", "b", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["a", "b", "c", "a", "b", "c"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn duplicates_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .build_two()?;
+
+        acc1.update_batch(&[data(["a", "b", "c"])])?;
+        acc2.update_batch(&[data(["a", "b", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
+        result.sort();
+
+        assert_eq!(result, vec!["a", "b", "c"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn duplicates_on_second_batch_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .build_two()?;
+
+        acc1.update_batch(&[data(["a", "c"])])?;
+        acc2.update_batch(&[data(["d", "a", "b", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
+        result.sort();
+
+        assert_eq!(result, vec!["a", "b", "c", "d"]);
 
-        assert_eq!(&merged_vals, &expected);
-        assert_eq!(&merged_ts, &expected_ts);
         Ok(())
     }
+
+    #[test]
+    fn no_duplicates_distinct_sort_asc() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(false, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data(["e", "b", "d"])])?;
+        acc2.update_batch(&[data(["f", "a", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn no_duplicates_distinct_sort_desc() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(true, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data(["e", "b", "d"])])?;
+        acc2.update_batch(&[data(["f", "a", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["f", "e", "d", "c", "b", "a"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn duplicates_distinct_sort_asc() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(false, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data(["a", "c", "b"])])?;
+        acc2.update_batch(&[data(["b", "c", "a"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["a", "b", "c"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn duplicates_distinct_sort_desc() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(true, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data(["a", "c", "b"])])?;
+        acc2.update_batch(&[data(["b", "c", "a"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["c", "b", "a"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn no_duplicates_distinct_sort_asc_nulls_first() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(false, true))
+            .build_two()?;
+
+        acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
+        acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["NULL", "a", "b", "e", "f"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn no_duplicates_distinct_sort_asc_nulls_last() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(false, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
+        acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["a", "b", "e", "f", "NULL"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn no_duplicates_distinct_sort_desc_nulls_first() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(true, true))
+            .build_two()?;
+
+        acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
+        acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["NULL", "f", "e", "b", "a"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn no_duplicates_distinct_sort_desc_nulls_last() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(true, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
+        acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["f", "e", "b", "a", "NULL"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn all_nulls_on_first_batch_with_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .build_two()?;
+
+        acc1.update_batch(&[data::<Option<&str>, 3>([None, None, None])])?;
+        acc2.update_batch(&[data([Some("a"), None, None, None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
+        result.sort();
+        assert_eq!(result, vec!["NULL", "a"]);
+        Ok(())
+    }
+
+    #[test]
+    fn all_nulls_on_both_batches_with_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .build_two()?;
+
+        acc1.update_batch(&[data::<Option<&str>, 3>([None, None, None])])?;
+        acc2.update_batch(&[data::<Option<&str>, 4>([None, None, None, 
None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+        assert_eq!(result, vec!["NULL"]);
+        Ok(())
+    }
+
+    struct ArrayAggAccumulatorBuilder {

Review Comment:
   🤔 I did look at the `AggregateExprBuilder`, but did not find it suitable for 
what these tests try to accomplish. These are the fundamental differences:
   - This `ArrayAggAccumulatorBuilder` is a builder for an accumulator, rather 
than an expression.
   - This `ArrayAggAccumulatorBuilder` is very tailored to these specific 
tests, automatically providing dummy arguments like a column named "col", or a 
schema with just one list field
   
   I think there's an opportunity to have a generic `AccumulatorBuilder` 
though, that could be used for unit testing these aggregation functions from a 
more internal standpoint, increasing coverage in the `aggregate-functions` 
crate.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to