satishkotha commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r488990799
##########
File path:
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext
jsc, HoodieInstant in
}
}
+  /**
+   * Deletes the data files of all file groups that were replaced before or on
+   * {@code instant}. Only completed instants are processed: files referenced by a
+   * pending instant must be kept so that instant can still be rolled back.
+   *
+   * @param instant the instant being archived
+   */
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {
+      // only delete files for completed instants
+      return;
+    }
+
+    TableFileSystemView fileSystemView = this.table.getFileSystemView();
+    // force-load the partitions touched by this replace instant so the view below is complete
+    ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+    Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView.getReplacedFileGroupsBeforeOrOn(instant.getTimestamp());
+
+    fileGroupsToDelete.forEach(fg -> fg.getAllRawFileSlices().forEach(fileSlice -> {
+      // use ifPresent (not map) for the side-effecting delete; the boolean result of deletePath is unused
+      fileSlice.getBaseFile().ifPresent(baseFile -> deletePath(baseFile.getFileStatus().getPath(), instant));
+      fileSlice.getLogFiles().forEach(logFile -> deletePath(logFile.getPath(), instant));
+    }));
+  }
+
+  /**
+   * Because we are creating new 'HoodieTable' and FileSystemView objects in this class constructor,
+   * the partition view may not be loaded correctly. Reload all partitions modified by the REPLACE
+   * action at {@code instant} so that {@code getReplacedFileGroupsBeforeOrOn} can see their file groups.
+   *
+   * TODO find a better way to pass the FileSystemView to this class.
+   *
+   * @param instant        the instant being archived
+   * @param fileSystemView the view whose partitions should be (re)loaded
+   * @throws HoodieCommitException if the replace commit metadata for {@code instant} cannot be read
+   */
+  private void ensureReplacedPartitionsLoadedCorrectly(HoodieInstant instant, TableFileSystemView fileSystemView) {
+    // find the completed replace instant (if any) with the same timestamp as the instant being archived
+    Option<HoodieInstant> replaceInstantOption = metaClient.getActiveTimeline().getCompletedAndReplaceTimeline()
+        .filter(replaceInstant -> replaceInstant.getTimestamp().equals(instant.getTimestamp())).firstInstant();
+
+    replaceInstantOption.ifPresent(replaceInstant -> {
+      try {
+        HoodieReplaceCommitMetadata metadata = HoodieReplaceCommitMetadata.fromBytes(
+            metaClient.getActiveTimeline().getInstantDetails(replaceInstant).get(),
+            HoodieReplaceCommitMetadata.class);
+
+        // touching getAllFileGroups(partition) forces the view to load that partition
+        metadata.getPartitionToReplaceStats().keySet().forEach(partition -> fileSystemView.getAllFileGroups(partition));
+      } catch (IOException e) {
+        // the failure here is reading replace metadata (not deleting files) — report it accurately
+        throw new HoodieCommitException("Failed to archive because cannot read replace commit metadata for instant "
+            + replaceInstant, e);
+      }
+    });
+  }
+
+ private boolean deletePath(Path path, HoodieInstant instant) {
+ try {
+ LOG.info("Deleting " + path + " before archiving " + instant);
+ metaClient.getFs().delete(path);
Review comment:
For parallel deletes, the JavaSparkContext is not exposed to the archive
process. Since we want to move this into the clean process anyway, is it OK
if I defer this to https://issues.apache.org/jira/browse/HUDI-1276?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]