alamb commented on code in PR #14271: URL: https://github.com/apache/datafusion/pull/14271#discussion_r1929720287
########## datafusion/expr/src/udaf.rs: ##########
@@ -39,6 +39,26 @@ use crate::utils::AggregateOrderSensitivity;
 use crate::{Accumulator, Expr};
 use crate::{Documentation, Signature};
+/// Status of an Aggregate Expression's Monotonicity
+#[derive(Debug, Clone)]
+pub enum AggregateExprMonotonicity {

Review Comment:
Instead of a new structure, did you consider `SortProperties` (which I think @berkaysynnada added), which also reflects the idea of monotonicity?

I am thinking of this in particular:
https://docs.rs/datafusion/latest/datafusion/logical_expr/sort_properties/enum.SortProperties.html
https://github.com/apache/datafusion/blob/95d296cd5aa1c58baf18e7625afe50f2e8989b07/datafusion/expr-common/src/sort_properties.rs#L37-L36

In addition to being an existing pattern, the `SortProperties` structure can also represent the difference between `ASC NULLS FIRST` and `ASC NULLS LAST`, which this structure does not seem to do.

########## datafusion/sqllogictest/test_files/aggregate.slt: ##########
@@ -4963,6 +4963,9 @@ false
 true
 NULL
+statement ok

Review Comment:
So that the tests better explain the implications of this change, can you please add a new test rather than updating the existing test (by setting this option)?
That would mean setting the flag and running the EXPLAIN again in a separate block. That will let the tests better illustrate any change in behavior.

########## datafusion/sqllogictest/test_files/aggregates_topk.slt: ##########
@@ -143,13 +143,12 @@ logical_plan
 03)----TableScan: traces projection=[trace_id, timestamp]
 physical_plan
 01)SortPreservingMergeExec: [max(traces.timestamp)@1 ASC NULLS LAST], fetch=4
-02)--SortExec: TopK(fetch=4), expr=[max(traces.timestamp)@1 ASC NULLS LAST], preserve_partitioning=[true]
-03)----AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[max(traces.timestamp)]
-04)------CoalesceBatchesExec: target_batch_size=8192
-05)--------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
-06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-07)------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[max(traces.timestamp)]
-08)--------------MemoryExec: partitions=1, partition_sizes=[1]
+02)--AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[max(traces.timestamp)]

Review Comment:
This plan doesn't look correct to me.

Avoiding the sort requires that the output of `AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[max(traces.timestamp)]` is sorted on `max(timestamp)`, but that is only true if there is a single group (aka a single value of `trace_id`) 🤔

########## datafusion/sqllogictest/test_files/aggregate.slt: ##########
@@ -4978,17 +4981,19 @@ logical_plan
 physical_plan
 01)GlobalLimitExec: skip=0, fetch=5
 02)--CoalescePartitionsExec
-03)----AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
+03)----AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], ordering_mode=PartiallySorted([1])
 04)------CoalesceBatchesExec: target_batch_size=8192
-05)--------RepartitionExec: partitioning=Hash([c3@0, min(aggregate_test_100.c1)@1], 4), input_partitions=4
-06)----------AggregateExec: mode=Partial, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
+05)--------RepartitionExec: partitioning=Hash([c3@0, min(aggregate_test_100.c1)@1], 4), input_partitions=4, preserve_order=true, sort_exprs=min(aggregate_test_100.c1)@1 DESC NULLS LAST
+06)----------AggregateExec: mode=Partial, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], ordering_mode=PartiallySorted([1])

Review Comment:
I am not sure if it is important, but the `lim=[5]` appears to have been lost in this plan.

Otherwise it looks better to me (as it uses a PartiallySorted aggregate rather than rehashing the entire thing).

########## datafusion/expr/src/udaf.rs: ##########
@@ -635,6 +655,14 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
     fn documentation(&self) -> Option<&Documentation> {
         None
     }
+
+    /// Indicates whether the aggregation function is monotonic as a set function. A set
+    /// function is monotonically increasing if its value increases as its argument grows
+    /// (as a set). Formally, `f` is a monotonically increasing set function if `f(S) >= f(T)`
+    /// whenever `S` is a superset of `T`.
+    fn monotonicity(&self, _data_type: &DataType) -> AggregateExprMonotonicity {

Review Comment:
Would it be possible to follow the existing model for ScalarUDFs here instead?
https://github.com/apache/datafusion/blob/27db82fe396f43077b5056bab4b20b084c8f6948/datafusion/expr/src/udf.rs#L753-L752

Something like this:

```rust
use datafusion_common::Result;
use datafusion_expr::sort_properties::{ExprProperties, SortProperties};

pub trait AggregateUDFImpl {
    // ...
    /// Returns the output order of this aggregate expression given the input properties
    fn output_ordering(&self, inputs: &[ExprProperties]) -> Result<SortProperties>;
    // ...
}
```
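To make the `aggregates_topk.slt` concern concrete, here is a minimal, self-contained sketch in plain Rust (no DataFusion dependency). `max_per_group` and `is_sorted_by_max` are hypothetical helpers, and emitting groups in first-seen order is a simplifying assumption; the real `AggregateExec` may emit groups in a different order. It shows that even when the input is already sorted by `timestamp`, the per-group `max(timestamp)` values need not be sorted once there is more than one `trace_id`, so the `SortExec: TopK` cannot simply be removed.

```rust
use std::collections::HashMap;

/// Hypothetical helper: computes `max(timestamp)` per `trace_id`, emitting
/// groups in first-seen order (a simplification of a hash aggregate).
fn max_per_group(rows: &[(&'static str, i64)]) -> Vec<(&'static str, i64)> {
    let mut order: Vec<&'static str> = Vec::new();
    let mut maxes: HashMap<&'static str, i64> = HashMap::new();
    for &(trace_id, ts) in rows {
        if !maxes.contains_key(trace_id) {
            order.push(trace_id);
        }
        let m = maxes.entry(trace_id).or_insert(ts);
        *m = (*m).max(ts);
    }
    order.into_iter().map(|k| (k, maxes[&k])).collect()
}

/// Hypothetical helper: true if rows are sorted ascending on the max column.
fn is_sorted_by_max(out: &[(&str, i64)]) -> bool {
    out.windows(2).all(|w| w[0].1 <= w[1].1)
}

fn main() {
    // Input is already sorted by timestamp, but contains two groups.
    let rows = [("a", 1), ("b", 2), ("b", 3), ("a", 5)];
    let out = max_per_group(&rows);
    // out is [("a", 5), ("b", 3)]: not sorted on max, so a sort is still required.
    println!("{:?} sorted_on_max={}", out, is_sorted_by_max(&out));

    // With a single group there is exactly one output row, which is trivially sorted.
    let single = [("a", 1), ("a", 2), ("a", 7)];
    let out = max_per_group(&single);
    println!("{:?} sorted_on_max={}", out, is_sorted_by_max(&out));
}
```

The single-group case is the only one where skipping the sort is safe, which matches the reviewer's point.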
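The definition in the proposed `monotonicity` doc comment (`f(S) >= f(T)` whenever `S` is a superset of `T`) can be spot-checked with a small sketch. The functions below are hypothetical, operate on plain `i64` slices rather than Arrow arrays, and only test one chain of growing prefixes, so a `true` result is evidence rather than proof. On this data, `MAX` and `COUNT` come out increasing, `MIN` decreasing, and `AVG` neither.

```rust
/// A set function over a bag of values; `None` models SQL NULL (empty input).
type SetFn = fn(&[i64]) -> Option<f64>;

fn max_fn(v: &[i64]) -> Option<f64> {
    v.iter().max().map(|&x| x as f64)
}

fn min_fn(v: &[i64]) -> Option<f64> {
    v.iter().min().map(|&x| x as f64)
}

fn count_fn(v: &[i64]) -> Option<f64> {
    Some(v.len() as f64)
}

fn avg_fn(v: &[i64]) -> Option<f64> {
    if v.is_empty() {
        None
    } else {
        Some(v.iter().sum::<i64>() as f64 / v.len() as f64)
    }
}

/// Spot-checks `f(S) >= f(T)` for consecutive non-empty prefixes
/// `T = values[..i]` and `S = values[..=i]` (so `S` is a superset of `T`).
fn is_increasing_on(f: SetFn, values: &[i64]) -> bool {
    (1..values.len()).all(|i| f(&values[..=i]) >= f(&values[..i]))
}

/// Same check with the inequality reversed: `f(S) <= f(T)`.
fn is_decreasing_on(f: SetFn, values: &[i64]) -> bool {
    (1..values.len()).all(|i| f(&values[..=i]) <= f(&values[..i]))
}

fn main() {
    let values: [i64; 8] = [3, 1, 4, 1, 5, 9, 2, 6];
    println!("max: increasing={}", is_increasing_on(max_fn, &values));
    println!("min: decreasing={}", is_decreasing_on(min_fn, &values));
    println!("count: increasing={}", is_increasing_on(count_fn, &values));
    println!(
        "avg: increasing={}, decreasing={}",
        is_increasing_on(avg_fn, &values),
        is_decreasing_on(avg_fn, &values)
    );
}
```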
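As an illustration of why the `ASC NULLS FIRST` vs `ASC NULLS LAST` distinction raised above can matter, here is a sketch of an `output_ordering`-style hook applied to running (cumulative) aggregates. Everything in it is hypothetical: `SortOptions` and `SortProperties` are local mirrors of the Arrow/DataFusion types of the same names (not the real types), `ArgProps` is a simplified stand-in for `ExprProperties` (the real struct carries richer information, such as a value range), and `RunningOrdering` is not a DataFusion trait. Because NULL inputs are skipped, a running `MAX` is NULL only until the first non-NULL input, so its output is ordered `ASC NULLS FIRST` but not `ASC NULLS LAST`, which a plain increasing/decreasing enum cannot express.

```rust
/// Local mirror of arrow's `SortOptions` (not the real type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct SortOptions {
    descending: bool,
    nulls_first: bool,
}

/// Local mirror of DataFusion's `SortProperties` (not the real type).
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SortProperties {
    Ordered(SortOptions),
    Unordered,
    Singleton,
}

/// Hypothetical, simplified stand-in for `ExprProperties`.
struct ArgProps {
    non_negative: bool,
}

/// Hypothetical hook: ordering of a running aggregate's output as rows are added.
trait RunningOrdering {
    fn output_ordering(&self, inputs: &[ArgProps]) -> SortProperties;
}

struct Max;
struct Sum;

const ASC_NULLS_FIRST: SortOptions = SortOptions { descending: false, nulls_first: true };

impl RunningOrdering for Max {
    // NULL inputs are skipped, so the running MAX is NULL only before the
    // first non-NULL input and never decreases afterwards.
    fn output_ordering(&self, _inputs: &[ArgProps]) -> SortProperties {
        SortProperties::Ordered(ASC_NULLS_FIRST)
    }
}

impl RunningOrdering for Sum {
    // A running SUM only grows if every input is known to be non-negative.
    fn output_ordering(&self, inputs: &[ArgProps]) -> SortProperties {
        if inputs.iter().all(|p| p.non_negative) {
            SortProperties::Ordered(ASC_NULLS_FIRST)
        } else {
            SortProperties::Unordered
        }
    }
}

/// Running MAX over nullable inputs, SQL-style (NULLs are ignored).
fn running_max(values: &[Option<i64>]) -> Vec<Option<i64>> {
    let mut acc: Option<i64> = None;
    values
        .iter()
        .map(|v| {
            if let Some(x) = v {
                acc = Some(acc.map_or(*x, |a| a.max(*x)));
            }
            acc
        })
        .collect()
}

/// True if `out` is ordered according to `opts`.
fn satisfies(out: &[Option<i64>], opts: SortOptions) -> bool {
    out.windows(2).all(|w| match (w[0], w[1]) {
        (None, None) => true,
        (None, Some(_)) => opts.nulls_first,
        (Some(_), None) => !opts.nulls_first,
        (Some(a), Some(b)) => {
            if opts.descending { a >= b } else { a <= b }
        }
    })
}

fn main() {
    let out = running_max(&[None, Some(3), None, Some(1), Some(7)]);
    if let SortProperties::Ordered(opts) = Max.output_ordering(&[]) {
        let nulls_last = SortOptions { nulls_first: false, ..opts };
        println!("{:?}: NULLS FIRST={}, NULLS LAST={}", out, satisfies(&out, opts), satisfies(&out, nulls_last));
    }
    println!("SUM over signed input: {:?}", Sum.output_ordering(&[ArgProps { non_negative: false }]));
}
```

The `Sum` case also hints at why taking input properties (as the `ScalarUDFImpl`-style signature does) could be useful: whether a running sum is ordered depends on what is known about its input.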