kosiew commented on code in PR #16466: URL: https://github.com/apache/datafusion/pull/16466#discussion_r2199572126
##########
datafusion/core/tests/sql/aggregates.rs:
##########
@@ -441,3 +776,1110 @@ async fn count_distinct_dictionary_mixed_values() -> Result<()> {
     Ok(())
 }
+
+/// Comprehensive test for aggregate functions with null values and dictionary columns
+/// Tests COUNT, SUM, MIN, and MEDIAN null handling in single comprehensive test
+#[tokio::test]
+async fn test_aggregates_null_handling_comprehensive() -> Result<()> {
+    let test_data_basic = TestData::new();
+    let test_data_extended = TestData::new_extended();
+    let test_data_min_max = TestData::new_for_min_max();
+    let test_data_median = TestData::new_for_median();
+
+    // Test COUNT null exclusion with basic data
+    let sql_count = "SELECT dict_null_keys, COUNT(value) as cnt FROM t GROUP BY dict_null_keys ORDER BY dict_null_keys NULLS FIRST";
+    let results_count = run_snapshot_test(&test_data_basic, sql_count).await?;
+
+    assert_snapshot!(
+        batches_to_string(&results_count),
+        @r###"
+    +----------------+-----+
+    | dict_null_keys | cnt |
+    +----------------+-----+
+    |                | 0   |
+    | group_a        | 2   |
+    | group_b        | 1   |
+    +----------------+-----+
+    "###
+    );
+
+    // Test SUM null handling with extended data
+    let sql_sum = "SELECT dict_null_vals, SUM(value) as total FROM t GROUP BY dict_null_vals ORDER BY dict_null_vals NULLS FIRST";
+    let results_sum = run_snapshot_test(&test_data_extended, sql_sum).await?;
+
+    assert_snapshot!(
+        batches_to_string(&results_sum),
+        @r"
+    +----------------+-------+
+    | dict_null_vals | total |
+    +----------------+-------+
+    |                | 4     |
+    | group_x        | 4     |
+    | group_y        | 2     |
+    | group_z        | 5     |
+    +----------------+-------+
+    "
+    );
+
+    // Test MIN null handling with min/max data
+    let sql_min = "SELECT dict_null_keys, MIN(value) as minimum FROM t GROUP BY dict_null_keys ORDER BY dict_null_keys NULLS FIRST";
+    let results_min = run_snapshot_test(&test_data_min_max, sql_min).await?;
+
+    assert_snapshot!(
+        batches_to_string(&results_min),
+        @r###"
+    +----------------+---------+
+    | dict_null_keys | minimum |
+    +----------------+---------+
+    |                | 2       |
+    | group_a        | 3       |
+    | group_b        | 1       |
+    | group_c        | 7       |
+    +----------------+---------+
+    "###
+    );
+
+    // Test MEDIAN null handling with median data
+    let sql_median = "SELECT dict_null_vals, MEDIAN(value) as median_value FROM t GROUP BY dict_null_vals ORDER BY dict_null_vals NULLS FIRST";
+    let results_median = run_snapshot_test(&test_data_median, sql_median).await?;
+
+    assert_snapshot!(
+        batches_to_string(&results_median),
+        @r"
+    +----------------+--------------+
+    | dict_null_vals | median_value |
+    +----------------+--------------+
+    |                | 3            |
+    | group_x        | 1            |
+    | group_y        | 5            |
+    | group_z        | 7            |
+    +----------------+--------------+
+    ");
+
+    Ok(())
+}
+
+/// Test FIRST_VAL and LAST_VAL with null values and GROUP BY dict with null keys and null values - may return null if first/last value is null (single and multiple partitions)
+#[tokio::test]
+async fn test_first_last_val_null_handling() -> Result<()> {
+    let test_data = TestData::new_for_first_last();
+
+    // Test FIRST_VALUE and LAST_VALUE with window functions over groups
+    let sql = "SELECT dict_null_keys, value, FIRST_VALUE(value) OVER (PARTITION BY dict_null_keys ORDER BY value NULLS FIRST) as first_val, LAST_VALUE(value) OVER (PARTITION BY dict_null_keys ORDER BY value NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as last_val FROM t ORDER BY dict_null_keys NULLS FIRST, value NULLS FIRST";
+
+    let results_single = run_snapshot_test(&test_data, sql).await?;
+
+    assert_snapshot!(batches_to_string(&results_single), @r"
+    +----------------+-------+-----------+----------+
+    | dict_null_keys | value | first_val | last_val |
+    +----------------+-------+-----------+----------+
+    |                | 1     | 1         | 3        |
+    |                | 3     | 1         | 3        |
+    | group_a        |       |           |          |
+    | group_a        |       |           |          |
+    | group_b        | 2     | 2         | 2        |
+    +----------------+-------+-----------+----------+
+    ");
+
+    Ok(())
+}
+
+/// Test FIRST_VALUE and LAST_VALUE with ORDER BY - comprehensive null handling
+#[tokio::test]
+async fn test_first_last_value_order_by_null_handling() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    // Create test data with nulls mixed in
+    let dict_keys = create_test_dict(
+        &[Some("group_a"), Some("group_b"), Some("group_c")],
+        &[Some(0), Some(1), Some(2), Some(0), Some(1)],
+    );
+
+    let values = Int32Array::from(vec![None, Some(10), Some(20), Some(5), None]);
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("dict_group", string_dict_type(), true),
+        Field::new("value", DataType::Int32, true),
+    ]));
+
+    let batch = RecordBatch::try_new(
+        schema.clone(),
+        vec![Arc::new(dict_keys), Arc::new(values)],
+    )?;
+
+    let table = MemTable::try_new(schema, vec![vec![batch]])?;
+    ctx.register_table("test_data", Arc::new(table))?;
+
+    // Test all combinations of FIRST_VALUE and LAST_VALUE with null handling
+    let sql = "SELECT
+        dict_group,
+        value,
+        FIRST_VALUE(value IGNORE NULLS) OVER (ORDER BY value NULLS LAST) as first_ignore_nulls,
+        FIRST_VALUE(value RESPECT NULLS) OVER (ORDER BY value NULLS FIRST) as first_respect_nulls,
+        LAST_VALUE(value IGNORE NULLS) OVER (ORDER BY value NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as last_ignore_nulls,
+        LAST_VALUE(value RESPECT NULLS) OVER (ORDER BY value NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as last_respect_nulls
+    FROM test_data
+    ORDER BY value NULLS LAST";
+
+    let df = ctx.sql(sql).await?;
+    let results = df.collect().await?;
+
+    assert_snapshot!(
+        batches_to_string(&results),
+        @r###"
+    +------------+-------+--------------------+---------------------+-------------------+--------------------+
+    | dict_group | value | first_ignore_nulls | first_respect_nulls | last_ignore_nulls | last_respect_nulls |
+    +------------+-------+--------------------+---------------------+-------------------+--------------------+
+    | group_a    | 5     | 5                  |                     | 20                |                    |
+    | group_b    | 10    | 5                  |                     | 20                |                    |
+    | group_c    | 20    | 5                  |                     | 20                |                    |
+    | group_a    |       | 5                  |                     | 20                |                    |
+    | group_b    |       | 5                  |                     | 20                |                    |
+    +------------+-------+--------------------+---------------------+-------------------+--------------------+
+    "###
+    );
+
+    Ok(())
+}
+
+/// Test GROUP BY with dictionary columns containing null keys and values for FIRST_VALUE/LAST_VALUE
+#[tokio::test]
+async fn test_first_last_value_group_by_dict_nulls() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    // Create dictionary with null keys
+    let dict_null_keys = create_test_dict(
+        &[Some("group_a"), Some("group_b")],
+        &[
+            Some(0), // group_a
+            None,    // null key
+            Some(1), // group_b
+            None,    // null key
+            Some(0), // group_a
+        ],
+    );
+
+    // Create dictionary with null values
+    let dict_null_vals = create_test_dict(
+        &[Some("val_x"), None, Some("val_y")],
+        &[
+            Some(0), // val_x
+            Some(1), // null value
+            Some(2), // val_y
+            Some(1), // null value
+            Some(0), // val_x
+        ],
+    );
+
+    // Create test values
+    let values = Int32Array::from(vec![Some(10), Some(20), Some(30), Some(40), Some(50)]);
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("dict_null_keys", string_dict_type(), true),
+        Field::new("dict_null_vals", string_dict_type(), true),
+        Field::new("value", DataType::Int32, true),
+    ]));
+
+    let batch = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(dict_null_keys),
+            Arc::new(dict_null_vals),
+            Arc::new(values),
+        ],
+    )?;
+
+    let table = MemTable::try_new(schema, vec![vec![batch]])?;
+    ctx.register_table("test_data", Arc::new(table))?;
+
+    // Test GROUP BY with null keys
+    let sql = "SELECT
+        dict_null_keys,
+        FIRST_VALUE(value) as first_val,
+        LAST_VALUE(value) as last_val,
+        COUNT(*) as cnt
+    FROM test_data
+    GROUP BY dict_null_keys
+    ORDER BY dict_null_keys NULLS FIRST";
+
+    let df = ctx.sql(sql).await?;
+    let results = df.collect().await?;
+
+    assert_snapshot!(
+        batches_to_string(&results),
+        @r###"
+    +----------------+-----------+----------+-----+
+    | dict_null_keys | first_val | last_val | cnt |
+    +----------------+-----------+----------+-----+
+    |                | 20        | 40       | 2   |
+    | group_a        | 10        | 50       | 2   |
+    | group_b        | 30        | 30       | 1   |
+    +----------------+-----------+----------+-----+
+    "###
+    );
+
+    // Test GROUP BY with null values in dictionary
+    let sql2 = "SELECT
+        dict_null_vals,
+        FIRST_VALUE(value) as first_val,
+        LAST_VALUE(value) as last_val,
+        COUNT(*) as cnt
+    FROM test_data
+    GROUP BY dict_null_vals
+    ORDER BY dict_null_vals NULLS FIRST";
+
+    let df2 = ctx.sql(sql2).await?;
+    let results2 = df2.collect().await?;
+
+    assert_snapshot!(
+        batches_to_string(&results2),
+        @r###"
+    +----------------+-----------+----------+-----+
+    | dict_null_vals | first_val | last_val | cnt |
+    +----------------+-----------+----------+-----+
+    |                | 20        | 40       | 2   |
+    | val_x          | 10        | 50       | 2   |
+    | val_y          | 30        | 30       | 1   |
+    +----------------+-----------+----------+-----+
+    "###
+    );
+
+    Ok(())
+}
+
+/// Test MAX with dictionary columns containing null keys and values as specified in the SQL query
+/// Test data structure for fuzz table with dictionary columns containing nulls
+struct FuzzTestData {
+    schema: Arc<Schema>,
+    u8_low: UInt8Array,
+    dictionary_utf8_low: DictionaryArray<UInt32Type>,
+    utf8_low: StringArray,
+    utf8: StringArray,
+}
+
+impl FuzzTestData {
+    fn new() -> Self {
+        // Create dictionary columns with null keys and values
+        let dictionary_utf8_low = create_test_dict(
+            &[Some("dict_a"), None, Some("dict_b"), Some("dict_c")],
+            &[
+                Some(0), // dict_a
+                Some(1), // null value
+                Some(2), // dict_b
+                None,    // null key
+                Some(0), // dict_a
+                Some(1), // null value
+                Some(3), // dict_c
+                None,    // null key
+            ],
+        );
+
+        let u8_low = UInt8Array::from(vec![
+            Some(1),
+            Some(1),
+            Some(2),
+            Some(2),
+            Some(1),
+            Some(3),
+            Some(3),
+            Some(2),
+        ]);
+
+        let utf8_low = StringArray::from(vec![
+            Some("str_a"),
+            Some("str_b"),
+            Some("str_c"),
+            Some("str_d"),
+            Some("str_a"),
+            Some("str_e"),
+            Some("str_f"),
+            Some("str_c"),
+        ]);
+
+        let utf8 = StringArray::from(vec![
+            Some("value_1"),
+            Some("value_2"),
+            Some("value_3"),
+            Some("value_4"),
+            Some("value_5"),
+            None,
+            Some("value_6"),
+            Some("value_7"),
+        ]);
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("u8_low", DataType::UInt8, true),
+            Field::new("dictionary_utf8_low", string_dict_type(), true),
+            Field::new("utf8_low", DataType::Utf8, true),
+            Field::new("utf8", DataType::Utf8, true),
+        ]));
+
+        Self {
+            schema,
+            u8_low,
+            dictionary_utf8_low,
+            utf8_low,
+            utf8,
+        }
+    }
+}
+
+/// Sets up test contexts for fuzz table with both single and multiple partitions
+async fn setup_fuzz_test_contexts() -> Result<(SessionContext, SessionContext)> {
+    let test_data = FuzzTestData::new();
+
+    // Single partition context
+    let ctx_single = create_fuzz_context_with_partitions(&test_data, 1).await?;
+
+    // Multiple partition context
+    let ctx_multi = create_fuzz_context_with_partitions(&test_data, 3).await?;
+
+    Ok((ctx_single, ctx_multi))
+}
+
+/// Creates a session context with fuzz table partitioned into specified number of partitions
+async fn create_fuzz_context_with_partitions(
+    test_data: &FuzzTestData,
+    num_partitions: usize,
+) -> Result<SessionContext> {
+    let ctx = SessionContext::new_with_config(
+        SessionConfig::new().with_target_partitions(num_partitions),
+    );
+
+    let batches = split_fuzz_data_into_batches(test_data, num_partitions)?;
+    let provider = MemTable::try_new(test_data.schema.clone(), batches)?;
+    ctx.register_table("fuzz_table", Arc::new(provider))?;
+
+    Ok(ctx)
+}
+
+/// Splits fuzz test data into multiple batches for partitioning
+fn split_fuzz_data_into_batches(
+    test_data: &FuzzTestData,
+    num_partitions: usize,
+) -> Result<Vec<Vec<RecordBatch>>> {
+    debug_assert!(num_partitions > 0, "num_partitions must be greater than 0");
+    let total_len = test_data.u8_low.len();
+    let chunk_size = total_len.div_ceil(num_partitions);
+
+    let mut batches = Vec::new();
+    let mut start = 0;
+
+    while start < total_len {
+        let end = min(start + chunk_size, total_len);
+        let len = end - start;
+
+        if len > 0 {
+            let batch = RecordBatch::try_new(
+                test_data.schema.clone(),
+                vec![
+                    Arc::new(test_data.u8_low.slice(start, len)),
+                    Arc::new(test_data.dictionary_utf8_low.slice(start, len)),
+                    Arc::new(test_data.utf8_low.slice(start, len)),
+                    Arc::new(test_data.utf8.slice(start, len)),
+                ],
+            )?;
+            batches.push(vec![batch]);
+        }
+        start = end;
+    }
+
+    Ok(batches)
+}
+
+/// Test MAX with fuzz table containing dictionary columns with null keys and values (single and multiple partitions)
+#[tokio::test]
+async fn test_max_with_fuzz_table_dict_nulls() -> Result<()> {
+    let (ctx_single, ctx_multi) = setup_fuzz_test_contexts().await?;
+
+    // Execute the SQL query with MAX aggregations
+    let sql = "SELECT
+        u8_low,
+        dictionary_utf8_low,
+        utf8_low,
+        max(utf8_low) as col1,
+        max(utf8) as col2
+    FROM
+        fuzz_table
+    GROUP BY
+        u8_low,
+        dictionary_utf8_low,
+        utf8_low
+    ORDER BY u8_low, dictionary_utf8_low NULLS FIRST, utf8_low";
+
+    let results = test_query_consistency(&ctx_single, &ctx_multi, sql).await?;
+
+    assert_snapshot!(
+        batches_to_string(&results),
+        @r"
+    +--------+---------------------+----------+-------+---------+
+    | u8_low | dictionary_utf8_low | utf8_low | col1  | col2    |
+    +--------+---------------------+----------+-------+---------+
+    | 1      |                     | str_b    | str_b | value_2 |
+    | 1      | dict_a              | str_a    | str_a | value_5 |
+    | 2      |                     | str_c    | str_c | value_7 |
+    | 2      |                     | str_d    | str_d | value_4 |
+    | 2      | dict_b              | str_c    | str_c | value_3 |
+    | 3      |                     | str_e    | str_e |         |
+    | 3      | dict_c              | str_f    | str_f | value_6 |
+    +--------+---------------------+----------+-------+---------+
+    ");
+
+    Ok(())
+}
+
+/// Test MIN with fuzz table containing dictionary columns with null keys and values and timestamp data (single and multiple partitions)
+#[tokio::test]
+async fn test_min_timestamp_with_fuzz_table_dict_nulls() -> Result<()> {
+    let (ctx_single, ctx_multi) = setup_fuzz_timestamp_test_contexts().await?;
+
+    // Execute the SQL query with MIN aggregation on timestamp
+    let sql = "SELECT
+        utf8_low,
+        u8_low,
+        dictionary_utf8_low,
+        min(timestamp_us) as col1
+    FROM
+        fuzz_table
+    GROUP BY
+        utf8_low,
+        u8_low,
+        dictionary_utf8_low
+    ORDER BY utf8_low, u8_low, dictionary_utf8_low NULLS FIRST";
+
+    let results = test_query_consistency(&ctx_single, &ctx_multi, sql).await?;
+
+    assert_snapshot!(
+        batches_to_string(&results),
+        @r"
+    +----------+--------+---------------------+-------------------------+
+    | utf8_low | u8_low | dictionary_utf8_low | col1                    |
+    +----------+--------+---------------------+-------------------------+
+    | alpha    | 10     | dict_x              | 1970-01-01T00:00:01     |
+    | beta     | 20     |                     | 1970-01-01T00:00:02     |
+    | delta    | 20     |                     | 1970-01-01T00:00:03.500 |
+    | epsilon  | 40     |                     | 1970-01-01T00:00:04     |
+    | gamma    | 30     | dict_y              | 1970-01-01T00:00:02.800 |
+    | zeta     | 30     | dict_z              | 1970-01-01T00:00:02.500 |
+    +----------+--------+---------------------+-------------------------+
+    "
+    );
+
+    Ok(())
+}
+
+/// Test data structure for fuzz table with duration, large_binary and dictionary columns containing nulls
+struct FuzzCountTestData {
+    schema: Arc<Schema>,
+    u8_low: UInt8Array,
+    utf8_low: StringArray,
+    dictionary_utf8_low: DictionaryArray<UInt32Type>,
+    duration_nanosecond: DurationNanosecondArray,
+    large_binary: LargeBinaryArray,
+}
+
+impl FuzzCountTestData {
+    fn new() -> Self {
+        // Create dictionary columns with null keys and values
+        let dictionary_utf8_low = create_test_dict(
+            &[
+                Some("group_alpha"),
+                None,
+                Some("group_beta"),
+                Some("group_gamma"),
+            ],
+            &[
+                Some(0), // group_alpha
+                Some(1), // null value
+                Some(2), // group_beta
+                None,    // null key
+                Some(0), // group_alpha
+                Some(1), // null value
+                Some(3), // group_gamma
+                None,    // null key
+                Some(2), // group_beta
+                Some(0), // group_alpha
+            ],
+        );
+
+        let u8_low = UInt8Array::from(vec![
+            Some(5),
+            Some(10),
+            Some(15),
+            Some(10),
+            Some(5),
+            Some(20),
+            Some(25),
+            Some(10),
+            Some(15),
+            Some(5),
+        ]);
+
+        let utf8_low = StringArray::from(vec![
+            Some("text_a"),
+            Some("text_b"),
+            Some("text_c"),
+            Some("text_d"),
+            Some("text_a"),
+            Some("text_e"),
+            Some("text_f"),
+            Some("text_d"),
+            Some("text_c"),
+            Some("text_a"),
+        ]);
+
+        // Create duration data with some nulls (nanoseconds)
+        let duration_nanosecond = DurationNanosecondArray::from(vec![
+            Some(1000000000), // 1 second
+            Some(2000000000), // 2 seconds
+            None,             // null duration
+            Some(3000000000), // 3 seconds
+            Some(1500000000), // 1.5 seconds
+            None,             // null duration
+            Some(4000000000), // 4 seconds
+            Some(2500000000), // 2.5 seconds
+            Some(3500000000), // 3.5 seconds
+            Some(1200000000), // 1.2 seconds
+        ]);
+
+        // Create large binary data with some nulls and duplicates
+        let large_binary = LargeBinaryArray::from(vec![
+            Some(b"binary_data_1".as_slice()),
+            Some(b"binary_data_2".as_slice()),
+            Some(b"binary_data_3".as_slice()),
+            None,                              // null binary
+            Some(b"binary_data_1".as_slice()), // duplicate
+            Some(b"binary_data_4".as_slice()),
+            Some(b"binary_data_5".as_slice()),
+            None,                              // null binary
+            Some(b"binary_data_3".as_slice()), // duplicate
+            Some(b"binary_data_1".as_slice()), // duplicate
+        ]);
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("u8_low", DataType::UInt8, true),
+            Field::new("utf8_low", DataType::Utf8, true),
+            Field::new("dictionary_utf8_low", string_dict_type(), true),
+            Field::new(
+                "duration_nanosecond",
+                DataType::Duration(TimeUnit::Nanosecond),
+                true,
+            ),
+            Field::new("large_binary", DataType::LargeBinary, true),
+        ]));
+
+        Self {
+            schema,
+            u8_low,
+            utf8_low,
+            dictionary_utf8_low,
+            duration_nanosecond,
+            large_binary,
+        }
+    }
+}
+
+/// Sets up test contexts for fuzz table with duration/binary columns and both single and multiple partitions
+async fn setup_fuzz_count_test_contexts() -> Result<(SessionContext, SessionContext)> {
+    let test_data = FuzzCountTestData::new();
+
+    // Single partition context
+    let ctx_single = create_fuzz_count_context_with_partitions(&test_data, 1).await?;
+
+    // Multiple partition context
+    let ctx_multi = create_fuzz_count_context_with_partitions(&test_data, 3).await?;
+
+    Ok((ctx_single, ctx_multi))
+}
+
+/// Creates a session context with fuzz count table partitioned into specified number of partitions
+async fn create_fuzz_count_context_with_partitions(
+    test_data: &FuzzCountTestData,
+    num_partitions: usize,
+) -> Result<SessionContext> {
+    let ctx = SessionContext::new_with_config(
+        SessionConfig::new().with_target_partitions(num_partitions),
+    );
+
+    let batches = split_fuzz_count_data_into_batches(test_data, num_partitions)?;
+    let provider = MemTable::try_new(test_data.schema.clone(), batches)?;
+    ctx.register_table("fuzz_table", Arc::new(provider))?;
+
+    Ok(ctx)
+}
+
+/// Splits fuzz count test data into multiple batches for partitioning
+fn split_fuzz_count_data_into_batches(
+    test_data: &FuzzCountTestData,
+    num_partitions: usize,
+) -> Result<Vec<Vec<RecordBatch>>> {
+    debug_assert!(num_partitions > 0, "num_partitions must be greater than 0");
+    let total_len = test_data.u8_low.len();
+    let chunk_size = total_len.div_ceil(num_partitions);
+
+    let mut batches = Vec::new();
+    let mut start = 0;
+
+    while start < total_len {
+        let end = min(start + chunk_size, total_len);
+        let len = end - start;
+
+        if len > 0 {
+            let batch = RecordBatch::try_new(
+                test_data.schema.clone(),
+                vec![
+                    Arc::new(test_data.u8_low.slice(start, len)),
+                    Arc::new(test_data.utf8_low.slice(start, len)),
+                    Arc::new(test_data.dictionary_utf8_low.slice(start, len)),
+                    Arc::new(test_data.duration_nanosecond.slice(start, len)),
+                    Arc::new(test_data.large_binary.slice(start, len)),
+                ],
+            )?;
+            batches.push(vec![batch]);
+        }
+        start = end;
+    }
+
+    Ok(batches)
+}
+
+/// Test COUNT and COUNT DISTINCT with fuzz table containing dictionary columns with null keys and values (single and multiple partitions)

Review Comment:
   👍

   aggregates
   ├── basic.rs
   ├── dict_nulls.rs
   └── mod.rs
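A minimal sketch of what that split could declare, assuming the shared fixtures and helpers used in the diff above (TestData, create_test_dict, string_dict_type, run_snapshot_test) move into the new module; the helpers submodule name and the re-exports are hypothetical and not part of this PR:

    // datafusion/core/tests/sql/aggregates/mod.rs (hypothetical sketch)
    // Declares the split test files and re-exports the shared fixtures so
    // both submodules can reuse them without duplicating setup code.
    mod basic;      // existing aggregate tests moved out of aggregates.rs
    mod dict_nulls; // dictionary null-key/null-value tests added in this PR

    mod helpers;    // assumed home for TestData, create_test_dict, etc.
    pub(crate) use helpers::{create_test_dict, run_snapshot_test, string_dict_type, TestData};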