paleolimbot commented on code in PR #560:
URL: https://github.com/apache/sedona-db/pull/560#discussion_r2755009906
##########
python/sedonadb/tests/test_context.py:
##########
@@ -14,10 +14,37 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+import json
+from pathlib import Path
+from typing import Any, Mapping
+
import geoarrow.pyarrow as ga # noqa: F401
import pyarrow as pa
+import pyarrow.parquet as pq
import pytest
import sedonadb
+import shapely
+
+
+def _parse_geo_metadata(geoparquet_path: Path) -> Mapping[str, Any]:
+ """Return the GeoParquet "geo" metadata map, asserting it exists."""
+ metadata = pq.read_metadata(geoparquet_path).metadata
+ assert metadata is not None
+
+ geo = metadata.get(b"geo")
+ assert geo is not None
+
+ return json.loads(geo.decode("utf-8"))
Review Comment:
Optional nit:
```suggestion
return json.loads(geo.decode())
```
##########
python/sedonadb/python/sedonadb/context.py:
##########
@@ -151,7 +198,9 @@ def read_parquet(
return DataFrame(
self._impl,
- self._impl.read_parquet([str(path) for path in table_paths],
options),
+ self._impl.read_parquet(
+ [str(path) for path in table_paths], options, geometry_columns
+ ),
Review Comment:
Optional, but accepting a dictionary here would probably be an easy/nice
quality of life improvement for the Python user.
```suggestion
if geometry_columns is not None and not
isinstance(geometry_columns, str):
geometry_columns = json.dumps(geometry_columns)
self._impl.read_parquet(
[str(path) for path in table_paths], options,
geometry_columns
),
```
##########
rust/sedona-geoparquet/src/provider.rs:
##########
@@ -213,23 +244,32 @@ impl ReadOptions<'_> for GeoParquetReadOptions<'_> {
let mut options = self.inner.to_listing_options(config, table_options);
if let Some(parquet_format) =
options.format.as_any().downcast_ref::<ParquetFormat>() {
- let geoparquet_options = parquet_format.options().clone().into();
+ let mut geoparquet_options =
+ TableGeoParquetOptions::from(parquet_format.options().clone());
+ if let Some(geometry_columns) = &self.geometry_columns {
+ geoparquet_options.geometry_columns =
Some(geometry_columns.clone());
+ }
options.format =
Arc::new(GeoParquetFormat::new(geoparquet_options));
return options;
}
unreachable!("GeoParquetReadOptions with non-ParquetFormat
ListingOptions");
}
+ /// Infer schema from GeoParquet metadata, then apply the user option
+ /// `geometry_columns` from `read_parquet()` to override if provided. See
the
+ /// Python DataFrame `read_parquet(..)` documentation for details.
Review Comment:
```suggestion
```
I'm not sure this comment is the best place to document the rules for
calculating a GeoParquet schema (which are about to change again with Parquet
geometry/geography).
##########
rust/sedona-geoparquet/src/format.rs:
##########
@@ -146,6 +150,114 @@ impl GeoParquetFormat {
}
}
+/// See `merge_geometry_columns(..)` for the override rule
+fn apply_option_override<T: PartialEq + Clone>(
+ column_name: &str,
+ field: &str,
+ existing: &mut Option<T>,
+ override_value: &Option<T>,
+) -> Result<()> {
+ let Some(override_value) = override_value.as_ref() else {
+ return Ok(());
+ };
+
+ match existing.as_ref() {
+ Some(existing_value) => {
+ if existing_value != override_value {
+ return plan_err!(
+ "Geometry column '{column_name}' override conflicts with
existing '{field}' value"
+ );
+ }
+ }
+ None => {
+ *existing = Some(override_value.clone());
+ }
+ }
+
+ Ok(())
+}
+
+/// See `merge_geometry_columns(..)` for the override rule
+fn apply_geometry_columns_override(
+ column_name: &str,
+ existing: &mut GeoParquetColumnMetadata,
+ override_meta: &GeoParquetColumnMetadata,
+) -> Result<()> {
+ apply_option_override(
+ column_name,
+ "encoding",
+ &mut existing.encoding,
+ &override_meta.encoding,
+ )?;
+ apply_option_override(column_name, "crs", &mut existing.crs,
&override_meta.crs)?;
+ apply_option_override(
+ column_name,
+ "edges",
+ &mut existing.edges,
+ &override_meta.edges,
+ )?;
+ apply_option_override(
+ column_name,
+ "orientation",
+ &mut existing.orientation,
+ &override_meta.orientation,
+ )?;
+ apply_option_override(column_name, "bbox", &mut existing.bbox,
&override_meta.bbox)?;
+ apply_option_override(
+ column_name,
+ "epoch",
+ &mut existing.epoch,
+ &override_meta.epoch,
+ )?;
+ apply_option_override(
+ column_name,
+ "covering",
+ &mut existing.covering,
+ &override_meta.covering,
+ )?;
+
+ if !override_meta.geometry_types.is_empty() {
+ if existing.geometry_types.is_empty() {
+ existing.geometry_types = override_meta.geometry_types.clone();
+ } else if existing.geometry_types != override_meta.geometry_types {
+ return plan_err!(
+ "Geometry column '{column_name}' override conflicts with
existing 'geometry_types' value"
+ );
+ }
+ }
+
+ Ok(())
+}
+
+/// Merge geometry columns metadata.
+/// `overrides` columns may only provide additional data; for example,
+/// if `crs` is None (missing) in the `base` metadata, the combined
+/// metadata uses the `crs` field from `overrides`.
+///
+/// If conflicting field data is provided, returns a plan error.
+fn merge_geometry_columns(
+ base: &mut HashMap<String, GeoParquetColumnMetadata>,
+ overrides: &HashMap<String, GeoParquetColumnMetadata>,
+) -> Result<()> {
Review Comment:
I think we can make this quite a bit simpler by just overriding columns
instead of attempting to merge them, which serves your specific use case and I
think is closer to what people actually want to do (ensure that a column is
interpreted in a particular way regardless of what is specified by the file).
##########
rust/sedona-geoparquet/src/format.rs:
##########
@@ -222,38 +339,66 @@ impl FileFormat for GeoParquetFormat {
}
}
- if let Some(geo_metadata) = geoparquet_metadata {
- let new_fields: Result<Vec<_>> = inner_schema_without_metadata
- .fields()
- .iter()
- .map(|field| {
- if let Some(geo_column) =
geo_metadata.columns.get(field.name()) {
- match geo_column.encoding {
- GeoParquetColumnEncoding::WKB => {
- let extension = ExtensionType::new(
- "geoarrow.wkb",
- field.data_type().clone(),
- Some(geo_column.to_geoarrow_metadata()?),
- );
- Ok(Arc::new(
- extension.to_field(field.name(),
field.is_nullable()),
- ))
- }
- _ => plan_err!(
- "Unsupported GeoParquet encoding: {}",
- geo_column.encoding
- ),
+ // Geometry columns have been inferred from metadata, next combine
column
+ // metadata from options with the inferred ones
+ let mut inferred_geo_cols = match geoparquet_metadata {
+ Some(geo_metadata) => geo_metadata.columns,
+ None => HashMap::new(),
+ };
+
+ if let Some(geometry_columns) = &self.options.geometry_columns {
+ merge_geometry_columns(&mut inferred_geo_cols, geometry_columns)?;
+ }
+
+ if inferred_geo_cols.is_empty() {
+ return Ok(inner_schema_without_metadata);
+ }
+
+ let mut remaining: HashSet<String> =
inferred_geo_cols.keys().cloned().collect();
+ let new_fields: Result<Vec<_>> = inner_schema_without_metadata
+ .fields()
+ .iter()
+ .map(|field| {
+ if let Some(geo_column) = inferred_geo_cols.get(field.name()) {
+ remaining.remove(field.name());
+ let encoding = match geo_column.encoding {
+ Some(encoding) => encoding,
+ None => {
+ return plan_err!(
+ "GeoParquet column '{}' missing required field
'encoding'",
+ field.name()
+ )
}
- } else {
- Ok(field.clone())
+ };
+ match encoding {
+ GeoParquetColumnEncoding::WKB => {
+ let extension = ExtensionType::new(
+ "geoarrow.wkb",
+ field.data_type().clone(),
+ Some(geo_column.to_geoarrow_metadata()?),
+ );
+ Ok(Arc::new(
+ extension.to_field(field.name(),
field.is_nullable()),
+ ))
+ }
+ _ => plan_err!("Unsupported GeoParquet encoding: {}",
encoding),
}
- })
- .collect();
+ } else {
+ Ok(field.clone())
+ }
+ })
+ .collect();
- Ok(Arc::new(Schema::new(new_fields?)))
- } else {
- Ok(inner_schema_without_metadata)
+ if !remaining.is_empty() {
+ let mut missing: Vec<_> = remaining.into_iter().collect();
+ missing.sort();
+ return plan_err!(
+ "Geometry columns not found in schema: {}",
+ missing.join(", ")
+ );
Review Comment:
It's great to fix other issues with the Parquet Schema/GeoParquetMetadata
interactions like this one (but if we do it in this PR we should add a test). I
can also do these as part of the Parquet geometry/geography PR since that
further complicates the interaction.
##########
rust/sedona-geoparquet/src/lib.rs:
##########
@@ -20,3 +20,5 @@ mod metadata;
pub mod options;
pub mod provider;
mod writer;
+
+pub use metadata::{GeoParquetColumnEncoding, GeoParquetColumnMetadata};
Review Comment:
I am not sure we want to expose these in this way... these are more like
internal utilities at the moment (they could be exposed later if there is a
good reason to do so and we're confident they won't change).
##########
python/sedonadb/python/sedonadb/context.py:
##########
@@ -134,14 +135,60 @@ def read_parquet(
files.
options: Optional dictionary of options to pass to the Parquet
reader.
For S3 access, use {"aws.skip_signature": True, "aws.region":
"us-west-2"} for anonymous access to public buckets.
+ geometry_columns: Optional JSON string mapping column name to
+ GeoParquet column metadata (e.g.,
+ '{"geom": {"encoding": "WKB"}}'). Use this to mark binary WKB
+ columns as geometry columns or correct metadata such as the
+ column CRS.
+
+ Supported keys (others in the spec are not implemented):
+ - encoding: "WKB" (required if the column is not already
geometry)
+ - crs: (e.g., "EPSG:4326")
+ - edges: "planar" (default) or "spherical"
+ See spec for details: https://geoparquet.org/releases/v1.1.0/
+
+ Useful for:
+ - Legacy Parquet files with Binary columns containing WKB
payloads.
+ - Overriding GeoParquet metadata when fields like `crs` are
missing.
+
+ Precedence:
+ - GeoParquet metadata is used to infer geometry columns first.
+ - geometry_columns then overrides the auto-inferred schema:
+ - If a column is not geometry in metadata but appears in
+ geometry_columns, it is treated as a geometry column.
+ - If a column is geometry in metadata and also appears in
+ geometry_columns, only the provided keys override; other
+ fields remain as inferred. If a key already exists in
metadata
+ and is provided again with a different value, an error is
+ returned.
+
+ Example:
+ - For `geo.parquet(geo1: geometry, geo2: geometry, geo3:
binary)`,
+ `read_parquet("geo.parquet", geometry_columns='{"geo2":
{"encoding": "WKB"}, "geo3": {"encoding": "WKB"}}')`
+ overrides `geo2` metadata and treats `geo3` as a geometry
column.
+ - If `geo` inferred from metadata has:
+ - `geo: {"encoding": "wkb", "crs": None, "edges":
"spherical"...}`
+ and geometry_columns provides:
+ - `geo: {"crs": 4326}`
+ then the result is (only override provided keys):
+ - `geo: {"encoding": "wkb", "crs": "EPSG:4326", "edges":
"spherical"...}`
+ - If `geo` inferred from metadata has:
+ - `geo: {"encoding": "wkb", "crs": "EPSG:4326"}`
+ and geometry_columns provides:
+ - `geo: {"crs": "EPSG:3857"}`
+ an error is returned for a conflicting key. This option is
only
+ allowed to provide missing optional fields in geometry
columns.
Review Comment:
```suggestion
```
If we constrain this option to only ever override the columns provided by
the file, we can make this interaction considerably simpler.
##########
rust/sedona-geoparquet/src/format.rs:
##########
Review Comment:
Can we apply the column overrides here and eliminate the somewhat
complicated logic below?
Note that in https://github.com/apache/sedona-db/pull/561 this is simplified
to just use `try_from_parquet_metadata()`.
##########
rust/sedona-geoparquet/src/format.rs:
##########
@@ -167,6 +279,9 @@ impl FileFormat for GeoParquetFormat {
self.inner().compression_type()
}
+ /// Infer schema from GeoParquet metadata, then optionally override it
using
+ /// user-provided geometry column options (see Python DataFrame API
+ /// `read_parquet(..)` `geometry_columns` for details).
Review Comment:
```suggestion
```
The rules for this are about to change with Parquet geometry/geography
support and in that PR I will try to find a good place to document the rules
for schema inference based on the file schema, GeoParquet metadata, and
overrides.
##########
rust/sedona-geoparquet/src/provider.rs:
##########
@@ -185,13 +188,41 @@ impl GeoParquetReadOptions<'_> {
Ok(GeoParquetReadOptions {
inner: ParquetReadOptions::default(),
table_options: Some(options),
+ geometry_columns: None,
})
}
/// Get the table options
pub fn table_options(&self) -> Option<&HashMap<String, String>> {
self.table_options.as_ref()
}
+
+ /// Add geometry column metadata (JSON string) to apply during schema
resolution
+ /// See python `read_parquet(..)` comments for details.
+ ///
+ /// Errors if invalid json configuration string is provided.
Review Comment:
```suggestion
///
/// Reads Parquet files as if GeoParquet metadata with the
`"geometry_columns"`
/// key were present. If GeoParquet metadata is already present, the
values provided
/// here will override any definitions provided in the original metadata.
/// Errors if an invalid JSON configuration string is provided.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]