adriangb commented on code in PR #16086: URL: https://github.com/apache/datafusion/pull/16086#discussion_r2094817127
##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -848,6 +848,68 @@ mod tests {
         assert_eq!(read.len(), 0);
     }
 
+    #[tokio::test]
+    async fn evolved_schema_column_type_filter_strings() {
+        // The table and filter have a common data type, but the file schema differs
+        let c1: ArrayRef =
+            Arc::new(StringViewArray::from(vec![Some("foo"), Some("bar")]));
+        let batch = create_batch(vec![("c1", c1.clone())]);
+
+        let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Utf8, false)]));
+
+        // Predicate should prune all row groups
+        let filter = col("c1").eq(lit(ScalarValue::Utf8(Some("aaa".to_string()))));
+        let read = RoundTrip::new()
+            .with_predicate(filter)
+            .with_schema(schema.clone())
+            .round_trip_to_batches(vec![batch.clone()])
+            .await
+            .unwrap();
+        assert_eq!(read.len(), 0);
+
+        // Predicate should prune no row groups
+        let filter = col("c1").eq(lit(ScalarValue::Utf8(Some("foo".to_string()))));
+        let read = RoundTrip::new()
+            .with_predicate(filter)
+            .with_schema(schema)
+            .round_trip_to_batches(vec![batch])
+            .await
+            .unwrap();
+        assert_eq!(read.len(), 1);
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_column_type_filter_ints() {
+        // The table and filter have a common data type, but the file schema differs
+        let c1: ArrayRef = Arc::new(Int8Array::from(vec![Some(1), Some(2)]));
+        let batch = create_batch(vec![("c1", c1.clone())]);
+
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("c1", DataType::UInt64, false)]));
+
+        // Predicate should prune all row groups
+        let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(5))));
+        let read = RoundTrip::new()
+            .with_predicate(filter)
+            .with_schema(schema.clone())
+            .round_trip_to_batches(vec![batch.clone()])
+            .await
+            .unwrap();
+        assert_eq!(read.len(), 0);
+
+        // TODO: this is failing on main, and has been for a long time!
+        // See <comment on PR>
+        // // Predicate should prune no row groups
+        // let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(1))));
+        // let read = RoundTrip::new()
+        //     .with_predicate(filter)
+        //     .with_schema(schema)
+        //     .round_trip_to_batches(vec![batch])
+        //     .await
+        //     .unwrap();
+        // assert_eq!(read.len(), 1);

Review Comment:
This has been failing as far back as 76a7789ace33ced54c973fa0d5fc9d1866e1bf19 with this diff:

```diff
diff --git a/datafusion-testing b/datafusion-testing
index 3462eaa78..e9f9e22cc 160000
--- a/datafusion-testing
+++ b/datafusion-testing
@@ -1 +1 @@
-Subproject commit 3462eaa787459957e38df267a4a21f5bea605807
+Subproject commit e9f9e22ccf09145a7368f80fd6a871f11e2b4481
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 3b71593b3..60b403aff 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -67,13 +67,14 @@ pub(crate) mod test_util {
             .into_iter()
             .zip(tmp_files.into_iter())
             .map(|(batch, mut output)| {
-                let builder = parquet::file::properties::WriterProperties::builder();
-                let props = if multi_page {
+                let mut builder = parquet::file::properties::WriterProperties::builder();
+                builder = if multi_page {
                     builder.set_data_page_row_count_limit(ROWS_PER_PAGE)
                 } else {
                     builder
-                }
-                .build();
+                };
+                builder = builder.set_bloom_filter_enabled(true);
+                let props = builder.build();
 
                 let mut writer = parquet::arrow::ArrowWriter::try_new(
                     &mut output,
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 888f3ad9e..240d84783 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -489,6 +489,37 @@ mod tests {
         assert_eq!(read.len(), 0);
     }
 
+
+    #[tokio::test]
+    async fn evolved_schema_column_type_filter_ints() {
+        // The table and filter have a common data type, but the file schema differs
+        let c1: ArrayRef = Arc::new(Int8Array::from(vec![Some(1), Some(2)]));
+        let batch = create_batch(vec![("c1", c1.clone())]);
+
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("c1", DataType::UInt64, false)]));
+
+        // // Predicate should prune all row groups
+        // let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(5))));
+        // let read = RoundTrip::new()
+        //     .with_predicate(filter)
+        //     .with_schema(schema.clone())
+        //     .round_trip_to_batches(vec![batch.clone()])
+        //     .await
+        //     .unwrap();
+        // assert_eq!(read.len(), 0);
+
+        // Predicate should prune no row groups
+        let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(1))));
+        let read = RoundTrip::new()
+            .with_predicate(filter)
+            .with_schema(schema)
+            .round_trip_to_batches(vec![batch])
+            .await
+            .unwrap();
+        assert_eq!(read.len(), 1);
+    }
+
     #[tokio::test]
     async fn evolved_schema_disjoint_schema_filter() {
         let c1: ArrayRef =
diff --git a/parquet-testing b/parquet-testing
index f4d7ed772..6e851ddd7 160000
--- a/parquet-testing
+++ b/parquet-testing
@@ -1 +1 @@
-Subproject commit f4d7ed772a62a95111db50fbcad2460833e8c882
+Subproject commit 6e851ddd768d6af741c7b15dc594874399fc3cff
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org