alamb commented on code in PR #14271:
URL: https://github.com/apache/datafusion/pull/14271#discussion_r1929720287
##########
datafusion/expr/src/udaf.rs:
##########
@@ -39,6 +39,26 @@ use crate::utils::AggregateOrderSensitivity;
use crate::{Accumulator, Expr};
use crate::{Documentation, Signature};
+/// Status of an Aggregate Expression's Monotonicity
+#[derive(Debug, Clone)]
+pub enum AggregateExprMonotonicity {
Review Comment:
Instead of a new structure, did you consider `SortProperties` (which I
think @berkaysynnada added), which also reflects the idea of monotonicity?
I am thinking of this in particular:
https://docs.rs/datafusion/latest/datafusion/logical_expr/sort_properties/enum.SortProperties.html
https://github.com/apache/datafusion/blob/95d296cd5aa1c58baf18e7625afe50f2e8989b07/datafusion/expr-common/src/sort_properties.rs#L37-L36
In addition to being an existing pattern, the `SortProperties` structure
can also represent the difference between `ASC NULLS FIRST` and
`ASC NULLS LAST`, which this structure does not seem to do.
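To make the `ASC NULLS FIRST` vs `ASC NULLS LAST` point concrete, here is a minimal sketch. It uses simplified local mirrors of the shapes of arrow's `SortOptions` and DataFusion's `SortProperties`, redefined so the example compiles on its own (they are not the real crate types). `BareMonotonicity` and `to_bare` are hypothetical stand-ins for a plain increasing/decreasing enum; they are not the PR's actual `AggregateExprMonotonicity` variants, which are not shown in this hunk.

```rust
// Simplified local mirrors of arrow's `SortOptions` and DataFusion's
// `SortProperties`, redefined here so the sketch is self-contained.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct SortOptions {
    descending: bool,
    nulls_first: bool,
}

#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SortProperties {
    Ordered(SortOptions),
    Unordered,
    Singleton,
}

// Hypothetical bare monotonicity enum, used only for comparison.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BareMonotonicity {
    Increasing,
    Decreasing,
    NotMonotonic,
}

// Collapsing `SortProperties` into the bare enum drops null placement.
fn to_bare(p: SortProperties) -> BareMonotonicity {
    match p {
        SortProperties::Ordered(opts) if opts.descending => BareMonotonicity::Decreasing,
        SortProperties::Ordered(_) => BareMonotonicity::Increasing,
        // A constant is both non-decreasing and non-increasing; pick one.
        SortProperties::Singleton => BareMonotonicity::Increasing,
        SortProperties::Unordered => BareMonotonicity::NotMonotonic,
    }
}

fn main() {
    let asc_nulls_first =
        SortProperties::Ordered(SortOptions { descending: false, nulls_first: true });
    let asc_nulls_last =
        SortProperties::Ordered(SortOptions { descending: false, nulls_first: false });

    // `SortProperties` keeps the two orderings apart...
    assert_ne!(asc_nulls_first, asc_nulls_last);
    // ...but the bare enum cannot tell them apart.
    assert_eq!(to_bare(asc_nulls_first), to_bare(asc_nulls_last));
    println!("{:?}", to_bare(asc_nulls_last));
}
```

Any downstream consumer that needs to know where NULLs land (for example, to decide whether an existing ordering satisfies a required `ASC NULLS LAST`) would have to recover that information elsewhere if only the bare enum were available.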
##########
datafusion/sqllogictest/test_files/aggregate.slt:
##########
@@ -4963,6 +4963,9 @@ false
true
NULL
+statement ok
Review Comment:
In order that the tests better explain the implications of this change, can
you please add a new test rather than updating the existing test (by setting
this option)?
That would mean setting the flag and running the EXPLAIN again in a separate block.
That will let the tests better illustrate any change in behavior.
##########
datafusion/sqllogictest/test_files/aggregates_topk.slt:
##########
@@ -143,13 +143,12 @@ logical_plan
03)----TableScan: traces projection=[trace_id, timestamp]
physical_plan
01)SortPreservingMergeExec: [max(traces.timestamp)@1 ASC NULLS LAST], fetch=4
-02)--SortExec: TopK(fetch=4), expr=[max(traces.timestamp)@1 ASC NULLS LAST], preserve_partitioning=[true]
-03)----AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[max(traces.timestamp)]
-04)------CoalesceBatchesExec: target_batch_size=8192
-05)--------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
-06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-07)------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[max(traces.timestamp)]
-08)--------------MemoryExec: partitions=1, partition_sizes=[1]
+02)--AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[max(traces.timestamp)]
Review Comment:
This plan doesn't look correct to me.
Avoiding the sort requires that the output of
`AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[max(traces.timestamp)]`
is sorted on `max(timestamp)`, but that is only true if there is a single
group (aka a single value of `trace_id`) 🤔
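A minimal counterexample for the point above. It assumes a `BTreeMap` keyed by `trace_id` as a stand-in for the grouped aggregate's emission order (a simplification: the real operator's output order may differ, but in either case it is not tied to the aggregate value). `max_per_group` and `sorted_on_max` are hypothetical helpers.

```rust
use std::collections::BTreeMap;

/// Computes `max(timestamp)` per `trace_id`, emitting groups in key order
/// (a simplified stand-in for a grouped aggregate's output).
fn max_per_group(rows: &[(&str, i64)]) -> Vec<(String, i64)> {
    let mut groups: BTreeMap<String, i64> = BTreeMap::new();
    for &(trace_id, ts) in rows {
        groups
            .entry(trace_id.to_string())
            .and_modify(|m| *m = (*m).max(ts))
            .or_insert(ts);
    }
    groups.into_iter().collect()
}

/// True if the output is sorted ascending on the aggregate column.
fn sorted_on_max(out: &[(String, i64)]) -> bool {
    out.windows(2).all(|w| w[0].1 <= w[1].1)
}

fn main() {
    // Two groups: MAX never decreases within a group, but the per-group
    // results are not ordered by the aggregate value.
    let multi = max_per_group(&[("a", 5), ("b", 1), ("a", 2)]);
    assert_eq!(multi, vec![("a".to_string(), 5), ("b".to_string(), 1)]);
    assert!(!sorted_on_max(&multi));

    // A single group yields one output row, which is trivially sorted.
    let single = max_per_group(&[("a", 5), ("a", 1), ("a", 9)]);
    assert!(sorted_on_max(&single));
    println!("{:?} / {:?}", multi, single);
}
```

Set monotonicity of MAX only constrains how one group's value evolves as rows arrive; it says nothing about how the final values of different groups compare, so the `SortExec: TopK` appears to still be required when there is more than one group.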
##########
datafusion/sqllogictest/test_files/aggregate.slt:
##########
@@ -4978,17 +4981,19 @@ logical_plan
physical_plan
01)GlobalLimitExec: skip=0, fetch=5
02)--CoalescePartitionsExec
-03)----AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
+03)----AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], ordering_mode=PartiallySorted([1])
04)------CoalesceBatchesExec: target_batch_size=8192
-05)--------RepartitionExec: partitioning=Hash([c3@0, min(aggregate_test_100.c1)@1], 4), input_partitions=4
-06)----------AggregateExec: mode=Partial, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
+05)--------RepartitionExec: partitioning=Hash([c3@0, min(aggregate_test_100.c1)@1], 4), input_partitions=4, preserve_order=true, sort_exprs=min(aggregate_test_100.c1)@1 DESC NULLS LAST
+06)----------AggregateExec: mode=Partial, gby=[c3@0 as c3, min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], ordering_mode=PartiallySorted([1])
Review Comment:
I am not sure if it is important, but the `lim=[5]` appears to have been
lost in this plan.
Otherwise it looks better to me (as it uses a PartiallySorted aggregate
rather than rehashing the entire thing).
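A simplified model of why a `PartiallySorted([1])` aggregate need not hold every group until the end of its input. It assumes rows arrive sorted on the second group-by column (index 1): once that column's value changes, every group seen so far is complete and can be emitted. With `aggr=[]` the aggregate behaves like a DISTINCT. The `limit` parameter illustrates what a `lim=[5]` could still contribute on top of this (stopping once enough groups are emitted). `partially_sorted_distinct` is a hypothetical helper, not DataFusion's implementation.

```rust
use std::collections::HashSet;

/// Distinct `(unsorted_key, sorted_key)` groups over rows that are already
/// sorted on `sorted_key`. Only groups sharing the current `sorted_key`
/// value are buffered; `limit` stops reading input once enough are emitted.
fn partially_sorted_distinct(rows: &[(i32, i32)], limit: Option<usize>) -> Vec<(i32, i32)> {
    let limit = limit.unwrap_or(usize::MAX);
    let mut out: Vec<(i32, i32)> = Vec::new();
    // Distinct groups for the current value of the sorted column.
    let mut pending: Vec<(i32, i32)> = Vec::new();
    let mut seen: HashSet<(i32, i32)> = HashSet::new();

    for &row in rows {
        if let Some(&(_, key)) = pending.last() {
            if key != row.1 {
                // The sorted column moved on, so every pending group is final.
                out.append(&mut pending);
                seen.clear();
                if out.len() >= limit {
                    out.truncate(limit);
                    return out; // early exit: the rest of the input is never read
                }
            }
        }
        if seen.insert(row) {
            pending.push(row);
        }
    }
    out.append(&mut pending);
    out.truncate(limit);
    out
}

fn main() {
    // Sorted on the second column: 10, 10, 10, 20, 20, 30.
    let rows = [(1, 10), (2, 10), (1, 10), (3, 20), (1, 20), (2, 30)];
    println!("{:?}", partially_sorted_distinct(&rows, None));
    println!("{:?}", partially_sorted_distinct(&rows, Some(3)));
}
```

Memory is bounded by the number of distinct groups per value of the sorted column rather than by the total number of groups, which is the advantage over rehashing everything.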
##########
datafusion/expr/src/udaf.rs:
##########
@@ -635,6 +655,14 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
fn documentation(&self) -> Option<&Documentation> {
None
}
+
+ /// Indicates whether the aggregation function is monotonic as a set function. A set
+ /// function is monotonically increasing if its value increases as its argument grows
+ /// (as a set). Formally, `f` is a monotonically increasing set function if `f(S) >= f(T)`
+ /// whenever `S` is a superset of `T`.
+ fn monotonicity(&self, _data_type: &DataType) -> AggregateExprMonotonicity {
Review Comment:
Would it be possible to follow the existing model for ScalarUDFs here
instead?
https://github.com/apache/datafusion/blob/27db82fe396f43077b5056bab4b20b084c8f6948/datafusion/expr/src/udf.rs#L753-L752
Something like this:
```rust
pub trait AggregateUDFImpl {
    // ... existing methods ...

    /// Returns the output order of this aggregate expression given the
    /// input properties
    fn output_ordering(&self, inputs: &[ExprProperties]) -> Result<SortProperties>;

    // ... existing methods ...
}
```
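To make the set-monotonicity definition in the quoted doc comment concrete, the sketch below checks it along one chain of growing inputs, where each prefix of a slice is a subset of the next. A failure disproves monotonicity; a pass on one chain does not prove it. The helper names are hypothetical. The SUM case shows that the answer can depend on the input type (signed vs. non-negative values), consistent with the proposed method taking a `_data_type` argument.

```rust
/// True if `f` never decreases along values[..1] ⊆ values[..2] ⊆ ... ⊆ values.
/// Only non-empty prefixes are compared, so `f` may assume a non-empty slice.
fn never_decreases_along_chain(values: &[i64], f: impl Fn(&[i64]) -> i64) -> bool {
    (1..values.len()).all(|k| f(&values[..k + 1]) >= f(&values[..k]))
}

/// True if `f` never increases along the same chain of growing prefixes.
fn never_increases_along_chain(values: &[i64], f: impl Fn(&[i64]) -> i64) -> bool {
    (1..values.len()).all(|k| f(&values[..k + 1]) <= f(&values[..k]))
}

fn max_agg(s: &[i64]) -> i64 {
    *s.iter().max().expect("non-empty input")
}
fn min_agg(s: &[i64]) -> i64 {
    *s.iter().min().expect("non-empty input")
}
fn count_agg(s: &[i64]) -> i64 {
    s.len() as i64
}
fn sum_agg(s: &[i64]) -> i64 {
    s.iter().sum()
}

fn main() {
    let data: [i64; 5] = [3, -7, 10, 2, -1];
    // MAX and COUNT are increasing set functions; MIN is decreasing.
    assert!(never_decreases_along_chain(&data, max_agg));
    assert!(never_decreases_along_chain(&data, count_agg));
    assert!(never_increases_along_chain(&data, min_agg));
    // SUM over signed values is not monotonic: adding -7 lowers the sum.
    assert!(!never_decreases_along_chain(&data, sum_agg));
    // Over non-negative values, SUM never decreases.
    assert!(never_decreases_along_chain(&[4_i64, 0, 9, 1], sum_agg));
    println!("checks passed");
}
```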
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.