This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 15ae6787ec4 [HUDI-9219] Add callbacks for CloseableIterators and eager close data readers (#13178)
15ae6787ec4 is described below

commit 15ae6787ec4fa6ddad0d30c0c81e81b4a41b477e
Author: Tim Brown <[email protected]>
AuthorDate: Mon Apr 28 23:13:50 2025 -0500

    [HUDI-9219] Add callbacks for CloseableIterators and eager close data readers (#13178)
---
 .../MultipleSparkJobExecutionStrategy.java         |   7 +-
 .../hudi/data/CloseableIteratorListener.java       |  66 +++++++++++
 .../org/apache/hudi/data/HoodieJavaPairRDD.java    |   2 +-
 .../java/org/apache/hudi/data/HoodieJavaRDD.java   |   6 +-
 .../apache/hudi/data/TestHoodieJavaPairRDD.java    |  12 ++
 .../org/apache/hudi/data/TestHoodieJavaRDD.java    |  36 ++++++
 .../hudi/data/TrackingCloseableIterator.java       |  62 +++++++++++
 .../hudi/common/data/HoodieBaseListData.java       |  35 +++++-
 .../apache/hudi/common/data/HoodieListData.java    |  20 ++--
 .../hudi/common/data/HoodieListPairData.java       | 124 +++++++++++----------
 .../common/table/read/HoodieFileGroupReader.java   |  14 ++-
 .../io/storage/HoodieAvroHFileReaderImplBase.java  |  10 +-
 .../hudi/common/data/CloseValidationIterator.java  |  56 ++++++++++
 .../hudi/common/data/TestHoodieListData.java       |  38 +++++++
 .../common/data/TestHoodieListDataPairData.java    |  46 ++++++++
 .../hadoop/TestHoodieHBaseHFileReaderWriter.java   |  17 +--
 .../io/hadoop/TestHoodieHFileReaderWriter.java     |  17 +--
 .../hudi/io/hadoop/TestOrcReaderIterator.java      |  49 ++++----
 .../datasources/HoodieMultipleBaseFileFormat.scala |   4 +-
 ...odieFileGroupReaderBasedParquetFileFormat.scala |   5 +-
 .../procedures/PartitionBucketIndexManager.scala   |   2 +
 21 files changed, 503 insertions(+), 125 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 147c3c2b687..f57367f502d 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -55,6 +55,7 @@ import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.CloseableIteratorListener;
 import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.exception.HoodieException;
@@ -335,7 +336,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
         };
         suppliers.add(iteratorSupplier);
       });
-      return new LazyConcatenatingIterator<>(suppliers);
+      return CloseableIteratorListener.addListener(new 
LazyConcatenatingIterator<>(suppliers));
     }));
   }
 
@@ -357,7 +358,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
             iteratorGettersForPartition.add(recordIteratorGetter);
           });
 
-          return new LazyConcatenatingIterator<>(iteratorGettersForPartition);
+          return CloseableIteratorListener.addListener(new 
LazyConcatenatingIterator<>(iteratorGettersForPartition));
         }));
   }
 
@@ -477,7 +478,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
             0, Long.MAX_VALUE, usePosition, false);
         fileGroupReader.initRecordIterators();
         // read records from the FG reader
-        return fileGroupReader.getClosableIterator();
+        return 
CloseableIteratorListener.addListener(fileGroupReader.getClosableIterator());
       }
     }).rdd();
 
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/CloseableIteratorListener.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/CloseableIteratorListener.java
new file mode 100644
index 00000000000..c018aeccfa3
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/CloseableIteratorListener.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.data;
+
+import org.apache.spark.TaskContext;
+import org.apache.spark.util.TaskCompletionListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+
+/**
+ * Helper class for adding a spark task completion listener that will ensure 
the iterator is closed if it is an instance of {@link AutoCloseable}.
+ * This is commonly used with {@link 
org.apache.hudi.common.util.collection.ClosableIterator} to ensure the 
resources are closed after the task completes.
+ */
+public class CloseableIteratorListener implements TaskCompletionListener {
+  private static final Logger LOG = 
LoggerFactory.getLogger(CloseableIteratorListener.class);
+  private final Object iterator;
+
+  private CloseableIteratorListener(Object iterator) {
+    this.iterator = iterator;
+  }
+
+  public static <T> Iterator<T> addListener(Iterator<T> iterator) {
+    TaskContext.get().addTaskCompletionListener(new 
CloseableIteratorListener(iterator));
+    return iterator;
+  }
+
+  public static <T> scala.collection.Iterator<T> 
addListener(scala.collection.Iterator<T> iterator) {
+    TaskContext.get().addTaskCompletionListener(new 
CloseableIteratorListener(iterator));
+    return iterator;
+  }
+
+  /**
+   * Closes the iterator if it also implements {@link AutoCloseable}, 
otherwise it is a no-op.
+   *
+   * @param context the spark context
+   */
+  @Override
+  public void onTaskCompletion(TaskContext context) {
+    if (iterator instanceof AutoCloseable) {
+      try {
+        ((AutoCloseable) iterator).close();
+      } catch (Exception ex) {
+        LOG.warn("Failed to properly close iterator", ex);
+      }
+    }
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java
index fbcab6b575e..2dbbb1880bf 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java
@@ -131,7 +131,7 @@ public class HoodieJavaPairRDD<K, V> implements 
HoodiePairData<K, V> {
   }
 
   public <W> HoodiePairData<K, W> flatMapValues(SerializableFunction<V, 
Iterator<W>> func) {
-    return HoodieJavaPairRDD.of(pairRDDData.flatMapValues(func::apply));
+    return HoodieJavaPairRDD.of(pairRDDData.flatMapValues(iter -> 
CloseableIteratorListener.addListener(func.apply(iter))));
   }
 
   @Override
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
index faec42368ca..0eca77693db 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
@@ -150,21 +150,21 @@ public class HoodieJavaRDD<T> implements HoodieData<T> {
 
   @Override
   public <O> HoodieData<O> mapPartitions(SerializableFunction<Iterator<T>, 
Iterator<O>> func, boolean preservesPartitioning) {
-    return HoodieJavaRDD.of(rddData.mapPartitions(func::apply, 
preservesPartitioning));
+    return HoodieJavaRDD.of(rddData.mapPartitions(iter -> 
CloseableIteratorListener.addListener(func.apply(iter)), 
preservesPartitioning));
   }
 
   @Override
   public <O> HoodieData<O> flatMap(SerializableFunction<T, Iterator<O>> func) {
     // NOTE: Unrolling this lambda into a method reference results in 
[[ClassCastException]]
     //       due to weird interop b/w Scala and Java
-    return HoodieJavaRDD.of(rddData.flatMap(e -> func.apply(e)));
+    return HoodieJavaRDD.of(rddData.flatMap(e -> 
CloseableIteratorListener.addListener(func.apply(e))));
   }
 
   @Override
   public <K, V> HoodiePairData<K, V> flatMapToPair(SerializableFunction<T, 
Iterator<? extends Pair<K, V>>> func) {
     return HoodieJavaPairRDD.of(
         rddData.flatMapToPair(e ->
-            new MappingIterator<>(func.apply(e), p -> new Tuple2<>(p.getKey(), 
p.getValue()))));
+            new 
MappingIterator<>(CloseableIteratorListener.addListener(func.apply(e)), p -> 
new Tuple2<>(p.getKey(), p.getValue()))));
   }
 
   @Override
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaPairRDD.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaPairRDD.java
index 75bc888a71d..3c11b1279e4 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaPairRDD.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaPairRDD.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import scala.Tuple2;
@@ -107,4 +108,15 @@ public class TestHoodieJavaPairRDD {
       assertEquals(Option.of("value1"), item.getRight().getRight());
     });
   }
+
+  @Test
+  void testFlatMapValuesWithCloseable() {
+    String partition1 = "partition1";
+    String partition2 = "partition2";
+    HoodiePairData<Integer, String> input = 
HoodieJavaPairRDD.of(jsc.parallelizePairs(Arrays.asList(Tuple2.apply(1, 
partition1), Tuple2.apply(2, partition2)), 2));
+    input.flatMapValues(partition -> new 
TrackingCloseableIterator<>(partition, Collections.singletonList(1).iterator()))
+        .collectAsList();
+    assertTrue(TrackingCloseableIterator.isClosed(partition1));
+    assertTrue(TrackingCloseableIterator.isClosed(partition2));
+  }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java
index a2617b592d6..cbaf7fa604d 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java
@@ -27,10 +27,13 @@ import org.apache.hudi.testutils.HoodieClientTestBase;
 import org.apache.spark.sql.internal.SQLConf;
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestHoodieJavaRDD extends HoodieClientTestBase {
   @Test
@@ -65,4 +68,37 @@ public class TestHoodieJavaRDD extends HoodieClientTestBase {
         .reduceByKey((p1, p2) -> p1, 11);
     assertEquals(11, shuffleRDD.deduceNumPartitions());
   }
+
+  @Test
+  void testMapPartitionsWithCloseable() {
+    String partition1 = "partition1";
+    String partition2 = "partition2";
+    HoodieData<String> input = HoodieJavaRDD.of(Arrays.asList(partition1, 
partition2), context, 2);
+    input.mapPartitions(partition -> new 
TrackingCloseableIterator<>(partition.next(), 
Collections.singletonList("a").iterator()), true)
+        .collectAsList();
+    assertTrue(TrackingCloseableIterator.isClosed(partition1));
+    assertTrue(TrackingCloseableIterator.isClosed(partition2));
+  }
+
+  @Test
+  void testFlatMapWithCloseable() {
+    String partition1 = "partition1";
+    String partition2 = "partition2";
+    HoodieData<String> input = HoodieJavaRDD.of(Arrays.asList(partition1, 
partition2), context, 2);
+    input.flatMap(partition -> new TrackingCloseableIterator<>(partition, 
Collections.singletonList("a").iterator()))
+        .collectAsList();
+    assertTrue(TrackingCloseableIterator.isClosed(partition1));
+    assertTrue(TrackingCloseableIterator.isClosed(partition2));
+  }
+
+  @Test
+  void testFlatMapToPairWithCloseable() {
+    String partition1 = "partition1";
+    String partition2 = "partition2";
+    HoodieData<String> input = HoodieJavaRDD.of(Arrays.asList(partition1, 
partition2), context, 2);
+    input.flatMapToPair(partition -> new 
TrackingCloseableIterator<>(partition, Collections.singletonList(Pair.of(1, 
"1")).iterator()))
+        .collectAsList();
+    assertTrue(TrackingCloseableIterator.isClosed(partition1));
+    assertTrue(TrackingCloseableIterator.isClosed(partition2));
+  }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TrackingCloseableIterator.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TrackingCloseableIterator.java
new file mode 100644
index 00000000000..d06ec7766f3
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TrackingCloseableIterator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.data;
+
+import org.apache.hudi.common.util.collection.ClosableIterator;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Closeable iterator to use in Spark related testing to ensure that the close 
method is properly called after transformations.
+ * @param <T> type of record within the iterator
+ */
+class TrackingCloseableIterator<T> implements ClosableIterator<T>, 
Serializable {
+  private static final Map<String, Boolean> IS_CLOSED_BY_ID = new HashMap<>();
+  private final String id;
+  private final Iterator<T> inner;
+
+  public TrackingCloseableIterator(String id, Iterator<T> inner) {
+    this.id = id;
+    this.inner = inner;
+    IS_CLOSED_BY_ID.put(id, false);
+  }
+
+  public static boolean isClosed(String id) {
+    return IS_CLOSED_BY_ID.get(id);
+  }
+
+  @Override
+  public void close() {
+    IS_CLOSED_BY_ID.put(id, true);
+  }
+
+  @Override
+  public boolean hasNext() {
+    return inner.hasNext();
+  }
+
+  @Override
+  public T next() {
+    return inner.next();
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieBaseListData.java 
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieBaseListData.java
index 6f3dbfcef99..b603b99d930 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieBaseListData.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieBaseListData.java
@@ -21,6 +21,10 @@ package org.apache.hudi.common.data;
 
 import org.apache.hudi.common.util.Either;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -43,7 +47,12 @@ public abstract class HoodieBaseListData<T> {
   protected HoodieBaseListData(Stream<T> dataStream, boolean lazy) {
     // NOTE: In case this container is being instantiated by an eager parent, 
we have to
     //       pre-materialize the stream
-    this.data = lazy ? Either.left(dataStream) : 
Either.right(dataStream.collect(Collectors.toList()));
+    if (lazy) {
+      this.data = Either.left(dataStream);
+    } else {
+      this.data = Either.right(dataStream.collect(Collectors.toList()));
+      dataStream.close();
+    }
     this.lazy = lazy;
   }
 
@@ -69,9 +78,31 @@ public abstract class HoodieBaseListData<T> {
 
   protected List<T> collectAsList() {
     if (lazy) {
-      return data.asLeft().collect(Collectors.toList());
+      try (Stream<T> stream = data.asLeft()) {
+        return stream.collect(Collectors.toList());
+      }
     } else {
       return data.asRight();
     }
   }
+
+  static class IteratorCloser implements Runnable {
+    private static final Logger LOG = 
LoggerFactory.getLogger(IteratorCloser.class);
+    private final Iterator<?> iterator;
+
+    IteratorCloser(Iterator<?> iterator) {
+      this.iterator = iterator;
+    }
+
+    @Override
+    public void run() {
+      if (iterator instanceof AutoCloseable) {
+        try {
+          ((AutoCloseable) iterator).close();
+        } catch (Exception ex) {
+          LOG.warn("Failed to properly close iterator", ex);
+        }
+      }
+    }
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java 
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
index 690ab71c090..5eebf2a2401 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
@@ -119,10 +119,12 @@ public class HoodieListData<T> extends 
HoodieBaseListData<T> implements HoodieDa
   @Override
   public <O> HoodieData<O> mapPartitions(SerializableFunction<Iterator<T>, 
Iterator<O>> func, boolean preservesPartitioning) {
     Function<Iterator<T>, Iterator<O>> mapper = throwingMapWrapper(func);
+    Iterator<T> iterator = asStream().iterator();
+    Iterator<O> newIterator = mapper.apply(iterator);
     return new HoodieListData<>(
         StreamSupport.stream(
             Spliterators.spliteratorUnknownSize(
-                mapper.apply(asStream().iterator()), Spliterator.ORDERED), 
true),
+                newIterator, Spliterator.ORDERED), true).onClose(new 
IteratorCloser(newIterator)),
         lazy
     );
   }
@@ -130,18 +132,22 @@ public class HoodieListData<T> extends 
HoodieBaseListData<T> implements HoodieDa
   @Override
   public <O> HoodieData<O> flatMap(SerializableFunction<T, Iterator<O>> func) {
     Function<T, Iterator<O>> mapper = throwingMapWrapper(func);
-    Stream<O> mappedStream = asStream().flatMap(e ->
-        StreamSupport.stream(
-            Spliterators.spliteratorUnknownSize(mapper.apply(e), 
Spliterator.ORDERED), true));
+    Stream<O> mappedStream = asStream().flatMap(e -> {
+      Iterator<O> iterator = mapper.apply(e);
+      return StreamSupport.stream(
+          Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), 
true).onClose(new IteratorCloser(iterator));
+    });
     return new HoodieListData<>(mappedStream, lazy);
   }
 
   @Override
   public <K, V> HoodiePairData<K, V> flatMapToPair(SerializableFunction<T, 
Iterator<? extends Pair<K, V>>> func) {
     Function<T, Iterator<? extends Pair<K, V>>> mapper = 
throwingMapWrapper(func);
-    Stream<Pair<K, V>> mappedStream = asStream().flatMap(e ->
-        StreamSupport.stream(
-            Spliterators.spliteratorUnknownSize(mapper.apply(e), 
Spliterator.ORDERED), true));
+    Stream<Pair<K, V>> mappedStream = asStream().flatMap(e -> {
+      Iterator<? extends Pair<K, V>> iterator = mapper.apply(e);
+      return StreamSupport.stream(
+          Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), 
true).onClose(new IteratorCloser(iterator));
+    });
 
     return new HoodieListPairData<>(mappedStream, lazy);
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java 
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java
index ebf7207c84e..dd5bb224962 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java
@@ -105,7 +105,9 @@ public class HoodieListPairData<K, V> extends 
HoodieBaseListData<Pair<K, V>> imp
 
   @Override
   public Map<K, Long> countByKey() {
-    return asStream().collect(Collectors.groupingBy(Pair::getKey, 
Collectors.counting()));
+    try (Stream<Pair<K, V>> stream = asStream()) {
+      return stream.collect(Collectors.groupingBy(Pair::getKey, 
Collectors.counting()));
+    }
   }
 
   @Override
@@ -114,27 +116,31 @@ public class HoodieListPairData<K, V> extends 
HoodieBaseListData<Pair<K, V>> imp
     Collector<Pair<K, V>, ?, Map<K, List<V>>> groupingCollector =
         Collectors.groupingBy(Pair::getKey, mappingCollector);
 
-    Map<K, List<V>> groupedByKey = asStream().collect(groupingCollector);
-    return new HoodieListPairData<>(
-        groupedByKey.entrySet().stream().map(e -> Pair.of(e.getKey(), 
e.getValue())),
-        lazy
-    );
+    try (Stream<Pair<K, V>> s = asStream()) {
+      Map<K, List<V>> groupedByKey = s.collect(groupingCollector);
+      return new HoodieListPairData<>(
+          groupedByKey.entrySet().stream().map(e -> Pair.of(e.getKey(), 
e.getValue())),
+          lazy
+      );
+    }
   }
 
   @Override
   public HoodiePairData<K, V> reduceByKey(SerializableBiFunction<V, V, V> 
combiner, int parallelism) {
-    Map<K, java.util.Optional<V>> reducedMap = asStream().collect(
-        Collectors.groupingBy(
-            Pair::getKey,
-            HashMap::new,
-            Collectors.mapping(Pair::getValue, 
Collectors.reducing(combiner::apply))));
-
-    return new HoodieListPairData<>(
-        reducedMap.entrySet()
-            .stream()
-            .map(e -> Pair.of(e.getKey(), e.getValue().orElse(null))),
-        lazy
-    );
+    try (Stream<Pair<K, V>> stream = asStream()) {
+      Map<K, java.util.Optional<V>> reducedMap = stream.collect(
+          Collectors.groupingBy(
+              Pair::getKey,
+              HashMap::new,
+              Collectors.mapping(Pair::getValue, 
Collectors.reducing(combiner::apply))));
+
+      return new HoodieListPairData<>(
+          reducedMap.entrySet()
+              .stream()
+              .map(e -> Pair.of(e.getKey(), e.getValue().orElse(null))),
+          lazy
+      );
+    }
   }
 
   @Override
@@ -158,7 +164,7 @@ public class HoodieListPairData<K, V> extends 
HoodieBaseListData<Pair<K, V>> imp
           new MappingIterator<>(mappedValuesIterator, w -> Pair.of(p.getKey(), 
w));
 
       return StreamSupport.stream(
-          Spliterators.spliteratorUnknownSize(mappedPairsIterator, 
Spliterator.ORDERED), true);
+          Spliterators.spliteratorUnknownSize(mappedPairsIterator, 
Spliterator.ORDERED), true).onClose(new IteratorCloser(mappedValuesIterator));
     }), lazy);
   }
 
@@ -172,26 +178,28 @@ public class HoodieListPairData<K, V> extends 
HoodieBaseListData<Pair<K, V>> imp
     ValidationUtils.checkArgument(other instanceof HoodieListPairData);
 
     // Transform right-side container to a multi-map of [[K]] to [[List<W>]] 
values
-    HashMap<K, List<W>> rightStreamMap = ((HoodieListPairData<K, W>) 
other).asStream().collect(
-        Collectors.groupingBy(
-            Pair::getKey,
-            HashMap::new,
-            Collectors.mapping(Pair::getValue, Collectors.toList())));
-
-    Stream<Pair<K, Pair<V, Option<W>>>> leftOuterJoined = 
asStream().flatMap(pair -> {
-      K key = pair.getKey();
-      V leftValue = pair.getValue();
-      List<W> rightValues = rightStreamMap.get(key);
-
-      if (rightValues == null) {
-        return Stream.of(Pair.of(key, Pair.of(leftValue, Option.empty())));
-      } else {
-        return rightValues.stream().map(rightValue ->
-            Pair.of(key, Pair.of(leftValue, Option.of(rightValue))));
-      }
-    });
-
-    return new HoodieListPairData<>(leftOuterJoined, lazy);
+    try (Stream<Pair<K, W>> stream = ((HoodieListPairData<K, W>) 
other).asStream()) {
+      HashMap<K, List<W>> rightStreamMap = stream.collect(
+          Collectors.groupingBy(
+              Pair::getKey,
+              HashMap::new,
+              Collectors.mapping(Pair::getValue, Collectors.toList())));
+
+      Stream<Pair<K, Pair<V, Option<W>>>> leftOuterJoined = 
asStream().flatMap(pair -> {
+        K key = pair.getKey();
+        V leftValue = pair.getValue();
+        List<W> rightValues = rightStreamMap.get(key);
+
+        if (rightValues == null) {
+          return Stream.of(Pair.of(key, Pair.of(leftValue, Option.empty())));
+        } else {
+          return rightValues.stream().map(rightValue ->
+              Pair.of(key, Pair.of(leftValue, Option.of(rightValue))));
+        }
+      });
+
+      return new HoodieListPairData<>(leftOuterJoined, lazy);
+    }
   }
 
   @Override
@@ -206,24 +214,26 @@ public class HoodieListPairData<K, V> extends 
HoodieBaseListData<Pair<K, V>> imp
     ValidationUtils.checkArgument(other instanceof HoodieListPairData);
 
     // Transform right-side container to a multi-map of [[K]] to [[List<W>]] 
values
-    HashMap<K, List<W>> rightStreamMap = ((HoodieListPairData<K, W>) 
other).asStream().collect(
-        Collectors.groupingBy(
-            Pair::getKey,
-            HashMap::new,
-            Collectors.mapping(Pair::getValue, Collectors.toList())));
-
-    List<Pair<K, Pair<V, W>>> joinResult = new ArrayList<>();
-    asStream().forEach(pair -> {
-      K key = pair.getKey();
-      V leftValue = pair.getValue();
-      List<W> rightValues = rightStreamMap.getOrDefault(key, 
Collections.emptyList());
-
-      for (W rightValue : rightValues) {
-        joinResult.add(Pair.of(key, Pair.of(leftValue, rightValue)));
-      }
-    });
-
-    return new HoodieListPairData<>(joinResult, lazy);
+    try (Stream<Pair<K, W>> stream = ((HoodieListPairData<K, W>) 
other).asStream()) {
+      HashMap<K, List<W>> rightStreamMap = stream.collect(
+          Collectors.groupingBy(
+              Pair::getKey,
+              HashMap::new,
+              Collectors.mapping(Pair::getValue, Collectors.toList())));
+
+      List<Pair<K, Pair<V, W>>> joinResult = new ArrayList<>();
+      asStream().forEach(pair -> {
+        K key = pair.getKey();
+        V leftValue = pair.getValue();
+        List<W> rightValues = rightStreamMap.getOrDefault(key, 
Collections.emptyList());
+
+        for (W rightValue : rightValues) {
+          joinResult.add(Pair.of(key, Pair.of(leftValue, rightValue)));
+        }
+      });
+
+      return new HoodieListPairData<>(joinResult, lazy);
+    }
   }
 
   @Override
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index a57fb777fbd..570d9580ca0 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -348,12 +348,14 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
 
     @Override
     public void close() {
-      try {
-        reader.close();
-      } catch (IOException e) {
-        throw new HoodieIOException("Failed to close the reader", e);
-      } finally {
-        this.reader = null;
+      if (reader != null) {
+        try {
+          reader.close();
+        } catch (IOException e) {
+          throw new HoodieIOException("Failed to close the reader", e);
+        } finally {
+          this.reader = null;
+        }
       }
     }
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReaderImplBase.java
 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReaderImplBase.java
index 143d3ab0168..4dba4d840f0 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReaderImplBase.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReaderImplBase.java
@@ -54,8 +54,9 @@ public abstract class HoodieAvroHFileReaderImplBase extends 
HoodieAvroFileReader
   public static List<IndexedRecord> readAllRecords(HoodieAvroFileReader reader)
       throws IOException {
     Schema schema = reader.getSchema();
-    return toStream(reader.getIndexedRecordIterator(schema))
-        .collect(Collectors.toList());
+    try (ClosableIterator<IndexedRecord> indexedRecordIterator = 
reader.getIndexedRecordIterator(schema)) {
+      return toStream(indexedRecordIterator).collect(Collectors.toList());
+    }
   }
 
   /**
@@ -77,8 +78,9 @@ public abstract class HoodieAvroHFileReaderImplBase extends 
HoodieAvroFileReader
                                                 List<String> keys,
                                                 Schema schema) throws 
IOException {
     Collections.sort(keys);
-    return toStream(reader.getIndexedRecordsByKeysIterator(keys, schema))
-        .collect(Collectors.toList());
+    try (ClosableIterator<IndexedRecord> indexedRecordsByKeysIterator = 
reader.getIndexedRecordsByKeysIterator(keys, schema)) {
+      return 
toStream(indexedRecordsByKeysIterator).collect(Collectors.toList());
+    }
   }
 
   public abstract ClosableIterator<IndexedRecord> 
getIndexedRecordsByKeysIterator(List<String> keys,
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/data/CloseValidationIterator.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/data/CloseValidationIterator.java
new file mode 100644
index 00000000000..b61bdd6536f
--- /dev/null
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/data/CloseValidationIterator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.data;
+
+import org.apache.hudi.common.util.collection.ClosableIterator;
+
+import java.util.Iterator;
+
+/**
+ * Implementation of a {@link ClosableIterator} to help validate that the 
close method is properly called.
+ * @param <T> type of record within the iterator
+ */
+class CloseValidationIterator<T> implements ClosableIterator<T> {
+  private final Iterator<T> inner;
+  private boolean isClosed = false;
+
+  public CloseValidationIterator(Iterator<T> inner) {
+    this.inner = inner;
+  }
+
+  public boolean isClosed() {
+    return isClosed;
+  }
+
+  @Override
+  public void close() {
+    isClosed = true;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return inner.hasNext();
+  }
+
+  @Override
+  public T next() {
+    return inner.next();
+  }
+}
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java 
b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java
index 795318f5e01..24bd9a90949 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java
@@ -25,6 +25,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -94,4 +95,41 @@ class TestHoodieListData {
     emptyListData = HoodieListData.lazy(Collections.emptyList());
     assertTrue(emptyListData.isEmpty());
   }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testMapPartitionsWithCloseable(boolean isLazy) {
+    String partition1 = "partition1";
+    String partition2 = "partition2";
+    HoodieData<String> input = new HoodieListData<>(Stream.of(partition1, 
partition2), isLazy);
+    CloseValidationIterator<String> iterator = new 
CloseValidationIterator<>(Collections.singletonList("value").iterator());
+    assertEquals(1, input.mapPartitions(partition -> iterator, 
true).collectAsList().size());
+    assertTrue(iterator.isClosed());
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testFlatMapWithCloseable(boolean isLazy) {
+    String partition1 = "partition1";
+    String partition2 = "partition2";
+    CloseValidationIterator<String> iterator1 = new 
CloseValidationIterator<>(Collections.singletonList("value").iterator());
+    CloseValidationIterator<String> iterator2 = new 
CloseValidationIterator<>(Collections.singletonList("value").iterator());
+    HoodieData<String> input = new HoodieListData<>(Stream.of(partition1, 
partition2), isLazy);
+    assertEquals(2, input.flatMap(partition -> partition.equals(partition1) ? 
iterator1 : iterator2).collectAsList().size());
+    assertTrue(iterator1.isClosed());
+    assertTrue(iterator2.isClosed());
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testFlatMapToPairWithCloseable(boolean isLazy) {
+    String partition1 = "partition1";
+    String partition2 = "partition2";
+    HoodieData<String> input = new HoodieListData<>(Stream.of(partition1, 
partition2), isLazy);
+    CloseValidationIterator<Pair<String, String>> iterator1 = new 
CloseValidationIterator<>(Collections.singletonList(Pair.of("1", 
"value")).iterator());
+    CloseValidationIterator<Pair<String, String>> iterator2 = new 
CloseValidationIterator<>(Collections.singletonList(Pair.of("2", 
"value")).iterator());
+    assertEquals(2, input.flatMapToPair(partition -> 
partition.equals(partition1) ? iterator1 : iterator2).collectAsList().size());
+    assertTrue(iterator1.isClosed());
+    assertTrue(iterator2.isClosed());
+  }
 }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java
index 8355a5f30ed..d0bda7715a6 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListDataPairData.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -42,6 +43,7 @@ import java.util.stream.StreamSupport;
 import static org.apache.hudi.common.util.CollectionUtils.createImmutableList;
 import static org.apache.hudi.common.util.CollectionUtils.createImmutableMap;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Tests {@link HoodieListPairData}.
@@ -149,6 +151,50 @@ public class TestHoodieListDataPairData {
     assertEquals(expected, toMap(reduced));
   }
 
+  @Test
+  void testReduceByKeyWithCloseableInput() {
+    List<CloseValidationIterator<Pair<Integer, Integer>>> createdIterators = 
new ArrayList<>();
+    HoodiePairData<Integer, Integer> data = 
HoodieListData.lazy(Arrays.asList(1, 1, 1))
+        .flatMapToPair(key -> {
+          CloseValidationIterator<Pair<Integer, Integer>> iter = new 
CloseValidationIterator<>(Collections.singletonList(Pair.of(key, 
1)).iterator());
+          createdIterators.add(iter);
+          return iter;
+        });
+    List<Pair<Integer, Integer>> result = data.reduceByKey(Integer::sum, 
1).collectAsList();
+    assertEquals(Collections.singletonList(Pair.of(1, 3)), result);
+    createdIterators.forEach(iter -> assertTrue(iter.isClosed()));
+  }
+
+  @Test
+  void testLeftOuterJoinWithCloseableInput() {
+    List<CloseValidationIterator<Pair<Integer, Integer>>> createdIterators = 
new ArrayList<>();
+    HoodiePairData<Integer, Integer> dataToJoin = 
HoodieListData.lazy(Arrays.asList(1, 2, 3))
+        .flatMapToPair(key -> {
+          CloseValidationIterator<Pair<Integer, Integer>> iter = new 
CloseValidationIterator<>(Collections.singletonList(Pair.of(key, 
1)).iterator());
+          createdIterators.add(iter);
+          return iter;
+        });
+    HoodiePairData<Integer, Integer> data = 
HoodieListPairData.lazy(Arrays.asList(Pair.of(1, 1), Pair.of(4, 2)));
+    List<Pair<Integer, Pair<Integer, Option<Integer>>>> result = 
data.leftOuterJoin(dataToJoin).collectAsList();
+    assertEquals(2, result.size());
+    createdIterators.forEach(iter -> assertTrue(iter.isClosed()));
+  }
+
+  @Test
+  void testJoinWithCloseableInput() {
+    List<CloseValidationIterator<Pair<Integer, Integer>>> createdIterators = 
new ArrayList<>();
+    HoodiePairData<Integer, Integer> dataToJoin = 
HoodieListData.lazy(Arrays.asList(1, 2, 3))
+        .flatMapToPair(key -> {
+          CloseValidationIterator<Pair<Integer, Integer>> iter = new 
CloseValidationIterator<>(Collections.singletonList(Pair.of(key, 
1)).iterator());
+          createdIterators.add(iter);
+          return iter;
+        });
+    HoodiePairData<Integer, Integer> data = 
HoodieListPairData.lazy(Arrays.asList(Pair.of(1, 1), Pair.of(4, 2)));
+    List<Pair<Integer, Pair<Integer, Integer>>> result = 
data.join(dataToJoin).collectAsList();
+    assertEquals(1, result.size());
+    createdIterators.forEach(iter -> assertTrue(iter.isClosed()));
+  }
+
   @Test
   public void testLeftOuterJoinSingleValuePerKey() {
     HoodiePairData<String, String> pairData1 = 
HoodieListPairData.lazy(Arrays.asList(
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
index f34034d0a35..e0d307e2f03 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
@@ -21,6 +21,7 @@ package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieAvroFileReader;
 import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
@@ -42,7 +43,6 @@ import org.junit.jupiter.params.provider.CsvSource;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Spliterator;
 import java.util.Spliterators;
@@ -107,15 +107,16 @@ public class TestHoodieHBaseHFileReaderWriter extends 
TestHoodieHFileReaderWrite
           (entry.get("_row_key").toString()).contains("key05")
               || (entry.get("_row_key").toString()).contains("key24")
               || 
(entry.get("_row_key").toString()).contains("key31"))).collect(Collectors.toList());
-      Iterator<IndexedRecord> iterator =
+      try (ClosableIterator<IndexedRecord> iterator =
           hfileReader.getIndexedRecordsByKeysIterator(
               Arrays.asList("key00001", "key05", "key24", "key16", "key31", 
"key61"),
-              avroSchema);
-      List<GenericRecord> recordsByKeys =
-          StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 
Spliterator.ORDERED), false)
-              .map(r -> (GenericRecord) r)
-              .collect(Collectors.toList());
-      assertEquals(expectedKey1s, recordsByKeys);
+              avroSchema)) {
+        List<GenericRecord> recordsByKeys =
+            StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 
Spliterator.ORDERED), false)
+                .map(r -> (GenericRecord) r)
+                .collect(Collectors.toList());
+        assertEquals(expectedKey1s, recordsByKeys);
+      }
     }
   }
 
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
index 362f58a00cf..1044604419a 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
@@ -21,6 +21,7 @@ package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.io.storage.HoodieAvroFileReader;
 import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
 import org.apache.hudi.io.storage.HoodieNativeAvroHFileReader;
@@ -32,7 +33,6 @@ import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.Spliterator;
 import java.util.Spliterators;
 import java.util.stream.Collectors;
@@ -78,15 +78,16 @@ public class TestHoodieHFileReaderWriter extends 
TestHoodieHFileReaderWriterBase
       // Even though key16 exists, it's a backward seek not in order.
       // Our native HFile reader does not allow backward seek, and throws an 
exception
       // Note that backward seek is not expected to happen in production code
-      Iterator<IndexedRecord> iterator =
+      try (ClosableIterator<IndexedRecord> iterator =
           hfileReader.getIndexedRecordsByKeysIterator(
               Arrays.asList("key00001", "key05", "key24", "key16", "key31", 
"key61"),
-              avroSchema);
-      assertThrows(
-          IllegalStateException.class,
-          () -> StreamSupport.stream(
-                  Spliterators.spliteratorUnknownSize(iterator, 
Spliterator.ORDERED), false)
-              .collect(Collectors.toList()));
+              avroSchema)) {
+        assertThrows(
+            IllegalStateException.class,
+            () -> StreamSupport.stream(
+                    Spliterators.spliteratorUnknownSize(iterator, 
Spliterator.ORDERED), false)
+                .collect(Collectors.toList()));
+      }
     }
   }
 }
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestOrcReaderIterator.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestOrcReaderIterator.java
index 4cf6f7c27c7..698f44597ce 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestOrcReaderIterator.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestOrcReaderIterator.java
@@ -20,6 +20,7 @@
 package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.common.util.AvroOrcUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -39,7 +40,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
-import java.util.Iterator;
 
 import static 
org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
@@ -66,33 +66,34 @@ public class TestOrcReaderIterator {
     Schema avroSchema = getSchemaFromResource(TestOrcReaderIterator.class, 
"/simple-test.avsc");
     TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(avroSchema);
     OrcFile.WriterOptions options = 
OrcFile.writerOptions(conf).setSchema(orcSchema).compress(CompressionKind.ZLIB);
-    Writer writer = OrcFile.createWriter(filePath, options);
-    VectorizedRowBatch batch = orcSchema.createRowBatch();
-    BytesColumnVector nameColumns = (BytesColumnVector) batch.cols[0];
-    LongColumnVector numberColumns = (LongColumnVector) batch.cols[1];
-    BytesColumnVector colorColumns = (BytesColumnVector) batch.cols[2];
-    for (int r = 0; r < 5; ++r) {
-      int row = batch.size++;
-      byte[] name = getUTF8Bytes("name" + r);
-      nameColumns.setVal(row, name);
-      byte[] color = getUTF8Bytes("color" + r);
-      colorColumns.setVal(row, color);
-      numberColumns.vector[row] = r;
+    try (Writer writer = OrcFile.createWriter(filePath, options)) {
+      VectorizedRowBatch batch = orcSchema.createRowBatch();
+      BytesColumnVector nameColumns = (BytesColumnVector) batch.cols[0];
+      LongColumnVector numberColumns = (LongColumnVector) batch.cols[1];
+      BytesColumnVector colorColumns = (BytesColumnVector) batch.cols[2];
+      for (int r = 0; r < 5; ++r) {
+        int row = batch.size++;
+        byte[] name = getUTF8Bytes("name" + r);
+        nameColumns.setVal(row, name);
+        byte[] color = getUTF8Bytes("color" + r);
+        colorColumns.setVal(row, color);
+        numberColumns.vector[row] = r;
+      }
+      writer.addRowBatch(batch);
     }
-    writer.addRowBatch(batch);
-    writer.close();
 
     Reader reader = OrcFile.createReader(filePath, 
OrcFile.readerOptions(conf));
     RecordReader recordReader = reader.rows(new 
Reader.Options(conf).schema(orcSchema));
-    Iterator<GenericRecord> iterator = new OrcReaderIterator<>(recordReader, 
avroSchema, orcSchema);
-    int recordCount = 0;
-    while (iterator.hasNext()) {
-      GenericRecord record = iterator.next();
-      assertEquals("name" + recordCount, record.get("name").toString());
-      assertEquals("color" + recordCount, 
record.get("favorite_color").toString());
-      assertEquals(recordCount, record.get("favorite_number"));
-      recordCount++;
+    try (ClosableIterator<GenericRecord> iterator = new 
OrcReaderIterator<>(recordReader, avroSchema, orcSchema)) {
+      int recordCount = 0;
+      while (iterator.hasNext()) {
+        GenericRecord record = iterator.next();
+        assertEquals("name" + recordCount, record.get("name").toString());
+        assertEquals("color" + recordCount, 
record.get("favorite_color").toString());
+        assertEquals(recordCount, record.get("favorite_number"));
+        recordCount++;
+      }
+      assertEquals(5, recordCount);
     }
-    assertEquals(5, recordCount);
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala
index c1b41061992..e54faaac1da 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala
@@ -24,6 +24,7 @@ import 
org.apache.hudi.DataSourceReadOptions.{REALTIME_PAYLOAD_COMBINE_OPT_VAL,
 import org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
+import org.apache.hudi.data.CloseableIteratorListener
 import org.apache.hudi.storage.StoragePath
 
 import org.apache.hadoop.conf.Configuration
@@ -134,7 +135,7 @@ class HoodieMultipleBaseFileFormat(tableState: 
Broadcast[HoodieTableState],
     (file: PartitionedFile) => {
       val filePath = 
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
       val fileFormat = detectFileFormat(filePath.toString)
-      file.partitionValues match {
+      val iter = file.partitionValues match {
         case fileSliceMapping: HoodiePartitionFileSliceMapping =>
           if (FSUtils.isLogFile(filePath)) {
             // no base file
@@ -192,6 +193,7 @@ class HoodieMultipleBaseFileFormat(tableState: 
Broadcast[HoodieTableState],
           case _ => throw new UnsupportedOperationException(s"Base file format 
$fileFormat is not supported.")
         }
       }
+      CloseableIteratorListener.addListener(iter)
     }
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 4eef9a5aaec..d6989d9075c 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -26,6 +26,7 @@ import org.apache.hudi.common.config.{HoodieMemoryConfig, 
TypedProperties}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.table.read.HoodieFileGroupReader
+import org.apache.hudi.data.CloseableIteratorListener
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.io.IOUtils
 import org.apache.hudi.storage.StorageConfiguration
@@ -163,7 +164,7 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String,
 
     (file: PartitionedFile) => {
       val storageConf = new 
HadoopStorageConfiguration(broadcastedStorageConf.value.value)
-      file.partitionValues match {
+      val iter = file.partitionValues match {
         // Snapshot or incremental queries.
         case fileSliceMapping: HoodiePartitionFileSliceMapping =>
           val fileGroupName = FSUtils.getFileIdFromFilePath(sparkAdapter
@@ -205,6 +206,7 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String,
           readBaseFile(file, parquetFileReader.value, requestedSchema, 
remainingPartitionSchema, fixedPartitionIndexes,
             requiredSchema, partitionSchema, outputSchema, filters, 
storageConf)
       }
+      CloseableIteratorListener.addListener(iter)
     }
   }
 
@@ -270,6 +272,7 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String,
 
   private def 
makeCloseableFileGroupMappingRecordIterator(closeableFileGroupRecordIterator: 
HoodieFileGroupReader.HoodieFileGroupReaderIterator[InternalRow],
                                                           mappingFunction: 
Function[InternalRow, InternalRow]): Iterator[InternalRow] = {
+    CloseableIteratorListener.addListener(closeableFileGroupRecordIterator)
     new Iterator[InternalRow] with Closeable {
       override def hasNext: Boolean = closeableFileGroupRecordIterator.hasNext
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
index 24c4663583e..a1b39d03584 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
@@ -31,6 +31,7 @@ import 
org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.common.util.{Option, ValidationUtils}
 import org.apache.hudi.config.{HoodieIndexConfig, HoodieInternalConfig}
 import org.apache.hudi.config.HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE
+import org.apache.hudi.data.CloseableIteratorListener
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.index.bucket.partition.{PartitionBucketIndexCalculator, 
PartitionBucketIndexUtils}
 import org.apache.hudi.internal.schema.InternalSchema
@@ -242,6 +243,7 @@ class PartitionBucketIndexManager extends BaseProcedure
             false)
           fileGroupReader.initRecordIterators()
           val iterator = 
fileGroupReader.getClosableIterator.asInstanceOf[HoodieFileGroupReader.HoodieFileGroupReaderIterator[InternalRow]]
+          CloseableIteratorListener.addListener(iterator)
           iterator.asScala
         })
       }

Reply via email to