This is an automated email from the ASF dual-hosted git repository. prantogg pushed a commit to branch support-multipart in repository https://gitbox.apache.org/repos/asf/sedona-spatialbench.git
commit f8953a74fe51de4d77410d20e81707737cb2f299 Author: Pranav Toggi <[email protected]> AuthorDate: Sat Oct 25 14:30:51 2025 -0700 allow generating multi-part zone --- tpchgen-cli/src/main.rs | 14 ++++----- tpchgen-cli/src/zone/config.rs | 8 +++-- tpchgen-cli/src/zone/datasource.rs | 22 +++++++++----- tpchgen-cli/src/zone/main.rs | 44 ++++++++++++++++++++-------- tpchgen-cli/src/zone/mod.rs | 57 +++++++++++++++++++++++++++++------- tpchgen-cli/src/zone/partition.rs | 42 ++++++++++++++++++++------ tpchgen-cli/src/zone/transform.rs | 6 +--- tpchgen-cli/src/zone/writer.rs | 16 ++++------ tpchgen-cli/tests/cli_integration.rs | 10 +++---- 9 files changed, 151 insertions(+), 68 deletions(-) diff --git a/tpchgen-cli/src/main.rs b/tpchgen-cli/src/main.rs index a671404..c225e6a 100644 --- a/tpchgen-cli/src/main.rs +++ b/tpchgen-cli/src/main.rs @@ -60,6 +60,12 @@ use ::parquet::basic::Compression; use clap::builder::TypedValueParser; use clap::{Parser, ValueEnum}; use log::{debug, info, LevelFilter}; +use std::fmt::Display; +use std::fs::{self, File}; +use std::io::{self, BufWriter, Stdout, Write}; +use std::path::PathBuf; +use std::str::FromStr; +use std::time::Instant; use tpchgen::distribution::Distributions; use tpchgen::generators::{ BuildingGenerator, CustomerGenerator, DriverGenerator, TripGenerator, VehicleGenerator, @@ -69,12 +75,6 @@ use tpchgen::text::TextPool; use tpchgen_arrow::{ BuildingArrow, CustomerArrow, DriverArrow, RecordBatchIterator, TripArrow, VehicleArrow, }; -use std::fmt::Display; -use std::fs::{self, File}; -use std::io::{self, BufWriter, Stdout, Write}; -use std::path::PathBuf; -use std::str::FromStr; -use std::time::Instant; #[derive(Parser)] #[command(name = "tpchgen")] @@ -423,7 +423,7 @@ impl Cli { self.parquet_row_group_bytes, self.parquet_compression, ) - .await + .await } define_generate!( diff --git a/tpchgen-cli/src/zone/config.rs b/tpchgen-cli/src/zone/config.rs index c760dc9..2da6d5e 100644 --- a/tpchgen-cli/src/zone/config.rs +++ b/tpchgen-cli/src/zone/config.rs @@ -1,6 +1,6 @@ -use std::path::PathBuf; use anyhow::{anyhow, Result}; use parquet::basic::Compression as ParquetCompression; +use std::path::PathBuf; #[derive(Clone)] pub struct ZoneDfArgs { @@ -43,6 +43,10 @@ impl ZoneDfArgs { } pub fn output_filename(&self) -> PathBuf { - self.output_dir.join("zone.parquet") + if self.parts > 1 { + self.output_dir.join(format!("zone.{}.parquet", self.part)) + } else { + self.output_dir.join("zone.parquet") + } } } diff --git a/tpchgen-cli/src/zone/datasource.rs b/tpchgen-cli/src/zone/datasource.rs index 2be0ebf..2d61b87 100644 --- a/tpchgen-cli/src/zone/datasource.rs +++ b/tpchgen-cli/src/zone/datasource.rs @@ -1,4 +1,3 @@ -use std::sync::Arc; use anyhow::Result; use datafusion::{ common::config::ConfigOptions, @@ -7,6 +6,7 @@ use datafusion::{ }; use log::{debug, info}; use object_store::http::HttpBuilder; +use std::sync::Arc; use url::Url; use super::stats::ZoneTableStats; @@ -35,12 +35,13 @@ impl ZoneDataSource { } pub fn create_context(&self) -> Result<SessionContext> { - let cfg = ConfigOptions::new(); + let mut cfg = ConfigOptions::new(); - let ctx = SessionContext::new_with_config_rt( - SessionConfig::from(cfg), - Arc::clone(&self.runtime), - ); + // Avoid parallelism to ensure ordering of source data + cfg.execution.target_partitions = 1; + + let ctx = + SessionContext::new_with_config_rt(SessionConfig::from(cfg), Arc::clone(&self.runtime)); debug!("Created DataFusion session context"); Ok(ctx) @@ -52,7 +53,10 @@ impl ZoneDataSource { scale_factor: f64, ) -> Result<DataFrame> { let parquet_urls = self.generate_parquet_urls(); - info!("Reading {} Parquet parts from Hugging Face...", parquet_urls.len()); + info!( + "Reading {} Parquet parts from Hugging Face...", + parquet_urls.len() + ); let df = ctx .read_parquet(parquet_urls, ParquetReadOptions::default()) @@ -71,6 +75,10 @@ impl ZoneDataSource { let df = df.filter(pred.and(col("is_land").eq(lit(true))))?; info!("Applied subtype and is_land filters"); + // Sort by 'id' to ensure deterministic ordering regardless of parallelism + // let df = df.sort(vec![col("id").sort(true, false)])?; + // info!("Sorted by id for deterministic ordering"); + Ok(df) } diff --git a/tpchgen-cli/src/zone/main.rs b/tpchgen-cli/src/zone/main.rs index e5cc176..b37d97c 100644 --- a/tpchgen-cli/src/zone/main.rs +++ b/tpchgen-cli/src/zone/main.rs @@ -1,6 +1,7 @@ +use log::info; +use parquet::basic::Compression as ParquetCompression; use std::io; use std::path::PathBuf; -use parquet::basic::Compression as ParquetCompression; use super::config::ZoneDfArgs; @@ -16,17 +17,36 @@ pub async fn generate_zone( ) -> io::Result<()> { match format { OutputFormat::Parquet => { - let args = ZoneDfArgs::new( - 1.0f64.max(scale_factor), - output_dir, - parts.unwrap_or(1), - part.unwrap_or(1), - parquet_row_group_bytes, - parquet_compression, - ); - super::generate_zone_parquet(args) - .await - .map_err(io::Error::other) + let parts = parts.unwrap_or(1); + + if part.is_some() { + // Single part mode - use LIMIT/OFFSET + let args = ZoneDfArgs::new( + 1.0f64.max(scale_factor), + output_dir, + parts, + part.unwrap(), + parquet_row_group_bytes, + parquet_compression, + ); + super::generate_zone_parquet_single(args) + .await + .map_err(io::Error::other) + } else { + // Multi-part mode - collect once and partition in memory + info!("Generating all {} part(s) for zone table", parts); + let args = ZoneDfArgs::new( + 1.0f64.max(scale_factor), + output_dir, + parts, + 1, // dummy value, not used in multi mode + parquet_row_group_bytes, + parquet_compression, + ); + super::generate_zone_parquet_multi(args) + .await + .map_err(io::Error::other) + } } _ => Err(io::Error::new( io::ErrorKind::InvalidInput, diff --git a/tpchgen-cli/src/zone/mod.rs b/tpchgen-cli/src/zone/mod.rs index fc0f89f..f1b2f3c 100644 --- a/tpchgen-cli/src/zone/mod.rs +++ b/tpchgen-cli/src/zone/mod.rs @@ -9,8 +9,8 @@ mod writer; pub mod main; -use std::sync::Arc; use anyhow::Result; +use std::sync::Arc; pub use config::ZoneDfArgs; use datasource::ZoneDataSource; @@ -19,22 +19,18 @@ use stats::ZoneTableStats; use transform::ZoneTransformer; use writer::ParquetWriter; -pub async fn generate_zone_parquet(args: ZoneDfArgs) -> Result<()> { +/// Generate a single part using LIMIT/OFFSET on the dataframe +pub async fn generate_zone_parquet_single(args: ZoneDfArgs) -> Result<()> { args.validate()?; let stats = ZoneTableStats::new(args.scale_factor, args.parts); let datasource = ZoneDataSource::new().await?; let ctx = datasource.create_context()?; - let df = datasource - .load_zone_data(&ctx, args.scale_factor) - .await?; + let df = datasource.load_zone_data(&ctx, args.scale_factor).await?; - let partition = PartitionStrategy::calculate( - stats.estimated_total_rows(), - args.parts, - args.part, - ); + let partition = + PartitionStrategy::calculate(stats.estimated_total_rows(), args.parts, args.part); let df = partition.apply_to_dataframe(df)?; @@ -46,8 +42,47 @@ pub async fn generate_zone_parquet(args: ZoneDfArgs) -> Result<()> { let batches = df.collect().await?; let writer = ParquetWriter::new(&args, &stats, schema); - writer.write(&batches)?; Ok(()) } + +/// Generate all parts by collecting once and partitioning in memory +pub async fn generate_zone_parquet_multi(args: ZoneDfArgs) -> Result<()> { + let stats = ZoneTableStats::new(args.scale_factor, args.parts); + let datasource = ZoneDataSource::new().await?; + let ctx = datasource.create_context()?; + + let df = datasource.load_zone_data(&ctx, args.scale_factor).await?; + + // Transform without offset (we'll adjust per-part later) + let transformer = ZoneTransformer::new(0); + let df = transformer.transform(&ctx, df).await?; + + // Collect once + let schema = Arc::new(transformer.arrow_schema(&df)?); + let batches = df.collect().await?; + + // Calculate total rows + let total_rows: i64 = batches.iter().map(|b| b.num_rows() as i64).sum(); + + // Write each part + for part in 1..=args.parts { + let partition = PartitionStrategy::calculate(total_rows, args.parts, part); + let partitioned_batches = partition.apply_to_batches(&batches)?; + + let part_args = ZoneDfArgs::new( + args.scale_factor, + args.output_dir.clone(), + args.parts, + part, + args.parquet_row_group_bytes, + args.parquet_compression, + ); + + let writer = ParquetWriter::new(&part_args, &stats, schema.clone()); + writer.write(&partitioned_batches)?; + } + + Ok(()) +} diff --git a/tpchgen-cli/src/zone/partition.rs b/tpchgen-cli/src/zone/partition.rs index 8ea54ea..a27f656 100644 --- a/tpchgen-cli/src/zone/partition.rs +++ b/tpchgen-cli/src/zone/partition.rs @@ -1,3 +1,4 @@ +use arrow_array::RecordBatch; use datafusion::prelude::*; use log::info; @@ -19,17 +20,10 @@ impl PartitionStrategy { info!( "Partition: total={}, parts={}, part={}, offset={}, limit={}", - total_rows, - parts, - part, - offset, - limit + total_rows, parts, part, offset, limit ); - Self { - offset, - limit, - } + Self { offset, limit } } pub fn offset(&self) -> i64 { @@ -39,6 +33,36 @@ impl PartitionStrategy { pub fn apply_to_dataframe(&self, df: DataFrame) -> datafusion::common::Result<DataFrame> { df.limit(self.offset as usize, Some(self.limit as usize)) } + + /// Apply partition to already-collected batches + pub fn apply_to_batches(&self, batches: &[RecordBatch]) -> anyhow::Result<Vec<RecordBatch>> { + let mut result = Vec::new(); + let mut current_offset = 0i64; + let end_offset = self.offset + self.limit; + + for batch in batches { + let batch_rows = batch.num_rows() as i64; + let batch_end = current_offset + batch_rows; + + if batch_end <= self.offset || current_offset >= end_offset { + current_offset = batch_end; + continue; + } + + let start_in_batch = (self.offset.saturating_sub(current_offset)).max(0) as usize; + let end_in_batch = ((end_offset - current_offset).min(batch_rows)) as usize; + let length = end_in_batch - start_in_batch; + + if length > 0 { + let sliced = batch.slice(start_in_batch, length); + result.push(sliced); + } + + current_offset = batch_end; + } + + Ok(result) + } } #[cfg(test)] diff --git a/tpchgen-cli/src/zone/transform.rs b/tpchgen-cli/src/zone/transform.rs index e2f423b..74d9365 100644 --- a/tpchgen-cli/src/zone/transform.rs +++ b/tpchgen-cli/src/zone/transform.rs @@ -12,11 +12,7 @@ impl ZoneTransformer { Self { offset } } - pub async fn transform( - &self, - ctx: &SessionContext, - df: DataFrame, - ) -> Result<DataFrame> { + pub async fn transform(&self, ctx: &SessionContext, df: DataFrame) -> Result<DataFrame> { ctx.register_table(TableReference::bare("zone_filtered"), df.into_view())?; debug!("Registered filtered data as 'zone_filtered' table"); diff --git a/tpchgen-cli/src/zone/writer.rs b/tpchgen-cli/src/zone/writer.rs index fe273a9..627566b 100644 --- a/tpchgen-cli/src/zone/writer.rs +++ b/tpchgen-cli/src/zone/writer.rs @@ -1,12 +1,9 @@ -use std::{path::PathBuf, sync::Arc, time::Instant}; use anyhow::Result; use arrow_array::RecordBatch; use arrow_schema::SchemaRef; use log::{debug, info}; -use parquet::{ - arrow::ArrowWriter, - file::properties::WriterProperties, -}; +use parquet::{arrow::ArrowWriter, file::properties::WriterProperties}; +use std::{path::PathBuf, sync::Arc, time::Instant}; use super::config::ZoneDfArgs; use super::stats::ZoneTableStats; @@ -20,10 +17,8 @@ pub struct ParquetWriter { impl ParquetWriter { pub fn new(args: &ZoneDfArgs, stats: &ZoneTableStats, schema: SchemaRef) -> Self { - let rows_per_group = stats.compute_rows_per_group( - args.parquet_row_group_bytes, - 128 * 1024 * 1024, - ); + let rows_per_group = + stats.compute_rows_per_group(args.parquet_row_group_bytes, 128 * 1024 * 1024); let props = WriterProperties::builder() .set_compression(args.parquet_compression) @@ -46,7 +41,8 @@ impl ParquetWriter { let t0 = Instant::now(); let file = std::fs::File::create(&self.output_path)?; - let mut writer = ArrowWriter::try_new(file, Arc::clone(&self.schema), Some(self.props.clone()))?; + let mut writer = + ArrowWriter::try_new(file, Arc::clone(&self.schema), Some(self.props.clone()))?; for batch in batches { writer.write(batch)?; diff --git a/tpchgen-cli/tests/cli_integration.rs b/tpchgen-cli/tests/cli_integration.rs index bdedc3b..6916b8e 100644 --- a/tpchgen-cli/tests/cli_integration.rs +++ b/tpchgen-cli/tests/cli_integration.rs @@ -2,14 +2,14 @@ use arrow_array::RecordBatch; use assert_cmd::Command; use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder}; use parquet::file::metadata::ParquetMetaDataReader; -use tpchgen::generators::TripGenerator; -use tpchgen_arrow::{RecordBatchIterator, TripArrow}; use std::fs; use std::fs::File; use std::io::Read; use std::path::{Path, PathBuf}; use std::sync::Arc; use tempfile::tempdir; +use tpchgen::generators::TripGenerator; +use tpchgen_arrow::{RecordBatchIterator, TripArrow}; /// Test TBL output for scale factor 0.51 and 0.001 using tpchgen-cli /// A scale factor of 0.51 is used because a sf of 0.5 and below will yield 0 results in the Building table @@ -106,7 +106,7 @@ async fn test_zone_deterministic_parts_generation() { .assert() .success(); - let zone_file1 = temp_dir1.path().join("zone.parquet"); + let zone_file1 = temp_dir1.path().join("zone.1.parquet"); // Reference file is a sf=0.01 zone table with z_boundary column removed let reference_file = PathBuf::from("../tpchgen/data/sf-v1/zone.parquet"); @@ -362,7 +362,7 @@ async fn test_zone_write_parquet_row_group_size_default() { expect_row_group_sizes( output_dir.path(), vec![RowGroups { - table: "zone", + table: "zone.1", row_group_bytes: vec![86288517], }], ); @@ -442,7 +442,7 @@ async fn test_zone_write_parquet_row_group_size_20mb() { expect_row_group_sizes( output_dir.path(), vec![RowGroups { - table: "zone", + table: "zone.1", row_group_bytes: vec![15428592, 17250042, 19338201, 17046885, 17251978], }], );
