This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new fe04212745d [HUDI-6973] Instantiate HoodieFileGroupRecordBuffer inside 
new file group reader (#9910)
fe04212745d is described below

commit fe04212745d9e34b09695b2d26205d951005c78f
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed Oct 25 01:00:24 2023 -0700

    [HUDI-6973] Instantiate HoodieFileGroupRecordBuffer inside new file group 
reader (#9910)
    
    This commit refactors the new file group reader (`HoodieFileGroupReader`) 
to instantiate `HoodieFileGroupRecordBuffer` inside the file group reader's 
constructors, instead of being passed in from outside.
---
 .../read/HoodieBaseFileGroupRecordBuffer.java      |  8 +---
 .../common/table/read/HoodieFileGroupReader.java   | 44 ++++++++++++++++------
 .../read/HoodieKeyBasedFileGroupRecordBuffer.java  |  8 ++--
 .../HoodiePositionBasedFileGroupRecordBuffer.java  |  8 ++--
 .../table/read/TestHoodieFileGroupReaderBase.java  | 18 ++-------
 ...odieFileGroupReaderBasedParquetFileFormat.scala | 29 ++------------
 6 files changed, 47 insertions(+), 68 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
index a1685b82a93..991428ae87c 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
@@ -24,7 +24,6 @@ import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.DeleteRecord;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.KeySpec;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
@@ -55,7 +54,6 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
   protected final Option<String[]> partitionPathFieldOpt;
   protected final HoodieRecordMerger recordMerger;
   protected final TypedProperties payloadProps;
-  protected final HoodieTableMetaClient hoodieTableMetaClient;
   protected final Map<Object, Pair<Option<T>, Map<String, Object>>> records;
   protected ClosableIterator<T> baseFileIterator;
   protected Iterator<Pair<Option<T>, Map<String, Object>>> logRecordIterator;
@@ -67,8 +65,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
                                          Option<String> 
partitionNameOverrideOpt,
                                          Option<String[]> 
partitionPathFieldOpt,
                                          HoodieRecordMerger recordMerger,
-                                         TypedProperties payloadProps,
-                                         HoodieTableMetaClient 
hoodieTableMetaClient) {
+                                         TypedProperties payloadProps) {
     this.readerContext = readerContext;
     this.readerSchema = readerSchema;
     this.baseFileSchema = baseFileSchema;
@@ -76,7 +73,6 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
     this.partitionPathFieldOpt = partitionPathFieldOpt;
     this.recordMerger = recordMerger;
     this.payloadProps = payloadProps;
-    this.hoodieTableMetaClient = hoodieTableMetaClient;
     this.records = new HashMap<>();
   }
 
@@ -162,7 +158,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
       // For same ordering values, uses the natural order(arrival time 
semantics).
       Comparable existingOrderingVal = readerContext.getOrderingValue(
           existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight(), readerSchema,
-          this.hoodieTableMetaClient.getTableConfig().getProps());
+          payloadProps);
       Comparable deleteOrderingVal = deleteRecord.getOrderingValue();
       // Checks the ordering value does not equal to 0
       // because we use 0 as the default value which means natural order
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 296b8155e99..b655238412d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -23,13 +23,17 @@ import org.apache.hudi.common.config.HoodieMemoryConfig;
 import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieTableQueryType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordReader;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.EmptyIterator;
 import org.apache.hudi.exception.HoodieIOException;
@@ -41,9 +45,12 @@ import org.apache.hadoop.fs.Path;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
+import static org.apache.hudi.common.table.HoodieTableConfig.PARTITION_FIELDS;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGER_STRATEGY;
 import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
@@ -82,19 +89,14 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
                                HoodieTableQueryType queryType,
                                Option<String> instantTime,
                                Option<String> startInstantTime,
-                               HoodieFileGroupRecordBuffer<T> recordBuffer) {
+                               boolean shouldUseRecordPosition) throws 
Exception {
     // This constructor is a placeholder now to allow automatically fetching 
the correct list of
     // base and log files for a file group.
     // Derive base and log files and call the corresponding constructor.
-    this.readerContext = readerContext;
-    this.hadoopConf = metaClient.getHadoopConf();
-    this.baseFilePath = Option.empty();
-    this.logFilePathList = Option.empty();
-    this.props = props;
-    this.start = 0;
-    this.length = Long.MAX_VALUE;
-    this.baseFileIterator = new EmptyIterator<>();
-    this.recordBuffer = recordBuffer;
+    this(readerContext, metaClient.getHadoopConf(), 
metaClient.getBasePathV2().toString(),
+        instantTime.get(), Option.empty(), Option.empty(),
+        new TableSchemaResolver(metaClient).getTableAvroSchema(),
+        props, 0, Long.MAX_VALUE, shouldUseRecordPosition);
   }
 
   public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
@@ -107,7 +109,7 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
                                TypedProperties props,
                                long start,
                                long length,
-                               HoodieFileGroupRecordBuffer<T> recordBuffer) {
+                               boolean shouldUseRecordPosition) {
     this.readerContext = readerContext;
     this.hadoopConf = hadoopConf;
     this.baseFilePath = baseFilePath;
@@ -122,7 +124,25 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
     this.readerState.baseFileAvroSchema = avroSchema;
     this.readerState.logRecordAvroSchema = avroSchema;
     this.readerState.mergeProps.putAll(props);
-    this.recordBuffer = recordBuffer;
+    String filePath = baseFilePath.isPresent()
+        ? baseFilePath.get().getPath()
+        : logFilePathList.get().get(0);
+    String partitionPath = FSUtils.getRelativePartitionPath(
+        new Path(tablePath), new Path(filePath).getParent());
+    Option<String> partitionNameOpt = StringUtils.isNullOrEmpty(partitionPath)
+        ? Option.empty() : Option.of(partitionPath);
+    Option<Object> partitionConfigValue = 
ConfigUtils.getRawValueWithAltKeys(props, PARTITION_FIELDS);
+    Option<String[]> partitionPathFieldOpt = partitionConfigValue.isPresent()
+        ? 
Option.of(Arrays.stream(partitionConfigValue.get().toString().split(","))
+        .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new 
String[] {}))
+        : Option.empty();
+    this.recordBuffer = shouldUseRecordPosition
+        ? new HoodiePositionBasedFileGroupRecordBuffer<>(
+        readerContext, avroSchema, avroSchema, partitionNameOpt, 
partitionPathFieldOpt,
+        recordMerger, props)
+        : new HoodieKeyBasedFileGroupRecordBuffer<>(
+        readerContext, avroSchema, avroSchema, partitionNameOpt, 
partitionPathFieldOpt,
+        recordMerger, props);
   }
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
index d055246cfa8..479a0c1b339 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
@@ -20,11 +20,10 @@
 package org.apache.hudi.common.table.read;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.DeleteRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.KeySpec;
-import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
 import org.apache.hudi.common.util.Option;
@@ -55,10 +54,9 @@ public class HoodieKeyBasedFileGroupRecordBuffer<T> extends 
HoodieBaseFileGroupR
                                              Option<String> 
partitionNameOverrideOpt,
                                              Option<String[]> 
partitionPathFieldOpt,
                                              HoodieRecordMerger recordMerger,
-                                             TypedProperties payloadProps,
-                                             HoodieTableMetaClient 
hoodieTableMetaClient) {
+                                             TypedProperties payloadProps) {
     super(readerContext, readerSchema, baseFileSchema, 
partitionNameOverrideOpt, partitionPathFieldOpt,
-        recordMerger, payloadProps, hoodieTableMetaClient);
+        recordMerger, payloadProps);
   }
 
   @Override
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
index 7fc99523e19..6c80af8b56c 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
@@ -20,11 +20,10 @@
 package org.apache.hudi.common.table.read;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.DeleteRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.KeySpec;
-import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
 import org.apache.hudi.common.util.Option;
@@ -60,10 +59,9 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> 
extends HoodieBaseFileG
                                                   Option<String> 
partitionNameOverrideOpt,
                                                   Option<String[]> 
partitionPathFieldOpt,
                                                   HoodieRecordMerger 
recordMerger,
-                                                  TypedProperties payloadProps,
-                                                  HoodieTableMetaClient 
hoodieTableMetaClient) {
+                                                  TypedProperties 
payloadProps) {
     super(readerContext, readerSchema, baseFileSchema, 
partitionNameOverrideOpt, partitionPathFieldOpt,
-        recordMerger, payloadProps, hoodieTableMetaClient);
+        recordMerger, payloadProps);
   }
 
   @Override
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index f3e142b2f57..af1fc120c5b 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -38,7 +38,6 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
@@ -48,13 +47,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
 import static org.apache.hudi.common.model.WriteOperationType.INSERT;
 import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
+import static org.apache.hudi.common.table.HoodieTableConfig.PARTITION_FIELDS;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGER_STRATEGY;
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.getLogFileListFromFileSlice;
 import static 
org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings;
-import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
@@ -131,18 +129,8 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
     props.setProperty("hoodie.datasource.write.precombine.field", "timestamp");
     props.setProperty("hoodie.payload.ordering.field", "timestamp");
     props.setProperty(RECORD_MERGER_STRATEGY.key(), 
RECORD_MERGER_STRATEGY.defaultValue());
-    String filePath = fileSlice.getBaseFile().isPresent() ? 
fileSlice.getBaseFile().get().getPath() : logFilePathList.get(0);
+    props.setProperty(PARTITION_FIELDS.key(), 
metaClient.getTableConfig().getString(PARTITION_FIELDS.key()));
     String[] partitionValues = {partitionPaths[0]};
-    HoodieFileGroupRecordBuffer<T> recordBuffer = new 
HoodieKeyBasedFileGroupRecordBuffer<>(
-        getHoodieReaderContext(partitionValues),
-        avroSchema,
-        avroSchema,
-        Option.of(getRelativePartitionPath(new Path(basePath), new 
Path(filePath).getParent())),
-        metaClient.getTableConfig().getPartitionFields(),
-        getHoodieReaderContext(partitionValues).getRecordMerger(
-            getStringWithAltKeys(props, RECORD_MERGER_STRATEGY, 
RECORD_MERGER_STRATEGY.defaultValue())),
-        props,
-        metaClient);
     HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<T>(
         getHoodieReaderContext(partitionValues),
         hadoopConf,
@@ -154,7 +142,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
         props,
         0,
         fileSlice.getTotalFileSize(),
-        recordBuffer);
+        false);
     fileGroupReader.initRecordIterators();
     while (fileGroupReader.hasNext()) {
       actualRecordList.add(fileGroupReader.next());
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 5bc4e4a80f9..189bcb39e61 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -18,18 +18,14 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import kotlin.NotImplementedError
-import org.apache.avro.Schema
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile
-import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.engine.HoodieReaderContext
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, HoodieLogFile, 
HoodieRecord, HoodieRecordMerger}
-import org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGER_STRATEGY
+import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, HoodieLogFile, 
HoodieRecord}
 import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.common.table.read.{HoodieFileGroupReader, 
HoodieFileGroupRecordBuffer, HoodieKeyBasedFileGroupRecordBuffer, 
HoodiePositionBasedFileGroupRecordBuffer}
-import org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys
+import org.apache.hudi.common.table.read.HoodieFileGroupReader
 import org.apache.hudi.common.util.{Option => HOption}
 import org.apache.hudi.{HoodieBaseRelation, HoodieSparkUtils, 
HoodieTableSchema, HoodieTableState, MergeOnReadSnapshotRelation, 
PartitionFileSliceMapping, SparkAdapterSupport, 
SparkFileFormatInternalRowReaderContext}
 import org.apache.spark.sql.SparkSession
@@ -160,22 +156,6 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
       preMergeBaseFileReader, partitionValues)
     val metaClient: HoodieTableMetaClient = HoodieTableMetaClient
       .builder().setConf(hadoopConf).setBasePath(tableState.tablePath).build
-    val avroSchema: Schema = 
HoodieBaseRelation.convertToAvroSchema(requiredSchemaWithMandatory, tableName)
-    val filePath: String = if (baseFile != null) baseFile.getPath else 
logFiles.head.toString
-    val partitionNameOpt: HOption[String] = 
HOption.of(FSUtils.getRelativePartitionPath(
-      new Path(tableState.tablePath), new Path(filePath).getParent))
-    val partitionPathFieldOpt = metaClient.getTableConfig.getPartitionFields
-    val recordMerger: HoodieRecordMerger = 
readerContext.getRecordMerger(getStringWithAltKeys(
-      new TypedProperties, RECORD_MERGER_STRATEGY, 
RECORD_MERGER_STRATEGY.defaultValue))
-    val recordBuffer: HoodieFileGroupRecordBuffer[InternalRow] = if 
(shouldUseRecordPosition) {
-      new HoodiePositionBasedFileGroupRecordBuffer[InternalRow](
-        readerContext, avroSchema, avroSchema, partitionNameOpt, 
partitionPathFieldOpt,
-        recordMerger, new TypedProperties, metaClient)
-    } else {
-      new HoodieKeyBasedFileGroupRecordBuffer[InternalRow](
-        readerContext, avroSchema, avroSchema, partitionNameOpt, 
partitionPathFieldOpt,
-        recordMerger, new TypedProperties, metaClient)
-    }
     val reader = new HoodieFileGroupReader[InternalRow](
       readerContext,
       hadoopConf,
@@ -184,11 +164,10 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
       HOption.of(baseFile),
       HOption.of(logFiles.map(f => f.getPath.toString).asJava),
       HoodieBaseRelation.convertToAvroSchema(requiredSchemaWithMandatory, 
tableName),
-      new TypedProperties(),
+      metaClient.getTableConfig.getProps,
       start,
       length,
-      recordBuffer
-    )
+      shouldUseRecordPosition)
     reader.initRecordIterators()
     
reader.getClosableIterator.asInstanceOf[java.util.Iterator[InternalRow]].asScala
   }

Reply via email to