This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch release-0.11.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 46deea05c402e986395875b35f8bdab6cd8bbda5
Author: Sagar Sumit <sagarsumi...@gmail.com>
AuthorDate: Tue Apr 19 07:58:46 2022 +0530

    [HUDI-3899] Drop index to delete pending index instants from timeline if applicable (#5342)

    Co-authored-by: sivabalan <n.siv...@gmail.com>
---
 .../metadata/HoodieBackedTableMetadataWriter.java |  39 ++++-
 .../org/apache/hudi/utilities/HoodieIndexer.java  |   2 +-
 .../apache/hudi/utilities/TestHoodieIndexer.java  | 176 +++++++++++++++++++--
 .../indexer-only-bloom.properties                 |  25 +++
 4 files changed, 225 insertions(+), 17 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index be1aed75e2..d080d14a69 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -18,13 +18,9 @@
 
 package org.apache.hudi.metadata;
 
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
 import org.apache.hudi.avro.model.HoodieInstantInfo;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
@@ -70,6 +66,12 @@ import org.apache.hudi.config.metrics.HoodieMetricsJmxConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.exception.HoodieMetadataException;
+
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -88,6 +90,9 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
+import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.getIndexInflightInstant;
+import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeIndexPlan;
 import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
 import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
 import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
@@ -727,9 +732,33 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
       HoodieTableConfig.update(dataMetaClient.getFs(), new Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps());
       LOG.warn("Deleting Metadata Table partitions: " + partitionPath);
       dataMetaClient.getFs().delete(new Path(metadataWriteConfig.getBasePath(), partitionPath), true);
+      // delete corresponding pending indexing instant file in the timeline
+      LOG.warn("Deleting pending indexing instant from the timeline for partition: " + partitionPath);
+      deletePendingIndexingInstant(dataMetaClient, partitionPath);
     }
   }
 
+  /**
+   * Deletes any pending indexing instant, if it exists.
+   * It reads the plan from indexing.requested file and deletes both requested and inflight instants,
+   * if the partition path in the plan matches with the given partition path.
+   */
+  private static void deletePendingIndexingInstant(HoodieTableMetaClient metaClient, String partitionPath) {
+    metaClient.reloadActiveTimeline().filterPendingIndexTimeline().getInstants().filter(instant -> REQUESTED.equals(instant.getState()))
+        .forEach(instant -> {
+          try {
+            HoodieIndexPlan indexPlan = deserializeIndexPlan(metaClient.getActiveTimeline().readIndexPlanAsBytes(instant).get());
+            if (indexPlan.getIndexPartitionInfos().stream()
+                .anyMatch(indexPartitionInfo -> indexPartitionInfo.getMetadataPartitionPath().equals(partitionPath))) {
+              metaClient.getActiveTimeline().deleteInstantFileIfExists(instant);
+              metaClient.getActiveTimeline().deleteInstantFileIfExists(getIndexInflightInstant(instant.getTimestamp()));
+            }
+          } catch (IOException e) {
+            LOG.error("Failed to delete the instant file corresponding to " + instant);
+          }
+        });
+  }
+
   private MetadataRecordsGenerationParams getRecordsGenerationParams() {
     return new MetadataRecordsGenerationParams(
         dataMetaClient,
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
index 501f9296b4..96f6ce38cd 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
@@ -85,7 +85,7 @@ import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
 public class HoodieIndexer {
 
   private static final Logger LOG = LogManager.getLogger(HoodieIndexer.class);
-  private static final String DROP_INDEX = "dropindex";
+  static final String DROP_INDEX = "dropindex";
 
   private final HoodieIndexer.Config cfg;
   private TypedProperties props;
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
index 695ca88413..9312a26b4f 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
@@ -28,14 +28,17 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.testutils.providers.SparkProvider;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -50,10 +53,16 @@ import java.util.List;
 import java.util.Objects;
 
 import static org.apache.hudi.common.table.HoodieTableMetaClient.reload;
+import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists;
 import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
 import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
 import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.apache.hudi.utilities.HoodieIndexer.DROP_INDEX;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -81,6 +90,14 @@ public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkP
     initMetaClient();
   }
 
+  protected void initMetaClient() throws IOException {
+    String rootPathStr = "file://" + tempDir.toAbsolutePath().toString();
+    Path rootPath = new Path(rootPathStr);
+    rootPath.getFileSystem(jsc.hadoopConfiguration()).mkdirs(rootPath);
+    metaClient = HoodieTestUtils.init(rootPathStr, getTableType());
+    basePath = metaClient.getBasePath();
+  }
+
   @Test
   public void testGetRequestedPartitionTypes() {
     HoodieIndexer.Config config = new HoodieIndexer.Config();
@@ -132,29 +149,166 @@ public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkP
     assertNoWriteErrors(statuses);
 
     // validate table config
-    assertTrue(HoodieTableMetadataUtil.getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
-    assertTrue(HoodieTableMetadataUtil.getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
 
     // build indexer config which has only column_stats enabled (files is enabled by default)
     HoodieIndexer.Config config = new HoodieIndexer.Config();
     String propsPath = Objects.requireNonNull(getClass().getClassLoader().getResource("delta-streamer-config/indexer.properties")).getPath();
     config.basePath = basePath;
     config.tableName = tableName;
-    config.indexTypes = "COLUMN_STATS";
-    config.runningMode = "scheduleAndExecute";
+    config.indexTypes = COLUMN_STATS.name();
+    config.runningMode = SCHEDULE_AND_EXECUTE;
     config.propsFilePath = propsPath;
 
     // start the indexer and validate column_stats index is also complete
     HoodieIndexer indexer = new HoodieIndexer(jsc, config);
     assertEquals(0, indexer.start(0));
 
     // validate table config
-    assertTrue(HoodieTableMetadataUtil.getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
-    assertTrue(HoodieTableMetadataUtil.getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
-    assertTrue(HoodieTableMetadataUtil.getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(COLUMN_STATS.getPartitionPath()));
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(COLUMN_STATS.getPartitionPath()));
 
     // validate metadata partitions actually exist
-    assertTrue(HoodieTableMetadataUtil.metadataPartitionExists(basePath, context, FILES));
-    assertTrue(HoodieTableMetadataUtil.metadataPartitionExists(basePath, context, COLUMN_STATS));
-    assertTrue(HoodieTableMetadataUtil.metadataPartitionExists(basePath, context, BLOOM_FILTERS));
+    assertTrue(metadataPartitionExists(basePath, context, FILES));
+    assertTrue(metadataPartitionExists(basePath, context, COLUMN_STATS));
+    assertTrue(metadataPartitionExists(basePath, context, BLOOM_FILTERS));
+  }
+
+  @Test
+  public void testIndexerDropPartitionDeletesInstantFromTimeline() {
+    initTestDataGenerator();
+    String tableName = "indexer_test";
+    HoodieWriteConfig.Builder writeConfigBuilder = getWriteConfigBuilder(basePath, tableName);
+    // enable files on the regular write client
+    HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true);
+    HoodieWriteConfig writeConfig = writeConfigBuilder.withMetadataConfig(metadataConfigBuilder.build()).build();
+    // do one upsert with synchronous metadata update
+    SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context, writeConfig);
+    String instant = "0001";
+    writeClient.startCommitWithTime(instant);
+    List<HoodieRecord> records = dataGen.generateInserts(instant, 100);
+    JavaRDD<WriteStatus> result = writeClient.upsert(jsc.parallelize(records, 1), instant);
+    List<WriteStatus> statuses = result.collect();
+    assertNoWriteErrors(statuses);
+
+    // validate partitions built successfully
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
+    assertTrue(metadataPartitionExists(basePath, context, FILES));
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
+    assertTrue(metadataPartitionExists(basePath, context, BLOOM_FILTERS));
+
+    // build indexer config which has only column_stats enabled (files is enabled by default)
+    HoodieIndexer.Config config = new HoodieIndexer.Config();
+    String propsPath = Objects.requireNonNull(getClass().getClassLoader().getResource("delta-streamer-config/indexer.properties")).getPath();
+    config.basePath = basePath;
+    config.tableName = tableName;
+    config.indexTypes = COLUMN_STATS.name();
+    config.runningMode = SCHEDULE;
+    config.propsFilePath = propsPath;
+
+    // schedule indexing and validate column_stats index is also initialized
+    HoodieIndexer indexer = new HoodieIndexer(jsc, config);
+    assertEquals(0, indexer.start(0));
+    Option<HoodieInstant> indexInstantInTimeline = metaClient.reloadActiveTimeline().filterPendingIndexTimeline().lastInstant();
+    assertTrue(indexInstantInTimeline.isPresent());
+    assertEquals(REQUESTED, indexInstantInTimeline.get().getState());
+    assertTrue(metadataPartitionExists(basePath, context, COLUMN_STATS));
+
+    // drop column_stats and validate indexing.requested is also removed from the timeline
+    config.runningMode = DROP_INDEX;
+    indexer = new HoodieIndexer(jsc, config);
+    assertEquals(0, indexer.start(0));
+    indexInstantInTimeline = metaClient.reloadActiveTimeline().filterPendingIndexTimeline().lastInstant();
+    assertFalse(indexInstantInTimeline.isPresent());
+    assertFalse(metadataPartitionExists(basePath, context, COLUMN_STATS));
+
+    // check other partitions are intact
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
+    assertTrue(metadataPartitionExists(basePath, context, FILES));
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
+    assertTrue(metadataPartitionExists(basePath, context, BLOOM_FILTERS));
+  }
+
+  @Test
+  public void testTwoIndexersOneCreateOneDropPartition() {
+    initTestDataGenerator();
+    String tableName = "indexer_test";
+    HoodieWriteConfig.Builder writeConfigBuilder = getWriteConfigBuilder(basePath, tableName);
+    // enable files on the regular write client
+    HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(true, false);
+    HoodieWriteConfig writeConfig = writeConfigBuilder.withMetadataConfig(metadataConfigBuilder.build()).build();
+    // do one upsert with synchronous metadata update
+    SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context, writeConfig);
+    String instant = "0001";
+    writeClient.startCommitWithTime(instant);
+    List<HoodieRecord> records = dataGen.generateInserts(instant, 100);
+    JavaRDD<WriteStatus> result = writeClient.upsert(jsc.parallelize(records, 1), instant);
+    List<WriteStatus> statuses = result.collect();
+    assertNoWriteErrors(statuses);
+
+    // validate files partition built successfully
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
+    assertTrue(metadataPartitionExists(basePath, context, FILES));
+
+    // build indexer config which has only bloom_filters enabled
+    HoodieIndexer.Config config = getHoodieIndexConfig(BLOOM_FILTERS.name(), SCHEDULE_AND_EXECUTE, "delta-streamer-config/indexer-only-bloom.properties");
+    // start the indexer and validate bloom_filters index is also complete
+    HoodieIndexer indexer = new HoodieIndexer(jsc, config);
+    assertEquals(0, indexer.start(0));
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
+    assertTrue(metadataPartitionExists(basePath, context, BLOOM_FILTERS));
+
+    // completed index timeline for later validation
+    Option<HoodieInstant> bloomIndexInstant = metaClient.reloadActiveTimeline().filterCompletedIndexTimeline().lastInstant();
+    assertTrue(bloomIndexInstant.isPresent());
+
+    // build indexer config which has only column_stats enabled
+    config = getHoodieIndexConfig(COLUMN_STATS.name(), SCHEDULE, "delta-streamer-config/indexer.properties");
+
+    // schedule indexing and validate column_stats index is also initialized
+    // and indexing.requested instant is present
+    indexer = new HoodieIndexer(jsc, config);
+    assertEquals(0, indexer.start(0));
+    Option<HoodieInstant> columnStatsIndexInstant = metaClient.reloadActiveTimeline().filterPendingIndexTimeline().lastInstant();
+    assertTrue(columnStatsIndexInstant.isPresent());
+    assertEquals(REQUESTED, columnStatsIndexInstant.get().getState());
+    assertTrue(metadataPartitionExists(basePath, context, COLUMN_STATS));
+
+    // drop column_stats and validate indexing.requested is also removed from the timeline
+    // and completed indexing instant corresponding to bloom_filters index is still present
+    dropIndexAndAssert(COLUMN_STATS, "delta-streamer-config/indexer.properties", Option.empty());
+
+    // check other partitions are intact
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(FILES.getPartitionPath()));
+    assertTrue(metadataPartitionExists(basePath, context, FILES));
+    assertTrue(getCompletedMetadataPartitions(reload(metaClient).getTableConfig()).contains(BLOOM_FILTERS.getPartitionPath()));
+    assertTrue(metadataPartitionExists(basePath, context, BLOOM_FILTERS));
+
+    // drop bloom filter partition. timeline files should not be deleted since the index building is complete.
+    dropIndexAndAssert(BLOOM_FILTERS, "delta-streamer-config/indexer-only-bloom.properties", bloomIndexInstant);
+  }
+
+  private void dropIndexAndAssert(MetadataPartitionType indexType, String resourceFilePath, Option<HoodieInstant> completedIndexInstant) {
+    HoodieIndexer.Config config = getHoodieIndexConfig(indexType.name(), DROP_INDEX, resourceFilePath);
+    HoodieIndexer indexer = new HoodieIndexer(jsc, config);
+    assertEquals(0, indexer.start(0));
+    Option<HoodieInstant> pendingFlights = metaClient.reloadActiveTimeline().filterPendingIndexTimeline().lastInstant();
+    assertFalse(pendingFlights.isPresent());
+    assertFalse(metadataPartitionExists(basePath, context, indexType));
+    if (completedIndexInstant.isPresent()) {
+      assertEquals(completedIndexInstant, metaClient.reloadActiveTimeline().filterCompletedIndexTimeline().lastInstant());
+    }
+  }
+
+  private HoodieIndexer.Config getHoodieIndexConfig(String indexType, String runMode, String resourceFilePath) {
+    HoodieIndexer.Config config = new HoodieIndexer.Config();
+    String propsPath = Objects.requireNonNull(getClass().getClassLoader().getResource(resourceFilePath)).getPath();
+    config.basePath = basePath;
+    config.tableName = tableName;
+    config.indexTypes = indexType;
+    config.runningMode = runMode;
+    config.propsFilePath = propsPath;
+    return config;
   }
 
   private static HoodieWriteConfig.Builder getWriteConfigBuilder(String basePath, String tableName) {
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/indexer-only-bloom.properties b/hudi-utilities/src/test/resources/delta-streamer-config/indexer-only-bloom.properties
new file mode 100644
index 0000000000..6035077437
--- /dev/null
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/indexer-only-bloom.properties
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+hoodie.metadata.enable=true
+hoodie.metadata.index.async=true
+hoodie.metadata.index.bloom.filter.enable=true
+hoodie.metadata.index.check.timeout.seconds=60
+hoodie.write.concurrency.mode=optimistic_concurrency_control
+hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider
\ No newline at end of file
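For reviewers, a minimal usage sketch of the "dropindex" running mode this patch affects, assembled only from the HoodieIndexer.Config fields and the DROP_INDEX constant visible in the diff above; the Spark context, base path and properties file below are placeholders, not values from the patch.

    // Assumed setup: jsc is an existing JavaSparkContext and the table may have a
    // pending (requested/inflight) indexing instant scheduled for column_stats.
    HoodieIndexer.Config config = new HoodieIndexer.Config();
    config.basePath = "/tmp/hudi/indexer_test";                   // placeholder table base path
    config.tableName = "indexer_test";
    config.indexTypes = MetadataPartitionType.COLUMN_STATS.name();
    config.runningMode = "dropindex";                             // HoodieIndexer.DROP_INDEX, made package-visible here
    config.propsFilePath = "/path/to/indexer.properties";         // placeholder properties file
    // Dropping the index deletes the metadata partition and, with this change, also removes
    // the matching indexing.requested/inflight instants from the timeline.
    int exitCode = new HoodieIndexer(jsc, config).start(0);

See TestHoodieIndexer#testIndexerDropPartitionDeletesInstantFromTimeline in the diff for the full end-to-end flow.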