This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0fbf3128196 [HUDI-8040] Fix
SimpleConcurrentFileWritesConflictResolutionStrategy get clustering instant
contains insert overwrite (#11691)
0fbf3128196 is described below
commit 0fbf31281964a74e1d1595bc830cf04e8196063e
Author: xiaodong <[email protected]>
AuthorDate: Sat Aug 3 09:24:11 2024 +0800
[HUDI-8040] Fix SimpleConcurrentFileWritesConflictResolutionStrategy get
clustering instant contains insert overwrite (#11691)
---
...urrentFileWritesConflictResolutionStrategy.java | 3 +++
.../TestConflictResolutionStrategyUtil.java | 13 +++++++++++
...urrentFileWritesConflictResolutionStrategy.java | 26 ++++++++++++++++++++++
3 files changed, 42 insertions(+)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java
index c162aee380a..3c2c0d21e3e 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieWriteConflictException;
@@ -62,6 +63,8 @@ public class
SimpleConcurrentFileWritesConflictResolutionStrategy
Stream<HoodieInstant> compactionAndClusteringPendingTimeline =
activeTimeline
.filterPendingReplaceClusteringAndCompactionTimeline()
+ .filter(instant -> ClusteringUtils.isClusteringInstant(activeTimeline,
instant)
+ || HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction()))
.findInstantsAfter(currentInstant.getTimestamp())
.getInstantsAsStream();
return Stream.concat(completedCommitsInstantStream,
compactionAndClusteringPendingTimeline);
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
index d6988cae0fd..1ebe3f05d1e 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
@@ -144,6 +144,19 @@ public class TestConflictResolutionStrategyUtil {
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
fileId1, fileId2);
}
+ public static void createPendingInsertOverwrite(String instantTime,
WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) throws
Exception {
+ //insert_overwrite
+ String fileId1 = "file-1";
+ String fileId2 = "file-2";
+
+ HoodieRequestedReplaceMetadata requestedReplaceMetadata = new
HoodieRequestedReplaceMetadata();
+ HoodieReplaceCommitMetadata replaceMetadata = new
HoodieReplaceCommitMetadata();
+
+ HoodieTestTable.of(metaClient)
+ .addPendingReplace(instantTime, Option.of(requestedReplaceMetadata),
Option.empty())
+
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
fileId1, fileId2);
+ }
+
public static void createReplace(String instantTime, WriteOperationType
writeOperationType, HoodieTableMetaClient metaClient) throws Exception {
String fileId1 = "file-1";
String fileId2 = "file-2";
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java
index d2ee5d67285..a2ace63bdb6 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleConcurrentFileWritesConflictResolutionStrategy.java
@@ -51,6 +51,7 @@ import static
org.apache.hudi.client.transaction.TestConflictResolutionStrategyU
import static
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createInflightCommit;
import static
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createPendingCompaction;
import static
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createPendingCluster;
+import static
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createPendingInsertOverwrite;
import static
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createReplace;
import static
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createRequestedCommit;
@@ -346,6 +347,31 @@ public class
TestSimpleConcurrentFileWritesConflictResolutionStrategy extends Ho
}
}
+ @Test
+ public void tstConcurrentWritesWithPendingInsertOverwriteReplace() throws
Exception {
+ createCommit(metaClient.createNewInstantTime(), metaClient);
+ HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+ // consider commits before this are all successful
+ Option<HoodieInstant> lastSuccessfulInstant =
timeline.getCommitsTimeline().filterCompletedInstants().lastInstant();
+
+ // writer 1 starts
+ String currentWriterInstant = metaClient.createNewInstantTime();
+ createInflightCommit(currentWriterInstant, metaClient);
+
+ // insert_overwrite 1 gets scheduled and inflighted
+ String newInstantTime = metaClient.createNewInstantTime();
+ createPendingInsertOverwrite(newInstantTime,
WriteOperationType.INSERT_OVERWRITE, metaClient);
+
+ Option<HoodieInstant> currentInstant = Option.of(new
HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION,
currentWriterInstant));
+ SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new
SimpleConcurrentFileWritesConflictResolutionStrategy();
+ HoodieCommitMetadata currentMetadata =
createCommitMetadata(currentWriterInstant);
+ metaClient.reloadActiveTimeline();
+ List<HoodieInstant> candidateInstants =
strategy.getCandidateInstants(metaClient, currentInstant.get(),
lastSuccessfulInstant).collect(
+ Collectors.toList());
+ // writer 1 will not conflicts with insert_overwrite 1
+ Assertions.assertTrue(candidateInstants.size() == 0);
+ }
+
// try to simulate HUDI-3355
@Test
public void testConcurrentWritesWithPendingInstants() throws Exception {