nsivabalan commented on code in PR #4957:
URL: https://github.com/apache/hudi/pull/4957#discussion_r846926292


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java:
##########
@@ -57,20 +83,260 @@ public ListingBasedRollbackStrategy(HoodieTable table,
   @Override
   public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant 
instantToRollback) {
     try {
-      List<ListingBasedRollbackRequest> rollbackRequests = null;
-      if (table.getMetaClient().getTableType() == 
HoodieTableType.COPY_ON_WRITE) {
-        rollbackRequests = 
RollbackUtils.generateRollbackRequestsByListingCOW(context,
-            table.getMetaClient().getBasePath());
-      } else {
-        rollbackRequests = RollbackUtils
-            .generateRollbackRequestsUsingFileListingMOR(instantToRollback, 
table, context);
-      }
-      List<HoodieRollbackRequest> listingBasedRollbackRequests = new 
ListingBasedRollbackHelper(table.getMetaClient(), config)
-          .getRollbackRequestsForRollbackPlan(context, instantToRollback, 
rollbackRequests);
-      return listingBasedRollbackRequests;
-    } catch (IOException e) {
+      HoodieTableMetaClient metaClient = table.getMetaClient();
+      List<String> partitionPaths =
+          FSUtils.getAllPartitionPaths(context, 
table.getMetaClient().getBasePath(), false, false);
+      int numPartitions = Math.max(Math.min(partitionPaths.size(), 
config.getRollbackParallelism()), 1);
+
+      context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing 
Rollback Plan");
+
+      HoodieTableType tableType = table.getMetaClient().getTableType();
+      String baseFileExtension = getBaseFileExtension(metaClient);
+      Option<HoodieCommitMetadata> commitMetadataOptional = 
getHoodieCommitMetadata(metaClient, instantToRollback);
+      Boolean isCommitMetadataCompleted = 
checkCommitMetadataCompleted(instantToRollback, commitMetadataOptional);
+
+      return context.flatMap(partitionPaths, partitionPath -> {
+        List<HoodieRollbackRequest> hoodieRollbackRequests = new 
ArrayList<>(partitionPaths.size());
+        FileStatus[] filesToDelete =
+            fetchFilesFromInstant(instantToRollback, partitionPath, 
metaClient.getBasePath(), baseFileExtension,
+                metaClient.getFs(), commitMetadataOptional, 
isCommitMetadataCompleted);
+
+        if (HoodieTableType.COPY_ON_WRITE == tableType) {
+          hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath, 
filesToDelete));
+        } else if (HoodieTableType.MERGE_ON_READ == tableType) {
+          String commit = instantToRollback.getTimestamp();
+          HoodieActiveTimeline activeTimeline = 
table.getMetaClient().reloadActiveTimeline();
+          switch (instantToRollback.getAction()) {
+            case HoodieTimeline.COMMIT_ACTION:
+            case HoodieTimeline.REPLACE_COMMIT_ACTION:
+              
hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath, 
filesToDelete));
+              break;
+            case HoodieTimeline.COMPACTION_ACTION:
+              // If there is no delta commit present after the current commit 
(if compaction), no action, else we
+              // need to make sure that a compaction commit rollback also 
deletes any log files written as part of the
+              // succeeding deltacommit.
+              boolean higherDeltaCommits =

Review Comment:
   This variable should have been named `noHigherDeltaCommits`.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java:
##########
@@ -57,20 +83,260 @@ public ListingBasedRollbackStrategy(HoodieTable table,
   @Override
   public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant 
instantToRollback) {
     try {
-      List<ListingBasedRollbackRequest> rollbackRequests = null;
-      if (table.getMetaClient().getTableType() == 
HoodieTableType.COPY_ON_WRITE) {
-        rollbackRequests = 
RollbackUtils.generateRollbackRequestsByListingCOW(context,
-            table.getMetaClient().getBasePath());
-      } else {
-        rollbackRequests = RollbackUtils
-            .generateRollbackRequestsUsingFileListingMOR(instantToRollback, 
table, context);
-      }
-      List<HoodieRollbackRequest> listingBasedRollbackRequests = new 
ListingBasedRollbackHelper(table.getMetaClient(), config)
-          .getRollbackRequestsForRollbackPlan(context, instantToRollback, 
rollbackRequests);
-      return listingBasedRollbackRequests;
-    } catch (IOException e) {
+      HoodieTableMetaClient metaClient = table.getMetaClient();
+      List<String> partitionPaths =
+          FSUtils.getAllPartitionPaths(context, 
table.getMetaClient().getBasePath(), false, false);
+      int numPartitions = Math.max(Math.min(partitionPaths.size(), 
config.getRollbackParallelism()), 1);
+
+      context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing 
Rollback Plan");
+
+      HoodieTableType tableType = table.getMetaClient().getTableType();
+      String baseFileExtension = getBaseFileExtension(metaClient);
+      Option<HoodieCommitMetadata> commitMetadataOptional = 
getHoodieCommitMetadata(metaClient, instantToRollback);
+      Boolean isCommitMetadataCompleted = 
checkCommitMetadataCompleted(instantToRollback, commitMetadataOptional);
+
+      return context.flatMap(partitionPaths, partitionPath -> {
+        List<HoodieRollbackRequest> hoodieRollbackRequests = new 
ArrayList<>(partitionPaths.size());
+        FileStatus[] filesToDelete =
+            fetchFilesFromInstant(instantToRollback, partitionPath, 
metaClient.getBasePath(), baseFileExtension,
+                metaClient.getFs(), commitMetadataOptional, 
isCommitMetadataCompleted);
+
+        if (HoodieTableType.COPY_ON_WRITE == tableType) {
+          hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath, 
filesToDelete));
+        } else if (HoodieTableType.MERGE_ON_READ == tableType) {
+          String commit = instantToRollback.getTimestamp();
+          HoodieActiveTimeline activeTimeline = 
table.getMetaClient().reloadActiveTimeline();
+          switch (instantToRollback.getAction()) {
+            case HoodieTimeline.COMMIT_ACTION:
+            case HoodieTimeline.REPLACE_COMMIT_ACTION:
+              
hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath, 
filesToDelete));
+              break;
+            case HoodieTimeline.COMPACTION_ACTION:
+              // If there is no delta commit present after the current commit 
(if compaction), no action, else we
+              // need to make sure that a compaction commit rollback also 
deletes any log files written as part of the
+              // succeeding deltacommit.
+              boolean higherDeltaCommits =
+                  
!activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit,
 1)
+                      .empty();
+              if (higherDeltaCommits) {
+                // Rollback of a compaction action with no higher deltacommit 
means that the compaction is scheduled
+                // and has not yet finished. In this scenario we should delete 
only the newly created base files
+                // and not corresponding base commit log files created with 
this as baseCommit since updates would
+                // have been written to the log files.
+                
hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath,
+                    listFilesToBeDeleted(instantToRollback.getTimestamp(), 
baseFileExtension, partitionPath,
+                        metaClient.getFs())));
+              } else {
+                // No deltacommits present after this compaction commit 
(inflight or requested). In this case, we
+                // can also delete any log files that were created with this 
compaction commit as base

Review Comment:
   I'm not sure this comment is clear. If there are no delta commits after the 
compaction commit that is being rolled back, why do we need to delete log files? 
There should not be any log files, right?
   
   Also, irrespective of whether there are any log files or not (the if/else 
condition from L121 to L134), we can just call 
   ```
   listFilesToBeDeleted(instantToRollback.getTimestamp(), baseFileExtension, 
partitionPath,
                           metaClient.getFs())
   ```
   for the compaction commit, which will return all files with a matching commit 
time, and we should be good, right? Or am I missing anything here?
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java:
##########
@@ -57,20 +83,260 @@ public ListingBasedRollbackStrategy(HoodieTable table,
   @Override
   public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant 
instantToRollback) {
     try {
-      List<ListingBasedRollbackRequest> rollbackRequests = null;
-      if (table.getMetaClient().getTableType() == 
HoodieTableType.COPY_ON_WRITE) {
-        rollbackRequests = 
RollbackUtils.generateRollbackRequestsByListingCOW(context,
-            table.getMetaClient().getBasePath());
-      } else {
-        rollbackRequests = RollbackUtils
-            .generateRollbackRequestsUsingFileListingMOR(instantToRollback, 
table, context);
-      }
-      List<HoodieRollbackRequest> listingBasedRollbackRequests = new 
ListingBasedRollbackHelper(table.getMetaClient(), config)
-          .getRollbackRequestsForRollbackPlan(context, instantToRollback, 
rollbackRequests);
-      return listingBasedRollbackRequests;
-    } catch (IOException e) {
+      HoodieTableMetaClient metaClient = table.getMetaClient();
+      List<String> partitionPaths =
+          FSUtils.getAllPartitionPaths(context, 
table.getMetaClient().getBasePath(), false, false);
+      int numPartitions = Math.max(Math.min(partitionPaths.size(), 
config.getRollbackParallelism()), 1);
+
+      context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing 
Rollback Plan");
+
+      HoodieTableType tableType = table.getMetaClient().getTableType();
+      String baseFileExtension = getBaseFileExtension(metaClient);
+      Option<HoodieCommitMetadata> commitMetadataOptional = 
getHoodieCommitMetadata(metaClient, instantToRollback);
+      Boolean isCommitMetadataCompleted = 
checkCommitMetadataCompleted(instantToRollback, commitMetadataOptional);
+
+      return context.flatMap(partitionPaths, partitionPath -> {
+        List<HoodieRollbackRequest> hoodieRollbackRequests = new 
ArrayList<>(partitionPaths.size());
+        FileStatus[] filesToDelete =
+            fetchFilesFromInstant(instantToRollback, partitionPath, 
metaClient.getBasePath(), baseFileExtension,
+                metaClient.getFs(), commitMetadataOptional, 
isCommitMetadataCompleted);
+
+        if (HoodieTableType.COPY_ON_WRITE == tableType) {
+          hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath, 
filesToDelete));
+        } else if (HoodieTableType.MERGE_ON_READ == tableType) {
+          String commit = instantToRollback.getTimestamp();
+          HoodieActiveTimeline activeTimeline = 
table.getMetaClient().reloadActiveTimeline();
+          switch (instantToRollback.getAction()) {
+            case HoodieTimeline.COMMIT_ACTION:
+            case HoodieTimeline.REPLACE_COMMIT_ACTION:
+              
hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath, 
filesToDelete));
+              break;
+            case HoodieTimeline.COMPACTION_ACTION:
+              // If there is no delta commit present after the current commit 
(if compaction), no action, else we
+              // need to make sure that a compaction commit rollback also 
deletes any log files written as part of the
+              // succeeding deltacommit.
+              boolean higherDeltaCommits =
+                  
!activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit,
 1)
+                      .empty();
+              if (higherDeltaCommits) {
+                // Rollback of a compaction action with no higher deltacommit 
means that the compaction is scheduled
+                // and has not yet finished. In this scenario we should delete 
only the newly created base files
+                // and not corresponding base commit log files created with 
this as baseCommit since updates would
+                // have been written to the log files.
+                
hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath,
+                    listFilesToBeDeleted(instantToRollback.getTimestamp(), 
baseFileExtension, partitionPath,
+                        metaClient.getFs())));
+              } else {
+                // No deltacommits present after this compaction commit 
(inflight or requested). In this case, we
+                // can also delete any log files that were created with this 
compaction commit as base
+                // commit.
+                
hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath, 
filesToDelete));
+              }
+              break;
+            case HoodieTimeline.DELTA_COMMIT_ACTION:
+              // 
--------------------------------------------------------------------------------------------------
+              // (A) The following cases are possible if 
index.canIndexLogFiles and/or index.isGlobal
+              // 
--------------------------------------------------------------------------------------------------
+              // (A.1) Failed first commit - Inserts were written to log files 
and HoodieWriteStat has no entries. In
+              // this scenario we would want to delete these log files.
+              // (A.2) Failed recurring commit - Inserts/Updates written to 
log files. In this scenario,
+              // HoodieWriteStat will have the baseCommitTime for the first 
log file written, add rollback blocks.
+              // (A.3) Rollback triggered for first commit - Inserts were 
written to the log files but the commit is
+              // being reverted. In this scenario, HoodieWriteStat will be 
`null` for the attribute prevCommitTime and
+              // and hence will end up deleting these log files. This is done 
so there are no orphan log files
+              // lying around.
+              // (A.4) Rollback triggered for recurring commits - 
Inserts/Updates are being rolled back, the actions
+              // taken in this scenario is a combination of (A.2) and (A.3)
+              // 
---------------------------------------------------------------------------------------------------
+              // (B) The following cases are possible if 
!index.canIndexLogFiles and/or !index.isGlobal
+              // 
---------------------------------------------------------------------------------------------------
+              // (B.1) Failed first commit - Inserts were written to base 
files and HoodieWriteStat has no entries.
+              // In this scenario, we delete all the base files written for 
the failed commit.
+              // (B.2) Failed recurring commits - Inserts were written to base 
files and updates to log files. In
+              // this scenario, perform (A.1) and for updates written to log 
files, write rollback blocks.
+              // (B.3) Rollback triggered for first commit - Same as (B.1)
+              // (B.4) Rollback triggered for recurring commits - Same as 
(B.2) plus we need to delete the log files
+              // as well if the base base file gets deleted.
+              HoodieCommitMetadata commitMetadata = 
HoodieCommitMetadata.fromBytes(
+                  
table.getMetaClient().getCommitTimeline().getInstantDetails(instantToRollback).get(),
+                  HoodieCommitMetadata.class);
+
+              // In case all data was inserts and the commit failed, delete 
the file belonging to that commit
+              // We do not know fileIds for inserts (first inserts are either 
log files or base files),
+              // delete all files for the corresponding failed commit, if 
present (same as COW)
+              
hoodieRollbackRequests.add(getHoodieRollbackRequest(partitionPath, 
filesToDelete));
+
+              // append rollback blocks for updates and inserts as A.2 and B.2
+              if 
(commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
+                hoodieRollbackRequests.addAll(
+                    getRollbackRequestToAppend(partitionPath, 
instantToRollback, commitMetadata, table));
+              }
+              break;
+            default:
+              throw new HoodieRollbackException("Unknown listing type, during 
rollback of " + instantToRollback);
+          }
+        } else {
+          throw new HoodieRollbackException(
+              String.format("Unsupported table type: %s, during listing 
rollback of %s", tableType, instantToRollback));
+        }
+        return hoodieRollbackRequests.stream();
+      }, numPartitions);
+    } catch (Exception e) {
       LOG.error("Generating rollback requests failed for " + 
instantToRollback.getTimestamp(), e);
       throw new HoodieRollbackException("Generating rollback requests failed 
for " + instantToRollback.getTimestamp(), e);
     }
   }
+
+  private String getBaseFileExtension(HoodieTableMetaClient metaClient) {
+    return metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+  }
+
+  @NotNull
+  private HoodieRollbackRequest getHoodieRollbackRequest(String partitionPath, 
FileStatus[] filesToDeletedStatus) {
+    List<String> filesToDelete = getFilesToBeDeleted(filesToDeletedStatus);
+    return new HoodieRollbackRequest(
+        partitionPath, EMPTY_STRING, EMPTY_STRING, filesToDelete, 
Collections.emptyMap());
+  }
+
+  @NotNull
+  private List<String> getFilesToBeDeleted(FileStatus[] 
dataFilesToDeletedStatus) {
+    return Arrays.stream(dataFilesToDeletedStatus).map(fileStatus -> {
+      String dataFileToBeDeleted = fileStatus.getPath().toString();
+      // strip scheme E.g: file:/var/folders
+      return dataFileToBeDeleted.substring(dataFileToBeDeleted.indexOf(":") + 
1);
+    }).collect(Collectors.toList());
+  }
+
+  private FileStatus[] listFilesToBeDeleted(String commit, String 
basefileExtension, String partitionPath,
+                                            FileSystem fs) throws IOException {
+    LOG.info("Collecting files to be cleaned/rolledback up for path " + 
partitionPath + " and commit " + commit);
+    PathFilter filter = (path) -> {
+      if (path.toString().contains(basefileExtension)) {
+        String fileCommitTime = FSUtils.getCommitTime(path.getName());
+        return commit.equals(fileCommitTime);
+      }
+      return false;
+    };
+    return fs.listStatus(FSUtils.getPartitionPath(config.getBasePath(), 
partitionPath), filter);
+  }
+
+  private FileStatus[] fetchFilesFromInstant(HoodieInstant instantToRollback, 
String partitionPath, String basePath,
+                                             String baseFileExtension, 
HoodieWrapperFileSystem fs,
+                                             Option<HoodieCommitMetadata> 
commitMetadataOptional,
+                                             Boolean 
isCommitMetadataCompleted) throws IOException {
+    if (isCommitMetadataCompleted) {
+      return fetchFilesFromCommitMetadata(instantToRollback, partitionPath, 
basePath, commitMetadataOptional.get(),
+          baseFileExtension, fs);
+    } else {
+      return fetchFilesFromListFiles(instantToRollback, partitionPath, 
basePath, baseFileExtension, fs);
+    }
+  }
+
+  private FileStatus[] fetchFilesFromCommitMetadata(HoodieInstant 
instantToRollback, String partitionPath,
+                                                    String basePath, 
HoodieCommitMetadata commitMetadata,
+                                                    String baseFileExtension, 
HoodieWrapperFileSystem fs)
+      throws IOException {
+    SerializablePathFilter pathFilter = 
getSerializablePathFilter(baseFileExtension, instantToRollback.getTimestamp());
+    Path[] filePaths = getFilesFromCommitMetadata(basePath, commitMetadata, 
partitionPath);
+
+    return fs.listStatus(filePaths, pathFilter);
+  }
+
+  private FileStatus[] fetchFilesFromListFiles(HoodieInstant 
instantToRollback, String partitionPath, String basePath,
+                                               String baseFileExtension, 
HoodieWrapperFileSystem fs)
+      throws IOException {
+    SerializablePathFilter pathFilter = 
getSerializablePathFilter(baseFileExtension, instantToRollback.getTimestamp());
+    Path[] filePaths = listFilesToBeDeleted(basePath, partitionPath);

Review Comment:
   Shouldn't this be `getFilesInPartitions()`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to