This is an automated email from the ASF dual-hosted git repository. jiayu pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/sedona-spatialbench.git
commit f02617d6f4ee614676325811f34a8ca256ce2a7d Author: Pranav Toggi <[email protected]> AuthorDate: Thu Aug 21 10:30:50 2025 -0700 [EWT-3199] Temporary partitioning of Zone generation via LIMIT and OFFSET in DuckDB (#8) * Use LIMIT/OFFSET approach * fmt fix --- spatialbench/Cargo.toml | 1 + spatialbench/src/generators.rs | 225 ++++++++++++++++++++++------------------- 2 files changed, 123 insertions(+), 103 deletions(-) diff --git a/spatialbench/Cargo.toml b/spatialbench/Cargo.toml index e679fec..8eac353 100644 --- a/spatialbench/Cargo.toml +++ b/spatialbench/Cargo.toml @@ -16,6 +16,7 @@ rand = { version = "0.8", features = ["small_rng"] } duckdb = { version = "1.3.0", features = ["bundled"] } geo = { workspace = true } geozero = { workspace = true } +log = "0.4.27" [dev-dependencies] flate2 = "1.1.0" diff --git a/spatialbench/src/generators.rs b/spatialbench/src/generators.rs index 7bfc047..8ab801c 100644 --- a/spatialbench/src/generators.rs +++ b/spatialbench/src/generators.rs @@ -16,11 +16,13 @@ use duckdb::Connection; use geo::Geometry; use geo::Point; use geozero::{wkb::Wkb, ToGeo}; +use log::{debug, error, info}; use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; use std::convert::TryInto; use std::fmt; use std::fmt::Display; +use std::time::Instant; /// A Vehicle Manufacturer, formatted as `"Manufacturer#<n>"` #[derive(Debug, Clone, Copy, PartialEq)] @@ -1426,7 +1428,6 @@ impl Display for Zone { #[derive(Debug, Clone)] pub struct ZoneGenerator { scale_factor: f64, - zones: Vec<Zone>, part: i32, part_count: i32, } @@ -1448,41 +1449,68 @@ impl ZoneGenerator { ) } - /// Creates a new ZoneGenerator that loads data from S3 - pub fn new(scale_factor: f64, part: i32, part_count: i32) -> ZoneGenerator { - // construct temporary ZoneGenerator with empty zones - let mut generator = ZoneGenerator { + /// Create a new zone generator with streaming approach + pub fn new(scale_factor: f64, part: i32, part_count: i32) -> Self { + let start = Instant::now(); + info!( + "Creating ZoneGenerator with scale_factor={}, part={}, part_count={}", + scale_factor, part, part_count + ); + let elapsed = start.elapsed(); + info!("ZoneGenerator created in {:?}", elapsed); + + Self { scale_factor, part, part_count, - zones: Vec::new(), - }; + } + } - let zones = generator.load_zones_from_s3(); - generator.zones = zones; + /// Calculate zones per partition + fn calculate_zones_per_part(&self) -> i64 { + let total_zones = (self.scale_factor * Self::SCALE_BASE as f64).ceil() as i64; + (total_zones as f64 / self.part_count as f64).ceil() as i64 + } - generator + /// Calculate offset for this partition + fn calculate_offset(&self) -> i64 { + let zones_per_part = self.calculate_zones_per_part(); + (self.part - 1) as i64 * zones_per_part } - /// Loads zone data from S3 parquet file using DuckDB - fn load_zones_from_s3(&self) -> Vec<Zone> { + /// Load zones for this specific partition using LIMIT and OFFSET + fn load_partition_zones(&self) -> Result<Vec<Zone>, Box<dyn std::error::Error>> { + info!( + "Loading zones for partition {} of {}", + self.part, self.part_count + ); + let start_total = Instant::now(); + // Create a connection to DuckDB - let conn = Connection::open_in_memory().expect("Failed to open DuckDB connection"); + let t0 = Instant::now(); + let conn = Connection::open_in_memory()?; + debug!("Opened DuckDB connection in {:?}", t0.elapsed()); // Install and load required extensions - conn.execute("INSTALL httpfs;", []) - .expect("Failed to install httpfs"); - conn.execute("LOAD httpfs;", []) - .expect("Failed to load httpfs"); - conn.execute("INSTALL spatial;", []) - .expect("Failed to install spatial"); - conn.execute("LOAD spatial;", []) - .expect("Failed to load spatial"); + let t1 = Instant::now(); + conn.execute("INSTALL httpfs;", [])?; + conn.execute("LOAD httpfs;", [])?; + conn.execute("INSTALL spatial;", [])?; + conn.execute("LOAD spatial;", [])?; + debug!( + "Installed and loaded DuckDB extensions in {:?}", + t1.elapsed() + ); + // Calculate partition parameters + let zones_per_part = self.calculate_zones_per_part(); + let offset = self.calculate_offset(); let zones_url = Self::get_zones_parquet_url(); - // Compute the limit based on scale factor - let limit = (self.scale_factor * Self::SCALE_BASE as f64).ceil() as i64; + info!( + "Partition {}: LIMIT {} OFFSET {} from {}", + self.part, zones_per_part, offset, zones_url + ); let query = format!( "SELECT @@ -1494,75 +1522,78 @@ impl ZoneGenerator { ST_AsWKB(geometry) as z_boundary FROM read_parquet('{}', hive_partitioning=1) WHERE subtype IN ('localadmin', 'locality', 'neighborhood') - LIMIT {};", - zones_url, limit + LIMIT {} OFFSET {};", + zones_url, zones_per_part, offset ); + debug!("Generated partition query: {}", query); + + // Prepare + execute query + let t2 = Instant::now(); + let mut stmt = conn.prepare(&query)?; + debug!("Prepared statement in {:?}", t2.elapsed()); - let mut stmt = conn.prepare(&query).unwrap(); - let mut rows = stmt.query([]).unwrap(); + let t3 = Instant::now(); + let mut rows = stmt.query([])?; + debug!("Executed query and got row iterator in {:?}", t3.elapsed()); + // Iterate rows and parse geometries let mut zones = Vec::new(); - // Counter for primary key - let mut zone_id = 1; + let mut zone_id = offset + 1; + let t4 = Instant::now(); while let Ok(Some(row)) = rows.next() { - let wkb_bytes: Vec<u8> = row.get(5).unwrap(); - let geometry: Geometry = Wkb(&wkb_bytes).to_geo().unwrap(); + let z_gersid: String = row.get(0)?; + let z_country: String = row.get(1)?; + let z_region: String = row.get(2)?; + let z_name: String = row.get(3)?; + let z_subtype: String = row.get(4)?; + let wkb_bytes: Vec<u8> = row.get(5)?; + let geometry: Geometry = Wkb(&wkb_bytes).to_geo()?; zones.push(Zone { z_zonekey: zone_id, - z_gersid: row.get(0).unwrap(), - z_country: row.get(1).unwrap(), - z_region: row.get(2).unwrap(), - z_name: row.get(3).unwrap(), - z_subtype: row.get(4).unwrap(), + z_gersid, + z_country, + z_region, + z_name, + z_subtype, z_boundary: geometry, }); + + if zones.len() % 1000 == 0 { + debug!( + "Processed {} rows for partition {}...", + zones.len(), + self.part + ); + } zone_id += 1; } - zones + info!( + "Partition {} loaded: {} zones in {:?}", + self.part, + zones.len(), + t4.elapsed() + ); + + info!("Total partition load took {:?}", start_total.elapsed()); + Ok(zones) } /// Return the row count for the given part pub fn calculate_row_count(&self) -> i64 { - let zone_count = self.zones.len() as i64; - - if self.part_count <= 1 { - return zone_count; - } - - // Partition the zones based on part number - let zones_per_part = (zone_count + self.part_count as i64 - 1) / self.part_count as i64; - let start = (self.part - 1) as i64 * zones_per_part; - let end = std::cmp::min(start + zones_per_part, zone_count); + let total_zones = (self.scale_factor * Self::SCALE_BASE as f64).ceil() as i64; + let zones_per_part = self.calculate_zones_per_part(); + let offset = self.calculate_offset(); - end - start + // Don't exceed total available zones + std::cmp::min(zones_per_part, total_zones - offset).max(0) } /// Returns an iterator over the zone rows pub fn iter(&self) -> ZoneGeneratorIterator { - let zone_count = self.zones.len() as i64; - - // If there's only one part, return all zones - if self.part_count <= 1 { - return ZoneGeneratorIterator { - zones: self.zones.clone(), - end_index: zone_count, - current_index: 0, - }; - } - - // Otherwise, calculate the correct range for this part - let zones_per_part = (zone_count + self.part_count as i64 - 1) / self.part_count as i64; - let start = (self.part - 1) as i64 * zones_per_part; - let end = std::cmp::min(start + zones_per_part, zone_count); - - ZoneGeneratorIterator { - zones: self.zones.clone(), - end_index: end, - current_index: start, - } + ZoneGeneratorIterator::new(self.clone()) } } @@ -1571,55 +1602,43 @@ impl IntoIterator for ZoneGenerator { type IntoIter = ZoneGeneratorIterator; fn into_iter(self) -> Self::IntoIter { - let zone_count = self.zones.len() as i64; - - // If there's only one part, return all zones - if self.part_count <= 1 { - return ZoneGeneratorIterator { - zones: self.zones, - end_index: zone_count, - current_index: 0, - }; - } - - // Otherwise, calculate the correct range for this part - let zones_per_part = (zone_count + self.part_count as i64 - 1) / self.part_count as i64; - let start = (self.part - 1) as i64 * zones_per_part; - let end = std::cmp::min(start + zones_per_part, zone_count); - - ZoneGeneratorIterator { - zones: self.zones, - end_index: end, - current_index: start, - } + self.iter() } } -/// Iterator that provides access to Zone rows +/// Iterator that generates Zone rows by loading partition data on-demand #[derive(Debug)] pub struct ZoneGeneratorIterator { zones: Vec<Zone>, - end_index: i64, - current_index: i64, + index: usize, +} + +impl ZoneGeneratorIterator { + fn new(generator: ZoneGenerator) -> Self { + // Load zones for this partition only + let zones = generator.load_partition_zones().unwrap_or_else(|e| { + error!( + "Failed to load zones for partition {}: {}", + generator.part, e + ); + Vec::new() + }); + + ZoneGeneratorIterator { zones, index: 0 } + } } impl Iterator for ZoneGeneratorIterator { type Item = Zone; fn next(&mut self) -> Option<Self::Item> { - if self.current_index >= self.end_index { + if self.index >= self.zones.len() { return None; } - let index = self.current_index as usize; - self.current_index += 1; - - Some(self.zones[index].clone()) - } - - fn size_hint(&self) -> (usize, Option<usize>) { - let remaining = (self.end_index - self.current_index) as usize; - (remaining, Some(remaining)) + let zone = self.zones[self.index].clone(); + self.index += 1; + Some(zone) } }
