alamb commented on code in PR #14413:
URL: https://github.com/apache/datafusion/pull/14413#discussion_r2009216043


##########
datafusion/functions-aggregate/src/array_agg.rs:
##########
@@ -598,146 +656,370 @@ impl OrderSensitiveArrayAggAccumulator {
 #[cfg(test)]
 mod tests {
     use super::*;
-
-    use std::collections::VecDeque;
+    use arrow::datatypes::{FieldRef, Schema};
+    use datafusion_common::cast::as_generic_string_array;
+    use datafusion_common::internal_err;
+    use datafusion_physical_expr::expressions::Column;
+    use datafusion_physical_expr_common::sort_expr::{LexOrdering, 
PhysicalSortExpr};
     use std::sync::Arc;
 
-    use arrow::array::Int64Array;
-    use arrow::compute::SortOptions;
+    #[test]
+    fn no_duplicates_no_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = 
ArrayAggAccumulatorBuilder::string().build_two()?;
+
+        acc1.update_batch(&[data(["a", "b", "c"])])?;
+        acc2.update_batch(&[data(["d", "e", "f"])])?;
+        acc1 = merge(acc1, acc2)?;
 
-    use datafusion_common::utils::get_row_at_idx;
-    use datafusion_common::{Result, ScalarValue};
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]);
+
+        Ok(())
+    }
 
     #[test]
-    fn test_merge_asc() -> Result<()> {
-        let lhs_arrays: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])),
-            Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])),
-        ];
-        let n_row = lhs_arrays[0].len();
-        let lhs_orderings = (0..n_row)
-            .map(|idx| get_row_at_idx(&lhs_arrays, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-
-        let rhs_arrays: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])),
-            Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])),
-        ];
-        let n_row = rhs_arrays[0].len();
-        let rhs_orderings = (0..n_row)
-            .map(|idx| get_row_at_idx(&rhs_arrays, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-        let sort_options = vec![
-            SortOptions {
-                descending: false,
-                nulls_first: false,
-            },
-            SortOptions {
-                descending: false,
-                nulls_first: false,
-            },
-        ];
-
-        let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as 
ArrayRef;
-        let lhs_vals = (0..lhs_vals_arr.len())
-            .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-
-        let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as 
ArrayRef;
-        let rhs_vals = (0..rhs_vals_arr.len())
-            .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-        let expected =
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as 
ArrayRef;
-        let expected_ts = vec![
-            Arc::new(Int64Array::from(vec![0, 0, 0, 0, 1, 1, 1, 1, 2, 2])) as 
ArrayRef,
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as 
ArrayRef,
-        ];
-
-        let (merged_vals, merged_ts) = merge_ordered_arrays(
-            &mut [lhs_vals, rhs_vals],
-            &mut [lhs_orderings, rhs_orderings],
-            &sort_options,
-        )?;
-        let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
-        let merged_ts = (0..merged_ts[0].len())
-            .map(|col_idx| {
-                ScalarValue::iter_to_array(
-                    (0..merged_ts.len())
-                        .map(|row_idx| merged_ts[row_idx][col_idx].clone()),
-                )
-            })
-            .collect::<Result<Vec<_>>>()?;
+    fn no_duplicates_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .build_two()?;
+
+        acc1.update_batch(&[data(["a", "b", "c"])])?;
+        acc2.update_batch(&[data(["d", "e", "f"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
+        result.sort();
 
-        assert_eq!(&merged_vals, &expected);
-        assert_eq!(&merged_ts, &expected_ts);
+        assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]);
 
         Ok(())
     }
 
     #[test]
-    fn test_merge_desc() -> Result<()> {
-        let lhs_arrays: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])),
-            Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])),
-        ];
-        let n_row = lhs_arrays[0].len();
-        let lhs_orderings = (0..n_row)
-            .map(|idx| get_row_at_idx(&lhs_arrays, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-
-        let rhs_arrays: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])),
-            Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])),
-        ];
-        let n_row = rhs_arrays[0].len();
-        let rhs_orderings = (0..n_row)
-            .map(|idx| get_row_at_idx(&rhs_arrays, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-        let sort_options = vec![
-            SortOptions {
-                descending: true,
-                nulls_first: false,
-            },
-            SortOptions {
-                descending: true,
-                nulls_first: false,
-            },
-        ];
-
-        // Values (which will be merged) doesn't have to be ordered.
-        let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as 
ArrayRef;
-        let lhs_vals = (0..lhs_vals_arr.len())
-            .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-
-        let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as 
ArrayRef;
-        let rhs_vals = (0..rhs_vals_arr.len())
-            .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-        let expected =
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 1, 1, 2, 2])) as 
ArrayRef;
-        let expected_ts = vec![
-            Arc::new(Int64Array::from(vec![2, 2, 1, 1, 1, 1, 0, 0, 0, 0])) as 
ArrayRef,
-            Arc::new(Int64Array::from(vec![4, 4, 3, 3, 2, 2, 1, 1, 0, 0])) as 
ArrayRef,
-        ];
-        let (merged_vals, merged_ts) = merge_ordered_arrays(
-            &mut [lhs_vals, rhs_vals],
-            &mut [lhs_orderings, rhs_orderings],
-            &sort_options,
-        )?;
-        let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
-        let merged_ts = (0..merged_ts[0].len())
-            .map(|col_idx| {
-                ScalarValue::iter_to_array(
-                    (0..merged_ts.len())
-                        .map(|row_idx| merged_ts[row_idx][col_idx].clone()),
-                )
-            })
-            .collect::<Result<Vec<_>>>()?;
+    fn duplicates_no_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = 
ArrayAggAccumulatorBuilder::string().build_two()?;
+
+        acc1.update_batch(&[data(["a", "b", "c"])])?;
+        acc2.update_batch(&[data(["a", "b", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["a", "b", "c", "a", "b", "c"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn duplicates_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .build_two()?;
+
+        acc1.update_batch(&[data(["a", "b", "c"])])?;
+        acc2.update_batch(&[data(["a", "b", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
+        result.sort();
+
+        assert_eq!(result, vec!["a", "b", "c"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn duplicates_on_second_batch_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .build_two()?;
+
+        acc1.update_batch(&[data(["a", "c"])])?;
+        acc2.update_batch(&[data(["d", "a", "b", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
+        result.sort();
+
+        assert_eq!(result, vec!["a", "b", "c", "d"]);
 
-        assert_eq!(&merged_vals, &expected);
-        assert_eq!(&merged_ts, &expected_ts);
         Ok(())
     }
+
+    #[test]
+    fn no_duplicates_distinct_sort_asc() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(false, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data(["e", "b", "d"])])?;
+        acc2.update_batch(&[data(["f", "a", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn no_duplicates_distinct_sort_desc() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(true, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data(["e", "b", "d"])])?;
+        acc2.update_batch(&[data(["f", "a", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["f", "e", "d", "c", "b", "a"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn duplicates_distinct_sort_asc() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(false, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data(["a", "c", "b"])])?;
+        acc2.update_batch(&[data(["b", "c", "a"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["a", "b", "c"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn duplicates_distinct_sort_desc() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(true, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data(["a", "c", "b"])])?;
+        acc2.update_batch(&[data(["b", "c", "a"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["c", "b", "a"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn no_duplicates_distinct_sort_asc_nulls_first() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(false, true))
+            .build_two()?;
+
+        acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
+        acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["NULL", "a", "b", "e", "f"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn no_duplicates_distinct_sort_asc_nulls_last() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(false, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
+        acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["a", "b", "e", "f", "NULL"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn no_duplicates_distinct_sort_desc_nulls_first() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(true, true))
+            .build_two()?;
+
+        acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
+        acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["NULL", "f", "e", "b", "a"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn no_duplicates_distinct_sort_desc_nulls_last() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(true, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
+        acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["f", "e", "b", "a", "NULL"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn all_nulls_on_first_batch_with_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .build_two()?;
+
+        acc1.update_batch(&[data::<Option<&str>, 3>([None, None, None])])?;
+        acc2.update_batch(&[data([Some("a"), None, None, None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
+        result.sort();
+        assert_eq!(result, vec!["NULL", "a"]);
+        Ok(())
+    }
+
+    #[test]
+    fn all_nulls_on_both_batches_with_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .build_two()?;
+
+        acc1.update_batch(&[data::<Option<&str>, 3>([None, None, None])])?;
+        acc2.update_batch(&[data::<Option<&str>, 4>([None, None, None, 
None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+        assert_eq!(result, vec!["NULL"]);
+        Ok(())
+    }
+
+    struct ArrayAggAccumulatorBuilder {

Review Comment:
   Filed an issue to track
   - https://github.com/apache/datafusion/issues/15369



##########
datafusion/functions-aggregate/src/array_agg.rs:
##########
@@ -598,146 +656,370 @@ impl OrderSensitiveArrayAggAccumulator {
 #[cfg(test)]
 mod tests {
     use super::*;
-
-    use std::collections::VecDeque;
+    use arrow::datatypes::{FieldRef, Schema};
+    use datafusion_common::cast::as_generic_string_array;
+    use datafusion_common::internal_err;
+    use datafusion_physical_expr::expressions::Column;
+    use datafusion_physical_expr_common::sort_expr::{LexOrdering, 
PhysicalSortExpr};
     use std::sync::Arc;
 
-    use arrow::array::Int64Array;
-    use arrow::compute::SortOptions;
+    #[test]
+    fn no_duplicates_no_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = 
ArrayAggAccumulatorBuilder::string().build_two()?;
+
+        acc1.update_batch(&[data(["a", "b", "c"])])?;
+        acc2.update_batch(&[data(["d", "e", "f"])])?;
+        acc1 = merge(acc1, acc2)?;
 
-    use datafusion_common::utils::get_row_at_idx;
-    use datafusion_common::{Result, ScalarValue};
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]);
+
+        Ok(())
+    }
 
     #[test]
-    fn test_merge_asc() -> Result<()> {
-        let lhs_arrays: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])),
-            Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])),
-        ];
-        let n_row = lhs_arrays[0].len();
-        let lhs_orderings = (0..n_row)
-            .map(|idx| get_row_at_idx(&lhs_arrays, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-
-        let rhs_arrays: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])),
-            Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])),
-        ];
-        let n_row = rhs_arrays[0].len();
-        let rhs_orderings = (0..n_row)
-            .map(|idx| get_row_at_idx(&rhs_arrays, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-        let sort_options = vec![
-            SortOptions {
-                descending: false,
-                nulls_first: false,
-            },
-            SortOptions {
-                descending: false,
-                nulls_first: false,
-            },
-        ];
-
-        let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as 
ArrayRef;
-        let lhs_vals = (0..lhs_vals_arr.len())
-            .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-
-        let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as 
ArrayRef;
-        let rhs_vals = (0..rhs_vals_arr.len())
-            .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-        let expected =
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as 
ArrayRef;
-        let expected_ts = vec![
-            Arc::new(Int64Array::from(vec![0, 0, 0, 0, 1, 1, 1, 1, 2, 2])) as 
ArrayRef,
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as 
ArrayRef,
-        ];
-
-        let (merged_vals, merged_ts) = merge_ordered_arrays(
-            &mut [lhs_vals, rhs_vals],
-            &mut [lhs_orderings, rhs_orderings],
-            &sort_options,
-        )?;
-        let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
-        let merged_ts = (0..merged_ts[0].len())
-            .map(|col_idx| {
-                ScalarValue::iter_to_array(
-                    (0..merged_ts.len())
-                        .map(|row_idx| merged_ts[row_idx][col_idx].clone()),
-                )
-            })
-            .collect::<Result<Vec<_>>>()?;
+    fn no_duplicates_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .build_two()?;
+
+        acc1.update_batch(&[data(["a", "b", "c"])])?;
+        acc2.update_batch(&[data(["d", "e", "f"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
+        result.sort();
 
-        assert_eq!(&merged_vals, &expected);
-        assert_eq!(&merged_ts, &expected_ts);
+        assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]);
 
         Ok(())
     }
 
     #[test]
-    fn test_merge_desc() -> Result<()> {
-        let lhs_arrays: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])),
-            Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])),
-        ];
-        let n_row = lhs_arrays[0].len();
-        let lhs_orderings = (0..n_row)
-            .map(|idx| get_row_at_idx(&lhs_arrays, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-
-        let rhs_arrays: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])),
-            Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])),
-        ];
-        let n_row = rhs_arrays[0].len();
-        let rhs_orderings = (0..n_row)
-            .map(|idx| get_row_at_idx(&rhs_arrays, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-        let sort_options = vec![
-            SortOptions {
-                descending: true,
-                nulls_first: false,
-            },
-            SortOptions {
-                descending: true,
-                nulls_first: false,
-            },
-        ];
-
-        // Values (which will be merged) doesn't have to be ordered.
-        let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as 
ArrayRef;
-        let lhs_vals = (0..lhs_vals_arr.len())
-            .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-
-        let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as 
ArrayRef;
-        let rhs_vals = (0..rhs_vals_arr.len())
-            .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-        let expected =
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 1, 1, 2, 2])) as 
ArrayRef;
-        let expected_ts = vec![
-            Arc::new(Int64Array::from(vec![2, 2, 1, 1, 1, 1, 0, 0, 0, 0])) as 
ArrayRef,
-            Arc::new(Int64Array::from(vec![4, 4, 3, 3, 2, 2, 1, 1, 0, 0])) as 
ArrayRef,
-        ];
-        let (merged_vals, merged_ts) = merge_ordered_arrays(
-            &mut [lhs_vals, rhs_vals],
-            &mut [lhs_orderings, rhs_orderings],
-            &sort_options,
-        )?;
-        let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
-        let merged_ts = (0..merged_ts[0].len())
-            .map(|col_idx| {
-                ScalarValue::iter_to_array(
-                    (0..merged_ts.len())
-                        .map(|row_idx| merged_ts[row_idx][col_idx].clone()),
-                )
-            })
-            .collect::<Result<Vec<_>>>()?;
+    fn duplicates_no_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = 
ArrayAggAccumulatorBuilder::string().build_two()?;
+
+        acc1.update_batch(&[data(["a", "b", "c"])])?;
+        acc2.update_batch(&[data(["a", "b", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["a", "b", "c", "a", "b", "c"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn duplicates_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .build_two()?;
+
+        acc1.update_batch(&[data(["a", "b", "c"])])?;
+        acc2.update_batch(&[data(["a", "b", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
+        result.sort();
+
+        assert_eq!(result, vec!["a", "b", "c"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn duplicates_on_second_batch_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .build_two()?;
+
+        acc1.update_batch(&[data(["a", "c"])])?;
+        acc2.update_batch(&[data(["d", "a", "b", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
+        result.sort();
+
+        assert_eq!(result, vec!["a", "b", "c", "d"]);
 
-        assert_eq!(&merged_vals, &expected);
-        assert_eq!(&merged_ts, &expected_ts);
         Ok(())
     }
+
+    #[test]
+    fn no_duplicates_distinct_sort_asc() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(false, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data(["e", "b", "d"])])?;
+        acc2.update_batch(&[data(["f", "a", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn no_duplicates_distinct_sort_desc() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(true, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data(["e", "b", "d"])])?;
+        acc2.update_batch(&[data(["f", "a", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["f", "e", "d", "c", "b", "a"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn duplicates_distinct_sort_asc() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(false, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data(["a", "c", "b"])])?;
+        acc2.update_batch(&[data(["b", "c", "a"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["a", "b", "c"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn duplicates_distinct_sort_desc() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(true, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data(["a", "c", "b"])])?;
+        acc2.update_batch(&[data(["b", "c", "a"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["c", "b", "a"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn no_duplicates_distinct_sort_asc_nulls_first() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(false, true))
+            .build_two()?;
+
+        acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
+        acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["NULL", "a", "b", "e", "f"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn no_duplicates_distinct_sort_asc_nulls_last() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(false, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
+        acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["a", "b", "e", "f", "NULL"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn no_duplicates_distinct_sort_desc_nulls_first() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(true, true))
+            .build_two()?;
+
+        acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
+        acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["NULL", "f", "e", "b", "a"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn no_duplicates_distinct_sort_desc_nulls_last() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(true, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
+        acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["f", "e", "b", "a", "NULL"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn all_nulls_on_first_batch_with_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .build_two()?;
+
+        acc1.update_batch(&[data::<Option<&str>, 3>([None, None, None])])?;
+        acc2.update_batch(&[data([Some("a"), None, None, None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
+        result.sort();
+        assert_eq!(result, vec!["NULL", "a"]);
+        Ok(())
+    }
+
+    #[test]
+    fn all_nulls_on_both_batches_with_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .build_two()?;
+
+        acc1.update_batch(&[data::<Option<&str>, 3>([None, None, None])])?;
+        acc2.update_batch(&[data::<Option<&str>, 4>([None, None, None, 
None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+        assert_eq!(result, vec!["NULL"]);
+        Ok(())
+    }
+
+    struct ArrayAggAccumulatorBuilder {

Review Comment:
   Filed an issue to track
   - https://github.com/apache/datafusion/issues/15369



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to