rluvaton opened a new issue, #15530:
URL: https://github.com/apache/datafusion/issues/15530

   ### Describe the bug
   
   When using `AggregateExec` in `AggregateMode::Single` with spilling, and the 
group-by expressions are not the leading columns of the input plan's schema, 
the execution fails with a schema mismatch.
   
   ### To Reproduce
   
   ```rust
   // Reproduction for the schema mismatch described in this issue: an
   // `AggregateExec` in `AggregateMode::Single` whose group-by columns are not
   // the leading columns of its input, run under a bounded memory pool.
   #[cfg(test)]
   mod tests {
       use std::fmt::{Display, Formatter};
       use arrow::array::{ArrayRef, Int32Array, Int64Array, RecordBatch, 
StringArray};
       use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
       use datafusion::common::Result;
       use datafusion::execution::memory_pool::FairSpillPool;
       use datafusion::execution::runtime_env::RuntimeEnvBuilder;
       use datafusion::execution::TaskContext;
       use datafusion::functions_aggregate::sum::sum_udaf;
       use datafusion::physical_expr::aggregate::AggregateExprBuilder;
       use datafusion::physical_expr::expressions::{lit, Column};
       use datafusion::physical_plan::aggregates::{PhysicalGroupBy, 
AggregateExec, AggregateMode};
       use datafusion::physical_plan::common::collect;
       use datafusion::physical_plan::ExecutionPlan;
       use rand::{random, thread_rng, Rng};
       use std::sync::Arc;
       use datafusion_physical_plan::memory::{LazyBatchGenerator, 
LazyMemoryExec};
       use parking_lot::RwLock;
   
       // Builds a lazy in-memory source feeding a single-mode aggregate and
       // collects its output; returns the execution error, if any, after
       // printing it.
       #[tokio::test]
       async fn test_debug() -> Result<()> {
           // Input schema: nine nullable columns. `col_0` is Int64, `col_5` is
           // Int32 and every other column is Utf8.
           let scan_schema = Arc::new(Schema::new(vec![
               Field::new("col_0", DataType::Int64, true),
               Field::new("col_1", DataType::Utf8, true),
               Field::new("col_2", DataType::Utf8, true),
               Field::new("col_3", DataType::Utf8, true),
               Field::new("col_4", DataType::Utf8, true),
               Field::new("col_5", DataType::Int32, true),
               Field::new("col_6", DataType::Utf8, true),
               Field::new("col_7", DataType::Utf8, true),
               Field::new("col_8", DataType::Utf8, true),
           ]));
   
           // Group by input columns 1, 7, 0 and 8 (in that order). These are
           // not the first columns of `scan_schema`, which is the condition the
           // issue describes as triggering the failure.
           let group_by = PhysicalGroupBy::new_single(vec![
               (Arc::new(Column::new("col_1", 1)), "col_1".to_string()),
               (Arc::new(Column::new("col_7", 7)), "col_7".to_string()),
               (Arc::new(Column::new("col_0", 0)), "col_0".to_string()),
               (Arc::new(Column::new("col_8", 8)), "col_8".to_string()),
           ]);
   
           // 8192 random i64 values.
           fn generate_int64_array() -> ArrayRef {
               Arc::new(Int64Array::from_iter_values(
                   (0..8192).map(|_| random::<i64>()),
               ))
           }
           // 8192 random i32 values.
           fn generate_int32_array() -> ArrayRef {
               Arc::new(Int32Array::from_iter_values(
                   (0..8192).map(|_| random::<i32>()),
               ))
           }
   
           // 8192 random strings, each made of 10 randomly sampled chars.
           fn generate_string_array() -> ArrayRef {
               Arc::new(StringArray::from(
                   (0..8192)
                       .map(|_| -> String {
                           thread_rng()
                               .sample_iter::<char, 
_>(rand::distributions::Standard)
                               .take(10)
                               .collect()
                       })
                       .collect::<Vec<_>>(),
               ))
           }
   
           // Builds one batch of random data. The array order below mirrors
           // the column order and types declared in `scan_schema`
           // (Int64, 4 x Utf8, Int32, 3 x Utf8).
           fn generate_record_batch(schema: &SchemaRef) -> Result<RecordBatch> {
               RecordBatch::try_new(
                   Arc::clone(&schema),
                   vec![
                       generate_int64_array(),
                       generate_string_array(),
                       generate_string_array(),
                       generate_string_array(),
                       generate_string_array(),
                       generate_int32_array(),
                       generate_string_array(),
                       generate_string_array(),
                       generate_string_array(),
                   ],
               )
               .map_err(|err| err.into())
           }
   
           // A single aggregate: sum over the literal `1i64`, aliased
           // "SUM(1i64)".
           let aggregate_expressions = vec![Arc::new(
               AggregateExprBuilder::new(sum_udaf(), vec![lit(1i64)])
                   .schema(Arc::clone(&scan_schema))
                   .alias("SUM(1i64)")
                   .build()?,
           )];
   
           // Lazy batch source: `index` counts the batches produced so far,
           // `count` bounds how many are produced, and `schema` is the schema
           // of every generated batch.
           #[derive(Debug)]
           struct Generator {
               index: usize,
               count: usize,
               schema: SchemaRef,
           }
   
           impl Display for Generator {
               fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
                   write!(f, "Generator")
               }
           }
   
           impl LazyBatchGenerator for Generator {
               // Returns a fresh random batch until `index` exceeds `count`,
               // then `None`. Because the comparison is `>`, this yields
               // `count + 1` batches in total.
               fn generate_next_batch(&mut self) -> Result<Option<RecordBatch>> 
{
                   if self.index > self.count {
                       return Ok(None);
                   }
   
                   let batch = generate_record_batch(&self.schema)?;
                   self.index += 1;
   
                   Ok(Some(batch))
               }
           }
   
           // With `index: 0` and `count: 10` the generator emits 11 batches.
           let generator = Generator {
               index: 0,
               count: 10,
               schema: Arc::clone(&scan_schema),
           };
   
           // Input plan: a lazy memory source backed by the single generator.
           let plan: Arc<dyn ExecutionPlan> =
             Arc::new(LazyMemoryExec::try_new(Arc::clone(&scan_schema), 
vec![Arc::new(RwLock::new(generator))])?);
   
           // The aggregate under test, in `AggregateMode::Single`, with one
           // `None` entry per aggregate expression and `scan_schema` passed as
           // the final argument.
           let single_aggregate = Arc::new(AggregateExec::try_new(
               AggregateMode::Single,
               group_by,
               aggregate_expressions.clone(),
               vec![None; aggregate_expressions.len()],
               plan,
               Arc::clone(&scan_schema),
           )?);
   
           // Bounded memory pool with a limit of 10006216 (presumably bytes —
           // confirm against `FairSpillPool::new`). Per the issue description,
           // the aggregate is expected to spill under this limit.
           let memory_pool = Arc::new(FairSpillPool::new(10006216));
           let task_ctx = Arc::new(
               TaskContext::default().with_runtime(Arc::new(
                   RuntimeEnvBuilder::new()
                       .with_memory_pool(memory_pool)
                       .build()?,
               )),
           );
   
           // Execute partition 0 and gather the whole output stream.
           let res = collect(single_aggregate.execute(0, 
Arc::clone(&task_ctx))?).await;
   
           // Print the outcome; on failure, propagate the error so the test
           // fails.
           match res {
               Ok(_) => println!("Success"),
               Err(e) => {
                   println!("Error: {}", e);
   
                   return Err(e);
               },
           }
   
           Ok(())
       }
   }
   ```
   
   
   The following error happens:
   ```
   Error: Internal error: PhysicalExpr Column references column 'col_7' at 
index 7 (zero-based) but input schema only has 5 columns: ["col_1", "col_7", 
"col_0", "col_8", "SUM(1i64)[sum]"]
   
   backtrace:    0: std::backtrace_rs::backtrace::libunwind::trace
                ...
      1: std::backtrace_rs::backtrace::trace_unsynchronized
                at ...
      2: std::backtrace::Backtrace::create
                at ...
      3: datafusion_common::error::DataFusionError::get_back_trace
                at <crates>/datafusion-common-46.0.1/src/error.rs:473:30
      4: datafusion_physical_expr::expressions::column::Column::bounds_check
                at 
<crates>/datafusion-physical-expr-46.0.1/src/expressions/column.rs:147:13
      5: <datafusion_physical_expr::expressions::column::Column as 
datafusion_physical_expr_common::physical_expr::PhysicalExpr>::evaluate
                at 
<crates>/datafusion-physical-expr-46.0.1/src/expressions/column.rs:126:9
      6: datafusion_physical_plan::aggregates::evaluate_group_by::{{closure}}
                at 
<crates>/datafusion-physical-plan-46.0.1/src/aggregates/mod.rs:1305:25
      7: core::iter::adapters::map::map_try_fold::{{closure}}
                at ...
                
     8-21: std stuff
     
     22: core::iter::traits::iterator::Iterator::collect
                at ...
     23: datafusion_physical_plan::aggregates::evaluate_group_by
                at 
<crates>/datafusion-physical-plan-46.0.1/src/aggregates/mod.rs:1301:32
     24: 
datafusion_physical_plan::aggregates::row_hash::GroupedHashAggregateStream::group_aggregate_batch
                at 
<crates>/datafusion-physical-plan-46.0.1/src/aggregates/row_hash.rs:821:13
     25: 
<datafusion_physical_plan::aggregates::row_hash::GroupedHashAggregateStream as 
futures_core::stream::Stream>::poll_next
                at 
<crates>/datafusion-physical-plan-46.0.1/src/aggregates/row_hash.rs:697:29
                
     26-28: futures stuff
     
     29: datafusion_physical_plan::common::collect::{{closure}}
                at <crates>/datafusion-physical-plan-46.0.1/src/common.rs:45:36
     30: datafusion_pg::repro_bug_agg::tests::test_debug::{{closure}}
                at ./src/repro_bug_agg.rs:145:99
                
     31-51: std and tokio stuff
     
     52: datafusion_pg::repro_bug_agg::tests::test_debug
                at ./src/repro_bug_agg.rs:147:9
     53: datafusion_pg::repro_bug_agg::tests::test_debug::{{closure}}
                at ./src/repro_bug_agg.rs:23:30
   .
   This was likely caused by a bug in DataFusion's code and we would welcome 
that you file an bug report in our issue tracker
   ```
   
   ### Expected behavior
   
   The aggregation should complete without failing, even when it spills.
   
   ### Additional context
   
   The spill schema is:
   ```
   spill_state.spill_schema: Schema {
       fields: [
           Field {
               name: "col_1",
               data_type: Utf8,
               nullable: true,
               dict_id: 0,
               dict_is_ordered: false,
               metadata: {},
           },
           Field {
               name: "col_7",
               data_type: Utf8,
               nullable: true,
               dict_id: 0,
               dict_is_ordered: false,
               metadata: {},
           },
           Field {
               name: "col_0",
               data_type: Int64,
               nullable: true,
               dict_id: 0,
               dict_is_ordered: false,
               metadata: {},
           },
           Field {
               name: "col_8",
               data_type: Utf8,
               nullable: true,
               dict_id: 0,
               dict_is_ordered: false,
               metadata: {},
           },
           Field {
               name: "SUM(1i64)[sum]",
               data_type: Int64,
               nullable: true,
               dict_id: 0,
               dict_is_ordered: false,
               metadata: {},
           },
       ],
       metadata: {},
   }
   ```
   
   The issue is that the spill schema is the output schema of the intermediate 
results, while the group-by expressions are left unchanged. Because a `Column` 
refers to its input by index rather than by name, the original index (e.g. 7 
for `col_7`) no longer exists in the 5-column spill schema.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to