This is an automated email from the ASF dual-hosted git repository.

prantogg pushed a commit to branch support-multipart
in repository https://gitbox.apache.org/repos/asf/sedona-spatialbench.git


The following commit(s) were added to refs/heads/support-multipart by this push:
     new 953b9df  Automatically create multiple files with single `--parts` 
command
953b9df is described below

commit 953b9dfc8a95e1aebd9ffd37253e8fdb92bd116d
Author: Pranav Toggi <[email protected]>
AuthorDate: Sat Oct 25 21:07:59 2025 -0700

    Automatically create multiple files with single `--parts` command
---
 tpchgen-cli/src/main.rs              | 265 +++++++-------------------
 tpchgen-cli/src/output_plan.rs       | 267 ++++++++++++++++++++++++++
 tpchgen-cli/src/plan.rs              |  34 ++--
 tpchgen-cli/src/runner.rs            | 356 +++++++++++++++++++++++++++++++++++
 tpchgen-cli/src/zone/config.rs       |   6 +-
 tpchgen-cli/src/zone/writer.rs       |  35 +++-
 tpchgen-cli/tests/cli_integration.rs | 254 +++++++++++++++++++++----
 7 files changed, 969 insertions(+), 248 deletions(-)

diff --git a/tpchgen-cli/src/main.rs b/tpchgen-cli/src/main.rs
index c225e6a..d95f594 100644
--- a/tpchgen-cli/src/main.rs
+++ b/tpchgen-cli/src/main.rs
@@ -4,58 +4,24 @@
 //! API wise to the original dbgen tool, as in we use the same command line 
flags
 //! and arguments.
 //!
-//! ```
-//! USAGE:
-//!     tpchgen-cli [OPTIONS]
-//!
-//! OPTIONS:
-//!     -h, --help                    Prints help information
-//!     -V, --version                 Prints version information
-//!     -s, --scale-factor <FACTOR>  Scale factor for the data generation 
(default: 1)
-//!     -T, --tables <TABLES>        Comma-separated list of tables to 
generate (default: all)
-//!     -f, --format <FORMAT>        Output format: parquet, tbl or csv 
(default: parquet)
-//!     -o, --output-dir <DIR>       Output directory (default: current 
directory)
-//!     -p, --parts <N>              Number of parts to split generation into 
(default: 1)
-//!         --part <N>               Which part to generate (1-based, default: 
1)
-//!     -n, --num-threads <N>        Number of threads to use (default: number 
of CPUs)
-//!     -c, --parquet-compression <C> Parquet compression codec, e.g., SNAPPY, 
ZSTD(1), UNCOMPRESSED (default: SNAPPY)
-//!         --parquet-row-group-size <N> Target size in bytes per row group in 
Parquet files (default: 134,217,728)
-//!     -v, --verbose                Verbose output
-//!         --stdout                 Write output to stdout instead of files
-//!```
-//!
-//! # Logging:
-//! Use the `-v` flag or `RUST_LOG` environment variable to control logging 
output.
-//!
-//! `-v` sets the log level to `info` and ignores the `RUST_LOG` environment 
variable.
-//!
-//! # Examples
-//! ```
-//! # see all info output
-//! tpchgen-cli -s 1 -v
-//!
-//! # same thing using RUST_LOG
-//! RUST_LOG=info tpchgen-cli -s 1
-//!
-//! # see all debug output
-//! RUST_LOG=debug tpchgen -s 1
-//! ```
+//! See the documentation on [`Cli`] for more information on the command line
 mod csv;
 mod generate;
+mod output_plan;
 mod parquet;
 mod plan;
+mod runner;
 mod spatial_config_file;
 mod statistics;
 mod tbl;
 mod zone;
 
-use crate::csv::*;
-use crate::generate::{generate_in_chunks, Sink, Source};
+use crate::generate::Sink;
+use crate::output_plan::OutputPlanGenerator;
 use crate::parquet::*;
 use crate::plan::{GenerationPlan, DEFAULT_PARQUET_ROW_GROUP_BYTES};
 use crate::spatial_config_file::parse_yaml;
 use crate::statistics::WriteStatistics;
-use crate::tbl::*;
 use ::parquet::basic::Compression;
 use clap::builder::TypedValueParser;
 use clap::{Parser, ValueEnum};
@@ -67,19 +33,40 @@ use std::path::PathBuf;
 use std::str::FromStr;
 use std::time::Instant;
 use tpchgen::distribution::Distributions;
-use tpchgen::generators::{
-    BuildingGenerator, CustomerGenerator, DriverGenerator, TripGenerator, 
VehicleGenerator,
-};
 use tpchgen::spatial::overrides::{set_overrides, SpatialOverrides};
 use tpchgen::text::TextPool;
-use tpchgen_arrow::{
-    BuildingArrow, CustomerArrow, DriverArrow, RecordBatchIterator, TripArrow, 
VehicleArrow,
-};
 
 #[derive(Parser)]
 #[command(name = "tpchgen")]
 #[command(version)]
-#[command(about = "TPC-H Data Generator", long_about = None)]
+#[command(
+    // -h output
+    about = "TPC-H Data Generator",
+    // --help output
+    long_about = r#"
+TPCH Data Generator (https://github.com/clflushopt/tpchgen-rs)
+
+By default each table is written to a single file named 
<output_dir>/<table>.<format>
+
+If `--part` option is specified, each table is written to a subdirectory in
+multiple files named <output_dir>/<table>/<table>.<part>.<format>
+
+Examples
+
+# Generate all tables at scale factor 1 (1GB) in TBL format to /tmp/tpch 
directory:
+
+tpchgen-cli -s 1 --output-dir=/tmp/tpch
+
+# Generate the lineitem table at scale factor 100 in 10 Apache Parquet files to
+# /tmp/tpch/lineitem
+
+tpchgen-cli -s 100 --tables=lineitem --format=parquet --parts=10 
--output-dir=/tmp/tpch
+
+# Generate scale factor one in current directory, seeing debug output
+
+RUST_LOG=debug tpchgen -s 1
+"#
+)]
 struct Cli {
     /// Scale factor to create
     #[arg(short, long, default_value_t = 1.)]
@@ -97,13 +84,11 @@ struct Cli {
     #[arg(long = "config")]
     config: Option<PathBuf>,
 
-    /// Number of part(itions) to generate (manual parallel generation)
+    /// Number of part(itions) to generate. If not specified creates a single 
file per table
     #[arg(short, long)]
     parts: Option<i32>,
 
-    /// Which part(ition) to generate (1-based)
-    ///
-    /// If not specified, generates all parts
+    /// Which part(ition) to generate (1-based). If not specified, generates 
all parts
     #[arg(long)]
     part: Option<i32>,
 
@@ -132,6 +117,9 @@ struct Cli {
     parquet_compression: Compression,
 
     /// Verbose output
+    ///
+    /// When specified, sets the log level to `info` and ignores the `RUST_LOG`
+    /// environment variable. When not specified, uses `RUST_LOG`
     #[arg(short, long, default_value_t = false)]
     verbose: bool,
 
@@ -142,11 +130,11 @@ struct Cli {
     /// Target size in row group bytes in Parquet files
     ///
     /// Row groups are the typical unit of parallel processing and compression
-    /// in Parquet. With many query engines, smaller row groups enable better
+    /// with many query engines. Therfore, smaller row groups enable better
     /// parallelism and lower peak memory use but may reduce compression
     /// efficiency.
     ///
-    /// Note: parquet files are limited to 32k row groups, so at high scale
+    /// Note: Parquet files are limited to 32k row groups, so at high scale
     /// factors, the row group size may be increased to keep the number of row
     /// groups under this limit.
     ///
@@ -257,46 +245,6 @@ async fn main() -> io::Result<()> {
     cli.main().await
 }
 
-/// macro to create a Cli function for generating a table
-///
-/// Arguments:
-/// $FUN_NAME: name of the function to create
-/// $TABLE: The [`Table`] to generate
-/// $GENERATOR: The generator type to use
-/// $TBL_SOURCE: The [`Source`] type to use for TBL format
-/// $CSV_SOURCE: The [`Source`] type to use for CSV format
-/// $PARQUET_SOURCE: The [`RecordBatchIterator`] type to use for Parquet format
-macro_rules! define_generate {
-    ($FUN_NAME:ident,  $TABLE:expr, $GENERATOR:ident, $TBL_SOURCE:ty, 
$CSV_SOURCE:ty, $PARQUET_SOURCE:ty) => {
-        async fn $FUN_NAME(&self) -> io::Result<()> {
-            let filename = self.output_filename($TABLE);
-            let plan = GenerationPlan::try_new(
-                &$TABLE,
-                self.format,
-                self.scale_factor,
-                self.part,
-                self.parts,
-                self.parquet_row_group_bytes,
-            )
-            .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
-            let scale_factor = self.scale_factor;
-            info!("Writing table {} (SF={scale_factor}) to {filename}", 
$TABLE);
-            debug!("Plan: {plan}");
-            let gens = plan
-                .into_iter()
-                .map(move |(part, num_parts)| $GENERATOR::new(scale_factor, 
part, num_parts));
-            match self.format {
-                OutputFormat::Tbl => self.go(&filename, 
gens.map(<$TBL_SOURCE>::new)).await,
-                OutputFormat::Csv => self.go(&filename, 
gens.map(<$CSV_SOURCE>::new)).await,
-                OutputFormat::Parquet => {
-                    self.go_parquet(&filename, 
gens.map(<$PARQUET_SOURCE>::new))
-                        .await
-                }
-            }
-        }
-    };
-}
-
 impl Cli {
     /// Main function to run the generation
     async fn main(self) -> io::Result<()> {
@@ -368,15 +316,6 @@ impl Cli {
             ]
         };
 
-        // force the creation of the distributions and text pool to so it 
doesn't
-        // get charged to the first table
-        let start = Instant::now();
-        debug!("Creating distributions and text pool");
-        Distributions::static_default();
-        TextPool::get_or_init_default();
-        let elapsed = start.elapsed();
-        info!("Created static distributions and text pools in {elapsed:?}");
-
         // Warn if parquet specific options are set but not generating parquet
         if self.format != OutputFormat::Parquet {
             if self.parquet_compression != Compression::SNAPPY {
@@ -391,18 +330,37 @@ impl Cli {
             }
         }
 
-        // Generate each table
+        // Determine what files to generate
+        let mut output_plan_generator = OutputPlanGenerator::new(
+            self.format,
+            self.scale_factor,
+            self.parquet_compression,
+            self.parquet_row_group_bytes,
+            self.stdout,
+            self.output_dir.clone(),
+        );
+
         for table in tables {
-            match table {
-                Table::Vehicle => self.generate_vehicle().await?,
-                Table::Driver => self.generate_driver().await?,
-                Table::Customer => self.generate_customer().await?,
-                Table::Trip => self.generate_trip().await?,
-                Table::Building => self.generate_building().await?,
-                Table::Zone => self.generate_zone().await?,
+            if table == Table::Zone {
+                self.generate_zone().await?
+            } else {
+                output_plan_generator.generate_plans(table, self.part, 
self.parts)?;
             }
         }
+        let output_plans = output_plan_generator.build();
 
+        // force the creation of the distributions and text pool to so it 
doesn't
+        // get charged to the first table
+        let start = Instant::now();
+        debug!("Creating distributions and text pool");
+        Distributions::static_default();
+        TextPool::get_or_init_default();
+        let elapsed = start.elapsed();
+        info!("Created static distributions and text pools in {elapsed:?}");
+
+        // Run
+        let runner = runner::PlanRunner::new(output_plans, self.num_threads);
+        runner.run().await?;
         info!("Generation complete!");
         Ok(())
     }
@@ -425,95 +383,6 @@ impl Cli {
         )
         .await
     }
-
-    define_generate!(
-        generate_vehicle,
-        Table::Vehicle,
-        VehicleGenerator,
-        VehicleTblSource,
-        VehicleCsvSource,
-        VehicleArrow
-    );
-    define_generate!(
-        generate_driver,
-        Table::Driver,
-        DriverGenerator,
-        DriverTblSource,
-        DriverCsvSource,
-        DriverArrow
-    );
-    define_generate!(
-        generate_customer,
-        Table::Customer,
-        CustomerGenerator,
-        CustomerTblSource,
-        CustomerCsvSource,
-        CustomerArrow
-    );
-    define_generate!(
-        generate_trip,
-        Table::Trip,
-        TripGenerator,
-        TripTblSource,
-        TripCsvSource,
-        TripArrow
-    );
-    define_generate!(
-        generate_building,
-        Table::Building,
-        BuildingGenerator,
-        BuildingTblSource,
-        BuildingCsvSource,
-        BuildingArrow
-    );
-
-    /// return the output filename for the given table
-    fn output_filename(&self, table: Table) -> String {
-        let extension = match self.format {
-            OutputFormat::Tbl => "tbl",
-            OutputFormat::Csv => "csv",
-            OutputFormat::Parquet => "parquet",
-        };
-        format!("{}.{extension}", table.name())
-    }
-
-    /// return a file for writing the given filename in the output directory
-    fn new_output_file(&self, filename: &str) -> io::Result<File> {
-        let path = self.output_dir.join(filename);
-        File::create(path)
-    }
-
-    /// Generates the output file from the sources
-    async fn go<I>(&self, filename: &str, sources: I) -> Result<(), io::Error>
-    where
-        I: Iterator<Item: Source> + 'static,
-    {
-        // Since generate_in_chunks already buffers, there is no need to 
buffer again
-        if self.stdout {
-            let sink = WriterSink::new(io::stdout());
-            generate_in_chunks(sink, sources, self.num_threads).await
-        } else {
-            let sink = WriterSink::new(self.new_output_file(filename)?);
-            generate_in_chunks(sink, sources, self.num_threads).await
-        }
-    }
-
-    /// Generates an output parquet file from the sources
-    async fn go_parquet<I>(&self, filename: &str, sources: I) -> Result<(), 
io::Error>
-    where
-        I: Iterator<Item: RecordBatchIterator> + 'static,
-    {
-        if self.stdout {
-            // write to stdout
-            let writer = BufWriter::with_capacity(32 * 1024 * 1024, 
io::stdout()); // 32MB buffer
-            generate_parquet(writer, sources, self.num_threads, 
self.parquet_compression).await
-        } else {
-            // write to a file
-            let file = self.new_output_file(filename)?;
-            let writer = BufWriter::with_capacity(32 * 1024 * 1024, file); // 
32MB buffer
-            generate_parquet(writer, sources, self.num_threads, 
self.parquet_compression).await
-        }
-    }
 }
 
 impl IntoSize for BufWriter<Stdout> {
diff --git a/tpchgen-cli/src/output_plan.rs b/tpchgen-cli/src/output_plan.rs
new file mode 100644
index 0000000..f9ad376
--- /dev/null
+++ b/tpchgen-cli/src/output_plan.rs
@@ -0,0 +1,267 @@
+//! * [`OutputLocation`]: where to output the generated data
+//! * [`OutputPlan`]: an output file that will be generated
+//! * [`OutputPlanGenerator`]: plans the output files to be generated
+
+use crate::plan::GenerationPlan;
+use crate::{OutputFormat, Table};
+use log::debug;
+use parquet::basic::Compression;
+use std::collections::HashSet;
+use std::fmt::{Display, Formatter};
+use std::io;
+use std::path::PathBuf;
+
+/// Where a partition will be output
+#[derive(Debug, Clone, PartialEq)]
+pub enum OutputLocation {
+    /// Output to a file
+    File(PathBuf),
+    /// Output to stdout
+    Stdout,
+}
+
+impl Display for OutputLocation {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        match self {
+            OutputLocation::File(path) => {
+                let Some(file) = path.file_name() else {
+                    return write!(f, "{}", path.display());
+                };
+                // Display the file name only, not the full path
+                write!(f, "{}", file.to_string_lossy())
+            }
+            OutputLocation::Stdout => write!(f, "Stdout"),
+        }
+    }
+}
+
+/// Describes an output partition (file) that will be generated
+#[derive(Debug, Clone, PartialEq)]
+pub struct OutputPlan {
+    /// The table
+    table: Table,
+    /// The scale factor
+    scale_factor: f64,
+    /// The output format (TODO don't depend back on something in main)
+    output_format: OutputFormat,
+    /// If the output is parquet, what compression level to use
+    parquet_compression: Compression,
+    /// Where to output
+    output_location: OutputLocation,
+    /// Plan for generating the table
+    generation_plan: GenerationPlan,
+}
+
+impl OutputPlan {
+    pub fn new(
+        table: Table,
+        scale_factor: f64,
+        output_format: OutputFormat,
+        parquet_compression: Compression,
+        output_location: OutputLocation,
+        generation_plan: GenerationPlan,
+    ) -> Self {
+        Self {
+            table,
+            scale_factor,
+            output_format,
+            parquet_compression,
+            output_location,
+            generation_plan,
+        }
+    }
+
+    /// Return the table this partition is for
+    pub fn table(&self) -> Table {
+        self.table
+    }
+
+    /// Return the scale factor for this partition
+    pub fn scale_factor(&self) -> f64 {
+        self.scale_factor
+    }
+
+    /// Return the output format for this partition
+    pub fn output_format(&self) -> OutputFormat {
+        self.output_format
+    }
+
+    /// return the output location
+    pub fn output_location(&self) -> &OutputLocation {
+        &self.output_location
+    }
+
+    /// Return the parquet compression level for this partition
+    pub fn parquet_compression(&self) -> Compression {
+        self.parquet_compression
+    }
+
+    /// Return the number of chunks part(ition) count (the number of data 
chunks
+    /// in the underlying generation plan)
+    pub fn chunk_count(&self) -> usize {
+        self.generation_plan.chunk_count()
+    }
+
+    /// return the generation plan for this partition
+    pub fn generation_plan(&self) -> &GenerationPlan {
+        &self.generation_plan
+    }
+}
+
+impl Display for OutputPlan {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "table {} (SF={}, {} chunks) to {}",
+            self.table,
+            self.scale_factor,
+            self.chunk_count(),
+            self.output_location
+        )
+    }
+}
+
+/// Plans the creation of output files
+pub struct OutputPlanGenerator {
+    format: OutputFormat,
+    scale_factor: f64,
+    parquet_compression: Compression,
+    parquet_row_group_bytes: i64,
+    stdout: bool,
+    output_dir: PathBuf,
+    /// The generated output plans
+    output_plans: Vec<OutputPlan>,
+    /// Output directores that have been created so far
+    /// (used to avoid creating the same directory multiple times)
+    created_directories: HashSet<PathBuf>,
+}
+
+impl OutputPlanGenerator {
+    pub fn new(
+        format: OutputFormat,
+        scale_factor: f64,
+        parquet_compression: Compression,
+        parquet_row_group_bytes: i64,
+        stdout: bool,
+        output_dir: PathBuf,
+    ) -> Self {
+        Self {
+            format,
+            scale_factor,
+            parquet_compression,
+            parquet_row_group_bytes,
+            stdout,
+            output_dir,
+            output_plans: Vec::new(),
+            created_directories: HashSet::new(),
+        }
+    }
+
+    /// Generate the output plans for the given table and partition options
+    pub fn generate_plans(
+        &mut self,
+        table: Table,
+        cli_part: Option<i32>,
+        cli_part_count: Option<i32>,
+    ) -> io::Result<()> {
+        // If the user specified only a part count, automatically create all
+        // partitions for the table
+        if let (None, Some(part_count)) = (cli_part, cli_part_count) {
+            if GenerationPlan::partitioned_table(table) {
+                debug!("Generating all partitions for table {table} with part 
count {part_count}");
+                for part in 1..=part_count {
+                    self.generate_plan_inner(table, Some(part), 
Some(part_count))?;
+                }
+            } else {
+                // there is only one partition for this table (e.g nation or 
region)
+                debug!("Generating single partition for table {table}");
+                self.generate_plan_inner(table, Some(1), Some(1))?;
+            }
+        } else {
+            self.generate_plan_inner(table, cli_part, cli_part_count)?;
+        }
+        Ok(())
+    }
+
+    fn generate_plan_inner(
+        &mut self,
+        table: Table,
+        cli_part: Option<i32>,
+        cli_part_count: Option<i32>,
+    ) -> io::Result<()> {
+        let generation_plan = GenerationPlan::try_new(
+            table,
+            self.format,
+            self.scale_factor,
+            cli_part,
+            cli_part_count,
+            self.parquet_row_group_bytes,
+        )
+        .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
+
+        let output_location = self.output_location(table, cli_part)?;
+
+        let plan = OutputPlan::new(
+            table,
+            self.scale_factor,
+            self.format,
+            self.parquet_compression,
+            output_location,
+            generation_plan,
+        );
+
+        self.output_plans.push(plan);
+        Ok(())
+    }
+
+    /// Return the output location for the given table
+    ///
+    /// * if part of is None, the output location is 
`{output_dir}/{table}.{extension}`
+    ///
+    /// * if part is Some(part), then the output location
+    ///   will be `{output_dir}/{table}/{table}table.{part}.{extension}`
+    ///   (e.g. orders/orders.1.tbl, orders/orders.2.tbl, etc.)
+    fn output_location(&mut self, table: Table, part: Option<i32>) -> 
io::Result<OutputLocation> {
+        if self.stdout {
+            Ok(OutputLocation::Stdout)
+        } else {
+            let extension = match self.format {
+                OutputFormat::Tbl => "tbl",
+                OutputFormat::Csv => "csv",
+                OutputFormat::Parquet => "parquet",
+            };
+
+            let mut output_path = self.output_dir.clone();
+            if let Some(part) = part {
+                // If a partition is specified, create a subdirectory for it
+                output_path.push(table.to_string());
+                self.ensure_directory_exists(&output_path)?;
+                output_path.push(format!("{table}.{part}.{extension}"));
+            } else {
+                // No partition specified, output to a single file
+                output_path.push(format!("{table}.{extension}"));
+            }
+            Ok(OutputLocation::File(output_path))
+        }
+    }
+
+    /// Ensure the output directory exists, creating it if necessary
+    fn ensure_directory_exists(&mut self, dir: &PathBuf) -> io::Result<()> {
+        if self.created_directories.contains(dir) {
+            return Ok(());
+        }
+        std::fs::create_dir_all(dir).map_err(|e| {
+            io::Error::new(
+                io::ErrorKind::InvalidInput,
+                format!("Error creating directory {}: {}", dir.display(), e),
+            )
+        })?;
+        self.created_directories.insert(dir.clone());
+        Ok(())
+    }
+
+    /// Return the output plans generated so far
+    pub fn build(self) -> Vec<OutputPlan> {
+        self.output_plans
+    }
+}
diff --git a/tpchgen-cli/src/plan.rs b/tpchgen-cli/src/plan.rs
index 3a5c6ff..45822d3 100644
--- a/tpchgen-cli/src/plan.rs
+++ b/tpchgen-cli/src/plan.rs
@@ -1,4 +1,4 @@
-//! [`GenerationPlan`] that describes how to generate a Spatial Bench dataset.
+//! * [`GenerationPlan`]: how to generate a specific Spatial Bench dataset.
 
 use crate::{OutputFormat, Table};
 use log::debug;
@@ -8,7 +8,8 @@ use tpchgen::generators::{
     BuildingGenerator, CustomerGenerator, DriverGenerator, TripGenerator, 
VehicleGenerator,
 };
 
-/// A list of generator "parts" (data generator chunks, not Spatial Bench 
parts)
+/// A list of generator "parts" (data generator chunks, not TPCH parts) for a
+/// single output file.
 ///
 /// Controls the parallelization and layout of Parquet files in `tpchgen-cli`.
 ///
@@ -49,7 +50,7 @@ use tpchgen::generators::{
 /// let results = plan.into_iter().collect::<Vec<_>>();
 /// /// assert_eq!(results.len(), 1);
 /// ```
-#[derive(Debug)]
+#[derive(Debug, Clone, PartialEq)]
 pub struct GenerationPlan {
     /// Total number of parts to generate
     part_count: i32,
@@ -67,7 +68,7 @@ impl GenerationPlan {
     /// * `cli_part_count`: optional total number of parts, `--parts` CLI 
argument
     /// * `parquet_row_group_size`: optional parquet row group size, 
`--parquet-row-group-size` CLI argument
     pub fn try_new(
-        table: &Table,
+        table: Table,
         format: OutputFormat,
         scale_factor: f64,
         cli_part: Option<i32>,
@@ -100,11 +101,17 @@ impl GenerationPlan {
         }
     }
 
+    /// Return true if the tables is unpartitionable (not parameterized by part
+    /// count)
+    pub fn partitioned_table(table: Table) -> bool {
+        table != Table::Vehicle && table != Table::Driver && table != 
Table::Building
+    }
+
     /// Returns a new `GenerationPlan` when partitioning
     ///
     /// See [`GenerationPlan::try_new`] for argument documentation.
     fn try_new_with_parts(
-        table: &Table,
+        table: Table,
         format: OutputFormat,
         scale_factor: f64,
         cli_part: i32,
@@ -128,7 +135,7 @@ impl GenerationPlan {
 
         // These tables are so small they are not parameterized by part count,
         // so only a single part.
-        if table == &Table::Vehicle || table == &Table::Driver {
+        if !Self::partitioned_table(table) {
             return Ok(Self {
                 part_count: 1,
                 part_list: 1..=1,
@@ -169,7 +176,7 @@ impl GenerationPlan {
 
     /// Returns a new `GenerationPlan` when no partitioning is specified on 
the command line
     fn try_new_without_parts(
-        table: &Table,
+        table: Table,
         format: OutputFormat,
         scale_factor: f64,
         parquet_row_group_bytes: i64,
@@ -182,6 +189,11 @@ impl GenerationPlan {
             part_list: 1..=num_parts,
         })
     }
+
+    /// Return the number of part(ititions) this plan will generate
+    pub fn chunk_count(&self) -> usize {
+        self.part_list.clone().count()
+    }
 }
 
 /// Converts the `GenerationPlan` into an iterator of (part_number, num_parts)
@@ -218,7 +230,7 @@ struct OutputSize {
 
 impl OutputSize {
     pub fn new(
-        table: &Table,
+        table: Table,
         scale_factor: f64,
         format: OutputFormat,
         parquet_row_group_bytes: i64,
@@ -320,7 +332,7 @@ impl OutputSize {
         }
     }
 
-    fn row_count_for_table(table: &Table, scale_factor: f64) -> i64 {
+    fn row_count_for_table(table: Table, scale_factor: f64) -> i64 {
         //let (avg_row_size_bytes, row_count) = match table {
         match table {
             Table::Vehicle => 
VehicleGenerator::calculate_row_count(scale_factor, 1, 1),
@@ -744,7 +756,7 @@ mod tests {
         /// expected number of parts and part numbers.
         fn assert(self, expected_part_count: i32, expected_part_numbers: 
RangeInclusive<i32>) {
             let plan = GenerationPlan::try_new(
-                &self.table,
+                self.table,
                 self.format,
                 self.scale_factor,
                 self.cli_part,
@@ -759,7 +771,7 @@ mod tests {
         /// Assert that creating a [`GenerationPlan`] returns the specified 
error
         fn assert_err(self, expected_error: &str) {
             let actual_error = GenerationPlan::try_new(
-                &self.table,
+                self.table,
                 self.format,
                 self.scale_factor,
                 self.cli_part,
diff --git a/tpchgen-cli/src/runner.rs b/tpchgen-cli/src/runner.rs
new file mode 100644
index 0000000..3afb694
--- /dev/null
+++ b/tpchgen-cli/src/runner.rs
@@ -0,0 +1,356 @@
+//! [`PlanRunner`] for running [`OutputPlan`]s.
+
+use crate::csv::*;
+use crate::generate::{generate_in_chunks, Source};
+use crate::output_plan::{OutputLocation, OutputPlan};
+use crate::parquet::generate_parquet;
+use crate::tbl::*;
+use crate::{OutputFormat, Table, WriterSink};
+use log::{debug, info};
+use std::io;
+use std::io::BufWriter;
+use tokio::task::{JoinError, JoinSet};
+use tpchgen::generators::{
+    CustomerGenerator, TripGenerator, VehicleGenerator, BuildingGenerator, 
DriverGenerator,
+};
+use tpchgen_arrow::{
+    CustomerArrow, TripArrow, VehicleArrow,
+    RecordBatchIterator, BuildingArrow, DriverArrow,
+};
+
+/// Runs multiple [`OutputPlan`]s in parallel, managing the number of threads
+/// used to run them.
+#[derive(Debug)]
+pub struct PlanRunner {
+    plans: Vec<OutputPlan>,
+    num_threads: usize,
+}
+
+impl PlanRunner {
+    /// Create a new [`PlanRunner`] with the given plans and number of threads.
+    pub fn new(plans: Vec<OutputPlan>, num_threads: usize) -> Self {
+        Self { plans, num_threads }
+    }
+
+    /// Run all the plans in the runner.
+    pub async fn run(self) -> Result<(), io::Error> {
+        debug!(
+            "Running {} plans with {} threads...",
+            self.plans.len(),
+            self.num_threads
+        );
+        let Self {
+            mut plans,
+            num_threads,
+        } = self;
+
+        // Sort the plans by the number of parts so the largest are first
+        plans.sort_unstable_by(|a, b| {
+            let a_cnt = a.chunk_count();
+            let b_cnt = b.chunk_count();
+            a_cnt.cmp(&b_cnt)
+        });
+
+        // Do the actual work in parallel, using a worker queue
+        let mut worker_queue = WorkerQueue::new(num_threads);
+        while let Some(plan) = plans.pop() {
+            worker_queue.schedule_plan(plan).await?;
+        }
+        worker_queue.join_all().await
+    }
+}
+
+/// Manages worker tasks, limiting the number of total outstanding threads
+/// to some fixed number
+///
+/// The runner executes each plan with a number of threads equal to the
+/// number of parts in the plan, but no more than the total number of
+/// threads specified when creating the runner. If a plan does not need all
+/// the threads, the remaining threads are used to run other plans.
+///
+/// This is important to keep all cores busy for smaller tables that may not
+/// have sufficient parts to keep all threads busy (see [`GenerationPlan`]
+/// for more details), but not schedule more tasks than we have threads for.
+///
+/// Scheduling too many tasks requires more memory and leads to context
+/// switching overhead, which can slow down the generation process.
+///
+/// [`GenerationPlan`]: crate::plan::GenerationPlan
+struct WorkerQueue {
+    join_set: JoinSet<io::Result<usize>>,
+    /// Current number of threads available to commit
+    available_threads: usize,
+}
+
+impl WorkerQueue {
+    pub fn new(max_threads: usize) -> Self {
+        assert!(max_threads > 0);
+        Self {
+            join_set: JoinSet::new(),
+            available_threads: max_threads,
+        }
+    }
+
+    /// Spawns a task to run the plan with as many threads as possible
+    /// without exceeding the maximum number of threads.
+    ///
+    /// If there are no threads available, it will wait for one to finish
+    /// before spawning the new task.
+    ///
+    /// Note this algorithm does not guarantee that all threads are always 
busy,
+    /// but it should be good enough for most cases. For best thread 
utilization
+    /// spawn the largest plans first.
+    pub async fn schedule_plan(&mut self, plan: OutputPlan) -> io::Result<()> {
+        debug!("scheduling plan {plan}");
+        loop {
+            if self.available_threads == 0 {
+                debug!("no threads left, wait for one to finish");
+                let Some(result) = self.join_set.join_next().await else {
+                    return Err(io::Error::other(
+                        "Internal Error No more tasks to wait for, but had no 
threads",
+                    ));
+                };
+                self.available_threads += task_result(result)?;
+                continue; // look for threads again
+            }
+
+            // Check for any other jobs done so we can reuse their threads
+            if let Some(result) = self.join_set.try_join_next() {
+                self.available_threads += task_result(result)?;
+                continue;
+            }
+
+            debug_assert!(
+                self.available_threads > 0,
+                "should have at least one thread to continue"
+            );
+
+            // figure out how many threads to allocate to this plan. Each plan
+            // can use up to `part_count` threads.
+            let chunk_count = plan.chunk_count();
+
+            let num_plan_threads = self.available_threads.min(chunk_count);
+
+            // run the plan in a separate task, which returns the number of 
threads it used
+            debug!("Spawning plan {plan} with {num_plan_threads} threads");
+
+            self.join_set
+                .spawn(async move { run_plan(plan, num_plan_threads).await });
+            self.available_threads -= num_plan_threads;
+            return Ok(());
+        }
+    }
+
+    // Wait for all tasks to finish
+    pub async fn join_all(mut self) -> io::Result<()> {
+        debug!("Waiting for tasks to finish...");
+        while let Some(result) = self.join_set.join_next().await {
+            task_result(result)?;
+        }
+        debug!("Tasks finished.");
+        Ok(())
+    }
+}
+
+/// unwraps the result of a task and converts it to an `io::Result<T>`.
+fn task_result<T>(result: Result<io::Result<T>, JoinError>) -> io::Result<T> {
+    result.map_err(|e| io::Error::other(format!("Task Panic: {e}")))?
+}
+
+/// Run a single [`OutputPlan`]
+async fn run_plan(plan: OutputPlan, num_threads: usize) -> io::Result<usize> {
+    match plan.table() {
+        Table::Building => run_building_plan(plan, num_threads).await,
+        Table::Vehicle => run_vehicle_plan(plan, num_threads).await,
+        Table::Driver => run_driver_plan(plan, num_threads).await,
+        Table::Customer => run_customer_plan(plan, num_threads).await,
+        Table::Trip => run_trip_plan(plan, num_threads).await,
+        Table::Zone => todo!("Zone table is not supported in PlanRunner"),
+    }
+}
+
+/// Writes a CSV/TSV output from the sources
+async fn write_file<I>(plan: OutputPlan, num_threads: usize, sources: I) -> 
Result<(), io::Error>
+where
+    I: Iterator<Item: Source> + 'static,
+{
+    // Since generate_in_chunks already buffers, there is no need to buffer
+    // again (aka don't use BufWriter here)
+    match plan.output_location() {
+        OutputLocation::Stdout => {
+            let sink = WriterSink::new(io::stdout());
+            generate_in_chunks(sink, sources, num_threads).await
+        }
+        OutputLocation::File(path) => {
+            // if the output already exists, skip running
+            if path.exists() {
+                info!("{} already exists, skipping generation", 
path.display());
+                return Ok(());
+            }
+            // write to a temp file and then rename to avoid partial files
+            let temp_path = path.with_extension("inprogress");
+            let file = std::fs::File::create(&temp_path).map_err(|err| {
+                io::Error::other(format!("Failed to create {temp_path:?}: 
{err}"))
+            })?;
+            let sink = WriterSink::new(file);
+            generate_in_chunks(sink, sources, num_threads).await?;
+            // rename the temp file to the final path
+            std::fs::rename(&temp_path, path).map_err(|e| {
+                io::Error::other(format!(
+                    "Failed to rename {temp_path:?} to {path:?} file: {e}"
+                ))
+            })?;
+            Ok(())
+        }
+    }
+}
+
+/// Generates an output parquet file from the sources
+async fn write_parquet<I>(plan: OutputPlan, num_threads: usize, sources: I) -> 
Result<(), io::Error>
+where
+    I: Iterator<Item: RecordBatchIterator> + 'static,
+{
+    match plan.output_location() {
+        OutputLocation::Stdout => {
+            let writer = BufWriter::with_capacity(32 * 1024 * 1024, 
io::stdout()); // 32MB buffer
+            generate_parquet(writer, sources, num_threads, 
plan.parquet_compression()).await
+        }
+        OutputLocation::File(path) => {
+            // if the output already exists, skip running
+            if path.exists() {
+                info!("{} already exists, skipping generation", 
path.display());
+                return Ok(());
+            }
+            // write to a temp file and then rename to avoid partial files
+            let temp_path = path.with_extension("inprogress");
+            let file = std::fs::File::create(&temp_path).map_err(|err| {
+                io::Error::other(format!("Failed to create {temp_path:?}: 
{err}"))
+            })?;
+            let writer = BufWriter::with_capacity(32 * 1024 * 1024, file); // 
32MB buffer
+            generate_parquet(writer, sources, num_threads, 
plan.parquet_compression()).await?;
+            // rename the temp file to the final path
+            std::fs::rename(&temp_path, path).map_err(|e| {
+                io::Error::other(format!(
+                    "Failed to rename {temp_path:?} to {path:?} file: {e}"
+                ))
+            })?;
+            Ok(())
+        }
+    }
+}
+
+/// macro to create a function for generating a part of a particular able
+///
+/// Arguments:
+/// $FUN_NAME: name of the function to create
+/// $GENERATOR: The generator type to use
+/// $TBL_SOURCE: The [`Source`] type to use for TBL format
+/// $CSV_SOURCE: The [`Source`] type to use for CSV format
+/// $PARQUET_SOURCE: The [`RecordBatchIterator`] type to use for Parquet format
+macro_rules! define_run {
+    ($FUN_NAME:ident, $GENERATOR:ident, $TBL_SOURCE:ty, $CSV_SOURCE:ty, 
$PARQUET_SOURCE:ty) => {
+        async fn $FUN_NAME(plan: OutputPlan, num_threads: usize) -> 
io::Result<usize> {
+            use crate::GenerationPlan;
+            let scale_factor = plan.scale_factor();
+            info!("Writing {plan} using {num_threads} threads");
+
+            /// These interior functions are used to tell the compiler that 
the lifetime is 'static
+            /// (when these were closures, the compiler could not figure out 
the lifetime) and
+            /// resulted in errors like this:
+            ///          let _ = join_set.spawn(async move {
+            ///                 |  _____________________^
+            ///              96 | |                 run_plan(plan, 
num_plan_threads).await
+            ///              97 | |             });
+            ///                 | |______________^ implementation of `FnOnce` 
is not general enough
+            fn tbl_sources(
+                generation_plan: &GenerationPlan,
+                scale_factor: f64,
+            ) -> impl Iterator<Item: Source> + 'static {
+                generation_plan
+                    .clone()
+                    .into_iter()
+                    .map(move |(part, num_parts)| 
$GENERATOR::new(scale_factor, part, num_parts))
+                    .map(<$TBL_SOURCE>::new)
+            }
+
+            fn csv_sources(
+                generation_plan: &GenerationPlan,
+                scale_factor: f64,
+            ) -> impl Iterator<Item: Source> + 'static {
+                generation_plan
+                    .clone()
+                    .into_iter()
+                    .map(move |(part, num_parts)| 
$GENERATOR::new(scale_factor, part, num_parts))
+                    .map(<$CSV_SOURCE>::new)
+            }
+
+            fn parquet_sources(
+                generation_plan: &GenerationPlan,
+                scale_factor: f64,
+            ) -> impl Iterator<Item: RecordBatchIterator> + 'static {
+                generation_plan
+                    .clone()
+                    .into_iter()
+                    .map(move |(part, num_parts)| 
$GENERATOR::new(scale_factor, part, num_parts))
+                    .map(<$PARQUET_SOURCE>::new)
+            }
+
+            // Dispach to the appropriate output format
+            match plan.output_format() {
+                OutputFormat::Tbl => {
+                    let gens = tbl_sources(plan.generation_plan(), 
scale_factor);
+                    write_file(plan, num_threads, gens).await?
+                }
+                OutputFormat::Csv => {
+                    let gens = csv_sources(plan.generation_plan(), 
scale_factor);
+                    write_file(plan, num_threads, gens).await?
+                }
+                OutputFormat::Parquet => {
+                    let gens = parquet_sources(plan.generation_plan(), 
scale_factor);
+                    write_parquet(plan, num_threads, gens).await?
+                }
+            };
+            Ok(num_threads)
+        }
+    };
+}
+
+define_run!(
+    run_trip_plan,
+    TripGenerator,
+    TripTblSource,
+    TripCsvSource,
+    TripArrow
+);
+
+define_run!(
+    run_building_plan,
+    BuildingGenerator,
+    BuildingTblSource,
+    BuildingCsvSource,
+    BuildingArrow
+);
+
+define_run!(
+    run_vehicle_plan,
+    VehicleGenerator,
+    VehicleTblSource,
+    VehicleCsvSource,
+    VehicleArrow
+);
+
+define_run!(
+    run_driver_plan,
+    DriverGenerator,
+    DriverTblSource,
+    DriverCsvSource,
+    DriverArrow
+);
+
+define_run!(
+    run_customer_plan,
+    CustomerGenerator,
+    CustomerTblSource,
+    CustomerCsvSource,
+    CustomerArrow
+);
diff --git a/tpchgen-cli/src/zone/config.rs b/tpchgen-cli/src/zone/config.rs
index 2da6d5e..5fe25fa 100644
--- a/tpchgen-cli/src/zone/config.rs
+++ b/tpchgen-cli/src/zone/config.rs
@@ -44,9 +44,13 @@ impl ZoneDfArgs {
 
     pub fn output_filename(&self) -> PathBuf {
         if self.parts > 1 {
-            self.output_dir.join(format!("zone.{}.parquet", self.part))
+            // Create zone subdirectory and write parts within it
+            self.output_dir
+                .join("zone")
+                .join(format!("zone.{}.parquet", self.part))
         } else {
             self.output_dir.join("zone.parquet")
         }
     }
+
 }
diff --git a/tpchgen-cli/src/zone/writer.rs b/tpchgen-cli/src/zone/writer.rs
index 627566b..8bf89db 100644
--- a/tpchgen-cli/src/zone/writer.rs
+++ b/tpchgen-cli/src/zone/writer.rs
@@ -36,11 +36,28 @@ impl ParquetWriter {
     }
 
     pub fn write(&self, batches: &[RecordBatch]) -> Result<()> {
-        std::fs::create_dir_all(&self.args.output_dir)?;
-        debug!("Created output directory: {:?}", self.args.output_dir);
+        // Create parent directory of output file (handles both zone/ 
subdirectory and base dir)
+        let parent_dir = self
+            .output_path
+            .parent()
+            .ok_or_else(|| anyhow::anyhow!("Invalid output path: {:?}", 
self.output_path))?;
 
+        std::fs::create_dir_all(parent_dir)?;
+        debug!("Created output directory: {:?}", parent_dir);
+
+        // Check if file already exists
+        if self.output_path.exists() {
+            info!(
+            "{} already exists, skipping generation",
+            self.output_path.display()
+        );
+            return Ok(());
+        }
+
+        // Write to temp file first
+        let temp_path = self.output_path.with_extension("inprogress");
         let t0 = Instant::now();
-        let file = std::fs::File::create(&self.output_path)?;
+        let file = std::fs::File::create(&temp_path)?;
         let mut writer =
             ArrowWriter::try_new(file, Arc::clone(&self.schema), 
Some(self.props.clone()))?;
 
@@ -49,8 +66,18 @@ impl ParquetWriter {
         }
 
         writer.close()?;
-        let duration = t0.elapsed();
 
+        // Rename temp file to final output
+        std::fs::rename(&temp_path, &self.output_path).map_err(|e| {
+            anyhow::anyhow!(
+            "Failed to rename {:?} to {:?}: {}",
+            temp_path,
+            self.output_path,
+            e
+        )
+        })?;
+
+        let duration = t0.elapsed();
         let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
 
         info!(
diff --git a/tpchgen-cli/tests/cli_integration.rs 
b/tpchgen-cli/tests/cli_integration.rs
index 6916b8e..23947c0 100644
--- a/tpchgen-cli/tests/cli_integration.rs
+++ b/tpchgen-cli/tests/cli_integration.rs
@@ -84,6 +84,133 @@ fn test_tpchgen_cli_tbl_scale_factor_v1() {
     }
 }
 
+/// Test that when creating output, if the file already exists it is not 
overwritten
+#[test]
+fn test_tpchgen_cli_tbl_no_overwrite() {
+    let temp_dir = tempdir().expect("Failed to create temporary directory");
+    let expected_file = temp_dir.path().join("trip.tbl");
+
+    let run_command = || {
+        Command::cargo_bin("tpchgen-cli")
+            .expect("Binary not found")
+            .arg("--scale-factor")
+            .arg("0.001")
+            .arg("--format")
+            .arg("tbl")
+            .arg("--tables")
+            .arg("trip")
+            .arg("--output-dir")
+            .arg(temp_dir.path())
+            .assert()
+            .success()
+    };
+
+    run_command();
+    let original_metadata =
+        fs::metadata(&expected_file).expect("Failed to get metadata of 
generated file");
+    assert_eq!(original_metadata.len(), 826311);
+
+    // Run the tpchgen-cli command again with the same parameters and expect 
the
+    // file to not be overwritten
+    run_command();
+    let new_metadata =
+        fs::metadata(&expected_file).expect("Failed to get metadata of 
generated file");
+    assert_eq!(original_metadata.len(), new_metadata.len());
+    assert_eq!(
+        original_metadata
+            .modified()
+            .expect("Failed to get modified time"),
+        new_metadata
+            .modified()
+            .expect("Failed to get modified time")
+    );
+}
+
+#[tokio::test]
+async fn test_zone_parquet_no_overwrite() {
+    let temp_dir = tempdir().expect("Failed to create temporary directory");
+    let expected_file = temp_dir.path().join("zone/zone.1.parquet");
+
+    let run_command = || {
+        Command::cargo_bin("tpchgen-cli")
+            .expect("Binary not found")
+            .arg("--scale-factor")
+            .arg("1")
+            .arg("--tables")
+            .arg("zone")
+            .arg("--parts")
+            .arg("100")
+            .arg("--part")
+            .arg("1")
+            .arg("--output-dir")
+            .arg(temp_dir.path())
+            .assert()
+            .success()
+    };
+
+    run_command();
+    let original_metadata =
+        fs::metadata(&expected_file).expect("Failed to get metadata of 
generated file");
+    assert_eq!(original_metadata.len(), 25400203);
+
+    // Run the tpchgen-cli command again with the same parameters and expect 
the
+    // file to not be overwritten
+    run_command();
+
+    let new_metadata =
+        fs::metadata(&expected_file).expect("Failed to get metadata of 
generated file");
+    assert_eq!(original_metadata.len(), new_metadata.len());
+    assert_eq!(
+        original_metadata
+            .modified()
+            .expect("Failed to get modified time"),
+        new_metadata
+            .modified()
+            .expect("Failed to get modified time")
+    );
+}
+
+// Test that when creating output, if the file already exists it is not for 
parquet
+#[test]
+fn test_tpchgen_cli_parquet_no_overwrite() {
+    let temp_dir = tempdir().expect("Failed to create temporary directory");
+    let expected_file = temp_dir.path().join("building.parquet");
+
+    let run_command = || {
+        Command::cargo_bin("tpchgen-cli")
+            .expect("Binary not found")
+            .arg("--scale-factor")
+            .arg("0.001")
+            .arg("--tables")
+            .arg("building")
+            .arg("--output-dir")
+            .arg(temp_dir.path())
+            .assert()
+            .success()
+    };
+
+    run_command();
+    let original_metadata =
+        fs::metadata(&expected_file).expect("Failed to get metadata of 
generated file");
+    assert_eq!(original_metadata.len(), 412);
+
+    // Run the tpchgen-cli command again with the same parameters and expect 
the
+    // file to not be overwritten
+    run_command();
+
+    let new_metadata =
+        fs::metadata(&expected_file).expect("Failed to get metadata of 
generated file");
+    assert_eq!(original_metadata.len(), new_metadata.len());
+    assert_eq!(
+        original_metadata
+            .modified()
+            .expect("Failed to get modified time"),
+        new_metadata
+            .modified()
+            .expect("Failed to get modified time")
+    );
+}
+
 /// Test zone parquet output determinism - same data should be generated every 
time
 #[tokio::test]
 async fn test_zone_deterministic_parts_generation() {
@@ -106,7 +233,7 @@ async fn test_zone_deterministic_parts_generation() {
         .assert()
         .success();
 
-    let zone_file1 = temp_dir1.path().join("zone.1.parquet");
+    let zone_file1 = temp_dir1.path().join("zone/zone.1.parquet");
 
     // Reference file is a sf=0.01 zone table with z_boundary column removed
     let reference_file = PathBuf::from("../tpchgen/data/sf-v1/zone.parquet");
@@ -190,23 +317,49 @@ async fn test_zone_deterministic_parts_generation() {
     }
 }
 
-/// Test generating the trip table using --parts and --part options
+/// Test generating the trip table using 4 parts implicitly
 #[test]
 fn test_tpchgen_cli_parts() {
-    // Create a temporary directory
     let temp_dir = tempdir().expect("Failed to create temporary directory");
 
-    // generate 4 parts of the trip table with scale factor 0.001
-    // into directories /part1, /part2, /part3, /part4
+    // generate 4 parts of the trip table with scale factor 0.001 and let
+    // tpchgen-cli generate the multiple files
+
+    let num_parts = 4;
+    let output_dir = temp_dir.path().to_path_buf();
+    Command::cargo_bin("tpchgen-cli")
+        .expect("Binary not found")
+        .arg("--scale-factor")
+        .arg("0.001")
+        .arg("--format")
+        .arg("tbl")
+        .arg("--output-dir")
+        .arg(&output_dir)
+        .arg("--parts")
+        .arg(num_parts.to_string())
+        .arg("--tables")
+        .arg("trip")
+        .assert()
+        .success();
+
+    verify_table(temp_dir.path(), "trip", num_parts, "v1");
+}
+
+/// Test generating the order table with multiple invocations using --parts and
+/// --part options
+#[test]
+fn test_tpchgen_cli_parts_explicit() {
+    let temp_dir = tempdir().expect("Failed to create temporary directory");
+
+    // generate 4 parts of the orders table with scale factor 0.001
     // use threads to run the command concurrently to minimize the time taken
     let num_parts = 4;
     let mut threads = vec![];
     for part in 1..=num_parts {
-        let part_dir = temp_dir.path().join(format!("part{part}"));
+        let output_dir = temp_dir.path().to_path_buf();
         threads.push(std::thread::spawn(move || {
-            fs::create_dir(&part_dir).expect("Failed to create part 
directory");
-
             // Run the tpchgen-cli command for each part
+            // output goes into `output_dir/orders/orders.{part}.tbl`
             Command::cargo_bin("tpchgen-cli")
                 .expect("Binary not found")
                 .arg("--scale-factor")
@@ -214,7 +367,7 @@ fn test_tpchgen_cli_parts() {
                 .arg("--format")
                 .arg("tbl")
                 .arg("--output-dir")
-                .arg(&part_dir)
+                .arg(&output_dir)
                 .arg("--parts")
                 .arg(num_parts.to_string())
                 .arg("--part")
@@ -229,11 +382,62 @@ fn test_tpchgen_cli_parts() {
     for thread in threads {
         thread.join().expect("Thread panicked");
     }
-    // Read the generated files into a single buffer and compare them
-    // to the contents of the reference file
+    verify_table(temp_dir.path(), "trip", num_parts, "v1");
+}
+
+/// Create all tables using --parts option and verify the output layouts
+#[test]
+fn test_tpchgen_cli_parts_all_tables() {
+    let temp_dir = tempdir().expect("Failed to create temporary directory");
+
+    let num_parts = 8;
+    let output_dir = temp_dir.path().to_path_buf();
+    Command::cargo_bin("tpchgen-cli")
+        .expect("Binary not found")
+        .arg("--scale-factor")
+        .arg("0.51")
+        .arg("--format")
+        .arg("tbl")
+        .arg("--tables")
+        .arg("building,driver,vehicle,customer")
+        .arg("--output-dir")
+        .arg(&output_dir)
+        .arg("--parts")
+        .arg(num_parts.to_string())
+        .assert()
+        .success();
+
+    Command::cargo_bin("tpchgen-cli")
+        .expect("Binary not found")
+        .arg("--scale-factor")
+        .arg("0.001")
+        .arg("--format")
+        .arg("tbl")
+        .arg("--tables")
+        .arg("trip")
+        .arg("--output-dir")
+        .arg(&output_dir)
+        .arg("--parts")
+        .arg(num_parts.to_string())
+        .assert()
+        .success();
+
+    verify_table(temp_dir.path(), "trip", num_parts, "v1");
+    verify_table(temp_dir.path(), "customer", num_parts, "v1");
+    // Note, building, vehicle and driver have only a single part regardless 
of --parts
+    verify_table(temp_dir.path(), "building", 1, "v1");
+    verify_table(temp_dir.path(), "vehicle", 1, "v1");
+    verify_table(temp_dir.path(), "driver", 1, "v1");
+}
+
+/// Read the N files from `output_dir/table_name/table_name.part.tml` into a
+/// single buffer and compare them to the contents of the reference file
+fn verify_table(output_dir: &Path, table_name: &str, parts: usize, 
scale_factor: &str) {
     let mut output_contents = Vec::new();
-    for part in 1..=4 {
-        let generated_file = 
temp_dir.path().join(format!("part{part}")).join("trip.tbl");
+    for part in 1..=parts {
+        let generated_file = output_dir
+            .join(table_name)
+            .join(format!("{table_name}.{part}.tbl"));
         assert!(
             generated_file.exists(),
             "File {:?} does not exist",
@@ -247,7 +451,7 @@ fn test_tpchgen_cli_parts() {
         String::from_utf8(output_contents).expect("Failed to convert output 
contents to string");
 
     // load the reference file
-    let reference_file = read_reference_file("trip", "v1");
+    let reference_file = read_reference_file(table_name, scale_factor);
     assert_eq!(output_contents, reference_file);
 }
 
@@ -362,7 +566,7 @@ async fn test_zone_write_parquet_row_group_size_default() {
     expect_row_group_sizes(
         output_dir.path(),
         vec![RowGroups {
-            table: "zone.1",
+            table: "zone/zone.1",
             row_group_bytes: vec![86288517],
         }],
     );
@@ -442,7 +646,7 @@ async fn test_zone_write_parquet_row_group_size_20mb() {
     expect_row_group_sizes(
         output_dir.path(),
         vec![RowGroups {
-            table: "zone.1",
+            table: "zone/zone.1",
             row_group_bytes: vec![15428592, 17250042, 19338201, 17046885, 
17251978],
         }],
     );
@@ -466,24 +670,6 @@ fn test_tpchgen_cli_part_no_parts() {
         ));
 }
 
-#[test]
-fn test_tpchgen_cli_parts_no_part() {
-    let temp_dir = tempdir().expect("Failed to create temporary directory");
-
-    // CLI Error test --parts and but not --part
-    Command::cargo_bin("tpchgen-cli")
-        .expect("Binary not found")
-        .arg("--output-dir")
-        .arg(temp_dir.path())
-        .arg("--parts")
-        .arg("42")
-        .assert()
-        .failure()
-        .stderr(predicates::str::contains(
-            "The --part_count option requires the --part option to be set",
-        ));
-}
-
 #[test]
 fn test_tpchgen_cli_too_many_parts() {
     let temp_dir = tempdir().expect("Failed to create temporary directory");

Reply via email to