gabotechs commented on code in PR #14413:
URL: https://github.com/apache/datafusion/pull/14413#discussion_r2011672602


##########
datafusion/functions-aggregate/src/array_agg.rs:
##########
@@ -598,146 +656,370 @@ impl OrderSensitiveArrayAggAccumulator {
 #[cfg(test)]
 mod tests {
     use super::*;
-
-    use std::collections::VecDeque;
+    use arrow::datatypes::{FieldRef, Schema};
+    use datafusion_common::cast::as_generic_string_array;
+    use datafusion_common::internal_err;
+    use datafusion_physical_expr::expressions::Column;
+    use datafusion_physical_expr_common::sort_expr::{LexOrdering, 
PhysicalSortExpr};
     use std::sync::Arc;
 
-    use arrow::array::Int64Array;
-    use arrow::compute::SortOptions;
+    #[test]
+    fn no_duplicates_no_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = 
ArrayAggAccumulatorBuilder::string().build_two()?;
+
+        acc1.update_batch(&[data(["a", "b", "c"])])?;
+        acc2.update_batch(&[data(["d", "e", "f"])])?;
+        acc1 = merge(acc1, acc2)?;
 
-    use datafusion_common::utils::get_row_at_idx;
-    use datafusion_common::{Result, ScalarValue};
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]);
+
+        Ok(())
+    }
 
     #[test]
-    fn test_merge_asc() -> Result<()> {
-        let lhs_arrays: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])),
-            Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])),
-        ];
-        let n_row = lhs_arrays[0].len();
-        let lhs_orderings = (0..n_row)
-            .map(|idx| get_row_at_idx(&lhs_arrays, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-
-        let rhs_arrays: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])),
-            Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])),
-        ];
-        let n_row = rhs_arrays[0].len();
-        let rhs_orderings = (0..n_row)
-            .map(|idx| get_row_at_idx(&rhs_arrays, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-        let sort_options = vec![
-            SortOptions {
-                descending: false,
-                nulls_first: false,
-            },
-            SortOptions {
-                descending: false,
-                nulls_first: false,
-            },
-        ];
-
-        let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as 
ArrayRef;
-        let lhs_vals = (0..lhs_vals_arr.len())
-            .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-
-        let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as 
ArrayRef;
-        let rhs_vals = (0..rhs_vals_arr.len())
-            .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-        let expected =
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as 
ArrayRef;
-        let expected_ts = vec![
-            Arc::new(Int64Array::from(vec![0, 0, 0, 0, 1, 1, 1, 1, 2, 2])) as 
ArrayRef,
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as 
ArrayRef,
-        ];
-
-        let (merged_vals, merged_ts) = merge_ordered_arrays(
-            &mut [lhs_vals, rhs_vals],
-            &mut [lhs_orderings, rhs_orderings],
-            &sort_options,
-        )?;
-        let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
-        let merged_ts = (0..merged_ts[0].len())
-            .map(|col_idx| {
-                ScalarValue::iter_to_array(
-                    (0..merged_ts.len())
-                        .map(|row_idx| merged_ts[row_idx][col_idx].clone()),
-                )
-            })
-            .collect::<Result<Vec<_>>>()?;
+    fn no_duplicates_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .build_two()?;
+
+        acc1.update_batch(&[data(["a", "b", "c"])])?;
+        acc2.update_batch(&[data(["d", "e", "f"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
+        result.sort();
 
-        assert_eq!(&merged_vals, &expected);
-        assert_eq!(&merged_ts, &expected_ts);
+        assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]);
 
         Ok(())
     }
 
     #[test]
-    fn test_merge_desc() -> Result<()> {
-        let lhs_arrays: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])),
-            Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])),
-        ];
-        let n_row = lhs_arrays[0].len();
-        let lhs_orderings = (0..n_row)
-            .map(|idx| get_row_at_idx(&lhs_arrays, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-
-        let rhs_arrays: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])),
-            Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])),
-        ];
-        let n_row = rhs_arrays[0].len();
-        let rhs_orderings = (0..n_row)
-            .map(|idx| get_row_at_idx(&rhs_arrays, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-        let sort_options = vec![
-            SortOptions {
-                descending: true,
-                nulls_first: false,
-            },
-            SortOptions {
-                descending: true,
-                nulls_first: false,
-            },
-        ];
-
-        // Values (which will be merged) doesn't have to be ordered.
-        let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as 
ArrayRef;
-        let lhs_vals = (0..lhs_vals_arr.len())
-            .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-
-        let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as 
ArrayRef;
-        let rhs_vals = (0..rhs_vals_arr.len())
-            .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx))
-            .collect::<Result<VecDeque<_>>>()?;
-        let expected =
-            Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 1, 1, 2, 2])) as 
ArrayRef;
-        let expected_ts = vec![
-            Arc::new(Int64Array::from(vec![2, 2, 1, 1, 1, 1, 0, 0, 0, 0])) as 
ArrayRef,
-            Arc::new(Int64Array::from(vec![4, 4, 3, 3, 2, 2, 1, 1, 0, 0])) as 
ArrayRef,
-        ];
-        let (merged_vals, merged_ts) = merge_ordered_arrays(
-            &mut [lhs_vals, rhs_vals],
-            &mut [lhs_orderings, rhs_orderings],
-            &sort_options,
-        )?;
-        let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
-        let merged_ts = (0..merged_ts[0].len())
-            .map(|col_idx| {
-                ScalarValue::iter_to_array(
-                    (0..merged_ts.len())
-                        .map(|row_idx| merged_ts[row_idx][col_idx].clone()),
-                )
-            })
-            .collect::<Result<Vec<_>>>()?;
+    fn duplicates_no_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = 
ArrayAggAccumulatorBuilder::string().build_two()?;
+
+        acc1.update_batch(&[data(["a", "b", "c"])])?;
+        acc2.update_batch(&[data(["a", "b", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["a", "b", "c", "a", "b", "c"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn duplicates_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .build_two()?;
+
+        acc1.update_batch(&[data(["a", "b", "c"])])?;
+        acc2.update_batch(&[data(["a", "b", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
+        result.sort();
+
+        assert_eq!(result, vec!["a", "b", "c"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn duplicates_on_second_batch_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .build_two()?;
+
+        acc1.update_batch(&[data(["a", "c"])])?;
+        acc2.update_batch(&[data(["d", "a", "b", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
+        result.sort();
+
+        assert_eq!(result, vec!["a", "b", "c", "d"]);
 
-        assert_eq!(&merged_vals, &expected);
-        assert_eq!(&merged_ts, &expected_ts);
         Ok(())
     }
+
+    #[test]
+    fn no_duplicates_distinct_sort_asc() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(false, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data(["e", "b", "d"])])?;
+        acc2.update_batch(&[data(["f", "a", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn no_duplicates_distinct_sort_desc() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(true, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data(["e", "b", "d"])])?;
+        acc2.update_batch(&[data(["f", "a", "c"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["f", "e", "d", "c", "b", "a"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn duplicates_distinct_sort_asc() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(false, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data(["a", "c", "b"])])?;
+        acc2.update_batch(&[data(["b", "c", "a"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["a", "b", "c"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn duplicates_distinct_sort_desc() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(true, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data(["a", "c", "b"])])?;
+        acc2.update_batch(&[data(["b", "c", "a"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["c", "b", "a"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn no_duplicates_distinct_sort_asc_nulls_first() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(false, true))
+            .build_two()?;
+
+        acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
+        acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["NULL", "a", "b", "e", "f"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn no_duplicates_distinct_sort_asc_nulls_last() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(false, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
+        acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["a", "b", "e", "f", "NULL"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn no_duplicates_distinct_sort_desc_nulls_first() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(true, true))
+            .build_two()?;
+
+        acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
+        acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["NULL", "f", "e", "b", "a"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn no_duplicates_distinct_sort_desc_nulls_last() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .order_by_col("col", SortOptions::new(true, false))
+            .build_two()?;
+
+        acc1.update_batch(&[data([Some("e"), Some("b"), None])])?;
+        acc2.update_batch(&[data([Some("f"), Some("a"), None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+
+        assert_eq!(result, vec!["f", "e", "b", "a", "NULL"]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn all_nulls_on_first_batch_with_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .build_two()?;
+
+        acc1.update_batch(&[data::<Option<&str>, 3>([None, None, None])])?;
+        acc2.update_batch(&[data([Some("a"), None, None, None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let mut result = print_nulls(str_arr(acc1.evaluate()?)?);
+        result.sort();
+        assert_eq!(result, vec!["NULL", "a"]);
+        Ok(())
+    }
+
+    #[test]
+    fn all_nulls_on_both_batches_with_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .build_two()?;
+
+        acc1.update_batch(&[data::<Option<&str>, 3>([None, None, None])])?;
+        acc2.update_batch(&[data::<Option<&str>, 4>([None, None, None, 
None])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        let result = print_nulls(str_arr(acc1.evaluate()?)?);
+        assert_eq!(result, vec!["NULL"]);
+        Ok(())
+    }
+
+    struct ArrayAggAccumulatorBuilder {
+        data_type: DataType,
+        distinct: bool,
+        ordering: LexOrdering,
+        schema: Schema,
+    }
+
+    impl ArrayAggAccumulatorBuilder {
+        fn string() -> Self {
+            Self::new(DataType::Utf8)
+        }
+
+        fn new(data_type: DataType) -> Self {
+            Self {
+                data_type: data_type.clone(),
+                distinct: Default::default(),
+                ordering: Default::default(),
+                schema: Schema {
+                    fields: Fields::from(vec![Field::new(
+                        "col",
+                        DataType::List(FieldRef::new(Field::new(
+                            "item", data_type, true,
+                        ))),
+                        true,
+                    )]),
+                    metadata: Default::default(),
+                },
+            }
+        }
+
+        fn distinct(mut self) -> Self {
+            self.distinct = true;
+            self
+        }
+
+        fn order_by_col(mut self, col: &str, sort_options: SortOptions) -> 
Self {
+            self.ordering.extend([PhysicalSortExpr::new(
+                Arc::new(
+                    Column::new_with_schema(col, &self.schema)
+                        .expect("column not available in schema"),
+                ),
+                sort_options,
+            )]);
+            self
+        }
+
+        fn build(&self) -> Result<Box<dyn Accumulator>> {
+            ArrayAgg::default().accumulator(AccumulatorArgs {
+                return_type: &self.data_type,
+                schema: &self.schema,
+                ignore_nulls: false,
+                ordering_req: &self.ordering,
+                is_reversed: false,
+                name: "",
+                is_distinct: self.distinct,
+                exprs: &[Arc::new(Column::new("col", 0))],
+            })
+        }
+
+        fn build_two(&self) -> Result<(Box<dyn Accumulator>, Box<dyn 
Accumulator>)> {
+            Ok((self.build()?, self.build()?))
+        }
+    }
+
+    fn str_arr(value: ScalarValue) -> Result<Vec<Option<String>>> {

Review Comment:
   I tried the suggestion and ran into the following blockers:
   - These tests assert over a `ScalarValue` rather than a `RecordBatch`, so 
intermediate wrappings with arbitrary field names for promoting a `ScalarValue` 
to a `RecordBatch` are necessary (not a big deal).
   - The `batches_to_sort_string` will order lines in the output, but will not 
order array scalar values inside single cells, which is required for these tests.
   - The more verbose assertions are suitable to be managed by insta, but will 
make it difficult to follow a "write the test first and the implementation 
later" approach, which is the approach used for writing these tests.
   
   Do you think insta is a good tool for this kind of test? I imagine that 
snapshot testing is suitable for tests that are expected to be updated often as 
the implementation changes, but one of the main points of these tests is that 
they should never change even if the implementation changes.
   
   If you still think insta is a good tool, I can build some extra tooling for 
alleviating the friction points mentioned above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to