UBarney commented on code in PR #15954: URL: https://github.com/apache/datafusion/pull/15954#discussion_r2102508622
########## datafusion/physical-plan/src/aggregates/mod.rs: ########## @@ -751,28 +771,18 @@ impl AggregateExec { }) } _ => { - // When the input row count is 0 or 1, we can adopt that statistic keeping its reliability. + // When the input row count is 1, we can adopt that statistic keeping its reliability. // When it is larger than 1, we degrade the precision since it may decrease after aggregation. - let num_rows = if let Some(value) = self - .input() - .partition_statistics(None)? - .num_rows - .get_value() + let num_rows = if let Some(value) = child_statistics.num_rows.get_value() { if *value > 1 { - self.input() - .partition_statistics(None)? - .num_rows - .to_inexact() + child_statistics.num_rows.to_inexact() } else if *value == 0 { - // Aggregation on an empty table creates a null row. - self.input() - .partition_statistics(None)? - .num_rows - .add(&Precision::Exact(1)) + child_statistics.num_rows } else { // num_rows = 1 case - self.input().partition_statistics(None)?.num_rows + let grouping_set_num = self.group_by.groups.len(); + child_statistics.num_rows.map(|x| x * grouping_set_num) Review Comment: ``` > explain analyze select t1.value, t2.value, count(*) from range(1) as t1 join range(1) as t2 using(value) group by cube(t1.value, t2.value); +-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | plan_type | plan | +-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | 
Plan with Metrics | ProjectionExec: expr=[value@0 as value, value@1 as value, count(Int64(1))@3 as count(*)], metrics=[output_rows=4, elapsed_compute=20.988µs] | | | AggregateExec: mode=FinalPartitioned, gby=[value@0 as value, value@1 as value, __grouping_id@2 as __grouping_id], aggr=[count(Int64(1))], metrics=[output_rows=4, elapsed_compute=107.217µs, spill_count=0, spilled_bytes=0, spilled_rows=0, peak_mem_used=3264] | | | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=4, elapsed_compute=4.21µs] | | | RepartitionExec: partitioning=Hash([value@0, value@1, __grouping_id@2], 24), input_partitions=24, metrics=[fetch_time=19.129834ms, repartition_time=7.618µs, send_time=35.082µs] | | | AggregateExec: mode=Partial, gby=[(value@0 as value, value@1 as value), (NULL as value, value@1 as value), (value@0 as value, NULL as value), (NULL as value, NULL as value)], aggr=[count(Int64(1))], metrics=[output_rows=4, elapsed_compute=120.603µs, spill_count=0, spilled_bytes=0, spilled_rows=0, peak_mem_used=1976] | | | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=1.288µs] | | | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(value@0, value@0)], metrics=[output_rows=1, build_input_batches=1, build_input_rows=1, input_batches=1, input_rows=1, output_batches=1, build_mem_used=1760, build_time=475.105µs, join_time=14.496µs] | | | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=1.622µs] | | | RepartitionExec: partitioning=Hash([value@0], 24), input_partitions=24, metrics=[fetch_time=654.947µs, repartition_time=4.621µs, send_time=1.056µs] | | | RepartitionExec: partitioning=RoundRobinBatch(24), input_partitions=1, metrics=[fetch_time=3.096µs, repartition_time=1ns, send_time=741ns] | | | LazyMemoryExec: partitions=1, batch_generators=[range: start=0, end=1, batch_size=8192], metrics=[] | | | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=2.197µs] | | | 
RepartitionExec: partitioning=Hash([value@0], 24), input_partitions=24, metrics=[fetch_time=2.702565ms, repartition_time=2.726µs, send_time=1.497µs] | | | RepartitionExec: partitioning=RoundRobinBatch(24), input_partitions=1, metrics=[fetch_time=1.571µs, repartition_time=1ns, send_time=248ns] | | | LazyMemoryExec: partitions=1, batch_generators=[range: start=0, end=1, batch_size=8192], metrics=[] | | | | +-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ 1 row(s) fetched. Elapsed 0.004 seconds. > ``` ########## datafusion/physical-plan/src/aggregates/mod.rs: ########## @@ -733,13 +733,33 @@ impl AggregateExec { &self.input_order_mode } - fn statistics_inner(&self) -> Result<Statistics> { + fn statistics_inner(&self, child_statistics: Statistics) -> Result<Statistics> { // TODO stats: group expressions: // - once expressions will be able to compute their own stats, use it here // - case where we group by on a column for which with have the `distinct` stat // TODO stats: aggr expression: // - aggregations sometimes also preserve invariants such as min, max... - let column_statistics = Statistics::unknown_column(&self.schema()); + + let column_statistics = { + // self.schema: [<group by exprs>, <aggregate exprs>] + let mut column_statistics = Statistics::unknown_column(&self.schema()); + + for (idx, (expr, _)) in self.group_by.expr.iter().enumerate() { Review Comment: Yes. 
It can be verified by this test case: https://github.com/apache/datafusion/blob/0c7760aebd4e057196b363a173a9884bcdbb7236/datafusion/core/tests/physical_optimizer/partition_statistics.rs#L549-L576 ########## datafusion/physical-plan/src/aggregates/mod.rs: ########## @@ -751,28 +771,16 @@ impl AggregateExec { }) } _ => { - // When the input row count is 0 or 1, we can adopt that statistic keeping its reliability. + // When the input row count is 1, we can adopt that statistic keeping its reliability. // When it is larger than 1, we degrade the precision since it may decrease after aggregation. - let num_rows = if let Some(value) = self - .input() - .partition_statistics(None)? - .num_rows - .get_value() + let num_rows = if let Some(value) = child_statistics.num_rows.get_value() { - if *value > 1 { - self.input() - .partition_statistics(None)? - .num_rows - .to_inexact() - } else if *value == 0 { - // Aggregation on an empty table creates a null row. Review Comment: I think current unit test already covered it + [`group_by_expr.is_empty() and input_statistics.num_rows == 0`](https://github.com/apache/datafusion/blob/0c7760aebd4e057196b363a173a9884bcdbb7236/datafusion/core/tests/physical_optimizer/partition_statistics.rs#L673-L708) + [`!group_by_expr.is_empty() and input_statistics.num_rows == 0`](https://github.com/apache/datafusion/blob/0c7760aebd4e057196b363a173a9884bcdbb7236/datafusion/core/tests/physical_optimizer/partition_statistics.rs#L632-L663) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscribe@datafusion.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscribe@datafusion.apache.org For additional commands, e-mail: github-help@datafusion.apache.org