friendlymatthew opened a new issue, #20695:
URL: https://github.com/apache/datafusion/issues/20695

   ### Describe the bug
   
   When a Parquet file schema contains Struct columns, Datafusion's row filter
   pushdown reads the wrong column data. Filters will silently evaluate against an
   incorrect column and return wrong results.
   
   Arrow and Parquet number columns differently when Structs are present. Arrow
   counts a Struct as a single field, while Parquet flattens its children into
   separate leaf columns:
   
   - Arrow: col_a=0, struct_col=1, col_b=2
   - Parquet: col_a=0, **struct_col.x=1**, **struct_col.y=2**, col_b=3
   
   `leaf_indices_for_roots` has a fast path for `PrimitiveOnly` filters that 
returns Arrow root indices directly as Parquet leaf indices: 
https://github.com/apache/datafusion/blob/92d0a5c527ed38b6ce3bad673f6cf5a372c71b0c/datafusion/datasource-parquet/src/row_filter.rs#L437-L445
   
   This assumes root index == leaf index, which is only true when the schema
   has no Struct or other group columns.
   
   ### To Reproduce
   
   Here's a test case where a filter on any primitive column that appears
   **after** a Struct column in the schema will read the wrong parquet leaf. The
   offset grows with the number of struct children.
   
   ```rs
   /// Schema:
   ///   Arrow indices:   col_a=0  struct_col=1  col_b=2
   ///   Parquet leaves:  col_a=0  struct_col.x=1  struct_col.y=2  col_b=3
   ///
   /// A filter on col_b should project Parquet leaf 3, but the bug causes it to
   /// project leaf 2 (struct_col.y).
   #[test]
   fn test_filter_pushdown_leaf_index_with_struct_in_schema() {
       use arrow::array::{Int32Array, StringArray, StructArray};
   
       let schema = Arc::new(Schema::new(vec![
           Field::new("col_a", DataType::Int32, false),
           Field::new(
               "struct_col",
               DataType::Struct(
                   vec![
                       Arc::new(Field::new("x", DataType::Int32, true)),
                       Arc::new(Field::new("y", DataType::Int32, true)),
                   ]
                   .into(),
               ),
               true,
           ),
           Field::new("col_b", DataType::Utf8, false),
       ]));
   
       let col_a = Arc::new(Int32Array::from(vec![1, 2, 3]));
       let struct_col = Arc::new(StructArray::from(vec![
           (
               Arc::new(Field::new("x", DataType::Int32, true)),
               Arc::new(Int32Array::from(vec![10, 20, 30])) as _,
           ),
           (
               Arc::new(Field::new("y", DataType::Int32, true)),
               Arc::new(Int32Array::from(vec![100, 200, 300])) as _,
           ),
       ]));
       let col_b = Arc::new(StringArray::from(vec!["aaa", "target", "zzz"]));
   
       let batch =
           RecordBatch::try_new(Arc::clone(&schema), vec![col_a, struct_col, col_b])
               .unwrap();
   
       let file = NamedTempFile::new().expect("temp file");
       let mut writer =
           ArrowWriter::try_new(file.reopen().unwrap(), Arc::clone(&schema), None)
               .expect("writer");
       writer.write(&batch).expect("write batch");
       writer.close().expect("close writer");
   
       let reader_file = file.reopen().expect("reopen file");
       let builder = ParquetRecordBatchReaderBuilder::try_new(reader_file)
           .expect("reader builder");
       let metadata = builder.metadata().clone();
       let file_schema = builder.schema().clone();
   
       // sanity check: 4 Parquet leaves, 3 Arrow fields
       assert_eq!(metadata.file_metadata().schema_descr().num_columns(), 4);
       assert_eq!(file_schema.fields().len(), 3);
   
       // build a filter candidate for `col_b = 'target'` through the public API
       let expr = col("col_b").eq(Expr::Literal(
           ScalarValue::Utf8(Some("target".to_string())),
           None,
       ));
       let expr = logical2physical(&expr, &file_schema);
   
       let candidate = FilterCandidateBuilder::new(expr, file_schema)
           .build(&metadata)
           .expect("building candidate")
           .expect("filter on primitive col_b should be pushable");
   
       // the bug: col_b is Parquet leaf 3, but the PrimitiveOnly fast-path returns
       // Arrow index 2 (which is struct_col.y).
       assert_eq!(
           candidate.projection.leaf_indices,
           vec![3],
           "BUG: leaf_indices should be [3] for col_b, but PrimitiveOnly returns [2] (struct_col.y)"
       );
   }
   ```
   
   ### Expected behavior
   
   _No response_
   
   ### Additional context
   
   We're integrating Variant columns


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to