This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
new 0e0fb9fde9 [SEDONA-699] Fix issue with not closing parquet files. (#1749)
0e0fb9fde9 is described below
commit 0e0fb9fde90eba99d85f8a392280f4706f9d44aa
Author: Paweł Tokaj <[email protected]>
AuthorDate: Fri Jan 17 05:54:42 2025 +0100
[SEDONA-699] Fix issue with not closing parquet files. (#1749)
* Fix issue with not closing parquet files.
* Fix the geoparquet footer reader.
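For context, a minimal sketch of the leak and of one way to avoid it
(illustrative Scala, not the exact Sedona code; assumes a Hadoop
configuration and a file path are already in scope):

    // Leaks a file handle: ParquetFileReader.open() returns a reader
    // that must be closed, but the chained call discards it.
    val leakedMetadata = ParquetFileReader
      .open(HadoopInputFile.fromPath(path, configuration))
      .getFooter
      .getFileMetaData
      .getKeyValueMetaData

    // Safe variant: keep a reference to the reader and close it when done.
    val reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, configuration))
    val metadata =
      try reader.getFooter.getFileMetaData.getKeyValueMetaData
      finally reader.close()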
---
.../GeoParquetMetadataPartitionReaderFactory.scala | 12 +++++++-----
.../GeoParquetMetadataPartitionReaderFactory.scala | 13 ++++++++-----
.../GeoParquetMetadataPartitionReaderFactory.scala | 14 +++++++++-----
3 files changed, 24 insertions(+), 15 deletions(-)
diff --git a/spark/spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala b/spark/spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
index 1fe2faa2e0..a101206fbf 100644
--- a/spark/spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
+++ b/spark/spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
+import org.apache.parquet.ParquetReadOptions
+import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.spark.broadcast.Broadcast
@@ -67,11 +69,11 @@ object GeoParquetMetadataPartitionReaderFactory {
partitionedFile: PartitionedFile,
readDataSchema: StructType): Iterator[InternalRow] = {
val filePath = partitionedFile.filePath
- val metadata = ParquetFileReader
- .open(HadoopInputFile.fromPath(new Path(filePath), configuration))
- .getFooter
- .getFileMetaData
- .getKeyValueMetaData
+
+ val footer = ParquetFileReader
+ .readFooter(configuration, new Path(filePath), ParquetMetadataConverter.NO_FILTER)
+
+ val metadata = footer.getFileMetaData.getKeyValueMetaData
val row = GeoParquetMetaData.parseKeyValueMetaData(metadata) match {
case Some(geo) =>
val geoColumnsMap = geo.columns.map { case (columnName, columnMetadata) =>
diff --git a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
index 2a5e70624c..e4ca35992b 100644
--- a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
+++ b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
@@ -19,6 +19,7 @@
package org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata
import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.ParquetReadOptions
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.spark.broadcast.Broadcast
@@ -66,12 +67,14 @@ object GeoParquetMetadataPartitionReaderFactory {
configuration: Configuration,
partitionedFile: PartitionedFile,
readDataSchema: StructType): Iterator[InternalRow] = {
+ val inputFile = HadoopInputFile.fromPath(partitionedFile.toPath, configuration)
+ val inputStream = inputFile.newStream()
+
+ val footer = ParquetFileReader
+ .readFooter(inputFile, ParquetReadOptions.builder().build(), inputStream)
+
val filePath = partitionedFile.toPath.toString
- val metadata = ParquetFileReader
- .open(HadoopInputFile.fromPath(partitionedFile.toPath, configuration))
- .getFooter
- .getFileMetaData
- .getKeyValueMetaData
+ val metadata = footer.getFileMetaData.getKeyValueMetaData
val row = GeoParquetMetaData.parseKeyValueMetaData(metadata) match {
case Some(geo) =>
val geoColumnsMap = geo.columns.map { case (columnName, columnMetadata) =>
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
index 2a5e70624c..e1234e79d8 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
@@ -19,6 +19,7 @@
package org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata
import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.ParquetReadOptions
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.spark.broadcast.Broadcast
@@ -66,12 +67,15 @@ object GeoParquetMetadataPartitionReaderFactory {
configuration: Configuration,
partitionedFile: PartitionedFile,
readDataSchema: StructType): Iterator[InternalRow] = {
+
+ val inputFile = HadoopInputFile.fromPath(partitionedFile.toPath, configuration)
+ val inputStream = inputFile.newStream()
+
+ val footer = ParquetFileReader
+ .readFooter(inputFile, ParquetReadOptions.builder().build(), inputStream)
+
val filePath = partitionedFile.toPath.toString
- val metadata = ParquetFileReader
- .open(HadoopInputFile.fromPath(partitionedFile.toPath, configuration))
- .getFooter
- .getFileMetaData
- .getKeyValueMetaData
+ val metadata = footer.getFileMetaData.getKeyValueMetaData
val row = GeoParquetMetaData.parseKeyValueMetaData(metadata) match {
case Some(geo) =>
val geoColumnsMap = geo.columns.map { case (columnName, columnMetadata) =>
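The Spark 3.4/3.5 variants above read the footer through an explicitly
opened SeekableInputStream. A minimal sketch of that pattern with the
stream closed after use (the explicit close is illustrative; the
committed code may manage the stream's lifecycle elsewhere):

    val inputFile = HadoopInputFile.fromPath(partitionedFile.toPath, configuration)
    val inputStream = inputFile.newStream()
    // readFooter consumes the stream; close it once the footer is parsed.
    val footer =
      try ParquetFileReader.readFooter(inputFile, ParquetReadOptions.builder().build(), inputStream)
      finally inputStream.close()
    val metadata = footer.getFileMetaData.getKeyValueMetaData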