paleolimbot commented on code in PR #578:
URL: https://github.com/apache/sedona-db/pull/578#discussion_r2776196192


##########
python/sedonadb/python/sedonadb/context.py:
##########
@@ -176,9 +177,18 @@ def read_parquet(
 
 
                 Safety:
-                - Columns specified here are not validated against the provided options
-                  (e.g., WKB encoding checks); inconsistent data may cause undefined
-                  behavior.
+                - Columns specified here can optionally be validated according to the
+                  `validate` option (e.g., WKB encoding checks). If validation is not
+                  enabled, inconsistent data may cause undefined behavior.
+            validate:
+                When set to `True`, geometry column contents are validated against
+                their metadata. Metadata can come from the source Parquet file or
+                the user-provided `geometry_columns` option.
+                Only supported properties are validated; unsupported properties are
+                ignored. If validation fails, execution stops with an error.
+
+                Supported validation properties:
+                - WKB encoding

Review Comment:
   ```suggestion
                   Currently the only property that is validated is the WKB of input geometry
                   columns.
   ```



##########
rust/sedona-geoparquet/src/file_opener.rs:
##########
@@ -158,12 +170,108 @@ impl FileOpener for GeoParquetFileOpener {
             // We could also consider filtering using null_count here in the future (i.e.,
             // skip row groups that are all null)
             let file = file.with_extensions(Arc::new(access_plan));
+            let stream = self_clone.inner.open(file)?.await?;
+
+            // Validate geometry columns when enabled from read option.
+            let validation_columns = if self_clone.options.validate {
+                maybe_geoparquet_metadata
+                    .as_ref()
+                    .map(|metadata| wkb_validation_columns(&self_clone.file_schema, metadata))
+                    .unwrap_or_default()
+            } else {
+                Vec::new()
+            };
 
-            self_clone.inner.open(file)?.await
+            if !self_clone.options.validate || validation_columns.is_empty() {
+                return Ok(stream);
+            }
+
+            let validated_stream = stream.map(move |batch_result| {
+                let batch = batch_result?;
+                validate_wkb_batch(&batch, &validation_columns)?;
+                Ok(batch)
+            });
+
+            Ok(Box::pin(validated_stream))
         }))
     }
 }
 
+fn wkb_validation_columns(
+    file_schema: &SchemaRef,
+    metadata: &GeoParquetMetadata,
+) -> Vec<(usize, String)> {
+    file_schema
+        .fields()
+        .iter()
+        .enumerate()
+        .filter_map(|(column_index, field)| {
+            metadata
+                .columns
+                .get(field.name())
+                .and_then(|column_metadata| {
+                    if matches!(column_metadata.encoding, GeoParquetColumnEncoding::WKB) {
+                        Some((column_index, field.name().clone()))
+                    } else {
+                        None
+                    }
+                })
+        })
+        .collect()
+}
+
+fn validate_wkb_batch(batch: &RecordBatch, validation_columns: &[(usize, String)]) -> Result<()> {
+    for (column_index, column_name) in validation_columns {
+        let column = batch.column(*column_index);
+        validate_wkb_array(column.as_ref(), column_name)?;
+    }
+    Ok(())
+}
+
+fn validate_wkb_array(array: &dyn Array, column_name: &str) -> Result<()> {
+    match array.data_type() {
+        DataType::Binary => {
+            let array = as_binary_array(array)?;
+            for (row_index, maybe_wkb) in array.iter().enumerate() {
+                if let Some(wkb_bytes) = maybe_wkb {
+                    if let Err(e) = wkb::reader::read_wkb(wkb_bytes) {
+                        return exec_err!(
+                            "WKB validation failed for column '{}' at row {}: {}",
+                            column_name,
+                            row_index,
+                            e
+                        );
+                    }
+                }
+            }
+        }

Review Comment:
   While we're here, it would be good to handle the `LargeBinary` case as well. This can happen for reasons outside a GeoParquet producer's control, due to the way various tools automatically store and load schemas.
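
One way to cover both cases without duplicating the loop is to make the per-row check generic over the offset width, since `Binary` and `LargeBinary` differ only in using 32-bit versus 64-bit offsets. The following is a minimal, standard-library-only sketch of that idea, not the PR's code: `BinaryColumn`, `check_wkb_header`, `to_usize`, and `validate_wkb_column` are hypothetical names, and `check_wkb_header` is a deliberately shallow stand-in for a real parser such as `wkb::reader::read_wkb`. In arrow-rs the same shape would presumably be expressed with a function generic over the offset type of `GenericBinaryArray`.

```rust
use std::convert::TryInto;

/// Hypothetical stand-in for an Arrow variable-length binary column.
/// `O` is the offset type: `i32` models `Binary`, `i64` models `LargeBinary`.
struct BinaryColumn<O> {
    offsets: Vec<O>,     // one more entry than there are rows
    values: Vec<u8>,     // concatenated bytes of all rows
    validity: Vec<bool>, // `false` marks a null row
}

/// Shallow stand-in for a real WKB parser: accepts a buffer only if it starts
/// with a byte-order flag (0 or 1) followed by a 4-byte geometry type.
fn check_wkb_header(bytes: &[u8]) -> Result<(), String> {
    match bytes {
        [0 | 1, _, _, _, _, ..] => Ok(()),
        [] => Err("empty buffer".to_string()),
        [flag, ..] if *flag > 1 => Err(format!("invalid byte order flag {flag}")),
        _ => Err("truncated header".to_string()),
    }
}

/// Converts an offset of either width to `usize`, rejecting negative values.
fn to_usize<O: TryInto<usize>>(offset: O) -> Result<usize, String> {
    offset
        .try_into()
        .map_err(|_| "offset does not fit in usize".to_string())
}

/// Validates every non-null row, regardless of the offset width.
fn validate_wkb_column<O>(column: &BinaryColumn<O>, column_name: &str) -> Result<(), String>
where
    O: Copy + TryInto<usize>,
{
    if column.offsets.len() != column.validity.len() + 1 {
        return Err(format!("column '{column_name}' has inconsistent offsets"));
    }
    for (row_index, is_valid) in column.validity.iter().enumerate() {
        if !*is_valid {
            continue; // nulls are skipped, as in the Binary branch of the PR
        }
        let start = to_usize(column.offsets[row_index])?;
        let end = to_usize(column.offsets[row_index + 1])?;
        let bytes = column
            .values
            .get(start..end)
            .ok_or_else(|| format!("column '{column_name}' row {row_index}: offsets out of range"))?;
        if let Err(e) = check_wkb_header(bytes) {
            return Err(format!(
                "WKB validation failed for column '{column_name}' at row {row_index}: {e}"
            ));
        }
    }
    Ok(())
}
```

With this shape, the `match` on the data type only has to pick the offset width and call the same function, so the error message stays identical for both encodings.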



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
