This is an automated email from the ASF dual-hosted git repository. paleolimbot pushed a commit to branch branch-0.2.0 in repository https://gitbox.apache.org/repos/asf/sedona-db.git
commit 625816335f6dfc188a0d3e8b747f38f04a6e1aeb Author: Dewey Dunnington <[email protected]> AuthorDate: Sun Nov 30 20:21:38 2025 -0600 fix(rust/sedona-expr): Fix GeoParquet pruning when number of final columns is less than the geometry column index (#385) Co-authored-by: Peter Nguyen <[email protected]> Co-authored-by: Copilot <[email protected]> --- rust/sedona-datasource/src/spec.rs | 10 +- rust/sedona-expr/src/spatial_filter.rs | 294 ++++++++++++++++++++++-------- rust/sedona-expr/src/statistics.rs | 29 +-- rust/sedona-geoparquet/src/file_opener.rs | 19 +- 4 files changed, 255 insertions(+), 97 deletions(-) diff --git a/rust/sedona-datasource/src/spec.rs b/rust/sedona-datasource/src/spec.rs index d9b8f1af..2c2ca31a 100644 --- a/rust/sedona-datasource/src/spec.rs +++ b/rust/sedona-datasource/src/spec.rs @@ -141,7 +141,15 @@ pub struct OpenReaderArgs { /// Filter expressions /// /// Expressions that may be used for pruning. Implementations need not - /// apply these filters. + /// apply these filters to produce a correct result (i.e., DataFusion will + /// evaluate the filters at a later step regardless of how this implementation + /// uses the provided filters). + /// + /// Note that `Column`s in this [PhysicalExpr] are relative to `file_projection`. + /// For example, in a scan with file_projection `[5, 6]` (i.e., DataFusion is only + /// requesting the 6th and 7th columns from the `file_schema` inferred for this object), + /// a `Column { index: 1, ... }` refers to the column at index 6 (i.e., + /// `file_schema.field(file_projection[1])`). pub filters: Vec<Arc<dyn PhysicalExpr>>, } diff --git a/rust/sedona-expr/src/spatial_filter.rs b/rust/sedona-expr/src/spatial_filter.rs index 733cd746..922b8ded 100644 --- a/rust/sedona-expr/src/spatial_filter.rs +++ b/rust/sedona-expr/src/spatial_filter.rs @@ -14,7 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. 
-use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use arrow_schema::{DataType, Schema}; use datafusion_common::{DataFusionError, Result, ScalarValue}; @@ -30,7 +30,7 @@ use sedona_geometry::{ bounds::wkb_bounds_xy, interval::{Interval, IntervalTrait}, }; -use sedona_schema::{datatypes::SedonaType, matchers::ArgMatcher}; +use sedona_schema::{datatypes::SedonaType, matchers::ArgMatcher, schema::SedonaSchema}; use crate::{ statistics::GeoStatistics, @@ -106,19 +106,24 @@ impl SpatialFilter { /// /// In other words, returns false if and only if the expression is guaranteed /// to be false. - pub fn evaluate(&self, table_stats: &[GeoStatistics]) -> bool { + pub fn evaluate(&self, table_stats: &TableGeoStatistics) -> Result<bool> { + self.evaluate_internal(table_stats) + } + + fn evaluate_internal(&self, table_stats: &TableGeoStatistics) -> Result<bool> { match self { - SpatialFilter::Intersects(column, bounds) => { - Self::evaluate_intersects_bbox(&table_stats[column.index()], bounds) - } + SpatialFilter::Intersects(column, bounds) => Ok(Self::evaluate_intersects_bbox( + table_stats.get(column)?, + bounds, + )), SpatialFilter::Covers(column, bounds) => { - Self::evaluate_covers_bbox(&table_stats[column.index()], bounds) + Ok(Self::evaluate_covers_bbox(table_stats.get(column)?, bounds)) } - SpatialFilter::HasZ(column) => Self::evaluate_has_z(&table_stats[column.index()]), + SpatialFilter::HasZ(column) => Ok(Self::evaluate_has_z(table_stats.get(column)?)), SpatialFilter::And(lhs, rhs) => Self::evaluate_and(lhs, rhs, table_stats), SpatialFilter::Or(lhs, rhs) => Self::evaluate_or(lhs, rhs, table_stats), - SpatialFilter::LiteralFalse => false, - SpatialFilter::Unknown => true, + SpatialFilter::LiteralFalse => Ok(false), + SpatialFilter::Unknown => Ok(true), } } @@ -161,16 +166,16 @@ impl SpatialFilter { true } - fn evaluate_and(lhs: &Self, rhs: &Self, table_stats: &[GeoStatistics]) -> bool { - let maybe_lhs = lhs.evaluate(table_stats); - let maybe_rhs = 
rhs.evaluate(table_stats); - maybe_lhs && maybe_rhs + fn evaluate_and(lhs: &Self, rhs: &Self, table_stats: &TableGeoStatistics) -> Result<bool> { + let maybe_lhs = lhs.evaluate_internal(table_stats)?; + let maybe_rhs = rhs.evaluate_internal(table_stats)?; + Ok(maybe_lhs && maybe_rhs) } - fn evaluate_or(lhs: &Self, rhs: &Self, table_stats: &[GeoStatistics]) -> bool { - let maybe_lhs = lhs.evaluate(table_stats); - let maybe_rhs = rhs.evaluate(table_stats); - maybe_lhs || maybe_rhs + fn evaluate_or(lhs: &Self, rhs: &Self, table_stats: &TableGeoStatistics) -> Result<bool> { + let maybe_lhs = lhs.evaluate_internal(table_stats)?; + let maybe_rhs = rhs.evaluate_internal(table_stats)?; + Ok(maybe_lhs || maybe_rhs) } /// Construct a SpatialPredicate from a [PhysicalExpr] @@ -389,6 +394,81 @@ impl SpatialFilter { } } +/// Table GeoStatistics +/// +/// Enables providing a collection of GeoStatistics to [SpatialFilter::evaluate] +/// such that attempts to access out-of-bounds values result in a readable +/// error. +pub enum TableGeoStatistics { + /// Provide statistics for every Column in the table. These must be + /// [GeoStatistics::unspecified] for non-spatial columns. + /// + /// These are resolved using [Column::index]. + ByPosition(Vec<GeoStatistics>), + + /// Provide statistics for specific named columns. Columns not included + /// are treated as [GeoStatistics::unspecified]. + /// + /// These are resolved using [Column::name]. This may be used for logical + /// expressions (where columns are resolved by name) or as a workaround + /// for physical expressions where the index is relative to a projected + /// schema <https://github.com/apache/sedona-db/issues/389>. 
+ ByName(HashMap<String, GeoStatistics>), +} + +impl TableGeoStatistics { + /// Construct TableGeoStatistics with no columns + pub fn empty() -> Self { + TableGeoStatistics::ByPosition(vec![]) + } + + /// Construct TableGeoStatistics from a slice of all column statistics and a schema + pub fn try_from_stats_and_schema( + column_stats: &[GeoStatistics], + schema: &Schema, + ) -> Result<Self> { + let mut stats_map = HashMap::new(); + for i in schema.geometry_column_indices()? { + stats_map.insert(schema.field(i).name().to_string(), column_stats[i].clone()); + } + Ok(Self::ByName(stats_map)) + } + + /// For a given [Column], obtain [GeoStatistics] + /// + /// This will error if the provided statistics have an index out of bounds. + /// Names that cannot be resolved will be treated as unspecified. + fn get(&self, column: &Column) -> Result<&GeoStatistics> { + match self { + Self::ByPosition(items) => { + if column.index() >= items.len() { + sedona_internal_err!( + "Can't obtain GeoStatistics for column at index {} from schema with {} columns", + column.index(), + items.len() + ) + } else { + Ok(&items[column.index()]) + } + } + Self::ByName(items) => { + if let Some(item) = items.get(column.name()) { + Ok(item) + } else { + Ok(&GeoStatistics::UNSPECIFIED) + } + } + } + } +} + +// Useful for testing (create from a single GeoStatistics) +impl From<GeoStatistics> for TableGeoStatistics { + fn from(value: GeoStatistics) -> Self { + TableGeoStatistics::ByPosition(vec![value]) + } +} + /// Internal utility to help match physical expression types enum ArgRef<'a> { Col(Column), @@ -492,24 +572,28 @@ mod test { ); let bounds = literal_bounds(&literal).unwrap(); - let stats_no_info = [GeoStatistics::unspecified()]; - let stats_intersecting = [ - GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xy((0.5, 1.5), (1.5, 2.5)))) - ]; - let col0 = Column::new("col0", 0); - - assert!(SpatialFilter::Intersects(col0.clone(), bounds.clone()).evaluate(&stats_no_info)); - assert!( - 
SpatialFilter::Intersects(col0.clone(), bounds.clone()).evaluate(&stats_intersecting) + let stats_no_info = TableGeoStatistics::from(GeoStatistics::unspecified()); + let stats_intersecting = TableGeoStatistics::from( + GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xy((0.5, 1.5), (1.5, 2.5)))), ); + let col0 = Column::new("col0", 0); - let stats_empty_bbox = [GeoStatistics::unspecified() - .with_bbox(Some(BoundingBox::xy(Interval::empty(), Interval::empty())))]; + assert!(SpatialFilter::Intersects(col0.clone(), bounds.clone()) + .evaluate(&stats_no_info) + .unwrap()); + assert!(SpatialFilter::Intersects(col0.clone(), bounds.clone()) + .evaluate(&stats_intersecting) + .unwrap()); - assert!( - !SpatialFilter::Intersects(col0.clone(), bounds.clone()).evaluate(&stats_empty_bbox) + let stats_empty_bbox = TableGeoStatistics::from( + GeoStatistics::unspecified() + .with_bbox(Some(BoundingBox::xy(Interval::empty(), Interval::empty()))), ); + assert!(!SpatialFilter::Intersects(col0.clone(), bounds.clone()) + .evaluate(&stats_empty_bbox) + .unwrap()); + let unrelated_literal = Literal::new(ScalarValue::Null); let err = literal_bounds(&unrelated_literal).unwrap_err(); @@ -527,18 +611,25 @@ mod test { ); let bounds = literal_bounds(&literal).unwrap(); - let stats_no_info = [GeoStatistics::unspecified()]; - let stats_covered = - [GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xy((0, 4), (0, 4))))]; - let stats_not_covered = [ - GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xy((3.0, 3.0), (5.0, 5.0)))) - ]; + let stats_no_info = TableGeoStatistics::from(GeoStatistics::unspecified()); + let stats_covered = TableGeoStatistics::from( + GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xy((0, 4), (0, 4)))), + ); + let stats_not_covered = TableGeoStatistics::from( + GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xy((3.0, 3.0), (5.0, 5.0)))), + ); let col0 = Column::new("col0", 0); // Covers should return true when column bbox is fully 
contained in literal bounds - assert!(SpatialFilter::Covers(col0.clone(), bounds.clone()).evaluate(&stats_no_info)); - assert!(SpatialFilter::Covers(col0.clone(), bounds.clone()).evaluate(&stats_covered)); - assert!(!SpatialFilter::Covers(col0.clone(), bounds.clone()).evaluate(&stats_not_covered)); + assert!(SpatialFilter::Covers(col0.clone(), bounds.clone()) + .evaluate(&stats_no_info) + .unwrap()); + assert!(SpatialFilter::Covers(col0.clone(), bounds.clone()) + .evaluate(&stats_covered) + .unwrap()); + assert!(!SpatialFilter::Covers(col0.clone(), bounds.clone()) + .evaluate(&stats_not_covered) + .unwrap()); } #[test] @@ -546,73 +637,77 @@ mod test { let col0 = Column::new("col0", 0); let has_z = SpatialFilter::HasZ(col0.clone()); - let stats_z_geometry_types = [GeoStatistics::unspecified() - .try_with_str_geometry_types(Some(&["POINT Z"])) - .unwrap()]; - let stats_z_bbox = [ - GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xyzm( - (0, 1), - (2, 3), - Some((4, 5).into()), - None, - ))), - ]; - let stats_no_info = [GeoStatistics::unspecified()]; - - assert!(has_z.evaluate(&stats_z_geometry_types)); - assert!(has_z.evaluate(&stats_z_bbox)); - assert!(has_z.evaluate(&stats_no_info)); - - let stats_no_z_geometry_types = [GeoStatistics::unspecified() - .try_with_str_geometry_types(Some(&["POINT"])) - .unwrap()]; - let stats_no_z_bbox = [ - GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xyzm( - (0, 1), - (2, 3), - Some(Interval::empty()), - None, - ))), - ]; - - assert!(!has_z.evaluate(&stats_no_z_geometry_types)); - assert!(!has_z.evaluate(&stats_no_z_bbox)); + let stats_z_geometry_types = TableGeoStatistics::from( + GeoStatistics::unspecified() + .try_with_str_geometry_types(Some(&["POINT Z"])) + .unwrap(), + ); + let stats_z_bbox = TableGeoStatistics::from(GeoStatistics::unspecified().with_bbox(Some( + BoundingBox::xyzm((0, 1), (2, 3), Some((4, 5).into()), None), + ))); + let stats_no_info = TableGeoStatistics::from(GeoStatistics::unspecified()); 
+ + assert!(has_z.evaluate(&stats_z_geometry_types).unwrap()); + assert!(has_z.evaluate(&stats_z_bbox).unwrap()); + assert!(has_z.evaluate(&stats_no_info).unwrap()); + + let stats_no_z_geometry_types = TableGeoStatistics::from( + GeoStatistics::unspecified() + .try_with_str_geometry_types(Some(&["POINT"])) + .unwrap(), + ); + let stats_no_z_bbox = + TableGeoStatistics::from(GeoStatistics::unspecified().with_bbox(Some( + BoundingBox::xyzm((0, 1), (2, 3), Some(Interval::empty()), None), + ))); + + assert!(!has_z.evaluate(&stats_no_z_geometry_types).unwrap()); + assert!(!has_z.evaluate(&stats_no_z_bbox).unwrap()); } #[test] fn predicate_other() { - assert!(!SpatialFilter::LiteralFalse.evaluate(&[])); - assert!(SpatialFilter::Unknown.evaluate(&[])); + assert!(!SpatialFilter::LiteralFalse + .evaluate(&TableGeoStatistics::empty()) + .unwrap()); + assert!(SpatialFilter::Unknown + .evaluate(&TableGeoStatistics::empty()) + .unwrap()); assert!(SpatialFilter::And( Box::new(SpatialFilter::Unknown), Box::new(SpatialFilter::Unknown) ) - .evaluate(&[])); + .evaluate(&TableGeoStatistics::empty()) + .unwrap()); assert!(!SpatialFilter::And( Box::new(SpatialFilter::Unknown), Box::new(SpatialFilter::LiteralFalse) ) - .evaluate(&[])); + .evaluate(&TableGeoStatistics::empty()) + .unwrap()); assert!(SpatialFilter::Or( Box::new(SpatialFilter::Unknown), Box::new(SpatialFilter::Unknown) ) - .evaluate(&[])); + .evaluate(&TableGeoStatistics::empty()) + .unwrap()); assert!(SpatialFilter::Or( Box::new(SpatialFilter::Unknown), Box::new(SpatialFilter::LiteralFalse) ) - .evaluate(&[])); + .evaluate(&TableGeoStatistics::empty()) + .unwrap()); assert!(!SpatialFilter::Or( Box::new(SpatialFilter::LiteralFalse), Box::new(SpatialFilter::LiteralFalse) ) - .evaluate(&[])); + .evaluate(&TableGeoStatistics::empty()) + .unwrap()); } #[test] @@ -1146,6 +1241,49 @@ mod test { } } + #[test] + fn table_geo_stats_position() { + let column_stats = + GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xy((0.5, 
1.5), (1.5, 2.5)))); + let table_stats = TableGeoStatistics::from(column_stats.clone()); + + assert_eq!( + table_stats.get(&Column::new("col0", 0)).unwrap(), + &column_stats + ); + assert!(table_stats.get(&Column::new("col1", 1)).is_err()); + } + + #[test] + fn table_geo_stats_name() { + let geo_stats = + GeoStatistics::unspecified().with_bbox(Some(BoundingBox::xy((0.5, 1.5), (1.5, 2.5)))); + let schema = Schema::new(vec![ + Field::new("col0", DataType::Binary, true), + WKB_GEOMETRY.to_storage_field("col1", true).unwrap(), + ]); + let table_stats = TableGeoStatistics::try_from_stats_and_schema( + &[GeoStatistics::UNSPECIFIED, geo_stats.clone()], + &schema, + ) + .unwrap(); + + assert_eq!( + table_stats.get(&Column::new("col0", usize::MAX)).unwrap(), + &GeoStatistics::UNSPECIFIED + ); + assert_eq!( + table_stats.get(&Column::new("col1", usize::MAX)).unwrap(), + &geo_stats + ); + assert_eq!( + table_stats + .get(&Column::new("col_not_in_schema", usize::MAX)) + .unwrap(), + &GeoStatistics::UNSPECIFIED + ); + } + #[test] fn bounding_box() { let col_zero = Column::new("foofy", 0); diff --git a/rust/sedona-expr/src/statistics.rs b/rust/sedona-expr/src/statistics.rs index eafa3d1b..08c0dd9c 100644 --- a/rust/sedona-expr/src/statistics.rs +++ b/rust/sedona-expr/src/statistics.rs @@ -55,21 +55,24 @@ pub struct GeoStatistics { } impl GeoStatistics { + /// Statistics representing unspecified information + pub const UNSPECIFIED: GeoStatistics = Self { + bbox: None, + geometry_types: None, + total_geometries: None, + total_size_bytes: None, + total_points: None, + puntal_count: None, + lineal_count: None, + polygonal_count: None, + collection_count: None, + total_envelope_width: None, + total_envelope_height: None, + }; + /// Create statistics representing unspecified information pub fn unspecified() -> Self { - Self { - bbox: None, - geometry_types: None, - total_geometries: None, - total_size_bytes: None, - total_points: None, - puntal_count: None, - lineal_count: None, - 
polygonal_count: None, - collection_count: None, - total_envelope_width: None, - total_envelope_height: None, - } + Self::UNSPECIFIED.clone() } /// Create statistics representing empty information (with zero values instead of None) diff --git a/rust/sedona-geoparquet/src/file_opener.rs b/rust/sedona-geoparquet/src/file_opener.rs index 183fe0a8..d634b92a 100644 --- a/rust/sedona-geoparquet/src/file_opener.rs +++ b/rust/sedona-geoparquet/src/file_opener.rs @@ -30,7 +30,10 @@ use parquet::file::{ metadata::{ParquetMetaData, RowGroupMetaData}, statistics::Statistics, }; -use sedona_expr::{spatial_filter::SpatialFilter, statistics::GeoStatistics}; +use sedona_expr::{ + spatial_filter::{SpatialFilter, TableGeoStatistics}, + statistics::GeoStatistics, +}; use sedona_geometry::bounding_box::BoundingBox; use sedona_schema::{datatypes::SedonaType, matchers::ArgMatcher}; @@ -175,8 +178,10 @@ fn filter_access_plan_using_geoparquet_file_metadata( metadata: &GeoParquetMetadata, metrics: &GeoParquetFileOpenerMetrics, ) -> Result<()> { - let table_geo_stats = geoparquet_file_geo_stats(file_schema, metadata)?; - if !spatial_filter.evaluate(&table_geo_stats) { + let column_geo_stats = geoparquet_file_geo_stats(file_schema, metadata)?; + let table_geo_stats = + TableGeoStatistics::try_from_stats_and_schema(&column_geo_stats, file_schema)?; + if !spatial_filter.evaluate(&table_geo_stats)? 
{ metrics.files_ranges_spatial_pruned.add(1); for i in access_plan.row_group_indexes() { access_plan.skip(i); @@ -214,11 +219,15 @@ fn filter_access_plan_using_geoparquet_covering( // Iterate through the row groups for i in row_group_indices_to_scan { // Generate row group statistics based on the covering statistics - let row_group_geo_stats = + let row_group_column_geo_stats = row_group_covering_geo_stats(parquet_metadata.row_group(i), &covering_specs); + let row_group_geo_stats = TableGeoStatistics::try_from_stats_and_schema( + &row_group_column_geo_stats, + file_schema, + )?; // Evaluate predicate! - if !spatial_filter.evaluate(&row_group_geo_stats) { + if !spatial_filter.evaluate(&row_group_geo_stats)? { metrics.row_groups_spatial_pruned.add(1); access_plan.skip(i); } else {
