zhangyue19921010 commented on a change in pull request #4078:
URL: https://github.com/apache/hudi/pull/4078#discussion_r782002120
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -134,12 +161,222 @@ public boolean archiveIfRequired(HoodieEngineContext context) throws IOException
         LOG.info("No Instants to archive");
       }
 
+      if (config.getArchiveAutoMergeEnable() && !StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
+        mergeArchiveFilesIfNecessary(context);
+      }
       return success;
     } finally {
       close();
     }
   }
 
+  /**
+   * Here Hoodie can merge the small archive files into a new, larger one.
+   * Only used for filesystems that do not support the append operation.
+   * The whole merge-small-archive-files operation has four stages:
+   * 1. Build a merge plan with the merge candidates and the merged file name.
+   * 2. Do the merge.
+   * 3. Delete all the candidates.
+   * 4. Delete the merge plan.
+   * @param context HoodieEngineContext
+   * @throws IOException
+   */
+  private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
+    Path planPath = new Path(metaClient.getArchivePath(), mergeArchivePlanName);
+    // Flush any remaining content and open a new writer.
+    reOpenWriter();
+    // List all archive files.
+    FileStatus[] fsStatuses = metaClient.getFs().globStatus(
+        new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
+    // Sort files by version suffix in reverse (implies reverse chronological order).
+    Arrays.sort(fsStatuses, new HoodieArchivedTimeline.ArchiveFileVersionComparator());
+
+    int archiveFilesMergeBatch = config.getArchiveFilesMergeBatchSize();
+    long smallFileLimitBytes = config.getArchiveMergeSmallFileLimitBytes();
+
+    List<FileStatus> mergeCandidate = getMergeCandidates(smallFileLimitBytes, fsStatuses);
+
+    if (mergeCandidate.size() >= archiveFilesMergeBatch) {
+      List<String> candidateFiles = mergeCandidate.stream().map(fs -> fs.getPath().toString()).collect(Collectors.toList());
+      // Before merging the archive files, build the merge plan.
+      String logFileName = computeLogFileName();
+      buildArchiveMergePlan(candidateFiles, planPath, logFileName);
+      // Merge the archive files.
+      mergeArchiveFiles(mergeCandidate);
+      // After the merge, delete the small archive files.
+      deleteFilesParallelize(metaClient, candidateFiles, context, true);
+      // Finally, delete the merge plan, which marks the merge of small archive files as succeeded.
+      metaClient.getFs().delete(planPath, false);
+    }
+  }
+
+  /**
+   * Find the index of the latest "huge archive file" as a break point and only check/merge the newer archive files,
+   * because we need to keep the original order of archive files, which matters when loading archived instants with a time filter.
+   * {@link HoodieArchivedTimeline} loadInstants(TimeRangeFilter filter, boolean loadInstantDetails, Function<GenericRecord, Boolean> commitsFilter)
+   * @param smallFileLimitBytes small file limit in bytes
+   * @param fsStatuses archive files sorted by version suffix in reverse order
+   * @return merge candidates
+   */
+  private List<FileStatus> getMergeCandidates(long smallFileLimitBytes, FileStatus[] fsStatuses) {
+    int index = 0;
+    for (; index < fsStatuses.length; index++) {
+      if (fsStatuses[index].getLen() > smallFileLimitBytes) {
+        break;
+      }
+    }
+    return Arrays.stream(fsStatuses).limit(index).collect(Collectors.toList());
+  }
+
+  /**
+   * Get the final archive file name to write to, based on whether the storage scheme supports append.
+   */
+  private String computeLogFileName() throws IOException {
+    String logWriteToken = writer.getLogFile().getLogWriteToken();
+    HoodieLogFile hoodieLogFile = writer.getLogFile().rollOver(metaClient.getFs(), logWriteToken);
+    return hoodieLogFile.getFileName();
+  }
+
+  /**
+   * Check for, and resolve, any failed and unfinished merge-small-archive-files operation.
+   * @param context HoodieEngineContext used to delete small archive files in parallel if necessary.
+   * @throws IOException
+   */
+  private void verifyLastMergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
+    if (config.getArchiveAutoMergeEnable() && !StorageSchemes.isAppendSupported(metaClient.getFs().getScheme())) {
+      Path planPath = new Path(metaClient.getArchivePath(), mergeArchivePlanName);
+      HoodieWrapperFileSystem fs = metaClient.getFs();
+      // If the plan exists, the last merge of small archive files failed;
+      // we need to revert or complete the last action.
+      if (fs.exists(planPath)) {
+        HoodieMergeArchiveFilePlan plan = TimelineMetadataUtils.deserializeAvroMetadata(readDataFromPath(planPath).get(), HoodieMergeArchiveFilePlan.class);
+        Path mergedArchiveFile = new Path(metaClient.getArchivePath(), plan.getMergedArchiveFileName());
+        List<Path> candidates = plan.getCandidate().stream().map(Path::new).collect(Collectors.toList());
+        if (candidateAllExists(candidates)) {
+          // The last merge action failed while writing the merged archive file,
+          // but none of the small archive files were deleted.
+          // Revert the last action by deleting the merged archive file if it exists.
+          if (fs.exists(mergedArchiveFile)) {
+            fs.delete(mergedArchiveFile, false);
+          }
+        } else {
+          // The last merge action failed while deleting the small archive files,
+          // but the merged archive file is complete.
+          // Try to complete the last action.
+          if (fs.exists(mergedArchiveFile)) {
+            deleteFilesParallelize(metaClient, plan.getCandidate(), context, true);
+          }
+        }
+
+        fs.delete(planPath);
+      }
+    }
+  }
+
+  /**
+   * If all the candidate small archive files exist, the last merge operation failed while writing the merged archive file.
+   * If at least one candidate small archive file is missing, the merged archive file was fully written and the last operation failed while deleting the small archive files.
+   */
+  private boolean candidateAllExists(List<Path> candidates) throws IOException {
+    for (Path archiveFile : candidates) {
+      if (!metaClient.getFs().exists(archiveFile)) {
+        // This candidate has already been deleted.
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private Option<byte[]> readDataFromPath(Path detailPath) {
+    try (FSDataInputStream is = metaClient.getFs().open(detailPath)) {
+      return Option.of(FileIOUtils.readAsByteArray(is));
+    } catch (IOException e) {
+      throw new HoodieIOException("Could not read commit details from " + detailPath, e);
+    }
+  }
+
+  public void buildArchiveMergePlan(List<String> compactCandidate, Path planPath, String compactedArchiveFileName) throws IOException {
+    HoodieMergeArchiveFilePlan plan = HoodieMergeArchiveFilePlan.newBuilder()
+        .setCandidate(compactCandidate)
+        .setMergedArchiveFileName(compactedArchiveFileName)
+        .build();
+    Option<byte[]> content = TimelineMetadataUtils.serializeAvroMetadata(plan, HoodieMergeArchiveFilePlan.class);
+    createFileInPath(planPath, content);
+  }
+
+  private void createFileInPath(Path fullPath, Option<byte[]> content) {
+    try {
+      // If the path does not exist, create it first.
+      if (!metaClient.getFs().exists(fullPath)) {
+        if (metaClient.getFs().createNewFile(fullPath)) {
+          LOG.info("Created a new file in meta path: " + fullPath);
+        } else {
+          throw new HoodieIOException("Failed to create file " + fullPath);
+        }
+      }
+
+      if (content.isPresent()) {
+        FSDataOutputStream fsout = metaClient.getFs().create(fullPath, true);
+        fsout.write(content.get());
+        fsout.close();
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to create file " + fullPath, e);
+    }
+  }
+
+  public void mergeArchiveFiles(List<FileStatus> compactCandidate) throws IOException {
+    Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema();
+    try {
+      List<IndexedRecord> records = new ArrayList<>();
+      for (FileStatus fs : compactCandidate) {
+        // Read the archived file.
+        try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(metaClient.getFs(),
+            new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema())) {
+          // Read the Avro blocks.
+          while (reader.hasNext()) {
+            HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
+            List<IndexedRecord> recordsPerFile = blk.getRecords();
+            records.addAll(recordsPerFile);
+            if (records.size() >= this.config.getCommitArchivalBatchSize()) {
+              writeToFile(wrapperSchema, records);
+            }
+          }
+        }
+      }
+      writeToFile(wrapperSchema, records);
+    } catch (Exception e) {
+      throw new HoodieCommitException("Failed to merge small archive files", e);
+    } finally {
+      writer.close();
+    }
+  }
+
+  private Map<String, Boolean> deleteFilesParallelize(HoodieTableMetaClient metaClient, List<String> paths, HoodieEngineContext context, boolean ignoreFailed) {
+
+    return FSUtils.parallelizeFilesProcess(context,
+        metaClient.getFs(),
+        config.getArchiveDeleteParallelism(),
+        pairOfSubPathAndConf -> {
+          Path file = new Path(pairOfSubPathAndConf.getKey());
+          try {
+            FileSystem fs = metaClient.getFs();

Review comment:
Yep, tested with Spark.
Also, `HoodieTableMetaClient` is declared as `public class HoodieTableMetaClient implements Serializable {xxx}` with `private transient HoodieWrapperFileSystem fs;`, so the `FileSystem` handle itself is never serialized.
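Because the file-system field is `transient`, Java serialization skips it when the meta client is shipped to executors for the parallel delete, and the handle has to be rebuilt lazily on first use after deserialization. A minimal illustrative sketch of that pattern (the class and field names below are made up for illustration and are not Hudi's actual code):

```java
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative only: shows why a transient FileSystem field is safe to carry
// inside a Serializable object that gets sent to Spark executors.
public class SerializableFsHolder implements Serializable {
  private final String basePath;          // plain data, serialized normally
  private transient FileSystem fs;        // skipped by serialization; null after deserialization

  public SerializableFsHolder(String basePath) {
    this.basePath = basePath;
  }

  // Lazily (re)build the handle, so deserialized copies on executors still work.
  public FileSystem getFs() {
    if (fs == null) {
      try {
        fs = new Path(basePath).getFileSystem(new Configuration());
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
    return fs;
  }
}
```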
##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
##########
@@ -174,6 +205,82 @@ public void testArchiveTableWithArchival(boolean enableMetadata) throws Exceptio
     }
   }
 
+  @Test
+  public void testArchiveTableWithArchivalSmallFileMergeEnableRecoverFromDeleteFailed() throws Exception {
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 3, 2, true, 3, 209715200);
+    for (int i = 1; i < 8; i++) {
+      testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+      archiveAndGetCommitsList(writeConfig);
+    }
+
+    HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(writeConfig, table);
+    FileStatus[] fsStatuses = metaClient.getFs().globStatus(
+        new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
+    List<String> candidateFiles = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList());
+
+    archiveLog.reOpenWriter();
+
+    archiveLog.buildArchiveMergePlan(candidateFiles, new Path(metaClient.getArchivePath(), archiveLog.getMergeArchivePlanName()), ".commits_.archive.3_1-0-1");
+    archiveLog.mergeArchiveFiles(Arrays.stream(fsStatuses).collect(Collectors.toList()));
+    archiveLog.reOpenWriter();
+
+    metaClient.getFs().delete(fsStatuses[0].getPath());
+
+

Review comment:
Changed.
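The two recovery tests in this file correspond to the two branches of `verifyLastMergeArchiveFilesIfNecessary` in the main hunk above. For orientation, a condensed paraphrase of that decision (not the verbatim code):

```java
// Paraphrased from the verifyLastMergeArchiveFilesIfNecessary() hunk above, for orientation only.
if (fs.exists(planPath)) {
  // A leftover plan means the previous merge of small archive files did not finish.
  if (candidateAllExists(candidates)) {
    // Crash while writing the merged file (the ...RecoverFromMergeFailed test):
    // no candidate was deleted yet, so revert by removing the partially written merged file.
    if (fs.exists(mergedArchiveFile)) {
      fs.delete(mergedArchiveFile, false);
    }
  } else if (fs.exists(mergedArchiveFile)) {
    // Crash while deleting the small files (the ...RecoverFromDeleteFailed test):
    // the merged file is complete, so finish the pending deletions instead.
    deleteFilesParallelize(metaClient, plan.getCandidate(), context, true);
  }
  // Dropping the plan marks recovery as done.
  fs.delete(planPath);
}
```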
##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
##########
@@ -174,6 +205,82 @@ public void testArchiveTableWithArchival(boolean enableMetadata) throws Exceptio
     }
   }
 
+  @Test
+  public void testArchiveTableWithArchivalSmallFileMergeEnableRecoverFromDeleteFailed() throws Exception {
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 3, 2, true, 3, 209715200);
+    for (int i = 1; i < 8; i++) {
+      testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+      archiveAndGetCommitsList(writeConfig);
+    }
+
+    HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(writeConfig, table);
+    FileStatus[] fsStatuses = metaClient.getFs().globStatus(
+        new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
+    List<String> candidateFiles = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList());
+
+    archiveLog.reOpenWriter();
+
+    archiveLog.buildArchiveMergePlan(candidateFiles, new Path(metaClient.getArchivePath(), archiveLog.getMergeArchivePlanName()), ".commits_.archive.3_1-0-1");
+    archiveLog.mergeArchiveFiles(Arrays.stream(fsStatuses).collect(Collectors.toList()));
+    archiveLog.reOpenWriter();
+
+    metaClient.getFs().delete(fsStatuses[0].getPath());
+
+
+    HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
+    HoodieArchivedTimeline archivedTimeLine = metaClient.getArchivedTimeline().reload();
+    assertEquals(7 * 3, rawActiveTimeline.countInstants() + archivedTimeLine.countInstants());
+
+
+    for (int i = 1; i < 10; i++) {
+      testTable.doWriteOperation("1000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+      archiveAndGetCommitsList(writeConfig);
+    }
+
+    HoodieActiveTimeline rawActiveTimeline1 = new HoodieActiveTimeline(metaClient, false);
+    HoodieArchivedTimeline archivedTimeLine1 = metaClient.getArchivedTimeline().reload();
+
+    assertEquals(16 * 3, archivedTimeLine1.countInstants() + rawActiveTimeline1.countInstants());
+  }
+
+  @Test
+  public void testArchiveTableWithArchivalSmallFileMergeEnableRecoverFromMergeFailed() throws Exception {
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 3, 2, true, 3, 209715200);
+    for (int i = 1; i < 8; i++) {
+      testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+      archiveAndGetCommitsList(writeConfig);
+    }
+
+    HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(writeConfig, table);
+    FileStatus[] fsStatuses = metaClient.getFs().globStatus(
+        new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
+    List<String> candidateFiles = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList());
+    archiveLog.reOpenWriter();
+

Review comment:
Changed.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org