This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e0fb9fde9 [SEDONA-699] Fix issue with not closing parquet files. (#1749)
0e0fb9fde9 is described below

commit 0e0fb9fde90eba99d85f8a392280f4706f9d44aa
Author: Paweł Tokaj <[email protected]>
AuthorDate: Fri Jan 17 05:54:42 2025 +0100

    [SEDONA-699] Fix issue with not closing parquet files. (#1749)
    
    * Fix issue with not closing parquet files.
    
    * Fix the geoparquet footer reader.
    
    * Fix the geoparquet footer reader.
---
 .../GeoParquetMetadataPartitionReaderFactory.scala         | 12 +++++++-----
 .../GeoParquetMetadataPartitionReaderFactory.scala         | 13 ++++++++-----
 .../GeoParquetMetadataPartitionReaderFactory.scala         | 14 +++++++++-----
 3 files changed, 24 insertions(+), 15 deletions(-)
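
The bug: all three modules read the GeoParquet footer by calling ParquetFileReader.open(...) and chaining straight into getFooter, discarding the Closeable reader, so the underlying input stream was never closed and one file handle leaked per file scanned. Below is a minimal sketch of the leaky pattern and one resource-safe alternative; the path is a placeholder and the try/finally form is illustrative, not the code this commit ships:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.hadoop.ParquetFileReader
    import org.apache.parquet.hadoop.util.HadoopInputFile

    val conf = new Configuration()
    val path = new Path("/tmp/example.parquet") // placeholder path

    // Leaky: open() returns a Closeable ParquetFileReader, but chaining
    // drops the reference, so nothing ever closes the input stream.
    // ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))
    //   .getFooter.getFileMetaData.getKeyValueMetaData

    // Resource-safe: keep the reader and close it once the footer is read.
    val reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))
    val metadata =
      try reader.getFooter.getFileMetaData.getKeyValueMetaData
      finally reader.close()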

diff --git a/spark/spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala b/spark/spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
index 1fe2faa2e0..a101206fbf 100644
--- a/spark/spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
+++ b/spark/spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.parquet.ParquetReadOptions
+import org.apache.parquet.format.converter.ParquetMetadataConverter
 import org.apache.parquet.hadoop.ParquetFileReader
 import org.apache.parquet.hadoop.util.HadoopInputFile
 import org.apache.spark.broadcast.Broadcast
@@ -67,11 +69,11 @@ object GeoParquetMetadataPartitionReaderFactory {
       partitionedFile: PartitionedFile,
       readDataSchema: StructType): Iterator[InternalRow] = {
     val filePath = partitionedFile.filePath
-    val metadata = ParquetFileReader
-      .open(HadoopInputFile.fromPath(new Path(filePath), configuration))
-      .getFooter
-      .getFileMetaData
-      .getKeyValueMetaData
+
+    val footer = ParquetFileReader
+      .readFooter(configuration, new Path(filePath), ParquetMetadataConverter.NO_FILTER)
+
+    val metadata = footer.getFileMetaData.getKeyValueMetaData
     val row = GeoParquetMetaData.parseKeyValueMetaData(metadata) match {
       case Some(geo) =>
         val geoColumnsMap = geo.columns.map { case (columnName, columnMetadata) =>
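
The Spark 3.3 module fixes this with the static readFooter(Configuration, Path, MetadataFilter) overload, which opens the file, reads only the footer, and releases the stream internally before returning. A standalone sketch with a placeholder path; note this overload is deprecated in recent parquet-hadoop releases, which is presumably why the 3.4/3.5 modules below take a different route:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.format.converter.ParquetMetadataConverter
    import org.apache.parquet.hadoop.ParquetFileReader

    val conf = new Configuration()
    // NO_FILTER asks for the complete footer instead of a row-group subset.
    val footer = ParquetFileReader.readFooter(
      conf,
      new Path("/tmp/example.parquet"), // placeholder path
      ParquetMetadataConverter.NO_FILTER)
    val keyValues = footer.getFileMetaData.getKeyValueMetaData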
diff --git a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
index 2a5e70624c..e4ca35992b 100644
--- a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
+++ b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
@@ -19,6 +19,7 @@
 package org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.ParquetReadOptions
 import org.apache.parquet.hadoop.ParquetFileReader
 import org.apache.parquet.hadoop.util.HadoopInputFile
 import org.apache.spark.broadcast.Broadcast
@@ -66,12 +67,14 @@ object GeoParquetMetadataPartitionReaderFactory {
       configuration: Configuration,
       partitionedFile: PartitionedFile,
       readDataSchema: StructType): Iterator[InternalRow] = {
+    val inputFile = HadoopInputFile.fromPath(partitionedFile.toPath, configuration)
+    val inputStream = inputFile.newStream()
+
+    val footer = ParquetFileReader
+      .readFooter(inputFile, ParquetReadOptions.builder().build(), inputStream)
+
     val filePath = partitionedFile.toPath.toString
-    val metadata = ParquetFileReader
-      .open(HadoopInputFile.fromPath(partitionedFile.toPath, configuration))
-      .getFooter
-      .getFileMetaData
-      .getKeyValueMetaData
+    val metadata = footer.getFileMetaData.getKeyValueMetaData
     val row = GeoParquetMetaData.parseKeyValueMetaData(metadata) match {
       case Some(geo) =>
         val geoColumnsMap = geo.columns.map { case (columnName, columnMetadata) =>
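
The Spark 3.4 module (and the identical 3.5 change below) instead opens the stream explicitly and passes it to the readFooter(InputFile, ParquetReadOptions, SeekableInputStream) overload, so the stream's lifetime is visible at the call site. A defensive standalone sketch, under the assumption that the code opening the stream is responsible for closing it; the explicit finally is an illustration, not part of this commit:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.ParquetReadOptions
    import org.apache.parquet.hadoop.ParquetFileReader
    import org.apache.parquet.hadoop.util.HadoopInputFile

    val conf = new Configuration()
    val inputFile =
      HadoopInputFile.fromPath(new Path("/tmp/example.parquet"), conf) // placeholder path
    val stream = inputFile.newStream()
    val footer =
      try ParquetFileReader.readFooter(inputFile, ParquetReadOptions.builder().build(), stream)
      finally stream.close() // this code opened the stream, so it closes it
    val keyValues = footer.getFileMetaData.getKeyValueMetaData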
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
index 2a5e70624c..e1234e79d8 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
@@ -19,6 +19,7 @@
 package org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.ParquetReadOptions
 import org.apache.parquet.hadoop.ParquetFileReader
 import org.apache.parquet.hadoop.util.HadoopInputFile
 import org.apache.spark.broadcast.Broadcast
@@ -66,12 +67,15 @@ object GeoParquetMetadataPartitionReaderFactory {
       configuration: Configuration,
       partitionedFile: PartitionedFile,
       readDataSchema: StructType): Iterator[InternalRow] = {
+
+    val inputFile = HadoopInputFile.fromPath(partitionedFile.toPath, configuration)
+    val inputStream = inputFile.newStream()
+
+    val footer = ParquetFileReader
+      .readFooter(inputFile, ParquetReadOptions.builder().build(), inputStream)
+
     val filePath = partitionedFile.toPath.toString
-    val metadata = ParquetFileReader
-      .open(HadoopInputFile.fromPath(partitionedFile.toPath, configuration))
-      .getFooter
-      .getFileMetaData
-      .getKeyValueMetaData
+    val metadata = footer.getFileMetaData.getKeyValueMetaData
     val row = GeoParquetMetaData.parseKeyValueMetaData(metadata) match {
       case Some(geo) =>
         val geoColumnsMap = geo.columns.map { case (columnName, columnMetadata) =>
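
In all three variants the footer is read exactly once per file and only its key-value metadata is handed to GeoParquetMetaData.parseKeyValueMetaData, so the behavioral change is confined to resource handling: no file handle outlives the metadata read.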
