This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch release-0.5.3 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 555ff1aea4d7cc5947e890866b2a384b5409fb3a Author: Balaji Varadarajan <[email protected]> AuthorDate: Mon May 18 19:27:24 2020 -0700 [HUDI-858] Allow multiple operations to be executed within a single commit (#1633) --- .../org/apache/hudi/config/HoodieWriteConfig.java | 26 +++++++++++++++- .../TestHoodieClientOnCopyOnWriteStorage.java | 36 +++++++++++++++++++++- .../table/timeline/HoodieActiveTimeline.java | 20 ++++++++++-- 3 files changed, 77 insertions(+), 5 deletions(-) diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index f88d96a..24984db 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -96,6 +96,20 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { private static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks"; private static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7; + /** + * HUDI-858 : There are users who had been directly using RDD APIs and have relied on a behavior in 0.4.x to allow + * multiple write operations (upsert/bulk-insert/...) to be executed within a single commit. + * + * Given Hudi commit protocol, these are generally unsafe operations and users need to handle failure scenarios. It + * only works with COW table. Hudi 0.5.x had stopped this behavior. + * + * Given the importance of supporting such cases for the user's migration to 0.5.x, we are proposing a safety flag + * (disabled by default) which will allow this old behavior. 
+ */ + private static final String ALLOW_MULTI_WRITE_ON_SAME_INSTANT = + "_.hoodie.allow.multi.write.on.same.instant"; + private static final String DEFAULT_ALLOW_MULTI_WRITE_ON_SAME_INSTANT = "false"; + private ConsistencyGuardConfig consistencyGuardConfig; // Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled @@ -187,6 +201,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return StorageLevel.fromString(props.getProperty(WRITE_STATUS_STORAGE_LEVEL)); } + public boolean shouldAllowMultiWriteOnSameInstant() { + return Boolean.parseBoolean(props.getProperty(ALLOW_MULTI_WRITE_ON_SAME_INSTANT)); + } + public String getWriteStatusClassName() { return props.getProperty(HOODIE_WRITE_STATUS_CLASS_PROP); } @@ -706,6 +724,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return this; } + public Builder withAllowMultiWriteOnSameInstant(boolean allow) { + props.setProperty(ALLOW_MULTI_WRITE_ON_SAME_INSTANT, String.valueOf(allow)); + return this; + } + public HoodieWriteConfig build() { // Check for mandatory properties setDefaultOnCondition(props, !props.containsKey(INSERT_PARALLELISM), INSERT_PARALLELISM, DEFAULT_PARALLELISM); @@ -721,6 +744,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { DEFAULT_COMBINE_BEFORE_UPSERT); setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_DELETE_PROP), COMBINE_BEFORE_DELETE_PROP, DEFAULT_COMBINE_BEFORE_DELETE); + setDefaultOnCondition(props, !props.containsKey(ALLOW_MULTI_WRITE_ON_SAME_INSTANT), + ALLOW_MULTI_WRITE_ON_SAME_INSTANT, DEFAULT_ALLOW_MULTI_WRITE_ON_SAME_INSTANT); setDefaultOnCondition(props, !props.containsKey(WRITE_STATUS_STORAGE_LEVEL), WRITE_STATUS_STORAGE_LEVEL, DEFAULT_WRITE_STATUS_STORAGE_LEVEL); setDefaultOnCondition(props, !props.containsKey(HOODIE_AUTO_COMMIT_PROP), HOODIE_AUTO_COMMIT_PROP, @@ -760,7 +785,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { // Ensure Layout Version is good new 
TimelineLayoutVersion(Integer.parseInt(layoutVersion)); - // Build WriteConfig at the end HoodieWriteConfig config = new HoodieWriteConfig(props); Objects.requireNonNull(config.getBasePath()); diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java index c7da7a7..bba8ecd 100644 --- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java @@ -20,8 +20,8 @@ package org.apache.hudi.client; import org.apache.hudi.common.HoodieClientTestUtils; import org.apache.hudi.common.HoodieTestDataGenerator; -import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRollingStat; @@ -981,6 +981,40 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase { return Pair.of(markerFilePath, result); } + @Test + public void testMultiOperationsPerCommit() throws IOException { + HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false) + .withAllowMultiWriteOnSameInstant(true) + .build(); + HoodieWriteClient client = getHoodieWriteClient(cfg); + String firstInstantTime = "0000"; + client.startCommitWithTime(firstInstantTime); + int numRecords = 200; + JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(dataGen.generateInserts(firstInstantTime, numRecords), 1); + JavaRDD<WriteStatus> result = client.bulkInsert(writeRecords, firstInstantTime); + assertTrue("Commit should succeed", client.commit(firstInstantTime, result)); + assertTrue("After explicit commit, commit file should be created", HoodieTestUtils.doesCommitExist(basePath, 
firstInstantTime)); + + // Check the entire dataset has all records still + String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length]; + for (int i = 0; i < fullPartitionPaths.length; i++) { + fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]); + } + assertEquals("Must contain " + numRecords + " records", numRecords, + HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count()); + + String nextInstantTime = "0001"; + client.startCommitWithTime(nextInstantTime); + JavaRDD<HoodieRecord> updateRecords = jsc.parallelize(dataGen.generateUpdates(nextInstantTime, numRecords), 1); + JavaRDD<HoodieRecord> insertRecords = jsc.parallelize(dataGen.generateInserts(nextInstantTime, numRecords), 1); + JavaRDD<WriteStatus> inserts = client.bulkInsert(insertRecords, nextInstantTime); + JavaRDD<WriteStatus> upserts = client.upsert(updateRecords, nextInstantTime); + assertTrue("Commit should succeed", client.commit(nextInstantTime, inserts.union(upserts))); + assertTrue("After explicit commit, commit file should be created", HoodieTestUtils.doesCommitExist(basePath, firstInstantTime)); + int totalRecords = 2 * numRecords; + assertEquals("Must contain " + totalRecords + " records", totalRecords, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count()); + } + /** * Build Hoodie Write Config for small data file sizes. 
*/ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index 389314f..ab16428 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -307,11 +307,16 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { } private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option<byte[]> data) { + transitionState(fromInstant, toInstant, data, false); + } + + private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option<byte[]> data, + boolean allowRedundantTransitions) { ValidationUtils.checkArgument(fromInstant.getTimestamp().equals(toInstant.getTimestamp())); try { if (metaClient.getTimelineLayoutVersion().isNullVersion()) { // Re-create the .inflight file by opening a new file and write the commit metadata in - createFileInMetaPath(fromInstant.getFileName(), data, false); + createFileInMetaPath(fromInstant.getFileName(), data, allowRedundantTransitions); Path fromInstantPath = new Path(metaClient.getMetaPath(), fromInstant.getFileName()); Path toInstantPath = new Path(metaClient.getMetaPath(), toInstant.getFileName()); boolean success = metaClient.getFs().rename(fromInstantPath, toInstantPath); @@ -324,7 +329,11 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { ValidationUtils.checkArgument(metaClient.getFs().exists(new Path(metaClient.getMetaPath(), fromInstant.getFileName()))); // Use Write Once to create Target File - createImmutableFileInPath(new Path(metaClient.getMetaPath(), toInstant.getFileName()), data); + if (allowRedundantTransitions) { + createFileInPath(new Path(metaClient.getMetaPath(), toInstant.getFileName()), data); + } else { + createImmutableFileInPath(new Path(metaClient.getMetaPath(), 
toInstant.getFileName()), data); + } LOG.info("Create new file for toInstant ?" + new Path(metaClient.getMetaPath(), toInstant.getFileName())); } } catch (IOException e) { @@ -367,9 +376,14 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { } public void transitionRequestedToInflight(HoodieInstant requested, Option<byte[]> content) { + transitionRequestedToInflight(requested, content, false); + } + + public void transitionRequestedToInflight(HoodieInstant requested, Option<byte[]> content, + boolean allowRedundantTransitions) { HoodieInstant inflight = new HoodieInstant(State.INFLIGHT, requested.getAction(), requested.getTimestamp()); ValidationUtils.checkArgument(requested.isRequested(), "Instant " + requested + " in wrong state"); - transitionState(requested, inflight, content); + transitionState(requested, inflight, content, allowRedundantTransitions); } public void saveToCompactionRequested(HoodieInstant instant, Option<byte[]> content) {
