nsivabalan commented on code in PR #18302:
URL: https://github.com/apache/hudi/pull/18302#discussion_r2933860170
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1123,13 +1138,33 @@ private List<String>
getInstantsToRollbackForLazyCleanPolicy(HoodieTableMetaClie
if (!expiredInstants.isEmpty()) {
// Only return instants that haven't been completed by other writers
metaClient.reloadActiveTimeline();
- HoodieTimeline refreshedInflightTimeline =
getInflightTimelineExcludeCompactionAndClustering(metaClient);
- return
expiredInstants.stream().filter(refreshedInflightTimeline::containsInstant).collect(Collectors.toList());
+ HoodieTimeline refreshedIncompleteTimeline =
metaClient.getActiveTimeline().filterInflightsAndRequested();
+ return expiredInstants.stream().filter(instantTime ->
+ refreshedIncompleteTimeline.containsInstant(instantTime)
+ ).collect(Collectors.toList());
} else {
return Collections.emptyList();
}
}
+ public boolean isClusteringInstantEligibleForRollback(HoodieTableMetaClient
metaClient, HoodieInstant instant) {
+ return config.isRollbackFailedClustering()
+ && isInstantOldEnough(instant.requestedTime(),
config.getRollbackFailedClusteringWaitMinutes())
+ && ClusteringUtils.isClusteringInstant(
+ metaClient.getActiveTimeline(), instant,
metaClient.getInstantGenerator());
+ }
+
+ private static boolean isInstantOldEnough(String instantTime, long
waitMinutes) {
Review Comment:
Suggest renaming this to `hasInstantExpired`.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -974,17 +981,18 @@ protected Map<String, Option<HoodiePendingRollbackInfo>>
getPendingRollbackInfos
String action = rollbackPlan.getInstantToRollback().getAction();
String instantToRollback =
rollbackPlan.getInstantToRollback().getCommitTime();
if (ignoreCompactionAndClusteringInstants) {
- if (!HoodieTimeline.COMPACTION_ACTION.equals(action)) {
- InstantGenerator instantGenerator =
metaClient.getInstantGenerator();
- boolean isClustering =
ClusteringUtils.isClusteringInstant(metaClient.getActiveTimeline(),
-
instantGenerator.createNewInstant(HoodieInstant.State.INFLIGHT, action,
instantToRollback), instantGenerator);
- if (!isClustering) {
- infoMap.putIfAbsent(instantToRollback, Option.of(new
HoodiePendingRollbackInfo(rollbackInstant, rollbackPlan)));
- }
+ if (HoodieTimeline.COMPACTION_ACTION.equals(action)) {
+ continue;
+ }
+ HoodieInstant instant = metaClient.getInstantGenerator()
+ .createNewInstant(HoodieInstant.State.INFLIGHT, action,
instantToRollback);
+ if (!isClusteringInstantEligibleForRollback(metaClient, instant)
Review Comment:
Can we split this up to keep it simple, as before? The current code structure
is a bit confusing.
```
boolean isClusteringInstant = ClusteringUtils.isClusteringInstant(
metaClient.getActiveTimeline(), instant,
metaClient.getInstantGenerator());
if (isClusteringInstant) {
if (!isClusteringInstantEligibleForRollback(metaClient, instant)) {
continue;
}
} else {
continue;
}
```
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -674,6 +675,35 @@ public class HoodieWriteConfig extends HoodieConfig {
.markAdvanced()
.withDocumentation("Number of heartbeat misses, before a writer is
deemed not alive and all pending writes are aborted.");
+ public static final ConfigProperty<Boolean> ROLLBACK_FAILED_CLUSTERING =
ConfigProperty
+ .key("hoodie.rollback.failed.clustering")
+ .defaultValue(false)
+ .withInferFunction(cfg -> {
+ String strategy =
cfg.getStringOrDefault(HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME,
"");
+ if
(PreferWriterConflictResolutionStrategy.class.getName().equals(strategy)) {
+ return Option.of(true);
+ }
+ return Option.empty();
+ })
+ .markAdvanced()
+ .withDocumentation("When enabled, rollback of failed writes (under LAZY
cleaning policy) will also attempt to rollback "
+ + "clustering replacecommit instants whose heartbeat has expired.
This is automatically enabled when using "
+ + "PreferWriterConflictResolutionStrategy. Clustering jobs will
start a heartbeat before scheduling a plan, "
+ + "so that other writers can detect stale/failed clustering
attempts. Note that the same "
+ + "client must be used to schedule, execute, and commit the
clustering instant.");
+
+ public static final ConfigProperty<Long>
ROLLBACK_FAILED_CLUSTERING_WAIT_MINUTES = ConfigProperty
+ .key("hoodie.rollback.failed.clustering.wait.minutes")
Review Comment:
How about `hoodie.clustering.expiration.time.mins`?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -674,6 +675,35 @@ public class HoodieWriteConfig extends HoodieConfig {
.markAdvanced()
.withDocumentation("Number of heartbeat misses, before a writer is
deemed not alive and all pending writes are aborted.");
+ public static final ConfigProperty<Boolean> ROLLBACK_FAILED_CLUSTERING =
ConfigProperty
Review Comment:
Let's move this to `HoodieClusteringConfig` and name it
`hoodie.clustering.enable.expirations`.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -694,6 +697,10 @@ Option<String> scheduleTableServiceInternal(Option<String>
providedInstantTime,
Option<HoodieClusteringPlan> clusteringPlan = table
.scheduleClustering(context, instantTime, extraMetadata);
option = clusteringPlan.map(plan -> instantTime);
+ if (option.isPresent() && config.isRollbackFailedClustering()) {
Review Comment:
Using `config.isRollbackFailedClustering()` here does not sit well.
Can we name it something like
`config.isExpirationOfClusteringEnabled()`?
That reads nicely and is understandable.
WDYT?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1123,13 +1138,33 @@ private List<String>
getInstantsToRollbackForLazyCleanPolicy(HoodieTableMetaClie
if (!expiredInstants.isEmpty()) {
// Only return instants that haven't been completed by other writers
metaClient.reloadActiveTimeline();
- HoodieTimeline refreshedInflightTimeline =
getInflightTimelineExcludeCompactionAndClustering(metaClient);
- return
expiredInstants.stream().filter(refreshedInflightTimeline::containsInstant).collect(Collectors.toList());
+ HoodieTimeline refreshedIncompleteTimeline =
metaClient.getActiveTimeline().filterInflightsAndRequested();
+ return expiredInstants.stream().filter(instantTime ->
+ refreshedIncompleteTimeline.containsInstant(instantTime)
+ ).collect(Collectors.toList());
} else {
return Collections.emptyList();
}
}
+ public boolean isClusteringInstantEligibleForRollback(HoodieTableMetaClient
metaClient, HoodieInstant instant) {
+ return config.isRollbackFailedClustering()
+ && isInstantOldEnough(instant.requestedTime(),
config.getRollbackFailedClusteringWaitMinutes())
+ && ClusteringUtils.isClusteringInstant(
+ metaClient.getActiveTimeline(), instant,
metaClient.getInstantGenerator());
+ }
+
+ private static boolean isInstantOldEnough(String instantTime, long
waitMinutes) {
+ try {
+ Date instantDate = TimelineUtils.parseDateFromInstantTime(instantTime);
+ long ageMs = System.currentTimeMillis() - instantDate.getTime();
Review Comment:
Let's get the time zone from
`hoodieTable.getMetaClient().getTableConfig().getTimelineTimezone()` and
calculate the age based on that.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -674,6 +675,35 @@ public class HoodieWriteConfig extends HoodieConfig {
.markAdvanced()
.withDocumentation("Number of heartbeat misses, before a writer is
deemed not alive and all pending writes are aborted.");
+ public static final ConfigProperty<Boolean> ROLLBACK_FAILED_CLUSTERING =
ConfigProperty
+ .key("hoodie.rollback.failed.clustering")
+ .defaultValue(false)
+ .withInferFunction(cfg -> {
+ String strategy =
cfg.getStringOrDefault(HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME,
"");
+ if
(PreferWriterConflictResolutionStrategy.class.getName().equals(strategy)) {
+ return Option.of(true);
+ }
+ return Option.empty();
+ })
+ .markAdvanced()
+ .withDocumentation("When enabled, rollback of failed writes (under LAZY
cleaning policy) will also attempt to rollback "
+ + "clustering replacecommit instants whose heartbeat has expired.
This is automatically enabled when using "
+ + "PreferWriterConflictResolutionStrategy. Clustering jobs will
start a heartbeat before scheduling a plan, "
+ + "so that other writers can detect stale/failed clustering
attempts. Note that the same "
+ + "client must be used to schedule, execute, and commit the
clustering instant.");
+
+ public static final ConfigProperty<Long>
ROLLBACK_FAILED_CLUSTERING_WAIT_MINUTES = ConfigProperty
+ .key("hoodie.rollback.failed.clustering.wait.minutes")
+ .defaultValue(60L)
+ .markAdvanced()
+ .withDocumentation("When hoodie.rollback.failed.clustering is enabled,
rollbackFailedWrites will not attempt to rollback "
+ + "a clustering instant unless it is at least this many minutes old.
This is a temporary guardrail to reduce the chance "
+ + "of transient failures from concurrent rollback attempts until
https://github.com/apache/hudi/issues/18050 is resolved.");
Review Comment:
Let's not talk about 18050 here.
We are going to get 18050 landed for 1.2 shortly,
so why mention it in the documentation?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -974,17 +981,18 @@ protected Map<String, Option<HoodiePendingRollbackInfo>>
getPendingRollbackInfos
String action = rollbackPlan.getInstantToRollback().getAction();
String instantToRollback =
rollbackPlan.getInstantToRollback().getCommitTime();
if (ignoreCompactionAndClusteringInstants) {
- if (!HoodieTimeline.COMPACTION_ACTION.equals(action)) {
- InstantGenerator instantGenerator =
metaClient.getInstantGenerator();
- boolean isClustering =
ClusteringUtils.isClusteringInstant(metaClient.getActiveTimeline(),
-
instantGenerator.createNewInstant(HoodieInstant.State.INFLIGHT, action,
instantToRollback), instantGenerator);
- if (!isClustering) {
- infoMap.putIfAbsent(instantToRollback, Option.of(new
HoodiePendingRollbackInfo(rollbackInstant, rollbackPlan)));
- }
+ if (HoodieTimeline.COMPACTION_ACTION.equals(action)) {
+ continue;
+ }
+ HoodieInstant instant = metaClient.getInstantGenerator()
+ .createNewInstant(HoodieInstant.State.INFLIGHT, action,
instantToRollback);
+ if (!isClusteringInstantEligibleForRollback(metaClient, instant)
Review Comment:
Actually, instead of using `continue`, can we add it to the list only within
the `if` condition?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -674,6 +675,35 @@ public class HoodieWriteConfig extends HoodieConfig {
.markAdvanced()
.withDocumentation("Number of heartbeat misses, before a writer is
deemed not alive and all pending writes are aborted.");
+ public static final ConfigProperty<Boolean> ROLLBACK_FAILED_CLUSTERING =
ConfigProperty
+ .key("hoodie.rollback.failed.clustering")
+ .defaultValue(false)
+ .withInferFunction(cfg -> {
+ String strategy =
cfg.getStringOrDefault(HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME,
"");
+ if
(PreferWriterConflictResolutionStrategy.class.getName().equals(strategy)) {
+ return Option.of(true);
+ }
+ return Option.empty();
+ })
+ .markAdvanced()
+ .withDocumentation("When enabled, rollback of failed writes (under LAZY
cleaning policy) will also attempt to rollback "
+ + "clustering replacecommit instants whose heartbeat has expired.
This is automatically enabled when using "
+ + "PreferWriterConflictResolutionStrategy. Clustering jobs will
start a heartbeat before scheduling a plan, "
+ + "so that other writers can detect stale/failed clustering
attempts. Note that the same "
+ + "client must be used to schedule, execute, and commit the
clustering instant.");
+
+ public static final ConfigProperty<Long>
ROLLBACK_FAILED_CLUSTERING_WAIT_MINUTES = ConfigProperty
+ .key("hoodie.rollback.failed.clustering.wait.minutes")
+ .defaultValue(60L)
Review Comment:
Which interval does this config dictate?
Is it that, after the heartbeat expires, we need to wait X minutes before
triggering the rollback?
If yes, the documentation is not clear.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -674,6 +675,35 @@ public class HoodieWriteConfig extends HoodieConfig {
.markAdvanced()
.withDocumentation("Number of heartbeat misses, before a writer is
deemed not alive and all pending writes are aborted.");
+ public static final ConfigProperty<Boolean> ROLLBACK_FAILED_CLUSTERING =
ConfigProperty
Review Comment:
Is there a chance that we will enable expiration from clustering runners too,
or did you folks need this purely from the ingestion writer's standpoint?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1123,13 +1138,33 @@ private List<String>
getInstantsToRollbackForLazyCleanPolicy(HoodieTableMetaClie
if (!expiredInstants.isEmpty()) {
// Only return instants that haven't been completed by other writers
metaClient.reloadActiveTimeline();
- HoodieTimeline refreshedInflightTimeline =
getInflightTimelineExcludeCompactionAndClustering(metaClient);
- return
expiredInstants.stream().filter(refreshedInflightTimeline::containsInstant).collect(Collectors.toList());
+ HoodieTimeline refreshedIncompleteTimeline =
metaClient.getActiveTimeline().filterInflightsAndRequested();
+ return expiredInstants.stream().filter(instantTime ->
+ refreshedIncompleteTimeline.containsInstant(instantTime)
+ ).collect(Collectors.toList());
} else {
return Collections.emptyList();
}
}
+ public boolean isClusteringInstantEligibleForRollback(HoodieTableMetaClient
metaClient, HoodieInstant instant) {
Review Comment:
Where are we checking for heartbeats?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]