ding-young commented on code in PR #16512:
URL: https://github.com/apache/datafusion/pull/16512#discussion_r2174808708


##########
datafusion/physical-plan/benches/spill_io.rs:
##########
@@ -119,5 +127,450 @@ fn bench_spill_io(c: &mut Criterion) {
     group.finish();
 }
 
-criterion_group!(benches, bench_spill_io);
+/// Builds `num_batches` RecordBatches, each holding `num_rows` rows, shaped like
+/// the partial aggregate result of TPC-H Q2:
+/// GROUP BY ps_partkey -> MIN(ps_supplycost)
+///
+/// Returns the shared schema together with the generated batches. The RNG is
+/// seeded with a constant, so repeated calls yield identical data.
+fn create_q2_like_batches(
+    num_batches: usize,
+    num_rows: usize,
+) -> (Arc<Schema>, Vec<RecordBatch>) {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("ps_partkey", DataType::Int64, false),
+        Field::new("min_ps_supplycost", DataType::Decimal128(15, 2), true),
+    ]));
+
+    // Fixed seed: the generated data must not vary between runs.
+    let mut rng = rand::rngs::StdRng::seed_from_u64(2);
+    // Part keys keep increasing across batch boundaries.
+    let mut next_partkey = 400000_i64;
+
+    let batches: Vec<RecordBatch> = (0..num_batches)
+        .map(|_| {
+            let mut keys = Int64Builder::new();
+            let mut costs = Decimal128Builder::new()
+                .with_precision_and_scale(15, 2)
+                .unwrap();
+
+            for _ in 0..num_rows {
+                // Usually advance by one; now and then skip a few partkey
+                // values to simulate sparsity.
+                let step = if rng.random_bool(0.05) {
+                    rng.random_range(2..10)
+                } else {
+                    1
+                };
+                next_partkey += step;
+                keys.append_value(next_partkey);
+
+                let cost = rng.random_range(10_00..100_000) as i128;
+                costs.append_value(cost);
+            }
+
+            RecordBatch::try_new(
+                Arc::clone(&schema),
+                vec![Arc::new(keys.finish()), Arc::new(costs.finish())],
+            )
+            .unwrap()
+        })
+        .collect();
+
+    (schema, batches)
+}
+
+/// Generate `num_batches` RecordBatches mimicking TPC-H Q16's partial 
aggregate result:
+/// GROUP BY (p_brand, p_type, p_size) -> COUNT(DISTINCT ps_suppkey)
+pub fn create_q16_like_batches(
+    num_batches: usize,
+    num_rows: usize,
+) -> (Arc<Schema>, Vec<RecordBatch>) {
+    let seed = 16;
+    let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
+    let mut batches = Vec::with_capacity(num_batches);
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("p_brand", DataType::Utf8, false),
+        Field::new("p_type", DataType::Utf8, false),
+        Field::new("p_size", DataType::Int32, false),
+        Field::new("alias1", DataType::Int64, false), // COUNT(DISTINCT 
ps_suppkey)
+    ]));
+
+    // Representative string pools
+    let brands = ["Brand#32", "Brand#33", "Brand#41", "Brand#42", "Brand#55"];
+    let types = [
+        "PROMO ANODIZED NICKEL",
+        "STANDARD BRUSHED NICKEL",
+        "PROMO POLISHED COPPER",
+        "ECONOMY ANODIZED BRASS",
+        "LARGE BURNISHED COPPER",
+        "STANDARD POLISHED TIN",
+        "SMALL PLATED STEEL",
+        "MEDIUM POLISHED COPPER",
+    ];
+    let sizes = [3, 9, 14, 19, 23, 36, 45, 49];
+
+    for _ in 0..num_batches {
+        let mut brand_builder = StringBuilder::new();
+        let mut type_builder = StringBuilder::new();
+        let mut size_builder = Int32Builder::new();
+        let mut count_builder = Int64Builder::new();
+
+        for _ in 0..num_rows {
+            let brand = brands[rng.random_range(0..brands.len())];
+            let ptype = types[rng.random_range(0..types.len())];
+            let size = sizes[rng.random_range(0..sizes.len())];
+            let count = rng.random_range(1000..100_000);
+
+            brand_builder.append_value(brand);
+            type_builder.append_value(ptype);
+            size_builder.append_value(size);
+            count_builder.append_value(count);
+        }
+
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(brand_builder.finish()),
+                Arc::new(type_builder.finish()),
+                Arc::new(size_builder.finish()),
+                Arc::new(count_builder.finish()),
+            ],
+        )
+        .unwrap();
+
+        batches.push(batch);
+    }
+
+    (schema, batches)
+}
+
+/// Builds `num_batches` RecordBatches, each holding `num_rows` rows, shaped like
+/// the partial aggregate result of TPC-H Q20:
+/// GROUP BY (l_partkey, l_suppkey) -> SUM(l_quantity)
+///
+/// Returns the shared schema together with the generated batches. The RNG is
+/// seeded with a constant, so repeated calls yield identical data.
+fn create_q20_like_batches(
+    num_batches: usize,
+    num_rows: usize,
+) -> (Arc<Schema>, Vec<RecordBatch>) {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("l_partkey", DataType::Int64, false),
+        Field::new("l_suppkey", DataType::Int64, false),
+        Field::new("sum_l_quantity", DataType::Decimal128(25, 2), true),
+    ]));
+
+    let mut rng = rand::rngs::StdRng::seed_from_u64(20);
+    // Part keys keep increasing across batch boundaries.
+    let mut next_partkey = 400000_i64;
+
+    let batches: Vec<RecordBatch> = (0..num_batches)
+        .map(|_| {
+            let mut partkeys = Int64Builder::new();
+            let mut suppkeys = Int64Builder::new();
+            let mut quantities = Decimal128Builder::new()
+                .with_precision_and_scale(25, 2)
+                .unwrap();
+
+            for _ in 0..num_rows {
+                // Usually advance by one; now and then skip a few partkey
+                // values to simulate sparsity.
+                let step = if rng.random_bool(0.03) {
+                    rng.random_range(2..6)
+                } else {
+                    1
+                };
+                next_partkey += step;
+                partkeys.append_value(next_partkey);
+
+                suppkeys.append_value(rng.random_range(10_000..99_999));
+
+                let qty = rng.random_range(500..20_000) as i128;
+                quantities.append_value(qty);
+            }
+
+            RecordBatch::try_new(
+                Arc::clone(&schema),
+                vec![
+                    Arc::new(partkeys.finish()),
+                    Arc::new(suppkeys.finish()),
+                    Arc::new(quantities.finish()),
+                ],
+            )
+            .unwrap()
+        })
+        .collect();
+
+    (schema, batches)
+}
+
+/// Generate `num_batches` wide RecordBatches resembling sort-tpch Q10 for
+/// benchmarking.
+///
+/// Each batch holds `num_rows` rows and 15 columns: one Int32, three Int64,
+/// four Decimal128(15, 2), two Utf8 flag columns, three Date32 and two more
+/// Utf8 columns. The RNG is seeded with a constant, so repeated calls yield
+/// identical data.
+///
+/// Returns the shared schema together with the generated batches.
+pub fn create_wide_batches(
+    num_batches: usize,
+    num_rows: usize,
+) -> (Arc<Schema>, Vec<RecordBatch>) {
+    let seed = 10;
+    let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
+    let mut batches = Vec::with_capacity(num_batches);
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("l_linenumber", DataType::Int32, false),
+        Field::new("l_suppkey", DataType::Int64, false),
+        Field::new("l_orderkey", DataType::Int64, false),
+        Field::new("l_partkey", DataType::Int64, false),
+        Field::new("l_quantity", DataType::Decimal128(15, 2), false),
+        Field::new("l_extendedprice", DataType::Decimal128(15, 2), false),
+        Field::new("l_discount", DataType::Decimal128(15, 2), false),
+        Field::new("l_tax", DataType::Decimal128(15, 2), false),
+        Field::new("l_returnflag", DataType::Utf8, false),
+        Field::new("l_linestatus", DataType::Utf8, false),
+        Field::new("l_shipdate", DataType::Date32, false),
+        Field::new("l_commitdate", DataType::Date32, false),
+        Field::new("l_receiptdate", DataType::Date32, false),
+        Field::new("l_shipinstruct", DataType::Utf8, false),
+        Field::new("l_shipmode", DataType::Utf8, false),
+    ]));
+
+    // Value pools and the date origin never change, so build them once
+    // instead of once per batch / per row.
+    let return_flags = ["A", "N", "R"];
+    let statuses = ["F", "O"];
+    let instructs = ["DELIVER IN PERSON", "COLLECT COD", "NONE"];
+    let modes = ["TRUCK", "MAIL", "SHIP", "RAIL", "AIR"];
+    let base_date = 10_000;
+
+    for _ in 0..num_batches {
+        let mut linenum = Int32Builder::new();
+        let mut suppkey = Int64Builder::new();
+        let mut orderkey = Int64Builder::new();
+        let mut partkey = Int64Builder::new();
+        let mut quantity = Decimal128Builder::new()
+            .with_precision_and_scale(15, 2)
+            .unwrap();
+        let mut extprice = Decimal128Builder::new()
+            .with_precision_and_scale(15, 2)
+            .unwrap();
+        let mut discount = Decimal128Builder::new()
+            .with_precision_and_scale(15, 2)
+            .unwrap();
+        let mut tax = Decimal128Builder::new()
+            .with_precision_and_scale(15, 2)
+            .unwrap();
+        let mut retflag = StringBuilder::new();
+        let mut linestatus = StringBuilder::new();
+        let mut shipdate = Date32Builder::new();
+        let mut commitdate = Date32Builder::new();
+        let mut receiptdate = Date32Builder::new();
+        let mut shipinstruct = StringBuilder::new();
+        let mut shipmode = StringBuilder::new();
+
+        // NOTE: the order of the `rng` calls below determines the data
+        // produced from the fixed seed; keep it stable.
+        for i in 0..num_rows {
+            linenum.append_value((i % 7) as i32);
+            suppkey.append_value(rng.random_range(0..100_000));
+            orderkey.append_value(1_000_000 + i as i64);
+            partkey.append_value(rng.random_range(0..200_000));
+
+            quantity.append_value(rng.random_range(100..10000) as i128);
+            extprice.append_value(rng.random_range(1_000..1_000_000) as i128);
+            discount.append_value(rng.random_range(0..10000) as i128);
+            tax.append_value(rng.random_range(0..5000) as i128);
+
+            let flag = return_flags[rng.random_range(0..return_flags.len())];
+            retflag.append_value(flag);
+            let status = statuses[rng.random_range(0..statuses.len())];
+            linestatus.append_value(status);
+
+            // Ship / commit / receipt dates are consecutive days that cycle
+            // every 1000 rows.
+            let ship_day = base_date + (i % 1000) as i32;
+            shipdate.append_value(ship_day);
+            commitdate.append_value(ship_day + 1);
+            receiptdate.append_value(ship_day + 2);
+
+            let instruct = instructs[rng.random_range(0..instructs.len())];
+            shipinstruct.append_value(instruct);
+            shipmode.append_value(modes[rng.random_range(0..modes.len())]);
+        }
+
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(linenum.finish()),
+                Arc::new(suppkey.finish()),
+                Arc::new(orderkey.finish()),
+                Arc::new(partkey.finish()),
+                Arc::new(quantity.finish()),
+                Arc::new(extprice.finish()),
+                Arc::new(discount.finish()),
+                Arc::new(tax.finish()),
+                Arc::new(retflag.finish()),
+                Arc::new(linestatus.finish()),
+                Arc::new(shipdate.finish()),
+                Arc::new(commitdate.finish()),
+                Arc::new(receiptdate.finish()),
+                Arc::new(shipinstruct.finish()),
+                Arc::new(shipmode.finish()),
+            ],
+        )
+        .unwrap();
+        batches.push(batch);
+    }
+    (schema, batches)
+}
+
+// Benchmarks spill write + read performance across multiple compression codecs
+// using realistic input data inspired by TPC-H aggregate spill scenarios.
+//
+// This function prepares synthetic RecordBatches that mimic the schema and 
distribution
+// of intermediate aggregate results from representative TPC-H queries (Q2, 
Q16, Q20) and sort-tpch Q10.
+// The schemas of these batches are:
+//      Q2 [Int64, Decimal128]
+//      Q16 [Utf8, Utf8, Int32, Int64]
+//      Q20 [Int64, Int64, Decimal128]
+//      sort-tpch Q10 (wide batch) [Int32, Int64 * 3, Decimal128 * 4, Date * 
3, Utf8 * 4]
+// For each dataset:
+// - It evaluates spill performance under different compression codecs (e.g., 
Uncompressed, Zstd, LZ4).
+// - It measures end-to-end spill write + read performance using Criterion.
+// - It prints the observed memory-to-disk compression ratio for each codec.
+//
+// This helps evaluate the tradeoffs between compression ratio and runtime 
overhead for various codecs.
+fn bench_spill_compression(c: &mut Criterion) {
+    let env = Arc::new(RuntimeEnv::default());
+    let mut group = c.benchmark_group("spill_compression");
+    let rt = Runtime::new().unwrap();
+    let compressions = vec![
+        SpillCompression::Uncompressed,
+        SpillCompression::Zstd,
+        SpillCompression::Lz4Frame,
+    ];
+
+    // Modify these values to change data volume. Note that each batch 
contains `num_rows` rows.
+    let num_batches = 50;
+    let num_rows = 8192;
+
+    // Q2 [Int64, Decimal128]
+    let (schema, batches) = create_q2_like_batches(num_batches, num_rows);
+    benchmark_spill_batches_for_all_codec(
+        &mut group,
+        "q2",
+        batches,
+        &compressions,
+        &rt,
+        env.clone(),
+        schema,
+    );
+    // Q16 [Utf8, Utf8, Int32, Int64]
+    let (schema, batches) = create_q16_like_batches(num_batches, num_rows);
+    benchmark_spill_batches_for_all_codec(
+        &mut group,
+        "q16",
+        batches,
+        &compressions,
+        &rt,
+        env.clone(),
+        schema,
+    );
+    // Q20 [Int64, Int64, Decimal128]
+    let (schema, batches) = create_q20_like_batches(num_batches, num_rows);
+    benchmark_spill_batches_for_all_codec(
+        &mut group,
+        "q20",
+        batches,
+        &compressions,
+        &rt,
+        env.clone(),
+        schema,
+    );
+    // sort-tpch Q10 (wide batch) [Int32, Int64 * 3, Decimal128 * 4, Date * 3, 
Utf8 * 4]
+    let (schema, batches) = create_wide_batches(num_batches, num_rows);
+    benchmark_spill_batches_for_all_codec(
+        &mut group,
+        "wide",
+        batches,
+        &compressions,
+        &rt,
+        env,
+        schema,
+    );
+    group.finish();
+}
+
+fn benchmark_spill_batches_for_all_codec(
+    group: &mut BenchmarkGroup<'_, WallTime>,
+    batch_label: &str,
+    batches: Vec<RecordBatch>,
+    compressions: &[SpillCompression],
+    rt: &Runtime,
+    env: Arc<RuntimeEnv>,
+    schema: Arc<Schema>,
+) {
+    let mem_bytes: usize = batches.iter().map(|b| 
b.get_array_memory_size()).sum();
+
+    for &compression in compressions {
+        let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
+        let spill_manager =
+            SpillManager::new(Arc::clone(&env), metrics.clone(), 
Arc::clone(&schema))
+                .with_compression_type(compression);
+
+        let bench_id = BenchmarkId::new(batch_label, compression.to_string());
+        group.bench_with_input(bench_id, &spill_manager, |b, spill_manager| {
+            b.iter_batched(
+                || batches.clone(),
+                |batches| {
+                    rt.block_on(async {
+                        let spill_file = spill_manager
+                            .spill_record_batch_and_finish(
+                                &batches,
+                                &format!("{batch_label}_{compression}"),
+                            )
+                            .unwrap()
+                            .unwrap();
+                        let stream =
+                            
spill_manager.read_spill_as_stream(spill_file).unwrap();
+                        let _ = collect(stream).await.unwrap();
+                    })
+                },
+                BatchSize::LargeInput,
+            )
+        });
+
+        // Run Spilling Read & Write once more to read file size & calculate 
bandwidth
+        let start = Instant::now();
+
+        let spill_file = spill_manager
+            .spill_record_batch_and_finish(
+                &batches,
+                &format!("{batch_label}_{compression}"),
+            )
+            .unwrap()
+            .unwrap();
+
+        // calculate write_throughput (includes both compression and I/O time) 
based on in memory batch size
+        let write_time = start.elapsed();
+        let write_throughput = (mem_bytes as u128 / 
write_time.as_millis().max(1)) * 1000;
+
+        // calculate compression ratio
+        let disk_bytes = std::fs::metadata(spill_file.path())
+            .expect("metadata read fail")
+            .len() as usize;
+        let ratio = mem_bytes as f64 / disk_bytes.max(1) as f64;
+
+        // calculate read_throughput (includes both compression and I/O time) 
based on in memory batch size
+        let rt = Runtime::new().unwrap();
+        let start = Instant::now();
+        rt.block_on(async {
+            let stream = 
spill_manager.read_spill_as_stream(spill_file).unwrap();
+            let _ = collect(stream).await.unwrap();
+        });
+        let read_time = start.elapsed();
+        let read_throughput = (mem_bytes as u128 / 
read_time.as_millis().max(1)) * 1000;

Review Comment:
   I added separate measurements for write and read throughput after the 
Criterion benchmark by running the spill logic once more (the same way the 
compression ratio is calculated). This avoids the risk of `Instant::now()` 
adding latency inside the benchmark closure. Also, since the timing includes 
both I/O and (de)compression time, I used the term `throughput` instead of 
`bandwidth`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to