Ke-Wng opened a new issue, #18989:
URL: https://github.com/apache/datafusion/issues/18989

   ### Describe the bug
   
   When creating a logical plan that involves a sequence of `Sort -> Aggregate 
-> Sort -> Aggregate`, and the input table is multi-partitioned, the physical 
planner fails during the `SanityCheckPlan` phase.
   
   It appears that the optimizer fails to inject a necessary `RepartitionExec`
node between the two `Aggregate` operations, leading to a distribution
requirement mismatch error (which surfaces as a panic via `unwrap()`).
   
   ### To Reproduce
   
   Here is a minimal reproduction (using the `datafusion` and `tokio` crates):
   ```rust
   use std::sync::Arc;
   
   use datafusion::arrow::datatypes::{DataType, Field, Schema};
   use datafusion::datasource::MemTable;
   use datafusion::functions_aggregate::count::count_udaf;
   use datafusion::logical_expr::col;
   use datafusion::prelude::*;
   
   #[tokio::main]
   async fn main() {
       let ctx = SessionContext::default();
   
       let schema = Arc::new(Schema::new(vec![
           Field::new("ts", DataType::Int64, false),
           Field::new("region", DataType::Utf8, false),
           Field::new("value", DataType::Float64, false),
       ]));
   
       // create an empty but multi-partitioned MemTable
        let mem_table = MemTable::try_new(schema.clone(), vec![vec![], vec![]]).unwrap();
       ctx.register_table("metrics", Arc::new(mem_table)).unwrap();
   
       // aggregate and sort twice
       let data_frame = ctx
           .table("metrics")
           .await
           .unwrap()
           .aggregate(
               vec![col("region"), col("ts")],
               vec![count_udaf().call(vec![col("value")])],
           )
           .unwrap()
           .sort(vec![
               col("region").sort(true, true),
               col("ts").sort(true, true),
           ])
           .unwrap()
           .aggregate(
               vec![col("ts")],
               vec![count_udaf().call(vec![col("count(metrics.value)")])],
           )
           .unwrap()
           .sort(vec![col("ts").sort(true, true)])
           .unwrap();
   
       println!(
           "Logical Plan:\n{}",
           data_frame.logical_plan().display_indent()
       );
   
       data_frame.show().await.unwrap();
   }
   ```
   
   ### Expected behavior
   
   The query should execute successfully. Instead, it panics:
   ```text
   Logical Plan:
   Sort: metrics.ts ASC NULLS FIRST
     Aggregate: groupBy=[[metrics.ts]], aggr=[[count(count(metrics.value))]]
       Sort: metrics.region ASC NULLS FIRST, metrics.ts ASC NULLS FIRST
          Aggregate: groupBy=[[metrics.region, metrics.ts]], aggr=[[count(metrics.value)]]
           TableScan: metrics
   
    thread 'main' panicked at src/main.rs:51:29:
    called `Result::unwrap()` on an `Err` value: Context("SanityCheckPlan", Plan("Plan: [\"AggregateExec: mode=SinglePartitioned, gby=[ts@0 as ts], aggr=[count(count(metrics.value))]\", \"  ProjectionExec: expr=[ts@1 as ts, count(metrics.value)@2 as count(metrics.value)]\", \"    AggregateExec: mode=FinalPartitioned, gby=[region@0 as region, ts@1 as ts], aggr=[count(metrics.value)]\", \"      CoalesceBatchesExec: target_batch_size=8192\", \"        RepartitionExec: partitioning=Hash([region@0, ts@1], 64), input_partitions=2\", \"          AggregateExec: mode=Partial, gby=[region@1 as region, ts@0 as ts], aggr=[count(metrics.value)]\", \"            DataSourceExec: partitions=2, partition_sizes=[0, 0]\"] does not satisfy distribution requirements: HashPartitioned[[ts@0]]). Child-0 output partitioning: Hash([region@0, ts@0], 64)"))
   ```
   
   ### Additional context
   
   If the input table is single-partitioned, or if either `sort` call is removed, the query executes successfully.
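
   Since the failure only occurs with multi-partition execution, a possible interim workaround (a sketch, not a verified fix for every DataFusion version) is to cap the planner at a single target partition via the standard `SessionConfig` builder, so that no hash repartitioning with conflicting distribution requirements is introduced between the two aggregates:
   ```rust
   use datafusion::prelude::{SessionConfig, SessionContext};

   // Workaround sketch: limit planning to a single target partition.
   // This trades away intra-query parallelism, so it is only suitable
   // until the missing RepartitionExec is injected correctly.
   let config = SessionConfig::new().with_target_partitions(1);
   let ctx = SessionContext::new_with_config(config);
   ```
   Note this only changes the planner's repartitioning decisions; the `MemTable` itself still holds two partitions.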


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
