This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-0.13.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 4286d7982374809e5517852adfcdcd5bba5619ec Author: Sivabalan Narayanan <[email protected]> AuthorDate: Fri Feb 3 17:59:18 2023 -0800 [HUDI-5496] Avoid unnecessary file system parsing to initialize metadata table for a new data table (#7841) - Optimizing instantiation of metadata table for a fresh table by avoiding file listing --- .../apache/hudi/client/BaseHoodieWriteClient.java | 2 + .../metadata/HoodieBackedTableMetadataWriter.java | 64 ++++++++++++---------- .../SparkHoodieBackedTableMetadataWriter.java | 1 + .../internal/HoodieDataSourceInternalWriter.java | 4 +- 4 files changed, 39 insertions(+), 32 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index c3260914bd5..17956479762 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -519,6 +519,8 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient */ protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) { try { + context.setJobStatus(this.getClass().getSimpleName(),"Cleaning up marker directories for commit " + instantTime + " in table " + + config.getTableName()); // Delete the marker directory for the instant. WriteMarkersFactory.get(config.getMarkersType(), table, instantTime) .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index a8356ff9c71..5e8367e2095 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -1086,39 +1086,45 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>(); - List<DirectoryInfo> partitionInfoList = listAllPartitions(dataMetaClient); - Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream() - .map(p -> { - String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath()); - return Pair.of(partitionName, p.getFileNameToSizeMap()); - }) - .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); - - int totalDataFilesCount = partitionToFilesMap.values().stream().mapToInt(Map::size).sum(); - List<String> partitions = new ArrayList<>(partitionToFilesMap.keySet()); - - if (partitionTypes.contains(MetadataPartitionType.FILES)) { - // Record which saves the list of all partitions - HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions); - HoodieData<HoodieRecord> filesPartitionRecords = getFilesPartitionRecords(createInstantTime, partitionInfoList, allPartitionRecord); - ValidationUtils.checkState(filesPartitionRecords.count() == (partitions.size() + 1)); - partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords); - } + // skip file system listing to populate metadata records if its a fresh table. + // this is applicable only if the table already has N commits and metadata is enabled at a later point in time. + if (createInstantTime.equals(SOLO_COMMIT_TIMESTAMP)) { // SOLO_COMMIT_TIMESTAMP will be the initial commit time in MDT for a fresh table. + // If not, last completed commit in data table will be chosen as the initial commit time. + LOG.info("Triggering empty Commit to metadata to initialize"); + } else { + List<DirectoryInfo> partitionInfoList = listAllPartitions(dataMetaClient); + Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream() + .map(p -> { + String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath()); + return Pair.of(partitionName, p.getFileNameToSizeMap()); + }) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + + int totalDataFilesCount = partitionToFilesMap.values().stream().mapToInt(Map::size).sum(); + List<String> partitions = new ArrayList<>(partitionToFilesMap.keySet()); + + if (partitionTypes.contains(MetadataPartitionType.FILES)) { + // Record which saves the list of all partitions + HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions); + HoodieData<HoodieRecord> filesPartitionRecords = getFilesPartitionRecords(createInstantTime, partitionInfoList, allPartitionRecord); + ValidationUtils.checkState(filesPartitionRecords.count() == (partitions.size() + 1)); + partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords); + } - if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) && totalDataFilesCount > 0) { - final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords( - engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime); - partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, recordsRDD); - } + if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) && totalDataFilesCount > 0) { + final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords( + engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime); + partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, recordsRDD); + } - if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS) && totalDataFilesCount > 0) { - final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords( - engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams()); - partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, recordsRDD); + if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS) && totalDataFilesCount > 0) { + final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords( + engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams()); + partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, recordsRDD); + } + LOG.info("Committing " + partitions.size() + " partitions and " + totalDataFilesCount + " files to metadata"); } - LOG.info("Committing " + partitions.size() + " partitions and " + totalDataFilesCount + " files to metadata"); - commit(createInstantTime, partitionToRecordsMap, false); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java index 81526c25bcc..23537f6f798 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java @@ -133,6 +133,7 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad HoodieData<HoodieRecord> preppedRecords = prepRecords(partitionRecordsMap); JavaRDD<HoodieRecord> preppedRecordRDD = HoodieJavaRDD.getJavaRDD(preppedRecords); + engineContext.setJobStatus(this.getClass().getName(), "Committing " + instantTime + " to metadata table " + metadataWriteConfig.getTableName()); try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig)) { // rollback partially failed writes if any. if (writeClient.rollbackFailedWrites()) { diff --git a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java index c4b21483e8f..11f5d5030b4 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java +++ b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java @@ -34,7 +34,6 @@ import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage; import org.apache.spark.sql.types.StructType; import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -51,7 +50,6 @@ public class HoodieDataSourceInternalWriter implements DataSourceWriter { private final DataSourceInternalWriterHelper dataSourceInternalWriterHelper; private final boolean populateMetaFields; private final Boolean arePartitionRecordsSorted; - private Map<String, String> extraMetadataMap = new HashMap<>(); public HoodieDataSourceInternalWriter(String instantTime, HoodieWriteConfig writeConfig, StructType structType, SparkSession sparkSession, Configuration configuration, DataSourceOptions dataSourceOptions, @@ -61,7 +59,7 @@ public class HoodieDataSourceInternalWriter implements DataSourceWriter { this.structType = structType; this.populateMetaFields = populateMetaFields; this.arePartitionRecordsSorted = arePartitionRecordsSorted; - this.extraMetadataMap = DataSourceUtils.getExtraMetadata(dataSourceOptions.asMap()); + Map<String, String> extraMetadataMap = DataSourceUtils.getExtraMetadata(dataSourceOptions.asMap()); this.dataSourceInternalWriterHelper = new DataSourceInternalWriterHelper(instantTime, writeConfig, structType, sparkSession, configuration, extraMetadataMap); }
