This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 07cc3e89a7 [HUDI-5306] Unify RecordIterator and HoodieParquetReader 
with ClosableIterator (#7340)
07cc3e89a7 is described below

commit 07cc3e89a73430832e6b109d215e67ef52aa4089
Author: Danny Chan <[email protected]>
AuthorDate: Thu Dec 1 17:13:59 2022 +0800

    [HUDI-5306] Unify RecordIterator and HoodieParquetReader with 
ClosableIterator (#7340)
    
    * Unify RecordIterator and HoodieParquetReader with ClosableIterator
    * Add a factory clazz for RecordIterator
    * Add more documents
---
 .../apache/hudi/configuration/OptionsResolver.java |   2 +-
 .../java/org/apache/hudi/table/format/CastMap.java |  19 ++-
 ...Reader.java => ParquetSplitRecordIterator.java} |  24 ++-
 ...odieParquetReader.java => RecordIterators.java} |  56 +++---
 ...eader.java => SchemaEvolvedRecordIterator.java} |  25 ++-
 .../hudi/table/format/cdc/CdcInputFormat.java      | 134 +++++++--------
 .../table/format/cow/CopyOnWriteInputFormat.java   |  19 ++-
 .../table/format/mor/MergeOnReadInputFormat.java   | 189 +++++++++------------
 8 files changed, 223 insertions(+), 245 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 16b15cf3d2..619674145a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -216,7 +216,7 @@ public class OptionsResolver {
    * Returns whether comprehensive schema evolution enabled.
    */
   public static boolean isSchemaEvolutionEnabled(Configuration conf) {
-    return conf.getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), 
false);
+    return conf.getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), 
HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue());
   }
 
   // -------------------------------------------------------------------------
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java
index 5f29e85adc..36cf870887 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java
@@ -49,14 +49,17 @@ import static 
org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR;
 
 /**
  * CastMap is responsible for conversion of flink types when full schema 
evolution enabled.
- * Supported cast conversions:
- * Integer => Long, Float, Double, Decimal, String
- * Long => Float, Double, Decimal, String
- * Float => Double, Decimal, String
- * Double => Decimal, String
- * Decimal => Decimal, String
- * String => Decimal, Date
- * Date => String
+ *
+ * <p>Supported cast conversions:
+ * <ul>
+ *   <li>Integer => Long, Float, Double, Decimal, String</li>
+ *   <li>Long => Float, Double, Decimal, String</li>
+ *   <li>Float => Double, Decimal, String</li>
+ *   <li>Double => Decimal, String</li>
+ *   <li>Decimal => Decimal, String</li>
+ *   <li>String => Decimal, Date</li>
+ *   <li>Date => String</li>
+ * </ul>
  */
 public final class CastMap implements Serializable {
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetSplitReader.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/ParquetSplitRecordIterator.java
similarity index 63%
rename from 
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetSplitReader.java
rename to 
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/ParquetSplitRecordIterator.java
index d13c6c7c21..7b26d71f11 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetSplitReader.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/ParquetSplitRecordIterator.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.table.format;
 
+import org.apache.hudi.common.util.ClosableIterator;
+import org.apache.hudi.exception.HoodieIOException;
 import 
org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader;
 
 import org.apache.flink.table.data.RowData;
@@ -27,25 +29,33 @@ import java.io.IOException;
 /**
  * Hoodie wrapper for flink parquet reader.
  */
-public final class HoodieParquetSplitReader implements HoodieParquetReader {
+public final class ParquetSplitRecordIterator implements 
ClosableIterator<RowData> {
   private final ParquetColumnarRowSplitReader reader;
 
-  public HoodieParquetSplitReader(ParquetColumnarRowSplitReader reader) {
+  public ParquetSplitRecordIterator(ParquetColumnarRowSplitReader reader) {
     this.reader = reader;
   }
 
   @Override
-  public boolean reachedEnd() throws IOException {
-    return reader.reachedEnd();
+  public boolean hasNext() {
+    try {
+      return !reader.reachedEnd();
+    } catch (IOException e) {
+      throw new HoodieIOException("Decides whether the parquet columnar row 
split reader reached end exception", e);
+    }
   }
 
   @Override
-  public RowData nextRecord() {
+  public RowData next() {
     return reader.nextRecord();
   }
 
   @Override
-  public void close() throws IOException {
-    reader.close();
+  public void close() {
+    try {
+      reader.close();
+    } catch (IOException e) {
+      throw new HoodieIOException("Close the parquet columnar row split reader 
exception", e);
+    }
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetReader.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
similarity index 67%
rename from 
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetReader.java
rename to 
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
index e762f03e98..8657f16ddc 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetReader.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.format;
 
+import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil;
@@ -26,23 +27,17 @@ import org.apache.hudi.util.RowDataProjection;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
-
 import org.apache.hadoop.conf.Configuration;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.Map;
 
 /**
- * Base interface for hoodie parquet readers.
+ * Factory clazz for record iterators.
  */
-public interface HoodieParquetReader extends Closeable {
-
-  boolean reachedEnd() throws IOException;
+public abstract class RecordIterators {
 
-  RowData nextRecord();
-
-  static HoodieParquetReader getReader(
+  public static ClosableIterator<RowData> getParquetRecordIterator(
       InternalSchemaManager internalSchemaManager,
       boolean utcTimestamp,
       boolean caseSensitive,
@@ -55,10 +50,9 @@ public interface HoodieParquetReader extends Closeable {
       Path path,
       long splitStart,
       long splitLength) throws IOException {
-    Option<RowDataProjection> castProjection;
     InternalSchema fileSchema = 
internalSchemaManager.getFileSchema(path.getName());
     if (fileSchema.isEmptySchema()) {
-      return new HoodieParquetSplitReader(
+      return new ParquetSplitRecordIterator(
           ParquetSplitReaderUtil.genPartColumnarRowReader(
               utcTimestamp,
               caseSensitive,
@@ -73,27 +67,25 @@ public interface HoodieParquetReader extends Closeable {
               splitLength));
     } else {
       CastMap castMap = internalSchemaManager.getCastMap(fileSchema, 
fieldNames, fieldTypes, selectedFields);
-      castProjection = castMap.toRowDataProjection(selectedFields);
-      fieldNames = internalSchemaManager.getFileFieldNames(fileSchema, 
fieldNames);
-      fieldTypes = castMap.getFileFieldTypes();
-    }
-    HoodieParquetReader reader = new HoodieParquetSplitReader(
-        ParquetSplitReaderUtil.genPartColumnarRowReader(
-          utcTimestamp,
-          caseSensitive,
-          conf,
-          fieldNames,
-          fieldTypes,
-          partitionSpec,
-          selectedFields,
-          batchSize,
-          path,
-          splitStart,
-          splitLength));
-    if (castProjection.isPresent()) {
-      return new HoodieParquetEvolvedSplitReader(reader, castProjection.get());
-    } else {
-      return reader;
+      Option<RowDataProjection> castProjection = 
castMap.toRowDataProjection(selectedFields);
+      ClosableIterator<RowData> itr = new ParquetSplitRecordIterator(
+          ParquetSplitReaderUtil.genPartColumnarRowReader(
+              utcTimestamp,
+              caseSensitive,
+              conf,
+              internalSchemaManager.getFileFieldNames(fileSchema, fieldNames), 
// the reconciled field names
+              castMap.getFileFieldTypes(),                                     
// the reconciled field types
+              partitionSpec,
+              selectedFields,
+              batchSize,
+              path,
+              splitStart,
+              splitLength));
+      if (castProjection.isPresent()) {
+        return new SchemaEvolvedRecordIterator(itr, castProjection.get());
+      } else {
+        return itr;
+      }
     }
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetEvolvedSplitReader.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/SchemaEvolvedRecordIterator.java
similarity index 63%
rename from 
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetEvolvedSplitReader.java
rename to 
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/SchemaEvolvedRecordIterator.java
index 037a377635..739512c7b5 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetEvolvedSplitReader.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/SchemaEvolvedRecordIterator.java
@@ -18,36 +18,35 @@
 
 package org.apache.hudi.table.format;
 
+import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.util.RowDataProjection;
 
 import org.apache.flink.table.data.RowData;
 
-import java.io.IOException;
-
 /**
- * Decorates origin hoodie parquet reader with cast projection.
+ * Decorates origin record iterator with cast projection.
  */
-public final class HoodieParquetEvolvedSplitReader implements 
HoodieParquetReader {
-  private final HoodieParquetReader originReader;
+public final class SchemaEvolvedRecordIterator implements 
ClosableIterator<RowData> {
+  private final ClosableIterator<RowData> nested;
   private final RowDataProjection castProjection;
 
-  public HoodieParquetEvolvedSplitReader(HoodieParquetReader originReader, 
RowDataProjection castProjection) {
-    this.originReader = originReader;
+  public SchemaEvolvedRecordIterator(ClosableIterator<RowData> nested, 
RowDataProjection castProjection) {
+    this.nested = nested;
     this.castProjection = castProjection;
   }
 
   @Override
-  public boolean reachedEnd() throws IOException {
-    return originReader.reachedEnd();
+  public boolean hasNext() {
+    return nested.hasNext();
   }
 
   @Override
-  public RowData nextRecord() {
-    return castProjection.project(originReader.nextRecord());
+  public RowData next() {
+    return castProjection.project(nested.next());
   }
 
   @Override
-  public void close() throws IOException {
-    originReader.close();
+  public void close() {
+    nested.close();
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
index 8d2842a160..4e162d8e2b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
 import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
 import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
 import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator;
+import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
@@ -38,7 +39,6 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.table.format.FormatUtils;
-import org.apache.hudi.table.format.HoodieParquetReader;
 import org.apache.hudi.table.format.InternalSchemaManager;
 import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
@@ -94,11 +94,11 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
   }
 
   @Override
-  protected RecordIterator initIterator(MergeOnReadInputSplit split) throws 
IOException {
+  protected ClosableIterator<RowData> initIterator(MergeOnReadInputSplit 
split) throws IOException {
     if (split instanceof CdcInputSplit) {
       HoodieCDCSupplementalLoggingMode mode = 
OptionsResolver.getCDCSupplementalLoggingMode(conf);
       ImageManager manager = new ImageManager(conf, tableState.getRowType(), 
this::getFileSliceIterator);
-      Function<HoodieCDCFileSplit, RecordIterator> recordIteratorFunc =
+      Function<HoodieCDCFileSplit, ClosableIterator<RowData>> 
recordIteratorFunc =
           cdcFileSplit -> getRecordIteratorV2(split.getTablePath(), 
split.getMaxCompactionMemoryInBytes(), cdcFileSplit, mode, manager);
       return new CdcFileSplitsIterator((CdcInputSplit) split, manager, 
recordIteratorFunc);
     } else {
@@ -113,10 +113,10 @@ public class CdcInputFormat extends 
MergeOnReadInputFormat {
     return new Builder();
   }
 
-  private RecordIterator getFileSliceIterator(MergeOnReadInputSplit split) {
+  private ClosableIterator<RowData> getFileSliceIterator(MergeOnReadInputSplit 
split) {
     if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() 
> 0)) {
       // base file only
-      return new 
BaseFileOnlyIterator(getFullSchemaReader(split.getBasePath().get()));
+      return getBaseFileIteratorWithMetadata(split.getBasePath().get());
     } else if (!split.getBasePath().isPresent()) {
       // log files only
       return new LogFileOnlyIterator(getFullLogFileIterator(split));
@@ -134,11 +134,11 @@ public class CdcInputFormat extends 
MergeOnReadInputFormat {
           Option.empty(),
           false,
           this.tableState.getOperationPos(),
-          getFullSchemaReader(split.getBasePath().get()));
+          getBaseFileIteratorWithMetadata(split.getBasePath().get()));
     }
   }
 
-  private RecordIterator getRecordIteratorV2(
+  private ClosableIterator<RowData> getRecordIteratorV2(
       String tablePath,
       long maxCompactionMemoryInBytes,
       HoodieCDCFileSplit fileSplit,
@@ -151,7 +151,7 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
     }
   }
 
-  private RecordIterator getRecordIterator(
+  private ClosableIterator<RowData> getRecordIterator(
       String tablePath,
       long maxCompactionMemoryInBytes,
       HoodieCDCFileSplit fileSplit,
@@ -162,7 +162,7 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
         ValidationUtils.checkState(fileSplit.getCdcFiles() != null && 
fileSplit.getCdcFiles().size() == 1,
             "CDC file path should exist and be only one");
         String path = new Path(tablePath, 
fileSplit.getCdcFiles().get(0)).toString();
-        return new AddBaseFileIterator(getRequiredSchemaReader(path));
+        return new AddBaseFileIterator(getBaseFileIterator(path));
       case BASE_FILE_DELETE:
         ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
             "Before file slice should exist");
@@ -192,26 +192,26 @@ public class CdcInputFormat extends 
MergeOnReadInputFormat {
   // -------------------------------------------------------------------------
   //  Inner Class
   // -------------------------------------------------------------------------
-  static class CdcFileSplitsIterator implements RecordIterator {
+  static class CdcFileSplitsIterator implements ClosableIterator<RowData> {
     private ImageManager imageManager; //  keep a reference to release resource
     private final Iterator<HoodieCDCFileSplit> fileSplitIterator;
-    private final Function<HoodieCDCFileSplit, RecordIterator> 
recordIteratorFunc;
-    private RecordIterator recordIterator;
+    private final Function<HoodieCDCFileSplit, ClosableIterator<RowData>> 
recordIteratorFunc;
+    private ClosableIterator<RowData> recordIterator;
 
     CdcFileSplitsIterator(
         CdcInputSplit inputSplit,
         ImageManager imageManager,
-        Function<HoodieCDCFileSplit, RecordIterator> recordIteratorFunc) {
+        Function<HoodieCDCFileSplit, ClosableIterator<RowData>> 
recordIteratorFunc) {
       this.fileSplitIterator = 
Arrays.asList(inputSplit.getChanges()).iterator();
       this.imageManager = imageManager;
       this.recordIteratorFunc = recordIteratorFunc;
     }
 
     @Override
-    public boolean reachedEnd() throws IOException {
+    public boolean hasNext() {
       if (recordIterator != null) {
-        if (!recordIterator.reachedEnd()) {
-          return false;
+        if (recordIterator.hasNext()) {
+          return true;
         } else {
           recordIterator.close(); // release resource
           recordIterator = null;
@@ -220,18 +220,18 @@ public class CdcInputFormat extends 
MergeOnReadInputFormat {
       if (fileSplitIterator.hasNext()) {
         HoodieCDCFileSplit fileSplit = fileSplitIterator.next();
         recordIterator = recordIteratorFunc.apply(fileSplit);
-        return recordIterator.reachedEnd();
+        return recordIterator.hasNext();
       }
-      return true;
+      return false;
     }
 
     @Override
-    public RowData nextRecord() {
-      return recordIterator.nextRecord();
+    public RowData next() {
+      return recordIterator.next();
     }
 
     @Override
-    public void close() throws IOException {
+    public void close() {
       if (recordIterator != null) {
         recordIterator.close();
       }
@@ -242,63 +242,63 @@ public class CdcInputFormat extends 
MergeOnReadInputFormat {
     }
   }
 
-  static class AddBaseFileIterator implements RecordIterator {
-    // base file reader
-    private HoodieParquetReader reader;
+  static class AddBaseFileIterator implements ClosableIterator<RowData> {
+    // base file record iterator
+    private ClosableIterator<RowData> nested;
 
     private RowData currentRecord;
 
-    AddBaseFileIterator(HoodieParquetReader reader) {
-      this.reader = reader;
+    AddBaseFileIterator(ClosableIterator<RowData> nested) {
+      this.nested = nested;
     }
 
     @Override
-    public boolean reachedEnd() throws IOException {
-      if (!this.reader.reachedEnd()) {
-        currentRecord = this.reader.nextRecord();
+    public boolean hasNext() {
+      if (this.nested.hasNext()) {
+        currentRecord = this.nested.next();
         currentRecord.setRowKind(RowKind.INSERT);
-        return false;
+        return true;
       }
-      return true;
+      return false;
     }
 
     @Override
-    public RowData nextRecord() {
+    public RowData next() {
       return currentRecord;
     }
 
     @Override
-    public void close() throws IOException {
-      if (this.reader != null) {
-        this.reader.close();
-        this.reader = null;
+    public void close() {
+      if (this.nested != null) {
+        this.nested.close();
+        this.nested = null;
       }
     }
   }
 
-  static class RemoveBaseFileIterator implements RecordIterator {
-    private RecordIterator nested;
+  static class RemoveBaseFileIterator implements ClosableIterator<RowData> {
+    private ClosableIterator<RowData> nested;
     private final RowDataProjection projection;
 
-    RemoveBaseFileIterator(MergeOnReadTableState tableState, RecordIterator 
iterator) {
+    RemoveBaseFileIterator(MergeOnReadTableState tableState, 
ClosableIterator<RowData> iterator) {
       this.nested = iterator;
       this.projection = 
RowDataProjection.instance(tableState.getRequiredRowType(), 
tableState.getRequiredPositions());
     }
 
     @Override
-    public boolean reachedEnd() throws IOException {
-      return nested.reachedEnd();
+    public boolean hasNext() {
+      return nested.hasNext();
     }
 
     @Override
-    public RowData nextRecord() {
-      RowData row = nested.nextRecord();
+    public RowData next() {
+      RowData row = nested.next();
       row.setRowKind(RowKind.DELETE);
       return this.projection.project(row);
     }
 
     @Override
-    public void close() throws IOException {
+    public void close() {
       if (this.nested != null) {
         this.nested.close();
         this.nested = null;
@@ -306,7 +306,7 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
     }
   }
 
-  abstract static class BaseImageIterator implements RecordIterator {
+  abstract static class BaseImageIterator implements ClosableIterator<RowData> 
{
     private final Schema requiredSchema;
     private final int[] requiredPos;
     private final GenericRecordBuilder recordBuilder;
@@ -353,18 +353,18 @@ public class CdcInputFormat extends 
MergeOnReadInputFormat {
     }
 
     @Override
-    public boolean reachedEnd() {
+    public boolean hasNext() {
       if (this.sideImage != null) {
         currentImage = this.sideImage;
         this.sideImage = null;
-        return false;
+        return true;
       } else if (this.cdcItr.hasNext()) {
         cdcRecord = (GenericRecord) this.cdcItr.next();
         String op = String.valueOf(cdcRecord.get(0));
         resolveImage(op);
-        return false;
+        return true;
       }
-      return true;
+      return false;
     }
 
     protected abstract RowData getAfterImage(RowKind rowKind, GenericRecord 
cdcRecord);
@@ -372,12 +372,12 @@ public class CdcInputFormat extends 
MergeOnReadInputFormat {
     protected abstract RowData getBeforeImage(RowKind rowKind, GenericRecord 
cdcRecord);
 
     @Override
-    public RowData nextRecord() {
+    public RowData next() {
       return currentImage;
     }
 
     @Override
-    public void close() throws IOException {
+    public void close() {
       if (this.cdcItr != null) {
         this.cdcItr.close();
         this.cdcItr = null;
@@ -420,7 +420,7 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
         MergeOnReadTableState tableState,
         org.apache.hadoop.conf.Configuration hadoopConf,
         Schema cdcSchema,
-        HoodieCDCFileSplit fileSplit) throws IOException {
+        HoodieCDCFileSplit fileSplit) {
       super(hadoopConf, tablePath, tableState, cdcSchema, fileSplit);
     }
 
@@ -515,8 +515,8 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
     }
   }
 
-  static class ReplaceCommitIterator implements RecordIterator {
-    private final RecordIterator itr;
+  static class ReplaceCommitIterator implements ClosableIterator<RowData> {
+    private final ClosableIterator<RowData> itr;
     private final RowDataProjection projection;
 
     ReplaceCommitIterator(
@@ -524,16 +524,16 @@ public class CdcInputFormat extends 
MergeOnReadInputFormat {
         String tablePath,
         MergeOnReadTableState tableState,
         HoodieCDCFileSplit fileSplit,
-        Function<MergeOnReadInputSplit, RecordIterator> splitIteratorFunc) {
+        Function<MergeOnReadInputSplit, ClosableIterator<RowData>> 
splitIteratorFunc) {
       this.itr = initIterator(tablePath, 
StreamerUtil.getMaxCompactionMemoryInBytes(flinkConf), fileSplit, 
splitIteratorFunc);
       this.projection = 
RowDataProjection.instance(tableState.getRequiredRowType(), 
tableState.getRequiredPositions());
     }
 
-    private RecordIterator initIterator(
+    private ClosableIterator<RowData> initIterator(
         String tablePath,
         long maxCompactionMemoryInBytes,
         HoodieCDCFileSplit fileSplit,
-        Function<MergeOnReadInputSplit, RecordIterator> splitIteratorFunc) {
+        Function<MergeOnReadInputSplit, ClosableIterator<RowData>> 
splitIteratorFunc) {
       // init before images
 
       // the before file slice must exist,
@@ -546,19 +546,19 @@ public class CdcInputFormat extends 
MergeOnReadInputFormat {
     }
 
     @Override
-    public boolean reachedEnd() throws IOException {
-      return this.itr.reachedEnd();
+    public boolean hasNext() {
+      return this.itr.hasNext();
     }
 
     @Override
-    public RowData nextRecord() {
-      RowData row = this.itr.nextRecord();
+    public RowData next() {
+      RowData row = this.itr.next();
       row.setRowKind(RowKind.DELETE);
       return this.projection.project(row);
     }
 
     @Override
-    public void close() throws IOException {
+    public void close() {
       this.itr.close();
     }
   }
@@ -602,14 +602,14 @@ public class CdcInputFormat extends 
MergeOnReadInputFormat {
     private final HoodieWriteConfig writeConfig;
 
     private final RowDataSerializer serializer;
-    private final Function<MergeOnReadInputSplit, RecordIterator> 
splitIteratorFunc;
+    private final Function<MergeOnReadInputSplit, ClosableIterator<RowData>> 
splitIteratorFunc;
 
     private final Map<String, ExternalSpillableMap<String, byte[]>> cache;
 
     public ImageManager(
         Configuration flinkConf,
         RowType rowType,
-        Function<MergeOnReadInputSplit, RecordIterator> splitIteratorFunc) {
+        Function<MergeOnReadInputSplit, ClosableIterator<RowData>> 
splitIteratorFunc) {
       this.serializer = new RowDataSerializer(rowType);
       this.splitIteratorFunc = splitIteratorFunc;
       this.cache = new TreeMap<>();
@@ -638,12 +638,12 @@ public class CdcInputFormat extends 
MergeOnReadInputFormat {
         long maxCompactionMemoryInBytes,
         FileSlice fileSlice) throws IOException {
       MergeOnReadInputSplit inputSplit = 
CdcInputFormat.fileSlice2Split(writeConfig.getBasePath(), fileSlice, 
maxCompactionMemoryInBytes);
-      RecordIterator itr = splitIteratorFunc.apply(inputSplit);
+      ClosableIterator<RowData> itr = splitIteratorFunc.apply(inputSplit);
       // initialize the image records map
       ExternalSpillableMap<String, byte[]> imageRecordsMap =
           FormatUtils.spillableMap(writeConfig, maxCompactionMemoryInBytes);
-      while (!itr.reachedEnd()) {
-        RowData row = itr.nextRecord();
+      while (itr.hasNext()) {
+        RowData row = itr.next();
         String recordKey = row.getString(HOODIE_RECORD_KEY_COL_POS).toString();
         ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
         serializer.serialize(row, new BytesArrayOutputView(baos));
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
index 453d0fee23..820424549f 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
@@ -20,8 +20,9 @@ package org.apache.hudi.table.format.cow;
 
 import java.util.Comparator;
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.table.format.HoodieParquetReader;
+import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.table.format.InternalSchemaManager;
+import org.apache.hudi.table.format.RecordIterators;
 import org.apache.hudi.util.DataTypeUtils;
 
 import org.apache.flink.api.common.io.FileInputFormat;
@@ -75,7 +76,7 @@ public class CopyOnWriteInputFormat extends 
FileInputFormat<RowData> {
   private final SerializableConfiguration conf;
   private final long limit;
 
-  private transient HoodieParquetReader reader;
+  private transient ClosableIterator<RowData> itr;
   private transient long currentReadCount;
 
   /**
@@ -128,7 +129,7 @@ public class CopyOnWriteInputFormat extends 
FileInputFormat<RowData> {
       }
     });
 
-    this.reader = HoodieParquetReader.getReader(
+    this.itr = RecordIterators.getParquetRecordIterator(
         internalSchemaManager,
         utcTimestamp,
         true,
@@ -276,26 +277,26 @@ public class CopyOnWriteInputFormat extends 
FileInputFormat<RowData> {
   }
 
   @Override
-  public boolean reachedEnd() throws IOException {
+  public boolean reachedEnd() {
     if (currentReadCount >= limit) {
       return true;
     } else {
-      return reader.reachedEnd();
+      return !itr.hasNext();
     }
   }
 
   @Override
   public RowData nextRecord(RowData reuse) {
     currentReadCount++;
-    return reader.nextRecord();
+    return itr.next();
   }
 
   @Override
   public void close() throws IOException {
-    if (reader != null) {
-      this.reader.close();
+    if (itr != null) {
+      this.itr.close();
     }
-    this.reader = null;
+    this.itr = null;
   }
 
   /**
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 7f7989e79e..2e40831d46 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -29,12 +29,13 @@ import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.table.format.FilePathUtils;
 import org.apache.hudi.table.format.FormatUtils;
-import org.apache.hudi.table.format.HoodieParquetReader;
 import org.apache.hudi.table.format.InternalSchemaManager;
+import org.apache.hudi.table.format.RecordIterators;
 import org.apache.hudi.util.AvroToRowDataConverters;
 import org.apache.hudi.util.DataTypeUtils;
 import org.apache.hudi.util.RowDataProjection;
@@ -94,7 +95,7 @@ public class MergeOnReadInputFormat
   /**
    * Uniform iterator view for the underneath records.
    */
-  private transient RecordIterator iterator;
+  private transient ClosableIterator<RowData> iterator;
 
   // for project push down
   /**
@@ -178,17 +179,17 @@ public class MergeOnReadInputFormat
     mayShiftInputSplit(split);
   }
 
-  protected RecordIterator initIterator(MergeOnReadInputSplit split) throws 
IOException {
+  protected ClosableIterator<RowData> initIterator(MergeOnReadInputSplit 
split) throws IOException {
     if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() 
> 0)) {
       if (split.getInstantRange() != null) {
         // base file only with commit time filtering
         return new BaseFileOnlyFilteringIterator(
             split.getInstantRange(),
             this.tableState.getRequiredRowType(),
-            getReader(split.getBasePath().get(), 
getRequiredPosWithCommitTime(this.requiredPos)));
+            getBaseFileIterator(split.getBasePath().get(), 
getRequiredPosWithCommitTime(this.requiredPos)));
       } else {
         // base file only
-        return new 
BaseFileOnlyIterator(getRequiredSchemaReader(split.getBasePath().get()));
+        return getBaseFileIterator(split.getBasePath().get());
       }
     } else if (!split.getBasePath().isPresent()) {
       // log files only
@@ -199,7 +200,7 @@ public class MergeOnReadInputFormat
       }
     } else if (split.getMergeType().equals(FlinkOptions.REALTIME_SKIP_MERGE)) {
       return new SkipMergeIterator(
-          getRequiredSchemaReader(split.getBasePath().get()),
+          getBaseFileIterator(split.getBasePath().get()),
           getLogFileIterator(split));
     } else if 
(split.getMergeType().equals(FlinkOptions.REALTIME_PAYLOAD_COMBINE)) {
       return new MergeIterator(
@@ -214,7 +215,7 @@ public class MergeOnReadInputFormat
           this.requiredPos,
           this.emitDelete,
           this.tableState.getOperationPos(),
-          getFullSchemaReader(split.getBasePath().get()));
+          getBaseFileIteratorWithMetadata(split.getBasePath().get()));
     } else {
       throw new HoodieException("Unable to select an Iterator to read the 
Hoodie MOR File Split for "
           + "file path: " + split.getBasePath()
@@ -253,14 +254,14 @@ public class MergeOnReadInputFormat
       return true;
     } else {
       // log file reaches end ?
-      return this.iterator.reachedEnd();
+      return !this.iterator.hasNext();
     }
   }
 
   @Override
   public RowData nextRecord(RowData o) {
     currentReadCount++;
-    return this.iterator.nextRecord();
+    return this.iterator.next();
   }
 
   @Override
@@ -295,19 +296,19 @@ public class MergeOnReadInputFormat
     }
   }
 
-  protected HoodieParquetReader getFullSchemaReader(String path) {
+  protected ClosableIterator<RowData> getBaseFileIteratorWithMetadata(String 
path) {
     try {
-      return getReader(path, IntStream.range(0, 
this.tableState.getRowType().getFieldCount()).toArray());
+      return getBaseFileIterator(path, IntStream.range(0, 
this.tableState.getRowType().getFieldCount()).toArray());
     } catch (IOException e) {
       throw new HoodieException("Get reader error for path: " + path);
     }
   }
 
-  protected HoodieParquetReader getRequiredSchemaReader(String path) throws 
IOException {
-    return getReader(path, this.requiredPos);
+  protected ClosableIterator<RowData> getBaseFileIterator(String path) throws 
IOException {
+    return getBaseFileIterator(path, this.requiredPos);
   }
 
-  private HoodieParquetReader getReader(String path, int[] requiredPos) throws 
IOException {
+  private ClosableIterator<RowData> getBaseFileIterator(String path, int[] 
requiredPos) throws IOException {
     // generate partition specs.
     LinkedHashMap<String, String> partSpec = 
FilePathUtils.extractPartitionKeyValues(
         new org.apache.hadoop.fs.Path(path).getParent(),
@@ -329,7 +330,7 @@ public class MergeOnReadInputFormat
       }
     });
 
-    return HoodieParquetReader.getReader(
+    return RecordIterators.getParquetRecordIterator(
         internalSchemaManager,
         this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE),
         true,
@@ -529,46 +530,12 @@ public class MergeOnReadInputFormat
   // -------------------------------------------------------------------------
   //  Inner Class
   // -------------------------------------------------------------------------
-  protected interface RecordIterator {
-    boolean reachedEnd() throws IOException;
-
-    RowData nextRecord();
-
-    void close() throws IOException;
-  }
-
-  protected static class BaseFileOnlyIterator implements RecordIterator {
-    // base file reader
-    private final HoodieParquetReader reader;
-
-    public BaseFileOnlyIterator(HoodieParquetReader reader) {
-      this.reader = reader;
-    }
-
-    @Override
-    public boolean reachedEnd() throws IOException {
-      return this.reader.reachedEnd();
-    }
-
-    @Override
-    public RowData nextRecord() {
-      return this.reader.nextRecord();
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (this.reader != null) {
-        this.reader.close();
-      }
-    }
-  }
-
   /**
-   * Similar with {@link BaseFileOnlyIterator} but with instant time filtering.
+   * Base record iterator with instant time filtering.
    */
-  static class BaseFileOnlyFilteringIterator implements RecordIterator {
-    // base file reader
-    private final HoodieParquetReader reader;
+  static class BaseFileOnlyFilteringIterator implements 
ClosableIterator<RowData> {
+    // base file record iterator
+    private final ClosableIterator<RowData> nested;
     private final InstantRange instantRange;
     private final RowDataProjection projection;
 
@@ -577,44 +544,44 @@ public class MergeOnReadInputFormat
     BaseFileOnlyFilteringIterator(
         Option<InstantRange> instantRange,
         RowType requiredRowType,
-        HoodieParquetReader reader) {
-      this.reader = reader;
+        ClosableIterator<RowData> nested) {
+      this.nested = nested;
       this.instantRange = instantRange.orElse(null);
       int[] positions = IntStream.range(1, 1 + 
requiredRowType.getFieldCount()).toArray();
       projection = RowDataProjection.instance(requiredRowType, positions);
     }
 
     @Override
-    public boolean reachedEnd() throws IOException {
-      while (!this.reader.reachedEnd()) {
-        currentRecord = this.reader.nextRecord();
+    public boolean hasNext() {
+      while (this.nested.hasNext()) {
+        currentRecord = this.nested.next();
         if (instantRange != null) {
           boolean isInRange = 
instantRange.isInRange(currentRecord.getString(HOODIE_COMMIT_TIME_COL_POS).toString());
           if (isInRange) {
-            return false;
+            return true;
           }
         } else {
-          return false;
+          return true;
         }
       }
-      return true;
+      return false;
     }
 
     @Override
-    public RowData nextRecord() {
+    public RowData next() {
       // can promote: no need to project with null instant range
       return projection.project(currentRecord);
     }
 
     @Override
-    public void close() throws IOException {
-      if (this.reader != null) {
-        this.reader.close();
+    public void close() {
+      if (this.nested != null) {
+        this.nested.close();
       }
     }
   }
 
-  protected static class LogFileOnlyIterator implements RecordIterator {
+  protected static class LogFileOnlyIterator implements 
ClosableIterator<RowData> {
     // iterator for log files
     private final ClosableIterator<RowData> iterator;
 
@@ -623,12 +590,12 @@ public class MergeOnReadInputFormat
     }
 
     @Override
-    public boolean reachedEnd() {
-      return !this.iterator.hasNext();
+    public boolean hasNext() {
+      return this.iterator.hasNext();
     }
 
     @Override
-    public RowData nextRecord() {
+    public RowData next() {
       return this.iterator.next();
     }
 
@@ -640,9 +607,9 @@ public class MergeOnReadInputFormat
     }
   }
 
-  static class SkipMergeIterator implements RecordIterator {
-    // base file reader
-    private final HoodieParquetReader reader;
+  static class SkipMergeIterator implements ClosableIterator<RowData> {
+    // base file record iterator
+    private final ClosableIterator<RowData> nested;
     // iterator for log files
     private final ClosableIterator<RowData> iterator;
 
@@ -653,34 +620,34 @@ public class MergeOnReadInputFormat
 
     private RowData currentRecord;
 
-    SkipMergeIterator(HoodieParquetReader reader, ClosableIterator<RowData> 
iterator) {
-      this.reader = reader;
+    SkipMergeIterator(ClosableIterator<RowData> nested, 
ClosableIterator<RowData> iterator) {
+      this.nested = nested;
       this.iterator = iterator;
     }
 
     @Override
-    public boolean reachedEnd() throws IOException {
-      if (!readLogs && !this.reader.reachedEnd()) {
-        currentRecord = this.reader.nextRecord();
-        return false;
+    public boolean hasNext() {
+      if (!readLogs && this.nested.hasNext()) {
+        currentRecord = this.nested.next();
+        return true;
       }
       readLogs = true;
       if (this.iterator.hasNext()) {
         currentRecord = this.iterator.next();
-        return false;
+        return true;
       }
-      return true;
+      return false;
     }
 
     @Override
-    public RowData nextRecord() {
+    public RowData next() {
       return currentRecord;
     }
 
     @Override
-    public void close() throws IOException {
-      if (this.reader != null) {
-        this.reader.close();
+    public void close() {
+      if (this.nested != null) {
+        this.nested.close();
       }
       if (this.iterator != null) {
         this.iterator.close();
@@ -688,9 +655,9 @@ public class MergeOnReadInputFormat
     }
   }
 
-  protected static class MergeIterator implements RecordIterator {
-    // base file reader
-    private final HoodieParquetReader reader;
+  protected static class MergeIterator implements ClosableIterator<RowData> {
+    // base file record iterator
+    private final ClosableIterator<RowData> nested;
     // log keys used for merging
     private final Iterator<String> logKeysIterator;
     // scanner
@@ -730,12 +697,12 @@ public class MergeOnReadInputFormat
         int[] requiredPos,
         boolean emitDelete,
         int operationPos,
-        HoodieParquetReader reader) { // the reader should be with full schema
+        ClosableIterator<RowData> nested) { // the iterator should be with 
full schema
       this(flinkConf, hadoopConf, split, tableRowType, requiredRowType, 
tableSchema,
           querySchema,
           Option.of(RowDataProjection.instance(requiredRowType, requiredPos)),
           Option.of(record -> buildAvroRecordBySchema(record, requiredSchema, 
requiredPos, new GenericRecordBuilder(requiredSchema))),
-          emitDelete, operationPos, reader);
+          emitDelete, operationPos, nested);
     }
 
     public MergeIterator(
@@ -750,9 +717,9 @@ public class MergeOnReadInputFormat
         Option<Function<IndexedRecord, GenericRecord>> avroProjection,
         boolean emitDelete,
         int operationPos,
-        HoodieParquetReader reader) { // the reader should be with full schema
+        ClosableIterator<RowData> nested) { // the iterator should be with 
full schema
       this.tableSchema = tableSchema;
-      this.reader = reader;
+      this.nested = nested;
       this.scanner = FormatUtils.logScanner(split, tableSchema, querySchema, 
flinkConf, hadoopConf);
       this.payloadProps = StreamerUtil.getPayloadConfig(flinkConf).getProps();
       this.logKeysIterator = scanner.getRecords().keySet().iterator();
@@ -766,9 +733,9 @@ public class MergeOnReadInputFormat
     }
 
     @Override
-    public boolean reachedEnd() throws IOException {
-      while (!readLogs && !this.reader.reachedEnd()) {
-        currentRecord = this.reader.nextRecord();
+    public boolean hasNext() {
+      while (!readLogs && this.nested.hasNext()) {
+        currentRecord = this.nested.next();
         if (instantRange != null) {
           boolean isInRange = 
instantRange.isInRange(currentRecord.getString(HOODIE_COMMIT_TIME_COL_POS).toString());
           if (!isInRange) {
@@ -794,14 +761,14 @@ public class MergeOnReadInputFormat
                 : mergedAvroRecord.get();
             this.currentRecord = (RowData) 
avroToRowDataConverter.convert(avroRecord);
             this.currentRecord.setRowKind(rowKind);
-            return false;
+            return true;
           }
         }
         // project the full record in base with required positions
         if (projection.isPresent()) {
           currentRecord = projection.get().project(currentRecord);
         }
-        return false;
+        return true;
       }
       // read the logs
       readLogs = true;
@@ -816,42 +783,48 @@ public class MergeOnReadInputFormat
                 : insertAvroRecord.get();
             this.currentRecord = (RowData) 
avroToRowDataConverter.convert(avroRecord);
             FormatUtils.setRowKind(this.currentRecord, insertAvroRecord.get(), 
this.operationPos);
-            return false;
+            return true;
           }
         }
       }
-      return true;
+      return false;
     }
 
-    private Option<IndexedRecord> getInsertValue(String curKey) throws 
IOException {
+    private Option<IndexedRecord> getInsertValue(String curKey) {
       final HoodieAvroRecord<?> record = (HoodieAvroRecord) 
scanner.getRecords().get(curKey);
       if (!emitDelete && HoodieOperation.isDelete(record.getOperation())) {
         return Option.empty();
       }
-      return record.getData().getInsertValue(tableSchema);
+      try {
+        return record.getData().getInsertValue(tableSchema);
+      } catch (IOException e) {
+        throw new HoodieIOException("Get insert value from payload exception", 
e);
+      }
     }
 
     @Override
-    public RowData nextRecord() {
+    public RowData next() {
       return currentRecord;
     }
 
     @Override
-    public void close() throws IOException {
-      if (this.reader != null) {
-        this.reader.close();
+    public void close() {
+      if (this.nested != null) {
+        this.nested.close();
       }
       if (this.scanner != null) {
         this.scanner.close();
       }
     }
 
-    private Option<IndexedRecord> mergeRowWithLog(
-        RowData curRow,
-        String curKey) throws IOException {
+    private Option<IndexedRecord> mergeRowWithLog(RowData curRow, String 
curKey) {
       final HoodieAvroRecord<?> record = (HoodieAvroRecord) 
scanner.getRecords().get(curKey);
       GenericRecord historyAvroRecord = (GenericRecord) 
rowDataToAvroConverter.convert(tableSchema, curRow);
-      return record.getData().combineAndGetUpdateValue(historyAvroRecord, 
tableSchema, payloadProps);
+      try {
+        return record.getData().combineAndGetUpdateValue(historyAvroRecord, 
tableSchema, payloadProps);
+      } catch (IOException e) {
+        throw new HoodieIOException("Merge base and delta payloads exception", 
e);
+      }
     }
   }
 


Reply via email to