This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-spatialbench.git


The following commit(s) were added to refs/heads/main by this push:
     new 59dff99  Feat: Support creating multiple files with single --parts command (#57)
59dff99 is described below

commit 59dff99c1f85f9fa0b1566ff562ddcae81d135f6
Author: Pranav Toggi <[email protected]>
AuthorDate: Mon Oct 27 11:21:43 2025 -0700

    Feat: Support creating multiple files with single --parts command (#57)
    
    * temp: rename to tpchgen
    
    * refactor zone generation for readability
    
    * allow generating multi-part zone
    
    * fmt, clippy fixes
    
    * Automatically create multiple files with single `--parts` command
    
    * Improve code readability and logging in multi-part generation
    
    * Rename tpchgen to spatialbench and update related references
    
    * fmt fix
    
    * update generate_data.py script
    
    * fix test
    
    * Apply suggestions from code review
    
    Co-authored-by: Copilot <[email protected]>
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
---
 README.md                                 |  13 +-
 spatialbench-cli/src/main.rs              | 298 +++++-------------
 spatialbench-cli/src/output_plan.rs       | 267 ++++++++++++++++
 spatialbench-cli/src/plan.rs              |  36 ++-
 spatialbench-cli/src/runner.rs            | 355 +++++++++++++++++++++
 spatialbench-cli/src/zone/config.rs       |  55 ++++
 spatialbench-cli/src/zone/datasource.rs   |  95 ++++++
 spatialbench-cli/src/zone/main.rs         |  64 ++++
 spatialbench-cli/src/zone/mod.rs          |  88 ++++++
 spatialbench-cli/src/zone/partition.rs    |  94 ++++++
 spatialbench-cli/src/zone/stats.rs        | 161 ++++++++++
 spatialbench-cli/src/zone/transform.rs    |  50 +++
 spatialbench-cli/src/zone/writer.rs       |  94 ++++++
 spatialbench-cli/src/zone_df.rs           | 503 ------------------------------
 spatialbench-cli/tests/cli_integration.rs | 254 +++++++++++++--
 tools/generate_data.py                    |  30 +-
 16 files changed, 1654 insertions(+), 803 deletions(-)

diff --git a/README.md b/README.md
index 3ee105e..2ac7140 100644
--- a/README.md
+++ b/README.md
@@ -95,13 +95,7 @@ Key performance benefits:
 
 SpatialBench is a Rust-based fork of the tpchgen-rs project. It preserves the original’s high-performance, multi-threaded, streaming architecture, while extending it with a spatial star schema and geometry generation logic.
 
-You can build the SpatialBench data generator using Cargo:
-
-```bash
-cargo build --release
-```
-
-Alternatively, install it directly using:
+You can install the SpatialBench data generator using Cargo:
 
 ```bash
 cargo install --path ./spatialbench-cli
@@ -133,10 +127,7 @@ spatialbench-cli -s 1 --format=parquet --tables trip,building --output-dir sf1-p
 #### Partitioned Output Example
 
 ```bash
-for PART in $(seq 1 4); do
-  mkdir part-$PART
-  spatialbench-cli -s 10 --tables trip,building --output-dir part-$PART --parts 4 --part $PART
-done
+spatialbench-cli -s 10 --tables trip,building --parts 4
 ```
 
 #### Generate Multiple Parquet Files of Similar Size
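With the new behavior, the single command above creates the per-table subdirectories itself and writes `trip/trip.1.parquet` through `trip/trip.4.parquet` (and likewise for `building`) under the output directory, replacing the removed per-part loop.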
diff --git a/spatialbench-cli/src/main.rs b/spatialbench-cli/src/main.rs
index 79eee80..9456f95 100644
--- a/spatialbench-cli/src/main.rs
+++ b/spatialbench-cli/src/main.rs
@@ -4,71 +4,31 @@
//! API wise to the original dbgen tool, as in we use the same command line flags
 //! and arguments.
 //!
-//! ```
-//! USAGE:
-//!     spatialbench-cli [OPTIONS]
-//!
-//! OPTIONS:
-//!     -h, --help                    Prints help information
-//!     -V, --version                 Prints version information
-//!     -s, --scale-factor <FACTOR>  Scale factor for the data generation (default: 1)
-//!     -T, --tables <TABLES>        Comma-separated list of tables to generate (default: all)
-//!     -f, --format <FORMAT>        Output format: parquet, tbl or csv (default: parquet)
-//!     -o, --output-dir <DIR>       Output directory (default: current directory)
-//!     -p, --parts <N>              Number of parts to split generation into (default: 1)
-//!         --part <N>               Which part to generate (1-based, default: 1)
-//!     -n, --num-threads <N>        Number of threads to use (default: number of CPUs)
-//!     -c, --parquet-compression <C> Parquet compression codec, e.g., SNAPPY, ZSTD(1), UNCOMPRESSED (default: SNAPPY)
-//!         --parquet-row-group-size <N> Target size in bytes per row group in Parquet files (default: 134,217,728)
-//!     -v, --verbose                Verbose output
-//!         --stdout                 Write output to stdout instead of files
-//!```
-//!
-//! # Logging:
-//! Use the `-v` flag or `RUST_LOG` environment variable to control logging output.
-//!
-//! `-v` sets the log level to `info` and ignores the `RUST_LOG` environment variable.
-//!
-//! # Examples
-//! ```
-//! # see all info output
-//! spatialbench-cli -s 1 -v
-//!
-//! # same thing using RUST_LOG
-//! RUST_LOG=info spatialbench-cli -s 1
-//!
-//! # see all debug output
-//! RUST_LOG=debug spatialbench -s 1
-//! ```
+//! See the documentation on [`Cli`] for more information on the command line
 mod csv;
 mod generate;
+mod output_plan;
 mod parquet;
 mod plan;
+mod runner;
 mod spatial_config_file;
 mod statistics;
 mod tbl;
-mod zone_df;
+mod zone;
 
-use crate::csv::*;
-use crate::generate::{generate_in_chunks, Sink, Source};
+use crate::generate::Sink;
+use crate::output_plan::OutputPlanGenerator;
 use crate::parquet::*;
 use crate::plan::{GenerationPlan, DEFAULT_PARQUET_ROW_GROUP_BYTES};
 use crate::spatial_config_file::parse_yaml;
 use crate::statistics::WriteStatistics;
-use crate::tbl::*;
 use ::parquet::basic::Compression;
 use clap::builder::TypedValueParser;
 use clap::{Parser, ValueEnum};
 use log::{debug, info, LevelFilter};
 use spatialbench::distribution::Distributions;
-use spatialbench::generators::{
-    BuildingGenerator, CustomerGenerator, DriverGenerator, TripGenerator, VehicleGenerator,
-};
 use spatialbench::spatial::overrides::{set_overrides, SpatialOverrides};
 use spatialbench::text::TextPool;
-use spatialbench_arrow::{
-    BuildingArrow, CustomerArrow, DriverArrow, RecordBatchIterator, TripArrow, VehicleArrow,
-};
 use std::fmt::Display;
 use std::fs::{self, File};
 use std::io::{self, BufWriter, Stdout, Write};
@@ -79,7 +39,34 @@ use std::time::Instant;
 #[derive(Parser)]
 #[command(name = "spatialbench")]
 #[command(version)]
-#[command(about = "TPC-H Data Generator", long_about = None)]
+#[command(
+    // -h output
+    about = "SpatialBench Data Generator",
+    // --help output
+    long_about = r#"
+SpatialBench Data Generator (https://github.com/apache/sedona-spatialbench)
+
+By default each table is written to a single file named <output_dir>/<table>.<format>
+
+If the `--part` or `--parts` option is specified, each table is written to a subdirectory in
+multiple files named <output_dir>/<table>/<table>.<part>.<format>
+
+Examples
+
+# Generate all tables at scale factor 1 (1GB) in TBL format to the /tmp/tpch directory:
+
+spatialbench-cli -s 1 --output-dir=/tmp/tpch
+
+# Generate the trip table at scale factor 100 in 10 Apache Parquet files to
+# /tmp/tpch/trip
+
+spatialbench-cli -s 100 --tables=trip --format=parquet --parts=10 --output-dir=/tmp/tpch
+
+# Generate scale factor one in the current directory, showing debug output
+
+RUST_LOG=debug spatialbench-cli -s 1
+"#
+)]
 struct Cli {
     /// Scale factor to create
     #[arg(short, long, default_value_t = 1.)]
@@ -97,13 +84,11 @@ struct Cli {
     #[arg(long = "config")]
     config: Option<PathBuf>,
 
-    /// Number of part(itions) to generate (manual parallel generation)
+    /// Number of part(itions) to generate. If not specified, creates a single file per table
     #[arg(short, long)]
     parts: Option<i32>,
 
-    /// Which part(ition) to generate (1-based)
-    ///
-    /// If not specified, generates all parts
+    /// Which part(ition) to generate (1-based). If not specified, generates all parts
     #[arg(long)]
     part: Option<i32>,
 
@@ -132,6 +117,9 @@ struct Cli {
     parquet_compression: Compression,
 
     /// Verbose output
+    ///
+    /// When specified, sets the log level to `info` and ignores the `RUST_LOG`
+    /// environment variable. When not specified, uses `RUST_LOG`
     #[arg(short, long, default_value_t = false)]
     verbose: bool,
 
@@ -142,11 +130,11 @@ struct Cli {
     /// Target size in row group bytes in Parquet files
     ///
     /// Row groups are the typical unit of parallel processing and compression
-    /// in Parquet. With many query engines, smaller row groups enable better
+    /// with many query engines. Therefore, smaller row groups enable better
     /// parallelism and lower peak memory use but may reduce compression
     /// efficiency.
     ///
-    /// Note: parquet files are limited to 32k row groups, so at high scale
+    /// Note: Parquet files are limited to 32k row groups, so at high scale
     /// factors, the row group size may be increased to keep the number of row
     /// groups under this limit.
     ///
@@ -257,46 +245,6 @@ async fn main() -> io::Result<()> {
     cli.main().await
 }
 
-/// macro to create a Cli function for generating a table
-///
-/// Arguments:
-/// $FUN_NAME: name of the function to create
-/// $TABLE: The [`Table`] to generate
-/// $GENERATOR: The generator type to use
-/// $TBL_SOURCE: The [`Source`] type to use for TBL format
-/// $CSV_SOURCE: The [`Source`] type to use for CSV format
-/// $PARQUET_SOURCE: The [`RecordBatchIterator`] type to use for Parquet format
-macro_rules! define_generate {
-    ($FUN_NAME:ident,  $TABLE:expr, $GENERATOR:ident, $TBL_SOURCE:ty, $CSV_SOURCE:ty, $PARQUET_SOURCE:ty) => {
-        async fn $FUN_NAME(&self) -> io::Result<()> {
-            let filename = self.output_filename($TABLE);
-            let plan = GenerationPlan::try_new(
-                &$TABLE,
-                self.format,
-                self.scale_factor,
-                self.part,
-                self.parts,
-                self.parquet_row_group_bytes,
-            )
-            .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
-            let scale_factor = self.scale_factor;
-            info!("Writing table {} (SF={scale_factor}) to {filename}", $TABLE);
-            debug!("Plan: {plan}");
-            let gens = plan
-                .into_iter()
-                .map(move |(part, num_parts)| $GENERATOR::new(scale_factor, part, num_parts));
-            match self.format {
-                OutputFormat::Tbl => self.go(&filename, gens.map(<$TBL_SOURCE>::new)).await,
-                OutputFormat::Csv => self.go(&filename, gens.map(<$CSV_SOURCE>::new)).await,
-                OutputFormat::Parquet => {
-                    self.go_parquet(&filename, gens.map(<$PARQUET_SOURCE>::new))
-                        .await
-                }
-            }
-        }
-    };
-}
-
 impl Cli {
     /// Main function to run the generation
     async fn main(self) -> io::Result<()> {
@@ -368,15 +316,6 @@ impl Cli {
             ]
         };
 
-        // force the creation of the distributions and text pool to so it doesn't
-        // get charged to the first table
-        let start = Instant::now();
-        debug!("Creating distributions and text pool");
-        Distributions::static_default();
-        TextPool::get_or_init_default();
-        let elapsed = start.elapsed();
-        info!("Created static distributions and text pools in {elapsed:?}");
-
         // Warn if parquet specific options are set but not generating parquet
         if self.format != OutputFormat::Parquet {
             if self.parquet_compression != Compression::SNAPPY {
@@ -391,131 +330,58 @@ impl Cli {
             }
         }
 
-        // Generate each table
+        // Determine what files to generate
+        let mut output_plan_generator = OutputPlanGenerator::new(
+            self.format,
+            self.scale_factor,
+            self.parquet_compression,
+            self.parquet_row_group_bytes,
+            self.stdout,
+            self.output_dir.clone(),
+        );
+
         for table in tables {
-            match table {
-                Table::Vehicle => self.generate_vehicle().await?,
-                Table::Driver => self.generate_driver().await?,
-                Table::Customer => self.generate_customer().await?,
-                Table::Trip => self.generate_trip().await?,
-                Table::Building => self.generate_building().await?,
-                Table::Zone => self.generate_zone().await?,
+            if table == Table::Zone {
+                self.generate_zone().await?
+            } else {
+                output_plan_generator.generate_plans(table, self.part, self.parts)?;
             }
         }
+        let output_plans = output_plan_generator.build();
+
+        // force the creation of the distributions and text pool so it doesn't
+        // get charged to the first table
+        let start = Instant::now();
+        debug!("Creating distributions and text pool");
+        Distributions::static_default();
+        TextPool::get_or_init_default();
+        let elapsed = start.elapsed();
+        info!("Created static distributions and text pools in {elapsed:?}");
 
+        // Run
+        let runner = runner::PlanRunner::new(output_plans, self.num_threads);
+        runner.run().await?;
         info!("Generation complete!");
         Ok(())
     }
 
     async fn generate_zone(&self) -> io::Result<()> {
-        match self.format {
-            OutputFormat::Parquet => {
-                let args = zone_df::ZoneDfArgs {
-                    scale_factor: 1.0f64.max(self.scale_factor),
-                    output_dir: self.output_dir.clone(),
-                    parts: self.parts.unwrap_or(1),
-                    part: self.part.unwrap_or(1),
-                    parquet_row_group_bytes: self.parquet_row_group_bytes,
-                    parquet_compression: self.parquet_compression,
-                };
-                zone_df::generate_zone_parquet(args)
-                    .await
-                    .map_err(io::Error::other)
-            }
-            _ => Err(io::Error::new(
-                io::ErrorKind::InvalidInput,
-                "Zone table is only supported in --format=parquet (via DataFusion/S3).",
-            )),
-        }
-    }
-
-    define_generate!(
-        generate_vehicle,
-        Table::Vehicle,
-        VehicleGenerator,
-        VehicleTblSource,
-        VehicleCsvSource,
-        VehicleArrow
-    );
-    define_generate!(
-        generate_driver,
-        Table::Driver,
-        DriverGenerator,
-        DriverTblSource,
-        DriverCsvSource,
-        DriverArrow
-    );
-    define_generate!(
-        generate_customer,
-        Table::Customer,
-        CustomerGenerator,
-        CustomerTblSource,
-        CustomerCsvSource,
-        CustomerArrow
-    );
-    define_generate!(
-        generate_trip,
-        Table::Trip,
-        TripGenerator,
-        TripTblSource,
-        TripCsvSource,
-        TripArrow
-    );
-    define_generate!(
-        generate_building,
-        Table::Building,
-        BuildingGenerator,
-        BuildingTblSource,
-        BuildingCsvSource,
-        BuildingArrow
-    );
-
-    /// return the output filename for the given table
-    fn output_filename(&self, table: Table) -> String {
-        let extension = match self.format {
-            OutputFormat::Tbl => "tbl",
-            OutputFormat::Csv => "csv",
-            OutputFormat::Parquet => "parquet",
+        let format = match self.format {
+            OutputFormat::Parquet => zone::main::OutputFormat::Parquet,
+            OutputFormat::Csv => zone::main::OutputFormat::Csv,
+            OutputFormat::Tbl => zone::main::OutputFormat::Tbl,
         };
-        format!("{}.{extension}", table.name())
-    }
-
-    /// return a file for writing the given filename in the output directory
-    fn new_output_file(&self, filename: &str) -> io::Result<File> {
-        let path = self.output_dir.join(filename);
-        File::create(path)
-    }
 
-    /// Generates the output file from the sources
-    async fn go<I>(&self, filename: &str, sources: I) -> Result<(), io::Error>
-    where
-        I: Iterator<Item: Source> + 'static,
-    {
-        // Since generate_in_chunks already buffers, there is no need to buffer again
-        if self.stdout {
-            let sink = WriterSink::new(io::stdout());
-            generate_in_chunks(sink, sources, self.num_threads).await
-        } else {
-            let sink = WriterSink::new(self.new_output_file(filename)?);
-            generate_in_chunks(sink, sources, self.num_threads).await
-        }
-    }
-
-    /// Generates an output parquet file from the sources
-    async fn go_parquet<I>(&self, filename: &str, sources: I) -> Result<(), io::Error>
-    where
-        I: Iterator<Item: RecordBatchIterator> + 'static,
-    {
-        if self.stdout {
-            // write to stdout
-            let writer = BufWriter::with_capacity(32 * 1024 * 1024, io::stdout()); // 32MB buffer
-            generate_parquet(writer, sources, self.num_threads, self.parquet_compression).await
-        } else {
-            // write to a file
-            let file = self.new_output_file(filename)?;
-            let writer = BufWriter::with_capacity(32 * 1024 * 1024, file); // 32MB buffer
-            generate_parquet(writer, sources, self.num_threads, self.parquet_compression).await
-        }
+        zone::main::generate_zone(
+            format,
+            self.scale_factor,
+            self.output_dir.clone(),
+            self.parts,
+            self.part,
+            self.parquet_row_group_bytes,
+            self.parquet_compression,
+        )
+        .await
     }
 }
 
diff --git a/spatialbench-cli/src/output_plan.rs b/spatialbench-cli/src/output_plan.rs
new file mode 100644
index 0000000..a80533b
--- /dev/null
+++ b/spatialbench-cli/src/output_plan.rs
@@ -0,0 +1,267 @@
+//! * [`OutputLocation`]: where to output the generated data
+//! * [`OutputPlan`]: an output file that will be generated
+//! * [`OutputPlanGenerator`]: plans the output files to be generated
+
+use crate::plan::GenerationPlan;
+use crate::{OutputFormat, Table};
+use log::debug;
+use parquet::basic::Compression;
+use std::collections::HashSet;
+use std::fmt::{Display, Formatter};
+use std::io;
+use std::path::PathBuf;
+
+/// Where a partition will be output
+#[derive(Debug, Clone, PartialEq)]
+pub enum OutputLocation {
+    /// Output to a file
+    File(PathBuf),
+    /// Output to stdout
+    Stdout,
+}
+
+impl Display for OutputLocation {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        match self {
+            OutputLocation::File(path) => {
+                let Some(file) = path.file_name() else {
+                    return write!(f, "{}", path.display());
+                };
+                // Display the file name only, not the full path
+                write!(f, "{}", file.to_string_lossy())
+            }
+            OutputLocation::Stdout => write!(f, "Stdout"),
+        }
+    }
+}
+
+/// Describes an output partition (file) that will be generated
+#[derive(Debug, Clone, PartialEq)]
+pub struct OutputPlan {
+    /// The table
+    table: Table,
+    /// The scale factor
+    scale_factor: f64,
+    /// The output format (TODO don't depend back on something in main)
+    output_format: OutputFormat,
+    /// If the output is parquet, what compression level to use
+    parquet_compression: Compression,
+    /// Where to output
+    output_location: OutputLocation,
+    /// Plan for generating the table
+    generation_plan: GenerationPlan,
+}
+
+impl OutputPlan {
+    pub fn new(
+        table: Table,
+        scale_factor: f64,
+        output_format: OutputFormat,
+        parquet_compression: Compression,
+        output_location: OutputLocation,
+        generation_plan: GenerationPlan,
+    ) -> Self {
+        Self {
+            table,
+            scale_factor,
+            output_format,
+            parquet_compression,
+            output_location,
+            generation_plan,
+        }
+    }
+
+    /// Return the table this partition is for
+    pub fn table(&self) -> Table {
+        self.table
+    }
+
+    /// Return the scale factor for this partition
+    pub fn scale_factor(&self) -> f64 {
+        self.scale_factor
+    }
+
+    /// Return the output format for this partition
+    pub fn output_format(&self) -> OutputFormat {
+        self.output_format
+    }
+
+    /// return the output location
+    pub fn output_location(&self) -> &OutputLocation {
+        &self.output_location
+    }
+
+    /// Return the parquet compression level for this partition
+    pub fn parquet_compression(&self) -> Compression {
+        self.parquet_compression
+    }
+
+    /// Return the chunk count (the number of data chunks in the underlying
+    /// generation plan)
+    pub fn chunk_count(&self) -> usize {
+        self.generation_plan.chunk_count()
+    }
+
+    /// return the generation plan for this partition
+    pub fn generation_plan(&self) -> &GenerationPlan {
+        &self.generation_plan
+    }
+}
+
+impl Display for OutputPlan {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "table {} (SF={}, {} chunks) to {}",
+            self.table,
+            self.scale_factor,
+            self.chunk_count(),
+            self.output_location
+        )
+    }
+}
+
+/// Plans the creation of output files
+pub struct OutputPlanGenerator {
+    format: OutputFormat,
+    scale_factor: f64,
+    parquet_compression: Compression,
+    parquet_row_group_bytes: i64,
+    stdout: bool,
+    output_dir: PathBuf,
+    /// The generated output plans
+    output_plans: Vec<OutputPlan>,
+    /// Output directories that have been created so far
+    /// (used to avoid creating the same directory multiple times)
+    created_directories: HashSet<PathBuf>,
+}
+
+impl OutputPlanGenerator {
+    pub fn new(
+        format: OutputFormat,
+        scale_factor: f64,
+        parquet_compression: Compression,
+        parquet_row_group_bytes: i64,
+        stdout: bool,
+        output_dir: PathBuf,
+    ) -> Self {
+        Self {
+            format,
+            scale_factor,
+            parquet_compression,
+            parquet_row_group_bytes,
+            stdout,
+            output_dir,
+            output_plans: Vec::new(),
+            created_directories: HashSet::new(),
+        }
+    }
+
+    /// Generate the output plans for the given table and partition options
+    pub fn generate_plans(
+        &mut self,
+        table: Table,
+        cli_part: Option<i32>,
+        cli_part_count: Option<i32>,
+    ) -> io::Result<()> {
+        // If the user specified only a part count, automatically create all
+        // partitions for the table
+        if let (None, Some(part_count)) = (cli_part, cli_part_count) {
+            if GenerationPlan::partitioned_table(table) {
+                debug!("Generating all partitions for table {table} with part count {part_count}");
+                for part in 1..=part_count {
+                    self.generate_plan_inner(table, Some(part), Some(part_count))?;
+                }
+            } else {
+                // there is only one partition for this table (e.g. vehicle or driver)
+                debug!("Generating single partition for table {table}");
+                self.generate_plan_inner(table, Some(1), Some(1))?;
+            }
+        } else {
+            self.generate_plan_inner(table, cli_part, cli_part_count)?;
+        }
+        Ok(())
+    }
+
+    fn generate_plan_inner(
+        &mut self,
+        table: Table,
+        cli_part: Option<i32>,
+        cli_part_count: Option<i32>,
+    ) -> io::Result<()> {
+        let generation_plan = GenerationPlan::try_new(
+            table,
+            self.format,
+            self.scale_factor,
+            cli_part,
+            cli_part_count,
+            self.parquet_row_group_bytes,
+        )
+        .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
+
+        let output_location = self.output_location(table, cli_part)?;
+
+        let plan = OutputPlan::new(
+            table,
+            self.scale_factor,
+            self.format,
+            self.parquet_compression,
+            output_location,
+            generation_plan,
+        );
+
+        self.output_plans.push(plan);
+        Ok(())
+    }
+
+    /// Return the output location for the given table
+    ///
+    /// * if part is None, the output location is `{output_dir}/{table}.{extension}`
+    ///
+    /// * if part is Some(part), then the output location
+    ///   will be `{output_dir}/{table}/{table}.{part}.{extension}`
+    ///   (e.g. trip/trip.1.tbl, trip/trip.2.tbl, etc.)
+    fn output_location(&mut self, table: Table, part: Option<i32>) -> io::Result<OutputLocation> {
+        if self.stdout {
+            Ok(OutputLocation::Stdout)
+        } else {
+            let extension = match self.format {
+                OutputFormat::Tbl => "tbl",
+                OutputFormat::Csv => "csv",
+                OutputFormat::Parquet => "parquet",
+            };
+
+            let mut output_path = self.output_dir.clone();
+            if let Some(part) = part {
+                // If a partition is specified, create a subdirectory for it
+                output_path.push(table.to_string());
+                self.ensure_directory_exists(&output_path)?;
+                output_path.push(format!("{table}.{part}.{extension}"));
+            } else {
+                // No partition specified, output to a single file
+                output_path.push(format!("{table}.{extension}"));
+            }
+            Ok(OutputLocation::File(output_path))
+        }
+    }
+
+    /// Ensure the output directory exists, creating it if necessary
+    fn ensure_directory_exists(&mut self, dir: &PathBuf) -> io::Result<()> {
+        if self.created_directories.contains(dir) {
+            return Ok(());
+        }
+        std::fs::create_dir_all(dir).map_err(|e| {
+            io::Error::new(
+                io::ErrorKind::InvalidInput,
+                format!("Error creating directory {}: {}", dir.display(), e),
+            )
+        })?;
+        self.created_directories.insert(dir.clone());
+        Ok(())
+    }
+
+    /// Return the output plans generated so far
+    pub fn build(self) -> Vec<OutputPlan> {
+        self.output_plans
+    }
+}
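As an illustrative sketch only (it uses this commit's crate-internal types, and the output directory, scale factor, and table choice are assumptions), this is how a bare `--parts 4` fans out into one `OutputPlan` per file:

```rust
use std::io;
use std::path::PathBuf;
use parquet::basic::Compression;

// Hypothetical helper inside spatialbench-cli showing the fan-out.
fn plan_trip_parts() -> io::Result<Vec<OutputPlan>> {
    let mut planner = OutputPlanGenerator::new(
        OutputFormat::Parquet, // --format=parquet
        10.0,                  // --scale-factor 10
        Compression::SNAPPY,   // default compression codec
        134_217_728,           // default row-group byte target
        false,                 // not --stdout
        PathBuf::from("out"),  // assumed --output-dir
    );
    // cli_part = None, cli_part_count = Some(4): generate_plans expands this
    // to parts 1..=4, so build() returns four plans targeting
    // out/trip/trip.1.parquet through out/trip/trip.4.parquet.
    planner.generate_plans(Table::Trip, None, Some(4))?;
    Ok(planner.build())
}
```

Each resulting plan carries its own `GenerationPlan` and output location, which is what lets the `PlanRunner` (in the next file) schedule the files independently.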
diff --git a/spatialbench-cli/src/plan.rs b/spatialbench-cli/src/plan.rs
index 809814f..b95c053 100644
--- a/spatialbench-cli/src/plan.rs
+++ b/spatialbench-cli/src/plan.rs
@@ -1,4 +1,4 @@
-//! [`GenerationPlan`] that describes how to generate a Spatial Bench dataset.
+//! * [`GenerationPlan`]: how to generate a specific Spatial Bench dataset.
 
 use crate::{OutputFormat, Table};
 use log::debug;
@@ -8,7 +8,8 @@ use spatialbench::generators::{
 use std::fmt::Display;
 use std::ops::RangeInclusive;
 
-/// A list of generator "parts" (data generator chunks, not Spatial Bench parts)
+/// A list of generator "parts" (data generator chunks, not TPCH parts) for a
+/// single output file.
 ///
 /// Controls the parallelization and layout of Parquet files in `spatialbench-cli`.
 ///
@@ -49,7 +50,7 @@ use std::ops::RangeInclusive;
 /// let results = plan.into_iter().collect::<Vec<_>>();
 /// /// assert_eq!(results.len(), 1);
 /// ```
-#[derive(Debug)]
+#[derive(Debug, Clone, PartialEq)]
 pub struct GenerationPlan {
     /// Total number of parts to generate
     part_count: i32,
@@ -67,7 +68,7 @@ impl GenerationPlan {
     /// * `cli_part_count`: optional total number of parts, `--parts` CLI argument
     /// * `parquet_row_group_size`: optional parquet row group size, `--parquet-row-group-size` CLI argument
     pub fn try_new(
-        table: &Table,
+        table: Table,
         format: OutputFormat,
         scale_factor: f64,
         cli_part: Option<i32>,
@@ -100,11 +101,17 @@ impl GenerationPlan {
         }
     }
 
+    /// Return true if the table is partitionable (parameterized by part
+    /// count)
+    pub fn partitioned_table(table: Table) -> bool {
+        table != Table::Vehicle && table != Table::Driver && table != Table::Building
+    }
+
     /// Returns a new `GenerationPlan` when partitioning
     ///
     /// See [`GenerationPlan::try_new`] for argument documentation.
     fn try_new_with_parts(
-        table: &Table,
+        table: Table,
         format: OutputFormat,
         scale_factor: f64,
         cli_part: i32,
@@ -128,7 +135,7 @@ impl GenerationPlan {
 
         // These tables are so small they are not parameterized by part count,
         // so only a single part.
-        if table == &Table::Vehicle || table == &Table::Driver {
+        if !Self::partitioned_table(table) {
             return Ok(Self {
                 part_count: 1,
                 part_list: 1..=1,
@@ -169,7 +176,7 @@ impl GenerationPlan {
 
     /// Returns a new `GenerationPlan` when no partitioning is specified on the command line
     fn try_new_without_parts(
-        table: &Table,
+        table: Table,
         format: OutputFormat,
         scale_factor: f64,
         parquet_row_group_bytes: i64,
@@ -182,6 +189,11 @@ impl GenerationPlan {
             part_list: 1..=num_parts,
         })
     }
+
+    /// Return the number of chunks (generator parts) this plan will generate
+    pub fn chunk_count(&self) -> usize {
+        self.part_list.clone().count()
+    }
 }
 
 /// Converts the `GenerationPlan` into an iterator of (part_number, num_parts)
@@ -218,7 +230,7 @@ struct OutputSize {
 
 impl OutputSize {
     pub fn new(
-        table: &Table,
+        table: Table,
         scale_factor: f64,
         format: OutputFormat,
         parquet_row_group_bytes: i64,
@@ -320,7 +332,7 @@ impl OutputSize {
         }
     }
 
-    fn row_count_for_table(table: &Table, scale_factor: f64) -> i64 {
+    fn row_count_for_table(table: Table, scale_factor: f64) -> i64 {
         //let (avg_row_size_bytes, row_count) = match table {
         match table {
             Table::Vehicle => VehicleGenerator::calculate_row_count(scale_factor, 1, 1),
@@ -514,7 +526,7 @@ mod tests {
                 .with_cli_part(1)
                 .with_cli_part_count(10)
                 // we expect there is still only one part
-                .assert(10, 1..=1)
+                .assert(1, 1..=1)
         }
 
         // #[test]
@@ -744,7 +756,7 @@ mod tests {
         /// expected number of parts and part numbers.
         fn assert(self, expected_part_count: i32, expected_part_numbers: RangeInclusive<i32>) {
             let plan = GenerationPlan::try_new(
-                &self.table,
+                self.table,
                 self.format,
                 self.scale_factor,
                 self.cli_part,
@@ -759,7 +771,7 @@ mod tests {
         /// Assert that creating a [`GenerationPlan`] returns the specified error
         fn assert_err(self, expected_error: &str) {
             let actual_error = GenerationPlan::try_new(
-                &self.table,
+                self.table,
                 self.format,
                 self.scale_factor,
                 self.cli_part,
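The test expectation change above (from `assert(10, 1..=1)` to `assert(1, 1..=1)`) follows from routing all unpartitionable tables through the early return in `try_new_with_parts`, which reports `part_count: 1` with a single chunk regardless of the requested `--parts`.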
diff --git a/spatialbench-cli/src/runner.rs b/spatialbench-cli/src/runner.rs
new file mode 100644
index 0000000..882d1cf
--- /dev/null
+++ b/spatialbench-cli/src/runner.rs
@@ -0,0 +1,355 @@
+//! [`PlanRunner`] for running [`OutputPlan`]s.
+
+use crate::csv::*;
+use crate::generate::{generate_in_chunks, Source};
+use crate::output_plan::{OutputLocation, OutputPlan};
+use crate::parquet::generate_parquet;
+use crate::tbl::*;
+use crate::{OutputFormat, Table, WriterSink};
+use log::{debug, info};
+use spatialbench::generators::{
+    BuildingGenerator, CustomerGenerator, DriverGenerator, TripGenerator, VehicleGenerator,
+};
+use spatialbench_arrow::{
+    BuildingArrow, CustomerArrow, DriverArrow, RecordBatchIterator, TripArrow, VehicleArrow,
+};
+use std::io;
+use std::io::BufWriter;
+use tokio::task::{JoinError, JoinSet};
+
+/// Runs multiple [`OutputPlan`]s in parallel, managing the number of threads
+/// used to run them.
+#[derive(Debug)]
+pub struct PlanRunner {
+    plans: Vec<OutputPlan>,
+    num_threads: usize,
+}
+
+impl PlanRunner {
+    /// Create a new [`PlanRunner`] with the given plans and number of threads.
+    pub fn new(plans: Vec<OutputPlan>, num_threads: usize) -> Self {
+        Self { plans, num_threads }
+    }
+
+    /// Run all the plans in the runner.
+    pub async fn run(self) -> Result<(), io::Error> {
+        debug!(
+            "Running {} plans with {} threads...",
+            self.plans.len(),
+            self.num_threads
+        );
+        let Self {
+            mut plans,
+            num_threads,
+        } = self;
+
+        // Sort the plans by the number of parts so the largest are first
+        plans.sort_unstable_by(|a, b| {
+            let a_cnt = a.chunk_count();
+            let b_cnt = b.chunk_count();
+            a_cnt.cmp(&b_cnt)
+        });
+
+        // Do the actual work in parallel, using a worker queue
+        let mut worker_queue = WorkerQueue::new(num_threads);
+        while let Some(plan) = plans.pop() {
+            worker_queue.schedule_plan(plan).await?;
+        }
+        worker_queue.join_all().await
+    }
+}
+
+/// Manages worker tasks, limiting the number of total outstanding threads
+/// to some fixed number
+///
+/// The runner executes each plan with a number of threads equal to the
+/// number of parts in the plan, but no more than the total number of
+/// threads specified when creating the runner. If a plan does not need all
+/// the threads, the remaining threads are used to run other plans.
+///
+/// This is important to keep all cores busy for smaller tables that may not
+/// have sufficient parts to keep all threads busy (see [`GenerationPlan`]
+/// for more details), but not schedule more tasks than we have threads for.
+///
+/// Scheduling too many tasks requires more memory and leads to context
+/// switching overhead, which can slow down the generation process.
+///
+/// [`GenerationPlan`]: crate::plan::GenerationPlan
+struct WorkerQueue {
+    join_set: JoinSet<io::Result<usize>>,
+    /// Current number of threads available to commit
+    available_threads: usize,
+}
+
+impl WorkerQueue {
+    pub fn new(max_threads: usize) -> Self {
+        assert!(max_threads > 0);
+        Self {
+            join_set: JoinSet::new(),
+            available_threads: max_threads,
+        }
+    }
+
+    /// Spawns a task to run the plan with as many threads as possible
+    /// without exceeding the maximum number of threads.
+    ///
+    /// If there are no threads available, it will wait for one to finish
+    /// before spawning the new task.
+    ///
+    /// Note this algorithm does not guarantee that all threads are always busy,
+    /// but it should be good enough for most cases. For best thread utilization
+    /// spawn the largest plans first.
+    pub async fn schedule_plan(&mut self, plan: OutputPlan) -> io::Result<()> {
+        debug!("scheduling plan {plan}");
+        loop {
+            if self.available_threads == 0 {
+                debug!("no threads left, wait for one to finish");
+                let Some(result) = self.join_set.join_next().await else {
+                    return Err(io::Error::other(
+                        "Internal Error: No more tasks to wait for, but had no threads",
+                    ));
+                };
+                self.available_threads += task_result(result)?;
+                continue; // look for threads again
+            }
+
+            // Check for any other jobs done so we can reuse their threads
+            if let Some(result) = self.join_set.try_join_next() {
+                self.available_threads += task_result(result)?;
+                continue;
+            }
+
+            debug_assert!(
+                self.available_threads > 0,
+                "should have at least one thread to continue"
+            );
+
+            // figure out how many threads to allocate to this plan. Each plan
+            // can use up to `part_count` threads.
+            let chunk_count = plan.chunk_count();
+
+            let num_plan_threads = self.available_threads.min(chunk_count);
+
+            // run the plan in a separate task, which returns the number of threads it used
+            debug!("Spawning plan {plan} with {num_plan_threads} threads");
+
+            self.join_set
+                .spawn(async move { run_plan(plan, num_plan_threads).await });
+            self.available_threads -= num_plan_threads;
+            return Ok(());
+        }
+    }
+
+    // Wait for all tasks to finish
+    pub async fn join_all(mut self) -> io::Result<()> {
+        debug!("Waiting for tasks to finish...");
+        while let Some(result) = self.join_set.join_next().await {
+            task_result(result)?;
+        }
+        debug!("Tasks finished.");
+        Ok(())
+    }
+}
+
+/// unwraps the result of a task and converts it to an `io::Result<T>`.
+fn task_result<T>(result: Result<io::Result<T>, JoinError>) -> io::Result<T> {
+    result.map_err(|e| io::Error::other(format!("Task Panic: {e}")))?
+}
+
+/// Run a single [`OutputPlan`]
+async fn run_plan(plan: OutputPlan, num_threads: usize) -> io::Result<usize> {
+    match plan.table() {
+        Table::Building => run_building_plan(plan, num_threads).await,
+        Table::Vehicle => run_vehicle_plan(plan, num_threads).await,
+        Table::Driver => run_driver_plan(plan, num_threads).await,
+        Table::Customer => run_customer_plan(plan, num_threads).await,
+        Table::Trip => run_trip_plan(plan, num_threads).await,
+        Table::Zone => todo!("Zone table is not supported in PlanRunner"),
+    }
+}
+
+/// Writes a CSV/TBL output from the sources
+async fn write_file<I>(plan: OutputPlan, num_threads: usize, sources: I) -> Result<(), io::Error>
+where
+    I: Iterator<Item: Source> + 'static,
+{
+    // Since generate_in_chunks already buffers, there is no need to buffer
+    // again (aka don't use BufWriter here)
+    match plan.output_location() {
+        OutputLocation::Stdout => {
+            let sink = WriterSink::new(io::stdout());
+            generate_in_chunks(sink, sources, num_threads).await
+        }
+        OutputLocation::File(path) => {
+            // if the output already exists, skip running
+            if path.exists() {
+                info!("{} already exists, skipping generation", path.display());
+                return Ok(());
+            }
+            // write to a temp file and then rename to avoid partial files
+            let temp_path = path.with_extension("inprogress");
+            let file = std::fs::File::create(&temp_path).map_err(|err| {
+                io::Error::other(format!("Failed to create {temp_path:?}: {err}"))
+            })?;
+            let sink = WriterSink::new(file);
+            generate_in_chunks(sink, sources, num_threads).await?;
+            // rename the temp file to the final path
+            std::fs::rename(&temp_path, path).map_err(|e| {
+                io::Error::other(format!(
+                    "Failed to rename {temp_path:?} to {path:?} file: {e}"
+                ))
+            })?;
+            Ok(())
+        }
+    }
+}
+
+/// Generates an output parquet file from the sources
+async fn write_parquet<I>(plan: OutputPlan, num_threads: usize, sources: I) -> Result<(), io::Error>
+where
+    I: Iterator<Item: RecordBatchIterator> + 'static,
+{
+    match plan.output_location() {
+        OutputLocation::Stdout => {
+            let writer = BufWriter::with_capacity(32 * 1024 * 1024, io::stdout()); // 32MB buffer
+            generate_parquet(writer, sources, num_threads, plan.parquet_compression()).await
+        }
+        OutputLocation::File(path) => {
+            // if the output already exists, skip running
+            if path.exists() {
+                info!("{} already exists, skipping generation", path.display());
+                return Ok(());
+            }
+            // write to a temp file and then rename to avoid partial files
+            let temp_path = path.with_extension("inprogress");
+            let file = std::fs::File::create(&temp_path).map_err(|err| {
+                io::Error::other(format!("Failed to create {temp_path:?}: {err}"))
+            })?;
+            let writer = BufWriter::with_capacity(32 * 1024 * 1024, file); // 32MB buffer
+            generate_parquet(writer, sources, num_threads, plan.parquet_compression()).await?;
+            // rename the temp file to the final path
+            std::fs::rename(&temp_path, path).map_err(|e| {
+                io::Error::other(format!(
+                    "Failed to rename {temp_path:?} to {path:?} file: {e}"
+                ))
+            })?;
+            Ok(())
+        }
+    }
+}
+
+/// macro to create a function for generating a part of a particular table
+///
+/// Arguments:
+/// $FUN_NAME: name of the function to create
+/// $GENERATOR: The generator type to use
+/// $TBL_SOURCE: The [`Source`] type to use for TBL format
+/// $CSV_SOURCE: The [`Source`] type to use for CSV format
+/// $PARQUET_SOURCE: The [`RecordBatchIterator`] type to use for Parquet format
+macro_rules! define_run {
+    ($FUN_NAME:ident, $GENERATOR:ident, $TBL_SOURCE:ty, $CSV_SOURCE:ty, $PARQUET_SOURCE:ty) => {
+        async fn $FUN_NAME(plan: OutputPlan, num_threads: usize) -> io::Result<usize> {
+            use crate::GenerationPlan;
+            let scale_factor = plan.scale_factor();
+            info!("Writing {plan} using {num_threads} threads");
+
+            /// These interior functions are used to tell the compiler that the lifetime is 'static
+            /// (when these were closures, the compiler could not figure out the lifetime) and
+            /// resulted in errors like this:
+            ///          let _ = join_set.spawn(async move {
+            ///                 |  _____________________^
+            ///              96 | |                 run_plan(plan, num_plan_threads).await
+            ///              97 | |             });
+            ///                 | |______________^ implementation of `FnOnce` is not general enough
+            fn tbl_sources(
+                generation_plan: &GenerationPlan,
+                scale_factor: f64,
+            ) -> impl Iterator<Item: Source> + 'static {
+                generation_plan
+                    .clone()
+                    .into_iter()
+                    .map(move |(part, num_parts)| $GENERATOR::new(scale_factor, part, num_parts))
+                    .map(<$TBL_SOURCE>::new)
+            }
+
+            fn csv_sources(
+                generation_plan: &GenerationPlan,
+                scale_factor: f64,
+            ) -> impl Iterator<Item: Source> + 'static {
+                generation_plan
+                    .clone()
+                    .into_iter()
+                    .map(move |(part, num_parts)| $GENERATOR::new(scale_factor, part, num_parts))
+                    .map(<$CSV_SOURCE>::new)
+            }
+
+            fn parquet_sources(
+                generation_plan: &GenerationPlan,
+                scale_factor: f64,
+            ) -> impl Iterator<Item: RecordBatchIterator> + 'static {
+                generation_plan
+                    .clone()
+                    .into_iter()
+                    .map(move |(part, num_parts)| $GENERATOR::new(scale_factor, part, num_parts))
+                    .map(<$PARQUET_SOURCE>::new)
+            }
+
+            // Dispatch to the appropriate output format
+            match plan.output_format() {
+                OutputFormat::Tbl => {
+                    let gens = tbl_sources(plan.generation_plan(), scale_factor);
+                    write_file(plan, num_threads, gens).await?
+                }
+                OutputFormat::Csv => {
+                    let gens = csv_sources(plan.generation_plan(), scale_factor);
+                    write_file(plan, num_threads, gens).await?
+                }
+                OutputFormat::Parquet => {
+                    let gens = parquet_sources(plan.generation_plan(), scale_factor);
+                    write_parquet(plan, num_threads, gens).await?
+                }
+            };
+            Ok(num_threads)
+        }
+    };
+}
+
+define_run!(
+    run_trip_plan,
+    TripGenerator,
+    TripTblSource,
+    TripCsvSource,
+    TripArrow
+);
+
+define_run!(
+    run_building_plan,
+    BuildingGenerator,
+    BuildingTblSource,
+    BuildingCsvSource,
+    BuildingArrow
+);
+
+define_run!(
+    run_vehicle_plan,
+    VehicleGenerator,
+    VehicleTblSource,
+    VehicleCsvSource,
+    VehicleArrow
+);
+
+define_run!(
+    run_driver_plan,
+    DriverGenerator,
+    DriverTblSource,
+    DriverCsvSource,
+    DriverArrow
+);
+
+define_run!(
+    run_customer_plan,
+    CustomerGenerator,
+    CustomerTblSource,
+    CustomerCsvSource,
+    CustomerArrow
+);
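To make the `WorkerQueue` accounting concrete (numbers are illustrative): with `--num-threads 8` and plans whose chunk counts are 10, 3, and 2, the largest plan is popped first and takes min(8, 10) = 8 threads, leaving none available; `schedule_plan` then blocks in `join_next` until those 8 threads are returned, after which the remaining plans take 3 and 2 threads and run concurrently.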
diff --git a/spatialbench-cli/src/zone/config.rs b/spatialbench-cli/src/zone/config.rs
new file mode 100644
index 0000000..0bbfa32
--- /dev/null
+++ b/spatialbench-cli/src/zone/config.rs
@@ -0,0 +1,55 @@
+use anyhow::{anyhow, Result};
+use parquet::basic::Compression as ParquetCompression;
+use std::path::PathBuf;
+
+#[derive(Clone)]
+pub struct ZoneDfArgs {
+    pub scale_factor: f64,
+    pub output_dir: PathBuf,
+    pub parts: i32,
+    pub part: i32,
+    pub parquet_row_group_bytes: i64,
+    pub parquet_compression: ParquetCompression,
+}
+
+impl ZoneDfArgs {
+    pub fn new(
+        scale_factor: f64,
+        output_dir: PathBuf,
+        parts: i32,
+        part: i32,
+        parquet_row_group_bytes: i64,
+        parquet_compression: ParquetCompression,
+    ) -> Self {
+        Self {
+            scale_factor,
+            output_dir,
+            parts,
+            part,
+            parquet_row_group_bytes,
+            parquet_compression,
+        }
+    }
+
+    pub fn validate(&self) -> Result<()> {
+        if self.part < 1 || self.part > self.parts {
+            return Err(anyhow!(
+                "Invalid --part={} for --parts={}",
+                self.part,
+                self.parts
+            ));
+        }
+        Ok(())
+    }
+
+    pub fn output_filename(&self) -> PathBuf {
+        if self.parts > 1 {
+            // Create zone subdirectory and write parts within it
+            self.output_dir
+                .join("zone")
+                .join(format!("zone.{}.parquet", self.part))
+        } else {
+            self.output_dir.join("zone.parquet")
+        }
+    }
+}
diff --git a/spatialbench-cli/src/zone/datasource.rs b/spatialbench-cli/src/zone/datasource.rs
new file mode 100644
index 0000000..2d61b87
--- /dev/null
+++ b/spatialbench-cli/src/zone/datasource.rs
@@ -0,0 +1,95 @@
+use anyhow::Result;
+use datafusion::{
+    common::config::ConfigOptions,
+    execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder},
+    prelude::*,
+};
+use log::{debug, info};
+use object_store::http::HttpBuilder;
+use std::sync::Arc;
+use url::Url;
+
+use super::stats::ZoneTableStats;
+
+const OVERTURE_RELEASE_DATE: &str = "2025-08-20.1";
+const HUGGINGFACE_URL: &str = "https://huggingface.co";
+const COMMIT_HASH: &str = "67822daa2fbc0039681922f0d7fea4157f41d13f";
+const PARQUET_PART_COUNT: usize = 4;
+const PARQUET_UUID: &str = "c998b093-fa14-440c-98f0-bbdb2126ed22";
+
+pub struct ZoneDataSource {
+    runtime: Arc<RuntimeEnv>,
+}
+
+impl ZoneDataSource {
+    pub async fn new() -> Result<Self> {
+        let rt = Arc::new(RuntimeEnvBuilder::new().build()?);
+
+        let hf_store = HttpBuilder::new().with_url(HUGGINGFACE_URL).build()?;
+        let hf_url = Url::parse(HUGGINGFACE_URL)?;
+        rt.register_object_store(&hf_url, Arc::new(hf_store));
+
+        debug!("Registered HTTPS object store for huggingface.co");
+
+        Ok(Self { runtime: rt })
+    }
+
+    pub fn create_context(&self) -> Result<SessionContext> {
+        let mut cfg = ConfigOptions::new();
+
+        // Avoid parallelism to ensure ordering of source data
+        cfg.execution.target_partitions = 1;
+
+        let ctx =
+            SessionContext::new_with_config_rt(SessionConfig::from(cfg), Arc::clone(&self.runtime));
+
+        debug!("Created DataFusion session context");
+        Ok(ctx)
+    }
+
+    pub async fn load_zone_data(
+        &self,
+        ctx: &SessionContext,
+        scale_factor: f64,
+    ) -> Result<DataFrame> {
+        let parquet_urls = self.generate_parquet_urls();
+        info!(
+            "Reading {} Parquet parts from Hugging Face...",
+            parquet_urls.len()
+        );
+
+        let df = ctx
+            .read_parquet(parquet_urls, ParquetReadOptions::default())
+            .await?;
+
+        let stats = ZoneTableStats::new(scale_factor, 1);
+        let subtypes = stats.subtypes();
+
+        info!("Selected subtypes for SF {}: {:?}", scale_factor, subtypes);
+
+        let mut pred = col("subtype").eq(lit("__never__"));
+        for s in subtypes {
+            pred = pred.or(col("subtype").eq(lit(s)));
+        }
+
+        let df = df.filter(pred.and(col("is_land").eq(lit(true))))?;
+        info!("Applied subtype and is_land filters");
+
+        // Sort by 'id' to ensure deterministic ordering regardless of parallelism
+        // let df = df.sort(vec![col("id").sort(true, false)])?;
+        // info!("Sorted by id for deterministic ordering");
+
+        Ok(df)
+    }
+
+    fn generate_parquet_urls(&self) -> Vec<String> {
+        (0..PARQUET_PART_COUNT)
+            .map(|i| {
+                format!(
+                    "https://huggingface.co/datasets/apache-sedona/spatialbench/resolve/{}/omf-division-area-{}/part-{:05}-{}-c000.zstd.parquet",
+                    COMMIT_HASH, OVERTURE_RELEASE_DATE, i, PARQUET_UUID
+                )
+            })
+            .collect()
+    }
+}
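For reference, with `PARQUET_PART_COUNT = 4` the `generate_parquet_urls` loop produces four URLs covering `part-00000` through `part-00003` (the `{:05}` format pads the part index to five digits), pinned to `COMMIT_HASH` so the remote input stays stable across runs.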
diff --git a/spatialbench-cli/src/zone/main.rs b/spatialbench-cli/src/zone/main.rs
new file mode 100644
index 0000000..5afd899
--- /dev/null
+++ b/spatialbench-cli/src/zone/main.rs
@@ -0,0 +1,64 @@
+use log::info;
+use parquet::basic::Compression as ParquetCompression;
+use std::io;
+use std::path::PathBuf;
+
+use super::config::ZoneDfArgs;
+
+/// Generates the zone table in the requested format
+pub async fn generate_zone(
+    format: OutputFormat,
+    scale_factor: f64,
+    output_dir: PathBuf,
+    parts: Option<i32>,
+    part: Option<i32>,
+    parquet_row_group_bytes: i64,
+    parquet_compression: ParquetCompression,
+) -> io::Result<()> {
+    match format {
+        OutputFormat::Parquet => {
+            let parts = parts.unwrap_or(1);
+
+            if let Some(part_num) = part {
+                // Single part mode - use LIMIT/OFFSET
+                info!("Generating part {} of {} for zone table", part_num, parts);
+                let args = ZoneDfArgs::new(
+                    1.0f64.max(scale_factor),
+                    output_dir,
+                    parts,
+                    part_num,
+                    parquet_row_group_bytes,
+                    parquet_compression,
+                );
+                super::generate_zone_parquet_single(args)
+                    .await
+                    .map_err(io::Error::other)
+            } else {
+                // Multi-part mode - collect once and partition in memory
+                info!("Generating all {} part(s) for zone table", parts);
+                let args = ZoneDfArgs::new(
+                    1.0f64.max(scale_factor),
+                    output_dir,
+                    parts,
+                    1, // dummy value, not used in multi mode
+                    parquet_row_group_bytes,
+                    parquet_compression,
+                );
+                super::generate_zone_parquet_multi(args)
+                    .await
+                    .map_err(io::Error::other)
+            }
+        }
+        _ => Err(io::Error::new(
+            io::ErrorKind::InvalidInput,
+            "Zone table is only supported in --format=parquet.",
+        )),
+    }
+}
+
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub enum OutputFormat {
+    Tbl,
+    Csv,
+    Parquet,
+}
diff --git a/spatialbench-cli/src/zone/mod.rs b/spatialbench-cli/src/zone/mod.rs
new file mode 100644
index 0000000..f1b2f3c
--- /dev/null
+++ b/spatialbench-cli/src/zone/mod.rs
@@ -0,0 +1,88 @@
+//! Zone table generation module using DataFusion and remote Parquet files
+
+mod config;
+mod datasource;
+mod partition;
+mod stats;
+mod transform;
+mod writer;
+
+pub mod main;
+
+use anyhow::Result;
+use std::sync::Arc;
+
+pub use config::ZoneDfArgs;
+use datasource::ZoneDataSource;
+use partition::PartitionStrategy;
+use stats::ZoneTableStats;
+use transform::ZoneTransformer;
+use writer::ParquetWriter;
+
+/// Generate a single part using LIMIT/OFFSET on the dataframe
+pub async fn generate_zone_parquet_single(args: ZoneDfArgs) -> Result<()> {
+    args.validate()?;
+
+    let stats = ZoneTableStats::new(args.scale_factor, args.parts);
+    let datasource = ZoneDataSource::new().await?;
+    let ctx = datasource.create_context()?;
+
+    let df = datasource.load_zone_data(&ctx, args.scale_factor).await?;
+
+    let partition =
+        PartitionStrategy::calculate(stats.estimated_total_rows(), args.parts, args.part);
+
+    let df = partition.apply_to_dataframe(df)?;
+
+    let transformer = ZoneTransformer::new(partition.offset());
+    let df = transformer.transform(&ctx, df).await?;
+
+    // Get schema before collecting (which moves df)
+    let schema = Arc::new(transformer.arrow_schema(&df)?);
+    let batches = df.collect().await?;
+
+    let writer = ParquetWriter::new(&args, &stats, schema);
+    writer.write(&batches)?;
+
+    Ok(())
+}
+
+/// Generate all parts by collecting once and partitioning in memory
+pub async fn generate_zone_parquet_multi(args: ZoneDfArgs) -> Result<()> {
+    let stats = ZoneTableStats::new(args.scale_factor, args.parts);
+    let datasource = ZoneDataSource::new().await?;
+    let ctx = datasource.create_context()?;
+
+    let df = datasource.load_zone_data(&ctx, args.scale_factor).await?;
+
+    // Transform without offset (we'll adjust per-part later)
+    let transformer = ZoneTransformer::new(0);
+    let df = transformer.transform(&ctx, df).await?;
+
+    // Collect once
+    let schema = Arc::new(transformer.arrow_schema(&df)?);
+    let batches = df.collect().await?;
+
+    // Calculate total rows
+    let total_rows: i64 = batches.iter().map(|b| b.num_rows() as i64).sum();
+
+    // Write each part
+    for part in 1..=args.parts {
+        let partition = PartitionStrategy::calculate(total_rows, args.parts, part);
+        let partitioned_batches = partition.apply_to_batches(&batches)?;
+
+        let part_args = ZoneDfArgs::new(
+            args.scale_factor,
+            args.output_dir.clone(),
+            args.parts,
+            part,
+            args.parquet_row_group_bytes,
+            args.parquet_compression,
+        );
+
+        let writer = ParquetWriter::new(&part_args, &stats, schema.clone());
+        writer.write(&partitioned_batches)?;
+    }
+
+    Ok(())
+}
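One note on the two entry points above: `generate_zone_parquet_single` pushes the part's LIMIT/OFFSET into the DataFusion plan before collecting, while `generate_zone_parquet_multi` collects the transformed dataframe once and slices the in-memory batches per part, so the remote source data is read a single time no matter how many parts are written.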
diff --git a/spatialbench-cli/src/zone/partition.rs b/spatialbench-cli/src/zone/partition.rs
new file mode 100644
index 0000000..a27f656
--- /dev/null
+++ b/spatialbench-cli/src/zone/partition.rs
@@ -0,0 +1,94 @@
+use arrow_array::RecordBatch;
+use datafusion::prelude::*;
+use log::info;
+
+pub struct PartitionStrategy {
+    offset: i64,
+    limit: i64,
+}
+
+impl PartitionStrategy {
+    pub fn calculate(total_rows: i64, parts: i32, part: i32) -> Self {
+        let parts = parts as i64;
+        let i = (part as i64) - 1;
+
+        let base = total_rows / parts;
+        let rem = total_rows % parts;
+
+        let limit = base + if i < rem { 1 } else { 0 };
+        let offset = i * base + std::cmp::min(i, rem);
+
+        info!(
+            "Partition: total={}, parts={}, part={}, offset={}, limit={}",
+            total_rows, parts, part, offset, limit
+        );
+
+        Self { offset, limit }
+    }
+
+    pub fn offset(&self) -> i64 {
+        self.offset
+    }
+
+    pub fn apply_to_dataframe(&self, df: DataFrame) -> datafusion::common::Result<DataFrame> {
+        df.limit(self.offset as usize, Some(self.limit as usize))
+    }
+
+    /// Apply partition to already-collected batches
+    pub fn apply_to_batches(&self, batches: &[RecordBatch]) -> anyhow::Result<Vec<RecordBatch>> {
+        let mut result = Vec::new();
+        let mut current_offset = 0i64;
+        let end_offset = self.offset + self.limit;
+
+        for batch in batches {
+            let batch_rows = batch.num_rows() as i64;
+            let batch_end = current_offset + batch_rows;
+
+            if batch_end <= self.offset || current_offset >= end_offset {
+                current_offset = batch_end;
+                continue;
+            }
+
+            let start_in_batch = (self.offset.saturating_sub(current_offset)).max(0) as usize;
+            let end_in_batch = ((end_offset - current_offset).min(batch_rows)) as usize;
+            let length = end_in_batch - start_in_batch;
+
+            if length > 0 {
+                let sliced = batch.slice(start_in_batch, length);
+                result.push(sliced);
+            }
+
+            current_offset = batch_end;
+        }
+
+        Ok(result)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_partition_distribution() {
+        let total_rows = 100i64;
+        let parts = 3;
+
+        let mut collected_rows = Vec::new();
+        let mut collected_offsets = Vec::new();
+
+        for part in 1..=parts {
+            let strategy = PartitionStrategy::calculate(total_rows, parts, part);
+            collected_rows.push(strategy.limit);
+            collected_offsets.push(strategy.offset);
+        }
+
+        assert_eq!(collected_rows.iter().sum::<i64>(), total_rows);
+        assert_eq!(collected_offsets[0], 0);
+
+        for i in 1..parts as usize {
+            let expected_offset = collected_offsets[i - 1] + collected_rows[i - 1];
+            assert_eq!(collected_offsets[i], expected_offset);
+        }
+    }
+}
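
To make the remainder handling concrete, here is the split from `calculate` applied to 100 rows over 3 parts: base = 33, rem = 1, so only part 1 receives an extra row (a worked sketch, not part of the commit):

```rust
let p1 = PartitionStrategy::calculate(100, 3, 1); // offset 0,  limit 34 (gets the extra row)
let p2 = PartitionStrategy::calculate(100, 3, 2); // offset 34, limit 33
let p3 = PartitionStrategy::calculate(100, 3, 3); // offset 67, limit 33

// The parts tile the row space exactly: each offset equals the previous
// offset plus the previous limit, and 34 + 33 + 33 == 100.
assert_eq!(p1.offset(), 0);
assert_eq!(p2.offset(), 34);
assert_eq!(p3.offset(), 67);
```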
diff --git a/spatialbench-cli/src/zone/stats.rs b/spatialbench-cli/src/zone/stats.rs
new file mode 100644
index 0000000..ce5e5d5
--- /dev/null
+++ b/spatialbench-cli/src/zone/stats.rs
@@ -0,0 +1,161 @@
+use log::debug;
+
+const SUBTYPE_ROWS: &[(&str, i64)] = &[
+    ("microhood", 74797),
+    ("macrohood", 42619),
+    ("neighborhood", 298615),
+    ("county", 38679),
+    ("localadmin", 19007),
+    ("locality", 555834),
+    ("region", 3905),
+    ("dependency", 53),
+    ("country", 219),
+];
+
+pub struct ZoneTableStats {
+    scale_factor: f64,
+    size_gb: f64,
+    total_rows: i64,
+}
+
+impl ZoneTableStats {
+    pub fn new(scale_factor: f64, parts: i32) -> Self {
+        let (mut size_gb, mut total_rows) = Self::base_stats(scale_factor);
+
+        if scale_factor <= 1.0 && parts > 1 {
+            (size_gb, total_rows) = Self::base_stats(scale_factor / parts as f64);
+        }
+
+        debug!(
+            "Stats: size_gb={}, total_rows={} for SF={}",
+            size_gb, total_rows, scale_factor
+        );
+
+        Self {
+            scale_factor,
+            size_gb,
+            total_rows,
+        }
+    }
+
+    fn base_stats(sf: f64) -> (f64, i64) {
+        if sf < 1.0 {
+            (0.92 * sf, (156_095.0 * sf).ceil() as i64)
+        } else if sf < 10.0 {
+            (1.42, 156_095)
+        } else if sf < 100.0 {
+            (2.09, 454_710)
+        } else if sf < 1000.0 {
+            (5.68, 1_033_456)
+        } else {
+            (6.13, 1_033_675)
+        }
+    }
+
+    pub fn subtypes(&self) -> Vec<&'static str> {
+        let mut v = vec!["microhood", "macrohood", "county"];
+        if self.scale_factor >= 10.0 {
+            v.push("neighborhood");
+        }
+        if self.scale_factor >= 100.0 {
+            v.extend_from_slice(&["localadmin", "locality", "region", "dependency"]);
+        }
+        if self.scale_factor >= 1000.0 {
+            v.push("country");
+        }
+        v
+    }
+
+    pub fn estimated_total_rows(&self) -> i64 {
+        let mut total = 0i64;
+        for subtype in self.subtypes() {
+            total += SUBTYPE_ROWS
+                .iter()
+                .find(|(name, _)| *name == subtype)
+                .map(|(_, rows)| *rows)
+                .unwrap_or(0);
+        }
+
+        if self.scale_factor < 1.0 {
+            (total as f64 * self.scale_factor).ceil() as i64
+        } else {
+            total
+        }
+    }
+
+    pub fn compute_rows_per_group(&self, target_bytes: i64, default_bytes: i64) -> usize {
+        let total_bytes = self.size_gb * 1024.0 * 1024.0 * 1024.0;
+        let bytes_per_row = total_bytes / self.total_rows as f64;
+
+        let effective_target = if target_bytes <= 0 {
+            default_bytes
+        } else {
+            target_bytes
+        };
+
+        debug!(
+            "Stats: {:.2} GB, {} rows, {:.2} bytes/row, target: {} bytes",
+            self.size_gb, self.total_rows, bytes_per_row, effective_target
+        );
+
+        let est = (effective_target as f64 / bytes_per_row).floor();
+        est.clamp(1000.0, 32767.0) as usize
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_subtypes_for_different_scale_factors() {
+        let sf_01_stats = ZoneTableStats::new(0.1, 1);
+        assert_eq!(
+            sf_01_stats.subtypes(),
+            vec!["microhood", "macrohood", "county"]
+        );
+
+        let sf_10_stats = ZoneTableStats::new(10.0, 1);
+        assert_eq!(
+            sf_10_stats.subtypes(),
+            vec!["microhood", "macrohood", "county", "neighborhood"]
+        );
+
+        let sf_100_stats = ZoneTableStats::new(100.0, 1);
+        assert!(sf_100_stats.subtypes().contains(&"localadmin"));
+        assert!(sf_100_stats.subtypes().contains(&"locality"));
+
+        let sf_1000_stats = ZoneTableStats::new(1000.0, 1);
+        assert!(sf_1000_stats.subtypes().contains(&"country"));
+    }
+
+    #[test]
+    fn test_rows_per_group_bounds() {
+        let stats = ZoneTableStats::new(1.0, 1);
+
+        let rows_per_group_tiny = stats.compute_rows_per_group(1_000_000, 128 * 1024 * 1024);
+        assert!(rows_per_group_tiny >= 1000);
+
+        let tiny_stats = ZoneTableStats {
+            scale_factor: 0.001,
+            size_gb: 1000.0,
+            total_rows: 1000,
+        };
+        let rows_per_group_huge = tiny_stats.compute_rows_per_group(1, 128 * 1024 * 1024);
+        assert!(rows_per_group_huge <= 32767);
+    }
+
+    #[test]
+    fn test_estimated_rows_scaling_consistency() {
+        let base_stats = ZoneTableStats::new(1.0, 1);
+        let half_stats = ZoneTableStats::new(0.5, 1);
+        let quarter_stats = ZoneTableStats::new(0.25, 1);
+
+        let base_rows = base_stats.estimated_total_rows() as f64;
+        let half_rows = half_stats.estimated_total_rows() as f64;
+        let quarter_rows = quarter_stats.estimated_total_rows() as f64;
+
+        assert!((half_rows - (base_rows * 0.5)).abs() < 1.0);
+        assert!((quarter_rows - (base_rows * 0.25)).abs() < 1.0);
+    }
+}
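
As a back-of-the-envelope check of `compute_rows_per_group` at SF 1, using the hardcoded stats above and the 128 MiB default target the writer passes in (a sketch, not part of the commit):

```rust
// 1.42 GiB over 156_095 rows is roughly 9.8 KB per row.
let bytes_per_row = 1.42 * 1024.0 * 1024.0 * 1024.0 / 156_095.0;
// A 128 MiB row group therefore holds about 13_740 rows,
// comfortably inside the [1000, 32767] clamp.
let rows_per_group = (128.0 * 1024.0 * 1024.0 / bytes_per_row)
    .floor()
    .clamp(1000.0, 32767.0) as usize;
assert_eq!(rows_per_group, 13_740);
```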
diff --git a/spatialbench-cli/src/zone/transform.rs b/spatialbench-cli/src/zone/transform.rs
new file mode 100644
index 0000000..74d9365
--- /dev/null
+++ b/spatialbench-cli/src/zone/transform.rs
@@ -0,0 +1,50 @@
+use anyhow::Result;
+use arrow_schema::Schema;
+use datafusion::{prelude::*, sql::TableReference};
+use log::{debug, info};
+
+pub struct ZoneTransformer {
+    offset: i64,
+}
+
+impl ZoneTransformer {
+    pub fn new(offset: i64) -> Self {
+        Self { offset }
+    }
+
+    pub async fn transform(&self, ctx: &SessionContext, df: DataFrame) -> Result<DataFrame> {
+        ctx.register_table(TableReference::bare("zone_filtered"), df.into_view())?;
+        debug!("Registered filtered data as 'zone_filtered' table");
+
+        let sql = format!(
+            r#"
+            SELECT
+              CAST(ROW_NUMBER() OVER (ORDER BY id) + {} AS BIGINT) AS z_zonekey,
+              COALESCE(id, '')            AS z_gersid,
+              COALESCE(country, '')       AS z_country,
+              COALESCE(region,  '')       AS z_region,
+              COALESCE(names.primary, '') AS z_name,
+              COALESCE(subtype, '')       AS z_subtype,
+              geometry                    AS z_boundary
+            FROM zone_filtered
+            "#,
+            self.offset
+        );
+
+        debug!("Executing SQL transformation with offset: {}", self.offset);
+        let df = ctx.sql(&sql).await?;
+        info!("SQL transformation completed successfully");
+
+        Ok(df)
+    }
+
+    pub fn arrow_schema(&self, df: &DataFrame) -> Result<Schema> {
+        Ok(Schema::new(
+            df.schema()
+                .fields()
+                .iter()
+                .map(|f| f.as_ref().clone())
+                .collect::<Vec<_>>(),
+        ))
+    }
+}
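
One detail worth spelling out: `ROW_NUMBER()` is 1-based, so a slice that starts at global offset N is keyed N+1 through N+limit, keeping `z_zonekey` contiguous and unique across parts. A toy check reusing the 100-row example (values assumed, not from the diff):

```rust
let (offset, limit) = (34i64, 33i64); // part 2 of the 100-row, 3-part split
let keys: Vec<i64> = (1..=limit).map(|row_number| row_number + offset).collect();
// Part 2 is keyed 35..=67, picking up exactly where part 1 (1..=34) ended.
assert_eq!((keys[0], *keys.last().unwrap()), (35, 67));
```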
diff --git a/spatialbench-cli/src/zone/writer.rs b/spatialbench-cli/src/zone/writer.rs
new file mode 100644
index 0000000..7aff003
--- /dev/null
+++ b/spatialbench-cli/src/zone/writer.rs
@@ -0,0 +1,94 @@
+use anyhow::Result;
+use arrow_array::RecordBatch;
+use arrow_schema::SchemaRef;
+use log::{debug, info};
+use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
+use std::{path::PathBuf, sync::Arc, time::Instant};
+
+use super::config::ZoneDfArgs;
+use super::stats::ZoneTableStats;
+
+pub struct ParquetWriter {
+    output_path: PathBuf,
+    schema: SchemaRef,
+    props: WriterProperties,
+    args: ZoneDfArgs,
+}
+
+impl ParquetWriter {
+    pub fn new(args: &ZoneDfArgs, stats: &ZoneTableStats, schema: SchemaRef) -> Self {
+        let rows_per_group =
+            stats.compute_rows_per_group(args.parquet_row_group_bytes, 128 * 1024 * 1024);
+
+        let props = WriterProperties::builder()
+            .set_compression(args.parquet_compression)
+            .set_max_row_group_size(rows_per_group)
+            .build();
+
+        debug!("Using row group size: {} rows", rows_per_group);
+
+        Self {
+            output_path: args.output_filename(),
+            schema,
+            props,
+            args: args.clone(),
+        }
+    }
+
+    pub fn write(&self, batches: &[RecordBatch]) -> Result<()> {
+        // Create parent directory of output file (handles both zone/ subdirectory and base dir)
+        let parent_dir = self
+            .output_path
+            .parent()
+            .ok_or_else(|| anyhow::anyhow!("Invalid output path: {:?}", self.output_path))?;
+
+        std::fs::create_dir_all(parent_dir)?;
+        debug!("Created output directory: {:?}", parent_dir);
+
+        // Check if file already exists
+        if self.output_path.exists() {
+            info!(
+                "{} already exists, skipping generation",
+                self.output_path.display()
+            );
+            return Ok(());
+        }
+
+        // Write to temp file first
+        let temp_path = self.output_path.with_extension("inprogress");
+        let t0 = Instant::now();
+        let file = std::fs::File::create(&temp_path)?;
+        let mut writer =
+            ArrowWriter::try_new(file, Arc::clone(&self.schema), Some(self.props.clone()))?;
+
+        for batch in batches {
+            writer.write(batch)?;
+        }
+
+        writer.close()?;
+
+        // Rename temp file to final output
+        std::fs::rename(&temp_path, &self.output_path).map_err(|e| {
+            anyhow::anyhow!(
+                "Failed to rename {:?} to {:?}: {}",
+                temp_path,
+                self.output_path,
+                e
+            )
+        })?;
+
+        let duration = t0.elapsed();
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+
+        info!(
+            "Zone -> {} (part {}/{}). write={:?}, total_rows={}",
+            self.output_path.display(),
+            self.args.part,
+            self.args.parts,
+            duration,
+            total_rows
+        );
+
+        Ok(())
+    }
+}
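
The temp-file-then-rename sequence in `write` is the usual atomic-publish idiom: readers never observe a half-written file, and an interrupted run leaves only a `.inprogress` file behind. Stripped of the Parquet machinery, the pattern looks like this (a minimal sketch):

```rust
use std::{fs, io::Write, path::Path};

// Write to a sibling temp file, then rename into place. The rename is
// atomic on POSIX filesystems as long as both paths are on the same volume.
fn publish_atomically(path: &Path, bytes: &[u8]) -> std::io::Result<()> {
    let tmp = path.with_extension("inprogress");
    fs::File::create(&tmp)?.write_all(bytes)?;
    fs::rename(&tmp, path)
}
```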
diff --git a/spatialbench-cli/src/zone_df.rs b/spatialbench-cli/src/zone_df.rs
deleted file mode 100644
index eb75805..0000000
--- a/spatialbench-cli/src/zone_df.rs
+++ /dev/null
@@ -1,503 +0,0 @@
-use std::{path::PathBuf, sync::Arc, time::Instant};
-
-use anyhow::{anyhow, Result};
-use arrow_array::RecordBatch;
-use arrow_schema::{Schema, SchemaRef};
-use datafusion::{
-    common::config::ConfigOptions, execution::runtime_env::RuntimeEnvBuilder, prelude::*,
-    sql::TableReference,
-};
-
-use crate::plan::DEFAULT_PARQUET_ROW_GROUP_BYTES;
-use datafusion::execution::runtime_env::RuntimeEnv;
-use log::{debug, info};
-use object_store::http::HttpBuilder;
-use parquet::{
-    arrow::ArrowWriter, basic::Compression as ParquetCompression,
-    file::properties::WriterProperties,
-};
-use url::Url;
-
-const OVERTURE_RELEASE_DATE: &str = "2025-08-20.1";
-const HUGGINGFACE_URL: &str = "https://huggingface.co";
-const COMMIT_HASH: &str = "67822daa2fbc0039681922f0d7fea4157f41d13f";
-
-fn subtypes_for_scale_factor(sf: f64) -> Vec<&'static str> {
-    let mut v = vec!["microhood", "macrohood", "county"];
-    if sf >= 10.0 {
-        v.push("neighborhood");
-    }
-    if sf >= 100.0 {
-        v.extend_from_slice(&["localadmin", "locality", "region", "dependency"]);
-    }
-    if sf >= 1000.0 {
-        v.push("country");
-    }
-    v
-}
-
-fn estimated_total_rows_for_sf(sf: f64) -> i64 {
-    let mut total = 0i64;
-    for s in subtypes_for_scale_factor(sf) {
-        total += match s {
-            "microhood" => 74797,
-            "macrohood" => 42619,
-            "neighborhood" => 298615,
-            "county" => 38679,
-            "localadmin" => 19007,
-            "locality" => 555834,
-            "region" => 3905,
-            "dependency" => 53,
-            "country" => 219,
-            _ => 0,
-        };
-    }
-    if sf < 1.0 {
-        (total as f64 * sf).ceil() as i64
-    } else {
-        total
-    }
-}
-
-fn get_zone_table_stats(sf: f64) -> (f64, i64) {
-    // Returns (size_in_gb, total_rows) for the given scale factor
-    if sf < 1.0 {
-        (0.92 * sf, (156_095.0 * sf).ceil() as i64)
-    } else if sf < 10.0 {
-        (1.42, 156_095)
-    } else if sf < 100.0 {
-        (2.09, 454_710)
-    } else if sf < 1000.0 {
-        (5.68, 1_033_456)
-    } else {
-        (6.13, 1_033_675)
-    }
-}
-
-fn compute_rows_per_group_from_stats(size_gb: f64, total_rows: i64, target_bytes: i64) -> usize {
-    let total_bytes = size_gb * 1024.0 * 1024.0 * 1024.0; // Convert GB to bytes
-    let bytes_per_row = total_bytes / total_rows as f64;
-
-    // Use default if target_bytes is not specified or invalid
-    let effective_target = if target_bytes <= 0 {
-        DEFAULT_PARQUET_ROW_GROUP_BYTES
-    } else {
-        target_bytes
-    };
-
-    debug!(
-        "Using hardcoded stats: {:.2} GB, {} rows, {:.2} bytes/row, target: {} 
bytes",
-        size_gb, total_rows, bytes_per_row, effective_target
-    );
-
-    let est = (effective_target as f64 / bytes_per_row).floor();
-    // Keep RG count <= 32k, but avoid too-tiny RGs
-    est.clamp(1000.0, 32767.0) as usize
-}
-
-fn writer_props_with_rowgroup(comp: ParquetCompression, rows_per_group: usize) -> WriterProperties {
-    WriterProperties::builder()
-        .set_compression(comp)
-        .set_max_row_group_size(rows_per_group)
-        .build()
-}
-
-fn write_parquet_with_rowgroup_bytes(
-    out_path: &PathBuf,
-    schema: SchemaRef,
-    all_batches: Vec<RecordBatch>,
-    target_rowgroup_bytes: i64,
-    comp: ParquetCompression,
-    scale_factor: f64,
-    parts: i32,
-) -> Result<()> {
-    let (mut size_gb, mut total_rows) = get_zone_table_stats(scale_factor);
-
-    // Use linear scaling stats for SF <= 1.0 with parts > 1
-    if scale_factor <= 1.0 && parts > 1 {
-        (size_gb, total_rows) = get_zone_table_stats(scale_factor / parts as f64);
-    }
-
-    debug!(
-        "size_gb={}, total_rows={} for scale_factor={}",
-        size_gb, total_rows, scale_factor
-    );
-    let rows_per_group =
-        compute_rows_per_group_from_stats(size_gb, total_rows, target_rowgroup_bytes);
-    let props = writer_props_with_rowgroup(comp, rows_per_group);
-
-    debug!(
-        "Using row group size: {} rows (based on hardcoded stats)",
-        rows_per_group
-    );
-
-    let mut writer = ArrowWriter::try_new(std::fs::File::create(out_path)?, schema, Some(props))?;
-
-    for batch in all_batches {
-        writer.write(&batch)?;
-    }
-    writer.close()?;
-    Ok(())
-}
-
-#[derive(Clone)]
-pub struct ZoneDfArgs {
-    pub scale_factor: f64,
-    pub output_dir: PathBuf,
-    pub parts: i32,
-    pub part: i32,
-    pub parquet_row_group_bytes: i64,
-    pub parquet_compression: ParquetCompression,
-}
-
-impl ZoneDfArgs {
-    fn output_filename(&self) -> PathBuf {
-        let filename = "zone.parquet".to_string();
-        self.output_dir.join(filename)
-    }
-}
-
-pub async fn generate_zone_parquet(args: ZoneDfArgs) -> Result<()> {
-    if args.part < 1 || args.part > args.parts {
-        return Err(anyhow!(
-            "Invalid --part={} for --parts={}",
-            args.part,
-            args.parts
-        ));
-    }
-
-    info!(
-        "Starting zone parquet generation with scale factor {}",
-        args.scale_factor
-    );
-    debug!("Zone generation args: parts={}, part={}, output_dir={:?}, 
row_group_bytes={}, compression={:?}",
-           args.parts, args.part, args.output_dir, 
args.parquet_row_group_bytes, args.parquet_compression);
-
-    let subtypes = subtypes_for_scale_factor(args.scale_factor);
-    info!(
-        "Selected subtypes for SF {}: {:?}",
-        args.scale_factor, subtypes
-    );
-
-    let estimated_rows = estimated_total_rows_for_sf(args.scale_factor);
-    info!(
-        "Estimated total rows for SF {}: {}",
-        args.scale_factor, estimated_rows
-    );
-
-    let mut cfg = ConfigOptions::new();
-    cfg.execution.target_partitions = 1;
-    debug!("Created DataFusion config with target_partitions=1");
-
-    let rt: Arc<RuntimeEnv> = Arc::new(RuntimeEnvBuilder::new().build()?);
-    debug!("Built DataFusion runtime environment");
-
-    // Register HTTPS object store for Hugging Face
-    let hf_store = HttpBuilder::new().with_url(HUGGINGFACE_URL).build()?;
-    let hf_url = Url::parse(HUGGINGFACE_URL)?;
-    rt.register_object_store(&hf_url, Arc::new(hf_store));
-    debug!("Registered HTTPS object store for huggingface.co");
-
-    let ctx = SessionContext::new_with_config_rt(SessionConfig::from(cfg), rt);
-    debug!("Created DataFusion session context");
-
-    // Parquet parts from Hugging Face (programmatically generated)
-    const PARQUET_PART_COUNT: usize = 4;
-    const PARQUET_UUID: &str = "c998b093-fa14-440c-98f0-bbdb2126ed22";
-    let parquet_urls: Vec<String> = (0..PARQUET_PART_COUNT)
-        .map(|i| format!(
-            "https://huggingface.co/datasets/apache-sedona/spatialbench/resolve/{}/omf-division-area-{}/part-{i:05}-{uuid}-c000.zstd.parquet",
-            COMMIT_HASH,
-            OVERTURE_RELEASE_DATE,
-            i = i,
-            uuid = PARQUET_UUID
-        ))
-        .collect();
-
-    info!(
-        "Reading {} Parquet parts from Hugging Face...",
-        parquet_urls.len()
-    );
-
-    let t_read_start = Instant::now();
-    let mut df = ctx
-        .read_parquet(parquet_urls, ParquetReadOptions::default())
-        .await?;
-    let read_dur = t_read_start.elapsed();
-    info!("Successfully read HF parquet data in {:?}", read_dur);
-
-    // Build filter predicate
-    debug!("Building filter predicate for subtypes: {:?}", subtypes);
-    let mut pred = col("subtype").eq(lit("__never__"));
-    for s in subtypes_for_scale_factor(args.scale_factor) {
-        pred = pred.or(col("subtype").eq(lit(s)));
-    }
-    df = df.filter(pred.and(col("is_land").eq(lit(true))))?;
-    info!("Applied subtype and is_land filters");
-
-    // df = df.sort(vec![col("id").sort(true, true)])?;
-    // debug!("Applied sorting by id");
-
-    let total = estimated_total_rows_for_sf(args.scale_factor);
-    let i = (args.part as i64) - 1; // 0-based part index
-    let parts = args.parts as i64;
-
-    let base = total / parts;
-    let rem = total % parts;
-
-    // first `rem` parts get one extra row
-    let rows_this = base + if i < rem { 1 } else { 0 };
-    let offset = i * base + std::cmp::min(i, rem);
-
-    info!(
-        "Partitioning data: total_rows={}, parts={}, base={}, rem={}, 
this_part_rows={}, offset={}",
-        total, parts, base, rem, rows_this, offset
-    );
-
-    df = df.limit(offset as usize, Some(rows_this as usize))?;
-    debug!("Applied limit with offset={}, rows={}", offset, rows_this);
-
-    ctx.register_table(TableReference::bare("zone_filtered"), df.into_view())?;
-    debug!("Registered filtered data as 'zone_filtered' table");
-
-    let sql = format!(
-        r#"
-        SELECT
-          CAST(ROW_NUMBER() OVER (ORDER BY id) + {offset} AS BIGINT) AS z_zonekey,
-          COALESCE(id, '')            AS z_gersid,
-          COALESCE(country, '')       AS z_country,
-          COALESCE(region,  '')       AS z_region,
-          COALESCE(names.primary, '') AS z_name,
-          COALESCE(subtype, '')       AS z_subtype,
-          geometry                    AS z_boundary
-        FROM zone_filtered
-        "#
-    );
-    debug!("Executing SQL transformation with offset: {}", offset);
-    let df2 = ctx.sql(&sql).await?;
-    info!("SQL transformation completed successfully");
-
-    let t0 = Instant::now();
-    info!("Starting data collection...");
-    let batches = df2.clone().collect().await?;
-    let collect_dur = t0.elapsed();
-
-    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
-    info!(
-        "Collected {} record batches with {} total rows in {:?}",
-        batches.len(),
-        total_rows,
-        collect_dur
-    );
-
-    std::fs::create_dir_all(&args.output_dir)?;
-    debug!("Created output directory: {:?}", args.output_dir);
-
-    let out = args.output_filename();
-    info!("Writing output to: {}", out.display());
-
-    debug!(
-        "Created parquet writer properties with compression: {:?}",
-        args.parquet_compression
-    );
-
-    // Convert DFSchema to Arrow Schema
-    let schema = Arc::new(Schema::new(
-        df2.schema()
-            .fields()
-            .iter()
-            .map(|f| f.as_ref().clone())
-            .collect::<Vec<_>>(),
-    ));
-    debug!(
-        "Converted DataFusion schema to Arrow schema with {} fields",
-        schema.fields().len()
-    );
-
-    let t1 = Instant::now();
-    info!(
-        "Starting parquet file write with row group size: {} bytes",
-        args.parquet_row_group_bytes
-    );
-    write_parquet_with_rowgroup_bytes(
-        &out,
-        schema,
-        batches,
-        args.parquet_row_group_bytes,
-        args.parquet_compression,
-        args.scale_factor,
-        args.parts,
-    )?;
-    let write_dur = t1.elapsed();
-
-    info!(
-        "Zone -> {} (part {}/{}). collect={:?}, write={:?}, total_rows={}",
-        out.display(),
-        args.part,
-        args.parts,
-        collect_dur,
-        write_dur,
-        total_rows
-    );
-
-    Ok(())
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use parquet::basic::Compression;
-    use tempfile::TempDir;
-
-    fn create_test_args(scale_factor: f64, temp_dir: &TempDir) -> ZoneDfArgs {
-        ZoneDfArgs {
-            scale_factor,
-            output_dir: temp_dir.path().to_path_buf(),
-            parts: 1,
-            part: 1,
-            parquet_row_group_bytes: DEFAULT_PARQUET_ROW_GROUP_BYTES,
-            parquet_compression: Compression::SNAPPY,
-        }
-    }
-
-    #[tokio::test]
-    async fn test_zone_generation_invalid_part() {
-        let temp_dir = TempDir::new().unwrap();
-        let mut args = create_test_args(1.0, &temp_dir);
-        args.parts = 2;
-        args.part = 3; // Invalid part number
-
-        let result = generate_zone_parquet(args).await;
-        assert!(result.is_err(), "Should fail with invalid part number");
-    }
-
-    #[tokio::test]
-    async fn test_subtypes_for_different_scale_factors() {
-        // Test scale factor categorization
-        let sf_01_subtypes = subtypes_for_scale_factor(0.1);
-        assert_eq!(sf_01_subtypes, vec!["microhood", "macrohood", "county"]);
-
-        let sf_10_subtypes = subtypes_for_scale_factor(10.0);
-        assert_eq!(
-            sf_10_subtypes,
-            vec!["microhood", "macrohood", "county", "neighborhood"]
-        );
-
-        let sf_100_subtypes = subtypes_for_scale_factor(100.0);
-        assert!(sf_100_subtypes.contains(&"localadmin"));
-        assert!(sf_100_subtypes.contains(&"locality"));
-
-        let sf_1000_subtypes = subtypes_for_scale_factor(1000.0);
-        assert!(sf_1000_subtypes.contains(&"country"));
-    }
-
-    #[test]
-    fn test_partition_distribution_logic() {
-        // Test the mathematical logic for distributing rows across partitions
-        let total_rows = 100i64;
-        let parts = 3i64;
-
-        let mut collected_rows = Vec::new();
-        let mut collected_offsets = Vec::new();
-
-        // Simulate the partition calculation for each part
-        for part_idx in 0..parts {
-            let i = part_idx;
-            let base = total_rows / parts;
-            let rem = total_rows % parts;
-            let rows_this = base + if i < rem { 1 } else { 0 };
-            let offset = i * base + std::cmp::min(i, rem);
-
-            collected_rows.push(rows_this);
-            collected_offsets.push(offset);
-        }
-
-        // Verify partitioning logic
-        assert_eq!(collected_rows.iter().sum::<i64>(), total_rows); // All rows accounted for
-        assert_eq!(collected_offsets[0], 0); // First partition starts at 0
-
-        // Verify no gaps or overlaps between partitions
-        for i in 1..parts as usize {
-            let expected_offset = collected_offsets[i - 1] + collected_rows[i - 1];
-            assert_eq!(collected_offsets[i], expected_offset);
-        }
-
-        // Verify remainder distribution (first partitions get extra rows)
-        let remainder = (total_rows % parts) as usize;
-        for i in 0..remainder {
-            assert_eq!(collected_rows[i], collected_rows[remainder] + 1);
-        }
-    }
-
-    #[test]
-    fn test_rows_per_group_bounds() {
-        // Test that compute_rows_per_group_from_stats respects bounds
-
-        // Test minimum bound (should be at least 1000)
-        let rows_per_group_tiny = compute_rows_per_group_from_stats(0.001, 1000, 1_000_000);
-        assert!(rows_per_group_tiny >= 1000);
-
-        // Test maximum bound (should not exceed 32767)
-        let rows_per_group_huge = compute_rows_per_group_from_stats(1000.0, 1000, 1);
-        assert!(rows_per_group_huge <= 32767);
-
-        // Test negative target bytes falls back to default
-        let rows_per_group_negative = compute_rows_per_group_from_stats(1.0, 100000, -1);
-        let rows_per_group_default =
-            compute_rows_per_group_from_stats(1.0, 100000, DEFAULT_PARQUET_ROW_GROUP_BYTES);
-        assert_eq!(rows_per_group_negative, rows_per_group_default);
-    }
-
-    #[test]
-    fn test_subtype_selection_logic() {
-        // Test the cumulative nature of subtype selection
-        let base_subtypes = subtypes_for_scale_factor(1.0);
-        let sf10_subtypes = subtypes_for_scale_factor(10.0);
-        let sf100_subtypes = subtypes_for_scale_factor(100.0);
-        let sf1000_subtypes = subtypes_for_scale_factor(1000.0);
-
-        // Each higher scale factor should include all previous subtypes
-        for subtype in &base_subtypes {
-            assert!(sf10_subtypes.contains(subtype));
-            assert!(sf100_subtypes.contains(subtype));
-            assert!(sf1000_subtypes.contains(subtype));
-        }
-
-        for subtype in &sf10_subtypes {
-            assert!(sf100_subtypes.contains(subtype));
-            assert!(sf1000_subtypes.contains(subtype));
-        }
-
-        for subtype in &sf100_subtypes {
-            assert!(sf1000_subtypes.contains(subtype));
-        }
-
-        // Verify progressive addition
-        assert!(sf10_subtypes.len() > base_subtypes.len());
-        assert!(sf100_subtypes.len() > sf10_subtypes.len());
-        assert!(sf1000_subtypes.len() > sf100_subtypes.len());
-    }
-
-    #[test]
-    fn test_estimated_rows_scaling_consistency() {
-        // Test that estimated rows scale proportionally for SF < 1.0
-        let base_rows = estimated_total_rows_for_sf(1.0);
-        let half_rows = estimated_total_rows_for_sf(0.5);
-        let quarter_rows = estimated_total_rows_for_sf(0.25);
-
-        // Should scale proportionally (within rounding)
-        assert!((half_rows as f64 - (base_rows as f64 * 0.5)).abs() < 1.0);
-        assert!((quarter_rows as f64 - (base_rows as f64 * 0.25)).abs() < 1.0);
-
-        // Test that SF >= 1.0 gives discrete jumps (not proportional scaling)
-        let sf1_rows = estimated_total_rows_for_sf(1.0);
-        let sf5_rows = estimated_total_rows_for_sf(5.0);
-        let sf10_rows = estimated_total_rows_for_sf(10.0);
-
-        // These should be equal (same category)
-        assert_eq!(sf1_rows, sf5_rows);
-
-        // This should be different (different category)
-        assert_ne!(sf5_rows, sf10_rows);
-    }
-}
diff --git a/spatialbench-cli/tests/cli_integration.rs b/spatialbench-cli/tests/cli_integration.rs
index edd1ea1..1c6028d 100644
--- a/spatialbench-cli/tests/cli_integration.rs
+++ b/spatialbench-cli/tests/cli_integration.rs
@@ -84,6 +84,133 @@ fn test_spatialbench_cli_tbl_scale_factor_v1() {
     }
 }
 
+/// Test that when creating output, if the file already exists it is not overwritten
+#[test]
+fn test_spatialbench_cli_tbl_no_overwrite() {
+    let temp_dir = tempdir().expect("Failed to create temporary directory");
+    let expected_file = temp_dir.path().join("trip.tbl");
+
+    let run_command = || {
+        Command::cargo_bin("spatialbench-cli")
+            .expect("Binary not found")
+            .arg("--scale-factor")
+            .arg("0.001")
+            .arg("--format")
+            .arg("tbl")
+            .arg("--tables")
+            .arg("trip")
+            .arg("--output-dir")
+            .arg(temp_dir.path())
+            .assert()
+            .success()
+    };
+
+    run_command();
+    let original_metadata =
+        fs::metadata(&expected_file).expect("Failed to get metadata of generated file");
+    assert_eq!(original_metadata.len(), 826311);
+
+    // Run the spatialbench-cli command again with the same parameters and expect the
+    // file to not be overwritten
+    run_command();
+    let new_metadata =
+        fs::metadata(&expected_file).expect("Failed to get metadata of generated file");
+    assert_eq!(original_metadata.len(), new_metadata.len());
+    assert_eq!(
+        original_metadata
+            .modified()
+            .expect("Failed to get modified time"),
+        new_metadata
+            .modified()
+            .expect("Failed to get modified time")
+    );
+}
+
+#[tokio::test]
+async fn test_zone_parquet_no_overwrite() {
+    let temp_dir = tempdir().expect("Failed to create temporary directory");
+    let expected_file = temp_dir.path().join("zone/zone.1.parquet");
+
+    let run_command = || {
+        Command::cargo_bin("spatialbench-cli")
+            .expect("Binary not found")
+            .arg("--scale-factor")
+            .arg("1")
+            .arg("--tables")
+            .arg("zone")
+            .arg("--parts")
+            .arg("100")
+            .arg("--part")
+            .arg("1")
+            .arg("--output-dir")
+            .arg(temp_dir.path())
+            .assert()
+            .success()
+    };
+
+    run_command();
+    let original_metadata =
+        fs::metadata(&expected_file).expect("Failed to get metadata of generated file");
+    assert_eq!(original_metadata.len(), 25400203);
+
+    // Run the spatialbench-cli command again with the same parameters and expect the
+    // file to not be overwritten
+    run_command();
+
+    let new_metadata =
+        fs::metadata(&expected_file).expect("Failed to get metadata of generated file");
+    assert_eq!(original_metadata.len(), new_metadata.len());
+    assert_eq!(
+        original_metadata
+            .modified()
+            .expect("Failed to get modified time"),
+        new_metadata
+            .modified()
+            .expect("Failed to get modified time")
+    );
+}
+
+// Test that when creating parquet output, if the file already exists it is not overwritten
+#[test]
+fn test_spatialbench_cli_parquet_no_overwrite() {
+    let temp_dir = tempdir().expect("Failed to create temporary directory");
+    let expected_file = temp_dir.path().join("building.parquet");
+
+    let run_command = || {
+        Command::cargo_bin("spatialbench-cli")
+            .expect("Binary not found")
+            .arg("--scale-factor")
+            .arg("0.001")
+            .arg("--tables")
+            .arg("building")
+            .arg("--output-dir")
+            .arg(temp_dir.path())
+            .assert()
+            .success()
+    };
+
+    run_command();
+    let original_metadata =
+        fs::metadata(&expected_file).expect("Failed to get metadata of generated file");
+    assert_eq!(original_metadata.len(), 412);
+
+    // Run the spatialbench-cli command again with the same parameters and expect the
+    // file to not be overwritten
+    run_command();
+
+    let new_metadata =
+        fs::metadata(&expected_file).expect("Failed to get metadata of generated file");
+    assert_eq!(original_metadata.len(), new_metadata.len());
+    assert_eq!(
+        original_metadata
+            .modified()
+            .expect("Failed to get modified time"),
+        new_metadata
+            .modified()
+            .expect("Failed to get modified time")
+    );
+}
+
 /// Test zone parquet output determinism - same data should be generated every time
 #[tokio::test]
 async fn test_zone_deterministic_parts_generation() {
@@ -106,7 +233,7 @@ async fn test_zone_deterministic_parts_generation() {
         .assert()
         .success();
 
-    let zone_file1 = temp_dir1.path().join("zone.parquet");
+    let zone_file1 = temp_dir1.path().join("zone/zone.1.parquet");
 
     // Reference file is a sf=0.01 zone table with z_boundary column removed
     let reference_file = PathBuf::from("../spatialbench/data/sf-v1/zone.parquet");
@@ -190,23 +317,49 @@ async fn test_zone_deterministic_parts_generation() {
     }
 }
 
-/// Test generating the trip table using --parts and --part options
+/// Test generating the trip table using 4 parts implicitly
 #[test]
 fn test_spatialbench_cli_parts() {
-    // Create a temporary directory
     let temp_dir = tempdir().expect("Failed to create temporary directory");
 
-    // generate 4 parts of the trip table with scale factor 0.001
-    // into directories /part1, /part2, /part3, /part4
+    // generate 4 parts of the trip table with scale factor 0.001 and let
+    // spatialbench-cli generate the multiple files
+
+    let num_parts = 4;
+    let output_dir = temp_dir.path().to_path_buf();
+    Command::cargo_bin("spatialbench-cli")
+        .expect("Binary not found")
+        .arg("--scale-factor")
+        .arg("0.001")
+        .arg("--format")
+        .arg("tbl")
+        .arg("--output-dir")
+        .arg(&output_dir)
+        .arg("--parts")
+        .arg(num_parts.to_string())
+        .arg("--tables")
+        .arg("trip")
+        .assert()
+        .success();
+
+    verify_table(temp_dir.path(), "trip", num_parts, "v1");
+}
+
+/// Test generating the trip table with multiple invocations using --parts and
+/// --part options
+#[test]
+fn test_spatialbench_cli_parts_explicit() {
+    let temp_dir = tempdir().expect("Failed to create temporary directory");
+
+    // generate 4 parts of the trip table with scale factor 0.001
     // use threads to run the command concurrently to minimize the time taken
     let num_parts = 4;
     let mut threads = vec![];
     for part in 1..=num_parts {
-        let part_dir = temp_dir.path().join(format!("part{part}"));
+        let output_dir = temp_dir.path().to_path_buf();
         threads.push(std::thread::spawn(move || {
-            fs::create_dir(&part_dir).expect("Failed to create part directory");
-
             // Run the spatialbench-cli command for each part
+            // output goes into `output_dir/trip/trip.{part}.tbl`
             Command::cargo_bin("spatialbench-cli")
                 .expect("Binary not found")
                 .arg("--scale-factor")
@@ -214,7 +367,7 @@ fn test_spatialbench_cli_parts() {
                 .arg("--format")
                 .arg("tbl")
                 .arg("--output-dir")
-                .arg(&part_dir)
+                .arg(&output_dir)
                 .arg("--parts")
                 .arg(num_parts.to_string())
                 .arg("--part")
@@ -229,11 +382,62 @@ fn test_spatialbench_cli_parts() {
     for thread in threads {
         thread.join().expect("Thread panicked");
     }
-    // Read the generated files into a single buffer and compare them
-    // to the contents of the reference file
+    verify_table(temp_dir.path(), "trip", num_parts, "v1");
+}
+
+/// Create all tables using --parts option and verify the output layouts
+#[test]
+fn test_spatialbench_cli_parts_all_tables() {
+    let temp_dir = tempdir().expect("Failed to create temporary directory");
+
+    let num_parts = 8;
+    let output_dir = temp_dir.path().to_path_buf();
+    Command::cargo_bin("spatialbench-cli")
+        .expect("Binary not found")
+        .arg("--scale-factor")
+        .arg("0.51")
+        .arg("--format")
+        .arg("tbl")
+        .arg("--tables")
+        .arg("building,driver,vehicle,customer")
+        .arg("--output-dir")
+        .arg(&output_dir)
+        .arg("--parts")
+        .arg(num_parts.to_string())
+        .assert()
+        .success();
+
+    Command::cargo_bin("spatialbench-cli")
+        .expect("Binary not found")
+        .arg("--scale-factor")
+        .arg("0.001")
+        .arg("--format")
+        .arg("tbl")
+        .arg("--tables")
+        .arg("trip")
+        .arg("--output-dir")
+        .arg(&output_dir)
+        .arg("--parts")
+        .arg(num_parts.to_string())
+        .assert()
+        .success();
+
+    verify_table(temp_dir.path(), "trip", num_parts, "v1");
+    verify_table(temp_dir.path(), "customer", num_parts, "v1");
+    // Note: building, vehicle and driver have only a single part regardless of --parts
+    verify_table(temp_dir.path(), "building", 1, "v1");
+    verify_table(temp_dir.path(), "vehicle", 1, "v1");
+    verify_table(temp_dir.path(), "driver", 1, "v1");
+}
+
+/// Read the N files from `output_dir/table_name/table_name.part.tbl` into a
+/// single buffer and compare them to the contents of the reference file
+fn verify_table(output_dir: &Path, table_name: &str, parts: usize, scale_factor: &str) {
     let mut output_contents = Vec::new();
-    for part in 1..=4 {
-        let generated_file = temp_dir.path().join(format!("part{part}")).join("trip.tbl");
+    for part in 1..=parts {
+        let generated_file = output_dir
+            .join(table_name)
+            .join(format!("{table_name}.{part}.tbl"));
         assert!(
             generated_file.exists(),
             "File {:?} does not exist",
@@ -247,7 +451,7 @@ fn test_spatialbench_cli_parts() {
         String::from_utf8(output_contents).expect("Failed to convert output contents to string");
 
     // load the reference file
-    let reference_file = read_reference_file("trip", "v1");
+    let reference_file = read_reference_file(table_name, scale_factor);
     assert_eq!(output_contents, reference_file);
 }
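
For reference, the layout these tests assume is one subdirectory per table with part-numbered files; a sketch of the paths a `--tables trip --parts 4 --format tbl` run would produce:

```rust
// Hypothetical illustration of the expected layout under <output-dir>.
for part in 1..=4 {
    println!("<output-dir>/trip/trip.{part}.tbl");
}
```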
 
@@ -362,7 +566,7 @@ async fn test_zone_write_parquet_row_group_size_default() {
     expect_row_group_sizes(
         output_dir.path(),
         vec![RowGroups {
-            table: "zone",
+            table: "zone/zone.1",
             row_group_bytes: vec![86288517],
         }],
     );
@@ -442,7 +646,7 @@ async fn test_zone_write_parquet_row_group_size_20mb() {
     expect_row_group_sizes(
         output_dir.path(),
         vec![RowGroups {
-            table: "zone",
+            table: "zone/zone.1",
+            row_group_bytes: vec![15428592, 17250042, 19338201, 17046885, 17251978],
         }],
     );
@@ -466,24 +670,6 @@ fn test_spatialbench_cli_part_no_parts() {
         ));
 }
 
-#[test]
-fn test_spatialbench_cli_parts_no_part() {
-    let temp_dir = tempdir().expect("Failed to create temporary directory");
-
-    // CLI Error test --parts and but not --part
-    Command::cargo_bin("spatialbench-cli")
-        .expect("Binary not found")
-        .arg("--output-dir")
-        .arg(temp_dir.path())
-        .arg("--parts")
-        .arg("42")
-        .assert()
-        .failure()
-        .stderr(predicates::str::contains(
-            "The --part_count option requires the --part option to be set",
-        ));
-}
-
 #[test]
 fn test_spatialbench_cli_too_many_parts() {
     let temp_dir = tempdir().expect("Failed to create temporary directory");
diff --git a/tools/generate_data.py b/tools/generate_data.py
index 34efb0b..2308603 100755
--- a/tools/generate_data.py
+++ b/tools/generate_data.py
@@ -103,12 +103,8 @@ def _generate_data(scale_factor: int, num_partitions: dict[str, int], output_pat
         tables = list(num_partitions.keys())
         # Ensure base directories exist
         Path(output_path).mkdir(parents=True, exist_ok=True)
-        (Path(output_path) / "staging").mkdir(parents=True, exist_ok=True)
 
-        def run_one(table: str, part: int) -> None:
-            # Use a per-table, per-part staging dir to avoid collisions when parallel
-            staging_dir = Path(output_path) / "staging" / table / f"part-{part}"
-            staging_dir.mkdir(parents=True, exist_ok=True)
+        def run_one(table: str) -> None:
 
             result = subprocess.run(
                 [
@@ -118,8 +114,7 @@ def _generate_data(scale_factor: int, num_partitions: dict[str, int], output_pat
                     f"--format=parquet",
                     f"--parts={num_partitions[table]}",
                     f"--tables={table}",
-                    f"--part={part}",
-                    f"--output-dir={staging_dir}",
+                    f"--output-dir={output_path}",
                 ],
                 capture_output=True,
                 text=True,
@@ -129,32 +124,13 @@ def _generate_data(scale_factor: int, num_partitions: dict[str, int], output_pat
                 logging.warning("Command errors:")
                 logging.warning(result.stderr)
 
-            # Collate results by table instead of part
-            dest_dir = Path(output_path) / table
-            dest_dir.mkdir(parents=True, exist_ok=True)
-            src_file = staging_dir / f"{table}.parquet"
-            dest_file = dest_dir / f"part-{part}.parquet"
-            shutil.move(str(src_file), str(dest_file))
-
-            # Cleanup staging for this (table, part)
-            try:
-                shutil.rmtree(staging_dir)
-                # remove parent if empty
-                parent = staging_dir.parent
-                if parent.exists() and not any(parent.iterdir()):
-                    parent.rmdir()
-            except Exception as cleanup_err:
-                logging.debug(f"Cleanup warning for {staging_dir}: {cleanup_err}")
-
         # Launch all generation tasks in parallel threads
         futures = []
         with concurrent.futures.ThreadPoolExecutor(
                 max_workers=os.cpu_count() or 4
         ) as executor:
             for table in tables:
-                for idx in range(num_partitions[table]):
-                    part = idx + 1  # 1-indexed
-                    futures.append(executor.submit(run_one, table, part))
+                futures.append(executor.submit(run_one, table))
             # Raise the first exception if any
             for fut in concurrent.futures.as_completed(futures):
                 fut.result()
