This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-spatialbench.git


The following commit(s) were added to refs/heads/main by this push:
     new 6084a14  Use DataFusion for Zone table generation (#46)
6084a14 is described below

commit 6084a14e019449511741000c0272c050e4654a64
Author: Pranav Toggi <[email protected]>
AuthorDate: Mon Oct 13 22:33:04 2025 -0700

    Use DataFusion for Zone table generation (#46)
    
    * init: use datafusion for Zone
    
    * add row group size configurability
    
    * Add debug statements; use 128MB as default row group size
    
    * fix stats
    
    * Clamp sf to minimum 1
    
    * fix row group size estimation and partitioning
    
    * fix zone table naming for partitioned outputs
    
    * add unit tests
    
    * add integration tests
    
    * fmt fix
    
    * Update spatialbench-cli/tests/cli_integration.rs
    
    Co-authored-by: Copilot <[email protected]>
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
---
 spatialbench-arrow/src/lib.rs             |   2 -
 spatialbench-arrow/src/zone.rs            | 108 -------
 spatialbench-arrow/tests/reparse.rs       |   8 +-
 spatialbench-cli/Cargo.toml               |   5 +
 spatialbench-cli/src/csv.rs               |   4 +-
 spatialbench-cli/src/main.rs              |  33 +-
 spatialbench-cli/src/plan.rs              |   6 +-
 spatialbench-cli/src/tbl.rs               |   2 -
 spatialbench-cli/src/zone_df.rs           | 501 ++++++++++++++++++++++++++++++
 spatialbench-cli/tests/cli_integration.rs | 192 +++++++++++-
 spatialbench/data/sf-v1/zone.parquet      | Bin 0 -> 88467 bytes
 spatialbench/src/csv.rs                   |  49 +--
 spatialbench/src/generators.rs            | 412 +-----------------------
 spatialbench/tests/integration_tests.rs   |  17 -
 14 files changed, 726 insertions(+), 613 deletions(-)

diff --git a/spatialbench-arrow/src/lib.rs b/spatialbench-arrow/src/lib.rs
index eb8f66d..ac6fb78 100644
--- a/spatialbench-arrow/src/lib.rs
+++ b/spatialbench-arrow/src/lib.rs
@@ -41,7 +41,6 @@ mod customer;
 mod driver;
 mod trip;
 mod vehicle;
-mod zone;
 
 use arrow::array::RecordBatch;
 use arrow::datatypes::SchemaRef;
@@ -50,7 +49,6 @@ pub use customer::CustomerArrow;
 pub use driver::DriverArrow;
 pub use trip::TripArrow;
 pub use vehicle::VehicleArrow;
-pub use zone::ZoneArrow;
 
 /// Iterator of Arrow [`RecordBatch`] that also knows its schema
 pub trait RecordBatchIterator: Iterator<Item = RecordBatch> + Send {
diff --git a/spatialbench-arrow/src/zone.rs b/spatialbench-arrow/src/zone.rs
deleted file mode 100644
index 06794b3..0000000
--- a/spatialbench-arrow/src/zone.rs
+++ /dev/null
@@ -1,108 +0,0 @@
-use crate::conversions::string_view_array_from_display_iter;
-use crate::{DEFAULT_BATCH_SIZE, RecordBatchIterator};
-use arrow::array::{BinaryArray, Int64Array, RecordBatch};
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use geozero::{CoordDimensions, ToWkb};
-use spatialbench::generators::{ZoneGenerator, ZoneGeneratorIterator};
-use std::sync::{Arc, LazyLock};
-
-/// Generate [`Zone`]s in [`RecordBatch`] format
-///
-/// [`Zone`]: spatialbench::generators::Zone
-///
-/// # Example
-/// ```
-/// # use spatialbench::generators::{ZoneGenerator};
-/// # use spatialbench_arrow::ZoneArrow;
-///
-/// // Create a SF=1.0 generator and wrap it in an Arrow generator
-/// let generator = ZoneGenerator::new(0.001, 1, 1);
-/// let mut arrow_generator = ZoneArrow::new(generator)
-///   .with_batch_size(10);
-/// // Read the first 10 batches
-/// let batch = arrow_generator.next().unwrap();
-/// // compare the output by pretty printing it
-/// let formatted_batches = 
arrow::util::pretty::pretty_format_batches(&[batch])
-///   .unwrap()
-///   .to_string();
-/// ```
-pub struct ZoneArrow {
-    inner: ZoneGeneratorIterator,
-    batch_size: usize,
-}
-
-impl ZoneArrow {
-    pub fn new(generator: ZoneGenerator) -> Self {
-        let inner = generator.clone().into_iter();
-        Self {
-            inner,
-            batch_size: DEFAULT_BATCH_SIZE,
-        }
-    }
-
-    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
-        self.batch_size = batch_size;
-        self
-    }
-}
-
-impl RecordBatchIterator for ZoneArrow {
-    fn schema(&self) -> &SchemaRef {
-        &ZONE_SCHEMA
-    }
-}
-
-impl Iterator for ZoneArrow {
-    type Item = RecordBatch;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        // Get next rows to convert
-        let rows: Vec<_> = self.inner.by_ref().take(self.batch_size).collect();
-        if rows.is_empty() {
-            return None;
-        }
-
-        let z_zonekey = Int64Array::from_iter_values(rows.iter().map(|r| 
r.z_zonekey));
-        let z_gersid = string_view_array_from_display_iter(rows.iter().map(|r| 
&r.z_gersid));
-        let z_country = 
string_view_array_from_display_iter(rows.iter().map(|r| &r.z_country));
-        let z_region = string_view_array_from_display_iter(rows.iter().map(|r| 
&r.z_region));
-        let z_name = string_view_array_from_display_iter(rows.iter().map(|r| 
&r.z_name));
-        let z_subtype = 
string_view_array_from_display_iter(rows.iter().map(|r| &r.z_subtype));
-
-        // Convert geo::Polygon to WKB binary format
-        let z_boundary = BinaryArray::from_iter_values(rows.iter().map(|r| {
-            r.z_boundary
-                .to_wkb(CoordDimensions::xy())
-                .expect("Failed to encode WKB")
-        }));
-
-        let batch = RecordBatch::try_new(
-            Arc::clone(self.schema()),
-            vec![
-                Arc::new(z_zonekey),
-                Arc::new(z_gersid),
-                Arc::new(z_country),
-                Arc::new(z_region),
-                Arc::new(z_name),
-                Arc::new(z_subtype),
-                Arc::new(z_boundary),
-            ],
-        )
-        .unwrap();
-        Some(batch)
-    }
-}
-
-/// Schema for the Zone
-static ZONE_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(make_zone_schema);
-fn make_zone_schema() -> SchemaRef {
-    Arc::new(Schema::new(vec![
-        Field::new("z_zonekey", DataType::Int64, false),
-        Field::new("z_gersid", DataType::Utf8View, false),
-        Field::new("z_country", DataType::Utf8View, false),
-        Field::new("z_region", DataType::Utf8View, false),
-        Field::new("z_name", DataType::Utf8View, false),
-        Field::new("z_subtype", DataType::Utf8View, false),
-        Field::new("z_boundary", DataType::Binary, false),
-    ]))
-}
diff --git a/spatialbench-arrow/tests/reparse.rs 
b/spatialbench-arrow/tests/reparse.rs
index 49c87e3..889933a 100644
--- a/spatialbench-arrow/tests/reparse.rs
+++ b/spatialbench-arrow/tests/reparse.rs
@@ -3,14 +3,13 @@
 
 use arrow::array::RecordBatch;
 use arrow::datatypes::SchemaRef;
-use spatialbench::csv::{BuildingCsv, CustomerCsv, DriverCsv, TripCsv, 
VehicleCsv, ZoneCsv};
+use spatialbench::csv::{BuildingCsv, CustomerCsv, DriverCsv, TripCsv, 
VehicleCsv};
 use spatialbench::generators::{
     Building, BuildingGenerator, Customer, CustomerGenerator, Driver, 
DriverGenerator, Trip,
-    TripGenerator, Vehicle, VehicleGenerator, Zone, ZoneGenerator,
+    TripGenerator, Vehicle, VehicleGenerator,
 };
 use spatialbench_arrow::{
     BuildingArrow, CustomerArrow, DriverArrow, RecordBatchIterator, TripArrow, 
VehicleArrow,
-    ZoneArrow,
 };
 use std::io::Write;
 use std::sync::Arc;
@@ -49,8 +48,6 @@ test_row_type!(trip_tbl, TripGenerator, TripArrow, 
Test::tbl());
 test_row_type!(trip_csv, TripGenerator, TripArrow, Test::csv());
 test_row_type!(building_tbl, BuildingGenerator, BuildingArrow, Test::tbl());
 test_row_type!(building_csv, BuildingGenerator, BuildingArrow, Test::csv());
-test_row_type!(zone_tbl, ZoneGenerator, ZoneArrow, Test::tbl());
-test_row_type!(zone_csv, ZoneGenerator, ZoneArrow, Test::csv());
 
 /// Common trait for writing rows in TBL and CSV format
 trait RowType {
@@ -84,7 +81,6 @@ impl_row_type!(Vehicle<'_>, VehicleCsv);
 impl_row_type!(Driver, DriverCsv);
 impl_row_type!(Trip, TripCsv);
 impl_row_type!(Building<'_>, BuildingCsv);
-impl_row_type!(Zone, ZoneCsv);
 
 #[derive(Debug, Clone, Copy)]
 #[allow(clippy::upper_case_acronyms)]
diff --git a/spatialbench-cli/Cargo.toml b/spatialbench-cli/Cargo.toml
index a77c23d..1180f39 100644
--- a/spatialbench-cli/Cargo.toml
+++ b/spatialbench-cli/Cargo.toml
@@ -23,6 +23,11 @@ env_logger = "0.11.7"
 serde = { version = "1.0.219", features = ["derive"] }
 anyhow = "1.0.99"
 serde_yaml = "0.9.33"
+datafusion = "47.0.0"
+object_store = { version = "0.12.4", features = ["aws"] }
+arrow-array = "55.2.0"
+arrow-schema = "55.2.0"
+url = "2.5.7"
 
 [dev-dependencies]
 assert_cmd = "2.0"
diff --git a/spatialbench-cli/src/csv.rs b/spatialbench-cli/src/csv.rs
index 7f9ca3b..78f93e5 100644
--- a/spatialbench-cli/src/csv.rs
+++ b/spatialbench-cli/src/csv.rs
@@ -1,9 +1,8 @@
 //! Implementations of [`Source`] for generating data in TBL format
 use super::generate::Source;
-use spatialbench::csv::{BuildingCsv, CustomerCsv, DriverCsv, TripCsv, 
VehicleCsv, ZoneCsv};
+use spatialbench::csv::{BuildingCsv, CustomerCsv, DriverCsv, TripCsv, 
VehicleCsv};
 use spatialbench::generators::{
     BuildingGenerator, CustomerGenerator, DriverGenerator, TripGenerator, 
VehicleGenerator,
-    ZoneGenerator,
 };
 use std::io::Write;
 
@@ -45,4 +44,3 @@ define_csv_source!(DriverCsvSource, DriverGenerator<'static>, 
DriverCsv);
 define_csv_source!(CustomerCsvSource, CustomerGenerator<'static>, CustomerCsv);
 define_csv_source!(TripCsvSource, TripGenerator, TripCsv);
 define_csv_source!(BuildingCsvSource, BuildingGenerator<'static>, BuildingCsv);
-define_csv_source!(ZoneCsvSource, ZoneGenerator, ZoneCsv);
diff --git a/spatialbench-cli/src/main.rs b/spatialbench-cli/src/main.rs
index 8067a45..8050974 100644
--- a/spatialbench-cli/src/main.rs
+++ b/spatialbench-cli/src/main.rs
@@ -47,6 +47,7 @@ mod plan;
 mod spatial_config_file;
 mod statistics;
 mod tbl;
+mod zone_df;
 
 use crate::csv::*;
 use crate::generate::{generate_in_chunks, Sink, Source};
@@ -62,13 +63,11 @@ use log::{debug, info, LevelFilter};
 use spatialbench::distribution::Distributions;
 use spatialbench::generators::{
     BuildingGenerator, CustomerGenerator, DriverGenerator, TripGenerator, 
VehicleGenerator,
-    ZoneGenerator,
 };
 use spatialbench::spatial::overrides::{set_overrides, SpatialOverrides};
 use spatialbench::text::TextPool;
 use spatialbench_arrow::{
     BuildingArrow, CustomerArrow, DriverArrow, RecordBatchIterator, TripArrow, 
VehicleArrow,
-    ZoneArrow,
 };
 use std::fmt::Display;
 use std::fs::{self, File};
@@ -408,6 +407,28 @@ impl Cli {
         Ok(())
     }
 
+    async fn generate_zone(&self) -> io::Result<()> {
+        match self.format {
+            OutputFormat::Parquet => {
+                let args = zone_df::ZoneDfArgs {
+                    scale_factor: 1.0f64.max(self.scale_factor),
+                    output_dir: self.output_dir.clone(),
+                    parts: self.parts.unwrap_or(1),
+                    part: self.part.unwrap_or(1),
+                    parquet_row_group_bytes: self.parquet_row_group_bytes,
+                    parquet_compression: self.parquet_compression,
+                };
+                zone_df::generate_zone_parquet(args)
+                    .await
+                    .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
+            }
+            _ => Err(io::Error::new(
+                io::ErrorKind::InvalidInput,
+                "Zone table is only supported in --format=parquet (via 
DataFusion/S3).",
+            )),
+        }
+    }
+
     define_generate!(
         generate_vehicle,
         Table::Vehicle,
@@ -448,14 +469,6 @@ impl Cli {
         BuildingCsvSource,
         BuildingArrow
     );
-    define_generate!(
-        generate_zone,
-        Table::Zone,
-        ZoneGenerator,
-        ZoneTblSource,
-        ZoneCsvSource,
-        ZoneArrow
-    );
 
     /// return the output filename for the given table
     fn output_filename(&self, table: Table) -> String {
diff --git a/spatialbench-cli/src/plan.rs b/spatialbench-cli/src/plan.rs
index 926f379..809814f 100644
--- a/spatialbench-cli/src/plan.rs
+++ b/spatialbench-cli/src/plan.rs
@@ -4,7 +4,6 @@ use crate::{OutputFormat, Table};
 use log::debug;
 use spatialbench::generators::{
     BuildingGenerator, CustomerGenerator, DriverGenerator, TripGenerator, 
VehicleGenerator,
-    ZoneGenerator,
 };
 use std::fmt::Display;
 use std::ops::RangeInclusive;
@@ -329,10 +328,7 @@ impl OutputSize {
             Table::Customer => 
CustomerGenerator::calculate_row_count(scale_factor, 1, 1),
             Table::Trip => TripGenerator::calculate_row_count(scale_factor, 1, 
1),
             Table::Building => 
BuildingGenerator::calculate_row_count(scale_factor, 1, 1),
-            Table::Zone => {
-                let generator = ZoneGenerator::new(scale_factor, 1, 1);
-                generator.calculate_row_count()
-            }
+            Table::Zone => todo!(),
         }
     }
 }
diff --git a/spatialbench-cli/src/tbl.rs b/spatialbench-cli/src/tbl.rs
index b7019c7..8eeb448 100644
--- a/spatialbench-cli/src/tbl.rs
+++ b/spatialbench-cli/src/tbl.rs
@@ -3,7 +3,6 @@
 use super::generate::Source;
 use spatialbench::generators::{
     BuildingGenerator, CustomerGenerator, DriverGenerator, TripGenerator, 
VehicleGenerator,
-    ZoneGenerator,
 };
 use std::io::Write;
 
@@ -43,4 +42,3 @@ define_tbl_source!(DriverTblSource, DriverGenerator<'static>);
 define_tbl_source!(CustomerTblSource, CustomerGenerator<'static>);
 define_tbl_source!(TripTblSource, TripGenerator);
 define_tbl_source!(BuildingTblSource, BuildingGenerator<'static>);
-define_tbl_source!(ZoneTblSource, ZoneGenerator);
diff --git a/spatialbench-cli/src/zone_df.rs b/spatialbench-cli/src/zone_df.rs
new file mode 100644
index 0000000..464b61d
--- /dev/null
+++ b/spatialbench-cli/src/zone_df.rs
@@ -0,0 +1,501 @@
+use std::{path::PathBuf, sync::Arc, time::Instant};
+
+use anyhow::{anyhow, Result};
+use arrow_array::RecordBatch;
+use arrow_schema::{Schema, SchemaRef};
+use datafusion::{
+    common::config::ConfigOptions, execution::runtime_env::RuntimeEnvBuilder, 
prelude::*,
+    sql::TableReference,
+};
+
+use crate::plan::DEFAULT_PARQUET_ROW_GROUP_BYTES;
+use datafusion::execution::runtime_env::RuntimeEnv;
+use log::{debug, info};
+use object_store::aws::AmazonS3Builder;
+use object_store::ObjectStore;
+use parquet::{
+    arrow::ArrowWriter, basic::Compression as ParquetCompression,
+    file::properties::WriterProperties,
+};
+use url::Url;
+
+const OVERTURE_RELEASE_DATE: &str = "2025-08-20.1";
+const OVERTURE_S3_BUCKET: &str = "overturemaps-us-west-2";
+const OVERTURE_S3_PREFIX: &str = "release";
+
+fn zones_parquet_url() -> String {
+    format!(
+        "s3://{}/{}/{}/theme=divisions/type=division_area/",
+        OVERTURE_S3_BUCKET, OVERTURE_S3_PREFIX, OVERTURE_RELEASE_DATE
+    )
+}
+
+fn subtypes_for_scale_factor(sf: f64) -> Vec<&'static str> {
+    let mut v = vec!["microhood", "macrohood", "county"];
+    if sf >= 10.0 {
+        v.push("neighborhood");
+    }
+    if sf >= 100.0 {
+        v.extend_from_slice(&["localadmin", "locality", "region", 
"dependency"]);
+    }
+    if sf >= 1000.0 {
+        v.push("country");
+    }
+    v
+}
+
+fn estimated_total_rows_for_sf(sf: f64) -> i64 {
+    let mut total = 0i64;
+    for s in subtypes_for_scale_factor(sf) {
+        total += match s {
+            "microhood" => 74797,
+            "macrohood" => 42619,
+            "neighborhood" => 298615,
+            "county" => 38679,
+            "localadmin" => 19007,
+            "locality" => 555834,
+            "region" => 3905,
+            "dependency" => 53,
+            "country" => 219,
+            _ => 0,
+        };
+    }
+    if sf < 1.0 {
+        (total as f64 * sf).ceil() as i64
+    } else {
+        total
+    }
+}
+
+fn get_zone_table_stats(sf: f64) -> (f64, i64) {
+    // Returns (size_in_gb, total_rows) for the given scale factor
+    if sf < 1.0 {
+        (0.92 * sf, (156_095.0 * sf).ceil() as i64)
+    } else if sf < 10.0 {
+        (1.42, 156_095)
+    } else if sf < 100.0 {
+        (2.09, 454_710)
+    } else if sf < 1000.0 {
+        (5.68, 1_033_456)
+    } else {
+        (6.13, 1_033_675)
+    }
+}
+
+fn compute_rows_per_group_from_stats(size_gb: f64, total_rows: i64, 
target_bytes: i64) -> usize {
+    let total_bytes = size_gb * 1024.0 * 1024.0 * 1024.0; // Convert GB to 
bytes
+    let bytes_per_row = total_bytes / total_rows as f64;
+
+    // Use default if target_bytes is not specified or invalid
+    let effective_target = if target_bytes <= 0 {
+        DEFAULT_PARQUET_ROW_GROUP_BYTES
+    } else {
+        target_bytes
+    };
+
+    debug!(
+        "Using hardcoded stats: {:.2} GB, {} rows, {:.2} bytes/row, target: {} 
bytes",
+        size_gb, total_rows, bytes_per_row, effective_target
+    );
+
+    let est = (effective_target as f64 / bytes_per_row).floor();
+    // Keep RG count <= 32k, but avoid too-tiny RGs
+    est.clamp(1000.0, 32767.0) as usize
+}
+
+fn writer_props_with_rowgroup(comp: ParquetCompression, rows_per_group: usize) 
-> WriterProperties {
+    WriterProperties::builder()
+        .set_compression(comp)
+        .set_max_row_group_size(rows_per_group)
+        .build()
+}
+
+fn write_parquet_with_rowgroup_bytes(
+    out_path: &PathBuf,
+    schema: SchemaRef,
+    all_batches: Vec<RecordBatch>,
+    target_rowgroup_bytes: i64,
+    comp: ParquetCompression,
+    scale_factor: f64,
+    parts: i32,
+) -> Result<()> {
+    let (mut size_gb, mut total_rows) = get_zone_table_stats(scale_factor);
+
+    // Use linear scaling stats for SF <= 1.0 with parts > 1
+    if scale_factor <= 1.0 && parts > 1 {
+        (size_gb, total_rows) = get_zone_table_stats(scale_factor / parts as 
f64);
+    }
+
+    debug!(
+        "size_gb={}, total_rows={} for scale_factor={}",
+        size_gb, total_rows, scale_factor
+    );
+    let rows_per_group =
+        compute_rows_per_group_from_stats(size_gb, total_rows, 
target_rowgroup_bytes);
+    let props = writer_props_with_rowgroup(comp, rows_per_group);
+
+    debug!(
+        "Using row group size: {} rows (based on hardcoded stats)",
+        rows_per_group
+    );
+
+    let mut writer = ArrowWriter::try_new(std::fs::File::create(out_path)?, 
schema, Some(props))?;
+
+    for batch in all_batches {
+        writer.write(&batch)?;
+    }
+    writer.close()?;
+    Ok(())
+}
+
+#[derive(Clone)]
+pub struct ZoneDfArgs {
+    pub scale_factor: f64,
+    pub output_dir: PathBuf,
+    pub parts: i32,
+    pub part: i32,
+    pub parquet_row_group_bytes: i64,
+    pub parquet_compression: ParquetCompression,
+}
+
+impl ZoneDfArgs {
+    fn output_filename(&self) -> PathBuf {
+        let filename = "zone.parquet".to_string();
+        self.output_dir.join(filename)
+    }
+}
+
+pub async fn generate_zone_parquet(args: ZoneDfArgs) -> Result<()> {
+    if args.part < 1 || args.part > args.parts {
+        return Err(anyhow!(
+            "Invalid --part={} for --parts={}",
+            args.part,
+            args.parts
+        ));
+    }
+
+    info!(
+        "Starting zone parquet generation with scale factor {}",
+        args.scale_factor
+    );
+    debug!("Zone generation args: parts={}, part={}, output_dir={:?}, 
row_group_bytes={}, compression={:?}",
+           args.parts, args.part, args.output_dir, 
args.parquet_row_group_bytes, args.parquet_compression);
+
+    let subtypes = subtypes_for_scale_factor(args.scale_factor);
+    info!(
+        "Selected subtypes for SF {}: {:?}",
+        args.scale_factor, subtypes
+    );
+
+    let estimated_rows = estimated_total_rows_for_sf(args.scale_factor);
+    info!(
+        "Estimated total rows for SF {}: {}",
+        args.scale_factor, estimated_rows
+    );
+
+    let mut cfg = ConfigOptions::new();
+    cfg.execution.target_partitions = 1;
+    debug!("Created DataFusion config with target_partitions=1");
+
+    let rt: Arc<RuntimeEnv> = Arc::new(RuntimeEnvBuilder::new().build()?);
+    debug!("Built DataFusion runtime environment");
+
+    // Register S3 store for Overture bucket
+    let bucket = OVERTURE_S3_BUCKET;
+    info!("Registering S3 store for bucket: {}", bucket);
+    let s3 = AmazonS3Builder::new()
+        .with_bucket_name(bucket)
+        .with_skip_signature(true)
+        .with_region("us-west-2")
+        .build()?;
+
+    let s3_url = Url::parse(&format!("s3://{bucket}"))?;
+    let s3_store: Arc<dyn ObjectStore> = Arc::new(s3);
+    rt.register_object_store(&s3_url, s3_store);
+    debug!("Successfully registered S3 object store");
+
+    let ctx = SessionContext::new_with_config_rt(SessionConfig::from(cfg), rt);
+    debug!("Created DataFusion session context");
+
+    let url = zones_parquet_url();
+    info!("Reading parquet data from: {}", url);
+    let t_read_start = Instant::now();
+    let mut df = ctx.read_parquet(url, ParquetReadOptions::default()).await?;
+    let read_dur = t_read_start.elapsed();
+    info!("Successfully read parquet data in {:?}", read_dur);
+
+    // Build filter predicate
+    debug!("Building filter predicate for subtypes: {:?}", subtypes);
+    let mut pred = col("subtype").eq(lit("__never__"));
+    for s in subtypes_for_scale_factor(args.scale_factor) {
+        pred = pred.or(col("subtype").eq(lit(s)));
+    }
+    df = df.filter(pred.and(col("is_land").eq(lit(true))))?;
+    info!("Applied subtype and is_land filters");
+
+    // df = df.sort(vec![col("id").sort(true, true)])?;
+    // debug!("Applied sorting by id");
+
+    let total = estimated_total_rows_for_sf(args.scale_factor);
+    let i = (args.part as i64) - 1; // 0-based part index
+    let parts = args.parts as i64;
+
+    let base = total / parts;
+    let rem = total % parts;
+
+    // first `rem` parts get one extra row
+    let rows_this = base + if i < rem { 1 } else { 0 };
+    let offset = i * base + std::cmp::min(i, rem);
+
+    info!(
+        "Partitioning data: total_rows={}, parts={}, base={}, rem={}, 
this_part_rows={}, offset={}",
+        total, parts, base, rem, rows_this, offset
+    );
+
+    df = df.limit(offset as usize, Some(rows_this as usize))?;
+    debug!("Applied limit with offset={}, rows={}", offset, rows_this);
+
+    ctx.register_table(TableReference::bare("zone_filtered"), df.into_view())?;
+    debug!("Registered filtered data as 'zone_filtered' table");
+
+    let sql = format!(
+        r#"
+        SELECT
+          CAST(ROW_NUMBER() OVER (ORDER BY id) + {offset} AS BIGINT) AS 
z_zonekey,
+          COALESCE(id, '')            AS z_gersid,
+          COALESCE(country, '')       AS z_country,
+          COALESCE(region,  '')       AS z_region,
+          COALESCE(names.primary, '') AS z_name,
+          COALESCE(subtype, '')       AS z_subtype,
+          geometry                    AS z_boundary
+        FROM zone_filtered
+        "#
+    );
+    debug!("Executing SQL transformation with offset: {}", offset);
+    let df2 = ctx.sql(&sql).await?;
+    info!("SQL transformation completed successfully");
+
+    let t0 = Instant::now();
+    info!("Starting data collection...");
+    let batches = df2.clone().collect().await?;
+    let collect_dur = t0.elapsed();
+
+    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+    info!(
+        "Collected {} record batches with {} total rows in {:?}",
+        batches.len(),
+        total_rows,
+        collect_dur
+    );
+
+    std::fs::create_dir_all(&args.output_dir)?;
+    debug!("Created output directory: {:?}", args.output_dir);
+
+    let out = args.output_filename();
+    info!("Writing output to: {}", out.display());
+
+    debug!(
+        "Created parquet writer properties with compression: {:?}",
+        args.parquet_compression
+    );
+
+    // Convert DFSchema to Arrow Schema
+    let schema = Arc::new(Schema::new(
+        df2.schema()
+            .fields()
+            .iter()
+            .map(|f| f.as_ref().clone())
+            .collect::<Vec<_>>(),
+    ));
+    debug!(
+        "Converted DataFusion schema to Arrow schema with {} fields",
+        schema.fields().len()
+    );
+
+    let t1 = Instant::now();
+    info!(
+        "Starting parquet file write with row group size: {} bytes",
+        args.parquet_row_group_bytes
+    );
+    write_parquet_with_rowgroup_bytes(
+        &out,
+        schema,
+        batches,
+        args.parquet_row_group_bytes,
+        args.parquet_compression,
+        args.scale_factor,
+        args.parts,
+    )?;
+    let write_dur = t1.elapsed();
+
+    info!(
+        "Zone -> {} (part {}/{}). collect={:?}, write={:?}, total_rows={}",
+        out.display(),
+        args.part,
+        args.parts,
+        collect_dur,
+        write_dur,
+        total_rows
+    );
+
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use parquet::basic::Compression;
+    use tempfile::TempDir;
+
+    fn create_test_args(scale_factor: f64, temp_dir: &TempDir) -> ZoneDfArgs {
+        ZoneDfArgs {
+            scale_factor,
+            output_dir: temp_dir.path().to_path_buf(),
+            parts: 1,
+            part: 1,
+            parquet_row_group_bytes: DEFAULT_PARQUET_ROW_GROUP_BYTES,
+            parquet_compression: Compression::SNAPPY,
+        }
+    }
+
+    #[tokio::test]
+    async fn test_zone_generation_invalid_part() {
+        let temp_dir = TempDir::new().unwrap();
+        let mut args = create_test_args(1.0, &temp_dir);
+        args.parts = 2;
+        args.part = 3; // Invalid part number
+
+        let result = generate_zone_parquet(args).await;
+        assert!(result.is_err(), "Should fail with invalid part number");
+    }
+
+    #[tokio::test]
+    async fn test_subtypes_for_different_scale_factors() {
+        // Test scale factor categorization
+        let sf_01_subtypes = subtypes_for_scale_factor(0.1);
+        assert_eq!(sf_01_subtypes, vec!["microhood", "macrohood", "county"]);
+
+        let sf_10_subtypes = subtypes_for_scale_factor(10.0);
+        assert_eq!(
+            sf_10_subtypes,
+            vec!["microhood", "macrohood", "county", "neighborhood"]
+        );
+
+        let sf_100_subtypes = subtypes_for_scale_factor(100.0);
+        assert!(sf_100_subtypes.contains(&"localadmin"));
+        assert!(sf_100_subtypes.contains(&"locality"));
+
+        let sf_1000_subtypes = subtypes_for_scale_factor(1000.0);
+        assert!(sf_1000_subtypes.contains(&"country"));
+    }
+
+    #[test]
+    fn test_partition_distribution_logic() {
+        // Test the mathematical logic for distributing rows across partitions
+        let total_rows = 100i64;
+        let parts = 3i64;
+
+        let mut collected_rows = Vec::new();
+        let mut collected_offsets = Vec::new();
+
+        // Simulate the partition calculation for each part
+        for part_idx in 0..parts {
+            let i = part_idx;
+            let base = total_rows / parts;
+            let rem = total_rows % parts;
+            let rows_this = base + if i < rem { 1 } else { 0 };
+            let offset = i * base + std::cmp::min(i, rem);
+
+            collected_rows.push(rows_this);
+            collected_offsets.push(offset);
+        }
+
+        // Verify partitioning logic
+        assert_eq!(collected_rows.iter().sum::<i64>(), total_rows); // All 
rows accounted for
+        assert_eq!(collected_offsets[0], 0); // First partition starts at 0
+
+        // Verify no gaps or overlaps between partitions
+        for i in 1..parts as usize {
+            let expected_offset = collected_offsets[i - 1] + collected_rows[i 
- 1];
+            assert_eq!(collected_offsets[i], expected_offset);
+        }
+
+        // Verify remainder distribution (first partitions get extra rows)
+        let remainder = (total_rows % parts) as usize;
+        for i in 0..remainder {
+            assert_eq!(collected_rows[i], collected_rows[remainder] + 1);
+        }
+    }
+
+    #[test]
+    fn test_rows_per_group_bounds() {
+        // Test that compute_rows_per_group_from_stats respects bounds
+
+        // Test minimum bound (should be at least 1000)
+        let rows_per_group_tiny = compute_rows_per_group_from_stats(0.001, 
1000, 1_000_000);
+        assert!(rows_per_group_tiny >= 1000);
+
+        // Test maximum bound (should not exceed 32767)
+        let rows_per_group_huge = compute_rows_per_group_from_stats(1000.0, 
1000, 1);
+        assert!(rows_per_group_huge <= 32767);
+
+        // Test negative target bytes falls back to default
+        let rows_per_group_negative = compute_rows_per_group_from_stats(1.0, 
100000, -1);
+        let rows_per_group_default =
+            compute_rows_per_group_from_stats(1.0, 100000, 
DEFAULT_PARQUET_ROW_GROUP_BYTES);
+        assert_eq!(rows_per_group_negative, rows_per_group_default);
+    }
+
+    #[test]
+    fn test_subtype_selection_logic() {
+        // Test the cumulative nature of subtype selection
+        let base_subtypes = subtypes_for_scale_factor(1.0);
+        let sf10_subtypes = subtypes_for_scale_factor(10.0);
+        let sf100_subtypes = subtypes_for_scale_factor(100.0);
+        let sf1000_subtypes = subtypes_for_scale_factor(1000.0);
+
+        // Each higher scale factor should include all previous subtypes
+        for subtype in &base_subtypes {
+            assert!(sf10_subtypes.contains(subtype));
+            assert!(sf100_subtypes.contains(subtype));
+            assert!(sf1000_subtypes.contains(subtype));
+        }
+
+        for subtype in &sf10_subtypes {
+            assert!(sf100_subtypes.contains(subtype));
+            assert!(sf1000_subtypes.contains(subtype));
+        }
+
+        for subtype in &sf100_subtypes {
+            assert!(sf1000_subtypes.contains(subtype));
+        }
+
+        // Verify progressive addition
+        assert!(sf10_subtypes.len() > base_subtypes.len());
+        assert!(sf100_subtypes.len() > sf10_subtypes.len());
+        assert!(sf1000_subtypes.len() > sf100_subtypes.len());
+    }
+
+    #[test]
+    fn test_estimated_rows_scaling_consistency() {
+        // Test that estimated rows scale proportionally for SF < 1.0
+        let base_rows = estimated_total_rows_for_sf(1.0);
+        let half_rows = estimated_total_rows_for_sf(0.5);
+        let quarter_rows = estimated_total_rows_for_sf(0.25);
+
+        // Should scale proportionally (within rounding)
+        assert!((half_rows as f64 - (base_rows as f64 * 0.5)).abs() < 1.0);
+        assert!((quarter_rows as f64 - (base_rows as f64 * 0.25)).abs() < 1.0);
+
+        // Test that SF >= 1.0 gives discrete jumps (not proportional scaling)
+        let sf1_rows = estimated_total_rows_for_sf(1.0);
+        let sf5_rows = estimated_total_rows_for_sf(5.0);
+        let sf10_rows = estimated_total_rows_for_sf(10.0);
+
+        // These should be equal (same category)
+        assert_eq!(sf1_rows, sf5_rows);
+
+        // This should be different (different category)
+        assert_ne!(sf5_rows, sf10_rows);
+    }
+}
diff --git a/spatialbench-cli/tests/cli_integration.rs 
b/spatialbench-cli/tests/cli_integration.rs
index 5cfaf91..fa42806 100644
--- a/spatialbench-cli/tests/cli_integration.rs
+++ b/spatialbench-cli/tests/cli_integration.rs
@@ -1,3 +1,4 @@
+use arrow_array::RecordBatch;
 use assert_cmd::Command;
 use parquet::arrow::arrow_reader::{ArrowReaderOptions, 
ParquetRecordBatchReaderBuilder};
 use parquet::file::metadata::ParquetMetaDataReader;
@@ -6,7 +7,7 @@ use spatialbench_arrow::{RecordBatchIterator, TripArrow};
 use std::fs;
 use std::fs::File;
 use std::io::Read;
-use std::path::Path;
+use std::path::{Path, PathBuf};
 use std::sync::Arc;
 use tempfile::tempdir;
 
@@ -83,6 +84,112 @@ fn test_spatialbench_cli_tbl_scale_factor_v1() {
     }
 }
 
+/// Test zone parquet output determinism - same data should be generated every 
time
+#[tokio::test]
+async fn test_zone_deterministic_parts_generation() {
+    let temp_dir1 = tempdir().expect("Failed to create temporary directory 1");
+
+    Command::cargo_bin("spatialbench-cli")
+        .expect("Binary not found")
+        .arg("--format")
+        .arg("parquet")
+        .arg("--scale-factor")
+        .arg("1.0")
+        .arg("--output-dir")
+        .arg(temp_dir1.path())
+        .arg("--tables")
+        .arg("zone")
+        .arg("--parts")
+        .arg("100")
+        .arg("--part")
+        .arg("1")
+        .assert()
+        .success();
+
+    let zone_file1 = temp_dir1.path().join("zone.parquet");
+
+    // Reference file is a sf=0.01 zone table with z_boundary column removed
+    let reference_file = 
PathBuf::from("../spatialbench/data/sf-v1/zone.parquet");
+
+    assert!(
+        zone_file1.exists(),
+        "First zone.parquet file was not created"
+    );
+    assert!(
+        reference_file.exists(),
+        "Reference zone.parquet file does not exist"
+    );
+
+    let file1 = File::open(&zone_file1).expect("Failed to open generated 
zone.parquet file");
+    let file2 = File::open(&reference_file).expect("Failed to open reference 
zone.parquet file");
+
+    let reader1 = ParquetRecordBatchReaderBuilder::try_new(file1)
+        .expect("Failed to create reader for generated file")
+        .build()
+        .expect("Failed to build reader for generated file");
+
+    let reader2 = ParquetRecordBatchReaderBuilder::try_new(file2)
+        .expect("Failed to create reader for reference file")
+        .build()
+        .expect("Failed to build reader for reference file");
+
+    let batches1: Result<Vec<RecordBatch>, _> = reader1.collect();
+    let batches2: Result<Vec<RecordBatch>, _> = reader2.collect();
+
+    let batches1 = batches1.expect("Failed to read batches from generated 
file");
+    let batches2 = batches2.expect("Failed to read batches from reference 
file");
+
+    // Check that files are non-empty
+    assert!(
+        !batches1.is_empty(),
+        "Generated zone parquet file has no data"
+    );
+    assert!(
+        !batches2.is_empty(),
+        "Reference zone parquet file has no data"
+    );
+
+    // Check that both files have the same number of batches
+    assert_eq!(
+        batches1.len(),
+        batches2.len(),
+        "Different number of record batches"
+    );
+
+    // Compare each batch, excluding z_boundary column
+    for (i, (batch1, batch2)) in 
batches1.iter().zip(batches2.iter()).enumerate() {
+        assert_eq!(
+            batch1.num_rows(),
+            batch2.num_rows(),
+            "Batch {} has different number of rows",
+            i
+        );
+
+        let schema1 = batch1.schema();
+
+        // Compare all columns except z_boundary
+        for field in schema1.fields() {
+            let column_name = field.name();
+            if column_name == "z_boundary" {
+                continue;
+            }
+
+            let col1 = batch1
+                .column_by_name(column_name)
+                .unwrap_or_else(|| panic!("Column {} not found in generated 
file", column_name));
+            let col2 = batch2
+                .column_by_name(column_name)
+                .unwrap_or_else(|| panic!("Column {} not found in reference 
file", column_name));
+
+            assert_eq!(
+                col1, col2,
+                "Column {} differs between generated and reference files in 
batch {}",
+                column_name, i
+            );
+        }
+    }
+}
+
 /// Test generating the trip table using --parts and --part options
 #[test]
 fn test_spatialbench_cli_parts() {
@@ -231,6 +338,36 @@ async fn test_write_parquet_row_group_size_default() {
     );
 }
 
+#[tokio::test]
+async fn test_zone_write_parquet_row_group_size_default() {
+    // Run the CLI command to generate parquet data with default settings
+    let output_dir = tempdir().unwrap();
+    Command::cargo_bin("spatialbench-cli")
+        .expect("Binary not found")
+        .arg("--format")
+        .arg("parquet")
+        .arg("--scale-factor")
+        .arg("1")
+        .arg("--tables")
+        .arg("zone")
+        .arg("--output-dir")
+        .arg(output_dir.path())
+        .arg("--parts")
+        .arg("10")
+        .arg("--part")
+        .arg("1")
+        .assert()
+        .success();
+
+    expect_row_group_sizes(
+        output_dir.path(),
+        vec![RowGroups {
+            table: "zone",
+            row_group_bytes: vec![91351103],
+        }],
+    );
+}
+
 #[tokio::test]
 async fn test_write_parquet_row_group_size_20mb() {
     // Run the CLI command to generate parquet data with larger row group size
@@ -279,6 +416,38 @@ async fn test_write_parquet_row_group_size_20mb() {
     );
 }
 
+#[tokio::test]
+async fn test_zone_write_parquet_row_group_size_20mb() {
+    // Run the CLI command to generate parquet data with larger row group size
+    let output_dir = tempdir().unwrap();
+    Command::cargo_bin("spatialbench-cli")
+        .expect("Binary not found")
+        .arg("--format")
+        .arg("parquet")
+        .arg("--scale-factor")
+        .arg("1")
+        .arg("--tables")
+        .arg("zone")
+        .arg("--output-dir")
+        .arg(output_dir.path())
+        .arg("--parquet-row-group-bytes")
+        .arg("20000000") // 20 MB
+        .arg("--parts")
+        .arg("10")
+        .arg("--part")
+        .arg("1")
+        .assert()
+        .success();
+
+    expect_row_group_sizes(
+        output_dir.path(),
+        vec![RowGroups {
+            table: "zone",
+            row_group_bytes: vec![16284828, 19041211, 20977976, 17291992, 
18079175],
+        }],
+    );
+}
+
 #[test]
 fn test_spatialbench_cli_part_no_parts() {
     let temp_dir = tempdir().expect("Failed to create temporary directory");
@@ -402,6 +571,27 @@ async fn test_incompatible_options_warnings() {
         ));
 }
 
+#[test]
+fn test_zone_generation_tbl_fails() {
+    let temp_dir = tempdir().expect("Failed to create temporary directory");
+
+    Command::cargo_bin("spatialbench-cli")
+        .expect("Binary not found")
+        .arg("--format")
+        .arg("tbl")
+        .arg("--scale-factor")
+        .arg("1")
+        .arg("--tables")
+        .arg("zone")
+        .arg("--output-dir")
+        .arg(temp_dir.path())
+        .assert()
+        .failure()
+        .stderr(predicates::str::contains(
+            "Zone table is only supported in --format=parquet",
+        ));
+}
+
 fn read_gzipped_file_to_string<P: AsRef<Path>>(path: P) -> Result<String, 
std::io::Error> {
     let file = File::open(path)?;
     let mut decoder = flate2::read::GzDecoder::new(file);
diff --git a/spatialbench/data/sf-v1/zone.parquet 
b/spatialbench/data/sf-v1/zone.parquet
new file mode 100644
index 0000000..0b1740d
Binary files /dev/null and b/spatialbench/data/sf-v1/zone.parquet differ
diff --git a/spatialbench/src/csv.rs b/spatialbench/src/csv.rs
index b9b5a9d..37eb9ab 100644
--- a/spatialbench/src/csv.rs
+++ b/spatialbench/src/csv.rs
@@ -1,6 +1,6 @@
 //! CSV formatting support for the row struct objects generated by the library.
 
-use crate::generators::{Building, Customer, Driver, Trip, Vehicle, Zone};
+use crate::generators::{Building, Customer, Driver, Trip, Vehicle};
 use core::fmt;
 use std::fmt::Display;
 
@@ -259,50 +259,3 @@ impl Display for BuildingCsv<'_> {
         )
     }
 }
-
-/// Write [`Zone`]s in CSV format.
-///
-/// # Example
-/// ```
-/// # use spatialbench::generators::ZoneGenerator;
-/// # use spatialbench::csv::ZoneCsv;
-/// # use std::fmt::Write;
-/// // Output the first 3 rows in CSV format
-/// let generator = ZoneGenerator::new(0.001, 1, 1);
-/// let mut csv = String::new();
-/// writeln!(&mut csv, "{}", ZoneCsv::header()).unwrap(); // write header
-/// for line in generator.iter().take(3) {
-///   // write line using CSV formatter
-///   writeln!(&mut csv, "{}", ZoneCsv::new(line)).unwrap();
-/// }
-/// ```
-pub struct ZoneCsv {
-    inner: Zone,
-}
-
-impl ZoneCsv {
-    pub fn new(inner: Zone) -> Self {
-        Self { inner }
-    }
-
-    /// Returns the CSV header for the Zone table
-    pub fn header() -> &'static str {
-        "z_zonekey,z_gersid,z_country,z_region,z_name,z_subtype,z_boundary"
-    }
-}
-
-impl Display for ZoneCsv {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(
-            f,
-            "{},{},{},{},{},{},\"{:?}\"",
-            self.inner.z_zonekey,
-            self.inner.z_gersid,
-            self.inner.z_country,
-            self.inner.z_region,
-            self.inner.z_name,
-            self.inner.z_subtype,
-            self.inner.z_boundary,
-        )
-    }
-}
diff --git a/spatialbench/src/generators.rs b/spatialbench/src/generators.rs
index 109c0ce..476cbd6 100644
--- a/spatialbench/src/generators.rs
+++ b/spatialbench/src/generators.rs
@@ -1,4 +1,4 @@
-//! Generators for each TPC-H Tables
+//! Generators for each Spatial Bench Tables
 use crate::dates;
 use crate::dates::{GenerateUtils, TPCHDate};
 use crate::decimal::TPCHDecimal;
@@ -14,17 +14,12 @@ use crate::spatial::utils::continent::{build_continent_cdf, 
WeightedTarget};
 use crate::spatial::utils::{hash_to_unit_u64, spider_seed_for_index};
 use crate::spatial::{ContinentAffines, SpatialDefaults, SpatialGenerator};
 use crate::text::TextPool;
-use duckdb::Connection;
-use geo::Geometry;
 use geo::Point;
-use geozero::{wkb::Wkb, ToGeo};
-use log::{debug, error, info};
 use rand::rngs::StdRng;
 use rand::{Rng, SeedableRng};
 use std::convert::TryInto;
 use std::fmt;
 use std::fmt::Display;
-use std::time::Instant;
 
 /// A Vehicle Manufacturer, formatted as `"Manufacturer#<n>"`
 #[derive(Debug, Clone, Copy, PartialEq)]
@@ -1439,337 +1434,6 @@ impl<'a> Iterator for BuildingGeneratorIterator<'a> {
     }
 }
 
-/// Represents a Zone in the dataset
-#[derive(Debug, Clone, PartialEq)]
-pub struct Zone {
-    /// Primary key
-    pub z_zonekey: i64,
-    /// GERS ID of the zone
-    pub z_gersid: String,
-    /// Country of the zone
-    pub z_country: String,
-    /// Region of the zone
-    pub z_region: String,
-    /// Name of the zone
-    pub z_name: String,
-    /// Subtype of the zone
-    pub z_subtype: String,
-    /// Boundary geometry in WKT format
-    pub z_boundary: Geometry,
-}
-
-impl Display for Zone {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(
-            f,
-            "{}|{}|{}|{}|{}|{}|{:?}|",
-            self.z_zonekey,
-            self.z_gersid,
-            self.z_country,
-            self.z_region,
-            self.z_name,
-            self.z_subtype,
-            self.z_boundary
-        )
-    }
-}
-
-/// Generator for [`Zone`]s that loads from a parquet file in S3
-#[derive(Debug, Clone)]
-pub struct ZoneGenerator {
-    scale_factor: f64,
-    part: i32,
-    part_count: i32,
-}
-
-impl ZoneGenerator {
-    /// S3 URL for the zones parquet file
-    const OVERTURE_RELEASE_DATE: &'static str = "2025-08-20.1";
-    const OVERTURE_S3_BUCKET: &'static str = "overturemaps-us-west-2";
-    const OVERTURE_S3_PREFIX: &'static str = "release";
-
-    /// Gets the S3 URL for the zones parquet file
-    fn get_zones_parquet_url() -> String {
-        format!(
-            "s3://{}/{}/{}/theme=divisions/type=division_area/*",
-            Self::OVERTURE_S3_BUCKET,
-            Self::OVERTURE_S3_PREFIX,
-            Self::OVERTURE_RELEASE_DATE
-        )
-    }
-
-    /// Get zone subtypes based on scale factor
-    fn get_zone_subtypes_for_scale_factor(scale_factor: f64) -> Vec<&'static 
str> {
-        let mut subtypes = vec!["microhood", "macrohood", "county"];
-
-        if scale_factor >= 10.0 {
-            subtypes.extend_from_slice(&["neighborhood"]);
-        }
-
-        if scale_factor >= 100.0 {
-            subtypes.extend_from_slice(&["localadmin", "locality", "region", 
"dependency"]);
-        }
-
-        if scale_factor >= 1000.0 {
-            subtypes.push("country");
-        }
-
-        subtypes
-    }
-
-    /// Calculate total zones for a given scale factor based on subtype counts
-    fn calculate_total_zones_for_scale_factor(scale_factor: f64) -> i64 {
-        let subtypes = Self::get_zone_subtypes_for_scale_factor(scale_factor);
-        let mut total = 0i64;
-
-        for subtype in subtypes {
-            let count = match subtype {
-                "microhood" => 74797,
-                "macrohood" => 42619,
-                "neighborhood" => 298615,
-                "county" => 39680,
-                "localadmin" => 19007,
-                "locality" => 555834,
-                "region" => 4714,
-                "dependency" => 105,
-                "country" => 378,
-                _ => 0,
-            };
-            total += count;
-        }
-
-        // Scale down for testing purposes
-        if scale_factor < 1.0 {
-            total = (total as f64 * scale_factor).ceil() as i64;
-        }
-
-        total
-    }
-
-    /// Create a new zone generator with streaming approach
-    pub fn new(scale_factor: f64, part: i32, part_count: i32) -> Self {
-        let start = Instant::now();
-        info!(
-            "Creating ZoneGenerator with scale_factor={}, part={}, 
part_count={}",
-            scale_factor, part, part_count
-        );
-        let elapsed = start.elapsed();
-        info!("ZoneGenerator created in {:?}", elapsed);
-
-        Self {
-            scale_factor,
-            part,
-            part_count,
-        }
-    }
-
-    /// Calculate zones per partition
-    fn calculate_zones_per_part(&self) -> i64 {
-        let total_zones = 
Self::calculate_total_zones_for_scale_factor(self.scale_factor);
-        (total_zones as f64 / self.part_count as f64).ceil() as i64
-    }
-
-    /// Calculate offset for this partition
-    fn calculate_offset(&self) -> i64 {
-        let zones_per_part = self.calculate_zones_per_part();
-        (self.part - 1) as i64 * zones_per_part
-    }
-
-    /// Load zones for this specific partition using LIMIT and OFFSET
-    fn load_partition_zones(&self) -> Result<Vec<Zone>, Box<dyn 
std::error::Error>> {
-        info!(
-            "Loading zones for partition {} of {}",
-            self.part, self.part_count
-        );
-        let start_total = Instant::now();
-
-        // Create a connection to DuckDB
-        let t0 = Instant::now();
-        let conn = Connection::open_in_memory()?;
-        debug!("Opened DuckDB connection in {:?}", t0.elapsed());
-
-        // Install and load required extensions
-        let t1 = Instant::now();
-        conn.execute_batch(
-            r#"
-            INSTALL httpfs;
-            LOAD httpfs;
-            INSTALL spatial;
-            LOAD spatial;
-
-            -- Public bucket: force unsigned requests
-            SET s3_access_key_id = '';
-            SET s3_secret_access_key = '';
-            SET s3_session_token = '';
-
-            -- Region + endpoint for the Overture bucket
-            SET s3_region = 'us-west-2';
-            SET s3_endpoint = 's3.us-west-2.amazonaws.com';
-            "#,
-        )?;
-        debug!(
-            "Installed and loaded DuckDB extensions in {:?}",
-            t1.elapsed()
-        );
-
-        // Calculate partition parameters
-        let zones_per_part = self.calculate_zones_per_part();
-        let offset = self.calculate_offset();
-        let zones_url = Self::get_zones_parquet_url();
-        let subtypes = 
Self::get_zone_subtypes_for_scale_factor(self.scale_factor);
-
-        info!(
-            "Partition {}: LIMIT {} OFFSET {} from {} with subtypes: {:?}",
-            self.part, zones_per_part, offset, zones_url, subtypes
-        );
-
-        // Build the subtype filter
-        let subtype_filter = if subtypes.is_empty() {
-            return Err(format!(
-                "No subtypes found for scale factor {} in partition {}. This 
indicates a logic error.",
-                self.scale_factor,
-                self.part
-            ).into());
-        } else {
-            format!(
-                "subtype IN ({})",
-                subtypes
-                    .iter()
-                    .map(|s| format!("'{}'", s))
-                    .collect::<Vec<_>>()
-                    .join(", ")
-            )
-        };
-
-        // Combine subtype filter with is_land filter
-        let combined_filter = format!("{} AND is_land = true", subtype_filter);
-
-        let query = format!(
-            "SELECT
-                COALESCE(id, '') as z_gersid,
-                COALESCE(country, '') as z_country,
-                COALESCE(region, '') as z_region,
-                COALESCE(names.primary, '') as z_name,
-                COALESCE(subtype, '') as z_subtype,
-                ST_AsWKB(geometry) as z_boundary
-             FROM read_parquet('{}', hive_partitioning=1)
-             WHERE {}
-             LIMIT {} OFFSET {};",
-            zones_url, combined_filter, zones_per_part, offset
-        );
-        debug!("Generated partition query: {}", query);
-
-        // Prepare + execute query
-        let t2 = Instant::now();
-        let mut stmt = conn.prepare(&query)?;
-        debug!("Prepared statement in {:?}", t2.elapsed());
-
-        let t3 = Instant::now();
-        let mut rows = stmt.query([])?;
-        debug!("Executed query and got row iterator in {:?}", t3.elapsed());
-
-        // Iterate rows and parse geometries
-        let mut zones = Vec::new();
-        let mut zone_id = offset + 1;
-
-        let t4 = Instant::now();
-        while let Ok(Some(row)) = rows.next() {
-            let z_gersid: String = row.get(0)?;
-            let z_country: String = row.get(1)?;
-            let z_region: String = row.get(2)?;
-            let z_name: String = row.get(3)?;
-            let z_subtype: String = row.get(4)?;
-            let wkb_bytes: Vec<u8> = row.get(5)?;
-            let geometry: Geometry = Wkb(&wkb_bytes).to_geo()?;
-
-            zones.push(Zone {
-                z_zonekey: zone_id,
-                z_gersid,
-                z_country,
-                z_region,
-                z_name,
-                z_subtype,
-                z_boundary: geometry,
-            });
-
-            if zones.len() % 1000 == 0 {
-                debug!("Loaded {} zones for partition {}", zones.len(), 
self.part);
-            }
-            zone_id += 1;
-        }
-
-        info!(
-            "Partition {} loaded: {} zones in {:?}",
-            self.part,
-            zones.len(),
-            t4.elapsed()
-        );
-
-        info!("Total partition load took {:?}", start_total.elapsed());
-        Ok(zones)
-    }
-
-    /// Return the row count for the given part
-    pub fn calculate_row_count(&self) -> i64 {
-        let total_zones = 
Self::calculate_total_zones_for_scale_factor(self.scale_factor);
-        let zones_per_part = self.calculate_zones_per_part();
-        let offset = self.calculate_offset();
-
-        // Don't exceed total available zones
-        std::cmp::min(zones_per_part, total_zones - offset).max(0)
-    }
-
-    /// Returns an iterator over the zone rows
-    pub fn iter(&self) -> ZoneGeneratorIterator {
-        ZoneGeneratorIterator::new(self.clone())
-    }
-}
-
-impl IntoIterator for ZoneGenerator {
-    type Item = Zone;
-    type IntoIter = ZoneGeneratorIterator;
-
-    fn into_iter(self) -> Self::IntoIter {
-        self.iter()
-    }
-}
-
-/// Iterator that generates Zone rows by loading partition data on-demand
-#[derive(Debug)]
-pub struct ZoneGeneratorIterator {
-    zones: Vec<Zone>,
-    index: usize,
-}
-
-impl ZoneGeneratorIterator {
-    fn new(generator: ZoneGenerator) -> Self {
-        // Load zones for this partition only
-        let zones = generator.load_partition_zones().unwrap_or_else(|e| {
-            error!(
-                "Failed to load zones for partition {}: {}",
-                generator.part, e
-            );
-            Vec::new()
-        });
-
-        ZoneGeneratorIterator { zones, index: 0 }
-    }
-}
-
-impl Iterator for ZoneGeneratorIterator {
-    type Item = Zone;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        if self.index >= self.zones.len() {
-            return None;
-        }
-
-        let zone = self.zones[self.index].clone();
-        self.index += 1;
-        Some(zone)
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -1908,78 +1572,4 @@ mod tests {
         assert_eq!(first.b_buildingkey, 2);
         assert_eq!(first.to_string(), "2|blush|POLYGON((124.218033476 
10.538071565,124.215762091 10.536069114,124.214352934 
10.536014944,124.212486371 10.539913704,124.217919324 
10.539075339,124.218033476 10.538071565))|")
     }
-
-    #[test]
-    fn test_zone_generation() {
-        // Create a generator with a small scale factor
-        let generator = ZoneGenerator::new(0.001, 1, 1);
-        let zones: Vec<_> = generator.into_iter().collect();
-
-        assert_eq!(zones.len(), 158);
-
-        // Check first zone
-        let first = &zones[0];
-        assert_eq!(first.z_zonekey, 1);
-        // The first zone is now a county due to the is_land filter and county 
being in base subtypes
-        assert_eq!(first.z_subtype, "county");
-        // Verify the string format matches the expected pattern (but don't 
check exact content since it's dynamic)
-        let expected_pattern = format!(
-            "{}|{}|{}|{}|{}|{}|{:?}|",
-            first.z_zonekey,
-            first.z_gersid,
-            first.z_country,
-            first.z_region,
-            first.z_name,
-            first.z_subtype,
-            first.z_boundary
-        );
-        assert_eq!(first.to_string(), expected_pattern);
-    }
-
-    #[test]
-    fn test_zone_subtype_filters() {
-        // Test scale factor 0-10: should include microhood, macrohood, and 
county
-        let subtypes_0_10 = 
ZoneGenerator::get_zone_subtypes_for_scale_factor(5.0);
-        assert_eq!(subtypes_0_10, vec!["microhood", "macrohood", "county"]);
-
-        // Test scale factor 10-100: should include microhood, macrohood, 
county, and neighborhood
-        let subtypes_10_100 = 
ZoneGenerator::get_zone_subtypes_for_scale_factor(50.0);
-        assert_eq!(
-            subtypes_10_100,
-            vec!["microhood", "macrohood", "county", "neighborhood"]
-        );
-
-        // Test scale factor 100-1000: should include all except country
-        let subtypes_100_1000 = 
ZoneGenerator::get_zone_subtypes_for_scale_factor(500.0);
-        assert_eq!(
-            subtypes_100_1000,
-            vec![
-                "microhood",
-                "macrohood",
-                "county",
-                "neighborhood",
-                "localadmin",
-                "locality",
-                "region",
-                "dependency"
-            ]
-        );
-
-        // Test scale factor 1000+: should include all subtypes
-        let subtypes_1000_plus = 
ZoneGenerator::get_zone_subtypes_for_scale_factor(2000.0);
-        assert_eq!(
-            subtypes_1000_plus,
-            vec![
-                "microhood",
-                "macrohood",
-                "county",
-                "neighborhood",
-                "localadmin",
-                "locality",
-                "region",
-                "dependency",
-                "country"
-            ]
-        );
-    }
 }
diff --git a/spatialbench/tests/integration_tests.rs 
b/spatialbench/tests/integration_tests.rs
index 6108279..1880d79 100644
--- a/spatialbench/tests/integration_tests.rs
+++ b/spatialbench/tests/integration_tests.rs
@@ -3,7 +3,6 @@
 
 use spatialbench::generators::{
     BuildingGenerator, CustomerGenerator, DriverGenerator, TripGenerator, 
VehicleGenerator,
-    ZoneGenerator,
 };
 
 struct TestIntoIterator<G>
@@ -103,22 +102,6 @@ fn test_vehicle_into_iter() {
     }
 }
 
-#[test]
-fn test_zone_into_iter() {
-    {
-        assert_eq!(
-            TestIntoIterator::new(ZoneGenerator::new(0.001, 1, 1))
-                .to_string_vec(5)
-                .len(),
-            5
-        );
-    }
-    {
-        let zone = ZoneGenerator::new(0.001, 1, 1);
-        assert_eq!(TestIntoIterator::new(zone).to_string_vec(5).len(), 5);
-    }
-}
-
 #[test]
 fn test_building_into_iter() {
     {

Reply via email to