nsivabalan commented on code in PR #18302:
URL: https://github.com/apache/hudi/pull/18302#discussion_r2933860170


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1123,13 +1138,33 @@ private List<String> 
getInstantsToRollbackForLazyCleanPolicy(HoodieTableMetaClie
     if (!expiredInstants.isEmpty()) {
       // Only return instants that haven't been completed by other writers
       metaClient.reloadActiveTimeline();
-      HoodieTimeline refreshedInflightTimeline = 
getInflightTimelineExcludeCompactionAndClustering(metaClient);
-      return 
expiredInstants.stream().filter(refreshedInflightTimeline::containsInstant).collect(Collectors.toList());
+      HoodieTimeline refreshedIncompleteTimeline = 
metaClient.getActiveTimeline().filterInflightsAndRequested();
+      return expiredInstants.stream().filter(instantTime ->
+          refreshedIncompleteTimeline.containsInstant(instantTime)
+      ).collect(Collectors.toList());
     } else {
       return Collections.emptyList();
     }
   }
 
+  public boolean isClusteringInstantEligibleForRollback(HoodieTableMetaClient 
metaClient, HoodieInstant instant) {
+    return config.isRollbackFailedClustering()
+        && isInstantOldEnough(instant.requestedTime(), 
config.getRollbackFailedClusteringWaitMinutes())
+        && ClusteringUtils.isClusteringInstant(
+            metaClient.getActiveTimeline(), instant, 
metaClient.getInstantGenerator());
+  }
+
+  private static boolean isInstantOldEnough(String instantTime, long 
waitMinutes) {

Review Comment:
   `hasInstantExpired` 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -974,17 +981,18 @@ protected Map<String, Option<HoodiePendingRollbackInfo>> 
getPendingRollbackInfos
         String action = rollbackPlan.getInstantToRollback().getAction();
         String instantToRollback = 
rollbackPlan.getInstantToRollback().getCommitTime();
         if (ignoreCompactionAndClusteringInstants) {
-          if (!HoodieTimeline.COMPACTION_ACTION.equals(action)) {
-            InstantGenerator instantGenerator = 
metaClient.getInstantGenerator();
-            boolean isClustering = 
ClusteringUtils.isClusteringInstant(metaClient.getActiveTimeline(),
-                
instantGenerator.createNewInstant(HoodieInstant.State.INFLIGHT, action, 
instantToRollback), instantGenerator);
-            if (!isClustering) {
-              infoMap.putIfAbsent(instantToRollback, Option.of(new 
HoodiePendingRollbackInfo(rollbackInstant, rollbackPlan)));
-            }
+          if (HoodieTimeline.COMPACTION_ACTION.equals(action)) {
+            continue;
+          }
+          HoodieInstant instant = metaClient.getInstantGenerator()
+              .createNewInstant(HoodieInstant.State.INFLIGHT, action, 
instantToRollback);
+          if (!isClusteringInstantEligibleForRollback(metaClient, instant)

Review Comment:
   can we split this up to keep it simple as before. current code structure is 
bit confusing. 
   
   ```
   boolean isClusteringInstant = ClusteringUtils.isClusteringInstant(
                     metaClient.getActiveTimeline(), instant, 
metaClient.getInstantGenerator())
   if (isClusteringInstant) {
       if (!isClusteringInstantEligibleForRollback(metaClient, instant)) {
       continue;
       }
   } else {
       continue;
   }
   ```



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -674,6 +675,35 @@ public class HoodieWriteConfig extends HoodieConfig {
       .markAdvanced()
       .withDocumentation("Number of heartbeat misses, before a writer is 
deemed not alive and all pending writes are aborted.");
 
+  public static final ConfigProperty<Boolean> ROLLBACK_FAILED_CLUSTERING = 
ConfigProperty
+      .key("hoodie.rollback.failed.clustering")
+      .defaultValue(false)
+      .withInferFunction(cfg -> {
+        String strategy = 
cfg.getStringOrDefault(HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME,
 "");
+        if 
(PreferWriterConflictResolutionStrategy.class.getName().equals(strategy)) {
+          return Option.of(true);
+        }
+        return Option.empty();
+      })
+      .markAdvanced()
+      .withDocumentation("When enabled, rollback of failed writes (under LAZY 
cleaning policy) will also attempt to rollback "
+          + "clustering replacecommit instants whose heartbeat has expired. 
This is automatically enabled when using "
+          + "PreferWriterConflictResolutionStrategy. Clustering jobs will 
start a heartbeat before scheduling a plan, "
+          + "so that other writers can detect stale/failed clustering 
attempts. Note that the same "
+          + "client must be used to schedule, execute, and commit the 
clustering instant.");
+
+  public static final ConfigProperty<Long> 
ROLLBACK_FAILED_CLUSTERING_WAIT_MINUTES = ConfigProperty
+      .key("hoodie.rollback.failed.clustering.wait.minutes")

Review Comment:
   how about `hoodie.clustering.expiration.time.mins` ? 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -674,6 +675,35 @@ public class HoodieWriteConfig extends HoodieConfig {
       .markAdvanced()
       .withDocumentation("Number of heartbeat misses, before a writer is 
deemed not alive and all pending writes are aborted.");
 
+  public static final ConfigProperty<Boolean> ROLLBACK_FAILED_CLUSTERING = 
ConfigProperty

Review Comment:
   lets move this to HoodieClusteringConfig and name this
   `hoodie.clustering.enable.expirations` 
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -694,6 +697,10 @@ Option<String> scheduleTableServiceInternal(Option<String> 
providedInstantTime,
           Option<HoodieClusteringPlan> clusteringPlan = table
               .scheduleClustering(context, instantTime, extraMetadata);
           option = clusteringPlan.map(plan -> instantTime);
+          if (option.isPresent() && config.isRollbackFailedClustering()) {

Review Comment:
   using `config.isRollbackFailedClustering()` here does not sit well. 
   
   can we name something like 
   `config.isExpirationOfClusteringEnabled()` 
   
   this reads nicely and is understandable. 
   wdyt? 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1123,13 +1138,33 @@ private List<String> 
getInstantsToRollbackForLazyCleanPolicy(HoodieTableMetaClie
     if (!expiredInstants.isEmpty()) {
       // Only return instants that haven't been completed by other writers
       metaClient.reloadActiveTimeline();
-      HoodieTimeline refreshedInflightTimeline = 
getInflightTimelineExcludeCompactionAndClustering(metaClient);
-      return 
expiredInstants.stream().filter(refreshedInflightTimeline::containsInstant).collect(Collectors.toList());
+      HoodieTimeline refreshedIncompleteTimeline = 
metaClient.getActiveTimeline().filterInflightsAndRequested();
+      return expiredInstants.stream().filter(instantTime ->
+          refreshedIncompleteTimeline.containsInstant(instantTime)
+      ).collect(Collectors.toList());
     } else {
       return Collections.emptyList();
     }
   }
 
+  public boolean isClusteringInstantEligibleForRollback(HoodieTableMetaClient 
metaClient, HoodieInstant instant) {
+    return config.isRollbackFailedClustering()
+        && isInstantOldEnough(instant.requestedTime(), 
config.getRollbackFailedClusteringWaitMinutes())
+        && ClusteringUtils.isClusteringInstant(
+            metaClient.getActiveTimeline(), instant, 
metaClient.getInstantGenerator());
+  }
+
+  private static boolean isInstantOldEnough(String instantTime, long 
waitMinutes) {
+    try {
+      Date instantDate = TimelineUtils.parseDateFromInstantTime(instantTime);
+      long ageMs = System.currentTimeMillis() - instantDate.getTime();

Review Comment:
   Lets get the time zone from 
hoodieTable.getMetaClient().getTableConfig().getTimelineTimezone() and 
calculate based on that? 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -674,6 +675,35 @@ public class HoodieWriteConfig extends HoodieConfig {
       .markAdvanced()
       .withDocumentation("Number of heartbeat misses, before a writer is 
deemed not alive and all pending writes are aborted.");
 
+  public static final ConfigProperty<Boolean> ROLLBACK_FAILED_CLUSTERING = 
ConfigProperty
+      .key("hoodie.rollback.failed.clustering")
+      .defaultValue(false)
+      .withInferFunction(cfg -> {
+        String strategy = 
cfg.getStringOrDefault(HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME,
 "");
+        if 
(PreferWriterConflictResolutionStrategy.class.getName().equals(strategy)) {
+          return Option.of(true);
+        }
+        return Option.empty();
+      })
+      .markAdvanced()
+      .withDocumentation("When enabled, rollback of failed writes (under LAZY 
cleaning policy) will also attempt to rollback "
+          + "clustering replacecommit instants whose heartbeat has expired. 
This is automatically enabled when using "
+          + "PreferWriterConflictResolutionStrategy. Clustering jobs will 
start a heartbeat before scheduling a plan, "
+          + "so that other writers can detect stale/failed clustering 
attempts. Note that the same "
+          + "client must be used to schedule, execute, and commit the 
clustering instant.");
+
+  public static final ConfigProperty<Long> 
ROLLBACK_FAILED_CLUSTERING_WAIT_MINUTES = ConfigProperty
+      .key("hoodie.rollback.failed.clustering.wait.minutes")
+      .defaultValue(60L)
+      .markAdvanced()
+      .withDocumentation("When hoodie.rollback.failed.clustering is enabled, 
rollbackFailedWrites will not attempt to rollback "
+          + "a clustering instant unless it is at least this many minutes old. 
This is a temporary guardrail to reduce the chance "
+          + "of transient failures from concurrent rollback attempts until 
https://github.com/apache/hudi/issues/18050 is resolved.");

Review Comment:
   lets add talk about 18050. 
   we are going to get 18050 landed for 1.2 shortly. 
   So, why talk about it in the documentation. 
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -974,17 +981,18 @@ protected Map<String, Option<HoodiePendingRollbackInfo>> 
getPendingRollbackInfos
         String action = rollbackPlan.getInstantToRollback().getAction();
         String instantToRollback = 
rollbackPlan.getInstantToRollback().getCommitTime();
         if (ignoreCompactionAndClusteringInstants) {
-          if (!HoodieTimeline.COMPACTION_ACTION.equals(action)) {
-            InstantGenerator instantGenerator = 
metaClient.getInstantGenerator();
-            boolean isClustering = 
ClusteringUtils.isClusteringInstant(metaClient.getActiveTimeline(),
-                
instantGenerator.createNewInstant(HoodieInstant.State.INFLIGHT, action, 
instantToRollback), instantGenerator);
-            if (!isClustering) {
-              infoMap.putIfAbsent(instantToRollback, Option.of(new 
HoodiePendingRollbackInfo(rollbackInstant, rollbackPlan)));
-            }
+          if (HoodieTimeline.COMPACTION_ACTION.equals(action)) {
+            continue;
+          }
+          HoodieInstant instant = metaClient.getInstantGenerator()
+              .createNewInstant(HoodieInstant.State.INFLIGHT, action, 
instantToRollback);
+          if (!isClusteringInstantEligibleForRollback(metaClient, instant)

Review Comment:
   actually instead of `continue`, can we add it to list w/n the if condition 
only. 
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -674,6 +675,35 @@ public class HoodieWriteConfig extends HoodieConfig {
       .markAdvanced()
       .withDocumentation("Number of heartbeat misses, before a writer is 
deemed not alive and all pending writes are aborted.");
 
+  public static final ConfigProperty<Boolean> ROLLBACK_FAILED_CLUSTERING = 
ConfigProperty
+      .key("hoodie.rollback.failed.clustering")
+      .defaultValue(false)
+      .withInferFunction(cfg -> {
+        String strategy = 
cfg.getStringOrDefault(HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME,
 "");
+        if 
(PreferWriterConflictResolutionStrategy.class.getName().equals(strategy)) {
+          return Option.of(true);
+        }
+        return Option.empty();
+      })
+      .markAdvanced()
+      .withDocumentation("When enabled, rollback of failed writes (under LAZY 
cleaning policy) will also attempt to rollback "
+          + "clustering replacecommit instants whose heartbeat has expired. 
This is automatically enabled when using "
+          + "PreferWriterConflictResolutionStrategy. Clustering jobs will 
start a heartbeat before scheduling a plan, "
+          + "so that other writers can detect stale/failed clustering 
attempts. Note that the same "
+          + "client must be used to schedule, execute, and commit the 
clustering instant.");
+
+  public static final ConfigProperty<Long> 
ROLLBACK_FAILED_CLUSTERING_WAIT_MINUTES = ConfigProperty
+      .key("hoodie.rollback.failed.clustering.wait.minutes")
+      .defaultValue(60L)

Review Comment:
   which interval this config dictates. 
   is it, after the heart beat expires, we need to wait until X mins to trigger 
rollback? 
   
   if yes, the documentation is not clear 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -674,6 +675,35 @@ public class HoodieWriteConfig extends HoodieConfig {
       .markAdvanced()
       .withDocumentation("Number of heartbeat misses, before a writer is 
deemed not alive and all pending writes are aborted.");
 
+  public static final ConfigProperty<Boolean> ROLLBACK_FAILED_CLUSTERING = 
ConfigProperty

Review Comment:
   Are there chances that we will enable expiration by clustering runners too 
or you folks purely needed this from ingestion writer standpoint? 
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1123,13 +1138,33 @@ private List<String> 
getInstantsToRollbackForLazyCleanPolicy(HoodieTableMetaClie
     if (!expiredInstants.isEmpty()) {
       // Only return instants that haven't been completed by other writers
       metaClient.reloadActiveTimeline();
-      HoodieTimeline refreshedInflightTimeline = 
getInflightTimelineExcludeCompactionAndClustering(metaClient);
-      return 
expiredInstants.stream().filter(refreshedInflightTimeline::containsInstant).collect(Collectors.toList());
+      HoodieTimeline refreshedIncompleteTimeline = 
metaClient.getActiveTimeline().filterInflightsAndRequested();
+      return expiredInstants.stream().filter(instantTime ->
+          refreshedIncompleteTimeline.containsInstant(instantTime)
+      ).collect(Collectors.toList());
     } else {
       return Collections.emptyList();
     }
   }
 
+  public boolean isClusteringInstantEligibleForRollback(HoodieTableMetaClient 
metaClient, HoodieInstant instant) {

Review Comment:
   where are we checking for heart beats?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to