This is an automated email from the ASF dual-hosted git repository. prantogg pushed a commit to branch add-file-size-option in repository https://gitbox.apache.org/repos/asf/sedona-spatialbench.git
commit 827a3da02785e7d3eda8848bce2fa61f61284977 Author: Pranav Toggi <[email protected]> AuthorDate: Mon Oct 27 18:44:19 2025 -0700 feat: add file size option for output files --- spatialbench-cli/src/main.rs | 13 +++- spatialbench-cli/src/output_plan.rs | 38 +++++++++- spatialbench-cli/src/plan.rs | 14 ++-- spatialbench-cli/src/zone/config.rs | 28 +++++--- spatialbench-cli/src/zone/datasource.rs | 2 +- spatialbench-cli/src/zone/main.rs | 12 ++-- spatialbench-cli/src/zone/mod.rs | 16 +++-- spatialbench-cli/src/zone/partition.rs | 67 +++++++++++++++--- spatialbench-cli/src/zone/stats.rs | 24 +++---- spatialbench-cli/src/zone/writer.rs | 2 +- spatialbench-cli/tests/cli_integration.rs | 114 ++++++++++++++++++++++++++++++ 11 files changed, 282 insertions(+), 48 deletions(-) diff --git a/spatialbench-cli/src/main.rs b/spatialbench-cli/src/main.rs index 9456f95..7b6f6d2 100644 --- a/spatialbench-cli/src/main.rs +++ b/spatialbench-cli/src/main.rs @@ -92,6 +92,11 @@ struct Cli { #[arg(long)] part: Option<i32>, + /// Maximum file size in MB. If specified, automatically determines the number of parts. + /// Cannot be used with --parts or --part options. + #[arg(long, conflicts_with_all = ["parts", "part"])] + mb_per_file: Option<f32>, + /// Output format: tbl, csv, parquet #[arg(short, long, default_value = "parquet")] format: OutputFormat, @@ -344,7 +349,12 @@ impl Cli { if table == Table::Zone { self.generate_zone().await? 
} else { - output_plan_generator.generate_plans(table, self.part, self.parts)?; + output_plan_generator.generate_plans( + table, + self.part, + self.parts, + self.mb_per_file, + )?; } } let output_plans = output_plan_generator.build(); @@ -378,6 +388,7 @@ impl Cli { self.output_dir.clone(), self.parts, self.part, + self.mb_per_file, self.parquet_row_group_bytes, self.parquet_compression, ) diff --git a/spatialbench-cli/src/output_plan.rs b/spatialbench-cli/src/output_plan.rs index a80533b..75e441b 100644 --- a/spatialbench-cli/src/output_plan.rs +++ b/spatialbench-cli/src/output_plan.rs @@ -163,10 +163,18 @@ impl OutputPlanGenerator { table: Table, cli_part: Option<i32>, cli_part_count: Option<i32>, + output_file_size_mb: Option<f32>, ) -> io::Result<()> { + // Calculate part_count from output_file_size_mb if specified + let calculated_part_count = if let Some(max_size_mb) = output_file_size_mb { + Some(self.calculate_parts_from_file_size(table, max_size_mb)) + } else { + cli_part_count + }; + // If the user specified only a part count, automatically create all // partitions for the table - if let (None, Some(part_count)) = (cli_part, cli_part_count) { + if let (None, Some(part_count)) = (cli_part, calculated_part_count) { if GenerationPlan::partitioned_table(table) { debug!("Generating all partitions for table {table} with part count {part_count}"); for part in 1..=part_count { @@ -178,11 +186,37 @@ impl OutputPlanGenerator { self.generate_plan_inner(table, Some(1), Some(1))?; } } else { - self.generate_plan_inner(table, cli_part, cli_part_count)?; + self.generate_plan_inner(table, cli_part, calculated_part_count)?; } Ok(()) } + /// Calculate the number of parts needed to approximate output file size + fn calculate_parts_from_file_size(&self, table: Table, max_size_mb: f32) -> i32 { + use crate::plan::OutputSize; + + let output_size = OutputSize::new( + table, + self.scale_factor, + self.format, + self.parquet_row_group_bytes, + ); + + let total_size_bytes = 
output_size.total_size_bytes(); + let output_file_size_mb = max_size_mb * (1024 * 1024) as f32; + + // Calculate parts needed + let parts = ((total_size_bytes as f64 / output_file_size_mb as f64).round() as i32).max(1); + + debug!( + "Calculated {parts} parts for table {table} (total size: {}MB, max size: {}MB)", + total_size_bytes / (1024 * 1024), + max_size_mb + ); + + parts + } + fn generate_plan_inner( &mut self, table: Table, diff --git a/spatialbench-cli/src/plan.rs b/spatialbench-cli/src/plan.rs index b95c053..7935bed 100644 --- a/spatialbench-cli/src/plan.rs +++ b/spatialbench-cli/src/plan.rs @@ -217,7 +217,7 @@ impl Display for GenerationPlan { /// output size of a table #[derive(Debug)] -struct OutputSize { +pub struct OutputSize { /// Average row size in bytes avg_row_size_bytes: i64, /// Number of rows in the table @@ -251,7 +251,7 @@ impl OutputSize { // Average row size in bytes for each table at scale factor 1.0 // computed using datafusion-cli: // ```shell - // datafusion-cli -c "datafusion-cli -c "select row_group_id, count(*), min(row_group_bytes)::float/min(row_group_num_rows)::float as bytes_per_row from parquet_metadata('lineitem.parquet') GROUP BY 1 ORDER BY 1"" + // datafusion-cli -c "select row_group_id, count(*), min(row_group_bytes)::float/min(row_group_num_rows)::float as bytes_per_row from parquet_metadata('zone.parquet') GROUP BY 1 ORDER BY 1" + // ``` OutputFormat::Parquet => match table { Table::Vehicle => 54, @@ -263,8 +263,9 @@ impl OutputSize { // Scale based on zone subtype count for the scale factor match scale_factor { sf if sf < 10.0 => 1332, - sf if sf < 100.0 => 2000, - _ => 4258, + sf if sf < 100.0 => 4445, + sf if sf < 1000.0 => 5220, + _ => 5650, } } }, @@ -343,6 +344,11 @@ impl OutputSize { Table::Zone => todo!(), } } + + /// Return the total estimated size in bytes + pub fn total_size_bytes(&self) -> i64 { + self.row_count * self.avg_row_size_bytes + } } #[cfg(test)] diff --git
a/spatialbench-cli/src/zone/config.rs b/spatialbench-cli/src/zone/config.rs index 0bbfa32..92890a6 100644 --- a/spatialbench-cli/src/zone/config.rs +++ b/spatialbench-cli/src/zone/config.rs @@ -6,8 +6,9 @@ use std::path::PathBuf; pub struct ZoneDfArgs { pub scale_factor: f64, pub output_dir: PathBuf, - pub parts: i32, - pub part: i32, + pub parts: Option<i32>, + pub part: Option<i32>, + pub output_file_size_mb: Option<f32>, pub parquet_row_group_bytes: i64, pub parquet_compression: ParquetCompression, } @@ -16,8 +17,9 @@ impl ZoneDfArgs { pub fn new( scale_factor: f64, output_dir: PathBuf, - parts: i32, - part: i32, + parts: Option<i32>, + part: Option<i32>, + output_file_size_mb: Option<f32>, parquet_row_group_bytes: i64, parquet_compression: ParquetCompression, ) -> Self { @@ -26,28 +28,34 @@ impl ZoneDfArgs { output_dir, parts, part, + output_file_size_mb, parquet_row_group_bytes, parquet_compression, } } pub fn validate(&self) -> Result<()> { - if self.part < 1 || self.part > self.parts { + if let (Some(part), Some(parts)) = (self.part, self.parts) { + if part < 1 || part > parts { + return Err(anyhow!("Invalid --part={} for --parts={}", part, parts)); + } + } + + if self.output_file_size_mb.is_some() && (self.parts.is_some() || self.part.is_some()) { return Err(anyhow!( - "Invalid --part={} for --parts={}", - self.part, - self.parts + "Cannot specify --parts/--part with --mb-per-file" )); } + Ok(()) } pub fn output_filename(&self) -> PathBuf { - if self.parts > 1 { + if self.parts.unwrap_or(1) > 1 { // Create zone subdirectory and write parts within it self.output_dir .join("zone") - .join(format!("zone.{}.parquet", self.part)) + .join(format!("zone.{}.parquet", self.part.unwrap_or(1))) } else { self.output_dir.join("zone.parquet") } diff --git a/spatialbench-cli/src/zone/datasource.rs b/spatialbench-cli/src/zone/datasource.rs index 2d61b87..1866460 100644 --- a/spatialbench-cli/src/zone/datasource.rs +++ b/spatialbench-cli/src/zone/datasource.rs @@ -62,7
+62,7 @@ impl ZoneDataSource { .read_parquet(parquet_urls, ParquetReadOptions::default()) .await?; - let stats = ZoneTableStats::new(scale_factor, 1); + let stats = ZoneTableStats::new(scale_factor, Some(1)); let subtypes = stats.subtypes(); info!("Selected subtypes for SF {}: {:?}", scale_factor, subtypes); diff --git a/spatialbench-cli/src/zone/main.rs b/spatialbench-cli/src/zone/main.rs index 5afd899..cfa59e3 100644 --- a/spatialbench-cli/src/zone/main.rs +++ b/spatialbench-cli/src/zone/main.rs @@ -6,12 +6,14 @@ use std::path::PathBuf; use super::config::ZoneDfArgs; /// Generates zone table in the requested format +#[allow(clippy::too_many_arguments)] pub async fn generate_zone( format: OutputFormat, scale_factor: f64, output_dir: PathBuf, parts: Option<i32>, part: Option<i32>, + max_file_size_mb: Option<f32>, parquet_row_group_bytes: i64, parquet_compression: ParquetCompression, ) -> io::Result<()> { @@ -25,8 +27,9 @@ pub async fn generate_zone( let args = ZoneDfArgs::new( 1.0f64.max(scale_factor), output_dir, - parts, - part_num, + Option::from(parts), + Option::from(part_num), + max_file_size_mb, parquet_row_group_bytes, parquet_compression, ); @@ -39,8 +42,9 @@ pub async fn generate_zone( let args = ZoneDfArgs::new( 1.0f64.max(scale_factor), output_dir, - parts, - 1, // dummy value, not used in multi mode + Option::from(parts), + None, + max_file_size_mb, parquet_row_group_bytes, parquet_compression, ); diff --git a/spatialbench-cli/src/zone/mod.rs b/spatialbench-cli/src/zone/mod.rs index f1b2f3c..5d8d3ee 100644 --- a/spatialbench-cli/src/zone/mod.rs +++ b/spatialbench-cli/src/zone/mod.rs @@ -66,16 +66,24 @@ pub async fn generate_zone_parquet_multi(args: ZoneDfArgs) -> Result<()> { // Calculate total rows let total_rows: i64 = batches.iter().map(|b| b.num_rows() as i64).sum(); + // Determine number of parts + let mut parts = args.parts.unwrap_or(1); + if let Some(max_size) = args.output_file_size_mb { + parts = 
PartitionStrategy::calculate_parts_from_max_size(args.scale_factor, max_size); + } + // Write each part - for part in 1..=args.parts { - let partition = PartitionStrategy::calculate(total_rows, args.parts, part); + for part in 1..=parts { + let partition = + PartitionStrategy::calculate(total_rows, Option::from(parts), Option::from(part)); let partitioned_batches = partition.apply_to_batches(&batches)?; let part_args = ZoneDfArgs::new( args.scale_factor, args.output_dir.clone(), - args.parts, - part, + Option::from(parts), + Option::from(part), + args.output_file_size_mb, args.parquet_row_group_bytes, args.parquet_compression, ); diff --git a/spatialbench-cli/src/zone/partition.rs b/spatialbench-cli/src/zone/partition.rs index a27f656..700eca9 100644 --- a/spatialbench-cli/src/zone/partition.rs +++ b/spatialbench-cli/src/zone/partition.rs @@ -1,6 +1,7 @@ +use crate::zone::stats::ZoneTableStats; use arrow_array::RecordBatch; use datafusion::prelude::*; -use log::info; +use log::{debug, info}; pub struct PartitionStrategy { offset: i64, @@ -8,15 +9,16 @@ pub struct PartitionStrategy { } impl PartitionStrategy { - pub fn calculate(total_rows: i64, parts: i32, part: i32) -> Self { - let parts = parts as i64; - let i = (part as i64) - 1; + pub fn calculate(total_rows: i64, parts: Option<i32>, part: Option<i32>) -> Self { + let parts = parts.unwrap_or(1); + let part = part.unwrap_or(1); + let i = part - 1; - let base = total_rows / parts; - let rem = total_rows % parts; + let base = total_rows / parts as i64; + let rem = total_rows % parts as i64; - let limit = base + if i < rem { 1 } else { 0 }; - let offset = i * base + std::cmp::min(i, rem); + let limit = base + if i < rem as i32 { 1 } else { 0 }; + let offset = i as i64 * base + std::cmp::min(i as i64, rem); info!( "Partition: total={}, parts={}, part={}, offset={}, limit={}", @@ -26,6 +28,24 @@ impl PartitionStrategy { Self { offset, limit } } + /// Calculates the number of parts needed to approximate the output 
file size. + pub(crate) fn calculate_parts_from_max_size(sf: f64, output_file_size_mb: f32) -> i32 { + let (size_gb, _) = ZoneTableStats::base_stats(sf); + + let total_size_bytes = size_gb * 1024.0 * 1024.0 * 1024.0; + let output_file_size_mb = output_file_size_mb * 1024.0 * 1024.0; + + let parts = ((total_size_bytes / output_file_size_mb as f64).round() as i32).max(1); + + debug!( + "Calculated {parts} parts for table Zone (total size: {}MB, max size: {}MB)", + total_size_bytes / (1024.0 * 1024.0), + output_file_size_mb + ); + + parts + } + pub fn offset(&self) -> i64 { self.offset } @@ -78,7 +98,8 @@ mod tests { let mut collected_offsets = Vec::new(); for part in 1..=parts { - let strategy = PartitionStrategy::calculate(total_rows, parts, part); + let strategy = + PartitionStrategy::calculate(total_rows, Option::from(parts), Option::from(part)); collected_rows.push(strategy.limit); collected_offsets.push(strategy.offset); } @@ -91,4 +112,32 @@ mod tests { assert_eq!(collected_offsets[i], expected_offset); } } + + #[test] + fn test_calculate_parts_from_max_size() { + // Test with a scale factor that produces a known size + let sf = 1.0; + let (size_gb, _) = ZoneTableStats::base_stats(sf); + let total_size_mb = size_gb * 1024.0; + + // Test case 1: file size larger than total - should return 1 part + let output_file_size_mb = (total_size_mb * 2.0) as f32; + let parts = PartitionStrategy::calculate_parts_from_max_size(sf, output_file_size_mb); + assert_eq!(parts, 1); + + // Test case 2: file size exactly half of total - should return 2 parts + let output_file_size_mb = (total_size_mb / 2.0) as f32; + let parts = PartitionStrategy::calculate_parts_from_max_size(sf, output_file_size_mb); + assert_eq!(parts, 2); + + // Test case 3: file size forces 3+ parts + let output_file_size_mb = (total_size_mb / 3.5) as f32; + let parts = PartitionStrategy::calculate_parts_from_max_size(sf, output_file_size_mb); + assert_eq!(parts, 4); // ceil(3.5) = 4 + + // Test case 5: very 
small file size + let parts = PartitionStrategy::calculate_parts_from_max_size(sf, 1.0); + assert!(parts > 1); + assert_eq!(parts, (total_size_mb.round() as i32).max(1)); + } } diff --git a/spatialbench-cli/src/zone/stats.rs b/spatialbench-cli/src/zone/stats.rs index ce5e5d5..5ba8ba0 100644 --- a/spatialbench-cli/src/zone/stats.rs +++ b/spatialbench-cli/src/zone/stats.rs @@ -19,11 +19,11 @@ pub struct ZoneTableStats { } impl ZoneTableStats { - pub fn new(scale_factor: f64, parts: i32) -> Self { + pub fn new(scale_factor: f64, parts: Option<i32>) -> Self { let (mut size_gb, mut total_rows) = Self::base_stats(scale_factor); - if scale_factor <= 1.0 && parts > 1 { - (size_gb, total_rows) = Self::base_stats(scale_factor / parts as f64); + if scale_factor <= 1.0 && parts > Option::from(1) { + (size_gb, total_rows) = Self::base_stats(scale_factor / parts.unwrap() as f64); } debug!( @@ -38,7 +38,7 @@ impl ZoneTableStats { } } - fn base_stats(sf: f64) -> (f64, i64) { + pub fn base_stats(sf: f64) -> (f64, i64) { if sf < 1.0 { (0.92 * sf, (156_095.0 * sf).ceil() as i64) } else if sf < 10.0 { @@ -109,29 +109,29 @@ mod tests { #[test] fn test_subtypes_for_different_scale_factors() { - let sf_01_stats = ZoneTableStats::new(0.1, 1); + let sf_01_stats = ZoneTableStats::new(0.1, Some(1)); assert_eq!( sf_01_stats.subtypes(), vec!["microhood", "macrohood", "county"] ); - let sf_10_stats = ZoneTableStats::new(10.0, 1); + let sf_10_stats = ZoneTableStats::new(10.0, Some(1)); assert_eq!( sf_10_stats.subtypes(), vec!["microhood", "macrohood", "county", "neighborhood"] ); - let sf_100_stats = ZoneTableStats::new(100.0, 1); + let sf_100_stats = ZoneTableStats::new(100.0, Some(1)); assert!(sf_100_stats.subtypes().contains(&"localadmin")); assert!(sf_100_stats.subtypes().contains(&"locality")); - let sf_1000_stats = ZoneTableStats::new(1000.0, 1); + let sf_1000_stats = ZoneTableStats::new(1000.0, Some(1)); assert!(sf_1000_stats.subtypes().contains(&"country")); } #[test] fn 
test_rows_per_group_bounds() { - let stats = ZoneTableStats::new(1.0, 1); + let stats = ZoneTableStats::new(1.0, Some(1)); let rows_per_group_tiny = stats.compute_rows_per_group(1_000_000, 128 * 1024 * 1024); assert!(rows_per_group_tiny >= 1000); @@ -147,9 +147,9 @@ mod tests { #[test] fn test_estimated_rows_scaling_consistency() { - let base_stats = ZoneTableStats::new(1.0, 1); - let half_stats = ZoneTableStats::new(0.5, 1); - let quarter_stats = ZoneTableStats::new(0.25, 1); + let base_stats = ZoneTableStats::new(1.0, Some(1)); + let half_stats = ZoneTableStats::new(0.5, Some(1)); + let quarter_stats = ZoneTableStats::new(0.25, Some(1)); let base_rows = base_stats.estimated_total_rows() as f64; let half_rows = half_stats.estimated_total_rows() as f64; diff --git a/spatialbench-cli/src/zone/writer.rs b/spatialbench-cli/src/zone/writer.rs index 7aff003..d8dc824 100644 --- a/spatialbench-cli/src/zone/writer.rs +++ b/spatialbench-cli/src/zone/writer.rs @@ -81,7 +81,7 @@ impl ParquetWriter { let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); info!( - "Zone -> {} (part {}/{}). write={:?}, total_rows={}", + "Zone -> {} (part {:?}/{:?}). 
write={:?}, total_rows={}", self.output_path.display(), self.args.part, self.args.parts, diff --git a/spatialbench-cli/tests/cli_integration.rs b/spatialbench-cli/tests/cli_integration.rs index 1c6028d..510a0e8 100644 --- a/spatialbench-cli/tests/cli_integration.rs +++ b/spatialbench-cli/tests/cli_integration.rs @@ -778,6 +778,120 @@ fn test_zone_generation_tbl_fails() { )); } +#[tokio::test] +async fn test_trip_output_file_size() { + let temp_dir = tempdir().expect("Failed to create temporary directory"); + + // Generate Trip table with max file size of 2MB + Command::cargo_bin("spatialbench-cli") + .expect("Binary not found") + .arg("--format") + .arg("parquet") + .arg("--scale-factor") + .arg("0.01") + .arg("--tables") + .arg("trip") + .arg("--output-dir") + .arg(temp_dir.path()) + .arg("--mb-per-file") + .arg("2") + .assert() + .success(); + + let trip_dir = temp_dir.path().join("trip"); + let output_file_size_mb = 2 * 1024 * 1024; // 2MB in bytes + + // Verify all files are under the max size + for entry in fs::read_dir(&trip_dir).expect("Failed to read trip directory") { + let entry = entry.expect("Failed to read directory entry"); + let metadata = entry.metadata().expect("Failed to get file metadata"); + let file_size = metadata.len(); + + assert!( + file_size <= output_file_size_mb, + "File {} size ({} bytes) exceeds max size ({} bytes)", + entry.file_name().to_string_lossy(), + file_size, + output_file_size_mb + ); + } + + // Verify multiple files were created + let file_count = fs::read_dir(&trip_dir) + .expect("Failed to read trip directory") + .count(); + assert!(file_count > 1, "Expected multiple files with 2MB limit"); +} + +#[tokio::test] +async fn test_customer_output_file_size() { + let temp_dir = tempdir().expect("Failed to create temporary directory"); + + // Generate Customer table with approx file size of 1MB + Command::cargo_bin("spatialbench-cli") + .expect("Binary not found") + .arg("--format") + .arg("parquet") + .arg("--scale-factor")
.arg("1") + .arg("--tables") + .arg("customer") + .arg("--output-dir") + .arg(temp_dir.path()) + .arg("--mb-per-file") + .arg("1") + .assert() + .success(); + + let customer_dir = temp_dir.path().join("customer"); + let output_file_size_mb = 1 * 1024 * 1024; // 1MB in bytes + + // Verify all files are under the max size + for entry in fs::read_dir(&customer_dir).expect("Failed to read customer directory") { + let entry = entry.expect("Failed to read directory entry"); + let metadata = entry.metadata().expect("Failed to get file metadata"); + let file_size = metadata.len(); + + assert!( + file_size <= output_file_size_mb, + "File {} size ({} bytes) exceeds max size ({} bytes)", + entry.file_name().to_string_lossy(), + file_size, + output_file_size_mb + ); + } + + // Verify multiple files were created + let file_count = fs::read_dir(&customer_dir) + .expect("Failed to read customer directory") + .count(); + assert!(file_count > 1, "Expected multiple files with 1MB limit"); +} + +#[tokio::test] +async fn test_zone_file_size_conflicts_with_parts() { + let temp_dir = tempdir().expect("Failed to create temporary directory"); + + // Should fail when both --mb-per-file and --parts are specified + Command::cargo_bin("spatialbench-cli") + .expect("Binary not found") + .arg("--format") + .arg("parquet") + .arg("--scale-factor") + .arg("1") + .arg("--tables") + .arg("zone") + .arg("--output-dir") + .arg(temp_dir.path()) + .arg("--mb-per-file") + .arg("50") + .arg("--parts") + .arg("10") + .assert() + .failure() + .stderr(predicates::str::contains("cannot be used with")); +} + fn read_gzipped_file_to_string<P: AsRef<Path>>(path: P) -> Result<String, std::io::Error> { let file = File::open(path)?; let mut decoder = flate2::read::GzDecoder::new(file);
