This is an automated email from the ASF dual-hosted git repository.
imbruced pushed a commit to branch sedona-arrow-udf-example
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/sedona-arrow-udf-example by
this push:
new a904bebf3a SEDONA-721 Add docs.
a904bebf3a is described below
commit a904bebf3a584235d18b08fdfeeee0a9d5546ccb
Author: pawelkocinski <[email protected]>
AuthorDate: Sun Mar 16 18:58:46 2025 +0100
SEDONA-721 Add docs.
---
.../org/apache/sedona/spark/SedonaContext.scala | 28 +++++++++++++++++++++-
.../org/apache/sedona/sql/RasterRegistrator.scala | 7 ------
.../sql/UDF}/PythonEvalType.scala | 2 +-
.../spark/sql/udf/ExtractSedonaUDFRule.scala} | 5 ++--
.../spark/sql/udf/SedonaArrowEvalPython.scala | 15 ++++++++++++
.../spark/sql/udf}/SedonaArrowStrategy.scala | 3 ++-
.../org/apache/spark/sql/udf}/StrategySuite.scala | 4 ++--
.../spark/sql/udf}/TestScalarPandasUDF.scala | 5 ++--
8 files changed, 53 insertions(+), 16 deletions(-)
diff --git
a/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala
b/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala
index 7cfb8670be..d38ad5e1b6 100644
--- a/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala
+++ b/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala
@@ -24,10 +24,12 @@ import org.apache.sedona.sql.RasterRegistrator
import org.apache.sedona.sql.UDF.Catalog
import org.apache.sedona.sql.UDT.UdtRegistrator
import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.sedona_sql.optimization._
import org.apache.spark.sql.sedona_sql.strategy.join.JoinQueryDetector
import
org.apache.spark.sql.sedona_sql.strategy.physical.function.EvalPhysicalFunctionStrategy
-import org.apache.spark.sql.{SQLContext, SparkSession}
+import org.apache.spark.sql.{SQLContext, SparkSession, Strategy}
import scala.annotation.StaticAnnotation
import scala.util.Try
@@ -50,6 +52,7 @@ object SedonaContext {
/**
* This is the entry point of the entire Sedona system
+ *
* @param sparkSession
* @return
*/
@@ -64,6 +67,28 @@ object SedonaContext {
sparkSession.experimental.extraStrategies ++= Seq(new
JoinQueryDetector(sparkSession))
}
+ val sedonaArrowStrategy = Try(
+ Class
+ .forName("org.apache.spark.sql.udf.SedonaArrowStrategy")
+ .getDeclaredConstructor()
+ .newInstance()
+ .asInstanceOf[Strategy])
+
+ val extractSedonaUDFRule =
+ Try(
+ Class
+ .forName("org.apache.spark.sql.udf.ExtractSedonaUDFRule")
+ .getDeclaredConstructor()
+ .newInstance()
+ .asInstanceOf[Rule[LogicalPlan]])
+
+ if (sedonaArrowStrategy.isSuccess && extractSedonaUDFRule.isSuccess) {
+ sparkSession.experimental.extraStrategies =
+ sparkSession.experimental.extraStrategies :+ sedonaArrowStrategy.get
+ sparkSession.experimental.extraOptimizations =
+ sparkSession.experimental.extraOptimizations :+
extractSedonaUDFRule.get
+ }
+
customOptimizationsWithSession(sparkSession).foreach { opt =>
if (!sparkSession.experimental.extraOptimizations.exists {
case _: opt.type => true
@@ -95,6 +120,7 @@ object SedonaContext {
* This method adds the basic Sedona configurations to the SparkSession.
Usually the user does
* not need to call this method directly. This is only needed when the user
needs to manually
* configure Sedona.
+ *
* @return
*/
def builder(): SparkSession.Builder = {
diff --git
a/spark/common/src/main/scala/org/apache/sedona/sql/RasterRegistrator.scala
b/spark/common/src/main/scala/org/apache/sedona/sql/RasterRegistrator.scala
index bcacb2ab29..ee7aa8b0be 100644
--- a/spark/common/src/main/scala/org/apache/sedona/sql/RasterRegistrator.scala
+++ b/spark/common/src/main/scala/org/apache/sedona/sql/RasterRegistrator.scala
@@ -22,7 +22,6 @@ import org.apache.sedona.sql.UDF.RasterUdafCatalog
import
org.apache.sedona.sql.utils.GeoToolsCoverageAvailability.{gridClassName,
isGeoToolsAvailable}
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.sedona_sql.UDT.RasterUdtRegistratorWrapper
-import org.apache.spark.sql.sedona_sql.strategies.{ExtractSedonaUDF,
SedonaArrowStrategy}
import org.apache.spark.sql.{SparkSession, functions}
import org.slf4j.{Logger, LoggerFactory}
@@ -30,12 +29,6 @@ object RasterRegistrator {
val logger: Logger = LoggerFactory.getLogger(getClass)
def registerAll(sparkSession: SparkSession): Unit = {
-
- sparkSession.experimental.extraStrategies =
- sparkSession.experimental.extraStrategies :+ new SedonaArrowStrategy()
- sparkSession.experimental.extraOptimizations =
- sparkSession.experimental.extraOptimizations :+ ExtractSedonaUDF
-
if (isGeoToolsAvailable) {
RasterUdtRegistratorWrapper.registerAll(gridClassName)
sparkSession.udf.register(
diff --git
a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategies/PythonEvalType.scala
b/spark/common/src/main/scala/org/apache/sedona/sql/UDF/PythonEvalType.scala
similarity index 95%
rename from
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategies/PythonEvalType.scala
rename to
spark/common/src/main/scala/org/apache/sedona/sql/UDF/PythonEvalType.scala
index 0a8904edb4..9a4c43679d 100644
---
a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategies/PythonEvalType.scala
+++ b/spark/common/src/main/scala/org/apache/sedona/sql/UDF/PythonEvalType.scala
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.spark.sql.sedona_sql.strategies
+package org.apache.sedona.sql.UDF
object PythonEvalType {
val SQL_SCALAR_SEDONA_UDF = 5200
diff --git
a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategies/ExtractSedonaUDF.scala
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/ExtractSedonaUDFRule.scala
similarity index 97%
rename from
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategies/ExtractSedonaUDF.scala
rename to
spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/ExtractSedonaUDFRule.scala
index be34fa5fcc..e69e13d065 100644
---
a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategies/ExtractSedonaUDF.scala
+++
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/ExtractSedonaUDFRule.scala
@@ -16,8 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.spark.sql.sedona_sql.strategies
+package org.apache.spark.sql.udf
+import org.apache.sedona.sql.UDF.PythonEvalType
import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
Expression, ExpressionSet, PythonUDF}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project,
Subquery}
import org.apache.spark.sql.catalyst.rules.Rule
@@ -25,7 +26,7 @@ import
org.apache.spark.sql.catalyst.trees.TreePattern.PYTHON_UDF
import scala.collection.mutable
-object ExtractSedonaUDF extends Rule[LogicalPlan] {
+class ExtractSedonaUDFRule extends Rule[LogicalPlan] {
private def hasScalarPythonUDF(e: Expression): Boolean = {
e.exists(PythonUDF.isScalarPythonUDF)
diff --git
a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/SedonaArrowEvalPython.scala
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/SedonaArrowEvalPython.scala
new file mode 100644
index 0000000000..ed2fe073ee
--- /dev/null
+++
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/SedonaArrowEvalPython.scala
@@ -0,0 +1,15 @@
+package org.apache.spark.sql.udf
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, PythonUDF}
+import org.apache.spark.sql.catalyst.plans.logical.{BaseEvalPython,
LogicalPlan}
+
+case class SedonaArrowEvalPython(
+ udfs: Seq[PythonUDF],
+ resultAttrs: Seq[Attribute],
+ child: LogicalPlan,
+ evalType: Int)
+ extends BaseEvalPython {
+ override protected def withNewChildInternal(newChild: LogicalPlan):
SedonaArrowEvalPython =
+ copy(child = newChild)
+}
+
diff --git
a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategies/SedonaArrowStrategy.scala
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/SedonaArrowStrategy.scala
similarity index 97%
rename from
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategies/SedonaArrowStrategy.scala
rename to
spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/SedonaArrowStrategy.scala
index f5a0d1c95f..34f4d472b0 100644
---
a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategies/SedonaArrowStrategy.scala
+++
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/SedonaArrowStrategy.scala
@@ -16,8 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.spark.sql.sedona_sql.strategies
+package org.apache.spark.sql.udf
+import org.apache.sedona.sql.UDF.PythonEvalType
import org.apache.spark.api.python.ChainedPythonFunctions
import org.apache.spark.{JobArtifactSet, TaskContext}
import org.apache.spark.sql.Strategy
diff --git
a/spark/common/src/test/scala/org/apache/spark/sql/sedona_sql/strategies/StrategySuite.scala
b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
similarity index 94%
rename from
spark/common/src/test/scala/org/apache/spark/sql/sedona_sql/strategies/StrategySuite.scala
rename to
spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
index 52d8ea8bac..e8ef369660 100644
---
a/spark/common/src/test/scala/org/apache/spark/sql/sedona_sql/strategies/StrategySuite.scala
+++
b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
@@ -16,12 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.spark.sql.sedona_sql.strategies
+package org.apache.spark.sql.udf
import org.apache.sedona.spark.SedonaContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
-import
org.apache.spark.sql.sedona_sql.strategies.ScalarUDF.geoPandasScalaFunction
+import org.apache.spark.sql.udf.ScalarUDF.geoPandasScalaFunction
import org.locationtech.jts.io.WKTReader
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
diff --git
a/spark/common/src/test/scala/org/apache/spark/sql/sedona_sql/strategies/TestScalarPandasUDF.scala
b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
similarity index 97%
rename from
spark/common/src/test/scala/org/apache/spark/sql/sedona_sql/strategies/TestScalarPandasUDF.scala
rename to
spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
index 925115b5e8..4fbb11ea76 100644
---
a/spark/common/src/test/scala/org/apache/spark/sql/sedona_sql/strategies/TestScalarPandasUDF.scala
+++
b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
@@ -16,8 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.spark.sql.sedona_sql.strategies
+package org.apache.spark.sql.udf
+import org.apache.sedona.sql.UDF
import org.apache.spark.TestUtils
import org.apache.spark.api.python._
import org.apache.spark.broadcast.Broadcast
@@ -116,6 +117,6 @@ object ScalarUDF {
broadcastVars = List.empty[Broadcast[PythonBroadcast]].asJava,
accumulator = null),
dataType = GeometryUDT,
- pythonEvalType = PythonEvalType.SQL_SCALAR_SEDONA_UDF,
+ pythonEvalType = UDF.PythonEvalType.SQL_SCALAR_SEDONA_UDF,
udfDeterministic = true)
}