berkaysynnada commented on code in PR #14271:
URL: https://github.com/apache/datafusion/pull/14271#discussion_r1935489042


##########
datafusion/core/tests/physical_optimizer/enforce_sorting.rs:
##########
@@ -238,6 +241,338 @@ async fn test_remove_unnecessary_sort5() -> Result<()> {
     Ok(())
 }
 
+#[test]
+fn test_aggregate_set_monotonic_no_group() -> Result<()> {
+    let schema = create_test_schema4()?;
+
+    let source = memory_exec(&schema);
+
+    let sort_exprs = vec![sort_expr("a", &schema)];
+    let sort = sort_exec(sort_exprs.clone(), source);
+
+    let aggregate = aggregate_exec_set_monotonic(sort, vec![]);
+
+    let sort_exprs = LexOrdering::new(vec![sort_expr("count", 
&aggregate.schema())]);
+    let physical_plan: Arc<dyn ExecutionPlan> =
+        Arc::new(SortExec::new(sort_exprs.clone(), aggregate)) as _;
+
+    let expected_input = [
+        "SortExec: expr=[count@0 ASC], preserve_partitioning=[false]",
+        "  AggregateExec: mode=Single, gby=[], aggr=[count]",
+        "    SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+        "      MemoryExec: partitions=1, partition_sizes=[0]",
+    ];
+
+    let expected_optimized = [
+        "AggregateExec: mode=Single, gby=[], aggr=[count]",
+        "  MemoryExec: partitions=1, partition_sizes=[0]",
+    ];
+    assert_optimized!(expected_input, expected_optimized, physical_plan, true);
+
+    Ok(())
+}
+
+#[test]
+fn test_aggregate_set_monotonic_with_group() -> Result<()> {
+    let schema = create_test_schema4()?;
+
+    let source = memory_exec(&schema);
+
+    let sort_exprs = vec![sort_expr("a", &schema)];
+    let sort = sort_exec(sort_exprs.clone(), source);
+
+    let aggregate =
+        aggregate_exec_set_monotonic(sort, vec![(col("a", &schema)?, 
"a".to_string())]);
+
+    let sort_exprs = LexOrdering::new(vec![sort_expr("count", 
&aggregate.schema())]);
+    let physical_plan: Arc<dyn ExecutionPlan> =
+        Arc::new(SortExec::new(sort_exprs.clone(), aggregate)) as _;
+
+    let expected_input = [
+        "SortExec: expr=[count@1 ASC], preserve_partitioning=[false]",
+        "  AggregateExec: mode=Single, gby=[a@0 as a], aggr=[count], 
ordering_mode=Sorted",
+        "    SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+        "      MemoryExec: partitions=1, partition_sizes=[0]",
+    ];
+
+    let expected_optimized = [
+        "SortExec: expr=[count@1 ASC], preserve_partitioning=[false]",
+        "  AggregateExec: mode=Single, gby=[a@0 as a], aggr=[count], 
ordering_mode=Sorted",
+        "    SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+        "      MemoryExec: partitions=1, partition_sizes=[0]",
+    ];
+    assert_optimized!(expected_input, expected_optimized, physical_plan, true);
+
+    Ok(())
+}
+
+#[test]
+fn test_aggregate_set_monotonic_with_group_partial() -> Result<()> {
+    let schema = create_test_schema4()?;
+
+    let source = memory_exec(&schema);
+
+    let sort_exprs = vec![sort_expr("a", &schema)];
+    let sort = sort_exec(sort_exprs.clone(), source);
+
+    let aggregate =
+        aggregate_exec_set_monotonic(sort, vec![(col("a", &schema)?, 
"a".to_string())]);
+
+    let sort_exprs = LexOrdering::new(vec![
+        sort_expr("a", &schema),
+        sort_expr("count", &aggregate.schema()),
+    ]);
+    let physical_plan: Arc<dyn ExecutionPlan> =
+        Arc::new(SortExec::new(sort_exprs.clone(), aggregate)) as _;
+
+    let expected_input = [
+        "SortExec: expr=[a@0 ASC, count@1 ASC], preserve_partitioning=[false]",
+        "  AggregateExec: mode=Single, gby=[a@0 as a], aggr=[count], 
ordering_mode=Sorted",
+        "    SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+        "      MemoryExec: partitions=1, partition_sizes=[0]",
+    ];
+
+    let expected_optimized = [
+        "AggregateExec: mode=Single, gby=[a@0 as a], aggr=[count], 
ordering_mode=Sorted",
+        "  SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+        "    MemoryExec: partitions=1, partition_sizes=[0]",
+    ];
+    assert_optimized!(expected_input, expected_optimized, physical_plan, true);
+
+    Ok(())
+}
+
+#[test]
+fn test_aggregate_non_set_monotonic() -> Result<()> {
+    let schema = create_test_schema4()?;
+    let source = memory_exec(&schema);
+    let sort_exprs = vec![sort_expr("a", &schema)];
+    let sort = sort_exec(sort_exprs.clone(), source);
+
+    let aggregate = aggregate_exec_non_set_monotonic(sort);
+    let sort_exprs = LexOrdering::new(vec![sort_expr("avg", 
&aggregate.schema())]);
+    let physical_plan: Arc<dyn ExecutionPlan> =
+        Arc::new(SortExec::new(sort_exprs.clone(), aggregate)) as _;
+
+    let expected_input = [
+        "SortExec: expr=[avg@0 ASC], preserve_partitioning=[false]",
+        "  AggregateExec: mode=Single, gby=[], aggr=[avg]",
+        "    SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+        "      MemoryExec: partitions=1, partition_sizes=[0]",
+    ];
+
+    let expected_optimized = [
+        "SortExec: expr=[avg@0 ASC], preserve_partitioning=[false]",

Review Comment:
   Done In 
https://github.com/apache/datafusion/pull/14271/commits/7822613c7a3914bacb1eb172e16d567c973ed49f



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to