This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6848a68 [HUDI-1867] Streaming read for Flink COW table (#2895)
6848a68 is described below
commit 6848a683bd89a6f5f30559f7daff918a498f0808
Author: Danny Chan <[email protected]>
AuthorDate: Thu Apr 29 20:44:45 2021 +0800
[HUDI-1867] Streaming read for Flink COW table (#2895)
Supports streaming read for Copy On Write table.
---
.../hudi/source/StreamReadMonitoringFunction.java | 12 +++-
.../org/apache/hudi/table/HoodieTableSource.java | 25 +++++--
.../table/format/mor/MergeOnReadInputFormat.java | 78 +++++++++++++++++++++-
.../apache/hudi/table/HoodieDataSourceITCase.java | 14 ++--
4 files changed, 112 insertions(+), 17 deletions(-)
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index 21966e0..f508726 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++
b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -19,6 +19,7 @@
package org.apache.hudi.source;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -111,6 +112,8 @@ public class StreamReadMonitoringFunction
private final long maxCompactionMemoryInBytes;
+ private final boolean isDelta;
+
public StreamReadMonitoringFunction(
Configuration conf,
Path path,
@@ -121,6 +124,7 @@ public class StreamReadMonitoringFunction
this.metaClient = metaClient;
this.interval =
conf.getInteger(FlinkOptions.READ_STREAMING_CHECK_INTERVAL);
this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
+ this.isDelta =
conf.getString(FlinkOptions.TABLE_TYPE).equalsIgnoreCase(FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
}
@Override
@@ -185,7 +189,10 @@ public class StreamReadMonitoringFunction
@VisibleForTesting
public void monitorDirAndForwardSplits(SourceContext<MergeOnReadInputSplit>
context) {
metaClient.reloadActiveTimeline();
- HoodieTimeline commitTimeline =
metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
+ HoodieTimeline commitTimeline = isDelta
+ // if it is a delta (MERGE_ON_READ) table, use the delta-commit timeline so that parquet files produced by compaction are excluded
+ ?
metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()
+ :
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
if (commitTimeline.empty()) {
LOG.warn("No splits found for the table under path " + path);
return;
@@ -238,8 +245,9 @@ public class StreamReadMonitoringFunction
.sorted(HoodieLogFile.getLogFileComparator())
.map(logFile -> logFile.getPath().toString())
.collect(Collectors.toList()));
+ String basePath =
fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
return new MergeOnReadInputSplit(cnt.getAndAdd(1),
- null, logPaths, commitToIssue,
+ basePath, logPaths, commitToIssue,
metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType,
instantRange);
}).collect(Collectors.toList()))
.flatMap(Collection::stream)
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index ada9e01..bc53c92 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -26,7 +26,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.HoodieROTablePathFilter;
@@ -156,11 +155,6 @@ public class HoodieTableSource implements
this.hadoopConf = StreamerUtil.getHadoopConf();
this.metaClient =
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
this.maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new
JobConf(this.hadoopConf));
- if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
- ValidationUtils.checkArgument(
-
conf.getString(FlinkOptions.TABLE_TYPE).equalsIgnoreCase(FlinkOptions.TABLE_TYPE_MERGE_ON_READ),
- "Streaming read is only supported for table type: " +
FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
- }
}
@Override
@@ -377,6 +371,25 @@ public class HoodieTableSource implements
.emitDelete(isStreaming)
.build();
case COPY_ON_WRITE:
+ if (isStreaming) {
+ final MergeOnReadTableState hoodieTableState2 = new
MergeOnReadTableState(
+ rowType,
+ requiredRowType,
+ tableAvroSchema.toString(),
+
AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
+ Collections.emptyList(),
+ conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","));
+ return MergeOnReadInputFormat.builder()
+ .config(this.conf)
+ .paths(FilePathUtils.toFlinkPaths(paths))
+ .tableState(hoodieTableState2)
+ // use the explicit fields data type because the
AvroSchemaConverter
+ // is not very stable.
+ .fieldTypes(rowDataType.getChildren())
+
.defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
+ .limit(this.limit)
+ .build();
+ }
FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
FilePathUtils.toFlinkPaths(paths),
this.schema.getFieldNames(),
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 1264ea9..595bf5a 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++
b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.format.mor;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
@@ -64,6 +65,7 @@ import java.util.stream.IntStream;
import static
org.apache.flink.table.data.vector.VectorizedColumnBatch.DEFAULT_SIZE;
import static
org.apache.flink.table.filesystem.RowPartitionComputer.restorePartValueFromType;
+import static
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS;
import static
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
@@ -162,9 +164,17 @@ public class MergeOnReadInputFormat
public void open(MergeOnReadInputSplit split) throws IOException {
this.currentReadCount = 0L;
this.hadoopConf = StreamerUtil.getHadoopConf();
- if (!split.getLogPaths().isPresent()) {
- // base file only
- this.iterator = new
BaseFileOnlyIterator(getRequiredSchemaReader(split.getBasePath().get()));
+ if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size()
> 0)) {
+ if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
+ // base file only with commit time filtering
+ this.iterator = new BaseFileOnlyFilteringIterator(
+ split.getInstantRange(),
+ this.tableState.getRequiredRowType(),
+ getReader(split.getBasePath().get(),
getRequiredPosWithCommitTime(this.requiredPos)));
+ } else {
+ // base file only
+ this.iterator = new
BaseFileOnlyIterator(getRequiredSchemaReader(split.getBasePath().get()));
+ }
} else if (!split.getBasePath().isPresent()) {
// log files only
this.iterator = new LogFileOnlyIterator(getLogFileIterator(split));
@@ -390,6 +400,57 @@ public class MergeOnReadInputFormat
}
}
+ /**
+ * Similar to {@link BaseFileOnlyIterator}, but additionally filters records by instant (commit) time.
+ */
+ static class BaseFileOnlyFilteringIterator implements RecordIterator {
+ // base file reader
+ private final ParquetColumnarRowSplitReader reader;
+ private final InstantRange instantRange;
+ private final RowDataProjection projection;
+
+ private RowData currentRecord;
+
+ BaseFileOnlyFilteringIterator(
+ Option<InstantRange> instantRange,
+ RowType requiredRowType,
+ ParquetColumnarRowSplitReader reader) {
+ this.reader = reader;
+ this.instantRange = instantRange.orElse(null);
+ int[] positions = IntStream.range(1, 1 +
requiredRowType.getFieldCount()).toArray();
+ projection = RowDataProjection.instance(requiredRowType, positions);
+ }
+
+ @Override
+ public boolean reachedEnd() throws IOException {
+ while (!this.reader.reachedEnd()) {
+ currentRecord = this.reader.nextRecord();
+ if (instantRange != null) {
+ boolean isInRange =
instantRange.isInRange(currentRecord.getString(HOODIE_COMMIT_TIME_COL_POS).toString());
+ if (isInRange) {
+ return false;
+ }
+ } else {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public RowData nextRecord() {
+ // potential optimization: the projection is unnecessary when the instant range is null
+ return projection.project(currentRecord);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (this.reader != null) {
+ this.reader.close();
+ }
+ }
+ }
+
static class LogFileOnlyIterator implements RecordIterator {
// iterator for log files
private final Iterator<RowData> iterator;
@@ -625,6 +686,17 @@ public class MergeOnReadInputFormat
}
}
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+
+ private static int[] getRequiredPosWithCommitTime(int[] requiredPos) {
+ int[] requiredPos2 = new int[requiredPos.length + 1];
+ requiredPos2[0] = HOODIE_COMMIT_TIME_COL_POS;
+ System.arraycopy(requiredPos, 0, requiredPos2, 1, requiredPos.length);
+ return requiredPos2;
+ }
+
@VisibleForTesting
public void isEmitDelete(boolean emitDelete) {
this.emitDelete = emitDelete;
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index fe652c5..3d413a7 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -82,8 +82,9 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
@TempDir
File tempFile;
- @Test
- void testStreamWriteAndRead() throws Exception {
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableType.class)
+ void testStreamWriteAndRead(HoodieTableType tableType) throws Exception {
// create filesystem table named source
String createSource = TestConfigurations.getFileSourceDDL("source");
streamTableEnv.executeSql(createSource);
@@ -91,7 +92,7 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
- options.put(FlinkOptions.TABLE_TYPE.key(),
FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+ options.put(FlinkOptions.TABLE_TYPE.key(), tableType.name());
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1",
options);
streamTableEnv.executeSql(hoodieTableDDL);
String insertInto = "insert into t1 select * from source";
@@ -106,8 +107,9 @@ public class HoodieDataSourceITCase extends
AbstractTestBase {
assertRowsEquals(rows2, TestData.DATA_SET_SOURCE_INSERT);
}
- @Test
- void testStreamReadAppendData() throws Exception {
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableType.class)
+ void testStreamReadAppendData(HoodieTableType tableType) throws Exception {
// create filesystem table named source
String createSource = TestConfigurations.getFileSourceDDL("source");
String createSource2 = TestConfigurations.getFileSourceDDL("source2",
"test_source_2.data");
@@ -117,7 +119,7 @@ public class HoodieDataSourceITCase extends
AbstractTestBase {
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
- options.put(FlinkOptions.TABLE_TYPE.key(),
FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+ options.put(FlinkOptions.TABLE_TYPE.key(), tableType.name());
String createHoodieTable =
TestConfigurations.getCreateHoodieTableDDL("t1", options);
streamTableEnv.executeSql(createHoodieTable);
String insertInto = "insert into t1 select * from source";