This is an automated email from the ASF dual-hosted git repository.

prantogg pushed a commit to branch support-multipart
in repository https://gitbox.apache.org/repos/asf/sedona-spatialbench.git

commit 60bbdaf3ef53f1c05cc50791825bf0eab90d7613
Author: Pranav Toggi <[email protected]>
AuthorDate: Sat Oct 25 11:57:30 2025 -0700

    refactor zone generation for readability
---
 tpchgen-cli/src/main.rs            |  37 ++-
 tpchgen-cli/src/zone/config.rs     |  48 ++++
 tpchgen-cli/src/zone/datasource.rs |  87 +++++++
 tpchgen-cli/src/zone/main.rs       |  43 ++++
 tpchgen-cli/src/zone/mod.rs        |  53 ++++
 tpchgen-cli/src/zone/partition.rs  |  70 ++++++
 tpchgen-cli/src/zone/stats.rs      | 161 ++++++++++++
 tpchgen-cli/src/zone/transform.rs  |  54 ++++
 tpchgen-cli/src/zone/writer.rs     |  71 ++++++
 tpchgen-cli/src/zone_df.rs         | 503 -------------------------------------
 10 files changed, 604 insertions(+), 523 deletions(-)

diff --git a/tpchgen-cli/src/main.rs b/tpchgen-cli/src/main.rs
index 7c56fa5..a671404 100644
--- a/tpchgen-cli/src/main.rs
+++ b/tpchgen-cli/src/main.rs
@@ -47,7 +47,7 @@ mod plan;
 mod spatial_config_file;
 mod statistics;
 mod tbl;
-mod zone_df;
+mod zone;
 
 use crate::csv::*;
 use crate::generate::{generate_in_chunks, Sink, Source};
@@ -408,25 +408,22 @@ impl Cli {
     }
 
     async fn generate_zone(&self) -> io::Result<()> {
-        match self.format {
-            OutputFormat::Parquet => {
-                let args = zone_df::ZoneDfArgs {
-                    scale_factor: 1.0f64.max(self.scale_factor),
-                    output_dir: self.output_dir.clone(),
-                    parts: self.parts.unwrap_or(1),
-                    part: self.part.unwrap_or(1),
-                    parquet_row_group_bytes: self.parquet_row_group_bytes,
-                    parquet_compression: self.parquet_compression,
-                };
-                zone_df::generate_zone_parquet(args)
-                    .await
-                    .map_err(io::Error::other)
-            }
-            _ => Err(io::Error::new(
-                io::ErrorKind::InvalidInput,
-                "Zone table is only supported in --format=parquet (via DataFusion/S3).",
-            )),
-        }
+        let format = match self.format {
+            OutputFormat::Parquet => zone::main::OutputFormat::Parquet,
+            OutputFormat::Csv => zone::main::OutputFormat::Csv,
+            OutputFormat::Tbl => zone::main::OutputFormat::Tbl,
+        };
+
+        zone::main::generate_zone(
+            format,
+            self.scale_factor,
+            self.output_dir.clone(),
+            self.parts,
+            self.part,
+            self.parquet_row_group_bytes,
+            self.parquet_compression,
+        )
+        .await
     }
 
     define_generate!(
diff --git a/tpchgen-cli/src/zone/config.rs b/tpchgen-cli/src/zone/config.rs
new file mode 100644
index 0000000..c760dc9
--- /dev/null
+++ b/tpchgen-cli/src/zone/config.rs
@@ -0,0 +1,48 @@
+use std::path::PathBuf;
+use anyhow::{anyhow, Result};
+use parquet::basic::Compression as ParquetCompression;
+
+#[derive(Clone)]
+pub struct ZoneDfArgs {
+    pub scale_factor: f64,
+    pub output_dir: PathBuf,
+    pub parts: i32,
+    pub part: i32,
+    pub parquet_row_group_bytes: i64,
+    pub parquet_compression: ParquetCompression,
+}
+
+impl ZoneDfArgs {
+    pub fn new(
+        scale_factor: f64,
+        output_dir: PathBuf,
+        parts: i32,
+        part: i32,
+        parquet_row_group_bytes: i64,
+        parquet_compression: ParquetCompression,
+    ) -> Self {
+        Self {
+            scale_factor,
+            output_dir,
+            parts,
+            part,
+            parquet_row_group_bytes,
+            parquet_compression,
+        }
+    }
+
+    pub fn validate(&self) -> Result<()> {
+        if self.part < 1 || self.part > self.parts {
+            return Err(anyhow!(
+                "Invalid --part={} for --parts={}",
+                self.part,
+                self.parts
+            ));
+        }
+        Ok(())
+    }
+
+    pub fn output_filename(&self) -> PathBuf {
+        self.output_dir.join("zone.parquet")
+    }
+}
diff --git a/tpchgen-cli/src/zone/datasource.rs b/tpchgen-cli/src/zone/datasource.rs
new file mode 100644
index 0000000..2be0ebf
--- /dev/null
+++ b/tpchgen-cli/src/zone/datasource.rs
@@ -0,0 +1,87 @@
+use std::sync::Arc;
+use anyhow::Result;
+use datafusion::{
+    common::config::ConfigOptions,
+    execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder},
+    prelude::*,
+};
+use log::{debug, info};
+use object_store::http::HttpBuilder;
+use url::Url;
+
+use super::stats::ZoneTableStats;
+
+const OVERTURE_RELEASE_DATE: &str = "2025-08-20.1";
+const HUGGINGFACE_URL: &str = "https://huggingface.co";
+const COMMIT_HASH: &str = "67822daa2fbc0039681922f0d7fea4157f41d13f";
+const PARQUET_PART_COUNT: usize = 4;
+const PARQUET_UUID: &str = "c998b093-fa14-440c-98f0-bbdb2126ed22";
+
+pub struct ZoneDataSource {
+    runtime: Arc<RuntimeEnv>,
+}
+
+impl ZoneDataSource {
+    pub async fn new() -> Result<Self> {
+        let rt = Arc::new(RuntimeEnvBuilder::new().build()?);
+
+        let hf_store = HttpBuilder::new().with_url(HUGGINGFACE_URL).build()?;
+        let hf_url = Url::parse(HUGGINGFACE_URL)?;
+        rt.register_object_store(&hf_url, Arc::new(hf_store));
+
+        debug!("Registered HTTPS object store for huggingface.co");
+
+        Ok(Self { runtime: rt })
+    }
+
+    pub fn create_context(&self) -> Result<SessionContext> {
+        let cfg = ConfigOptions::new();
+
+        let ctx = SessionContext::new_with_config_rt(
+            SessionConfig::from(cfg),
+            Arc::clone(&self.runtime),
+        );
+
+        debug!("Created DataFusion session context");
+        Ok(ctx)
+    }
+
+    pub async fn load_zone_data(
+        &self,
+        ctx: &SessionContext,
+        scale_factor: f64,
+    ) -> Result<DataFrame> {
+        let parquet_urls = self.generate_parquet_urls();
+        info!("Reading {} Parquet parts from Hugging Face...", parquet_urls.len());
+
+        let df = ctx
+            .read_parquet(parquet_urls, ParquetReadOptions::default())
+            .await?;
+
+        let stats = ZoneTableStats::new(scale_factor, 1);
+        let subtypes = stats.subtypes();
+
+        info!("Selected subtypes for SF {}: {:?}", scale_factor, subtypes);
+
+        let mut pred = col("subtype").eq(lit("__never__"));
+        for s in subtypes {
+            pred = pred.or(col("subtype").eq(lit(s)));
+        }
+
+        let df = df.filter(pred.and(col("is_land").eq(lit(true))))?;
+        info!("Applied subtype and is_land filters");
+
+        Ok(df)
+    }
+
+    fn generate_parquet_urls(&self) -> Vec<String> {
+        (0..PARQUET_PART_COUNT)
+            .map(|i| {
+                format!(
+                    "https://huggingface.co/datasets/apache-sedona/spatialbench/resolve/{}/omf-division-area-{}/part-{:05}-{}-c000.zstd.parquet",
+                    COMMIT_HASH, OVERTURE_RELEASE_DATE, i, PARQUET_UUID
+                )
+            })
+            .collect()
+    }
+}
diff --git a/tpchgen-cli/src/zone/main.rs b/tpchgen-cli/src/zone/main.rs
new file mode 100644
index 0000000..e5cc176
--- /dev/null
+++ b/tpchgen-cli/src/zone/main.rs
@@ -0,0 +1,43 @@
+use std::io;
+use std::path::PathBuf;
+use parquet::basic::Compression as ParquetCompression;
+
+use super::config::ZoneDfArgs;
+
+/// Generates the zone table in the requested output format
+pub async fn generate_zone(
+    format: OutputFormat,
+    scale_factor: f64,
+    output_dir: PathBuf,
+    parts: Option<i32>,
+    part: Option<i32>,
+    parquet_row_group_bytes: i64,
+    parquet_compression: ParquetCompression,
+) -> io::Result<()> {
+    match format {
+        OutputFormat::Parquet => {
+            let args = ZoneDfArgs::new(
+                1.0f64.max(scale_factor),
+                output_dir,
+                parts.unwrap_or(1),
+                part.unwrap_or(1),
+                parquet_row_group_bytes,
+                parquet_compression,
+            );
+            super::generate_zone_parquet(args)
+                .await
+                .map_err(io::Error::other)
+        }
+        _ => Err(io::Error::new(
+            io::ErrorKind::InvalidInput,
+            "Zone table is only supported in --format=parquet.",
+        )),
+    }
+}
+
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub enum OutputFormat {
+    Tbl,
+    Csv,
+    Parquet,
+}
diff --git a/tpchgen-cli/src/zone/mod.rs b/tpchgen-cli/src/zone/mod.rs
new file mode 100644
index 0000000..fc0f89f
--- /dev/null
+++ b/tpchgen-cli/src/zone/mod.rs
@@ -0,0 +1,53 @@
+//! Zone table generation module using DataFusion and remote Parquet files
+
+mod config;
+mod datasource;
+mod partition;
+mod stats;
+mod transform;
+mod writer;
+
+pub mod main;
+
+use std::sync::Arc;
+use anyhow::Result;
+
+pub use config::ZoneDfArgs;
+use datasource::ZoneDataSource;
+use partition::PartitionStrategy;
+use stats::ZoneTableStats;
+use transform::ZoneTransformer;
+use writer::ParquetWriter;
+
+pub async fn generate_zone_parquet(args: ZoneDfArgs) -> Result<()> {
+    args.validate()?;
+
+    let stats = ZoneTableStats::new(args.scale_factor, args.parts);
+    let datasource = ZoneDataSource::new().await?;
+    let ctx = datasource.create_context()?;
+
+    let df = datasource
+        .load_zone_data(&ctx, args.scale_factor)
+        .await?;
+
+    let partition = PartitionStrategy::calculate(
+        stats.estimated_total_rows(),
+        args.parts,
+        args.part,
+    );
+
+    let df = partition.apply_to_dataframe(df)?;
+
+    let transformer = ZoneTransformer::new(partition.offset());
+    let df = transformer.transform(&ctx, df).await?;
+
+    // Get schema before collecting (which moves df)
+    let schema = Arc::new(transformer.arrow_schema(&df)?);
+    let batches = df.collect().await?;
+
+    let writer = ParquetWriter::new(&args, &stats, schema);
+
+    writer.write(&batches)?;
+
+    Ok(())
+}
diff --git a/tpchgen-cli/src/zone/partition.rs b/tpchgen-cli/src/zone/partition.rs
new file mode 100644
index 0000000..8ea54ea
--- /dev/null
+++ b/tpchgen-cli/src/zone/partition.rs
@@ -0,0 +1,70 @@
+use datafusion::prelude::*;
+use log::info;
+
+pub struct PartitionStrategy {
+    offset: i64,
+    limit: i64,
+}
+
+impl PartitionStrategy {
+    pub fn calculate(total_rows: i64, parts: i32, part: i32) -> Self {
+        let parts = parts as i64;
+        let i = (part as i64) - 1;
+
+        let base = total_rows / parts;
+        let rem = total_rows % parts;
+
+        let limit = base + if i < rem { 1 } else { 0 };
+        let offset = i * base + std::cmp::min(i, rem);
+
+        info!(
+            "Partition: total={}, parts={}, part={}, offset={}, limit={}",
+            total_rows,
+            parts,
+            part,
+            offset,
+            limit
+        );
+
+        Self {
+            offset,
+            limit,
+        }
+    }
+
+    pub fn offset(&self) -> i64 {
+        self.offset
+    }
+
+    pub fn apply_to_dataframe(&self, df: DataFrame) -> datafusion::common::Result<DataFrame> {
+        df.limit(self.offset as usize, Some(self.limit as usize))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_partition_distribution() {
+        let total_rows = 100i64;
+        let parts = 3;
+
+        let mut collected_rows = Vec::new();
+        let mut collected_offsets = Vec::new();
+
+        for part in 1..=parts {
+            let strategy = PartitionStrategy::calculate(total_rows, parts, part);
+            collected_rows.push(strategy.limit);
+            collected_offsets.push(strategy.offset);
+        }
+
+        assert_eq!(collected_rows.iter().sum::<i64>(), total_rows);
+        assert_eq!(collected_offsets[0], 0);
+
+        for i in 1..parts as usize {
+            let expected_offset = collected_offsets[i - 1] + collected_rows[i - 1];
+            assert_eq!(collected_offsets[i], expected_offset);
+        }
+    }
+}
diff --git a/tpchgen-cli/src/zone/stats.rs b/tpchgen-cli/src/zone/stats.rs
new file mode 100644
index 0000000..ce5e5d5
--- /dev/null
+++ b/tpchgen-cli/src/zone/stats.rs
@@ -0,0 +1,161 @@
+use log::debug;
+
+const SUBTYPE_ROWS: &[(&str, i64)] = &[
+    ("microhood", 74797),
+    ("macrohood", 42619),
+    ("neighborhood", 298615),
+    ("county", 38679),
+    ("localadmin", 19007),
+    ("locality", 555834),
+    ("region", 3905),
+    ("dependency", 53),
+    ("country", 219),
+];
+
+pub struct ZoneTableStats {
+    scale_factor: f64,
+    size_gb: f64,
+    total_rows: i64,
+}
+
+impl ZoneTableStats {
+    pub fn new(scale_factor: f64, parts: i32) -> Self {
+        let (mut size_gb, mut total_rows) = Self::base_stats(scale_factor);
+
+        if scale_factor <= 1.0 && parts > 1 {
+            (size_gb, total_rows) = Self::base_stats(scale_factor / parts as f64);
+        }
+
+        debug!(
+            "Stats: size_gb={}, total_rows={} for SF={}",
+            size_gb, total_rows, scale_factor
+        );
+
+        Self {
+            scale_factor,
+            size_gb,
+            total_rows,
+        }
+    }
+
+    fn base_stats(sf: f64) -> (f64, i64) {
+        if sf < 1.0 {
+            (0.92 * sf, (156_095.0 * sf).ceil() as i64)
+        } else if sf < 10.0 {
+            (1.42, 156_095)
+        } else if sf < 100.0 {
+            (2.09, 454_710)
+        } else if sf < 1000.0 {
+            (5.68, 1_033_456)
+        } else {
+            (6.13, 1_033_675)
+        }
+    }
+
+    pub fn subtypes(&self) -> Vec<&'static str> {
+        let mut v = vec!["microhood", "macrohood", "county"];
+        if self.scale_factor >= 10.0 {
+            v.push("neighborhood");
+        }
+        if self.scale_factor >= 100.0 {
+            v.extend_from_slice(&["localadmin", "locality", "region", "dependency"]);
+        }
+        if self.scale_factor >= 1000.0 {
+            v.push("country");
+        }
+        v
+    }
+
+    pub fn estimated_total_rows(&self) -> i64 {
+        let mut total = 0i64;
+        for subtype in self.subtypes() {
+            total += SUBTYPE_ROWS
+                .iter()
+                .find(|(name, _)| *name == subtype)
+                .map(|(_, rows)| *rows)
+                .unwrap_or(0);
+        }
+
+        if self.scale_factor < 1.0 {
+            (total as f64 * self.scale_factor).ceil() as i64
+        } else {
+            total
+        }
+    }
+
+    pub fn compute_rows_per_group(&self, target_bytes: i64, default_bytes: i64) -> usize {
+        let total_bytes = self.size_gb * 1024.0 * 1024.0 * 1024.0;
+        let bytes_per_row = total_bytes / self.total_rows as f64;
+
+        let effective_target = if target_bytes <= 0 {
+            default_bytes
+        } else {
+            target_bytes
+        };
+
+        debug!(
+            "Stats: {:.2} GB, {} rows, {:.2} bytes/row, target: {} bytes",
+            self.size_gb, self.total_rows, bytes_per_row, effective_target
+        );
+
+        let est = (effective_target as f64 / bytes_per_row).floor();
+        est.clamp(1000.0, 32767.0) as usize
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_subtypes_for_different_scale_factors() {
+        let sf_01_stats = ZoneTableStats::new(0.1, 1);
+        assert_eq!(
+            sf_01_stats.subtypes(),
+            vec!["microhood", "macrohood", "county"]
+        );
+
+        let sf_10_stats = ZoneTableStats::new(10.0, 1);
+        assert_eq!(
+            sf_10_stats.subtypes(),
+            vec!["microhood", "macrohood", "county", "neighborhood"]
+        );
+
+        let sf_100_stats = ZoneTableStats::new(100.0, 1);
+        assert!(sf_100_stats.subtypes().contains(&"localadmin"));
+        assert!(sf_100_stats.subtypes().contains(&"locality"));
+
+        let sf_1000_stats = ZoneTableStats::new(1000.0, 1);
+        assert!(sf_1000_stats.subtypes().contains(&"country"));
+    }
+
+    #[test]
+    fn test_rows_per_group_bounds() {
+        let stats = ZoneTableStats::new(1.0, 1);
+
+        let rows_per_group_tiny = stats.compute_rows_per_group(1_000_000, 128 * 1024 * 1024);
+        assert!(rows_per_group_tiny >= 1000);
+
+        let tiny_stats = ZoneTableStats {
+            scale_factor: 0.001,
+            size_gb: 1000.0,
+            total_rows: 1000,
+        };
+        let rows_per_group_huge = tiny_stats.compute_rows_per_group(1, 128 * 1024 * 1024);
+        assert!(rows_per_group_huge <= 32767);
+    }
+
+    #[test]
+    fn test_estimated_rows_scaling_consistency() {
+        let base_stats = ZoneTableStats::new(1.0, 1);
+        let half_stats = ZoneTableStats::new(0.5, 1);
+        let quarter_stats = ZoneTableStats::new(0.25, 1);
+
+        let base_rows = base_stats.estimated_total_rows() as f64;
+        let half_rows = half_stats.estimated_total_rows() as f64;
+        let quarter_rows = quarter_stats.estimated_total_rows() as f64;
+
+        assert!((half_rows - (base_rows * 0.5)).abs() < 1.0);
+        assert!((quarter_rows - (base_rows * 0.25)).abs() < 1.0);
+    }
+}
diff --git a/tpchgen-cli/src/zone/transform.rs b/tpchgen-cli/src/zone/transform.rs
new file mode 100644
index 0000000..e2f423b
--- /dev/null
+++ b/tpchgen-cli/src/zone/transform.rs
@@ -0,0 +1,54 @@
+use anyhow::Result;
+use arrow_schema::Schema;
+use datafusion::{prelude::*, sql::TableReference};
+use log::{debug, info};
+
+pub struct ZoneTransformer {
+    offset: i64,
+}
+
+impl ZoneTransformer {
+    pub fn new(offset: i64) -> Self {
+        Self { offset }
+    }
+
+    pub async fn transform(
+        &self,
+        ctx: &SessionContext,
+        df: DataFrame,
+    ) -> Result<DataFrame> {
+        ctx.register_table(TableReference::bare("zone_filtered"), df.into_view())?;
+        debug!("Registered filtered data as 'zone_filtered' table");
+
+        let sql = format!(
+            r#"
+            SELECT
+              CAST(ROW_NUMBER() OVER (ORDER BY id) + {} AS BIGINT) AS z_zonekey,
+              COALESCE(id, '')            AS z_gersid,
+              COALESCE(country, '')       AS z_country,
+              COALESCE(region,  '')       AS z_region,
+              COALESCE(names.primary, '') AS z_name,
+              COALESCE(subtype, '')       AS z_subtype,
+              geometry                    AS z_boundary
+            FROM zone_filtered
+            "#,
+            self.offset
+        );
+
+        debug!("Executing SQL transformation with offset: {}", self.offset);
+        let df = ctx.sql(&sql).await?;
+        info!("SQL transformation completed successfully");
+
+        Ok(df)
+    }
+
+    pub fn arrow_schema(&self, df: &DataFrame) -> Result<Schema> {
+        Ok(Schema::new(
+            df.schema()
+                .fields()
+                .iter()
+                .map(|f| f.as_ref().clone())
+                .collect::<Vec<_>>(),
+        ))
+    }
+}
diff --git a/tpchgen-cli/src/zone/writer.rs b/tpchgen-cli/src/zone/writer.rs
new file mode 100644
index 0000000..fe273a9
--- /dev/null
+++ b/tpchgen-cli/src/zone/writer.rs
@@ -0,0 +1,71 @@
+use std::{path::PathBuf, sync::Arc, time::Instant};
+use anyhow::Result;
+use arrow_array::RecordBatch;
+use arrow_schema::SchemaRef;
+use log::{debug, info};
+use parquet::{
+    arrow::ArrowWriter,
+    file::properties::WriterProperties,
+};
+
+use super::config::ZoneDfArgs;
+use super::stats::ZoneTableStats;
+
+pub struct ParquetWriter {
+    output_path: PathBuf,
+    schema: SchemaRef,
+    props: WriterProperties,
+    args: ZoneDfArgs,
+}
+
+impl ParquetWriter {
+    pub fn new(args: &ZoneDfArgs, stats: &ZoneTableStats, schema: SchemaRef) -> Self {
+        let rows_per_group = stats.compute_rows_per_group(
+            args.parquet_row_group_bytes,
+            128 * 1024 * 1024,
+        );
+
+        let props = WriterProperties::builder()
+            .set_compression(args.parquet_compression)
+            .set_max_row_group_size(rows_per_group)
+            .build();
+
+        debug!("Using row group size: {} rows", rows_per_group);
+
+        Self {
+            output_path: args.output_filename(),
+            schema,
+            props,
+            args: args.clone(),
+        }
+    }
+
+    pub fn write(&self, batches: &[RecordBatch]) -> Result<()> {
+        std::fs::create_dir_all(&self.args.output_dir)?;
+        debug!("Created output directory: {:?}", self.args.output_dir);
+
+        let t0 = Instant::now();
+        let file = std::fs::File::create(&self.output_path)?;
+        let mut writer = ArrowWriter::try_new(file, Arc::clone(&self.schema), Some(self.props.clone()))?;
+
+        for batch in batches {
+            writer.write(batch)?;
+        }
+
+        writer.close()?;
+        let duration = t0.elapsed();
+
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+
+        info!(
+            "Zone -> {} (part {}/{}). write={:?}, total_rows={}",
+            self.output_path.display(),
+            self.args.part,
+            self.args.parts,
+            duration,
+            total_rows
+        );
+
+        Ok(())
+    }
+}
diff --git a/tpchgen-cli/src/zone_df.rs b/tpchgen-cli/src/zone_df.rs
deleted file mode 100644
index eb75805..0000000
--- a/tpchgen-cli/src/zone_df.rs
+++ /dev/null
@@ -1,503 +0,0 @@
-use std::{path::PathBuf, sync::Arc, time::Instant};
-
-use anyhow::{anyhow, Result};
-use arrow_array::RecordBatch;
-use arrow_schema::{Schema, SchemaRef};
-use datafusion::{
-    common::config::ConfigOptions, execution::runtime_env::RuntimeEnvBuilder, prelude::*,
-    sql::TableReference,
-};
-
-use crate::plan::DEFAULT_PARQUET_ROW_GROUP_BYTES;
-use datafusion::execution::runtime_env::RuntimeEnv;
-use log::{debug, info};
-use object_store::http::HttpBuilder;
-use parquet::{
-    arrow::ArrowWriter, basic::Compression as ParquetCompression,
-    file::properties::WriterProperties,
-};
-use url::Url;
-
-const OVERTURE_RELEASE_DATE: &str = "2025-08-20.1";
-const HUGGINGFACE_URL: &str = "https://huggingface.co";
-const COMMIT_HASH: &str = "67822daa2fbc0039681922f0d7fea4157f41d13f";
-
-fn subtypes_for_scale_factor(sf: f64) -> Vec<&'static str> {
-    let mut v = vec!["microhood", "macrohood", "county"];
-    if sf >= 10.0 {
-        v.push("neighborhood");
-    }
-    if sf >= 100.0 {
-        v.extend_from_slice(&["localadmin", "locality", "region", "dependency"]);
-    }
-    if sf >= 1000.0 {
-        v.push("country");
-    }
-    v
-}
-
-fn estimated_total_rows_for_sf(sf: f64) -> i64 {
-    let mut total = 0i64;
-    for s in subtypes_for_scale_factor(sf) {
-        total += match s {
-            "microhood" => 74797,
-            "macrohood" => 42619,
-            "neighborhood" => 298615,
-            "county" => 38679,
-            "localadmin" => 19007,
-            "locality" => 555834,
-            "region" => 3905,
-            "dependency" => 53,
-            "country" => 219,
-            _ => 0,
-        };
-    }
-    if sf < 1.0 {
-        (total as f64 * sf).ceil() as i64
-    } else {
-        total
-    }
-}
-
-fn get_zone_table_stats(sf: f64) -> (f64, i64) {
-    // Returns (size_in_gb, total_rows) for the given scale factor
-    if sf < 1.0 {
-        (0.92 * sf, (156_095.0 * sf).ceil() as i64)
-    } else if sf < 10.0 {
-        (1.42, 156_095)
-    } else if sf < 100.0 {
-        (2.09, 454_710)
-    } else if sf < 1000.0 {
-        (5.68, 1_033_456)
-    } else {
-        (6.13, 1_033_675)
-    }
-}
-
-fn compute_rows_per_group_from_stats(size_gb: f64, total_rows: i64, target_bytes: i64) -> usize {
-    let total_bytes = size_gb * 1024.0 * 1024.0 * 1024.0; // Convert GB to bytes
-    let bytes_per_row = total_bytes / total_rows as f64;
-
-    // Use default if target_bytes is not specified or invalid
-    let effective_target = if target_bytes <= 0 {
-        DEFAULT_PARQUET_ROW_GROUP_BYTES
-    } else {
-        target_bytes
-    };
-
-    debug!(
-        "Using hardcoded stats: {:.2} GB, {} rows, {:.2} bytes/row, target: {} bytes",
-        size_gb, total_rows, bytes_per_row, effective_target
-    );
-
-    let est = (effective_target as f64 / bytes_per_row).floor();
-    // Keep RG count <= 32k, but avoid too-tiny RGs
-    est.clamp(1000.0, 32767.0) as usize
-}
-
-fn writer_props_with_rowgroup(comp: ParquetCompression, rows_per_group: usize) -> WriterProperties {
-    WriterProperties::builder()
-        .set_compression(comp)
-        .set_max_row_group_size(rows_per_group)
-        .build()
-}
-
-fn write_parquet_with_rowgroup_bytes(
-    out_path: &PathBuf,
-    schema: SchemaRef,
-    all_batches: Vec<RecordBatch>,
-    target_rowgroup_bytes: i64,
-    comp: ParquetCompression,
-    scale_factor: f64,
-    parts: i32,
-) -> Result<()> {
-    let (mut size_gb, mut total_rows) = get_zone_table_stats(scale_factor);
-
-    // Use linear scaling stats for SF <= 1.0 with parts > 1
-    if scale_factor <= 1.0 && parts > 1 {
-        (size_gb, total_rows) = get_zone_table_stats(scale_factor / parts as f64);
-    }
-
-    debug!(
-        "size_gb={}, total_rows={} for scale_factor={}",
-        size_gb, total_rows, scale_factor
-    );
-    let rows_per_group =
-        compute_rows_per_group_from_stats(size_gb, total_rows, target_rowgroup_bytes);
-    let props = writer_props_with_rowgroup(comp, rows_per_group);
-
-    debug!(
-        "Using row group size: {} rows (based on hardcoded stats)",
-        rows_per_group
-    );
-
-    let mut writer = ArrowWriter::try_new(std::fs::File::create(out_path)?, schema, Some(props))?;
-
-    for batch in all_batches {
-        writer.write(&batch)?;
-    }
-    writer.close()?;
-    Ok(())
-}
-
-#[derive(Clone)]
-pub struct ZoneDfArgs {
-    pub scale_factor: f64,
-    pub output_dir: PathBuf,
-    pub parts: i32,
-    pub part: i32,
-    pub parquet_row_group_bytes: i64,
-    pub parquet_compression: ParquetCompression,
-}
-
-impl ZoneDfArgs {
-    fn output_filename(&self) -> PathBuf {
-        let filename = "zone.parquet".to_string();
-        self.output_dir.join(filename)
-    }
-}
-
-pub async fn generate_zone_parquet(args: ZoneDfArgs) -> Result<()> {
-    if args.part < 1 || args.part > args.parts {
-        return Err(anyhow!(
-            "Invalid --part={} for --parts={}",
-            args.part,
-            args.parts
-        ));
-    }
-
-    info!(
-        "Starting zone parquet generation with scale factor {}",
-        args.scale_factor
-    );
-    debug!("Zone generation args: parts={}, part={}, output_dir={:?}, row_group_bytes={}, compression={:?}",
-           args.parts, args.part, args.output_dir, args.parquet_row_group_bytes, args.parquet_compression);
-
-    let subtypes = subtypes_for_scale_factor(args.scale_factor);
-    info!(
-        "Selected subtypes for SF {}: {:?}",
-        args.scale_factor, subtypes
-    );
-
-    let estimated_rows = estimated_total_rows_for_sf(args.scale_factor);
-    info!(
-        "Estimated total rows for SF {}: {}",
-        args.scale_factor, estimated_rows
-    );
-
-    let mut cfg = ConfigOptions::new();
-    cfg.execution.target_partitions = 1;
-    debug!("Created DataFusion config with target_partitions=1");
-
-    let rt: Arc<RuntimeEnv> = Arc::new(RuntimeEnvBuilder::new().build()?);
-    debug!("Built DataFusion runtime environment");
-
-    // Register HTTPS object store for Hugging Face
-    let hf_store = HttpBuilder::new().with_url(HUGGINGFACE_URL).build()?;
-    let hf_url = Url::parse(HUGGINGFACE_URL)?;
-    rt.register_object_store(&hf_url, Arc::new(hf_store));
-    debug!("Registered HTTPS object store for huggingface.co");
-
-    let ctx = SessionContext::new_with_config_rt(SessionConfig::from(cfg), rt);
-    debug!("Created DataFusion session context");
-
-    // Parquet parts from Hugging Face (programmatically generated)
-    const PARQUET_PART_COUNT: usize = 4;
-    const PARQUET_UUID: &str = "c998b093-fa14-440c-98f0-bbdb2126ed22";
-    let parquet_urls: Vec<String> = (0..PARQUET_PART_COUNT)
-        .map(|i| format!(
-            "https://huggingface.co/datasets/apache-sedona/spatialbench/resolve/{}/omf-division-area-{}/part-{i:05}-{uuid}-c000.zstd.parquet",
-            COMMIT_HASH,
-            OVERTURE_RELEASE_DATE,
-            i = i,
-            uuid = PARQUET_UUID
-        ))
-        .collect();
-
-    info!(
-        "Reading {} Parquet parts from Hugging Face...",
-        parquet_urls.len()
-    );
-
-    let t_read_start = Instant::now();
-    let mut df = ctx
-        .read_parquet(parquet_urls, ParquetReadOptions::default())
-        .await?;
-    let read_dur = t_read_start.elapsed();
-    info!("Successfully read HF parquet data in {:?}", read_dur);
-
-    // Build filter predicate
-    debug!("Building filter predicate for subtypes: {:?}", subtypes);
-    let mut pred = col("subtype").eq(lit("__never__"));
-    for s in subtypes_for_scale_factor(args.scale_factor) {
-        pred = pred.or(col("subtype").eq(lit(s)));
-    }
-    df = df.filter(pred.and(col("is_land").eq(lit(true))))?;
-    info!("Applied subtype and is_land filters");
-
-    // df = df.sort(vec![col("id").sort(true, true)])?;
-    // debug!("Applied sorting by id");
-
-    let total = estimated_total_rows_for_sf(args.scale_factor);
-    let i = (args.part as i64) - 1; // 0-based part index
-    let parts = args.parts as i64;
-
-    let base = total / parts;
-    let rem = total % parts;
-
-    // first `rem` parts get one extra row
-    let rows_this = base + if i < rem { 1 } else { 0 };
-    let offset = i * base + std::cmp::min(i, rem);
-
-    info!(
-        "Partitioning data: total_rows={}, parts={}, base={}, rem={}, this_part_rows={}, offset={}",
-        total, parts, base, rem, rows_this, offset
-    );
-
-    df = df.limit(offset as usize, Some(rows_this as usize))?;
-    debug!("Applied limit with offset={}, rows={}", offset, rows_this);
-
-    ctx.register_table(TableReference::bare("zone_filtered"), df.into_view())?;
-    debug!("Registered filtered data as 'zone_filtered' table");
-
-    let sql = format!(
-        r#"
-        SELECT
-          CAST(ROW_NUMBER() OVER (ORDER BY id) + {offset} AS BIGINT) AS z_zonekey,
-          COALESCE(id, '')            AS z_gersid,
-          COALESCE(country, '')       AS z_country,
-          COALESCE(region,  '')       AS z_region,
-          COALESCE(names.primary, '') AS z_name,
-          COALESCE(subtype, '')       AS z_subtype,
-          geometry                    AS z_boundary
-        FROM zone_filtered
-        "#
-    );
-    debug!("Executing SQL transformation with offset: {}", offset);
-    let df2 = ctx.sql(&sql).await?;
-    info!("SQL transformation completed successfully");
-
-    let t0 = Instant::now();
-    info!("Starting data collection...");
-    let batches = df2.clone().collect().await?;
-    let collect_dur = t0.elapsed();
-
-    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
-    info!(
-        "Collected {} record batches with {} total rows in {:?}",
-        batches.len(),
-        total_rows,
-        collect_dur
-    );
-
-    std::fs::create_dir_all(&args.output_dir)?;
-    debug!("Created output directory: {:?}", args.output_dir);
-
-    let out = args.output_filename();
-    info!("Writing output to: {}", out.display());
-
-    debug!(
-        "Created parquet writer properties with compression: {:?}",
-        args.parquet_compression
-    );
-
-    // Convert DFSchema to Arrow Schema
-    let schema = Arc::new(Schema::new(
-        df2.schema()
-            .fields()
-            .iter()
-            .map(|f| f.as_ref().clone())
-            .collect::<Vec<_>>(),
-    ));
-    debug!(
-        "Converted DataFusion schema to Arrow schema with {} fields",
-        schema.fields().len()
-    );
-
-    let t1 = Instant::now();
-    info!(
-        "Starting parquet file write with row group size: {} bytes",
-        args.parquet_row_group_bytes
-    );
-    write_parquet_with_rowgroup_bytes(
-        &out,
-        schema,
-        batches,
-        args.parquet_row_group_bytes,
-        args.parquet_compression,
-        args.scale_factor,
-        args.parts,
-    )?;
-    let write_dur = t1.elapsed();
-
-    info!(
-        "Zone -> {} (part {}/{}). collect={:?}, write={:?}, total_rows={}",
-        out.display(),
-        args.part,
-        args.parts,
-        collect_dur,
-        write_dur,
-        total_rows
-    );
-
-    Ok(())
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use parquet::basic::Compression;
-    use tempfile::TempDir;
-
-    fn create_test_args(scale_factor: f64, temp_dir: &TempDir) -> ZoneDfArgs {
-        ZoneDfArgs {
-            scale_factor,
-            output_dir: temp_dir.path().to_path_buf(),
-            parts: 1,
-            part: 1,
-            parquet_row_group_bytes: DEFAULT_PARQUET_ROW_GROUP_BYTES,
-            parquet_compression: Compression::SNAPPY,
-        }
-    }
-
-    #[tokio::test]
-    async fn test_zone_generation_invalid_part() {
-        let temp_dir = TempDir::new().unwrap();
-        let mut args = create_test_args(1.0, &temp_dir);
-        args.parts = 2;
-        args.part = 3; // Invalid part number
-
-        let result = generate_zone_parquet(args).await;
-        assert!(result.is_err(), "Should fail with invalid part number");
-    }
-
-    #[tokio::test]
-    async fn test_subtypes_for_different_scale_factors() {
-        // Test scale factor categorization
-        let sf_01_subtypes = subtypes_for_scale_factor(0.1);
-        assert_eq!(sf_01_subtypes, vec!["microhood", "macrohood", "county"]);
-
-        let sf_10_subtypes = subtypes_for_scale_factor(10.0);
-        assert_eq!(
-            sf_10_subtypes,
-            vec!["microhood", "macrohood", "county", "neighborhood"]
-        );
-
-        let sf_100_subtypes = subtypes_for_scale_factor(100.0);
-        assert!(sf_100_subtypes.contains(&"localadmin"));
-        assert!(sf_100_subtypes.contains(&"locality"));
-
-        let sf_1000_subtypes = subtypes_for_scale_factor(1000.0);
-        assert!(sf_1000_subtypes.contains(&"country"));
-    }
-
-    #[test]
-    fn test_partition_distribution_logic() {
-        // Test the mathematical logic for distributing rows across partitions
-        let total_rows = 100i64;
-        let parts = 3i64;
-
-        let mut collected_rows = Vec::new();
-        let mut collected_offsets = Vec::new();
-
-        // Simulate the partition calculation for each part
-        for part_idx in 0..parts {
-            let i = part_idx;
-            let base = total_rows / parts;
-            let rem = total_rows % parts;
-            let rows_this = base + if i < rem { 1 } else { 0 };
-            let offset = i * base + std::cmp::min(i, rem);
-
-            collected_rows.push(rows_this);
-            collected_offsets.push(offset);
-        }
-
-        // Verify partitioning logic
-        assert_eq!(collected_rows.iter().sum::<i64>(), total_rows); // All rows accounted for
-        assert_eq!(collected_offsets[0], 0); // First partition starts at 0
-
-        // Verify no gaps or overlaps between partitions
-        for i in 1..parts as usize {
-            let expected_offset = collected_offsets[i - 1] + collected_rows[i - 1];
-            assert_eq!(collected_offsets[i], expected_offset);
-        }
-
-        // Verify remainder distribution (first partitions get extra rows)
-        let remainder = (total_rows % parts) as usize;
-        for i in 0..remainder {
-            assert_eq!(collected_rows[i], collected_rows[remainder] + 1);
-        }
-    }
-
-    #[test]
-    fn test_rows_per_group_bounds() {
-        // Test that compute_rows_per_group_from_stats respects bounds
-
-        // Test minimum bound (should be at least 1000)
-        let rows_per_group_tiny = compute_rows_per_group_from_stats(0.001, 1000, 1_000_000);
-        assert!(rows_per_group_tiny >= 1000);
-
-        // Test maximum bound (should not exceed 32767)
-        let rows_per_group_huge = compute_rows_per_group_from_stats(1000.0, 1000, 1);
-        assert!(rows_per_group_huge <= 32767);
-
-        // Test negative target bytes falls back to default
-        let rows_per_group_negative = compute_rows_per_group_from_stats(1.0, 100000, -1);
-        let rows_per_group_default =
-            compute_rows_per_group_from_stats(1.0, 100000, DEFAULT_PARQUET_ROW_GROUP_BYTES);
-        assert_eq!(rows_per_group_negative, rows_per_group_default);
-    }
-
-    #[test]
-    fn test_subtype_selection_logic() {
-        // Test the cumulative nature of subtype selection
-        let base_subtypes = subtypes_for_scale_factor(1.0);
-        let sf10_subtypes = subtypes_for_scale_factor(10.0);
-        let sf100_subtypes = subtypes_for_scale_factor(100.0);
-        let sf1000_subtypes = subtypes_for_scale_factor(1000.0);
-
-        // Each higher scale factor should include all previous subtypes
-        for subtype in &base_subtypes {
-            assert!(sf10_subtypes.contains(subtype));
-            assert!(sf100_subtypes.contains(subtype));
-            assert!(sf1000_subtypes.contains(subtype));
-        }
-
-        for subtype in &sf10_subtypes {
-            assert!(sf100_subtypes.contains(subtype));
-            assert!(sf1000_subtypes.contains(subtype));
-        }
-
-        for subtype in &sf100_subtypes {
-            assert!(sf1000_subtypes.contains(subtype));
-        }
-
-        // Verify progressive addition
-        assert!(sf10_subtypes.len() > base_subtypes.len());
-        assert!(sf100_subtypes.len() > sf10_subtypes.len());
-        assert!(sf1000_subtypes.len() > sf100_subtypes.len());
-    }
-
-    #[test]
-    fn test_estimated_rows_scaling_consistency() {
-        // Test that estimated rows scale proportionally for SF < 1.0
-        let base_rows = estimated_total_rows_for_sf(1.0);
-        let half_rows = estimated_total_rows_for_sf(0.5);
-        let quarter_rows = estimated_total_rows_for_sf(0.25);
-
-        // Should scale proportionally (within rounding)
-        assert!((half_rows as f64 - (base_rows as f64 * 0.5)).abs() < 1.0);
-        assert!((quarter_rows as f64 - (base_rows as f64 * 0.25)).abs() < 1.0);
-
-        // Test that SF >= 1.0 gives discrete jumps (not proportional scaling)
-        let sf1_rows = estimated_total_rows_for_sf(1.0);
-        let sf5_rows = estimated_total_rows_for_sf(5.0);
-        let sf10_rows = estimated_total_rows_for_sf(10.0);
-
-        // These should be equal (same category)
-        assert_eq!(sf1_rows, sf5_rows);
-
-        // This should be different (different category)
-        assert_ne!(sf5_rows, sf10_rows);
-    }
-}

