alexeykudinkin commented on a change in pull request #5141:
URL: https://github.com/apache/hudi/pull/5141#discussion_r836669643



##########
File path: 
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java
##########
@@ -30,14 +32,17 @@
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.util.BaseFileUtils;
 import org.apache.hudi.common.util.ParquetReaderIterator;
+
 import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.avro.AvroReadSupport;
 import org.apache.parquet.hadoop.ParquetReader;
 
 public class HoodieParquetReader<R extends IndexedRecord> implements 
HoodieFileReader<R> {
+  
   private final Path path;
   private final Configuration conf;
   private final BaseFileUtils parquetUtils;
+  private List<ParquetReaderIterator> readerIterators = new ArrayList<>();

Review comment:
       @nsivabalan please make it final

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
##########
@@ -333,7 +335,13 @@ object HoodieBaseRelation {
     partitionedFile => {
       val extension = FSUtils.getFileExtension(partitionedFile.filePath)
       if (HoodieFileFormat.PARQUET.getFileExtension.equals(extension)) {
-        parquetReader.apply(partitionedFile)
+        val iter = parquetReader.apply(partitionedFile)
+        if (iter.isInstanceOf[Closeable]) {
+          // register a callback to close parquetReader which will be executed 
on task completion.
+          // when tasks finished, this method will be called, and release 
resources.
+          
Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => 
iter.asInstanceOf[Closeable].close()))

Review comment:
       While i appreciate the intent here to tie up the iterator to the scope 
of particular task, i don't think this is the right way to fix it: you're tying 
the lifespan of the iterator to that one of the task (which in this case runs 
on executor), but there's no clear invariant why this iterator could not 
outlive this task.
   
   Instead we should rely on the RDD to close out the iterator when its done 
with iteration. And if you would take a look at `FileScanRDD` (which we rely 
on) you can see that it does exactly that. The reason why it's broken right now 
is b/c we modify the iterator (which is not inheriting from Closeable anymore):
   
   ```
   file: PartitionedFile => {
         val iter = readParquetFile(file)
         iter.flatMap {
           case r: InternalRow => Seq(r)
           case b: ColumnarBatch => b.rowIterator().asScala
         }
       }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to