This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git
The following commit(s) were added to refs/heads/main by this push:
new 11fefbd feat(rust/sedona-geoparquet): GeoParquet 1.1 write support (#175)
11fefbd is described below
commit 11fefbddec15f59e29b5531b6a3ce1187d8b4468
Author: Dewey Dunnington <[email protected]>
AuthorDate: Tue Oct 7 23:21:14 2025 -0500
feat(rust/sedona-geoparquet): GeoParquet 1.1 write support (#175)
Co-authored-by: Copilot <[email protected]>
---
Cargo.lock | 2 +
python/sedonadb/python/sedonadb/dataframe.py | 31 +-
python/sedonadb/src/dataframe.rs | 14 +-
python/sedonadb/tests/io/test_parquet.py | 73 +++-
rust/sedona-geoparquet/Cargo.toml | 2 +
rust/sedona-geoparquet/src/format.rs | 16 +-
rust/sedona-geoparquet/src/metadata.rs | 15 +
rust/sedona-geoparquet/src/options.rs | 30 +-
rust/sedona-geoparquet/src/writer.rs | 594 ++++++++++++++++++++++++++-
9 files changed, 732 insertions(+), 45 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 72f67a2..93db6d4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4954,6 +4954,7 @@ dependencies = [
"datafusion-expr",
"datafusion-physical-expr",
"datafusion-physical-plan",
+ "float_next_after",
"futures",
"geo-traits 0.2.0",
"object_store",
@@ -4961,6 +4962,7 @@ dependencies = [
"rstest",
"sedona-common",
"sedona-expr",
+ "sedona-functions",
"sedona-geometry",
"sedona-schema",
"sedona-testing",
diff --git a/python/sedonadb/python/sedonadb/dataframe.py b/python/sedonadb/python/sedonadb/dataframe.py
index 673fbfb..9759dd1 100644
--- a/python/sedonadb/python/sedonadb/dataframe.py
+++ b/python/sedonadb/python/sedonadb/dataframe.py
@@ -16,7 +16,7 @@
# under the License.
from pathlib import Path
-from typing import TYPE_CHECKING, Union, Optional, Any, Iterable
+from typing import TYPE_CHECKING, Union, Optional, Any, Iterable, Literal
from sedonadb.utility import sedona # noqa: F401
@@ -295,13 +295,15 @@ class DataFrame:
partition_by: Optional[Union[str, Iterable[str]]] = None,
sort_by: Optional[Union[str, Iterable[str]]] = None,
single_file_output: Optional[bool] = None,
+ geoparquet_version: Literal["1.0", "1.1"] = "1.0",
+ overwrite_bbox_columns: bool = False,
):
"""Write this DataFrame to one or more (Geo)Parquet files
For input that contains geometry columns, GeoParquet metadata is written
such that suitable readers can recreate Geometry/Geography types when
- reading the output.
-
+ reading the output and potentially read fewer row groups when only a subset of the file is needed for a given query.
Args:
path: A filename or directory to which parquet file(s) should be written.
@@ -313,6 +315,21 @@ class DataFrame:
file vs. writing one file per partition to a directory. By default,
a single file is written if `partition_by` is unspecified and
`path` ends with `.parquet`.
+ geoparquet_version: GeoParquet metadata version to write if output contains
+ one or more geometry columns. The default (1.0) is the most widely
+ supported and will result in geometry columns being recognized in many
+ readers; however, it only includes statistics at the file level.
+
+ Use GeoParquet 1.1 to compute an additional bounding box column
+ for every geometry column in the output: some readers can use these columns
+ to prune row groups when files contain an effective spatial ordering.
+ The extra columns will appear just before their geometry column and
+ will be named "[geom_col_name]_bbox" for all geometry columns except
+ "geometry", whose bounding box column name is just "bbox".
+ overwrite_bbox_columns: Use `True` to overwrite any bounding box columns
+ that already exist in the input. This is useful in a read -> modify
+ -> write scenario to ensure these columns are up-to-date. If `False`
+ (the default), an error will be raised if a bbox column already exists.
Examples:
@@ -344,7 +361,13 @@ class DataFrame:
sort_by = []
self._impl.to_parquet(
- self._ctx, str(path), partition_by, sort_by, single_file_output
+ self._ctx,
+ str(path),
+ partition_by,
+ sort_by,
+ single_file_output,
+ geoparquet_version,
+ overwrite_bbox_columns,
)
def show(
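For context, a minimal usage sketch of the new to_parquet() arguments (not part of the diff; assumes `con` is a SedonaDB connection as in the tests below, and the file paths are placeholders):

    # Write GeoParquet 1.1, which adds a "bbox" covering column for each
    # geometry column and records it in the file's "geo" metadata
    df = con.read_parquet("input.parquet")
    df.to_parquet("output.parquet", geoparquet_version="1.1")

    # Re-writing a file that already has a bbox column raises an error
    # unless overwrite_bbox_columns=True, which recomputes the values
    con.read_parquet("output.parquet").to_parquet(
        "output2.parquet", geoparquet_version="1.1", overwrite_bbox_columns=True
    )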
diff --git a/python/sedonadb/src/dataframe.rs b/python/sedonadb/src/dataframe.rs
index 940c332..aa1dc60 100644
--- a/python/sedonadb/src/dataframe.rs
+++ b/python/sedonadb/src/dataframe.rs
@@ -32,7 +32,7 @@ use pyo3::prelude::*;
use pyo3::types::PyCapsule;
use sedona::context::{SedonaDataFrame, SedonaWriteOptions};
use sedona::show::{DisplayMode, DisplayTableOptions};
-use sedona_geoparquet::options::TableGeoParquetOptions;
+use sedona_geoparquet::options::{GeoParquetVersion, TableGeoParquetOptions};
use sedona_schema::schema::SedonaSchema;
use tokio::runtime::Runtime;
@@ -139,6 +139,7 @@ impl InternalDataFrame {
))
}
+ #[allow(clippy::too_many_arguments)]
fn to_parquet<'py>(
&self,
py: Python<'py>,
@@ -147,6 +148,8 @@ impl InternalDataFrame {
partition_by: Vec<String>,
sort_by: Vec<String>,
single_file_output: bool,
+ geoparquet_version: Option<String>,
+ overwrite_bbox_columns: bool,
) -> Result<(), PySedonaError> {
// sort_by needs to be SortExpr. A Vec<String> can unambiguously be interpreted as
// field names (ascending), but other types of expressions aren't supported here yet.
@@ -162,7 +165,14 @@ impl InternalDataFrame {
.with_partition_by(partition_by)
.with_sort_by(sort_by_expr)
.with_single_file_output(single_file_output);
- let writer_options = TableGeoParquetOptions::default();
+
+ let mut writer_options = TableGeoParquetOptions::new();
+ writer_options.overwrite_bbox_columns = overwrite_bbox_columns;
+ if let Some(geoparquet_version) = geoparquet_version {
+ writer_options.geoparquet_version = geoparquet_version.parse()?;
+ } else {
+ writer_options.geoparquet_version = GeoParquetVersion::Omitted;
+ }
wait_for_future(
py,
diff --git a/python/sedonadb/tests/io/test_parquet.py b/python/sedonadb/tests/io/test_parquet.py
index a90e254..73afbfa 100644
--- a/python/sedonadb/tests/io/test_parquet.py
+++ b/python/sedonadb/tests/io/test_parquet.py
@@ -15,14 +15,17 @@
# specific language governing permissions and limitations
# under the License.
-import pytest
+import json
import tempfile
-import shapely
+from pathlib import Path
+
import geopandas
import geopandas.testing
+import pytest
+import shapely
from pyarrow import parquet
-from pathlib import Path
-from sedonadb.testing import geom_or_null, SedonaDB, DuckDB, skip_if_not_exists
+from sedonadb._lib import SedonaError
+from sedonadb.testing import DuckDB, SedonaDB, geom_or_null, skip_if_not_exists
@pytest.mark.parametrize("name", ["water-junc", "water-point"])
@@ -257,6 +260,68 @@ def test_write_geoparquet_geometry(con, geoarrow_data, name):
geopandas.testing.assert_geodataframe_equal(gdf_roundtrip, gdf)
+def test_write_geoparquet_1_1(con, geoarrow_data):
+ # Checks GeoParquet 1.1 support specifically
+ path = geoarrow_data / "ns-water" / "files" / "ns-water_water-junc_geo.parquet"
+ skip_if_not_exists(path)
+
+ gdf = geopandas.read_parquet(path).sort_values(by="OBJECTID").reset_index(drop=True)
+
+ with tempfile.TemporaryDirectory() as td:
+ tmp_parquet = Path(td) / "tmp.parquet"
+ con.create_data_frame(gdf).to_parquet(
+ tmp_parquet, sort_by="OBJECTID", geoparquet_version="1.1"
+ )
+
+ file_kv_metadata = parquet.ParquetFile(tmp_parquet).metadata.metadata
+ assert b"geo" in file_kv_metadata
+ geo_metadata = json.loads(file_kv_metadata[b"geo"])
+ assert geo_metadata["version"] == "1.1.0"
+ geo_column = geo_metadata["columns"]["geometry"]
+ assert geo_column["covering"] == {
+ "bbox": {
+ "xmin": ["bbox", "xmin"],
+ "ymin": ["bbox", "ymin"],
+ "xmax": ["bbox", "xmax"],
+ "ymax": ["bbox", "ymax"],
+ }
+ }
+
+ # This should still roundtrip through GeoPandas because GeoPandas removes
+ # the bbox column on read
+ gdf_roundtrip = geopandas.read_parquet(tmp_parquet)
+ assert all(gdf.columns == gdf_roundtrip.columns)
+ geopandas.testing.assert_geodataframe_equal(gdf_roundtrip, gdf)
+
+ # ...but the bbox column should still be there
+ df_roundtrip = con.read_parquet(tmp_parquet).to_pandas()
+ assert "bbox" in df_roundtrip.columns
+
+ # An attempt to rewrite this should fail because it would have to overwrite
+ # the bbox column
+ tmp_parquet2 = Path(td) / "tmp2.parquet"
+ with pytest.raises(
+ SedonaError, match="Can't overwrite GeoParquet 1.1 bbox column 'bbox'"
+ ):
+ con.read_parquet(tmp_parquet).to_parquet(
+ tmp_parquet2, geoparquet_version="1.1"
+ )
+
+ # ...unless we pass the appropriate option
+ con.read_parquet(tmp_parquet).to_parquet(
+ tmp_parquet2, geoparquet_version="1.1", overwrite_bbox_columns=True
+ )
+ df_roundtrip = con.read_parquet(tmp_parquet2).to_pandas()
+ assert "bbox" in df_roundtrip.columns
+
+
+def test_write_geoparquet_unknown(con):
+ with pytest.raises(SedonaError, match="Unexpected GeoParquet version string"):
+ con.sql("SELECT 1 as one").to_parquet(
+ "unused", geoparquet_version="not supported"
+ )
+
+
def test_write_geoparquet_geography(con, geoarrow_data):
# Checks a read and write of geography (roundtrip, since nobody else can read/write)
path = (
diff --git a/rust/sedona-geoparquet/Cargo.toml b/rust/sedona-geoparquet/Cargo.toml
index d11acd0..0a5110f 100644
--- a/rust/sedona-geoparquet/Cargo.toml
+++ b/rust/sedona-geoparquet/Cargo.toml
@@ -50,12 +50,14 @@ datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-plan = { workspace = true }
+float_next_after = { workspace = true }
geo-traits = { workspace = true }
futures = { workspace = true }
object_store = { workspace = true }
parquet = { workspace = true }
sedona-common = { path = "../sedona-common" }
sedona-expr = { path = "../sedona-expr" }
+sedona-functions = { path = "../sedona-functions" }
sedona-geometry = { path = "../sedona-geometry" }
sedona-schema = { path = "../sedona-schema" }
serde = { workspace = true }
diff --git a/rust/sedona-geoparquet/src/format.rs b/rust/sedona-geoparquet/src/format.rs
index bed3dd8..8ff8717 100644
--- a/rust/sedona-geoparquet/src/format.rs
+++ b/rust/sedona-geoparquet/src/format.rs
@@ -48,7 +48,7 @@ use sedona_schema::extension_type::ExtensionType;
use crate::{
file_opener::{storage_schema_contains_geo, GeoParquetFileOpener},
metadata::{GeoParquetColumnEncoding, GeoParquetMetadata},
- options::{GeoParquetVersion, TableGeoParquetOptions},
+ options::TableGeoParquetOptions,
writer::create_geoparquet_writer_physical_plan,
};
use datafusion::datasource::physical_plan::ParquetSource;
@@ -91,17 +91,9 @@ impl FileFormatFactory for GeoParquetFormatFactory {
) -> Result<Arc<dyn FileFormat>> {
let mut options_mut = self.options.clone().unwrap_or_default();
let mut format_options_mut = format_options.clone();
- options_mut.geoparquet_version =
- if let Some(version_string) = format_options_mut.remove("geoparquet_version") {
- match version_string.as_str() {
- "1.0" => GeoParquetVersion::V1_0,
- "1.1" => GeoParquetVersion::V1_1,
- "2.0" => GeoParquetVersion::V2_0,
- _ => GeoParquetVersion::default(),
- }
- } else {
- GeoParquetVersion::default()
- };
+ if let Some(version_string) = format_options_mut.remove("geoparquet_version") {
+ options_mut.geoparquet_version = version_string.parse()?;
+ }
let inner_format = self.inner.create(state, &format_options_mut)?;
if let Some(parquet_format) = inner_format.as_any().downcast_ref::<ParquetFormat>() {
diff --git a/rust/sedona-geoparquet/src/metadata.rs b/rust/sedona-geoparquet/src/metadata.rs
index b4be970..98ca0ff 100644
--- a/rust/sedona-geoparquet/src/metadata.rs
+++ b/rust/sedona-geoparquet/src/metadata.rs
@@ -268,6 +268,21 @@ pub struct GeoParquetCovering {
pub bbox: GeoParquetBboxCovering,
}
+impl GeoParquetCovering {
+ pub fn bbox_struct_xy(struct_column_name: &str) -> Self {
+ GeoParquetCovering {
+ bbox: GeoParquetBboxCovering {
+ xmin: vec![struct_column_name.to_string(), "xmin".to_string()],
+ ymin: vec![struct_column_name.to_string(), "ymin".to_string()],
+ zmin: None,
+ xmax: vec![struct_column_name.to_string(), "xmax".to_string()],
+ ymax: vec![struct_column_name.to_string(), "ymax".to_string()],
+ zmax: None,
+ },
+ }
+ }
+}
+
/// Top-level GeoParquet file metadata
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct GeoParquetMetadata {
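For reference, GeoParquetCovering::bbox_struct_xy("bbox") serializes into a geometry column's entry in the "geo" file metadata as the covering below (the same structure the Python test in this commit asserts); zmin/zmax are omitted because only XY bounds are computed:

    "covering": {
        "bbox": {
            "xmin": ["bbox", "xmin"],
            "ymin": ["bbox", "ymin"],
            "xmax": ["bbox", "xmax"],
            "ymax": ["bbox", "ymax"]
        }
    }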
diff --git a/rust/sedona-geoparquet/src/options.rs b/rust/sedona-geoparquet/src/options.rs
index 20fe4bd..43eafad 100644
--- a/rust/sedona-geoparquet/src/options.rs
+++ b/rust/sedona-geoparquet/src/options.rs
@@ -15,7 +15,10 @@
// specific language governing permissions and limitations
// under the License.
+use std::str::FromStr;
+
use datafusion::config::TableParquetOptions;
+use datafusion_common::{plan_err, DataFusionError};
/// [TableParquetOptions] wrapper with GeoParquet-specific options
#[derive(Debug, Default, Clone)]
@@ -24,13 +27,22 @@ pub struct TableGeoParquetOptions {
pub inner: TableParquetOptions,
/// [GeoParquetVersion] to use when writing GeoParquet files
pub geoparquet_version: GeoParquetVersion,
+ /// When writing [GeoParquetVersion::V1_1], use `true` to overwrite existing
+ /// bounding box columns.
+ pub overwrite_bbox_columns: bool,
+}
+
+impl TableGeoParquetOptions {
+ pub fn new() -> Self {
+ Self::default()
+ }
}
impl From<TableParquetOptions> for TableGeoParquetOptions {
fn from(value: TableParquetOptions) -> Self {
Self {
inner: value,
- geoparquet_version: GeoParquetVersion::default(),
+ ..Default::default()
}
}
}
@@ -73,3 +85,19 @@ impl Default for GeoParquetVersion {
Self::V1_0
}
}
+
+impl FromStr for GeoParquetVersion {
+ type Err = DataFusionError;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s.to_lowercase().as_str() {
+ "1.0" => Ok(GeoParquetVersion::V1_0),
+ "1.1" => Ok(GeoParquetVersion::V1_1),
+ "2.0" => Ok(GeoParquetVersion::V2_0),
+ "none" => Ok(GeoParquetVersion::Omitted),
+ _ => plan_err!(
+ "Unexpected GeoParquet version string (expected '1.0', '1.1', '2.0', or 'none')"
+ ),
+ }
+ }
+}
diff --git a/rust/sedona-geoparquet/src/writer.rs b/rust/sedona-geoparquet/src/writer.rs
index 0071233..ef741e1 100644
--- a/rust/sedona-geoparquet/src/writer.rs
+++ b/rust/sedona-geoparquet/src/writer.rs
@@ -15,33 +15,49 @@
// specific language governing permissions and limitations
// under the License.
-use std::sync::Arc;
+use std::{collections::HashMap, sync::Arc};
+use arrow_array::{
+ builder::{Float32Builder, NullBufferBuilder},
+ ArrayRef, StructArray,
+};
+use arrow_schema::{DataType, Field, Fields};
use datafusion::{
config::TableParquetOptions,
datasource::{
file_format::parquet::ParquetSink, physical_plan::FileSinkConfig,
sink::DataSinkExec,
},
};
-use datafusion_common::{exec_datafusion_err, exec_err, not_impl_err, Result};
-use datafusion_expr::dml::InsertOp;
-use datafusion_physical_expr::LexRequirement;
-use datafusion_physical_plan::ExecutionPlan;
+use datafusion_common::{exec_datafusion_err, exec_err, not_impl_err, DataFusionError, Result};
+use datafusion_expr::{dml::InsertOp, ColumnarValue, ScalarUDF, Volatility};
+use datafusion_physical_expr::{
+ expressions::Column, LexRequirement, PhysicalExpr, ScalarFunctionExpr,
+};
+use datafusion_physical_plan::{projection::ProjectionExec, ExecutionPlan};
+use float_next_after::NextAfter;
+use geo_traits::GeometryTrait;
use sedona_common::sedona_internal_err;
+use sedona_expr::scalar_udf::{SedonaScalarKernel, SedonaScalarUDF};
+use sedona_functions::executor::WkbExecutor;
+use sedona_geometry::{
+ bounds::geo_traits_update_xy_bounds,
+ interval::{Interval, IntervalTrait},
+};
use sedona_schema::{
crs::lnglat,
datatypes::{Edges, SedonaType},
+ matchers::ArgMatcher,
schema::SedonaSchema,
};
use crate::{
- metadata::{GeoParquetColumnMetadata, GeoParquetMetadata},
+ metadata::{GeoParquetColumnMetadata, GeoParquetCovering, GeoParquetMetadata},
options::{GeoParquetVersion, TableGeoParquetOptions},
};
pub fn create_geoparquet_writer_physical_plan(
- input: Arc<dyn ExecutionPlan>,
- conf: FileSinkConfig,
+ mut input: Arc<dyn ExecutionPlan>,
+ mut conf: FileSinkConfig,
order_requirements: Option<LexRequirement>,
options: &TableGeoParquetOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
@@ -50,26 +66,33 @@ pub fn create_geoparquet_writer_physical_plan(
}
// If there is no geometry, just use the inner implementation
- let output_geometry_column_indices = conf.output_schema().geometry_column_indices()?;
+ let mut output_geometry_column_indices = conf.output_schema().geometry_column_indices()?;
if output_geometry_column_indices.is_empty() {
return create_inner_writer(input, conf, order_requirements, options.inner.clone());
}
// We have geometry and/or geography! Collect the GeoParquetMetadata we'll need to write
let mut metadata = GeoParquetMetadata::default();
+ let mut bbox_columns = HashMap::new();
// Check the version
match options.geoparquet_version {
GeoParquetVersion::V1_0 => {
metadata.version = "1.0.0".to_string();
}
+ GeoParquetVersion::V1_1 => {
+ metadata.version = "1.1.0".to_string();
+ (input, bbox_columns) = project_bboxes(input, options.overwrite_bbox_columns)?;
+ conf.output_schema = input.schema();
+ output_geometry_column_indices = input.schema().geometry_column_indices()?;
+ }
_ => {
return not_impl_err!(
"GeoParquetVersion {:?} is not yet supported",
options.geoparquet_version
);
}
- };
+ }
let field_names = conf
.output_schema()
@@ -118,6 +141,13 @@ pub fn create_geoparquet_writer_physical_plan(
);
}
+ // Add bbox column info, if we added one in project_bboxes()
+ if let Some(bbox_column_name) = bbox_columns.get(f.name()) {
+ column_metadata
+ .covering
+ .replace(GeoParquetCovering::bbox_struct_xy(bbox_column_name));
+ }
+
// Add to metadata
metadata
.columns
@@ -140,6 +170,7 @@ pub fn create_geoparquet_writer_physical_plan(
Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
}
+/// Create a regular Parquet writer like DataFusion would otherwise do.
fn create_inner_writer(
input: Arc<dyn ExecutionPlan>,
conf: FileSinkConfig,
@@ -151,18 +182,243 @@ fn create_inner_writer(
Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
}
+/// Create a projection that inserts a bbox column for every geometry column
+///
+/// This implements creating the GeoParquet 1.1 bounding box columns,
+/// returning a map from the name of the geometry column to the name of the
+/// bounding box column it created. This does not currently create such
+/// a column for any geography input.
+///
+/// The inserted bounding box columns always directly precede their
+/// corresponding geometry column and are named a follows:
+///
+/// - For a column named "geometry", the bbox column is named "bbox". This
+/// reflects what pretty much everybody is already naming their columns
+/// today.
+/// - For any other column, the bbox column is named "{col_name}_bbox".
+///
+/// If a bbox column name already exists in the schema, we replace it
+/// (opting in via `overwrite_bbox_columns`; otherwise an error is raised).
+/// In the context of writing a file and all that goes with it, the time it
+/// takes to recompute the bounding box is not important; because writing
+/// GeoParquet 1.1 is opt-in, if somebody *did* have a column with "bbox" or
+/// "some_col_bbox", it is unlikely that replacing it would have unintended
+/// consequences.
+fn project_bboxes(
+ input: Arc<dyn ExecutionPlan>,
+ overwrite_bbox_columns: bool,
+) -> Result<(Arc<dyn ExecutionPlan>, HashMap<String, String>)> {
+ let input_schema = input.schema();
+ let matcher = ArgMatcher::is_geometry();
+ let bbox_udf: Arc<ScalarUDF> = Arc::new(geoparquet_bbox_udf().into());
+ let bbox_udf_name = bbox_udf.name();
+
+ // Calculate and keep track of the expression, name pairs for the bounding box
+ // columns we are about to (potentially) create.
+ let mut bbox_exprs = HashMap::<usize, (Arc<dyn PhysicalExpr>, String)>::new();
+ let mut bbox_column_names = HashMap::new();
+ for (i, f) in input.schema().fields().iter().enumerate() {
+ let column = Arc::new(Column::new(f.name(), i));
+
+ // If this is a geometry column (not geography), compute the
+ // expression that is a function call to our bbox column creator
+ if matcher.match_type(&SedonaType::from_storage_field(
+ column.return_field(&input_schema)?.as_ref(),
+ )?) {
+ let bbox_field_name = bbox_column_name(f.name());
+ let expr = Arc::new(ScalarFunctionExpr::new(
+ bbox_udf_name,
+ bbox_udf.clone(),
+ vec![column],
+ Arc::new(Field::new("", bbox_type(), true)),
+ ));
+
+ bbox_exprs.insert(i, (expr, bbox_field_name.clone()));
+ bbox_column_names.insert(bbox_field_name, f.name().clone());
+ }
+ }
+
+ // If we don't need to create any bbox columns, don't add an additional
+ // projection at the end of the input plan
+ if bbox_exprs.is_empty() {
+ return Ok((input, HashMap::new()));
+ }
+
+ // Create the projection expressions
+ let mut exprs = Vec::new();
+ for (i, f) in input.schema().fields().iter().enumerate() {
+ // Skip any column with the same name as a bbox column, since we are
+ // about to replace it with the recomputed bbox.
+ if bbox_column_names.contains_key(f.name()) {
+ if overwrite_bbox_columns {
+ continue;
+ } else {
+ return exec_err!(
+ "Can't overwrite GeoParquet 1.1 bbox column '{}'.
+Use overwrite_bbox_columns = True if this is what was intended.",
+ f.name()
+ );
+ }
+ }
+
+ // If this is a column with a bbox, insert the bbox expression now
+ if let Some((expr, expr_name)) = bbox_exprs.remove(&i) {
+ exprs.push((expr, expr_name));
+ }
+
+ // Insert the column (whether it does or does not have geometry)
+ let column = Arc::new(Column::new(f.name(), i));
+ exprs.push((column, f.name().clone()));
+ }
+
+ // Create the projection
+ let exec = ProjectionExec::try_new(exprs, input)?;
+
+ // Flip the bbox_column_names into the form our caller needs it
+ let bbox_column_names_by_field = bbox_column_names.drain().map(|(k, v)| (v, k)).collect();
+
+ Ok((Arc::new(exec), bbox_column_names_by_field))
+}
+
+fn geoparquet_bbox_udf() -> SedonaScalarUDF {
+ SedonaScalarUDF::new(
+ "geoparquet_bbox",
+ vec![Arc::new(GeoParquetBbox {})],
+ Volatility::Immutable,
+ None,
+ )
+}
+
+fn bbox_column_name(geometry_column_name: &str) -> String {
+ if geometry_column_name == "geometry" {
+ "bbox".to_string()
+ } else {
+ format!("{geometry_column_name}_bbox")
+ }
+}
+
+#[derive(Debug)]
+struct GeoParquetBbox {}
+
+impl SedonaScalarKernel for GeoParquetBbox {
+ fn return_type(&self, args: &[SedonaType]) -> Result<Option<SedonaType>> {
+ let matcher = ArgMatcher::new(
+ vec![ArgMatcher::is_geometry()],
+ SedonaType::Arrow(bbox_type()),
+ );
+ matcher.match_args(args)
+ }
+
+ fn invoke_batch(
+ &self,
+ arg_types: &[SedonaType],
+ args: &[ColumnarValue],
+ ) -> Result<ColumnarValue> {
+ let executor = WkbExecutor::new(arg_types, args);
+
+ // Initialize the builders. We use Float32 to minimize the impact
+ // on the file size.
+ let mut nulls = NullBufferBuilder::new(executor.num_iterations());
+ let mut builders = [
+ Float32Builder::with_capacity(executor.num_iterations()),
+ Float32Builder::with_capacity(executor.num_iterations()),
+ Float32Builder::with_capacity(executor.num_iterations()),
+ Float32Builder::with_capacity(executor.num_iterations()),
+ ];
+
+ executor.execute_wkb_void(|maybe_item| {
+ match maybe_item {
+ Some(item) => {
+ nulls.append(true);
+ append_float_bbox(&item, &mut builders)?;
+ }
+ None => {
+ // If we have a null, we set the outer validity bitmap to null
+ // (i.e., "the bounding box is null") but also the inner bitmap
+ // to null to ensure the value is not counted for the purposes
+ // of computing statistics for the nested column.
+ nulls.append(false);
+ for builder in &mut builders {
+ builder.append_null();
+ }
+ }
+ }
+ Ok(())
+ })?;
+
+ let out_array = StructArray::try_new(
+ bbox_fields(),
+ builders
+ .iter_mut()
+ .map(|builder| -> ArrayRef { Arc::new(builder.finish()) })
+ .collect(),
+ nulls.finish(),
+ )?;
+
+ executor.finish(Arc::new(out_array))
+ }
+}
+
+fn bbox_type() -> DataType {
+ DataType::Struct(bbox_fields())
+}
+
+fn bbox_fields() -> Fields {
+ vec![
+ Field::new("xmin", DataType::Float32, true),
+ Field::new("ymin", DataType::Float32, true),
+ Field::new("xmax", DataType::Float32, true),
+ Field::new("ymax", DataType::Float32, true),
+ ]
+ .into()
+}
+
+// Calculates a bounding box and appends the float32-rounded version to
+// a set of builders, ensuring the float bounds always include the double
+// bounds.
+fn append_float_bbox(
+ wkb: impl GeometryTrait<T = f64>,
+ builders: &mut [Float32Builder],
+) -> Result<()> {
+ let mut x = Interval::empty();
+ let mut y = Interval::empty();
+ geo_traits_update_xy_bounds(wkb, &mut x, &mut y)
+ .map_err(|e| DataFusionError::External(e.into()))?;
+
+ // If we have an empty geometry, append null values to the individual min/max
+ // columns to ensure their values aren't considered in the Parquet
+ // statistics.
+ if x.is_empty() || y.is_empty() {
+ for builder in builders {
+ builder.append_null();
+ }
+ } else {
+ builders[0].append_value((x.lo() as f32).next_after(-f32::INFINITY));
+ builders[1].append_value((y.lo() as f32).next_after(-f32::INFINITY));
+ builders[2].append_value((x.hi() as f32).next_after(f32::INFINITY));
+ builders[3].append_value((y.hi() as f32).next_after(f32::INFINITY));
+ }
+
+ Ok(())
+}
+
#[cfg(test)]
mod test {
use std::iter::zip;
+ use arrow_array::{create_array, Array, RecordBatch};
use datafusion::datasource::file_format::format_as_file_type;
use datafusion::prelude::DataFrame;
use datafusion::{
execution::SessionStateBuilder,
- prelude::{col, SessionContext},
+ prelude::{col, lit, SessionContext},
};
- use datafusion_expr::LogicalPlanBuilder;
+ use datafusion_common::cast::{as_float32_array, as_struct_array};
+ use datafusion_common::ScalarValue;
+ use datafusion_expr::{Expr, LogicalPlanBuilder};
+ use sedona_schema::datatypes::WKB_GEOMETRY;
+ use sedona_testing::create::create_array;
use sedona_testing::data::test_geoparquet;
+ use sedona_testing::testers::ScalarUdfTester;
use tempfile::tempdir;
use crate::format::GeoParquetFormatFactory;
@@ -177,28 +433,45 @@ mod test {
SessionContext::new_with_state(state).enable_url_table()
}
- async fn test_dataframe_roundtrip(ctx: SessionContext, df: DataFrame) {
+ async fn test_dataframe_roundtrip(ctx: SessionContext, src: DataFrame) {
+ let df_batches = src.clone().collect().await.unwrap();
+ test_write_dataframe(
+ ctx,
+ src,
+ df_batches,
+ TableGeoParquetOptions::default(),
+ vec![],
+ )
+ .await
+ .unwrap()
+ }
+
+ async fn test_write_dataframe(
+ ctx: SessionContext,
+ src: DataFrame,
+ expected_batches: Vec<RecordBatch>,
+ options: TableGeoParquetOptions,
+ partition_by: Vec<String>,
+ ) -> Result<()> {
// It's a bit verbose to trigger this without helpers
- let format = GeoParquetFormatFactory::new();
+ let format = GeoParquetFormatFactory::new_with_options(options);
let file_type = format_as_file_type(Arc::new(format));
let tmpdir = tempdir().unwrap();
- let df_batches = df.clone().collect().await.unwrap();
-
let tmp_parquet = tmpdir.path().join("foofy_spatial.parquet");
let plan = LogicalPlanBuilder::copy_to(
- df.into_unoptimized_plan(),
+ src.into_unoptimized_plan(),
tmp_parquet.to_string_lossy().into(),
file_type,
Default::default(),
- vec![],
+ partition_by,
)
.unwrap()
.build()
.unwrap();
- DataFrame::new(ctx.state(), plan).collect().await.unwrap();
+ DataFrame::new(ctx.state(), plan).collect().await?;
let df_parquet_batches = ctx
.table(tmp_parquet.to_string_lossy().to_string())
@@ -208,7 +481,22 @@ mod test {
.await
.unwrap();
- assert_eq!(df_parquet_batches.len(), df_batches.len());
+ assert_eq!(df_parquet_batches.len(), expected_batches.len());
+
+ // Check column names
+ let df_parquet_names = df_parquet_batches[0]
+ .schema()
+ .fields()
+ .iter()
+ .map(|f| f.name().clone())
+ .collect::<Vec<_>>();
+ let expected_names = expected_batches[0]
+ .schema()
+ .fields()
+ .iter()
+ .map(|f| f.name().clone())
+ .collect::<Vec<_>>();
+ assert_eq!(df_parquet_names, expected_names);
// Check types, since the schema may not compare byte-for-byte equal (CRSes)
let df_parquet_sedona_types = df_parquet_batches[0]
@@ -216,7 +504,7 @@ mod test {
.sedona_types()
.collect::<Result<Vec<_>>>()
.unwrap();
- let df_sedona_types = df_batches[0]
+ let df_sedona_types = expected_batches[0]
.schema()
.sedona_types()
.collect::<Result<Vec<_>>>()
@@ -224,9 +512,11 @@ mod test {
assert_eq!(df_parquet_sedona_types, df_sedona_types);
// Check batches without metadata
- for (df_parquet_batch, df_batch) in zip(df_parquet_batches, df_batches) {
+ for (df_parquet_batch, df_batch) in zip(df_parquet_batches, expected_batches) {
assert_eq!(df_parquet_batch.columns(), df_batch.columns())
}
+
+ Ok(())
}
#[tokio::test]
@@ -262,4 +552,264 @@ mod test {
test_dataframe_roundtrip(ctx, df).await;
}
+
+ #[tokio::test]
+ async fn geoparquet_1_1_basic() {
+ let example = test_geoparquet("example", "geometry").unwrap();
+ let ctx = setup_context();
+ let df = ctx
+ .table(&example)
+ .await
+ .unwrap()
+ // DataFusion internals lose the nullability we assigned to the bbox
+ // and without this line the test fails.
+ .filter(Expr::IsNotNull(col("geometry").into()))
+ .unwrap();
+
+ let mut options = TableGeoParquetOptions::new();
+ options.geoparquet_version = GeoParquetVersion::V1_1;
+
+ let bbox_udf: ScalarUDF = geoparquet_bbox_udf().into();
+
+ let df_batches_with_bbox = df
+ .clone()
+ .select(vec![
+ col("wkt"),
+ bbox_udf.call(vec![col("geometry")]).alias("bbox"),
+ col("geometry"),
+ ])
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+
+ test_write_dataframe(ctx, df, df_batches_with_bbox, options, vec![])
+ .await
+ .unwrap();
+ }
+
+ #[tokio::test]
+ async fn geoparquet_1_1_multiple_columns() {
+ let example = test_geoparquet("example", "geometry").unwrap();
+ let ctx = setup_context();
+
+ // Include >1 geometry and sprinkle in some non-geometry columns
+ let df = ctx
+ .table(&example)
+ .await
+ .unwrap()
+ // DataFusion internals lose the nullability we assigned to the bbox
+ // and without this line the test fails.
+ .filter(Expr::IsNotNull(col("geometry").into()))
+ .unwrap()
+ .select(vec![
+ col("wkt"),
+ col("geometry").alias("geom"),
+ col("wkt").alias("wkt2"),
+ col("geometry"),
+ col("wkt").alias("wkt3"),
+ ])
+ .unwrap();
+
+ let mut options = TableGeoParquetOptions::new();
+ options.geoparquet_version = GeoParquetVersion::V1_1;
+
+ let bbox_udf: ScalarUDF = geoparquet_bbox_udf().into();
+
+ let df_batches_with_bbox = df
+ .clone()
+ .select(vec![
+ col("wkt"),
+ bbox_udf.call(vec![col("geom")]).alias("geom_bbox"),
+ col("geom"),
+ col("wkt2"),
+ bbox_udf.call(vec![col("geometry")]).alias("bbox"),
+ col("geometry"),
+ col("wkt3"),
+ ])
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+
+ test_write_dataframe(ctx, df, df_batches_with_bbox, options, vec![])
+ .await
+ .unwrap();
+ }
+
+ #[tokio::test]
+ async fn geoparquet_1_1_overwrite_existing_bbox() {
+ let example = test_geoparquet("example", "geometry").unwrap();
+ let ctx = setup_context();
+
+ // Test writing a DataFrame that already has a column named "bbox".
+ // Writing this using GeoParquet 1.1 will overwrite the column.
+ let df = ctx
+ .table(&example)
+ .await
+ .unwrap()
+ // DataFusion internals lose the nullability we assigned to the bbox
+ // and without this line the test fails.
+ .filter(Expr::IsNotNull(col("geometry").into()))
+ .unwrap()
+ .select(vec![
+ lit("this is definitely not a bbox").alias("bbox"),
+ col("geometry"),
+ ])
+ .unwrap();
+
+ let mut options = TableGeoParquetOptions::new();
+ options.geoparquet_version = GeoParquetVersion::V1_1;
+
+ let bbox_udf: ScalarUDF = geoparquet_bbox_udf().into();
+
+ let df_batches_with_bbox = df
+ .clone()
+ .select(vec![
+ bbox_udf.call(vec![col("geometry")]).alias("bbox"),
+ col("geometry"),
+ ])
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+
+ // Without setting overwrite_bbox_columns = true, this should error
+ let err = test_write_dataframe(
+ ctx.clone(),
+ df.clone(),
+ df_batches_with_bbox.clone(),
+ options.clone(),
+ vec!["part".into()],
+ )
+ .await
+ .unwrap_err();
+ assert!(err
+ .message()
+ .starts_with("Can't overwrite GeoParquet 1.1 bbox column 'bbox'"));
+
+ options.overwrite_bbox_columns = true;
+ test_write_dataframe(ctx, df, df_batches_with_bbox, options, vec![])
+ .await
+ .unwrap();
+ }
+
+ #[tokio::test]
+ async fn geoparquet_1_1_with_partition() {
+ let example = test_geoparquet("example", "geometry").unwrap();
+ let ctx = setup_context();
+ let df = ctx
+ .table(&example)
+ .await
+ .unwrap()
+ // DataFusion internals lose the nullability we assigned to the bbox
+ // and without this line the test fails.
+ .filter(Expr::IsNotNull(col("geometry").into()))
+ .unwrap()
+ .select(vec![
+ lit("some_partition").alias("part"),
+ col("wkt"),
+ col("geometry"),
+ ])
+ .unwrap();
+
+ let mut options = TableGeoParquetOptions::new();
+ options.geoparquet_version = GeoParquetVersion::V1_1;
+
+ let bbox_udf: ScalarUDF = geoparquet_bbox_udf().into();
+
+ let df_batches_with_bbox = df
+ .clone()
+ .select(vec![
+ col("wkt"),
+ bbox_udf.call(vec![col("geometry")]).alias("bbox"),
+ col("geometry"),
+ lit(ScalarValue::Dictionary(
+ DataType::UInt16.into(),
+ ScalarValue::Utf8(Some("some_partition".to_string())).into(),
+ ))
+ .alias("part"),
+ ])
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+
+ test_write_dataframe(ctx, df, df_batches_with_bbox, options, vec!["part".into()])
+ .await
+ .unwrap();
+ }
+
+ #[test]
+ fn float_bbox() {
+ let tester = ScalarUdfTester::new(geoparquet_bbox_udf().into(), vec![WKB_GEOMETRY]);
+ assert_eq!(
+ tester.return_type().unwrap(),
+ SedonaType::Arrow(bbox_type())
+ );
+
+ let array = create_array(
+ &[
+ Some("POINT (0 1)"),
+ Some("POINT (2 3)"),
+ Some("LINESTRING (4 5, 6 7)"),
+ ],
+ &WKB_GEOMETRY,
+ );
+
+ let result = tester.invoke_array(array).unwrap();
+ assert_eq!(result.len(), 3);
+
+ let expected_cols_f64 = [
+ create_array!(Float64, [Some(0.0), Some(2.0), Some(4.0)]),
+ create_array!(Float64, [Some(1.0), Some(3.0), Some(5.0)]),
+ create_array!(Float64, [Some(0.0), Some(2.0), Some(6.0)]),
+ create_array!(Float64, [Some(1.0), Some(3.0), Some(7.0)]),
+ ];
+
+ let result_struct = as_struct_array(&result).unwrap();
+ let actual_cols = result_struct
+ .columns()
+ .iter()
+ .map(|col| as_float32_array(col).unwrap())
+ .collect::<Vec<_>>();
+ for i in 0..result.len() {
+ let actual = actual_cols
+ .iter()
+ .map(|a| a.value(i) as f64)
+ .collect::<Vec<_>>();
+ let expected = expected_cols_f64
+ .iter()
+ .map(|e| e.value(i))
+ .collect::<Vec<_>>();
+
+ // These values aren't equal (the actual values were float32 values that
+ // had been rounded outward); however, they should "contain" the expected box
+ assert!(actual[0] <= expected[0]);
+ assert!(actual[1] <= expected[1]);
+ assert!(actual[2] >= expected[2]);
+ assert!(actual[3] >= expected[3]);
+ }
+ }
+
+ #[test]
+ fn float_bbox_null() {
+ let tester = ScalarUdfTester::new(geoparquet_bbox_udf().into(), vec![WKB_GEOMETRY]);
+
+ let null_result = tester.invoke_scalar(ScalarValue::Null).unwrap();
+ assert!(null_result.is_null());
+ if let ScalarValue::Struct(s) = null_result {
+ let actual_cols = s
+ .columns()
+ .iter()
+ .map(|col| as_float32_array(col).unwrap())
+ .collect::<Vec<_>>();
+ assert!(actual_cols[0].is_null(0));
+ assert!(actual_cols[1].is_null(0));
+ assert!(actual_cols[2].is_null(0));
+ assert!(actual_cols[3].is_null(0));
+ } else {
+ panic!("Expected struct")
+ }
+ }
}
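The outward float32 rounding in append_float_bbox() can be illustrated outside of Rust; here is a minimal sketch of the same idea using numpy (not part of this commit):

    import numpy as np

    def float32_bbox(xmin, ymin, xmax, ymax):
        # Cast each f64 bound to f32, then nudge mins toward -inf and
        # maxes toward +inf so the f32 box always contains the f64 box.
        f32 = np.float32
        return (
            np.nextafter(f32(xmin), f32(-np.inf)),
            np.nextafter(f32(ymin), f32(-np.inf)),
            np.nextafter(f32(xmax), f32(np.inf)),
            np.nextafter(f32(ymax), f32(np.inf)),
        )

    # The rounded box contains the exact one, as the tests above assert
    b = float32_bbox(0.1, 0.2, 0.3, 0.4)
    assert b[0] <= 0.1 and b[1] <= 0.2
    assert b[2] >= 0.3 and b[3] >= 0.4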