This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch SEDONA-703
in repository https://gitbox.apache.org/repos/asf/sedona.git

commit 3844869e7b09ebd5904dad99b595bcf2de8bb07f
Author: Jia Yu <[email protected]>
AuthorDate: Wed Jan 29 22:08:22 2025 -0800

    Initial commit
---
 .../apache/sedona/core/spatialRDD/SpatialRDD.java  |   5 +-
 .../org/apache/sedona/sql/utils/Adapter.scala      |  10 +
 .../sedona_sql/adapters/StructuredAdapter.scala    | 206 +++++++++++++++++++++
 .../sedona/sql/structuredAdapterTestScala.scala    |  92 +++++++++
 4 files changed, 312 insertions(+), 1 deletion(-)

diff --git a/spark/common/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java b/spark/common/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java
index d81b916183..07ec40eeff 100644
--- a/spark/common/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java
+++ b/spark/common/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java
@@ -42,6 +42,7 @@ import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.sql.types.StructType;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.util.random.SamplingUtils;
 import org.locationtech.jts.geom.Coordinate;
@@ -84,7 +85,9 @@ public class SpatialRDD<T extends Geometry> implements Serializable {
   /** The raw spatial RDD. */
   public JavaRDD<T> rawSpatialRDD;
 
-  public List<String> fieldNames;
+  @Deprecated public List<String> fieldNames;
+
+  public StructType schema;
   /** The CRS transformation. */
   protected boolean CRStransformation = false;
   /** The source epsg code. */
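
For context, the new schema field carries the full Spark StructType across the DataFrame/RDD boundary, whereas the now-deprecated fieldNames list only preserved column names and lost their types. A minimal sketch of the difference, assuming a SpatialRDD[Geometry] named spatialRDD built from a DataFrame df; the variable names here are illustrative, not part of the patch:

    import org.apache.spark.sql.types.StructType

    // Before: only non-geometry column names survived the round trip
    // spatialRDD.fieldNames = java.util.Arrays.asList("id", "name")  // field types were lost

    // After: the complete schema, including field types, is kept
    val schema: StructType = df.schema
    spatialRDD.schema = schema
    println(spatialRDD.schema.treeString) // inspect the preserved schema
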
diff --git a/spark/common/src/main/scala/org/apache/sedona/sql/utils/Adapter.scala b/spark/common/src/main/scala/org/apache/sedona/sql/utils/Adapter.scala
index 9b1067a25a..bd366f3f65 100644
--- a/spark/common/src/main/scala/org/apache/sedona/sql/utils/Adapter.scala
+++ b/spark/common/src/main/scala/org/apache/sedona/sql/utils/Adapter.scala
@@ -38,6 +38,7 @@ object Adapter {
    * @param geometryFieldName
    * @return
    */
+  @deprecated("Use StructuredAdapter instead", since = "1.7.1")
   def toSpatialRdd(dataFrame: DataFrame, geometryFieldName: String): SpatialRDD[Geometry] = {
     // Delete the field that has geometry
     if (dataFrame.schema.size == 1) {
@@ -60,6 +61,7 @@ object Adapter {
    * @param fieldNames
    * @return
    */
+  @deprecated("Use StructuredAdapter instead", since = "1.7.1")
   def toSpatialRdd(
       dataFrame: DataFrame,
       geometryFieldName: String,
@@ -88,6 +90,7 @@ object Adapter {
    * @param fieldNames
    * @return
    */
+  @deprecated("Use StructuredAdapter instead", since = "1.7.1")
   def toSpatialRdd(
       dataFrame: DataFrame,
       geometryColId: Int,
@@ -107,6 +110,7 @@ object Adapter {
    * @param geometryColId
    * @return
    */
+  @deprecated("Use StructuredAdapter instead", since = "1.7.1")
   def toSpatialRdd(dataFrame: DataFrame, geometryColId: Int): SpatialRDD[Geometry] = {
     // Delete the field that has geometry
     if (dataFrame.schema.size == 1) {
@@ -121,6 +125,7 @@ object Adapter {
     }
   }
 
+  @deprecated("Use StructuredAdapter instead", since = "1.7.1")
   def toDf[T <: Geometry](spatialRDD: SpatialRDD[T], sparkSession: SparkSession): DataFrame = {
     import scala.jdk.CollectionConverters._
     if (spatialRDD.fieldNames != null)
@@ -128,6 +133,7 @@ object Adapter {
     toDf(spatialRDD = spatialRDD, fieldNames = null, sparkSession = sparkSession)
   }
 
+  @deprecated("Use StructuredAdapter instead", since = "1.7.1")
   def toDf[T <: Geometry](
       spatialRDD: SpatialRDD[T],
       fieldNames: Seq[String],
@@ -158,6 +164,7 @@ object Adapter {
    * @return
    *   DataFrame with the specified schema
    */
+  @deprecated("Use StructuredAdapter instead", since = "1.7.1")
   def toDf[T <: Geometry](
       spatialRDD: SpatialRDD[T],
       schema: StructType,
@@ -170,12 +177,14 @@ object Adapter {
     sparkSession.sqlContext.createDataFrame(rdd, schema)
   }
 
+  @deprecated("Use StructuredAdapter instead", since = "1.7.1")
   def toDf(
       spatialPairRDD: JavaPairRDD[Geometry, Geometry],
       sparkSession: SparkSession): DataFrame = {
     toDf(spatialPairRDD, null, null, sparkSession)
   }
 
+  @deprecated("Use StructuredAdapter instead", since = "1.7.1")
   def toDf(
       spatialPairRDD: JavaPairRDD[Geometry, Geometry],
       leftFieldnames: Seq[String],
@@ -218,6 +227,7 @@ object Adapter {
    * @return
    *   Spatial pair RDD as a DataFrame with the desired schema
    */
+  @deprecated("Use StructuredAdapter instead", since = "1.7.1")
   def toDf(
       spatialPairRDD: JavaPairRDD[Geometry, Geometry],
       schema: StructType,
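
Every public entry point on Adapter is now deprecated in favor of StructuredAdapter. A hedged migration sketch, assuming a DataFrame df with a geometry column named "geom" and a SparkSession named spark (these names are illustrative):

    import org.apache.sedona.core.spatialRDD.SpatialRDD
    import org.apache.spark.sql.sedona_sql.adapters.StructuredAdapter
    import org.locationtech.jts.geom.Geometry

    // Deprecated path: only column names are carried, the schema is lost
    // val rdd: SpatialRDD[Geometry] = Adapter.toSpatialRdd(df, "geom")
    // val roundTripped = Adapter.toDf(rdd, spark)

    // New path: the full schema travels with the SpatialRDD
    val rdd: SpatialRDD[Geometry] = StructuredAdapter.toSpatialRdd(df, "geom")
    val roundTripped = StructuredAdapter.toDf(rdd, spark)
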
diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/adapters/StructuredAdapter.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/adapters/StructuredAdapter.scala
new file mode 100644
index 0000000000..a359ae9457
--- /dev/null
+++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/adapters/StructuredAdapter.scala
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.sedona_sql.adapters
+
+import org.apache.sedona.core.spatialRDD.SpatialRDD
+import org.apache.sedona.sql.utils.GeometrySerializer
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.locationtech.jts.geom.Geometry
+import org.slf4j.{Logger, LoggerFactory}
+
+object StructuredAdapter {
+  val logger: Logger = LoggerFactory.getLogger(getClass)
+
+  /**
+   * Convert RDD[Row] to SpatialRDD. It puts Row as user data of Geometry.
+   * @param rdd
+   *   RDD of Rows containing a geometry column
+   * @param geometryFieldName
+   *   Name of the geometry column
+   * @return
+   *   SpatialRDD whose geometries carry their source Rows as user data
+   */
+  def toSpatialRdd(rdd: RDD[Row], geometryFieldName: String): SpatialRDD[Geometry] = {
+    val spatialRDD = new SpatialRDD[Geometry]
+    spatialRDD.schema = rdd.first().schema
+    spatialRDD.rawSpatialRDD = rdd
+      .map(row => {
+        val geom = row.getAs[Geometry](geometryFieldName)
+        geom.setUserData(row.copy())
+        geom
+      })
+      .toJavaRDD()
+    spatialRDD
+  }
+
+  /**
+   * Convert RDD[Row] to SpatialRDD. It puts Row as user data of Geometry. Since no
+   * geometryFieldName is given, it auto-detects and uses the first geometry column in the RDD.
+   * @param rdd
+   *   RDD of Rows containing at least one geometry column
+   * @return
+   *   SpatialRDD whose geometries carry their source Rows as user data
+   */
+  def toSpatialRdd(rdd: RDD[Row]): SpatialRDD[Geometry] = {
+    toSpatialRdd(rdd, firstGeomColName(rdd.first().schema))
+  }
+
+  /**
+   * Convert SpatialRDD to RDD[Row]. It extracts the Row from the user data of each Geometry.
+   * @param spatialRDD
+   *   SpatialRDD whose geometries carry Rows as user data
+   * @return
+   *   RDD of the recovered Rows
+   */
+  def toRowRdd(spatialRDD: SpatialRDD[Geometry]): RDD[Row] = {
+    spatialRDD.rawSpatialRDD.map(geometry => {
+      val row = geometry.getUserData.asInstanceOf[Row]
+      row
+    })
+  }
+
+  /**
+   * Convert DataFrame to SpatialRDD. It puts InternalRow as user data of Geometry. Only one
+   * geometry column is allowed.
+   *
+   * @param dataFrame
+   *   DataFrame with exactly one geometry column
+   * @param geometryFieldName
+   *   Name of the geometry column
+   */
+  def toSpatialRdd(dataFrame: DataFrame, geometryFieldName: String): SpatialRDD[Geometry] = {
+    val spatialRDD = new SpatialRDD[Geometry]
+    spatialRDD.schema = dataFrame.schema
+    val ordinal = spatialRDD.schema.fieldIndex(geometryFieldName)
+    spatialRDD.rawSpatialRDD = dataFrame.queryExecution.toRdd
+      .map(row => {
+        val geom = GeometrySerializer.deserialize(row.getBinary(ordinal))
+        geom.setUserData(row.copy())
+        geom
+      })
+      .toJavaRDD()
+    spatialRDD
+  }
+
+  /**
+   * Convert DataFrame to SpatialRDD. It puts InternalRow as user data of Geometry. Since no
+   * geometryFieldName is given, it auto-detects and uses the first geometry column in the
+   * DataFrame.
+   * @param dataFrame
+   *   DataFrame with at least one geometry column
+   * @return
+   *   SpatialRDD whose geometries carry their source InternalRows as user data
+   */
+  def toSpatialRdd(dataFrame: DataFrame): SpatialRDD[Geometry] = {
+    toSpatialRdd(dataFrame, firstGeomColName(dataFrame.schema))
+  }
+
+  /**
+   * Convert SpatialRDD.rawSpatialRDD to a DataFrame.
+   * @param spatialRDD
+   *   The SpatialRDD to convert. It must have rawSpatialRDD set.
+   * @param sparkSession
+   *   SparkSession used to create the DataFrame
+   * @return
+   *   DataFrame with the schema recorded on the SpatialRDD
+   */
+  def toDf(spatialRDD: SpatialRDD[Geometry], sparkSession: SparkSession): DataFrame = {
+    val rowRdd = spatialRDD.rawSpatialRDD.map(geometry => {
+      val row = geometry.getUserData.asInstanceOf[InternalRow]
+      row
+    })
+    sparkSession.internalCreateDataFrame(rowRdd, spatialRDD.schema)
+  }
+
+  /**
+   * Convert SpatialRDD.spatialPartitionedRDD to a DataFrame. This is useful when you want to
+   * convert a SpatialRDD after spatial partitioning.
+   * @param spatialRDD
+   *   The SpatialRDD to convert. It must have spatialPartitionedRDD set; call the
+   *   spatialPartitioning method before calling this method.
+   * @param sparkSession
+   *   SparkSession used to create the DataFrame
+   * @return
+   *   DataFrame built from the spatially partitioned RDD
+   */
+  def toSpatialPartitionedDf(
+      spatialRDD: SpatialRDD[Geometry],
+      sparkSession: SparkSession): DataFrame = {
+    if (spatialRDD.spatialPartitionedRDD == null)
+      throw new RuntimeException(
+        "SpatialRDD is not spatially partitioned. Please call 
spatialPartitioning method before calling this method.")
+    logger.warn(
+      "SpatialPartitionedRDD might have duplicate geometries. Please make sure 
you are aware of it.")
+    val rowRdd = spatialRDD.spatialPartitionedRDD.map(geometry => {
+      val row = geometry.getUserData.asInstanceOf[InternalRow]
+      row
+    })
+    sparkSession.internalCreateDataFrame(rowRdd, spatialRDD.schema)
+  }
+
+  /**
+   * Convert JavaPairRDD[Geometry, Geometry] to a DataFrame. This method is useful when you want
+   * to convert the result of a spatial join to a DataFrame.
+   * @param spatialPairRDD
+   *   The JavaPairRDD to convert.
+   * @param leftSchema
+   *   Schema of the left side.
+   * @param rightSchema
+   *   Schema of the right side.
+   * @param sparkSession
+   *   SparkSession used to create the DataFrame
+   * @return
+   *   DataFrame whose schema is the concatenation of the left and right schemas
+   */
+  def toDf(
+      spatialPairRDD: JavaPairRDD[Geometry, Geometry],
+      leftSchema: StructType,
+      rightSchema: StructType,
+      sparkSession: SparkSession): DataFrame = {
+    val rowRdd = spatialPairRDD.rdd.map(pair => {
+      val leftRow = pair._1.getUserData.asInstanceOf[InternalRow].toSeq(leftSchema)
+      val rightRow = pair._2.getUserData.asInstanceOf[InternalRow].toSeq(rightSchema)
+      InternalRow.fromSeq(leftRow ++ rightRow)
+    })
+    sparkSession.internalCreateDataFrame(
+      rowRdd,
+      StructType(leftSchema.fields ++ rightSchema.fields))
+  }
+
+  /**
+   * Convert JavaPairRDD[Geometry, Geometry] to a DataFrame. This method is useful when you want
+   * to convert the result of a spatial join to a DataFrame.
+   * @param spatialPairRDD
+   *   The JavaPairRDD to convert.
+   * @param originalLeftSpatialRdd
+   *   The original left SpatialRDD involved in the join. It is used to get the schema of the
+   *   left side.
+   * @param originalRightSpatialRdd
+   *   The original right SpatialRDD involved in the join. It is used to get the schema of the
+   *   right side.
+   * @param sparkSession
+   *   SparkSession used to create the DataFrame
+   * @return
+   *   DataFrame whose schema is the concatenation of the left and right schemas
+   */
+  def toDf(
+      spatialPairRDD: JavaPairRDD[Geometry, Geometry],
+      originalLeftSpatialRdd: SpatialRDD[Geometry],
+      originalRightSpatialRdd: SpatialRDD[Geometry],
+      sparkSession: SparkSession): DataFrame = {
+    toDf(
+      spatialPairRDD,
+      originalLeftSpatialRdd.schema,
+      originalRightSpatialRdd.schema,
+      sparkSession)
+  }
+
+  private def firstGeomColName(schema: StructType): String = {
+    schema.fields
+      .find(_.dataType.typeName == "geometry")
+      .map(_.name)
+      .getOrElse(throw new IllegalArgumentException("No geometry column found in DataFrame"))
+  }
+}
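
Taken together, StructuredAdapter supports a full round trip through spatial partitioning. A sketch of the intended flow, mirroring the tests below; it assumes a SparkSession named spark and a DataFrame df with a geometry column named "geom" (both names are illustrative):

    import org.apache.sedona.core.enums.GridType
    import org.apache.spark.sql.sedona_sql.adapters.StructuredAdapter

    val rdd = StructuredAdapter.toSpatialRdd(df, "geom")
    rdd.analyze()                                 // compute boundary/count; required before partitioning
    rdd.spatialPartitioning(GridType.KDBTREE, 10) // 10 is an arbitrary partition count
    // Note: spatially partitioned RDDs may contain duplicate geometries, as warned above
    val partitionedDf = StructuredAdapter.toSpatialPartitionedDf(rdd, spark)
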
diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/structuredAdapterTestScala.scala b/spark/common/src/test/scala/org/apache/sedona/sql/structuredAdapterTestScala.scala
new file mode 100644
index 0000000000..cda0eaf37b
--- /dev/null
+++ b/spark/common/src/test/scala/org/apache/sedona/sql/structuredAdapterTestScala.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.sql
+
+import org.apache.sedona.core.enums.{GridType, IndexType}
+import org.apache.sedona.core.spatialOperator.{JoinQuery, SpatialPredicate}
+import org.apache.sedona.core.spatialRDD.CircleRDD
+import org.apache.spark.sql.sedona_sql.adapters.StructuredAdapter
+import org.junit.Assert.assertEquals
+import org.scalatest.GivenWhenThen
+
+class structuredAdapterTestScala extends TestBaseScala with GivenWhenThen {
+
+  describe("Structured Adapter") {
+    it("Should convert DataFrame to SpatialRDD and back") {
+      val seq = generateTestData()
+      val geom1 = seq.head._3
+      val dfOrigin = sparkSession.createDataFrame(seq)
+      val rdd = StructuredAdapter.toSpatialRdd(dfOrigin, "_3")
+      assertGeometryEquals(geom1, rdd.rawSpatialRDD.take(1).get(0))
+      val dfConverted = StructuredAdapter.toDf(rdd, sparkSession)
+      intercept[RuntimeException] {
+        StructuredAdapter.toSpatialPartitionedDf(rdd, sparkSession)
+      }
+      assertEquals(seq.size, dfConverted.count())
+    }
+
+    it("Should convert DataFrame to SpatialRDD and back, without specifying 
geometry column") {
+      val seq = generateTestData()
+      val geom1 = seq.head._3
+      val dfOrigin = sparkSession.createDataFrame(seq)
+      val rdd = StructuredAdapter.toSpatialRdd(dfOrigin)
+      assertGeometryEquals(geom1, rdd.rawSpatialRDD.take(1).get(0))
+      val dfConverted = StructuredAdapter.toDf(rdd, sparkSession)
+      intercept[RuntimeException] {
+        StructuredAdapter.toSpatialPartitionedDf(rdd, sparkSession)
+      }
+      assertEquals(seq.size, dfConverted.count())
+    }
+
+    it("Should convert to Rdd and do spatial partitioning") {
+      val seq = generateTestData()
+      val dfOrigin = sparkSession.createDataFrame(seq)
+      val rdd = StructuredAdapter.toSpatialRdd(dfOrigin, "_3")
+      rdd.analyze()
+      rdd.spatialPartitioning(GridType.KDBTREE, 10)
+      val dfConverted = StructuredAdapter.toSpatialPartitionedDf(rdd, sparkSession)
+      assertEquals(seq.size, dfConverted.count())
+    }
+
+    it("Should convert a spatial join result back to DataFrame") {
+      val pointRdd =
+        StructuredAdapter.toSpatialRdd(sparkSession.createDataFrame(generateTestData()))
+      val circleRDD = new CircleRDD(pointRdd, 0.0001)
+      circleRDD.analyze()
+      pointRdd.analyze()
+      circleRDD.spatialPartitioning(GridType.KDBTREE)
+      pointRdd.spatialPartitioning(circleRDD.getPartitioner)
+      circleRDD.buildIndex(IndexType.QUADTREE, true)
+      val pairRdd =
+        JoinQuery.DistanceJoinQueryFlat(pointRdd, circleRDD, true, SpatialPredicate.INTERSECTS)
+      val resultDf =
+        StructuredAdapter.toDf(pairRdd, pointRdd.schema, pointRdd.schema, sparkSession)
+      assertEquals(pointRdd.rawSpatialRDD.count(), resultDf.count())
+    }
+
+    it("Should convert a SpatialRdd to RowRdd and back") {
+      val seq = generateTestData()
+      val dfOrigin = sparkSession.createDataFrame(seq)
+      val spatialRdd = StructuredAdapter.toSpatialRdd(dfOrigin.rdd)
+      val rowRdd = StructuredAdapter.toRowRdd(spatialRdd)
+      assertEquals(seq.size, StructuredAdapter.toSpatialRdd(rowRdd).rawSpatialRDD.count())
+    }
+  }
+
+}
