This is an automated email from the ASF dual-hosted git repository. prantogg pushed a commit to branch update-zone-generation in repository https://gitbox.apache.org/repos/asf/sedona-spatialbench.git
commit 3ee616b218d8b2c5895f3d02e6bbbe748f9eb895 Author: Pranav Toggi <[email protected]> AuthorDate: Sun Sep 28 20:22:39 2025 -0700 init: use datafusion for Zone --- spatialbench-arrow/src/lib.rs | 2 - spatialbench-arrow/src/zone.rs | 108 --------- spatialbench-arrow/tests/reparse.rs | 8 +- spatialbench-cli/Cargo.toml | 5 + spatialbench-cli/src/csv.rs | 4 +- spatialbench-cli/src/main.rs | 33 ++- spatialbench-cli/src/plan.rs | 6 +- spatialbench-cli/src/tbl.rs | 2 - spatialbench-cli/src/zone_df.rs | 238 ++++++++++++++++++ spatialbench/src/csv.rs | 49 +--- spatialbench/src/generators.rs | 411 +------------------------------- spatialbench/tests/integration_tests.rs | 17 -- 12 files changed, 272 insertions(+), 611 deletions(-) diff --git a/spatialbench-arrow/src/lib.rs b/spatialbench-arrow/src/lib.rs index eb8f66d..ac6fb78 100644 --- a/spatialbench-arrow/src/lib.rs +++ b/spatialbench-arrow/src/lib.rs @@ -41,7 +41,6 @@ mod customer; mod driver; mod trip; mod vehicle; -mod zone; use arrow::array::RecordBatch; use arrow::datatypes::SchemaRef; @@ -50,7 +49,6 @@ pub use customer::CustomerArrow; pub use driver::DriverArrow; pub use trip::TripArrow; pub use vehicle::VehicleArrow; -pub use zone::ZoneArrow; /// Iterator of Arrow [`RecordBatch`] that also knows its schema pub trait RecordBatchIterator: Iterator<Item = RecordBatch> + Send { diff --git a/spatialbench-arrow/src/zone.rs b/spatialbench-arrow/src/zone.rs deleted file mode 100644 index 06794b3..0000000 --- a/spatialbench-arrow/src/zone.rs +++ /dev/null @@ -1,108 +0,0 @@ -use crate::conversions::string_view_array_from_display_iter; -use crate::{DEFAULT_BATCH_SIZE, RecordBatchIterator}; -use arrow::array::{BinaryArray, Int64Array, RecordBatch}; -use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use geozero::{CoordDimensions, ToWkb}; -use spatialbench::generators::{ZoneGenerator, ZoneGeneratorIterator}; -use std::sync::{Arc, LazyLock}; - -/// Generate [`Zone`]s in [`RecordBatch`] format -/// -/// [`Zone`]: spatialbench::generators::Zone -/// -/// # Example -/// ``` -/// # use spatialbench::generators::{ZoneGenerator}; -/// # use spatialbench_arrow::ZoneArrow; -/// -/// // Create a SF=1.0 generator and wrap it in an Arrow generator -/// let generator = ZoneGenerator::new(0.001, 1, 1); -/// let mut arrow_generator = ZoneArrow::new(generator) -/// .with_batch_size(10); -/// // Read the first 10 batches -/// let batch = arrow_generator.next().unwrap(); -/// // compare the output by pretty printing it -/// let formatted_batches = arrow::util::pretty::pretty_format_batches(&[batch]) -/// .unwrap() -/// .to_string(); -/// ``` -pub struct ZoneArrow { - inner: ZoneGeneratorIterator, - batch_size: usize, -} - -impl ZoneArrow { - pub fn new(generator: ZoneGenerator) -> Self { - let inner = generator.clone().into_iter(); - Self { - inner, - batch_size: DEFAULT_BATCH_SIZE, - } - } - - pub fn with_batch_size(mut self, batch_size: usize) -> Self { - self.batch_size = batch_size; - self - } -} - -impl RecordBatchIterator for ZoneArrow { - fn schema(&self) -> &SchemaRef { - &ZONE_SCHEMA - } -} - -impl Iterator for ZoneArrow { - type Item = RecordBatch; - - fn next(&mut self) -> Option<Self::Item> { - // Get next rows to convert - let rows: Vec<_> = self.inner.by_ref().take(self.batch_size).collect(); - if rows.is_empty() { - return None; - } - - let z_zonekey = Int64Array::from_iter_values(rows.iter().map(|r| r.z_zonekey)); - let z_gersid = string_view_array_from_display_iter(rows.iter().map(|r| &r.z_gersid)); - let z_country = string_view_array_from_display_iter(rows.iter().map(|r| &r.z_country)); - let z_region = string_view_array_from_display_iter(rows.iter().map(|r| &r.z_region)); - let z_name = string_view_array_from_display_iter(rows.iter().map(|r| &r.z_name)); - let z_subtype = string_view_array_from_display_iter(rows.iter().map(|r| &r.z_subtype)); - - // Convert geo::Polygon to WKB binary format - let z_boundary = BinaryArray::from_iter_values(rows.iter().map(|r| { - r.z_boundary - .to_wkb(CoordDimensions::xy()) - .expect("Failed to encode WKB") - })); - - let batch = RecordBatch::try_new( - Arc::clone(self.schema()), - vec![ - Arc::new(z_zonekey), - Arc::new(z_gersid), - Arc::new(z_country), - Arc::new(z_region), - Arc::new(z_name), - Arc::new(z_subtype), - Arc::new(z_boundary), - ], - ) - .unwrap(); - Some(batch) - } -} - -/// Schema for the Zone -static ZONE_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(make_zone_schema); -fn make_zone_schema() -> SchemaRef { - Arc::new(Schema::new(vec![ - Field::new("z_zonekey", DataType::Int64, false), - Field::new("z_gersid", DataType::Utf8View, false), - Field::new("z_country", DataType::Utf8View, false), - Field::new("z_region", DataType::Utf8View, false), - Field::new("z_name", DataType::Utf8View, false), - Field::new("z_subtype", DataType::Utf8View, false), - Field::new("z_boundary", DataType::Binary, false), - ])) -} diff --git a/spatialbench-arrow/tests/reparse.rs b/spatialbench-arrow/tests/reparse.rs index 49c87e3..889933a 100644 --- a/spatialbench-arrow/tests/reparse.rs +++ b/spatialbench-arrow/tests/reparse.rs @@ -3,14 +3,13 @@ use arrow::array::RecordBatch; use arrow::datatypes::SchemaRef; -use spatialbench::csv::{BuildingCsv, CustomerCsv, DriverCsv, TripCsv, VehicleCsv, ZoneCsv}; +use spatialbench::csv::{BuildingCsv, CustomerCsv, DriverCsv, TripCsv, VehicleCsv}; use spatialbench::generators::{ Building, BuildingGenerator, Customer, CustomerGenerator, Driver, DriverGenerator, Trip, - TripGenerator, Vehicle, VehicleGenerator, Zone, ZoneGenerator, + TripGenerator, Vehicle, VehicleGenerator, }; use spatialbench_arrow::{ BuildingArrow, CustomerArrow, DriverArrow, RecordBatchIterator, TripArrow, VehicleArrow, - ZoneArrow, }; use std::io::Write; use std::sync::Arc; @@ -49,8 +48,6 @@ test_row_type!(trip_tbl, TripGenerator, TripArrow, Test::tbl()); test_row_type!(trip_csv, TripGenerator, TripArrow, Test::csv()); test_row_type!(building_tbl, BuildingGenerator, BuildingArrow, Test::tbl()); test_row_type!(building_csv, BuildingGenerator, BuildingArrow, Test::csv()); -test_row_type!(zone_tbl, ZoneGenerator, ZoneArrow, Test::tbl()); -test_row_type!(zone_csv, ZoneGenerator, ZoneArrow, Test::csv()); /// Common trait for writing rows in TBL and CSV format trait RowType { @@ -84,7 +81,6 @@ impl_row_type!(Vehicle<'_>, VehicleCsv); impl_row_type!(Driver, DriverCsv); impl_row_type!(Trip, TripCsv); impl_row_type!(Building<'_>, BuildingCsv); -impl_row_type!(Zone, ZoneCsv); #[derive(Debug, Clone, Copy)] #[allow(clippy::upper_case_acronyms)] diff --git a/spatialbench-cli/Cargo.toml b/spatialbench-cli/Cargo.toml index a77c23d..1180f39 100644 --- a/spatialbench-cli/Cargo.toml +++ b/spatialbench-cli/Cargo.toml @@ -23,6 +23,11 @@ env_logger = "0.11.7" serde = { version = "1.0.219", features = ["derive"] } anyhow = "1.0.99" serde_yaml = "0.9.33" +datafusion = "47.0.0" +object_store = { version = "0.12.4", features = ["aws"] } +arrow-array = "55.2.0" +arrow-schema = "55.2.0" +url = "2.5.7" [dev-dependencies] assert_cmd = "2.0" diff --git a/spatialbench-cli/src/csv.rs b/spatialbench-cli/src/csv.rs index 7f9ca3b..78f93e5 100644 --- a/spatialbench-cli/src/csv.rs +++ b/spatialbench-cli/src/csv.rs @@ -1,9 +1,8 @@ //! Implementations of [`Source`] for generating data in TBL format use super::generate::Source; -use spatialbench::csv::{BuildingCsv, CustomerCsv, DriverCsv, TripCsv, VehicleCsv, ZoneCsv}; +use spatialbench::csv::{BuildingCsv, CustomerCsv, DriverCsv, TripCsv, VehicleCsv}; use spatialbench::generators::{ BuildingGenerator, CustomerGenerator, DriverGenerator, TripGenerator, VehicleGenerator, - ZoneGenerator, }; use std::io::Write; @@ -45,4 +44,3 @@ define_csv_source!(DriverCsvSource, DriverGenerator<'static>, DriverCsv); define_csv_source!(CustomerCsvSource, CustomerGenerator<'static>, CustomerCsv); define_csv_source!(TripCsvSource, TripGenerator, TripCsv); define_csv_source!(BuildingCsvSource, BuildingGenerator<'static>, BuildingCsv); -define_csv_source!(ZoneCsvSource, ZoneGenerator, ZoneCsv); diff --git a/spatialbench-cli/src/main.rs b/spatialbench-cli/src/main.rs index f94e4b9..6c05299 100644 --- a/spatialbench-cli/src/main.rs +++ b/spatialbench-cli/src/main.rs @@ -47,6 +47,7 @@ mod plan; mod spatial_config_file; mod statistics; mod tbl; +mod zone_df; use crate::csv::*; use crate::generate::{generate_in_chunks, Sink, Source}; @@ -62,13 +63,11 @@ use log::{debug, info, LevelFilter}; use spatialbench::distribution::Distributions; use spatialbench::generators::{ BuildingGenerator, CustomerGenerator, DriverGenerator, TripGenerator, VehicleGenerator, - ZoneGenerator, }; use spatialbench::spatial::overrides::{set_overrides, SpatialOverrides}; use spatialbench::text::TextPool; use spatialbench_arrow::{ BuildingArrow, CustomerArrow, DriverArrow, RecordBatchIterator, TripArrow, VehicleArrow, - ZoneArrow, }; use std::fmt::Display; use std::fs::{self, File}; @@ -408,6 +407,28 @@ impl Cli { Ok(()) } + async fn generate_zone(&self) -> io::Result<()> { + match self.format { + OutputFormat::Parquet => { + let args = zone_df::ZoneDfArgs { + scale_factor: self.scale_factor, + output_dir: self.output_dir.clone(), + parts: self.parts.unwrap_or(1), + part: self.part.unwrap_or(1), + parquet_row_group_bytes: self.parquet_row_group_bytes, + parquet_compression: self.parquet_compression, + }; + zone_df::generate_zone_parquet(args) + .await + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) + } + _ => Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Zone table is only supported in --format=parquet (via DataFusion/S3).", + )), + } + } + define_generate!( generate_vehicle, Table::Vehicle, @@ -448,14 +469,6 @@ impl Cli { BuildingCsvSource, BuildingArrow ); - define_generate!( - generate_zone, - Table::Zone, - ZoneGenerator, - ZoneTblSource, - ZoneCsvSource, - ZoneArrow - ); /// return the output filename for the given table fn output_filename(&self, table: Table) -> String { diff --git a/spatialbench-cli/src/plan.rs b/spatialbench-cli/src/plan.rs index 926f379..809814f 100644 --- a/spatialbench-cli/src/plan.rs +++ b/spatialbench-cli/src/plan.rs @@ -4,7 +4,6 @@ use crate::{OutputFormat, Table}; use log::debug; use spatialbench::generators::{ BuildingGenerator, CustomerGenerator, DriverGenerator, TripGenerator, VehicleGenerator, - ZoneGenerator, }; use std::fmt::Display; use std::ops::RangeInclusive; @@ -329,10 +328,7 @@ impl OutputSize { Table::Customer => CustomerGenerator::calculate_row_count(scale_factor, 1, 1), Table::Trip => TripGenerator::calculate_row_count(scale_factor, 1, 1), Table::Building => BuildingGenerator::calculate_row_count(scale_factor, 1, 1), - Table::Zone => { - let generator = ZoneGenerator::new(scale_factor, 1, 1); - generator.calculate_row_count() - } + Table::Zone => todo!(), } } } diff --git a/spatialbench-cli/src/tbl.rs b/spatialbench-cli/src/tbl.rs index b7019c7..8eeb448 100644 --- a/spatialbench-cli/src/tbl.rs +++ b/spatialbench-cli/src/tbl.rs @@ -3,7 +3,6 @@ use super::generate::Source; use spatialbench::generators::{ BuildingGenerator, CustomerGenerator, DriverGenerator, TripGenerator, VehicleGenerator, - ZoneGenerator, }; use std::io::Write; @@ -43,4 +42,3 @@ define_tbl_source!(DriverTblSource, DriverGenerator<'static>); define_tbl_source!(CustomerTblSource, CustomerGenerator<'static>); define_tbl_source!(TripTblSource, TripGenerator); define_tbl_source!(BuildingTblSource, BuildingGenerator<'static>); -define_tbl_source!(ZoneTblSource, ZoneGenerator); diff --git a/spatialbench-cli/src/zone_df.rs b/spatialbench-cli/src/zone_df.rs new file mode 100644 index 0000000..2a7a4cb --- /dev/null +++ b/spatialbench-cli/src/zone_df.rs @@ -0,0 +1,238 @@ +// spatialbench-cli/src/zone_df.rs +use std::{path::PathBuf, sync::Arc, time::Instant}; + +use anyhow::{anyhow, Result}; +use arrow_array::RecordBatch; +use arrow_schema::{Schema, SchemaRef}; +use datafusion::{ + common::config::ConfigOptions, execution::runtime_env::RuntimeEnvBuilder, prelude::*, + sql::TableReference, +}; + +use datafusion::execution::runtime_env::RuntimeEnv; +use log::info; +use object_store::aws::AmazonS3Builder; +use object_store::ObjectStore; +use parquet::{ + arrow::ArrowWriter, basic::Compression as ParquetCompression, + file::properties::WriterProperties, +}; +use url::Url; + +const OVERTURE_RELEASE_DATE: &str = "2025-08-20.1"; +const OVERTURE_S3_BUCKET: &str = "overturemaps-us-west-2"; +const OVERTURE_S3_PREFIX: &str = "release"; + +fn zones_parquet_url() -> String { + format!( + "s3://{}/{}/{}/theme=divisions/type=division_area/", + OVERTURE_S3_BUCKET, OVERTURE_S3_PREFIX, OVERTURE_RELEASE_DATE + ) +} + +fn subtypes_for_scale_factor(sf: f64) -> Vec<&'static str> { + let mut v = vec!["microhood", "macrohood", "county"]; + if sf >= 10.0 { + v.push("neighborhood"); + } + if sf >= 100.0 { + v.extend_from_slice(&["localadmin", "locality", "region", "dependency"]); + } + if sf >= 1000.0 { + v.push("country"); + } + v +} + +fn estimated_total_rows_for_sf(sf: f64) -> i64 { + let mut total = 0i64; + for s in subtypes_for_scale_factor(sf) { + total += match s { + "microhood" => 74797, + "macrohood" => 42619, + "neighborhood" => 298615, + "county" => 39680, + "localadmin" => 19007, + "locality" => 555834, + "region" => 4714, + "dependency" => 105, + "country" => 378, + _ => 0, + }; + } + if sf < 1.0 { + (total as f64 * sf).ceil() as i64 + } else { + total + } +} + +fn parquet_writer_props(comp: ParquetCompression) -> WriterProperties { + WriterProperties::builder().set_compression(comp).build() +} + +fn approx_bytes_per_row(batches: &[RecordBatch]) -> f64 { + let mut rows = 0usize; + let mut bytes = 0usize; + for b in batches { + rows += b.num_rows(); + for col in b.columns() { + bytes += col.get_array_memory_size(); + } + } + if rows == 0 { + 0.0 + } else { + bytes as f64 / rows as f64 + } +} + +fn write_parquet_with_rowgroup_bytes( + out_path: &PathBuf, + schema: SchemaRef, + all_batches: Vec<RecordBatch>, + target_rowgroup_bytes: i64, + props: WriterProperties, +) -> Result<()> { + let mut writer = ArrowWriter::try_new(std::fs::File::create(out_path)?, schema, Some(props))?; + + if all_batches.is_empty() { + writer.close()?; + return Ok(()); + } + + let bpr = approx_bytes_per_row(&all_batches); + let rows_per_group: usize = if bpr > 0.0 { + (target_rowgroup_bytes as f64 / bpr) + .floor() + .max(10_000.0) + .min(1_000_000.0) as usize + } else { + 128_000 + }; + + for batch in all_batches { + let mut start = 0usize; + while start < batch.num_rows() { + let end = (start + rows_per_group).min(batch.num_rows()); + writer.write(&batch.slice(start, end - start))?; + start = end; + } + } + writer.close()?; + Ok(()) +} + +pub struct ZoneDfArgs { + pub scale_factor: f64, + pub output_dir: PathBuf, + pub parts: i32, + pub part: i32, + pub parquet_row_group_bytes: i64, + pub parquet_compression: ParquetCompression, +} + +impl ZoneDfArgs { + fn output_filename(&self) -> PathBuf { + let fname = if self.parts > 1 { + format!("zone.part-{:03}-of-{:03}.parquet", self.part, self.parts) + } else { + "zone.parquet".to_string() + }; + self.output_dir.join(fname) + } +} + +pub async fn generate_zone_parquet(args: ZoneDfArgs) -> Result<()> { + if args.part < 1 || args.part > args.parts { + return Err(anyhow!( + "Invalid --part={} for --parts={}", + args.part, + args.parts + )); + } + + let mut cfg = ConfigOptions::new(); + cfg.execution.target_partitions = 1; + + let rt: Arc<RuntimeEnv> = Arc::new(RuntimeEnvBuilder::new().build()?); + + // Register S3 store for Overture bucket (object_store 0.11) + let bucket = OVERTURE_S3_BUCKET; // "overturemaps-us-west-2" + let s3 = AmazonS3Builder::new() + .with_bucket_name(bucket) + .with_skip_signature(true) + .with_region("us-west-2") + .build()?; // -> object_store 0.11 AmazonS3 + + let s3_url = Url::parse(&format!("s3://{bucket}"))?; + let s3_store: Arc<dyn ObjectStore> = Arc::new(s3); + rt.register_object_store(&s3_url, s3_store); + + let ctx = SessionContext::new_with_config_rt(SessionConfig::from(cfg), rt); + + let url = zones_parquet_url(); + let mut df = ctx.read_parquet(url, ParquetReadOptions::default()).await?; + + let mut pred = col("subtype").eq(lit("__never__")); + for s in subtypes_for_scale_factor(args.scale_factor) { + pred = pred.or(col("subtype").eq(lit(s))); + } + df = df.filter(pred.and(col("is_land").eq(lit(true))))?; + + df = df.sort(vec![col("id").sort(true, true)])?; + let total = estimated_total_rows_for_sf(args.scale_factor); + let parts = args.parts as i64; + let this = (args.part as i64) - 1; + let rows_per_part = (total + parts - 1) / parts; + let offset = this * rows_per_part; + df = df.limit(offset as usize, Some(rows_per_part as usize))?; + + ctx.register_table(TableReference::bare("zone_filtered"), df.into_view())?; + let sql = format!( + r#" + SELECT + CAST(ROW_NUMBER() OVER (ORDER BY id) + {offset} AS BIGINT) AS z_zonekey, + COALESCE(id, '') AS z_gersid, + COALESCE(country, '') AS z_country, + COALESCE(region, '') AS z_region, + COALESCE(names.primary, '') AS z_name, + COALESCE(subtype, '') AS z_subtype, + geometry AS z_boundary + FROM zone_filtered + "# + ); + let df2 = ctx.sql(&sql).await?; + + let t0 = Instant::now(); + let batches = df2.clone().collect().await?; + let collect_dur = t0.elapsed(); + + std::fs::create_dir_all(&args.output_dir)?; + let out = args.output_filename(); + let props = parquet_writer_props(args.parquet_compression); + + // Convert DFSchema to Arrow Schema + let schema = Arc::new(Schema::new( + df2.schema() + .fields() + .iter() + .map(|f| f.as_ref().clone()) + .collect::<Vec<_>>(), + )); + + let t1 = Instant::now(); + write_parquet_with_rowgroup_bytes(&out, schema, batches, args.parquet_row_group_bytes, props)?; + let write_dur = t1.elapsed(); + + info!( + "Zone -> {} (part {}/{}). collect={:?}, write={:?}", + out.display(), + args.part, + args.parts, + collect_dur, + write_dur + ); + + Ok(()) +} diff --git a/spatialbench/src/csv.rs b/spatialbench/src/csv.rs index b9b5a9d..37eb9ab 100644 --- a/spatialbench/src/csv.rs +++ b/spatialbench/src/csv.rs @@ -1,6 +1,6 @@ //! CSV formatting support for the row struct objects generated by the library. -use crate::generators::{Building, Customer, Driver, Trip, Vehicle, Zone}; +use crate::generators::{Building, Customer, Driver, Trip, Vehicle}; use core::fmt; use std::fmt::Display; @@ -259,50 +259,3 @@ impl Display for BuildingCsv<'_> { ) } } - -/// Write [`Zone`]s in CSV format. -/// -/// # Example -/// ``` -/// # use spatialbench::generators::ZoneGenerator; -/// # use spatialbench::csv::ZoneCsv; -/// # use std::fmt::Write; -/// // Output the first 3 rows in CSV format -/// let generator = ZoneGenerator::new(0.001, 1, 1); -/// let mut csv = String::new(); -/// writeln!(&mut csv, "{}", ZoneCsv::header()).unwrap(); // write header -/// for line in generator.iter().take(3) { -/// // write line using CSV formatter -/// writeln!(&mut csv, "{}", ZoneCsv::new(line)).unwrap(); -/// } -/// ``` -pub struct ZoneCsv { - inner: Zone, -} - -impl ZoneCsv { - pub fn new(inner: Zone) -> Self { - Self { inner } - } - - /// Returns the CSV header for the Zone table - pub fn header() -> &'static str { - "z_zonekey,z_gersid,z_country,z_region,z_name,z_subtype,z_boundary" - } -} - -impl Display for ZoneCsv { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!( - f, - "{},{},{},{},{},{},\"{:?}\"", - self.inner.z_zonekey, - self.inner.z_gersid, - self.inner.z_country, - self.inner.z_region, - self.inner.z_name, - self.inner.z_subtype, - self.inner.z_boundary, - ) - } -} diff --git a/spatialbench/src/generators.rs b/spatialbench/src/generators.rs index 109c0ce..5144217 100644 --- a/spatialbench/src/generators.rs +++ b/spatialbench/src/generators.rs @@ -14,17 +14,13 @@ use crate::spatial::utils::continent::{build_continent_cdf, WeightedTarget}; use crate::spatial::utils::{hash_to_unit_u64, spider_seed_for_index}; use crate::spatial::{ContinentAffines, SpatialDefaults, SpatialGenerator}; use crate::text::TextPool; -use duckdb::Connection; -use geo::Geometry; use geo::Point; -use geozero::{wkb::Wkb, ToGeo}; -use log::{debug, error, info}; +use geozero::ToGeo; use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; use std::convert::TryInto; use std::fmt; use std::fmt::Display; -use std::time::Instant; /// A Vehicle Manufacturer, formatted as `"Manufacturer#<n>"` #[derive(Debug, Clone, Copy, PartialEq)] @@ -1439,337 +1435,6 @@ impl<'a> Iterator for BuildingGeneratorIterator<'a> { } } -/// Represents a Zone in the dataset -#[derive(Debug, Clone, PartialEq)] -pub struct Zone { - /// Primary key - pub z_zonekey: i64, - /// GERS ID of the zone - pub z_gersid: String, - /// Country of the zone - pub z_country: String, - /// Region of the zone - pub z_region: String, - /// Name of the zone - pub z_name: String, - /// Subtype of the zone - pub z_subtype: String, - /// Boundary geometry in WKT format - pub z_boundary: Geometry, -} - -impl Display for Zone { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!( - f, - "{}|{}|{}|{}|{}|{}|{:?}|", - self.z_zonekey, - self.z_gersid, - self.z_country, - self.z_region, - self.z_name, - self.z_subtype, - self.z_boundary - ) - } -} - -/// Generator for [`Zone`]s that loads from a parquet file in S3 -#[derive(Debug, Clone)] -pub struct ZoneGenerator { - scale_factor: f64, - part: i32, - part_count: i32, -} - -impl ZoneGenerator { - /// S3 URL for the zones parquet file - const OVERTURE_RELEASE_DATE: &'static str = "2025-08-20.1"; - const OVERTURE_S3_BUCKET: &'static str = "overturemaps-us-west-2"; - const OVERTURE_S3_PREFIX: &'static str = "release"; - - /// Gets the S3 URL for the zones parquet file - fn get_zones_parquet_url() -> String { - format!( - "s3://{}/{}/{}/theme=divisions/type=division_area/*", - Self::OVERTURE_S3_BUCKET, - Self::OVERTURE_S3_PREFIX, - Self::OVERTURE_RELEASE_DATE - ) - } - - /// Get zone subtypes based on scale factor - fn get_zone_subtypes_for_scale_factor(scale_factor: f64) -> Vec<&'static str> { - let mut subtypes = vec!["microhood", "macrohood", "county"]; - - if scale_factor >= 10.0 { - subtypes.extend_from_slice(&["neighborhood"]); - } - - if scale_factor >= 100.0 { - subtypes.extend_from_slice(&["localadmin", "locality", "region", "dependency"]); - } - - if scale_factor >= 1000.0 { - subtypes.push("country"); - } - - subtypes - } - - /// Calculate total zones for a given scale factor based on subtype counts - fn calculate_total_zones_for_scale_factor(scale_factor: f64) -> i64 { - let subtypes = Self::get_zone_subtypes_for_scale_factor(scale_factor); - let mut total = 0i64; - - for subtype in subtypes { - let count = match subtype { - "microhood" => 74797, - "macrohood" => 42619, - "neighborhood" => 298615, - "county" => 39680, - "localadmin" => 19007, - "locality" => 555834, - "region" => 4714, - "dependency" => 105, - "country" => 378, - _ => 0, - }; - total += count; - } - - // Scale down for testing purposes - if scale_factor < 1.0 { - total = (total as f64 * scale_factor).ceil() as i64; - } - - total - } - - /// Create a new zone generator with streaming approach - pub fn new(scale_factor: f64, part: i32, part_count: i32) -> Self { - let start = Instant::now(); - info!( - "Creating ZoneGenerator with scale_factor={}, part={}, part_count={}", - scale_factor, part, part_count - ); - let elapsed = start.elapsed(); - info!("ZoneGenerator created in {:?}", elapsed); - - Self { - scale_factor, - part, - part_count, - } - } - - /// Calculate zones per partition - fn calculate_zones_per_part(&self) -> i64 { - let total_zones = Self::calculate_total_zones_for_scale_factor(self.scale_factor); - (total_zones as f64 / self.part_count as f64).ceil() as i64 - } - - /// Calculate offset for this partition - fn calculate_offset(&self) -> i64 { - let zones_per_part = self.calculate_zones_per_part(); - (self.part - 1) as i64 * zones_per_part - } - - /// Load zones for this specific partition using LIMIT and OFFSET - fn load_partition_zones(&self) -> Result<Vec<Zone>, Box<dyn std::error::Error>> { - info!( - "Loading zones for partition {} of {}", - self.part, self.part_count - ); - let start_total = Instant::now(); - - // Create a connection to DuckDB - let t0 = Instant::now(); - let conn = Connection::open_in_memory()?; - debug!("Opened DuckDB connection in {:?}", t0.elapsed()); - - // Install and load required extensions - let t1 = Instant::now(); - conn.execute_batch( - r#" - INSTALL httpfs; - LOAD httpfs; - INSTALL spatial; - LOAD spatial; - - -- Public bucket: force unsigned requests - SET s3_access_key_id = ''; - SET s3_secret_access_key = ''; - SET s3_session_token = ''; - - -- Region + endpoint for the Overture bucket - SET s3_region = 'us-west-2'; - SET s3_endpoint = 's3.us-west-2.amazonaws.com'; - "#, - )?; - debug!( - "Installed and loaded DuckDB extensions in {:?}", - t1.elapsed() - ); - - // Calculate partition parameters - let zones_per_part = self.calculate_zones_per_part(); - let offset = self.calculate_offset(); - let zones_url = Self::get_zones_parquet_url(); - let subtypes = Self::get_zone_subtypes_for_scale_factor(self.scale_factor); - - info!( - "Partition {}: LIMIT {} OFFSET {} from {} with subtypes: {:?}", - self.part, zones_per_part, offset, zones_url, subtypes - ); - - // Build the subtype filter - let subtype_filter = if subtypes.is_empty() { - return Err(format!( - "No subtypes found for scale factor {} in partition {}. This indicates a logic error.", - self.scale_factor, - self.part - ).into()); - } else { - format!( - "subtype IN ({})", - subtypes - .iter() - .map(|s| format!("'{}'", s)) - .collect::<Vec<_>>() - .join(", ") - ) - }; - - // Combine subtype filter with is_land filter - let combined_filter = format!("{} AND is_land = true", subtype_filter); - - let query = format!( - "SELECT - COALESCE(id, '') as z_gersid, - COALESCE(country, '') as z_country, - COALESCE(region, '') as z_region, - COALESCE(names.primary, '') as z_name, - COALESCE(subtype, '') as z_subtype, - ST_AsWKB(geometry) as z_boundary - FROM read_parquet('{}', hive_partitioning=1) - WHERE {} - LIMIT {} OFFSET {};", - zones_url, combined_filter, zones_per_part, offset - ); - debug!("Generated partition query: {}", query); - - // Prepare + execute query - let t2 = Instant::now(); - let mut stmt = conn.prepare(&query)?; - debug!("Prepared statement in {:?}", t2.elapsed()); - - let t3 = Instant::now(); - let mut rows = stmt.query([])?; - debug!("Executed query and got row iterator in {:?}", t3.elapsed()); - - // Iterate rows and parse geometries - let mut zones = Vec::new(); - let mut zone_id = offset + 1; - - let t4 = Instant::now(); - while let Ok(Some(row)) = rows.next() { - let z_gersid: String = row.get(0)?; - let z_country: String = row.get(1)?; - let z_region: String = row.get(2)?; - let z_name: String = row.get(3)?; - let z_subtype: String = row.get(4)?; - let wkb_bytes: Vec<u8> = row.get(5)?; - let geometry: Geometry = Wkb(&wkb_bytes).to_geo()?; - - zones.push(Zone { - z_zonekey: zone_id, - z_gersid, - z_country, - z_region, - z_name, - z_subtype, - z_boundary: geometry, - }); - - if zones.len() % 1000 == 0 { - debug!("Loaded {} zones for partition {}", zones.len(), self.part); - } - zone_id += 1; - } - - info!( - "Partition {} loaded: {} zones in {:?}", - self.part, - zones.len(), - t4.elapsed() - ); - - info!("Total partition load took {:?}", start_total.elapsed()); - Ok(zones) - } - - /// Return the row count for the given part - pub fn calculate_row_count(&self) -> i64 { - let total_zones = Self::calculate_total_zones_for_scale_factor(self.scale_factor); - let zones_per_part = self.calculate_zones_per_part(); - let offset = self.calculate_offset(); - - // Don't exceed total available zones - std::cmp::min(zones_per_part, total_zones - offset).max(0) - } - - /// Returns an iterator over the zone rows - pub fn iter(&self) -> ZoneGeneratorIterator { - ZoneGeneratorIterator::new(self.clone()) - } -} - -impl IntoIterator for ZoneGenerator { - type Item = Zone; - type IntoIter = ZoneGeneratorIterator; - - fn into_iter(self) -> Self::IntoIter { - self.iter() - } -} - -/// Iterator that generates Zone rows by loading partition data on-demand -#[derive(Debug)] -pub struct ZoneGeneratorIterator { - zones: Vec<Zone>, - index: usize, -} - -impl ZoneGeneratorIterator { - fn new(generator: ZoneGenerator) -> Self { - // Load zones for this partition only - let zones = generator.load_partition_zones().unwrap_or_else(|e| { - error!( - "Failed to load zones for partition {}: {}", - generator.part, e - ); - Vec::new() - }); - - ZoneGeneratorIterator { zones, index: 0 } - } -} - -impl Iterator for ZoneGeneratorIterator { - type Item = Zone; - - fn next(&mut self) -> Option<Self::Item> { - if self.index >= self.zones.len() { - return None; - } - - let zone = self.zones[self.index].clone(); - self.index += 1; - Some(zone) - } -} - #[cfg(test)] mod tests { use super::*; @@ -1908,78 +1573,4 @@ mod tests { assert_eq!(first.b_buildingkey, 2); assert_eq!(first.to_string(), "2|blush|POLYGON((124.218033476 10.538071565,124.215762091 10.536069114,124.214352934 10.536014944,124.212486371 10.539913704,124.217919324 10.539075339,124.218033476 10.538071565))|") } - - #[test] - fn test_zone_generation() { - // Create a generator with a small scale factor - let generator = ZoneGenerator::new(0.001, 1, 1); - let zones: Vec<_> = generator.into_iter().collect(); - - assert_eq!(zones.len(), 158); - - // Check first zone - let first = &zones[0]; - assert_eq!(first.z_zonekey, 1); - // The first zone is now a county due to the is_land filter and county being in base subtypes - assert_eq!(first.z_subtype, "county"); - // Verify the string format matches the expected pattern (but don't check exact content since it's dynamic) - let expected_pattern = format!( - "{}|{}|{}|{}|{}|{}|{:?}|", - first.z_zonekey, - first.z_gersid, - first.z_country, - first.z_region, - first.z_name, - first.z_subtype, - first.z_boundary - ); - assert_eq!(first.to_string(), expected_pattern); - } - - #[test] - fn test_zone_subtype_filters() { - // Test scale factor 0-10: should include microhood, macrohood, and county - let subtypes_0_10 = ZoneGenerator::get_zone_subtypes_for_scale_factor(5.0); - assert_eq!(subtypes_0_10, vec!["microhood", "macrohood", "county"]); - - // Test scale factor 10-100: should include microhood, macrohood, county, and neighborhood - let subtypes_10_100 = ZoneGenerator::get_zone_subtypes_for_scale_factor(50.0); - assert_eq!( - subtypes_10_100, - vec!["microhood", "macrohood", "county", "neighborhood"] - ); - - // Test scale factor 100-1000: should include all except country - let subtypes_100_1000 = ZoneGenerator::get_zone_subtypes_for_scale_factor(500.0); - assert_eq!( - subtypes_100_1000, - vec![ - "microhood", - "macrohood", - "county", - "neighborhood", - "localadmin", - "locality", - "region", - "dependency" - ] - ); - - // Test scale factor 1000+: should include all subtypes - let subtypes_1000_plus = ZoneGenerator::get_zone_subtypes_for_scale_factor(2000.0); - assert_eq!( - subtypes_1000_plus, - vec![ - "microhood", - "macrohood", - "county", - "neighborhood", - "localadmin", - "locality", - "region", - "dependency", - "country" - ] - ); - } } diff --git a/spatialbench/tests/integration_tests.rs b/spatialbench/tests/integration_tests.rs index 6108279..1880d79 100644 --- a/spatialbench/tests/integration_tests.rs +++ b/spatialbench/tests/integration_tests.rs @@ -3,7 +3,6 @@ use spatialbench::generators::{ BuildingGenerator, CustomerGenerator, DriverGenerator, TripGenerator, VehicleGenerator, - ZoneGenerator, }; struct TestIntoIterator<G> @@ -103,22 +102,6 @@ fn test_vehicle_into_iter() { } } -#[test] -fn test_zone_into_iter() { - { - assert_eq!( - TestIntoIterator::new(ZoneGenerator::new(0.001, 1, 1)) - .to_string_vec(5) - .len(), - 5 - ); - } - { - let zone = ZoneGenerator::new(0.001, 1, 1); - assert_eq!(TestIntoIterator::new(zone).to_string_vec(5).len(), 5); - } -} - #[test] fn test_building_into_iter() { {
