yihua commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r780743992



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##########
@@ -249,6 +249,24 @@
           + "record size estimate compute dynamically based on commit 
metadata. "
           + " This is critical in computing the insert parallelism and 
bin-packing inserts into small files.");
 
+  public static final ConfigProperty<String> ARCHIVE_FILES_MERGE_BATCH_SIZE = 
ConfigProperty
+      .key("hoodie.archive.files.merge.batch.size")
+      .defaultValue(String.valueOf(10))
+      .withDocumentation("The numbers of small archive files are merged at 
once.");

Review comment:
       nit: docs -> `"The number of small archive files to be merged at once."`

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -134,12 +161,222 @@ public boolean archiveIfRequired(HoodieEngineContext 
context) throws IOException
         LOG.info("No Instants to archive");
       }
 
+      if (config.getArchiveAutoMergeEnable() && 
!StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
+        mergeArchiveFilesIfNecessary(context);
+      }
       return success;
     } finally {
       close();
     }
   }
 
+  /**
+   * Here Hoodie can merge the small archive files into a new larger one.
+   * Only used for filesystem which is not supported append operation.
+   * The hole merge small archive files operation has four stages:
+   * 1. Build merge plan with merge candidates/merged file name infos.
+   * 2. Do merge.
+   * 3. Delete all the candidates.
+   * 4. Delete the merge plan.
+   * @param context HoodieEngineContext
+   * @throws IOException
+   */
+  private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) 
throws IOException {
+    Path planPath = new Path(metaClient.getArchivePath(), 
mergeArchivePlanName);
+    // Flush reminded content if existed and open a new write
+    reOpenWriter();
+    // List all archive files
+    FileStatus[] fsStatuses = metaClient.getFs().globStatus(
+        new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
+    // Sort files by version suffix in reverse (implies reverse chronological 
order)
+    Arrays.sort(fsStatuses, new 
HoodieArchivedTimeline.ArchiveFileVersionComparator());
+
+    int archiveFilesMergeBatch = config.getArchiveFilesMergeBatchSize();
+    long smallFileLimitBytes = config.getArchiveMergeSmallFileLimitBytes();
+
+    List<FileStatus> mergeCandidate = getMergeCandidates(smallFileLimitBytes, 
fsStatuses);
+
+    if (mergeCandidate.size() >= archiveFilesMergeBatch) {
+      List<String> candidateFiles = mergeCandidate.stream().map(fs -> 
fs.getPath().toString()).collect(Collectors.toList());

Review comment:
       Should we consider limiting the number of archive files to merge in each 
merging operation?  The first time the archive file merging operation is 
triggered on an existing Hudi table with a long history, it could merge a very 
large number of archive files, penalizing a single commit.  It can also create 
a huge archive file, and the operation itself can take a long time and fail.  
Maybe we can limit the number of files or bytes to merge?  And possibly make 
the merge plan include multiple batches of merging, with each commit triggering 
only one batch?  Something like:
   
   ```
   candidates: log1, log2, ...., log1000
   plan:
   batch1: log1, ..., log100 -> log1001
   batch2: log101, ..., log200 -> log1002
   ...
   batch10: log901, ..., log1000 -> log1010
   this commit: only executing batch1
   ```
   
   This is something we can follow up on in a separate PR.  @zhangyue19921010 
you can file a ticket for that.
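
   A minimal sketch of that batching idea (helper names like `planBatches` and 
   `pickBatchForThisCommit` are illustrative, not part of this PR):
   
   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   
   public class ArchiveMergeBatching {
   
     // Group the sorted candidate archive file names into batches of at most
     // maxFilesPerBatch files each, preserving their order.
     static List<List<String>> planBatches(List<String> candidates, int maxFilesPerBatch) {
       List<List<String>> batches = new ArrayList<>();
       for (int i = 0; i < candidates.size(); i += maxFilesPerBatch) {
         batches.add(new ArrayList<>(
             candidates.subList(i, Math.min(i + maxFilesPerBatch, candidates.size()))));
       }
       return batches;
     }
   
     // Each commit would then execute only the first remaining batch of the plan.
     static List<String> pickBatchForThisCommit(List<List<String>> batches) {
       return batches.isEmpty() ? Collections.<String>emptyList() : batches.get(0);
     }
   }
   ```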

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -134,12 +161,222 @@ public boolean archiveIfRequired(HoodieEngineContext 
context) throws IOException
         LOG.info("No Instants to archive");
       }
 
+      if (config.getArchiveAutoMergeEnable() && 
!StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
+        mergeArchiveFilesIfNecessary(context);
+      }
       return success;
     } finally {
       close();
     }
   }
 
+  /**
+   * Here Hoodie can merge the small archive files into a new larger one.
+   * Only used for filesystem which is not supported append operation.
+   * The hole merge small archive files operation has four stages:
+   * 1. Build merge plan with merge candidates/merged file name infos.
+   * 2. Do merge.
+   * 3. Delete all the candidates.
+   * 4. Delete the merge plan.
+   * @param context HoodieEngineContext
+   * @throws IOException
+   */
+  private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) 
throws IOException {
+    Path planPath = new Path(metaClient.getArchivePath(), 
mergeArchivePlanName);
+    // Flush reminded content if existed and open a new write
+    reOpenWriter();
+    // List all archive files
+    FileStatus[] fsStatuses = metaClient.getFs().globStatus(
+        new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
+    // Sort files by version suffix in reverse (implies reverse chronological 
order)
+    Arrays.sort(fsStatuses, new 
HoodieArchivedTimeline.ArchiveFileVersionComparator());
+
+    int archiveFilesMergeBatch = config.getArchiveFilesMergeBatchSize();
+    long smallFileLimitBytes = config.getArchiveMergeSmallFileLimitBytes();
+
+    List<FileStatus> mergeCandidate = getMergeCandidates(smallFileLimitBytes, 
fsStatuses);
+
+    if (mergeCandidate.size() >= archiveFilesMergeBatch) {
+      List<String> candidateFiles = mergeCandidate.stream().map(fs -> 
fs.getPath().toString()).collect(Collectors.toList());
+      // before merge archive files build merge plan
+      String logFileName = computeLogFileName();
+      buildArchiveMergePlan(candidateFiles, planPath, logFileName);
+      // merge archive files
+      mergeArchiveFiles(mergeCandidate);
+      // after merge, delete the small archive files.
+      deleteFilesParallelize(metaClient, candidateFiles, context, true);
+      // finally, delete archiveMergePlan which means merge small archive 
files operatin is succeed.
+      metaClient.getFs().delete(planPath, false);
+    }
+  }
+
+  /**
+   * Find the latest 'huge archive file' index as a break point and only 
check/merge newer archive files.
+   * Because we need to keep the original order of archive files which is 
important when loading archived instants with time filter.
+   * {@link HoodieArchivedTimeline} loadInstants(TimeRangeFilter filter, 
boolean loadInstantDetails, Function<GenericRecord, Boolean> commitsFilter)
+   * @param smallFileLimitBytes small File Limit Bytes
+   * @param fsStatuses Sort by version suffix in reverse
+   * @return merge candidates
+   */
+  private List<FileStatus> getMergeCandidates(long smallFileLimitBytes, 
FileStatus[] fsStatuses) {
+    int index = 0;
+    for (; index < fsStatuses.length; index++) {
+      if (fsStatuses[index].getLen() > smallFileLimitBytes) {
+        break;
+      }
+    }
+    return Arrays.stream(fsStatuses).limit(index).collect(Collectors.toList());
+  }
+
+  /**
+   * Get final written archive file name based on storageSchemes support 
append or not.
+   */
+  private String computeLogFileName() throws IOException {
+    String logWriteToken = writer.getLogFile().getLogWriteToken();
+    HoodieLogFile hoodieLogFile = 
writer.getLogFile().rollOver(metaClient.getFs(), logWriteToken);
+    return hoodieLogFile.getFileName();
+  }
+
+  /**
+   * Check/Solve if there is any failed and unfinished merge small archive 
files operation
+   * @param context HoodieEngineContext used for parallelize to delete small 
archive files if necessary.
+   * @throws IOException
+   */
+  private void verifyLastMergeArchiveFilesIfNecessary(HoodieEngineContext 
context) throws IOException {
+    if (config.getArchiveAutoMergeEnable() && 
!StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
+      Path planPath = new Path(metaClient.getArchivePath(), 
mergeArchivePlanName);
+      HoodieWrapperFileSystem fs = metaClient.getFs();
+      // If plan exist, last merge small archive files was failed.
+      // we need to revert or complete last action.
+      if (fs.exists(planPath)) {
+        HoodieMergeArchiveFilePlan plan = 
TimelineMetadataUtils.deserializeAvroMetadata(readDataFromPath(planPath).get(), 
HoodieMergeArchiveFilePlan.class);
+        Path mergedArchiveFile = new Path(metaClient.getArchivePath(), 
plan.getMergedArchiveFileName());
+        List<Path> candidates = 
plan.getCandidate().stream().map(Path::new).collect(Collectors.toList());
+        if (candidateAllExists(candidates)) {
+          // Last merge action is failed during writing merged archive file.
+          // But all the small archive files are not deleted.
+          // Revert last action by deleting mergedArchiveFile if existed.
+          if (fs.exists(mergedArchiveFile)) {
+            fs.delete(mergedArchiveFile, false);
+          }
+        } else {
+          // Last merge action is failed during deleting small archive files.
+          // But the merged files is completed.
+          // Try to complete last action
+          if (fs.exists(mergedArchiveFile)) {
+            deleteFilesParallelize(metaClient, plan.getCandidate(), context, 
true);
+          }
+        }
+
+        fs.delete(planPath);
+      }
+    }
+  }
+
+  /**
+   * If all the candidate small archive files existed, last merge operation 
was failed during writing the merged archive file.
+   * If at least one of candidate small archive files existed, the merged 
archive file was created and last operation was failed during deleting the 
small archive files.
+   */
+  private boolean candidateAllExists(List<Path> candidates) throws IOException 
{
+    for (Path archiveFile : candidates) {
+      if (!metaClient.getFs().exists(archiveFile)) {
+        // candidate is deleted
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private Option<byte[]> readDataFromPath(Path detailPath) {
+    try (FSDataInputStream is = metaClient.getFs().open(detailPath)) {
+      return Option.of(FileIOUtils.readAsByteArray(is));
+    } catch (IOException e) {
+      throw new HoodieIOException("Could not read commit details from " + 
detailPath, e);
+    }
+  }
+
+  public void buildArchiveMergePlan(List<String> compactCandidate, Path 
planPath, String compactedArchiveFileName) throws IOException {
+    HoodieMergeArchiveFilePlan plan = HoodieMergeArchiveFilePlan.newBuilder()
+        .setCandidate(compactCandidate)
+        .setMergedArchiveFileName(compactedArchiveFileName)
+        .build();
+    Option<byte[]> content = TimelineMetadataUtils.serializeAvroMetadata(plan, 
HoodieMergeArchiveFilePlan.class);
+    createFileInPath(planPath, content);
+  }
+
+  private void createFileInPath(Path fullPath, Option<byte[]> content) {
+    try {
+      // If the path does not exist, create it first
+      if (!metaClient.getFs().exists(fullPath)) {
+        if (metaClient.getFs().createNewFile(fullPath)) {
+          LOG.info("Created a new file in meta path: " + fullPath);
+        } else {
+          throw new HoodieIOException("Failed to create file " + fullPath);
+        }
+      }
+
+      if (content.isPresent()) {
+        FSDataOutputStream fsout = metaClient.getFs().create(fullPath, true);
+        fsout.write(content.get());
+        fsout.close();
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to create file " + fullPath, e);
+    }
+  }
+
+  public void mergeArchiveFiles(List<FileStatus> compactCandidate) throws 
IOException {
+    Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema();
+    try {
+      List<IndexedRecord> records = new ArrayList<>();
+      for (FileStatus fs : compactCandidate) {
+        // Read the archived file
+        try (HoodieLogFormat.Reader reader = 
HoodieLogFormat.newReader(metaClient.getFs(),
+            new HoodieLogFile(fs.getPath()), 
HoodieArchivedMetaEntry.getClassSchema())) {
+          // Read the avro blocks
+          while (reader.hasNext()) {
+            HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
+            List<IndexedRecord> recordsPerFile = blk.getRecords();
+            records.addAll(recordsPerFile);
+            if (records.size() >= this.config.getCommitArchivalBatchSize()) {
+              writeToFile(wrapperSchema, records);
+            }
+          }
+        }
+      }
+      writeToFile(wrapperSchema, records);
+    } catch (Exception e) {
+      throw new HoodieCommitException("Failed to merge small archive files", 
e);
+    } finally {
+      writer.close();
+    }
+  }
+
+  private Map<String, Boolean> deleteFilesParallelize(HoodieTableMetaClient 
metaClient, List<String> paths, HoodieEngineContext context, boolean 
ignoreFailed) {
+
+    return FSUtils.parallelizeFilesProcess(context,
+        metaClient.getFs(),
+        config.getArchiveDeleteParallelism(),
+        pairOfSubPathAndConf -> {
+          Path file = new Path(pairOfSubPathAndConf.getKey());
+          try {
+            FileSystem fs = metaClient.getFs();

Review comment:
       Have you tested this with Spark to verify that referencing the 
metaClient here does not cause a TaskNotSerializable issue?
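
   One hedged way to sidestep that, sketched under the assumption that the 
   delete function runs inside Spark tasks (class and method names here are 
   hypothetical): resolve the FileSystem from the Path on the executor instead 
   of capturing the metaClient in the closure.
   
   ```java
   import java.io.IOException;
   import java.io.Serializable;
   
   import org.apache.hadoop.conf.Configuration;
   import org.apache.hadoop.fs.FileSystem;
   import org.apache.hadoop.fs.Path;
   
   public class ArchiveFileDeleter implements Serializable {
   
     private static final long serialVersionUID = 1L;
   
     // Deletes one archive file. The FileSystem is resolved inside the task from the
     // Path itself, so no non-serializable driver-side handle (e.g., metaClient) is
     // captured by the closure. In practice the Hadoop conf would be shipped via a
     // serializable wrapper rather than rebuilt with new Configuration().
     public boolean delete(String pathStr, boolean ignoreFailed) {
       try {
         Path path = new Path(pathStr);
         FileSystem fs = path.getFileSystem(new Configuration());
         return fs.delete(path, false);
       } catch (IOException e) {
         if (ignoreFailed) {
           return false;
         }
         throw new RuntimeException("Failed to delete " + pathStr, e);
       }
     }
   }
   ```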

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##########
@@ -249,6 +249,24 @@
           + "record size estimate compute dynamically based on commit 
metadata. "
           + " This is critical in computing the insert parallelism and 
bin-packing inserts into small files.");
 
+  public static final ConfigProperty<String> ARCHIVE_FILES_MERGE_BATCH_SIZE = 
ConfigProperty
+      .key("hoodie.archive.files.merge.batch.size")
+      .defaultValue(String.valueOf(10))
+      .withDocumentation("The numbers of small archive files are merged at 
once.");
+
+  public static final ConfigProperty<String> 
ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES = ConfigProperty
+      .key("hoodie.archive.merge.small.file.limit.bytes")
+      .defaultValue(String.valueOf(20 * 1024 * 1024))
+      .withDocumentation("This config sets the archive file size limit below 
which an archive file becomes a candidate to be selected as such a small 
file.");
+
+  public static final ConfigProperty<String> ARCHIVE_AUTO_MERGE_ENABLE = 
ConfigProperty
+      .key("hoodie.archive.auto.merge.enable")
+      .defaultValue("false")
+      .withDocumentation("When enable, hoodie will auto merge several small 
archive files into larger one. It's"
+          + " useful when storage scheme doesn't support append operation.");

Review comment:
       Use actual types in the ConfigProperty, e.g., `ConfigProperty<Integer>`, 
`ConfigProperty<Long>`, `ConfigProperty<Boolean>`, instead of embedding the 
values as Strings?
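
   A sketch of that suggestion, reusing the builder pattern already visible in 
   the diff (the keys below also fold in the renaming nits from the other 
   comments, so treat the exact names as assumptions):
   
   ```java
   import org.apache.hudi.common.config.ConfigProperty;
   
   public class HoodieCompactionConfigSketch {
   
     public static final ConfigProperty<Integer> ARCHIVE_MERGE_FILES_BATCH_SIZE = ConfigProperty
         .key("hoodie.archive.merge.files.batch.size")
         .defaultValue(10)
         .withDocumentation("The number of small archive files to be merged at once.");
   
     public static final ConfigProperty<Long> ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES = ConfigProperty
         .key("hoodie.archive.merge.small.file.limit.bytes")
         .defaultValue(20L * 1024 * 1024)
         .withDocumentation("The archive file size limit below which an archive file becomes"
             + " a candidate for merging.");
   
     public static final ConfigProperty<Boolean> ARCHIVE_MERGE_ENABLE = ConfigProperty
         .key("hoodie.archive.merge.enable")
         .defaultValue(false)
         .withDocumentation("When enabled, Hoodie merges several small archive files into a"
             + " larger one. Useful when the storage scheme does not support appends.");
   }
   ```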

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##########
@@ -249,6 +249,24 @@
           + "record size estimate compute dynamically based on commit 
metadata. "
           + " This is critical in computing the insert parallelism and 
bin-packing inserts into small files.");
 
+  public static final ConfigProperty<String> ARCHIVE_FILES_MERGE_BATCH_SIZE = 
ConfigProperty
+      .key("hoodie.archive.files.merge.batch.size")

Review comment:
       nit: `hoodie.archive.files.merge.batch.size` -> 
`hoodie.archive.merge.files.batch.size`, so it has the same prefix as 
`hoodie.archive.merge.small.file.limit.bytes`.  Don't forget to update the 
variable and method names as well.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -134,12 +161,222 @@ public boolean archiveIfRequired(HoodieEngineContext 
context) throws IOException
         LOG.info("No Instants to archive");
       }
 
+      if (config.getArchiveAutoMergeEnable() && 
!StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
+        mergeArchiveFilesIfNecessary(context);
+      }
       return success;
     } finally {
       close();
     }
   }
 
+  /**
+   * Here Hoodie can merge the small archive files into a new larger one.
+   * Only used for filesystem which is not supported append operation.
+   * The hole merge small archive files operation has four stages:
+   * 1. Build merge plan with merge candidates/merged file name infos.
+   * 2. Do merge.
+   * 3. Delete all the candidates.
+   * 4. Delete the merge plan.
+   * @param context HoodieEngineContext
+   * @throws IOException
+   */
+  private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) 
throws IOException {
+    Path planPath = new Path(metaClient.getArchivePath(), 
mergeArchivePlanName);
+    // Flush reminded content if existed and open a new write

Review comment:
       `reminded`: do you mean `remained`

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -134,12 +161,222 @@ public boolean archiveIfRequired(HoodieEngineContext 
context) throws IOException
         LOG.info("No Instants to archive");
       }
 
+      if (config.getArchiveAutoMergeEnable() && 
!StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
+        mergeArchiveFilesIfNecessary(context);
+      }
       return success;
     } finally {
       close();
     }
   }
 
+  /**
+   * Here Hoodie can merge the small archive files into a new larger one.
+   * Only used for filesystem which is not supported append operation.
+   * The hole merge small archive files operation has four stages:
+   * 1. Build merge plan with merge candidates/merged file name infos.
+   * 2. Do merge.
+   * 3. Delete all the candidates.
+   * 4. Delete the merge plan.
+   * @param context HoodieEngineContext
+   * @throws IOException
+   */
+  private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) 
throws IOException {
+    Path planPath = new Path(metaClient.getArchivePath(), 
mergeArchivePlanName);
+    // Flush reminded content if existed and open a new write
+    reOpenWriter();
+    // List all archive files
+    FileStatus[] fsStatuses = metaClient.getFs().globStatus(
+        new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
+    // Sort files by version suffix in reverse (implies reverse chronological 
order)
+    Arrays.sort(fsStatuses, new 
HoodieArchivedTimeline.ArchiveFileVersionComparator());
+
+    int archiveFilesMergeBatch = config.getArchiveFilesMergeBatchSize();
+    long smallFileLimitBytes = config.getArchiveMergeSmallFileLimitBytes();
+
+    List<FileStatus> mergeCandidate = getMergeCandidates(smallFileLimitBytes, 
fsStatuses);
+
+    if (mergeCandidate.size() >= archiveFilesMergeBatch) {
+      List<String> candidateFiles = mergeCandidate.stream().map(fs -> 
fs.getPath().toString()).collect(Collectors.toList());
+      // before merge archive files build merge plan
+      String logFileName = computeLogFileName();
+      buildArchiveMergePlan(candidateFiles, planPath, logFileName);
+      // merge archive files
+      mergeArchiveFiles(mergeCandidate);
+      // after merge, delete the small archive files.
+      deleteFilesParallelize(metaClient, candidateFiles, context, true);
+      // finally, delete archiveMergePlan which means merge small archive 
files operatin is succeed.
+      metaClient.getFs().delete(planPath, false);
+    }
+  }
+
+  /**
+   * Find the latest 'huge archive file' index as a break point and only 
check/merge newer archive files.
+   * Because we need to keep the original order of archive files which is 
important when loading archived instants with time filter.
+   * {@link HoodieArchivedTimeline} loadInstants(TimeRangeFilter filter, 
boolean loadInstantDetails, Function<GenericRecord, Boolean> commitsFilter)
+   * @param smallFileLimitBytes small File Limit Bytes
+   * @param fsStatuses Sort by version suffix in reverse
+   * @return merge candidates
+   */
+  private List<FileStatus> getMergeCandidates(long smallFileLimitBytes, 
FileStatus[] fsStatuses) {
+    int index = 0;
+    for (; index < fsStatuses.length; index++) {
+      if (fsStatuses[index].getLen() > smallFileLimitBytes) {
+        break;
+      }
+    }
+    return Arrays.stream(fsStatuses).limit(index).collect(Collectors.toList());
+  }
+
+  /**
+   * Get final written archive file name based on storageSchemes support 
append or not.
+   */
+  private String computeLogFileName() throws IOException {
+    String logWriteToken = writer.getLogFile().getLogWriteToken();
+    HoodieLogFile hoodieLogFile = 
writer.getLogFile().rollOver(metaClient.getFs(), logWriteToken);
+    return hoodieLogFile.getFileName();
+  }
+
+  /**
+   * Check/Solve if there is any failed and unfinished merge small archive 
files operation
+   * @param context HoodieEngineContext used for parallelize to delete small 
archive files if necessary.
+   * @throws IOException
+   */
+  private void verifyLastMergeArchiveFilesIfNecessary(HoodieEngineContext 
context) throws IOException {
+    if (config.getArchiveAutoMergeEnable() && 
!StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {

Review comment:
       If the last merge of small archive files fails, and the current Hudi 
writer / commit is configured with archive file merging disabled, we still need 
to clean up the state, right?  Otherwise, some archive logs are left dangling, 
e.g., an incomplete mergedArchiveFile, which may cause failures when reading 
the archived timeline.
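
   A minimal sketch of the decoupling being asked for (names are illustrative): 
   the recovery check keys off the leftover plan file alone, not the 
   merge-enable config.
   
   ```java
   import java.io.IOException;
   
   import org.apache.hadoop.fs.FileSystem;
   import org.apache.hadoop.fs.Path;
   
   public final class MergeRecoveryCheck {
   
     private MergeRecoveryCheck() {
     }
   
     // Recovery of a previously failed merge should be needed whenever a dangling merge
     // plan exists, regardless of whether archive file merging is currently enabled.
     public static boolean needsRecovery(FileSystem fs, Path archivePath, String mergePlanName)
         throws IOException {
       return fs.exists(new Path(archivePath, mergePlanName));
     }
   }
   ```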

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -134,12 +161,222 @@ public boolean archiveIfRequired(HoodieEngineContext 
context) throws IOException
         LOG.info("No Instants to archive");
       }
 
+      if (config.getArchiveAutoMergeEnable() && 
!StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
+        mergeArchiveFilesIfNecessary(context);
+      }
       return success;
     } finally {
       close();
     }
   }
 
+  /**
+   * Here Hoodie can merge the small archive files into a new larger one.
+   * Only used for filesystem which is not supported append operation.
+   * The hole merge small archive files operation has four stages:
+   * 1. Build merge plan with merge candidates/merged file name infos.
+   * 2. Do merge.
+   * 3. Delete all the candidates.
+   * 4. Delete the merge plan.
+   * @param context HoodieEngineContext
+   * @throws IOException
+   */
+  private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) 
throws IOException {
+    Path planPath = new Path(metaClient.getArchivePath(), 
mergeArchivePlanName);
+    // Flush reminded content if existed and open a new write
+    reOpenWriter();
+    // List all archive files
+    FileStatus[] fsStatuses = metaClient.getFs().globStatus(
+        new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
+    // Sort files by version suffix in reverse (implies reverse chronological 
order)
+    Arrays.sort(fsStatuses, new 
HoodieArchivedTimeline.ArchiveFileVersionComparator());
+
+    int archiveFilesMergeBatch = config.getArchiveFilesMergeBatchSize();
+    long smallFileLimitBytes = config.getArchiveMergeSmallFileLimitBytes();
+
+    List<FileStatus> mergeCandidate = getMergeCandidates(smallFileLimitBytes, 
fsStatuses);
+
+    if (mergeCandidate.size() >= archiveFilesMergeBatch) {
+      List<String> candidateFiles = mergeCandidate.stream().map(fs -> 
fs.getPath().toString()).collect(Collectors.toList());
+      // before merge archive files build merge plan
+      String logFileName = computeLogFileName();
+      buildArchiveMergePlan(candidateFiles, planPath, logFileName);
+      // merge archive files
+      mergeArchiveFiles(mergeCandidate);
+      // after merge, delete the small archive files.
+      deleteFilesParallelize(metaClient, candidateFiles, context, true);
+      // finally, delete archiveMergePlan which means merge small archive 
files operatin is succeed.
+      metaClient.getFs().delete(planPath, false);
+    }
+  }
+
+  /**
+   * Find the latest 'huge archive file' index as a break point and only 
check/merge newer archive files.
+   * Because we need to keep the original order of archive files which is 
important when loading archived instants with time filter.
+   * {@link HoodieArchivedTimeline} loadInstants(TimeRangeFilter filter, 
boolean loadInstantDetails, Function<GenericRecord, Boolean> commitsFilter)
+   * @param smallFileLimitBytes small File Limit Bytes
+   * @param fsStatuses Sort by version suffix in reverse
+   * @return merge candidates
+   */
+  private List<FileStatus> getMergeCandidates(long smallFileLimitBytes, 
FileStatus[] fsStatuses) {
+    int index = 0;
+    for (; index < fsStatuses.length; index++) {
+      if (fsStatuses[index].getLen() > smallFileLimitBytes) {
+        break;
+      }
+    }
+    return Arrays.stream(fsStatuses).limit(index).collect(Collectors.toList());
+  }
+
+  /**
+   * Get final written archive file name based on storageSchemes support 
append or not.
+   */
+  private String computeLogFileName() throws IOException {
+    String logWriteToken = writer.getLogFile().getLogWriteToken();
+    HoodieLogFile hoodieLogFile = 
writer.getLogFile().rollOver(metaClient.getFs(), logWriteToken);
+    return hoodieLogFile.getFileName();
+  }
+
+  /**
+   * Check/Solve if there is any failed and unfinished merge small archive 
files operation
+   * @param context HoodieEngineContext used for parallelize to delete small 
archive files if necessary.
+   * @throws IOException
+   */
+  private void verifyLastMergeArchiveFilesIfNecessary(HoodieEngineContext 
context) throws IOException {
+    if (config.getArchiveAutoMergeEnable() && 
!StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
+      Path planPath = new Path(metaClient.getArchivePath(), 
mergeArchivePlanName);
+      HoodieWrapperFileSystem fs = metaClient.getFs();
+      // If plan exist, last merge small archive files was failed.
+      // we need to revert or complete last action.
+      if (fs.exists(planPath)) {
+        HoodieMergeArchiveFilePlan plan = 
TimelineMetadataUtils.deserializeAvroMetadata(readDataFromPath(planPath).get(), 
HoodieMergeArchiveFilePlan.class);

Review comment:
       Should the failed plan write/creation also be considered, i.e., the case 
where the job fails while the plan is being written to a file?  In that case, 
just deleting the plan itself should be enough?  Based on the current code, a 
plan file that cannot be properly deserialized throws an IOException, which can 
block the whole archive process.
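
   One way to cover that case, sketched with illustrative import paths and 
   helper name: treat any deserialization failure as "no valid plan", so the 
   caller can simply delete the corrupt plan file and continue archiving.
   
   ```java
   import java.io.IOException;
   
   import org.apache.hudi.avro.model.HoodieMergeArchiveFilePlan;
   import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
   import org.apache.hudi.common.util.Option;
   
   public final class MergePlanReader {
   
     private MergePlanReader() {
     }
   
     // Returns an empty Option when the plan bytes are corrupt or only partially written,
     // i.e., the previous run failed while creating the plan and nothing else happened yet.
     public static Option<HoodieMergeArchiveFilePlan> tryReadPlan(byte[] planBytes) {
       try {
         return Option.of(TimelineMetadataUtils.deserializeAvroMetadata(
             planBytes, HoodieMergeArchiveFilePlan.class));
       } catch (IOException e) {
         return Option.empty();
       }
     }
   }
   ```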

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -134,12 +161,222 @@ public boolean archiveIfRequired(HoodieEngineContext 
context) throws IOException
         LOG.info("No Instants to archive");
       }
 
+      if (config.getArchiveAutoMergeEnable() && 
!StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
+        mergeArchiveFilesIfNecessary(context);
+      }
       return success;
     } finally {
       close();
     }
   }
 
+  /**
+   * Here Hoodie can merge the small archive files into a new larger one.
+   * Only used for filesystem which is not supported append operation.
+   * The hole merge small archive files operation has four stages:
+   * 1. Build merge plan with merge candidates/merged file name infos.
+   * 2. Do merge.
+   * 3. Delete all the candidates.
+   * 4. Delete the merge plan.
+   * @param context HoodieEngineContext
+   * @throws IOException
+   */
+  private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) 
throws IOException {
+    Path planPath = new Path(metaClient.getArchivePath(), 
mergeArchivePlanName);
+    // Flush reminded content if existed and open a new write
+    reOpenWriter();
+    // List all archive files
+    FileStatus[] fsStatuses = metaClient.getFs().globStatus(
+        new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
+    // Sort files by version suffix in reverse (implies reverse chronological 
order)
+    Arrays.sort(fsStatuses, new 
HoodieArchivedTimeline.ArchiveFileVersionComparator());
+
+    int archiveFilesMergeBatch = config.getArchiveFilesMergeBatchSize();
+    long smallFileLimitBytes = config.getArchiveMergeSmallFileLimitBytes();
+
+    List<FileStatus> mergeCandidate = getMergeCandidates(smallFileLimitBytes, 
fsStatuses);
+
+    if (mergeCandidate.size() >= archiveFilesMergeBatch) {
+      List<String> candidateFiles = mergeCandidate.stream().map(fs -> 
fs.getPath().toString()).collect(Collectors.toList());
+      // before merge archive files build merge plan
+      String logFileName = computeLogFileName();
+      buildArchiveMergePlan(candidateFiles, planPath, logFileName);
+      // merge archive files
+      mergeArchiveFiles(mergeCandidate);
+      // after merge, delete the small archive files.
+      deleteFilesParallelize(metaClient, candidateFiles, context, true);
+      // finally, delete archiveMergePlan which means merge small archive 
files operatin is succeed.
+      metaClient.getFs().delete(planPath, false);
+    }
+  }
+
+  /**
+   * Find the latest 'huge archive file' index as a break point and only 
check/merge newer archive files.
+   * Because we need to keep the original order of archive files which is 
important when loading archived instants with time filter.
+   * {@link HoodieArchivedTimeline} loadInstants(TimeRangeFilter filter, 
boolean loadInstantDetails, Function<GenericRecord, Boolean> commitsFilter)
+   * @param smallFileLimitBytes small File Limit Bytes
+   * @param fsStatuses Sort by version suffix in reverse
+   * @return merge candidates
+   */
+  private List<FileStatus> getMergeCandidates(long smallFileLimitBytes, 
FileStatus[] fsStatuses) {
+    int index = 0;
+    for (; index < fsStatuses.length; index++) {
+      if (fsStatuses[index].getLen() > smallFileLimitBytes) {
+        break;
+      }
+    }
+    return Arrays.stream(fsStatuses).limit(index).collect(Collectors.toList());
+  }
+
+  /**
+   * Get final written archive file name based on storageSchemes support 
append or not.
+   */
+  private String computeLogFileName() throws IOException {
+    String logWriteToken = writer.getLogFile().getLogWriteToken();
+    HoodieLogFile hoodieLogFile = 
writer.getLogFile().rollOver(metaClient.getFs(), logWriteToken);
+    return hoodieLogFile.getFileName();
+  }
+
+  /**
+   * Check/Solve if there is any failed and unfinished merge small archive 
files operation
+   * @param context HoodieEngineContext used for parallelize to delete small 
archive files if necessary.
+   * @throws IOException
+   */
+  private void verifyLastMergeArchiveFilesIfNecessary(HoodieEngineContext 
context) throws IOException {
+    if (config.getArchiveAutoMergeEnable() && 
!StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
+      Path planPath = new Path(metaClient.getArchivePath(), 
mergeArchivePlanName);
+      HoodieWrapperFileSystem fs = metaClient.getFs();
+      // If plan exist, last merge small archive files was failed.
+      // we need to revert or complete last action.
+      if (fs.exists(planPath)) {
+        HoodieMergeArchiveFilePlan plan = 
TimelineMetadataUtils.deserializeAvroMetadata(readDataFromPath(planPath).get(), 
HoodieMergeArchiveFilePlan.class);
+        Path mergedArchiveFile = new Path(metaClient.getArchivePath(), 
plan.getMergedArchiveFileName());
+        List<Path> candidates = 
plan.getCandidate().stream().map(Path::new).collect(Collectors.toList());
+        if (candidateAllExists(candidates)) {
+          // Last merge action is failed during writing merged archive file.
+          // But all the small archive files are not deleted.
+          // Revert last action by deleting mergedArchiveFile if existed.
+          if (fs.exists(mergedArchiveFile)) {
+            fs.delete(mergedArchiveFile, false);
+          }
+        } else {
+          // Last merge action is failed during deleting small archive files.
+          // But the merged files is completed.
+          // Try to complete last action
+          if (fs.exists(mergedArchiveFile)) {
+            deleteFilesParallelize(metaClient, plan.getCandidate(), context, 
true);
+          }
+        }
+
+        fs.delete(planPath);
+      }
+    }
+  }
+
+  /**
+   * If all the candidate small archive files existed, last merge operation 
was failed during writing the merged archive file.
+   * If at least one of candidate small archive files existed, the merged 
archive file was created and last operation was failed during deleting the 
small archive files.
+   */
+  private boolean candidateAllExists(List<Path> candidates) throws IOException 
{
+    for (Path archiveFile : candidates) {
+      if (!metaClient.getFs().exists(archiveFile)) {
+        // candidate is deleted
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private Option<byte[]> readDataFromPath(Path detailPath) {
+    try (FSDataInputStream is = metaClient.getFs().open(detailPath)) {
+      return Option.of(FileIOUtils.readAsByteArray(is));
+    } catch (IOException e) {
+      throw new HoodieIOException("Could not read commit details from " + 
detailPath, e);
+    }
+  }
+
+  public void buildArchiveMergePlan(List<String> compactCandidate, Path 
planPath, String compactedArchiveFileName) throws IOException {
+    HoodieMergeArchiveFilePlan plan = HoodieMergeArchiveFilePlan.newBuilder()
+        .setCandidate(compactCandidate)
+        .setMergedArchiveFileName(compactedArchiveFileName)
+        .build();
+    Option<byte[]> content = TimelineMetadataUtils.serializeAvroMetadata(plan, 
HoodieMergeArchiveFilePlan.class);
+    createFileInPath(planPath, content);
+  }
+
+  private void createFileInPath(Path fullPath, Option<byte[]> content) {
+    try {
+      // If the path does not exist, create it first
+      if (!metaClient.getFs().exists(fullPath)) {
+        if (metaClient.getFs().createNewFile(fullPath)) {
+          LOG.info("Created a new file in meta path: " + fullPath);
+        } else {
+          throw new HoodieIOException("Failed to create file " + fullPath);
+        }
+      }
+
+      if (content.isPresent()) {
+        FSDataOutputStream fsout = metaClient.getFs().create(fullPath, true);
+        fsout.write(content.get());
+        fsout.close();
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to create file " + fullPath, e);
+    }
+  }

Review comment:
       Maybe we can make this a util method.  There are other places, e.g., 
`writeMarkerTypeToFile()`, that create a file with content.
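
   A rough sketch of such a util (class and method names are hypothetical), 
   mirroring the logic already in the diff so it could be shared with callers 
   like `writeMarkerTypeToFile()`:
   
   ```java
   import java.io.IOException;
   
   import org.apache.hadoop.fs.FSDataOutputStream;
   import org.apache.hadoop.fs.FileSystem;
   import org.apache.hadoop.fs.Path;
   import org.apache.hudi.common.util.Option;
   import org.apache.hudi.exception.HoodieIOException;
   
   public final class FileCreationUtils {
   
     private FileCreationUtils() {
     }
   
     // Create the file if it does not exist yet, then overwrite it with the optional content.
     public static void createFileInPath(FileSystem fs, Path fullPath, Option<byte[]> content) {
       try {
         if (!fs.exists(fullPath) && !fs.createNewFile(fullPath)) {
           throw new HoodieIOException("Failed to create file " + fullPath);
         }
         if (content.isPresent()) {
           try (FSDataOutputStream out = fs.create(fullPath, true)) {
             out.write(content.get());
           }
         }
       } catch (IOException e) {
         throw new HoodieIOException("Failed to create file " + fullPath, e);
       }
     }
   }
   ```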

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##########
@@ -249,6 +249,24 @@
           + "record size estimate compute dynamically based on commit 
metadata. "
           + " This is critical in computing the insert parallelism and 
bin-packing inserts into small files.");
 
+  public static final ConfigProperty<String> ARCHIVE_FILES_MERGE_BATCH_SIZE = 
ConfigProperty
+      .key("hoodie.archive.files.merge.batch.size")
+      .defaultValue(String.valueOf(10))
+      .withDocumentation("The numbers of small archive files are merged at 
once.");
+
+  public static final ConfigProperty<String> 
ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES = ConfigProperty
+      .key("hoodie.archive.merge.small.file.limit.bytes")
+      .defaultValue(String.valueOf(20 * 1024 * 1024))
+      .withDocumentation("This config sets the archive file size limit below 
which an archive file becomes a candidate to be selected as such a small 
file.");
+
+  public static final ConfigProperty<String> ARCHIVE_AUTO_MERGE_ENABLE = 
ConfigProperty
+      .key("hoodie.archive.auto.merge.enable")

Review comment:
       nit: `hoodie.archive.auto.merge.enable` -> `hoodie.archive.merge.enable` 
(it's anyway automatic)

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -134,12 +161,222 @@ public boolean archiveIfRequired(HoodieEngineContext 
context) throws IOException
         LOG.info("No Instants to archive");
       }
 
+      if (config.getArchiveAutoMergeEnable() && 
!StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
+        mergeArchiveFilesIfNecessary(context);
+      }
       return success;
     } finally {
       close();
     }
   }
 
+  /**
+   * Here Hoodie can merge the small archive files into a new larger one.
+   * Only used for filesystem which is not supported append operation.
+   * The hole merge small archive files operation has four stages:
+   * 1. Build merge plan with merge candidates/merged file name infos.
+   * 2. Do merge.
+   * 3. Delete all the candidates.
+   * 4. Delete the merge plan.
+   * @param context HoodieEngineContext
+   * @throws IOException
+   */
+  private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) 
throws IOException {
+    Path planPath = new Path(metaClient.getArchivePath(), 
mergeArchivePlanName);
+    // Flush reminded content if existed and open a new write
+    reOpenWriter();
+    // List all archive files
+    FileStatus[] fsStatuses = metaClient.getFs().globStatus(
+        new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
+    // Sort files by version suffix in reverse (implies reverse chronological 
order)
+    Arrays.sort(fsStatuses, new 
HoodieArchivedTimeline.ArchiveFileVersionComparator());
+
+    int archiveFilesMergeBatch = config.getArchiveFilesMergeBatchSize();
+    long smallFileLimitBytes = config.getArchiveMergeSmallFileLimitBytes();
+
+    List<FileStatus> mergeCandidate = getMergeCandidates(smallFileLimitBytes, 
fsStatuses);
+
+    if (mergeCandidate.size() >= archiveFilesMergeBatch) {
+      List<String> candidateFiles = mergeCandidate.stream().map(fs -> 
fs.getPath().toString()).collect(Collectors.toList());
+      // before merge archive files build merge plan
+      String logFileName = computeLogFileName();
+      buildArchiveMergePlan(candidateFiles, planPath, logFileName);
+      // merge archive files
+      mergeArchiveFiles(mergeCandidate);
+      // after merge, delete the small archive files.
+      deleteFilesParallelize(metaClient, candidateFiles, context, true);
+      // finally, delete archiveMergePlan which means merge small archive 
files operatin is succeed.

Review comment:
       nit: typos here

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -134,12 +161,222 @@ public boolean archiveIfRequired(HoodieEngineContext 
context) throws IOException
         LOG.info("No Instants to archive");
       }
 
+      if (config.getArchiveAutoMergeEnable() && 
!StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
+        mergeArchiveFilesIfNecessary(context);
+      }
       return success;
     } finally {
       close();
     }
   }
 
+  /**
+   * Here Hoodie can merge the small archive files into a new larger one.
+   * Only used for filesystem which is not supported append operation.
+   * The hole merge small archive files operation has four stages:

Review comment:
       nit: typos here:
   `which is not supported` -> `which does not support`
   `hole` -> `whole`

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##########
@@ -249,6 +249,24 @@
           + "record size estimate compute dynamically based on commit 
metadata. "
           + " This is critical in computing the insert parallelism and 
bin-packing inserts into small files.");
 
+  public static final ConfigProperty<String> ARCHIVE_FILES_MERGE_BATCH_SIZE = 
ConfigProperty
+      .key("hoodie.archive.files.merge.batch.size")
+      .defaultValue(String.valueOf(10))
+      .withDocumentation("The numbers of small archive files are merged at 
once.");
+
+  public static final ConfigProperty<String> 
ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES = ConfigProperty
+      .key("hoodie.archive.merge.small.file.limit.bytes")
+      .defaultValue(String.valueOf(20 * 1024 * 1024))
+      .withDocumentation("This config sets the archive file size limit below 
which an archive file becomes a candidate to be selected as such a small 
file.");
+
+  public static final ConfigProperty<String> ARCHIVE_AUTO_MERGE_ENABLE = 
ConfigProperty
+      .key("hoodie.archive.auto.merge.enable")
+      .defaultValue("false")
+      .withDocumentation("When enable, hoodie will auto merge several small 
archive files into larger one. It's"
+          + " useful when storage scheme doesn't support append operation.");
+
+
+

Review comment:
       nit: remove the extra empty lines to keep only one?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

