alexeykudinkin commented on a change in pull request #5141:
URL: https://github.com/apache/hudi/pull/5141#discussion_r836669643
##########
File path:
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java
##########
@@ -30,14 +32,17 @@
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.ParquetReaderIterator;
+
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;
public class HoodieParquetReader<R extends IndexedRecord> implements HoodieFileReader<R> {
+
private final Path path;
private final Configuration conf;
private final BaseFileUtils parquetUtils;
+ private List<ParquetReaderIterator> readerIterators = new ArrayList<>();
Review comment:
@nsivabalan please make it final
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
##########
@@ -333,7 +335,13 @@ object HoodieBaseRelation {
partitionedFile => {
val extension = FSUtils.getFileExtension(partitionedFile.filePath)
if (HoodieFileFormat.PARQUET.getFileExtension.equals(extension)) {
- parquetReader.apply(partitionedFile)
+ val iter = parquetReader.apply(partitionedFile)
+ if (iter.isInstanceOf[Closeable]) {
+          // register a callback to close parquetReader which will be executed on task completion.
+          // when tasks finished, this method will be called, and resources released.
+          Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => iter.asInstanceOf[Closeable].close()))
Review comment:
While I appreciate the intent here to tie the iterator to the scope of a particular task, I don't think this is the right way to fix it: you're tying the lifespan of the iterator to that of the task (which in this case runs on the executor), but there's no clear invariant guaranteeing that this iterator cannot outlive the task.
Instead, we should rely on the RDD to close the iterator when it's done iterating. If you take a look at `FileScanRDD` (which we rely on), you can see that it does exactly that. The reason this is broken right now is that we wrap the iterator, so the result no longer inherits from `Closeable`:
```
file: PartitionedFile => {
  val iter = readParquetFile(file)
  iter.flatMap {
    case r: InternalRow => Seq(r)
    case b: ColumnarBatch => b.rowIterator().asScala
  }
}
```
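One way to preserve the RDD-driven cleanup would be to keep the transformed iterator `Closeable` by delegating `close()` to the underlying reader. A minimal, self-contained sketch of that idea (the `CloseableIteratorAdapter` name and shape are hypothetical, not an existing Hudi or Spark API):

```scala
import java.io.Closeable

// Hypothetical adapter: applies a flatMap-style transformation while the
// result stays Closeable, so a caller like FileScanRDD can still close
// the underlying reader when iteration is done.
class CloseableIteratorAdapter[A, B](underlying: Iterator[A] with Closeable,
                                     f: A => Iterator[B])
  extends Iterator[B] with Closeable {

  private val delegate: Iterator[B] = underlying.flatMap(f)

  override def hasNext: Boolean = delegate.hasNext
  override def next(): B = delegate.next()

  // close() is forwarded to the wrapped iterator, releasing its resources
  override def close(): Unit = underlying.close()
}
```

With a wrapper like this, the `iter.flatMap { ... }` shown above could return an iterator that still implements `Closeable`, and the existing `FileScanRDD` cleanup path would keep working without any task-completion listener.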
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]