alexeykudinkin commented on a change in pull request #4957:
URL: https://github.com/apache/hudi/pull/4957#discussion_r837817036
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
##########
@@ -57,20 +81,252 @@ public ListingBasedRollbackStrategy(HoodieTable table,
@Override
public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant
instantToRollback) {
try {
- List<ListingBasedRollbackRequest> rollbackRequests = null;
- if (table.getMetaClient().getTableType() ==
HoodieTableType.COPY_ON_WRITE) {
- rollbackRequests =
RollbackUtils.generateRollbackRequestsByListingCOW(context,
- table.getMetaClient().getBasePath());
- } else {
- rollbackRequests = RollbackUtils
- .generateRollbackRequestsUsingFileListingMOR(instantToRollback,
table, context);
- }
- List<HoodieRollbackRequest> listingBasedRollbackRequests = new
ListingBasedRollbackHelper(table.getMetaClient(), config)
- .getRollbackRequestsForRollbackPlan(context, instantToRollback,
rollbackRequests);
- return listingBasedRollbackRequests;
- } catch (IOException e) {
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ List<String> partitionPaths =
+ FSUtils.getAllPartitionPaths(context,
table.getMetaClient().getBasePath(), false, false);
+ int numPartitions = Math.max(Math.min(partitionPaths.size(),
config.getRollbackParallelism()), 1);
+
+ context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing
Rollback Plan");
+
+ return context.flatMap(partitionPaths, partitionPath -> {
+ List<HoodieRollbackRequest> hoodieRollbackRequests = new
ArrayList<>(partitionPaths.size());
+ if (table.getMetaClient().getTableType() ==
HoodieTableType.COPY_ON_WRITE) {
+ hoodieRollbackRequests.add(
+ deleteDataAndLogFilesRollbackRequest(instantToRollback,
metaClient, partitionPath));
+ } else {
+ String commit = instantToRollback.getTimestamp();
+ HoodieActiveTimeline activeTimeline =
table.getMetaClient().reloadActiveTimeline();
+ switch (instantToRollback.getAction()) {
+ case HoodieTimeline.COMMIT_ACTION:
+ case HoodieTimeline.REPLACE_COMMIT_ACTION:
+ hoodieRollbackRequests.add(
+ deleteDataAndLogFilesRollbackRequest(instantToRollback,
metaClient, partitionPath));
+ break;
+ case HoodieTimeline.COMPACTION_ACTION:
+ // If there is no delta commit present after the current commit
(if compaction), no action, else we
+ // need to make sure that a compaction commit rollback also
deletes any log files written as part of the
+ // succeeding deltacommit.
+ boolean higherDeltaCommits =
+
!activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit,
1)
+ .empty();
+ if (higherDeltaCommits) {
+ // Rollback of a compaction action with no higher deltacommit
means that the compaction is scheduled
+ // and has not yet finished. In this scenario we should delete
only the newly created base files
+ // and not corresponding base commit log files created with
this as baseCommit since updates would
+ // have been written to the log files.
+ String baseFileExtension = getBaseFileExtension(metaClient);
+
hoodieRollbackRequests.add(deleteDataFilesOnlyRollbackRequest(instantToRollback,
baseFileExtension, partitionPath,
+ metaClient.getFs()));
+ } else {
+ // No deltacommits present after this compaction commit
(inflight or requested). In this case, we
+ // can also delete any log files that were created with this
compaction commit as base
+ // commit.
+ hoodieRollbackRequests.add(
+ deleteDataAndLogFilesRollbackRequest(instantToRollback,
metaClient, partitionPath));
+ }
+ break;
+ case HoodieTimeline.DELTA_COMMIT_ACTION:
+ //
--------------------------------------------------------------------------------------------------
+ // (A) The following cases are possible if
index.canIndexLogFiles and/or index.isGlobal
+ //
--------------------------------------------------------------------------------------------------
+ // (A.1) Failed first commit - Inserts were written to log files
and HoodieWriteStat has no entries. In
+ // this scenario we would want to delete these log files.
+ // (A.2) Failed recurring commit - Inserts/Updates written to
log files. In this scenario,
+ // HoodieWriteStat will have the baseCommitTime for the first
log file written, add rollback blocks.
+ // (A.3) Rollback triggered for first commit - Inserts were
written to the log files but the commit is
+ // being reverted. In this scenario, HoodieWriteStat will be
`null` for the attribute prevCommitTime and
+ // and hence will end up deleting these log files. This is done
so there are no orphan log files
+ // lying around.
+ // (A.4) Rollback triggered for recurring commits -
Inserts/Updates are being rolled back, the actions
+ // taken in this scenario is a combination of (A.2) and (A.3)
+ //
---------------------------------------------------------------------------------------------------
+ // (B) The following cases are possible if
!index.canIndexLogFiles and/or !index.isGlobal
+ //
---------------------------------------------------------------------------------------------------
+ // (B.1) Failed first commit - Inserts were written to base
files and HoodieWriteStat has no entries.
+ // In this scenario, we delete all the base files written for
the failed commit.
+ // (B.2) Failed recurring commits - Inserts were written to base
files and updates to log files. In
+ // this scenario, perform (A.1) and for updates written to log
files, write rollback blocks.
+ // (B.3) Rollback triggered for first commit - Same as (B.1)
+ // (B.4) Rollback triggered for recurring commits - Same as
(B.2) plus we need to delete the log files
+ // as well if the base base file gets deleted.
+ HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(
+
table.getMetaClient().getCommitTimeline().getInstantDetails(instantToRollback).get(),
+ HoodieCommitMetadata.class);
+
+ // In case all data was inserts and the commit failed, delete
the file belonging to that commit
+ // We do not know fileIds for inserts (first inserts are either
log files or base files),
+ // delete all files for the corresponding failed commit, if
present (same as COW)
+ hoodieRollbackRequests.add(
+ deleteDataAndLogFilesRollbackRequest(instantToRollback,
metaClient, partitionPath));
+
+ // append rollback blocks for updates and inserts as A.2 and B.2
+ if
(commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
+ hoodieRollbackRequests.addAll(
+ getRollbackRequestForAppend(partitionPath,
instantToRollback, commitMetadata, table));
Review comment:
nit: let's rename this to `getRollbackRequestToAppend`
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
##########
@@ -57,20 +81,252 @@ public ListingBasedRollbackStrategy(HoodieTable table,
@Override
public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant
instantToRollback) {
try {
- List<ListingBasedRollbackRequest> rollbackRequests = null;
- if (table.getMetaClient().getTableType() ==
HoodieTableType.COPY_ON_WRITE) {
- rollbackRequests =
RollbackUtils.generateRollbackRequestsByListingCOW(context,
- table.getMetaClient().getBasePath());
- } else {
- rollbackRequests = RollbackUtils
- .generateRollbackRequestsUsingFileListingMOR(instantToRollback,
table, context);
- }
- List<HoodieRollbackRequest> listingBasedRollbackRequests = new
ListingBasedRollbackHelper(table.getMetaClient(), config)
- .getRollbackRequestsForRollbackPlan(context, instantToRollback,
rollbackRequests);
- return listingBasedRollbackRequests;
- } catch (IOException e) {
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ List<String> partitionPaths =
+ FSUtils.getAllPartitionPaths(context,
table.getMetaClient().getBasePath(), false, false);
+ int numPartitions = Math.max(Math.min(partitionPaths.size(),
config.getRollbackParallelism()), 1);
+
+ context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing
Rollback Plan");
+
+ return context.flatMap(partitionPaths, partitionPath -> {
+ List<HoodieRollbackRequest> hoodieRollbackRequests = new
ArrayList<>(partitionPaths.size());
+ if (table.getMetaClient().getTableType() ==
HoodieTableType.COPY_ON_WRITE) {
+ hoodieRollbackRequests.add(
+ deleteDataAndLogFilesRollbackRequest(instantToRollback,
metaClient, partitionPath));
+ } else {
Review comment:
Let's make the conditional explicit for every table type, like the
following:
```
if (tableType == COW) { ... }
else if (tableType == MOR) { ... }
else { throw new Unsupported; }
```
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
##########
@@ -57,20 +81,252 @@ public ListingBasedRollbackStrategy(HoodieTable table,
@Override
public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant
instantToRollback) {
try {
- List<ListingBasedRollbackRequest> rollbackRequests = null;
- if (table.getMetaClient().getTableType() ==
HoodieTableType.COPY_ON_WRITE) {
- rollbackRequests =
RollbackUtils.generateRollbackRequestsByListingCOW(context,
- table.getMetaClient().getBasePath());
- } else {
- rollbackRequests = RollbackUtils
- .generateRollbackRequestsUsingFileListingMOR(instantToRollback,
table, context);
- }
- List<HoodieRollbackRequest> listingBasedRollbackRequests = new
ListingBasedRollbackHelper(table.getMetaClient(), config)
- .getRollbackRequestsForRollbackPlan(context, instantToRollback,
rollbackRequests);
- return listingBasedRollbackRequests;
- } catch (IOException e) {
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ List<String> partitionPaths =
+ FSUtils.getAllPartitionPaths(context,
table.getMetaClient().getBasePath(), false, false);
+ int numPartitions = Math.max(Math.min(partitionPaths.size(),
config.getRollbackParallelism()), 1);
+
+ context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing
Rollback Plan");
+
+ return context.flatMap(partitionPaths, partitionPath -> {
+ List<HoodieRollbackRequest> hoodieRollbackRequests = new
ArrayList<>(partitionPaths.size());
+ if (table.getMetaClient().getTableType() ==
HoodieTableType.COPY_ON_WRITE) {
+ hoodieRollbackRequests.add(
+ deleteDataAndLogFilesRollbackRequest(instantToRollback,
metaClient, partitionPath));
+ } else {
+ String commit = instantToRollback.getTimestamp();
+ HoodieActiveTimeline activeTimeline =
table.getMetaClient().reloadActiveTimeline();
+ switch (instantToRollback.getAction()) {
+ case HoodieTimeline.COMMIT_ACTION:
+ case HoodieTimeline.REPLACE_COMMIT_ACTION:
+ hoodieRollbackRequests.add(
+ deleteDataAndLogFilesRollbackRequest(instantToRollback,
metaClient, partitionPath));
+ break;
+ case HoodieTimeline.COMPACTION_ACTION:
+ // If there is no delta commit present after the current commit
(if compaction), no action, else we
+ // need to make sure that a compaction commit rollback also
deletes any log files written as part of the
+ // succeeding deltacommit.
+ boolean higherDeltaCommits =
+
!activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit,
1)
+ .empty();
+ if (higherDeltaCommits) {
+ // Rollback of a compaction action with no higher deltacommit
means that the compaction is scheduled
+ // and has not yet finished. In this scenario we should delete
only the newly created base files
+ // and not corresponding base commit log files created with
this as baseCommit since updates would
+ // have been written to the log files.
+ String baseFileExtension = getBaseFileExtension(metaClient);
+
hoodieRollbackRequests.add(deleteDataFilesOnlyRollbackRequest(instantToRollback,
baseFileExtension, partitionPath,
+ metaClient.getFs()));
+ } else {
+ // No deltacommits present after this compaction commit
(inflight or requested). In this case, we
+ // can also delete any log files that were created with this
compaction commit as base
+ // commit.
+ hoodieRollbackRequests.add(
+ deleteDataAndLogFilesRollbackRequest(instantToRollback,
metaClient, partitionPath));
+ }
+ break;
+ case HoodieTimeline.DELTA_COMMIT_ACTION:
+ //
--------------------------------------------------------------------------------------------------
+ // (A) The following cases are possible if
index.canIndexLogFiles and/or index.isGlobal
+ //
--------------------------------------------------------------------------------------------------
+ // (A.1) Failed first commit - Inserts were written to log files
and HoodieWriteStat has no entries. In
+ // this scenario we would want to delete these log files.
+ // (A.2) Failed recurring commit - Inserts/Updates written to
log files. In this scenario,
+ // HoodieWriteStat will have the baseCommitTime for the first
log file written, add rollback blocks.
+ // (A.3) Rollback triggered for first commit - Inserts were
written to the log files but the commit is
+ // being reverted. In this scenario, HoodieWriteStat will be
`null` for the attribute prevCommitTime and
+ // and hence will end up deleting these log files. This is done
so there are no orphan log files
+ // lying around.
+ // (A.4) Rollback triggered for recurring commits -
Inserts/Updates are being rolled back, the actions
+ // taken in this scenario is a combination of (A.2) and (A.3)
+ //
---------------------------------------------------------------------------------------------------
+ // (B) The following cases are possible if
!index.canIndexLogFiles and/or !index.isGlobal
+ //
---------------------------------------------------------------------------------------------------
+ // (B.1) Failed first commit - Inserts were written to base
files and HoodieWriteStat has no entries.
+ // In this scenario, we delete all the base files written for
the failed commit.
+ // (B.2) Failed recurring commits - Inserts were written to base
files and updates to log files. In
+ // this scenario, perform (A.1) and for updates written to log
files, write rollback blocks.
+ // (B.3) Rollback triggered for first commit - Same as (B.1)
+ // (B.4) Rollback triggered for recurring commits - Same as
(B.2) plus we need to delete the log files
+ // as well if the base base file gets deleted.
+ HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(
+
table.getMetaClient().getCommitTimeline().getInstantDetails(instantToRollback).get(),
+ HoodieCommitMetadata.class);
+
+ // In case all data was inserts and the commit failed, delete
the file belonging to that commit
+ // We do not know fileIds for inserts (first inserts are either
log files or base files),
+ // delete all files for the corresponding failed commit, if
present (same as COW)
+ hoodieRollbackRequests.add(
+ deleteDataAndLogFilesRollbackRequest(instantToRollback,
metaClient, partitionPath));
+
+ // append rollback blocks for updates and inserts as A.2 and B.2
+ if
(commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
+ hoodieRollbackRequests.addAll(
+ getRollbackRequestForAppend(partitionPath,
instantToRollback, commitMetadata, table));
+ }
+ break;
+ default:
+ throw new HoodieRollbackException("Unknown listing type, during
rollback of " + instantToRollback);
+ }
+ }
+ return hoodieRollbackRequests.stream();
+ }, numPartitions);
+ } catch (Exception e) {
LOG.error("Generating rollback requests failed for " +
instantToRollback.getTimestamp(), e);
throw new HoodieRollbackException("Generating rollback requests failed
for " + instantToRollback.getTimestamp(), e);
}
}
+
+ private String getBaseFileExtension(HoodieTableMetaClient metaClient) {
+ return metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+ }
+
+ @NotNull
+ private HoodieRollbackRequest
deleteDataAndLogFilesRollbackRequest(HoodieInstant instantToRollback,
+
HoodieTableMetaClient metaClient,
+ String
partitionPath)
+ throws IOException {
+ final FileStatus[] filesToDeletedStatus =
getBaseAndLogFilesToBeDeleted(metaClient, instantToRollback, partitionPath);
+ return getHoodieRollbackRequest(partitionPath, filesToDeletedStatus);
+ }
+
+ @NotNull
+ private HoodieRollbackRequest
deleteDataFilesOnlyRollbackRequest(HoodieInstant instantToRollback,
+ String
basefileExtension, String partitionPath,
+ FileSystem
fs)
+ throws IOException {
+ final FileStatus[] dataFilesToDeletedStatus =
+ getBaseFilesToBeDeleted(instantToRollback.getTimestamp(),
basefileExtension,
+ partitionPath, fs);
+ return getHoodieRollbackRequest(partitionPath, dataFilesToDeletedStatus);
+ }
+
+ @NotNull
+ private HoodieRollbackRequest getHoodieRollbackRequest(String partitionPath,
FileStatus[] filesToDeletedStatus) {
+ List<String> filesToBeDeleted = getFilesToBeDeleted(filesToDeletedStatus);
+ return new HoodieRollbackRequest(
+ partitionPath, EMPTY_STRING, EMPTY_STRING, filesToBeDeleted,
Collections.emptyMap());
+ }
+
+ @NotNull
+ private List<String> getFilesToBeDeleted(FileStatus[]
dataFilesToDeletedStatus) {
+ return Arrays.stream(dataFilesToDeletedStatus).map(fileStatus -> {
+ String dataFileToBeDeleted = fileStatus.getPath().toString();
+ // strip scheme
+ return dataFileToBeDeleted.substring(dataFileToBeDeleted.indexOf(":") +
1);
Review comment:
Why do we need to strip the scheme?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
##########
@@ -57,20 +81,252 @@ public ListingBasedRollbackStrategy(HoodieTable table,
@Override
public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant
instantToRollback) {
try {
- List<ListingBasedRollbackRequest> rollbackRequests = null;
- if (table.getMetaClient().getTableType() ==
HoodieTableType.COPY_ON_WRITE) {
- rollbackRequests =
RollbackUtils.generateRollbackRequestsByListingCOW(context,
- table.getMetaClient().getBasePath());
- } else {
- rollbackRequests = RollbackUtils
- .generateRollbackRequestsUsingFileListingMOR(instantToRollback,
table, context);
- }
- List<HoodieRollbackRequest> listingBasedRollbackRequests = new
ListingBasedRollbackHelper(table.getMetaClient(), config)
- .getRollbackRequestsForRollbackPlan(context, instantToRollback,
rollbackRequests);
- return listingBasedRollbackRequests;
- } catch (IOException e) {
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ List<String> partitionPaths =
+ FSUtils.getAllPartitionPaths(context,
table.getMetaClient().getBasePath(), false, false);
+ int numPartitions = Math.max(Math.min(partitionPaths.size(),
config.getRollbackParallelism()), 1);
+
+ context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing
Rollback Plan");
+
+ return context.flatMap(partitionPaths, partitionPath -> {
+ List<HoodieRollbackRequest> hoodieRollbackRequests = new
ArrayList<>(partitionPaths.size());
+ if (table.getMetaClient().getTableType() ==
HoodieTableType.COPY_ON_WRITE) {
+ hoodieRollbackRequests.add(
+ deleteDataAndLogFilesRollbackRequest(instantToRollback,
metaClient, partitionPath));
+ } else {
+ String commit = instantToRollback.getTimestamp();
+ HoodieActiveTimeline activeTimeline =
table.getMetaClient().reloadActiveTimeline();
+ switch (instantToRollback.getAction()) {
+ case HoodieTimeline.COMMIT_ACTION:
+ case HoodieTimeline.REPLACE_COMMIT_ACTION:
+ hoodieRollbackRequests.add(
+ deleteDataAndLogFilesRollbackRequest(instantToRollback,
metaClient, partitionPath));
+ break;
+ case HoodieTimeline.COMPACTION_ACTION:
+ // If there is no delta commit present after the current commit
(if compaction), no action, else we
+ // need to make sure that a compaction commit rollback also
deletes any log files written as part of the
+ // succeeding deltacommit.
+ boolean higherDeltaCommits =
+
!activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit,
1)
+ .empty();
+ if (higherDeltaCommits) {
+ // Rollback of a compaction action with no higher deltacommit
means that the compaction is scheduled
+ // and has not yet finished. In this scenario we should delete
only the newly created base files
+ // and not corresponding base commit log files created with
this as baseCommit since updates would
+ // have been written to the log files.
+ String baseFileExtension = getBaseFileExtension(metaClient);
+
hoodieRollbackRequests.add(deleteDataFilesOnlyRollbackRequest(instantToRollback,
baseFileExtension, partitionPath,
+ metaClient.getFs()));
+ } else {
+ // No deltacommits present after this compaction commit
(inflight or requested). In this case, we
+ // can also delete any log files that were created with this
compaction commit as base
+ // commit.
+ hoodieRollbackRequests.add(
+ deleteDataAndLogFilesRollbackRequest(instantToRollback,
metaClient, partitionPath));
+ }
+ break;
+ case HoodieTimeline.DELTA_COMMIT_ACTION:
+ //
--------------------------------------------------------------------------------------------------
+ // (A) The following cases are possible if
index.canIndexLogFiles and/or index.isGlobal
+ //
--------------------------------------------------------------------------------------------------
+ // (A.1) Failed first commit - Inserts were written to log files
and HoodieWriteStat has no entries. In
+ // this scenario we would want to delete these log files.
+ // (A.2) Failed recurring commit - Inserts/Updates written to
log files. In this scenario,
+ // HoodieWriteStat will have the baseCommitTime for the first
log file written, add rollback blocks.
+ // (A.3) Rollback triggered for first commit - Inserts were
written to the log files but the commit is
+ // being reverted. In this scenario, HoodieWriteStat will be
`null` for the attribute prevCommitTime and
+ // and hence will end up deleting these log files. This is done
so there are no orphan log files
+ // lying around.
+ // (A.4) Rollback triggered for recurring commits -
Inserts/Updates are being rolled back, the actions
+ // taken in this scenario is a combination of (A.2) and (A.3)
+ //
---------------------------------------------------------------------------------------------------
+ // (B) The following cases are possible if
!index.canIndexLogFiles and/or !index.isGlobal
+ //
---------------------------------------------------------------------------------------------------
+ // (B.1) Failed first commit - Inserts were written to base
files and HoodieWriteStat has no entries.
+ // In this scenario, we delete all the base files written for
the failed commit.
+ // (B.2) Failed recurring commits - Inserts were written to base
files and updates to log files. In
+ // this scenario, perform (A.1) and for updates written to log
files, write rollback blocks.
+ // (B.3) Rollback triggered for first commit - Same as (B.1)
+ // (B.4) Rollback triggered for recurring commits - Same as
(B.2) plus we need to delete the log files
+ // as well if the base base file gets deleted.
+ HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(
+
table.getMetaClient().getCommitTimeline().getInstantDetails(instantToRollback).get(),
+ HoodieCommitMetadata.class);
+
+ // In case all data was inserts and the commit failed, delete
the file belonging to that commit
+ // We do not know fileIds for inserts (first inserts are either
log files or base files),
+ // delete all files for the corresponding failed commit, if
present (same as COW)
+ hoodieRollbackRequests.add(
+ deleteDataAndLogFilesRollbackRequest(instantToRollback,
metaClient, partitionPath));
+
+ // append rollback blocks for updates and inserts as A.2 and B.2
+ if
(commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
+ hoodieRollbackRequests.addAll(
+ getRollbackRequestForAppend(partitionPath,
instantToRollback, commitMetadata, table));
+ }
+ break;
+ default:
+ throw new HoodieRollbackException("Unknown listing type, during
rollback of " + instantToRollback);
+ }
+ }
+ return hoodieRollbackRequests.stream();
+ }, numPartitions);
+ } catch (Exception e) {
LOG.error("Generating rollback requests failed for " +
instantToRollback.getTimestamp(), e);
throw new HoodieRollbackException("Generating rollback requests failed
for " + instantToRollback.getTimestamp(), e);
}
}
+
+ private String getBaseFileExtension(HoodieTableMetaClient metaClient) {
+ return metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+ }
+
+ @NotNull
+ private HoodieRollbackRequest
deleteDataAndLogFilesRollbackRequest(HoodieInstant instantToRollback,
Review comment:
We should inline these methods, given that we're going to support
rollbacks of both completed and pending instants:
```
getHoodieRollbackRequest(partitionPath,
getBaseAndLogFilesToBeDeleted(metaClient, instantToRollback, partitionPath))
```
will become (for completed instants)
```
getHoodieRollbackRequest(partitionPath,
fetchFilesFromCommitMetadata(metaClient, instantToRollback, partitionPath))
```
and for pending ones:
```
getHoodieRollbackRequest(partitionPath,
listFilesToBeDeleted(instantToRollback.getTimestamp(), basefileExtension,
partitionPath, fs))
```
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
##########
@@ -57,20 +81,252 @@ public ListingBasedRollbackStrategy(HoodieTable table,
@Override
public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant
instantToRollback) {
try {
- List<ListingBasedRollbackRequest> rollbackRequests = null;
- if (table.getMetaClient().getTableType() ==
HoodieTableType.COPY_ON_WRITE) {
- rollbackRequests =
RollbackUtils.generateRollbackRequestsByListingCOW(context,
- table.getMetaClient().getBasePath());
- } else {
- rollbackRequests = RollbackUtils
- .generateRollbackRequestsUsingFileListingMOR(instantToRollback,
table, context);
- }
- List<HoodieRollbackRequest> listingBasedRollbackRequests = new
ListingBasedRollbackHelper(table.getMetaClient(), config)
- .getRollbackRequestsForRollbackPlan(context, instantToRollback,
rollbackRequests);
- return listingBasedRollbackRequests;
- } catch (IOException e) {
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ List<String> partitionPaths =
+ FSUtils.getAllPartitionPaths(context,
table.getMetaClient().getBasePath(), false, false);
+ int numPartitions = Math.max(Math.min(partitionPaths.size(),
config.getRollbackParallelism()), 1);
+
+ context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing
Rollback Plan");
+
+ return context.flatMap(partitionPaths, partitionPath -> {
+ List<HoodieRollbackRequest> hoodieRollbackRequests = new
ArrayList<>(partitionPaths.size());
+ if (table.getMetaClient().getTableType() ==
HoodieTableType.COPY_ON_WRITE) {
+ hoodieRollbackRequests.add(
+ deleteDataAndLogFilesRollbackRequest(instantToRollback,
metaClient, partitionPath));
+ } else {
+ String commit = instantToRollback.getTimestamp();
+ HoodieActiveTimeline activeTimeline =
table.getMetaClient().reloadActiveTimeline();
+ switch (instantToRollback.getAction()) {
+ case HoodieTimeline.COMMIT_ACTION:
+ case HoodieTimeline.REPLACE_COMMIT_ACTION:
+ hoodieRollbackRequests.add(
+ deleteDataAndLogFilesRollbackRequest(instantToRollback,
metaClient, partitionPath));
+ break;
+ case HoodieTimeline.COMPACTION_ACTION:
+ // If there is no delta commit present after the current commit
(if compaction), no action, else we
+ // need to make sure that a compaction commit rollback also
deletes any log files written as part of the
+ // succeeding deltacommit.
+ boolean higherDeltaCommits =
+
!activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit,
1)
+ .empty();
+ if (higherDeltaCommits) {
+ // Rollback of a compaction action with no higher deltacommit
means that the compaction is scheduled
+ // and has not yet finished. In this scenario we should delete
only the newly created base files
+ // and not corresponding base commit log files created with
this as baseCommit since updates would
+ // have been written to the log files.
+ String baseFileExtension = getBaseFileExtension(metaClient);
+
hoodieRollbackRequests.add(deleteDataFilesOnlyRollbackRequest(instantToRollback,
baseFileExtension, partitionPath,
+ metaClient.getFs()));
+ } else {
+ // No deltacommits present after this compaction commit
(inflight or requested). In this case, we
+ // can also delete any log files that were created with this
compaction commit as base
+ // commit.
+ hoodieRollbackRequests.add(
+ deleteDataAndLogFilesRollbackRequest(instantToRollback,
metaClient, partitionPath));
+ }
+ break;
+ case HoodieTimeline.DELTA_COMMIT_ACTION:
+ //
--------------------------------------------------------------------------------------------------
+ // (A) The following cases are possible if
index.canIndexLogFiles and/or index.isGlobal
+ //
--------------------------------------------------------------------------------------------------
+ // (A.1) Failed first commit - Inserts were written to log files
and HoodieWriteStat has no entries. In
+ // this scenario we would want to delete these log files.
+ // (A.2) Failed recurring commit - Inserts/Updates written to
log files. In this scenario,
+ // HoodieWriteStat will have the baseCommitTime for the first
log file written, add rollback blocks.
+ // (A.3) Rollback triggered for first commit - Inserts were
written to the log files but the commit is
+ // being reverted. In this scenario, HoodieWriteStat will be
`null` for the attribute prevCommitTime and
+ // and hence will end up deleting these log files. This is done
so there are no orphan log files
+ // lying around.
+ // (A.4) Rollback triggered for recurring commits -
Inserts/Updates are being rolled back, the actions
+ // taken in this scenario is a combination of (A.2) and (A.3)
+ //
---------------------------------------------------------------------------------------------------
+ // (B) The following cases are possible if
!index.canIndexLogFiles and/or !index.isGlobal
+ //
---------------------------------------------------------------------------------------------------
+ // (B.1) Failed first commit - Inserts were written to base
files and HoodieWriteStat has no entries.
+ // In this scenario, we delete all the base files written for
the failed commit.
+ // (B.2) Failed recurring commits - Inserts were written to base
files and updates to log files. In
+ // this scenario, perform (A.1) and for updates written to log
files, write rollback blocks.
+ // (B.3) Rollback triggered for first commit - Same as (B.1)
+ // (B.4) Rollback triggered for recurring commits - Same as
(B.2) plus we need to delete the log files
+ // as well if the base base file gets deleted.
+ HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(
+
table.getMetaClient().getCommitTimeline().getInstantDetails(instantToRollback).get(),
+ HoodieCommitMetadata.class);
+
+ // In case all data was inserts and the commit failed, delete
the file belonging to that commit
+ // We do not know fileIds for inserts (first inserts are either
log files or base files),
+ // delete all files for the corresponding failed commit, if
present (same as COW)
+ hoodieRollbackRequests.add(
+ deleteDataAndLogFilesRollbackRequest(instantToRollback,
metaClient, partitionPath));
+
+ // append rollback blocks for updates and inserts as A.2 and B.2
+ if
(commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
+ hoodieRollbackRequests.addAll(
+ getRollbackRequestForAppend(partitionPath,
instantToRollback, commitMetadata, table));
+ }
+ break;
+ default:
+ throw new HoodieRollbackException("Unknown listing type, during
rollback of " + instantToRollback);
+ }
+ }
+ return hoodieRollbackRequests.stream();
+ }, numPartitions);
+ } catch (Exception e) {
LOG.error("Generating rollback requests failed for " +
instantToRollback.getTimestamp(), e);
throw new HoodieRollbackException("Generating rollback requests failed
for " + instantToRollback.getTimestamp(), e);
}
}
+
+ private String getBaseFileExtension(HoodieTableMetaClient metaClient) {
+ return metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+ }
+
+ @NotNull
+ private HoodieRollbackRequest
deleteDataAndLogFilesRollbackRequest(HoodieInstant instantToRollback,
+
HoodieTableMetaClient metaClient,
+ String
partitionPath)
+ throws IOException {
+ final FileStatus[] filesToDeletedStatus =
getBaseAndLogFilesToBeDeleted(metaClient, instantToRollback, partitionPath);
+ return getHoodieRollbackRequest(partitionPath, filesToDeletedStatus);
+ }
+
+ @NotNull
+ private HoodieRollbackRequest
deleteDataFilesOnlyRollbackRequest(HoodieInstant instantToRollback,
+ String
basefileExtension, String partitionPath,
+ FileSystem
fs)
+ throws IOException {
+ final FileStatus[] dataFilesToDeletedStatus =
+ getBaseFilesToBeDeleted(instantToRollback.getTimestamp(),
basefileExtension,
+ partitionPath, fs);
+ return getHoodieRollbackRequest(partitionPath, dataFilesToDeletedStatus);
+ }
+
+ @NotNull
+ private HoodieRollbackRequest getHoodieRollbackRequest(String partitionPath,
FileStatus[] filesToDeletedStatus) {
+ List<String> filesToBeDeleted = getFilesToBeDeleted(filesToDeletedStatus);
+ return new HoodieRollbackRequest(
+ partitionPath, EMPTY_STRING, EMPTY_STRING, filesToBeDeleted,
Collections.emptyMap());
+ }
+
+ @NotNull
+ private List<String> getFilesToBeDeleted(FileStatus[]
dataFilesToDeletedStatus) {
+ return Arrays.stream(dataFilesToDeletedStatus).map(fileStatus -> {
+ String dataFileToBeDeleted = fileStatus.getPath().toString();
+ // strip scheme
+ return dataFileToBeDeleted.substring(dataFileToBeDeleted.indexOf(":") +
1);
+ }).collect(Collectors.toList());
+ }
+
+ private FileStatus[] getBaseFilesToBeDeleted(String commit, String
basefileExtension, String partitionPath,
Review comment:
Let's align the naming more closely with the actions we're performing, for example:
- `getBaseFilesToBeDeleted` > `listFilesToBeDeleted`
- `getBaseAndLogFilesToBeDeleted` > `fetchFilesFromCommitMetadata`
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
##########
@@ -126,12 +114,12 @@ protected void recreateMarkers(final String
commitInstantTime,
}
}
- List<HoodieRollbackStat> getListBasedRollBackStats(
- HoodieTableMetaClient metaClient, HoodieWriteConfig config,
HoodieEngineContext context,
- Option<HoodieInstant> commitInstantOpt,
List<ListingBasedRollbackRequest> rollbackRequests) {
- List<HoodieRollbackRequest> hoodieRollbackRequests = new
ListingBasedRollbackHelper(metaClient, config)
- .getRollbackRequestsForRollbackPlan(context, commitInstantOpt.get(),
rollbackRequests);
- return new BaseRollbackHelper(metaClient,
config).collectRollbackStats(context, commitInstantOpt.get(),
hoodieRollbackRequests);
+ List<HoodieRollbackStat> getListBasedRollBackStats(HoodieTable table,
HoodieEngineContext context, Option<HoodieInstant> commitInstantOpt) {
Review comment:
Please specify generic params `HoodieTable<?, ?, ?, ?>`
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
##########
@@ -57,20 +81,252 @@ public ListingBasedRollbackStrategy(HoodieTable table,
@Override
public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant
instantToRollback) {
try {
- List<ListingBasedRollbackRequest> rollbackRequests = null;
- if (table.getMetaClient().getTableType() ==
HoodieTableType.COPY_ON_WRITE) {
- rollbackRequests =
RollbackUtils.generateRollbackRequestsByListingCOW(context,
- table.getMetaClient().getBasePath());
- } else {
- rollbackRequests = RollbackUtils
- .generateRollbackRequestsUsingFileListingMOR(instantToRollback,
table, context);
- }
- List<HoodieRollbackRequest> listingBasedRollbackRequests = new
ListingBasedRollbackHelper(table.getMetaClient(), config)
- .getRollbackRequestsForRollbackPlan(context, instantToRollback,
rollbackRequests);
- return listingBasedRollbackRequests;
- } catch (IOException e) {
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ List<String> partitionPaths =
+ FSUtils.getAllPartitionPaths(context,
table.getMetaClient().getBasePath(), false, false);
+ int numPartitions = Math.max(Math.min(partitionPaths.size(),
config.getRollbackParallelism()), 1);
+
+ context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing
Rollback Plan");
+
+ return context.flatMap(partitionPaths, partitionPath -> {
+ List<HoodieRollbackRequest> hoodieRollbackRequests = new
ArrayList<>(partitionPaths.size());
+ if (table.getMetaClient().getTableType() ==
HoodieTableType.COPY_ON_WRITE) {
+ hoodieRollbackRequests.add(
+ deleteDataAndLogFilesRollbackRequest(instantToRollback,
metaClient, partitionPath));
+ } else {
+ String commit = instantToRollback.getTimestamp();
+ HoodieActiveTimeline activeTimeline =
table.getMetaClient().reloadActiveTimeline();
+ switch (instantToRollback.getAction()) {
+ case HoodieTimeline.COMMIT_ACTION:
+ case HoodieTimeline.REPLACE_COMMIT_ACTION:
+ hoodieRollbackRequests.add(
+ deleteDataAndLogFilesRollbackRequest(instantToRollback,
metaClient, partitionPath));
+ break;
+ case HoodieTimeline.COMPACTION_ACTION:
+ // If there is no delta commit present after the current commit
(if compaction), no action, else we
+ // need to make sure that a compaction commit rollback also
deletes any log files written as part of the
+ // succeeding deltacommit.
+ boolean higherDeltaCommits =
+
!activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(commit,
1)
+ .empty();
+ if (higherDeltaCommits) {
+ // Rollback of a compaction action with no higher deltacommit
means that the compaction is scheduled
+ // and has not yet finished. In this scenario we should delete
only the newly created base files
+ // and not corresponding base commit log files created with
this as baseCommit since updates would
+ // have been written to the log files.
+ String baseFileExtension = getBaseFileExtension(metaClient);
+
hoodieRollbackRequests.add(deleteDataFilesOnlyRollbackRequest(instantToRollback,
baseFileExtension, partitionPath,
+ metaClient.getFs()));
+ } else {
+ // No deltacommits present after this compaction commit
(inflight or requested). In this case, we
+ // can also delete any log files that were created with this
compaction commit as base
+ // commit.
+ hoodieRollbackRequests.add(
+ deleteDataAndLogFilesRollbackRequest(instantToRollback,
metaClient, partitionPath));
+ }
+ break;
+ case HoodieTimeline.DELTA_COMMIT_ACTION:
+ //
--------------------------------------------------------------------------------------------------
+ // (A) The following cases are possible if
index.canIndexLogFiles and/or index.isGlobal
+ //
--------------------------------------------------------------------------------------------------
+ // (A.1) Failed first commit - Inserts were written to log files
and HoodieWriteStat has no entries. In
+ // this scenario we would want to delete these log files.
+ // (A.2) Failed recurring commit - Inserts/Updates written to
log files. In this scenario,
+ // HoodieWriteStat will have the baseCommitTime for the first
log file written, add rollback blocks.
+ // (A.3) Rollback triggered for first commit - Inserts were
written to the log files but the commit is
+ // being reverted. In this scenario, HoodieWriteStat will be
`null` for the attribute prevCommitTime and
+ // and hence will end up deleting these log files. This is done
so there are no orphan log files
+ // lying around.
+ // (A.4) Rollback triggered for recurring commits -
Inserts/Updates are being rolled back, the actions
+ // taken in this scenario is a combination of (A.2) and (A.3)
+ //
---------------------------------------------------------------------------------------------------
+ // (B) The following cases are possible if
!index.canIndexLogFiles and/or !index.isGlobal
+ //
---------------------------------------------------------------------------------------------------
+ // (B.1) Failed first commit - Inserts were written to base
files and HoodieWriteStat has no entries.
+ // In this scenario, we delete all the base files written for
the failed commit.
+ // (B.2) Failed recurring commits - Inserts were written to base
files and updates to log files. In
+ // this scenario, perform (A.1) and for updates written to log
files, write rollback blocks.
+ // (B.3) Rollback triggered for first commit - Same as (B.1)
+ // (B.4) Rollback triggered for recurring commits - Same as
(B.2) plus we need to delete the log files
+ // as well if the base base file gets deleted.
+ HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(
+
table.getMetaClient().getCommitTimeline().getInstantDetails(instantToRollback).get(),
+ HoodieCommitMetadata.class);
+
+ // In case all data was inserts and the commit failed, delete
the file belonging to that commit
+ // We do not know fileIds for inserts (first inserts are either
log files or base files),
+ // delete all files for the corresponding failed commit, if
present (same as COW)
+ hoodieRollbackRequests.add(
+ deleteDataAndLogFilesRollbackRequest(instantToRollback,
metaClient, partitionPath));
+
+ // append rollback blocks for updates and inserts as A.2 and B.2
+ if
(commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
+ hoodieRollbackRequests.addAll(
+ getRollbackRequestForAppend(partitionPath,
instantToRollback, commitMetadata, table));
+ }
+ break;
+ default:
+ throw new HoodieRollbackException("Unknown listing type, during
rollback of " + instantToRollback);
+ }
+ }
+ return hoodieRollbackRequests.stream();
+ }, numPartitions);
+ } catch (Exception e) {
LOG.error("Generating rollback requests failed for " +
instantToRollback.getTimestamp(), e);
throw new HoodieRollbackException("Generating rollback requests failed
for " + instantToRollback.getTimestamp(), e);
}
}
+
+ private String getBaseFileExtension(HoodieTableMetaClient metaClient) {
+ return metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+ }
+
+ @NotNull
+ private HoodieRollbackRequest
deleteDataAndLogFilesRollbackRequest(HoodieInstant instantToRollback,
+
HoodieTableMetaClient metaClient,
+ String
partitionPath)
+ throws IOException {
+ final FileStatus[] filesToDeletedStatus =
getBaseAndLogFilesToBeDeleted(metaClient, instantToRollback, partitionPath);
+ return getHoodieRollbackRequest(partitionPath, filesToDeletedStatus);
+ }
+
+ @NotNull
+ private HoodieRollbackRequest
deleteDataFilesOnlyRollbackRequest(HoodieInstant instantToRollback,
+ String
basefileExtension, String partitionPath,
+ FileSystem
fs)
+ throws IOException {
+ final FileStatus[] dataFilesToDeletedStatus =
+ getBaseFilesToBeDeleted(instantToRollback.getTimestamp(),
basefileExtension,
+ partitionPath, fs);
+ return getHoodieRollbackRequest(partitionPath, dataFilesToDeletedStatus);
+ }
+
+ @NotNull
+ private HoodieRollbackRequest getHoodieRollbackRequest(String partitionPath,
FileStatus[] filesToDeletedStatus) {
+ List<String> filesToBeDeleted = getFilesToBeDeleted(filesToDeletedStatus);
Review comment:
Typo: should be `filesToDelete`
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
##########
@@ -57,20 +81,252 @@ public ListingBasedRollbackStrategy(HoodieTable table,
@Override
public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant
instantToRollback) {
try {
- List<ListingBasedRollbackRequest> rollbackRequests = null;
- if (table.getMetaClient().getTableType() ==
HoodieTableType.COPY_ON_WRITE) {
- rollbackRequests =
RollbackUtils.generateRollbackRequestsByListingCOW(context,
- table.getMetaClient().getBasePath());
- } else {
- rollbackRequests = RollbackUtils
- .generateRollbackRequestsUsingFileListingMOR(instantToRollback,
table, context);
- }
- List<HoodieRollbackRequest> listingBasedRollbackRequests = new
ListingBasedRollbackHelper(table.getMetaClient(), config)
- .getRollbackRequestsForRollbackPlan(context, instantToRollback,
rollbackRequests);
- return listingBasedRollbackRequests;
- } catch (IOException e) {
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ List<String> partitionPaths =
+ FSUtils.getAllPartitionPaths(context,
table.getMetaClient().getBasePath(), false, false);
+ int numPartitions = Math.max(Math.min(partitionPaths.size(),
config.getRollbackParallelism()), 1);
+
+ context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing
Rollback Plan");
+
+ return context.flatMap(partitionPaths, partitionPath -> {
+ List<HoodieRollbackRequest> hoodieRollbackRequests = new
ArrayList<>(partitionPaths.size());
+ if (table.getMetaClient().getTableType() ==
HoodieTableType.COPY_ON_WRITE) {
+ hoodieRollbackRequests.add(
Review comment:
In general, each of these branches needs to handle two cases: when the
commit has been completed, and when it is still pending at the time of the
rollback:
- If it has been completed, we get the list of files from the commit metadata
and roll those back.
- If it has not been completed, we list the files from the file system (those
matching the commit) and roll those back.
Does that make sense?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]