adriangb commented on code in PR #20829:
URL: https://github.com/apache/datafusion/pull/20829#discussion_r2911843122


##########
datafusion/datasource-parquet/benches/parquet_struct_filter_pushdown.rs:
##########
@@ -0,0 +1,243 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Benchmarks for struct field filter pushdown in Parquet.
+//!
+//! Compares scanning with vs without row-level filter pushdown for
+//! predicates on struct sub-fields (e.g. `get_field(s, 'id') = 42`).
+
+use std::path::{Path, PathBuf};
+use std::sync::{Arc, LazyLock};
+
+use arrow::array::{BinaryBuilder, BooleanArray, Int32Array, RecordBatch, 
StructArray};
+use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
+use criterion::{Criterion, Throughput, criterion_group, criterion_main};
+use datafusion_common::ScalarValue;
+use datafusion_datasource_parquet::{ParquetFileMetrics, build_row_filter};
+use datafusion_expr::{Expr, col};
+use datafusion_physical_expr::planner::logical2physical;
+use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+use parquet::arrow::{ArrowWriter, ProjectionMask};
+use parquet::file::properties::WriterProperties;
+use tempfile::TempDir;
+
+const ROW_GROUP_ROW_COUNT: usize = 10_000;
+const TOTAL_ROW_GROUPS: usize = 10;
+const TOTAL_ROWS: usize = ROW_GROUP_ROW_COUNT * TOTAL_ROW_GROUPS;
+/// Only one row group will contain the target value.
+const TARGET_VALUE: i32 = 42;
+const STRUCT_COLUMN_NAME: &str = "s";
+const PAYLOAD_COLUMN_NAME: &str = "payload";
+// Large binary payload to emphasize decoding overhead when pushdown is 
disabled.
+const PAYLOAD_BYTES: usize = 8 * 1024;
+
+struct BenchmarkDataset {
+    _tempdir: TempDir,
+    file_path: PathBuf,
+}
+
+impl BenchmarkDataset {
+    fn path(&self) -> &Path {
+        &self.file_path
+    }
+}
+
+static DATASET: LazyLock<BenchmarkDataset> = LazyLock::new(|| {
+    create_dataset().expect("failed to prepare parquet benchmark dataset")
+});
+
+fn parquet_struct_filter_pushdown(c: &mut Criterion) {
+    let dataset_path = DATASET.path().to_owned();
+    let mut group = c.benchmark_group("parquet_struct_filter_pushdown");
+    group.throughput(Throughput::Elements(TOTAL_ROWS as u64));
+
+    group.bench_function("no_pushdown", |b| {
+        let file_schema = setup_reader(&dataset_path);
+        let predicate = logical2physical(&create_predicate(), &file_schema);
+        b.iter(|| {
+            let matched = scan_with_predicate(&dataset_path, &predicate, false)
+                .expect("baseline parquet scan with filter succeeded");
+            assert_eq!(matched, ROW_GROUP_ROW_COUNT);
+        });
+    });
+
+    group.bench_function("with_pushdown", |b| {
+        let file_schema = setup_reader(&dataset_path);
+        let predicate = logical2physical(&create_predicate(), &file_schema);
+        b.iter(|| {
+            let matched = scan_with_predicate(&dataset_path, &predicate, true)
+                .expect("pushdown parquet scan with filter succeeded");
+            assert_eq!(matched, ROW_GROUP_ROW_COUNT);
+        });
+    });
+
+    group.finish();
+}
+
+fn setup_reader(path: &Path) -> SchemaRef {
+    let file = std::fs::File::open(path).expect("failed to open file");
+    let builder =
+        ParquetRecordBatchReaderBuilder::try_new(file).expect("failed to build 
reader");
+    Arc::clone(builder.schema())
+}
+
+/// `get_field(s, 'id') = TARGET_VALUE`
+fn create_predicate() -> Expr {
+    let get_field_expr = datafusion_functions::core::get_field().call(vec![
+        col(STRUCT_COLUMN_NAME),
+        Expr::Literal(ScalarValue::Utf8(Some("id".to_string())), None),
+    ]);
+    get_field_expr.eq(Expr::Literal(ScalarValue::Int32(Some(TARGET_VALUE)), 
None))
+}
+
+fn scan_with_predicate(
+    path: &Path,
+    predicate: &Arc<dyn datafusion_physical_expr::PhysicalExpr>,
+    pushdown: bool,
+) -> datafusion_common::Result<usize> {
+    let file = std::fs::File::open(path)?;
+    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
+    let metadata = builder.metadata().clone();
+    let file_schema = builder.schema();
+    let projection = ProjectionMask::all();
+
+    let metrics = ExecutionPlanMetricsSet::new();
+    let file_metrics = ParquetFileMetrics::new(0, &path.display().to_string(), 
&metrics);
+
+    let builder = if pushdown {
+        if let Some(row_filter) =
+            build_row_filter(predicate, file_schema, &metadata, false, 
&file_metrics)?
+        {
+            builder.with_row_filter(row_filter)
+        } else {
+            builder
+        }
+    } else {
+        builder
+    };
+
+    let reader = builder.with_projection(projection).build()?;
+
+    let mut matched_rows = 0usize;
+    for batch in reader {
+        let batch = batch?;
+        matched_rows += count_matches(predicate, &batch)?;
+    }
+
+    Ok(matched_rows)
+}
+
+fn count_matches(
+    expr: &Arc<dyn datafusion_physical_expr::PhysicalExpr>,
+    batch: &RecordBatch,
+) -> datafusion_common::Result<usize> {
+    let values = expr.evaluate(batch)?.into_array(batch.num_rows())?;
+    let bools = values
+        .as_any()
+        .downcast_ref::<BooleanArray>()
+        .expect("boolean filter result");
+
+    Ok(bools.iter().filter(|v| matches!(v, Some(true))).count())
+}
+
+fn schema() -> SchemaRef {
+    let struct_fields = Fields::from(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("tag", DataType::Int32, false),
+    ]);
+    Arc::new(Schema::new(vec![
+        Field::new(STRUCT_COLUMN_NAME, DataType::Struct(struct_fields), false),
+        Field::new(PAYLOAD_COLUMN_NAME, DataType::Binary, false),
+    ]))
+}
+
+fn create_dataset() -> datafusion_common::Result<BenchmarkDataset> {
+    let tempdir = TempDir::new()?;
+    let file_path = tempdir.path().join("struct_filter.parquet");
+
+    let schema = schema();
+    let writer_props = WriterProperties::builder()
+        .set_max_row_group_row_count(Some(ROW_GROUP_ROW_COUNT))
+        .build();
+
+    let mut writer = ArrowWriter::try_new(
+        std::fs::File::create(&file_path)?,
+        Arc::clone(&schema),
+        Some(writer_props),
+    )?;
+
+    // Each row group has a distinct `s.id` value. Only one row group
+    // matches the target, so pushdown should prune 90% of rows.
+    for rg_idx in 0..TOTAL_ROW_GROUPS {
+        let id_value = if rg_idx == TOTAL_ROW_GROUPS - 1 {
+            TARGET_VALUE
+        } else {
+            (rg_idx as i32 + 1) * 1000
+        };
+        let batch = build_struct_batch(&schema, id_value, 
ROW_GROUP_ROW_COUNT)?;
+        writer.write(&batch)?;
+    }
+
+    writer.close()?;
+
+    let reader =
+        
ParquetRecordBatchReaderBuilder::try_new(std::fs::File::open(&file_path)?)?;
+    assert_eq!(reader.metadata().row_groups().len(), TOTAL_ROW_GROUPS);
+
+    Ok(BenchmarkDataset {
+        _tempdir: tempdir,
+        file_path,
+    })
+}
+
+fn build_struct_batch(
+    schema: &SchemaRef,
+    id_value: i32,
+    len: usize,
+) -> datafusion_common::Result<RecordBatch> {
+    let id_array = Arc::new(Int32Array::from(vec![id_value; len]));
+    let tag_array = Arc::new(Int32Array::from(
+        (0..len).map(|i| i as i32).collect::<Vec<_>>(),
+    ));
+
+    let struct_array = StructArray::from(vec![

Review Comment:
   IMO the dataset should be something like:
   
   ```sql
   CREATE TABLE t (
      id INT,  // useful to check correctness
   large_string TEXT, // so we can check `SELECT * FROM t WHERE get_field(s, 'id') = 42`
      s {
        id: INT,  // matches external id
        large_string: TEXT // matches external large string, or not
      }
   )
   ```
   
   The point is that we can test:
   1. The impact of #20822 on `select * from t where get_field(s, 'id') = 42` 
as well as things like `select * from t where get_field(s, 'id') = id`.
   2. Once we build the right ProjectionMask, etc. we can test the impact of 
`select id from t where get_field(s, 'id') = 42` (should be faster because we 
don't have to read `s['large_string']`)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to