This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.10.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 7be8944723d5cd5c1c54f4f94bdafe78351570ac
Author: 董可伦 <[email protected]>
AuthorDate: Thu Dec 30 11:53:17 2021 +0800

    [HUDI-2675] Fix the exception 'Not an Avro data file' when archive and clean (#4016)
---
 .../hudi/table/HoodieTimelineArchiveLog.java       | 37 ++++++-----
 .../table/action/clean/CleanActionExecutor.java    | 14 +++--
 .../hudi/table/action/clean/CleanPlanner.java      | 14 +++--
 .../hudi/io/TestHoodieTimelineArchiveLog.java      | 45 ++++----------
 .../java/org/apache/hudi/table/TestCleaner.java    | 71 +++++++++++++++++++---
 .../hudi/testutils/HoodieClientTestHarness.java    | 34 ++++++++++-
 .../table/timeline/HoodieActiveTimeline.java       | 12 +---
 .../table/timeline/HoodieDefaultTimeline.java      |  7 ++-
 .../hudi/common/table/timeline/HoodieTimeline.java |  2 +
 .../hudi/common/testutils/FileCreateUtils.java     | 17 +++++-
 .../hudi/common/testutils/HoodieTestTable.java     | 18 ++++--
 11 files changed, 185 insertions(+), 86 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
index 43a85dd..58e03f5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
@@ -156,7 +156,7 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
 
   private Stream<HoodieInstant> getCommitInstantsToArchive() {
     // TODO (na) : Add a way to return actions associated with a timeline and then merge/unify
-    // with logic above to avoid Stream.concats
+    // with logic above to avoid Stream.concat
     HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
 
     Option<HoodieInstant> oldestPendingCompactionAndReplaceInstant = table.getActiveTimeline()
@@ -176,7 +176,7 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
     // Actually do the commits
     Stream<HoodieInstant> instantToArchiveStream = commitTimeline.getInstants()
         .filter(s -> {
-          // if no savepoint present, then dont filter
+          // if no savepoint present, then don't filter
          return !(firstSavepoint.isPresent() && HoodieTimeline.compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, s.getTimestamp()));
        }).filter(s -> {
          // Ensure commits >= oldest pending compaction commit is retained
@@ -233,9 +233,9 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
   private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants, HoodieEngineContext context) throws IOException {
     LOG.info("Deleting instants " + archivedInstants);
     boolean success = true;
-    List<String> instantFiles = archivedInstants.stream().map(archivedInstant -> {
-      return new Path(metaClient.getMetaPath(), archivedInstant.getFileName());
-    }).map(Path::toString).collect(Collectors.toList());
+    List<String> instantFiles = archivedInstants.stream().map(archivedInstant ->
+        new Path(metaClient.getMetaPath(), archivedInstant.getFileName())
+    ).map(Path::toString).collect(Collectors.toList());
 
     context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants");
     Map<String, Boolean> resultDeleteInstantFiles = FSUtils.parallelizeFilesProcess(context,
Committed Instant=" + latestCommitted); if (latestCommitted.isPresent()) { - success &= deleteAllInstantsOlderorEqualsInAuxMetaFolder(latestCommitted.get()); + success &= deleteAllInstantsOlderOrEqualsInAuxMetaFolder(latestCommitted.get()); } return success; } @@ -277,7 +277,7 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> { * @return success if all eligible file deleted successfully * @throws IOException in case of error */ - private boolean deleteAllInstantsOlderorEqualsInAuxMetaFolder(HoodieInstant thresholdInstant) throws IOException { + private boolean deleteAllInstantsOlderOrEqualsInAuxMetaFolder(HoodieInstant thresholdInstant) throws IOException { List<HoodieInstant> instants = null; boolean success = true; try { @@ -291,12 +291,12 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> { * On some FSs deletion of all files in the directory can auto remove the directory itself. * GCS is one example, as it doesn't have real directories and subdirectories. When client * removes all the files from a "folder" on GCS is has to create a special "/" to keep the folder - * around. If this doesn't happen (timeout, misconfigured client, ...) folder will be deleted and + * around. If this doesn't happen (timeout, mis configured client, ...) folder will be deleted and * in this case we should not break when aux folder is not found. * GCS information: (https://cloud.google.com/storage/docs/gsutil/addlhelp/HowSubdirectoriesWork) */ LOG.warn("Aux path not found. Skipping: " + metaClient.getMetaAuxiliaryPath()); - return success; + return true; } List<HoodieInstant> instantsToBeDeleted = @@ -308,7 +308,7 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> { Path metaFile = new Path(metaClient.getMetaAuxiliaryPath(), deleteInstant.getFileName()); if (metaClient.getFs().exists(metaFile)) { success &= metaClient.getFs().delete(metaFile, false); - LOG.info("Deleted instant file in auxiliary metapath : " + metaFile); + LOG.info("Deleted instant file in auxiliary meta path : " + metaFile); } } return success; @@ -321,10 +321,19 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> { List<IndexedRecord> records = new ArrayList<>(); for (HoodieInstant hoodieInstant : instants) { try { - deleteAnyLeftOverMarkers(context, hoodieInstant); - records.add(convertToAvroRecord(hoodieInstant)); - if (records.size() >= this.config.getCommitArchivalBatchSize()) { - writeToFile(wrapperSchema, records); + if (table.getActiveTimeline().isEmpty(hoodieInstant) + && ( + hoodieInstant.getAction().equals(HoodieTimeline.CLEAN_ACTION) + || (hoodieInstant.getAction().equals(HoodieTimeline.ROLLBACK_ACTION) && hoodieInstant.isCompleted()) + ) + ) { + table.getActiveTimeline().deleteEmptyInstantIfExists(hoodieInstant); + } else { + deleteAnyLeftOverMarkers(context, hoodieInstant); + records.add(convertToAvroRecord(hoodieInstant)); + if (records.size() >= this.config.getCommitArchivalBatchSize()) { + writeToFile(wrapperSchema, records); + } } } catch (Exception e) { LOG.error("Failed to archive commits, .commit file: " + hoodieInstant.getFileName(), e); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java index f1f2e53..9813b2b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java +++ 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index f1f2e53..9813b2b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -230,11 +230,15 @@ public class CleanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends
         .filterInflightsAndRequested().getInstants().collect(Collectors.toList());
     if (pendingCleanInstants.size() > 0) {
       pendingCleanInstants.forEach(hoodieInstant -> {
-        LOG.info("Finishing previously unfinished cleaner instant=" + hoodieInstant);
-        try {
-          cleanMetadataList.add(runPendingClean(table, hoodieInstant));
-        } catch (Exception e) {
-          LOG.warn("Failed to perform previous clean operation, instant: " + hoodieInstant, e);
+        if (table.getCleanTimeline().isEmpty(hoodieInstant)) {
+          table.getActiveTimeline().deleteEmptyInstantIfExists(hoodieInstant);
+        } else {
+          LOG.info("Finishing previously unfinished cleaner instant=" + hoodieInstant);
+          try {
+            cleanMetadataList.add(runPendingClean(table, hoodieInstant));
+          } catch (Exception e) {
+            LOG.warn("Failed to perform previous clean operation, instant: " + hoodieInstant, e);
+          }
         }
       });
       table.getMetaClient().reloadActiveTimeline();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 80727ff..27937af 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -146,11 +146,15 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
     if (config.incrementalCleanerModeEnabled()) {
       Option<HoodieInstant> lastClean = hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
       if (lastClean.isPresent()) {
-        HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
-            .deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
-        if ((cleanMetadata.getEarliestCommitToRetain() != null)
-            && (cleanMetadata.getEarliestCommitToRetain().length() > 0)) {
-          return getPartitionPathsForIncrementalCleaning(cleanMetadata, instantToRetain);
+        if (hoodieTable.getActiveTimeline().isEmpty(lastClean.get())) {
+          hoodieTable.getActiveTimeline().deleteEmptyInstantIfExists(lastClean.get());
+        } else {
+          HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
+              .deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
+          if ((cleanMetadata.getEarliestCommitToRetain() != null)
+              && (cleanMetadata.getEarliestCommitToRetain().length() > 0)) {
+            return getPartitionPathsForIncrementalCleaning(cleanMetadata, instantToRetain);
+          }
         }
       }
     }
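Both cleaner-side hunks above apply the same guard: probe the instant for content before deserializing it, and discard it when it turns out to be a leftover zero-byte file. Condensed to its essentials, with timeline and instant standing in for the values the executor and planner already hold (a sketch, not the exact code):

    if (timeline.isEmpty(instant)) {
      // Created but never written: remove it instead of replaying it.
      timeline.deleteEmptyInstantIfExists(instant);
    } else {
      HoodieCleanMetadata metadata = TimelineMetadataUtils
          .deserializeHoodieCleanMetadata(timeline.getInstantDetails(instant).get());
      // ... continue with the normal pending-clean / incremental-clean path ...
    }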
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
index 2112031..faa0621 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
@@ -18,15 +18,10 @@
 
 package org.apache.hudi.io;
 
-import org.apache.hudi.avro.model.HoodieActionInstant;
-import org.apache.hudi.avro.model.HoodieCleanMetadata;
-import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.client.utils.MetadataConversionUtils;
-import org.apache.hudi.common.HoodieCleanStat;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
-import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -35,13 +30,11 @@ import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -70,11 +63,9 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.hudi.common.util.CleanerUtils.convertCleanMetadata;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -502,8 +493,9 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
         HoodieTimeline.CLEAN_ACTION), expectedActiveInstants, commitsAfterArchival);
   }
 
-  @Test
-  public void testArchiveCompletedRollbackAndClean() throws Exception {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testArchiveCompletedRollbackAndClean(boolean isEmpty) throws Exception {
     init();
     int minInstantsToKeep = 2;
     int maxInstantsToKeep = 10;
@@ -519,11 +511,11 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
 
     int startInstant = 1;
     for (int i = 0; i < maxInstantsToKeep + 1; i++, startInstant++) {
-      createCleanMetadata(startInstant + "", false);
+      createCleanMetadata(startInstant + "", false, isEmpty || i % 2 == 0);
     }
 
     for (int i = 0; i < maxInstantsToKeep + 1; i++, startInstant += 2) {
-      createCommitAndRollbackFile(startInstant + 1 + "", startInstant + "", false);
+      createCommitAndRollbackFile(startInstant + 1 + "", startInstant + "", false, isEmpty || i % 2 == 0);
     }
 
     HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
@@ -701,31 +693,16 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
     return allInstants;
   }
 
-  private HoodieInstant createCleanMetadata(String instantTime, boolean inflightOnly) throws IOException {
-    HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", new HashMap<>(),
-        CleanPlanV2MigrationHandler.VERSION, new HashMap<>());
-    if (inflightOnly) {
-      HoodieTestTable.of(metaClient).addInflightClean(instantTime, cleanerPlan);
-    } else {
-      HoodieCleanStat cleanStats = new HoodieCleanStat(
-          HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
-          HoodieTestUtils.DEFAULT_PARTITION_PATHS[new Random().nextInt(HoodieTestUtils.DEFAULT_PARTITION_PATHS.length)],
-          Collections.emptyList(),
-          Collections.emptyList(),
-          Collections.emptyList(),
-          instantTime);
-      HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime, Option.of(0L), Collections.singletonList(cleanStats));
-      HoodieTestTable.of(metaClient).addClean(instantTime, cleanerPlan, cleanMetadata);
-    }
-    return new HoodieInstant(inflightOnly, "clean", instantTime);
+  private void createCommitAndRollbackFile(String commitToRollback, String rollbackTime, boolean isRollbackInflight) throws IOException {
+    createCommitAndRollbackFile(commitToRollback, rollbackTime, isRollbackInflight, false);
   }
 
-  private void createCommitAndRollbackFile(String commitToRollback, String rollbackTIme, boolean isRollbackInflight) throws IOException {
+  private void createCommitAndRollbackFile(String commitToRollback, String rollbackTime, boolean isRollbackInflight, boolean isEmpty) throws IOException {
     HoodieTestDataGenerator.createCommitFile(basePath, commitToRollback, wrapperFs.getConf());
-    createRollbackMetadata(rollbackTIme, commitToRollback, isRollbackInflight);
+    createRollbackMetadata(rollbackTime, commitToRollback, isRollbackInflight, isEmpty);
   }
 
-  private HoodieInstant createRollbackMetadata(String rollbackTime, String commitToRollback, boolean inflight) throws IOException {
+  private HoodieInstant createRollbackMetadata(String rollbackTime, String commitToRollback, boolean inflight, boolean isEmpty) throws IOException {
     if (inflight) {
       HoodieTestTable.of(metaClient).addInflightRollback(rollbackTime);
     } else {
@@ -738,7 +715,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
           .setPartitionMetadata(Collections.emptyMap())
           .setInstantsRollback(Collections.emptyList())
           .build();
-      HoodieTestTable.of(metaClient).addRollback(rollbackTime, hoodieRollbackMetadata);
+      HoodieTestTable.of(metaClient).addRollback(rollbackTime, hoodieRollbackMetadata, isEmpty);
     }
     return new HoodieInstant(inflight, "rollback", rollbackTime);
   }
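The test change in this file (see the @@ -502 hunk above) widens a single test into a parameterized one, so every archive scenario now runs once against well-formed instants and once against empty ones. The same JUnit 5 mechanism in isolation, as a sketch:

    import org.junit.jupiter.params.ParameterizedTest;
    import org.junit.jupiter.params.provider.ValueSource;

    class ArchiveParamSketch {
      @ParameterizedTest
      @ValueSource(booleans = {true, false})
      void runsOncePerValue(boolean isEmpty) {
        // isEmpty == true exercises zero-byte instants; false covers the normal path.
      }
    }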
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 491bba2..8a1f4ab 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -259,7 +259,7 @@ public class TestCleaner extends HoodieClientTestBase {
    * @param insertFn     Insert API to be tested
    * @param upsertFn     Upsert API to be tested
    * @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during
-   *                     record generation to also tag the regards (de-dupe is implicit as we use uniq record-gen APIs)
+   *                     record generation to also tag the records (de-dupe is implicit as we use unique record-gen APIs)
    * @throws Exception in case of errors
    */
   private void testInsertAndCleanByVersions(
@@ -274,7 +274,7 @@ public class TestCleaner extends HoodieClientTestBase {
         .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
         .build();
-    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
       final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction =
           generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts);
 
@@ -429,7 +429,7 @@ public class TestCleaner extends HoodieClientTestBase {
    * @param insertFn     Insert API to be tested
    * @param upsertFn     Upsert API to be tested
    * @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during
-   *                     record generation to also tag the regards (de-dupe is implicit as we use uniq record-gen APIs)
+   *                     record generation to also tag the records (de-dupe is implicit as we use unique record-gen APIs)
    * @throws Exception in case of errors
    */
   private void testInsertAndCleanByCommits(
@@ -550,10 +550,10 @@ public class TestCleaner extends HoodieClientTestBase {
       HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
       assertTrue(timeline.getTimelineOfActions(
           CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants().countInstants() == 3);
-      Option<HoodieInstant> rolleBackInstantForFailedCommit = timeline.getTimelineOfActions(
+      Option<HoodieInstant> rollBackInstantForFailedCommit = timeline.getTimelineOfActions(
           CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants().lastInstant();
       HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeAvroMetadata(
-          timeline.getInstantDetails(rolleBackInstantForFailedCommit.get()).get(), HoodieRollbackMetadata.class);
+          timeline.getInstantDetails(rollBackInstantForFailedCommit.get()).get(), HoodieRollbackMetadata.class);
       // Rollback of one of the failed writes should have deleted 3 files
       assertEquals(3, rollbackMetadata.getTotalFilesDeleted());
     }
@@ -750,6 +750,59 @@ public class TestCleaner extends HoodieClientTestBase {
     assertTrue(testTable.baseFileExists(p0, "00000000000003", file3P0C2));
   }
 
+  @Test
+  public void testCleanEmptyInstants() throws Exception {
+    HoodieWriteConfig config =
+        HoodieWriteConfig.newBuilder()
+            .withPath(basePath)
+            .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
+            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+                .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).build())
+            .build();
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    int commitCount = 20;
+    int cleanCount = 20;
+
+    int startInstant = 1;
+    for (int i = 0; i < commitCount; i++, startInstant++) {
+      String commitTime = makeNewCommitTime(startInstant);
+      HoodieTestTable.of(metaClient).addCommit(commitTime);
+    }
+
+    for (int i = 0; i < cleanCount; i++, startInstant++) {
+      String commitTime = makeNewCommitTime(startInstant);
+      createCleanMetadata(commitTime + "", false, true);
+    }
+
+    List<HoodieCleanStat> cleanStats = runCleaner(config);
+    HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
+
+    assertEquals(0, cleanStats.size(), "Must not clean any files");
+    assertEquals(1, timeline.getTimelineOfActions(
+        CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterInflightsAndRequested().countInstants());
+    assertEquals(0, timeline.getTimelineOfActions(
+        CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterInflights().countInstants());
+    assertEquals(--cleanCount, timeline.getTimelineOfActions(
+        CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterCompletedInstants().countInstants());
+    assertTrue(timeline.getTimelineOfActions(
+        CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterInflightsAndRequested().containsInstant(makeNewCommitTime(--startInstant)));
+
+    cleanStats = runCleaner(config);
+    timeline = metaClient.reloadActiveTimeline();
+
+    assertEquals(0, cleanStats.size(), "Must not clean any files");
+    assertEquals(1, timeline.getTimelineOfActions(
+        CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterInflightsAndRequested().countInstants());
+    assertEquals(0, timeline.getTimelineOfActions(
+        CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterInflights().countInstants());
+    assertEquals(--cleanCount, timeline.getTimelineOfActions(
+        CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterCompletedInstants().countInstants());
+    assertTrue(timeline.getTimelineOfActions(
+        CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterInflightsAndRequested().containsInstant(makeNewCommitTime(--startInstant)));
+  }
+
   /**
    * Test HoodieTable.clean() Cleaning by versions logic for MOR table with Log files.
    */
@@ -1425,7 +1478,7 @@ public class TestCleaner extends HoodieClientTestBase {
    *
    * @param insertFn     Insert API to be tested
    * @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during
-   *                     record generation to also tag the regards (de-dupe is implicit as we use uniq record-gen APIs)
+   *                     record generation to also tag the records (de-dupe is implicit as we use unique record-gen APIs)
    * @throws Exception in case of errors
    */
   private void testInsertAndCleanFailedWritesByVersions(
@@ -1441,7 +1494,7 @@ public class TestCleaner extends HoodieClientTestBase {
         .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
         .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
         .build();
-    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
       final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction =
           generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts);
 
@@ -1474,10 +1527,10 @@ public class TestCleaner extends HoodieClientTestBase {
       HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
       assertTrue(timeline.getTimelineOfActions(
           CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants().countInstants() == 3);
-      Option<HoodieInstant> rolleBackInstantForFailedCommit = timeline.getTimelineOfActions(
+      Option<HoodieInstant> rollBackInstantForFailedCommit = timeline.getTimelineOfActions(
          CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants().lastInstant();
       HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeAvroMetadata(
-          timeline.getInstantDetails(rolleBackInstantForFailedCommit.get()).get(), HoodieRollbackMetadata.class);
+          timeline.getInstantDetails(rollBackInstantForFailedCommit.get()).get(), HoodieRollbackMetadata.class);
       // Rollback of one of the failed writes should have deleted 3 files
       assertEquals(3, rollbackMetadata.getTotalFilesDeleted());
     }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index 1e07485..d312b64 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -17,13 +17,18 @@
 
 package org.apache.hudi.testutils;
 
+import org.apache.hudi.avro.model.HoodieActionInstant;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.HoodieCleanStat;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -31,7 +36,9 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
@@ -85,12 +92,14 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
 import scala.Tuple2;
 
+import static org.apache.hudi.common.util.CleanerUtils.convertCleanMetadata;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -671,7 +680,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
     Assertions.assertEquals(MetadataPartitionType.values().length, metadataTablePartitions.size());
 
     // Metadata table should automatically compact and clean
-    // versions are +1 as autoclean / compaction happens end of commits
+    // versions are +1 as autoClean / compaction happens at the end of commits
     int numFileVersions = metadataWriteConfig.getCleanerFileVersionsRetained() + 1;
     HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metadataMetaClient, metadataMetaClient.getActiveTimeline());
     metadataTablePartitions.forEach(partition -> {
@@ -682,4 +691,27 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
           + numFileVersions + " but was " + latestSlices.size());
     });
   }
+
+  public HoodieInstant createCleanMetadata(String instantTime, boolean inflightOnly) throws IOException {
+    return createCleanMetadata(instantTime, inflightOnly, false);
+  }
+
+  public HoodieInstant createCleanMetadata(String instantTime, boolean inflightOnly, boolean isEmpty) throws IOException {
+    HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", new HashMap<>(),
+        CleanPlanV2MigrationHandler.VERSION, new HashMap<>());
+    if (inflightOnly) {
+      HoodieTestTable.of(metaClient).addInflightClean(instantTime, cleanerPlan);
+    } else {
+      HoodieCleanStat cleanStats = new HoodieCleanStat(
+          HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
+          HoodieTestUtils.DEFAULT_PARTITION_PATHS[new Random().nextInt(HoodieTestUtils.DEFAULT_PARTITION_PATHS.length)],
+          Collections.emptyList(),
+          Collections.emptyList(),
+          Collections.emptyList(),
+          instantTime);
+      HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime, Option.of(0L), Collections.singletonList(cleanStats));
+      HoodieTestTable.of(metaClient).addClean(instantTime, cleanerPlan, cleanMetadata, isEmpty);
+    }
+    return new HoodieInstant(inflightOnly, "clean", instantTime);
+  }
 }
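Promoting createCleanMetadata into the shared HoodieClientTestHarness lets any client test fabricate completed clean instants, optionally backed by empty files. Hypothetical usage from a subclass (instant times arbitrary):

    // Well-formed completed clean at instant "10", then a zero-byte one at "11".
    createCleanMetadata("10", false);
    createCleanMetadata("11", false, true);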
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index b6f1a32..ca131f3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -79,13 +79,6 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
   }
 
   /**
-   * Format the java.time.Instant to a String representing the timestamp of a Hoodie Instant.
-   */
-  public static String formatInstantTime(Instant timestamp) {
-    return HoodieInstantTimeGenerator.formatInstantTime(timestamp);
-  }
-
-  /**
    * Format the Date to a String representing the timestamp of a Hoodie Instant.
    */
   public static String formatDate(Date timestamp) {
@@ -198,9 +191,8 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
     }
   }
 
-  public void deletePendingIfExists(HoodieInstant.State state, String action, String instantStr) {
-    HoodieInstant instant = new HoodieInstant(state, action, instantStr);
-    ValidationUtils.checkArgument(!instant.isCompleted());
+  public void deleteEmptyInstantIfExists(HoodieInstant instant) {
+    ValidationUtils.checkArgument(isEmpty(instant));
     deleteInstantFileIfExists(instant);
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index aa48db7..15691f1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -123,7 +123,7 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
   @Override
   public HoodieTimeline getCompletedReplaceTimeline() {
     return new HoodieDefaultTimeline(
-        instants.stream().filter(s -> s.getAction().equals(REPLACE_COMMIT_ACTION)).filter(s -> s.isCompleted()), details);
+        instants.stream().filter(s -> s.getAction().equals(REPLACE_COMMIT_ACTION)).filter(HoodieInstant::isCompleted), details);
   }
 
   @Override
@@ -375,6 +375,11 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
   }
 
   @Override
+  public boolean isEmpty(HoodieInstant instant) {
+    return getInstantDetails(instant).get().length == 0;
+  }
+
+  @Override
   public String toString() {
     return this.getClass().getName() + ": " + instants.stream().map(Object::toString).collect(Collectors.joining(","));
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index bb6bed8..3b2779c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -289,6 +289,8 @@ public interface HoodieTimeline extends Serializable {
    */
   Option<byte[]> getInstantDetails(HoodieInstant instant);
 
+  boolean isEmpty(HoodieInstant instant);
+
   /**
    * Check WriteOperationType is DeletePartition.
   */
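Taken together, the three timeline changes above form a small contract: isEmpty() reports whether an instant's backing file has zero bytes, and deleteEmptyInstantIfExists() is the only sanctioned way to drop such a file, since its checkArgument call rejects non-empty instants. A usage sketch against that contract:

    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
    for (HoodieInstant instant : timeline.getInstants().collect(Collectors.toList())) {
      if (timeline.isEmpty(instant)) {
        // Guarded internally by ValidationUtils.checkArgument(isEmpty(instant)),
        // so an instant that still has content can never be deleted this way.
        timeline.deleteEmptyInstantIfExists(instant);
      }
    }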
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
index 3754f37..1968ef4 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.common.testutils;
 
+import org.apache.directory.api.util.Strings;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
@@ -219,14 +220,26 @@ public class FileCreateUtils {
     createMetaFile(basePath, instantTime, HoodieTimeline.CLEAN_EXTENSION, serializeCleanMetadata(metadata).get());
   }
 
+  public static void createCleanFile(String basePath, String instantTime, HoodieCleanMetadata metadata, boolean isEmpty) throws IOException {
+    createMetaFile(basePath, instantTime, HoodieTimeline.CLEAN_EXTENSION, isEmpty ? Strings.EMPTY_BYTES : serializeCleanMetadata(metadata).get());
+  }
+
   public static void createRequestedCleanFile(String basePath, String instantTime, HoodieCleanerPlan cleanerPlan) throws IOException {
     createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_CLEAN_EXTENSION, serializeCleanerPlan(cleanerPlan).get());
   }
 
+  public static void createRequestedCleanFile(String basePath, String instantTime, HoodieCleanerPlan cleanerPlan, boolean isEmpty) throws IOException {
+    createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_CLEAN_EXTENSION, isEmpty ? Strings.EMPTY_BYTES : serializeCleanerPlan(cleanerPlan).get());
+  }
+
   public static void createInflightCleanFile(String basePath, String instantTime, HoodieCleanerPlan cleanerPlan) throws IOException {
     createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_CLEAN_EXTENSION, serializeCleanerPlan(cleanerPlan).get());
   }
 
+  public static void createInflightCleanFile(String basePath, String instantTime, HoodieCleanerPlan cleanerPlan, boolean isEmpty) throws IOException {
+    createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_CLEAN_EXTENSION, isEmpty ? Strings.EMPTY_BYTES : serializeCleanerPlan(cleanerPlan).get());
+  }
+
   public static void createRequestedRollbackFile(String basePath, String instantTime, HoodieRollbackPlan plan) throws IOException {
     createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_ROLLBACK_EXTENSION, serializeRollbackPlan(plan).get());
   }
@@ -235,8 +248,8 @@ public class FileCreateUtils {
     createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_ROLLBACK_EXTENSION);
   }
 
-  public static void createRollbackFile(String basePath, String instantTime, HoodieRollbackMetadata hoodieRollbackMetadata) throws IOException {
-    createMetaFile(basePath, instantTime, HoodieTimeline.ROLLBACK_EXTENSION, serializeRollbackMetadata(hoodieRollbackMetadata).get());
+  public static void createRollbackFile(String basePath, String instantTime, HoodieRollbackMetadata hoodieRollbackMetadata, boolean isEmpty) throws IOException {
+    createMetaFile(basePath, instantTime, HoodieTimeline.ROLLBACK_EXTENSION, isEmpty ? Strings.EMPTY_BYTES : serializeRollbackMetadata(hoodieRollbackMetadata).get());
   }
 
   public static void createRestoreFile(String basePath, String instantTime, HoodieRestoreMetadata hoodieRestoreMetadata) throws IOException {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 8bd1ea7..7b8148a 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -279,9 +279,13 @@ public class HoodieTestTable {
   }
 
   public HoodieTestTable addClean(String instantTime, HoodieCleanerPlan cleanerPlan, HoodieCleanMetadata metadata) throws IOException {
-    createRequestedCleanFile(basePath, instantTime, cleanerPlan);
-    createInflightCleanFile(basePath, instantTime, cleanerPlan);
-    createCleanFile(basePath, instantTime, metadata);
+    return addClean(instantTime, cleanerPlan, metadata, false);
+  }
+
+  public HoodieTestTable addClean(String instantTime, HoodieCleanerPlan cleanerPlan, HoodieCleanMetadata metadata, boolean isEmpty) throws IOException {
+    createRequestedCleanFile(basePath, instantTime, cleanerPlan, isEmpty);
+    createInflightCleanFile(basePath, instantTime, cleanerPlan, isEmpty);
+    createCleanFile(basePath, instantTime, metadata, isEmpty);
     currentInstantTime = instantTime;
     return this;
   }
@@ -324,8 +328,12 @@ public class HoodieTestTable {
   }
 
   public HoodieTestTable addRollback(String instantTime, HoodieRollbackMetadata rollbackMetadata) throws IOException {
+    return addRollback(instantTime, rollbackMetadata, false);
+  }
+
+  public HoodieTestTable addRollback(String instantTime, HoodieRollbackMetadata rollbackMetadata, boolean isEmpty) throws IOException {
     createInflightRollbackFile(basePath, instantTime);
-    createRollbackFile(basePath, instantTime, rollbackMetadata);
+    createRollbackFile(basePath, instantTime, rollbackMetadata, isEmpty);
     currentInstantTime = instantTime;
     return this;
   }
@@ -1015,7 +1023,7 @@ public class HoodieTestTable {
    * @param tableType                     - Hudi table type
    * @param commitTime                    - Write commit time
    * @param partitionToFilesNameLengthMap - Map of partition names to its list of files and their lengths
-   * @return Test tabke state for the requested partitions and files
+   * @return Test table state for the requested partitions and files
    */
   private static HoodieTestTableState getTestTableStateWithPartitionFileInfo(WriteOperationType operationType,
                                                                              HoodieTableType tableType,
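With the isEmpty flag threaded through the test utilities above, a test can now reproduce the corrupt state this commit defends against, completed instants whose meta files are zero bytes, in a single chained call. A sketch (the instant times are arbitrary, and cleanerPlan/cleanMetadata/rollbackMetadata are built as in the helpers shown earlier):

    // Writes 1.clean.requested, 1.clean.inflight and 1.clean as empty files,
    // plus an empty 2.rollback: the on-disk state that used to break archive and clean.
    HoodieTestTable.of(metaClient)
        .addClean("1", cleanerPlan, cleanMetadata, true)
        .addRollback("2", rollbackMetadata, true);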
