This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 1416ed4d50 Add benchmarks for Parquet struct leaf-level projection 
pruning (#21180)
1416ed4d50 is described below

commit 1416ed4d5007180136dae0aaeb921f0681ec001e
Author: Matthew Kim <[email protected]>
AuthorDate: Thu Mar 26 15:19:16 2026 -0400

    Add benchmarks for Parquet struct leaf-level projection pruning (#21180)
    
    ## Rationale for this change
    
    This PR adds benchmarks that measure the perf of projecting individual
    fields from struct columns in Parquet files. #20925 introduced
    leaf-level projection masking so that `select s['small_int']` on a
    struct with large string fields only reads the small integer leaf,
    skipping the expensive string decoding entirely
    
    3 dataset shapes are coevered, each with ~262K rows of 8kb string
    payloads: a narrow struct (2 leaves), a wide struct (5 leaves), and a
    nested struct. Each shape benchmarks full-struct reads against
    single-field projections
---
 datafusion/core/Cargo.toml                         |   5 +
 .../core/benches/parquet_struct_projection.rs      | 403 +++++++++++++++++++++
 2 files changed, 408 insertions(+)

diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 326b791a2f..e3864e4bff 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -247,6 +247,11 @@ harness = false
 name = "parquet_struct_query"
 required-features = ["parquet"]
 
+[[bench]]
+harness = false
+name = "parquet_struct_projection"
+required-features = ["parquet"]
+
 [[bench]]
 harness = false
 name = "range_and_generate_series"
diff --git a/datafusion/core/benches/parquet_struct_projection.rs 
b/datafusion/core/benches/parquet_struct_projection.rs
new file mode 100644
index 0000000000..d6cf86a91c
--- /dev/null
+++ b/datafusion/core/benches/parquet_struct_projection.rs
@@ -0,0 +1,403 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Benchmarks for struct leaf-level projection pruning in Parquet.
+//!
+//! Measures the benefit of reading only the needed leaf columns from a
+//! struct column. Three dataset shapes are tested:
+//!
+//! 1. **Narrow struct** (2 leaves): one 128 KiB UTF-8 field + one INT field
+//! 2. **Wide struct** (5 leaves): four 128 KiB UTF-8 fields + one INT field
+//! 3. **Nested struct** (3 leaves): `STRUCT<inner: STRUCT<large_string, 
small_int>, extra_string>`
+//!
+//! In all cases, projecting just the small integer should skip decoding
+//! all of the large string leaves, including through nested struct levels.
+
+use arrow::array::{ArrayRef, Int32Array, StringBuilder, StructArray};
+use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
+use arrow::record_batch::RecordBatch;
+use criterion::{Criterion, criterion_group, criterion_main};
+use datafusion::prelude::SessionContext;
+use datafusion_common::instant::Instant;
+use parquet::arrow::ArrowWriter;
+use parquet::file::properties::{WriterProperties, WriterVersion};
+use std::hint::black_box;
+use std::path::Path;
+use std::sync::Arc;
+use tempfile::NamedTempFile;
+use tokio::runtime::Runtime;
+
+const NUM_BATCHES: usize = 64;
+const WRITE_RECORD_BATCH_SIZE: usize = 4096;
+const ROW_GROUP_ROW_COUNT: usize = 65536;
+const EXPECTED_ROW_GROUPS: usize = 4;
+const LARGE_STRING_LEN: usize = 128 * 1024;
+
+fn narrow_schema() -> SchemaRef {
+    let struct_fields = Fields::from(vec![
+        Field::new("large_string", DataType::Utf8, false),
+        Field::new("small_int", DataType::Int32, false),
+    ]);
+    Arc::new(Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("s", DataType::Struct(struct_fields), false),
+    ]))
+}
+
+fn narrow_batch(batch_id: usize) -> RecordBatch {
+    let schema = narrow_schema();
+    let len = WRITE_RECORD_BATCH_SIZE;
+
+    let base_id = (batch_id * len) as i32;
+    let id_values: Vec<i32> = (0..len).map(|i| base_id + i as i32).collect();
+    let id_array = Arc::new(Int32Array::from(id_values.clone()));
+
+    let small_int_array = Arc::new(Int32Array::from(id_values));
+
+    let large_string: String = "x".repeat(LARGE_STRING_LEN);
+    let mut string_builder = StringBuilder::new();
+    for _ in 0..len {
+        string_builder.append_value(&large_string);
+    }
+    let large_string_array = Arc::new(string_builder.finish());
+
+    let struct_array = StructArray::from(vec![
+        (
+            Arc::new(Field::new("large_string", DataType::Utf8, false)),
+            large_string_array as ArrayRef,
+        ),
+        (
+            Arc::new(Field::new("small_int", DataType::Int32, false)),
+            small_int_array as ArrayRef,
+        ),
+    ]);
+
+    RecordBatch::try_new(schema, vec![id_array, 
Arc::new(struct_array)]).unwrap()
+}
+
+fn wide_schema() -> SchemaRef {
+    let struct_fields = Fields::from(vec![
+        Field::new("str_a", DataType::Utf8, false),
+        Field::new("str_b", DataType::Utf8, false),
+        Field::new("str_c", DataType::Utf8, false),
+        Field::new("str_d", DataType::Utf8, false),
+        Field::new("small_int", DataType::Int32, false),
+    ]);
+    Arc::new(Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("s", DataType::Struct(struct_fields), false),
+    ]))
+}
+
+fn wide_batch(batch_id: usize) -> RecordBatch {
+    let schema = wide_schema();
+    let len = WRITE_RECORD_BATCH_SIZE;
+
+    let base_id = (batch_id * len) as i32;
+    let id_values: Vec<i32> = (0..len).map(|i| base_id + i as i32).collect();
+    let id_array = Arc::new(Int32Array::from(id_values.clone()));
+    let small_int_array = Arc::new(Int32Array::from(id_values));
+
+    let large_string: String = "x".repeat(LARGE_STRING_LEN);
+    let mut string_fields: Vec<(Arc<Field>, ArrayRef)> = Vec::new();
+    for name in &["str_a", "str_b", "str_c", "str_d"] {
+        let mut sb = StringBuilder::new();
+        for _ in 0..len {
+            sb.append_value(&large_string);
+        }
+        string_fields.push((
+            Arc::new(Field::new(*name, DataType::Utf8, false)),
+            Arc::new(sb.finish()) as ArrayRef,
+        ));
+    }
+    string_fields.push((
+        Arc::new(Field::new("small_int", DataType::Int32, false)),
+        small_int_array as ArrayRef,
+    ));
+
+    let struct_array = StructArray::from(string_fields);
+    RecordBatch::try_new(schema, vec![id_array, 
Arc::new(struct_array)]).unwrap()
+}
+
+fn generate_file(
+    schema: SchemaRef,
+    batch_fn: fn(usize) -> RecordBatch,
+    prefix: &str,
+) -> NamedTempFile {
+    let now = Instant::now();
+    let mut named_file = tempfile::Builder::new()
+        .prefix(prefix)
+        .suffix(".parquet")
+        .tempfile()
+        .unwrap();
+
+    println!("Generating parquet file - {}", named_file.path().display());
+
+    let properties = WriterProperties::builder()
+        .set_writer_version(WriterVersion::PARQUET_2_0)
+        .set_max_row_group_row_count(Some(ROW_GROUP_ROW_COUNT))
+        .build();
+
+    let mut writer =
+        ArrowWriter::try_new(&mut named_file, schema, 
Some(properties)).unwrap();
+
+    for batch_id in 0..NUM_BATCHES {
+        let batch = batch_fn(batch_id);
+        writer.write(&batch).unwrap();
+    }
+
+    let metadata = writer.close().unwrap();
+    let file_metadata = metadata.file_metadata();
+    let expected_rows = WRITE_RECORD_BATCH_SIZE * NUM_BATCHES;
+    assert_eq!(
+        file_metadata.num_rows() as usize,
+        expected_rows,
+        "Expected {expected_rows} rows but got {}",
+        file_metadata.num_rows()
+    );
+    assert_eq!(
+        metadata.row_groups().len(),
+        EXPECTED_ROW_GROUPS,
+        "Expected {EXPECTED_ROW_GROUPS} row groups but got {}",
+        metadata.row_groups().len()
+    );
+
+    println!(
+        "Generated parquet file with {} rows and {} row groups in {:.2}s",
+        file_metadata.num_rows(),
+        metadata.row_groups().len(),
+        now.elapsed().as_secs_f32()
+    );
+
+    named_file
+}
+
+fn create_context(rt: &Runtime, file_path: &str, table: &str) -> 
SessionContext {
+    let ctx = SessionContext::new();
+    rt.block_on(ctx.register_parquet(table, file_path, Default::default()))
+        .unwrap();
+    ctx
+}
+
+fn query(ctx: &SessionContext, rt: &Runtime, sql: &str) {
+    let ctx = ctx.clone();
+    let sql = sql.to_string();
+    let df = rt.block_on(ctx.sql(&sql)).unwrap();
+    black_box(rt.block_on(df.collect()).unwrap());
+}
+
+fn narrow_benchmarks(c: &mut Criterion) {
+    let temp_file = generate_file(narrow_schema(), narrow_batch, 
"narrow_struct");
+    let file_path = temp_file.path().display().to_string();
+    assert!(Path::new(&file_path).exists(), "path not found");
+
+    let rt = Runtime::new().unwrap();
+    let ctx = create_context(&rt, &file_path, "t");
+
+    let mut group = c.benchmark_group("narrow_struct");
+
+    // baseline: full struct, must decode both leaves
+    group.bench_function("select_struct", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT s FROM t"))
+    });
+
+    // pruned: skip large_string, read only small_int
+    group.bench_function("select_small_field", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT s['small_int'] FROM t"))
+    });
+
+    // pruned: skip small_int, read only large_string
+    group.bench_function("select_large_field", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT s['large_string'] FROM t"))
+    });
+
+    // no pruning: all columns
+    group.bench_function("select_all", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT * FROM t"))
+    });
+
+    // top-level column + pruned struct sub-field
+    group.bench_function("select_id_and_small_field", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT id, s['small_int'] FROM t"))
+    });
+
+    // aggregation on pruned sub-field, realistic analytical pattern
+    group.bench_function("sum_small_field", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT SUM(s['small_int']) FROM t"))
+    });
+
+    group.finish();
+    drop(temp_file);
+}
+
+fn wide_benchmarks(c: &mut Criterion) {
+    let temp_file = generate_file(wide_schema(), wide_batch, "wide_struct");
+    let file_path = temp_file.path().display().to_string();
+    assert!(Path::new(&file_path).exists(), "path not found");
+
+    let rt = Runtime::new().unwrap();
+    let ctx = create_context(&rt, &file_path, "t");
+
+    let mut group = c.benchmark_group("wide_struct");
+
+    // baseline: full struct, must decode all 5 leaves
+    group.bench_function("select_struct", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT s FROM t"))
+    });
+
+    // pruned: skip all 4 large string leaves
+    group.bench_function("select_small_field", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT s['small_int'] FROM t"))
+    });
+
+    // pruned: read 1 of 4 string leaves + skip the rest
+    group.bench_function("select_one_string_field", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT s['str_a'] FROM t"))
+    });
+
+    // pruned: read 2 of 4 string leaves
+    group.bench_function("select_two_string_fields", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT s['str_a'], s['str_b'] FROM t"))
+    });
+
+    // no pruning: all columns
+    group.bench_function("select_all", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT * FROM t"))
+    });
+
+    // aggregation on pruned sub-field, skips all 4 large leaves
+    group.bench_function("sum_small_field", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT SUM(s['small_int']) FROM t"))
+    });
+
+    group.finish();
+    drop(temp_file);
+}
+
+fn nested_schema() -> SchemaRef {
+    let inner_fields = Fields::from(vec![
+        Field::new("large_string", DataType::Utf8, false),
+        Field::new("small_int", DataType::Int32, false),
+    ]);
+    let outer_fields = Fields::from(vec![
+        Field::new("inner", DataType::Struct(inner_fields), false),
+        Field::new("extra_string", DataType::Utf8, false),
+    ]);
+    Arc::new(Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("s", DataType::Struct(outer_fields), false),
+    ]))
+}
+
+fn nested_batch(batch_id: usize) -> RecordBatch {
+    let schema = nested_schema();
+    let len = WRITE_RECORD_BATCH_SIZE;
+
+    let base_id = (batch_id * len) as i32;
+    let id_values: Vec<i32> = (0..len).map(|i| base_id + i as i32).collect();
+    let id_array = Arc::new(Int32Array::from(id_values.clone()));
+    let small_int_array = Arc::new(Int32Array::from(id_values));
+
+    let large_string: String = "x".repeat(LARGE_STRING_LEN);
+
+    let mut sb1 = StringBuilder::new();
+    let mut sb2 = StringBuilder::new();
+    for _ in 0..len {
+        sb1.append_value(&large_string);
+        sb2.append_value(&large_string);
+    }
+
+    let inner_struct = StructArray::from(vec![
+        (
+            Arc::new(Field::new("large_string", DataType::Utf8, false)),
+            Arc::new(sb1.finish()) as ArrayRef,
+        ),
+        (
+            Arc::new(Field::new("small_int", DataType::Int32, false)),
+            small_int_array as ArrayRef,
+        ),
+    ]);
+
+    let inner_fields = Fields::from(vec![
+        Field::new("large_string", DataType::Utf8, false),
+        Field::new("small_int", DataType::Int32, false),
+    ]);
+    let outer_struct = StructArray::from(vec![
+        (
+            Arc::new(Field::new("inner", DataType::Struct(inner_fields), 
false)),
+            Arc::new(inner_struct) as ArrayRef,
+        ),
+        (
+            Arc::new(Field::new("extra_string", DataType::Utf8, false)),
+            Arc::new(sb2.finish()) as ArrayRef,
+        ),
+    ]);
+
+    RecordBatch::try_new(schema, vec![id_array, 
Arc::new(outer_struct)]).unwrap()
+}
+
+fn nested_benchmarks(c: &mut Criterion) {
+    let temp_file = generate_file(nested_schema(), nested_batch, 
"nested_struct");
+    let file_path = temp_file.path().display().to_string();
+    assert!(Path::new(&file_path).exists(), "path not found");
+
+    let rt = Runtime::new().unwrap();
+    let ctx = create_context(&rt, &file_path, "t");
+
+    let mut group = c.benchmark_group("nested_struct");
+
+    // baseline: full outer struct, decode all 3 leaves
+    group.bench_function("select_struct", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT s FROM t"))
+    });
+
+    // pruned outer: read only inner struct, skip extra_string
+    group.bench_function("select_inner_struct", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT s['inner'] FROM t"))
+    });
+
+    // pruned both levels: reach through outer + inner, skip both large strings
+    group.bench_function("select_inner_small_field", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT s['inner']['small_int'] FROM t"))
+    });
+
+    // pruned outer only: skip inner struct entirely, read extra_string
+    group.bench_function("select_extra_string", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT s['extra_string'] FROM t"))
+    });
+
+    // no pruning: all columns
+    group.bench_function("select_all", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT * FROM t"))
+    });
+
+    // aggregation reaching through two levels of nesting
+    group.bench_function("sum_inner_small_field", |b| {
+        b.iter(|| query(&ctx, &rt, "SELECT SUM(s['inner']['small_int']) FROM 
t"))
+    });
+
+    group.finish();
+    drop(temp_file);
+}
+
+criterion_group!(
+    benches,
+    narrow_benchmarks,
+    wide_benchmarks,
+    nested_benchmarks
+);
+criterion_main!(benches);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to