This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch SEDONA-703
in repository https://gitbox.apache.org/repos/asf/sedona.git
commit 3844869e7b09ebd5904dad99b595bcf2de8bb07f
Author: Jia Yu <[email protected]>
AuthorDate: Wed Jan 29 22:08:22 2025 -0800

    Initial commit
---
 .../apache/sedona/core/spatialRDD/SpatialRDD.java  |   5 +-
 .../org/apache/sedona/sql/utils/Adapter.scala      |  10 +
 .../sedona_sql/adapters/StructuredAdapter.scala    | 206 +++++++++++++++++++++
 .../sedona/sql/structuredAdapterTestScala.scala    |  92 +++++++++
 4 files changed, 312 insertions(+), 1 deletion(-)

diff --git a/spark/common/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java b/spark/common/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java
index d81b916183..07ec40eeff 100644
--- a/spark/common/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java
+++ b/spark/common/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java
@@ -42,6 +42,7 @@ import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.sql.types.StructType;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.util.random.SamplingUtils;
 import org.locationtech.jts.geom.Coordinate;
@@ -84,7 +85,9 @@ public class SpatialRDD<T extends Geometry> implements Serializable {
 
   /** The raw spatial RDD. */
   public JavaRDD<T> rawSpatialRDD;
-  public List<String> fieldNames;
+  @Deprecated public List<String> fieldNames;
+
+  public StructType schema;
   /** The CR stransformation. */
   protected boolean CRStransformation = false;
   /** The source epsg code. */
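The new schema field carries a full Spark StructType (column names and types), while the deprecated fieldNames kept only a list of names. A minimal sketch of reading column metadata from either field, assuming a SpatialRDD populated by the StructuredAdapter introduced below; the helper itself is illustrative and not part of this commit:

    import scala.jdk.CollectionConverters._
    import org.apache.sedona.core.spatialRDD.SpatialRDD
    import org.locationtech.jts.geom.Geometry

    // Hypothetical helper: prefer the new schema, fall back to the deprecated list.
    def columnNames(rdd: SpatialRDD[Geometry]): Seq[String] =
      if (rdd.schema != null) rdd.schema.fieldNames.toSeq // types are also available via rdd.schema
      else Option(rdd.fieldNames).map(_.asScala.toSeq).getOrElse(Seq.empty)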
diff --git a/spark/common/src/main/scala/org/apache/sedona/sql/utils/Adapter.scala b/spark/common/src/main/scala/org/apache/sedona/sql/utils/Adapter.scala
index 9b1067a25a..bd366f3f65 100644
--- a/spark/common/src/main/scala/org/apache/sedona/sql/utils/Adapter.scala
+++ b/spark/common/src/main/scala/org/apache/sedona/sql/utils/Adapter.scala
@@ -38,6 +38,7 @@ object Adapter {
    * @param geometryFieldName
    * @return
    */
+  @deprecated("Use StructuredAdapter instead", since = "1.7.1")
   def toSpatialRdd(dataFrame: DataFrame, geometryFieldName: String): SpatialRDD[Geometry] = {
     // Delete the field that have geometry
     if (dataFrame.schema.size == 1) {
@@ -60,6 +61,7 @@ object Adapter {
    * @param fieldNames
    * @return
    */
+  @deprecated("Use StructuredAdapter instead", since = "1.7.1")
   def toSpatialRdd(
       dataFrame: DataFrame,
       geometryFieldName: String,
@@ -88,6 +90,7 @@ object Adapter {
    * @param fieldNames
    * @return
    */
+  @deprecated("Use StructuredAdapter instead", since = "1.7.1")
   def toSpatialRdd(
       dataFrame: DataFrame,
       geometryColId: Int,
@@ -107,6 +110,7 @@ object Adapter {
    * @param geometryColId
    * @return
    */
+  @deprecated("Use StructuredAdapter instead", since = "1.7.1")
   def toSpatialRdd(dataFrame: DataFrame, geometryColId: Int): SpatialRDD[Geometry] = {
     // Delete the field that have geometry
     if (dataFrame.schema.size == 1) {
@@ -121,6 +125,7 @@ object Adapter {
     }
   }
 
+  @deprecated("Use StructuredAdapter instead", since = "1.7.1")
   def toDf[T <: Geometry](spatialRDD: SpatialRDD[T], sparkSession: SparkSession): DataFrame = {
     import scala.jdk.CollectionConverters._
     if (spatialRDD.fieldNames != null)
@@ -128,6 +133,7 @@ object Adapter {
     toDf(spatialRDD = spatialRDD, fieldNames = null, sparkSession = sparkSession)
   }
 
+  @deprecated("Use StructuredAdapter instead", since = "1.7.1")
   def toDf[T <: Geometry](
       spatialRDD: SpatialRDD[T],
       fieldNames: Seq[String],
@@ -158,6 +164,7 @@ object Adapter {
    * @return
    *   DataFrame with the specified schema
    */
+  @deprecated("Use StructuredAdapter instead", since = "1.7.1")
   def toDf[T <: Geometry](
       spatialRDD: SpatialRDD[T],
       schema: StructType,
@@ -170,12 +177,14 @@ object Adapter {
     sparkSession.sqlContext.createDataFrame(rdd, schema)
   }
 
+  @deprecated("Use StructuredAdapter instead", since = "1.7.1")
   def toDf(
       spatialPairRDD: JavaPairRDD[Geometry, Geometry],
       sparkSession: SparkSession): DataFrame = {
     toDf(spatialPairRDD, null, null, sparkSession)
   }
 
+  @deprecated("Use StructuredAdapter instead", since = "1.7.1")
   def toDf(
       spatialPairRDD: JavaPairRDD[Geometry, Geometry],
       leftFieldnames: Seq[String],
@@ -218,6 +227,7 @@ object Adapter {
    * @return
    *   Spatial pair RDD as a DataFrame with the desired schema
    */
+  @deprecated("Use StructuredAdapter instead", since = "1.7.1")
   def toDf(
       spatialPairRDD: JavaPairRDD[Geometry, Geometry],
       schema: StructType,
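Every public Adapter entry point above now points users at StructuredAdapter. A hedged migration sketch, assuming a DataFrame df with a geometry column named "geom" (both names are illustrative):

    import org.apache.sedona.sql.utils.Adapter
    import org.apache.spark.sql.sedona_sql.adapters.StructuredAdapter

    // Deprecated path: only a list of field names rides along with the RDD.
    val rddOld = Adapter.toSpatialRdd(df, "geom")
    val dfOld = Adapter.toDf(rddOld, sparkSession)

    // New path: the full StructType is kept as SpatialRDD.schema, so the
    // original column types can be restored on the way back.
    val rddNew = StructuredAdapter.toSpatialRdd(df, "geom")
    val dfNew = StructuredAdapter.toDf(rddNew, sparkSession)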
diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/adapters/StructuredAdapter.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/adapters/StructuredAdapter.scala
new file mode 100644
index 0000000000..a359ae9457
--- /dev/null
+++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/adapters/StructuredAdapter.scala
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.sedona_sql.adapters
+
+import org.apache.sedona.core.spatialRDD.SpatialRDD
+import org.apache.sedona.sql.utils.GeometrySerializer
+import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.locationtech.jts.geom.Geometry
+import org.slf4j.{Logger, LoggerFactory}
+
+object StructuredAdapter {
+  val logger: Logger = LoggerFactory.getLogger(getClass)
+
+  /**
+   * Convert RDD[Row] to SpatialRDD. It puts Row as user data of Geometry.
+   * @param rdd
+   * @param geometryFieldName
+   * @return
+   */
+  def toSpatialRdd(rdd: RDD[Row], geometryFieldName: String): SpatialRDD[Geometry] = {
+    val spatialRDD = new SpatialRDD[Geometry]
+    spatialRDD.schema = rdd.first().schema
+    spatialRDD.rawSpatialRDD = rdd
+      .map(row => {
+        val geom = row.getAs[Geometry](geometryFieldName)
+        geom.setUserData(row.copy())
+        geom
+      })
+      .toJavaRDD()
+    spatialRDD
+  }
+
+  /**
+   * Convert RDD[Row] to SpatialRDD. It puts Row as user data of Geometry. It auto-detects the
+   * geometry column if geometryFieldName is not provided, using the first geometry column in
+   * the RDD.
+   * @param rdd
+   * @return
+   */
+  def toSpatialRdd(rdd: RDD[Row]): SpatialRDD[Geometry] = {
+    toSpatialRdd(rdd, firstGeomColName(rdd.first().schema))
+  }
+
+  /**
+   * Convert SpatialRDD to RDD[Row]. It extracts Row from user data of Geometry.
+   * @param spatialRDD
+   * @return
+   */
+  def toRowRdd(spatialRDD: SpatialRDD[Geometry]): RDD[Row] = {
+    spatialRDD.rawSpatialRDD.map(geometry => {
+      val row = geometry.getUserData.asInstanceOf[Row]
+      row
+    })
+  }
+
+  /**
+   * Convert DataFrame to SpatialRDD. It puts InternalRow as user data of Geometry. It allows
+   * only one geometry column.
+   *
+   * @param dataFrame
+   * @param geometryFieldName
+   */
+  def toSpatialRdd(dataFrame: DataFrame, geometryFieldName: String): SpatialRDD[Geometry] = {
+    val spatialRDD = new SpatialRDD[Geometry]
+    spatialRDD.schema = dataFrame.schema
+    val ordinal = spatialRDD.schema.fieldIndex(geometryFieldName)
+    spatialRDD.rawSpatialRDD = dataFrame.queryExecution.toRdd
+      .map(row => {
+        val geom = GeometrySerializer.deserialize(row.getBinary(ordinal))
+        geom.setUserData(row.copy())
+        geom
+      })
+      .toJavaRDD()
+    spatialRDD
+  }
+
+  /**
+   * Convert DataFrame to SpatialRDD. It puts InternalRow as user data of Geometry. It
+   * auto-detects the geometry column if geometryFieldName is not provided, using the first
+   * geometry column in the DataFrame.
+   * @param dataFrame
+   * @return
+   */
+  def toSpatialRdd(dataFrame: DataFrame): SpatialRDD[Geometry] = {
+    toSpatialRdd(dataFrame, firstGeomColName(dataFrame.schema))
+  }
+
+  /**
+   * Convert SpatialRDD.rawSpatialRDD to DataFrame.
+   * @param spatialRDD
+   *   The SpatialRDD to convert. It must have rawSpatialRDD set.
+   * @param sparkSession
+   * @return
+   */
+  def toDf(spatialRDD: SpatialRDD[Geometry], sparkSession: SparkSession): DataFrame = {
+    val rowRdd = spatialRDD.rawSpatialRDD.map(geometry => {
+      val row = geometry.getUserData.asInstanceOf[InternalRow]
+      row
+    })
+    sparkSession.internalCreateDataFrame(rowRdd, spatialRDD.schema)
+  }
+
+  /**
+   * Convert SpatialRDD.spatialPartitionedRDD to DataFrame. This is useful when you want to
+   * convert a SpatialRDD after spatial partitioning.
+   * @param spatialRDD
+   *   The SpatialRDD to convert. It must have spatialPartitionedRDD set; you must call the
+   *   spatialPartitioning method before calling this method.
+   * @param sparkSession
+   * @return
+   */
+  def toSpatialPartitionedDf(
+      spatialRDD: SpatialRDD[Geometry],
+      sparkSession: SparkSession): DataFrame = {
+    if (spatialRDD.spatialPartitionedRDD == null)
+      throw new RuntimeException(
+        "SpatialRDD is not spatially partitioned. Please call spatialPartitioning method before calling this method.")
+    logger.warn(
+      "SpatialPartitionedRDD might have duplicate geometries. Please make sure you are aware of it.")
+    val rowRdd = spatialRDD.spatialPartitionedRDD.map(geometry => {
+      val row = geometry.getUserData.asInstanceOf[InternalRow]
+      row
+    })
+    sparkSession.internalCreateDataFrame(rowRdd, spatialRDD.schema)
+  }
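A hedged sketch of the spatially partitioned path above, assuming a DataFrame df with a geometry column "geom" (names illustrative); analyze() must run first so the partitioner has the dataset envelope:

    import org.apache.sedona.core.enums.GridType
    import org.apache.spark.sql.sedona_sql.adapters.StructuredAdapter

    val rdd = StructuredAdapter.toSpatialRdd(df, "geom")
    rdd.analyze() // computes the boundary envelope the partitioner needs
    rdd.spatialPartitioning(GridType.KDBTREE, 10) // 10 partitions, illustrative
    // Border-crossing geometries may be duplicated, as the warning above notes.
    val partitionedDf = StructuredAdapter.toSpatialPartitionedDf(rdd, sparkSession)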
+
+  /**
+   * Convert JavaPairRDD[Geometry, Geometry] to DataFrame. This method is useful when you want
+   * to convert the result of a spatial join to a DataFrame.
+   * @param spatialPairRDD
+   *   The JavaPairRDD to convert.
+   * @param leftSchema
+   *   Schema of the left side.
+   * @param rightSchema
+   *   Schema of the right side.
+   * @param sparkSession
+   * @return
+   */
+  def toDf(
+      spatialPairRDD: JavaPairRDD[Geometry, Geometry],
+      leftSchema: StructType,
+      rightSchema: StructType,
+      sparkSession: SparkSession): DataFrame = {
+    val rowRdd = spatialPairRDD.rdd.map(pair => {
+      val leftRow = pair._1.getUserData.asInstanceOf[InternalRow].toSeq(leftSchema)
+      val rightRow = pair._2.getUserData.asInstanceOf[InternalRow].toSeq(rightSchema)
+      InternalRow.fromSeq(leftRow ++ rightRow)
+    })
+    sparkSession.internalCreateDataFrame(
+      rowRdd,
+      StructType(leftSchema.fields ++ rightSchema.fields))
+  }
+
+  /**
+   * Convert JavaPairRDD[Geometry, Geometry] to DataFrame. This method is useful when you want
+   * to convert the result of a spatial join to a DataFrame.
+   * @param spatialPairRDD
+   *   The JavaPairRDD to convert.
+   * @param originalLeftSpatialRdd
+   *   The original left SpatialRDD involved in the join. It is used to get the schema of the
+   *   left side.
+   * @param originalRightSpatialRdd
+   *   The original right SpatialRDD involved in the join. It is used to get the schema of the
+   *   right side.
+   * @param sparkSession
+   * @return
+   */
+  def toDf(
+      spatialPairRDD: JavaPairRDD[Geometry, Geometry],
+      originalLeftSpatialRdd: SpatialRDD[Geometry],
+      originalRightSpatialRdd: SpatialRDD[Geometry],
+      sparkSession: SparkSession): DataFrame = {
+    toDf(
+      spatialPairRDD,
+      originalLeftSpatialRdd.schema,
+      originalRightSpatialRdd.schema,
+      sparkSession)
+  }
+
+  private def firstGeomColName(schema: StructType): String = {
+    schema.fields
+      .find(_.dataType.typeName == "geometry")
+      .map(_.name)
+      .getOrElse(throw new IllegalArgumentException("No geometry column found in DataFrame"))
+  }
+}
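A hedged sketch of converting a join result, modeled on the distance-join test below; leftRdd and rightRdd are illustrative StructuredAdapter-built RDDs, and the pair keys are assumed to carry the left rows:

    import org.apache.sedona.core.enums.GridType
    import org.apache.sedona.core.spatialOperator.{JoinQuery, SpatialPredicate}

    leftRdd.analyze()
    rightRdd.analyze()
    leftRdd.spatialPartitioning(GridType.KDBTREE)
    rightRdd.spatialPartitioning(leftRdd.getPartitioner)
    val pairRdd =
      JoinQuery.SpatialJoinQueryFlat(rightRdd, leftRdd, false, SpatialPredicate.INTERSECTS)
    // Pass the schemas explicitly, or use the overload that reads them from the RDDs.
    val joinedDf = StructuredAdapter.toDf(pairRdd, leftRdd.schema, rightRdd.schema, sparkSession)

Note that the concatenated schema keeps duplicate column names as-is, so selecting such columns afterwards may require renaming.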
diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/structuredAdapterTestScala.scala b/spark/common/src/test/scala/org/apache/sedona/sql/structuredAdapterTestScala.scala
new file mode 100644
index 0000000000..cda0eaf37b
--- /dev/null
+++ b/spark/common/src/test/scala/org/apache/sedona/sql/structuredAdapterTestScala.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.sql
+
+import org.apache.sedona.core.enums.{GridType, IndexType}
+import org.apache.sedona.core.spatialOperator.{JoinQuery, SpatialPredicate}
+import org.apache.sedona.core.spatialRDD.CircleRDD
+import org.apache.spark.sql.sedona_sql.adapters.StructuredAdapter
+import org.junit.Assert.assertEquals
+import org.scalatest.GivenWhenThen
+
+class structuredAdapterTestScala extends TestBaseScala with GivenWhenThen {
+
+  describe("Structured Adapter") {
+    it("Should convert DataFrame to SpatialRDD and back") {
+      val seq = generateTestData()
+      val geom1 = seq.head._3
+      val dfOrigin = sparkSession.createDataFrame(seq)
+      val rdd = StructuredAdapter.toSpatialRdd(dfOrigin, "_3")
+      assertGeometryEquals(geom1, rdd.rawSpatialRDD.take(1).get(0))
+      val dfConverted = StructuredAdapter.toDf(rdd, sparkSession)
+      intercept[RuntimeException] {
+        StructuredAdapter.toSpatialPartitionedDf(rdd, sparkSession)
+      }
+      assertEquals(seq.size, dfConverted.count())
+    }
+
+    it("Should convert DataFrame to SpatialRDD and back, without specifying geometry column") {
+      val seq = generateTestData()
+      val geom1 = seq.head._3
+      val dfOrigin = sparkSession.createDataFrame(seq)
+      val rdd = StructuredAdapter.toSpatialRdd(dfOrigin)
+      assertGeometryEquals(geom1, rdd.rawSpatialRDD.take(1).get(0))
+      val dfConverted = StructuredAdapter.toDf(rdd, sparkSession)
+      intercept[RuntimeException] {
+        StructuredAdapter.toSpatialPartitionedDf(rdd, sparkSession)
+      }
+      assertEquals(seq.size, dfConverted.count())
+    }
+
+    it("Should convert to Rdd and do spatial partitioning") {
+      val seq = generateTestData()
+      val dfOrigin = sparkSession.createDataFrame(seq)
+      val rdd = StructuredAdapter.toSpatialRdd(dfOrigin, "_3")
+      rdd.analyze()
+      rdd.spatialPartitioning(GridType.KDBTREE, 10)
+      val dfConverted = StructuredAdapter.toSpatialPartitionedDf(rdd, sparkSession)
+      assertEquals(seq.size, dfConverted.count())
+    }
+
+    it("Should convert a spatial join result back to DataFrame") {
+      val pointRdd =
+        StructuredAdapter.toSpatialRdd(sparkSession.createDataFrame(generateTestData()))
+      val circleRDD = new CircleRDD(pointRdd, 0.0001)
+      circleRDD.analyze()
+      pointRdd.analyze()
+      circleRDD.spatialPartitioning(GridType.KDBTREE)
+      pointRdd.spatialPartitioning(circleRDD.getPartitioner)
+      circleRDD.buildIndex(IndexType.QUADTREE, true)
+      val pairRdd =
+        JoinQuery.DistanceJoinQueryFlat(pointRdd, circleRDD, true, SpatialPredicate.INTERSECTS)
+      val resultDf =
+        StructuredAdapter.toDf(pairRdd, pointRdd.schema, pointRdd.schema, sparkSession)
+      assertEquals(pointRdd.rawSpatialRDD.count(), resultDf.count())
+    }
+
+    it("Should convert a SpatialRdd to RowRdd and back") {
+      val seq = generateTestData()
+      val dfOrigin = sparkSession.createDataFrame(seq)
+      val spatialRdd = StructuredAdapter.toSpatialRdd(dfOrigin.rdd)
+      val rowRdd = StructuredAdapter.toRowRdd(spatialRdd)
+      assertEquals(seq.size, StructuredAdapter.toSpatialRdd(rowRdd).rawSpatialRDD.count())
+    }
+  }
+
+}
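Beyond the tests above, the Row-based entry points also compose without going through the SQL planner, mirroring the last test. A minimal sketch, assuming df has a geometry column:

    val spatialRdd = StructuredAdapter.toSpatialRdd(df.rdd) // auto-detects the geometry column
    val rowRdd = StructuredAdapter.toRowRdd(spatialRdd) // plain RDD[Row]; each Row keeps its schema
    val roundTripped = StructuredAdapter.toSpatialRdd(rowRdd)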
