This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git


The following commit(s) were added to refs/heads/main by this push:
     new bc98ac46 feat(rust/sedona-geoparquet): Support WKB validation in `read_parquet()` (#578)
bc98ac46 is described below

commit bc98ac461e7f313bd112d3047dba18f52713a6b2
Author: Yongting You <[email protected]>
AuthorDate: Tue Feb 10 07:23:33 2026 +0800

    feat(rust/sedona-geoparquet): Support WKB validation in `read_parquet()` (#578)
    
    Co-authored-by: Dewey Dunnington <[email protected]>
---
 Cargo.lock                                 |   1 +
 python/sedonadb/python/sedonadb/context.py |  18 ++-
 python/sedonadb/src/context.rs             |   2 +
 python/sedonadb/tests/io/test_parquet.py   |  77 ++++++++++
 rust/sedona-geoparquet/Cargo.toml          |   1 +
 rust/sedona-geoparquet/src/file_opener.rs  | 233 ++++++++++++++++++++++++-----
 rust/sedona-geoparquet/src/format.rs       |  57 ++++---
 rust/sedona-geoparquet/src/options.rs      |   2 +
 rust/sedona-geoparquet/src/provider.rs     |  14 ++
 rust/sedona-spatial-join/src/stream.rs     |   9 +-
 10 files changed, 344 insertions(+), 70 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index e5fefbe7..b9856cb6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5245,6 +5245,7 @@ dependencies = [
  "tempfile",
  "tokio",
  "url",
+ "wkb",
 ]
 
 [[package]]
diff --git a/python/sedonadb/python/sedonadb/context.py b/python/sedonadb/python/sedonadb/context.py
index 21a380ae..f0793870 100644
--- a/python/sedonadb/python/sedonadb/context.py
+++ b/python/sedonadb/python/sedonadb/context.py
@@ -128,6 +128,7 @@ class SedonaContext:
         table_paths: Union[str, Path, Iterable[str]],
         options: Optional[Dict[str, Any]] = None,
         geometry_columns: Optional[Union[str, Dict[str, Any]]] = None,
+        validate: bool = False,
     ) -> DataFrame:
         """Create a [DataFrame][sedonadb.dataframe.DataFrame] from one or more Parquet files
 
@@ -176,9 +177,18 @@ class SedonaContext:
 
 
                 Safety:
-                - Columns specified here are not validated against the provided options
-                  (e.g., WKB encoding checks); inconsistent data may cause undefined
-                  behavior.
+                - Columns specified here can optionally be validated according to the
+                  `validate` option (e.g., WKB encoding checks). If validation is not
+                  enabled, inconsistent data may cause undefined behavior.
+            validate:
+                When set to `True`, geometry column contents are validated against
+                their metadata, which can come from the source Parquet file or from
+                the user-provided `geometry_columns` option. Only supported
+                properties are validated; unsupported properties are ignored. If
+                validation fails, execution stops with an error.
+
+                Currently, the only validated property is the WKB encoding of input
+                geometry columns.
 
 
         Examples:
@@ -200,7 +210,7 @@ class SedonaContext:
         return DataFrame(
             self._impl,
             self._impl.read_parquet(
-                [str(path) for path in table_paths], options, geometry_columns
+                [str(path) for path in table_paths], options, geometry_columns, validate
             ),
             self.options,
         )
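
For reference, the user-facing call added above reads as follows from Python.
This is a minimal sketch: it assumes a `SedonaContext` named `con` (as created
by the test fixtures later in this patch), and the file path is illustrative.

    import json

    # With validate=True the scan fails on the first malformed WKB value;
    # with the default validate=False the bytes are passed through untouched.
    df = con.read_parquet(
        "points.parquet",  # illustrative path
        geometry_columns=json.dumps({"geom": {"encoding": "WKB"}}),
        validate=True,
    )
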
diff --git a/python/sedonadb/src/context.rs b/python/sedonadb/src/context.rs
index 3bb7c5e3..1647bd05 100644
--- a/python/sedonadb/src/context.rs
+++ b/python/sedonadb/src/context.rs
@@ -81,6 +81,7 @@ impl InternalContext {
         table_paths: Vec<String>,
         options: HashMap<String, PyObject>,
         geometry_columns: Option<String>,
+        validate: bool,
     ) -> Result<InternalDataFrame, PySedonaError> {
         // Convert Python options to strings, filtering out None values
         let rust_options: HashMap<String, String> = options
@@ -108,6 +109,7 @@ impl InternalContext {
                     PySedonaError::SedonaPython(format!("Invalid geometry_columns JSON: {e}"))
                 })?;
         }
+        geo_options = geo_options.with_validate(validate);
 
         let df = wait_for_future(
             py,
diff --git a/python/sedonadb/tests/io/test_parquet.py b/python/sedonadb/tests/io/test_parquet.py
index 7f87c027..c80d7478 100644
--- a/python/sedonadb/tests/io/test_parquet.py
+++ b/python/sedonadb/tests/io/test_parquet.py
@@ -22,6 +22,7 @@ from pathlib import Path
 import geopandas
 import geopandas.testing
 import pyarrow as pa
+import pyarrow.parquet as pq
 import pytest
 import sedonadb
 import shapely
@@ -412,3 +413,79 @@ def test_write_geoparquet_geography(con, geoarrow_data):
 
         table_roundtrip = con.read_parquet(tmp_parquet).to_arrow_table()
         assert table_roundtrip == table
+
+
+def test_read_parquet_validate_wkb_single_valid_row(con, tmp_path):
+    valid_wkb = bytes.fromhex("0101000000000000000000F03F0000000000000040")
+
+    table = pa.table({"id": [1], "geom": [valid_wkb]})
+    path = tmp_path / "single_valid_wkb.parquet"
+    pq.write_table(table, path)
+
+    geometry_columns = json.dumps({"geom": {"encoding": "WKB"}})
+
+    tab = con.read_parquet(
+        path, geometry_columns=geometry_columns, validate=False
+    ).to_arrow_table()
+    assert tab["geom"].type.extension_name == "geoarrow.wkb"
+    assert len(tab) == 1
+
+    tab = con.read_parquet(
+        path, geometry_columns=geometry_columns, validate=True
+    ).to_arrow_table()
+    assert tab["geom"].type.extension_name == "geoarrow.wkb"
+    assert len(tab) == 1
+
+
+def test_read_parquet_validate_wkb_single_invalid_row(con, tmp_path):
+    invalid_wkb = b"\x01"
+
+    table = pa.table({"id": [1], "geom": [invalid_wkb]})
+    path = tmp_path / "single_invalid_wkb.parquet"
+    pq.write_table(table, path)
+
+    geometry_columns = json.dumps({"geom": {"encoding": "WKB"}})
+
+    tab = con.read_parquet(
+        path, geometry_columns=geometry_columns, validate=False
+    ).to_arrow_table()
+    assert tab["geom"].type.extension_name == "geoarrow.wkb"
+    assert len(tab) == 1
+
+    with pytest.raises(
+        sedonadb._lib.SedonaError,
+        match=r"WKB validation failed",
+    ):
+        con.read_parquet(
+            path, geometry_columns=geometry_columns, validate=True
+        ).to_arrow_table()
+
+
+def test_read_parquet_validate_wkb_partial_invalid_rows(con, tmp_path):
+    valid_wkb = bytes.fromhex("0101000000000000000000F03F0000000000000040")
+    invalid_wkb = b"\x01"
+
+    table = pa.table(
+        {
+            "id": [1, 2, 3],
+            "geom": [valid_wkb, invalid_wkb, valid_wkb],
+        }
+    )
+    path = tmp_path / "partial_invalid_wkb.parquet"
+    pq.write_table(table, path)
+
+    geometry_columns = json.dumps({"geom": {"encoding": "WKB"}})
+
+    tab = con.read_parquet(
+        path, geometry_columns=geometry_columns, validate=False
+    ).to_arrow_table()
+    assert tab["geom"].type.extension_name == "geoarrow.wkb"
+    assert len(tab) == 3
+
+    with pytest.raises(
+        sedonadb._lib.SedonaError,
+        match=r"WKB validation failed",
+    ):
+        con.read_parquet(
+            path, geometry_columns=geometry_columns, validate=True
+        ).to_arrow_table()
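
As a note on the fixtures above: the hex string decodes to a little-endian WKB
point, which can be verified independently with shapely (a sketch, not part of
the patch):

    import shapely

    # 01 = little-endian, 01000000 = Point, then two float64 coordinates
    geom = shapely.from_wkb(
        bytes.fromhex("0101000000000000000000F03F0000000000000040")
    )
    assert geom.wkt == "POINT (1 2)"

The invalid fixture b"\x01" ends right after the byte-order marker, so any WKB
parser rejects it, which is what the validate=True assertions rely on.
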
diff --git a/rust/sedona-geoparquet/Cargo.toml b/rust/sedona-geoparquet/Cargo.toml
index ba65d5a2..f70fb4b8 100644
--- a/rust/sedona-geoparquet/Cargo.toml
+++ b/rust/sedona-geoparquet/Cargo.toml
@@ -67,3 +67,4 @@ sedona-schema = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
 serde_with = { workspace = true }
+wkb = { workspace = true }
diff --git a/rust/sedona-geoparquet/src/file_opener.rs b/rust/sedona-geoparquet/src/file_opener.rs
index ae2a56d2..72719c88 100644
--- a/rust/sedona-geoparquet/src/file_opener.rs
+++ b/rust/sedona-geoparquet/src/file_opener.rs
@@ -16,17 +16,22 @@
 // under the License.
 use std::{collections::HashMap, sync::Arc};
 
-use arrow_schema::SchemaRef;
+use arrow_array::{Array, RecordBatch};
+use arrow_schema::{DataType, SchemaRef};
 use datafusion::datasource::{
     listing::PartitionedFile,
     physical_plan::{parquet::ParquetAccessPlan, FileOpenFuture, FileOpener},
 };
-use datafusion_common::Result;
+use datafusion_common::{
+    cast::{as_binary_array, as_binary_view_array, as_large_binary_array},
+    exec_err, Result,
+};
 use datafusion_datasource_parquet::metadata::DFParquetMetadata;
 use datafusion_physical_expr::PhysicalExpr;
 use datafusion_physical_plan::metrics::{
     ExecutionPlanMetricsSet, MetricBuilder, MetricType, MetricValue, PruningMetrics,
 };
+use futures::StreamExt;
 use object_store::ObjectStore;
 use parquet::{
     basic::LogicalType,
@@ -47,7 +52,10 @@ use sedona_geometry::{
 };
 use sedona_schema::{datatypes::SedonaType, matchers::ArgMatcher};
 
-use crate::metadata::{GeoParquetColumnMetadata, GeoParquetMetadata};
+use crate::{
+    metadata::{GeoParquetColumnEncoding, GeoParquetMetadata},
+    options::TableGeoParquetOptions,
+};
 
 #[derive(Clone)]
 pub(crate) struct GeoParquetFileOpenerMetrics {
@@ -98,11 +106,11 @@ pub(crate) struct GeoParquetFileOpener {
     pub inner: Arc<dyn FileOpener>,
     pub object_store: Arc<dyn ObjectStore>,
     pub metadata_size_hint: Option<usize>,
-    pub predicate: Arc<dyn PhysicalExpr>,
+    pub predicate: Option<Arc<dyn PhysicalExpr>>,
     pub file_schema: SchemaRef,
     pub enable_pruning: bool,
     pub metrics: GeoParquetFileOpenerMetrics,
-    pub overrides: Option<HashMap<String, GeoParquetColumnMetadata>>,
+    pub options: TableGeoParquetOptions,
 }
 
 impl FileOpener for GeoParquetFileOpener {
@@ -118,37 +126,41 @@ impl FileOpener for GeoParquetFileOpener {
 
             let mut access_plan = ParquetAccessPlan::new_all(parquet_metadata.num_row_groups());
 
+            let maybe_geoparquet_metadata = GeoParquetMetadata::try_from_parquet_metadata(
+                &parquet_metadata,
+                self_clone.options.geometry_columns.as_ref(),
+            )?;
+
             if self_clone.enable_pruning {
-                let spatial_filter = SpatialFilter::try_from_expr(&self_clone.predicate)?;
-
-                if let Some(geoparquet_metadata) = GeoParquetMetadata::try_from_parquet_metadata(
-                    &parquet_metadata,
-                    self_clone.overrides.as_ref(),
-                )? {
-                    filter_access_plan_using_geoparquet_file_metadata(
-                        &self_clone.file_schema,
-                        &mut access_plan,
-                        &spatial_filter,
-                        &geoparquet_metadata,
-                        &self_clone.metrics,
-                    )?;
-
-                    filter_access_plan_using_geoparquet_covering(
-                        &self_clone.file_schema,
-                        &mut access_plan,
-                        &spatial_filter,
-                        &geoparquet_metadata,
-                        &parquet_metadata,
-                        &self_clone.metrics,
-                    )?;
-
-                    filter_access_plan_using_native_geostats(
-                        &self_clone.file_schema,
-                        &mut access_plan,
-                        &spatial_filter,
-                        &parquet_metadata,
-                        &self_clone.metrics,
-                    )?;
+                if let Some(predicate) = self_clone.predicate.as_ref() {
+                    let spatial_filter = SpatialFilter::try_from_expr(predicate)?;
+
+                    if let Some(geoparquet_metadata) = maybe_geoparquet_metadata.as_ref() {
+                        filter_access_plan_using_geoparquet_file_metadata(
+                            &self_clone.file_schema,
+                            &mut access_plan,
+                            &spatial_filter,
+                            geoparquet_metadata,
+                            &self_clone.metrics,
+                        )?;
+
+                        filter_access_plan_using_geoparquet_covering(
+                            &self_clone.file_schema,
+                            &mut access_plan,
+                            &spatial_filter,
+                            geoparquet_metadata,
+                            &parquet_metadata,
+                            &self_clone.metrics,
+                        )?;
+
+                        filter_access_plan_using_native_geostats(
+                            &self_clone.file_schema,
+                            &mut access_plan,
+                            &spatial_filter,
+                            &parquet_metadata,
+                            &self_clone.metrics,
+                        )?;
+                    }
                 }
             }
 
@@ -158,12 +170,110 @@ impl FileOpener for GeoParquetFileOpener {
             // We could also consider filtering using null_count here in the future (i.e.,
             // skip row groups that are all null)
             let file = file.with_extensions(Arc::new(access_plan));
+            let stream = self_clone.inner.open(file)?.await?;
+
+            // Validate geometry columns when validation is enabled in the read options.
+            let validation_columns = if self_clone.options.validate {
+                maybe_geoparquet_metadata
+                    .as_ref()
+                    .map(|metadata| wkb_validation_columns(&self_clone.file_schema, metadata))
+                    .unwrap_or_default()
+            } else {
+                Vec::new()
+            };
 
-            self_clone.inner.open(file)?.await
+            if !self_clone.options.validate || validation_columns.is_empty() {
+                return Ok(stream);
+            }
+
+            let validated_stream = stream.map(move |batch_result| {
+                let batch = batch_result?;
+                validate_wkb_batch(&batch, &validation_columns)?;
+                Ok(batch)
+            });
+
+            Ok(Box::pin(validated_stream))
         }))
     }
 }
 
+fn wkb_validation_columns(
+    file_schema: &SchemaRef,
+    metadata: &GeoParquetMetadata,
+) -> Vec<(usize, String)> {
+    file_schema
+        .fields()
+        .iter()
+        .enumerate()
+        .filter_map(|(column_index, field)| {
+            metadata
+                .columns
+                .get(field.name())
+                .and_then(|column_metadata| {
+                    if matches!(column_metadata.encoding, GeoParquetColumnEncoding::WKB) {
+                        Some((column_index, field.name().clone()))
+                    } else {
+                        None
+                    }
+                })
+        })
+        .collect()
+}
+
+fn validate_wkb_batch(batch: &RecordBatch, validation_columns: &[(usize, String)]) -> Result<()> {
+    for (column_index, column_name) in validation_columns {
+        let column = batch.column(*column_index);
+        validate_wkb_array(column.as_ref(), column_name)?;
+    }
+    Ok(())
+}
+
+fn validate_wkb_array(array: &dyn Array, column_name: &str) -> Result<()> {
+    match array.data_type() {
+        DataType::Binary => {
+            let array = as_binary_array(array)?;
+            validate_wkb_values(array.iter(), column_name)?;
+        }
+        DataType::LargeBinary => {
+            let array = as_large_binary_array(array)?;
+            validate_wkb_values(array.iter(), column_name)?;
+        }
+        DataType::BinaryView => {
+            let array = as_binary_view_array(array)?;
+            validate_wkb_values(array.iter(), column_name)?;
+        }
+        other => {
+            return exec_err!(
+                "Expected Binary/LargeBinary/BinaryView storage for WKB validation in column '{}' but got {}",
+                column_name,
+                other
+            );
+        }
+    }
+
+    Ok(())
+}
+
+fn validate_wkb_values<'a>(
+    values: impl IntoIterator<Item = Option<&'a [u8]>>,
+    column_name: &str,
+) -> Result<()> {
+    for (row_index, maybe_wkb) in values.into_iter().enumerate() {
+        if let Some(wkb_bytes) = maybe_wkb {
+            if let Err(e) = wkb::reader::read_wkb(wkb_bytes) {
+                return exec_err!(
+                    "WKB validation failed for column '{}' at row {}: {}",
+                    column_name,
+                    row_index,
+                    e
+                );
+            }
+        }
+    }
+
+    Ok(())
+}
+
 /// Filter an access plan using the GeoParquet file metadata
 ///
 /// Inspects the GeoParquetMetadata for a bbox at the column metadata level
@@ -565,6 +675,9 @@ pub fn storage_schema_contains_geo(schema: &SchemaRef) -> bool {
 
 #[cfg(test)]
 mod test {
+    use std::sync::Arc;
+
+    use arrow_array::{ArrayRef, BinaryArray, BinaryViewArray, Int64Array, RecordBatch};
     use arrow_schema::{DataType, Field, Schema};
     use parquet::{
         arrow::ArrowSchemaConverter,
@@ -1199,6 +1312,54 @@ mod test {
         assert!(result.geometry_types().is_some());
     }
 
+    #[test]
+    fn validate_wkb_array_binary() {
+        let valid_point_wkb: [u8; 21] = [
+            0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xf0, 0x3f, 0x00,
+            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40,
+        ];
+
+        let valid_array: BinaryArray = [Some(valid_point_wkb.as_slice()), None].iter().collect();
+        validate_wkb_array(&valid_array, "geom").unwrap();
+
+        let invalid_array: BinaryArray = [Some(&b"\x01"[..]), None].iter().collect();
+        let err = validate_wkb_array(&invalid_array, "geom").unwrap_err();
+        assert!(err.to_string().contains("WKB validation failed"));
+    }
+
+    #[test]
+    fn validate_wkb_array_binary_view() {
+        let valid_point_wkb: [u8; 21] = [
+            0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xf0, 0x3f, 0x00,
+            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40,
+        ];
+
+        let valid_array: BinaryViewArray =
+            [Some(valid_point_wkb.as_slice()), None].iter().collect();
+        validate_wkb_array(&valid_array, "geom").unwrap();
+
+        let invalid_array: BinaryViewArray = [Some(&b"\x01"[..]), None].iter().collect();
+        let err = validate_wkb_array(&invalid_array, "geom").unwrap_err();
+        assert!(err.to_string().contains("WKB validation failed"));
+    }
+
+    #[test]
+    fn validate_wkb_batch_errors_on_invalid_wkb() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int64, true),
+            Field::new("geom", DataType::Binary, true),
+        ]));
+
+        let id_column: ArrayRef = Arc::new(Int64Array::from(vec![Some(1)]));
+        let geom_array: BinaryArray = [Some(&b"\x01"[..])].iter().collect();
+        let geom_column: ArrayRef = Arc::new(geom_array);
+
+        let batch = RecordBatch::try_new(schema, vec![id_column, geom_column]).unwrap();
+        let validation_columns = vec![(1, "geom".to_string())];
+        let err = validate_wkb_batch(&batch, &validation_columns).unwrap_err();
+        assert!(err.to_string().contains("WKB validation failed"));
+    }
+
     fn file_schema_with_covering() -> SchemaRef {
         Arc::new(Schema::new(vec![
             Field::new("not_geo", DataType::Binary, true),
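
For readers skimming the Rust above: a rough Python analogue of
validate_wkb_batch/validate_wkb_values, as a sketch only, using pyarrow and
shapely as stand-ins for the Arrow arrays and the wkb crate (the function name
is illustrative, not SedonaDB API):

    import pyarrow as pa
    import shapely

    def validate_wkb_column(batch: pa.RecordBatch, name: str) -> None:
        # Walk the binary column, skip nulls, and fail fast on the first
        # value that does not parse as WKB, mirroring the Rust error text.
        for row, value in enumerate(batch.column(name)):
            if not value.is_valid:
                continue
            try:
                shapely.from_wkb(value.as_py())
            except shapely.errors.ShapelyError as e:
                raise ValueError(
                    f"WKB validation failed for column '{name}' at row {row}: {e}"
                ) from e

    batch = pa.record_batch({"geom": pa.array([b"\x01"], type=pa.binary())})
    try:
        validate_wkb_column(batch, "geom")
    except ValueError as e:
        print(e)  # WKB validation failed for column 'geom' at row 0: ...
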
diff --git a/rust/sedona-geoparquet/src/format.rs b/rust/sedona-geoparquet/src/format.rs
index 352d23a3..b8d50cd6 100644
--- a/rust/sedona-geoparquet/src/format.rs
+++ b/rust/sedona-geoparquet/src/format.rs
@@ -53,7 +53,7 @@ use sedona_schema::extension_type::ExtensionType;
 
 use crate::{
     file_opener::{storage_schema_contains_geo, GeoParquetFileOpener, GeoParquetFileOpenerMetrics},
-    metadata::{GeoParquetColumnEncoding, GeoParquetColumnMetadata, GeoParquetMetadata},
+    metadata::{GeoParquetColumnEncoding, GeoParquetMetadata},
     options::TableGeoParquetOptions,
     writer::create_geoparquet_writer_physical_plan,
 };
@@ -341,10 +341,11 @@ impl FileFormat for GeoParquetFormat {
     }
 
     fn file_source(&self) -> Arc<dyn FileSource> {
-        Arc::new(
+        let mut source =
             GeoParquetFileSource::try_from_file_source(self.inner().file_source(), None, None)
-                .unwrap(),
-        )
+                .unwrap();
+        source.options = self.options.clone();
+        Arc::new(source)
     }
 }
 
@@ -361,7 +362,7 @@ pub struct GeoParquetFileSource {
     inner: ParquetSource,
     metadata_size_hint: Option<usize>,
     predicate: Option<Arc<dyn PhysicalExpr>>,
-    overrides: Option<HashMap<String, GeoParquetColumnMetadata>>,
+    options: TableGeoParquetOptions,
 }
 
 impl GeoParquetFileSource {
@@ -371,7 +372,7 @@ impl GeoParquetFileSource {
             inner: ParquetSource::new(options.inner.clone()),
             metadata_size_hint: None,
             predicate: None,
-            overrides: options.geometry_columns.clone(),
+            options,
         }
     }
 
@@ -419,7 +420,9 @@ impl GeoParquetFileSource {
                 inner: parquet_source.clone(),
                 metadata_size_hint,
                 predicate: new_predicate,
-                overrides: None,
+                options: TableGeoParquetOptions::from(
+                    parquet_source.table_parquet_options().clone(),
+                ),
             })
         } else {
             sedona_internal_err!("GeoParquetFileSource constructed from non-ParquetSource")
@@ -432,7 +435,7 @@ impl GeoParquetFileSource {
             inner: self.inner.with_predicate(predicate.clone()),
             metadata_size_hint: self.metadata_size_hint,
             predicate: Some(predicate),
-            overrides: self.overrides.clone(),
+            options: self.options.clone(),
         }
     }
 
@@ -457,7 +460,7 @@ impl GeoParquetFileSource {
             inner: parquet_source,
             metadata_size_hint: self.metadata_size_hint,
             predicate: self.predicate.clone(),
-            overrides: self.overrides.clone(),
+            options: self.options.clone(),
         }
     }
 
@@ -467,7 +470,7 @@ impl GeoParquetFileSource {
             inner: self.inner.clone().with_metadata_size_hint(hint),
             metadata_size_hint: Some(hint),
             predicate: self.predicate.clone(),
-            overrides: self.overrides.clone(),
+            options: self.options.clone(),
         }
     }
 }
@@ -483,8 +486,7 @@ impl FileSource for GeoParquetFileSource {
             self.inner
                 .create_file_opener(object_store.clone(), base_config, partition);
 
-        // If there are no geo columns or no pruning predicate, just return the inner opener
-        if self.predicate.is_none() || !storage_schema_contains_geo(base_config.file_schema()) {
+        if !storage_schema_contains_geo(base_config.file_schema()) {
             return inner_opener;
         }
 
@@ -492,13 +494,13 @@ impl FileSource for GeoParquetFileSource {
             inner: inner_opener,
             object_store,
             metadata_size_hint: self.metadata_size_hint,
-            predicate: self.predicate.clone().unwrap(),
+            predicate: self.predicate.clone(),
             file_schema: base_config.file_schema().clone(),
             enable_pruning: self.inner.table_parquet_options().global.pruning,
             // HACK: Since there is no public API to set inner's metrics, we use
             // inner's metrics as the ExecutionPlan-global metrics
             metrics: GeoParquetFileOpenerMetrics::new(self.inner.metrics()),
-            overrides: self.overrides.clone(),
+            options: self.options.clone(),
         })
     }
 
@@ -516,8 +518,7 @@ impl FileSource for GeoParquetFileSource {
                     // TODO should this be None?
                     None,
                 )?;
-                // TODO: part of try_from_file_source()?
-                updated_inner.overrides = self.overrides.clone();
+                updated_inner.options = self.options.clone();
                 Ok(inner_result.with_updated_node(Arc::new(updated_inner)))
             }
             None => Ok(inner_result),
@@ -529,35 +530,43 @@ impl FileSource for GeoParquetFileSource {
     }
 
     fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
-        Arc::new(Self::from_file_source(
+        let mut source = Self::from_file_source(
             self.inner.with_batch_size(batch_size),
             self.metadata_size_hint,
             self.predicate.clone(),
-        ))
+        );
+        source.options = self.options.clone();
+        Arc::new(source)
     }
 
     fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
-        Arc::new(Self::from_file_source(
+        let mut source = Self::from_file_source(
             self.inner.with_schema(schema),
             self.metadata_size_hint,
             self.predicate.clone(),
-        ))
+        );
+        source.options = self.options.clone();
+        Arc::new(source)
     }
 
     fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
-        Arc::new(Self::from_file_source(
+        let mut source = Self::from_file_source(
             self.inner.with_projection(config),
             self.metadata_size_hint,
             self.predicate.clone(),
-        ))
+        );
+        source.options = self.options.clone();
+        Arc::new(source)
     }
 
     fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
-        Arc::new(Self::from_file_source(
+        let mut source = Self::from_file_source(
             self.inner.with_statistics(statistics),
             self.metadata_size_hint,
             self.predicate.clone(),
-        ))
+        );
+        source.options = self.options.clone();
+        Arc::new(source)
     }
 
     fn metrics(&self) -> &ExecutionPlanMetricsSet {
diff --git a/rust/sedona-geoparquet/src/options.rs b/rust/sedona-geoparquet/src/options.rs
index eaa53dd5..2cc1fd12 100644
--- a/rust/sedona-geoparquet/src/options.rs
+++ b/rust/sedona-geoparquet/src/options.rs
@@ -34,6 +34,8 @@ pub struct TableGeoParquetOptions {
     pub overwrite_bbox_columns: bool,
     /// Optional geometry column metadata overrides for schema inference.
     pub geometry_columns: Option<HashMap<String, GeoParquetColumnMetadata>>,
+    /// Validate geometry column contents against metadata when reading.
+    pub validate: bool,
 }
 
 impl TableGeoParquetOptions {
diff --git a/rust/sedona-geoparquet/src/provider.rs b/rust/sedona-geoparquet/src/provider.rs
index be2f5925..b9d2ba52 100644
--- a/rust/sedona-geoparquet/src/provider.rs
+++ b/rust/sedona-geoparquet/src/provider.rs
@@ -84,6 +84,7 @@ pub struct GeoParquetReadOptions<'a> {
     inner: ParquetReadOptions<'a>,
     table_options: Option<HashMap<String, String>>,
     geometry_columns: Option<HashMap<String, GeoParquetColumnMetadata>>,
+    validate: bool,
 }
 
 impl GeoParquetReadOptions<'_> {
@@ -189,6 +190,7 @@ impl GeoParquetReadOptions<'_> {
             inner: ParquetReadOptions::default(),
             table_options: Some(options),
             geometry_columns: None,
+            validate: false,
         })
     }
 
@@ -214,6 +216,17 @@ impl GeoParquetReadOptions<'_> {
     pub fn geometry_columns(&self) -> Option<&HashMap<String, GeoParquetColumnMetadata>> {
         self.geometry_columns.as_ref()
     }
+
+    /// Enable/disable geometry content validation.
+    pub fn with_validate(mut self, validate: bool) -> Self {
+        self.validate = validate;
+        self
+    }
+
+    /// Get whether geometry content validation is enabled.
+    pub fn validate(&self) -> bool {
+        self.validate
+    }
 }
 
 fn parse_geometry_columns_json(
@@ -252,6 +265,7 @@ impl ReadOptions<'_> for GeoParquetReadOptions<'_> {
             if let Some(geometry_columns) = &self.geometry_columns {
                 geoparquet_options.geometry_columns = Some(geometry_columns.clone());
             }
+            geoparquet_options.validate = self.validate;
             options.format = Arc::new(GeoParquetFormat::new(geoparquet_options));
             return options;
         }
diff --git a/rust/sedona-spatial-join/src/stream.rs b/rust/sedona-spatial-join/src/stream.rs
index 177e7b0f..9233e366 100644
--- a/rust/sedona-spatial-join/src/stream.rs
+++ b/rust/sedona-spatial-join/src/stream.rs
@@ -1921,12 +1921,9 @@ mod tests {
             pos: 0,
         };
         let mut produced_probe_indices: Vec<u32> = Vec::new();
-        loop {
-            let Some((_, probe_indices)) =
-                progress.indices_for_next_batch(JoinSide::Left, join_type, max_batch_size)
-            else {
-                break;
-            };
+        while let Some((_, probe_indices)) =
+            progress.indices_for_next_batch(JoinSide::Left, join_type, max_batch_size)
+        {
             let probe_indices = probe_indices.to_vec();
             let adjust_range = progress.next_probe_range(&probe_indices);
             let build_indices = UInt64Array::from(vec![0; probe_indices.len()]);
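
The stream.rs hunk above is a pure refactor: a loop with an explicit
break-on-None collapsed into `while let`. The same shape in Python, for
comparison (illustrative only):

    it = iter([1, 2, 3])

    # The walrus operator fuses the binding and the loop condition,
    # just as `while let Some(..) = ...` does in the Rust hunk above.
    while (item := next(it, None)) is not None:
        print(item)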
