This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git
The following commit(s) were added to refs/heads/main by this push:
new bc98ac46 feat(rust/sedona-geoparquet): Support WKB validation in `read_parquet()` (#578)
bc98ac46 is described below
commit bc98ac461e7f313bd112d3047dba18f52713a6b2
Author: Yongting You <[email protected]>
AuthorDate: Tue Feb 10 07:23:33 2026 +0800
feat(rust/sedona-geoparquet): Support WKB validation in `read_parquet()` (#578)
Co-authored-by: Dewey Dunnington <[email protected]>
---
Cargo.lock | 1 +
python/sedonadb/python/sedonadb/context.py | 18 ++-
python/sedonadb/src/context.rs | 2 +
python/sedonadb/tests/io/test_parquet.py | 77 ++++++++++
rust/sedona-geoparquet/Cargo.toml | 1 +
rust/sedona-geoparquet/src/file_opener.rs | 233 ++++++++++++++++++++++++-----
rust/sedona-geoparquet/src/format.rs | 57 ++++---
rust/sedona-geoparquet/src/options.rs | 2 +
rust/sedona-geoparquet/src/provider.rs | 14 ++
rust/sedona-spatial-join/src/stream.rs | 9 +-
10 files changed, 344 insertions(+), 70 deletions(-)
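For reference, a minimal usage sketch of the new `validate` option, modeled on the tests added below (the file name and connection setup are illustrative; the tests obtain `con` from a fixture, and `sedonadb.connect()` is assumed here as the entry point):

    import json
    import sedonadb

    con = sedonadb.connect()  # assumed entry point; the tests use a `con` fixture
    geometry_columns = json.dumps({"geom": {"encoding": "WKB"}})

    # validate=False (the default) trusts the bytes as-is; validate=True checks
    # every non-null value in "geom" and fails the scan with a
    # "WKB validation failed ..." error on malformed WKB.
    tab = con.read_parquet(
        "points.parquet", geometry_columns=geometry_columns, validate=True
    ).to_arrow_table()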
diff --git a/Cargo.lock b/Cargo.lock
index e5fefbe7..b9856cb6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5245,6 +5245,7 @@ dependencies = [
"tempfile",
"tokio",
"url",
+ "wkb",
]
[[package]]
diff --git a/python/sedonadb/python/sedonadb/context.py b/python/sedonadb/python/sedonadb/context.py
index 21a380ae..f0793870 100644
--- a/python/sedonadb/python/sedonadb/context.py
+++ b/python/sedonadb/python/sedonadb/context.py
@@ -128,6 +128,7 @@ class SedonaContext:
table_paths: Union[str, Path, Iterable[str]],
options: Optional[Dict[str, Any]] = None,
geometry_columns: Optional[Union[str, Dict[str, Any]]] = None,
+ validate: bool = False,
) -> DataFrame:
"""Create a [DataFrame][sedonadb.dataframe.DataFrame] from one or more
Parquet files
@@ -176,9 +177,18 @@ class SedonaContext:
Safety:
- - Columns specified here are not validated against the provided options
- (e.g., WKB encoding checks); inconsistent data may cause undefined
- behavior.
+ - Columns specified here can optionally be validated according to the
+ `validate` option (e.g., WKB encoding checks). If validation is not
+ enabled, inconsistent data may cause undefined behavior.
+ validate:
+ When set to `True`, geometry column contents are validated against
+ their metadata. Metadata can come from the source Parquet file or
+ the user-provided `geometry_columns` option.
+ Only supported properties are validated; unsupported properties are
+ ignored. If validation fails, execution stops with an error.
+
+ Currently the only property that is validated is the WKB of input geometry
+ columns.
Examples:
@@ -200,7 +210,7 @@ class SedonaContext:
return DataFrame(
self._impl,
self._impl.read_parquet(
- [str(path) for path in table_paths], options, geometry_columns
+ [str(path) for path in table_paths], options, geometry_columns, validate
),
self.options,
)
diff --git a/python/sedonadb/src/context.rs b/python/sedonadb/src/context.rs
index 3bb7c5e3..1647bd05 100644
--- a/python/sedonadb/src/context.rs
+++ b/python/sedonadb/src/context.rs
@@ -81,6 +81,7 @@ impl InternalContext {
table_paths: Vec<String>,
options: HashMap<String, PyObject>,
geometry_columns: Option<String>,
+ validate: bool,
) -> Result<InternalDataFrame, PySedonaError> {
// Convert Python options to strings, filtering out None values
let rust_options: HashMap<String, String> = options
@@ -108,6 +109,7 @@ impl InternalContext {
PySedonaError::SedonaPython(format!("Invalid geometry_columns JSON: {e}"))
})?;
}
+ geo_options = geo_options.with_validate(validate);
let df = wait_for_future(
py,
diff --git a/python/sedonadb/tests/io/test_parquet.py b/python/sedonadb/tests/io/test_parquet.py
index 7f87c027..c80d7478 100644
--- a/python/sedonadb/tests/io/test_parquet.py
+++ b/python/sedonadb/tests/io/test_parquet.py
@@ -22,6 +22,7 @@ from pathlib import Path
import geopandas
import geopandas.testing
import pyarrow as pa
+import pyarrow.parquet as pq
import pytest
import sedonadb
import shapely
@@ -412,3 +413,79 @@ def test_write_geoparquet_geography(con, geoarrow_data):
table_roundtrip = con.read_parquet(tmp_parquet).to_arrow_table()
assert table_roundtrip == table
+
+
+def test_read_parquet_validate_wkb_single_valid_row(con, tmp_path):
+ valid_wkb = bytes.fromhex("0101000000000000000000F03F0000000000000040")
+
+ table = pa.table({"id": [1], "geom": [valid_wkb]})
+ path = tmp_path / "single_valid_wkb.parquet"
+ pq.write_table(table, path)
+
+ geometry_columns = json.dumps({"geom": {"encoding": "WKB"}})
+
+ tab = con.read_parquet(
+ path, geometry_columns=geometry_columns, validate=False
+ ).to_arrow_table()
+ assert tab["geom"].type.extension_name == "geoarrow.wkb"
+ assert len(tab) == 1
+
+ tab = con.read_parquet(
+ path, geometry_columns=geometry_columns, validate=True
+ ).to_arrow_table()
+ assert tab["geom"].type.extension_name == "geoarrow.wkb"
+ assert len(tab) == 1
+
+
+def test_read_parquet_validate_wkb_single_invalid_row(con, tmp_path):
+ invalid_wkb = b"\x01"
+
+ table = pa.table({"id": [1], "geom": [invalid_wkb]})
+ path = tmp_path / "single_invalid_wkb.parquet"
+ pq.write_table(table, path)
+
+ geometry_columns = json.dumps({"geom": {"encoding": "WKB"}})
+
+ tab = con.read_parquet(
+ path, geometry_columns=geometry_columns, validate=False
+ ).to_arrow_table()
+ assert tab["geom"].type.extension_name == "geoarrow.wkb"
+ assert len(tab) == 1
+
+ with pytest.raises(
+ sedonadb._lib.SedonaError,
+ match=r"WKB validation failed",
+ ):
+ con.read_parquet(
+ path, geometry_columns=geometry_columns, validate=True
+ ).to_arrow_table()
+
+
+def test_read_parquet_validate_wkb_partial_invalid_rows(con, tmp_path):
+ valid_wkb = bytes.fromhex("0101000000000000000000F03F0000000000000040")
+ invalid_wkb = b"\x01"
+
+ table = pa.table(
+ {
+ "id": [1, 2, 3],
+ "geom": [valid_wkb, invalid_wkb, valid_wkb],
+ }
+ )
+ path = tmp_path / "partial_invalid_wkb.parquet"
+ pq.write_table(table, path)
+
+ geometry_columns = json.dumps({"geom": {"encoding": "WKB"}})
+
+ tab = con.read_parquet(
+ path, geometry_columns=geometry_columns, validate=False
+ ).to_arrow_table()
+ assert tab["geom"].type.extension_name == "geoarrow.wkb"
+ assert len(tab) == 3
+
+ with pytest.raises(
+ sedonadb._lib.SedonaError,
+ match=r"WKB validation failed",
+ ):
+ con.read_parquet(
+ path, geometry_columns=geometry_columns, validate=True
+ ).to_arrow_table()
diff --git a/rust/sedona-geoparquet/Cargo.toml b/rust/sedona-geoparquet/Cargo.toml
index ba65d5a2..f70fb4b8 100644
--- a/rust/sedona-geoparquet/Cargo.toml
+++ b/rust/sedona-geoparquet/Cargo.toml
@@ -67,3 +67,4 @@ sedona-schema = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true }
+wkb = { workspace = true }
diff --git a/rust/sedona-geoparquet/src/file_opener.rs b/rust/sedona-geoparquet/src/file_opener.rs
index ae2a56d2..72719c88 100644
--- a/rust/sedona-geoparquet/src/file_opener.rs
+++ b/rust/sedona-geoparquet/src/file_opener.rs
@@ -16,17 +16,22 @@
// under the License.
use std::{collections::HashMap, sync::Arc};
-use arrow_schema::SchemaRef;
+use arrow_array::{Array, RecordBatch};
+use arrow_schema::{DataType, SchemaRef};
use datafusion::datasource::{
listing::PartitionedFile,
physical_plan::{parquet::ParquetAccessPlan, FileOpenFuture, FileOpener},
};
-use datafusion_common::Result;
+use datafusion_common::{
+ cast::{as_binary_array, as_binary_view_array, as_large_binary_array},
+ exec_err, Result,
+};
use datafusion_datasource_parquet::metadata::DFParquetMetadata;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::metrics::{
ExecutionPlanMetricsSet, MetricBuilder, MetricType, MetricValue,
PruningMetrics,
};
+use futures::StreamExt;
use object_store::ObjectStore;
use parquet::{
basic::LogicalType,
@@ -47,7 +52,10 @@ use sedona_geometry::{
};
use sedona_schema::{datatypes::SedonaType, matchers::ArgMatcher};
-use crate::metadata::{GeoParquetColumnMetadata, GeoParquetMetadata};
+use crate::{
+ metadata::{GeoParquetColumnEncoding, GeoParquetMetadata},
+ options::TableGeoParquetOptions,
+};
#[derive(Clone)]
pub(crate) struct GeoParquetFileOpenerMetrics {
@@ -98,11 +106,11 @@ pub(crate) struct GeoParquetFileOpener {
pub inner: Arc<dyn FileOpener>,
pub object_store: Arc<dyn ObjectStore>,
pub metadata_size_hint: Option<usize>,
- pub predicate: Arc<dyn PhysicalExpr>,
+ pub predicate: Option<Arc<dyn PhysicalExpr>>,
pub file_schema: SchemaRef,
pub enable_pruning: bool,
pub metrics: GeoParquetFileOpenerMetrics,
- pub overrides: Option<HashMap<String, GeoParquetColumnMetadata>>,
+ pub options: TableGeoParquetOptions,
}
impl FileOpener for GeoParquetFileOpener {
@@ -118,37 +126,41 @@ impl FileOpener for GeoParquetFileOpener {
let mut access_plan = ParquetAccessPlan::new_all(parquet_metadata.num_row_groups());
+ let maybe_geoparquet_metadata = GeoParquetMetadata::try_from_parquet_metadata(
+ &parquet_metadata,
+ self_clone.options.geometry_columns.as_ref(),
+ )?;
+
if self_clone.enable_pruning {
- let spatial_filter = SpatialFilter::try_from_expr(&self_clone.predicate)?;
-
- if let Some(geoparquet_metadata) = GeoParquetMetadata::try_from_parquet_metadata(
- &parquet_metadata,
- self_clone.overrides.as_ref(),
- )? {
- filter_access_plan_using_geoparquet_file_metadata(
- &self_clone.file_schema,
- &mut access_plan,
- &spatial_filter,
- &geoparquet_metadata,
- &self_clone.metrics,
- )?;
-
- filter_access_plan_using_geoparquet_covering(
- &self_clone.file_schema,
- &mut access_plan,
- &spatial_filter,
- &geoparquet_metadata,
- &parquet_metadata,
- &self_clone.metrics,
- )?;
-
- filter_access_plan_using_native_geostats(
- &self_clone.file_schema,
- &mut access_plan,
- &spatial_filter,
- &parquet_metadata,
- &self_clone.metrics,
- )?;
+ if let Some(predicate) = self_clone.predicate.as_ref() {
+ let spatial_filter = SpatialFilter::try_from_expr(predicate)?;
+
+ if let Some(geoparquet_metadata) = maybe_geoparquet_metadata.as_ref() {
+ filter_access_plan_using_geoparquet_file_metadata(
+ &self_clone.file_schema,
+ &mut access_plan,
+ &spatial_filter,
+ geoparquet_metadata,
+ &self_clone.metrics,
+ )?;
+
+ filter_access_plan_using_geoparquet_covering(
+ &self_clone.file_schema,
+ &mut access_plan,
+ &spatial_filter,
+ geoparquet_metadata,
+ &parquet_metadata,
+ &self_clone.metrics,
+ )?;
+
+ filter_access_plan_using_native_geostats(
+ &self_clone.file_schema,
+ &mut access_plan,
+ &spatial_filter,
+ &parquet_metadata,
+ &self_clone.metrics,
+ )?;
+ }
}
}
@@ -158,12 +170,110 @@ impl FileOpener for GeoParquetFileOpener {
// We could also consider filtering using null_count here in the future (i.e.,
// skip row groups that are all null)
let file = file.with_extensions(Arc::new(access_plan));
+ let stream = self_clone.inner.open(file)?.await?;
+
+ // Validate geometry columns when enabled from read option.
+ let validation_columns = if self_clone.options.validate {
+ maybe_geoparquet_metadata
+ .as_ref()
+ .map(|metadata| wkb_validation_columns(&self_clone.file_schema, metadata))
+ .unwrap_or_default()
+ } else {
+ Vec::new()
+ };
- self_clone.inner.open(file)?.await
+ if !self_clone.options.validate || validation_columns.is_empty() {
+ return Ok(stream);
+ }
+
+ let validated_stream = stream.map(move |batch_result| {
+ let batch = batch_result?;
+ validate_wkb_batch(&batch, &validation_columns)?;
+ Ok(batch)
+ });
+
+ Ok(Box::pin(validated_stream))
}))
}
}
+fn wkb_validation_columns(
+ file_schema: &SchemaRef,
+ metadata: &GeoParquetMetadata,
+) -> Vec<(usize, String)> {
+ file_schema
+ .fields()
+ .iter()
+ .enumerate()
+ .filter_map(|(column_index, field)| {
+ metadata
+ .columns
+ .get(field.name())
+ .and_then(|column_metadata| {
+ if matches!(column_metadata.encoding, GeoParquetColumnEncoding::WKB) {
+ Some((column_index, field.name().clone()))
+ } else {
+ None
+ }
+ })
+ })
+ .collect()
+}
+
+fn validate_wkb_batch(batch: &RecordBatch, validation_columns: &[(usize, String)]) -> Result<()> {
+ for (column_index, column_name) in validation_columns {
+ let column = batch.column(*column_index);
+ validate_wkb_array(column.as_ref(), column_name)?;
+ }
+ Ok(())
+}
+
+fn validate_wkb_array(array: &dyn Array, column_name: &str) -> Result<()> {
+ match array.data_type() {
+ DataType::Binary => {
+ let array = as_binary_array(array)?;
+ validate_wkb_values(array.iter(), column_name)?;
+ }
+ DataType::LargeBinary => {
+ let array = as_large_binary_array(array)?;
+ validate_wkb_values(array.iter(), column_name)?;
+ }
+ DataType::BinaryView => {
+ let array = as_binary_view_array(array)?;
+ validate_wkb_values(array.iter(), column_name)?;
+ }
+ other => {
+ return exec_err!(
+ "Expected Binary/LargeBinary/BinaryView storage for WKB
validation in column '{}' but got {}",
+ column_name,
+ other
+ );
+ }
+ }
+
+ Ok(())
+}
+
+fn validate_wkb_values<'a>(
+ values: impl IntoIterator<Item = Option<&'a [u8]>>,
+ column_name: &str,
+) -> Result<()> {
+ for (row_index, maybe_wkb) in values.into_iter().enumerate() {
+ if let Some(wkb_bytes) = maybe_wkb {
+ if let Err(e) = wkb::reader::read_wkb(wkb_bytes) {
+ return exec_err!(
+ "WKB validation failed for column '{}' at row {}: {}",
+ column_name,
+ row_index,
+ e
+ );
+ }
+ }
+ }
+
+ Ok(())
+}
+
/// Filter an access plan using the GeoParquet file metadata
///
/// Inspects the GeoParquetMetadata for a bbox at the column metadata level
@@ -565,6 +675,9 @@ pub fn storage_schema_contains_geo(schema: &SchemaRef) -> bool {
#[cfg(test)]
mod test {
+ use std::sync::Arc;
+
+ use arrow_array::{ArrayRef, BinaryArray, BinaryViewArray, Int64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::{
arrow::ArrowSchemaConverter,
@@ -1199,6 +1312,54 @@ mod test {
assert!(result.geometry_types().is_some());
}
+ #[test]
+ fn validate_wkb_array_binary() {
+ let valid_point_wkb: [u8; 21] = [
+ 0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xf0, 0x3f, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40,
+ ];
+
+ let valid_array: BinaryArray = [Some(valid_point_wkb.as_slice()), None].iter().collect();
+ validate_wkb_array(&valid_array, "geom").unwrap();
+
+ let invalid_array: BinaryArray = [Some(&b"\x01"[..]), None].iter().collect();
+ let err = validate_wkb_array(&invalid_array, "geom").unwrap_err();
+ assert!(err.to_string().contains("WKB validation failed"));
+ }
+
+ #[test]
+ fn validate_wkb_array_binary_view() {
+ let valid_point_wkb: [u8; 21] = [
+ 0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xf0, 0x3f, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40,
+ ];
+
+ let valid_array: BinaryViewArray =
+ [Some(valid_point_wkb.as_slice()), None].iter().collect();
+ validate_wkb_array(&valid_array, "geom").unwrap();
+
+ let invalid_array: BinaryViewArray = [Some(&b"\x01"[..]), None].iter().collect();
+ let err = validate_wkb_array(&invalid_array, "geom").unwrap_err();
+ assert!(err.to_string().contains("WKB validation failed"));
+ }
+
+ #[test]
+ fn validate_wkb_batch_errors_on_invalid_wkb() {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("id", DataType::Int64, true),
+ Field::new("geom", DataType::Binary, true),
+ ]));
+
+ let id_column: ArrayRef = Arc::new(Int64Array::from(vec![Some(1)]));
+ let geom_array: BinaryArray = [Some(&b"\x01"[..])].iter().collect();
+ let geom_column: ArrayRef = Arc::new(geom_array);
+
+ let batch = RecordBatch::try_new(schema, vec![id_column, geom_column]).unwrap();
+ let validation_columns = vec![(1, "geom".to_string())];
+ let err = validate_wkb_batch(&batch, &validation_columns).unwrap_err();
+ assert!(err.to_string().contains("WKB validation failed"));
+ }
+
fn file_schema_with_covering() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("not_geo", DataType::Binary, true),
diff --git a/rust/sedona-geoparquet/src/format.rs b/rust/sedona-geoparquet/src/format.rs
index 352d23a3..b8d50cd6 100644
--- a/rust/sedona-geoparquet/src/format.rs
+++ b/rust/sedona-geoparquet/src/format.rs
@@ -53,7 +53,7 @@ use sedona_schema::extension_type::ExtensionType;
use crate::{
file_opener::{storage_schema_contains_geo, GeoParquetFileOpener, GeoParquetFileOpenerMetrics},
- metadata::{GeoParquetColumnEncoding, GeoParquetColumnMetadata, GeoParquetMetadata},
+ metadata::{GeoParquetColumnEncoding, GeoParquetMetadata},
options::TableGeoParquetOptions,
writer::create_geoparquet_writer_physical_plan,
};
@@ -341,10 +341,11 @@ impl FileFormat for GeoParquetFormat {
}
fn file_source(&self) -> Arc<dyn FileSource> {
- Arc::new(
+ let mut source =
GeoParquetFileSource::try_from_file_source(self.inner().file_source(), None, None)
- .unwrap(),
- )
+ .unwrap();
+ source.options = self.options.clone();
+ Arc::new(source)
}
}
@@ -361,7 +362,7 @@ pub struct GeoParquetFileSource {
inner: ParquetSource,
metadata_size_hint: Option<usize>,
predicate: Option<Arc<dyn PhysicalExpr>>,
- overrides: Option<HashMap<String, GeoParquetColumnMetadata>>,
+ options: TableGeoParquetOptions,
}
impl GeoParquetFileSource {
@@ -371,7 +372,7 @@ impl GeoParquetFileSource {
inner: ParquetSource::new(options.inner.clone()),
metadata_size_hint: None,
predicate: None,
- overrides: options.geometry_columns.clone(),
+ options,
}
}
@@ -419,7 +420,9 @@ impl GeoParquetFileSource {
inner: parquet_source.clone(),
metadata_size_hint,
predicate: new_predicate,
- overrides: None,
+ options: TableGeoParquetOptions::from(
+ parquet_source.table_parquet_options().clone(),
+ ),
})
} else {
sedona_internal_err!("GeoParquetFileSource constructed from non-ParquetSource")
@@ -432,7 +435,7 @@ impl GeoParquetFileSource {
inner: self.inner.with_predicate(predicate.clone()),
metadata_size_hint: self.metadata_size_hint,
predicate: Some(predicate),
- overrides: self.overrides.clone(),
+ options: self.options.clone(),
}
}
@@ -457,7 +460,7 @@ impl GeoParquetFileSource {
inner: parquet_source,
metadata_size_hint: self.metadata_size_hint,
predicate: self.predicate.clone(),
- overrides: self.overrides.clone(),
+ options: self.options.clone(),
}
}
@@ -467,7 +470,7 @@ impl GeoParquetFileSource {
inner: self.inner.clone().with_metadata_size_hint(hint),
metadata_size_hint: Some(hint),
predicate: self.predicate.clone(),
- overrides: self.overrides.clone(),
+ options: self.options.clone(),
}
}
}
@@ -483,8 +486,7 @@ impl FileSource for GeoParquetFileSource {
self.inner
.create_file_opener(object_store.clone(), base_config, partition);
- // If there are no geo columns or no pruning predicate, just return the inner opener
- if self.predicate.is_none() || !storage_schema_contains_geo(base_config.file_schema()) {
+ if !storage_schema_contains_geo(base_config.file_schema()) {
return inner_opener;
}
@@ -492,13 +494,13 @@ impl FileSource for GeoParquetFileSource {
inner: inner_opener,
object_store,
metadata_size_hint: self.metadata_size_hint,
- predicate: self.predicate.clone().unwrap(),
+ predicate: self.predicate.clone(),
file_schema: base_config.file_schema().clone(),
enable_pruning: self.inner.table_parquet_options().global.pruning,
// HACK: Since there is no public API to set inner's metrics, so we use
// inner's metrics as the ExecutionPlan-global metrics
metrics: GeoParquetFileOpenerMetrics::new(self.inner.metrics()),
- overrides: self.overrides.clone(),
+ options: self.options.clone(),
})
}
@@ -516,8 +518,7 @@ impl FileSource for GeoParquetFileSource {
// TODO should this be None?
None,
)?;
- // TODO: part of try_from_file_source()?
- updated_inner.overrides = self.overrides.clone();
+ updated_inner.options = self.options.clone();
Ok(inner_result.with_updated_node(Arc::new(updated_inner)))
}
None => Ok(inner_result),
@@ -529,35 +530,43 @@ impl FileSource for GeoParquetFileSource {
}
fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
- Arc::new(Self::from_file_source(
+ let mut source = Self::from_file_source(
self.inner.with_batch_size(batch_size),
self.metadata_size_hint,
self.predicate.clone(),
- ))
+ );
+ source.options = self.options.clone();
+ Arc::new(source)
}
fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
- Arc::new(Self::from_file_source(
+ let mut source = Self::from_file_source(
self.inner.with_schema(schema),
self.metadata_size_hint,
self.predicate.clone(),
- ))
+ );
+ source.options = self.options.clone();
+ Arc::new(source)
}
fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
- Arc::new(Self::from_file_source(
+ let mut source = Self::from_file_source(
self.inner.with_projection(config),
self.metadata_size_hint,
self.predicate.clone(),
- ))
+ );
+ source.options = self.options.clone();
+ Arc::new(source)
}
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
- Arc::new(Self::from_file_source(
+ let mut source = Self::from_file_source(
self.inner.with_statistics(statistics),
self.metadata_size_hint,
self.predicate.clone(),
- ))
+ );
+ source.options = self.options.clone();
+ Arc::new(source)
}
fn metrics(&self) -> &ExecutionPlanMetricsSet {
diff --git a/rust/sedona-geoparquet/src/options.rs b/rust/sedona-geoparquet/src/options.rs
index eaa53dd5..2cc1fd12 100644
--- a/rust/sedona-geoparquet/src/options.rs
+++ b/rust/sedona-geoparquet/src/options.rs
@@ -34,6 +34,8 @@ pub struct TableGeoParquetOptions {
pub overwrite_bbox_columns: bool,
/// Optional geometry column metadata overrides for schema inference.
pub geometry_columns: Option<HashMap<String, GeoParquetColumnMetadata>>,
+ /// Validate geometry column contents against metadata when reading.
+ pub validate: bool,
}
impl TableGeoParquetOptions {
diff --git a/rust/sedona-geoparquet/src/provider.rs b/rust/sedona-geoparquet/src/provider.rs
index be2f5925..b9d2ba52 100644
--- a/rust/sedona-geoparquet/src/provider.rs
+++ b/rust/sedona-geoparquet/src/provider.rs
@@ -84,6 +84,7 @@ pub struct GeoParquetReadOptions<'a> {
inner: ParquetReadOptions<'a>,
table_options: Option<HashMap<String, String>>,
geometry_columns: Option<HashMap<String, GeoParquetColumnMetadata>>,
+ validate: bool,
}
impl GeoParquetReadOptions<'_> {
@@ -189,6 +190,7 @@ impl GeoParquetReadOptions<'_> {
inner: ParquetReadOptions::default(),
table_options: Some(options),
geometry_columns: None,
+ validate: false,
})
}
@@ -214,6 +216,17 @@ impl GeoParquetReadOptions<'_> {
pub fn geometry_columns(&self) -> Option<&HashMap<String, GeoParquetColumnMetadata>> {
self.geometry_columns.as_ref()
}
+
+ /// Enable/disable geometry content validation.
+ pub fn with_validate(mut self, validate: bool) -> Self {
+ self.validate = validate;
+ self
+ }
+
+ /// Get whether geometry content validation is enabled.
+ pub fn validate(&self) -> bool {
+ self.validate
+ }
}
fn parse_geometry_columns_json(
@@ -252,6 +265,7 @@ impl ReadOptions<'_> for GeoParquetReadOptions<'_> {
if let Some(geometry_columns) = &self.geometry_columns {
geoparquet_options.geometry_columns = Some(geometry_columns.clone());
}
+ geoparquet_options.validate = self.validate;
options.format = Arc::new(GeoParquetFormat::new(geoparquet_options));
return options;
}
diff --git a/rust/sedona-spatial-join/src/stream.rs b/rust/sedona-spatial-join/src/stream.rs
index 177e7b0f..9233e366 100644
--- a/rust/sedona-spatial-join/src/stream.rs
+++ b/rust/sedona-spatial-join/src/stream.rs
@@ -1921,12 +1921,9 @@ mod tests {
pos: 0,
};
let mut produced_probe_indices: Vec<u32> = Vec::new();
- loop {
- let Some((_, probe_indices)) =
- progress.indices_for_next_batch(JoinSide::Left, join_type, max_batch_size)
- else {
- break;
- };
+ while let Some((_, probe_indices)) =
+ progress.indices_for_next_batch(JoinSide::Left, join_type, max_batch_size)
+ {
let probe_indices = probe_indices.to_vec();
let adjust_range = progress.next_probe_range(&probe_indices);
let build_indices = UInt64Array::from(vec![0; probe_indices.len()]);