yihua commented on code in PR #18348: URL: https://github.com/apache/hudi/pull/18348#discussion_r3036317837
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/UnsupportedExpressionIndexRecordGenerator.java: ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.index; + +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.EngineType; +import org.apache.hudi.common.model.HoodieIndexDefinition; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.HoodieTableVersion; +import org.apache.hudi.exception.HoodieNotSupportedException; +import org.apache.hudi.metadata.model.FileInfoAndPartition; +import org.apache.hudi.storage.StorageConfiguration; + +import java.util.List; + +/** + * Fallback {@link ExpressionIndexRecordGenerator} that throws a not-supported exception + * when expression index bootstrap is requested for unsupported engines. 
+ */ +public class UnsupportedExpressionIndexRecordGenerator implements ExpressionIndexRecordGenerator { + + private final EngineType engineType; + + public UnsupportedExpressionIndexRecordGenerator(EngineType engineType) { + this.engineType = engineType; + } + + @Override + public EngineType getEngineType() { + return engineType; + } + + @Override + public HoodieData<HoodieRecord> generate( + List<FileInfoAndPartition> filesToIndex, + HoodieIndexDefinition indexDefinition, + HoodieTableMetaClient metaClient, + int parallelism, + HoodieSchema tableSchema, + HoodieSchema readerSchema, + StorageConfiguration<?> storageConf, + String instantTime) { + if (metaClient.getTableConfig().getTableVersion().lesserThan(HoodieTableVersion.EIGHT)) { + throw new HoodieNotSupportedException("Table version 6 and below does not support expression index"); + } Review Comment: 🤖 The error message says "Table version 6 and below" but the check is `lesserThan(HoodieTableVersion.EIGHT)`, which also covers version 7 (0.16.0). Should this say "Table version 7 and below" or "versions before 8"? ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/columnstats/ColumnStatsIndexer.java: ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.index.columnstats; + +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.metadata.model.FileInfo; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.common.model.HoodieIndexDefinition; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.metadata.index.model.IndexPartitionInitialization; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.index.BaseIndexer; +import org.apache.hudi.util.Lazy; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.hudi.index.HoodieIndexUtils.register; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.existingIndexVersionOrDefault; +import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS; + +/** + * Implementation of {@link MetadataPartitionType#COLUMN_STATS} metadata + */ +@Slf4j +public class ColumnStatsIndexer extends BaseIndexer { + private Lazy<List<String>> columnsToIndex; + + public ColumnStatsIndexer(HoodieEngineContext engineContext, + HoodieWriteConfig dataTableWriteConfig, + HoodieTableMetaClient dataTableMetaClient) { + super(engineContext, dataTableWriteConfig, dataTableMetaClient); + + this.columnsToIndex = Lazy.lazily(() -> + new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex( + dataTableMetaClient.getTableConfig(), + 
dataTableWriteConfig.getMetadataConfig(), + Lazy.lazily(() -> HoodieTableMetadataUtil.tryResolveSchemaForTable(dataTableMetaClient)), + true, + Option.of(dataTableWriteConfig.getRecordMerger().getRecordType()), + existingIndexVersionOrDefault(PARTITION_NAME_COLUMN_STATS, dataTableMetaClient)).keySet())); + } + + @Override + public List<IndexPartitionInitialization> buildInitialization( + String dataTableInstantTime, + String instantTimeForPartition, + Map<String, List<FileInfo>> partitionToAllFilesMap, + Lazy<List<FileSliceAndPartition>> lazyLatestMergedPartitionFileSliceList) throws IOException { + final int fileGroupCount = dataTableWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount(); + if (partitionToAllFilesMap.isEmpty()) { + this.columnsToIndex = Lazy.lazily(Collections::emptyList); + return Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, COLUMN_STATS.getPartitionPath(), engineContext.emptyHoodieData())); + } Review Comment: 🤖 Overwriting `this.columnsToIndex` with an empty list when `partitionToAllFilesMap` is empty means `postInitialization` will register the column_stats index definition with zero source fields. Is that intentional? If this indexer instance is reused for a subsequent call with non-empty partitions, the columns would also be lost. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/record/BaseRecordIndexer.java: ########## @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.index.record; + +import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.engine.ReaderContextFactory; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.io.storage.HoodieAvroFileReader; +import org.apache.hudi.io.storage.HoodieIOFactory; +import org.apache.hudi.metadata.index.model.DataPartitionAndRecords; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaCache; +import org.apache.hudi.common.schema.HoodieSchemaUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.read.HoodieFileGroupReader; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.CloseableMappingIterator; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.internal.schema.utils.SerDeHelper; +import 
org.apache.hudi.metadata.HoodieMetadataPayload; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.index.BaseIndexer; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.HoodieStorageUtils; +import org.apache.hudi.storage.StorageConfiguration; +import org.apache.hudi.storage.StoragePath; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.schema.HoodieSchemaUtils.getRecordKeySchema; + +/** + * Base implementation for record-index. + */ +@Slf4j +public abstract class BaseRecordIndexer extends BaseIndexer { + + private static final int RECORD_INDEX_AVERAGE_RECORD_SIZE = 48; + + protected BaseRecordIndexer(HoodieEngineContext engineContext, HoodieWriteConfig dataTableWriteConfig, HoodieTableMetaClient dataTableMetaClient) { + super(engineContext, dataTableWriteConfig, dataTableMetaClient); + } + + protected DataPartitionAndRecords initializeRecordIndexPartition( + List<FileSliceAndPartition> latestMergedPartitionFileSliceList, + int recordIndexMaxParallelism) { + return initializeRecordIndexPartition(null, latestMergedPartitionFileSliceList, recordIndexMaxParallelism); + } + + protected DataPartitionAndRecords initializeRecordIndexPartition( + String dataPartition, + List<FileSliceAndPartition> latestMergedPartitionFileSliceList, + int recordIndexMaxParallelism) { + log.info("Initializing record index from {} file slices", latestMergedPartitionFileSliceList.size()); + HoodieData<HoodieRecord> records = readRecordKeysFromFileSliceSnapshot( + engineContext, + latestMergedPartitionFileSliceList, + recordIndexMaxParallelism, + this.getClass().getSimpleName(), + dataTableMetaClient, + dataTableWriteConfig); + + // Initialize the file groups + 
final int fileGroupCount = estimateFileGroupCount(records); + log.info("Initializing record index with {} file groups.", fileGroupCount); + return new DataPartitionAndRecords(fileGroupCount, Option.ofNullable(dataPartition), records); + } + + @Override + public void postInitialization(HoodieTableMetaClient metadataMetaClient, HoodieData<HoodieRecord> records, int fileGroupCount, String relativePartitionPath) { + super.postInitialization(metadataMetaClient, records, fileGroupCount, relativePartitionPath); + // Validate record index after commit if validation is enabled + if (dataTableWriteConfig.getMetadataConfig().isRecordIndexInitializationValidationEnabled()) { + validateRecordIndex(records, fileGroupCount, metadataMetaClient); + } + records.unpersist(); + } + + /** + * Validates the record index after bootstrap by comparing the expected record count with the actual + * record count stored in the metadata table. The validation is performed in a distributed manner + * using the engine context to count records from HFiles in parallel. 
+ * + * @param recordIndexRecords the HoodieData containing the expected records + * @param fileGroupCount the expected number of file groups + * @param metadataMetaClient meta client for the metadata table + */ + protected void validateRecordIndex(HoodieData<HoodieRecord> recordIndexRecords, int fileGroupCount, HoodieTableMetaClient metadataMetaClient) { + String partitionName = MetadataPartitionType.RECORD_INDEX.getPartitionPath(); + HoodieTableFileSystemView fsView = HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient); + try { + // Use merged file slices to handle cases with pending compactions + List<FileSlice> fileSlices = HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, fsView, partitionName); + + // Filter to only file slices with base files and extract their storage paths + List<StoragePath> baseFilePaths = fileSlices.stream() + .filter(fs -> fs.getBaseFile().isPresent()) + .map(fs -> fs.getBaseFile().get().getStoragePath()) + .collect(Collectors.toList()); + + // Count records in a distributed manner using the engine context + long totalRecords = countRecordsInHFiles(baseFilePaths, metadataMetaClient); + long expectedRecordCount = recordIndexRecords.count(); + + ValidationUtils.checkArgument(totalRecords == expectedRecordCount, "Record Count Validation failed with " + + totalRecords + " present in record index vs the expected " + expectedRecordCount); + log.info(String.format("Record index initialized on %d shards (expected = %d) with %d records (expected = %d)", + fileSlices.size(), fileGroupCount, totalRecords, expectedRecordCount)); + } finally { + fsView.close(); + } + } + + /** + * Counts the total number of records in HFiles in a distributed manner. 
+ * + * @param baseFilePaths list of storage paths to HFiles + * @param metadataMetaClient meta client for the metadata table + * @return total number of records across all HFiles + */ + private long countRecordsInHFiles(List<StoragePath> baseFilePaths, HoodieTableMetaClient metadataMetaClient) { + if (baseFilePaths.isEmpty()) { + return 0L; + } + + int parallelism = Math.min(baseFilePaths.size(), dataTableWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism()); + StorageConfiguration<?> storageConfBroadcast = metadataMetaClient.getStorageConf(); + HoodieFileFormat baseFileFormat = metadataMetaClient.getTableConfig().getBaseFileFormat(); + + return engineContext.parallelize(baseFilePaths, parallelism) + .mapPartitions(pathIterator -> { + long count = 0L; + while (pathIterator.hasNext()) { + StoragePath path = pathIterator.next(); + try { + HoodieStorage storage = HoodieStorageUtils.getStorage(path, storageConfBroadcast); + HoodieConfig readerConfig = new HoodieConfig(); + HoodieAvroFileReader reader = (HoodieAvroFileReader) HoodieIOFactory.getIOFactory(storage) + .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO) + .getFileReader(readerConfig, path, baseFileFormat, Option.empty()); + try { + count += reader.getTotalRecords(); + } finally { + reader.close(); + } + } catch (IOException e) { + throw new HoodieIOException("Error reading total records from file " + path, e); + } + } + return Collections.singletonList(count).iterator(); + }, true) + .collectAsList() + .stream() + .mapToLong(Long::longValue) + .sum(); + } + + /** + * Fetch record locations from FileSlice snapshot. + * + * @param engineContext context ot use. + * @param partitionFileSlicePairs list of pairs of partition and file slice. + * @param recordIndexMaxParallelism parallelism to use. + * @param activeModule active module of interest. + * @param metaClient metaclient instance to use. + * @param dataWriteConfig write config to use. 
+ * @return metadata records for initializing record index entries. + */ + protected <T> HoodieData<HoodieRecord> readRecordKeysFromFileSliceSnapshot( + HoodieEngineContext engineContext, + List<FileSliceAndPartition> partitionFileSlicePairs, + int recordIndexMaxParallelism, + String activeModule, + HoodieTableMetaClient metaClient, + HoodieWriteConfig dataWriteConfig) { + if (partitionFileSlicePairs.isEmpty()) { + return engineContext.emptyHoodieData(); + } + + Option<String> instantTime = metaClient.getActiveTimeline().getCommitsTimeline() + .filterCompletedInstants() + .lastInstant() + .map(HoodieInstant::requestedTime); + if (!instantTime.isPresent()) { + return engineContext.emptyHoodieData(); + } + + engineContext.setJobStatus(activeModule, "Record Index: reading record keys from " + partitionFileSlicePairs.size() + " file slices"); + final int parallelism = Math.min(partitionFileSlicePairs.size(), recordIndexMaxParallelism); + ReaderContextFactory<T> readerContextFactory = engineContext.getReaderContextFactory(metaClient); + return engineContext.parallelize(partitionFileSlicePairs, parallelism).flatMap(partitionAndFileSlice -> { + final String partition = partitionAndFileSlice.partitionPath(); + final FileSlice fileSlice = partitionAndFileSlice.fileSlice(); + final String fileId = fileSlice.getFileId(); + HoodieReaderContext<T> readerContext = readerContextFactory.getContext(); + HoodieSchema dataSchema = HoodieSchemaCache.intern(HoodieSchemaUtils.addMetadataFields(HoodieSchema.parse(dataWriteConfig.getWriteSchema()), dataWriteConfig.allowOperationMetadataField())); + HoodieSchema requestedSchema = metaClient.getTableConfig().populateMetaFields() ? getRecordKeySchema() Review Comment: 🤖 The `HoodieFileGroupReader` is created but never explicitly closed — only its iterator is wrapped in the `CloseableMappingIterator`. If `HoodieFileGroupReader` holds resources beyond what `getClosableIterator()` manages (e.g., open file handles, buffers), those would leak.
Could you verify that closing the closeable iterator from the reader also fully closes the reader itself? If not, this needs a composite closeable that closes both. ########## hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestGlobalRecordLevelIndexTableVersionSix.scala: ########## @@ -18,15 +18,49 @@ package org.apache.hudi.functional -import org.apache.hudi.common.table.HoodieTableConfig +import org.apache.hudi.DataSourceWriteOptions +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.model.HoodieTableType +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} +import org.apache.hudi.common.testutils.HoodieTestDataGenerator import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.metadata.MetadataPartitionType +import org.apache.spark.sql.SaveMode +import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Tag +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.EnumSource + +import java.util.Collections @Tag("functional-b") class TestGlobalRecordLevelIndexTableVersionSix extends TestGlobalRecordLevelIndex { override def commonOpts: Map[String, String] = super.commonOpts ++ Map( HoodieTableConfig.VERSION.key() -> "6", HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> "6" ) + + @ParameterizedTest + @EnumSource(classOf[HoodieTableType]) + override def testRLIUpsertAndDropIndex(tableType: HoodieTableType): Unit = { + val hudiOpts = commonOpts ++ Map(DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name(), + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true") Review Comment: 🤖 The comment mentions that `partition stats` should be present, but the assertion checks for a size of 2. If only `files` and `col stats` are expected, the count is correct, but the comment might be misleading.
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ########## @@ -181,7 +165,8 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> implements HoodieTab private boolean hasPartitionsStateChanged = false; protected final transient HoodieEngineContext engineContext; @Getter - protected final List<MetadataPartitionType> enabledPartitionTypes; + protected final transient Map<MetadataPartitionType, Indexer> enabledIndexerMap; Review Comment: 🤖 Making these fields `transient` breaks the `Serializable` contract of `HoodieTableMetadataWriter`. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/IndexerFactory.java: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hudi.metadata.index; + +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieNotSupportedException; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.index.bloomfilters.BloomFiltersIndexer; +import org.apache.hudi.metadata.index.columnstats.ColumnStatsIndexer; +import org.apache.hudi.metadata.index.expression.ExpressionIndexer; +import org.apache.hudi.metadata.index.files.FilesIndexer; +import org.apache.hudi.metadata.index.partitionstats.PartitionStatsIndexer; +import org.apache.hudi.metadata.index.record.PartitionedRecordIndexer; +import org.apache.hudi.metadata.index.record.RecordIndexer; +import org.apache.hudi.metadata.index.secondary.SecondaryIndexer; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Factory for creating {@link Indexer} implementations and resolving enabled indexers + * based on table and metadata configuration. + */ +public class IndexerFactory { + private static Indexer getIndexer(MetadataPartitionType partitionType, + HoodieEngineContext engineContext, + HoodieWriteConfig dataTableWriteConfig, + HoodieTableMetaClient dataTableMetaClient, + ExpressionIndexRecordGenerator expressionIndexRecordGenerator) { + switch (partitionType) { + case FILES: + return new FilesIndexer(engineContext, dataTableWriteConfig, dataTableMetaClient); + case BLOOM_FILTERS: + return new BloomFiltersIndexer(engineContext, dataTableWriteConfig, dataTableMetaClient); + case COLUMN_STATS: + return new ColumnStatsIndexer(engineContext, dataTableWriteConfig, dataTableMetaClient); + case RECORD_INDEX: + return dataTableWriteConfig.isRecordLevelIndexEnabled() + ? 
new PartitionedRecordIndexer(engineContext, dataTableWriteConfig, dataTableMetaClient) + : new RecordIndexer(engineContext, dataTableWriteConfig, dataTableMetaClient); + case EXPRESSION_INDEX: + return new ExpressionIndexer(engineContext, dataTableWriteConfig, dataTableMetaClient, expressionIndexRecordGenerator); + case PARTITION_STATS: + return new PartitionStatsIndexer(engineContext, dataTableWriteConfig, dataTableMetaClient); + case SECONDARY_INDEX: + return new SecondaryIndexer(engineContext, dataTableWriteConfig, dataTableMetaClient); + default: + throw new HoodieNotSupportedException("Unsupported metadata partition type for indexing: " + partitionType); + } + } + + /** + * Returns the map of metadata partition type to the indexer for the enabled metadata + * partition types based on the metadata config and table config. + */ + public static Map<MetadataPartitionType, Indexer> getEnabledIndexerMap( + HoodieEngineContext engineContext, + HoodieWriteConfig dataTableWriteConfig, + HoodieTableMetaClient dataTableMetaClient, + ExpressionIndexRecordGenerator expressionIndexRecordGenerator) { + if (!dataTableWriteConfig.getMetadataConfig().isEnabled()) { + return Collections.emptyMap(); + } + return Collections.unmodifiableMap(Arrays.stream(MetadataPartitionType.getValidValues(dataTableMetaClient.getTableConfig().getTableVersion())) + .filter(partitionType -> + (partitionType.isMetadataPartitionEnabled(dataTableWriteConfig.getMetadataConfig(), dataTableMetaClient.getTableConfig()) Review Comment: 🤖 Is there a versioned `getValidValues` method added to `MetadataPartitionType`? This might cause a compilation error. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/expression/ExpressionIndexer.java: ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.index.expression; + +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieIndexDefinition; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.metadata.index.ExpressionIndexRecordGenerator; +import org.apache.hudi.metadata.model.FileInfo; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.metadata.model.FileInfoAndPartition; +import org.apache.hudi.metadata.index.model.IndexPartitionInitialization; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieMetadataException; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.index.BaseIndexer; +import org.apache.hudi.util.Lazy; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getExpressionIndexPartitionsToInit; +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getProjectedSchemaForExpressionIndex; +import static org.apache.hudi.metadata.MetadataPartitionType.EXPRESSION_INDEX; + +/** + * Implementation of {@link MetadataPartitionType#EXPRESSION_INDEX} index + */ +@Slf4j +public class ExpressionIndexer extends BaseIndexer { + + private final ExpressionIndexRecordGenerator expressionIndexRecordGenerator; + + public ExpressionIndexer( + HoodieEngineContext engineContext, + HoodieWriteConfig dataTableWriteConfig, + HoodieTableMetaClient dataTableMetaClient, + ExpressionIndexRecordGenerator expressionIndexRecordGenerator) { + super(engineContext, dataTableWriteConfig, dataTableMetaClient); + + this.expressionIndexRecordGenerator = expressionIndexRecordGenerator; + } + + @Override + public List<IndexPartitionInitialization> buildInitialization( + String dataTableInstantTime, + String instantTimeForPartition, + Map<String, List<FileInfo>> partitionToAllFilesMap, + Lazy<List<FileSliceAndPartition>> lazyLatestMergedPartitionFileSliceList) throws IOException { + Set<String> expressionIndexPartitionsToInit = getExpressionIndexPartitionsToInit( + EXPRESSION_INDEX, dataTableWriteConfig.getMetadataConfig(), dataTableMetaClient); + if (expressionIndexPartitionsToInit.size() != 1) { + if (expressionIndexPartitionsToInit.size() > 1) { + log.warn("Skipping expression index initialization as only one expression index " + + "bootstrap at a time is supported for now. 
Provided: {}", expressionIndexPartitionsToInit); + } + return Collections.emptyList(); + } + + String indexName = expressionIndexPartitionsToInit.iterator().next(); + HoodieIndexDefinition indexDefinition = HoodieTableMetadataUtil.getHoodieIndexDefinition(indexName, dataTableMetaClient); + ValidationUtils.checkState(indexDefinition != null, "Expression Index definition is not present for index " + indexName); + + List<FileSliceAndPartition> partitionFileSlicePairs = lazyLatestMergedPartitionFileSliceList.get(); + List<FileInfoAndPartition> filesToIndex = new ArrayList<>(); + partitionFileSlicePairs.forEach(fsp -> { + if (fsp.fileSlice().getBaseFile().isPresent()) { + filesToIndex.add(FileInfoAndPartition.of(fsp.partitionPath(), fsp.fileSlice().getBaseFile().get().getPath(), fsp.fileSlice().getBaseFile().get().getFileSize())); + } + fsp.fileSlice().getLogFiles() + .forEach(hoodieLogFile + -> filesToIndex.add(FileInfoAndPartition.of(fsp.partitionPath(), hoodieLogFile.getPath().toString(), hoodieLogFile.getFileSize()))); + }); + + int fileGroupCount = dataTableWriteConfig.getMetadataConfig().getExpressionIndexFileGroupCount(); + if (partitionFileSlicePairs.isEmpty()) { + return Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, indexName, engineContext.emptyHoodieData())); + } + Review Comment: 🤖 The `Lazy` wrapper here is created and immediately `.get()`-ed on the next line, so it doesn't actually provide any lazy evaluation benefit. Could simplify to a direct call: `HoodieSchema tableSchema = HoodieTableMetadataUtil.tryResolveSchemaForTable(dataTableMetaClient).orElseThrow(...);` ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/files/FilesIndexer.java: ########## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metadata.index.files; + +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.metadata.model.FileInfo; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.metadata.index.model.IndexPartitionInitialization; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.metadata.HoodieMetadataPayload; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.index.BaseIndexer; +import org.apache.hudi.util.Lazy; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.hudi.metadata.MetadataPartitionType.FILES; + +/** + * Implementation of {@link MetadataPartitionType#FILES} metadata + */ +@Slf4j +public class FilesIndexer extends BaseIndexer { + public FilesIndexer(HoodieEngineContext engineContext, HoodieWriteConfig dataTableWriteConfig, + HoodieTableMetaClient dataTableMetaClient) { + super(engineContext, dataTableWriteConfig, 
dataTableMetaClient); + } + + @Override + public List<IndexPartitionInitialization> buildInitialization(String dataTableInstantTime, String instantTimeForPartition, Map<String, List<FileInfo>> partitionToAllFilesMap, + Lazy<List<FileSliceAndPartition>> lazyLatestMergedPartitionFileSliceList) throws IOException { + // FILES partition uses a single file group + final int fileGroupCount = 1; + + Set<String> partitions = partitionToAllFilesMap.keySet(); + final int totalDataFilesCount = partitionToAllFilesMap.values().stream().mapToInt(List::size).sum(); + log.info("Committing total {} partitions and {} files to metadata", partitions.size(), totalDataFilesCount); + + // Record which saves the list of all partitions + HoodieRecord record = HoodieMetadataPayload.createPartitionListRecord(partitions); + HoodieData<HoodieRecord> allPartitionsRecord = engineContext.parallelize(Collections.singletonList(record), 1); + if (partitionToAllFilesMap.isEmpty()) { + return Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, FILES.getPartitionPath(), allPartitionsRecord)); + } + + // Records which save the file listing of each partition + engineContext.setJobStatus(this.getClass().getSimpleName(), "Creating records for metadata FILES partition"); + HoodieData<HoodieRecord> fileListRecords = engineContext.parallelize( + new ArrayList<>(partitionToAllFilesMap.entrySet()), partitionToAllFilesMap.size()) + .map(partitionInfo -> { + Map<String, Long> fileNameToSizeMap = partitionInfo.getValue().stream() + .collect(Collectors.toMap(FileInfo::fileName, FileInfo::size)); + return HoodieMetadataPayload.createPartitionFilesRecord( + partitionInfo.getKey(), fileNameToSizeMap, Collections.emptyList()); Review Comment: π€ Calling `fileListRecords.count()` triggers a full Spark action just for validation. For tables with many partitions, this doubles the initialization cost of this step. 
Could you validate using `partitionToAllFilesMap.size()` against `partitions.size()` instead (which is always true by construction), or remove this check? ########## hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java: ########## @@ -453,8 +454,25 @@ public static Set<String> getAllPartitionPaths() { */ public static MetadataPartitionType[] getValidValues() { // ALL_PARTITIONS is just another record type in FILES partition Review Comment: 🤖 The no-arg `getValidValues()` now delegates to `getValidValues(HoodieTableVersion.current())`. Is this safe for all existing callers? If any caller uses this method in a context where the table is actually pre-v8, it would now silently include partition types (SECONDARY_INDEX, EXPRESSION_INDEX, PARTITION_STATS) that aren't supported for that table. Might be worth adding a note or deprecating the no-arg version in favor of always passing the table version explicitly. ########## hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java: ########## @@ -762,7 +724,8 @@ public static Map<String, HoodieData<HoodieRecord>> convertMissingPartitionRecor filesAddedCount[0] += filesToAdd.size(); List<String> filesToDelete = filesDeleted.getOrDefault(partition, Collections.emptyList()); fileDeleteCount[0] += filesToDelete.size(); - HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, filesToAdd, filesToDelete); + Map<String, Long> filesToAddMap = filesToAdd.stream().collect(Collectors.toMap(FileInfo::fileName, FileInfo::size)); Review Comment: 🤖 The old signature used `Map<String, Long>` which inherently prevented duplicate file names. The new `List<FileInfo>` can potentially contain duplicates, and `Collectors.toMap(FileInfo::fileName, FileInfo::size)` will throw `IllegalStateException` on duplicate keys. Could you add a merge function (e.g., `Long::max`) to handle this defensively, or document why duplicates can't occur?
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ########## @@ -432,109 +425,80 @@ private boolean initializeFromFilesystem(String dataTableInstantTime, List<Metad partitionInfoList = Collections.emptyList(); } } - Map<String, Map<String, Long>> partitionIdToAllFilesMap = partitionInfoList.stream() - .map(p -> { - String partitionName = HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(p.getRelativePath()); - return Pair.of(partitionName, p.getFilenameToSizeMap()); - }) - .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); - - // validate that each index is eligible to be initialized - Iterator<MetadataPartitionType> iterator = partitionsToInit.iterator(); - while (iterator.hasNext()) { - MetadataPartitionType partitionType = iterator.next(); - if (partitionType == PARTITION_STATS && !dataMetaClient.getTableConfig().isTablePartitioned()) { - // Partition stats index cannot be enabled for a non-partitioned table - iterator.remove(); - this.enabledPartitionTypes.remove(partitionType); - } + Map<String, List<FileInfo>> partitionIdToAllFilesMap = DirectoryInfo.getPartitionToFileInfo(partitionInfoList); + Lazy<List<FileSliceAndPartition>> lazyLatestMergedPartitionFileSliceList = getLazyLatestMergedPartitionFileSliceList(); + + // FILES partition should always be initialized first if enabled + if (!filesPartitionAvailable) { + initializeMetadataPartition(FILES, indexerMapForPartitionsToInit.get(FILES), + dataTableInstantTime, partitionIdToAllFilesMap, lazyLatestMergedPartitionFileSliceList); Review Comment: 🤖 This will NPE if the metadata table directory exists but the FILES partition is not marked as available.
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ########## @@ -417,7 +401,16 @@ private boolean initializeFromFilesystem(String dataTableInstantTime, List<Metad } // Already initialized partitions can be ignored - partitionsToInit.removeIf(metadataPartition -> dataMetaClient.getTableConfig().isMetadataPartitionAvailable((metadataPartition))); + indexerMapForPartitionsToInit.keySet().removeIf( + metadataPartition -> dataMetaClient.getTableConfig().isMetadataPartitionAvailable(metadataPartition)); + + // For a fresh table, defer RLI initialization + if (dataWriteConfig.getMetadataConfig().shouldDeferRliInitForFreshTable() + && this.enabledIndexerMap.containsKey(RECORD_INDEX) + && dataMetaClient.getActiveTimeline().filterCompletedInstants().countInstants() == 0) { + this.enabledIndexerMap.remove(RECORD_INDEX); + indexerMapForPartitionsToInit.remove(RECORD_INDEX); + } Review Comment: π€ This will throw `UnsupportedOperationException` because `enabledIndexerMap` is unmodifiable. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/secondary/SecondaryIndexer.java: ########## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.index.secondary; + +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.metadata.model.FileInfo; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.common.model.HoodieIndexDefinition; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.metadata.index.model.IndexPartitionInitialization; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.index.BaseIndexer; +import org.apache.hudi.util.Lazy; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getSecondaryIndexPartitionsToInit; +import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX; +import static org.apache.hudi.metadata.MetadataPartitionType.SECONDARY_INDEX; +import static org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils.readSecondaryKeysFromFileSlices; + +/** + * Implementation of {@link MetadataPartitionType#SECONDARY_INDEX} index + */ +@Slf4j +public class SecondaryIndexer extends BaseIndexer { + + private static final int RECORD_INDEX_AVERAGE_RECORD_SIZE = 48; + + public SecondaryIndexer( + HoodieEngineContext engineContext, + HoodieWriteConfig dataTableWriteConfig, + HoodieTableMetaClient dataTableMetaClient) { + super(engineContext, dataTableWriteConfig, dataTableMetaClient); + } + + @Override + public List<IndexPartitionInitialization> buildInitialization(String 
dataTableInstantTime, String instantTimeForPartition, Map<String, List<FileInfo>> partitionToAllFilesMap, + Lazy<List<FileSliceAndPartition>> lazyLatestMergedPartitionFileSliceList) throws IOException { + Set<String> secondaryIndexPartitionsToInit = getSecondaryIndexPartitionsToInit(SECONDARY_INDEX, dataTableWriteConfig.getMetadataConfig(), dataTableMetaClient); + if (secondaryIndexPartitionsToInit.size() != 1) { + if (secondaryIndexPartitionsToInit.size() > 1) { + log.warn("Skipping secondary index initialization as only one secondary index bootstrap at a time is supported for now. Provided: {}", secondaryIndexPartitionsToInit); + } + return Collections.emptyList(); + } + + String indexName = secondaryIndexPartitionsToInit.iterator().next(); + HoodieIndexDefinition indexDefinition = HoodieTableMetadataUtil.getHoodieIndexDefinition(indexName, dataTableMetaClient); + ValidationUtils.checkState(indexDefinition != null, "Secondary Index definition is not present for index " + indexName); + + List<FileSliceAndPartition> partitionFileSlicePairs = lazyLatestMergedPartitionFileSliceList.get(); + + int parallelism = Math.min(partitionFileSlicePairs.size(), dataTableWriteConfig.getMetadataConfig().getSecondaryIndexParallelism()); + HoodieData<HoodieRecord> records = readSecondaryKeysFromFileSlices( + engineContext, + partitionFileSlicePairs, + parallelism, + this.getClass().getSimpleName(), + dataTableMetaClient, + indexDefinition, + dataTableWriteConfig.getProps()); Review Comment: π€ This passes `RECORD_INDEX` as the partition type to `estimateFileGroupCount` for secondary index sizing. Is that intentional? If `estimateFileGroupCount` uses the partition type for anything beyond estimation logic (e.g., logging, metrics, config lookup), passing the wrong type could cause confusion or incorrect behavior. 
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/record/BaseRecordIndexer.java: ########## @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.metadata.index.record; + +import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.engine.ReaderContextFactory; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.io.storage.HoodieAvroFileReader; +import org.apache.hudi.io.storage.HoodieIOFactory; +import org.apache.hudi.metadata.index.model.DataPartitionAndRecords; +import org.apache.hudi.metadata.model.FileSliceAndPartition; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaCache; +import 
org.apache.hudi.common.schema.HoodieSchemaUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.read.HoodieFileGroupReader; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.CloseableMappingIterator; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.internal.schema.utils.SerDeHelper; +import org.apache.hudi.metadata.HoodieMetadataPayload; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.index.BaseIndexer; +import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.HoodieStorageUtils; +import org.apache.hudi.storage.StorageConfiguration; +import org.apache.hudi.storage.StoragePath; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.schema.HoodieSchemaUtils.getRecordKeySchema; + +/** + * Base implementation for record-index. 
+ */ +@Slf4j +public abstract class BaseRecordIndexer extends BaseIndexer { + + private static final int RECORD_INDEX_AVERAGE_RECORD_SIZE = 48; + + protected BaseRecordIndexer(HoodieEngineContext engineContext, HoodieWriteConfig dataTableWriteConfig, HoodieTableMetaClient dataTableMetaClient) { + super(engineContext, dataTableWriteConfig, dataTableMetaClient); + } + + protected DataPartitionAndRecords initializeRecordIndexPartition( + List<FileSliceAndPartition> latestMergedPartitionFileSliceList, + int recordIndexMaxParallelism) { + return initializeRecordIndexPartition(null, latestMergedPartitionFileSliceList, recordIndexMaxParallelism); + } + + protected DataPartitionAndRecords initializeRecordIndexPartition( + String dataPartition, + List<FileSliceAndPartition> latestMergedPartitionFileSliceList, + int recordIndexMaxParallelism) { + log.info("Initializing record index from {} file slices", latestMergedPartitionFileSliceList.size()); + HoodieData<HoodieRecord> records = readRecordKeysFromFileSliceSnapshot( + engineContext, + latestMergedPartitionFileSliceList, + recordIndexMaxParallelism, + this.getClass().getSimpleName(), + dataTableMetaClient, + dataTableWriteConfig); + + // Initialize the file groups + final int fileGroupCount = estimateFileGroupCount(records); + log.info("Initializing record index with {} file groups.", fileGroupCount); + return new DataPartitionAndRecords(fileGroupCount, Option.ofNullable(dataPartition), records); + } + + @Override + public void postInitialization(HoodieTableMetaClient metadataMetaClient, HoodieData<HoodieRecord> records, int fileGroupCount, String relativePartitionPath) { + super.postInitialization(metadataMetaClient, records, fileGroupCount, relativePartitionPath); + // Validate record index after commit if validation is enabled Review Comment: π€ `postInitialization` calls `records.unpersist()` at the end, but in `PartitionedRecordIndexer`, `estimateFileGroupCount` persists records for each partition independently. 
When the caller invokes `postInitialization`, which `records` object is passed? If it's only one of the per-partition datasets, the others remain persisted and never cleaned up. Is the caller responsible for unpersisting all of them? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
