nsivabalan commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r779271122



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -134,12 +161,199 @@ public boolean archiveIfRequired(HoodieEngineContext context) throws IOException
         LOG.info("No Instants to archive");
       }
 
+      if (config.getArchiveAutoMergeEnable()) {
+        mergeArchiveFilesIfNecessary(context);
+      }
       return success;
     } finally {
       close();
     }
   }
 
+  private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
+    Path planPath = new Path(metaClient.getArchivePath(), mergeArchivePlanName);
+    // Flush remaining content, if any, and open a new writer
+    reOpenWriter();
+    // List all archive files
+    FileStatus[] fsStatuses = metaClient.getFs().globStatus(
+        new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
+    List<FileStatus> mergeCandidate = new ArrayList<>();
+    int archiveFilesCompactBatch = config.getArchiveFilesMergeBatchSize();
+    long smallFileLimitBytes = config.getArchiveMergeSmallFileLimitBytes();
+
+    for (FileStatus fs: fsStatuses) {
+      if (fs.getLen() < smallFileLimitBytes) {
+        mergeCandidate.add(fs);
+      }
+      if (mergeCandidate.size() >= archiveFilesCompactBatch) {
+        break;
+      }
+    }
+
+    if (mergeCandidate.size() >= archiveFilesCompactBatch) {
+      List<String> candidateFiles = mergeCandidate.stream().map(fs -> fs.getPath().toString()).collect(Collectors.toList());
+      // before merge archive files build merge plan
+      String logFileName = computeLogFileName();
+      buildArchiveMergePlan(candidateFiles, planPath, logFileName);
+      // merge archive files
+      mergeArchiveFiles(mergeCandidate);
+      // after merge, delete the small archive files.
+      deleteFilesParallelize(metaClient, candidateFiles, context, true);
+      // finally, delete the archiveMergePlan, which marks the small archive files merge operation as succeeded.
+      metaClient.getFs().delete(planPath, false);
+    }
+  }
+
+  /**
+   * Get the final written archive file name, depending on whether the storage scheme supports append.
+   */
+  private String computeLogFileName() throws IOException {
+    if (!StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
+      String logWriteToken = writer.getLogFile().getLogWriteToken();
+      HoodieLogFile hoodieLogFile = writer.getLogFile().rollOver(metaClient.getFs(), logWriteToken);
+      return hoodieLogFile.getFileName();
+    } else {
+      return writer.getLogFile().getFileName();

Review comment:
       @vinothchandar @yihua: Do you folks have any thoughts here? This is the only 
pending item we need to resolve. The rest of the patch looks fine to me. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

