This is an automated email from the ASF dual-hosted git repository.

prantogg pushed a commit to branch update-zone-generation
in repository https://gitbox.apache.org/repos/asf/sedona-spatialbench.git

commit f55a16d513780a442c65b02d303f7fd371af1afd
Author: Pranav Toggi <[email protected]>
AuthorDate: Mon Sep 29 09:45:01 2025 -0700

    add row group size configurability
---
 spatialbench-cli/src/zone_df.rs | 168 ++++++++++++++++++++++++++++++----------
 spatialbench/src/generators.rs  |   3 +-
 2 files changed, 126 insertions(+), 45 deletions(-)

diff --git a/spatialbench-cli/src/zone_df.rs b/spatialbench-cli/src/zone_df.rs
index 2a7a4cb..9ca27be 100644
--- a/spatialbench-cli/src/zone_df.rs
+++ b/spatialbench-cli/src/zone_df.rs
@@ -1,4 +1,3 @@
-// spatialbench-cli/src/zone_df.rs
 use std::{path::PathBuf, sync::Arc, time::Instant};
 
 use anyhow::{anyhow, Result};
@@ -10,7 +9,7 @@ use datafusion::{
 };
 
 use datafusion::execution::runtime_env::RuntimeEnv;
-use log::info;
+use log::{debug, info};
 use object_store::aws::AmazonS3Builder;
 use object_store::ObjectStore;
 use parquet::{
@@ -67,24 +66,41 @@ fn estimated_total_rows_for_sf(sf: f64) -> i64 {
     }
 }
 
-fn parquet_writer_props(comp: ParquetCompression) -> WriterProperties {
-    WriterProperties::builder().set_compression(comp).build()
+fn get_zone_table_stats(sf: f64) -> (f64, i64) {
+    // Returns (size_in_gb, total_rows) for the given scale factor
+    if sf < 10.0 {
+        (1.42, 156_095)
+    } else if sf < 100.0 {
+        (5.68, 455_711)
+    } else {
+        (6.13, 1_035_371)
+    }
 }
 
-fn approx_bytes_per_row(batches: &[RecordBatch]) -> f64 {
-    let mut rows = 0usize;
-    let mut bytes = 0usize;
-    for b in batches {
-        rows += b.num_rows();
-        for col in b.columns() {
-            bytes += col.get_array_memory_size();
-        }
-    }
-    if rows == 0 {
-        0.0
-    } else {
-        bytes as f64 / rows as f64
+fn compute_rows_per_group_from_stats(size_gb: f64, total_rows: i64, target_bytes: i64) -> usize {
+    let total_bytes = size_gb * 1024.0 * 1024.0 * 1024.0; // Convert GB to bytes
+    let bytes_per_row = total_bytes / total_rows as f64;
+
+    debug!("Using hardcoded stats: {:.2} GB, {} rows, {:.2} bytes/row",
+           size_gb, total_rows, bytes_per_row);
+
+    if bytes_per_row <= 0.0 {
+        return 128_000; // fallback
     }
+
+    let est = (target_bytes as f64 / bytes_per_row).floor();
+    // Clamp to [10k, 10M] rows per group: avoid too-tiny RGs without letting one group grow unbounded
+    est.max(10_000.0).min(10_000_000.0) as usize
+}
+
+fn writer_props_with_rowgroup(
+    comp: ParquetCompression,
+    rows_per_group: usize,
+) -> WriterProperties {
+    WriterProperties::builder()
+        .set_compression(comp)
+        .set_max_row_group_size(rows_per_group) // <-- the key line
+        .build()
 }
 
 fn write_parquet_with_rowgroup_bytes(
@@ -92,32 +108,19 @@ fn write_parquet_with_rowgroup_bytes(
     schema: SchemaRef,
     all_batches: Vec<RecordBatch>,
     target_rowgroup_bytes: i64,
-    props: WriterProperties,
+    comp: ParquetCompression,
+    scale_factor: f64,
 ) -> Result<()> {
-    let mut writer = ArrowWriter::try_new(std::fs::File::create(out_path)?, schema, Some(props))?;
+    let (size_gb, total_rows) = get_zone_table_stats(scale_factor);
+    let rows_per_group = compute_rows_per_group_from_stats(size_gb, total_rows, target_rowgroup_bytes);
+    let props = writer_props_with_rowgroup(comp, rows_per_group);
 
-    if all_batches.is_empty() {
-        writer.close()?;
-        return Ok(());
-    }
+    debug!("Using row group size: {} rows (based on hardcoded stats)", 
rows_per_group);
 
-    let bpr = approx_bytes_per_row(&all_batches);
-    let rows_per_group: usize = if bpr > 0.0 {
-        (target_rowgroup_bytes as f64 / bpr)
-            .floor()
-            .max(10_000.0)
-            .min(1_000_000.0) as usize
-    } else {
-        128_000
-    };
+    let mut writer = ArrowWriter::try_new(std::fs::File::create(out_path)?, schema, Some(props))?;
 
     for batch in all_batches {
-        let mut start = 0usize;
-        while start < batch.num_rows() {
-            let end = (start + rows_per_group).min(batch.num_rows());
-            writer.write(&batch.slice(start, end - start))?;
-            start = end;
-        }
+        writer.write(&batch)?;
     }
     writer.close()?;
     Ok(())
@@ -152,13 +155,35 @@ pub async fn generate_zone_parquet(args: ZoneDfArgs) -> Result<()> {
         ));
     }
 
+    info!(
+        "Starting zone parquet generation with scale factor {}",
+        args.scale_factor
+    );
+    debug!("Zone generation args: parts={}, part={}, output_dir={:?}, 
row_group_bytes={}, compression={:?}",
+           args.parts, args.part, args.output_dir, 
args.parquet_row_group_bytes, args.parquet_compression);
+
+    let subtypes = subtypes_for_scale_factor(args.scale_factor);
+    info!(
+        "Selected subtypes for SF {}: {:?}",
+        args.scale_factor, subtypes
+    );
+
+    let estimated_rows = estimated_total_rows_for_sf(args.scale_factor);
+    info!(
+        "Estimated total rows for SF {}: {}",
+        args.scale_factor, estimated_rows
+    );
+
     let mut cfg = ConfigOptions::new();
     cfg.execution.target_partitions = 1;
+    debug!("Created DataFusion config with target_partitions=1");
 
     let rt: Arc<RuntimeEnv> = Arc::new(RuntimeEnvBuilder::new().build()?);
+    debug!("Built DataFusion runtime environment");
 
     // Register S3 store for Overture bucket (object_store 0.11)
     let bucket = OVERTURE_S3_BUCKET; // "overturemaps-us-west-2"
+    info!("Registering S3 store for bucket: {}", bucket);
     let s3 = AmazonS3Builder::new()
         .with_bucket_name(bucket)
         .with_skip_signature(true)
@@ -168,27 +193,50 @@ pub async fn generate_zone_parquet(args: ZoneDfArgs) -> Result<()> {
     let s3_url = Url::parse(&format!("s3://{bucket}"))?;
     let s3_store: Arc<dyn ObjectStore> = Arc::new(s3);
     rt.register_object_store(&s3_url, s3_store);
+    debug!("Successfully registered S3 object store");
 
     let ctx = SessionContext::new_with_config_rt(SessionConfig::from(cfg), rt);
+    debug!("Created DataFusion session context");
 
     let url = zones_parquet_url();
+    info!("Reading parquet data from: {}", url);
+    let t_read_start = Instant::now();
     let mut df = ctx.read_parquet(url, ParquetReadOptions::default()).await?;
+    let read_dur = t_read_start.elapsed();
+    info!("Successfully read parquet data in {:?}", read_dur);
 
+    // Build filter predicate
+    debug!("Building filter predicate for subtypes: {:?}", subtypes);
     let mut pred = col("subtype").eq(lit("__never__"));
     for s in subtypes_for_scale_factor(args.scale_factor) {
         pred = pred.or(col("subtype").eq(lit(s)));
     }
     df = df.filter(pred.and(col("is_land").eq(lit(true))))?;
+    info!("Applied subtype and is_land filters");
+
+    // df = df.sort(vec![col("id").sort(true, true)])?;
+    // debug!("Applied sorting by id");
 
-    df = df.sort(vec![col("id").sort(true, true)])?;
     let total = estimated_total_rows_for_sf(args.scale_factor);
     let parts = args.parts as i64;
     let this = (args.part as i64) - 1;
     let rows_per_part = (total + parts - 1) / parts;
     let offset = this * rows_per_part;
+
+    info!(
+        "Partitioning data: total_rows={}, parts={}, rows_per_part={}, 
offset={}",
+        total, parts, rows_per_part, offset
+    );
+
     df = df.limit(offset as usize, Some(rows_per_part as usize))?;
+    debug!(
+        "Applied limit with offset={}, rows={}",
+        offset, rows_per_part
+    );
 
     ctx.register_table(TableReference::bare("zone_filtered"), df.into_view())?;
+    debug!("Registered filtered data as 'zone_filtered' table");
+
     let sql = format!(
         r#"
         SELECT
@@ -202,15 +250,33 @@ pub async fn generate_zone_parquet(args: ZoneDfArgs) -> Result<()> {
         FROM zone_filtered
         "#
     );
+    debug!("Executing SQL transformation with offset: {}", offset);
     let df2 = ctx.sql(&sql).await?;
+    info!("SQL transformation completed successfully");
 
     let t0 = Instant::now();
+    info!("Starting data collection...");
     let batches = df2.clone().collect().await?;
     let collect_dur = t0.elapsed();
 
+    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+    info!(
+        "Collected {} record batches with {} total rows in {:?}",
+        batches.len(),
+        total_rows,
+        collect_dur
+    );
+
     std::fs::create_dir_all(&args.output_dir)?;
+    debug!("Created output directory: {:?}", args.output_dir);
+
     let out = args.output_filename();
-    let props = parquet_writer_props(args.parquet_compression);
+    info!("Writing output to: {}", out.display());
+
+    debug!(
+        "Created parquet writer properties with compression: {:?}",
+        args.parquet_compression
+    );
 
     // Convert DFSchema to Arrow Schema
     let schema = Arc::new(Schema::new(
@@ -220,18 +286,34 @@ pub async fn generate_zone_parquet(args: ZoneDfArgs) -> Result<()> {
             .map(|f| f.as_ref().clone())
             .collect::<Vec<_>>(),
     ));
+    debug!(
+        "Converted DataFusion schema to Arrow schema with {} fields",
+        schema.fields().len()
+    );
 
     let t1 = Instant::now();
-    write_parquet_with_rowgroup_bytes(&out, schema, batches, 
args.parquet_row_group_bytes, props)?;
+    info!(
+        "Starting parquet file write with row group size: {} bytes",
+        args.parquet_row_group_bytes
+    );
+    write_parquet_with_rowgroup_bytes(
+        &out,
+        schema,
+        batches,
+        args.parquet_row_group_bytes,
+        args.parquet_compression,
+        args.scale_factor,
+    )?;
     let write_dur = t1.elapsed();
 
     info!(
-        "Zone -> {} (part {}/{}). collect={:?}, write={:?}",
+        "Zone -> {} (part {}/{}). collect={:?}, write={:?}, total_rows={}",
         out.display(),
         args.part,
         args.parts,
         collect_dur,
-        write_dur
+        write_dur,
+        total_rows
     );
 
     Ok(())
diff --git a/spatialbench/src/generators.rs b/spatialbench/src/generators.rs
index 5144217..476cbd6 100644
--- a/spatialbench/src/generators.rs
+++ b/spatialbench/src/generators.rs
@@ -1,4 +1,4 @@
-//! Generators for each TPC-H Tables
+//! Generators for each SpatialBench table
 use crate::dates;
 use crate::dates::{GenerateUtils, TPCHDate};
 use crate::decimal::TPCHDecimal;
@@ -15,7 +15,6 @@ use crate::spatial::utils::{hash_to_unit_u64, spider_seed_for_index};
 use crate::spatial::{ContinentAffines, SpatialDefaults, SpatialGenerator};
 use crate::text::TextPool;
 use geo::Point;
-use geozero::ToGeo;
 use rand::rngs::StdRng;
 use rand::{Rng, SeedableRng};
 use std::convert::TryInto;
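
For readers who want to try the sizing logic outside the CLI, below is a minimal, self-contained Rust sketch of what this commit does: derive a rows-per-group count from table-level stats and feed it to parquet's WriterProperties via set_max_row_group_size. The stats tuple and the 128 MiB target are illustrative values (the SF < 10 entry from get_zone_table_stats), not additional behavior from the patch; the parquet API calls are the same ones the patch uses.

    use parquet::basic::Compression;
    use parquet::file::properties::WriterProperties;

    // Derive rows-per-group from table-level stats, clamped as in the patch.
    fn rows_per_group(size_gb: f64, total_rows: i64, target_bytes: i64) -> usize {
        let bytes_per_row = (size_gb * 1024.0 * 1024.0 * 1024.0) / total_rows as f64;
        if bytes_per_row <= 0.0 {
            return 128_000; // same fallback as the patch
        }
        ((target_bytes as f64 / bytes_per_row).floor())
            .max(10_000.0)
            .min(10_000_000.0) as usize
    }

    fn main() {
        // SF < 10 stats: 1.42 GB over 156_095 rows (~9.8 KB/row); a 128 MiB
        // target yields roughly 13_700 rows per group after clamping.
        let n = rows_per_group(1.42, 156_095, 128 * 1024 * 1024);
        // An ArrowWriter created with these properties starts a new row group
        // about every `n` rows, without manual batch slicing.
        let props = WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .set_max_row_group_size(n)
            .build();
        println!("rows per group: {n}, max RG size: {}", props.max_row_group_size());
    }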
