This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 020786a  [HUDI-3451] Delete metadata table when the write client disables MDT (#5186)
020786a is described below

commit 020786a5f9d25bf140decf24d65e07dd738e4f9d
Author: YueZhang <69956021+zhangyue19921...@users.noreply.github.com>
AuthorDate: Sat Apr 2 19:01:06 2022 +0800

    [HUDI-3451] Delete metadata table when the write client disables MDT (#5186)

    * Add checks for metadata table init to avoid possible out-of-sync

    * Revise the logic to reuse existing table config

    * Revise docs and naming

    Co-authored-by: yuezhang <yuezh...@freewheel.tv>
    Co-authored-by: Y Ethan Guo <ethan.guoyi...@gmail.com>
---
 .../java/org/apache/hudi/table/HoodieTable.java    | 55 ++++++++++++--
 .../org/apache/hudi/table/HoodieFlinkTable.java    |  4 +-
 .../org/apache/hudi/table/HoodieSparkTable.java    |  2 +
 .../functional/TestHoodieBackedMetadata.java       | 84 +++++++++++++++++++---
 .../client/functional/TestHoodieMetadataBase.java  |  5 ++
 .../hudi/common/table/HoodieTableConfig.java       |  2 +-
 6 files changed, 136 insertions(+), 16 deletions(-)
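[Note] In short: when a writer runs with hoodie.metadata.enable=false against a table whose
metadata table (MDT) was previously built, the writer now deletes the MDT directory and clears
hoodie.table.metadata.partitions, so a later re-enable rebuilds the MDT from scratch instead of
reading a stale, out-of-sync one. A minimal sketch of a write config that opts out of the MDT —
the metadata-config builder calls mirror the test added in this commit; the table path and class
name are placeholders:

import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class DisableMdtExample {
  // Sketch: build a write config that turns the metadata table off.
  // Any HoodieTable created with this config will take the deletion
  // path in getMetadataWriter() introduced by this commit.
  static HoodieWriteConfig disableMdt(String basePath) {
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)                              // placeholder table path
        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
            .enable(false)                               // hoodie.metadata.enable=false
            .build())
        .build();
  }
}
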
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index f52d46a..ae06e6b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -18,11 +18,6 @@
 
 package org.apache.hudi.table;
 
-import org.apache.avro.Schema;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
@@ -50,6 +45,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -64,6 +60,7 @@ import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
 import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
 import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -80,6 +77,12 @@ import org.apache.hudi.table.marker.WriteMarkers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.table.storage.HoodieLayoutFactory;
 import org.apache.hudi.table.storage.HoodieStorageLayout;
+
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -802,6 +805,48 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
     return Option.empty();
   }
 
+  /**
+   * Deletes the metadata table if the writer disables metadata table with
+   * hoodie.metadata.enable=false
+   */
+  public void maybeDeleteMetadataTable() {
+    if (shouldExecuteMetadataTableDeletion()) {
+      try {
+        Path mdtBasePath = new Path(HoodieTableMetadata.getMetadataTableBasePath(config.getBasePath()));
+        FileSystem fileSystem = metaClient.getFs();
+        if (fileSystem.exists(mdtBasePath)) {
+          LOG.info("Deleting metadata table because it is disabled in writer.");
+          fileSystem.delete(mdtBasePath, true);
+        }
+        clearMetadataTablePartitionsConfig();
+      } catch (IOException ioe) {
+        throw new HoodieIOException("Failed to delete metadata table.", ioe);
+      }
+    }
+  }
+
+  private boolean shouldExecuteMetadataTableDeletion() {
+    // Only execute metadata table deletion when all the following conditions are met
+    // (1) This is data table
+    // (2) Metadata table is disabled in HoodieWriteConfig for the writer
+    // (3) Check `HoodieTableConfig.TABLE_METADATA_PARTITIONS`.  Either the table config
+    // does not exist, or the table config is non-empty indicating that metadata table
+    // partitions are ready to use
+    return !HoodieTableMetadata.isMetadataTable(metaClient.getBasePath())
+        && !config.isMetadataTableEnabled()
+        && (!metaClient.getTableConfig().contains(HoodieTableConfig.TABLE_METADATA_PARTITIONS)
+        || !StringUtils.isNullOrEmpty(metaClient.getTableConfig().getMetadataPartitions()));
+  }
+
+  /**
+   * Clears hoodie.table.metadata.partitions in hoodie.properties
+   */
+  private void clearMetadataTablePartitionsConfig() {
+    LOG.info("Clear hoodie.table.metadata.partitions in hoodie.properties");
+    metaClient.getTableConfig().setValue(
+        HoodieTableConfig.TABLE_METADATA_PARTITIONS.key(), StringUtils.EMPTY_STRING);
+    HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), metaClient.getTableConfig().getProps());
+  }
+
   public HoodieTableMetadata getMetadataTable() {
     return this.metadata;
   }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
index 2f08a55..f749ce4 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.table;
 
-import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.data.HoodieData;
@@ -37,6 +36,8 @@ import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
+import org.apache.avro.specific.SpecificRecordBase;
+
 import java.util.List;
 
 import static org.apache.hudi.common.data.HoodieList.getList;
@@ -107,6 +108,7 @@ public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
       return Option.of(FlinkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config,
           context, actionMetadata, Option.of(triggeringInstantTimestamp)));
     } else {
+      maybeDeleteMetadataTable();
       return Option.empty();
     }
   }
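[Note] The guard above only fires on the data table itself, only when the writer's config
disables the MDT, and only when the table config either lacks the TABLE_METADATA_PARTITIONS
entry or carries a non-empty value for it. The effect is easy to observe from any client; a
sketch, assuming a FileSystem handle and the table's base path are supplied by the caller
(both names are illustrative):

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.metadata.HoodieTableMetadata;

public class MdtDeletionCheck {
  // Sketch: returns true once maybeDeleteMetadataTable() has removed
  // the metadata table folder under <basePath>/.hoodie/metadata.
  static boolean mdtDeleted(FileSystem fs, String basePath) throws java.io.IOException {
    Path mdtBasePath = new Path(HoodieTableMetadata.getMetadataTableBasePath(basePath));
    return !fs.exists(mdtBasePath);
  }
}
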
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
index ce14d43..9e4bb14 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
@@ -122,6 +122,8 @@ public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
       } catch (IOException e) {
         throw new HoodieMetadataException("Checking existence of metadata table failed", e);
       }
+    } else {
+      maybeDeleteMetadataTable();
     }
 
     return Option.empty();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index f60de7d..5c73d96 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -18,15 +18,6 @@
 
 package org.apache.hudi.client.functional;
 
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.util.Time;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
@@ -76,6 +67,7 @@ import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
@@ -100,6 +92,16 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
 import org.apache.hudi.table.upgrade.UpgradeDowngrade;
 import org.apache.hudi.testutils.MetadataMergeWriteStatus;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.util.Time;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.avro.AvroSchemaConverter;
@@ -114,6 +116,7 @@ import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
@@ -198,6 +201,69 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     validateMetadata(testTable, true);
   }
 
+  @Test
+  public void testTurnOffMetadataTableAfterEnable() throws Exception {
+    init(COPY_ON_WRITE, true);
+    String instant1 = "0000001";
+    HoodieCommitMetadata hoodieCommitMetadata = doWriteOperationWithMeta(testTable, instant1, INSERT);
+
+    // Simulate the complete data directory including ".hoodie_partition_metadata" file
+    File metaForP1 = new File(metaClient.getBasePath() + "/p1", ".hoodie_partition_metadata");
+    File metaForP2 = new File(metaClient.getBasePath() + "/p2", ".hoodie_partition_metadata");
+    metaForP1.createNewFile();
+    metaForP2.createNewFile();
+
+    // Sync to metadata table
+    metaClient.reloadActiveTimeline();
+    HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
+    Option<HoodieTableMetadataWriter> metadataWriter = table.getMetadataWriter(instant1, Option.of(hoodieCommitMetadata));
+    validateMetadata(testTable, true);
+
+    assertTrue(metadataWriter.isPresent());
+    HoodieTableConfig hoodieTableConfig =
+        new HoodieTableConfig(this.fs, metaClient.getMetaPath(), writeConfig.getPayloadClass());
+    assertFalse(hoodieTableConfig.getMetadataPartitions().isEmpty());
+
+    // Turn off metadata table
+    HoodieWriteConfig writeConfig2 = HoodieWriteConfig.newBuilder()
+        .withProperties(this.writeConfig.getProps())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .build();
+    testTable = HoodieTestTable.of(metaClient);
+    String instant2 = "0000002";
+    HoodieCommitMetadata hoodieCommitMetadata2 = doWriteOperationWithMeta(testTable, instant2, INSERT);
+    metaClient.reloadActiveTimeline();
+    HoodieTable table2 = HoodieSparkTable.create(writeConfig2, context, metaClient);
+    Option<HoodieTableMetadataWriter> metadataWriter2 = table2.getMetadataWriter(instant2, Option.of(hoodieCommitMetadata2));
+    assertFalse(metadataWriter2.isPresent());
+
+    HoodieTableConfig hoodieTableConfig2 =
+        new HoodieTableConfig(this.fs, metaClient.getMetaPath(), writeConfig2.getPayloadClass());
+    assertEquals(StringUtils.EMPTY_STRING, hoodieTableConfig2.getMetadataPartitions());
+    // Assert metadata table folder is deleted
+    assertFalse(metaClient.getFs().exists(
+        new Path(HoodieTableMetadata.getMetadataTableBasePath(writeConfig2.getBasePath()))));
+
+    // Enable metadata table again and initialize metadata table through
+    // HoodieTable.getMetadataWriter() function
+    HoodieWriteConfig writeConfig3 = HoodieWriteConfig.newBuilder()
+        .withProperties(this.writeConfig.getProps())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
+        .build();
+    testTable = HoodieTestTable.of(metaClient);
+    metaClient.reloadActiveTimeline();
+    String instant3 = "0000003";
+    HoodieCommitMetadata hoodieCommitMetadata3 = doWriteOperationWithMeta(testTable, instant3, INSERT);
+    metaClient.reloadActiveTimeline();
+    HoodieTable table3 = HoodieSparkTable.create(writeConfig3, context, metaClient);
+    Option<HoodieTableMetadataWriter> metadataWriter3 = table3.getMetadataWriter(instant3, Option.of(hoodieCommitMetadata3));
+    validateMetadata(testTable, true);
+    assertTrue(metadataWriter3.isPresent());
+    HoodieTableConfig hoodieTableConfig3 =
+        new HoodieTableConfig(this.fs, metaClient.getMetaPath(), writeConfig.getPayloadClass());
+    assertFalse(hoodieTableConfig3.getMetadataPartitions().isEmpty());
+  }
+
   /**
    * Only valid partition directories are added to the metadata.
   */
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
index f00a0b8..93d4ac5 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
@@ -23,6 +23,7 @@ import org.apache.hudi.client.HoodieTimelineArchiver;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
@@ -176,6 +177,10 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
     testTable.doWriteOperation(commitTime, operationType, emptyList(), asList("p1", "p2"), 3);
   }
 
+  protected HoodieCommitMetadata doWriteOperationWithMeta(HoodieTestTable testTable, String commitTime, WriteOperationType operationType) throws Exception {
+    return testTable.doWriteOperation(commitTime, operationType, emptyList(), asList("p1", "p2"), 3);
+  }
+
   protected void doClean(HoodieTestTable testTable, String commitTime, List<String> commitsToClean) throws IOException {
     doCleanInternal(testTable, commitTime, commitsToClean, false);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 923ee27..cfb0df3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -607,7 +607,7 @@ public class HoodieTableConfig extends HoodieConfig {
   public String getMetadataPartitions() {
     return getStringOrDefault(TABLE_METADATA_PARTITIONS, StringUtils.EMPTY_STRING);
   }
-  
+
   public Map<String, String> propsMap() {
     return props.entrySet().stream()
         .collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue())));
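[Note] For completeness, the typed accessors touched in HoodieTableConfig give the same
answer as the raw-properties read sketched earlier; a sketch, assuming the config object is
constructed as in the test above (new HoodieTableConfig(fs, metaPath, payloadClassName)):

import org.apache.hudi.common.table.HoodieTableConfig;

public class MdtReadiness {
  // Sketch: getMetadataPartitions() falls back to the empty string,
  // so a non-empty value signals that MDT partitions are ready to use.
  static boolean mdtPartitionsAvailable(HoodieTableConfig tableConfig) {
    return !tableConfig.getMetadataPartitions().isEmpty();
  }
}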