This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 76b4fa404c72 feat(table-services): Support
hoodie.clustering.enable.expirations to allow cleanup of failed clustering
plans (intended for PreferWriterConflictResolutionStrategy) (#18302)
76b4fa404c72 is described below
commit 76b4fa404c724219f657cd64b788253bb31f23a4
Author: Krishen <[email protected]>
AuthorDate: Wed Apr 1 17:27:46 2026 -0700
feat(table-services): Support hoodie.clustering.enable.expirations to allow
cleanup of failed clustering plans (intended for
PreferWriterConflictResolutionStrategy) (#18302)
Adds opt-in support for automatically rolling back failed/stale clustering
instants during the rollbackFailedWrites flow (LAZY cleaning policy), and a
utility for partition-targeted rollback of failed clustering.
New Configurations:
hoodie.clustering.enable.expirations (default: false): Enables rollback of
incomplete clustering instants with expired heartbeats. Can only be applied if
PreferWriterConflictResolutionStrategy is the configured conflict resolution
strategy.
hoodie.clustering.expiration.threshold.mins (default: 60): Minimum age (in
minutes) a clustering instant must have before it is eligible for rollback.
Acts as a guardrail against ingestion jobs rolling back clustering operations
that a table service platform is already about to roll back itself.
Behavioral Changes:
HoodieClusteringConfig.UPDATES_STRATEGY: The config no longer has a static
default value. Its value is now inferred: SparkAllowUpdateStrategy when
PreferWriterConflictResolutionStrategy is the configured conflict resolution
strategy, and SparkRejectUpdateStrategy otherwise. This lets ingestion
writes proceed even when there is inflight clustering targeting the same
file groups.
BaseHoodieTableServiceClient.getInstantsToRollback: Under the LAZY failed
writes cleaning policy, eligible incomplete clustering instants (old enough,
config enabled, confirmed as clustering action) are now included in the
inflight stream before heartbeat-based expiry filtering.
BaseHoodieTableServiceClient.getInstantsToRollbackForLazyCleanPolicy: The
double-check after timeline reload now matches expired instants against all
incomplete (requested or inflight) instants on the active timeline, including
pending replace/clustering instants, regardless of whether clustering
expiration is enabled. This ensures expired clustering instants (if eligible)
are not inadvertently filtered out.
New helper
BaseHoodieTableServiceClient.isClusteringInstantEligibleForRollback:
Encapsulates the eligibility check for rolling back a clustering instant:
clustering expiration is enabled, the instant is at least the configured
threshold old, and its heartbeat has expired.
BaseHoodieTableServiceClient.getPendingRollbackInfos: Uses the new helper
to allow re-attempting pending rollback plans for eligible clustering instants.
---------
Co-authored-by: Krishen Bhan <[email protected]>
---
.../hudi/client/BaseHoodieTableServiceClient.java | 69 +++++-
.../apache/hudi/config/HoodieClusteringConfig.java | 37 +++-
.../org/apache/hudi/config/HoodieWriteConfig.java | 10 +
.../client/TestBaseHoodieTableServiceClient.java | 245 +++++++++++++++++++++
.../apache/hudi/config/TestHoodieWriteConfig.java | 83 +++++++
.../apache/hudi/utilities/HoodieClusteringJob.java | 68 +++++-
.../offlinejob/TestHoodieClusteringJob.java | 151 +++++++++++++
7 files changed, 650 insertions(+), 13 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index 970df09cffea..26282affd177 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -29,6 +29,7 @@ import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.heartbeat.HeartbeatUtils;
+import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
import org.apache.hudi.client.timeline.HoodieTimelineArchiver;
import org.apache.hudi.client.timeline.TimelineArchivers;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
@@ -43,6 +44,7 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantGenerator;
import org.apache.hudi.common.table.timeline.TimelineUtils;
@@ -81,12 +83,17 @@ import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.text.ParseException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
import java.util.Collections;
+
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -625,6 +632,9 @@ public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieCl
metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs,
replaceCommitMetadata, HoodieActiveTimeline.CLUSTERING_ACTION)
);
}
+ if (config.isExpirationOfClusteringEnabled()) {
+ heartbeatClient.stop(clusteringCommitTime);
+ }
log.info("Clustering successfully on commit {} for table {}",
clusteringCommitTime, table.getConfig().getBasePath());
}
@@ -726,6 +736,10 @@ public abstract class BaseHoodieTableServiceClient<I, T,
O> extends BaseHoodieCl
Option<HoodieClusteringPlan> clusteringPlan = table
.scheduleClustering(context, instantTime, extraMetadata);
option = clusteringPlan.map(plan -> instantTime);
+ if (option.isPresent() && config.isExpirationOfClusteringEnabled()) {
+ heartbeatClient.start(instantTime);
+ log.info("Started heartbeat for clustering instant {}",
instantTime);
+ }
break;
case COMPACT:
log.info("Scheduling compaction at instant time: {} for table {}",
instantTime, config.getBasePath());
@@ -1007,15 +1021,18 @@ public abstract class BaseHoodieTableServiceClient<I,
T, O> extends BaseHoodieCl
String instantToRollback =
rollbackPlan.getInstantToRollback().getCommitTime();
if (ignoreCompactionAndClusteringInstants) {
if (!HoodieTimeline.COMPACTION_ACTION.equals(action)) {
- InstantGenerator instantGenerator =
metaClient.getInstantGenerator();
- boolean isClustering =
ClusteringUtils.isClusteringInstant(metaClient.getActiveTimeline(),
-
instantGenerator.createNewInstant(HoodieInstant.State.INFLIGHT, action,
instantToRollback), instantGenerator);
- if (!isClustering) {
- infoMap.putIfAbsent(instantToRollback, Option.of(new
HoodiePendingRollbackInfo(rollbackInstant, rollbackPlan)));
+ HoodieInstant instant = metaClient.getInstantGenerator()
+ .createNewInstant(HoodieInstant.State.INFLIGHT, action,
instantToRollback);
+ boolean isClustering = ClusteringUtils.isClusteringInstant(
+ metaClient.getActiveTimeline(), instant,
metaClient.getInstantGenerator());
+ if (!isClustering ||
isClusteringInstantEligibleForRollback(metaClient, instant, config,
heartbeatClient)) {
+ infoMap.putIfAbsent(instantToRollback,
+ Option.of(new HoodiePendingRollbackInfo(rollbackInstant,
rollbackPlan)));
}
}
} else {
- infoMap.putIfAbsent(instantToRollback, Option.of(new
HoodiePendingRollbackInfo(rollbackInstant, rollbackPlan)));
+ infoMap.putIfAbsent(instantToRollback,
+ Option.of(new HoodiePendingRollbackInfo(rollbackInstant,
rollbackPlan)));
}
} catch (Exception e) {
log.warn("Processing rollback plan failed for {}, skip the plan",
rollbackInstant, e);
@@ -1132,6 +1149,20 @@ public abstract class BaseHoodieTableServiceClient<I, T,
O> extends BaseHoodieCl
}
}).collect(Collectors.toList());
} else if (cleaningPolicy.isLazy()) {
+ if (config.isExpirationOfClusteringEnabled()) {
+ Stream<HoodieInstant> eligibleClusteringInstants =
+ metaClient.getActiveTimeline().filterPendingClusteringTimeline()
+ .getInstantsAsStream()
+ .filter(instant -> {
+ try {
+ return isClusteringInstantEligibleForRollback(metaClient,
instant, config, heartbeatClient);
+ } catch (Exception e) {
+ log.warn("Failed to check clustering eligibility for
instant {}, skipping", instant.requestedTime(), e);
+ return false;
+ }
+ });
+ inflightInstantsStream = Stream.concat(inflightInstantsStream,
eligibleClusteringInstants);
+ }
return getInstantsToRollbackForLazyCleanPolicy(metaClient,
inflightInstantsStream);
} else if (cleaningPolicy.isNever()) {
return Collections.emptyList();
@@ -1155,13 +1186,35 @@ public abstract class BaseHoodieTableServiceClient<I,
T, O> extends BaseHoodieCl
if (!expiredInstants.isEmpty()) {
// Only return instants that haven't been completed by other writers
metaClient.reloadActiveTimeline();
- HoodieTimeline refreshedInflightTimeline =
getInflightTimelineExcludeCompactionAndClustering(metaClient);
- return
expiredInstants.stream().filter(refreshedInflightTimeline::containsInstant).collect(Collectors.toList());
+ HoodieTimeline refreshedIncompleteTimeline =
metaClient.getActiveTimeline().filterInflightsAndRequested();
+ return expiredInstants.stream().filter(instantTime ->
+ refreshedIncompleteTimeline.containsInstant(instantTime)
+ ).collect(Collectors.toList());
} else {
return Collections.emptyList();
}
}
+ public static boolean isClusteringInstantEligibleForRollback(
+ HoodieTableMetaClient metaClient, HoodieInstant instant,
+ HoodieWriteConfig config, HoodieHeartbeatClient heartbeatClient) {
+ try {
+ return config.isExpirationOfClusteringEnabled()
+ && hasInstantExpired(metaClient, instant.requestedTime(),
config.getClusteringExpirationThresholdMins())
+ && heartbeatClient.isHeartbeatExpired(instant.requestedTime());
+ } catch (Exception e) {
+ throw new HoodieException("Failed to check heartbeat for clustering
instant " + instant.requestedTime(), e);
+ }
+ }
+
+ private static boolean hasInstantExpired(HoodieTableMetaClient metaClient,
String instantTime, long expirationMins) throws ParseException {
+ ZoneId zoneId =
metaClient.getTableConfig().getTimelineTimezone().getZoneId();
+ long currentTimeMs = ZonedDateTime.ofInstant(java.time.Instant.now(),
zoneId).toInstant().toEpochMilli();
+ long instantTimeMs =
HoodieInstantTimeGenerator.parseDateFromInstantTime(instantTime).toInstant().toEpochMilli();
+ long ageMs = currentTimeMs - instantTimeMs;
+ return ageMs >= TimeUnit.MINUTES.toMillis(expirationMins);
+ }
+
/**
* @param commitInstantTime Instant time of the commit
* @param pendingRollbackInfo pending rollback instant and plan if rollback
failed from previous attempt.
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index f8505147df5c..fd713d73bb00 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -18,6 +18,7 @@
package org.apache.hudi.config;
+import
org.apache.hudi.client.transaction.PreferWriterConflictResolutionStrategy;
import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
@@ -26,6 +27,7 @@ import org.apache.hudi.common.config.EnumFieldDescription;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.HoodieIndex;
@@ -243,14 +245,45 @@ public class HoodieClusteringConfig extends HoodieConfig {
.sinceVersion("0.7.0")
.withDocumentation("Columns to sort the data by when clustering");
+ static final String SPARK_ALLOW_UPDATE_STRATEGY_CLASS_NAME =
+
"org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy";
+
+ static final String SPARK_REJECT_UPDATE_STRATEGY_CLASS_NAME =
+
"org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy";
+
public static final ConfigProperty<String> UPDATES_STRATEGY = ConfigProperty
.key("hoodie.clustering.updates.strategy")
-
.defaultValue("org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy")
+ .noDefaultValue()
+ .withInferFunction(cfg -> {
+ String strategy =
cfg.getStringOrDefault(HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME,
"");
+ if
(PreferWriterConflictResolutionStrategy.class.getName().equals(strategy)) {
+ return Option.of(SPARK_ALLOW_UPDATE_STRATEGY_CLASS_NAME);
+ }
+ return Option.of(SPARK_REJECT_UPDATE_STRATEGY_CLASS_NAME);
+ })
.markAdvanced()
.sinceVersion("0.7.0")
.withDocumentation("Determines how to handle updates, deletes to file
groups that are under clustering."
+ " Default strategy just rejects the update");
+ public static final ConfigProperty<Boolean> ENABLE_EXPIRATIONS =
ConfigProperty
+ .key("hoodie.clustering.enable.expirations")
+ .defaultValue(false)
+ .markAdvanced()
+ .withDocumentation("When enabled, rollback of failed writes (under LAZY
cleaning policy) will also attempt to rollback "
+ + "clustering replacecommit instants whose heartbeat has expired.
Clustering jobs will start a heartbeat before "
+ + "scheduling a plan, so that other writers can detect stale/failed
clustering attempts. Note that the same "
+ + "client must be used to schedule, execute, and commit the
clustering instant. And a clustering plan cannot be "
+ + "re-attempted");
+
+ public static final ConfigProperty<Long> EXPIRATION_THRESHOLD_MINS =
ConfigProperty
+ .key("hoodie.clustering.expiration.threshold.mins")
+ .defaultValue(60L)
+ .markAdvanced()
+ .withDocumentation("When hoodie.clustering.enable.expirations is
enabled, a clustering instant will not be "
+ + "considered expired unless its instant creation time is at least
this many minutes old. This serves as a guardrail to avoid "
+ + "unnecessary work in rolling back clustering instants that other
writers are already attempting to roll back.");
+
public static final ConfigProperty<String> SCHEDULE_INLINE_CLUSTERING =
ConfigProperty
.key("hoodie.clustering.schedule.inline")
.defaultValue("false")
@@ -490,7 +523,7 @@ public class HoodieClusteringConfig extends HoodieConfig {
* @deprecated Use {@link #UPDATES_STRATEGY} and its methods instead
*/
@Deprecated
- public static final String DEFAULT_CLUSTERING_UPDATES_STRATEGY =
UPDATES_STRATEGY.defaultValue();
+ public static final String DEFAULT_CLUSTERING_UPDATES_STRATEGY =
SPARK_REJECT_UPDATE_STRATEGY_CLASS_NAME;
/**
* @deprecated Use {@link #ASYNC_CLUSTERING_ENABLE} and its methods instead
*/
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index e73bd4c6ffec..6090ccd245a7 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -21,6 +21,7 @@ package org.apache.hudi.config;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.bootstrap.BootstrapMode;
import org.apache.hudi.client.transaction.ConflictResolutionStrategy;
+
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
@@ -2685,6 +2686,14 @@ public class HoodieWriteConfig extends HoodieConfig {
return getBooleanOrDefault(CLUSTERING_BLOCK_FOR_PENDING_INGESTION);
}
+ public boolean isExpirationOfClusteringEnabled() {
+ return getBooleanOrDefault(HoodieClusteringConfig.ENABLE_EXPIRATIONS);
+ }
+
+ public long getClusteringExpirationThresholdMins() {
+ return getLong(HoodieClusteringConfig.EXPIRATION_THRESHOLD_MINS);
+ }
+
/**
* File listing metadata configs.
*/
@@ -3721,6 +3730,7 @@ public class HoodieWriteConfig extends HoodieConfig {
HoodieFailedWritesCleaningPolicy.LAZY.name(),
writeConcurrencyMode.name());
}
+
}
private void validate() {
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieTableServiceClient.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieTableServiceClient.java
index a178aad4a835..11ee95e35594 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieTableServiceClient.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieTableServiceClient.java
@@ -20,20 +20,25 @@ package org.apache.hudi.client;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.InProcessTimeGenerator;
import org.apache.hudi.common.testutils.MockHoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.metrics.MetricsReporterType;
@@ -48,13 +53,18 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Date;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.stream.Stream;
import static
org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
@@ -87,12 +97,14 @@ class TestBaseHoodieTableServiceClient extends
HoodieCommonTestHarness {
String newInstantTime = InProcessTimeGenerator.createNewInstantTime();
HoodieTimeline pendingTimeline = new MockHoodieTimeline(Stream.empty(),
Stream.of(newInstantTime));
when(mockMetaClient.getCommitsTimeline().filterPendingExcludingCompaction()).thenReturn(pendingTimeline);
+
when(mockMetaClient.getActiveTimeline().filterInflightsAndRequested()).thenReturn(pendingTimeline);
when(mockMetaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants()).thenReturn(Collections.emptyList());
expectedRollbackInfo = Collections.singletonMap(newInstantTime,
Option.empty());
when(secondTable.getActiveTimeline()).thenReturn(timeline);
} else {
HoodieTimeline pendingTimeline = new MockHoodieTimeline(Stream.empty(),
Stream.empty());
when(mockMetaClient.getCommitsTimeline().filterPendingExcludingCompaction()).thenReturn(pendingTimeline);
+
when(mockMetaClient.getActiveTimeline().filterInflightsAndRequested()).thenReturn(pendingTimeline);
when(mockMetaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants()).thenReturn(Collections.emptyList());
expectedRollbackInfo = Collections.emptyMap();
when(firstTable.getActiveTimeline()).thenReturn(timeline);
@@ -242,6 +254,239 @@ class TestBaseHoodieTableServiceClient extends
HoodieCommonTestHarness {
verify(mockMetaClient).reloadActiveTimeline();
}
+ // --- Tests for clustering expiration logic ---
+
+ @Test
+ void isClusteringInstantEligibleForRollback_returnsFalseWhenConfigDisabled()
throws IOException {
+ initMetaClient();
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath(basePath)
+ .build();
+ assertFalse(writeConfig.isExpirationOfClusteringEnabled());
+
+ TestTableServiceClient client = createSimpleTestClient(writeConfig);
+ HoodieInstant clusteringInstant = metaClient.getInstantGenerator()
+ .createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.CLUSTERING_ACTION, createOldInstantTime());
+
+
assertFalse(BaseHoodieTableServiceClient.isClusteringInstantEligibleForRollback(
+ metaClient, clusteringInstant, writeConfig,
client.getHeartbeatClient()));
+ }
+
+ // preTableVersion8=true → v6 (replacecommit), false → v9 (clustering)
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void
isClusteringInstantEligibleForRollback_returnsFalseWhenInstantTooRecent(boolean
preTableVersion8) throws IOException {
+ initMetaClient(preTableVersion8);
+ String clusteringAction = preTableVersion8 ?
HoodieTimeline.REPLACE_COMMIT_ACTION : HoodieTimeline.CLUSTERING_ACTION;
+ Properties props = new Properties();
+ props.setProperty(HoodieClusteringConfig.ENABLE_EXPIRATIONS.key(), "true");
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath(basePath)
+ .withProperties(props)
+ .build();
+
+ TestTableServiceClient client = createSimpleTestClient(writeConfig);
+
+ String recentTime = InProcessTimeGenerator.createNewInstantTime();
+ createClusteringInstant(recentTime, clusteringAction);
+ HoodieInstant inflightInstant = metaClient.getInstantGenerator()
+ .createNewInstant(HoodieInstant.State.INFLIGHT, clusteringAction,
recentTime);
+
+
assertFalse(BaseHoodieTableServiceClient.isClusteringInstantEligibleForRollback(
+ metaClient, inflightInstant, writeConfig,
client.getHeartbeatClient()));
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void isClusteringInstantEligibleForRollback_returnsTrueWhenEligible(boolean
preTableVersion8) throws IOException {
+ initMetaClient(preTableVersion8);
+ String clusteringAction = preTableVersion8 ?
HoodieTimeline.REPLACE_COMMIT_ACTION : HoodieTimeline.CLUSTERING_ACTION;
+ HoodieWriteConfig writeConfig = buildConfigWithClusteringExpiration(0L);
+
+ TestTableServiceClient client = createSimpleTestClient(writeConfig);
+
+ String oldTime = createOldInstantTime();
+ createClusteringInstant(oldTime, clusteringAction);
+ HoodieInstant inflightInstant = metaClient.getInstantGenerator()
+ .createNewInstant(HoodieInstant.State.INFLIGHT, clusteringAction,
oldTime);
+
+
assertTrue(BaseHoodieTableServiceClient.isClusteringInstantEligibleForRollback(
+ metaClient, inflightInstant, writeConfig,
client.getHeartbeatClient()));
+ }
+
+ @Test
+ void
isClusteringInstantEligibleForRollback_returnsFalseWhenExpirationDisabled()
throws IOException {
+ initMetaClient();
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath(basePath)
+ .build();
+
+ TestTableServiceClient client = createSimpleTestClient(writeConfig);
+
+ String oldTime = createOldInstantTime();
+ HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+ HoodieInstant requestedInstant = metaClient.getInstantGenerator()
+ .createNewInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.CLUSTERING_ACTION, oldTime);
+ timeline.createNewInstant(requestedInstant);
+ HoodieInstant inflightInstant = metaClient.getInstantGenerator()
+ .createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.CLUSTERING_ACTION, oldTime);
+ timeline.transitionClusterRequestedToInflight(requestedInstant,
Option.empty());
+ metaClient.reloadActiveTimeline();
+
+
assertFalse(BaseHoodieTableServiceClient.isClusteringInstantEligibleForRollback(
+ metaClient, inflightInstant, writeConfig,
client.getHeartbeatClient()));
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void
getInstantsToRollback_includesEligibleClusteringInstantsWithExpiredHeartbeat(boolean
preTableVersion8) throws IOException {
+ initMetaClient(preTableVersion8);
+ String clusteringAction = preTableVersion8 ?
HoodieTimeline.REPLACE_COMMIT_ACTION : HoodieTimeline.CLUSTERING_ACTION;
+ HoodieWriteConfig writeConfig =
buildConfigWithClusteringExpirationAndLazy(0L);
+
+ TestTableServiceClient client = createSimpleTestClient(writeConfig);
+
+ String oldClusteringTime = createOldInstantTime();
+ createClusteringInstant(oldClusteringTime, clusteringAction);
+
+ List<String> instants = client.getInstantsToRollback(
+ metaClient, HoodieFailedWritesCleaningPolicy.LAZY, Option.empty());
+
+ assertTrue(instants.contains(oldClusteringTime),
+ "Old clustering instant with expired heartbeat should be rolled back");
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void
getInstantsToRollback_skipsClusteringInstantsWithActiveHeartbeat(boolean
preTableVersion8) throws IOException {
+ initMetaClient(preTableVersion8);
+ String clusteringAction = preTableVersion8 ?
HoodieTimeline.REPLACE_COMMIT_ACTION : HoodieTimeline.CLUSTERING_ACTION;
+ HoodieWriteConfig writeConfig =
buildConfigWithClusteringExpirationAndLazy(0L);
+
+ TestTableServiceClient client = createSimpleTestClient(writeConfig);
+
+ String oldClusteringTime = createOldInstantTime();
+ createClusteringInstant(oldClusteringTime, clusteringAction);
+
+ client.getHeartbeatClient().start(oldClusteringTime);
+
+ List<String> instants = client.getInstantsToRollback(
+ metaClient, HoodieFailedWritesCleaningPolicy.LAZY, Option.empty());
+
+ assertFalse(instants.contains(oldClusteringTime),
+ "Clustering instant with active heartbeat should NOT be rolled back");
+
+ client.getHeartbeatClient().stop(oldClusteringTime);
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void getInstantsToRollback_skipsRecentClusteringInstants(boolean
preTableVersion8) throws IOException {
+ initMetaClient(preTableVersion8);
+ String clusteringAction = preTableVersion8 ?
HoodieTimeline.REPLACE_COMMIT_ACTION : HoodieTimeline.CLUSTERING_ACTION;
+ HoodieWriteConfig writeConfig =
buildConfigWithClusteringExpirationAndLazy(60L);
+
+ TestTableServiceClient client = createSimpleTestClient(writeConfig);
+
+ String recentTime = InProcessTimeGenerator.createNewInstantTime();
+ createClusteringInstant(recentTime, clusteringAction);
+
+ List<String> instants = client.getInstantsToRollback(
+ metaClient, HoodieFailedWritesCleaningPolicy.LAZY, Option.empty());
+
+ assertFalse(instants.contains(recentTime),
+ "Recent clustering instant should NOT be rolled back (wait minutes
guardrail)");
+ }
+
+ @Test
+ void getInstantsToRollback_skipsClusteringInstantsWhenConfigDisabled()
throws IOException {
+ initMetaClient();
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath(basePath)
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+ .build())
+ .build();
+ assertFalse(writeConfig.isExpirationOfClusteringEnabled());
+
+ TestTableServiceClient client = createSimpleTestClient(writeConfig);
+
+ String oldClusteringTime = createOldInstantTime();
+ HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+ HoodieInstant requestedInstant = metaClient.getInstantGenerator()
+ .createNewInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.CLUSTERING_ACTION, oldClusteringTime);
+ timeline.createNewInstant(requestedInstant);
+ timeline.transitionClusterRequestedToInflight(requestedInstant,
Option.empty());
+ metaClient.reloadActiveTimeline();
+
+ List<String> instants = client.getInstantsToRollback(
+ metaClient, HoodieFailedWritesCleaningPolicy.LAZY, Option.empty());
+
+ assertFalse(instants.contains(oldClusteringTime),
+ "Clustering instants should NOT be rolled back when config is
disabled");
+ }
+
+ private void createClusteringInstant(String instantTime, String
clusteringAction) {
+ HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+ HoodieInstant requestedInstant = metaClient.getInstantGenerator()
+ .createNewInstant(HoodieInstant.State.REQUESTED, clusteringAction,
instantTime);
+ if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(clusteringAction)) {
+ HoodieRequestedReplaceMetadata metadata =
HoodieRequestedReplaceMetadata.newBuilder()
+ .setOperationType(WriteOperationType.CLUSTER.name())
+ .setExtraMetadata(Collections.emptyMap())
+ .setClusteringPlan(HoodieClusteringPlan.newBuilder()
+ .setInputGroups(Collections.emptyList())
+ .setExtraMetadata(Collections.emptyMap())
+ .setVersion(1)
+ .build())
+ .build();
+ timeline.saveToPendingClusterCommit(requestedInstant, metadata);
+ } else {
+ timeline.createNewInstant(requestedInstant);
+ }
+ timeline.transitionClusterRequestedToInflight(requestedInstant,
Option.empty());
+ metaClient.reloadActiveTimeline();
+ }
+
+ private String createOldInstantTime() {
+ Date oldDate = new Date(System.currentTimeMillis() - 2 * 60 * 60 * 1000);
+ return TimelineUtils.formatDate(oldDate);
+ }
+
+ private HoodieWriteConfig buildConfigWithClusteringExpiration(long
expirationMins) {
+ Properties props = new Properties();
+ props.setProperty(HoodieClusteringConfig.ENABLE_EXPIRATIONS.key(), "true");
+ props.setProperty(HoodieClusteringConfig.EXPIRATION_THRESHOLD_MINS.key(),
String.valueOf(expirationMins));
+ return HoodieWriteConfig.newBuilder()
+ .withPath(basePath)
+ .withProperties(props)
+ .build();
+ }
+
+ private HoodieWriteConfig buildConfigWithClusteringExpirationAndLazy(long
expirationMins) {
+ Properties props = new Properties();
+ props.setProperty(HoodieClusteringConfig.ENABLE_EXPIRATIONS.key(), "true");
+ props.setProperty(HoodieClusteringConfig.EXPIRATION_THRESHOLD_MINS.key(),
String.valueOf(expirationMins));
+ return HoodieWriteConfig.newBuilder()
+ .withPath(basePath)
+ .withProperties(props)
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+ .build())
+ .build();
+ }
+
+ private TestTableServiceClient createSimpleTestClient(HoodieWriteConfig
writeConfig) {
+ return new TestTableServiceClient(
+ writeConfig,
+ Collections.emptyIterator(),
+ Option.empty(),
+ Collections.emptyMap(),
+ Collections.emptyIterator());
+ }
+
+ // --- End clustering expiration tests ---
+
private static class TestTableServiceClient extends
BaseHoodieTableServiceClient<String, String, String> {
private final Iterator<HoodieTable<String, String, String, String>> tables;
// specify the expected rollback map
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index 2a8d8d54a2b9..6c1d1418b190 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -825,4 +825,87 @@ public class TestHoodieWriteConfig {
assertEquals(expectedCleanPolicy,
writeConfig.getFailedWritesCleanPolicy());
assertEquals(expectedLockProviderName, writeConfig.getLockProviderClass());
}
+
+ @Test
+ public void testClusteringExpirationDefaultsToFalse() {
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath("/tmp")
+ .build();
+ assertFalse(writeConfig.isExpirationOfClusteringEnabled());
+ assertEquals(60L, writeConfig.getClusteringExpirationThresholdMins());
+ }
+
+ @Test
+ public void testClusteringExpirationExplicitlyEnabled() {
+ Properties props = new Properties();
+ props.setProperty(HoodieClusteringConfig.ENABLE_EXPIRATIONS.key(), "true");
+ props.setProperty(HoodieClusteringConfig.EXPIRATION_THRESHOLD_MINS.key(),
"30");
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath("/tmp")
+ .withProperties(props)
+ .build();
+ assertTrue(writeConfig.isExpirationOfClusteringEnabled());
+ assertEquals(30L, writeConfig.getClusteringExpirationThresholdMins());
+ }
+
+ @Test
+ public void testClusteringExpirationNotInferredFromPreferWriterStrategy() {
+ Properties props = new Properties();
+
props.setProperty(HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME.key(),
+
"org.apache.hudi.client.transaction.PreferWriterConflictResolutionStrategy");
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath("/tmp")
+ .withProperties(props)
+ .build();
+ assertFalse(writeConfig.isExpirationOfClusteringEnabled());
+ }
+
+ @Test
+ public void testUpdatesStrategyDefaultsToRejectStrategy() {
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath("/tmp")
+ .build();
+
assertEquals(HoodieClusteringConfig.SPARK_REJECT_UPDATE_STRATEGY_CLASS_NAME,
+ writeConfig.getClusteringUpdatesStrategyClass());
+ }
+
+ @Test
+ public void testUpdatesStrategyRejectWithOtherConflictStrategy() {
+ Properties props = new Properties();
+
props.setProperty(HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME.key(),
+
"org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy");
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath("/tmp")
+ .withProperties(props)
+ .build();
+
assertEquals(HoodieClusteringConfig.SPARK_REJECT_UPDATE_STRATEGY_CLASS_NAME,
+ writeConfig.getClusteringUpdatesStrategyClass());
+ }
+
+ @Test
+ public void testUpdatesStrategyInferredFromPreferWriterStrategy() {
+ Properties props = new Properties();
+
props.setProperty(HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME.key(),
+
"org.apache.hudi.client.transaction.PreferWriterConflictResolutionStrategy");
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath("/tmp")
+ .withProperties(props)
+ .build();
+ assertEquals(HoodieClusteringConfig.SPARK_ALLOW_UPDATE_STRATEGY_CLASS_NAME,
+ writeConfig.getClusteringUpdatesStrategyClass());
+ }
+
+ @Test
+ public void testUpdatesStrategyNotOverriddenWhenExplicitlySet() {
+ Properties props = new Properties();
+
props.setProperty(HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME.key(),
+
"org.apache.hudi.client.transaction.PreferWriterConflictResolutionStrategy");
+ String customStrategy = "org.apache.hudi.custom.MyCustomStrategy";
+ props.setProperty(HoodieClusteringConfig.UPDATES_STRATEGY.key(),
customStrategy);
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath("/tmp")
+ .withProperties(props)
+ .build();
+ assertEquals(customStrategy,
writeConfig.getClusteringUpdatesStrategyClass());
+ }
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index 1a2478197593..c378bce9026c 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -18,6 +18,8 @@
package org.apache.hudi.utilities;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.BaseHoodieTableServiceClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -25,9 +27,10 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.TableServiceUtils;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.TableServiceUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.exception.HoodieException;
@@ -40,7 +43,10 @@ import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
import static org.apache.hudi.utilities.UtilHelpers.PURGE_PENDING_INSTANT;
@@ -214,8 +220,6 @@ public class HoodieClusteringJob {
String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
try (SparkRDDWriteClient<HoodieRecordPayload> client =
UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism,
Option.empty(), props)) {
if (StringUtils.isNullOrEmpty(cfg.clusteringInstantTime)) {
- // Instant time is not specified
- // Find the earliest scheduled clustering instant for execution
Option<HoodieInstant> firstClusteringInstant =
metaClient.getActiveTimeline().getFirstPendingClusterInstant();
if (firstClusteringInstant.isPresent()) {
@@ -289,6 +293,64 @@ public class HoodieClusteringJob {
return 0;
}
+ /**
+ * Returns the pending clustering instants that target any of the given
partitions.
+ *
+ * @param metaClient the table meta client
+ * @param partitions list of partition paths to check against pending
clustering plans
+ * @return list of clustering instants targeting the given partitions
+ */
+ public static List<HoodieInstant> getPendingClusteringInstantsForPartitions(
+ HoodieTableMetaClient metaClient,
+ List<String> partitions) {
+ Set<String> partitionSet = new HashSet<>(partitions);
+ Set<String> matchingInstantTimes =
ClusteringUtils.getAllPendingClusteringPlans(metaClient)
+ .filter(planPair -> {
+ HoodieClusteringPlan plan = planPair.getRight();
+ return plan.getInputGroups().stream()
+ .flatMap(group -> group.getSlices().stream())
+ .map(slice -> slice.getPartitionPath())
+ .anyMatch(partitionSet::contains);
+ })
+ .map(planPair -> planPair.getLeft().requestedTime())
+ .collect(Collectors.toSet());
+ return metaClient.getActiveTimeline().filterInflightsAndRequested()
+ .getInstantsAsStream()
+ .filter(instant ->
matchingInstantTimes.contains(instant.requestedTime()))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Rolls back pending clustering instants that target any of the given
partitions,
+ * are eligible for rollback (config enabled, old enough, and a clustering
instant),
+ * and whose heartbeat has expired (indicating the clustering job is no
longer alive).
+ *
+ * @param client the write client to use for rollback operations
+ * @param metaClient the table meta client
+ * @param partitions list of partition paths to check against pending
clustering plans
+ */
+ public static void rollbackFailedClusteringForPartitions(
+ SparkRDDWriteClient<?> client,
+ HoodieTableMetaClient metaClient,
+ List<String> partitions) {
+ for (HoodieInstant instant :
getPendingClusteringInstantsForPartitions(metaClient, partitions)) {
+ if (!BaseHoodieTableServiceClient.isClusteringInstantEligibleForRollback(
+ metaClient, instant, client.getConfig(),
client.getHeartbeatClient())) {
+ throw new HoodieException("Clustering instant " +
instant.requestedTime()
+ + " targeting requested partitions is not eligible for rollback "
+ + "(heartbeat still active or instant too recent)");
+ }
+ // Reload timeline to handle the case where the instant committed and
cleaned up
+ // its heartbeat after the timeline was first loaded
+ metaClient.reloadActiveTimeline();
+ if (metaClient.getActiveTimeline().filterInflightsAndRequested()
+ .containsInstant(instant.requestedTime())) {
+ LOG.info("Rolling back expired clustering instant {}",
instant.requestedTime());
+ client.rollback(instant.requestedTime());
+ }
+ }
+ }
+
private void clean(SparkRDDWriteClient<?> client) {
if (!cfg.skipClean && client.getConfig().isAutoClean()) {
client.clean();
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java
index 38b936b34386..0e7ad0a43291 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java
@@ -23,10 +23,14 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
@@ -37,6 +41,9 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import java.util.Properties;
import static
org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
@@ -44,8 +51,12 @@ import static
org.apache.hudi.common.table.HoodieTableMetaClient.TIMELINEFOLDER_
import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
import static
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR;
import static org.apache.hudi.utilities.UtilHelpers.PURGE_PENDING_INSTANT;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
import static
org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.deleteFileFromDfs;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Test cases for {@link HoodieClusteringJob}.
@@ -135,6 +146,122 @@ public class TestHoodieClusteringJob extends
HoodieOfflineJobTestBase {
"Must not contain any records w/ clustering instant time");
}
+ @Test
+ public void testGetPendingClusteringInstantsForPartitions() throws Exception
{
+ String tableBasePath = basePath + "/pendingClusteringPartitions";
+ Properties props = getPropertiesForKeyGen(true);
+ HoodieWriteConfig config = getWriteConfig(tableBasePath);
+ props.putAll(config.getProps());
+ metaClient = HoodieTableMetaClient.newTableBuilder()
+ .setTableType(HoodieTableType.COPY_ON_WRITE)
+ .setPayloadClass(HoodieAvroPayload.class)
+ .fromProperties(props)
+
.initTable(HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration()),
tableBasePath);
+ client = new SparkRDDWriteClient(context, config);
+
+ writeData(false, 100, true);
+ writeData(false, 100, true);
+
+ // Schedule clustering only (don't execute) — leaves a pending clustering
plan
+ HoodieClusteringJob hoodieCluster = init(tableBasePath, true, SCHEDULE,
false, false);
+ assertEquals(0, hoodieCluster.cluster(0));
+
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+
+ // Verify a pending clustering plan exists
+
assertTrue(metaClient.getActiveTimeline().getFirstPendingClusterInstant().isPresent());
+
+ // getPendingClusteringInstantsForPartitions should find the instant when
queried with matching partitions
+ List<HoodieInstant> matchingInstants =
HoodieClusteringJob.getPendingClusteringInstantsForPartitions(
+ metaClient,
Arrays.asList(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS));
+ assertFalse(matchingInstants.isEmpty(),
+ "Should find pending clustering instants for data partitions");
+
+ // getPendingClusteringInstantsForPartitions should return empty for
non-overlapping partitions
+ List<HoodieInstant> noMatchInstants =
HoodieClusteringJob.getPendingClusteringInstantsForPartitions(
+ metaClient, Collections.singletonList("non/existent/partition"));
+ assertTrue(noMatchInstants.isEmpty(),
+ "Should not find pending clustering instants for non-existent
partition");
+ }
+
+ @Test
+ public void testExpiredClusteringRolledBackForPartitions() throws Exception {
+ String tableBasePath = basePath + "/expiredClusteringRollback";
+ Properties props = getPropertiesForKeyGen(true);
+ HoodieWriteConfig config =
getWriteConfigWithClusteringExpiration(tableBasePath);
+ props.putAll(config.getProps());
+ metaClient = HoodieTableMetaClient.newTableBuilder()
+ .setTableType(HoodieTableType.COPY_ON_WRITE)
+ .setPayloadClass(HoodieAvroPayload.class)
+ .fromProperties(props)
+
.initTable(HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration()),
tableBasePath);
+ client = new SparkRDDWriteClient(context, config);
+
+ writeData(false, 100, true);
+ writeData(false, 100, true);
+
+ // Schedule clustering only
+ HoodieClusteringJob hoodieCluster = init(tableBasePath, true, SCHEDULE,
false, false);
+ assertEquals(0, hoodieCluster.cluster(0));
+
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ Option<HoodieInstant> pendingCluster =
metaClient.getActiveTimeline().getFirstPendingClusterInstant();
+ assertTrue(pendingCluster.isPresent());
+ String clusteringInstantTime = pendingCluster.get().requestedTime();
+
+ // No heartbeat file exists (simulates failed/dead clustering job) →
heartbeat expired
+ // rollbackFailedClusteringForPartitions should roll it back
+ try (SparkRDDWriteClient rollbackClient = new SparkRDDWriteClient(context,
config)) {
+ HoodieClusteringJob.rollbackFailedClusteringForPartitions(
+ rollbackClient, metaClient,
Arrays.asList(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS));
+ }
+
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+
assertFalse(metaClient.getActiveTimeline().getFirstPendingClusterInstant().isPresent(),
+ "Pending clustering instant should have been rolled back");
+ }
+
+ @Test
+ public void testClusteringExpirationSkipsInstantWithActiveHeartbeat() throws
Exception {
+ String tableBasePath = basePath + "/expirationSkipsActiveHeartbeat";
+ Properties props = getPropertiesForKeyGen(true);
+ HoodieWriteConfig config =
getWriteConfigWithClusteringExpiration(tableBasePath);
+ props.putAll(config.getProps());
+ metaClient = HoodieTableMetaClient.newTableBuilder()
+ .setTableType(HoodieTableType.COPY_ON_WRITE)
+ .setPayloadClass(HoodieAvroPayload.class)
+ .fromProperties(props)
+
.initTable(HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration()),
tableBasePath);
+ client = new SparkRDDWriteClient(context, config);
+
+ writeData(false, 100, true);
+ writeData(false, 100, true);
+
+ // Schedule clustering only
+ HoodieClusteringJob hoodieCluster = init(tableBasePath, true, SCHEDULE,
false, false);
+ assertEquals(0, hoodieCluster.cluster(0));
+
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ Option<HoodieInstant> pendingCluster =
metaClient.getActiveTimeline().getFirstPendingClusterInstant();
+ assertTrue(pendingCluster.isPresent());
+ String clusteringInstantTime = pendingCluster.get().requestedTime();
+
+ // Start a heartbeat for the clustering instant (simulates an alive
clustering job)
+ try (SparkRDDWriteClient rollbackClient = new SparkRDDWriteClient(context,
config)) {
+ rollbackClient.getHeartbeatClient().start(clusteringInstantTime);
+
+ assertThrows(HoodieException.class, () ->
+ HoodieClusteringJob.rollbackFailedClusteringForPartitions(
+ rollbackClient, metaClient,
Arrays.asList(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS)));
+
+ rollbackClient.getHeartbeatClient().stop(clusteringInstantTime);
+ }
+
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+
assertTrue(metaClient.getActiveTimeline().getFirstPendingClusterInstant().isPresent(),
+ "Pending clustering instant should NOT be rolled back when heartbeat
is active");
+ }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
@@ -185,4 +312,28 @@ public class TestHoodieClusteringJob extends
HoodieOfflineJobTestBase {
.build();
}
+ private HoodieWriteConfig getWriteConfigWithClusteringExpiration(String
tableBasePath) {
+ Properties extraProps = new Properties();
+ extraProps.setProperty(HoodieClusteringConfig.ENABLE_EXPIRATIONS.key(),
"true");
+
extraProps.setProperty(HoodieClusteringConfig.EXPIRATION_THRESHOLD_MINS.key(),
"0");
+ return HoodieWriteConfig.newBuilder()
+ .forTable("asyncClustering")
+ .withPath(tableBasePath)
+ .withSchema(TRIP_EXAMPLE_SCHEMA)
+ .withParallelism(2, 2)
+
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+ .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+ .withInlineClustering(false)
+ .withScheduleInlineClustering(false)
+ .withAsyncClustering(false).build())
+ .withStorageConfig(HoodieStorageConfig.newBuilder()
+ .logFileMaxSize(1024).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+ .withAutoClean(false).withAsyncClean(false).build())
+ .withProperties(extraProps)
+ .build();
+ }
+
}