This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git
The following commit(s) were added to refs/heads/main by this push:
new ea07640 bug: Fix `SpatialFilter` for correct pruning when reading
from geoparquet (#51)
ea07640 is described below
commit ea07640f228cee81ecd4c647716b75f9df9f1af0
Author: Peter Nguyen <[email protected]>
AuthorDate: Wed Sep 10 18:49:37 2025 -0700
bug: Fix `SpatialFilter` for correct pruning when reading from geoparquet
(#51)
---
docs/development.md | 6 ++
python/sedonadb/tests/io/test_parquet.py | 124 ++++++++++++++++++++++++++++---
rust/sedona-expr/src/spatial_filter.rs | 73 +++++++++---------
3 files changed, 156 insertions(+), 47 deletions(-)
diff --git a/docs/development.md b/docs/development.md
index 6b67ead..58f7178 100644
--- a/docs/development.md
+++ b/docs/development.md
@@ -34,6 +34,12 @@ git submodule init
git submodule update --recursive
```
+Additionally, some of the data required in the tests can be downloaded by
running the following script.
+
+```bash
+python submodules/download-assets.py
+```
+
Some crates wrap external native libraries and require system dependencies
to build. At this time the only crate that requires this is the
sedona-s2geography
crate, which requires [CMake](https://cmake.org),
diff --git a/python/sedonadb/tests/io/test_parquet.py
b/python/sedonadb/tests/io/test_parquet.py
index 2af3577..42fcaf5 100644
--- a/python/sedonadb/tests/io/test_parquet.py
+++ b/python/sedonadb/tests/io/test_parquet.py
@@ -60,13 +60,25 @@ def test_read_sedona_testing(sedona_testing, name):
@pytest.mark.parametrize("name", ["water-junc", "water-point"])
-def test_read_geoparquet_pruned(geoarrow_data, name):
[email protected](
+ "predicate",
+ [
+ "intersects",
+ "coveredby",
+ "within",
+ "equals",
+ "disjoint",
+ ],
+)
+def test_read_geoparquet_prune_points(geoarrow_data, name, predicate):
# Note that this doesn't check that pruning actually occurred, just that
# for a query where we should be pruning automatically that we don't omit
results.
eng = SedonaDB()
path = geoarrow_data / "ns-water" / "files" /
f"ns-water_{name}_geo.parquet"
skip_if_not_exists(path)
+ gdf = geopandas.read_parquet(path)
+
# Roughly a diamond around Gaspereau Lake, Nova Scotia, in UTM zone 20
wkt_filter = """
POLYGON ((
@@ -74,14 +86,22 @@ def test_read_geoparquet_pruned(geoarrow_data, name):
376000 4983000, 371000 4978000
))
"""
+
+ # Use a wkt_filter that will lead to non-empty results
+ if predicate == "equals":
+ wkt_filter = gdf.geometry.iloc[0].wkt
+
poly_filter = shapely.from_wkt(wkt_filter)
- gdf = geopandas.read_parquet(path)
- gdf = (
- gdf[gdf.geometry.intersects(poly_filter)]
- .sort_values(by="OBJECTID")
- .reset_index(drop=True)
- )
+ if predicate == "equals":
+ # Geopandas does not have an equals predicate, so we use the ==
operator
+ mask = gdf.geometry == poly_filter
+ elif predicate == "coveredby":
+ mask = gdf.geometry.covered_by(poly_filter)
+ else:
+ mask = getattr(gdf.geometry, predicate)(poly_filter)
+
+ gdf = gdf[mask].sort_values(by="OBJECTID").reset_index(drop=True)
gdf = gdf[["OBJECTID", "geometry"]]
# Make sure this isn't a bogus test
@@ -102,7 +122,7 @@ def test_read_geoparquet_pruned(geoarrow_data, name):
result = eng.execute_and_collect(
f"""
SELECT "OBJECTID", geometry FROM tab
- WHERE ST_Intersects(geometry,
ST_SetCRS({geom_or_null(wkt_filter)}, '{gdf.crs.to_json()}'))
+ WHERE ST_{predicate}(geometry,
ST_SetCRS({geom_or_null(wkt_filter)}, '{gdf.crs.to_json()}'))
ORDER BY "OBJECTID";
"""
)
@@ -127,8 +147,94 @@ def test_read_geoparquet_pruned(geoarrow_data, name):
result = eng.execute_and_collect(
f"""
SELECT * FROM tab_dataset
- WHERE ST_Intersects(geometry,
ST_SetCRS({geom_or_null(wkt_filter)}, '{gdf.crs.to_json()}'))
+ WHERE ST_{predicate}(geometry,
ST_SetCRS({geom_or_null(wkt_filter)}, '{gdf.crs.to_json()}'))
ORDER BY "OBJECTID";
"""
)
eng.assert_result(result, gdf)
+
+
[email protected](
+ "predicate",
+ [
+ "contains",
+ "covers",
+ "touches",
+ ],
+)
+def test_read_geoparquet_prune_polygons(sedona_testing, predicate):
+ # Note that this doesn't check that pruning actually occurred, just that
+ # for a query where we should be pruning automatically that we don't omit
results.
+ eng = SedonaDB()
+ path = sedona_testing / "data" / "parquet" / "geoparquet-1.0.0.parquet"
+ skip_if_not_exists(path)
+
+ # A point inside of a polygon for contains / covers
+ wkt_filter = "POINT (33.60 -5.54)"
+
+ if predicate == "touches":
+ # A point on the boundary of a polygon
+ wkt_filter = "POINT (33.90371119710453 -0.9500000000000001)"
+
+ poly_filter = shapely.from_wkt(wkt_filter)
+
+ gdf = geopandas.read_parquet(path)
+ if predicate == "equals":
+ # Geopandas does not have an equals predicate, so we use the ==
operator
+ mask = gdf.geometry == poly_filter
+ elif predicate == "coveredby":
+ mask = gdf.geometry.covered_by(poly_filter)
+ else:
+ mask = getattr(gdf.geometry, predicate)(poly_filter)
+
+ gdf = gdf[mask].sort_values(by="pop_est").reset_index(drop=True)
+ gdf = gdf[["pop_est", "geometry"]]
+
+ # Make sure this isn't a bogus test
+ assert len(gdf) > 0
+
+ with tempfile.TemporaryDirectory() as td:
+ # Write using GeoPandas, which implements GeoParquet 1.1 bbox covering
+ # Write tiny row groups so that many bounding boxes have to be checked
+ tmp_parquet = Path(td) / "geoparquet-1.0.0.parquet"
+ geopandas.read_parquet(path).to_parquet(
+ tmp_parquet,
+ schema_version="1.1.0",
+ write_covering_bbox=True,
+ row_group_size=1024,
+ )
+
+ eng.create_view_parquet("tab", tmp_parquet)
+ result = eng.execute_and_collect(
+ f"""
+ SELECT "pop_est", geometry FROM tab
+ WHERE ST_{predicate}(geometry,
ST_SetCRS({geom_or_null(wkt_filter)}, '{gdf.crs.to_json()}'))
+ ORDER BY "pop_est";
+ """
+ )
+ eng.assert_result(result, gdf)
+
+ # Write a dataset with one file per row group to check file pruning
correctness
+ ds_dir = Path(td) / "ds"
+ ds_dir.mkdir()
+ ds_paths = []
+
+ with parquet.ParquetFile(tmp_parquet) as f:
+ for i in range(f.metadata.num_row_groups):
+ tab = f.read_row_group(i, ["pop_est", "geometry"])
+ df = geopandas.GeoDataFrame.from_arrow(tab)
+ ds_path = ds_dir / f"file{i}.parquet"
+ df.to_parquet(ds_path)
+ ds_paths.append(ds_path)
+
+ # Check a query against the same dataset without the bbox column but
with file-level
+ # geoparquet metadata bounding boxes
+ eng.create_view_parquet("tab_dataset", ds_paths)
+ result = eng.execute_and_collect(
+ f"""
+ SELECT * FROM tab_dataset
+ WHERE ST_{predicate}(geometry,
ST_SetCRS({geom_or_null(wkt_filter)}, '{gdf.crs.to_json()}'))
+ ORDER BY "pop_est";
+ """
+ )
+ eng.assert_result(result, gdf)
diff --git a/rust/sedona-expr/src/spatial_filter.rs
b/rust/sedona-expr/src/spatial_filter.rs
index 348a4c4..64feda2 100644
--- a/rust/sedona-expr/src/spatial_filter.rs
+++ b/rust/sedona-expr/src/spatial_filter.rs
@@ -45,8 +45,8 @@ use crate::{
pub enum SpatialFilter {
/// ST_Intersects(\<column\>, \<literal\>) or ST_Intersects(\<literal\>,
\<column\>)
Intersects(Column, BoundingBox),
- /// ST_CoveredBy(\<column\>, \<literal\>) or ST_CoveredBy(\<literal\>,
\<column\>)
- CoveredBy(Column, BoundingBox),
+ /// ST_Covers(\<column\>, \<literal\>) or ST_Covers(\<literal\>,
\<column\>)
+ Covers(Column, BoundingBox),
/// ST_HasZ(\<column\>)
HasZ(Column),
/// Logical AND
@@ -69,8 +69,8 @@ impl SpatialFilter {
SpatialFilter::Intersects(column, bounds) => {
Self::evaluate_intersects_bbox(&table_stats[column.index()],
bounds)
}
- SpatialFilter::CoveredBy(column, bounds) => {
- Self::evaluate_covered_by_bbox(&table_stats[column.index()],
bounds)
+ SpatialFilter::Covers(column, bounds) => {
+ Self::evaluate_covers_bbox(&table_stats[column.index()],
bounds)
}
SpatialFilter::HasZ(column) =>
Self::evaluate_has_z(&table_stats[column.index()]),
SpatialFilter::And(lhs, rhs) => Self::evaluate_and(lhs, rhs,
table_stats),
@@ -88,9 +88,9 @@ impl SpatialFilter {
}
}
- fn evaluate_covered_by_bbox(column_stats: &GeoStatistics, bounds:
&BoundingBox) -> bool {
+ fn evaluate_covers_bbox(column_stats: &GeoStatistics, bounds:
&BoundingBox) -> bool {
if let Some(bbox) = column_stats.bbox() {
- bounds.contains(bbox)
+ bbox.contains(bounds)
} else {
true
}
@@ -203,19 +203,19 @@ impl SpatialFilter {
match (&args[0], &args[1]) {
(ArgRef::Col(column), ArgRef::Lit(literal)) => {
- // column within/covered_by literal -> CoveredBy filter
+ // column within/covered_by literal -> Intersects
filter
match literal_bounds(literal) {
Ok(literal_bounds) => {
- Ok(Some(Self::CoveredBy(column.clone(),
literal_bounds)))
+ Ok(Some(Self::Intersects(column.clone(),
literal_bounds)))
}
Err(e) =>
Err(DataFusionError::External(Box::new(e))),
}
}
(ArgRef::Lit(literal), ArgRef::Col(column)) => {
- // literal within/covered_by column -> Intersects
filter
+ // literal within/covered_by column -> Covers filter
match literal_bounds(literal) {
Ok(literal_bounds) => {
- Ok(Some(Self::Intersects(column.clone(),
literal_bounds)))
+ Ok(Some(Self::Covers(column.clone(),
literal_bounds)))
}
Err(e) =>
Err(DataFusionError::External(Box::new(e))),
}
@@ -231,21 +231,21 @@ impl SpatialFilter {
match (&args[0], &args[1]) {
(ArgRef::Col(column), ArgRef::Lit(literal)) => {
- // column contains/covers literal -> Intersects filter
- // (column must potentially intersect literal to
contain it)
+ // column contains/covers literal -> Covers filter
+ // (column's bbox must fully cover literal's bbox)
match literal_bounds(literal) {
Ok(literal_bounds) => {
- Ok(Some(Self::Intersects(column.clone(),
literal_bounds)))
+ Ok(Some(Self::Covers(column.clone(),
literal_bounds)))
}
Err(e) =>
Err(DataFusionError::External(Box::new(e))),
}
}
(ArgRef::Lit(literal), ArgRef::Col(column)) => {
- // literal contains/covers column -> CoveredBy filter
- // (equivalent to st_within(column, literal))
+ // literal contains/covers column -> Intersects filter
+ // (if literal contains column, they must at least
intersect)
match literal_bounds(literal) {
Ok(literal_bounds) => {
- Ok(Some(Self::CoveredBy(column.clone(),
literal_bounds)))
+ Ok(Some(Self::Intersects(column.clone(),
literal_bounds)))
}
Err(e) =>
Err(DataFusionError::External(Box::new(e))),
}
@@ -424,7 +424,7 @@ mod test {
}
#[test]
- fn predicate_covered_by() {
+ fn predicate_covers() {
let storage_field = WKB_GEOMETRY.to_storage_field("", true).unwrap();
let literal = Literal::new_with_metadata(
create_scalar(Some("POLYGON ((0 0, 4 0, 4 4, 0 4, 0 0))"),
&WKB_GEOMETRY),
@@ -433,20 +433,17 @@ mod test {
let bounds = literal_bounds(&literal).unwrap();
let stats_no_info = [GeoStatistics::unspecified()];
- let stats_covered = [
- GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xy((1.0,
1.0), (2.0, 2.0))))
- ];
+ let stats_covered =
+ [GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xy((0,
4), (0, 4))))];
let stats_not_covered = [
GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xy((3.0,
3.0), (5.0, 5.0))))
];
let col0 = Column::new("col0", 0);
- // CoveredBy should return true when column bbox is fully contained in
literal bounds
- assert!(SpatialFilter::CoveredBy(col0.clone(),
bounds.clone()).evaluate(&stats_no_info));
- assert!(SpatialFilter::CoveredBy(col0.clone(),
bounds.clone()).evaluate(&stats_covered));
- assert!(
- !SpatialFilter::CoveredBy(col0.clone(),
bounds.clone()).evaluate(&stats_not_covered)
- );
+ // Covers should return true when column bbox is fully contained in
literal bounds
+ assert!(SpatialFilter::Covers(col0.clone(),
bounds.clone()).evaluate(&stats_no_info));
+ assert!(SpatialFilter::Covers(col0.clone(),
bounds.clone()).evaluate(&stats_covered));
+ assert!(!SpatialFilter::Covers(col0.clone(),
bounds.clone()).evaluate(&stats_not_covered));
}
#[test]
@@ -607,8 +604,8 @@ mod test {
));
let predicate = SpatialFilter::try_from_expr(&expr).unwrap();
assert!(
- matches!(predicate, SpatialFilter::CoveredBy(_, _)),
- "Function {func_name} should produce CoveredBy filter"
+ matches!(predicate, SpatialFilter::Intersects(_, _)),
+ "Function {func_name} should produce Intersects filter"
);
// Test reversed argument order: should be converted to Intersects
filter
@@ -620,8 +617,8 @@ mod test {
));
let predicate_reversed =
SpatialFilter::try_from_expr(&expr_reversed).unwrap();
assert!(
- matches!(predicate_reversed, SpatialFilter::Intersects(_, _)),
- "Function {func_name} with reversed args should produce Intersects
filter"
+ matches!(predicate_reversed, SpatialFilter::Covers(_, _)),
+ "Function {func_name} with reversed args should produce Covers
filter"
);
}
@@ -636,8 +633,8 @@ mod test {
Some(storage_field.metadata().into()),
));
- // Test functions that should result in Intersects filter when column
is first arg
- // (column contains/covers literal -> column must intersect literal)
+ // Test functions that should result in CoveredBy filter when column
is first arg
+ // (column contains/covers literal -> column's bbox must fully contain
literal's bbox)
let func = create_dummy_spatial_function(func_name, 2);
let expr: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
func_name,
@@ -647,12 +644,12 @@ mod test {
));
let predicate = SpatialFilter::try_from_expr(&expr).unwrap();
assert!(
- matches!(predicate, SpatialFilter::Intersects(_, _)),
- "Function {func_name} should produce Intersects filter"
+ matches!(predicate, SpatialFilter::Covers(_, _)),
+ "Function {func_name} should produce CoveredBy filter"
);
- // Test reversed argument order: should be converted to CoveredBy
filter
- // (literal contains/covers column -> equivalent to st_within(column,
literal))
+ // Test reversed argument order: should be converted to Intersects
filter
+ // (literal contains/covers column -> they must at least intersect)
let expr_reversed: Arc<dyn PhysicalExpr> =
Arc::new(ScalarFunctionExpr::new(
func_name,
Arc::new(func),
@@ -661,8 +658,8 @@ mod test {
));
let predicate_reversed =
SpatialFilter::try_from_expr(&expr_reversed).unwrap();
assert!(
- matches!(predicate_reversed, SpatialFilter::CoveredBy(_, _)),
- "Function {func_name} with reversed args should produce CoveredBy
filter"
+ matches!(predicate_reversed, SpatialFilter::Intersects(_, _)),
+ "Function {func_name} with reversed args should produce Intersects
filter"
);
}