gabotechs commented on code in PR #14413: URL: https://github.com/apache/datafusion/pull/14413#discussion_r2011672602
########## datafusion/functions-aggregate/src/array_agg.rs: ########## @@ -598,146 +656,370 @@ impl OrderSensitiveArrayAggAccumulator { #[cfg(test)] mod tests { use super::*; - - use std::collections::VecDeque; + use arrow::datatypes::{FieldRef, Schema}; + use datafusion_common::cast::as_generic_string_array; + use datafusion_common::internal_err; + use datafusion_physical_expr::expressions::Column; + use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; use std::sync::Arc; - use arrow::array::Int64Array; - use arrow::compute::SortOptions; + #[test] + fn no_duplicates_no_distinct() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string().build_two()?; + + acc1.update_batch(&[data(["a", "b", "c"])])?; + acc2.update_batch(&[data(["d", "e", "f"])])?; + acc1 = merge(acc1, acc2)?; - use datafusion_common::utils::get_row_at_idx; - use datafusion_common::{Result, ScalarValue}; + let result = print_nulls(str_arr(acc1.evaluate()?)?); + + assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]); + + Ok(()) + } #[test] - fn test_merge_asc() -> Result<()> { - let lhs_arrays: Vec<ArrayRef> = vec![ - Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])), - Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])), - ]; - let n_row = lhs_arrays[0].len(); - let lhs_orderings = (0..n_row) - .map(|idx| get_row_at_idx(&lhs_arrays, idx)) - .collect::<Result<VecDeque<_>>>()?; - - let rhs_arrays: Vec<ArrayRef> = vec![ - Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2])), - Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])), - ]; - let n_row = rhs_arrays[0].len(); - let rhs_orderings = (0..n_row) - .map(|idx| get_row_at_idx(&rhs_arrays, idx)) - .collect::<Result<VecDeque<_>>>()?; - let sort_options = vec![ - SortOptions { - descending: false, - nulls_first: false, - }, - SortOptions { - descending: false, - nulls_first: false, - }, - ]; - - let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as ArrayRef; - let lhs_vals = (0..lhs_vals_arr.len()) 
- .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx)) - .collect::<Result<VecDeque<_>>>()?; - - let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 3, 4])) as ArrayRef; - let rhs_vals = (0..rhs_vals_arr.len()) - .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx)) - .collect::<Result<VecDeque<_>>>()?; - let expected = - Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as ArrayRef; - let expected_ts = vec![ - Arc::new(Int64Array::from(vec![0, 0, 0, 0, 1, 1, 1, 1, 2, 2])) as ArrayRef, - Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 3, 3, 4, 4])) as ArrayRef, - ]; - - let (merged_vals, merged_ts) = merge_ordered_arrays( - &mut [lhs_vals, rhs_vals], - &mut [lhs_orderings, rhs_orderings], - &sort_options, - )?; - let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?; - let merged_ts = (0..merged_ts[0].len()) - .map(|col_idx| { - ScalarValue::iter_to_array( - (0..merged_ts.len()) - .map(|row_idx| merged_ts[row_idx][col_idx].clone()), - ) - }) - .collect::<Result<Vec<_>>>()?; + fn no_duplicates_distinct() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string() + .distinct() + .build_two()?; + + acc1.update_batch(&[data(["a", "b", "c"])])?; + acc2.update_batch(&[data(["d", "e", "f"])])?; + acc1 = merge(acc1, acc2)?; + + let mut result = print_nulls(str_arr(acc1.evaluate()?)?); + result.sort(); - assert_eq!(&merged_vals, &expected); - assert_eq!(&merged_ts, &expected_ts); + assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]); Ok(()) } #[test] - fn test_merge_desc() -> Result<()> { - let lhs_arrays: Vec<ArrayRef> = vec![ - Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])), - Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])), - ]; - let n_row = lhs_arrays[0].len(); - let lhs_orderings = (0..n_row) - .map(|idx| get_row_at_idx(&lhs_arrays, idx)) - .collect::<Result<VecDeque<_>>>()?; - - let rhs_arrays: Vec<ArrayRef> = vec![ - Arc::new(Int64Array::from(vec![2, 1, 1, 0, 0])), - 
Arc::new(Int64Array::from(vec![4, 3, 2, 1, 0])), - ]; - let n_row = rhs_arrays[0].len(); - let rhs_orderings = (0..n_row) - .map(|idx| get_row_at_idx(&rhs_arrays, idx)) - .collect::<Result<VecDeque<_>>>()?; - let sort_options = vec![ - SortOptions { - descending: true, - nulls_first: false, - }, - SortOptions { - descending: true, - nulls_first: false, - }, - ]; - - // Values (which will be merged) doesn't have to be ordered. - let lhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as ArrayRef; - let lhs_vals = (0..lhs_vals_arr.len()) - .map(|idx| ScalarValue::try_from_array(&lhs_vals_arr, idx)) - .collect::<Result<VecDeque<_>>>()?; - - let rhs_vals_arr = Arc::new(Int64Array::from(vec![0, 1, 2, 1, 2])) as ArrayRef; - let rhs_vals = (0..rhs_vals_arr.len()) - .map(|idx| ScalarValue::try_from_array(&rhs_vals_arr, idx)) - .collect::<Result<VecDeque<_>>>()?; - let expected = - Arc::new(Int64Array::from(vec![0, 0, 1, 1, 2, 2, 1, 1, 2, 2])) as ArrayRef; - let expected_ts = vec![ - Arc::new(Int64Array::from(vec![2, 2, 1, 1, 1, 1, 0, 0, 0, 0])) as ArrayRef, - Arc::new(Int64Array::from(vec![4, 4, 3, 3, 2, 2, 1, 1, 0, 0])) as ArrayRef, - ]; - let (merged_vals, merged_ts) = merge_ordered_arrays( - &mut [lhs_vals, rhs_vals], - &mut [lhs_orderings, rhs_orderings], - &sort_options, - )?; - let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?; - let merged_ts = (0..merged_ts[0].len()) - .map(|col_idx| { - ScalarValue::iter_to_array( - (0..merged_ts.len()) - .map(|row_idx| merged_ts[row_idx][col_idx].clone()), - ) - }) - .collect::<Result<Vec<_>>>()?; + fn duplicates_no_distinct() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string().build_two()?; + + acc1.update_batch(&[data(["a", "b", "c"])])?; + acc2.update_batch(&[data(["a", "b", "c"])])?; + acc1 = merge(acc1, acc2)?; + + let result = print_nulls(str_arr(acc1.evaluate()?)?); + + assert_eq!(result, vec!["a", "b", "c", "a", "b", "c"]); + + Ok(()) + } + + #[test] + fn 
duplicates_distinct() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string() + .distinct() + .build_two()?; + + acc1.update_batch(&[data(["a", "b", "c"])])?; + acc2.update_batch(&[data(["a", "b", "c"])])?; + acc1 = merge(acc1, acc2)?; + + let mut result = print_nulls(str_arr(acc1.evaluate()?)?); + result.sort(); + + assert_eq!(result, vec!["a", "b", "c"]); + + Ok(()) + } + + #[test] + fn duplicates_on_second_batch_distinct() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string() + .distinct() + .build_two()?; + + acc1.update_batch(&[data(["a", "c"])])?; + acc2.update_batch(&[data(["d", "a", "b", "c"])])?; + acc1 = merge(acc1, acc2)?; + + let mut result = print_nulls(str_arr(acc1.evaluate()?)?); + result.sort(); + + assert_eq!(result, vec!["a", "b", "c", "d"]); - assert_eq!(&merged_vals, &expected); - assert_eq!(&merged_ts, &expected_ts); Ok(()) } + + #[test] + fn no_duplicates_distinct_sort_asc() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string() + .distinct() + .order_by_col("col", SortOptions::new(false, false)) + .build_two()?; + + acc1.update_batch(&[data(["e", "b", "d"])])?; + acc2.update_batch(&[data(["f", "a", "c"])])?; + acc1 = merge(acc1, acc2)?; + + let result = print_nulls(str_arr(acc1.evaluate()?)?); + + assert_eq!(result, vec!["a", "b", "c", "d", "e", "f"]); + + Ok(()) + } + + #[test] + fn no_duplicates_distinct_sort_desc() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string() + .distinct() + .order_by_col("col", SortOptions::new(true, false)) + .build_two()?; + + acc1.update_batch(&[data(["e", "b", "d"])])?; + acc2.update_batch(&[data(["f", "a", "c"])])?; + acc1 = merge(acc1, acc2)?; + + let result = print_nulls(str_arr(acc1.evaluate()?)?); + + assert_eq!(result, vec!["f", "e", "d", "c", "b", "a"]); + + Ok(()) + } + + #[test] + fn duplicates_distinct_sort_asc() -> Result<()> { + let (mut acc1, mut acc2) = 
ArrayAggAccumulatorBuilder::string() + .distinct() + .order_by_col("col", SortOptions::new(false, false)) + .build_two()?; + + acc1.update_batch(&[data(["a", "c", "b"])])?; + acc2.update_batch(&[data(["b", "c", "a"])])?; + acc1 = merge(acc1, acc2)?; + + let result = print_nulls(str_arr(acc1.evaluate()?)?); + + assert_eq!(result, vec!["a", "b", "c"]); + + Ok(()) + } + + #[test] + fn duplicates_distinct_sort_desc() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string() + .distinct() + .order_by_col("col", SortOptions::new(true, false)) + .build_two()?; + + acc1.update_batch(&[data(["a", "c", "b"])])?; + acc2.update_batch(&[data(["b", "c", "a"])])?; + acc1 = merge(acc1, acc2)?; + + let result = print_nulls(str_arr(acc1.evaluate()?)?); + + assert_eq!(result, vec!["c", "b", "a"]); + + Ok(()) + } + + #[test] + fn no_duplicates_distinct_sort_asc_nulls_first() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string() + .distinct() + .order_by_col("col", SortOptions::new(false, true)) + .build_two()?; + + acc1.update_batch(&[data([Some("e"), Some("b"), None])])?; + acc2.update_batch(&[data([Some("f"), Some("a"), None])])?; + acc1 = merge(acc1, acc2)?; + + let result = print_nulls(str_arr(acc1.evaluate()?)?); + + assert_eq!(result, vec!["NULL", "a", "b", "e", "f"]); + + Ok(()) + } + + #[test] + fn no_duplicates_distinct_sort_asc_nulls_last() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string() + .distinct() + .order_by_col("col", SortOptions::new(false, false)) + .build_two()?; + + acc1.update_batch(&[data([Some("e"), Some("b"), None])])?; + acc2.update_batch(&[data([Some("f"), Some("a"), None])])?; + acc1 = merge(acc1, acc2)?; + + let result = print_nulls(str_arr(acc1.evaluate()?)?); + + assert_eq!(result, vec!["a", "b", "e", "f", "NULL"]); + + Ok(()) + } + + #[test] + fn no_duplicates_distinct_sort_desc_nulls_first() -> Result<()> { + let (mut acc1, mut acc2) = 
ArrayAggAccumulatorBuilder::string() + .distinct() + .order_by_col("col", SortOptions::new(true, true)) + .build_two()?; + + acc1.update_batch(&[data([Some("e"), Some("b"), None])])?; + acc2.update_batch(&[data([Some("f"), Some("a"), None])])?; + acc1 = merge(acc1, acc2)?; + + let result = print_nulls(str_arr(acc1.evaluate()?)?); + + assert_eq!(result, vec!["NULL", "f", "e", "b", "a"]); + + Ok(()) + } + + #[test] + fn no_duplicates_distinct_sort_desc_nulls_last() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string() + .distinct() + .order_by_col("col", SortOptions::new(true, false)) + .build_two()?; + + acc1.update_batch(&[data([Some("e"), Some("b"), None])])?; + acc2.update_batch(&[data([Some("f"), Some("a"), None])])?; + acc1 = merge(acc1, acc2)?; + + let result = print_nulls(str_arr(acc1.evaluate()?)?); + + assert_eq!(result, vec!["f", "e", "b", "a", "NULL"]); + + Ok(()) + } + + #[test] + fn all_nulls_on_first_batch_with_distinct() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string() + .distinct() + .build_two()?; + + acc1.update_batch(&[data::<Option<&str>, 3>([None, None, None])])?; + acc2.update_batch(&[data([Some("a"), None, None, None])])?; + acc1 = merge(acc1, acc2)?; + + let mut result = print_nulls(str_arr(acc1.evaluate()?)?); + result.sort(); + assert_eq!(result, vec!["NULL", "a"]); + Ok(()) + } + + #[test] + fn all_nulls_on_both_batches_with_distinct() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string() + .distinct() + .build_two()?; + + acc1.update_batch(&[data::<Option<&str>, 3>([None, None, None])])?; + acc2.update_batch(&[data::<Option<&str>, 4>([None, None, None, None])])?; + acc1 = merge(acc1, acc2)?; + + let result = print_nulls(str_arr(acc1.evaluate()?)?); + assert_eq!(result, vec!["NULL"]); + Ok(()) + } + + struct ArrayAggAccumulatorBuilder { + data_type: DataType, + distinct: bool, + ordering: LexOrdering, + schema: Schema, + } + + impl 
ArrayAggAccumulatorBuilder { + fn string() -> Self { + Self::new(DataType::Utf8) + } + + fn new(data_type: DataType) -> Self { + Self { + data_type: data_type.clone(), + distinct: Default::default(), + ordering: Default::default(), + schema: Schema { + fields: Fields::from(vec![Field::new( + "col", + DataType::List(FieldRef::new(Field::new( + "item", data_type, true, + ))), + true, + )]), + metadata: Default::default(), + }, + } + } + + fn distinct(mut self) -> Self { + self.distinct = true; + self + } + + fn order_by_col(mut self, col: &str, sort_options: SortOptions) -> Self { + self.ordering.extend([PhysicalSortExpr::new( + Arc::new( + Column::new_with_schema(col, &self.schema) + .expect("column not available in schema"), + ), + sort_options, + )]); + self + } + + fn build(&self) -> Result<Box<dyn Accumulator>> { + ArrayAgg::default().accumulator(AccumulatorArgs { + return_type: &self.data_type, + schema: &self.schema, + ignore_nulls: false, + ordering_req: &self.ordering, + is_reversed: false, + name: "", + is_distinct: self.distinct, + exprs: &[Arc::new(Column::new("col", 0))], + }) + } + + fn build_two(&self) -> Result<(Box<dyn Accumulator>, Box<dyn Accumulator>)> { + Ok((self.build()?, self.build()?)) + } + } + + fn str_arr(value: ScalarValue) -> Result<Vec<Option<String>>> { Review Comment: I tried the suggestion, and ran into the following blockers: - These tests assert over a `ScalarValue` rather than a `RecordBatch`, so intermediate wrappings with arbitrary field names for promoting a `ScalarValue` to a `RecordBatch` are necessary (not a big deal). - The `batches_to_sort_string` will order lines in the output, but will not order array scalar values inside single cells, which is required for these tests. - The more verbose assertions are suitable to be managed by insta, but will make it difficult to follow a "write the test first and the implementation later" approach, which is the approach used for writing these tests. 
Do you think insta is a good tool for this kind of test? I imagine that snapshot testing is suitable for tests that are expected to be updated often as the implementation changes, but one of the main points of these tests is that they should never change even if the implementation changes. If you still think insta is a good tool, I can build some extra tooling for alleviating the friction points mentioned above. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org