This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
new ba6262a0b8 [SEDONA-741] Enable pushdown of ST_DWithin and ST_DistanceSpheroid filters (#2214)
ba6262a0b8 is described below
commit ba6262a0b82b5475638140c5d66302691e9ae4f8
Author: Pranav Toggi <[email protected]>
AuthorDate: Sat Aug 2 11:53:48 2025 -0700
[SEDONA-741] Enable pushdown of ST_DWithin and ST_DistanceSpheroid filters (#2214)
* Enable GeoParquet filter pushdown for ST_DWithin and ST_DistanceSpheroid
* Inflate the spheroidal buffer distance by 3% to avoid false negatives in the pushed-down query window
---
.../SpatialFilterPushDownForGeoParquet.scala | 76 +++++++++++++++++-----
.../sql/GeoParquetSpatialFilterPushDownSuite.scala | 46 ++++++++++++-
.../sql/GeoParquetSpatialFilterPushDownSuite.scala | 46 ++++++++++++-
.../sql/GeoParquetSpatialFilterPushDownSuite.scala | 46 ++++++++++++-
4 files changed, 190 insertions(+), 24 deletions(-)
diff --git
a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/optimization/SpatialFilterPushDownForGeoParquet.scala
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/optimization/SpatialFilterPushDownForGeoParquet.scala
index 7ef96ac970..b18a74831a 100644
---
a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/optimization/SpatialFilterPushDownForGeoParquet.scala
+++
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/optimization/SpatialFilterPushDownForGeoParquet.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.sedona_sql.optimization
import org.apache.sedona.common.geometryObjects.Circle
import org.apache.sedona.core.spatialOperator.SpatialPredicate
+import org.apache.sedona.sql.utils.GeometrySerializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.And
import org.apache.spark.sql.catalyst.expressions.Expression
@@ -44,17 +45,7 @@ import
org.apache.spark.sql.execution.datasources.parquet.GeoParquetSpatialFilte
import
org.apache.spark.sql.execution.datasources.parquet.GeoParquetSpatialFilter.LeafFilter
import
org.apache.spark.sql.execution.datasources.parquet.GeoParquetSpatialFilter.OrFilter
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
-import org.apache.spark.sql.sedona_sql.expressions.ST_Contains
-import org.apache.spark.sql.sedona_sql.expressions.ST_CoveredBy
-import org.apache.spark.sql.sedona_sql.expressions.ST_Covers
-import org.apache.spark.sql.sedona_sql.expressions.ST_Crosses
-import org.apache.spark.sql.sedona_sql.expressions.ST_Distance
-import org.apache.spark.sql.sedona_sql.expressions.ST_Equals
-import org.apache.spark.sql.sedona_sql.expressions.ST_Intersects
-import org.apache.spark.sql.sedona_sql.expressions.ST_OrderingEquals
-import org.apache.spark.sql.sedona_sql.expressions.ST_Overlaps
-import org.apache.spark.sql.sedona_sql.expressions.ST_Touches
-import org.apache.spark.sql.sedona_sql.expressions.ST_Within
+import org.apache.spark.sql.sedona_sql.expressions.{ST_AsEWKT, ST_Buffer,
ST_Contains, ST_CoveredBy, ST_Covers, ST_Crosses, ST_DWithin, ST_Distance,
ST_DistanceSpheroid, ST_Equals, ST_Intersects, ST_OrderingEquals, ST_Overlaps,
ST_Touches, ST_Within}
import
org.apache.spark.sql.sedona_sql.optimization.ExpressionUtils.splitConjunctivePredicates
import org.apache.spark.sql.types.DoubleType
import org.locationtech.jts.geom.Geometry
@@ -161,17 +152,66 @@ class SpatialFilterPushDownForGeoParquet(sparkSession:
SparkSession) extends Rul
for ((name, value) <- resolveNameAndLiteral(distArgs, pushableColumn))
yield distanceFilter(name, GeometryUDT.deserialize(value),
d.asInstanceOf[Double])
+ case LessThan(ST_DistanceSpheroid(distArgs), Literal(d, DoubleType)) =>
+ for ((name, value) <- resolveNameAndLiteral(distArgs, pushableColumn))
+ yield distanceFilter(
+ name,
+ GeometryUDT.deserialize(value),
+ d.asInstanceOf[Double],
+ useSpheroid = true)
+
+ case LessThanOrEqual(ST_DistanceSpheroid(distArgs), Literal(d,
DoubleType)) =>
+ for ((name, value) <- resolveNameAndLiteral(distArgs, pushableColumn))
+ yield distanceFilter(
+ name,
+ GeometryUDT.deserialize(value),
+ d.asInstanceOf[Double],
+ useSpheroid = true)
+
+ case ST_DWithin(args) if args.length == 3 || args.length == 4 =>
+ val distanceLit = args(2)
+ val useSpheroid = if (args.length == 4) {
+ args(3) match {
+ case Literal(flag: Boolean, _) => flag
+ case _ => false
+ }
+ } else false
+
+ distanceLit match {
+ case Literal(distance: Double, DoubleType) =>
+ resolveNameAndLiteral(args.take(2), pushableColumn).map { case
(name, value) =>
+ distanceFilter(name, GeometryUDT.deserialize(value), distance,
useSpheroid)
+ }
+ case _ => None
+ }
case _ => None
}
}
- private def distanceFilter(name: String, geom: Geometry, distance: Double) =
{
- val queryWindow = geom match {
- case point: Point => new Circle(point, distance)
- case _ =>
- val envelope = geom.getEnvelopeInternal
- envelope.expandBy(distance)
- geom.getFactory.toGeometry(envelope)
+ private def distanceFilter(
+ name: String,
+ geom: Geometry,
+ distance: Double,
+ useSpheroid: Boolean = false): GeoParquetSpatialFilter = {
+ val queryWindow: Geometry = if (useSpheroid) {
+ // Spheroidal buffer
+ // Increase buffer distance by 3% to account for false negatives with
Spheroidal Buffer calculations
+ val distanceLit = Literal(distance * 1.03)
+ val spheroidLit = Literal(true)
+ val geomLit = Literal.create(GeometrySerializer.serialize(geom), new
GeometryUDT())
+
+ val bufferGeometry = {
+ val bufferExpr = ST_Buffer(
+ scala.collection.immutable.Seq(geomLit, distanceLit, spheroidLit))
+ val wkb = bufferExpr.eval().asInstanceOf[Array[Byte]]
+ GeometrySerializer.deserialize(wkb)
+ }
+ bufferGeometry
+ } else {
+ // Euclidean distance
+ val envelope = geom.getEnvelopeInternal
+ envelope.expandBy(distance)
+ geom.getFactory.toGeometry(envelope)
}
LeafFilter(unquote(name), SpatialPredicate.INTERSECTS, queryWindow)
}
diff --git
a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala
b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala
index a2a257e8f5..7e8e900b2b 100644
---
a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala
+++
b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala
@@ -170,12 +170,27 @@ class GeoParquetSpatialFilterPushDownSuite extends
TestBaseScala with TableDrive
Seq.empty)
}
+ it("Push down ST_DWithin") {
+ testFilter("ST_DWithin(geom, ST_GeomFromText('POINT (3 4)'), 1)", Seq(1))
+ testFilter("ST_DWithin(geom, ST_GeomFromText('POINT (0 0)'), 1)",
Seq.empty)
+ testFilter("ST_DWithin(geom, ST_GeomFromText('POINT (0 0)'), 3.9)",
Seq.empty)
+ testFilter("ST_DWithin(geom, ST_GeomFromText('POINT (3 4)'), 1000000,
true)", Seq(0, 1, 3))
+ testFilter("ST_DWithin(geom, ST_GeomFromText('POINT (0 0)'), 5)", Seq(0,
1, 2, 3))
+ testFilter("ST_DWithin(geom, ST_GeomFromText('POINT (-5 -5)'), 1)",
Seq(2))
+ testFilter(
+ "ST_DWithin(geom, ST_GeomFromText('POLYGON ((-16 -16, -16 -14, -14
-14, -14 -16, -16 -16))'), 10)",
+ Seq(2))
+ testFilter("ST_DWithin(ST_GeomFromText('POINT (-5 -5)'), geom, 1)",
Seq(2))
+ testFilter("ST_DWithin(ST_GeomFromText('POINT (-5 -5)'), geom, 1,
false)", Seq(2))
+ testFilter("ST_DWithin(ST_GeomFromText('POINT (0 0)'), geom, 100,
true)", Seq.empty)
+ }
+
forAll(Table("<", "<=")) { op =>
it(s"Push down ST_Distance $op d") {
testFilter(s"ST_Distance(geom, ST_GeomFromText('POINT (0 0)')) $op 1",
Seq.empty)
- testFilter(s"ST_Distance(geom, ST_GeomFromText('POINT (0 0)')) $op 5",
Seq.empty)
+ testFilter(s"ST_Distance(geom, ST_GeomFromText('POINT (0 0)')) $op
3.9", Seq.empty)
testFilter(s"ST_Distance(geom, ST_GeomFromText('POINT (3 4)')) $op 1",
Seq(1))
- testFilter(s"ST_Distance(geom, ST_GeomFromText('POINT (0 0)')) $op
7.1", Seq(0, 1, 2, 3))
+ testFilter(s"ST_Distance(geom, ST_GeomFromText('POINT (0 0)')) $op 5",
Seq(0, 1, 2, 3))
testFilter(s"ST_Distance(geom, ST_GeomFromText('POINT (-5 -5)')) $op
1", Seq(2))
testFilter(
s"ST_Distance(geom, ST_GeomFromText('POLYGON ((-1 -1, 1 -1, 1 1, -1
1, -1 -1))')) $op 2",
@@ -187,6 +202,33 @@ class GeoParquetSpatialFilterPushDownSuite extends
TestBaseScala with TableDrive
s"ST_Distance(geom, ST_GeomFromText('LINESTRING (17 17, 18 18)'))
$op 1",
Seq(1))
}
+
+ it(s"Push down ST_DistanceSpheroid $op d") {
+ testFilter(
+ s"ST_DistanceSpheroid(geom, ST_GeomFromText('POINT (0 0)')) $op 100",
+ Seq.empty)
+ testFilter(
+ s"ST_DistanceSpheroid(geom, ST_GeomFromText('POINT (0 0)')) $op 500",
+ Seq.empty)
+ testFilter(
+ s"ST_DistanceSpheroid(geom, ST_GeomFromText('POINT (3 4)')) $op
500000",
+ Seq(1))
+ testFilter(
+ s"ST_DistanceSpheroid(geom, ST_GeomFromText('POINT (0 0)')) $op
7100000",
+ Seq(0, 1, 2, 3))
+ testFilter(
+ s"ST_DistanceSpheroid(geom, ST_GeomFromText('POINT (-5 -5)')) $op
500000",
+ Seq(2))
+ testFilter(
+ s"ST_DistanceSpheroid(geom, ST_GeomFromText('POLYGON ((-1 -1, 1 -1,
1 1, -1 1, -1 -1))')) $op 2",
+ Seq.empty)
+ testFilter(
+ s"ST_DistanceSpheroid(geom, ST_GeomFromText('POLYGON ((-1 -1, 1 -1,
1 1, -1 1, -1 -1))')) $op 500000",
+ Seq(0, 1, 2, 3))
+ testFilter(
+ s"ST_DistanceSpheroid(geom, ST_GeomFromText('LINESTRING (17 17, 18
18)')) $op 500000",
+ Seq(1))
+ }
}
it("Push down And(filters...)") {
diff --git
a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala
b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala
index a2a257e8f5..7e8e900b2b 100644
---
a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala
+++
b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala
@@ -170,12 +170,27 @@ class GeoParquetSpatialFilterPushDownSuite extends
TestBaseScala with TableDrive
Seq.empty)
}
+ it("Push down ST_DWithin") {
+ testFilter("ST_DWithin(geom, ST_GeomFromText('POINT (3 4)'), 1)", Seq(1))
+ testFilter("ST_DWithin(geom, ST_GeomFromText('POINT (0 0)'), 1)",
Seq.empty)
+ testFilter("ST_DWithin(geom, ST_GeomFromText('POINT (0 0)'), 3.9)",
Seq.empty)
+ testFilter("ST_DWithin(geom, ST_GeomFromText('POINT (3 4)'), 1000000,
true)", Seq(0, 1, 3))
+ testFilter("ST_DWithin(geom, ST_GeomFromText('POINT (0 0)'), 5)", Seq(0,
1, 2, 3))
+ testFilter("ST_DWithin(geom, ST_GeomFromText('POINT (-5 -5)'), 1)",
Seq(2))
+ testFilter(
+ "ST_DWithin(geom, ST_GeomFromText('POLYGON ((-16 -16, -16 -14, -14
-14, -14 -16, -16 -16))'), 10)",
+ Seq(2))
+ testFilter("ST_DWithin(ST_GeomFromText('POINT (-5 -5)'), geom, 1)",
Seq(2))
+ testFilter("ST_DWithin(ST_GeomFromText('POINT (-5 -5)'), geom, 1,
false)", Seq(2))
+ testFilter("ST_DWithin(ST_GeomFromText('POINT (0 0)'), geom, 100,
true)", Seq.empty)
+ }
+
forAll(Table("<", "<=")) { op =>
it(s"Push down ST_Distance $op d") {
testFilter(s"ST_Distance(geom, ST_GeomFromText('POINT (0 0)')) $op 1",
Seq.empty)
- testFilter(s"ST_Distance(geom, ST_GeomFromText('POINT (0 0)')) $op 5",
Seq.empty)
+ testFilter(s"ST_Distance(geom, ST_GeomFromText('POINT (0 0)')) $op
3.9", Seq.empty)
testFilter(s"ST_Distance(geom, ST_GeomFromText('POINT (3 4)')) $op 1",
Seq(1))
- testFilter(s"ST_Distance(geom, ST_GeomFromText('POINT (0 0)')) $op
7.1", Seq(0, 1, 2, 3))
+ testFilter(s"ST_Distance(geom, ST_GeomFromText('POINT (0 0)')) $op 5",
Seq(0, 1, 2, 3))
testFilter(s"ST_Distance(geom, ST_GeomFromText('POINT (-5 -5)')) $op
1", Seq(2))
testFilter(
s"ST_Distance(geom, ST_GeomFromText('POLYGON ((-1 -1, 1 -1, 1 1, -1
1, -1 -1))')) $op 2",
@@ -187,6 +202,33 @@ class GeoParquetSpatialFilterPushDownSuite extends
TestBaseScala with TableDrive
s"ST_Distance(geom, ST_GeomFromText('LINESTRING (17 17, 18 18)'))
$op 1",
Seq(1))
}
+
+ it(s"Push down ST_DistanceSpheroid $op d") {
+ testFilter(
+ s"ST_DistanceSpheroid(geom, ST_GeomFromText('POINT (0 0)')) $op 100",
+ Seq.empty)
+ testFilter(
+ s"ST_DistanceSpheroid(geom, ST_GeomFromText('POINT (0 0)')) $op 500",
+ Seq.empty)
+ testFilter(
+ s"ST_DistanceSpheroid(geom, ST_GeomFromText('POINT (3 4)')) $op
500000",
+ Seq(1))
+ testFilter(
+ s"ST_DistanceSpheroid(geom, ST_GeomFromText('POINT (0 0)')) $op
7100000",
+ Seq(0, 1, 2, 3))
+ testFilter(
+ s"ST_DistanceSpheroid(geom, ST_GeomFromText('POINT (-5 -5)')) $op
500000",
+ Seq(2))
+ testFilter(
+ s"ST_DistanceSpheroid(geom, ST_GeomFromText('POLYGON ((-1 -1, 1 -1,
1 1, -1 1, -1 -1))')) $op 2",
+ Seq.empty)
+ testFilter(
+ s"ST_DistanceSpheroid(geom, ST_GeomFromText('POLYGON ((-1 -1, 1 -1,
1 1, -1 1, -1 -1))')) $op 500000",
+ Seq(0, 1, 2, 3))
+ testFilter(
+ s"ST_DistanceSpheroid(geom, ST_GeomFromText('LINESTRING (17 17, 18
18)')) $op 500000",
+ Seq(1))
+ }
}
it("Push down And(filters...)") {
diff --git
a/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala
b/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala
index a2a257e8f5..7e8e900b2b 100644
---
a/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala
+++
b/spark/spark-4.0/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala
@@ -170,12 +170,27 @@ class GeoParquetSpatialFilterPushDownSuite extends
TestBaseScala with TableDrive
Seq.empty)
}
+ it("Push down ST_DWithin") {
+ testFilter("ST_DWithin(geom, ST_GeomFromText('POINT (3 4)'), 1)", Seq(1))
+ testFilter("ST_DWithin(geom, ST_GeomFromText('POINT (0 0)'), 1)",
Seq.empty)
+ testFilter("ST_DWithin(geom, ST_GeomFromText('POINT (0 0)'), 3.9)",
Seq.empty)
+ testFilter("ST_DWithin(geom, ST_GeomFromText('POINT (3 4)'), 1000000,
true)", Seq(0, 1, 3))
+ testFilter("ST_DWithin(geom, ST_GeomFromText('POINT (0 0)'), 5)", Seq(0,
1, 2, 3))
+ testFilter("ST_DWithin(geom, ST_GeomFromText('POINT (-5 -5)'), 1)",
Seq(2))
+ testFilter(
+ "ST_DWithin(geom, ST_GeomFromText('POLYGON ((-16 -16, -16 -14, -14
-14, -14 -16, -16 -16))'), 10)",
+ Seq(2))
+ testFilter("ST_DWithin(ST_GeomFromText('POINT (-5 -5)'), geom, 1)",
Seq(2))
+ testFilter("ST_DWithin(ST_GeomFromText('POINT (-5 -5)'), geom, 1,
false)", Seq(2))
+ testFilter("ST_DWithin(ST_GeomFromText('POINT (0 0)'), geom, 100,
true)", Seq.empty)
+ }
+
forAll(Table("<", "<=")) { op =>
it(s"Push down ST_Distance $op d") {
testFilter(s"ST_Distance(geom, ST_GeomFromText('POINT (0 0)')) $op 1",
Seq.empty)
- testFilter(s"ST_Distance(geom, ST_GeomFromText('POINT (0 0)')) $op 5",
Seq.empty)
+ testFilter(s"ST_Distance(geom, ST_GeomFromText('POINT (0 0)')) $op
3.9", Seq.empty)
testFilter(s"ST_Distance(geom, ST_GeomFromText('POINT (3 4)')) $op 1",
Seq(1))
- testFilter(s"ST_Distance(geom, ST_GeomFromText('POINT (0 0)')) $op
7.1", Seq(0, 1, 2, 3))
+ testFilter(s"ST_Distance(geom, ST_GeomFromText('POINT (0 0)')) $op 5",
Seq(0, 1, 2, 3))
testFilter(s"ST_Distance(geom, ST_GeomFromText('POINT (-5 -5)')) $op
1", Seq(2))
testFilter(
s"ST_Distance(geom, ST_GeomFromText('POLYGON ((-1 -1, 1 -1, 1 1, -1
1, -1 -1))')) $op 2",
@@ -187,6 +202,33 @@ class GeoParquetSpatialFilterPushDownSuite extends
TestBaseScala with TableDrive
s"ST_Distance(geom, ST_GeomFromText('LINESTRING (17 17, 18 18)'))
$op 1",
Seq(1))
}
+
+ it(s"Push down ST_DistanceSpheroid $op d") {
+ testFilter(
+ s"ST_DistanceSpheroid(geom, ST_GeomFromText('POINT (0 0)')) $op 100",
+ Seq.empty)
+ testFilter(
+ s"ST_DistanceSpheroid(geom, ST_GeomFromText('POINT (0 0)')) $op 500",
+ Seq.empty)
+ testFilter(
+ s"ST_DistanceSpheroid(geom, ST_GeomFromText('POINT (3 4)')) $op
500000",
+ Seq(1))
+ testFilter(
+ s"ST_DistanceSpheroid(geom, ST_GeomFromText('POINT (0 0)')) $op
7100000",
+ Seq(0, 1, 2, 3))
+ testFilter(
+ s"ST_DistanceSpheroid(geom, ST_GeomFromText('POINT (-5 -5)')) $op
500000",
+ Seq(2))
+ testFilter(
+ s"ST_DistanceSpheroid(geom, ST_GeomFromText('POLYGON ((-1 -1, 1 -1,
1 1, -1 1, -1 -1))')) $op 2",
+ Seq.empty)
+ testFilter(
+ s"ST_DistanceSpheroid(geom, ST_GeomFromText('POLYGON ((-1 -1, 1 -1,
1 1, -1 1, -1 -1))')) $op 500000",
+ Seq(0, 1, 2, 3))
+ testFilter(
+ s"ST_DistanceSpheroid(geom, ST_GeomFromText('LINESTRING (17 17, 18
18)')) $op 500000",
+ Seq(1))
+ }
}
it("Push down And(filters...)") {