This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new 79c1da8375 [SEDONA-703] Introducing the StructuredAdapter class (#1780)
79c1da8375 is described below

commit 79c1da83754ecbbf9b6457876147ca0d35b224bb
Author: Jia Yu <[email protected]>
AuthorDate: Tue Feb 4 09:52:51 2025 -0800

    [SEDONA-703] Introducing the StructuredAdapter class (#1780)
    
    * Initial commit
    
    * Refactor the code according to comments
    
    * Add Python API
    
    * Update docs and fix lints
---
 docs/tutorial/rdd.md                               | 262 +--------------------
 docs/tutorial/sql.md                               | 104 ++------
 python/sedona/register/java_libs.py                |   1 +
 python/sedona/utils/adapter.py                     |  10 +-
 python/sedona/utils/structured_adapter.py          | 126 ++++++++++
 python/tests/sql/test_structured_adapter.py        |  60 +++++
 .../apache/sedona/core/spatialRDD/SpatialRDD.java  |   3 +
 .../org/apache/sedona/sql/utils/Adapter.scala      |   7 +
 .../scala/org/apache/sedona/stats/Weighting.scala  |   2 +-
 .../apache/sedona/stats/clustering/DBSCAN.scala    |   2 +-
 .../outlierDetection/LocalOutlierFactor.scala      |   3 +-
 .../scala/org/apache/sedona/util/DfUtils.scala     |   6 +-
 .../sedona_sql/adapters/StructuredAdapter.scala    | 229 ++++++++++++++++++
 .../sedona/sql/structuredAdapterTestScala.scala    | 110 +++++++++
 14 files changed, 574 insertions(+), 351 deletions(-)

diff --git a/docs/tutorial/rdd.md b/docs/tutorial/rdd.md
index b36dde62a4..fd5a1fcfde 100644
--- a/docs/tutorial/rdd.md
+++ b/docs/tutorial/rdd.md
@@ -32,197 +32,31 @@ Please refer to [Create Sedona 
config](sql.md#create-sedona-config) to create a
 
 Please refer to [Initiate SedonaContext](sql.md#initiate-sedonacontext) to 
initiate a SedonaContext.
 
-## Create a SpatialRDD
+## Create a SpatialRDD from SedonaSQL DataFrame
 
-### Create a typed SpatialRDD
-
-Sedona-core provides three special SpatialRDDs: PointRDD, PolygonRDD, and 
LineStringRDD.
-
-!!!warning
-       Typed SpatialRDD has been deprecated for a long time. We do NOT 
recommend it anymore.
-
-### Create a generic SpatialRDD
-
-A generic SpatialRDD is not typed to a certain geometry type and open to more 
scenarios. It allows an input data file contains mixed types of geometries. For 
instance, a WKT file contains three types geometries ==LineString==, 
==Polygon== and ==MultiPolygon==.
-
-#### From WKT/WKB
-
-Geometries in a WKT and WKB file always occupy a single column no matter how 
many coordinates they have. Sedona provides `WktReader` and `WkbReader` to 
create generic SpatialRDD.
-
-Suppose we have a `checkin.tsv` WKT TSV file at Path `/Download/checkin.tsv` 
as follows:
-
-```
-POINT (-88.331492 32.324142)   hotel
-POINT (-88.175933 32.360763)   gas
-POINT (-88.388954 32.357073)   bar
-POINT (-88.221102 32.35078)    restaurant
-```
-
-This file has two columns and corresponding ==offsets==(Column IDs) are 0, 1. 
Column 0 is the WKT string and Column 1 is the checkin business type.
-
-Use the following code to create a SpatialRDD
+Please refer to [Create a Geometry type 
column](sql.md#create-a-geometry-type-column) to create a Geometry type column. 
Then you can create a SpatialRDD from the DataFrame.
 
 === "Scala"
 
        ```scala
-       val inputLocation = "/Download/checkin.tsv"
-       val wktColumn = 0 // The WKT string starts from Column 0
-       val allowTopologyInvalidGeometries = true // Optional
-       val skipSyntaxInvalidGeometries = false // Optional
-       val spatialRDD = WktReader.readToGeometryRDD(sedona.sparkContext, 
inputLocation, wktColumn, allowTopologyInvalidGeometries, 
skipSyntaxInvalidGeometries)
+       var spatialRDD = StructuredAdapter.toSpatialRdd(spatialDf, "usacounty")
        ```
 
 === "Java"
 
        ```java
-       String inputLocation = "/Download/checkin.tsv"
-       int wktColumn = 0 // The WKT string starts from Column 0
-       boolean allowTopologyInvalidGeometries = true // Optional
-       boolean skipSyntaxInvalidGeometries = false // Optional
-       SpatialRDD spatialRDD = 
WktReader.readToGeometryRDD(sedona.sparkContext, inputLocation, wktColumn, 
allowTopologyInvalidGeometries, skipSyntaxInvalidGeometries)
+       SpatialRDD spatialRDD = StructuredAdapter.toSpatialRdd(spatialDf, 
"usacounty");
        ```
 
 === "Python"
 
        ```python
-       from sedona.core.formatMapper import WktReader
-       from sedona.core.formatMapper import WkbReader
-
-       WktReader.readToGeometryRDD(sc, wkt_geometries_location, 0, True, False)
-
-       WkbReader.readToGeometryRDD(sc, wkb_geometries_location, 0, True, False)
-       ```
-
-#### From GeoJSON
-
-!!!note
-       Reading GeoJSON using SpatialRDD is not recommended. Please use [Sedona 
SQL and DataFrame API](sql.md#load-geojson-data) to read GeoJSON files.
-
-Geometries in GeoJSON is similar to WKT/WKB. However, a GeoJSON file must be 
beaked into multiple lines.
-
-Suppose we have a `polygon.json` GeoJSON file at Path `/Download/polygon.json` 
as follows:
-
-```
-{ "type": "Feature", "properties": { "STATEFP": "01", "COUNTYFP": "077", 
"TRACTCE": "011501", "BLKGRPCE": "5", "AFFGEOID": "1500000US010770115015", 
"GEOID": "010770115015", "NAME": "5", "LSAD": "BG", "ALAND": 6844991, "AWATER": 
32636 }, "geometry": { "type": "Polygon", "coordinates": [ [ [ -87.621765, 
34.873444 ], [ -87.617535, 34.873369 ], [ -87.6123, 34.873337 ], [ -87.604049, 
34.873303 ], [ -87.604033, 34.872316 ], [ -87.60415, 34.867502 ], [ -87.604218, 
34.865687 ], [ -87.604409, 34. [...]
-{ "type": "Feature", "properties": { "STATEFP": "01", "COUNTYFP": "045", 
"TRACTCE": "021102", "BLKGRPCE": "4", "AFFGEOID": "1500000US010450211024", 
"GEOID": "010450211024", "NAME": "4", "LSAD": "BG", "ALAND": 11360854, 
"AWATER": 0 }, "geometry": { "type": "Polygon", "coordinates": [ [ [ 
-85.719017, 31.297901 ], [ -85.715626, 31.305203 ], [ -85.714271, 31.307096 ], 
[ -85.69999, 31.307552 ], [ -85.697419, 31.307951 ], [ -85.675603, 31.31218 ], 
[ -85.672733, 31.312876 ], [ -85.672275, 31.31 [...]
-{ "type": "Feature", "properties": { "STATEFP": "01", "COUNTYFP": "055", 
"TRACTCE": "001300", "BLKGRPCE": "3", "AFFGEOID": "1500000US010550013003", 
"GEOID": "010550013003", "NAME": "3", "LSAD": "BG", "ALAND": 1378742, "AWATER": 
247387 }, "geometry": { "type": "Polygon", "coordinates": [ [ [ -86.000685, 
34.00537 ], [ -85.998837, 34.009768 ], [ -85.998012, 34.010398 ], [ -85.987865, 
34.005426 ], [ -85.986656, 34.004552 ], [ -85.985, 34.002659 ], [ -85.98851, 
34.001502 ], [ -85.987567, 33.9 [...]
-{ "type": "Feature", "properties": { "STATEFP": "01", "COUNTYFP": "089", 
"TRACTCE": "001700", "BLKGRPCE": "2", "AFFGEOID": "1500000US010890017002", 
"GEOID": "010890017002", "NAME": "2", "LSAD": "BG", "ALAND": 1040641, "AWATER": 
0 }, "geometry": { "type": "Polygon", "coordinates": [ [ [ -86.574172, 
34.727375 ], [ -86.562684, 34.727131 ], [ -86.562797, 34.723865 ], [ 
-86.562957, 34.723168 ], [ -86.562336, 34.719766 ], [ -86.557381, 34.719143 ], 
[ -86.557352, 34.718322 ], [ -86.559921, 34.7 [...]
-
-```
-
-Use the following code to create a generic SpatialRDD:
-
-=== "Scala"
-
-       ```scala
-       val inputLocation = "/Download/polygon.json"
-       val allowTopologyInvalidGeometries = true // Optional
-       val skipSyntaxInvalidGeometries = false // Optional
-       val spatialRDD = GeoJsonReader.readToGeometryRDD(sedona.sparkContext, 
inputLocation, allowTopologyInvalidGeometries, skipSyntaxInvalidGeometries)
-       ```
-
-=== "Java"
-
-       ```java
-       String inputLocation = "/Download/polygon.json"
-       boolean allowTopologyInvalidGeometries = true // Optional
-       boolean skipSyntaxInvalidGeometries = false // Optional
-       SpatialRDD spatialRDD = 
GeoJsonReader.readToGeometryRDD(sedona.sparkContext, inputLocation, 
allowTopologyInvalidGeometries, skipSyntaxInvalidGeometries)
-       ```
-
-=== "Python"
-
-       ```python
-       from sedona.core.formatMapper import GeoJsonReader
-
-       GeoJsonReader.readToGeometryRDD(sc, geo_json_file_location)
-       ```
-
-!!!warning
-       The way that Sedona reads JSON file is different from SparkSQL
-
-#### From Shapefile
-
-=== "Scala"
-
-       ```scala
-       val shapefileInputLocation="/Download/myshapefile"
-       val spatialRDD = ShapefileReader.readToGeometryRDD(sedona.sparkContext, 
shapefileInputLocation)
-       ```
-
-=== "Java"
-
-       ```java
-       String shapefileInputLocation="/Download/myshapefile"
-       SpatialRDD spatialRDD = 
ShapefileReader.readToGeometryRDD(sedona.sparkContext, shapefileInputLocation)
-       ```
-
-=== "Python"
-
-       ```python
-       from sedona.core.formatMapper.shapefileParser import ShapefileReader
-
-       ShapefileReader.readToGeometryRDD(sc, shape_file_location)
-       ```
-
-!!!note
-       The path to the shapefile is the path to the folder that contains the 
.shp file, not the path to the .shp file itself. The file extensions of .shp, 
.shx, .dbf must be in lowercase. Assume you have a shape file called 
==myShapefile==, the path should be `XXX/myShapefile`. The file structure 
should be like this:
+       from sedona.utils.structured_adapter import StructuredAdapter
 
+       spatialRDD = StructuredAdapter.toSpatialRdd(spatialDf, "usacounty")
        ```
-       - shapefile1
-       - shapefile2
-       - myshapefile
-               - myshapefile.shp
-               - myshapefile.shx
-               - myshapefile.dbf
-               - myshapefile...
-               - ...
-       ```
-
-If the file you are reading contains non-ASCII characters you'll need to 
explicitly set the Spark config before initializing the SparkSession, then you 
can use `ShapefileReader.readToGeometryRDD`.
-
-Example:
-
-```scala
-spark.driver.extraJavaOptions  -Dsedona.global.charset=utf8
-spark.executor.extraJavaOptions  -Dsedona.global.charset=utf8
-```
-
-#### From SedonaSQL DataFrame
-
-!!!note
-       More details about SedonaSQL, please read the SedonaSQL tutorial.
-
-To create a generic SpatialRDD from CSV, TSV, WKT, WKB and GeoJSON input 
formats, you can use SedonaSQL.
-
-We use checkin.csv CSV file as the example. You can create a generic 
SpatialRDD using the following steps:
-
-1. Load data in SedonaSQL.
-
-```scala
-var df = sedona.read.format("csv").option("header", 
"false").load(csvPointInputLocation)
-df.createOrReplaceTempView("inputtable")
-```
-
-2. Create a Geometry type column in SedonaSQL
-
-```scala
-var spatialDf = sedona.sql(
-       """
-               |SELECT ST_Point(CAST(inputtable._c0 AS 
Decimal(24,20)),CAST(inputtable._c1 AS Decimal(24,20))) AS checkin
-               |FROM inputtable
-       """.stripMargin)
-```
 
-3. Use SedonaSQL DataFrame-RDD Adapter to convert a DataFrame to an SpatialRDD
-
-```scala
-var spatialRDD = Adapter.toSpatialRdd(spatialDf, "checkin")
-```
-
-"checkin" is the name of the geometry column
-
-For WKT/WKB data, please use ==ST_GeomFromWKT / ST_GeomFromWKB == instead.
+"usacounty" is the name of the geometry column. It is an optional parameter. 
If you don't provide it, the first geometry column will be used.
 
 ## Transform the Coordinate Reference System
 
@@ -284,33 +118,6 @@ To convert Coordinate Reference System of an SpatialRDD, 
use the following code:
 
 The details CRS information can be found on [EPSG.io](https://epsg.io/)
 
-## Read other attributes in an SpatialRDD
-
-Each SpatialRDD can carry non-spatial attributes such as price, age and name.
-
-The other attributes are combined together to a string and stored in 
==UserData== field of each geometry.
-
-To retrieve the UserData field, use the following code:
-
-=== "Scala"
-
-       ```scala
-       val rddWithOtherAttributes = 
objectRDD.rawSpatialRDD.rdd.map[String](f=>f.getUserData.asInstanceOf[String])
-       ```
-
-=== "Java"
-
-       ```java
-       SpatialRDD<Geometry> spatialRDD = Adapter.toSpatialRdd(spatialDf, 
"arealandmark");
-       spatialRDD.rawSpatialRDD.map(obj -> {return obj.getUserData();});
-       ```
-
-=== "Python"
-
-       ```python
-       rdd_with_other_attributes = object_rdd.rawSpatialRDD.map(lambda x: 
x.getUserData())
-       ```
-
 ## Write a Spatial Range Query
 
 A spatial range query takes as input a range query window and an SpatialRDD 
and returns all geometries that have specified relationship with the query 
window.
@@ -380,7 +187,7 @@ Assume you now have a SpatialRDD (typed or generic). You 
can use the following c
     consider_boundary_intersection = False  ## Only return gemeotries fully 
covered by the window
     using_index = False
     query_result = RangeQueryRaw.SpatialRangeQuery(spatial_rdd, 
range_query_window, consider_boundary_intersection, using_index)
-    gdf = Adapter.toDf(query_result, spark, ["col1", ..., "coln"])
+    gdf = StructuredAdapter.toDf(query_result, spark)
     ```
 
 ### Range query window
@@ -872,6 +679,7 @@ The index should be built on either one of two SpatialRDDs. 
In general, you shou
     from sedona.core.SpatialRDD import CircleRDD
     from sedona.core.enums import GridType
     from sedona.core.spatialOperator import JoinQueryRaw
+    from sedona.utils.structured_adapter import StructuredAdapter
 
     object_rdd.analyze()
 
@@ -886,7 +694,7 @@ The index should be built on either one of two SpatialRDDs. 
In general, you shou
 
     result = JoinQueryRaw.DistanceJoinQueryFlat(spatial_rdd, circle_rdd, 
using_index, consider_boundary_intersection)
 
-    gdf = Adapter.toDf(result, ["left_col1", ..., "lefcoln"], ["rightcol1", 
..., "rightcol2"], spark)
+    gdf = StructuredAdapter.pairRddToDf(result, leftDf.schema, 
rightDf.schema, spark)
     ```
 
 ## Write a Distance Join Query
@@ -972,45 +780,10 @@ The output format of the distance join query is 
[here](#output-format-2).
 
 ## Save to permanent storage
 
-You can always save an SpatialRDD back to some permanent storage such as HDFS 
and Amazon S3. You can save distributed SpatialRDD to WKT, GeoJSON and object 
files.
-
-!!!note
-       Non-spatial attributes such as price, age and name will also be stored 
to permanent storage.
+You can always save an SpatialRDD back to some permanent storage such as HDFS 
and Amazon S3.
 
 ### Save an SpatialRDD (not indexed)
 
-Typed SpatialRDD and generic SpatialRDD can be saved to permanent storage.
-
-#### Save to distributed WKT text file
-
-Use the following code to save an SpatialRDD as a distributed WKT text file:
-
-```scala
-objectRDD.rawSpatialRDD.saveAsTextFile("hdfs://PATH")
-objectRDD.saveAsWKT("hdfs://PATH")
-```
-
-#### Save to distributed WKB text file
-
-Use the following code to save an SpatialRDD as a distributed WKB text file:
-
-```scala
-objectRDD.saveAsWKB("hdfs://PATH")
-```
-
-#### Save to distributed GeoJSON text file
-
-!!!note
-       Saving GeoJSON using SpatialRDD is not recommended. Please use [Sedona 
SQL and DataFrame API](sql.md#save-as-geojson) to write GeoJSON files.
-
-Use the following code to save an SpatialRDD as a distributed GeoJSON text 
file:
-
-```scala
-objectRDD.saveAsGeoJSON("hdfs://PATH")
-```
-
-#### Save to distributed object file
-
 Use the following code to save an SpatialRDD as a distributed object file:
 
 === "Scala/Java"
@@ -1032,8 +805,6 @@ Use the following code to save an SpatialRDD as a 
distributed object file:
 
 Indexed typed SpatialRDD and generic SpatialRDD can be saved to permanent 
storage. However, the indexed SpatialRDD has to be stored as a distributed 
object file.
 
-#### Save to distributed object file
-
 Use the following code to save an SpatialRDD as a distributed object file:
 
 ```
@@ -1046,16 +817,7 @@ A spatial partitioned RDD can be saved to permanent 
storage but Spark is not abl
 
 ### Reload a saved SpatialRDD
 
-You can easily reload an SpatialRDD that has been saved to ==a distributed 
object file==.
-
-#### Load to a typed SpatialRDD
-
-!!!warning
-       Typed SpatialRDD has been deprecated for a long time. We do NOT 
recommend it anymore.
-
-#### Load to a generic SpatialRDD
-
-Use the following code to reload the SpatialRDD:
+You can easily reload an SpatialRDD that has been saved to ==a distributed 
object file==. Use the following code to reload the SpatialRDD:
 
 === "Scala"
 
diff --git a/docs/tutorial/sql.md b/docs/tutorial/sql.md
index 35730e8c98..ffea04132a 100644
--- a/docs/tutorial/sql.md
+++ b/docs/tutorial/sql.md
@@ -565,10 +565,6 @@ The character encoding of string attributes are inferred 
from the `.cpg` file. I
     df = sedona.read.format("shapefile").option("charset", 
"UTF-8").load("/path/to/shapefile")
     ```
 
-### (Deprecated) Loading Shapefile using SpatialRDD
-
-If you are using Sedona earlier than v`1.7.0`, you can load shapefiles as 
SpatialRDD and converted to DataFrame using Adapter. Please read [Load 
SpatialRDD](rdd.md#create-a-generic-spatialrdd) and [DataFrame <-> 
RDD](#convert-between-dataframe-and-spatialrdd).
-
 ## Load GeoParquet
 
 Since v`1.3.0`, Sedona natively supports loading GeoParquet file. Sedona will 
infer geometry fields using the "geo" metadata in GeoParquet files.
@@ -1594,32 +1590,29 @@ my_postgis_db# alter table my_table alter column geom 
type geometry;
 
 ### DataFrame to SpatialRDD
 
-Use SedonaSQL DataFrame-RDD Adapter to convert a DataFrame to an SpatialRDD. 
Please read [Adapter 
Scaladoc](../api/scaladoc/spark/org/apache/sedona/sql/utils/index.html)
+Use SedonaSQL DataFrame-RDD Adapter to convert a DataFrame to an SpatialRDD.
 
 === "Scala"
 
        ```scala
-       var spatialRDD = Adapter.toSpatialRdd(spatialDf, "usacounty")
+       var spatialRDD = StructuredAdapter.toSpatialRdd(spatialDf, "usacounty")
        ```
 
 === "Java"
 
        ```java
-       SpatialRDD spatialRDD = Adapter.toSpatialRdd(spatialDf, "usacounty")
+       SpatialRDD spatialRDD = StructuredAdapter.toSpatialRdd(spatialDf, 
"usacounty");
        ```
 
 === "Python"
 
        ```python
-       from sedona.utils.adapter import Adapter
+       from sedona.utils.structured_adapter import StructuredAdapter
 
-       spatialRDD = Adapter.toSpatialRdd(spatialDf, "usacounty")
+       spatialRDD = StructuredAdapter.toSpatialRdd(spatialDf, "usacounty")
        ```
 
-"usacounty" is the name of the geometry column
-
-!!!warning
-       Only one Geometry type column is allowed per DataFrame.
+"usacounty" is the name of the geometry column. It is an optional parameter. 
If you don't provide it, the first geometry column will be used.
 
 ### SpatialRDD to DataFrame
 
@@ -1628,109 +1621,42 @@ Use SedonaSQL DataFrame-RDD Adapter to convert a 
DataFrame to an SpatialRDD. Ple
 === "Scala"
 
        ```scala
-       var spatialDf = Adapter.toDf(spatialRDD, sedona)
+       var spatialDf = StructuredAdapter.toDf(spatialRDD, sedona)
        ```
 
 === "Java"
 
        ```java
-       Dataset<Row> spatialDf = Adapter.toDf(spatialRDD, sedona)
+       Dataset<Row> spatialDf = StructuredAdapter.toDf(spatialRDD, sedona);
        ```
 
 === "Python"
 
        ```python
-       from sedona.utils.adapter import Adapter
+       from sedona.utils.structured_adapter import StructuredAdapter
 
-       spatialDf = Adapter.toDf(spatialRDD, sedona)
-       ```
-
-All other attributes such as price and age will be also brought to the 
DataFrame as long as you specify ==carryOtherAttributes== (see [Read other 
attributes in an SpatialRDD](rdd.md#read-other-attributes-in-an-spatialrdd)).
-
-You may also manually specify a schema for the resulting DataFrame in case you 
require different column names or data
-types. Note that string schemas and not all data types are 
supported&mdash;please check the
-[Adapter Scaladoc](../api/javadoc/sql/org/apache/sedona/sql/utils/index.html) 
to confirm what is supported for your use
-case. At least one column for the user data must be provided.
-
-=== "Scala"
-
-       ```scala
-       val schema = StructType(Array(
-         StructField("county", GeometryUDT, nullable = true),
-         StructField("name", StringType, nullable = true),
-         StructField("price", DoubleType, nullable = true),
-         StructField("age", IntegerType, nullable = true)
-       ))
-       val spatialDf = Adapter.toDf(spatialRDD, schema, sedona)
+       spatialDf = StructuredAdapter.toDf(spatialRDD, sedona)
        ```
 
 ### SpatialPairRDD to DataFrame
 
-PairRDD is the result of a spatial join query or distance join query. 
SedonaSQL DataFrame-RDD Adapter can convert the result to a DataFrame. But you 
need to provide the name of other attributes.
+PairRDD is the result of a spatial join query or distance join query. 
SedonaSQL DataFrame-RDD Adapter can convert the result to a DataFrame. But you 
need to provide the schema of the left and right RDDs.
 
 === "Scala"
 
        ```scala
-       var joinResultDf = Adapter.toDf(joinResultPairRDD, 
Seq("left_attribute1", "left_attribute2"), Seq("right_attribute1", 
"right_attribute2"), sedona)
+       var joinResultDf = StructuredAdapter.toDf(joinResultPairRDD, 
leftDf.schema, rightDf.schema, sedona)
        ```
 
 === "Java"
 
        ```java
-       import scala.collection.JavaConverters;
-
-       List leftFields = new ArrayList<>(Arrays.asList("c1", "c2", "c3"));
-       List rightFields = new ArrayList<>(Arrays.asList("c4", "c5", "c6"));
-       Dataset joinResultDf = Adapter.toDf(joinResultPairRDD, 
JavaConverters.asScalaBuffer(leftFields).toSeq(), 
JavaConverters.asScalaBuffer(rightFields).toSeq(), sedona);
+       Dataset joinResultDf = StructuredAdapter.toDf(joinResultPairRDD, 
leftDf.schema(), rightDf.schema(), sedona);
        ```
-
 === "Python"
 
        ```python
-       from sedona.utils.adapter import Adapter
+       from sedona.utils.structured_adapter import StructuredAdapter
 
-       joinResultDf = Adapter.toDf(jvm_sedona_rdd, ["poi_from_id", 
"poi_from_name"], ["poi_to_id", "poi_to_name"], spark))
-       ```
-or you can use the attribute names directly from the input RDD
-
-=== "Scala"
-
-       ```scala
-       import scala.collection.JavaConversions._
-       var joinResultDf = Adapter.toDf(joinResultPairRDD, leftRdd.fieldNames, 
rightRdd.fieldNames, sedona)
-       ```
-
-=== "Java"
-
-       ```java
-       import scala.collection.JavaConverters;
-       Dataset joinResultDf = Adapter.toDf(joinResultPairRDD, 
JavaConverters.asScalaBuffer(leftRdd.fieldNames).toSeq(), 
JavaConverters.asScalaBuffer(rightRdd.fieldNames).toSeq(), sedona);
-       ```
-=== "Python"
-
-       ```python
-       from sedona.utils.adapter import Adapter
-
-       joinResultDf = Adapter.toDf(result_pair_rdd, leftRdd.fieldNames, 
rightRdd.fieldNames, spark)
-       ```
-
-All other attributes such as price and age will be also brought to the 
DataFrame as long as you specify ==carryOtherAttributes== (see [Read other 
attributes in an SpatialRDD](rdd.md#read-other-attributes-in-an-spatialrdd)).
-
-You may also manually specify a schema for the resulting DataFrame in case you 
require different column names or data
-types. Note that string schemas and not all data types are 
supported&mdash;please check the
-[Adapter Scaladoc](../api/javadoc/sql/org/apache/sedona/sql/utils/index.html) 
to confirm what is supported for your use
-case. Columns for the left and right user data must be provided.
-
-=== "Scala"
-
-       ```scala
-       val schema = StructType(Array(
-         StructField("leftGeometry", GeometryUDT, nullable = true),
-         StructField("name", StringType, nullable = true),
-         StructField("price", DoubleType, nullable = true),
-         StructField("age", IntegerType, nullable = true),
-         StructField("rightGeometry", GeometryUDT, nullable = true),
-         StructField("category", StringType, nullable = true)
-       ))
-       val joinResultDf = Adapter.toDf(joinResultPairRDD, schema, sedona)
+       joinResultDf = StructuredAdapter.pairRddToDf(result_pair_rdd, 
leftDf.schema, rightDf.schema, spark)
        ```
diff --git a/python/sedona/register/java_libs.py 
b/python/sedona/register/java_libs.py
index 931488d917..8d1681d15e 100644
--- a/python/sedona/register/java_libs.py
+++ b/python/sedona/register/java_libs.py
@@ -21,6 +21,7 @@ from enum import Enum
 class SedonaJvmLib(Enum):
     JoinParams = "org.apache.sedona.python.wrapper.adapters.JoinParamsAdapter"
     Adapter = "org.apache.sedona.sql.utils.Adapter"
+    StructuredAdapter = 
"org.apache.spark.sql.sedona_sql.adapters.StructuredAdapter"
     JoinQuery = "org.apache.sedona.core.spatialOperator.JoinQuery"
     KNNQuery = "org.apache.sedona.core.spatialOperator.KNNQuery"
     RangeQuery = "org.apache.sedona.core.spatialOperator.RangeQuery"
diff --git a/python/sedona/utils/adapter.py b/python/sedona/utils/adapter.py
index b692fd2fdb..7b978e7212 100644
--- a/python/sedona/utils/adapter.py
+++ b/python/sedona/utils/adapter.py
@@ -29,16 +29,14 @@ from sedona.utils.meta import MultipleMeta
 class Adapter(metaclass=MultipleMeta):
     """
     Class which allow to convert between Spark DataFrame and SpatialRDD and 
reverse.
+    This class is used to convert between PySpark DataFrame and SpatialRDD. 
Schema
+    is lost during the conversion. This should be used if your data starts as a
+    SpatialRDD and you want to convert it to a DataFrame for further 
processing.
     """
 
     @staticmethod
     def _create_dataframe(jdf, sparkSession: SparkSession) -> DataFrame:
-        if hasattr(sparkSession, "_wrapped"):
-            # In Spark < 3.3, use the _wrapped SQLContext
-            return DataFrame(jdf, sparkSession._wrapped)
-        else:
-            # In Spark >= 3.3, use the session directly
-            return DataFrame(jdf, sparkSession)
+        return DataFrame(jdf, sparkSession)
 
     @classmethod
     def toRdd(cls, dataFrame: DataFrame) -> "JvmSpatialRDD":
diff --git a/python/sedona/utils/structured_adapter.py 
b/python/sedona/utils/structured_adapter.py
new file mode 100644
index 0000000000..bc3b959363
--- /dev/null
+++ b/python/sedona/utils/structured_adapter.py
@@ -0,0 +1,126 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+from pyspark.sql import DataFrame, SparkSession
+from pyspark.sql.types import StructType
+
+from sedona.core.SpatialRDD.spatial_rdd import SpatialRDD
+from sedona.core.spatialOperator.rdd import SedonaPairRDD
+
+
+class StructuredAdapter:
+    """
+    Class that converts a Spark DataFrame to a SpatialRDD and back.
+    This class is used to convert between PySpark DataFrame and SpatialRDD. 
Schema
+    is lost during the conversion. This should be used if your data starts as a
+    SpatialRDD and you want to convert it to a DataFrame for further 
processing.
+    """
+
+    @staticmethod
+    def _create_dataframe(jdf, sparkSession: SparkSession) -> DataFrame:
+        return DataFrame(jdf, sparkSession)
+
+    @classmethod
+    def toSpatialRdd(
+        cls, dataFrame: DataFrame, geometryFieldName: str = None
+    ) -> SpatialRDD:
+        """
+        Convert a DataFrame to a SpatialRDD
+        :param dataFrame:
+        :param geometryFieldName:
+        :return:
+        """
+        sc = dataFrame._sc
+        jvm = sc._jvm
+        if geometryFieldName is None:
+            srdd = jvm.StructuredAdapter.toSpatialRdd(dataFrame._jdf)
+        else:
+            srdd = jvm.StructuredAdapter.toSpatialRdd(dataFrame._jdf, 
geometryFieldName)
+
+        spatial_rdd = SpatialRDD(sc)
+        spatial_rdd.set_srdd(srdd)
+
+        return spatial_rdd
+
+    @classmethod
+    def toDf(cls, spatialRDD: SpatialRDD, sparkSession: SparkSession) -> 
DataFrame:
+        """
+        Convert a SpatialRDD to a DataFrame
+        :param spatialRDD:
+        :param sparkSession:
+        :return:
+        """
+        sc = spatialRDD._sc
+        jvm = sc._jvm
+
+        jdf = jvm.StructuredAdapter.toDf(spatialRDD._srdd, 
sparkSession._jsparkSession)
+
+        df = StructuredAdapter._create_dataframe(jdf, sparkSession)
+
+        return df
+
+    @classmethod
+    def toSpatialPartitionedDf(
+        cls, spatialRDD: SpatialRDD, sparkSession: SparkSession
+    ) -> DataFrame:
+        """
+        Convert a SpatialRDD to a DataFrame. This DataFrame will be spatially 
partitioned
+        :param spatialRDD:
+        :param sparkSession:
+        :return:
+        """
+        sc = spatialRDD._sc
+        jvm = sc._jvm
+
+        jdf = jvm.StructuredAdapter.toSpatialPartitionedDf(
+            spatialRDD._srdd, sparkSession._jsparkSession
+        )
+
+        df = StructuredAdapter._create_dataframe(jdf, sparkSession)
+
+        return df
+
+    @classmethod
+    def pairRddToDf(
+        cls,
+        rawPairRDD: SedonaPairRDD,
+        left_schema: StructType,
+        right_schema: StructType,
+        sparkSession: SparkSession,
+    ) -> DataFrame:
+        """
+        Convert a raw pair RDD to a DataFrame. This is useful when you have a 
Spatial join result
+        Args:
+            rawPairRDD:
+            left_schema:
+            right_schema:
+            sparkSession:
+
+        Returns:
+
+        """
+        jvm = sparkSession._jvm
+        left_schema_json = left_schema.json()
+        right_schema_json = right_schema.json()
+        jdf = jvm.StructuredAdapter.toDf(
+            rawPairRDD.jsrdd,
+            left_schema_json,
+            right_schema_json,
+            sparkSession._jsparkSession,
+        )
+        df = StructuredAdapter._create_dataframe(jdf, sparkSession)
+        return df
diff --git a/python/tests/sql/test_structured_adapter.py 
b/python/tests/sql/test_structured_adapter.py
new file mode 100644
index 0000000000..54fcbc44ef
--- /dev/null
+++ b/python/tests/sql/test_structured_adapter.py
@@ -0,0 +1,60 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+from pyspark.sql import DataFrame
+
+from sedona.core.SpatialRDD import CircleRDD
+from sedona.core.enums import GridType
+from sedona.core.spatialOperator import JoinQueryRaw
+from sedona.utils.structured_adapter import StructuredAdapter
+from tests.test_base import TestBase
+
+
+class TestStructuredAdapter(TestBase):
+    """Round-trip checks for StructuredAdapter on a one-row point DataFrame."""
+
+    def test_df_rdd(self):
+        # DataFrame -> SpatialRDD -> DataFrame must keep the single row.
+        point_df: DataFrame = self.spark.sql("select ST_MakePoint(1, 1) as geom")
+        point_rdd = StructuredAdapter.toSpatialRdd(point_df, "geom")
+        round_trip_df = StructuredAdapter.toDf(point_rdd, self.spark)
+        assert round_trip_df.count() == 1
+
+    def test_spatial_partitioned_df(self):
+        # The partitioned conversion needs analyze() + spatialPartitioning() first.
+        point_df: DataFrame = self.spark.sql("select ST_MakePoint(1, 1) as geom")
+        point_rdd = StructuredAdapter.toSpatialRdd(point_df, "geom")
+        point_rdd.analyze()
+        point_rdd.spatialPartitioning(GridType.KDBTREE, 1)
+        partitioned_df = StructuredAdapter.toSpatialPartitionedDf(point_rdd, self.spark)
+        assert partitioned_df.count() == 1
+
+    def test_distance_join_result_to_dataframe(self):
+        # Join the point against a circle built around itself, then convert
+        # the raw pair result back to a DataFrame using the source schema twice.
+        point_df: DataFrame = self.spark.sql("select ST_MakePoint(1, 1) as geom")
+        row_schema = point_df.schema
+        point_rdd = StructuredAdapter.toSpatialRdd(point_df, "geom")
+        point_rdd.analyze()
+
+        circle_rdd = CircleRDD(point_rdd, 0.001)
+
+        point_rdd.spatialPartitioning(GridType.QUADTREE)
+        circle_rdd.spatialPartitioning(point_rdd.getPartitioner())
+
+        pair_rdd = JoinQueryRaw.DistanceJoinQueryFlat(
+            point_rdd, circle_rdd, False, True
+        )
+
+        joined_df = StructuredAdapter.pairRddToDf(
+            pair_rdd, row_schema, row_schema, self.spark
+        )
+        assert joined_df.count() == 1
diff --git 
a/spark/common/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java 
b/spark/common/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java
index d81b916183..f286dd1588 100644
--- 
a/spark/common/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java
+++ 
b/spark/common/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java
@@ -42,6 +42,7 @@ import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.sql.types.StructType;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.util.random.SamplingUtils;
 import org.locationtech.jts.geom.Coordinate;
@@ -85,6 +86,8 @@ public class SpatialRDD<T extends Geometry> implements 
Serializable {
   public JavaRDD<T> rawSpatialRDD;
 
   public List<String> fieldNames;
+
+  public StructType schema;
   /** The CR stransformation. */
   protected boolean CRStransformation = false;
   /** The source epsg code. */
diff --git 
a/spark/common/src/main/scala/org/apache/sedona/sql/utils/Adapter.scala 
b/spark/common/src/main/scala/org/apache/sedona/sql/utils/Adapter.scala
index 9b1067a25a..96aab1287e 100644
--- a/spark/common/src/main/scala/org/apache/sedona/sql/utils/Adapter.scala
+++ b/spark/common/src/main/scala/org/apache/sedona/sql/utils/Adapter.scala
@@ -29,6 +29,13 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.locationtech.jts.geom.Geometry
 
+/**
+ * Adapter for converting between DataFrame and SpatialRDD. It provides methods to convert
+ * DataFrame to SpatialRDD and vice versa. The schema information is lost during conversion. It is
+ * different from [[org.apache.spark.sql.sedona_sql.adapters.StructuredAdapter]] which does not
+ * lose the schema information during conversion. This should be used if your data starts as a
+ * SpatialRDD and you want to convert it to DataFrame.
+ */
 object Adapter {
 
   /**
diff --git 
a/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala 
b/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala
index 41869817e0..7713674261 100644
--- a/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala
+++ b/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala
@@ -79,7 +79,7 @@ object Weighting {
     require(alpha < 0, "Alpha must be less than 0")
 
     val geometryColumn = geometry match {
-      case null => getGeometryColumnName(dataframe)
+      case null => getGeometryColumnName(dataframe.schema)
       case _ =>
         require(
           dataframe.schema.fields.exists(_.name == geometry),
diff --git 
a/spark/common/src/main/scala/org/apache/sedona/stats/clustering/DBSCAN.scala 
b/spark/common/src/main/scala/org/apache/sedona/stats/clustering/DBSCAN.scala
index 11bdb8306e..c75291d971 100644
--- 
a/spark/common/src/main/scala/org/apache/sedona/stats/clustering/DBSCAN.scala
+++ 
b/spark/common/src/main/scala/org/apache/sedona/stats/clustering/DBSCAN.scala
@@ -68,7 +68,7 @@ object DBSCAN {
       clusterColumnName: String = "cluster"): DataFrame = {
 
     val geometryCol = geometry match {
-      case null => getGeometryColumnName(dataframe)
+      case null => getGeometryColumnName(dataframe.schema)
       case _ => geometry
     }
 
diff --git 
a/spark/common/src/main/scala/org/apache/sedona/stats/outlierDetection/LocalOutlierFactor.scala
 
b/spark/common/src/main/scala/org/apache/sedona/stats/outlierDetection/LocalOutlierFactor.scala
index 55e682d079..2595a90852 100644
--- 
a/spark/common/src/main/scala/org/apache/sedona/stats/outlierDetection/LocalOutlierFactor.scala
+++ 
b/spark/common/src/main/scala/org/apache/sedona/stats/outlierDetection/LocalOutlierFactor.scala
@@ -73,7 +73,8 @@ object LocalOutlierFactor {
       if (useSphere) ST_DistanceSphere else ST_Distance
     val useSpheroidString = if (useSphere) "True" else "False" // for the SQL 
expression
 
-    val geometryColumn = if (geometry == null) 
getGeometryColumnName(dataframe) else geometry
+    val geometryColumn =
+      if (geometry == null) getGeometryColumnName(dataframe.schema) else 
geometry
 
     val KNNFunction = "ST_KNN"
 
diff --git a/spark/common/src/main/scala/org/apache/sedona/util/DfUtils.scala 
b/spark/common/src/main/scala/org/apache/sedona/util/DfUtils.scala
index 18d4bc0c1a..5b2bea2180 100644
--- a/spark/common/src/main/scala/org/apache/sedona/util/DfUtils.scala
+++ b/spark/common/src/main/scala/org/apache/sedona/util/DfUtils.scala
@@ -18,12 +18,12 @@
  */
 package org.apache.sedona.util
 
-import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
+import org.apache.spark.sql.types.StructType
 
 object DfUtils {
-  def getGeometryColumnName(dataframe: DataFrame): String = {
-    val geomFields = dataframe.schema.fields.filter(_.dataType == GeometryUDT)
+  def getGeometryColumnName(schema: StructType): String = {
+    val geomFields = schema.fields.filter(_.dataType == GeometryUDT)
 
     if (geomFields.isEmpty)
       throw new IllegalArgumentException(
diff --git 
a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/adapters/StructuredAdapter.scala
 
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/adapters/StructuredAdapter.scala
new file mode 100644
index 0000000000..d2ca5f8f09
--- /dev/null
+++ 
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/adapters/StructuredAdapter.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.sedona_sql.adapters
+
+import org.apache.sedona.core.spatialRDD.SpatialRDD
+import org.apache.sedona.sql.utils.GeometrySerializer
+import org.apache.sedona.util.DfUtils
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.locationtech.jts.geom.Geometry
+import org.slf4j.{Logger, LoggerFactory}
+
+/**
+ * Adapter for converting between DataFrame and SpatialRDD. It provides methods to convert
+ * DataFrame to SpatialRDD and vice versa without losing schema. It is different from
+ * [[org.apache.sedona.sql.utils.Adapter]] which loses the schema information during conversion.
+ * This should be used if your data starts as a DataFrame and you want to convert it to SpatialRDD.
+ */
+object StructuredAdapter {
+  val logger: Logger = LoggerFactory.getLogger(getClass)
+
+  /**
+   * Convert RDD[Row] to SpatialRDD. Each geometry carries a copy of its source Row as user data.
+   *
+   * The schema is taken from the first row, or is an empty StructType when the RDD has no rows.
+   * The first row is fetched once with `take(1)` instead of separate `isEmpty()` and `first()`
+   * calls, each of which would fetch it again.
+   *
+   * @param rdd
+   *   The rows to convert.
+   * @param geometryFieldName
+   *   Name of the field holding the Geometry. It is only read while rows are being mapped, so it
+   *   is never consulted for an empty RDD.
+   * @return
+   *   A SpatialRDD with schema and rawSpatialRDD set.
+   */
+  def toSpatialRdd(rdd: RDD[Row], geometryFieldName: String): SpatialRDD[Geometry] = {
+    val spatialRDD = new SpatialRDD[Geometry]
+    spatialRDD.schema = rdd.take(1).headOption.map(_.schema).getOrElse(StructType(Seq()))
+    spatialRDD.rawSpatialRDD = rdd
+      .map(row => {
+        val geom = row.getAs[Geometry](geometryFieldName)
+        // Attach a copy of the whole row so it can be recovered from the geometry later.
+        geom.setUserData(row.copy())
+        geom
+      })
+    spatialRDD
+  }
+
+  /**
+   * Convert RDD[Row] to SpatialRDD. It puts Row as user data of Geometry. The geometry column is
+   * resolved by [[DfUtils.getGeometryColumnName]] from the schema of the first row.
+   *
+   * @param rdd
+   *   The rows to convert. Must contain at least one row, because the schema is read from it.
+   * @return
+   *   A SpatialRDD built by the overload that takes an explicit geometry field name.
+   * @throws IllegalArgumentException
+   *   if the RDD is empty.
+   */
+  def toSpatialRdd(rdd: RDD[Row]): SpatialRDD[Geometry] = {
+    // take(1) stops as soon as one row is found; count() would scan the entire RDD just to test
+    // for emptiness, and first() would then fetch the row a second time.
+    rdd.take(1).headOption match {
+      case Some(firstRow) =>
+        toSpatialRdd(rdd, DfUtils.getGeometryColumnName(firstRow.schema))
+      case None =>
+        // Same exception type and message that require(...) produced previously.
+        throw new IllegalArgumentException("requirement failed: Input RDD cannot be empty.")
+    }
+  }
+
+  /**
+   * Convert SpatialRDD to RDD[Row] by reading back the Row stored as user data of each Geometry.
+   *
+   * The user data is cast to Row, so this fits a SpatialRDD built from an RDD[Row]. The DataFrame
+   * overloads of toSpatialRdd store an InternalRow instead.
+   *
+   * @param spatialRDD
+   *   The SpatialRDD to convert. It must have rawSpatialRDD set.
+   * @return
+   *   The rows carried by the geometries.
+   */
+  def toRowRdd(spatialRDD: SpatialRDD[Geometry]): RDD[Row] =
+    spatialRDD.rawSpatialRDD.map(geom => geom.getUserData.asInstanceOf[Row])
+
+  /**
+   * Convert DataFrame to SpatialRDD. The geometry is deserialized from the binary value found at
+   * the ordinal of geometryFieldName, and a copy of the whole InternalRow is attached to it as
+   * user data.
+   *
+   * @param dataFrame
+   *   The DataFrame to convert. Its schema becomes the schema of the SpatialRDD.
+   * @param geometryFieldName
+   *   Name of the geometry column; it is looked up in the DataFrame schema.
+   * @return
+   *   A SpatialRDD with schema and rawSpatialRDD set.
+   */
+  def toSpatialRdd(dataFrame: DataFrame, geometryFieldName: String): SpatialRDD[Geometry] = {
+    val schema = dataFrame.schema
+    val geometryOrdinal = schema.fieldIndex(geometryFieldName)
+    val geometries = dataFrame.queryExecution.toRdd.map { internalRow =>
+      val geometry = GeometrySerializer.deserialize(internalRow.getBinary(geometryOrdinal))
+      geometry.setUserData(internalRow.copy())
+      geometry
+    }
+    val spatialRDD = new SpatialRDD[Geometry]
+    spatialRDD.schema = schema
+    spatialRDD.rawSpatialRDD = geometries
+    spatialRDD
+  }
+
+  /**
+   * Convert DataFrame to SpatialRDD without naming the geometry column. The column name is
+   * resolved by [[DfUtils.getGeometryColumnName]] from the DataFrame schema, then the overload
+   * that takes an explicit field name does the conversion.
+   *
+   * @param dataFrame
+   *   The DataFrame to convert.
+   * @return
+   *   A SpatialRDD whose geometries carry InternalRow as user data.
+   */
+  def toSpatialRdd(dataFrame: DataFrame): SpatialRDD[Geometry] = {
+    val geometryFieldName = DfUtils.getGeometryColumnName(dataFrame.schema)
+    toSpatialRdd(dataFrame, geometryFieldName)
+  }
+
+  /**
+   * Convert SpatialRDD.rawSpatialRDD to DataFrame. The user data of every geometry is cast to
+   * InternalRow and the DataFrame is created with the schema stored on the SpatialRDD.
+   *
+   * @param spatialRDD
+   *   The SpatialRDD to convert. It must have rawSpatialRDD and schema set.
+   * @param sparkSession
+   *   The session used to create the DataFrame.
+   * @return
+   *   A DataFrame with the schema of the SpatialRDD.
+   */
+  def toDf(spatialRDD: SpatialRDD[Geometry], sparkSession: SparkSession): DataFrame = {
+    val internalRows =
+      spatialRDD.rawSpatialRDD.map(geom => geom.getUserData.asInstanceOf[InternalRow])
+    sparkSession.internalCreateDataFrame(internalRows, spatialRDD.schema)
+  }
+
+  /**
+   * Convert SpatialRDD.spatialPartitionedRDD to DataFrame. Use it to get a DataFrame out of a
+   * SpatialRDD after spatial partitioning. A warning is logged on every call because the
+   * partitioned RDD might contain duplicate geometries.
+   *
+   * @param spatialRDD
+   *   The SpatialRDD to convert. It must have spatialPartitionedRDD set, i.e. the
+   *   spatialPartitioning method must have been called before this method.
+   * @param sparkSession
+   *   The session used to create the DataFrame.
+   * @return
+   *   A DataFrame with the schema of the SpatialRDD.
+   * @throws RuntimeException
+   *   if spatialPartitionedRDD is null.
+   */
+  def toSpatialPartitionedDf(
+      spatialRDD: SpatialRDD[Geometry],
+      sparkSession: SparkSession): DataFrame = {
+    val partitioned = spatialRDD.spatialPartitionedRDD
+    if (partitioned == null) {
+      throw new RuntimeException(
+        "SpatialRDD is not spatially partitioned. Please call spatialPartitioning method before calling this method.")
+    }
+    logger.warn(
+      "SpatialPartitionedRDD might have duplicate geometries. Please make sure you are aware of it.")
+    val internalRows = partitioned.map(geom => geom.getUserData.asInstanceOf[InternalRow])
+    sparkSession.internalCreateDataFrame(internalRows, spatialRDD.schema)
+  }
+
+  /**
+   * Convert JavaPairRDD[Geometry, Geometry] to DataFrame, with both schemas given as JSON. Each
+   * JSON string is parsed into a StructType and the StructType overload does the conversion.
+   *
+   * @param spatialPairRDD
+   *   The JavaPairRDD to convert, e.g. the result of a spatial join.
+   * @param leftSchemaJson
+   *   Schema of the left side, in JSON format.
+   * @param rightSchemaJson
+   *   Schema of the right side, in JSON format.
+   * @param sparkSession
+   *   The session used to create the DataFrame.
+   * @return
+   *   A DataFrame holding the left fields followed by the right fields.
+   */
+  def toDf(
+      spatialPairRDD: JavaPairRDD[Geometry, Geometry],
+      leftSchemaJson: String,
+      rightSchemaJson: String,
+      sparkSession: SparkSession): DataFrame = {
+    def parseSchema(json: String): StructType =
+      DataType.fromJson(json).asInstanceOf[StructType]
+
+    toDf(spatialPairRDD, parseSchema(leftSchemaJson), parseSchema(rightSchemaJson), sparkSession)
+  }
+
+  /**
+   * Convert JavaPairRDD[Geometry, Geometry] to DataFrame. For every pair, the InternalRow stored
+   * as user data of each geometry is unpacked with its schema and the two value sequences are
+   * concatenated into one row.
+   *
+   * @param spatialPairRDD
+   *   The JavaPairRDD to convert, e.g. the result of a spatial join.
+   * @param leftSchema
+   *   The schema of the left side.
+   * @param rightSchema
+   *   The schema of the right side.
+   * @param sparkSession
+   *   The session used to create the DataFrame.
+   * @return
+   *   A DataFrame whose schema is the left fields followed by the right fields.
+   */
+  def toDf(
+      spatialPairRDD: JavaPairRDD[Geometry, Geometry],
+      leftSchema: StructType,
+      rightSchema: StructType,
+      sparkSession: SparkSession): DataFrame = {
+    val combinedSchema = StructType(leftSchema.fields ++ rightSchema.fields)
+    val joinedRows = spatialPairRDD.rdd.map { case (leftGeom, rightGeom) =>
+      val leftValues = leftGeom.getUserData.asInstanceOf[InternalRow].toSeq(leftSchema)
+      val rightValues = rightGeom.getUserData.asInstanceOf[InternalRow].toSeq(rightSchema)
+      InternalRow.fromSeq(leftValues ++ rightValues)
+    }
+    sparkSession.internalCreateDataFrame(joinedRows, combinedSchema)
+  }
+
+  /**
+   * Convert JavaPairRDD[Geometry, Geometry] to DataFrame, taking the left and right schemas from
+   * the SpatialRDDs that took part in the join.
+   *
+   * @param spatialPairRDD
+   *   The JavaPairRDD to convert, e.g. the result of a spatial join.
+   * @param originalLeftSpatialRdd
+   *   The original left SpatialRDD involved in the join. Only its schema is read.
+   * @param originalRightSpatialRdd
+   *   The original right SpatialRDD involved in the join. Only its schema is read.
+   * @param sparkSession
+   *   The session used to create the DataFrame.
+   * @return
+   *   A DataFrame holding the left fields followed by the right fields.
+   */
+  def toDf(
+      spatialPairRDD: JavaPairRDD[Geometry, Geometry],
+      originalLeftSpatialRdd: SpatialRDD[Geometry],
+      originalRightSpatialRdd: SpatialRDD[Geometry],
+      sparkSession: SparkSession): DataFrame = {
+    val leftSchema = originalLeftSpatialRdd.schema
+    val rightSchema = originalRightSpatialRdd.schema
+    toDf(spatialPairRDD, leftSchema, rightSchema, sparkSession)
+  }
+}
diff --git 
a/spark/common/src/test/scala/org/apache/sedona/sql/structuredAdapterTestScala.scala
 
b/spark/common/src/test/scala/org/apache/sedona/sql/structuredAdapterTestScala.scala
new file mode 100644
index 0000000000..6938916a6d
--- /dev/null
+++ 
b/spark/common/src/test/scala/org/apache/sedona/sql/structuredAdapterTestScala.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.sql
+
+import org.apache.sedona.core.enums.{GridType, IndexType}
+import org.apache.sedona.core.spatialOperator.{JoinQuery, SpatialPredicate}
+import org.apache.sedona.core.spatialRDD.CircleRDD
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.sedona_sql.adapters.StructuredAdapter
+import org.junit.Assert.assertEquals
+import org.scalatest.GivenWhenThen
+
+class structuredAdapterTestScala extends TestBaseScala with GivenWhenThen {
+
+  describe("Structured Adapter") {
+    it("Should convert DataFrame to SpatialRDD and back") {
+      val testData = generateTestData()
+      val expectedGeom = testData.head._3
+      val sourceDf = sparkSession.createDataFrame(testData)
+      val spatialRdd = StructuredAdapter.toSpatialRdd(sourceDf, "_3")
+      assertGeometryEquals(expectedGeom, spatialRdd.rawSpatialRDD.take(1).get(0))
+      val roundTripDf = StructuredAdapter.toDf(spatialRdd, sparkSession)
+      // No spatial partitioning was done, so the partitioned conversion must be rejected.
+      intercept[RuntimeException] {
+        StructuredAdapter.toSpatialPartitionedDf(spatialRdd, sparkSession)
+      }
+      assertEquals(testData.size, roundTripDf.count())
+    }
+
+    it("Should convert DataFrame to SpatialRDD and back, without specifying geometry column") {
+      val testData = generateTestData()
+      val expectedGeom = testData.head._3
+      val sourceDf = sparkSession.createDataFrame(testData)
+      // The geometry column is auto-detected from the DataFrame schema.
+      val spatialRdd = StructuredAdapter.toSpatialRdd(sourceDf)
+      assertGeometryEquals(expectedGeom, spatialRdd.rawSpatialRDD.take(1).get(0))
+      val roundTripDf = StructuredAdapter.toDf(spatialRdd, sparkSession)
+      intercept[RuntimeException] {
+        StructuredAdapter.toSpatialPartitionedDf(spatialRdd, sparkSession)
+      }
+      assertEquals(testData.size, roundTripDf.count())
+    }
+
+    it("Should convert to Rdd and do spatial partitioning") {
+      val testData = generateTestData()
+      val sourceDf = sparkSession.createDataFrame(testData)
+      val spatialRdd = StructuredAdapter.toSpatialRdd(sourceDf, "_3")
+      spatialRdd.analyze()
+      spatialRdd.spatialPartitioning(GridType.KDBTREE, 10)
+      val partitionedDf = StructuredAdapter.toSpatialPartitionedDf(spatialRdd, sparkSession)
+      assertEquals(testData.size, partitionedDf.count())
+    }
+
+    it("Should convert a spatial join result back to DataFrame") {
+      val sourceDf = sparkSession.createDataFrame(generateTestData())
+      val pointRdd = StructuredAdapter.toSpatialRdd(sourceDf)
+      val circleRDD = new CircleRDD(pointRdd, 0.0001)
+      circleRDD.analyze()
+      pointRdd.analyze()
+      circleRDD.spatialPartitioning(GridType.KDBTREE)
+      pointRdd.spatialPartitioning(circleRDD.getPartitioner)
+      circleRDD.buildIndex(IndexType.QUADTREE, true)
+      val pairRdd =
+        JoinQuery.DistanceJoinQueryFlat(pointRdd, circleRDD, true, SpatialPredicate.INTERSECTS)
+      val pointSchema = pointRdd.schema
+      // Overload taking StructType schemas.
+      val dfFromSchemas = StructuredAdapter.toDf(pairRdd, pointSchema, pointSchema, sparkSession)
+      assertEquals(pointRdd.rawSpatialRDD.count(), dfFromSchemas.count())
+      // Overload taking the same schemas as JSON strings.
+      val dfFromJson =
+        StructuredAdapter.toDf(pairRdd, pointSchema.json, pointSchema.json, sparkSession)
+      assertEquals(pointRdd.rawSpatialRDD.count(), dfFromJson.count())
+    }
+
+    it("Should convert a SpatialRdd to RowRdd and back") {
+      val testData = generateTestData()
+      val sourceDf = sparkSession.createDataFrame(testData)
+      val spatialRdd = StructuredAdapter.toSpatialRdd(sourceDf.rdd)
+      val rowRdd = StructuredAdapter.toRowRdd(spatialRdd)
+      val roundTripRdd = StructuredAdapter.toSpatialRdd(rowRdd)
+      assertEquals(testData.size, roundTripRdd.rawSpatialRDD.count())
+    }
+
+    it("Should not be able to convert an empty Row RDD to SpatialRDD if schema is not provided") {
+      val emptyRows = sparkSession.sparkContext.parallelize(Seq.empty[Row])
+      intercept[IllegalArgumentException] {
+        StructuredAdapter.toSpatialRdd(emptyRows)
+      }
+    }
+
+    it("Should convert an empty Row RDD to SpatialRDD if schema is provided") {
+      val emptyRows = sparkSession.sparkContext.parallelize(Seq.empty[Row])
+      // The two-argument overload is called here with a null geometry field name.
+      val spatialRdd = StructuredAdapter.toSpatialRdd(emptyRows, null)
+      assertEquals(0, spatialRdd.rawSpatialRDD.count())
+      assertEquals(0, spatialRdd.schema.size)
+    }
+  }
+
+}

Reply via email to