yihua commented on code in PR #18448:
URL: https://github.com/apache/hudi/pull/18448#discussion_r3034497041


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1194,14 +1226,17 @@ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRoll
         // is set to false since they are already deleted.
         // Execute rollback
         HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent()
-            ? table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true, skipLocking)
-            : table.rollback(context, rollbackInstantTime, table.getMetaClient().createNewInstant(
+            ? table.rollback(context, rollbackInstantTimeOpt.get(), commitInstantOpt.get(), true, skipLocking)
+            : table.rollback(context, rollbackInstantTimeOpt.get(), table.getMetaClient().createNewInstant(
                 HoodieInstant.State.INFLIGHT, rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime),
             false, skipLocking);
         if (timerContext != null) {
           long durationInMs = metrics.getDurationInMs(timerContext.stop());
           metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted());
         }
+        if (config.isExclusiveRollbackEnabled()) {
+          heartbeatClient.stop(rollbackInstantTimeOpt.get());
+        }

Review Comment:
   If `table.rollback()` throws after `heartbeatClient.start()` has been
   called, the heartbeat is never stopped, because `heartbeatClient.stop()` sits
   only on the success path. This could block other writers from attempting the
   rollback until the heartbeat expires on its own. Could you move the `stop()`
   call into a `finally` block so that cleanup also happens on failure?
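
   A minimal sketch of the shape I have in mind. `FakeHeartbeatClient` and
   `rollbackWithHeartbeat` are hypothetical stand-ins written for this
   comment, not Hudi classes, and the sketch starts the heartbeat right next
   to the rollback only to keep it short. The point is just that `stop()`
   runs whether or not the rollback action throws.

```java
import java.util.HashSet;
import java.util.Set;

public class HeartbeatCleanupSketch {

  /** Hypothetical stand-in for a heartbeat client; it only tracks active instants. */
  static class FakeHeartbeatClient {
    private final Set<String> active = new HashSet<>();

    void start(String instantTime) {
      active.add(instantTime);
    }

    void stop(String instantTime) {
      active.remove(instantTime);
    }

    boolean isActive(String instantTime) {
      return active.contains(instantTime);
    }
  }

  /** Runs the rollback action and stops the heartbeat on success and on failure. */
  static void rollbackWithHeartbeat(FakeHeartbeatClient client, String rollbackInstantTime, Runnable rollbackAction) {
    client.start(rollbackInstantTime);
    try {
      rollbackAction.run();
    } finally {
      // Executed even when rollbackAction throws, so the heartbeat cannot leak.
      client.stop(rollbackInstantTime);
    }
  }

  public static void main(String[] args) {
    FakeHeartbeatClient client = new FakeHeartbeatClient();
    try {
      rollbackWithHeartbeat(client, "001", () -> {
        throw new IllegalStateException("simulated rollback failure");
      });
    } catch (IllegalStateException e) {
      // Expected: the failure still propagates to the caller.
    }
    System.out.println("heartbeat active after failure: " + client.isActive("001"));
    // prints "heartbeat active after failure: false"
  }
}
```

   In the PR itself the `stop()` would presumably stay guarded by
   `config.isExclusiveRollbackEnabled()`, just moved into the `finally`.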



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1160,25 +1160,57 @@ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRoll
     final Timer.Context timerContext = this.metrics.getRollbackCtx();
     try {
       HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
       Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
           .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
           .findFirst());
-      Option<HoodieRollbackPlan> rollbackPlanOption;
-      String rollbackInstantTime;
-      if (pendingRollbackInfo.isPresent()) {
+      Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+      Option<String> rollbackInstantTimeOpt;
+      if (!config.isExclusiveRollbackEnabled() && pendingRollbackInfo.isPresent()) {
+        // Only case when lock can be skipped is if exclusive rollback is disabled and
+        // there is a pending rollback info available
         rollbackPlanOption = Option.of(pendingRollbackInfo.get().getRollbackPlan());
-        rollbackInstantTime = pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+        rollbackInstantTimeOpt = Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
       } else {
-        if (commitInstantOpt.isEmpty()) {
-          log.error("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath());
-          return false;
-        }
         if (!skipLocking) {
           txnManager.beginStateChange(Option.empty(), Option.empty());
         }
         try {
-          rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> createNewInstantTime(false));
-          rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(), false);
+          if (config.isExclusiveRollbackEnabled()) {
+            // Reload meta client within the lock so that the timeline is latest while executing pending rollback
+            table.getMetaClient().reloadActiveTimeline();
+            Option<HoodiePendingRollbackInfo> pendingRollbackOpt = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+            rollbackInstantTimeOpt = pendingRollbackOpt.map(info -> info.getRollbackInstant().requestedTime());
+            if (pendingRollbackOpt.isPresent()) {
+              // If pending rollback and heartbeat is expired, writer should start heartbeat and execute rollback
+              if (heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
+                LOG.info("Heartbeat expired for rollback instant {}, executing rollback now", rollbackInstantTimeOpt);
+                HeartbeatUtils.deleteHeartbeatFile(storage, basePath, rollbackInstantTimeOpt.get(), config);
+                heartbeatClient.start(rollbackInstantTimeOpt.get());
+                rollbackPlanOption = pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan);
+              } else {
+                // Heartbeat is still active for another writer, ignore rollback for now
+                // TODO: ABCDEFGHI revisit return value
+                return false;
+              }
+            } else if (Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
+                .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
+                .findFirst()).isEmpty()) {
+              // Assume rollback is already executed since the commit is no longer present in the timeline
+              return false;
+            }
+          } else {
+            // Case where no pending rollback is present,
+            if (commitInstantOpt.isEmpty()) {
+              log.error("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath());
+              return false;
+            }
+            rollbackInstantTimeOpt = suppliedRollbackInstantTime.or(() -> Option.of(createNewInstantTime(false)));

Review Comment:
   This `if (config.isExclusiveRollbackEnabled())` check inside the `else` 
branch is unreachable — we only enter the `else` when 
`isExclusiveRollbackEnabled()` is false. The `heartbeatClient.start()` call 
here will never execute. I suspect this scheduling + heartbeat logic was meant 
to live in the exclusive-rollback branch above (to handle the case where no 
pending rollback exists yet).
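
   To make the control-flow point concrete, here is a tiny model of the
   nesting. `nestedCheckFires` is a hypothetical name used only for this
   comment. It returns `true` only if the inner check could ever be taken,
   and that never happens for either value of the flag.

```java
public class UnreachableBranchSketch {

  /** Returns true only if the nested exclusive-mode check is ever taken. */
  static boolean nestedCheckFires(boolean exclusiveRollbackEnabled) {
    if (exclusiveRollbackEnabled) {
      // Exclusive branch: the nested check does not live here.
      return false;
    } else {
      // The same condition is re-tested where it is already known to be false.
      if (exclusiveRollbackEnabled) {
        // Stands in for the scheduling + heartbeat start logic.
        return true;
      }
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(nestedCheckFires(true) || nestedCheckFires(false));
    // prints "false"
  }
}
```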



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1160,25 +1160,57 @@ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRoll
     final Timer.Context timerContext = this.metrics.getRollbackCtx();
     try {
       HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
       Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
           .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
           .findFirst());
-      Option<HoodieRollbackPlan> rollbackPlanOption;
-      String rollbackInstantTime;
-      if (pendingRollbackInfo.isPresent()) {
+      Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+      Option<String> rollbackInstantTimeOpt;
+      if (!config.isExclusiveRollbackEnabled() && pendingRollbackInfo.isPresent()) {
+        // Only case when lock can be skipped is if exclusive rollback is disabled and
+        // there is a pending rollback info available
         rollbackPlanOption = Option.of(pendingRollbackInfo.get().getRollbackPlan());
-        rollbackInstantTime = pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+        rollbackInstantTimeOpt = Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
       } else {
-        if (commitInstantOpt.isEmpty()) {
-          log.error("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath());
-          return false;
-        }
         if (!skipLocking) {
           txnManager.beginStateChange(Option.empty(), Option.empty());
         }
         try {
-          rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> createNewInstantTime(false));
-          rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(), false);
+          if (config.isExclusiveRollbackEnabled()) {
+            // Reload meta client within the lock so that the timeline is latest while executing pending rollback
+            table.getMetaClient().reloadActiveTimeline();
+            Option<HoodiePendingRollbackInfo> pendingRollbackOpt = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+            rollbackInstantTimeOpt = pendingRollbackOpt.map(info -> info.getRollbackInstant().requestedTime());
+            if (pendingRollbackOpt.isPresent()) {
+              // If pending rollback and heartbeat is expired, writer should start heartbeat and execute rollback
+              if (heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
+                LOG.info("Heartbeat expired for rollback instant {}, executing rollback now", rollbackInstantTimeOpt);
+                HeartbeatUtils.deleteHeartbeatFile(storage, basePath, rollbackInstantTimeOpt.get(), config);
+                heartbeatClient.start(rollbackInstantTimeOpt.get());
+                rollbackPlanOption = pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan);
+              } else {
+                // Heartbeat is still active for another writer, ignore rollback for now

Review Comment:
   nit: `// TODO: ABCDEFGHI revisit return value` — looks like a placeholder 
that slipped in. Worth cleaning up or replacing with a real JIRA ticket 
reference before merging.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1160,25 +1160,57 @@ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRoll
     final Timer.Context timerContext = this.metrics.getRollbackCtx();
     try {
       HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
       Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
           .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
           .findFirst());
-      Option<HoodieRollbackPlan> rollbackPlanOption;
-      String rollbackInstantTime;
-      if (pendingRollbackInfo.isPresent()) {
+      Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+      Option<String> rollbackInstantTimeOpt;
+      if (!config.isExclusiveRollbackEnabled() && pendingRollbackInfo.isPresent()) {
+        // Only case when lock can be skipped is if exclusive rollback is disabled and
+        // there is a pending rollback info available
         rollbackPlanOption = Option.of(pendingRollbackInfo.get().getRollbackPlan());
-        rollbackInstantTime = pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+        rollbackInstantTimeOpt = Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
       } else {
-        if (commitInstantOpt.isEmpty()) {
-          log.error("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath());
-          return false;
-        }
         if (!skipLocking) {
           txnManager.beginStateChange(Option.empty(), Option.empty());
         }
         try {
-          rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> createNewInstantTime(false));
-          rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(), false);
+          if (config.isExclusiveRollbackEnabled()) {
+            // Reload meta client within the lock so that the timeline is latest while executing pending rollback
+            table.getMetaClient().reloadActiveTimeline();
+            Option<HoodiePendingRollbackInfo> pendingRollbackOpt = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+            rollbackInstantTimeOpt = pendingRollbackOpt.map(info -> info.getRollbackInstant().requestedTime());
+            if (pendingRollbackOpt.isPresent()) {
+              // If pending rollback and heartbeat is expired, writer should start heartbeat and execute rollback
+              if (heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
+                LOG.info("Heartbeat expired for rollback instant {}, executing rollback now", rollbackInstantTimeOpt);
+                HeartbeatUtils.deleteHeartbeatFile(storage, basePath, rollbackInstantTimeOpt.get(), config);
+                heartbeatClient.start(rollbackInstantTimeOpt.get());
+                rollbackPlanOption = pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan);
+              } else {
+                // Heartbeat is still active for another writer, ignore rollback for now
+                // TODO: ABCDEFGHI revisit return value
+                return false;
+              }
+            } else if (Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
+                .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
+                .findFirst()).isEmpty()) {
+              // Assume rollback is already executed since the commit is no longer present in the timeline
+              return false;

Review Comment:
   When exclusive rollback is enabled and no pending rollback exists yet (the
   first writer to schedule a rollback for a failed commit), neither the `if`
   nor the `else if` matches, because the commit is still in the timeline. This
   path therefore falls through with `rollbackPlanOption` still at its initial
   `Option.empty()` and `rollbackInstantTimeOpt` empty as well, so the method
   will always throw `HoodieRollbackException` for first-time rollbacks in
   exclusive mode. It looks like the scheduling logic (calling
   `scheduleRollback` + `heartbeatClient.start`) needs to be included in this
   `if (config.isExclusiveRollbackEnabled())` branch as well.
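
   A rough model of the exclusive-mode flow, to show the gap. This is not
   Hudi code: `Outcome`, `flowAsReviewed` and `flowWithScheduling` are
   hypothetical names, the three booleans stand in for the timeline and
   heartbeat lookups, and `THROWS_NO_PLAN` stands in for the exception raised
   when the plan is empty. The second method assumes a final `else` that
   schedules the rollback, which is the change suggested above.

```java
public class ExclusiveRollbackFlowSketch {

  enum Outcome { EXECUTES_ROLLBACK, RETURNS_FALSE, THROWS_NO_PLAN }

  /** Models the exclusive-mode branch as it appears in the diff. */
  static Outcome flowAsReviewed(boolean pendingRollback, boolean heartbeatExpired, boolean commitInTimeline) {
    boolean planPresent = false; // the plan option starts out empty
    if (pendingRollback) {
      if (heartbeatExpired) {
        planPresent = true; // take over the pending rollback plan
      } else {
        return Outcome.RETURNS_FALSE; // another writer still holds the heartbeat
      }
    } else if (!commitInTimeline) {
      return Outcome.RETURNS_FALSE; // commit is gone, assume already rolled back
    }
    // No pending rollback and the commit is present: nothing was scheduled.
    return planPresent ? Outcome.EXECUTES_ROLLBACK : Outcome.THROWS_NO_PLAN;
  }

  /** Same flow with an assumed final else that schedules a new rollback. */
  static Outcome flowWithScheduling(boolean pendingRollback, boolean heartbeatExpired, boolean commitInTimeline) {
    boolean planPresent = false;
    if (pendingRollback) {
      if (heartbeatExpired) {
        planPresent = true;
      } else {
        return Outcome.RETURNS_FALSE;
      }
    } else if (!commitInTimeline) {
      return Outcome.RETURNS_FALSE;
    } else {
      planPresent = true; // stands in for scheduling the rollback and starting the heartbeat
    }
    return planPresent ? Outcome.EXECUTES_ROLLBACK : Outcome.THROWS_NO_PLAN;
  }

  public static void main(String[] args) {
    // First writer to roll back a failed commit: no pending rollback, commit still present.
    System.out.println("as reviewed: " + flowAsReviewed(false, false, true));
    System.out.println("with scheduling: " + flowWithScheduling(false, false, true));
    // prints "as reviewed: THROWS_NO_PLAN" then "with scheduling: EXECUTES_ROLLBACK"
  }
}
```

   The other three input combinations behave the same in both versions, so
   only the first-time rollback case changes.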



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

