This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
new bfcc14761a [GH-2608] Fix RasterUDT JSON schema serialization for Delta/Parquet write (#2636)
bfcc14761a is described below
commit bfcc14761aa075c34b0a2533b70d121cfe81670f
Author: Jia Yu <[email protected]>
AuthorDate: Tue Feb 10 02:22:31 2026 -0700
[GH-2608] Fix RasterUDT JSON schema serialization for Delta/Parquet write (#2636)
---
.../spark/sql/sedona_sql/UDT/RasterUDT.scala | 9 ++++
.../org/apache/sedona/sql/RasterUDTSuite.scala | 58 +++++++++++++++++++++-
2 files changed, 65 insertions(+), 2 deletions(-)
diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/UDT/RasterUDT.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/UDT/RasterUDT.scala
index ee05cd1521..a34b372211 100644
--- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/UDT/RasterUDT.scala
+++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/UDT/RasterUDT.scala
@@ -21,6 +21,8 @@ package org.apache.spark.sql.sedona_sql.UDT
import org.apache.sedona.common.raster.serde.Serde
import org.apache.spark.sql.types.{BinaryType, DataType, UserDefinedType}
import org.geotools.coverage.grid.GridCoverage2D
+import org.json4s.JsonDSL._
+import org.json4s.JsonAST.JValue
class RasterUDT extends UserDefinedType[GridCoverage2D] {
override def sqlType: DataType = BinaryType
@@ -41,6 +43,13 @@ class RasterUDT extends UserDefinedType[GridCoverage2D] {
override def userClass: Class[GridCoverage2D] = classOf[GridCoverage2D]
+  /**
+   * Serializes this type to JSON with a class name that can be loaded and instantiated
+   * when the schema is parsed back, e.g. after Delta or Parquet store it as JSON.
+   *
+   * The case object `RasterUDT` reports `RasterUDT$` from `getClass.getName`. Stripping the
+   * trailing `$` yields the class `RasterUDT`, which has a no-arg constructor. Every other
+   * field produced by the parent implementation is passed through unchanged.
+   */
+  override private[sql] def jsonValue: JValue = {
+    // json4s `mapField` also visits fields of nested objects. The sqlType here is BinaryType,
+    // so the top-level "class" field is the only one this rewrites.
+    super.jsonValue mapField {
+      case ("class", _) => "class" -> this.getClass.getName.stripSuffix("$")
+      case other => other
+    }
+  }
+
override def equals(other: Any): Boolean = other match {
case _: UserDefinedType[_] => other.isInstanceOf[RasterUDT]
case _ => false
diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/RasterUDTSuite.scala b/spark/common/src/test/scala/org/apache/sedona/sql/RasterUDTSuite.scala
index 45f6696963..ba22561139 100644
--- a/spark/common/src/test/scala/org/apache/sedona/sql/RasterUDTSuite.scala
+++ b/spark/common/src/test/scala/org/apache/sedona/sql/RasterUDTSuite.scala
@@ -19,9 +19,14 @@
package org.apache.sedona.sql
import org.apache.spark.sql.sedona_sql.UDT.RasterUDT
-import org.scalatest.funspec.AnyFunSpec
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.junit.rules.TemporaryFolder
+import org.scalatest.BeforeAndAfter
+
+class RasterUDTSuite extends TestBaseScala with BeforeAndAfter {
+
+  // Scratch directory for the Parquet round-trip tests: each such test calls create(),
+  // and the `after` hook deletes it. The reference itself is never reassigned.
+  val tempFolder: TemporaryFolder = new TemporaryFolder
-class RasterUDTSuite extends AnyFunSpec {
describe("RasterUDT Test") {
it("Case object and new instance should be equals") {
assert(RasterUDT == RasterUDT)
@@ -35,5 +40,54 @@ class RasterUDTSuite extends AnyFunSpec {
val udt = new RasterUDT
assert(udt.hashCode() == RasterUDT.hashCode())
}
+
+    it("Should be able to render and parse JSON schema with RasterUDT") {
+      // Regression test for the Delta write bug (#2608): Delta and Parquet serialize the
+      // schema to JSON and parse it back on read. Without the jsonValue override, the class
+      // name carries the '$' suffix of the Scala case object, which makes schema
+      // deserialization fail on read.
+      val rasterDf =
+        sparkSession.sql("SELECT RS_MakeEmptyRaster(1, 10, 10, 0, 0, 1) as raster")
+      val schemaJson = rasterDf.schema.json
+      // Pin the root cause directly, not just its symptom.
+      assert(!schemaJson.contains("RasterUDT$"))
+      // Compare as DataType: a non-struct result fails the assertion instead of throwing a
+      // ClassCastException.
+      assert(DataType.fromJson(schemaJson) == rasterDf.schema)
+    }
+
+    it("Should write and read raster DataFrame in Parquet format") {
+      // Parquet also keeps the schema as JSON, so it triggers the same bug as Delta.
+      tempFolder.create()
+      val outputPath = s"${tempFolder.getRoot.getPath}/raster_parquet"
+      val rasterDf =
+        sparkSession.sql("SELECT RS_MakeEmptyRaster(1, 10, 10, 0, 0, 1) as raster")
+
+      rasterDf.write.parquet(outputPath)
+
+      // The UDT must survive the round trip, along with the single row.
+      val readDf = sparkSession.read.parquet(outputPath)
+      assert(readDf.schema.fields(0).dataType.isInstanceOf[RasterUDT])
+      assert(readDf.count() == 1)
+    }
+
+    it("RS_Union_Aggr output should write and read Parquet successfully") {
+      // #2608 reports that RS_Union_Aggr output could be written to Delta/Parquet while
+      // RS_MakeEmptyRaster output could not. RS_Union_Aggr goes through ExpressionEncoder,
+      // which resolves the UDT via UDTRegistration (class name without the '$' suffix).
+      // InferredExpression-based functions instead use the case object singleton, whose
+      // getClass.getName includes '$'.
+      tempFolder.create()
+      val outputPath = s"${tempFolder.getRoot.getPath}/union_aggr_parquet"
+      val unionQuery =
+        """SELECT RS_Union_Aggr(raster) as raster FROM (
+          | SELECT RS_MakeEmptyRaster(1, 10, 10, 0, 0, 1) as raster
+          |)""".stripMargin
+      val rasterDf = sparkSession.sql(unionQuery)
+
+      rasterDf.write.parquet(outputPath)
+
+      // The aggregated raster column must still be typed as RasterUDT after reading back.
+      val readDf = sparkSession.read.parquet(outputPath)
+      assert(readDf.schema.fields(0).dataType.isInstanceOf[RasterUDT])
+      assert(readDf.count() == 1)
+    }
+ }
+
+ after {
+ // Remove the temporary directory created by the Parquet round-trip tests.
+ tempFolder.delete()
}
}