berkaysynnada commented on code in PR #14271: URL: https://github.com/apache/datafusion/pull/14271#discussion_r1935489042
########## datafusion/core/tests/physical_optimizer/enforce_sorting.rs: ########## @@ -238,6 +241,338 @@ async fn test_remove_unnecessary_sort5() -> Result<()> { Ok(()) } +#[test] +fn test_aggregate_set_monotonic_no_group() -> Result<()> { + let schema = create_test_schema4()?; + + let source = memory_exec(&schema); + + let sort_exprs = vec![sort_expr("a", &schema)]; + let sort = sort_exec(sort_exprs.clone(), source); + + let aggregate = aggregate_exec_set_monotonic(sort, vec![]); + + let sort_exprs = LexOrdering::new(vec![sort_expr("count", &aggregate.schema())]); + let physical_plan: Arc<dyn ExecutionPlan> = + Arc::new(SortExec::new(sort_exprs.clone(), aggregate)) as _; + + let expected_input = [ + "SortExec: expr=[count@0 ASC], preserve_partitioning=[false]", + " AggregateExec: mode=Single, gby=[], aggr=[count]", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + + let expected_optimized = [ + "AggregateExec: mode=Single, gby=[], aggr=[count]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan, true); + + Ok(()) +} + +#[test] +fn test_aggregate_set_monotonic_with_group() -> Result<()> { + let schema = create_test_schema4()?; + + let source = memory_exec(&schema); + + let sort_exprs = vec![sort_expr("a", &schema)]; + let sort = sort_exec(sort_exprs.clone(), source); + + let aggregate = + aggregate_exec_set_monotonic(sort, vec![(col("a", &schema)?, "a".to_string())]); + + let sort_exprs = LexOrdering::new(vec![sort_expr("count", &aggregate.schema())]); + let physical_plan: Arc<dyn ExecutionPlan> = + Arc::new(SortExec::new(sort_exprs.clone(), aggregate)) as _; + + let expected_input = [ + "SortExec: expr=[count@1 ASC], preserve_partitioning=[false]", + " AggregateExec: mode=Single, gby=[a@0 as a], aggr=[count], ordering_mode=Sorted", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + + let expected_optimized = [ + "SortExec: expr=[count@1 ASC], preserve_partitioning=[false]", + " AggregateExec: mode=Single, gby=[a@0 as a], aggr=[count], ordering_mode=Sorted", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan, true); + + Ok(()) +} + +#[test] +fn test_aggregate_set_monotonic_with_group_partial() -> Result<()> { + let schema = create_test_schema4()?; + + let source = memory_exec(&schema); + + let sort_exprs = vec![sort_expr("a", &schema)]; + let sort = sort_exec(sort_exprs.clone(), source); + + let aggregate = + aggregate_exec_set_monotonic(sort, vec![(col("a", &schema)?, "a".to_string())]); + + let sort_exprs = LexOrdering::new(vec![ + sort_expr("a", &schema), + sort_expr("count", &aggregate.schema()), + ]); + let physical_plan: Arc<dyn ExecutionPlan> = + Arc::new(SortExec::new(sort_exprs.clone(), aggregate)) as _; + + let expected_input = [ + "SortExec: expr=[a@0 ASC, count@1 ASC], preserve_partitioning=[false]", + " AggregateExec: mode=Single, gby=[a@0 as a], aggr=[count], ordering_mode=Sorted", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + + let expected_optimized = [ + "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[count], ordering_mode=Sorted", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan, true); + + Ok(()) +} + +#[test] +fn test_aggregate_non_set_monotonic() -> Result<()> { + let schema = create_test_schema4()?; + let source = memory_exec(&schema); + let sort_exprs = vec![sort_expr("a", &schema)]; + let sort = sort_exec(sort_exprs.clone(), source); + + let aggregate = aggregate_exec_non_set_monotonic(sort); + let sort_exprs = LexOrdering::new(vec![sort_expr("avg", &aggregate.schema())]); + let physical_plan: Arc<dyn ExecutionPlan> = + Arc::new(SortExec::new(sort_exprs.clone(), aggregate)) as _; + + let expected_input = [ + "SortExec: expr=[avg@0 ASC], preserve_partitioning=[false]", + " AggregateExec: mode=Single, gby=[], aggr=[avg]", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + + let expected_optimized = [ + "SortExec: expr=[avg@0 ASC], preserve_partitioning=[false]", Review Comment: Done In https://github.com/apache/datafusion/pull/14271/commits/7822613c7a3914bacb1eb172e16d567c973ed49f -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org