This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-spatialbench.git
The following commit(s) were added to refs/heads/main by this push:
new 59dff99 Feat: Support creating multiple files with single --parts command (#57)
59dff99 is described below
commit 59dff99c1f85f9fa0b1566ff562ddcae81d135f6
Author: Pranav Toggi <[email protected]>
AuthorDate: Mon Oct 27 11:21:43 2025 -0700
Feat: Support creating multiple files with single --parts command (#57)
* temp: rename to tpchgen
* refactor zone generation for readability
* allow generating multi-part zone
* fmt, clippy fixes
* Automatically create multiple files with single `--parts` command
* Improve code readability and logging in multi-part generation
* Rename tpchgen to spatialbench and update related references
* fmt fix
* update generate_data.py script
* fix test
* Apply suggestions from code review
Co-authored-by: Copilot <[email protected]>
---------
Co-authored-by: Copilot <[email protected]>
---
README.md | 13 +-
spatialbench-cli/src/main.rs | 298 +++++-------------
spatialbench-cli/src/output_plan.rs | 267 ++++++++++++++++
spatialbench-cli/src/plan.rs | 36 ++-
spatialbench-cli/src/runner.rs | 355 +++++++++++++++++++++
spatialbench-cli/src/zone/config.rs | 55 ++++
spatialbench-cli/src/zone/datasource.rs | 95 ++++++
spatialbench-cli/src/zone/main.rs | 64 ++++
spatialbench-cli/src/zone/mod.rs | 88 ++++++
spatialbench-cli/src/zone/partition.rs | 94 ++++++
spatialbench-cli/src/zone/stats.rs | 161 ++++++++++
spatialbench-cli/src/zone/transform.rs | 50 +++
spatialbench-cli/src/zone/writer.rs | 94 ++++++
spatialbench-cli/src/zone_df.rs | 503 ------------------------------
spatialbench-cli/tests/cli_integration.rs | 254 +++++++++++++--
tools/generate_data.py | 30 +-
16 files changed, 1654 insertions(+), 803 deletions(-)
diff --git a/README.md b/README.md
index 3ee105e..2ac7140 100644
--- a/README.md
+++ b/README.md
@@ -95,13 +95,7 @@ Key performance benefits:
SpatialBench is a Rust-based fork of the tpchgen-rs project. It preserves the
original’s high-performance, multi-threaded, streaming architecture, while
extending it with a spatial star schema and geometry generation logic.
-You can build the SpatialBench data generator using Cargo:
-
-```bash
-cargo build --release
-```
-
-Alternatively, install it directly using:
+You can install the SpatialBench data generator using Cargo:
```bash
cargo install --path ./spatialbench-cli
@@ -133,10 +127,7 @@ spatialbench-cli -s 1 --format=parquet --tables trip,building --output-dir sf1-p
#### Partitioned Output Example
```bash
-for PART in $(seq 1 4); do
- mkdir part-$PART
- spatialbench-cli -s 10 --tables trip,building --output-dir part-$PART --parts 4 --part $PART
-done
+spatialbench-cli -s 10 --tables trip,building --parts 4
```
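With a bare `--parts N`, the CLI now creates every partition in one invocation. Under the `<output_dir>/<table>/<table>.<part>.<format>` naming scheme described below, the command above should produce a layout roughly like this (illustrative; small tables such as `building` are generated as a single part):

```
trip/trip.1.parquet
trip/trip.2.parquet
trip/trip.3.parquet
trip/trip.4.parquet
building/building.1.parquet
```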
#### Generate Multiple Parquet Files of Similar Size
diff --git a/spatialbench-cli/src/main.rs b/spatialbench-cli/src/main.rs
index 79eee80..9456f95 100644
--- a/spatialbench-cli/src/main.rs
+++ b/spatialbench-cli/src/main.rs
@@ -4,71 +4,31 @@
//! API wise to the original dbgen tool, as in we use the same command line flags
//! and arguments.
//!
-//! ```
-//! USAGE:
-//! spatialbench-cli [OPTIONS]
-//!
-//! OPTIONS:
-//! -h, --help Prints help information
-//! -V, --version Prints version information
-//! -s, --scale-factor <FACTOR> Scale factor for the data generation (default: 1)
-//! -T, --tables <TABLES> Comma-separated list of tables to generate (default: all)
-//! -f, --format <FORMAT> Output format: parquet, tbl or csv (default: parquet)
-//! -o, --output-dir <DIR> Output directory (default: current directory)
-//! -p, --parts <N> Number of parts to split generation into (default: 1)
-//! --part <N> Which part to generate (1-based, default: 1)
-//! -n, --num-threads <N> Number of threads to use (default: number of CPUs)
-//! -c, --parquet-compression <C> Parquet compression codec, e.g., SNAPPY, ZSTD(1), UNCOMPRESSED (default: SNAPPY)
-//! --parquet-row-group-size <N> Target size in bytes per row group in Parquet files (default: 134,217,728)
-//! -v, --verbose Verbose output
-//! --stdout Write output to stdout instead of files
-//!```
-//!
-//! # Logging:
-//! Use the `-v` flag or `RUST_LOG` environment variable to control logging output.
-//!
-//! `-v` sets the log level to `info` and ignores the `RUST_LOG` environment variable.
-//!
-//! # Examples
-//! ```
-//! # see all info output
-//! spatialbench-cli -s 1 -v
-//!
-//! # same thing using RUST_LOG
-//! RUST_LOG=info spatialbench-cli -s 1
-//!
-//! # see all debug output
-//! RUST_LOG=debug spatialbench -s 1
-//! ```
+//! See the documentation on [`Cli`] for more information on the command line
mod csv;
mod generate;
+mod output_plan;
mod parquet;
mod plan;
+mod runner;
mod spatial_config_file;
mod statistics;
mod tbl;
-mod zone_df;
+mod zone;
-use crate::csv::*;
-use crate::generate::{generate_in_chunks, Sink, Source};
+use crate::generate::Sink;
+use crate::output_plan::OutputPlanGenerator;
use crate::parquet::*;
use crate::plan::{GenerationPlan, DEFAULT_PARQUET_ROW_GROUP_BYTES};
use crate::spatial_config_file::parse_yaml;
use crate::statistics::WriteStatistics;
-use crate::tbl::*;
use ::parquet::basic::Compression;
use clap::builder::TypedValueParser;
use clap::{Parser, ValueEnum};
use log::{debug, info, LevelFilter};
use spatialbench::distribution::Distributions;
-use spatialbench::generators::{
- BuildingGenerator, CustomerGenerator, DriverGenerator, TripGenerator, VehicleGenerator,
-};
use spatialbench::spatial::overrides::{set_overrides, SpatialOverrides};
use spatialbench::text::TextPool;
-use spatialbench_arrow::{
- BuildingArrow, CustomerArrow, DriverArrow, RecordBatchIterator, TripArrow, VehicleArrow,
-};
use std::fmt::Display;
use std::fs::{self, File};
use std::io::{self, BufWriter, Stdout, Write};
@@ -79,7 +39,34 @@ use std::time::Instant;
#[derive(Parser)]
#[command(name = "spatialbench")]
#[command(version)]
-#[command(about = "TPC-H Data Generator", long_about = None)]
+#[command(
+ // -h output
+ about = "TPC-H Data Generator",
+ // --help output
+ long_about = r#"
+TPCH Data Generator (https://github.com/clflushopt/spatialbench-rs)
+
+By default each table is written to a single file named <output_dir>/<table>.<format>
+
+If the `--parts` or `--part` option is specified, each table is written to a subdirectory in
+multiple files named <output_dir>/<table>/<table>.<part>.<format>
+
+Examples
+
+# Generate all tables at scale factor 1 (1GB) in TBL format to /tmp/tpch directory:
+
+spatialbench-cli -s 1 --output-dir=/tmp/tpch
+
+# Generate the lineitem table at scale factor 100 in 10 Apache Parquet files to
+# /tmp/tpch/lineitem
+
+spatialbench-cli -s 100 --tables=lineitem --format=parquet --parts=10 --output-dir=/tmp/tpch
+
+# Generate scale factor one in current directory, seeing debug output
+
+RUST_LOG=debug spatialbench -s 1
+"#
+)]
struct Cli {
/// Scale factor to create
#[arg(short, long, default_value_t = 1.)]
@@ -97,13 +84,11 @@ struct Cli {
#[arg(long = "config")]
config: Option<PathBuf>,
- /// Number of part(itions) to generate (manual parallel generation)
+ /// Number of part(itions) to generate. If not specified, creates a single file per table
#[arg(short, long)]
parts: Option<i32>,
- /// Which part(ition) to generate (1-based)
- ///
- /// If not specified, generates all parts
+ /// Which part(ition) to generate (1-based). If not specified, generates all parts
#[arg(long)]
part: Option<i32>,
@@ -132,6 +117,9 @@ struct Cli {
parquet_compression: Compression,
/// Verbose output
+ ///
+ /// When specified, sets the log level to `info` and ignores the `RUST_LOG`
+ /// environment variable. When not specified, uses `RUST_LOG`
#[arg(short, long, default_value_t = false)]
verbose: bool,
@@ -142,11 +130,11 @@ struct Cli {
/// Target size in row group bytes in Parquet files
///
/// Row groups are the typical unit of parallel processing and compression
- /// in Parquet. With many query engines, smaller row groups enable better
+ /// with many query engines. Therefore, smaller row groups enable better
/// parallelism and lower peak memory use but may reduce compression
/// efficiency.
///
- /// Note: parquet files are limited to 32k row groups, so at high scale
+ /// Note: Parquet files are limited to 32k row groups, so at high scale
/// factors, the row group size may be increased to keep the number of row
/// groups under this limit.
///
@@ -257,46 +245,6 @@ async fn main() -> io::Result<()> {
cli.main().await
}
-/// macro to create a Cli function for generating a table
-///
-/// Arguments:
-/// $FUN_NAME: name of the function to create
-/// $TABLE: The [`Table`] to generate
-/// $GENERATOR: The generator type to use
-/// $TBL_SOURCE: The [`Source`] type to use for TBL format
-/// $CSV_SOURCE: The [`Source`] type to use for CSV format
-/// $PARQUET_SOURCE: The [`RecordBatchIterator`] type to use for Parquet format
-macro_rules! define_generate {
- ($FUN_NAME:ident, $TABLE:expr, $GENERATOR:ident, $TBL_SOURCE:ty, $CSV_SOURCE:ty, $PARQUET_SOURCE:ty) => {
- async fn $FUN_NAME(&self) -> io::Result<()> {
- let filename = self.output_filename($TABLE);
- let plan = GenerationPlan::try_new(
- &$TABLE,
- self.format,
- self.scale_factor,
- self.part,
- self.parts,
- self.parquet_row_group_bytes,
- )
- .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
- let scale_factor = self.scale_factor;
- info!("Writing table {} (SF={scale_factor}) to {filename}", $TABLE);
- debug!("Plan: {plan}");
- let gens = plan
- .into_iter()
- .map(move |(part, num_parts)| $GENERATOR::new(scale_factor, part, num_parts));
- match self.format {
- OutputFormat::Tbl => self.go(&filename, gens.map(<$TBL_SOURCE>::new)).await,
- OutputFormat::Csv => self.go(&filename, gens.map(<$CSV_SOURCE>::new)).await,
- OutputFormat::Parquet => {
- self.go_parquet(&filename, gens.map(<$PARQUET_SOURCE>::new))
- .await
- }
- }
- }
- };
-}
-
impl Cli {
/// Main function to run the generation
async fn main(self) -> io::Result<()> {
@@ -368,15 +316,6 @@ impl Cli {
]
};
- // force the creation of the distributions and text pool to so it doesn't
- // get charged to the first table
- let start = Instant::now();
- debug!("Creating distributions and text pool");
- Distributions::static_default();
- TextPool::get_or_init_default();
- let elapsed = start.elapsed();
- info!("Created static distributions and text pools in {elapsed:?}");
-
// Warn if parquet specific options are set but not generating parquet
if self.format != OutputFormat::Parquet {
if self.parquet_compression != Compression::SNAPPY {
@@ -391,131 +330,58 @@ impl Cli {
}
}
- // Generate each table
+ // Determine what files to generate
+ let mut output_plan_generator = OutputPlanGenerator::new(
+ self.format,
+ self.scale_factor,
+ self.parquet_compression,
+ self.parquet_row_group_bytes,
+ self.stdout,
+ self.output_dir.clone(),
+ );
+
for table in tables {
- match table {
- Table::Vehicle => self.generate_vehicle().await?,
- Table::Driver => self.generate_driver().await?,
- Table::Customer => self.generate_customer().await?,
- Table::Trip => self.generate_trip().await?,
- Table::Building => self.generate_building().await?,
- Table::Zone => self.generate_zone().await?,
+ if table == Table::Zone {
+ self.generate_zone().await?
+ } else {
output_plan_generator.generate_plans(table, self.part, self.parts)?;
}
}
+ let output_plans = output_plan_generator.build();
+
+ // force the creation of the distributions and text pool so it doesn't
+ // get charged to the first table
+ let start = Instant::now();
+ debug!("Creating distributions and text pool");
+ Distributions::static_default();
+ TextPool::get_or_init_default();
+ let elapsed = start.elapsed();
+ info!("Created static distributions and text pools in {elapsed:?}");
+ // Run
+ let runner = runner::PlanRunner::new(output_plans, self.num_threads);
+ runner.run().await?;
info!("Generation complete!");
Ok(())
}
async fn generate_zone(&self) -> io::Result<()> {
- match self.format {
- OutputFormat::Parquet => {
- let args = zone_df::ZoneDfArgs {
- scale_factor: 1.0f64.max(self.scale_factor),
- output_dir: self.output_dir.clone(),
- parts: self.parts.unwrap_or(1),
- part: self.part.unwrap_or(1),
- parquet_row_group_bytes: self.parquet_row_group_bytes,
- parquet_compression: self.parquet_compression,
- };
- zone_df::generate_zone_parquet(args)
- .await
- .map_err(io::Error::other)
- }
- _ => Err(io::Error::new(
- io::ErrorKind::InvalidInput,
- "Zone table is only supported in --format=parquet (via
DataFusion/S3).",
- )),
- }
- }
-
- define_generate!(
- generate_vehicle,
- Table::Vehicle,
- VehicleGenerator,
- VehicleTblSource,
- VehicleCsvSource,
- VehicleArrow
- );
- define_generate!(
- generate_driver,
- Table::Driver,
- DriverGenerator,
- DriverTblSource,
- DriverCsvSource,
- DriverArrow
- );
- define_generate!(
- generate_customer,
- Table::Customer,
- CustomerGenerator,
- CustomerTblSource,
- CustomerCsvSource,
- CustomerArrow
- );
- define_generate!(
- generate_trip,
- Table::Trip,
- TripGenerator,
- TripTblSource,
- TripCsvSource,
- TripArrow
- );
- define_generate!(
- generate_building,
- Table::Building,
- BuildingGenerator,
- BuildingTblSource,
- BuildingCsvSource,
- BuildingArrow
- );
-
- /// return the output filename for the given table
- fn output_filename(&self, table: Table) -> String {
- let extension = match self.format {
- OutputFormat::Tbl => "tbl",
- OutputFormat::Csv => "csv",
- OutputFormat::Parquet => "parquet",
+ let format = match self.format {
+ OutputFormat::Parquet => zone::main::OutputFormat::Parquet,
+ OutputFormat::Csv => zone::main::OutputFormat::Csv,
+ OutputFormat::Tbl => zone::main::OutputFormat::Tbl,
};
- format!("{}.{extension}", table.name())
- }
-
- /// return a file for writing the given filename in the output directory
- fn new_output_file(&self, filename: &str) -> io::Result<File> {
- let path = self.output_dir.join(filename);
- File::create(path)
- }
- /// Generates the output file from the sources
- async fn go<I>(&self, filename: &str, sources: I) -> Result<(), io::Error>
- where
- I: Iterator<Item: Source> + 'static,
- {
- // Since generate_in_chunks already buffers, there is no need to buffer again
- if self.stdout {
- let sink = WriterSink::new(io::stdout());
- generate_in_chunks(sink, sources, self.num_threads).await
- } else {
- let sink = WriterSink::new(self.new_output_file(filename)?);
- generate_in_chunks(sink, sources, self.num_threads).await
- }
- }
-
- /// Generates an output parquet file from the sources
- async fn go_parquet<I>(&self, filename: &str, sources: I) -> Result<(), io::Error>
- where
- I: Iterator<Item: RecordBatchIterator> + 'static,
- {
- if self.stdout {
- // write to stdout
- let writer = BufWriter::with_capacity(32 * 1024 * 1024, io::stdout()); // 32MB buffer
- generate_parquet(writer, sources, self.num_threads, self.parquet_compression).await
- } else {
- // write to a file
- let file = self.new_output_file(filename)?;
- let writer = BufWriter::with_capacity(32 * 1024 * 1024, file); // 32MB buffer
- generate_parquet(writer, sources, self.num_threads, self.parquet_compression).await
- }
+ zone::main::generate_zone(
+ format,
+ self.scale_factor,
+ self.output_dir.clone(),
+ self.parts,
+ self.part,
+ self.parquet_row_group_bytes,
+ self.parquet_compression,
+ )
+ .await
}
}
diff --git a/spatialbench-cli/src/output_plan.rs b/spatialbench-cli/src/output_plan.rs
new file mode 100644
index 0000000..a80533b
--- /dev/null
+++ b/spatialbench-cli/src/output_plan.rs
@@ -0,0 +1,267 @@
+//! * [`OutputLocation`]: where to output the generated data
+//! * [`OutputPlan`]: an output file that will be generated
+//! * [`OutputPlanGenerator`]: plans the output files to be generated
+
+use crate::plan::GenerationPlan;
+use crate::{OutputFormat, Table};
+use log::debug;
+use parquet::basic::Compression;
+use std::collections::HashSet;
+use std::fmt::{Display, Formatter};
+use std::io;
+use std::path::PathBuf;
+
+/// Where a partition will be output
+#[derive(Debug, Clone, PartialEq)]
+pub enum OutputLocation {
+ /// Output to a file
+ File(PathBuf),
+ /// Output to stdout
+ Stdout,
+}
+
+impl Display for OutputLocation {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ match self {
+ OutputLocation::File(path) => {
+ let Some(file) = path.file_name() else {
+ return write!(f, "{}", path.display());
+ };
+ // Display the file name only, not the full path
+ write!(f, "{}", file.to_string_lossy())
+ }
+ OutputLocation::Stdout => write!(f, "Stdout"),
+ }
+ }
+}
+
+/// Describes an output partition (file) that will be generated
+#[derive(Debug, Clone, PartialEq)]
+pub struct OutputPlan {
+ /// The table
+ table: Table,
+ /// The scale factor
+ scale_factor: f64,
+ /// The output format (TODO don't depend back on something in main)
+ output_format: OutputFormat,
+ /// If the output is parquet, what compression level to use
+ parquet_compression: Compression,
+ /// Where to output
+ output_location: OutputLocation,
+ /// Plan for generating the table
+ generation_plan: GenerationPlan,
+}
+
+impl OutputPlan {
+ pub fn new(
+ table: Table,
+ scale_factor: f64,
+ output_format: OutputFormat,
+ parquet_compression: Compression,
+ output_location: OutputLocation,
+ generation_plan: GenerationPlan,
+ ) -> Self {
+ Self {
+ table,
+ scale_factor,
+ output_format,
+ parquet_compression,
+ output_location,
+ generation_plan,
+ }
+ }
+
+ /// Return the table this partition is for
+ pub fn table(&self) -> Table {
+ self.table
+ }
+
+ /// Return the scale factor for this partition
+ pub fn scale_factor(&self) -> f64 {
+ self.scale_factor
+ }
+
+ /// Return the output format for this partition
+ pub fn output_format(&self) -> OutputFormat {
+ self.output_format
+ }
+
+ /// return the output location
+ pub fn output_location(&self) -> &OutputLocation {
+ &self.output_location
+ }
+
+ /// Return the parquet compression level for this partition
+ pub fn parquet_compression(&self) -> Compression {
+ self.parquet_compression
+ }
+
+ /// Return the chunk count (the number of data chunks
+ /// in the underlying generation plan)
+ pub fn chunk_count(&self) -> usize {
+ self.generation_plan.chunk_count()
+ }
+
+ /// return the generation plan for this partition
+ pub fn generation_plan(&self) -> &GenerationPlan {
+ &self.generation_plan
+ }
+}
+
+impl Display for OutputPlan {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(
+ f,
+ "table {} (SF={}, {} chunks) to {}",
+ self.table,
+ self.scale_factor,
+ self.chunk_count(),
+ self.output_location
+ )
+ }
+}
+
+/// Plans the creation of output files
+pub struct OutputPlanGenerator {
+ format: OutputFormat,
+ scale_factor: f64,
+ parquet_compression: Compression,
+ parquet_row_group_bytes: i64,
+ stdout: bool,
+ output_dir: PathBuf,
+ /// The generated output plans
+ output_plans: Vec<OutputPlan>,
+ /// Output directories that have been created so far
+ /// (used to avoid creating the same directory multiple times)
+ created_directories: HashSet<PathBuf>,
+}
+
+impl OutputPlanGenerator {
+ pub fn new(
+ format: OutputFormat,
+ scale_factor: f64,
+ parquet_compression: Compression,
+ parquet_row_group_bytes: i64,
+ stdout: bool,
+ output_dir: PathBuf,
+ ) -> Self {
+ Self {
+ format,
+ scale_factor,
+ parquet_compression,
+ parquet_row_group_bytes,
+ stdout,
+ output_dir,
+ output_plans: Vec::new(),
+ created_directories: HashSet::new(),
+ }
+ }
+
+ /// Generate the output plans for the given table and partition options
+ pub fn generate_plans(
+ &mut self,
+ table: Table,
+ cli_part: Option<i32>,
+ cli_part_count: Option<i32>,
+ ) -> io::Result<()> {
+ // If the user specified only a part count, automatically create all
+ // partitions for the table
+ if let (None, Some(part_count)) = (cli_part, cli_part_count) {
+ if GenerationPlan::partitioned_table(table) {
+ debug!("Generating all partitions for table {table} with part
count {part_count}");
+ for part in 1..=part_count {
+ self.generate_plan_inner(table, Some(part), Some(part_count))?;
+ }
+ } else {
+ // there is only one partition for this table (e.g. vehicle or driver)
+ debug!("Generating single partition for table {table}");
+ self.generate_plan_inner(table, Some(1), Some(1))?;
+ }
+ } else {
+ self.generate_plan_inner(table, cli_part, cli_part_count)?;
+ }
+ Ok(())
+ }
+
+ fn generate_plan_inner(
+ &mut self,
+ table: Table,
+ cli_part: Option<i32>,
+ cli_part_count: Option<i32>,
+ ) -> io::Result<()> {
+ let generation_plan = GenerationPlan::try_new(
+ table,
+ self.format,
+ self.scale_factor,
+ cli_part,
+ cli_part_count,
+ self.parquet_row_group_bytes,
+ )
+ .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
+
+ let output_location = self.output_location(table, cli_part)?;
+
+ let plan = OutputPlan::new(
+ table,
+ self.scale_factor,
+ self.format,
+ self.parquet_compression,
+ output_location,
+ generation_plan,
+ );
+
+ self.output_plans.push(plan);
+ Ok(())
+ }
+
+ /// Return the output location for the given table
+ ///
+ /// * if part is None, the output location is `{output_dir}/{table}.{extension}`
+ ///
+ /// * if part is Some(part), then the output location
+ /// will be `{output_dir}/{table}/{table}.{part}.{extension}`
+ /// (e.g. trip/trip.1.tbl, trip/trip.2.tbl, etc.)
+ fn output_location(&mut self, table: Table, part: Option<i32>) -> io::Result<OutputLocation> {
+ if self.stdout {
+ Ok(OutputLocation::Stdout)
+ } else {
+ let extension = match self.format {
+ OutputFormat::Tbl => "tbl",
+ OutputFormat::Csv => "csv",
+ OutputFormat::Parquet => "parquet",
+ };
+
+ let mut output_path = self.output_dir.clone();
+ if let Some(part) = part {
+ // If a partition is specified, create a subdirectory for it
+ output_path.push(table.to_string());
+ self.ensure_directory_exists(&output_path)?;
+ output_path.push(format!("{table}.{part}.{extension}"));
+ } else {
+ // No partition specified, output to a single file
+ output_path.push(format!("{table}.{extension}"));
+ }
+ Ok(OutputLocation::File(output_path))
+ }
+ }
+
+ /// Ensure the output directory exists, creating it if necessary
+ fn ensure_directory_exists(&mut self, dir: &PathBuf) -> io::Result<()> {
+ if self.created_directories.contains(dir) {
+ return Ok(());
+ }
+ std::fs::create_dir_all(dir).map_err(|e| {
+ io::Error::new(
+ io::ErrorKind::InvalidInput,
+ format!("Error creating directory {}: {}", dir.display(), e),
+ )
+ })?;
+ self.created_directories.insert(dir.clone());
+ Ok(())
+ }
+
+ /// Return the output plans generated so far
+ pub fn build(self) -> Vec<OutputPlan> {
+ self.output_plans
+ }
+}
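For reference, the path naming rule implemented by `output_location` above can be summarized in a small sketch (illustrative only, not part of the commit; `output_path` is a hypothetical helper):

```rust
use std::path::PathBuf;

// Hypothetical helper mirroring OutputPlanGenerator::output_location:
// partitioned output goes to <dir>/<table>/<table>.<part>.<ext>,
// unpartitioned output to <dir>/<table>.<ext>.
fn output_path(dir: &str, table: &str, part: Option<i32>, ext: &str) -> PathBuf {
    let mut path = PathBuf::from(dir);
    match part {
        Some(part) => {
            path.push(table);
            path.push(format!("{table}.{part}.{ext}"));
        }
        None => path.push(format!("{table}.{ext}")),
    }
    path
}

fn main() {
    assert_eq!(
        output_path("sf10", "trip", Some(2), "parquet"),
        PathBuf::from("sf10/trip/trip.2.parquet")
    );
    assert_eq!(
        output_path("sf10", "trip", None, "parquet"),
        PathBuf::from("sf10/trip.parquet")
    );
}
```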
diff --git a/spatialbench-cli/src/plan.rs b/spatialbench-cli/src/plan.rs
index 809814f..b95c053 100644
--- a/spatialbench-cli/src/plan.rs
+++ b/spatialbench-cli/src/plan.rs
@@ -1,4 +1,4 @@
-//! [`GenerationPlan`] that describes how to generate a Spatial Bench dataset.
+//! * [`GenerationPlan`]: how to generate a specific Spatial Bench dataset.
use crate::{OutputFormat, Table};
use log::debug;
@@ -8,7 +8,8 @@ use spatialbench::generators::{
use std::fmt::Display;
use std::ops::RangeInclusive;
-/// A list of generator "parts" (data generator chunks, not Spatial Bench parts)
+/// A list of generator "parts" (data generator chunks, not TPCH parts) for a
+/// single output file.
///
/// Controls the parallelization and layout of Parquet files in `spatialbench-cli`.
///
@@ -49,7 +50,7 @@ use std::ops::RangeInclusive;
/// let results = plan.into_iter().collect::<Vec<_>>();
/// /// assert_eq!(results.len(), 1);
/// ```
-#[derive(Debug)]
+#[derive(Debug, Clone, PartialEq)]
pub struct GenerationPlan {
/// Total number of parts to generate
part_count: i32,
@@ -67,7 +68,7 @@ impl GenerationPlan {
/// * `cli_part_count`: optional total number of parts, `--parts` CLI argument
/// * `parquet_row_group_size`: optional parquet row group size, `--parquet-row-group-size` CLI argument
pub fn try_new(
- table: &Table,
+ table: Table,
format: OutputFormat,
scale_factor: f64,
cli_part: Option<i32>,
@@ -100,11 +101,17 @@ impl GenerationPlan {
}
}
+ /// Return true if the table is partitioned (parameterized by part
+ /// count)
+ pub fn partitioned_table(table: Table) -> bool {
+ table != Table::Vehicle && table != Table::Driver && table != Table::Building
+ }
+
/// Returns a new `GenerationPlan` when partitioning
///
/// See [`GenerationPlan::try_new`] for argument documentation.
fn try_new_with_parts(
- table: &Table,
+ table: Table,
format: OutputFormat,
scale_factor: f64,
cli_part: i32,
@@ -128,7 +135,7 @@ impl GenerationPlan {
// These tables are so small they are not parameterized by part count,
// so only a single part.
- if table == &Table::Vehicle || table == &Table::Driver {
+ if !Self::partitioned_table(table) {
return Ok(Self {
part_count: 1,
part_list: 1..=1,
@@ -169,7 +176,7 @@ impl GenerationPlan {
/// Returns a new `GenerationPlan` when no partitioning is specified on the command line
fn try_new_without_parts(
- table: &Table,
+ table: Table,
format: OutputFormat,
scale_factor: f64,
parquet_row_group_bytes: i64,
@@ -182,6 +189,11 @@ impl GenerationPlan {
part_list: 1..=num_parts,
})
}
+
+ /// Return the number of part(itions) this plan will generate
+ pub fn chunk_count(&self) -> usize {
+ self.part_list.clone().count()
+ }
}
/// Converts the `GenerationPlan` into an iterator of (part_number, num_parts)
@@ -218,7 +230,7 @@ struct OutputSize {
impl OutputSize {
pub fn new(
- table: &Table,
+ table: Table,
scale_factor: f64,
format: OutputFormat,
parquet_row_group_bytes: i64,
@@ -320,7 +332,7 @@ impl OutputSize {
}
}
- fn row_count_for_table(table: &Table, scale_factor: f64) -> i64 {
+ fn row_count_for_table(table: Table, scale_factor: f64) -> i64 {
//let (avg_row_size_bytes, row_count) = match table {
match table {
Table::Vehicle =>
VehicleGenerator::calculate_row_count(scale_factor, 1, 1),
@@ -514,7 +526,7 @@ mod tests {
.with_cli_part(1)
.with_cli_part_count(10)
// we expect there is still only one part
- .assert(10, 1..=1)
+ .assert(1, 1..=1)
}
// #[test]
@@ -744,7 +756,7 @@ mod tests {
/// expected number of parts and part numbers.
fn assert(self, expected_part_count: i32, expected_part_numbers: RangeInclusive<i32>) {
let plan = GenerationPlan::try_new(
- &self.table,
+ self.table,
self.format,
self.scale_factor,
self.cli_part,
@@ -759,7 +771,7 @@ mod tests {
/// Assert that creating a [`GenerationPlan`] returns the specified error
fn assert_err(self, expected_error: &str) {
let actual_error = GenerationPlan::try_new(
- &self.table,
+ self.table,
self.format,
self.scale_factor,
self.cli_part,
diff --git a/spatialbench-cli/src/runner.rs b/spatialbench-cli/src/runner.rs
new file mode 100644
index 0000000..882d1cf
--- /dev/null
+++ b/spatialbench-cli/src/runner.rs
@@ -0,0 +1,355 @@
+//! [`PlanRunner`] for running [`OutputPlan`]s.
+
+use crate::csv::*;
+use crate::generate::{generate_in_chunks, Source};
+use crate::output_plan::{OutputLocation, OutputPlan};
+use crate::parquet::generate_parquet;
+use crate::tbl::*;
+use crate::{OutputFormat, Table, WriterSink};
+use log::{debug, info};
+use spatialbench::generators::{
+ BuildingGenerator, CustomerGenerator, DriverGenerator, TripGenerator, VehicleGenerator,
+};
+use spatialbench_arrow::{
+ BuildingArrow, CustomerArrow, DriverArrow, RecordBatchIterator, TripArrow, VehicleArrow,
+};
+use std::io;
+use std::io::BufWriter;
+use tokio::task::{JoinError, JoinSet};
+
+/// Runs multiple [`OutputPlan`]s in parallel, managing the number of threads
+/// used to run them.
+#[derive(Debug)]
+pub struct PlanRunner {
+ plans: Vec<OutputPlan>,
+ num_threads: usize,
+}
+
+impl PlanRunner {
+ /// Create a new [`PlanRunner`] with the given plans and number of threads.
+ pub fn new(plans: Vec<OutputPlan>, num_threads: usize) -> Self {
+ Self { plans, num_threads }
+ }
+
+ /// Run all the plans in the runner.
+ pub async fn run(self) -> Result<(), io::Error> {
+ debug!(
+ "Running {} plans with {} threads...",
+ self.plans.len(),
+ self.num_threads
+ );
+ let Self {
+ mut plans,
+ num_threads,
+ } = self;
+
+ // Sort the plans ascending by chunk count so that popping from the end runs the largest first
+ plans.sort_unstable_by(|a, b| {
+ let a_cnt = a.chunk_count();
+ let b_cnt = b.chunk_count();
+ a_cnt.cmp(&b_cnt)
+ });
+
+ // Do the actual work in parallel, using a worker queue
+ let mut worker_queue = WorkerQueue::new(num_threads);
+ while let Some(plan) = plans.pop() {
+ worker_queue.schedule_plan(plan).await?;
+ }
+ worker_queue.join_all().await
+ }
+}
+
+/// Manages worker tasks, limiting the number of total outstanding threads
+/// to some fixed number
+///
+/// The runner executes each plan with a number of threads equal to the
+/// number of parts in the plan, but no more than the total number of
+/// threads specified when creating the runner. If a plan does not need all
+/// the threads, the remaining threads are used to run other plans.
+///
+/// This is important to keep all cores busy for smaller tables that may not
+/// have sufficient parts to keep all threads busy (see [`GenerationPlan`]
+/// for more details), but not schedule more tasks than we have threads for.
+///
+/// Scheduling too many tasks requires more memory and leads to context
+/// switching overhead, which can slow down the generation process.
+///
+/// [`GenerationPlan`]: crate::plan::GenerationPlan
+struct WorkerQueue {
+ join_set: JoinSet<io::Result<usize>>,
+ /// Current number of threads available to assign to new tasks
+ available_threads: usize,
+}
+
+impl WorkerQueue {
+ pub fn new(max_threads: usize) -> Self {
+ assert!(max_threads > 0);
+ Self {
+ join_set: JoinSet::new(),
+ available_threads: max_threads,
+ }
+ }
+
+ /// Spawns a task to run the plan with as many threads as possible
+ /// without exceeding the maximum number of threads.
+ ///
+ /// If there are no threads available, it will wait for one to finish
+ /// before spawning the new task.
+ ///
+ /// Note this algorithm does not guarantee that all threads are always busy,
+ /// but it should be good enough for most cases. For best thread utilization
+ /// spawn the largest plans first.
+ pub async fn schedule_plan(&mut self, plan: OutputPlan) -> io::Result<()> {
+ debug!("scheduling plan {plan}");
+ loop {
+ if self.available_threads == 0 {
+ debug!("no threads left, wait for one to finish");
+ let Some(result) = self.join_set.join_next().await else {
+ return Err(io::Error::other(
+ "Internal Error No more tasks to wait for, but had no
threads",
+ ));
+ };
+ self.available_threads += task_result(result)?;
+ continue; // look for threads again
+ }
+
+ // Check for any other jobs done so we can reuse their threads
+ if let Some(result) = self.join_set.try_join_next() {
+ self.available_threads += task_result(result)?;
+ continue;
+ }
+
+ debug_assert!(
+ self.available_threads > 0,
+ "should have at least one thread to continue"
+ );
+
+ // figure out how many threads to allocate to this plan. Each plan
+ // can use up to `chunk_count` threads.
+ let chunk_count = plan.chunk_count();
+
+ let num_plan_threads = self.available_threads.min(chunk_count);
+
+ // run the plan in a separate task, which returns the number of threads it used
+ debug!("Spawning plan {plan} with {num_plan_threads} threads");
+
+ self.join_set
+ .spawn(async move { run_plan(plan, num_plan_threads).await });
+ self.available_threads -= num_plan_threads;
+ return Ok(());
+ }
+ }
+
+ // Wait for all tasks to finish
+ pub async fn join_all(mut self) -> io::Result<()> {
+ debug!("Waiting for tasks to finish...");
+ while let Some(result) = self.join_set.join_next().await {
+ task_result(result)?;
+ }
+ debug!("Tasks finished.");
+ Ok(())
+ }
+}
+
+/// unwraps the result of a task and converts it to an `io::Result<T>`.
+fn task_result<T>(result: Result<io::Result<T>, JoinError>) -> io::Result<T> {
+ result.map_err(|e| io::Error::other(format!("Task Panic: {e}")))?
+}
+
+/// Run a single [`OutputPlan`]
+async fn run_plan(plan: OutputPlan, num_threads: usize) -> io::Result<usize> {
+ match plan.table() {
+ Table::Building => run_building_plan(plan, num_threads).await,
+ Table::Vehicle => run_vehicle_plan(plan, num_threads).await,
+ Table::Driver => run_driver_plan(plan, num_threads).await,
+ Table::Customer => run_customer_plan(plan, num_threads).await,
+ Table::Trip => run_trip_plan(plan, num_threads).await,
+ Table::Zone => todo!("Zone table is not supported in PlanRunner"),
+ }
+}
+
+/// Writes a TBL/CSV output from the sources
+async fn write_file<I>(plan: OutputPlan, num_threads: usize, sources: I) -> Result<(), io::Error>
+where
+ I: Iterator<Item: Source> + 'static,
+{
+ // Since generate_in_chunks already buffers, there is no need to buffer
+ // again (aka don't use BufWriter here)
+ match plan.output_location() {
+ OutputLocation::Stdout => {
+ let sink = WriterSink::new(io::stdout());
+ generate_in_chunks(sink, sources, num_threads).await
+ }
+ OutputLocation::File(path) => {
+ // if the output already exists, skip running
+ if path.exists() {
+ info!("{} already exists, skipping generation",
path.display());
+ return Ok(());
+ }
+ // write to a temp file and then rename to avoid partial files
+ let temp_path = path.with_extension("inprogress");
+ let file = std::fs::File::create(&temp_path).map_err(|err| {
+ io::Error::other(format!("Failed to create {temp_path:?}:
{err}"))
+ })?;
+ let sink = WriterSink::new(file);
+ generate_in_chunks(sink, sources, num_threads).await?;
+ // rename the temp file to the final path
+ std::fs::rename(&temp_path, path).map_err(|e| {
+ io::Error::other(format!(
+ "Failed to rename {temp_path:?} to {path:?} file: {e}"
+ ))
+ })?;
+ Ok(())
+ }
+ }
+}
+
+/// Generates an output parquet file from the sources
+async fn write_parquet<I>(plan: OutputPlan, num_threads: usize, sources: I) -> Result<(), io::Error>
+where
+ I: Iterator<Item: RecordBatchIterator> + 'static,
+{
+ match plan.output_location() {
+ OutputLocation::Stdout => {
+ let writer = BufWriter::with_capacity(32 * 1024 * 1024, io::stdout()); // 32MB buffer
+ generate_parquet(writer, sources, num_threads, plan.parquet_compression()).await
+ }
+ OutputLocation::File(path) => {
+ // if the output already exists, skip running
+ if path.exists() {
+ info!("{} already exists, skipping generation", path.display());
+ return Ok(());
+ }
+ // write to a temp file and then rename to avoid partial files
+ let temp_path = path.with_extension("inprogress");
+ let file = std::fs::File::create(&temp_path).map_err(|err| {
+ io::Error::other(format!("Failed to create {temp_path:?}: {err}"))
+ })?;
+ let writer = BufWriter::with_capacity(32 * 1024 * 1024, file); // 32MB buffer
+ generate_parquet(writer, sources, num_threads, plan.parquet_compression()).await?;
+ // rename the temp file to the final path
+ std::fs::rename(&temp_path, path).map_err(|e| {
+ io::Error::other(format!(
+ "Failed to rename {temp_path:?} to {path:?} file: {e}"
+ ))
+ })?;
+ Ok(())
+ }
+ }
+}
+
+/// macro to create a function for generating a part of a particular table
+///
+/// Arguments:
+/// $FUN_NAME: name of the function to create
+/// $GENERATOR: The generator type to use
+/// $TBL_SOURCE: The [`Source`] type to use for TBL format
+/// $CSV_SOURCE: The [`Source`] type to use for CSV format
+/// $PARQUET_SOURCE: The [`RecordBatchIterator`] type to use for Parquet format
+macro_rules! define_run {
+ ($FUN_NAME:ident, $GENERATOR:ident, $TBL_SOURCE:ty, $CSV_SOURCE:ty,
$PARQUET_SOURCE:ty) => {
+ async fn $FUN_NAME(plan: OutputPlan, num_threads: usize) -> io::Result<usize> {
+ use crate::GenerationPlan;
+ let scale_factor = plan.scale_factor();
+ info!("Writing {plan} using {num_threads} threads");
+
+ /// These interior functions are used to tell the compiler that the lifetime is 'static
+ /// (when these were closures, the compiler could not figure out the lifetime) and
+ /// resulted in errors like this:
+ /// let _ = join_set.spawn(async move {
+ /// | _____________________^
+ /// 96 | | run_plan(plan, num_plan_threads).await
+ /// 97 | | });
+ /// | |______________^ implementation of `FnOnce` is not general enough
+ fn tbl_sources(
+ generation_plan: &GenerationPlan,
+ scale_factor: f64,
+ ) -> impl Iterator<Item: Source> + 'static {
+ generation_plan
+ .clone()
+ .into_iter()
+ .map(move |(part, num_parts)| $GENERATOR::new(scale_factor, part, num_parts))
+ .map(<$TBL_SOURCE>::new)
+ }
+
+ fn csv_sources(
+ generation_plan: &GenerationPlan,
+ scale_factor: f64,
+ ) -> impl Iterator<Item: Source> + 'static {
+ generation_plan
+ .clone()
+ .into_iter()
+ .map(move |(part, num_parts)| $GENERATOR::new(scale_factor, part, num_parts))
+ .map(<$CSV_SOURCE>::new)
+ }
+
+ fn parquet_sources(
+ generation_plan: &GenerationPlan,
+ scale_factor: f64,
+ ) -> impl Iterator<Item: RecordBatchIterator> + 'static {
+ generation_plan
+ .clone()
+ .into_iter()
+ .map(move |(part, num_parts)| $GENERATOR::new(scale_factor, part, num_parts))
+ .map(<$PARQUET_SOURCE>::new)
+ }
+
+ // Dispatch to the appropriate output format
+ match plan.output_format() {
+ OutputFormat::Tbl => {
+ let gens = tbl_sources(plan.generation_plan(), scale_factor);
+ write_file(plan, num_threads, gens).await?
+ }
+ OutputFormat::Csv => {
+ let gens = csv_sources(plan.generation_plan(), scale_factor);
+ write_file(plan, num_threads, gens).await?
+ }
+ OutputFormat::Parquet => {
+ let gens = parquet_sources(plan.generation_plan(), scale_factor);
+ write_parquet(plan, num_threads, gens).await?
+ }
+ };
+ Ok(num_threads)
+ }
+ };
+}
+
+define_run!(
+ run_trip_plan,
+ TripGenerator,
+ TripTblSource,
+ TripCsvSource,
+ TripArrow
+);
+
+define_run!(
+ run_building_plan,
+ BuildingGenerator,
+ BuildingTblSource,
+ BuildingCsvSource,
+ BuildingArrow
+);
+
+define_run!(
+ run_vehicle_plan,
+ VehicleGenerator,
+ VehicleTblSource,
+ VehicleCsvSource,
+ VehicleArrow
+);
+
+define_run!(
+ run_driver_plan,
+ DriverGenerator,
+ DriverTblSource,
+ DriverCsvSource,
+ DriverArrow
+);
+
+define_run!(
+ run_customer_plan,
+ CustomerGenerator,
+ CustomerTblSource,
+ CustomerCsvSource,
+ CustomerArrow
+);
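To make the scheduling policy concrete: `schedule_plan` grants each plan `min(available_threads, chunk_count)` threads and, when the budget is exhausted, waits for a running plan to finish and reclaims its threads. A minimal sketch of the first scheduling wave (illustrative, not part of the commit):

```rust
// Hypothetical model of the first wave of WorkerQueue scheduling:
// plans arrive largest-first; each takes min(available, chunks) threads.
fn first_wave(mut available: usize, chunk_counts: &[usize]) -> Vec<usize> {
    let mut granted = Vec::new();
    for &chunks in chunk_counts {
        if available == 0 {
            break; // the real queue awaits a finished task here
        }
        let n = available.min(chunks);
        granted.push(n);
        available -= n;
    }
    granted
}

fn main() {
    // With 8 threads and plans of 6, 3, 2 and 1 chunks, the first plan
    // gets 6 threads and the second the remaining 2; the rest must wait.
    assert_eq!(first_wave(8, &[6, 3, 2, 1]), vec![6, 2]);
}
```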
diff --git a/spatialbench-cli/src/zone/config.rs b/spatialbench-cli/src/zone/config.rs
new file mode 100644
index 0000000..0bbfa32
--- /dev/null
+++ b/spatialbench-cli/src/zone/config.rs
@@ -0,0 +1,55 @@
+use anyhow::{anyhow, Result};
+use parquet::basic::Compression as ParquetCompression;
+use std::path::PathBuf;
+
+#[derive(Clone)]
+pub struct ZoneDfArgs {
+ pub scale_factor: f64,
+ pub output_dir: PathBuf,
+ pub parts: i32,
+ pub part: i32,
+ pub parquet_row_group_bytes: i64,
+ pub parquet_compression: ParquetCompression,
+}
+
+impl ZoneDfArgs {
+ pub fn new(
+ scale_factor: f64,
+ output_dir: PathBuf,
+ parts: i32,
+ part: i32,
+ parquet_row_group_bytes: i64,
+ parquet_compression: ParquetCompression,
+ ) -> Self {
+ Self {
+ scale_factor,
+ output_dir,
+ parts,
+ part,
+ parquet_row_group_bytes,
+ parquet_compression,
+ }
+ }
+
+ pub fn validate(&self) -> Result<()> {
+ if self.part < 1 || self.part > self.parts {
+ return Err(anyhow!(
+ "Invalid --part={} for --parts={}",
+ self.part,
+ self.parts
+ ));
+ }
+ Ok(())
+ }
+
+ pub fn output_filename(&self) -> PathBuf {
+ if self.parts > 1 {
+ // Create zone subdirectory and write parts within it
+ self.output_dir
+ .join("zone")
+ .join(format!("zone.{}.parquet", self.part))
+ } else {
+ self.output_dir.join("zone.parquet")
+ }
+ }
+}
diff --git a/spatialbench-cli/src/zone/datasource.rs b/spatialbench-cli/src/zone/datasource.rs
new file mode 100644
index 0000000..2d61b87
--- /dev/null
+++ b/spatialbench-cli/src/zone/datasource.rs
@@ -0,0 +1,95 @@
+use anyhow::Result;
+use datafusion::{
+ common::config::ConfigOptions,
+ execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder},
+ prelude::*,
+};
+use log::{debug, info};
+use object_store::http::HttpBuilder;
+use std::sync::Arc;
+use url::Url;
+
+use super::stats::ZoneTableStats;
+
+const OVERTURE_RELEASE_DATE: &str = "2025-08-20.1";
+const HUGGINGFACE_URL: &str = "https://huggingface.co";
+const COMMIT_HASH: &str = "67822daa2fbc0039681922f0d7fea4157f41d13f";
+const PARQUET_PART_COUNT: usize = 4;
+const PARQUET_UUID: &str = "c998b093-fa14-440c-98f0-bbdb2126ed22";
+
+pub struct ZoneDataSource {
+ runtime: Arc<RuntimeEnv>,
+}
+
+impl ZoneDataSource {
+ pub async fn new() -> Result<Self> {
+ let rt = Arc::new(RuntimeEnvBuilder::new().build()?);
+
+ let hf_store = HttpBuilder::new().with_url(HUGGINGFACE_URL).build()?;
+ let hf_url = Url::parse(HUGGINGFACE_URL)?;
+ rt.register_object_store(&hf_url, Arc::new(hf_store));
+
+ debug!("Registered HTTPS object store for huggingface.co");
+
+ Ok(Self { runtime: rt })
+ }
+
+ pub fn create_context(&self) -> Result<SessionContext> {
+ let mut cfg = ConfigOptions::new();
+
+ // Avoid parallelism to ensure ordering of source data
+ cfg.execution.target_partitions = 1;
+
+ let ctx =
+ SessionContext::new_with_config_rt(SessionConfig::from(cfg), Arc::clone(&self.runtime));
+
+ debug!("Created DataFusion session context");
+ Ok(ctx)
+ }
+
+ pub async fn load_zone_data(
+ &self,
+ ctx: &SessionContext,
+ scale_factor: f64,
+ ) -> Result<DataFrame> {
+ let parquet_urls = self.generate_parquet_urls();
+ info!(
+ "Reading {} Parquet parts from Hugging Face...",
+ parquet_urls.len()
+ );
+
+ let df = ctx
+ .read_parquet(parquet_urls, ParquetReadOptions::default())
+ .await?;
+
+ let stats = ZoneTableStats::new(scale_factor, 1);
+ let subtypes = stats.subtypes();
+
+ info!("Selected subtypes for SF {}: {:?}", scale_factor, subtypes);
+
+ let mut pred = col("subtype").eq(lit("__never__"));
+ for s in subtypes {
+ pred = pred.or(col("subtype").eq(lit(s)));
+ }
+
+ let df = df.filter(pred.and(col("is_land").eq(lit(true))))?;
+ info!("Applied subtype and is_land filters");
+
+ // Sort by 'id' to ensure deterministic ordering regardless of parallelism
+ // let df = df.sort(vec![col("id").sort(true, false)])?;
+ // info!("Sorted by id for deterministic ordering");
+
+ Ok(df)
+ }
+
+ fn generate_parquet_urls(&self) -> Vec<String> {
+ (0..PARQUET_PART_COUNT)
+ .map(|i| {
+ format!(
+ "https://huggingface.co/datasets/apache-sedona/spatialbench/resolve/{}/omf-division-area-{}/part-{:05}-{}-c000.zstd.parquet",
+ COMMIT_HASH, OVERTURE_RELEASE_DATE, i, PARQUET_UUID
+ )
+ })
+ .collect()
+ }
+}
diff --git a/spatialbench-cli/src/zone/main.rs b/spatialbench-cli/src/zone/main.rs
new file mode 100644
index 0000000..5afd899
--- /dev/null
+++ b/spatialbench-cli/src/zone/main.rs
@@ -0,0 +1,64 @@
+use log::info;
+use parquet::basic::Compression as ParquetCompression;
+use std::io;
+use std::path::PathBuf;
+
+use super::config::ZoneDfArgs;
+
+/// Generates zone table in the requested format
+pub async fn generate_zone(
+ format: OutputFormat,
+ scale_factor: f64,
+ output_dir: PathBuf,
+ parts: Option<i32>,
+ part: Option<i32>,
+ parquet_row_group_bytes: i64,
+ parquet_compression: ParquetCompression,
+) -> io::Result<()> {
+ match format {
+ OutputFormat::Parquet => {
+ let parts = parts.unwrap_or(1);
+
+ if let Some(part_num) = part {
+ // Single part mode - use LIMIT/OFFSET
+ info!("Generating part {} of {} for zone table", part_num,
parts);
+ let args = ZoneDfArgs::new(
+ 1.0f64.max(scale_factor),
+ output_dir,
+ parts,
+ part_num,
+ parquet_row_group_bytes,
+ parquet_compression,
+ );
+ super::generate_zone_parquet_single(args)
+ .await
+ .map_err(io::Error::other)
+ } else {
+ // Multi-part mode - collect once and partition in memory
+ info!("Generating all {} part(s) for zone table", parts);
+ let args = ZoneDfArgs::new(
+ 1.0f64.max(scale_factor),
+ output_dir,
+ parts,
+ 1, // dummy value, not used in multi mode
+ parquet_row_group_bytes,
+ parquet_compression,
+ );
+ super::generate_zone_parquet_multi(args)
+ .await
+ .map_err(io::Error::other)
+ }
+ }
+ _ => Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "Zone table is only supported in --format=parquet.",
+ )),
+ }
+}
+
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub enum OutputFormat {
+ Tbl,
+ Csv,
+ Parquet,
+}
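Given the `zone.parquet` / `zone/zone.<part>.parquet` naming implemented in `config.rs` above, usage would look roughly like this (illustrative):

```bash
# Single file: <output_dir>/zone.parquet
spatialbench-cli -s 10 --tables zone --format=parquet

# All parts in one run: <output_dir>/zone/zone.1.parquet .. zone.4.parquet
spatialbench-cli -s 10 --tables zone --format=parquet --parts 4
```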
diff --git a/spatialbench-cli/src/zone/mod.rs b/spatialbench-cli/src/zone/mod.rs
new file mode 100644
index 0000000..f1b2f3c
--- /dev/null
+++ b/spatialbench-cli/src/zone/mod.rs
@@ -0,0 +1,88 @@
+//! Zone table generation module using DataFusion and remote Parquet files
+
+mod config;
+mod datasource;
+mod partition;
+mod stats;
+mod transform;
+mod writer;
+
+pub mod main;
+
+use anyhow::Result;
+use std::sync::Arc;
+
+pub use config::ZoneDfArgs;
+use datasource::ZoneDataSource;
+use partition::PartitionStrategy;
+use stats::ZoneTableStats;
+use transform::ZoneTransformer;
+use writer::ParquetWriter;
+
+/// Generate a single part using LIMIT/OFFSET on the dataframe
+pub async fn generate_zone_parquet_single(args: ZoneDfArgs) -> Result<()> {
+ args.validate()?;
+
+ let stats = ZoneTableStats::new(args.scale_factor, args.parts);
+ let datasource = ZoneDataSource::new().await?;
+ let ctx = datasource.create_context()?;
+
+ let df = datasource.load_zone_data(&ctx, args.scale_factor).await?;
+
+ let partition =
+ PartitionStrategy::calculate(stats.estimated_total_rows(), args.parts, args.part);
+
+ let df = partition.apply_to_dataframe(df)?;
+
+ let transformer = ZoneTransformer::new(partition.offset());
+ let df = transformer.transform(&ctx, df).await?;
+
+ // Get schema before collecting (which moves df)
+ let schema = Arc::new(transformer.arrow_schema(&df)?);
+ let batches = df.collect().await?;
+
+ let writer = ParquetWriter::new(&args, &stats, schema);
+ writer.write(&batches)?;
+
+ Ok(())
+}
+
+/// Generate all parts by collecting once and partitioning in memory
+pub async fn generate_zone_parquet_multi(args: ZoneDfArgs) -> Result<()> {
+ let stats = ZoneTableStats::new(args.scale_factor, args.parts);
+ let datasource = ZoneDataSource::new().await?;
+ let ctx = datasource.create_context()?;
+
+ let df = datasource.load_zone_data(&ctx, args.scale_factor).await?;
+
+ // Transform without offset (we'll adjust per-part later)
+ let transformer = ZoneTransformer::new(0);
+ let df = transformer.transform(&ctx, df).await?;
+
+ // Collect once
+ let schema = Arc::new(transformer.arrow_schema(&df)?);
+ let batches = df.collect().await?;
+
+ // Calculate total rows
+ let total_rows: i64 = batches.iter().map(|b| b.num_rows() as i64).sum();
+
+ // Write each part
+ for part in 1..=args.parts {
+ let partition = PartitionStrategy::calculate(total_rows, args.parts, part);
+ let partitioned_batches = partition.apply_to_batches(&batches)?;
+
+ let part_args = ZoneDfArgs::new(
+ args.scale_factor,
+ args.output_dir.clone(),
+ args.parts,
+ part,
+ args.parquet_row_group_bytes,
+ args.parquet_compression,
+ );
+
+ let writer = ParquetWriter::new(&part_args, &stats, schema.clone());
+ writer.write(&partitioned_batches)?;
+ }
+
+ Ok(())
+}
diff --git a/spatialbench-cli/src/zone/partition.rs b/spatialbench-cli/src/zone/partition.rs
new file mode 100644
index 0000000..a27f656
--- /dev/null
+++ b/spatialbench-cli/src/zone/partition.rs
@@ -0,0 +1,94 @@
+use arrow_array::RecordBatch;
+use datafusion::prelude::*;
+use log::info;
+
+pub struct PartitionStrategy {
+ offset: i64,
+ limit: i64,
+}
+
+impl PartitionStrategy {
+ pub fn calculate(total_rows: i64, parts: i32, part: i32) -> Self {
+ let parts = parts as i64;
+ let i = (part as i64) - 1;
+
+ let base = total_rows / parts;
+ let rem = total_rows % parts;
+
+ let limit = base + if i < rem { 1 } else { 0 };
+ let offset = i * base + std::cmp::min(i, rem);
+
+ info!(
+ "Partition: total={}, parts={}, part={}, offset={}, limit={}",
+ total_rows, parts, part, offset, limit
+ );
+
+ Self { offset, limit }
+ }
+
+ pub fn offset(&self) -> i64 {
+ self.offset
+ }
+
+ pub fn apply_to_dataframe(&self, df: DataFrame) -> datafusion::common::Result<DataFrame> {
+ df.limit(self.offset as usize, Some(self.limit as usize))
+ }
+
+ /// Apply partition to already-collected batches
+ pub fn apply_to_batches(&self, batches: &[RecordBatch]) -> anyhow::Result<Vec<RecordBatch>> {
+ let mut result = Vec::new();
+ let mut current_offset = 0i64;
+ let end_offset = self.offset + self.limit;
+
+ for batch in batches {
+ let batch_rows = batch.num_rows() as i64;
+ let batch_end = current_offset + batch_rows;
+
+ if batch_end <= self.offset || current_offset >= end_offset {
+ current_offset = batch_end;
+ continue;
+ }
+
+ let start_in_batch = (self.offset.saturating_sub(current_offset)).max(0) as usize;
+ let end_in_batch = ((end_offset - current_offset).min(batch_rows)) as usize;
+ let length = end_in_batch - start_in_batch;
+
+ if length > 0 {
+ let sliced = batch.slice(start_in_batch, length);
+ result.push(sliced);
+ }
+
+ current_offset = batch_end;
+ }
+
+ Ok(result)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_partition_distribution() {
+ let total_rows = 100i64;
+ let parts = 3;
+
+ let mut collected_rows = Vec::new();
+ let mut collected_offsets = Vec::new();
+
+ for part in 1..=parts {
+ let strategy = PartitionStrategy::calculate(total_rows, parts, part);
+ collected_rows.push(strategy.limit);
+ collected_offsets.push(strategy.offset);
+ }
+
+ assert_eq!(collected_rows.iter().sum::<i64>(), total_rows);
+ assert_eq!(collected_offsets[0], 0);
+
+ for i in 1..parts as usize {
+ let expected_offset = collected_offsets[i - 1] + collected_rows[i - 1];
+ assert_eq!(collected_offsets[i], expected_offset);
+ }
+ }
+}
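A worked example of the split computed by `calculate`: with `base = total_rows / parts` and `rem = total_rows % parts`, the first `rem` parts take one extra row. For `total_rows = 10` and `parts = 4`:

```
part  offset  limit
  1      0      3     (base = 2, rem = 2: parts 1 and 2 get base + 1)
  2      3      3
  3      6      2
  4      8      2     (offsets/limits tile rows 0..10 with no gaps)
```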
diff --git a/spatialbench-cli/src/zone/stats.rs b/spatialbench-cli/src/zone/stats.rs
new file mode 100644
index 0000000..ce5e5d5
--- /dev/null
+++ b/spatialbench-cli/src/zone/stats.rs
@@ -0,0 +1,161 @@
+use log::debug;
+
+const SUBTYPE_ROWS: &[(&str, i64)] = &[
+ ("microhood", 74797),
+ ("macrohood", 42619),
+ ("neighborhood", 298615),
+ ("county", 38679),
+ ("localadmin", 19007),
+ ("locality", 555834),
+ ("region", 3905),
+ ("dependency", 53),
+ ("country", 219),
+];
+
+pub struct ZoneTableStats {
+ scale_factor: f64,
+ size_gb: f64,
+ total_rows: i64,
+}
+
+impl ZoneTableStats {
+ pub fn new(scale_factor: f64, parts: i32) -> Self {
+ let (mut size_gb, mut total_rows) = Self::base_stats(scale_factor);
+
+ if scale_factor <= 1.0 && parts > 1 {
+ (size_gb, total_rows) = Self::base_stats(scale_factor / parts as f64);
+ }
+
+ debug!(
+ "Stats: size_gb={}, total_rows={} for SF={}",
+ size_gb, total_rows, scale_factor
+ );
+
+ Self {
+ scale_factor,
+ size_gb,
+ total_rows,
+ }
+ }
+
+ fn base_stats(sf: f64) -> (f64, i64) {
+ if sf < 1.0 {
+ (0.92 * sf, (156_095.0 * sf).ceil() as i64)
+ } else if sf < 10.0 {
+ (1.42, 156_095)
+ } else if sf < 100.0 {
+ (2.09, 454_710)
+ } else if sf < 1000.0 {
+ (5.68, 1_033_456)
+ } else {
+ (6.13, 1_033_675)
+ }
+ }
+
+ pub fn subtypes(&self) -> Vec<&'static str> {
+ let mut v = vec!["microhood", "macrohood", "county"];
+ if self.scale_factor >= 10.0 {
+ v.push("neighborhood");
+ }
+ if self.scale_factor >= 100.0 {
+ v.extend_from_slice(&["localadmin", "locality", "region", "dependency"]);
+ }
+ if self.scale_factor >= 1000.0 {
+ v.push("country");
+ }
+ v
+ }
+
+ pub fn estimated_total_rows(&self) -> i64 {
+ let mut total = 0i64;
+ for subtype in self.subtypes() {
+ total += SUBTYPE_ROWS
+ .iter()
+ .find(|(name, _)| *name == subtype)
+ .map(|(_, rows)| *rows)
+ .unwrap_or(0);
+ }
+
+ if self.scale_factor < 1.0 {
+ (total as f64 * self.scale_factor).ceil() as i64
+ } else {
+ total
+ }
+ }
+
+ pub fn compute_rows_per_group(&self, target_bytes: i64, default_bytes: i64) -> usize {
+ let total_bytes = self.size_gb * 1024.0 * 1024.0 * 1024.0;
+ let bytes_per_row = total_bytes / self.total_rows as f64;
+
+ let effective_target = if target_bytes <= 0 {
+ default_bytes
+ } else {
+ target_bytes
+ };
+
+ debug!(
+ "Stats: {:.2} GB, {} rows, {:.2} bytes/row, target: {} bytes",
+ self.size_gb, self.total_rows, bytes_per_row, effective_target
+ );
+
+ let est = (effective_target as f64 / bytes_per_row).floor();
+ est.clamp(1000.0, 32767.0) as usize
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_subtypes_for_different_scale_factors() {
+ let sf_01_stats = ZoneTableStats::new(0.1, 1);
+ assert_eq!(
+ sf_01_stats.subtypes(),
+ vec!["microhood", "macrohood", "county"]
+ );
+
+ let sf_10_stats = ZoneTableStats::new(10.0, 1);
+ assert_eq!(
+ sf_10_stats.subtypes(),
+ vec!["microhood", "macrohood", "county", "neighborhood"]
+ );
+
+ let sf_100_stats = ZoneTableStats::new(100.0, 1);
+ assert!(sf_100_stats.subtypes().contains(&"localadmin"));
+ assert!(sf_100_stats.subtypes().contains(&"locality"));
+
+ let sf_1000_stats = ZoneTableStats::new(1000.0, 1);
+ assert!(sf_1000_stats.subtypes().contains(&"country"));
+ }
+
+ #[test]
+ fn test_rows_per_group_bounds() {
+ let stats = ZoneTableStats::new(1.0, 1);
+
+ let rows_per_group_tiny = stats.compute_rows_per_group(1_000_000, 128 * 1024 * 1024);
+ assert!(rows_per_group_tiny >= 1000);
+
+ let tiny_stats = ZoneTableStats {
+ scale_factor: 0.001,
+ size_gb: 1000.0,
+ total_rows: 1000,
+ };
+ let rows_per_group_huge = tiny_stats.compute_rows_per_group(1, 128 * 1024 * 1024);
+ assert!(rows_per_group_huge <= 32767);
+ }
+
+ #[test]
+ fn test_estimated_rows_scaling_consistency() {
+ let base_stats = ZoneTableStats::new(1.0, 1);
+ let half_stats = ZoneTableStats::new(0.5, 1);
+ let quarter_stats = ZoneTableStats::new(0.25, 1);
+
+ let base_rows = base_stats.estimated_total_rows() as f64;
+ let half_rows = half_stats.estimated_total_rows() as f64;
+ let quarter_rows = quarter_stats.estimated_total_rows() as f64;
+
+ assert!((half_rows - (base_rows * 0.5)).abs() < 1.0);
+ assert!((quarter_rows - (base_rows * 0.25)).abs() < 1.0);
+ }
+}
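To make `compute_rows_per_group` concrete, a worked example using the hardcoded SF=1 stats above (1.42 GB, 156,095 rows) and the 128 MiB default target (figures approximate):

```
bytes_per_row  ≈ 1.42 × 1024³ / 156,095  ≈ 9,768
rows_per_group ≈ 134,217,728 / 9,768     ≈ 13,740   (inside the 1,000..=32,767 clamp)
```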
diff --git a/spatialbench-cli/src/zone/transform.rs b/spatialbench-cli/src/zone/transform.rs
new file mode 100644
index 0000000..74d9365
--- /dev/null
+++ b/spatialbench-cli/src/zone/transform.rs
@@ -0,0 +1,50 @@
+use anyhow::Result;
+use arrow_schema::Schema;
+use datafusion::{prelude::*, sql::TableReference};
+use log::{debug, info};
+
+pub struct ZoneTransformer {
+ offset: i64,
+}
+
+impl ZoneTransformer {
+ pub fn new(offset: i64) -> Self {
+ Self { offset }
+ }
+
+ pub async fn transform(&self, ctx: &SessionContext, df: DataFrame) -> Result<DataFrame> {
+ ctx.register_table(TableReference::bare("zone_filtered"), df.into_view())?;
+ debug!("Registered filtered data as 'zone_filtered' table");
+
+ let sql = format!(
+ r#"
+ SELECT
+ CAST(ROW_NUMBER() OVER (ORDER BY id) + {} AS BIGINT) AS z_zonekey,
+ COALESCE(id, '') AS z_gersid,
+ COALESCE(country, '') AS z_country,
+ COALESCE(region, '') AS z_region,
+ COALESCE(names.primary, '') AS z_name,
+ COALESCE(subtype, '') AS z_subtype,
+ geometry AS z_boundary
+ FROM zone_filtered
+ "#,
+ self.offset
+ );
+
+ debug!("Executing SQL transformation with offset: {}", self.offset);
+ let df = ctx.sql(&sql).await?;
+ info!("SQL transformation completed successfully");
+
+ Ok(df)
+ }
+
+ pub fn arrow_schema(&self, df: &DataFrame) -> Result<Schema> {
+ Ok(Schema::new(
+ df.schema()
+ .fields()
+ .iter()
+ .map(|f| f.as_ref().clone())
+ .collect::<Vec<_>>(),
+ ))
+ }
+}
diff --git a/spatialbench-cli/src/zone/writer.rs b/spatialbench-cli/src/zone/writer.rs
new file mode 100644
index 0000000..7aff003
--- /dev/null
+++ b/spatialbench-cli/src/zone/writer.rs
@@ -0,0 +1,94 @@
+use anyhow::Result;
+use arrow_array::RecordBatch;
+use arrow_schema::SchemaRef;
+use log::{debug, info};
+use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
+use std::{path::PathBuf, sync::Arc, time::Instant};
+
+use super::config::ZoneDfArgs;
+use super::stats::ZoneTableStats;
+
+pub struct ParquetWriter {
+ output_path: PathBuf,
+ schema: SchemaRef,
+ props: WriterProperties,
+ args: ZoneDfArgs,
+}
+
+impl ParquetWriter {
+ pub fn new(args: &ZoneDfArgs, stats: &ZoneTableStats, schema: SchemaRef) -> Self {
+ let rows_per_group =
+ stats.compute_rows_per_group(args.parquet_row_group_bytes, 128 * 1024 * 1024);
+
+ let props = WriterProperties::builder()
+ .set_compression(args.parquet_compression)
+ .set_max_row_group_size(rows_per_group)
+ .build();
+
+ debug!("Using row group size: {} rows", rows_per_group);
+
+ Self {
+ output_path: args.output_filename(),
+ schema,
+ props,
+ args: args.clone(),
+ }
+ }
+
+ pub fn write(&self, batches: &[RecordBatch]) -> Result<()> {
+ // Create parent directory of output file (handles both zone/ subdirectory and base dir)
+ let parent_dir = self
+ .output_path
+ .parent()
+ .ok_or_else(|| anyhow::anyhow!("Invalid output path: {:?}", self.output_path))?;
+
+ std::fs::create_dir_all(parent_dir)?;
+ debug!("Created output directory: {:?}", parent_dir);
+
+ // Check if file already exists
+ if self.output_path.exists() {
+ info!(
+ "{} already exists, skipping generation",
+ self.output_path.display()
+ );
+ return Ok(());
+ }
+
+ // Write to temp file first
+ let temp_path = self.output_path.with_extension("inprogress");
+ let t0 = Instant::now();
+ let file = std::fs::File::create(&temp_path)?;
+ let mut writer =
+ ArrowWriter::try_new(file, Arc::clone(&self.schema), Some(self.props.clone()))?;
+
+ for batch in batches {
+ writer.write(batch)?;
+ }
+
+ writer.close()?;
+
+ // Rename temp file to final output
+ std::fs::rename(&temp_path, &self.output_path).map_err(|e| {
+ anyhow::anyhow!(
+ "Failed to rename {:?} to {:?}: {}",
+ temp_path,
+ self.output_path,
+ e
+ )
+ })?;
+
+ let duration = t0.elapsed();
+ let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+
+ info!(
+ "Zone -> {} (part {}/{}). write={:?}, total_rows={}",
+ self.output_path.display(),
+ self.args.part,
+ self.args.parts,
+ duration,
+ total_rows
+ );
+
+ Ok(())
+ }
+}
diff --git a/spatialbench-cli/src/zone_df.rs b/spatialbench-cli/src/zone_df.rs
deleted file mode 100644
index eb75805..0000000
--- a/spatialbench-cli/src/zone_df.rs
+++ /dev/null
@@ -1,503 +0,0 @@
-use std::{path::PathBuf, sync::Arc, time::Instant};
-
-use anyhow::{anyhow, Result};
-use arrow_array::RecordBatch;
-use arrow_schema::{Schema, SchemaRef};
-use datafusion::{
- common::config::ConfigOptions, execution::runtime_env::RuntimeEnvBuilder, prelude::*,
- sql::TableReference,
-};
-
-use crate::plan::DEFAULT_PARQUET_ROW_GROUP_BYTES;
-use datafusion::execution::runtime_env::RuntimeEnv;
-use log::{debug, info};
-use object_store::http::HttpBuilder;
-use parquet::{
- arrow::ArrowWriter, basic::Compression as ParquetCompression,
- file::properties::WriterProperties,
-};
-use url::Url;
-
-const OVERTURE_RELEASE_DATE: &str = "2025-08-20.1";
-const HUGGINGFACE_URL: &str = "https://huggingface.co";
-const COMMIT_HASH: &str = "67822daa2fbc0039681922f0d7fea4157f41d13f";
-
-fn subtypes_for_scale_factor(sf: f64) -> Vec<&'static str> {
- let mut v = vec!["microhood", "macrohood", "county"];
- if sf >= 10.0 {
- v.push("neighborhood");
- }
- if sf >= 100.0 {
- v.extend_from_slice(&["localadmin", "locality", "region", "dependency"]);
- }
- if sf >= 1000.0 {
- v.push("country");
- }
- v
-}
-
-fn estimated_total_rows_for_sf(sf: f64) -> i64 {
- let mut total = 0i64;
- for s in subtypes_for_scale_factor(sf) {
- total += match s {
- "microhood" => 74797,
- "macrohood" => 42619,
- "neighborhood" => 298615,
- "county" => 38679,
- "localadmin" => 19007,
- "locality" => 555834,
- "region" => 3905,
- "dependency" => 53,
- "country" => 219,
- _ => 0,
- };
- }
- if sf < 1.0 {
- (total as f64 * sf).ceil() as i64
- } else {
- total
- }
-}
-
-fn get_zone_table_stats(sf: f64) -> (f64, i64) {
- // Returns (size_in_gb, total_rows) for the given scale factor
- if sf < 1.0 {
- (0.92 * sf, (156_095.0 * sf).ceil() as i64)
- } else if sf < 10.0 {
- (1.42, 156_095)
- } else if sf < 100.0 {
- (2.09, 454_710)
- } else if sf < 1000.0 {
- (5.68, 1_033_456)
- } else {
- (6.13, 1_033_675)
- }
-}
-
-fn compute_rows_per_group_from_stats(size_gb: f64, total_rows: i64, target_bytes: i64) -> usize {
- let total_bytes = size_gb * 1024.0 * 1024.0 * 1024.0; // Convert GB to bytes
- let bytes_per_row = total_bytes / total_rows as f64;
-
- // Use default if target_bytes is not specified or invalid
- let effective_target = if target_bytes <= 0 {
- DEFAULT_PARQUET_ROW_GROUP_BYTES
- } else {
- target_bytes
- };
-
- debug!(
- "Using hardcoded stats: {:.2} GB, {} rows, {:.2} bytes/row, target: {}
bytes",
- size_gb, total_rows, bytes_per_row, effective_target
- );
-
- let est = (effective_target as f64 / bytes_per_row).floor();
- // Keep RG count <= 32k, but avoid too-tiny RGs
- est.clamp(1000.0, 32767.0) as usize
-}
-
-fn writer_props_with_rowgroup(comp: ParquetCompression, rows_per_group: usize) -> WriterProperties {
- WriterProperties::builder()
- .set_compression(comp)
- .set_max_row_group_size(rows_per_group)
- .build()
-}
-
-fn write_parquet_with_rowgroup_bytes(
- out_path: &PathBuf,
- schema: SchemaRef,
- all_batches: Vec<RecordBatch>,
- target_rowgroup_bytes: i64,
- comp: ParquetCompression,
- scale_factor: f64,
- parts: i32,
-) -> Result<()> {
- let (mut size_gb, mut total_rows) = get_zone_table_stats(scale_factor);
-
- // Use linear scaling stats for SF <= 1.0 with parts > 1
- if scale_factor <= 1.0 && parts > 1 {
- (size_gb, total_rows) = get_zone_table_stats(scale_factor / parts as f64);
- }
-
- debug!(
- "size_gb={}, total_rows={} for scale_factor={}",
- size_gb, total_rows, scale_factor
- );
- let rows_per_group =
- compute_rows_per_group_from_stats(size_gb, total_rows, target_rowgroup_bytes);
- let props = writer_props_with_rowgroup(comp, rows_per_group);
-
- debug!(
- "Using row group size: {} rows (based on hardcoded stats)",
- rows_per_group
- );
-
- let mut writer = ArrowWriter::try_new(std::fs::File::create(out_path)?, schema, Some(props))?;
-
- for batch in all_batches {
- writer.write(&batch)?;
- }
- writer.close()?;
- Ok(())
-}
-
-#[derive(Clone)]
-pub struct ZoneDfArgs {
- pub scale_factor: f64,
- pub output_dir: PathBuf,
- pub parts: i32,
- pub part: i32,
- pub parquet_row_group_bytes: i64,
- pub parquet_compression: ParquetCompression,
-}
-
-impl ZoneDfArgs {
- fn output_filename(&self) -> PathBuf {
- let filename = "zone.parquet".to_string();
- self.output_dir.join(filename)
- }
-}
-
-pub async fn generate_zone_parquet(args: ZoneDfArgs) -> Result<()> {
- if args.part < 1 || args.part > args.parts {
- return Err(anyhow!(
- "Invalid --part={} for --parts={}",
- args.part,
- args.parts
- ));
- }
-
- info!(
- "Starting zone parquet generation with scale factor {}",
- args.scale_factor
- );
- debug!("Zone generation args: parts={}, part={}, output_dir={:?},
row_group_bytes={}, compression={:?}",
- args.parts, args.part, args.output_dir,
args.parquet_row_group_bytes, args.parquet_compression);
-
- let subtypes = subtypes_for_scale_factor(args.scale_factor);
- info!(
- "Selected subtypes for SF {}: {:?}",
- args.scale_factor, subtypes
- );
-
- let estimated_rows = estimated_total_rows_for_sf(args.scale_factor);
- info!(
- "Estimated total rows for SF {}: {}",
- args.scale_factor, estimated_rows
- );
-
- let mut cfg = ConfigOptions::new();
- cfg.execution.target_partitions = 1;
- debug!("Created DataFusion config with target_partitions=1");
-
- let rt: Arc<RuntimeEnv> = Arc::new(RuntimeEnvBuilder::new().build()?);
- debug!("Built DataFusion runtime environment");
-
- // Register HTTPS object store for Hugging Face
- let hf_store = HttpBuilder::new().with_url(HUGGINGFACE_URL).build()?;
- let hf_url = Url::parse(HUGGINGFACE_URL)?;
- rt.register_object_store(&hf_url, Arc::new(hf_store));
- debug!("Registered HTTPS object store for huggingface.co");
-
- let ctx = SessionContext::new_with_config_rt(SessionConfig::from(cfg), rt);
- debug!("Created DataFusion session context");
-
- // Parquet parts from Hugging Face (programmatically generated)
- const PARQUET_PART_COUNT: usize = 4;
- const PARQUET_UUID: &str = "c998b093-fa14-440c-98f0-bbdb2126ed22";
- let parquet_urls: Vec<String> = (0..PARQUET_PART_COUNT)
- .map(|i| format!(
- "https://huggingface.co/datasets/apache-sedona/spatialbench/resolve/{}/omf-division-area-{}/part-{i:05}-{uuid}-c000.zstd.parquet",
- COMMIT_HASH,
- OVERTURE_RELEASE_DATE,
- i = i,
- uuid = PARQUET_UUID
- ))
- .collect();
-
- info!(
- "Reading {} Parquet parts from Hugging Face...",
- parquet_urls.len()
- );
-
- let t_read_start = Instant::now();
- let mut df = ctx
- .read_parquet(parquet_urls, ParquetReadOptions::default())
- .await?;
- let read_dur = t_read_start.elapsed();
- info!("Successfully read HF parquet data in {:?}", read_dur);
-
- // Build filter predicate
- debug!("Building filter predicate for subtypes: {:?}", subtypes);
- let mut pred = col("subtype").eq(lit("__never__"));
- for s in subtypes_for_scale_factor(args.scale_factor) {
- pred = pred.or(col("subtype").eq(lit(s)));
- }
- df = df.filter(pred.and(col("is_land").eq(lit(true))))?;
- info!("Applied subtype and is_land filters");
-
- // df = df.sort(vec![col("id").sort(true, true)])?;
- // debug!("Applied sorting by id");
-
- let total = estimated_total_rows_for_sf(args.scale_factor);
- let i = (args.part as i64) - 1; // 0-based part index
- let parts = args.parts as i64;
-
- let base = total / parts;
- let rem = total % parts;
-
- // first `rem` parts get one extra row
- let rows_this = base + if i < rem { 1 } else { 0 };
- let offset = i * base + std::cmp::min(i, rem);
-
- info!(
- "Partitioning data: total_rows={}, parts={}, base={}, rem={},
this_part_rows={}, offset={}",
- total, parts, base, rem, rows_this, offset
- );
-
- df = df.limit(offset as usize, Some(rows_this as usize))?;
- debug!("Applied limit with offset={}, rows={}", offset, rows_this);
-
- ctx.register_table(TableReference::bare("zone_filtered"), df.into_view())?;
- debug!("Registered filtered data as 'zone_filtered' table");
-
- let sql = format!(
- r#"
- SELECT
- CAST(ROW_NUMBER() OVER (ORDER BY id) + {offset} AS BIGINT) AS z_zonekey,
- COALESCE(id, '') AS z_gersid,
- COALESCE(country, '') AS z_country,
- COALESCE(region, '') AS z_region,
- COALESCE(names.primary, '') AS z_name,
- COALESCE(subtype, '') AS z_subtype,
- geometry AS z_boundary
- FROM zone_filtered
- "#
- );
- debug!("Executing SQL transformation with offset: {}", offset);
- let df2 = ctx.sql(&sql).await?;
- info!("SQL transformation completed successfully");
-
- let t0 = Instant::now();
- info!("Starting data collection...");
- let batches = df2.clone().collect().await?;
- let collect_dur = t0.elapsed();
-
- let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
- info!(
- "Collected {} record batches with {} total rows in {:?}",
- batches.len(),
- total_rows,
- collect_dur
- );
-
- std::fs::create_dir_all(&args.output_dir)?;
- debug!("Created output directory: {:?}", args.output_dir);
-
- let out = args.output_filename();
- info!("Writing output to: {}", out.display());
-
- debug!(
- "Created parquet writer properties with compression: {:?}",
- args.parquet_compression
- );
-
- // Convert DFSchema to Arrow Schema
- let schema = Arc::new(Schema::new(
- df2.schema()
- .fields()
- .iter()
- .map(|f| f.as_ref().clone())
- .collect::<Vec<_>>(),
- ));
- debug!(
- "Converted DataFusion schema to Arrow schema with {} fields",
- schema.fields().len()
- );
-
- let t1 = Instant::now();
- info!(
- "Starting parquet file write with row group size: {} bytes",
- args.parquet_row_group_bytes
- );
- write_parquet_with_rowgroup_bytes(
- &out,
- schema,
- batches,
- args.parquet_row_group_bytes,
- args.parquet_compression,
- args.scale_factor,
- args.parts,
- )?;
- let write_dur = t1.elapsed();
-
- info!(
- "Zone -> {} (part {}/{}). collect={:?}, write={:?}, total_rows={}",
- out.display(),
- args.part,
- args.parts,
- collect_dur,
- write_dur,
- total_rows
- );
-
- Ok(())
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use parquet::basic::Compression;
- use tempfile::TempDir;
-
- fn create_test_args(scale_factor: f64, temp_dir: &TempDir) -> ZoneDfArgs {
- ZoneDfArgs {
- scale_factor,
- output_dir: temp_dir.path().to_path_buf(),
- parts: 1,
- part: 1,
- parquet_row_group_bytes: DEFAULT_PARQUET_ROW_GROUP_BYTES,
- parquet_compression: Compression::SNAPPY,
- }
- }
-
- #[tokio::test]
- async fn test_zone_generation_invalid_part() {
- let temp_dir = TempDir::new().unwrap();
- let mut args = create_test_args(1.0, &temp_dir);
- args.parts = 2;
- args.part = 3; // Invalid part number
-
- let result = generate_zone_parquet(args).await;
- assert!(result.is_err(), "Should fail with invalid part number");
- }
-
- #[tokio::test]
- async fn test_subtypes_for_different_scale_factors() {
- // Test scale factor categorization
- let sf_01_subtypes = subtypes_for_scale_factor(0.1);
- assert_eq!(sf_01_subtypes, vec!["microhood", "macrohood", "county"]);
-
- let sf_10_subtypes = subtypes_for_scale_factor(10.0);
- assert_eq!(
- sf_10_subtypes,
- vec!["microhood", "macrohood", "county", "neighborhood"]
- );
-
- let sf_100_subtypes = subtypes_for_scale_factor(100.0);
- assert!(sf_100_subtypes.contains(&"localadmin"));
- assert!(sf_100_subtypes.contains(&"locality"));
-
- let sf_1000_subtypes = subtypes_for_scale_factor(1000.0);
- assert!(sf_1000_subtypes.contains(&"country"));
- }
-
- #[test]
- fn test_partition_distribution_logic() {
- // Test the mathematical logic for distributing rows across partitions
- let total_rows = 100i64;
- let parts = 3i64;
-
- let mut collected_rows = Vec::new();
- let mut collected_offsets = Vec::new();
-
- // Simulate the partition calculation for each part
- for part_idx in 0..parts {
- let i = part_idx;
- let base = total_rows / parts;
- let rem = total_rows % parts;
- let rows_this = base + if i < rem { 1 } else { 0 };
- let offset = i * base + std::cmp::min(i, rem);
-
- collected_rows.push(rows_this);
- collected_offsets.push(offset);
- }
-
- // Verify partitioning logic
- assert_eq!(collected_rows.iter().sum::<i64>(), total_rows); // All rows accounted for
- assert_eq!(collected_offsets[0], 0); // First partition starts at 0
-
- // Verify no gaps or overlaps between partitions
- for i in 1..parts as usize {
- let expected_offset = collected_offsets[i - 1] + collected_rows[i - 1];
- assert_eq!(collected_offsets[i], expected_offset);
- }
-
- // Verify remainder distribution (first partitions get extra rows)
- let remainder = (total_rows % parts) as usize;
- for i in 0..remainder {
- assert_eq!(collected_rows[i], collected_rows[remainder] + 1);
- }
- }
-
- #[test]
- fn test_rows_per_group_bounds() {
- // Test that compute_rows_per_group_from_stats respects bounds
-
- // Test minimum bound (should be at least 1000)
- let rows_per_group_tiny = compute_rows_per_group_from_stats(0.001, 1000, 1_000_000);
- assert!(rows_per_group_tiny >= 1000);
-
- // Test maximum bound (should not exceed 32767)
- let rows_per_group_huge = compute_rows_per_group_from_stats(1000.0, 1000, 1);
- assert!(rows_per_group_huge <= 32767);
-
- // Test negative target bytes falls back to default
- let rows_per_group_negative = compute_rows_per_group_from_stats(1.0, 100000, -1);
- let rows_per_group_default =
- compute_rows_per_group_from_stats(1.0, 100000, DEFAULT_PARQUET_ROW_GROUP_BYTES);
- assert_eq!(rows_per_group_negative, rows_per_group_default);
- }
-
- #[test]
- fn test_subtype_selection_logic() {
- // Test the cumulative nature of subtype selection
- let base_subtypes = subtypes_for_scale_factor(1.0);
- let sf10_subtypes = subtypes_for_scale_factor(10.0);
- let sf100_subtypes = subtypes_for_scale_factor(100.0);
- let sf1000_subtypes = subtypes_for_scale_factor(1000.0);
-
- // Each higher scale factor should include all previous subtypes
- for subtype in &base_subtypes {
- assert!(sf10_subtypes.contains(subtype));
- assert!(sf100_subtypes.contains(subtype));
- assert!(sf1000_subtypes.contains(subtype));
- }
-
- for subtype in &sf10_subtypes {
- assert!(sf100_subtypes.contains(subtype));
- assert!(sf1000_subtypes.contains(subtype));
- }
-
- for subtype in &sf100_subtypes {
- assert!(sf1000_subtypes.contains(subtype));
- }
-
- // Verify progressive addition
- assert!(sf10_subtypes.len() > base_subtypes.len());
- assert!(sf100_subtypes.len() > sf10_subtypes.len());
- assert!(sf1000_subtypes.len() > sf100_subtypes.len());
- }
-
- #[test]
- fn test_estimated_rows_scaling_consistency() {
- // Test that estimated rows scale proportionally for SF < 1.0
- let base_rows = estimated_total_rows_for_sf(1.0);
- let half_rows = estimated_total_rows_for_sf(0.5);
- let quarter_rows = estimated_total_rows_for_sf(0.25);
-
- // Should scale proportionally (within rounding)
- assert!((half_rows as f64 - (base_rows as f64 * 0.5)).abs() < 1.0);
- assert!((quarter_rows as f64 - (base_rows as f64 * 0.25)).abs() < 1.0);
-
- // Test that SF >= 1.0 gives discrete jumps (not proportional scaling)
- let sf1_rows = estimated_total_rows_for_sf(1.0);
- let sf5_rows = estimated_total_rows_for_sf(5.0);
- let sf10_rows = estimated_total_rows_for_sf(10.0);
-
- // These should be equal (same category)
- assert_eq!(sf1_rows, sf5_rows);
-
- // This should be different (different category)
- assert_ne!(sf5_rows, sf10_rows);
- }
-}
diff --git a/spatialbench-cli/tests/cli_integration.rs b/spatialbench-cli/tests/cli_integration.rs
index edd1ea1..1c6028d 100644
--- a/spatialbench-cli/tests/cli_integration.rs
+++ b/spatialbench-cli/tests/cli_integration.rs
@@ -84,6 +84,133 @@ fn test_spatialbench_cli_tbl_scale_factor_v1() {
}
}
+/// Test that when creating output, if the file already exists it is not overwritten
+#[test]
+fn test_spatialbench_cli_tbl_no_overwrite() {
+ let temp_dir = tempdir().expect("Failed to create temporary directory");
+ let expected_file = temp_dir.path().join("trip.tbl");
+
+ let run_command = || {
+ Command::cargo_bin("spatialbench-cli")
+ .expect("Binary not found")
+ .arg("--scale-factor")
+ .arg("0.001")
+ .arg("--format")
+ .arg("tbl")
+ .arg("--tables")
+ .arg("trip")
+ .arg("--output-dir")
+ .arg(temp_dir.path())
+ .assert()
+ .success()
+ };
+
+ run_command();
+ let original_metadata =
fs::metadata(&expected_file).expect("Failed to get metadata of generated file");
+ assert_eq!(original_metadata.len(), 826311);
+
+ // Run the spatialbench-cli command again with the same parameters and expect the
+ // file to not be overwritten
+ run_command();
+ let new_metadata =
fs::metadata(&expected_file).expect("Failed to get metadata of generated file");
+ assert_eq!(original_metadata.len(), new_metadata.len());
+ assert_eq!(
+ original_metadata
+ .modified()
+ .expect("Failed to get modified time"),
+ new_metadata
+ .modified()
+ .expect("Failed to get modified time")
+ );
+}
+
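+/// Test that an existing zone parquet part (written as `zone/zone.<part>.parquet`)
+/// is not overwritten when the command is re-run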
+#[tokio::test]
+async fn test_zone_parquet_no_overwrite() {
+ let temp_dir = tempdir().expect("Failed to create temporary directory");
+ let expected_file = temp_dir.path().join("zone/zone.1.parquet");
+
+ let run_command = || {
+ Command::cargo_bin("spatialbench-cli")
+ .expect("Binary not found")
+ .arg("--scale-factor")
+ .arg("1")
+ .arg("--tables")
+ .arg("zone")
+ .arg("--parts")
+ .arg("100")
+ .arg("--part")
+ .arg("1")
+ .arg("--output-dir")
+ .arg(temp_dir.path())
+ .assert()
+ .success()
+ };
+
+ run_command();
+ let original_metadata =
fs::metadata(&expected_file).expect("Failed to get metadata of generated file");
+ assert_eq!(original_metadata.len(), 25400203);
+
+ // Run the spatialbench-cli command again with the same parameters and expect the
+ // file to not be overwritten
+ run_command();
+
+ let new_metadata =
fs::metadata(&expected_file).expect("Failed to get metadata of generated file");
+ assert_eq!(original_metadata.len(), new_metadata.len());
+ assert_eq!(
+ original_metadata
+ .modified()
+ .expect("Failed to get modified time"),
+ new_metadata
+ .modified()
+ .expect("Failed to get modified time")
+ );
+}
+
+/// Test that when creating parquet output, if the file already exists it is not overwritten
+#[test]
+fn test_spatialbench_cli_parquet_no_overwrite() {
+ let temp_dir = tempdir().expect("Failed to create temporary directory");
+ let expected_file = temp_dir.path().join("building.parquet");
+
+ let run_command = || {
+ Command::cargo_bin("spatialbench-cli")
+ .expect("Binary not found")
+ .arg("--scale-factor")
+ .arg("0.001")
+ .arg("--tables")
+ .arg("building")
+ .arg("--output-dir")
+ .arg(temp_dir.path())
+ .assert()
+ .success()
+ };
+
+ run_command();
+ let original_metadata =
fs::metadata(&expected_file).expect("Failed to get metadata of generated file");
+ assert_eq!(original_metadata.len(), 412);
+
+ // Run the spatialbench-cli command again with the same parameters and expect the
+ // file to not be overwritten
+ run_command();
+
+ let new_metadata =
fs::metadata(&expected_file).expect("Failed to get metadata of generated file");
+ assert_eq!(original_metadata.len(), new_metadata.len());
+ assert_eq!(
+ original_metadata
+ .modified()
+ .expect("Failed to get modified time"),
+ new_metadata
+ .modified()
+ .expect("Failed to get modified time")
+ );
+}
+
/// Test zone parquet output determinism - same data should be generated every time
#[tokio::test]
async fn test_zone_deterministic_parts_generation() {
@@ -106,7 +233,7 @@ async fn test_zone_deterministic_parts_generation() {
.assert()
.success();
- let zone_file1 = temp_dir1.path().join("zone.parquet");
+ let zone_file1 = temp_dir1.path().join("zone/zone.1.parquet");
// Reference file is a sf=0.01 zone table with z_boundary column removed
let reference_file = PathBuf::from("../spatialbench/data/sf-v1/zone.parquet");
@@ -190,23 +317,49 @@ async fn test_zone_deterministic_parts_generation() {
}
}
-/// Test generating the trip table using --parts and --part options
+/// Test generating the trip table as 4 part files from a single --parts invocation
#[test]
fn test_spatialbench_cli_parts() {
- // Create a temporary directory
let temp_dir = tempdir().expect("Failed to create temporary directory");
- // generate 4 parts of the trip table with scale factor 0.001
- // into directories /part1, /part2, /part3, /part4
+ // generate 4 parts of the trip table with scale factor 0.001 and let
+ // spatialbench-cli generate the multiple files
+
+ let num_parts = 4;
+ let output_dir = temp_dir.path().to_path_buf();
+ Command::cargo_bin("spatialbench-cli")
+ .expect("Binary not found")
+ .arg("--scale-factor")
+ .arg("0.001")
+ .arg("--format")
+ .arg("tbl")
+ .arg("--output-dir")
+ .arg(&output_dir)
+ .arg("--parts")
+ .arg(num_parts.to_string())
+ .arg("--tables")
+ .arg("trip")
+ .assert()
+ .success();
+
+ verify_table(temp_dir.path(), "trip", num_parts, "v1");
+}
+
+/// Test generating the trip table with multiple invocations using --parts and
+/// --part options
+#[test]
+fn test_spatialbench_cli_parts_explicit() {
+ let temp_dir = tempdir().expect("Failed to create temporary directory");
+
+ // generate 4 parts of the trip table with scale factor 0.001
// use threads to run the command concurrently to minimize the time taken
let num_parts = 4;
let mut threads = vec![];
for part in 1..=num_parts {
- let part_dir = temp_dir.path().join(format!("part{part}"));
+ let output_dir = temp_dir.path().to_path_buf();
threads.push(std::thread::spawn(move || {
- fs::create_dir(&part_dir).expect("Failed to create part directory");
-
// Run the spatialbench-cli command for each part
+ // output goes into `output_dir/trip/trip.{part}.tbl`
Command::cargo_bin("spatialbench-cli")
.expect("Binary not found")
.arg("--scale-factor")
@@ -214,7 +367,7 @@ fn test_spatialbench_cli_parts() {
.arg("--format")
.arg("tbl")
.arg("--output-dir")
- .arg(&part_dir)
+ .arg(&output_dir)
.arg("--parts")
.arg(num_parts.to_string())
.arg("--part")
@@ -229,11 +382,62 @@ fn test_spatialbench_cli_parts() {
for thread in threads {
thread.join().expect("Thread panicked");
}
- // Read the generated files into a single buffer and compare them
- // to the contents of the reference file
+ verify_table(temp_dir.path(), "trip", num_parts, "v1");
+}
+
+/// Create all tables using --parts option and verify the output layouts
+#[test]
+fn test_spatialbench_cli_parts_all_tables() {
+ let temp_dir = tempdir().expect("Failed to create temporary directory");
+
+ let num_parts = 8;
+ let output_dir = temp_dir.path().to_path_buf();
+ Command::cargo_bin("spatialbench-cli")
+ .expect("Binary not found")
+ .arg("--scale-factor")
+ .arg("0.51")
+ .arg("--format")
+ .arg("tbl")
+ .arg("--tables")
+ .arg("building,driver,vehicle,customer")
+ .arg("--output-dir")
+ .arg(&output_dir)
+ .arg("--parts")
+ .arg(num_parts.to_string())
+ .assert()
+ .success();
+
+ Command::cargo_bin("spatialbench-cli")
+ .expect("Binary not found")
+ .arg("--scale-factor")
+ .arg("0.001")
+ .arg("--format")
+ .arg("tbl")
+ .arg("--tables")
+ .arg("trip")
+ .arg("--output-dir")
+ .arg(&output_dir)
+ .arg("--parts")
+ .arg(num_parts.to_string())
+ .assert()
+ .success();
+
+ verify_table(temp_dir.path(), "trip", num_parts, "v1");
+ verify_table(temp_dir.path(), "customer", num_parts, "v1");
+ // Note, building, vehicle and driver have only a single part regardless of --parts
+ verify_table(temp_dir.path(), "building", 1, "v1");
+ verify_table(temp_dir.path(), "vehicle", 1, "v1");
+ verify_table(temp_dir.path(), "driver", 1, "v1");
+}
+
+/// Read the N files from `output_dir/table_name/table_name.part.tbl` into a
+/// single buffer and compare them to the contents of the reference file
+fn verify_table(output_dir: &Path, table_name: &str, parts: usize, scale_factor: &str) {
let mut output_contents = Vec::new();
- for part in 1..=4 {
- let generated_file = temp_dir.path().join(format!("part{part}")).join("trip.tbl");
+ for part in 1..=parts {
+ let generated_file = output_dir
+ .join(table_name)
+ .join(format!("{table_name}.{part}.tbl"));
assert!(
generated_file.exists(),
"File {:?} does not exist",
@@ -247,7 +451,7 @@ fn test_spatialbench_cli_parts() {
String::from_utf8(output_contents).expect("Failed to convert output contents to string");
// load the reference file
- let reference_file = read_reference_file("trip", "v1");
+ let reference_file = read_reference_file(table_name, scale_factor);
assert_eq!(output_contents, reference_file);
}
@@ -362,7 +566,7 @@ async fn test_zone_write_parquet_row_group_size_default() {
expect_row_group_sizes(
output_dir.path(),
vec![RowGroups {
- table: "zone",
+ table: "zone/zone.1",
row_group_bytes: vec![86288517],
}],
);
@@ -442,7 +646,7 @@ async fn test_zone_write_parquet_row_group_size_20mb() {
expect_row_group_sizes(
output_dir.path(),
vec![RowGroups {
- table: "zone",
+ table: "zone/zone.1",
row_group_bytes: vec![15428592, 17250042, 19338201, 17046885, 17251978],
}],
);
@@ -466,24 +670,6 @@ fn test_spatialbench_cli_part_no_parts() {
));
}
-#[test]
-fn test_spatialbench_cli_parts_no_part() {
- let temp_dir = tempdir().expect("Failed to create temporary directory");
-
- // CLI Error test --parts and but not --part
- Command::cargo_bin("spatialbench-cli")
- .expect("Binary not found")
- .arg("--output-dir")
- .arg(temp_dir.path())
- .arg("--parts")
- .arg("42")
- .assert()
- .failure()
- .stderr(predicates::str::contains(
- "The --part_count option requires the --part option to be set",
- ));
-}
-
#[test]
fn test_spatialbench_cli_too_many_parts() {
let temp_dir = tempdir().expect("Failed to create temporary directory");
diff --git a/tools/generate_data.py b/tools/generate_data.py
index 34efb0b..2308603 100755
--- a/tools/generate_data.py
+++ b/tools/generate_data.py
@@ -103,12 +103,8 @@ def _generate_data(scale_factor: int, num_partitions: dict[str, int], output_pat
tables = list(num_partitions.keys())
# Ensure base directories exist
Path(output_path).mkdir(parents=True, exist_ok=True)
- (Path(output_path) / "staging").mkdir(parents=True, exist_ok=True)
- def run_one(table: str, part: int) -> None:
- # Use a per-table, per-part staging dir to avoid collisions when parallel
- staging_dir = Path(output_path) / "staging" / table / f"part-{part}"
- staging_dir.mkdir(parents=True, exist_ok=True)
+ def run_one(table: str) -> None:
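+ # A single invocation now writes every part for this table directly into
+ # output_path, so the per-part staging directories are no longer needed.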
result = subprocess.run(
[
@@ -118,8 +114,7 @@ def _generate_data(scale_factor: int, num_partitions: dict[str, int], output_pat
f"--format=parquet",
f"--parts={num_partitions[table]}",
f"--tables={table}",
- f"--part={part}",
- f"--output-dir={staging_dir}",
+ f"--output-dir={output_path}",
],
capture_output=True,
text=True,
@@ -129,32 +124,13 @@ def _generate_data(scale_factor: int, num_partitions: dict[str, int], output_pat
logging.warning("Command errors:")
logging.warning(result.stderr)
- # Collate results by table instead of part
- dest_dir = Path(output_path) / table
- dest_dir.mkdir(parents=True, exist_ok=True)
- src_file = staging_dir / f"{table}.parquet"
- dest_file = dest_dir / f"part-{part}.parquet"
- shutil.move(str(src_file), str(dest_file))
-
- # Cleanup staging for this (table, part)
- try:
- shutil.rmtree(staging_dir)
- # remove parent if empty
- parent = staging_dir.parent
- if parent.exists() and not any(parent.iterdir()):
- parent.rmdir()
- except Exception as cleanup_err:
- logging.debug(f"Cleanup warning for {staging_dir}:
{cleanup_err}")
-
# Launch all generation tasks in parallel threads
futures = []
with concurrent.futures.ThreadPoolExecutor(
max_workers=os.cpu_count() or 4
) as executor:
for table in tables:
- for idx in range(num_partitions[table]):
- part = idx + 1 # 1-indexed
- futures.append(executor.submit(run_one, table, part))
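+ # One task per table: the CLI fans the table out into its part files itself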
+ futures.append(executor.submit(run_one, table))
# Raise the first exception if any
for fut in concurrent.futures.as_completed(futures):
fut.result()