This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6848a68  [HUDI-1867] Streaming read for Flink COW table (#2895)
6848a68 is described below

commit 6848a683bd89a6f5f30559f7daff918a498f0808
Author: Danny Chan <[email protected]>
AuthorDate: Thu Apr 29 20:44:45 2021 +0800

    [HUDI-1867] Streaming read for Flink COW table (#2895)
    
    Supports streaming reads for the Copy-On-Write table type.
---
 .../hudi/source/StreamReadMonitoringFunction.java  | 12 +++-
 .../org/apache/hudi/table/HoodieTableSource.java   | 25 +++++--
 .../table/format/mor/MergeOnReadInputFormat.java   | 78 +++++++++++++++++++++-
 .../apache/hudi/table/HoodieDataSourceITCase.java  | 14 ++--
 4 files changed, 112 insertions(+), 17 deletions(-)

diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
 
b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index 21966e0..f508726 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.source;
 
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -111,6 +112,8 @@ public class StreamReadMonitoringFunction
 
   private final long maxCompactionMemoryInBytes;
 
+  private final boolean isDelta;
+
   public StreamReadMonitoringFunction(
       Configuration conf,
       Path path,
@@ -121,6 +124,7 @@ public class StreamReadMonitoringFunction
     this.metaClient = metaClient;
     this.interval = 
conf.getInteger(FlinkOptions.READ_STREAMING_CHECK_INTERVAL);
     this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
+    this.isDelta = 
conf.getString(FlinkOptions.TABLE_TYPE).equalsIgnoreCase(FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
   }
 
   @Override
@@ -185,7 +189,10 @@ public class StreamReadMonitoringFunction
   @VisibleForTesting
   public void monitorDirAndForwardSplits(SourceContext<MergeOnReadInputSplit> 
context) {
     metaClient.reloadActiveTimeline();
-    HoodieTimeline commitTimeline = 
metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
+    HoodieTimeline commitTimeline = isDelta
+        // if is delta, exclude the parquet files from compaction
+        ? 
metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()
+        : 
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
     if (commitTimeline.empty()) {
       LOG.warn("No splits found for the table under path " + path);
       return;
@@ -238,8 +245,9 @@ public class StreamReadMonitoringFunction
               .sorted(HoodieLogFile.getLogFileComparator())
               .map(logFile -> logFile.getPath().toString())
               .collect(Collectors.toList()));
+          String basePath = 
fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
           return new MergeOnReadInputSplit(cnt.getAndAdd(1),
-              null, logPaths, commitToIssue,
+              basePath, logPaths, commitToIssue,
               metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, 
instantRange);
         }).collect(Collectors.toList()))
         .flatMap(Collection::stream)
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java 
b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index ada9e01..bc53c92 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -26,7 +26,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.HoodieROTablePathFilter;
@@ -156,11 +155,6 @@ public class HoodieTableSource implements
     this.hadoopConf = StreamerUtil.getHadoopConf();
     this.metaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
     this.maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new 
JobConf(this.hadoopConf));
-    if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
-      ValidationUtils.checkArgument(
-          
conf.getString(FlinkOptions.TABLE_TYPE).equalsIgnoreCase(FlinkOptions.TABLE_TYPE_MERGE_ON_READ),
-          "Streaming read is only supported for table type: " + 
FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
-    }
   }
 
   @Override
@@ -377,6 +371,25 @@ public class HoodieTableSource implements
                 .emitDelete(isStreaming)
                 .build();
           case COPY_ON_WRITE:
+            if (isStreaming) {
+              final MergeOnReadTableState hoodieTableState2 = new 
MergeOnReadTableState(
+                  rowType,
+                  requiredRowType,
+                  tableAvroSchema.toString(),
+                  
AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
+                  Collections.emptyList(),
+                  conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","));
+              return MergeOnReadInputFormat.builder()
+                  .config(this.conf)
+                  .paths(FilePathUtils.toFlinkPaths(paths))
+                  .tableState(hoodieTableState2)
+                  // use the explicit fields data type because the 
AvroSchemaConverter
+                  // is not very stable.
+                  .fieldTypes(rowDataType.getChildren())
+                  
.defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
+                  .limit(this.limit)
+                  .build();
+            }
             FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
                 FilePathUtils.toFlinkPaths(paths),
                 this.schema.getFieldNames(),
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 1264ea9..595bf5a 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.format.mor;
 
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
@@ -64,6 +65,7 @@ import java.util.stream.IntStream;
 
 import static 
org.apache.flink.table.data.vector.VectorizedColumnBatch.DEFAULT_SIZE;
 import static 
org.apache.flink.table.filesystem.RowPartitionComputer.restorePartValueFromType;
+import static 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS;
 import static 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
 import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
 
@@ -162,9 +164,17 @@ public class MergeOnReadInputFormat
   public void open(MergeOnReadInputSplit split) throws IOException {
     this.currentReadCount = 0L;
     this.hadoopConf = StreamerUtil.getHadoopConf();
-    if (!split.getLogPaths().isPresent()) {
-      // base file only
-      this.iterator = new 
BaseFileOnlyIterator(getRequiredSchemaReader(split.getBasePath().get()));
+    if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() 
> 0)) {
+      if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
+        // base file only with commit time filtering
+        this.iterator = new BaseFileOnlyFilteringIterator(
+            split.getInstantRange(),
+            this.tableState.getRequiredRowType(),
+            getReader(split.getBasePath().get(), 
getRequiredPosWithCommitTime(this.requiredPos)));
+      } else {
+        // base file only
+        this.iterator = new 
BaseFileOnlyIterator(getRequiredSchemaReader(split.getBasePath().get()));
+      }
     } else if (!split.getBasePath().isPresent()) {
       // log files only
       this.iterator = new LogFileOnlyIterator(getLogFileIterator(split));
@@ -390,6 +400,57 @@ public class MergeOnReadInputFormat
     }
   }
 
+  /**
+   * Similar with {@link BaseFileOnlyIterator} but with instant time filtering.
+   */
+  static class BaseFileOnlyFilteringIterator implements RecordIterator {
+    // base file reader
+    private final ParquetColumnarRowSplitReader reader;
+    private final InstantRange instantRange;
+    private final RowDataProjection projection;
+
+    private RowData currentRecord;
+
+    BaseFileOnlyFilteringIterator(
+        Option<InstantRange> instantRange,
+        RowType requiredRowType,
+        ParquetColumnarRowSplitReader reader) {
+      this.reader = reader;
+      this.instantRange = instantRange.orElse(null);
+      int[] positions = IntStream.range(1, 1 + 
requiredRowType.getFieldCount()).toArray();
+      projection = RowDataProjection.instance(requiredRowType, positions);
+    }
+
+    @Override
+    public boolean reachedEnd() throws IOException {
+      while (!this.reader.reachedEnd()) {
+        currentRecord = this.reader.nextRecord();
+        if (instantRange != null) {
+          boolean isInRange = 
instantRange.isInRange(currentRecord.getString(HOODIE_COMMIT_TIME_COL_POS).toString());
+          if (isInRange) {
+            return false;
+          }
+        } else {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public RowData nextRecord() {
+      // can promote: no need to project with null instant range
+      return projection.project(currentRecord);
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (this.reader != null) {
+        this.reader.close();
+      }
+    }
+  }
+
   static class LogFileOnlyIterator implements RecordIterator {
     // iterator for log files
     private final Iterator<RowData> iterator;
@@ -625,6 +686,17 @@ public class MergeOnReadInputFormat
     }
   }
 
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
+  private static int[] getRequiredPosWithCommitTime(int[] requiredPos) {
+    int[] requiredPos2 = new int[requiredPos.length + 1];
+    requiredPos2[0] = HOODIE_COMMIT_TIME_COL_POS;
+    System.arraycopy(requiredPos, 0, requiredPos2, 1, requiredPos.length);
+    return requiredPos2;
+  }
+
   @VisibleForTesting
   public void isEmitDelete(boolean emitDelete) {
     this.emitDelete = emitDelete;
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java 
b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index fe652c5..3d413a7 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -82,8 +82,9 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
   @TempDir
   File tempFile;
 
-  @Test
-  void testStreamWriteAndRead() throws Exception {
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  void testStreamWriteAndRead(HoodieTableType tableType) throws Exception {
     // create filesystem table named source
     String createSource = TestConfigurations.getFileSourceDDL("source");
     streamTableEnv.executeSql(createSource);
@@ -91,7 +92,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
     Map<String, String> options = new HashMap<>();
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
     options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
-    options.put(FlinkOptions.TABLE_TYPE.key(), 
FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+    options.put(FlinkOptions.TABLE_TYPE.key(), tableType.name());
     String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", 
options);
     streamTableEnv.executeSql(hoodieTableDDL);
     String insertInto = "insert into t1 select * from source";
@@ -106,8 +107,9 @@ public class HoodieDataSourceITCase extends 
AbstractTestBase {
     assertRowsEquals(rows2, TestData.DATA_SET_SOURCE_INSERT);
   }
 
-  @Test
-  void testStreamReadAppendData() throws Exception {
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  void testStreamReadAppendData(HoodieTableType tableType) throws Exception {
     // create filesystem table named source
     String createSource = TestConfigurations.getFileSourceDDL("source");
     String createSource2 = TestConfigurations.getFileSourceDDL("source2", 
"test_source_2.data");
@@ -117,7 +119,7 @@ public class HoodieDataSourceITCase extends 
AbstractTestBase {
     Map<String, String> options = new HashMap<>();
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
     options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
-    options.put(FlinkOptions.TABLE_TYPE.key(), 
FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+    options.put(FlinkOptions.TABLE_TYPE.key(), tableType.name());
     String createHoodieTable = 
TestConfigurations.getCreateHoodieTableDDL("t1", options);
     streamTableEnv.executeSql(createHoodieTable);
     String insertInto = "insert into t1 select * from source";

Reply via email to the mailing list.