This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git
The following commit(s) were added to refs/heads/main by this push:
new 24c4a111 fix(rust/sedona-expr): Fix GeoParquet pruning when number of
final columns is less than the geometry column index (#385)
24c4a111 is described below
commit 24c4a1116ae933ed7c891ed43e356b32897e1e25
Author: Dewey Dunnington <[email protected]>
AuthorDate: Sun Nov 30 20:21:38 2025 -0600
fix(rust/sedona-expr): Fix GeoParquet pruning when number of final columns
is less than the geometry column index (#385)
Co-authored-by: Peter Nguyen <[email protected]>
Co-authored-by: Copilot <[email protected]>
---
rust/sedona-datasource/src/spec.rs | 10 +-
rust/sedona-expr/src/spatial_filter.rs | 294 ++++++++++++++++++++++--------
rust/sedona-expr/src/statistics.rs | 29 +--
rust/sedona-geoparquet/src/file_opener.rs | 19 +-
4 files changed, 255 insertions(+), 97 deletions(-)
diff --git a/rust/sedona-datasource/src/spec.rs
b/rust/sedona-datasource/src/spec.rs
index d9b8f1af..2c2ca31a 100644
--- a/rust/sedona-datasource/src/spec.rs
+++ b/rust/sedona-datasource/src/spec.rs
@@ -141,7 +141,15 @@ pub struct OpenReaderArgs {
/// Filter expressions
///
/// Expressions that may be used for pruning. Implementations need not
- /// apply these filters.
+ /// apply these filters to produce a correct result (i.e., DataFusion will
+ /// evaluate the filters at a later step regardless of how this implementation
+ /// uses the provided filters).
+ ///
+ /// Note that `Column`s in this [PhysicalExpr] are relative to `file_projection`.
+ /// For example, in a scan with file_projection `[5, 6]` (i.e., DataFusion is only
+ /// requesting the 6th and 7th columns from the `file_schema` inferred for this object),
+ /// a `Column { index: 1, ... }` refers to the column at index 6 (i.e.,
+ /// `file_schema.field(file_projection[1])`).
pub filters: Vec<Arc<dyn PhysicalExpr>>,
}
diff --git a/rust/sedona-expr/src/spatial_filter.rs
b/rust/sedona-expr/src/spatial_filter.rs
index 733cd746..922b8ded 100644
--- a/rust/sedona-expr/src/spatial_filter.rs
+++ b/rust/sedona-expr/src/spatial_filter.rs
@@ -14,7 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-use std::sync::Arc;
+use std::{collections::HashMap, sync::Arc};
use arrow_schema::{DataType, Schema};
use datafusion_common::{DataFusionError, Result, ScalarValue};
@@ -30,7 +30,7 @@ use sedona_geometry::{
bounds::wkb_bounds_xy,
interval::{Interval, IntervalTrait},
};
-use sedona_schema::{datatypes::SedonaType, matchers::ArgMatcher};
+use sedona_schema::{datatypes::SedonaType, matchers::ArgMatcher,
schema::SedonaSchema};
use crate::{
statistics::GeoStatistics,
@@ -106,19 +106,24 @@ impl SpatialFilter {
///
/// In other words, returns false if and only if the expression is
guaranteed
/// to be false.
- pub fn evaluate(&self, table_stats: &[GeoStatistics]) -> bool {
+ pub fn evaluate(&self, table_stats: &TableGeoStatistics) -> Result<bool> {
+ self.evaluate_internal(table_stats)
+ }
+
+ fn evaluate_internal(&self, table_stats: &TableGeoStatistics) ->
Result<bool> {
match self {
- SpatialFilter::Intersects(column, bounds) => {
- Self::evaluate_intersects_bbox(&table_stats[column.index()],
bounds)
- }
+ SpatialFilter::Intersects(column, bounds) =>
Ok(Self::evaluate_intersects_bbox(
+ table_stats.get(column)?,
+ bounds,
+ )),
SpatialFilter::Covers(column, bounds) => {
- Self::evaluate_covers_bbox(&table_stats[column.index()],
bounds)
+ Ok(Self::evaluate_covers_bbox(table_stats.get(column)?,
bounds))
}
- SpatialFilter::HasZ(column) =>
Self::evaluate_has_z(&table_stats[column.index()]),
+ SpatialFilter::HasZ(column) =>
Ok(Self::evaluate_has_z(table_stats.get(column)?)),
SpatialFilter::And(lhs, rhs) => Self::evaluate_and(lhs, rhs,
table_stats),
SpatialFilter::Or(lhs, rhs) => Self::evaluate_or(lhs, rhs,
table_stats),
- SpatialFilter::LiteralFalse => false,
- SpatialFilter::Unknown => true,
+ SpatialFilter::LiteralFalse => Ok(false),
+ SpatialFilter::Unknown => Ok(true),
}
}
@@ -161,16 +166,16 @@ impl SpatialFilter {
true
}
- fn evaluate_and(lhs: &Self, rhs: &Self, table_stats: &[GeoStatistics]) ->
bool {
- let maybe_lhs = lhs.evaluate(table_stats);
- let maybe_rhs = rhs.evaluate(table_stats);
- maybe_lhs && maybe_rhs
+ fn evaluate_and(lhs: &Self, rhs: &Self, table_stats: &TableGeoStatistics)
-> Result<bool> {
+ let maybe_lhs = lhs.evaluate_internal(table_stats)?;
+ let maybe_rhs = rhs.evaluate_internal(table_stats)?;
+ Ok(maybe_lhs && maybe_rhs)
}
- fn evaluate_or(lhs: &Self, rhs: &Self, table_stats: &[GeoStatistics]) ->
bool {
- let maybe_lhs = lhs.evaluate(table_stats);
- let maybe_rhs = rhs.evaluate(table_stats);
- maybe_lhs || maybe_rhs
+ fn evaluate_or(lhs: &Self, rhs: &Self, table_stats: &TableGeoStatistics)
-> Result<bool> {
+ let maybe_lhs = lhs.evaluate_internal(table_stats)?;
+ let maybe_rhs = rhs.evaluate_internal(table_stats)?;
+ Ok(maybe_lhs || maybe_rhs)
}
/// Construct a SpatialPredicate from a [PhysicalExpr]
@@ -389,6 +394,81 @@ impl SpatialFilter {
}
}
+/// Table GeoStatistics
+///
+/// Enables providing a collection of GeoStatistics to [SpatialFilter::evaluate]
+/// such that attempts to access out-of-bounds values result in a readable
+/// error.
+pub enum TableGeoStatistics {
+ /// Provide statistics for every Column in the table. These must be
+ /// [GeoStatistics::unspecified] for non-spatial columns.
+ ///
+ /// These are resolved using [Column::index].
+ ByPosition(Vec<GeoStatistics>),
+
+ /// Provide statistics for specific named columns. Columns not included
+ /// are treated as [GeoStatistics::unspecified].
+ ///
+ /// These are resolved using [Column::name]. This may be used for logical
+ /// expressions (where columns are resolved by name) or as a workaround
+ /// for physical expressions where the index is relative to a projected
+ /// schema <https://github.com/apache/sedona-db/issues/389>.
+ ByName(HashMap<String, GeoStatistics>),
+}
+
+impl TableGeoStatistics {
+ /// Construct TableGeoStatistics with no columns
+ pub fn empty() -> Self {
+ TableGeoStatistics::ByPosition(vec![])
+ }
+
+ /// Construct TableGeoStatistics from a slice of all column statistics and a schema
+ pub fn try_from_stats_and_schema(
+ column_stats: &[GeoStatistics],
+ schema: &Schema,
+ ) -> Result<Self> {
+ let mut stats_map = HashMap::new();
+ for i in schema.geometry_column_indices()? {
+ stats_map.insert(schema.field(i).name().to_string(),
column_stats[i].clone());
+ }
+ Ok(Self::ByName(stats_map))
+ }
+
+ /// For a given [Column], obtain [GeoStatistics]
+ ///
+ /// This will error if the column's index is out of bounds for [Self::ByPosition].
+ /// Names that cannot be resolved in [Self::ByName] are treated as unspecified.
+ fn get(&self, column: &Column) -> Result<&GeoStatistics> {
+ match self {
+ Self::ByPosition(items) => {
+ if column.index() >= items.len() {
+ sedona_internal_err!(
+ "Can't obtain GeoStatistics for column at index {}
from schema with {} columns",
+ column.index(),
+ items.len()
+ )
+ } else {
+ Ok(&items[column.index()])
+ }
+ }
+ Self::ByName(items) => {
+ if let Some(item) = items.get(column.name()) {
+ Ok(item)
+ } else {
+ Ok(&GeoStatistics::UNSPECIFIED)
+ }
+ }
+ }
+ }
+}
+
+// Useful for testing (create from a single GeoStatistics)
+impl From<GeoStatistics> for TableGeoStatistics {
+ fn from(value: GeoStatistics) -> Self {
+ TableGeoStatistics::ByPosition(vec![value])
+ }
+}
+
/// Internal utility to help match physical expression types
enum ArgRef<'a> {
Col(Column),
@@ -492,24 +572,28 @@ mod test {
);
let bounds = literal_bounds(&literal).unwrap();
- let stats_no_info = [GeoStatistics::unspecified()];
- let stats_intersecting = [
- GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xy((0.5,
1.5), (1.5, 2.5))))
- ];
- let col0 = Column::new("col0", 0);
-
- assert!(SpatialFilter::Intersects(col0.clone(),
bounds.clone()).evaluate(&stats_no_info));
- assert!(
- SpatialFilter::Intersects(col0.clone(),
bounds.clone()).evaluate(&stats_intersecting)
+ let stats_no_info =
TableGeoStatistics::from(GeoStatistics::unspecified());
+ let stats_intersecting = TableGeoStatistics::from(
+ GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xy((0.5,
1.5), (1.5, 2.5)))),
);
+ let col0 = Column::new("col0", 0);
- let stats_empty_bbox = [GeoStatistics::unspecified()
- .with_bbox(Some(BoundingBox::xy(Interval::empty(),
Interval::empty())))];
+ assert!(SpatialFilter::Intersects(col0.clone(), bounds.clone())
+ .evaluate(&stats_no_info)
+ .unwrap());
+ assert!(SpatialFilter::Intersects(col0.clone(), bounds.clone())
+ .evaluate(&stats_intersecting)
+ .unwrap());
- assert!(
- !SpatialFilter::Intersects(col0.clone(),
bounds.clone()).evaluate(&stats_empty_bbox)
+ let stats_empty_bbox = TableGeoStatistics::from(
+ GeoStatistics::unspecified()
+ .with_bbox(Some(BoundingBox::xy(Interval::empty(),
Interval::empty()))),
);
+ assert!(!SpatialFilter::Intersects(col0.clone(), bounds.clone())
+ .evaluate(&stats_empty_bbox)
+ .unwrap());
+
let unrelated_literal = Literal::new(ScalarValue::Null);
let err = literal_bounds(&unrelated_literal).unwrap_err();
@@ -527,18 +611,25 @@ mod test {
);
let bounds = literal_bounds(&literal).unwrap();
- let stats_no_info = [GeoStatistics::unspecified()];
- let stats_covered =
- [GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xy((0,
4), (0, 4))))];
- let stats_not_covered = [
- GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xy((3.0,
3.0), (5.0, 5.0))))
- ];
+ let stats_no_info =
TableGeoStatistics::from(GeoStatistics::unspecified());
+ let stats_covered = TableGeoStatistics::from(
+ GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xy((0,
4), (0, 4)))),
+ );
+ let stats_not_covered = TableGeoStatistics::from(
+ GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xy((3.0,
3.0), (5.0, 5.0)))),
+ );
let col0 = Column::new("col0", 0);
// Covers should return true when column bbox is fully contained in
literal bounds
- assert!(SpatialFilter::Covers(col0.clone(),
bounds.clone()).evaluate(&stats_no_info));
- assert!(SpatialFilter::Covers(col0.clone(),
bounds.clone()).evaluate(&stats_covered));
- assert!(!SpatialFilter::Covers(col0.clone(),
bounds.clone()).evaluate(&stats_not_covered));
+ assert!(SpatialFilter::Covers(col0.clone(), bounds.clone())
+ .evaluate(&stats_no_info)
+ .unwrap());
+ assert!(SpatialFilter::Covers(col0.clone(), bounds.clone())
+ .evaluate(&stats_covered)
+ .unwrap());
+ assert!(!SpatialFilter::Covers(col0.clone(), bounds.clone())
+ .evaluate(&stats_not_covered)
+ .unwrap());
}
#[test]
@@ -546,73 +637,77 @@ mod test {
let col0 = Column::new("col0", 0);
let has_z = SpatialFilter::HasZ(col0.clone());
- let stats_z_geometry_types = [GeoStatistics::unspecified()
- .try_with_str_geometry_types(Some(&["POINT Z"]))
- .unwrap()];
- let stats_z_bbox = [
- GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xyzm(
- (0, 1),
- (2, 3),
- Some((4, 5).into()),
- None,
- ))),
- ];
- let stats_no_info = [GeoStatistics::unspecified()];
-
- assert!(has_z.evaluate(&stats_z_geometry_types));
- assert!(has_z.evaluate(&stats_z_bbox));
- assert!(has_z.evaluate(&stats_no_info));
-
- let stats_no_z_geometry_types = [GeoStatistics::unspecified()
- .try_with_str_geometry_types(Some(&["POINT"]))
- .unwrap()];
- let stats_no_z_bbox = [
- GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xyzm(
- (0, 1),
- (2, 3),
- Some(Interval::empty()),
- None,
- ))),
- ];
-
- assert!(!has_z.evaluate(&stats_no_z_geometry_types));
- assert!(!has_z.evaluate(&stats_no_z_bbox));
+ let stats_z_geometry_types = TableGeoStatistics::from(
+ GeoStatistics::unspecified()
+ .try_with_str_geometry_types(Some(&["POINT Z"]))
+ .unwrap(),
+ );
+ let stats_z_bbox =
TableGeoStatistics::from(GeoStatistics::unspecified().with_bbox(Some(
+ BoundingBox::xyzm((0, 1), (2, 3), Some((4, 5).into()), None),
+ )));
+ let stats_no_info =
TableGeoStatistics::from(GeoStatistics::unspecified());
+
+ assert!(has_z.evaluate(&stats_z_geometry_types).unwrap());
+ assert!(has_z.evaluate(&stats_z_bbox).unwrap());
+ assert!(has_z.evaluate(&stats_no_info).unwrap());
+
+ let stats_no_z_geometry_types = TableGeoStatistics::from(
+ GeoStatistics::unspecified()
+ .try_with_str_geometry_types(Some(&["POINT"]))
+ .unwrap(),
+ );
+ let stats_no_z_bbox =
+
TableGeoStatistics::from(GeoStatistics::unspecified().with_bbox(Some(
+ BoundingBox::xyzm((0, 1), (2, 3), Some(Interval::empty()),
None),
+ )));
+
+ assert!(!has_z.evaluate(&stats_no_z_geometry_types).unwrap());
+ assert!(!has_z.evaluate(&stats_no_z_bbox).unwrap());
}
#[test]
fn predicate_other() {
- assert!(!SpatialFilter::LiteralFalse.evaluate(&[]));
- assert!(SpatialFilter::Unknown.evaluate(&[]));
+ assert!(!SpatialFilter::LiteralFalse
+ .evaluate(&TableGeoStatistics::empty())
+ .unwrap());
+ assert!(SpatialFilter::Unknown
+ .evaluate(&TableGeoStatistics::empty())
+ .unwrap());
assert!(SpatialFilter::And(
Box::new(SpatialFilter::Unknown),
Box::new(SpatialFilter::Unknown)
)
- .evaluate(&[]));
+ .evaluate(&TableGeoStatistics::empty())
+ .unwrap());
assert!(!SpatialFilter::And(
Box::new(SpatialFilter::Unknown),
Box::new(SpatialFilter::LiteralFalse)
)
- .evaluate(&[]));
+ .evaluate(&TableGeoStatistics::empty())
+ .unwrap());
assert!(SpatialFilter::Or(
Box::new(SpatialFilter::Unknown),
Box::new(SpatialFilter::Unknown)
)
- .evaluate(&[]));
+ .evaluate(&TableGeoStatistics::empty())
+ .unwrap());
assert!(SpatialFilter::Or(
Box::new(SpatialFilter::Unknown),
Box::new(SpatialFilter::LiteralFalse)
)
- .evaluate(&[]));
+ .evaluate(&TableGeoStatistics::empty())
+ .unwrap());
assert!(!SpatialFilter::Or(
Box::new(SpatialFilter::LiteralFalse),
Box::new(SpatialFilter::LiteralFalse)
)
- .evaluate(&[]));
+ .evaluate(&TableGeoStatistics::empty())
+ .unwrap());
}
#[test]
@@ -1146,6 +1241,49 @@ mod test {
}
}
+ #[test]
+ fn table_geo_stats_position() {
+ let column_stats =
+ GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xy((0.5,
1.5), (1.5, 2.5))));
+ let table_stats = TableGeoStatistics::from(column_stats.clone());
+
+ assert_eq!(
+ table_stats.get(&Column::new("col0", 0)).unwrap(),
+ &column_stats
+ );
+ assert!(table_stats.get(&Column::new("col1", 1)).is_err());
+ }
+
+ #[test]
+ fn table_geo_stats_name() {
+ let geo_stats =
+ GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xy((0.5,
1.5), (1.5, 2.5))));
+ let schema = Schema::new(vec![
+ Field::new("col0", DataType::Binary, true),
+ WKB_GEOMETRY.to_storage_field("col1", true).unwrap(),
+ ]);
+ let table_stats = TableGeoStatistics::try_from_stats_and_schema(
+ &[GeoStatistics::UNSPECIFIED, geo_stats.clone()],
+ &schema,
+ )
+ .unwrap();
+
+ assert_eq!(
+ table_stats.get(&Column::new("col0", usize::MAX)).unwrap(),
+ &GeoStatistics::UNSPECIFIED
+ );
+ assert_eq!(
+ table_stats.get(&Column::new("col1", usize::MAX)).unwrap(),
+ &geo_stats
+ );
+ assert_eq!(
+ table_stats
+ .get(&Column::new("col_not_in_schema", usize::MAX))
+ .unwrap(),
+ &GeoStatistics::UNSPECIFIED
+ );
+ }
+
#[test]
fn bounding_box() {
let col_zero = Column::new("foofy", 0);
diff --git a/rust/sedona-expr/src/statistics.rs
b/rust/sedona-expr/src/statistics.rs
index eafa3d1b..08c0dd9c 100644
--- a/rust/sedona-expr/src/statistics.rs
+++ b/rust/sedona-expr/src/statistics.rs
@@ -55,21 +55,24 @@ pub struct GeoStatistics {
}
impl GeoStatistics {
+ /// Statistics representing unspecified information
+ pub const UNSPECIFIED: GeoStatistics = Self {
+ bbox: None,
+ geometry_types: None,
+ total_geometries: None,
+ total_size_bytes: None,
+ total_points: None,
+ puntal_count: None,
+ lineal_count: None,
+ polygonal_count: None,
+ collection_count: None,
+ total_envelope_width: None,
+ total_envelope_height: None,
+ };
+
/// Create statistics representing unspecified information
pub fn unspecified() -> Self {
- Self {
- bbox: None,
- geometry_types: None,
- total_geometries: None,
- total_size_bytes: None,
- total_points: None,
- puntal_count: None,
- lineal_count: None,
- polygonal_count: None,
- collection_count: None,
- total_envelope_width: None,
- total_envelope_height: None,
- }
+ Self::UNSPECIFIED.clone()
}
/// Create statistics representing empty information (with zero values
instead of None)
diff --git a/rust/sedona-geoparquet/src/file_opener.rs
b/rust/sedona-geoparquet/src/file_opener.rs
index 183fe0a8..d634b92a 100644
--- a/rust/sedona-geoparquet/src/file_opener.rs
+++ b/rust/sedona-geoparquet/src/file_opener.rs
@@ -30,7 +30,10 @@ use parquet::file::{
metadata::{ParquetMetaData, RowGroupMetaData},
statistics::Statistics,
};
-use sedona_expr::{spatial_filter::SpatialFilter, statistics::GeoStatistics};
+use sedona_expr::{
+ spatial_filter::{SpatialFilter, TableGeoStatistics},
+ statistics::GeoStatistics,
+};
use sedona_geometry::bounding_box::BoundingBox;
use sedona_schema::{datatypes::SedonaType, matchers::ArgMatcher};
@@ -175,8 +178,10 @@ fn filter_access_plan_using_geoparquet_file_metadata(
metadata: &GeoParquetMetadata,
metrics: &GeoParquetFileOpenerMetrics,
) -> Result<()> {
- let table_geo_stats = geoparquet_file_geo_stats(file_schema, metadata)?;
- if !spatial_filter.evaluate(&table_geo_stats) {
+ let column_geo_stats = geoparquet_file_geo_stats(file_schema, metadata)?;
+ let table_geo_stats =
+ TableGeoStatistics::try_from_stats_and_schema(&column_geo_stats,
file_schema)?;
+ if !spatial_filter.evaluate(&table_geo_stats)? {
metrics.files_ranges_spatial_pruned.add(1);
for i in access_plan.row_group_indexes() {
access_plan.skip(i);
@@ -214,11 +219,15 @@ fn filter_access_plan_using_geoparquet_covering(
// Iterate through the row groups
for i in row_group_indices_to_scan {
// Generate row group statistics based on the covering statistics
- let row_group_geo_stats =
+ let row_group_column_geo_stats =
row_group_covering_geo_stats(parquet_metadata.row_group(i),
&covering_specs);
+ let row_group_geo_stats =
TableGeoStatistics::try_from_stats_and_schema(
+ &row_group_column_geo_stats,
+ file_schema,
+ )?;
// Evaluate predicate!
- if !spatial_filter.evaluate(&row_group_geo_stats) {
+ if !spatial_filter.evaluate(&row_group_geo_stats)? {
metrics.row_groups_spatial_pruned.add(1);
access_plan.skip(i);
} else {