This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new fe04212745d [HUDI-6973] Instantiate HoodieFileGroupRecordBuffer inside new file group reader (#9910)
fe04212745d is described below
commit fe04212745d9e34b09695b2d26205d951005c78f
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed Oct 25 01:00:24 2023 -0700
[HUDI-6973] Instantiate HoodieFileGroupRecordBuffer inside new file group reader (#9910)
This commit refactors the new file group reader (`HoodieFileGroupReader`)
to instantiate `HoodieFileGroupRecordBuffer` inside the file group reader's
constructors, instead of having it passed in from outside.
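For reference, a minimal before/after sketch of the caller-side change
(hypothetical call sites assembled from the constructor signatures in the
diff below; variable names are illustrative):

    // Before: the caller built the record buffer and handed it to the reader.
    HoodieFileGroupRecordBuffer<T> recordBuffer = new HoodieKeyBasedFileGroupRecordBuffer<>(
        readerContext, avroSchema, avroSchema, partitionNameOpt, partitionPathFieldOpt,
        recordMerger, props, metaClient);
    HoodieFileGroupReader<T> reader = new HoodieFileGroupReader<>(
        readerContext, hadoopConf, tablePath, latestCommitTime, baseFileOpt,
        logFilePathListOpt, avroSchema, props, start, length, recordBuffer);

    // After: the caller passes a flag, and the reader instantiates either a
    // key-based or a position-based record buffer internally.
    HoodieFileGroupReader<T> reader = new HoodieFileGroupReader<>(
        readerContext, hadoopConf, tablePath, latestCommitTime, baseFileOpt,
        logFilePathListOpt, avroSchema, props, start, length,
        /* shouldUseRecordPosition */ false);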
---
.../read/HoodieBaseFileGroupRecordBuffer.java | 8 +---
.../common/table/read/HoodieFileGroupReader.java | 44 ++++++++++++++++------
.../read/HoodieKeyBasedFileGroupRecordBuffer.java | 8 ++--
.../HoodiePositionBasedFileGroupRecordBuffer.java | 8 ++--
.../table/read/TestHoodieFileGroupReaderBase.java | 18 ++-------
...odieFileGroupReaderBasedParquetFileFormat.scala | 29 ++------------
6 files changed, 47 insertions(+), 68 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
index a1685b82a93..991428ae87c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
@@ -24,7 +24,6 @@ import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.KeySpec;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
@@ -55,7 +54,6 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
protected final Option<String[]> partitionPathFieldOpt;
protected final HoodieRecordMerger recordMerger;
protected final TypedProperties payloadProps;
- protected final HoodieTableMetaClient hoodieTableMetaClient;
protected final Map<Object, Pair<Option<T>, Map<String, Object>>> records;
protected ClosableIterator<T> baseFileIterator;
protected Iterator<Pair<Option<T>, Map<String, Object>>> logRecordIterator;
@@ -67,8 +65,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
Option<String> partitionNameOverrideOpt,
Option<String[]> partitionPathFieldOpt,
HoodieRecordMerger recordMerger,
- TypedProperties payloadProps,
- HoodieTableMetaClient hoodieTableMetaClient) {
+ TypedProperties payloadProps) {
this.readerContext = readerContext;
this.readerSchema = readerSchema;
this.baseFileSchema = baseFileSchema;
@@ -76,7 +73,6 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
this.partitionPathFieldOpt = partitionPathFieldOpt;
this.recordMerger = recordMerger;
this.payloadProps = payloadProps;
- this.hoodieTableMetaClient = hoodieTableMetaClient;
this.records = new HashMap<>();
}
@@ -162,7 +158,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
// For same ordering values, uses the natural order(arrival time semantics).
Comparable existingOrderingVal = readerContext.getOrderingValue(
existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), readerSchema,
- this.hoodieTableMetaClient.getTableConfig().getProps());
+ payloadProps);
Comparable deleteOrderingVal = deleteRecord.getOrderingValue();
// Checks the ordering value does not equal to 0
// because we use 0 as the default value which means natural order
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 296b8155e99..b655238412d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -23,13 +23,17 @@ import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieTableQueryType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordReader;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.EmptyIterator;
import org.apache.hudi.exception.HoodieIOException;
@@ -41,9 +45,12 @@ import org.apache.hadoop.fs.Path;
import java.io.Closeable;
import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
+import java.util.stream.Collectors;
import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
+import static org.apache.hudi.common.table.HoodieTableConfig.PARTITION_FIELDS;
import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGER_STRATEGY;
import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
@@ -82,19 +89,14 @@ public final class HoodieFileGroupReader<T> implements Closeable {
HoodieTableQueryType queryType,
Option<String> instantTime,
Option<String> startInstantTime,
- HoodieFileGroupRecordBuffer<T> recordBuffer) {
+ boolean shouldUseRecordPosition) throws Exception {
// This constructor is a placeholder now to allow automatically fetching the correct list of
// base and log files for a file group.
// Derive base and log files and call the corresponding constructor.
- this.readerContext = readerContext;
- this.hadoopConf = metaClient.getHadoopConf();
- this.baseFilePath = Option.empty();
- this.logFilePathList = Option.empty();
- this.props = props;
- this.start = 0;
- this.length = Long.MAX_VALUE;
- this.baseFileIterator = new EmptyIterator<>();
- this.recordBuffer = recordBuffer;
+ this(readerContext, metaClient.getHadoopConf(), metaClient.getBasePathV2().toString(),
+ instantTime.get(), Option.empty(), Option.empty(),
+ new TableSchemaResolver(metaClient).getTableAvroSchema(),
+ props, 0, Long.MAX_VALUE, shouldUseRecordPosition);
}
public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
@@ -107,7 +109,7 @@ public final class HoodieFileGroupReader<T> implements Closeable {
TypedProperties props,
long start,
long length,
- HoodieFileGroupRecordBuffer<T> recordBuffer) {
+ boolean shouldUseRecordPosition) {
this.readerContext = readerContext;
this.hadoopConf = hadoopConf;
this.baseFilePath = baseFilePath;
@@ -122,7 +124,25 @@ public final class HoodieFileGroupReader<T> implements Closeable {
this.readerState.baseFileAvroSchema = avroSchema;
this.readerState.logRecordAvroSchema = avroSchema;
this.readerState.mergeProps.putAll(props);
- this.recordBuffer = recordBuffer;
+ String filePath = baseFilePath.isPresent()
+ ? baseFilePath.get().getPath()
+ : logFilePathList.get().get(0);
+ String partitionPath = FSUtils.getRelativePartitionPath(
+ new Path(tablePath), new Path(filePath).getParent());
+ Option<String> partitionNameOpt = StringUtils.isNullOrEmpty(partitionPath)
+ ? Option.empty() : Option.of(partitionPath);
+ Option<Object> partitionConfigValue = ConfigUtils.getRawValueWithAltKeys(props, PARTITION_FIELDS);
+ Option<String[]> partitionPathFieldOpt = partitionConfigValue.isPresent()
+ ? Option.of(Arrays.stream(partitionConfigValue.get().toString().split(","))
+ .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new String[] {}))
+ : Option.empty();
+ this.recordBuffer = shouldUseRecordPosition
+ ? new HoodiePositionBasedFileGroupRecordBuffer<>(
+ readerContext, avroSchema, avroSchema, partitionNameOpt, partitionPathFieldOpt,
+ recordMerger, props)
+ : new HoodieKeyBasedFileGroupRecordBuffer<>(
+ readerContext, avroSchema, avroSchema, partitionNameOpt, partitionPathFieldOpt,
+ recordMerger, props);
}
/**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
index d055246cfa8..479a0c1b339 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
@@ -20,11 +20,10 @@
package org.apache.hudi.common.table.read;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.KeySpec;
-import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.util.Option;
@@ -55,10 +54,9 @@ public class HoodieKeyBasedFileGroupRecordBuffer<T> extends HoodieBaseFileGroupR
Option<String> partitionNameOverrideOpt,
Option<String[]> partitionPathFieldOpt,
HoodieRecordMerger recordMerger,
- TypedProperties payloadProps,
- HoodieTableMetaClient hoodieTableMetaClient) {
+ TypedProperties payloadProps) {
super(readerContext, readerSchema, baseFileSchema, partitionNameOverrideOpt, partitionPathFieldOpt,
- recordMerger, payloadProps, hoodieTableMetaClient);
+ recordMerger, payloadProps);
}
@Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
index 7fc99523e19..6c80af8b56c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
@@ -20,11 +20,10 @@
package org.apache.hudi.common.table.read;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.KeySpec;
-import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.util.Option;
@@ -60,10 +59,9 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieBaseFileG
Option<String> partitionNameOverrideOpt,
Option<String[]> partitionPathFieldOpt,
HoodieRecordMerger recordMerger,
- TypedProperties payloadProps,
- HoodieTableMetaClient hoodieTableMetaClient) {
+ TypedProperties payloadProps) {
super(readerContext, readerSchema, baseFileSchema, partitionNameOverrideOpt, partitionPathFieldOpt,
- recordMerger, payloadProps, hoodieTableMetaClient);
+ recordMerger, payloadProps);
}
@Override
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index f3e142b2f57..af1fc120c5b 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -38,7 +38,6 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -48,13 +47,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
import static org.apache.hudi.common.model.WriteOperationType.INSERT;
import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
+import static org.apache.hudi.common.table.HoodieTableConfig.PARTITION_FIELDS;
import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGER_STRATEGY;
import static org.apache.hudi.common.testutils.HoodieTestUtils.getLogFileListFromFileSlice;
import static org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings;
-import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
@@ -131,18 +129,8 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
props.setProperty("hoodie.datasource.write.precombine.field", "timestamp");
props.setProperty("hoodie.payload.ordering.field", "timestamp");
props.setProperty(RECORD_MERGER_STRATEGY.key(), RECORD_MERGER_STRATEGY.defaultValue());
- String filePath = fileSlice.getBaseFile().isPresent() ? fileSlice.getBaseFile().get().getPath() : logFilePathList.get(0);
+ props.setProperty(PARTITION_FIELDS.key(), metaClient.getTableConfig().getString(PARTITION_FIELDS.key()));
String[] partitionValues = {partitionPaths[0]};
- HoodieFileGroupRecordBuffer<T> recordBuffer = new HoodieKeyBasedFileGroupRecordBuffer<>(
- getHoodieReaderContext(partitionValues),
- avroSchema,
- avroSchema,
- Option.of(getRelativePartitionPath(new Path(basePath), new Path(filePath).getParent())),
- metaClient.getTableConfig().getPartitionFields(),
- getHoodieReaderContext(partitionValues).getRecordMerger(
- getStringWithAltKeys(props, RECORD_MERGER_STRATEGY, RECORD_MERGER_STRATEGY.defaultValue())),
- props,
- metaClient);
HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<T>(
getHoodieReaderContext(partitionValues),
hadoopConf,
@@ -154,7 +142,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
props,
0,
fileSlice.getTotalFileSize(),
- recordBuffer);
+ false);
fileGroupReader.initRecordIterators();
while (fileGroupReader.hasNext()) {
actualRecordList.add(fileGroupReader.next());
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 5bc4e4a80f9..189bcb39e61 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -18,18 +18,14 @@
package org.apache.spark.sql.execution.datasources.parquet
import kotlin.NotImplementedError
-import org.apache.avro.Schema
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile
-import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.engine.HoodieReaderContext
import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, HoodieLogFile, HoodieRecord, HoodieRecordMerger}
-import org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGER_STRATEGY
+import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, HoodieLogFile, HoodieRecord}
import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.common.table.read.{HoodieFileGroupReader, HoodieFileGroupRecordBuffer, HoodieKeyBasedFileGroupRecordBuffer, HoodiePositionBasedFileGroupRecordBuffer}
-import org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys
+import org.apache.hudi.common.table.read.HoodieFileGroupReader
import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.{HoodieBaseRelation, HoodieSparkUtils, HoodieTableSchema, HoodieTableState, MergeOnReadSnapshotRelation, PartitionFileSliceMapping, SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}
import org.apache.spark.sql.SparkSession
@@ -160,22 +156,6 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
preMergeBaseFileReader, partitionValues)
val metaClient: HoodieTableMetaClient = HoodieTableMetaClient
.builder().setConf(hadoopConf).setBasePath(tableState.tablePath).build
- val avroSchema: Schema = HoodieBaseRelation.convertToAvroSchema(requiredSchemaWithMandatory, tableName)
- val filePath: String = if (baseFile != null) baseFile.getPath else logFiles.head.toString
- val partitionNameOpt: HOption[String] = HOption.of(FSUtils.getRelativePartitionPath(
- new Path(tableState.tablePath), new Path(filePath).getParent))
- val partitionPathFieldOpt = metaClient.getTableConfig.getPartitionFields
- val recordMerger: HoodieRecordMerger = readerContext.getRecordMerger(getStringWithAltKeys(
- new TypedProperties, RECORD_MERGER_STRATEGY, RECORD_MERGER_STRATEGY.defaultValue))
- val recordBuffer: HoodieFileGroupRecordBuffer[InternalRow] = if (shouldUseRecordPosition) {
- new HoodiePositionBasedFileGroupRecordBuffer[InternalRow](
- readerContext, avroSchema, avroSchema, partitionNameOpt, partitionPathFieldOpt,
- recordMerger, new TypedProperties, metaClient)
- } else {
- new HoodieKeyBasedFileGroupRecordBuffer[InternalRow](
- readerContext, avroSchema, avroSchema, partitionNameOpt, partitionPathFieldOpt,
- recordMerger, new TypedProperties, metaClient)
- }
val reader = new HoodieFileGroupReader[InternalRow](
readerContext,
hadoopConf,
@@ -184,11 +164,10 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
HOption.of(baseFile),
HOption.of(logFiles.map(f => f.getPath.toString).asJava),
HoodieBaseRelation.convertToAvroSchema(requiredSchemaWithMandatory, tableName),
- new TypedProperties(),
+ metaClient.getTableConfig.getProps,
start,
length,
- recordBuffer
- )
+ shouldUseRecordPosition)
reader.initRecordIterators()
reader.getClosableIterator.asInstanceOf[java.util.Iterator[InternalRow]].asScala
}
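The consuming loop is unchanged by this refactoring. A minimal usage sketch
(mirroring the TestHoodieFileGroupReaderBase code above; error handling omitted):

    fileGroupReader.initRecordIterators();
    while (fileGroupReader.hasNext()) {
      actualRecordList.add(fileGroupReader.next());
    }
    fileGroupReader.close(); // HoodieFileGroupReader implements Closeable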