yihua commented on code in PR #18348:
URL: https://github.com/apache/hudi/pull/18348#discussion_r3036317837


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/UnsupportedExpressionIndexRecordGenerator.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.index;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.metadata.model.FileInfoAndPartition;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import java.util.List;
+
+/**
+ * Fallback {@link ExpressionIndexRecordGenerator} that throws a not-supported 
exception
+ * when expression index bootstrap is requested for unsupported engines.
+ */
+public class UnsupportedExpressionIndexRecordGenerator implements 
ExpressionIndexRecordGenerator {
+
+  private final EngineType engineType;
+
+  public UnsupportedExpressionIndexRecordGenerator(EngineType engineType) {
+    this.engineType = engineType;
+  }
+
+  @Override
+  public EngineType getEngineType() {
+    return engineType;
+  }
+
+  @Override
+  public HoodieData<HoodieRecord> generate(
+      List<FileInfoAndPartition> filesToIndex,
+      HoodieIndexDefinition indexDefinition,
+      HoodieTableMetaClient metaClient,
+      int parallelism,
+      HoodieSchema tableSchema,
+      HoodieSchema readerSchema,
+      StorageConfiguration<?> storageConf,
+      String instantTime) {
+    if 
(metaClient.getTableConfig().getTableVersion().lesserThan(HoodieTableVersion.EIGHT))
 {
+      throw new HoodieNotSupportedException("Table version 6 and below does 
not support expression index");
+    }

Review Comment:
   🤖 The error message says "Table version 6 and below", but the check is 
`lesserThan(HoodieTableVersion.EIGHT)`, which also covers version 7 (0.16.0). 
Should the message say "Table version 7 and below" or "Table versions before 8"?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/columnstats/ColumnStatsIndexer.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.index.columnstats;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.metadata.model.FileInfo;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.metadata.index.model.IndexPartitionInitialization;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.index.BaseIndexer;
+import org.apache.hudi.util.Lazy;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hudi.index.HoodieIndexUtils.register;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.existingIndexVersionOrDefault;
+import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
+
+/**
+ * Implementation of {@link MetadataPartitionType#COLUMN_STATS} metadata
+ */
+@Slf4j
+public class ColumnStatsIndexer extends BaseIndexer {
+  private Lazy<List<String>> columnsToIndex;
+
+  public ColumnStatsIndexer(HoodieEngineContext engineContext,
+                               HoodieWriteConfig dataTableWriteConfig,
+                               HoodieTableMetaClient dataTableMetaClient) {
+    super(engineContext, dataTableWriteConfig, dataTableMetaClient);
+
+    this.columnsToIndex = Lazy.lazily(() ->
+        new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(
+            dataTableMetaClient.getTableConfig(),
+            dataTableWriteConfig.getMetadataConfig(),
+            Lazy.lazily(() -> 
HoodieTableMetadataUtil.tryResolveSchemaForTable(dataTableMetaClient)),
+            true,
+            Option.of(dataTableWriteConfig.getRecordMerger().getRecordType()),
+            existingIndexVersionOrDefault(PARTITION_NAME_COLUMN_STATS, 
dataTableMetaClient)).keySet()));
+  }
+
+  @Override
+  public List<IndexPartitionInitialization> buildInitialization(
+      String dataTableInstantTime,
+      String instantTimeForPartition,
+      Map<String, List<FileInfo>> partitionToAllFilesMap,
+      Lazy<List<FileSliceAndPartition>> 
lazyLatestMergedPartitionFileSliceList) throws IOException {
+    final int fileGroupCount = 
dataTableWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
+    if (partitionToAllFilesMap.isEmpty()) {
+      this.columnsToIndex = Lazy.lazily(Collections::emptyList);
+      return 
Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, 
COLUMN_STATS.getPartitionPath(), engineContext.emptyHoodieData()));
+    }

Review Comment:
   🤖 Overwriting `this.columnsToIndex` with an empty list when 
`partitionToAllFilesMap` is empty means `postInitialization` will register the 
column_stats index definition with zero source fields. Is that intentional? 
Also, because the (non-final) field is reassigned rather than handled locally, 
the columns computed in the constructor are discarded for good: if this indexer 
instance is reused for a subsequent call with non-empty partitions, those 
columns would be lost as well.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/record/BaseRecordIndexer.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.index.record;
+
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.engine.ReaderContextFactory;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.metadata.index.model.DataPartitionAndRecords;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaCache;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.index.BaseIndexer;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.schema.HoodieSchemaUtils.getRecordKeySchema;
+
+/**
+ * Base implementation for record-index.
+ */
+@Slf4j
+public abstract class BaseRecordIndexer extends BaseIndexer {
+
+  private static final int RECORD_INDEX_AVERAGE_RECORD_SIZE = 48;
+
+  protected BaseRecordIndexer(HoodieEngineContext engineContext, 
HoodieWriteConfig dataTableWriteConfig, HoodieTableMetaClient 
dataTableMetaClient) {
+    super(engineContext, dataTableWriteConfig, dataTableMetaClient);
+  }
+
+  protected DataPartitionAndRecords initializeRecordIndexPartition(
+      List<FileSliceAndPartition> latestMergedPartitionFileSliceList,
+      int recordIndexMaxParallelism) {
+    return initializeRecordIndexPartition(null, 
latestMergedPartitionFileSliceList, recordIndexMaxParallelism);
+  }
+
+  protected DataPartitionAndRecords initializeRecordIndexPartition(
+      String dataPartition,
+      List<FileSliceAndPartition> latestMergedPartitionFileSliceList,
+      int recordIndexMaxParallelism) {
+    log.info("Initializing record index from {} file slices", 
latestMergedPartitionFileSliceList.size());
+    HoodieData<HoodieRecord> records = readRecordKeysFromFileSliceSnapshot(
+        engineContext,
+        latestMergedPartitionFileSliceList,
+        recordIndexMaxParallelism,
+        this.getClass().getSimpleName(),
+        dataTableMetaClient,
+        dataTableWriteConfig);
+
+    // Initialize the file groups
+    final int fileGroupCount = estimateFileGroupCount(records);
+    log.info("Initializing record index with {} file groups.", fileGroupCount);
+    return new DataPartitionAndRecords(fileGroupCount, 
Option.ofNullable(dataPartition), records);
+  }
+
+  @Override
+  public void postInitialization(HoodieTableMetaClient metadataMetaClient, 
HoodieData<HoodieRecord> records, int fileGroupCount, String 
relativePartitionPath) {
+    super.postInitialization(metadataMetaClient, records, fileGroupCount, 
relativePartitionPath);
+    // Validate record index after commit if validation is enabled
+    if 
(dataTableWriteConfig.getMetadataConfig().isRecordIndexInitializationValidationEnabled())
 {
+      validateRecordIndex(records, fileGroupCount, metadataMetaClient);
+    }
+    records.unpersist();
+  }
+
+  /**
+   * Validates the record index after bootstrap by comparing the expected 
record count with the actual
+   * record count stored in the metadata table. The validation is performed in 
a distributed manner
+   * using the engine context to count records from HFiles in parallel.
+   *
+   * @param recordIndexRecords the HoodieData containing the expected records
+   * @param fileGroupCount the expected number of file groups
+   * @param metadataMetaClient meta client for the metadata table
+   */
+  protected void validateRecordIndex(HoodieData<HoodieRecord> 
recordIndexRecords, int fileGroupCount, HoodieTableMetaClient 
metadataMetaClient) {
+    String partitionName = 
MetadataPartitionType.RECORD_INDEX.getPartitionPath();
+    HoodieTableFileSystemView fsView = 
HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient);
+    try {
+      // Use merged file slices to handle cases with pending compactions
+      List<FileSlice> fileSlices = 
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, 
fsView, partitionName);
+
+      // Filter to only file slices with base files and extract their storage 
paths
+      List<StoragePath> baseFilePaths = fileSlices.stream()
+          .filter(fs -> fs.getBaseFile().isPresent())
+          .map(fs -> fs.getBaseFile().get().getStoragePath())
+          .collect(Collectors.toList());
+
+      // Count records in a distributed manner using the engine context
+      long totalRecords = countRecordsInHFiles(baseFilePaths, 
metadataMetaClient);
+      long expectedRecordCount = recordIndexRecords.count();
+
+      ValidationUtils.checkArgument(totalRecords == expectedRecordCount, 
"Record Count Validation failed with "
+          + totalRecords + " present in record index vs the expected " + 
expectedRecordCount);
+      log.info(String.format("Record index initialized on %d shards (expected 
= %d) with %d records (expected = %d)",
+          fileSlices.size(), fileGroupCount, totalRecords, 
expectedRecordCount));
+    } finally {
+      fsView.close();
+    }
+  }
+
+  /**
+   * Counts the total number of records in HFiles in a distributed manner.
+   *
+   * @param baseFilePaths list of storage paths to HFiles
+   * @param metadataMetaClient meta client for the metadata table
+   * @return total number of records across all HFiles
+   */
+  private long countRecordsInHFiles(List<StoragePath> baseFilePaths, 
HoodieTableMetaClient metadataMetaClient) {
+    if (baseFilePaths.isEmpty()) {
+      return 0L;
+    }
+
+    int parallelism = Math.min(baseFilePaths.size(), 
dataTableWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+    StorageConfiguration<?> storageConfBroadcast = 
metadataMetaClient.getStorageConf();
+    HoodieFileFormat baseFileFormat = 
metadataMetaClient.getTableConfig().getBaseFileFormat();
+
+    return engineContext.parallelize(baseFilePaths, parallelism)
+        .mapPartitions(pathIterator -> {
+          long count = 0L;
+          while (pathIterator.hasNext()) {
+            StoragePath path = pathIterator.next();
+            try {
+              HoodieStorage storage = HoodieStorageUtils.getStorage(path, 
storageConfBroadcast);
+              HoodieConfig readerConfig = new HoodieConfig();
+              HoodieAvroFileReader reader = (HoodieAvroFileReader) 
HoodieIOFactory.getIOFactory(storage)
+                  .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+                  .getFileReader(readerConfig, path, baseFileFormat, 
Option.empty());
+              try {
+                count += reader.getTotalRecords();
+              } finally {
+                reader.close();
+              }
+            } catch (IOException e) {
+              throw new HoodieIOException("Error reading total records from 
file " + path, e);
+            }
+          }
+          return Collections.singletonList(count).iterator();
+        }, true)
+        .collectAsList()
+        .stream()
+        .mapToLong(Long::longValue)
+        .sum();
+  }
+
+  /**
+   * Fetch record locations from FileSlice snapshot.
+   *
+   * @param engineContext             context ot use.
+   * @param partitionFileSlicePairs   list of pairs of partition and file 
slice.
+   * @param recordIndexMaxParallelism parallelism to use.
+   * @param activeModule              active module of interest.
+   * @param metaClient                metaclient instance to use.
+   * @param dataWriteConfig           write config to use.
+   * @return metadata records for initializing record index entries.
+   */
+  protected <T> HoodieData<HoodieRecord> readRecordKeysFromFileSliceSnapshot(
+      HoodieEngineContext engineContext,
+      List<FileSliceAndPartition> partitionFileSlicePairs,
+      int recordIndexMaxParallelism,
+      String activeModule,
+      HoodieTableMetaClient metaClient,
+      HoodieWriteConfig dataWriteConfig) {
+    if (partitionFileSlicePairs.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    Option<String> instantTime = 
metaClient.getActiveTimeline().getCommitsTimeline()
+        .filterCompletedInstants()
+        .lastInstant()
+        .map(HoodieInstant::requestedTime);
+    if (!instantTime.isPresent()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    engineContext.setJobStatus(activeModule, "Record Index: reading record 
keys from " + partitionFileSlicePairs.size() + " file slices");
+    final int parallelism = Math.min(partitionFileSlicePairs.size(), 
recordIndexMaxParallelism);
+    ReaderContextFactory<T> readerContextFactory = 
engineContext.getReaderContextFactory(metaClient);
+    return engineContext.parallelize(partitionFileSlicePairs, 
parallelism).flatMap(partitionAndFileSlice -> {
+      final String partition = partitionAndFileSlice.partitionPath();
+      final FileSlice fileSlice = partitionAndFileSlice.fileSlice();
+      final String fileId = fileSlice.getFileId();
+      HoodieReaderContext<T> readerContext = readerContextFactory.getContext();
+      HoodieSchema dataSchema = 
HoodieSchemaCache.intern(HoodieSchemaUtils.addMetadataFields(HoodieSchema.parse(dataWriteConfig.getWriteSchema()),
 dataWriteConfig.allowOperationMetadataField()));
+      HoodieSchema requestedSchema = 
metaClient.getTableConfig().populateMetaFields() ? getRecordKeySchema()

Review Comment:
   🤖 The `HoodieFileGroupReader` is created but never explicitly closed — only 
its iterator is wrapped in the `CloseableMappingIterator`. If 
`HoodieFileGroupReader` holds resources beyond what `getClosableIterator()` 
manages (e.g., open file handles, buffers), those would leak. Could you verify 
that closing the iterator returned by `getClosableIterator()` also fully closes 
the reader itself? If not, this needs a composite closeable that closes both 
the iterator and the reader.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestGlobalRecordLevelIndexTableVersionSix.scala:
##########
@@ -18,15 +18,49 @@
 
 package org.apache.hudi.functional
 
-import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.metadata.MetadataPartitionType
 
+import org.apache.spark.sql.SaveMode
+import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.Tag
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
+
+import java.util.Collections
 
 @Tag("functional-b")
 class TestGlobalRecordLevelIndexTableVersionSix extends 
TestGlobalRecordLevelIndex {
   override def commonOpts: Map[String, String] = super.commonOpts ++ Map(
     HoodieTableConfig.VERSION.key() -> "6",
     HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> "6"
   )
+
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  override def testRLIUpsertAndDropIndex(tableType: HoodieTableType): Unit = {
+    val hudiOpts = commonOpts ++ Map(DataSourceWriteOptions.TABLE_TYPE.key -> 
tableType.name(),
+      HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true")

Review Comment:
   🤖 The comment mentions that `partition stats` should be present, but the 
assertion checks for a size of 2. If only `files` and `col stats` are expected, 
the count is correct, but the comment is misleading and should be updated. If 
`partition stats` is expected too, the asserted size needs to include it.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -181,7 +165,8 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> 
implements HoodieTab
   private boolean hasPartitionsStateChanged = false;
   protected final transient HoodieEngineContext engineContext;
   @Getter
-  protected final List<MetadataPartitionType> enabledPartitionTypes;
+  protected final transient Map<MetadataPartitionType, Indexer> 
enabledIndexerMap;

Review Comment:
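   🤖 Making these fields (e.g. `enabledIndexerMap`) `transient` breaks the 
`Serializable` contract of `HoodieTableMetadataWriter`. After Java 
deserialization the field will be `null`, and because it is also `final` it 
cannot simply be reassigned in a `readObject` hook. Are writer instances ever 
serialized? If so, the map needs to be rebuilt after deserialization.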



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/IndexerFactory.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata.index;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.index.bloomfilters.BloomFiltersIndexer;
+import org.apache.hudi.metadata.index.columnstats.ColumnStatsIndexer;
+import org.apache.hudi.metadata.index.expression.ExpressionIndexer;
+import org.apache.hudi.metadata.index.files.FilesIndexer;
+import org.apache.hudi.metadata.index.partitionstats.PartitionStatsIndexer;
+import org.apache.hudi.metadata.index.record.PartitionedRecordIndexer;
+import org.apache.hudi.metadata.index.record.RecordIndexer;
+import org.apache.hudi.metadata.index.secondary.SecondaryIndexer;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Factory for creating {@link Indexer} implementations and resolving enabled 
indexers
+ * based on table and metadata configuration.
+ */
+public class IndexerFactory {
+  private static Indexer getIndexer(MetadataPartitionType partitionType,
+                                   HoodieEngineContext engineContext,
+                                   HoodieWriteConfig dataTableWriteConfig,
+                                   HoodieTableMetaClient dataTableMetaClient,
+                                   ExpressionIndexRecordGenerator 
expressionIndexRecordGenerator) {
+    switch (partitionType) {
+      case FILES:
+        return new FilesIndexer(engineContext, dataTableWriteConfig, 
dataTableMetaClient);
+      case BLOOM_FILTERS:
+        return new BloomFiltersIndexer(engineContext, dataTableWriteConfig, 
dataTableMetaClient);
+      case COLUMN_STATS:
+        return new ColumnStatsIndexer(engineContext, dataTableWriteConfig, 
dataTableMetaClient);
+      case RECORD_INDEX:
+        return dataTableWriteConfig.isRecordLevelIndexEnabled()
+            ? new PartitionedRecordIndexer(engineContext, 
dataTableWriteConfig, dataTableMetaClient)
+            : new RecordIndexer(engineContext, dataTableWriteConfig, 
dataTableMetaClient);
+      case EXPRESSION_INDEX:
+        return new ExpressionIndexer(engineContext, dataTableWriteConfig, 
dataTableMetaClient, expressionIndexRecordGenerator);
+      case PARTITION_STATS:
+        return new PartitionStatsIndexer(engineContext, dataTableWriteConfig, 
dataTableMetaClient);
+      case SECONDARY_INDEX:
+        return new SecondaryIndexer(engineContext, dataTableWriteConfig, 
dataTableMetaClient);
+      default:
+        throw new HoodieNotSupportedException("Unsupported metadata partition 
type for indexing: " + partitionType);
+    }
+  }
+
+  /**
+   * Returns the map of metadata partition type to the indexer for the enabled 
metadata
+   * partition types based on the metadata config and table config.
+   */
+  public static Map<MetadataPartitionType, Indexer> getEnabledIndexerMap(
+      HoodieEngineContext engineContext,
+      HoodieWriteConfig dataTableWriteConfig,
+      HoodieTableMetaClient dataTableMetaClient,
+      ExpressionIndexRecordGenerator expressionIndexRecordGenerator) {
+    if (!dataTableWriteConfig.getMetadataConfig().isEnabled()) {
+      return Collections.emptyMap();
+    }
+    return 
Collections.unmodifiableMap(Arrays.stream(MetadataPartitionType.getValidValues(dataTableMetaClient.getTableConfig().getTableVersion()))
+        .filter(partitionType ->
+            
(partitionType.isMetadataPartitionEnabled(dataTableWriteConfig.getMetadataConfig(),
 dataTableMetaClient.getTableConfig())

Review Comment:
   🤖 This calls `MetadataPartitionType.getValidValues(HoodieTableVersion)`. Does 
that versioned overload exist, either already or added elsewhere in this PR? If 
not, this line will fail to compile.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/expression/ExpressionIndexer.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.index.expression;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.metadata.index.ExpressionIndexRecordGenerator;
+import org.apache.hudi.metadata.model.FileInfo;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.metadata.model.FileInfoAndPartition;
+import org.apache.hudi.metadata.index.model.IndexPartitionInitialization;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.index.BaseIndexer;
+import org.apache.hudi.util.Lazy;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getExpressionIndexPartitionsToInit;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getProjectedSchemaForExpressionIndex;
+import static org.apache.hudi.metadata.MetadataPartitionType.EXPRESSION_INDEX;
+
+/**
+ * Implementation of {@link MetadataPartitionType#EXPRESSION_INDEX} index
+ */
+@Slf4j
+public class ExpressionIndexer extends BaseIndexer {
+
+  private final ExpressionIndexRecordGenerator expressionIndexRecordGenerator;
+
+  public ExpressionIndexer(
+      HoodieEngineContext engineContext,
+      HoodieWriteConfig dataTableWriteConfig,
+      HoodieTableMetaClient dataTableMetaClient,
+      ExpressionIndexRecordGenerator expressionIndexRecordGenerator) {
+    super(engineContext, dataTableWriteConfig, dataTableMetaClient);
+
+    this.expressionIndexRecordGenerator = expressionIndexRecordGenerator;
+  }
+
+  @Override
+  public List<IndexPartitionInitialization> buildInitialization(
+      String dataTableInstantTime,
+      String instantTimeForPartition,
+      Map<String, List<FileInfo>> partitionToAllFilesMap,
+      Lazy<List<FileSliceAndPartition>> 
lazyLatestMergedPartitionFileSliceList) throws IOException {
+    Set<String> expressionIndexPartitionsToInit = 
getExpressionIndexPartitionsToInit(
+        EXPRESSION_INDEX, dataTableWriteConfig.getMetadataConfig(), 
dataTableMetaClient);
+    if (expressionIndexPartitionsToInit.size() != 1) {
+      if (expressionIndexPartitionsToInit.size() > 1) {
+        log.warn("Skipping expression index initialization as only one 
expression index "
+            + "bootstrap at a time is supported for now. Provided: {}", 
expressionIndexPartitionsToInit);
+      }
+      return Collections.emptyList();
+    }
+
+    String indexName = expressionIndexPartitionsToInit.iterator().next();
+    HoodieIndexDefinition indexDefinition = 
HoodieTableMetadataUtil.getHoodieIndexDefinition(indexName, 
dataTableMetaClient);
+    ValidationUtils.checkState(indexDefinition != null, "Expression Index 
definition is not present for index " + indexName);
+
+    List<FileSliceAndPartition> partitionFileSlicePairs = 
lazyLatestMergedPartitionFileSliceList.get();
+    List<FileInfoAndPartition> filesToIndex = new ArrayList<>();
+    partitionFileSlicePairs.forEach(fsp -> {
+      if (fsp.fileSlice().getBaseFile().isPresent()) {
+        filesToIndex.add(FileInfoAndPartition.of(fsp.partitionPath(), 
fsp.fileSlice().getBaseFile().get().getPath(), 
fsp.fileSlice().getBaseFile().get().getFileSize()));
+      }
+      fsp.fileSlice().getLogFiles()
+          .forEach(hoodieLogFile
+              -> filesToIndex.add(FileInfoAndPartition.of(fsp.partitionPath(), 
hoodieLogFile.getPath().toString(), hoodieLogFile.getFileSize())));
+    });
+
+    int fileGroupCount = 
dataTableWriteConfig.getMetadataConfig().getExpressionIndexFileGroupCount();
+    if (partitionFileSlicePairs.isEmpty()) {
+      return 
Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, 
indexName, engineContext.emptyHoodieData()));
+    }
+

Review Comment:
   🤖 The `Lazy` wrapper here is created and immediately `.get()`-ed on the next 
line, so it provides no lazy-evaluation benefit. This could be simplified to a 
direct call: `HoodieSchema tableSchema = 
HoodieTableMetadataUtil.tryResolveSchemaForTable(dataTableMetaClient).orElseThrow(...);`



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/files/FilesIndexer.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata.index.files;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.metadata.model.FileInfo;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.metadata.index.model.IndexPartitionInitialization;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.index.BaseIndexer;
+import org.apache.hudi.util.Lazy;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
+
+/**
+ * Implementation of {@link MetadataPartitionType#FILES} metadata
+ */
+@Slf4j
+public class FilesIndexer extends BaseIndexer {
+  public FilesIndexer(HoodieEngineContext engineContext, HoodieWriteConfig 
dataTableWriteConfig,
+                         HoodieTableMetaClient dataTableMetaClient) {
+    super(engineContext, dataTableWriteConfig, dataTableMetaClient);
+  }
+
+  @Override
+  public List<IndexPartitionInitialization> buildInitialization(String 
dataTableInstantTime, String instantTimeForPartition, Map<String, 
List<FileInfo>> partitionToAllFilesMap,
+                                                                
Lazy<List<FileSliceAndPartition>> lazyLatestMergedPartitionFileSliceList) 
throws IOException {
+    // FILES partition uses a single file group
+    final int fileGroupCount = 1;
+
+    Set<String> partitions = partitionToAllFilesMap.keySet();
+    final int totalDataFilesCount = 
partitionToAllFilesMap.values().stream().mapToInt(List::size).sum();
+    log.info("Committing total {} partitions and {} files to metadata", 
partitions.size(), totalDataFilesCount);
+
+    // Record which saves the list of all partitions
+    HoodieRecord record = 
HoodieMetadataPayload.createPartitionListRecord(partitions);
+    HoodieData<HoodieRecord> allPartitionsRecord = 
engineContext.parallelize(Collections.singletonList(record), 1);
+    if (partitionToAllFilesMap.isEmpty()) {
+      return 
Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, 
FILES.getPartitionPath(), allPartitionsRecord));
+    }
+
+    // Records which save the file listing of each partition
+    engineContext.setJobStatus(this.getClass().getSimpleName(), "Creating 
records for metadata FILES partition");
+    HoodieData<HoodieRecord> fileListRecords = engineContext.parallelize(
+            new ArrayList<>(partitionToAllFilesMap.entrySet()), 
partitionToAllFilesMap.size())
+        .map(partitionInfo -> {
+          Map<String, Long> fileNameToSizeMap = 
partitionInfo.getValue().stream()
+              .collect(Collectors.toMap(FileInfo::fileName, FileInfo::size));
+          return HoodieMetadataPayload.createPartitionFilesRecord(
+              partitionInfo.getKey(), fileNameToSizeMap, 
Collections.emptyList());

Review Comment:
   🤖 Calling `fileListRecords.count()` triggers a full Spark action just for 
validation. For tables with many partitions, this doubles the initialization 
cost of this step. Could you instead compare `partitionToAllFilesMap.size()` 
against `partitions.size()` (these are equal by construction, so that check is 
essentially free), or remove this check altogether?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -453,8 +454,25 @@ public static Set<String> getAllPartitionPaths() {
    */
   public static MetadataPartitionType[] getValidValues() {
     // ALL_PARTITIONS is just another record type in FILES partition

Review Comment:
   🤖 The no-arg `getValidValues()` now delegates to 
`getValidValues(HoodieTableVersion.current())`. Is this safe for all existing 
callers? If any caller uses this method in a context where the table is 
actually pre-v8, it would now silently include partition types 
(SECONDARY_INDEX, EXPRESSION_INDEX, PARTITION_STATS) that aren't supported for 
that table. It might be worth adding a note, or deprecating the no-arg version in 
favor of always passing the table version explicitly.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -762,7 +724,8 @@ public static Map<String, HoodieData<HoodieRecord>> 
convertMissingPartitionRecor
       filesAddedCount[0] += filesToAdd.size();
       List<String> filesToDelete = filesDeleted.getOrDefault(partition, 
Collections.emptyList());
       fileDeleteCount[0] += filesToDelete.size();
-      HoodieRecord record = 
HoodieMetadataPayload.createPartitionFilesRecord(partition, filesToAdd, 
filesToDelete);
+      Map<String, Long> filesToAddMap = 
filesToAdd.stream().collect(Collectors.toMap(FileInfo::fileName, 
FileInfo::size));

Review Comment:
   🤖 The old signature used `Map<String, Long>`, which inherently prevented 
duplicate file names. The new `List<FileInfo>` can potentially contain 
duplicates, and `Collectors.toMap(FileInfo::fileName, FileInfo::size)` will 
throw `IllegalStateException` on duplicate keys. Could you add a merge function 
(e.g., `Long::max`) to handle this defensively, or document why duplicates 
can't occur?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -432,109 +425,80 @@ private boolean initializeFromFilesystem(String 
dataTableInstantTime, List<Metad
         partitionInfoList = Collections.emptyList();
       }
     }
-    Map<String, Map<String, Long>> partitionIdToAllFilesMap = 
partitionInfoList.stream()
-        .map(p -> {
-          String partitionName = 
HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(p.getRelativePath());
-          return Pair.of(partitionName, p.getFilenameToSizeMap());
-        })
-        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-
-    // validate that each index is eligible to be initialized
-    Iterator<MetadataPartitionType> iterator = partitionsToInit.iterator();
-    while (iterator.hasNext()) {
-      MetadataPartitionType partitionType = iterator.next();
-      if (partitionType == PARTITION_STATS && 
!dataMetaClient.getTableConfig().isTablePartitioned()) {
-        // Partition stats index cannot be enabled for a non-partitioned table
-        iterator.remove();
-        this.enabledPartitionTypes.remove(partitionType);
-      }
+    Map<String, List<FileInfo>> partitionIdToAllFilesMap = 
DirectoryInfo.getPartitionToFileInfo(partitionInfoList);
+    Lazy<List<FileSliceAndPartition>> lazyLatestMergedPartitionFileSliceList = 
getLazyLatestMergedPartitionFileSliceList();
+
+    // FILES partition should always be initialized first if enabled
+    if (!filesPartitionAvailable) {
+      initializeMetadataPartition(FILES, 
indexerMapForPartitionsToInit.get(FILES),
+          dataTableInstantTime, partitionIdToAllFilesMap, 
lazyLatestMergedPartitionFileSliceList);

Review Comment:
   🤖 This will throw an NPE if the metadata table directory exists but the FILES 
partition is not marked as available.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -417,7 +401,16 @@ private boolean initializeFromFilesystem(String 
dataTableInstantTime, List<Metad
     }
 
     // Already initialized partitions can be ignored
-    partitionsToInit.removeIf(metadataPartition -> 
dataMetaClient.getTableConfig().isMetadataPartitionAvailable((metadataPartition)));
+    indexerMapForPartitionsToInit.keySet().removeIf(
+        metadataPartition -> 
dataMetaClient.getTableConfig().isMetadataPartitionAvailable(metadataPartition));
+
+    // For a fresh table, defer RLI initialization
+    if (dataWriteConfig.getMetadataConfig().shouldDeferRliInitForFreshTable()
+        && this.enabledIndexerMap.containsKey(RECORD_INDEX)
+        && 
dataMetaClient.getActiveTimeline().filterCompletedInstants().countInstants() == 
0) {
+      this.enabledIndexerMap.remove(RECORD_INDEX);
+      indexerMapForPartitionsToInit.remove(RECORD_INDEX);
+    }
 

Review Comment:
   🤖 This will throw `UnsupportedOperationException` because 
`enabledIndexerMap` is unmodifiable.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/secondary/SecondaryIndexer.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.index.secondary;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.metadata.model.FileInfo;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.metadata.index.model.IndexPartitionInitialization;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.index.BaseIndexer;
+import org.apache.hudi.util.Lazy;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getSecondaryIndexPartitionsToInit;
+import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;
+import static org.apache.hudi.metadata.MetadataPartitionType.SECONDARY_INDEX;
+import static 
org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils.readSecondaryKeysFromFileSlices;
+
+/**
+ * Implementation of {@link MetadataPartitionType#SECONDARY_INDEX} index
+ */
+@Slf4j
+public class SecondaryIndexer extends BaseIndexer {
+
+  private static final int RECORD_INDEX_AVERAGE_RECORD_SIZE = 48;
+
+  public SecondaryIndexer(
+      HoodieEngineContext engineContext,
+      HoodieWriteConfig dataTableWriteConfig,
+      HoodieTableMetaClient dataTableMetaClient) {
+    super(engineContext, dataTableWriteConfig, dataTableMetaClient);
+  }
+
+  @Override
+  public List<IndexPartitionInitialization> buildInitialization(String 
dataTableInstantTime, String instantTimeForPartition, Map<String, 
List<FileInfo>> partitionToAllFilesMap,
+                                                                
Lazy<List<FileSliceAndPartition>> lazyLatestMergedPartitionFileSliceList) 
throws IOException {
+    Set<String> secondaryIndexPartitionsToInit = 
getSecondaryIndexPartitionsToInit(SECONDARY_INDEX, 
dataTableWriteConfig.getMetadataConfig(), dataTableMetaClient);
+    if (secondaryIndexPartitionsToInit.size() != 1) {
+      if (secondaryIndexPartitionsToInit.size() > 1) {
+        log.warn("Skipping secondary index initialization as only one 
secondary index bootstrap at a time is supported for now. Provided: {}", 
secondaryIndexPartitionsToInit);
+      }
+      return Collections.emptyList();
+    }
+
+    String indexName = secondaryIndexPartitionsToInit.iterator().next();
+    HoodieIndexDefinition indexDefinition = 
HoodieTableMetadataUtil.getHoodieIndexDefinition(indexName, 
dataTableMetaClient);
+    ValidationUtils.checkState(indexDefinition != null, "Secondary Index 
definition is not present for index " + indexName);
+
+    List<FileSliceAndPartition> partitionFileSlicePairs = 
lazyLatestMergedPartitionFileSliceList.get();
+
+    int parallelism = Math.min(partitionFileSlicePairs.size(), 
dataTableWriteConfig.getMetadataConfig().getSecondaryIndexParallelism());
+    HoodieData<HoodieRecord> records = readSecondaryKeysFromFileSlices(
+        engineContext,
+        partitionFileSlicePairs,
+        parallelism,
+        this.getClass().getSimpleName(),
+        dataTableMetaClient,
+        indexDefinition,
+        dataTableWriteConfig.getProps());

Review Comment:
   🤖 This passes `RECORD_INDEX` as the partition type to 
`estimateFileGroupCount` for secondary-index sizing. Is that intentional? If 
`estimateFileGroupCount` uses the partition type for anything beyond the estimation 
logic (e.g., logging, metrics, or config lookup), passing the wrong type could 
cause confusion or incorrect behavior.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/record/BaseRecordIndexer.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.index.record;
+
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.engine.ReaderContextFactory;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.metadata.index.model.DataPartitionAndRecords;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaCache;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.index.BaseIndexer;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.schema.HoodieSchemaUtils.getRecordKeySchema;
+
+/**
+ * Base implementation for record-index.
+ */
+@Slf4j
+public abstract class BaseRecordIndexer extends BaseIndexer {
+
+  private static final int RECORD_INDEX_AVERAGE_RECORD_SIZE = 48;
+
+  protected BaseRecordIndexer(HoodieEngineContext engineContext, 
HoodieWriteConfig dataTableWriteConfig, HoodieTableMetaClient 
dataTableMetaClient) {
+    super(engineContext, dataTableWriteConfig, dataTableMetaClient);
+  }
+
+  protected DataPartitionAndRecords initializeRecordIndexPartition(
+      List<FileSliceAndPartition> latestMergedPartitionFileSliceList,
+      int recordIndexMaxParallelism) {
+    return initializeRecordIndexPartition(null, 
latestMergedPartitionFileSliceList, recordIndexMaxParallelism);
+  }
+
+  protected DataPartitionAndRecords initializeRecordIndexPartition(
+      String dataPartition,
+      List<FileSliceAndPartition> latestMergedPartitionFileSliceList,
+      int recordIndexMaxParallelism) {
+    log.info("Initializing record index from {} file slices", 
latestMergedPartitionFileSliceList.size());
+    HoodieData<HoodieRecord> records = readRecordKeysFromFileSliceSnapshot(
+        engineContext,
+        latestMergedPartitionFileSliceList,
+        recordIndexMaxParallelism,
+        this.getClass().getSimpleName(),
+        dataTableMetaClient,
+        dataTableWriteConfig);
+
+    // Initialize the file groups
+    final int fileGroupCount = estimateFileGroupCount(records);
+    log.info("Initializing record index with {} file groups.", fileGroupCount);
+    return new DataPartitionAndRecords(fileGroupCount, 
Option.ofNullable(dataPartition), records);
+  }
+
+  @Override
+  public void postInitialization(HoodieTableMetaClient metadataMetaClient, 
HoodieData<HoodieRecord> records, int fileGroupCount, String 
relativePartitionPath) {
+    super.postInitialization(metadataMetaClient, records, fileGroupCount, 
relativePartitionPath);
+    // Validate record index after commit if validation is enabled

Review Comment:
   🤖 `postInitialization` calls `records.unpersist()` at the end, but in 
`PartitionedRecordIndexer`, `estimateFileGroupCount` persists the records of each 
partition independently. When the caller invokes `postInitialization`, which 
`records` object is passed? If it is only one of the per-partition datasets, the 
others remain persisted and are never cleaned up. Is the caller responsible for 
unpersisting all of them?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to