This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
new 31dbd89d6c9 [HUDI-7507] Fixing Rollbacks and Cleaning to acquire locks
as needed (#13064)
31dbd89d6c9 is described below
commit 31dbd89d6c99f704211aae0bf8dedf5320bab2f4
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed Jun 4 22:01:58 2025 -0700
[HUDI-7507] Fixing Rollbacks and Cleaning to acquire locks as needed
(#13064)
---
.../hudi/client/BaseHoodieTableServiceClient.java | 44 ++++-
.../apache/hudi/client/BaseHoodieWriteClient.java | 23 +--
.../hudi/client/timeline/TimestampUtils.java | 10 +-
.../org/apache/hudi/config/HoodieWriteConfig.java | 2 +-
.../java/org/apache/hudi/table/HoodieTable.java | 65 +++++---
.../rollback/BaseRollbackPlanActionExecutor.java | 22 ++-
.../HoodieFlinkMergeOnReadTableCompactor.java | 3 +-
.../hudi/table/HoodieJavaCopyOnWriteTable.java | 3 +-
.../HoodieJavaMergeOnReadTableCompactor.java | 3 +-
.../hudi/table/HoodieSparkCopyOnWriteTable.java | 3 +-
.../commit/BaseSparkCommitActionExecutor.java | 9 +-
.../TestHoodieClientOnMergeOnReadStorage.java | 3 +-
.../table/action/compact/TestAsyncCompaction.java | 2 +-
.../rollback/TestRollbackPlanActionExecutor.java | 179 +++++++++++++++++++++
.../TestHoodieSparkMergeOnReadTableRollback.java | 5 +-
.../sink/clustering/HoodieFlinkClusteringJob.java | 3 +-
.../hudi/sink/compact/HoodieFlinkCompactor.java | 2 +-
.../java/org/apache/hudi/util/ClusteringUtil.java | 6 +-
.../java/org/apache/hudi/util/CompactionUtil.java | 6 +-
.../command/procedures/RunCleanProcedure.scala | 2 +-
.../apache/hudi/functional/TestCOWDataSource.scala | 2 +-
21 files changed, 328 insertions(+), 69 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index 1b7e854b0f9..62fa6f98d4a 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -80,6 +80,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
@@ -216,7 +217,8 @@ public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieCl
HoodieInstant inflightInstant =
HoodieTimeline.getLogCompactionInflightInstant(logCompactionInstantTime);
if (pendingLogCompactionTimeline.containsInstant(inflightInstant)) {
LOG.info("Found Log compaction inflight file. Rolling back the commit
and exiting.");
- table.rollbackInflightLogCompaction(inflightInstant, commitToRollback ->
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
+ table.rollbackInflightLogCompaction(inflightInstant, commitToRollback ->
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false),
+ Option.of(txnManager));
table.getMetaClient().reloadActiveTimeline();
throw new HoodieException("Execution is aborted since it found an
Inflight logcompaction,"
+ "log compaction plans are mutable plans, so reschedule another
logcompaction.");
@@ -290,7 +292,8 @@ public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieCl
HoodieTimeline pendingCompactionTimeline =
table.getActiveTimeline().filterPendingCompactionTimeline();
HoodieInstant inflightInstant =
HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
- table.rollbackInflightCompaction(inflightInstant, commitToRollback ->
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
+ table.rollbackInflightCompaction(inflightInstant, commitToRollback ->
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false),
+ Option.of(txnManager));
table.getMetaClient().reloadActiveTimeline();
}
compactionTimer = metrics.getCompactionCtx();
@@ -449,7 +452,8 @@ public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieCl
HoodieInstant inflightInstant =
HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);
if (pendingClusteringTimeline.containsInstant(inflightInstant)) {
if
(pendingClusteringTimeline.isPendingClusterInstant(inflightInstant.getTimestamp()))
{
- table.rollbackInflightClustering(inflightInstant, commitToRollback ->
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
+ table.rollbackInflightClustering(inflightInstant, commitToRollback ->
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false),
+ Option.of(txnManager));
table.getMetaClient().reloadActiveTimeline();
} else {
throw new HoodieClusteringException("Non clustering replace-commit
inflight at timestamp " + clusteringInstant);
@@ -483,7 +487,8 @@ public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieCl
HoodieTimeline pendingClusteringTimeline =
table.getActiveTimeline().filterPendingReplaceTimeline();
HoodieInstant inflightInstant =
HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);
if (pendingClusteringTimeline.containsInstant(inflightInstant)) {
- table.rollbackInflightClustering(inflightInstant, commitToRollback ->
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false), true);
+ table.rollbackInflightClustering(inflightInstant, commitToRollback ->
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false), true,
+ Option.of(txnManager));
table.getMetaClient().reloadActiveTimeline();
return true;
}
@@ -746,7 +751,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieCl
LOG.info("Cleaner started");
// proceed only if multiple clean schedules are enabled or if there are
no pending cleans.
if (scheduleInline) {
- scheduleTableServiceInternal(cleanInstantTime, Option.empty(),
TableServiceType.CLEAN);
+ scheduleClean(cleanInstantTime);
table.getMetaClient().reloadActiveTimeline();
}
@@ -769,6 +774,16 @@ public abstract class BaseHoodieTableServiceClient<I, T,
O> extends BaseHoodieCl
return metadata;
}
+ private void scheduleClean(String cleanInstantTime) {
+ HoodieInstant cleanInstant = new
HoodieInstant(HoodieInstant.State.INFLIGHT, CLEAN_ACTION, cleanInstantTime);
+ try {
+ txnManager.beginTransaction(Option.of(cleanInstant), Option.empty());
+ scheduleTableServiceInternal(cleanInstantTime, Option.empty(),
TableServiceType.CLEAN);
+ } finally {
+ txnManager.endTransaction(Option.of(cleanInstant));
+ }
+ }
+
/**
* Trigger archival for the table. This ensures that the number of commits
do not explode
* and keep increasing unbounded over time.
@@ -1034,8 +1049,7 @@ public abstract class BaseHoodieTableServiceClient<I, T,
O> extends BaseHoodieCl
+ "(exists in active timeline: %s), with rollback plan: %s",
rollbackInstantTime, commitInstantOpt.isPresent(),
pendingRollbackInfo.isPresent()));
Option<HoodieRollbackPlan> rollbackPlanOption =
pendingRollbackInfo.map(entry -> Option.of(entry.getRollbackPlan()))
- .orElseGet(() -> table.scheduleRollback(context,
rollbackInstantTime, commitInstantOpt.get(), false,
config.shouldRollbackUsingMarkers(),
- false));
+ .orElseGet(() -> scheduleRollback(table, rollbackInstantTime,
commitInstantOpt, skipLocking));
if (rollbackPlanOption.isPresent()) {
// There can be a case where the inflight rollback failed after the
instant files
// are deleted for commitInstantTime, so that commitInstantOpt is
empty as it is
@@ -1065,6 +1079,22 @@ public abstract class BaseHoodieTableServiceClient<I, T,
O> extends BaseHoodieCl
}
}
+ private Option<HoodieRollbackPlan> scheduleRollback(HoodieTable table,
String rollbackInstantTime, Option<HoodieInstant> commitInstantOpt,
+ boolean skipLocking) {
+ HoodieInstant rollbackInstant = new
HoodieInstant(HoodieInstant.State.INFLIGHT, rollbackInstantTime,
HoodieTimeline.ROLLBACK_ACTION);
+ try {
+ if (!skipLocking) {
+ txnManager.beginTransaction(Option.of(rollbackInstant),
Option.empty());
+ }
+ return table.scheduleRollback(context, rollbackInstantTime,
commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(),
+ false);
+ } finally {
+ if (!skipLocking) {
+ txnManager.endTransaction(Option.of(rollbackInstant));
+ }
+ }
+ }
+
/**
* Main API to rollback failed bootstrap.
*/
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 780d48d16bc..d4f41d4c9fd 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -52,6 +52,7 @@ import
org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.Either;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
@@ -342,8 +343,8 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
protected abstract void validateTimestamp(HoodieTableMetaClient metaClient,
String instantTime);
protected void validateTimestampInternal(HoodieTableMetaClient metaClient,
String instantTime) {
- if (config.shouldEnableTimestampOrderinValidation() &&
config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
- TimestampUtils.validateForLatestTimestamp(metaClient, instantTime);
+ if (config.shouldEnableTimestampOrderingValidation() &&
config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+ TimestampUtils.validateForLatestTimestamp(Either.left(metaClient),
metaClient.isMetadataTable(), instantTime);
}
}
@@ -829,20 +830,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
* cleaned)
*/
public HoodieCleanMetadata clean(String cleanInstantTime) throws
HoodieIOException {
- return clean(cleanInstantTime, true, false);
- }
-
- /**
- * Clean up any stale/old files/data lying around (either on file storage or
index storage) based on the
- * configurations and CleaningPolicy used. (typically files that no longer
can be used by a running query can be
- * cleaned)
- * @param cleanInstantTime instant time for clean.
- * @param skipLocking if this is triggered by another parent transaction,
locking can be skipped.
- * @return instance of {@link HoodieCleanMetadata}.
- */
- @Deprecated
- public HoodieCleanMetadata clean(String cleanInstantTime, boolean
skipLocking) throws HoodieIOException {
- return clean(cleanInstantTime, true, false);
+ return clean(cleanInstantTime, true);
}
/**
@@ -853,9 +841,8 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
* of clean.
* @param cleanInstantTime instant time for clean.
* @param scheduleInline true if needs to be scheduled inline. false
otherwise.
- * @param skipLocking if this is triggered by another parent transaction,
locking can be skipped.
*/
- public HoodieCleanMetadata clean(String cleanInstantTime, boolean
scheduleInline, boolean skipLocking) throws HoodieIOException {
+ public HoodieCleanMetadata clean(String cleanInstantTime, boolean
scheduleInline) throws HoodieIOException {
return tableServiceClient.clean(cleanInstantTime, scheduleInline);
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/TimestampUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/TimestampUtils.java
index dfc1578290b..9a380c46707 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/TimestampUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/TimestampUtils.java
@@ -19,16 +19,18 @@
package org.apache.hudi.client.timeline;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Either;
import org.apache.hudi.common.util.ValidationUtils;
public class TimestampUtils {
- public static void validateForLatestTimestamp(HoodieTableMetaClient
metaClient, String instantTime) {
+ public static void validateForLatestTimestamp(Either<HoodieTableMetaClient,
HoodieActiveTimeline> metaClientOrActiveTimeline, boolean isMetadataTable,
String instantTime) {
// validate that the instant that is requested to be created is
the latest in the timeline.
- if (!metaClient.isMetadataTable()) { // lets validate data table that
timestamps are generated in monotically increasing order.
- HoodieTableMetaClient reloadedMetaClient =
HoodieTableMetaClient.reload(metaClient);
-
reloadedMetaClient.getActiveTimeline().getWriteTimeline().lastInstant().ifPresent(entry
-> {
+ if (!isMetadataTable) { // lets validate data table that timestamps are
generated in monotonically increasing order.
+ HoodieActiveTimeline reloadedActiveTimeline =
metaClientOrActiveTimeline.isLeft() ?
metaClientOrActiveTimeline.asLeft().reloadActiveTimeline() :
metaClientOrActiveTimeline.asRight();
+ reloadedActiveTimeline.getWriteTimeline().lastInstant().ifPresent(entry
-> {
ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(entry.getTimestamp(),
HoodieTimeline.LESSER_THAN_OR_EQUALS, instantTime),
"Found later commit time " + entry + ", compared to the current
instant " + instantTime + ", hence failing to create requested commit meta
file");
});
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index f4b2be343cb..a369b748ee2 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2629,7 +2629,7 @@ public class HoodieWriteConfig extends HoodieConfig {
return props.getInteger(WRITES_FILEID_ENCODING,
HoodieMetadataPayload.RECORD_INDEX_FIELD_FILEID_ENCODING_UUID);
}
- public Boolean shouldEnableTimestampOrderinValidation() {
+ public Boolean shouldEnableTimestampOrderingValidation() {
return getBoolean(ENABLE_TIMESTAMP_ORDERING_VALIDATION);
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 8ff7b279024..9fc39341e19 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -32,6 +32,7 @@ import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.client.timeline.TimestampUtils;
+import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -58,6 +59,7 @@ import
org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
+import org.apache.hudi.common.util.Either;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -540,6 +542,7 @@ public abstract class HoodieTable<T, I, K, O> implements
Serializable {
* @param instantTime Instant Time for scheduling rollback
* @param instantToRollback instant to be rolled back
* @param shouldRollbackUsingMarkers uses marker based rollback strategy
when set to true. uses list based rollback when false.
+ * @param isRestore {@code true} when invoked as part of restore.
* @return HoodieRollbackPlan containing info on rollback.
*/
public abstract Option<HoodieRollbackPlan>
scheduleRollback(HoodieEngineContext context,
@@ -607,12 +610,12 @@ public abstract class HoodieTable<T, I, K, O> implements
Serializable {
String
restoreInstantTimestamp,
String
savepointToRestoreTimestamp);
- public void rollbackInflightCompaction(HoodieInstant inflightInstant) {
- rollbackInflightCompaction(inflightInstant, s -> Option.empty());
+ public void rollbackInflightCompaction(HoodieInstant inflightInstant,
Option<TransactionManager> txnManagerOpt) {
+ rollbackInflightCompaction(inflightInstant, s -> Option.empty(),
txnManagerOpt);
}
- public void rollbackInflightLogCompaction(HoodieInstant inflightInstant) {
- rollbackInflightLogCompaction(inflightInstant, s -> Option.empty());
+ public void rollbackInflightLogCompaction(HoodieInstant inflightInstant,
Option<TransactionManager> txnManagerOpt) {
+ rollbackInflightLogCompaction(inflightInstant, s -> Option.empty(),
txnManagerOpt);
}
/**
@@ -622,9 +625,10 @@ public abstract class HoodieTable<T, I, K, O> implements
Serializable {
* @param inflightInstant Inflight Compaction Instant
*/
public void rollbackInflightCompaction(HoodieInstant inflightInstant,
- Function<String,
Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
+ Function<String,
Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc,
+ Option<TransactionManager>
txnManagerOpt) {
ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
- rollbackInflightInstant(inflightInstant, getPendingRollbackInstantFunc);
+ rollbackInflightInstant(inflightInstant, getPendingRollbackInstantFunc,
txnManagerOpt);
}
/**
@@ -634,8 +638,9 @@ public abstract class HoodieTable<T, I, K, O> implements
Serializable {
* @param getPendingRollbackInstantFunc Function to get rollback instant
*/
public void rollbackInflightClustering(HoodieInstant inflightInstant,
- Function<String,
Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
- rollbackInflightClustering(inflightInstant, getPendingRollbackInstantFunc,
false);
+ Function<String,
Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc,
+ Option<TransactionManager>
txnManagerOpt) {
+ rollbackInflightClustering(inflightInstant, getPendingRollbackInstantFunc,
false, txnManagerOpt);
}
/**
@@ -645,9 +650,10 @@ public abstract class HoodieTable<T, I, K, O> implements
Serializable {
* @param getPendingRollbackInstantFunc Function to get rollback instant
*/
public void rollbackInflightClustering(HoodieInstant inflightInstant,
- Function<String,
Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc, boolean
deleteInstants) {
+ Function<String,
Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc, boolean
deleteInstants,
+ Option<TransactionManager>
txnManagerOpt) {
ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION));
- rollbackInflightInstant(inflightInstant, getPendingRollbackInstantFunc);
+ rollbackInflightInstant(inflightInstant, getPendingRollbackInstantFunc,
txnManagerOpt);
if (deleteInstants) {
// above rollback would still keep requested in the timeline. so, lets
delete it if we are looking to purge the pending clustering fully.
getActiveTimeline().deletePending(new
HoodieInstant(HoodieInstant.State.REQUESTED, inflightInstant.getAction(),
inflightInstant.getTimestamp()));
@@ -661,12 +667,12 @@ public abstract class HoodieTable<T, I, K, O> implements
Serializable {
* @param getPendingRollbackInstantFunc Function to get rollback instant
*/
private void rollbackInflightInstant(HoodieInstant inflightInstant,
- Function<String,
Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc) {
+ Function<String,
Option<HoodiePendingRollbackInfo>> getPendingRollbackInstantFunc,
+ Option<TransactionManager>
txnManagerOpt) {
final String commitTime =
getPendingRollbackInstantFunc.apply(inflightInstant.getTimestamp()).map(entry
-> entry.getRollbackInstant().getTimestamp())
.orElseGet(HoodieActiveTimeline::createNewInstantTime);
- scheduleRollback(context, commitTime, inflightInstant, false,
config.shouldRollbackUsingMarkers(),
- false);
+ scheduleRollback(commitTime, inflightInstant, txnManagerOpt);
rollback(context, commitTime, inflightInstant, false, false);
getActiveTimeline().revertInstantFromInflightToRequested(inflightInstant);
}
@@ -677,15 +683,26 @@ public abstract class HoodieTable<T, I, K, O> implements
Serializable {
*
* @param inflightInstant Inflight Compaction Instant
*/
- public void rollbackInflightLogCompaction(HoodieInstant inflightInstant,
Function<String, Option<HoodiePendingRollbackInfo>>
getPendingRollbackInstantFunc) {
+ public void rollbackInflightLogCompaction(HoodieInstant inflightInstant,
Function<String, Option<HoodiePendingRollbackInfo>>
getPendingRollbackInstantFunc,
+ Option<TransactionManager>
txnManagerOpt) {
final String commitTime =
getPendingRollbackInstantFunc.apply(inflightInstant.getTimestamp()).map(entry
-> entry.getRollbackInstant().getTimestamp())
.orElseGet(HoodieActiveTimeline::createNewInstantTime);
- scheduleRollback(context, commitTime, inflightInstant, false,
config.shouldRollbackUsingMarkers(),
- false);
+ scheduleRollback(commitTime, inflightInstant, txnManagerOpt);
rollback(context, commitTime, inflightInstant, true, false);
}
+ private void scheduleRollback(String commitTime, HoodieInstant
inflightInstant, Option<TransactionManager> txnManagerOpt) {
+ HoodieInstant rollbackInstant = new
HoodieInstant(HoodieInstant.State.INFLIGHT, commitTime,
HoodieTimeline.ROLLBACK_ACTION);
+ try {
+ txnManagerOpt.ifPresent(txnManager ->
txnManager.beginTransaction(Option.of(rollbackInstant), Option.empty()));
+ scheduleRollback(context, commitTime, inflightInstant, false,
config.shouldRollbackUsingMarkers(),
+ false);
+ } finally {
+ txnManagerOpt.ifPresent(txnManager ->
txnManager.endTransaction(Option.of(rollbackInstant)));
+ }
+ }
+
/**
* Finalize the written data onto storage. Perform any final cleanups.
*
@@ -897,15 +914,25 @@ public abstract class HoodieTable<T, I, K, O> implements
Serializable {
}
}
+ /**
+ * Validates that the instantTime is latest in the write timeline. This
method is specifically used to avoid additional timeline reloads.
+ * If the caller expects the validation to explicitly reload, please use
{@link #validateForLatestTimestampInternal(Either, boolean, String)}.
+ * @param metaClient instance of {@link HoodieTableMetaClient} to use.
+ * @param instantTime instant time of interest.
+ */
+ public void validateForLatestTimestampWithoutReload(HoodieTableMetaClient
metaClient, String instantTime) {
+
validateForLatestTimestampInternal(Either.right(metaClient.getActiveTimeline()),
metaClient.isMetadataTable(), instantTime);
+ }
+
/**
* Validates that the instantTime is latest in the write timeline.
* @param instantTime instant time of interest.
*/
public abstract void validateForLatestTimestamp(String instantTime);
- protected void validateForLatestTimestampInternal(String instantTime) {
- if (this.config.shouldEnableTimestampOrderinValidation() &&
config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
- TimestampUtils.validateForLatestTimestamp(metaClient, instantTime);
+ protected void
validateForLatestTimestampInternal(Either<HoodieTableMetaClient,
HoodieActiveTimeline> metaClientOrActiveTimeline, boolean isMetadataTable,
String instantTime) {
+ if (this.config.shouldEnableTimestampOrderingValidation() &&
config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+ TimestampUtils.validateForLatestTimestamp(metaClientOrActiveTimeline,
isMetadataTable, instantTime);
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java
index 045e8a5a720..cfda4711558 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackPlanActionExecutor.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieInstantInfo;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.avro.model.HoodieRollbackRequest;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -113,7 +114,9 @@ public class BaseRollbackPlanActionExecutor<T, I, K, O>
extends BaseActionExecut
HoodieRollbackPlan rollbackPlan = new HoodieRollbackPlan(new
HoodieInstantInfo(instantToRollback.getTimestamp(),
instantToRollback.getAction()), rollbackRequests,
LATEST_ROLLBACK_PLAN_VERSION);
if (!skipTimelinePublish) {
- table.validateForLatestTimestamp(rollbackInstant.getTimestamp());
+ if (!canProceedWithRollback(rollbackInstant)) {
+ return Option.empty();
+ }
if
(table.getRollbackTimeline().filterInflightsAndRequested().containsInstant(rollbackInstant.getTimestamp()))
{
LOG.warn("Request Rollback found with instant time " +
rollbackInstant + ", hence skipping scheduling rollback");
} else {
@@ -129,6 +132,23 @@ public class BaseRollbackPlanActionExecutor<T, I, K, O>
extends BaseActionExecut
}
}
+ private boolean canProceedWithRollback(HoodieInstant rollbackInstant) {
+ if
(config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+ // check for concurrent rollbacks. i.e if the commit being rolledback is
already rolled back, we can bail out.
+ HoodieTableMetaClient reloadedMetaClient =
HoodieTableMetaClient.reload(table.getMetaClient());
+ HoodieTimeline reloadedActiveTimeline =
reloadedMetaClient.getActiveTimeline();
+ if
(!reloadedActiveTimeline.filterInflightsAndRequested().containsInstant(instantToRollback.getTimestamp())
+ &&
!reloadedActiveTimeline.filterCompletedInstants().containsInstant(instantToRollback.getTimestamp()))
{
+ // if instant to rollback is already rolled back, we can bail out.
+ return false;
+ } else {
+ // since we had already reloaded the timeline above, lets avoid
additional reload with validateForLatestTimestamp.
+ table.validateForLatestTimestampWithoutReload(reloadedMetaClient,
rollbackInstant.getTimestamp());
+ }
+ }
+ return true;
+ }
+
@Override
public Option<HoodieRollbackPlan> execute() {
// Plan a new rollback action
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
index 058459b1bf6..ee2f31a9bff 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
@@ -49,7 +50,7 @@ public class HoodieFlinkMergeOnReadTableCompactor<T>
? HoodieTimeline.getCompactionInflightInstant(instantTime)
: HoodieTimeline.getLogCompactionInflightInstant(instantTime);
if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
- table.rollbackInflightCompaction(inflightInstant);
+ table.rollbackInflightCompaction(inflightInstant, Option.empty());
table.getMetaClient().reloadActiveTimeline();
}
}
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index cc91d5d3436..ff6f49b815a 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -38,6 +38,7 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Either;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
@@ -94,7 +95,7 @@ public class HoodieJavaCopyOnWriteTable<T>
@Override
public void validateForLatestTimestamp(String instantTime) {
- validateForLatestTimestampInternal(instantTime);
+ validateForLatestTimestampInternal(Either.left(metaClient),
metaClient.isMetadataTable(), instantTime);
}
@Override
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/compact/HoodieJavaMergeOnReadTableCompactor.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/compact/HoodieJavaMergeOnReadTableCompactor.java
index d8994ce02c3..06b962aa056 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/compact/HoodieJavaMergeOnReadTableCompactor.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/compact/HoodieJavaMergeOnReadTableCompactor.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
@@ -48,7 +49,7 @@ public class HoodieJavaMergeOnReadTableCompactor<T>
}
HoodieInstant inflightInstant =
HoodieTimeline.getCompactionInflightInstant(instantTime);
if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
- table.rollbackInflightCompaction(inflightInstant);
+ table.rollbackInflightCompaction(inflightInstant, Option.empty());
table.getMetaClient().reloadActiveTimeline();
}
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index fea0a19dcdf..869b9c46824 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -39,6 +39,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Either;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
@@ -106,7 +107,7 @@ public class HoodieSparkCopyOnWriteTable<T>
@Override
public void validateForLatestTimestamp(String instantTime) {
- validateForLatestTimestampInternal(instantTime);
+ validateForLatestTimestampInternal(Either.left(metaClient),
metaClient.isMetadataTable(), instantTime);
}
@Override
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index 129ace5f8d1..f9b474eedd4 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
@@ -140,7 +141,13 @@ public abstract class BaseSparkCommitActionExecutor<T>
extends
.collect(Collectors.toSet());
pendingClusteringInstantsToRollback.forEach(instant -> {
String commitTime = HoodieActiveTimeline.createNewInstantTime();
- table.scheduleRollback(context, commitTime, instant, false,
config.shouldRollbackUsingMarkers(), false);
+ HoodieInstant rollbackInstant = new
HoodieInstant(HoodieInstant.State.INFLIGHT, commitTime,
HoodieTimeline.ROLLBACK_ACTION);
+ try {
+ txnManagerOption.ifPresent(txnManager ->
txnManager.beginTransaction(Option.of(rollbackInstant), Option.empty()));
+ table.scheduleRollback(context, commitTime, instant, false,
config.shouldRollbackUsingMarkers(), false);
+ } finally {
+ txnManagerOption.ifPresent(txnManager ->
txnManager.endTransaction(Option.of(rollbackInstant)));
+ }
table.rollback(context, commitTime, instant, true, true);
});
table.getMetaClient().reloadActiveTimeline();
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
index 59421597013..179ee72c531 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
@@ -19,6 +19,7 @@
package org.apache.hudi.client.functional;
import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -401,7 +402,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends
HoodieClientTestBase {
// Rollback the log compaction commit.
HoodieInstant instant = new HoodieInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.LOG_COMPACTION_ACTION, logCompactionTimeStamp.get());
- getHoodieTable(metaClient,
config).rollbackInflightLogCompaction(instant);
+ getHoodieTable(metaClient,
config).rollbackInflightLogCompaction(instant, Option.of(new
TransactionManager(config, metaClient.getStorage())));
// Validate timeline.
HoodieTimeline activeTimeline = metaClient.reloadActiveTimeline();
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
index 6eb9da120ce..310c851683d 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
@@ -94,7 +94,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context,
metaClient);
hoodieTable.rollbackInflightCompaction(
- new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION,
compactionInstantTime));
+ new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION,
compactionInstantTime), Option.empty());
metaClient =
HoodieTableMetaClient.builder().setConf(storageConf).setBasePath(cfg.getBasePath()).build();
pendingCompactionInstant =
metaClient.getCommitsAndCompactionTimeline().filterPendingCompactionTimeline()
.getInstantsAsStream().findFirst().get();
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestRollbackPlanActionExecutor.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestRollbackPlanActionExecutor.java
new file mode 100644
index 00000000000..8d9b37a74bd
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestRollbackPlanActionExecutor.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.rollback;
+
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
+import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.model.WriteConcurrencyMode;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.marker.MarkerType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieLockConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.marker.WriteMarkers;
+import org.apache.hudi.table.marker.WriteMarkersFactory;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestRollbackPlanActionExecutor extends
HoodieClientRollbackTestBase {
+ @BeforeEach
+ public void setUp() throws Exception {
+ initPath();
+ initSparkContexts();
+ initHoodieStorage();
+ initMetaClient();
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ cleanupResources();
+ }
+
+ @Test
+ public void
testCopyOnWriteRollbackActionExecutorForFileListingAsGenerateFile() throws
Exception {
+ final String p1 = "2015/03/16";
+ final String p2 = "2015/03/17";
+ final String p3 = "2016/03/15";
+ // Let's create some commit files and base files
+ HoodieTestTable testTable = HoodieTestTable.of(metaClient)
+ .withPartitionMetaFiles(p1, p2, p3)
+ .addCommit("001")
+ .withBaseFilesInPartition(p1, "id11").getLeft()
+ .withBaseFilesInPartition(p2, "id12").getLeft()
+ .withLogFile(p1, "id11", 3).getLeft();
+
+ Map<String, String> partitionAndFileId1 = new HashMap<String, String>() {
+ {
+ put(p1, "id21");
+ put(p2, "id22");
+ }
+ };
+ Map<String, List<Pair<String, Integer>>> partitionToFilesNameLengthMap1 =
new HashMap<>();
+ partitionAndFileId1.forEach((k, v) ->
partitionToFilesNameLengthMap1.put(k, Collections.singletonList(Pair.of(v,
100))));
+ HoodieCommitMetadata commitMetadata = testTable.doWriteOperation("002",
WriteOperationType.UPSERT, Arrays.asList(p1, p2, p3),
partitionToFilesNameLengthMap1,
+ false, true);
+
+ HoodieWriteConfig writeConfig =
getConfigBuilder().withRollbackUsingMarkers(true).withEmbeddedTimelineServerEnabled(false)
+
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
+
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL).build();
+ HoodieTable table = this.getHoodieTable(metaClient, writeConfig);
+
+ // create markers for inflight commit 002
+ WriteMarkers writeMarkers = WriteMarkersFactory.get(MarkerType.DIRECT,
table, "002");
+ writeMarkers.createIfNotExists(p1,
commitMetadata.getPartitionToWriteStats().get(p1).get(0).getPath().substring(11),
IOType.CREATE);
+    writeMarkers.createIfNotExists(p2,
commitMetadata.getPartitionToWriteStats().get(p2).get(0).getPath().substring(11),
IOType.CREATE);
+
+ HoodieInstant needRollBackInstant = new HoodieInstant(true,
HoodieTimeline.COMMIT_ACTION, "002");
+ String rollbackInstant1 = "003";
+ String rollbackInstant2 = "004";
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+ try {
+ CompletableFuture<Boolean> future1 = CompletableFuture.supplyAsync(new
Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ // trigger rollback planning by writer 1.
+ BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor
=
+ new BaseRollbackPlanActionExecutor(context, table.getConfig(),
table, rollbackInstant2, needRollBackInstant, false,
+ table.getConfig().shouldRollbackUsingMarkers(), false);
+ HoodieRollbackPlan rollbackPlan = (HoodieRollbackPlan)
copyOnWriteRollbackPlanActionExecutor.execute().get();
+ CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor
= new CopyOnWriteRollbackActionExecutor(context, table.getConfig(), table,
rollbackInstant2, needRollBackInstant, true,
+ false);
+ HoodieRollbackMetadata hoodieRollbackMetadata =
copyOnWriteRollbackActionExecutor.execute();
+ assertTrue(hoodieRollbackMetadata.getTotalFilesDeleted() > 0);
+ // once rollback planning is complete, count down the latch
+ countDownLatch.countDown();
+ return true;
+ }
+ }, executor);
+
+ CompletableFuture<Boolean> future2 = CompletableFuture.supplyAsync(new
Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ MockRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor
=
+ new MockRollbackPlanActionExecutor(context, table.getConfig(),
table, rollbackInstant1, needRollBackInstant, false,
+ table.getConfig().shouldRollbackUsingMarkers(), false,
countDownLatch);
+ // this execute will wait until count down latch reaches 0
successfully before proceeding.
+ Option<HoodieRollbackPlan> rollbackPlan =
(Option<HoodieRollbackPlan>) copyOnWriteRollbackPlanActionExecutor.execute();
+ assertTrue(rollbackPlan.isEmpty());
+ return true;
+ }
+ }, executor);
+
+ CompletableFuture.allOf(future1, future2).join();
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+ /**
+ * Mock RollbackPlanActionExecutor to await on count down latch before
calling super.execute() for rollback planning.
+ */
+ class MockRollbackPlanActionExecutor extends BaseRollbackPlanActionExecutor {
+
+ private CountDownLatch countDownLatch;
+
+ public MockRollbackPlanActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config, HoodieTable table, String instantTime,
+ HoodieInstant instantToRollback,
boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers, boolean
isRestore,
+ CountDownLatch countDownLatch) {
+ super(context, config, table, instantTime, instantToRollback,
skipTimelinePublish, shouldRollbackUsingMarkers, isRestore);
+ this.countDownLatch = countDownLatch;
+ }
+
+ @Override
+ public Option<HoodieRollbackPlan> execute() {
+ try {
+ if (countDownLatch.await(120, TimeUnit.SECONDS)) {
+ return super.execute();
+ } else {
+ throw new HoodieException("Hitting timeout waiting for countdown
latch to reach 0.");
+ }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new HoodieException("Interrupted exception thrown waiting for
countdown latch to reach 0.", e);
+      }
+ }
+ }
+ }
+}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
index c08026946c0..cb60a2c1c9a 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
@@ -331,7 +331,7 @@ public class TestHoodieSparkMergeOnReadTableRollback
extends TestHoodieSparkRoll
assertTrue(listAllBaseFilesInPath(hoodieTable).stream()
.anyMatch(file -> compactedCommitTime.equals(new
HoodieBaseFile(file).getCommitTime())));
hoodieTable.rollbackInflightCompaction(new HoodieInstant(
- HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION,
compactedCommitTime));
+ HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION,
compactedCommitTime), Option.empty());
allFiles = listAllBaseFilesInPath(hoodieTable);
metaClient = HoodieTableMetaClient.reload(metaClient);
tableView = getHoodieTableFileSystemView(metaClient,
metaClient.getCommitsTimeline(), allFiles);
@@ -1005,8 +1005,7 @@ public class TestHoodieSparkMergeOnReadTableRollback
extends TestHoodieSparkRoll
//writeClient.commitCompaction(newCommitTime, statuses, Option.empty());
// Trigger a rollback of compaction
table.getActiveTimeline().reload();
- table.rollbackInflightCompaction(new HoodieInstant(
- HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION,
newCommitTime));
+ table.rollbackInflightCompaction(new
HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION,
newCommitTime), Option.empty());
metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieSparkTable.create(config, context(), metaClient);
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
index 308ebf898a5..0f63cc31ddf 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
@@ -285,7 +285,8 @@ public class HoodieFlinkClusteringJob {
if
(table.getMetaClient().getActiveTimeline().containsInstant(inflightInstant)) {
LOG.info("Rollback inflight clustering instant: [" + clusteringInstant
+ "]");
table.rollbackInflightClustering(inflightInstant,
- commitToRollback ->
writeClient.getTableServiceClient().getPendingRollbackInfo(table.getMetaClient(),
commitToRollback, false));
+ commitToRollback ->
writeClient.getTableServiceClient().getPendingRollbackInfo(table.getMetaClient(),
commitToRollback, false),
+ Option.empty());
table.getMetaClient().reloadActiveTimeline();
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index 74fe7929607..c9c9eb28d35 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -247,7 +247,7 @@ public class HoodieFlinkCompactor {
HoodieInstant inflightInstant =
HoodieTimeline.getCompactionInflightInstant(timestamp);
if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
LOG.info("Rollback inflight compaction instant: [" + timestamp +
"]");
-            table.rollbackInflightCompaction(inflightInstant);
+            table.rollbackInflightCompaction(inflightInstant, Option.empty());
table.getMetaClient().reloadActiveTimeline();
}
});
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
index 6f0bb97a053..0c735ea305c 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
@@ -95,7 +95,8 @@ public class ClusteringUtil {
inflightInstants.forEach(inflightInstant -> {
LOG.info("Rollback the inflight clustering instant: " + inflightInstant
+ " for failover");
table.rollbackInflightClustering(inflightInstant,
- commitToRollback ->
writeClient.getTableServiceClient().getPendingRollbackInfo(table.getMetaClient(),
commitToRollback, false));
+ commitToRollback ->
writeClient.getTableServiceClient().getPendingRollbackInfo(table.getMetaClient(),
commitToRollback, false),
+ Option.empty());
table.getMetaClient().reloadActiveTimeline();
});
}
@@ -112,7 +113,8 @@ public class ClusteringUtil {
if
(table.getMetaClient().reloadActiveTimeline().isPendingClusterInstant(instantTime))
{
LOG.warn("Rollback failed clustering instant: [" + instantTime + "]");
table.rollbackInflightClustering(inflightInstant,
- commitToRollback ->
writeClient.getTableServiceClient().getPendingRollbackInfo(table.getMetaClient(),
commitToRollback, false));
+ commitToRollback ->
writeClient.getTableServiceClient().getPendingRollbackInfo(table.getMetaClient(),
commitToRollback, false),
+ Option.empty());
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
index 1927645d308..a1918ae447a 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
@@ -168,7 +168,7 @@ public class CompactionUtil {
HoodieInstant inflightInstant =
HoodieTimeline.getCompactionInflightInstant(instantTime);
if
(table.getMetaClient().reloadActiveTimeline().filterPendingCompactionTimeline().containsInstant(inflightInstant))
{
LOG.warn("Rollback failed compaction instant: [" + instantTime + "]");
- table.rollbackInflightCompaction(inflightInstant);
+ table.rollbackInflightCompaction(inflightInstant, Option.empty());
}
}
@@ -184,7 +184,7 @@ public class CompactionUtil {
instant.getState() == HoodieInstant.State.INFLIGHT);
inflightCompactionTimeline.getInstants().forEach(inflightInstant -> {
LOG.info("Rollback the inflight compaction instant: " + inflightInstant
+ " for failover");
- table.rollbackInflightCompaction(inflightInstant);
+ table.rollbackInflightCompaction(inflightInstant, Option.empty());
table.getMetaClient().reloadActiveTimeline();
});
}
@@ -207,7 +207,7 @@ public class CompactionUtil {
int timeout = conf.getInteger(FlinkOptions.COMPACTION_TIMEOUT_SECONDS);
if (StreamerUtil.instantTimeDiffSeconds(currentTime,
instant.getTimestamp()) >= timeout) {
LOG.info("Rollback the inflight compaction instant: " + instant + "
for timeout(" + timeout + "s)");
- table.rollbackInflightCompaction(instant);
+ table.rollbackInflightCompaction(instant, Option.empty());
table.getMetaClient().reloadActiveTimeline();
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
index 7f7c2fe0825..b0b1ebd4284 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
@@ -100,7 +100,7 @@ class RunCleanProcedure extends BaseProcedure with
ProcedureBuilder with Logging
try {
client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath,
confs,
tableName.asInstanceOf[Option[String]])
- val hoodieCleanMeta = client.clean(cleanInstantTime, scheduleInLine,
skipLocking)
+ val hoodieCleanMeta = client.clean(cleanInstantTime, scheduleInLine)
if (hoodieCleanMeta == null) Seq.empty
else Seq(Row(hoodieCleanMeta.getStartCleanTime,
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index e297bdae191..f5f0abf0c7e 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -1899,7 +1899,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
new SparkRDDWriteClient(context,
writeConfig).getTableServiceClient
.getPendingRollbackInfo(table.getMetaClient,
commitToRollback, false)
}
- })
+ }, org.apache.hudi.common.util.Option.empty())
val requestedClustering =
metaClient.reloadActiveTimeline.getCommitsTimeline.lastInstant.get
assertTrue(requestedClustering.isRequested)
assertEquals(