Ke-Wng opened a new issue, #18989:
URL: https://github.com/apache/datafusion/issues/18989
### Describe the bug
When creating a logical plan that involves a sequence of `Sort -> Aggregate
-> Sort -> Aggregate`, and the input table is multi-partitioned, the physical
planner fails during the `SanityCheckPlan` phase.
It appears that the optimizer fails to inject the `RepartitionExec` needed
between the two aggregations, so the plan fails the distribution-requirement
check (which surfaces as a panic when the resulting `Err` is unwrapped).
### To Reproduce
Here is a minimal reproduction (using the `datafusion` and `tokio` crates):
```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::MemTable;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::logical_expr::col;
use datafusion::prelude::*;

#[tokio::main]
async fn main() {
    let ctx = SessionContext::default();
    let schema = Arc::new(Schema::new(vec![
        Field::new("ts", DataType::Int64, false),
        Field::new("region", DataType::Utf8, false),
        Field::new("value", DataType::Float64, false),
    ]));

    // Create an empty but multi-partitioned MemTable (two partitions).
    let mem_table = MemTable::try_new(schema.clone(), vec![vec![], vec![]]).unwrap();
    ctx.register_table("metrics", Arc::new(mem_table)).unwrap();

    // Aggregate and sort twice.
    let data_frame = ctx
        .table("metrics")
        .await
        .unwrap()
        .aggregate(
            vec![col("region"), col("ts")],
            vec![count_udaf().call(vec![col("value")])],
        )
        .unwrap()
        .sort(vec![
            col("region").sort(true, true),
            col("ts").sort(true, true),
        ])
        .unwrap()
        .aggregate(
            vec![col("ts")],
            vec![count_udaf().call(vec![col("count(metrics.value)")])],
        )
        .unwrap()
        .sort(vec![col("ts").sort(true, true)])
        .unwrap();

    println!(
        "Logical Plan:\n{}",
        data_frame.logical_plan().display_indent()
    );
    data_frame.show().await.unwrap();
}
```
### Expected behavior
The query should execute successfully, but it panics:
```text
Logical Plan:
Sort: metrics.ts ASC NULLS FIRST
  Aggregate: groupBy=[[metrics.ts]], aggr=[[count(count(metrics.value))]]
    Sort: metrics.region ASC NULLS FIRST, metrics.ts ASC NULLS FIRST
      Aggregate: groupBy=[[metrics.region, metrics.ts]], aggr=[[count(metrics.value)]]
        TableScan: metrics
thread 'main' panicked at src/main.rs:51:29:
called `Result::unwrap()` on an `Err` value: Context("SanityCheckPlan", Plan("Plan:
[\"AggregateExec: mode=SinglePartitioned, gby=[ts@0 as ts], aggr=[count(count(metrics.value))]\",
\"  ProjectionExec: expr=[ts@1 as ts, count(metrics.value)@2 as count(metrics.value)]\",
\"    AggregateExec: mode=FinalPartitioned, gby=[region@0 as region, ts@1 as ts], aggr=[count(metrics.value)]\",
\"      CoalesceBatchesExec: target_batch_size=8192\",
\"        RepartitionExec: partitioning=Hash([region@0, ts@1], 64), input_partitions=2\",
\"          AggregateExec: mode=Partial, gby=[region@1 as region, ts@0 as ts], aggr=[count(metrics.value)]\",
\"            DataSourceExec: partitions=2, partition_sizes=[0, 0]\"]
does not satisfy distribution requirements: HashPartitioned[[ts@0]]).
Child-0 output partitioning: Hash([region@0, ts@0], 64)"))
```
### Additional context
If the input table is single-partitioned, or if either of the two sorts is
removed, the query executes successfully.
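Based on the observation above, a possible mitigation (my assumption, not a confirmed fix) is to keep the data in a single partition, or to cap the planner's target partitions so that no hash repartitioning is planned between the two aggregates:

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() {
    // Hypothetical workaround, not a fix: limit planning to one target
    // partition so the distribution requirement between the two aggregates
    // is trivially satisfied. This gives up parallelism.
    let config = SessionConfig::new().with_target_partitions(1);
    let ctx = SessionContext::new_with_config(config);
    // ... register the table and build the same DataFrame as in the repro ...
    let _ = ctx;
}
```

Equivalently, registering the `MemTable` with a single partition (`vec![vec![]]` instead of `vec![vec![], vec![]]`) also avoids the failure, per the observation above.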
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]