alamb commented on code in PR #14271: URL: https://github.com/apache/datafusion/pull/14271#discussion_r1934471695
########## datafusion/core/tests/physical_optimizer/enforce_sorting.rs: ########## @@ -238,6 +241,338 @@ async fn test_remove_unnecessary_sort5() -> Result<()> { Ok(()) } +#[test] +fn test_aggregate_set_monotonic_no_group() -> Result<()> { + let schema = create_test_schema4()?; + + let source = memory_exec(&schema); + + let sort_exprs = vec![sort_expr("a", &schema)]; + let sort = sort_exec(sort_exprs.clone(), source); + + let aggregate = aggregate_exec_set_monotonic(sort, vec![]); + + let sort_exprs = LexOrdering::new(vec![sort_expr("count", &aggregate.schema())]); + let physical_plan: Arc<dyn ExecutionPlan> = + Arc::new(SortExec::new(sort_exprs.clone(), aggregate)) as _; + + let expected_input = [ + "SortExec: expr=[count@0 ASC], preserve_partitioning=[false]", + " AggregateExec: mode=Single, gby=[], aggr=[count]", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + + let expected_optimized = [ + "AggregateExec: mode=Single, gby=[], aggr=[count]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan, true); + + Ok(()) +} + +#[test] +fn test_aggregate_set_monotonic_with_group() -> Result<()> { + let schema = create_test_schema4()?; + + let source = memory_exec(&schema); + + let sort_exprs = vec![sort_expr("a", &schema)]; + let sort = sort_exec(sort_exprs.clone(), source); + + let aggregate = + aggregate_exec_set_monotonic(sort, vec![(col("a", &schema)?, "a".to_string())]); + + let sort_exprs = LexOrdering::new(vec![sort_expr("count", &aggregate.schema())]); + let physical_plan: Arc<dyn ExecutionPlan> = + Arc::new(SortExec::new(sort_exprs.clone(), aggregate)) as _; + + let expected_input = [ + "SortExec: expr=[count@1 ASC], preserve_partitioning=[false]", + " AggregateExec: mode=Single, gby=[a@0 as a], aggr=[count], ordering_mode=Sorted", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " MemoryExec: 
partitions=1, partition_sizes=[0]", + ]; + + let expected_optimized = [ + "SortExec: expr=[count@1 ASC], preserve_partitioning=[false]", + " AggregateExec: mode=Single, gby=[a@0 as a], aggr=[count], ordering_mode=Sorted", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan, true); + + Ok(()) +} + +#[test] +fn test_aggregate_set_monotonic_with_group_partial() -> Result<()> { + let schema = create_test_schema4()?; + + let source = memory_exec(&schema); + + let sort_exprs = vec![sort_expr("a", &schema)]; + let sort = sort_exec(sort_exprs.clone(), source); + + let aggregate = + aggregate_exec_set_monotonic(sort, vec![(col("a", &schema)?, "a".to_string())]); + + let sort_exprs = LexOrdering::new(vec![ + sort_expr("a", &schema), + sort_expr("count", &aggregate.schema()), + ]); + let physical_plan: Arc<dyn ExecutionPlan> = + Arc::new(SortExec::new(sort_exprs.clone(), aggregate)) as _; + + let expected_input = [ + "SortExec: expr=[a@0 ASC, count@1 ASC], preserve_partitioning=[false]", + " AggregateExec: mode=Single, gby=[a@0 as a], aggr=[count], ordering_mode=Sorted", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + + let expected_optimized = [ + "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[count], ordering_mode=Sorted", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan, true); + + Ok(()) +} + +#[test] +fn test_aggregate_non_set_monotonic() -> Result<()> { + let schema = create_test_schema4()?; + let source = memory_exec(&schema); + let sort_exprs = vec![sort_expr("a", &schema)]; + let sort = sort_exec(sort_exprs.clone(), source); + + let aggregate = aggregate_exec_non_set_monotonic(sort); + let sort_exprs = 
LexOrdering::new(vec![sort_expr("avg", &aggregate.schema())]); + let physical_plan: Arc<dyn ExecutionPlan> = + Arc::new(SortExec::new(sort_exprs.clone(), aggregate)) as _; + + let expected_input = [ + "SortExec: expr=[avg@0 ASC], preserve_partitioning=[false]", + " AggregateExec: mode=Single, gby=[], aggr=[avg]", + " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " MemoryExec: partitions=1, partition_sizes=[0]", + ]; + + let expected_optimized = [ + "SortExec: expr=[avg@0 ASC], preserve_partitioning=[false]", Review Comment: I think this sort could be eliminated too, as the output will be a single row. On the other hand, it likely doesn't really matter, as the overhead of sorting a single row is probably pretty low. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org