This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git


The following commit(s) were added to refs/heads/main by this push:
     new ea07640  bug: Fix `SpatialFilter` for correct pruning when reading 
from geoparquet (#51)
ea07640 is described below

commit ea07640f228cee81ecd4c647716b75f9df9f1af0
Author: Peter Nguyen <[email protected]>
AuthorDate: Wed Sep 10 18:49:37 2025 -0700

    bug: Fix `SpatialFilter` for correct pruning when reading from geoparquet 
(#51)
---
 docs/development.md                      |   6 ++
 python/sedonadb/tests/io/test_parquet.py | 124 ++++++++++++++++++++++++++++---
 rust/sedona-expr/src/spatial_filter.rs   |  73 +++++++++---------
 3 files changed, 156 insertions(+), 47 deletions(-)

diff --git a/docs/development.md b/docs/development.md
index 6b67ead..58f7178 100644
--- a/docs/development.md
+++ b/docs/development.md
@@ -34,6 +34,12 @@ git submodule init
 git submodule update --recursive
 ```
 
+Additionally, some of the data required in the tests can be downloaded by 
running the following script.
+
+```bash
+python submodules/download-assets.py
+```
+
 Some crates wrap external native libraries and require system dependencies
 to build. At this time the only crate that requires this is the 
sedona-s2geography
 crate, which requires [CMake](https://cmake.org),
diff --git a/python/sedonadb/tests/io/test_parquet.py 
b/python/sedonadb/tests/io/test_parquet.py
index 2af3577..42fcaf5 100644
--- a/python/sedonadb/tests/io/test_parquet.py
+++ b/python/sedonadb/tests/io/test_parquet.py
@@ -60,13 +60,25 @@ def test_read_sedona_testing(sedona_testing, name):
 
 
 @pytest.mark.parametrize("name", ["water-junc", "water-point"])
-def test_read_geoparquet_pruned(geoarrow_data, name):
[email protected](
+    "predicate",
+    [
+        "intersects",
+        "coveredby",
+        "within",
+        "equals",
+        "disjoint",
+    ],
+)
+def test_read_geoparquet_prune_points(geoarrow_data, name, predicate):
     # Note that this doesn't check that pruning actually occurred, just that
     # for a query where we should be pruning automatically that we don't omit 
results.
     eng = SedonaDB()
     path = geoarrow_data / "ns-water" / "files" / 
f"ns-water_{name}_geo.parquet"
     skip_if_not_exists(path)
 
+    gdf = geopandas.read_parquet(path)
+
     # Roughly a diamond around Gaspereau Lake, Nova Scotia, in UTM zone 20
     wkt_filter = """
         POLYGON ((
@@ -74,14 +86,22 @@ def test_read_geoparquet_pruned(geoarrow_data, name):
             376000 4983000, 371000 4978000
         ))
     """
+
+    # Use a wkt_filter that will lead to non-empty results
+    if predicate == "equals":
+        wkt_filter = gdf.geometry.iloc[0].wkt
+
     poly_filter = shapely.from_wkt(wkt_filter)
 
-    gdf = geopandas.read_parquet(path)
-    gdf = (
-        gdf[gdf.geometry.intersects(poly_filter)]
-        .sort_values(by="OBJECTID")
-        .reset_index(drop=True)
-    )
+    if predicate == "equals":
+        # Geopandas does not have an equals predicate, so we use the == 
operator
+        mask = gdf.geometry == poly_filter
+    elif predicate == "coveredby":
+        mask = gdf.geometry.covered_by(poly_filter)
+    else:
+        mask = getattr(gdf.geometry, predicate)(poly_filter)
+
+    gdf = gdf[mask].sort_values(by="OBJECTID").reset_index(drop=True)
     gdf = gdf[["OBJECTID", "geometry"]]
 
     # Make sure this isn't a bogus test
@@ -102,7 +122,7 @@ def test_read_geoparquet_pruned(geoarrow_data, name):
         result = eng.execute_and_collect(
             f"""
             SELECT "OBJECTID", geometry FROM tab
-            WHERE ST_Intersects(geometry, 
ST_SetCRS({geom_or_null(wkt_filter)}, '{gdf.crs.to_json()}'))
+            WHERE ST_{predicate}(geometry, 
ST_SetCRS({geom_or_null(wkt_filter)}, '{gdf.crs.to_json()}'))
             ORDER BY "OBJECTID";
         """
         )
@@ -127,8 +147,94 @@ def test_read_geoparquet_pruned(geoarrow_data, name):
         result = eng.execute_and_collect(
             f"""
             SELECT * FROM tab_dataset
-            WHERE ST_Intersects(geometry, 
ST_SetCRS({geom_or_null(wkt_filter)}, '{gdf.crs.to_json()}'))
+            WHERE ST_{predicate}(geometry, 
ST_SetCRS({geom_or_null(wkt_filter)}, '{gdf.crs.to_json()}'))
             ORDER BY "OBJECTID";
         """
         )
         eng.assert_result(result, gdf)
+
+
[email protected](
+    "predicate",
+    [
+        "contains",
+        "covers",
+        "touches",
+    ],
+)
+def test_read_geoparquet_prune_polygons(sedona_testing, predicate):
+    # Note that this doesn't check that pruning actually occurred, just that
+    # for a query where we should be pruning automatically that we don't omit 
results.
+    eng = SedonaDB()
+    path = sedona_testing / "data" / "parquet" / "geoparquet-1.0.0.parquet"
+    skip_if_not_exists(path)
+
+    # A point inside of a polygon for contains / covers
+    wkt_filter = "POINT (33.60 -5.54)"
+
+    if predicate == "touches":
+        # A point on the boundary of a polygon
+        wkt_filter = "POINT (33.90371119710453 -0.9500000000000001)"
+
+    poly_filter = shapely.from_wkt(wkt_filter)
+
+    gdf = geopandas.read_parquet(path)
+    if predicate == "equals":
+        # Geopandas does not have an equals predicate, so we use the == 
operator
+        mask = gdf.geometry == poly_filter
+    elif predicate == "coveredby":
+        mask = gdf.geometry.covered_by(poly_filter)
+    else:
+        mask = getattr(gdf.geometry, predicate)(poly_filter)
+
+    gdf = gdf[mask].sort_values(by="pop_est").reset_index(drop=True)
+    gdf = gdf[["pop_est", "geometry"]]
+
+    # Make sure this isn't a bogus test
+    assert len(gdf) > 0
+
+    with tempfile.TemporaryDirectory() as td:
+        # Write using GeoPandas, which implements GeoParquet 1.1 bbox covering
+        # Write tiny row groups so that many bounding boxes have to be checked
+        tmp_parquet = Path(td) / "geoparquet-1.0.0.parquet"
+        geopandas.read_parquet(path).to_parquet(
+            tmp_parquet,
+            schema_version="1.1.0",
+            write_covering_bbox=True,
+            row_group_size=1024,
+        )
+
+        eng.create_view_parquet("tab", tmp_parquet)
+        result = eng.execute_and_collect(
+            f"""
+            SELECT "pop_est", geometry FROM tab
+            WHERE ST_{predicate}(geometry, 
ST_SetCRS({geom_or_null(wkt_filter)}, '{gdf.crs.to_json()}'))
+            ORDER BY "pop_est";
+        """
+        )
+        eng.assert_result(result, gdf)
+
+        # Write a dataset with one file per row group to check file pruning 
correctness
+        ds_dir = Path(td) / "ds"
+        ds_dir.mkdir()
+        ds_paths = []
+
+        with parquet.ParquetFile(tmp_parquet) as f:
+            for i in range(f.metadata.num_row_groups):
+                tab = f.read_row_group(i, ["pop_est", "geometry"])
+                df = geopandas.GeoDataFrame.from_arrow(tab)
+                ds_path = ds_dir / f"file{i}.parquet"
+                df.to_parquet(ds_path)
+                ds_paths.append(ds_path)
+
+        # Check a query against the same dataset without the bbox column but 
with file-level
+        # geoparquet metadata bounding boxes
+        eng.create_view_parquet("tab_dataset", ds_paths)
+        result = eng.execute_and_collect(
+            f"""
+            SELECT * FROM tab_dataset
+            WHERE ST_{predicate}(geometry, 
ST_SetCRS({geom_or_null(wkt_filter)}, '{gdf.crs.to_json()}'))
+            ORDER BY "pop_est";
+        """
+        )
+        eng.assert_result(result, gdf)
diff --git a/rust/sedona-expr/src/spatial_filter.rs 
b/rust/sedona-expr/src/spatial_filter.rs
index 348a4c4..64feda2 100644
--- a/rust/sedona-expr/src/spatial_filter.rs
+++ b/rust/sedona-expr/src/spatial_filter.rs
@@ -45,8 +45,8 @@ use crate::{
 pub enum SpatialFilter {
     /// ST_Intersects(\<column\>, \<literal\>) or ST_Intersects(\<literal\>, 
\<column\>)
     Intersects(Column, BoundingBox),
-    /// ST_CoveredBy(\<column\>, \<literal\>) or ST_CoveredBy(\<literal\>, 
\<column\>)
-    CoveredBy(Column, BoundingBox),
+    /// ST_Covers(\<column\>, \<literal\>) or ST_Covers(\<literal\>, 
\<column\>)
+    Covers(Column, BoundingBox),
     /// ST_HasZ(\<column\>)
     HasZ(Column),
     /// Logical AND
@@ -69,8 +69,8 @@ impl SpatialFilter {
             SpatialFilter::Intersects(column, bounds) => {
                 Self::evaluate_intersects_bbox(&table_stats[column.index()], 
bounds)
             }
-            SpatialFilter::CoveredBy(column, bounds) => {
-                Self::evaluate_covered_by_bbox(&table_stats[column.index()], 
bounds)
+            SpatialFilter::Covers(column, bounds) => {
+                Self::evaluate_covers_bbox(&table_stats[column.index()], 
bounds)
             }
             SpatialFilter::HasZ(column) => 
Self::evaluate_has_z(&table_stats[column.index()]),
             SpatialFilter::And(lhs, rhs) => Self::evaluate_and(lhs, rhs, 
table_stats),
@@ -88,9 +88,9 @@ impl SpatialFilter {
         }
     }
 
-    fn evaluate_covered_by_bbox(column_stats: &GeoStatistics, bounds: 
&BoundingBox) -> bool {
+    fn evaluate_covers_bbox(column_stats: &GeoStatistics, bounds: 
&BoundingBox) -> bool {
         if let Some(bbox) = column_stats.bbox() {
-            bounds.contains(bbox)
+            bbox.contains(bounds)
         } else {
             true
         }
@@ -203,19 +203,19 @@ impl SpatialFilter {
 
                 match (&args[0], &args[1]) {
                     (ArgRef::Col(column), ArgRef::Lit(literal)) => {
-                        // column within/covered_by literal -> CoveredBy filter
+                        // column within/covered_by literal -> Intersects 
filter
                         match literal_bounds(literal) {
                             Ok(literal_bounds) => {
-                                Ok(Some(Self::CoveredBy(column.clone(), 
literal_bounds)))
+                                Ok(Some(Self::Intersects(column.clone(), 
literal_bounds)))
                             }
                             Err(e) => 
Err(DataFusionError::External(Box::new(e))),
                         }
                     }
                     (ArgRef::Lit(literal), ArgRef::Col(column)) => {
-                        // literal within/covered_by column -> Intersects 
filter
+                        // literal within/covered_by column -> Covers filter
                         match literal_bounds(literal) {
                             Ok(literal_bounds) => {
-                                Ok(Some(Self::Intersects(column.clone(), 
literal_bounds)))
+                                Ok(Some(Self::Covers(column.clone(), 
literal_bounds)))
                             }
                             Err(e) => 
Err(DataFusionError::External(Box::new(e))),
                         }
@@ -231,21 +231,21 @@ impl SpatialFilter {
 
                 match (&args[0], &args[1]) {
                     (ArgRef::Col(column), ArgRef::Lit(literal)) => {
-                        // column contains/covers literal -> Intersects filter
-                        // (column must potentially intersect literal to 
contain it)
+                        // column contains/covers literal -> Covers filter
+                        // (column's bbox must fully cover literal's bbox)
                         match literal_bounds(literal) {
                             Ok(literal_bounds) => {
-                                Ok(Some(Self::Intersects(column.clone(), 
literal_bounds)))
+                                Ok(Some(Self::Covers(column.clone(), 
literal_bounds)))
                             }
                             Err(e) => 
Err(DataFusionError::External(Box::new(e))),
                         }
                     }
                     (ArgRef::Lit(literal), ArgRef::Col(column)) => {
-                        // literal contains/covers column -> CoveredBy filter
-                        // (equivalent to st_within(column, literal))
+                        // literal contains/covers column -> Intersects filter
+                        // (if literal contains column, they must at least 
intersect)
                         match literal_bounds(literal) {
                             Ok(literal_bounds) => {
-                                Ok(Some(Self::CoveredBy(column.clone(), 
literal_bounds)))
+                                Ok(Some(Self::Intersects(column.clone(), 
literal_bounds)))
                             }
                             Err(e) => 
Err(DataFusionError::External(Box::new(e))),
                         }
@@ -424,7 +424,7 @@ mod test {
     }
 
     #[test]
-    fn predicate_covered_by() {
+    fn predicate_covers() {
         let storage_field = WKB_GEOMETRY.to_storage_field("", true).unwrap();
         let literal = Literal::new_with_metadata(
             create_scalar(Some("POLYGON ((0 0, 4 0, 4 4, 0 4, 0 0))"), 
&WKB_GEOMETRY),
@@ -433,20 +433,17 @@ mod test {
         let bounds = literal_bounds(&literal).unwrap();
 
         let stats_no_info = [GeoStatistics::unspecified()];
-        let stats_covered = [
-            GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xy((1.0, 
1.0), (2.0, 2.0))))
-        ];
+        let stats_covered =
+            [GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xy((0, 
4), (0, 4))))];
         let stats_not_covered = [
             GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xy((3.0, 
3.0), (5.0, 5.0))))
         ];
         let col0 = Column::new("col0", 0);
 
-        // CoveredBy should return true when column bbox is fully contained in 
literal bounds
-        assert!(SpatialFilter::CoveredBy(col0.clone(), 
bounds.clone()).evaluate(&stats_no_info));
-        assert!(SpatialFilter::CoveredBy(col0.clone(), 
bounds.clone()).evaluate(&stats_covered));
-        assert!(
-            !SpatialFilter::CoveredBy(col0.clone(), 
bounds.clone()).evaluate(&stats_not_covered)
-        );
+        // Covers should return true when column bbox is fully contained in 
literal bounds
+        assert!(SpatialFilter::Covers(col0.clone(), 
bounds.clone()).evaluate(&stats_no_info));
+        assert!(SpatialFilter::Covers(col0.clone(), 
bounds.clone()).evaluate(&stats_covered));
+        assert!(!SpatialFilter::Covers(col0.clone(), 
bounds.clone()).evaluate(&stats_not_covered));
     }
 
     #[test]
@@ -607,8 +604,8 @@ mod test {
         ));
         let predicate = SpatialFilter::try_from_expr(&expr).unwrap();
         assert!(
-            matches!(predicate, SpatialFilter::CoveredBy(_, _)),
-            "Function {func_name} should produce CoveredBy filter"
+            matches!(predicate, SpatialFilter::Intersects(_, _)),
+            "Function {func_name} should produce Intersects filter"
         );
 
         // Test reversed argument order: should be converted to Intersects 
filter
@@ -620,8 +617,8 @@ mod test {
         ));
         let predicate_reversed = 
SpatialFilter::try_from_expr(&expr_reversed).unwrap();
         assert!(
-            matches!(predicate_reversed, SpatialFilter::Intersects(_, _)),
-            "Function {func_name} with reversed args should produce Intersects 
filter"
+            matches!(predicate_reversed, SpatialFilter::Covers(_, _)),
+            "Function {func_name} with reversed args should produce Covers 
filter"
         );
     }
 
@@ -636,8 +633,8 @@ mod test {
             Some(storage_field.metadata().into()),
         ));
 
-        // Test functions that should result in Intersects filter when column 
is first arg
-        // (column contains/covers literal -> column must intersect literal)
+        // Test functions that should result in CoveredBy filter when column 
is first arg
+        // (column contains/covers literal -> column's bbox must fully contain 
literal's bbox)
         let func = create_dummy_spatial_function(func_name, 2);
         let expr: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
             func_name,
@@ -647,12 +644,12 @@ mod test {
         ));
         let predicate = SpatialFilter::try_from_expr(&expr).unwrap();
         assert!(
-            matches!(predicate, SpatialFilter::Intersects(_, _)),
-            "Function {func_name} should produce Intersects filter"
+            matches!(predicate, SpatialFilter::Covers(_, _)),
+            "Function {func_name} should produce CoveredBy filter"
         );
 
-        // Test reversed argument order: should be converted to CoveredBy 
filter
-        // (literal contains/covers column -> equivalent to st_within(column, 
literal))
+        // Test reversed argument order: should be converted to Intersects 
filter
+        // (literal contains/covers column -> they must at least intersect)
         let expr_reversed: Arc<dyn PhysicalExpr> = 
Arc::new(ScalarFunctionExpr::new(
             func_name,
             Arc::new(func),
@@ -661,8 +658,8 @@ mod test {
         ));
         let predicate_reversed = 
SpatialFilter::try_from_expr(&expr_reversed).unwrap();
         assert!(
-            matches!(predicate_reversed, SpatialFilter::CoveredBy(_, _)),
-            "Function {func_name} with reversed args should produce CoveredBy 
filter"
+            matches!(predicate_reversed, SpatialFilter::Intersects(_, _)),
+            "Function {func_name} with reversed args should produce Intersects 
filter"
         );
     }
 

Reply via email to