This is an automated email from the ASF dual-hosted git repository.

imbruced pushed a commit to branch sedona-arrow-udf-example
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/sedona-arrow-udf-example by this push:
     new a904bebf3a SEDONA-721 Add docs.
a904bebf3a is described below

commit a904bebf3a584235d18b08fdfeeee0a9d5546ccb
Author: pawelkocinski <[email protected]>
AuthorDate: Sun Mar 16 18:58:46 2025 +0100

    SEDONA-721 Add docs.
---
 .../org/apache/sedona/spark/SedonaContext.scala    | 28 +++++++++++++++++++++-
 .../org/apache/sedona/sql/RasterRegistrator.scala  |  7 ------
 .../sql/UDF}/PythonEvalType.scala                  |  2 +-
 .../spark/sql/udf/ExtractSedonaUDFRule.scala}      |  5 ++--
 .../spark/sql/udf/SedonaArrowEvalPython.scala      | 15 ++++++++++++
 .../spark/sql/udf}/SedonaArrowStrategy.scala       |  3 ++-
 .../org/apache/spark/sql/udf}/StrategySuite.scala  |  4 ++--
 .../spark/sql/udf}/TestScalarPandasUDF.scala       |  5 ++--
 8 files changed, 53 insertions(+), 16 deletions(-)

diff --git a/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala b/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala
index 7cfb8670be..d38ad5e1b6 100644
--- a/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala
+++ b/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala
@@ -24,10 +24,12 @@ import org.apache.sedona.sql.RasterRegistrator
 import org.apache.sedona.sql.UDF.Catalog
 import org.apache.sedona.sql.UDT.UdtRegistrator
 import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.sedona_sql.optimization._
 import org.apache.spark.sql.sedona_sql.strategy.join.JoinQueryDetector
 import org.apache.spark.sql.sedona_sql.strategy.physical.function.EvalPhysicalFunctionStrategy
-import org.apache.spark.sql.{SQLContext, SparkSession}
+import org.apache.spark.sql.{SQLContext, SparkSession, Strategy}
 
 import scala.annotation.StaticAnnotation
 import scala.util.Try
@@ -50,6 +52,7 @@ object SedonaContext {
 
   /**
    * This is the entry point of the entire Sedona system
+   *
    * @param sparkSession
    * @return
    */
@@ -64,6 +67,28 @@ object SedonaContext {
       sparkSession.experimental.extraStrategies ++= Seq(new JoinQueryDetector(sparkSession))
     }
 
+    val sedonaArrowStrategy = Try(
+      Class
+        .forName("org.apache.spark.sql.udf.SedonaArrowStrategy")
+        .getDeclaredConstructor()
+        .newInstance()
+        .asInstanceOf[Strategy])
+
+    val extractSedonaUDFRule =
+      Try(
+        Class
+          .forName("org.apache.spark.sql.udf.ExtractSedonaUDFRule")
+          .getDeclaredConstructor()
+          .newInstance()
+          .asInstanceOf[Rule[LogicalPlan]])
+
+    if (sedonaArrowStrategy.isSuccess && extractSedonaUDFRule.isSuccess) {
+      sparkSession.experimental.extraStrategies =
+        sparkSession.experimental.extraStrategies :+ sedonaArrowStrategy.get
+      sparkSession.experimental.extraOptimizations =
+        sparkSession.experimental.extraOptimizations :+ extractSedonaUDFRule.get
+    }
+
     customOptimizationsWithSession(sparkSession).foreach { opt =>
       if (!sparkSession.experimental.extraOptimizations.exists {
           case _: opt.type => true
@@ -95,6 +120,7 @@ object SedonaContext {
    * This method adds the basic Sedona configurations to the SparkSession Usually the user does
    * not need to call this method directly This is only needed when the user needs to manually
    * configure Sedona
+   *
    * @return
    */
   def builder(): SparkSession.Builder = {
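For readers skimming the hunk above: the new registration is deliberately optional. Below is a minimal, self-contained sketch of the same Try + Class.forName pattern (OptionalLoadSketch and loadIfPresent are illustrative names, not part of this commit). The point is that the Arrow strategy and optimizer rule are only wired in when the Spark-version-specific classes are actually on the classpath, and are silently skipped otherwise.

import scala.util.Try

// Sketch only: load a class reflectively if it is present on the classpath,
// otherwise return None and carry on without it.
object OptionalLoadSketch {
  def loadIfPresent[T](className: String): Option[T] =
    Try(
      Class
        .forName(className)
        .getDeclaredConstructor()
        .newInstance()
        .asInstanceOf[T]).toOption

  def main(args: Array[String]): Unit = {
    // Resolvable only when the spark-3.5 module is on the classpath.
    val strategy = loadIfPresent[AnyRef]("org.apache.spark.sql.udf.SedonaArrowStrategy")
    println(s"SedonaArrowStrategy available: ${strategy.isDefined}")
  }
}

In SedonaContext.create above, the two Try values play the role of loadIfPresent, and both must succeed before anything is appended to extraStrategies and extraOptimizations.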
diff --git a/spark/common/src/main/scala/org/apache/sedona/sql/RasterRegistrator.scala b/spark/common/src/main/scala/org/apache/sedona/sql/RasterRegistrator.scala
index bcacb2ab29..ee7aa8b0be 100644
--- a/spark/common/src/main/scala/org/apache/sedona/sql/RasterRegistrator.scala
+++ b/spark/common/src/main/scala/org/apache/sedona/sql/RasterRegistrator.scala
@@ -22,7 +22,6 @@ import org.apache.sedona.sql.UDF.RasterUdafCatalog
 import org.apache.sedona.sql.utils.GeoToolsCoverageAvailability.{gridClassName, isGeoToolsAvailable}
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.sedona_sql.UDT.RasterUdtRegistratorWrapper
-import org.apache.spark.sql.sedona_sql.strategies.{ExtractSedonaUDF, SedonaArrowStrategy}
 import org.apache.spark.sql.{SparkSession, functions}
 import org.slf4j.{Logger, LoggerFactory}
 
@@ -30,12 +29,6 @@ object RasterRegistrator {
   val logger: Logger = LoggerFactory.getLogger(getClass)
 
   def registerAll(sparkSession: SparkSession): Unit = {
-
-    sparkSession.experimental.extraStrategies =
-      sparkSession.experimental.extraStrategies :+ new SedonaArrowStrategy()
-    sparkSession.experimental.extraOptimizations =
-      sparkSession.experimental.extraOptimizations :+ ExtractSedonaUDF
-
     if (isGeoToolsAvailable) {
       RasterUdtRegistratorWrapper.registerAll(gridClassName)
       sparkSession.udf.register(
diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategies/PythonEvalType.scala b/spark/common/src/main/scala/org/apache/sedona/sql/UDF/PythonEvalType.scala
similarity index 95%
rename from spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategies/PythonEvalType.scala
rename to spark/common/src/main/scala/org/apache/sedona/sql/UDF/PythonEvalType.scala
index 0a8904edb4..9a4c43679d 100644
--- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategies/PythonEvalType.scala
+++ b/spark/common/src/main/scala/org/apache/sedona/sql/UDF/PythonEvalType.scala
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.spark.sql.sedona_sql.strategies
+package org.apache.sedona.sql.UDF
 
 object PythonEvalType {
   val SQL_SCALAR_SEDONA_UDF = 5200
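A short, hedged illustration of why this constant exists (EvalTypeSketch and the sample values other than 5200 are illustrative, not from this commit): Sedona claims an eval type well outside the ranges Spark reserves in org.apache.spark.api.python.PythonEvalType, so planner code can recognize Sedona scalar UDFs without touching ordinary pandas/Python UDFs.

object EvalTypeSketch {
  // Mirrors the constant added above.
  val SQL_SCALAR_SEDONA_UDF = 5200

  // A rule or strategy can gate on the eval type to claim only Sedona UDFs.
  def isSedonaScalarUdf(evalType: Int): Boolean =
    evalType == SQL_SCALAR_SEDONA_UDF

  def main(args: Array[String]): Unit = {
    println(isSedonaScalarUdf(5200)) // true: a Sedona scalar UDF
    println(isSedonaScalarUdf(200))  // false: Spark's own scalar pandas UDF eval type
  }
}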
diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategies/ExtractSedonaUDF.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/ExtractSedonaUDFRule.scala
similarity index 97%
rename from spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategies/ExtractSedonaUDF.scala
rename to spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/ExtractSedonaUDFRule.scala
index be34fa5fcc..e69e13d065 100644
--- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategies/ExtractSedonaUDF.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/ExtractSedonaUDFRule.scala
@@ -16,8 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.spark.sql.sedona_sql.strategies
+package org.apache.spark.sql.udf
 
+import org.apache.sedona.sql.UDF.PythonEvalType
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, ExpressionSet, PythonUDF}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Subquery}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -25,7 +26,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.PYTHON_UDF
 
 import scala.collection.mutable
 
-object ExtractSedonaUDF extends Rule[LogicalPlan] {
+class ExtractSedonaUDFRule extends Rule[LogicalPlan] {
 
   private def hasScalarPythonUDF(e: Expression): Boolean = {
     e.exists(PythonUDF.isScalarPythonUDF)
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/SedonaArrowEvalPython.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/SedonaArrowEvalPython.scala
new file mode 100644
index 0000000000..ed2fe073ee
--- /dev/null
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/SedonaArrowEvalPython.scala
@@ -0,0 +1,15 @@
+package org.apache.spark.sql.udf
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, PythonUDF}
+import org.apache.spark.sql.catalyst.plans.logical.{BaseEvalPython, LogicalPlan}
+
+case class SedonaArrowEvalPython(
+                                  udfs: Seq[PythonUDF],
+                                  resultAttrs: Seq[Attribute],
+                                  child: LogicalPlan,
+                                  evalType: Int)
+  extends BaseEvalPython {
+  override protected def withNewChildInternal(newChild: LogicalPlan): SedonaArrowEvalPython =
+    copy(child = newChild)
+}
+
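Aside: the override in the new node follows Catalyst's copy-with-new-child convention. A tiny standalone analogue is sketched below with no Spark dependency (Node, Leaf and EvalNode are made-up types for illustration); plan nodes are immutable, so replacing a child means returning a fresh copy rather than mutating in place.

sealed trait Node { def children: Seq[Node] }

case class Leaf(name: String) extends Node {
  val children: Seq[Node] = Seq.empty
}

case class EvalNode(label: String, child: Node) extends Node {
  val children: Seq[Node] = Seq(child)

  // Same shape as withNewChildInternal: build a copy that points at the
  // replacement child and leave the original node untouched.
  def withNewChild(newChild: Node): EvalNode = copy(child = newChild)
}

object CopyWithChildSketch {
  def main(args: Array[String]): Unit = {
    val original  = EvalNode("arrowEval", Leaf("scan"))
    val rewritten = original.withNewChild(Leaf("projectedScan"))
    println(original)  // EvalNode(arrowEval,Leaf(scan))
    println(rewritten) // EvalNode(arrowEval,Leaf(projectedScan))
  }
}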
diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategies/SedonaArrowStrategy.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/SedonaArrowStrategy.scala
similarity index 97%
rename from spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategies/SedonaArrowStrategy.scala
rename to spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/SedonaArrowStrategy.scala
index f5a0d1c95f..34f4d472b0 100644
--- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategies/SedonaArrowStrategy.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/SedonaArrowStrategy.scala
@@ -16,8 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.spark.sql.sedona_sql.strategies
+package org.apache.spark.sql.udf
 
+import org.apache.sedona.sql.UDF.PythonEvalType
 import org.apache.spark.api.python.ChainedPythonFunctions
 import org.apache.spark.{JobArtifactSet, TaskContext}
 import org.apache.spark.sql.Strategy
diff --git a/spark/common/src/test/scala/org/apache/spark/sql/sedona_sql/strategies/StrategySuite.scala b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
similarity index 94%
rename from spark/common/src/test/scala/org/apache/spark/sql/sedona_sql/strategies/StrategySuite.scala
rename to spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
index 52d8ea8bac..e8ef369660 100644
--- a/spark/common/src/test/scala/org/apache/spark/sql/sedona_sql/strategies/StrategySuite.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
@@ -16,12 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.spark.sql.sedona_sql.strategies
+package org.apache.spark.sql.udf
 
 import org.apache.sedona.spark.SedonaContext
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.sedona_sql.strategies.ScalarUDF.geoPandasScalaFunction
+import org.apache.spark.sql.udf.ScalarUDF.geoPandasScalaFunction
 import org.locationtech.jts.io.WKTReader
 import org.scalatest.funsuite.AnyFunSuite
 import org.scalatest.matchers.should.Matchers
diff --git a/spark/common/src/test/scala/org/apache/spark/sql/sedona_sql/strategies/TestScalarPandasUDF.scala b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
similarity index 97%
rename from spark/common/src/test/scala/org/apache/spark/sql/sedona_sql/strategies/TestScalarPandasUDF.scala
rename to spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
index 925115b5e8..4fbb11ea76 100644
--- a/spark/common/src/test/scala/org/apache/spark/sql/sedona_sql/strategies/TestScalarPandasUDF.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
@@ -16,8 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.spark.sql.sedona_sql.strategies
+package org.apache.spark.sql.udf
 
+import org.apache.sedona.sql.UDF
 import org.apache.spark.TestUtils
 import org.apache.spark.api.python._
 import org.apache.spark.broadcast.Broadcast
@@ -116,6 +117,6 @@ object ScalarUDF {
       broadcastVars = List.empty[Broadcast[PythonBroadcast]].asJava,
       accumulator = null),
     dataType = GeometryUDT,
-    pythonEvalType = PythonEvalType.SQL_SCALAR_SEDONA_UDF,
+    pythonEvalType = UDF.PythonEvalType.SQL_SCALAR_SEDONA_UDF,
     udfDeterministic = true)
 }
