This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new 09b95b4804 [GH-2240] Fix write and read nested geometry array using 
vectorized parquet reader  (#2359)
09b95b4804 is described below

commit 09b95b480480137716a6db12b7c05962ef0378b6
Author: Feng Zhang <[email protected]>
AuthorDate: Tue Sep 23 13:45:18 2025 -0700

    [GH-2240] Fix write and read nested geometry array using vectorized parquet 
reader  (#2359)
    
    * Fix write and read nested geometry array using vectorized parquet reader
    
    * remove temp test files
    
    * remove printout in tests
    
    * rename FixNestedUDTInParquetRule
    
    * remove unused file
    
    * add type check cache
    
    * optimize regular parquet reader path
    
    * rename hasHash method
    
    * rename to be more clear
    
    * add more comments
    
    * fix spark 4.0 compilation error
    
    * fix test
    
    * address copilot comments
    
    * revert changes to ParquetColumnVector.java
    
    * add further tests on read back dataframe
---
 .../org/apache/sedona/spark/SedonaContext.scala    |   2 +
 .../sedona_sql/UDT/TransformNestedUDTParquet.scala | 145 +++++++++++++++++++++
 .../org/apache/sedona/sql/geoparquetIOTests.scala  | 101 ++++++++++++++
 3 files changed, 248 insertions(+)

diff --git 
a/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala 
b/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala
index 9619837691..b0e46cf6e9 100644
--- a/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala
+++ b/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala
@@ -27,6 +27,7 @@ import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.SparkStrategy
+import org.apache.spark.sql.sedona_sql.UDT.TransformNestedUDTParquet
 import org.apache.spark.sql.sedona_sql.optimization._
 import org.apache.spark.sql.sedona_sql.strategy.join.JoinQueryDetector
 import 
org.apache.spark.sql.sedona_sql.strategy.physical.function.EvalPhysicalFunctionStrategy
@@ -43,6 +44,7 @@ object SedonaContext {
 
   private def customOptimizationsWithSession(sparkSession: SparkSession) =
     Seq(
+      new TransformNestedUDTParquet(sparkSession),
       new SpatialFilterPushDownForGeoParquet(sparkSession),
       new SpatialTemporalFilterPushDownForStacScan(sparkSession))
 
diff --git 
a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/UDT/TransformNestedUDTParquet.scala
 
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/UDT/TransformNestedUDTParquet.scala
new file mode 100644
index 0000000000..902a16d4b2
--- /dev/null
+++ 
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/UDT/TransformNestedUDTParquet.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.sedona_sql.UDT
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.types._
+
+/**
+ * Catalyst rule that automatically transforms schemas with nested GeometryUDT 
to prevent
+ * SPARK-48942 errors in Parquet reading.
+ *
+ * This rule detects LogicalRelations that use ParquetFileFormat and have 
nested GeometryUDT in
+ * their schema, then transforms the schema to use BinaryType instead.
+ */
+class TransformNestedUDTParquet(spark: SparkSession) extends Rule[LogicalPlan] 
{
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.transformUp {
+      case lr: LogicalRelation
+          if lr.relation.isInstanceOf[HadoopFsRelation] &&
+            lr.relation
+              .asInstanceOf[HadoopFsRelation]
+              .fileFormat
+              .isInstanceOf[ParquetFileFormat] &&
+            hasNestedGeometryUDT(lr.schema) =>
+        val relation = lr.relation.asInstanceOf[HadoopFsRelation]
+
+        // Transform the schema to use BinaryType for nested GeometryUDT
+        val transformedSchema = transformSchemaForNestedUDT(lr.schema)
+
+        // Create new AttributeReferences with transformed data types
+        val transformedAttributes = transformedSchema.fields.zipWithIndex.map {
+          case (field, index) =>
+            val originalAttr = lr.output(index)
+            AttributeReference(field.name, field.dataType, field.nullable, 
field.metadata)(
+              originalAttr.exprId,
+              originalAttr.qualifier)
+        }
+        lr.copy(output = transformedAttributes)
+
+      case other => other
+    }
+  }
+
+  /**
+   * Checks if a schema contains nested GeometryUDT fields, meaning 
GeometryUDT instances that are
+   * inside arrays, maps, or structs (i.e., not at the top level of the 
schema). Top-level
+   * GeometryUDT fields (fields whose type is GeometryUDT directly in the 
StructType) are NOT
+   * considered nested and do not trigger the transformation. This distinction 
is important
+   * because the SPARK-48942 Parquet bug only affects nested UDTs, not 
top-level ones. Therefore,
+   * this method returns true only if a GeometryUDT is found inside a 
container type, ensuring
+   * that only affected fields are transformed.
+   */
+  private def hasNestedGeometryUDT(schema: StructType): Boolean = {
+    schema.fields.exists(field => hasNestedGeometryUDTInType(field.dataType, 
isTopLevel = true))
+  }
+
+  /**
+   * Recursively check if a data type contains nested GeometryUDT.
+   * @param dataType
+   *   the data type to check
+   * @param isTopLevel
+   *   true if this is a top-level field, false if nested inside a container
+   * @return
+   *   true if nested GeometryUDT is found, false otherwise
+   */
+  private def hasNestedGeometryUDTInType(dataType: DataType, isTopLevel: 
Boolean): Boolean = {
+    dataType match {
+      case _: GeometryUDT => !isTopLevel // GeometryUDT is "nested" only if 
NOT at top level
+      case ArrayType(elementType, _) =>
+        hasNestedGeometryUDTInType(elementType, isTopLevel = false)
+      case MapType(keyType, valueType, _) =>
+        hasNestedGeometryUDTInType(keyType, isTopLevel = false) ||
+        hasNestedGeometryUDTInType(valueType, isTopLevel = false)
+      case StructType(fields) =>
+        fields.exists(field => hasNestedGeometryUDTInType(field.dataType, 
isTopLevel = false))
+      case _ => false
+    }
+  }
+
+  /**
+   * Transform a schema to handle nested UDT by processing each field. This 
preserves top-level
+   * GeometryUDT fields while transforming nested ones to BinaryType.
+   */
+  private def transformSchemaForNestedUDT(schema: StructType): StructType = {
+    StructType(schema.fields.map(field =>
+      field.copy(dataType = transformDataType(field.dataType, isTopLevel = 
true))))
+  }
+
+  /**
+   * Recursively transform data types based on nesting level.
+   * @param dataType
+   *   the data type to transform
+   * @param isTopLevel
+   *   true if this is a top-level field (preserves GeometryUDT), false if 
nested (converts
+   *   GeometryUDT to BinaryType and any other UDT to its underlying sqlType)
+   * @return
+   *   transformed data type
+   */
+  private def transformDataType(dataType: DataType, isTopLevel: Boolean): 
DataType = {
+    dataType match {
+      case _: GeometryUDT =>
+        if (isTopLevel) dataType else BinaryType // Preserve at top-level, 
convert if nested
+
+      case ArrayType(elementType, containsNull) =>
+        ArrayType(transformDataType(elementType, isTopLevel = false), 
containsNull)
+
+      case MapType(keyType, valueType, valueContainsNull) =>
+        MapType(
+          transformDataType(keyType, isTopLevel = false),
+          transformDataType(valueType, isTopLevel = false),
+          valueContainsNull)
+
+      case StructType(fields) =>
+        StructType(fields.map(field =>
+          field.copy(dataType = transformDataType(field.dataType, isTopLevel = 
false))))
+
+      case udt: UserDefinedType[_] if !isTopLevel =>
+        transformDataType(udt.sqlType, isTopLevel = false)
+
+      case other => other
+    }
+  }
+}
diff --git 
a/spark/common/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala 
b/spark/common/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
index 369feeb22a..c3ef8dd89e 100644
--- a/spark/common/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
+++ b/spark/common/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
@@ -784,6 +784,107 @@ class geoparquetIOTests extends TestBaseScala with 
BeforeAndAfterAll {
     }
   }
 
+  describe("Fix SPARK-48942 reading parquet with array of structs of UDTs 
workaround") {
+    it("should handle array of struct with geometry UDT") {
+      // This test reproduces the issue described in SPARK-48942
+      // https://issues.apache.org/jira/browse/SPARK-48942
+      // where reading back nested geometry from Parquet with PySpark 3.5 fails
+      val testPath = geoparquetoutputlocation + "/spark_48942_test.parquet"
+
+      // Create DataFrame with array of struct containing geometry
+      val df = sparkSession.sql("""
+        SELECT ARRAY(STRUCT(ST_POINT(1.0, 1.1) AS geometry)) AS 
nested_geom_array
+      """)
+
+      // Write to Parquet
+      df.write.mode("overwrite").format("parquet").save(testPath)
+
+      // The fix allows vectorized reading to handle UDT compatibility properly
+      val readDf = sparkSession.read.format("parquet").load(testPath)
+
+      // Verify the geometry data is correct and accessible
+      val result = readDf.collect()
+      assert(result.length == 1)
+      val nestedArray = result(0).getSeq[Any](0)
+      assert(nestedArray.length == 1)
+
+      // Verify we can perform geometric operations on the read data
+      // This tests that the nested geometry is functionally correct after the 
SPARK-48942 fix
+      readDf.createOrReplaceTempView("nested_geom_test")
+      val extractedGeom = sparkSession.sql("""
+        SELECT nested_geom_array[0].geometry as extracted_geometry
+        FROM nested_geom_test
+      """)
+
+      val extractedResult = extractedGeom.collect()
+      assert(extractedResult.length == 1)
+      assert(extractedResult(0).get(0) != null, "Extracted geometry should not 
be null")
+
+      // Test that we can use the extracted geometry in spatial functions
+      extractedGeom.createOrReplaceTempView("extracted_test")
+      val spatialTest = sparkSession.sql("""
+        SELECT ST_X(extracted_geometry) as x, ST_Y(extracted_geometry) as y
+        FROM extracted_test
+      """)
+
+      val spatialResult = spatialTest.collect()
+      assert(spatialResult.length == 1)
+      assert(spatialResult(0).getDouble(0) == 1.0, "X coordinate should be 
1.0")
+      assert(spatialResult(0).getDouble(1) == 1.1, "Y coordinate should be 
1.1")
+
+      // Test that we can perform more complex spatial operations
+      val spatialOperations = sparkSession.sql("""
+        SELECT
+          ST_AsText(extracted_geometry) as wkt,
+          ST_GeometryType(extracted_geometry) as geom_type
+        FROM extracted_test
+      """)
+
+      val operationResult = spatialOperations.collect()
+      assert(operationResult.length == 1)
+      assert(
+        operationResult(0).getString(0) == "POINT (1 1.1)",
+        "WKT should match original point")
+      assert(operationResult(0).getString(1) == "ST_Point", "Geometry type 
should be ST_Point")
+    }
+
+    it("should reject nested geometry when using GeoParquet format") {
+      // GeoParquet specification requires "Geometry columns MUST be at the 
root of the schema"
+      // Therefore, nested geometry should be rejected
+      val testPath = geoparquetoutputlocation + 
"/spark_48942_geoparquet_test.geoparquet"
+
+      val df = sparkSession.sql("""
+        SELECT ARRAY(STRUCT(ST_POINT(1.0, 1.1) AS geometry)) AS 
nested_geom_array
+      """)
+
+      // Writing nested geometry to GeoParquet should fail according to the 
specification
+      assertThrows[SparkException] {
+        df.write.mode("overwrite").format("geoparquet").save(testPath)
+      }
+    }
+
+    it("should handle deeply nested arrays with geometry UDT") {
+      // Test deeply nested arrays: array of array of struct with geometry
+      val testPath = geoparquetoutputlocation + 
"/spark_48942_deep_nested_test.parquet"
+
+      val df = sparkSession.sql("""
+        SELECT ARRAY(
+          ARRAY(STRUCT(ST_POINT(1.0, 1.1) AS geometry)),
+          ARRAY(STRUCT(ST_POINT(2.0, 2.1) AS geometry))
+        ) AS deeply_nested_geom_array
+      """)
+
+      // Write to Parquet
+      df.write.mode("overwrite").format("parquet").save(testPath)
+
+      // Read back and verify
+      val readDf = sparkSession.read.format("parquet").load(testPath)
+
+      val result = readDf.collect()
+      assert(result.length == 1)
+    }
+  }
+
   def validateGeoParquetMetadata(path: String)(body: org.json4s.JValue => 
Unit): Unit = {
     val parquetFiles = new 
File(path).listFiles().filter(_.getName.endsWith(".parquet"))
     parquetFiles.foreach { filePath =>

Reply via email to