This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch release-0.10.1-rc1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit d4d665ecb1d5aaef45ac5ad2bb691d4c9af6f7be Author: Sivabalan Narayanan <[email protected]> AuthorDate: Tue Jan 4 21:57:18 2022 -0500 [HUDI-2966] Closing LogRecordScanner in compactor (#4478) * Closing LogRecordScanner in compactor * Addressing comments --- .../apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java index 58898e7..f00efa5 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java @@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.collection.ExternalSpillableMap; import org.apache.hudi.hadoop.config.HoodieRealtimeConfig; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; @@ -54,6 +53,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader private final Map<String, HoodieRecord<? extends HoodieRecordPayload>> deltaRecordMap; private final Set<String> deltaRecordKeys; + private final HoodieMergedLogRecordScanner mergedLogRecordScanner; private int recordKeyIndex = HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS; private Iterator<String> deltaItr; @@ -61,7 +61,8 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader RecordReader<NullWritable, ArrayWritable> realReader) throws IOException { super(split, job); this.parquetReader = realReader; - this.deltaRecordMap = getMergedLogRecordScanner().getRecords(); + this.mergedLogRecordScanner = getMergedLogRecordScanner(); + this.deltaRecordMap = mergedLogRecordScanner.getRecords(); this.deltaRecordKeys = new HashSet<>(this.deltaRecordMap.keySet()); if (split.getHoodieVirtualKeyInfo().isPresent()) { this.recordKeyIndex = split.getHoodieVirtualKeyInfo().get().getRecordKeyFieldIndex(); @@ -192,7 +193,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader parquetReader.close(); // need clean the tmp file which created by logScanner // Otherwise, for resident process such as presto, the /tmp directory will overflow - ((ExternalSpillableMap) deltaRecordMap).close(); + mergedLogRecordScanner.close(); } @Override
