This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new bf11a3c70 [SEDONA-670] Fix GeoJSON reader for DBR (#1662)
bf11a3c70 is described below

commit bf11a3c709fcea0f1ee742a35e71615d31e8c035
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Wed Oct 30 15:15:52 2024 +0800

    [SEDONA-670] Fix GeoJSON reader for DBR (#1662)
---
 .../sedona_sql/io/geojson/GeoJSONFileFormat.scala  |  4 +-
 .../sedona_sql/io/geojson/SparkCompatUtil.scala    | 43 ++++++++++++++++++++++
 2 files changed, 45 insertions(+), 2 deletions(-)

diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONFileFormat.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONFileFormat.scala
index b1db6fd0f..6a843d475 100644
--- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONFileFormat.scala
+++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/GeoJSONFileFormat.scala
@@ -151,8 +151,8 @@ class GeoJSONFileFormat extends TextBasedFileFormat with DataSourceRegister {
         allowArrayAsStructs = true)
       val dataSource = JsonDataSource(parsedOptions)
 
-      dataSource
-        .readFile(broadcastedHadoopConf.value.value, file, parser, actualSchema)
+      SparkCompatUtil
+        .readFile(dataSource, broadcastedHadoopConf.value.value, file, parser, actualSchema)
         .map(row => {
           val newRow = GeoJSONUtils.convertGeoJsonToGeometry(row, alteredSchema)
           newRow
diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/SparkCompatUtil.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/SparkCompatUtil.scala
index 4043dbd3e..8ce7b61ad 100644
--- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/SparkCompatUtil.scala
+++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/geojson/SparkCompatUtil.scala
@@ -18,10 +18,14 @@
  */
 package org.apache.spark.sql.sedona_sql.io.geojson
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.json.{JSONOptions, JacksonParser}
 import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.LegacyDateFormat
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.json.JsonDataSource
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
 
 import scala.reflect.runtime.{universe => ru}
@@ -158,4 +162,43 @@ object SparkCompatUtil {
         }
     }
   }
+
+  def readFile(
+      jsonDataSource: JsonDataSource,
+      conf: Configuration,
+      file: PartitionedFile,
+      parser: JacksonParser,
+      schema: StructType): Iterator[InternalRow] = {
+    val readFileMethods =
+      jsonDataSource.getClass.getDeclaredMethods.filter(_.getName == "readFile")
+    // Get the number of input arguments of the readFile method
+    readFileMethods.find(_.getParameterCount == 4) match {
+      case Some(readFileMethod) =>
+        // The readFile method defined by open-source Apache Spark:
+        // def readFile(
+        //    conf: Configuration,
+        //    file: PartitionedFile,
+        //    parser: JacksonParser,
+        //    schema: StructType): Iterator[InternalRow]
+        readFileMethod
+          .invoke(jsonDataSource, conf, file, parser, schema)
+          .asInstanceOf[Iterator[InternalRow]]
+      case None =>
+        readFileMethods.find(_.getParameterCount == 5) match {
+          case Some(readFileMethod) =>
+            // The readFile method defined by DBR:
+            // def readFile(
+            //    conf: Configuration,
+            //    file: PartitionedFile,
+            //    parser: JacksonParser,
+            //    schema: StructType,
+            //    badRecordsWriter: Option[BadRecordsWriter]): Iterator[InternalRow]
+            readFileMethod
+              .invoke(jsonDataSource, conf, file, parser, schema, None)
+              .asInstanceOf[Iterator[InternalRow]]
+          case None =>
+            throw new Exception("No suitable readFile method found in JsonDataSource")
+        }
+    }
+  }
 }

Reply via email to