This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.10.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit d4d665ecb1d5aaef45ac5ad2bb691d4c9af6f7be
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue Jan 4 21:57:18 2022 -0500

    [HUDI-2966] Closing LogRecordScanner in compactor (#4478)
    
    * Closing LogRecordScanner in compactor
    
    * Addressing comments
---
 .../apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index 58898e7..f00efa5 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
@@ -54,6 +53,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
   private final Map<String, HoodieRecord<? extends HoodieRecordPayload>> deltaRecordMap;
 
   private final Set<String> deltaRecordKeys;
+  private final HoodieMergedLogRecordScanner mergedLogRecordScanner;
   private int recordKeyIndex = HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
   private Iterator<String> deltaItr;
 
@@ -61,7 +61,8 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
       RecordReader<NullWritable, ArrayWritable> realReader) throws IOException {
     super(split, job);
     this.parquetReader = realReader;
-    this.deltaRecordMap = getMergedLogRecordScanner().getRecords();
+    this.mergedLogRecordScanner = getMergedLogRecordScanner();
+    this.deltaRecordMap = mergedLogRecordScanner.getRecords();
     this.deltaRecordKeys = new HashSet<>(this.deltaRecordMap.keySet());
     if (split.getHoodieVirtualKeyInfo().isPresent()) {
       this.recordKeyIndex = split.getHoodieVirtualKeyInfo().get().getRecordKeyFieldIndex();
@@ -192,7 +193,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
     parquetReader.close();
     // need clean the tmp file which created by logScanner
     // Otherwise, for resident process such as presto, the /tmp directory will overflow
-    ((ExternalSpillableMap) deltaRecordMap).close();
+    mergedLogRecordScanner.close();
   }
 
   @Override

Reply via email to