This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new fd5ecbb6f5e [HUDI-8200] Adding support to configure view storage type
with HoodieMetadataTableValidator (#11944)
fd5ecbb6f5e is described below
commit fd5ecbb6f5e7d8d298eea841c57ede3c99c0986f
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Thu Sep 19 15:29:45 2024 -0700
[HUDI-8200] Adding support to configure view storage type with
HoodieMetadataTableValidator (#11944)
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../utilities/HoodieMetadataTableValidator.java | 56 +++++++++++++++++++---
.../TestHoodieMetadataTableValidator.java | 19 +++++++-
2 files changed, 67 insertions(+), 8 deletions(-)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index cf1365bf06a..6a0fd473248 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -23,6 +23,7 @@ import org.apache.hudi.async.HoodieAsyncService;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieReaderConfig;
@@ -48,12 +49,16 @@ import
org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.table.view.SpillableMapBasedFileSystemView;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
@@ -308,6 +313,18 @@ public class HoodieMetadataTableValidator implements
Serializable {
required = false)
public int numRecordIndexErrorSamples = 100;
+ @Parameter(names = {"--view-storage-type-fs-listing"},
+ description = "View storage type to use for File System based listing.
"
+ + "Supported values are MEMORY (by default) and SPILLABLE_DISK.",
+ required = false)
+ public String viewStorageTypeForFSListing =
FileSystemViewStorageType.MEMORY.name();
+
+ @Parameter(names = {"--view-storage-type-mdt"},
+ description = "View storage type to use for metadata table based
listing. "
+ + "Supported values are MEMORY (by default) and SPILLABLE_DISK.",
+ required = false)
+ public String viewStorageTypeForMetadata =
FileSystemViewStorageType.MEMORY.name();
+
@Parameter(names = {"--min-validate-interval-seconds"},
description = "the min validate interval of each validate when set
--continuous, default is 10 minutes.")
public Integer minValidateIntervalSeconds = 10 * 60;
@@ -356,6 +373,8 @@ public class HoodieMetadataTableValidator implements
Serializable {
+ " --validate-record-index-count " + validateRecordIndexCount +
", \n"
+ " --validate-record-index-content " + validateRecordIndexContent
+ ", \n"
+ " --num-record-index-error-samples " +
numRecordIndexErrorSamples + ", \n"
+ + " --view-storage-type-fs-listing " + viewStorageTypeForFSListing
+ ", \n"
+ + " --view-storage-type-mdt " + viewStorageTypeForMetadata + ", \n"
+ " --continuous " + continuous + ", \n"
+ " --skip-data-files-for-cleaning " + skipDataFilesForCleaning +
", \n"
+ " --ignore-failed " + ignoreFailed + ", \n"
@@ -389,6 +408,8 @@ public class HoodieMetadataTableValidator implements
Serializable {
&& Objects.equals(validateBloomFilters, config.validateBloomFilters)
&& Objects.equals(validateRecordIndexCount,
config.validateRecordIndexCount)
&& Objects.equals(validateRecordIndexContent,
config.validateRecordIndexContent)
+ && Objects.equals(viewStorageTypeForFSListing,
config.viewStorageTypeForFSListing)
+ && Objects.equals(viewStorageTypeForMetadata,
config.viewStorageTypeForMetadata)
&& Objects.equals(numRecordIndexErrorSamples,
config.numRecordIndexErrorSamples)
&& Objects.equals(minValidateIntervalSeconds,
config.minValidateIntervalSeconds)
&& Objects.equals(parallelism, config.parallelism)
@@ -406,6 +427,7 @@ public class HoodieMetadataTableValidator implements
Serializable {
return Objects.hash(basePath, continuous, skipDataFilesForCleaning,
validateLatestFileSlices,
validateLatestBaseFiles, validateAllFileGroups,
validateAllColumnStats, validateBloomFilters,
validateRecordIndexCount, validateRecordIndexContent,
numRecordIndexErrorSamples,
+ viewStorageTypeForFSListing, viewStorageTypeForMetadata,
minValidateIntervalSeconds, parallelism, recordIndexParallelism,
ignoreFailed,
sparkMaster, sparkMemory, assumeDatePartitioning, propsFilePath,
configs, help);
}
@@ -528,9 +550,9 @@ public class HoodieMetadataTableValidator implements
Serializable {
}
try (HoodieMetadataValidationContext metadataTableBasedContext =
- new HoodieMetadataValidationContext(engineContext, props,
metaClient, true);
+ new HoodieMetadataValidationContext(engineContext, props,
metaClient, true, cfg.viewStorageTypeForMetadata);
HoodieMetadataValidationContext fsBasedContext =
- new HoodieMetadataValidationContext(engineContext, props,
metaClient, false)) {
+ new HoodieMetadataValidationContext(engineContext, props,
metaClient, false, cfg.viewStorageTypeForFSListing)) {
Set<String> finalBaseFilesForCleaning = baseFilesForCleaning;
List<Pair<Boolean, ? extends Exception>> result = new ArrayList<>(
engineContext.parallelize(allPartitions,
allPartitions.size()).map(partitionPath -> {
@@ -1395,8 +1417,9 @@ public class HoodieMetadataTableValidator implements
Serializable {
public HoodieMetadataValidationContext(
HoodieEngineContext engineContext, Properties props,
HoodieTableMetaClient metaClient,
- boolean enableMetadataTable) {
- this.props = props;
+ boolean enableMetadataTable, String viewStorageType) {
+ this.props = new Properties();
+ this.props.putAll(props);
this.metaClient = metaClient;
this.enableMetadataTable = enableMetadataTable;
HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
@@ -1405,8 +1428,12 @@ public class HoodieMetadataTableValidator implements
Serializable {
.withMetadataIndexColumnStats(enableMetadataTable)
.withEnableRecordIndex(enableMetadataTable)
.build();
- this.fileSystemView =
FileSystemViewManager.createInMemoryFileSystemView(engineContext,
- metaClient, metadataConfig);
+ props.put(FileSystemViewStorageConfig.VIEW_TYPE.key(), viewStorageType);
+ FileSystemViewStorageConfig viewConf =
FileSystemViewStorageConfig.newBuilder().fromProperties(props).build();
+
ValidationUtils.checkArgument(viewConf.getStorageType().name().equals(viewStorageType),
"View storage type not reflected");
+ HoodieCommonConfig commonConfig =
HoodieCommonConfig.newBuilder().fromProperties(props).build();
+ this.fileSystemView = getFileSystemView(engineContext,
+ metaClient, metadataConfig, viewConf, commonConfig);
this.tableMetadata = HoodieTableMetadata.create(
engineContext, metaClient.getStorage(), metadataConfig,
metaClient.getBasePath().toString());
if
(metaClient.getCommitsTimeline().filterCompletedInstants().countInstants() > 0)
{
@@ -1414,6 +1441,23 @@ public class HoodieMetadataTableValidator implements
Serializable {
}
}
+ private HoodieTableFileSystemView getFileSystemView(HoodieEngineContext
context,
+ HoodieTableMetaClient
metaClient, HoodieMetadataConfig metadataConfig,
+
FileSystemViewStorageConfig viewConf, HoodieCommonConfig commonConfig) {
+ HoodieTimeline timeline =
metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
+ switch (viewConf.getStorageType()) {
+ case SPILLABLE_DISK:
+ LOG.debug("Creating Spillable Disk based Table View");
+ return new SpillableMapBasedFileSystemView(metaClient, timeline,
viewConf, commonConfig);
+ case MEMORY:
+ LOG.debug("Creating in-memory based Table View");
+ return FileSystemViewManager.createInMemoryFileSystemView(context,
+ metaClient, metadataConfig);
+ default:
+ throw new HoodieException("Unsupported storage type " +
viewConf.getStorageType() + ", used with HoodieMetadataTableValidator");
+ }
+ }
+
public HoodieTableMetaClient getMetaClient() {
return metaClient;
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
index a97140a27ab..0f4592cedf9 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimeGenerator;
import org.apache.hudi.common.table.timeline.TimeGenerators;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -87,8 +88,18 @@ public class TestHoodieMetadataTableValidator extends
HoodieSparkClientTestBase
);
}
- @Test
- public void testMetadataTableValidation() {
+ private static Stream<Arguments> viewStorageArgs() {
+ return Stream.of(
+ Arguments.of(null, null),
+ Arguments.of(FileSystemViewStorageType.MEMORY.name(),
FileSystemViewStorageType.MEMORY.name()),
+ Arguments.of(FileSystemViewStorageType.SPILLABLE_DISK.name(),
FileSystemViewStorageType.SPILLABLE_DISK.name()),
+ Arguments.of(FileSystemViewStorageType.MEMORY.name(),
FileSystemViewStorageType.SPILLABLE_DISK.name())
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("viewStorageArgs")
+ public void testMetadataTableValidation(String viewStorageTypeForFSListing,
String viewStorageTypeForMDTListing) {
Map<String, String> writeOptions = new HashMap<>();
writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
writeOptions.put("hoodie.table.name", "test_table");
@@ -119,6 +130,10 @@ public class TestHoodieMetadataTableValidator extends
HoodieSparkClientTestBase
config.basePath = basePath;
config.validateLatestFileSlices = true;
config.validateAllFileGroups = true;
+ if (viewStorageTypeForFSListing != null && viewStorageTypeForMDTListing !=
null) {
+ config.viewStorageTypeForFSListing = viewStorageTypeForFSListing;
+ config.viewStorageTypeForMetadata = viewStorageTypeForMDTListing;
+ }
HoodieMetadataTableValidator validator = new
HoodieMetadataTableValidator(jsc, config);
assertTrue(validator.run());
assertFalse(validator.hasValidationFailure());