This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.10.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 59cfa4b7e4404456c26c9c8df17182ed6e68ccd1 Author: Y Ethan Guo <ethan.guoyi...@gmail.com> AuthorDate: Wed Jan 12 09:03:27 2022 -0800 [HUDI-3007] Fix issues in HoodieRepairTool (#4564) --- .../org/apache/hudi/table/repair/RepairUtils.java | 6 +- .../org/apache/hudi/HoodieTestCommitGenerator.java | 183 +++++++++ .../apache/hudi/table/repair/TestRepairUtils.java | 176 +++++++++ .../java/org/apache/hudi/common/fs/FSUtils.java | 47 +++ .../org/apache/hudi/common/fs/TestFSUtils.java | 20 +- .../apache/hudi/utilities/HoodieRepairTool.java | 235 ++++++------ .../hudi/utilities/TestHoodieRepairTool.java | 409 +++++++++++++++++++++ 7 files changed, 957 insertions(+), 119 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/repair/RepairUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/repair/RepairUtils.java index 156da66..5aa03a4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/repair/RepairUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/repair/RepairUtils.java @@ -56,17 +56,15 @@ public final class RepairUtils { * Tags the instant time of each base or log file from the input file paths. * * @param basePath Base path of the table. - * @param baseFileExtension Base file extension, e.g., ".parquet". * @param allPaths A {@link List} of file paths to tag. * @return A {@link Map} of instant time in {@link String} to a {@link List} of relative file paths. */ public static Map<String, List<String>> tagInstantsOfBaseAndLogFiles( - String basePath, String baseFileExtension, List<Path> allPaths) { + String basePath, List<Path> allPaths) { // Instant time -> Set of base and log file paths Map<String, List<String>> instantToFilesMap = new HashMap<>(); allPaths.forEach(path -> { - String instantTime = path.toString().endsWith(baseFileExtension) - ? FSUtils.getCommitTime(path.getName()) : FSUtils.getBaseCommitTimeFromLogPath(path); + String instantTime = FSUtils.getCommitTime(path.getName()); instantToFilesMap.computeIfAbsent(instantTime, k -> new ArrayList<>()); instantToFilesMap.get(instantTime).add( FSUtils.getRelativePartitionPath(new Path(basePath), path)); diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/HoodieTestCommitGenerator.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/HoodieTestCommitGenerator.java new file mode 100644 index 0000000..0c4a971 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/HoodieTestCommitGenerator.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.common.util.collection.ImmutablePair; +import org.apache.hudi.common.util.collection.Pair; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.apache.hudi.common.table.log.HoodieLogFormat.DEFAULT_WRITE_TOKEN; + +public class HoodieTestCommitGenerator { + public static final String BASE_FILE_WRITE_TOKEN = "1-0-1"; + public static final String LOG_FILE_WRITE_TOKEN = DEFAULT_WRITE_TOKEN; + private static final Logger LOG = LogManager.getLogger(HoodieTestCommitGenerator.class); + + public static void initCommitInfoForRepairTests( + Map<String, List<Pair<String, String>>> baseFileInfo, + Map<String, List<Pair<String, String>>> logFileInfo) { + baseFileInfo.clear(); + logFileInfo.clear(); + baseFileInfo.put("000", CollectionUtils.createImmutableList( + new ImmutablePair<>("2022/01/01", UUID.randomUUID().toString()), + new ImmutablePair<>("2022/01/02", UUID.randomUUID().toString()), + new ImmutablePair<>("2022/01/03", UUID.randomUUID().toString()) + )); + baseFileInfo.put("001", CollectionUtils.createImmutableList( + new ImmutablePair<>("2022/01/04", UUID.randomUUID().toString()), + new ImmutablePair<>("2022/01/05", UUID.randomUUID().toString()) + )); + baseFileInfo.put("002", CollectionUtils.createImmutableList( + new ImmutablePair<>("2022/01/06", UUID.randomUUID().toString()) + )); + logFileInfo.put("001", CollectionUtils.createImmutableList( + new ImmutablePair<>("2022/01/03", UUID.randomUUID().toString()), + new ImmutablePair<>("2022/01/06", UUID.randomUUID().toString()) + )); + } + + public static void setupTimelineInFS( + String basePath, + Map<String, List<Pair<String, String>>> baseFileInfo, + Map<String, List<Pair<String, String>>> logFileInfo, + Map<String, Map<String, List<Pair<String, String>>>> instantInfoMap) throws IOException { + instantInfoMap.clear(); + for (String instantTime : baseFileInfo.keySet()) { + Map<String, List<Pair<String, String>>> partitionPathToFileIdAndNameMap = new HashMap<>(); + baseFileInfo.getOrDefault(instantTime, new ArrayList<>()) + .forEach(e -> { + List<Pair<String, String>> fileInfoList = partitionPathToFileIdAndNameMap + .computeIfAbsent(e.getKey(), k -> new ArrayList<>()); + String fileId = e.getValue(); + fileInfoList.add(new ImmutablePair<>(fileId, getBaseFilename(instantTime, fileId))); + }); + logFileInfo.getOrDefault(instantTime, new ArrayList<>()) + .forEach(e -> { + List<Pair<String, String>> fileInfoList = partitionPathToFileIdAndNameMap + .computeIfAbsent(e.getKey(), k -> new ArrayList<>()); + String fileId = e.getValue(); + fileInfoList.add(new ImmutablePair<>(fileId, getLogFilename(instantTime, fileId))); + }); + createCommitAndDataFiles(basePath, instantTime, 
partitionPathToFileIdAndNameMap); + instantInfoMap.put(instantTime, partitionPathToFileIdAndNameMap); + } + } + + public static String getBaseFilename(String instantTime, String fileId) { + return FSUtils.makeDataFileName(instantTime, BASE_FILE_WRITE_TOKEN, fileId); + } + + public static String getLogFilename(String instantTime, String fileId) { + return FSUtils.makeLogFileName( + fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), instantTime, 1, LOG_FILE_WRITE_TOKEN); + } + + public static void createCommitAndDataFiles( + String basePath, String instantTime, + Map<String, List<Pair<String, String>>> partitionPathToFileIdAndNameMap) throws IOException { + String commitFilename = HoodieTimeline.makeCommitFileName(instantTime); + HoodieCommitMetadata commitMetadata = + generateCommitMetadata(partitionPathToFileIdAndNameMap, Collections.emptyMap()); + String content = commitMetadata.toJsonString(); + createCommitFileWithMetadata(basePath, new Configuration(), commitFilename, content); + for (String partitionPath : partitionPathToFileIdAndNameMap.keySet()) { + partitionPathToFileIdAndNameMap.get(partitionPath) + .forEach(fileInfo -> { + String filename = fileInfo.getValue(); + try { + createDataFile(basePath, new Configuration(), partitionPath, filename); + } catch (IOException e) { + LOG.error(String.format("Failed to create data file: %s/%s/%s", + basePath, partitionPath, filename)); + } + }); + } + } + + public static HoodieCommitMetadata generateCommitMetadata( + Map<String, List<Pair<String, String>>> partitionPathToFileIdAndNameMap, + Map<String, String> extraMetadata) { + HoodieCommitMetadata metadata = new HoodieCommitMetadata(); + for (Map.Entry<String, String> entry : extraMetadata.entrySet()) { + metadata.addMetadata(entry.getKey(), entry.getValue()); + } + partitionPathToFileIdAndNameMap.forEach((partitionPath, fileInfoList) -> + fileInfoList.forEach(fileInfo -> { + HoodieWriteStat writeStat = new HoodieWriteStat(); + writeStat.setPartitionPath(partitionPath); + writeStat.setPath(new Path(partitionPath, fileInfo.getValue()).toString()); + writeStat.setFileId(fileInfo.getKey()); + // Below are dummy values + writeStat.setTotalWriteBytes(10000); + writeStat.setPrevCommit("000"); + writeStat.setNumWrites(10); + writeStat.setNumUpdateWrites(15); + writeStat.setTotalLogBlocks(2); + writeStat.setTotalLogRecords(100); + metadata.addWriteStat(partitionPath, writeStat); + })); + return metadata; + } + + public static void createCommitFileWithMetadata( + String basePath, Configuration configuration, + String filename, String content) throws IOException { + Path commitFilePath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + filename); + try (FSDataOutputStream os = FSUtils.getFs(basePath, configuration).create(commitFilePath, true)) { + os.writeBytes(new String(content.getBytes(StandardCharsets.UTF_8))); + } + } + + public static void createDataFile( + String basePath, Configuration configuration, + String partitionPath, String filename) throws IOException { + FileSystem fs = FSUtils.getFs(basePath, configuration); + Path filePath = new Path(new Path(basePath, partitionPath), filename); + Path parent = filePath.getParent(); + if (!fs.exists(parent)) { + fs.mkdirs(parent); + } + if (!fs.exists(filePath)) { + fs.create(filePath); + } + } +} diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/repair/TestRepairUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/repair/TestRepairUtils.java new file mode 100644 
index 0000000..4f8fb1d --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/repair/TestRepairUtils.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.table.repair; + +import org.apache.hudi.HoodieTestCommitGenerator; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; + +import org.apache.hadoop.fs.Path; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +import static org.apache.hudi.HoodieTestCommitGenerator.getBaseFilename; +import static org.apache.hudi.HoodieTestCommitGenerator.getLogFilename; +import static org.apache.hudi.HoodieTestCommitGenerator.initCommitInfoForRepairTests; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class TestRepairUtils { + // Instant time -> List<Pair<relativePartitionPath, fileId>> + private static final Map<String, List<Pair<String, String>>> BASE_FILE_INFO = new HashMap<>(); + private static final Map<String, List<Pair<String, String>>> LOG_FILE_INFO = new HashMap<>(); + // instant time -> partitionPathToFileIdAndPathMap + private final Map<String, Map<String, List<Pair<String, String>>>> instantInfoMap = new HashMap<>(); + @TempDir + public static java.nio.file.Path tempDir; + private static String basePath; + private static HoodieTableMetaClient metaClient; + + @BeforeAll + static void initFileInfo() throws IOException { + initCommitInfoForRepairTests(BASE_FILE_INFO, LOG_FILE_INFO); + metaClient = + HoodieTestUtils.init(tempDir.toAbsolutePath().toString(), HoodieTableType.COPY_ON_WRITE); + basePath = metaClient.getBasePath(); + } + + public void setupTimelineInFS() throws IOException { + HoodieTestCommitGenerator.setupTimelineInFS( + basePath, BASE_FILE_INFO, LOG_FILE_INFO, instantInfoMap); + } + + @Test + public void testTagInstantsOfBaseAndLogFiles() { + Map<String, List<String>> expectedResult = new HashMap<>(); + List<Path> inputPathList = new ArrayList<>(); + + for (Map.Entry<String, List<Pair<String, 
String>>> entry : BASE_FILE_INFO.entrySet()) { + String instantTime = entry.getKey(); + List<String> fileNameList = entry.getValue().stream() + .map(e -> { + String partitionPath = e.getKey(); + String fileId = e.getValue(); + return new Path( + new Path(partitionPath), getBaseFilename(instantTime, fileId)).toString(); + }) + .collect(Collectors.toList()); + List<String> expectedList = expectedResult.computeIfAbsent( + instantTime, k -> new ArrayList<>()); + expectedList.addAll(fileNameList); + inputPathList.addAll(fileNameList.stream() + .map(path -> new Path(basePath, path)).collect(Collectors.toList())); + } + + for (Map.Entry<String, List<Pair<String, String>>> entry : LOG_FILE_INFO.entrySet()) { + String instantTime = entry.getKey(); + List<String> fileNameList = entry.getValue().stream() + .map(e -> { + String partitionPath = e.getKey(); + String fileId = e.getValue(); + return new Path( + new Path(partitionPath), getLogFilename(instantTime, fileId)).toString(); + }) + .collect(Collectors.toList()); + List<String> expectedList = expectedResult.computeIfAbsent( + instantTime, k -> new ArrayList<>()); + expectedList.addAll(fileNameList); + inputPathList.addAll(fileNameList.stream() + .map(path -> new Path(basePath, path)).collect(Collectors.toList())); + } + + assertEquals(expectedResult, + RepairUtils.tagInstantsOfBaseAndLogFiles(basePath, inputPathList)); + } + + @Test + public void testGetBaseAndLogFilePathsFromTimeline() throws IOException { + setupTimelineInFS(); + HoodieTimeline timeline = metaClient.getActiveTimeline(); + HoodieInstant commitInstant = new HoodieInstant( + HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "001"); + HoodieInstant inflightInstant = new HoodieInstant( + HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "005"); + HoodieInstant compactionInstant = new HoodieInstant( + HoodieInstant.State.COMPLETED, HoodieTimeline.COMPACTION_ACTION, "006"); + + Map<String, List<Pair<String, String>>> partitionToFileIdAndNameMap = + instantInfoMap.get(commitInstant.getTimestamp()); + Set<String> expectedPaths = partitionToFileIdAndNameMap.entrySet().stream() + .flatMap(entry -> + entry.getValue().stream() + .map(fileInfo -> new Path(entry.getKey(), fileInfo.getValue()).toString()) + .collect(Collectors.toList()) + .stream() + ).collect(Collectors.toSet()); + assertEquals(Option.of(expectedPaths), + RepairUtils.getBaseAndLogFilePathsFromTimeline(timeline, commitInstant)); + assertThrows(HoodieException.class, + () -> RepairUtils.getBaseAndLogFilePathsFromTimeline(timeline, inflightInstant)); + assertEquals(Option.empty(), + RepairUtils.getBaseAndLogFilePathsFromTimeline(timeline, compactionInstant)); + } + + @Test + public void testFindInstantFilesToRemove() throws IOException { + setupTimelineInFS(); + HoodieInstant existingInstant = new HoodieInstant( + HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "001"); + Map<String, List<Pair<String, String>>> partitionToFileIdAndNameMap = + instantInfoMap.get(existingInstant.getTimestamp()); + List<String> fileListFromFs = partitionToFileIdAndNameMap.entrySet().stream() + .flatMap(entry -> + entry.getValue().stream() + .map(fileInfo -> new Path(entry.getKey(), fileInfo.getValue()).toString()) + .collect(Collectors.toList()) + .stream() + ).collect(Collectors.toList()); + String danglingFilePath = new Path("2022/01/02", + getBaseFilename(existingInstant.getTimestamp(), UUID.randomUUID().toString())).toString(); + fileListFromFs.add(danglingFilePath); + // Existing instant + 
assertEquals(CollectionUtils.createImmutableList(danglingFilePath), + RepairUtils.findInstantFilesToRemove( + existingInstant.getTimestamp(), fileListFromFs, + metaClient.getActiveTimeline(), metaClient.getArchivedTimeline())); + // Non-existing instant + assertEquals(fileListFromFs, + RepairUtils.findInstantFilesToRemove( + "004", fileListFromFs, + metaClient.getActiveTimeline(), metaClient.getArchivedTimeline())); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index 74b673d..ceec282 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -721,6 +721,53 @@ public class FSUtils { } } + /** + * Lists file status at a certain level in the directory hierarchy. + * <p> + * E.g., given "/tmp/hoodie_table" as the rootPath, and 3 as the expected level, + * this method gives back the {@link FileStatus} of all files under + * "/tmp/hoodie_table/[*]/[*]/[*]/" folders. + * + * @param hoodieEngineContext {@link HoodieEngineContext} instance. + * @param fs {@link FileSystem} instance. + * @param rootPath Root path for the file listing. + * @param expectLevel Expected level of directory hierarchy for files to be added. + * @param parallelism Parallelism for the file listing. + * @return A list of file status of files at the level. + */ + + public static List<FileStatus> getFileStatusAtLevel( + HoodieEngineContext hoodieEngineContext, FileSystem fs, Path rootPath, + int expectLevel, int parallelism) { + List<String> levelPaths = new ArrayList<>(); + List<FileStatus> result = new ArrayList<>(); + levelPaths.add(rootPath.toString()); + + for (int i = 0; i <= expectLevel; i++) { + result = FSUtils.parallelizeFilesProcess(hoodieEngineContext, fs, parallelism, + pairOfSubPathAndConf -> { + Path path = new Path(pairOfSubPathAndConf.getKey()); + try { + FileSystem fileSystem = path.getFileSystem(pairOfSubPathAndConf.getValue().get()); + return Arrays.stream(fileSystem.listStatus(path)) + .collect(Collectors.toList()); + } catch (IOException e) { + throw new HoodieIOException("Failed to list " + path, e); + } + }, + levelPaths) + .values().stream() + .flatMap(list -> list.stream()).collect(Collectors.toList()); + if (i < expectLevel) { + levelPaths = result.stream() + .filter(FileStatus::isDirectory) + .map(fileStatus -> fileStatus.getPath().toString()) + .collect(Collectors.toList()); + } + } + return result; + } + public interface SerializableFunction<T, R> extends Function<T, R>, Serializable { } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java index ec4c3b2..02258e7 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; @@ -67,7 +68,7 @@ public class TestFSUtils extends HoodieCommonTestHarness { private final long minRollbackToKeep = 10; private final long minCleanToKeep = 10; - private static 
String TEST_WRITE_TOKEN = "1-0-1"; + private static final String TEST_WRITE_TOKEN = "1-0-1"; private static final String BASE_FILE_EXTENSION = HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension(); @Rule @@ -456,4 +457,21 @@ public class TestFSUtils extends HoodieCommonTestHarness { } } } + + @Test + public void testGetFileStatusAtLevel() throws IOException { + String rootDir = basePath + "/.hoodie/.temp"; + FileSystem fileSystem = metaClient.getFs(); + prepareTestDirectory(fileSystem, rootDir); + List<FileStatus> fileStatusList = FSUtils.getFileStatusAtLevel( + new HoodieLocalEngineContext(fileSystem.getConf()), fileSystem, + new Path(basePath), 3, 2); + assertEquals(CollectionUtils.createImmutableList( + "file:" + basePath + "/.hoodie/.temp/subdir1/file1.txt", + "file:" + basePath + "/.hoodie/.temp/subdir2/file2.txt"), + fileStatusList.stream() + .map(fileStatus -> fileStatus.getPath().toString()) + .filter(filePath -> filePath.endsWith(".txt")) + .collect(Collectors.toList())); + } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java index d7fa708..d6b74c8 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java @@ -22,21 +22,22 @@ package org.apache.hudi.utilities; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.ImmutablePair; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.metadata.FileSystemBackedTableMetadata; +import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.table.repair.RepairUtils; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; @@ -48,14 +49,10 @@ import java.io.Serializable; import java.security.SecureRandom; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.stream.Collectors; -import scala.Tuple2; - /** * A tool with spark-submit to repair Hudi table by finding and deleting dangling * base and log files. @@ -153,15 +150,15 @@ public class HoodieRepairTool { // Properties with source, hoodie client, key generator etc. 
private TypedProperties props; // Spark context - private final JavaSparkContext jsc; + private final HoodieEngineContext context; private final HoodieTableMetaClient metaClient; - private final FileSystemBackedTableMetadata tableMetadata; + private final HoodieTableMetadata tableMetadata; public HoodieRepairTool(JavaSparkContext jsc, Config cfg) { if (cfg.propsFilePath != null) { cfg.propsFilePath = FSUtils.addSchemeIfLocalPath(cfg.propsFilePath).toString(); } - this.jsc = jsc; + this.context = new HoodieSparkEngineContext(jsc); this.cfg = cfg; this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs) @@ -170,13 +167,12 @@ public class HoodieRepairTool { .setConf(jsc.hadoopConfiguration()).setBasePath(cfg.basePath) .setLoadActiveTimelineOnLoad(true) .build(); + this.tableMetadata = new FileSystemBackedTableMetadata( - new HoodieSparkEngineContext(jsc), - new SerializableConfiguration(jsc.hadoopConfiguration()), - cfg.basePath, cfg.assumeDatePartitioning); + context, context.getHadoopConf(), cfg.basePath, cfg.assumeDatePartitioning); } - public void run() { + public boolean run() { Option<String> startingInstantOption = Option.ofNullable(cfg.startingInstantTime); Option<String> endingInstantOption = Option.ofNullable(cfg.endingInstantTime); @@ -201,24 +197,22 @@ public class HoodieRepairTool { + "not belonging to any commit are going to be DELETED from the table ******"); if (checkBackupPathForRepair() < 0) { LOG.error("Backup path check failed."); - break; + return false; } - doRepair(startingInstantOption, endingInstantOption, false); - break; + return doRepair(startingInstantOption, endingInstantOption, false); case DRY_RUN: LOG.info(" ****** The repair tool is in DRY_RUN mode, " + "only LOOKING FOR dangling data and log files from the table ******"); - doRepair(startingInstantOption, endingInstantOption, true); - break; + return doRepair(startingInstantOption, endingInstantOption, true); case UNDO: if (checkBackupPathAgainstBasePath() < 0) { LOG.error("Backup path check failed."); - break; + return false; } - undoRepair(); - break; + return undoRepair(); default: LOG.info("Unsupported running mode [" + cfg.runningMode + "], quit the job directly"); + return false; } } catch (IOException e) { throw new HoodieIOException("Unable to repair table in " + cfg.basePath, e); @@ -246,69 +240,98 @@ public class HoodieRepairTool { * Copies the list of files from source base path to destination base path. * The destination file path (base + relative) should not already exist. * - * @param jsc {@link JavaSparkContext} instance. + * @param context {@link HoodieEngineContext} instance. * @param relativeFilePaths A {@link List} of relative file paths for copying. * @param sourceBasePath Source base path. * @param destBasePath Destination base path. - * @param parallelism Parallelism. * @return {@code true} if all successful; {@code false} otherwise. 
*/ static boolean copyFiles( - JavaSparkContext jsc, List<String> relativeFilePaths, String sourceBasePath, - String destBasePath, int parallelism) { - SerializableConfiguration conf = new SerializableConfiguration(jsc.hadoopConfiguration()); - List<Boolean> allResults = jsc.parallelize(relativeFilePaths, parallelism) + HoodieEngineContext context, List<String> relativeFilePaths, String sourceBasePath, + String destBasePath) { + SerializableConfiguration conf = context.getHadoopConf(); + List<Boolean> allResults = context.parallelize(relativeFilePaths) .mapPartitions(iterator -> { List<Boolean> results = new ArrayList<>(); FileSystem fs = FSUtils.getFs(destBasePath, conf.get()); iterator.forEachRemaining(filePath -> { boolean success = false; + Path sourcePath = new Path(sourceBasePath, filePath); Path destPath = new Path(destBasePath, filePath); try { if (!fs.exists(destPath)) { - FileIOUtils.copy(fs, new Path(sourceBasePath, filePath), destPath); + FileIOUtils.copy(fs, sourcePath, destPath); success = true; } } catch (IOException e) { // Copy Fail + LOG.error(String.format("Copying file fails: source [%s], destination [%s]", + sourcePath, destPath)); } finally { results.add(success); } }); return results.iterator(); - }) - .collect(); + }, true) + .collectAsList(); return allResults.stream().reduce((r1, r2) -> r1 && r2).orElse(false); } /** * Lists all Hoodie files from the table base path. * - * @param basePathStr Table base path. - * @param conf {@link Configuration} instance. - * @return An array of {@link FileStatus} of all Hoodie files. + * @param context {@link HoodieEngineContext} instance. + * @param basePathStr Table base path. + * @param expectedLevel Expected level in the directory hierarchy to include the file status. + * @param parallelism Parallelism for the file listing. + * @return A list of absolute file paths of all Hoodie files. * @throws IOException upon errors. */ - static FileStatus[] listFilesFromBasePath(String basePathStr, Configuration conf) throws IOException { - final Set<String> validFileExtensions = Arrays.stream(HoodieFileFormat.values()) - .map(HoodieFileFormat::getFileExtension).collect(Collectors.toCollection(HashSet::new)); - final String logFileExtension = HoodieFileFormat.HOODIE_LOG.getFileExtension(); - FileSystem fs = FSUtils.getFs(basePathStr, conf); + static List<String> listFilesFromBasePath( + HoodieEngineContext context, String basePathStr, int expectedLevel, int parallelism) { + FileSystem fs = FSUtils.getFs(basePathStr, context.getHadoopConf().get()); Path basePath = new Path(basePathStr); + return FSUtils.getFileStatusAtLevel( + context, fs, basePath, expectedLevel, parallelism).stream() + .filter(fileStatus -> { + if (!fileStatus.isFile()) { + return false; + } + return FSUtils.isDataFile(fileStatus.getPath()); + }) + .map(fileStatus -> fileStatus.getPath().toString()) + .collect(Collectors.toList()); + } - try { - return Arrays.stream(fs.listStatus(basePath, path -> { - String extension = FSUtils.getFileExtension(path.getName()); - return validFileExtensions.contains(extension) || path.getName().contains(logFileExtension); - })).filter(FileStatus::isFile).toArray(FileStatus[]::new); - } catch (IOException e) { - // return empty FileStatus if partition does not exist already - if (!fs.exists(basePath)) { - return new FileStatus[0]; - } else { - throw e; - } - } + /** + * Deletes files from table base path. + * + * @param context {@link HoodieEngineContext} instance. + * @param basePath Base path of the table. 
+ * @param relativeFilePaths A {@link List} of relative file paths for deleting. + */ + static boolean deleteFiles( + HoodieEngineContext context, String basePath, List<String> relativeFilePaths) { + SerializableConfiguration conf = context.getHadoopConf(); + return context.parallelize(relativeFilePaths) + .mapPartitions(iterator -> { + FileSystem fs = FSUtils.getFs(basePath, conf.get()); + List<Boolean> results = new ArrayList<>(); + iterator.forEachRemaining(relativeFilePath -> { + boolean success = false; + try { + success = fs.delete(new Path(basePath, relativeFilePath), false); + } catch (IOException e) { + LOG.warn("Failed to delete file " + relativeFilePath); + } finally { + results.add(success); + } + }); + return results.iterator(); + }, true) + .collectAsList() + .stream().reduce((a, b) -> a && b) + .orElse(true); } /** @@ -319,15 +342,14 @@ public class HoodieRepairTool { * @param isDryRun Is dry run. * @throws IOException upon errors. */ - void doRepair( + boolean doRepair( Option<String> startingInstantOption, Option<String> endingInstantOption, boolean isDryRun) throws IOException { // Scans all partitions to find base and log files in the base path List<Path> allFilesInPartitions = getBaseAndLogFilePathsFromFileSystem(); // Buckets the files based on instant time // instant time -> relative paths of base and log files to base path Map<String, List<String>> instantToFilesMap = RepairUtils.tagInstantsOfBaseAndLogFiles( - metaClient.getBasePath(), - metaClient.getTableConfig().getBaseFileFormat().getFileExtension(), allFilesInPartitions); + metaClient.getBasePath(), allFilesInPartitions); List<String> instantTimesToRepair = instantToFilesMap.keySet().stream() .filter(instant -> (!startingInstantOption.isPresent() || instant.compareTo(startingInstantOption.get()) >= 0) @@ -340,30 +362,30 @@ public class HoodieRepairTool { // This assumes that the archived timeline only has completed instants so this is safe archivedTimeline.loadCompletedInstantDetailsInMemory(); - int parallelism = Math.max(Math.min(instantTimesToRepair.size(), cfg.parallelism), 1); - List<Tuple2<String, List<String>>> instantFilesToRemove = - jsc.parallelize(instantTimesToRepair, parallelism) - .mapToPair(instantToRepair -> - new Tuple2<>(instantToRepair, RepairUtils.findInstantFilesToRemove(instantToRepair, + List<ImmutablePair<String, List<String>>> instantFilesToRemove = + context.parallelize(instantTimesToRepair) + .map(instantToRepair -> + new ImmutablePair<>(instantToRepair, RepairUtils.findInstantFilesToRemove(instantToRepair, instantToFilesMap.get(instantToRepair), activeTimeline, archivedTimeline))) - .collect(); + .collectAsList(); - List<Tuple2<String, List<String>>> instantsWithDanglingFiles = - instantFilesToRemove.stream().filter(e -> !e._2.isEmpty()).collect(Collectors.toList()); + List<ImmutablePair<String, List<String>>> instantsWithDanglingFiles = + instantFilesToRemove.stream().filter(e -> !e.getValue().isEmpty()).collect(Collectors.toList()); printRepairInfo(instantTimesToRepair, instantsWithDanglingFiles); if (!isDryRun) { - List<String> relativeFilePathsToDelete = - instantsWithDanglingFiles.stream().flatMap(e -> e._2.stream()).collect(Collectors.toList()); + List<String> relativeFilePathsToDelete = instantsWithDanglingFiles.stream() + .flatMap(e -> e.getValue().stream()) + .collect(Collectors.toList()); if (relativeFilePathsToDelete.size() > 0) { - parallelism = Math.max(Math.min(relativeFilePathsToDelete.size(), cfg.parallelism), 1); - if (!backupFiles(relativeFilePathsToDelete, 
parallelism)) { + if (!backupFiles(relativeFilePathsToDelete)) { LOG.error("Error backing up dangling files. Exiting..."); - return; + return false; } - deleteFiles(relativeFilePathsToDelete, parallelism); + return deleteFiles(context, cfg.basePath, relativeFilePathsToDelete); } LOG.info(String.format("Table repair on %s is successful", cfg.basePath)); } + return true; } /** @@ -387,22 +409,38 @@ public class HoodieRepairTool { * * @throws IOException upon errors. */ - void undoRepair() throws IOException { + boolean undoRepair() throws IOException { FileSystem fs = metaClient.getFs(); String backupPathStr = cfg.backupPath; Path backupPath = new Path(backupPathStr); if (!fs.exists(backupPath)) { LOG.error("Cannot find backup path: " + backupPath); - return; + return false; } - List<String> relativeFilePaths = Arrays.stream( - listFilesFromBasePath(backupPathStr, jsc.hadoopConfiguration())) - .map(fileStatus -> - FSUtils.getPartitionPath(backupPathStr, fileStatus.getPath().toString()).toString()) + List<String> allPartitionPaths = tableMetadata.getAllPartitionPaths(); + + if (allPartitionPaths.isEmpty()) { + LOG.error("Cannot get one partition path since there is no partition available"); + return false; + } + + int partitionLevels = getExpectedLevelBasedOnPartitionPath(allPartitionPaths.get(0)); + + List<String> relativeFilePaths = listFilesFromBasePath( + context, backupPathStr, partitionLevels, cfg.parallelism).stream() + .map(filePath -> + FSUtils.getRelativePartitionPath(new Path(backupPathStr), new Path(filePath))) .collect(Collectors.toList()); - int parallelism = Math.max(Math.min(relativeFilePaths.size(), cfg.parallelism), 1); - restoreFiles(relativeFilePaths, parallelism); + return restoreFiles(relativeFilePaths); + } + + int getExpectedLevelBasedOnPartitionPath(String partitionPath) { + if (StringUtils.isNullOrEmpty(partitionPath)) { + return 0; + } + String[] partitionParts = partitionPath.split("/"); + return partitionParts.length; } /** @@ -455,48 +493,20 @@ public class HoodieRepairTool { * Backs up dangling files from table base path to backup path. * * @param relativeFilePaths A {@link List} of relative file paths for backup. - * @param parallelism Parallelism for copying. * @return {@code true} if all successful; {@code false} otherwise. */ - boolean backupFiles(List<String> relativeFilePaths, int parallelism) { - return copyFiles(jsc, relativeFilePaths, cfg.basePath, cfg.backupPath, parallelism); + boolean backupFiles(List<String> relativeFilePaths) { + return copyFiles(context, relativeFilePaths, cfg.basePath, cfg.backupPath); } /** * Restores dangling files from backup path to table base path. * * @param relativeFilePaths A {@link List} of relative file paths for restoring. - * @param parallelism Parallelism for copying. * @return {@code true} if all successful; {@code false} otherwise. */ - boolean restoreFiles(List<String> relativeFilePaths, int parallelism) { - return copyFiles(jsc, relativeFilePaths, cfg.backupPath, cfg.basePath, parallelism); - } - - /** - * Deletes files from table base path. - * - * @param relativeFilePaths A {@link List} of relative file paths for deleting. - * @param parallelism Parallelism for deleting. 
- */ - void deleteFiles(List<String> relativeFilePaths, int parallelism) { - jsc.parallelize(relativeFilePaths, parallelism) - .mapPartitions(iterator -> { - FileSystem fs = metaClient.getFs(); - List<Boolean> results = new ArrayList<>(); - iterator.forEachRemaining(filePath -> { - boolean success = false; - try { - success = fs.delete(new Path(filePath), false); - } catch (IOException e) { - LOG.warn("Failed to delete file " + filePath); - } finally { - results.add(success); - } - }); - return results.iterator(); - }) - .collect(); + boolean restoreFiles(List<String> relativeFilePaths) { + return copyFiles(context, relativeFilePaths, cfg.backupPath, cfg.basePath); } /** @@ -506,17 +516,14 @@ public class HoodieRepairTool { * @param instantsWithDanglingFiles A list of instants with dangling files. */ private void printRepairInfo( - List<String> instantTimesToRepair, List<Tuple2<String, List<String>>> instantsWithDanglingFiles) { + List<String> instantTimesToRepair, List<ImmutablePair<String, List<String>>> instantsWithDanglingFiles) { int numInstantsToRepair = instantsWithDanglingFiles.size(); LOG.warn("Number of instants verified based on the base and log files: " + instantTimesToRepair.size()); LOG.warn("Instant timestamps: " + instantTimesToRepair); LOG.warn("Number of instants to repair: " + numInstantsToRepair); if (numInstantsToRepair > 0) { - instantsWithDanglingFiles.forEach(e -> { - LOG.warn(" -> Instant " + numInstantsToRepair); - LOG.warn(" ** Removing files: " + e._2); - }); + instantsWithDanglingFiles.forEach(e -> LOG.warn(" ** Removing files: " + e.getValue())); } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieRepairTool.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieRepairTool.java new file mode 100644 index 0000000..8d3917f --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieRepairTool.java @@ -0,0 +1,409 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.utilities; + +import org.apache.hudi.HoodieTestCommitGenerator; +import org.apache.hudi.client.HoodieReadClient; +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.testutils.providers.SparkProvider; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.provider.Arguments; + +import java.io.IOException; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hudi.HoodieTestCommitGenerator.getBaseFilename; +import static org.apache.hudi.HoodieTestCommitGenerator.getLogFilename; +import static org.apache.hudi.HoodieTestCommitGenerator.initCommitInfoForRepairTests; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestHoodieRepairTool extends HoodieCommonTestHarness implements SparkProvider { + private static final Logger LOG = LogManager.getLogger(TestHoodieRepairTool.class); + // Instant time -> List<Pair<relativePartitionPath, fileId>> + private static final Map<String, List<Pair<String, String>>> BASE_FILE_INFO = new HashMap<>(); + private static final Map<String, List<Pair<String, String>>> LOG_FILE_INFO = new HashMap<>(); + // Relative paths to base path for dangling files + private static final List<String> DANGLING_DATA_FILE_LIST = new ArrayList<>(); + private static transient SparkSession spark; + private static transient SQLContext sqlContext; + private static transient JavaSparkContext jsc; + private static transient HoodieSparkEngineContext context; + // instant time -> partitionPathToFileIdAndNameMap + private final Map<String, Map<String, List<Pair<String, String>>>> instantInfoMap = new HashMap<>(); + private final List<String> allFileAbsolutePathList = new ArrayList<>(); + private java.nio.file.Path backupTempDir; + + @BeforeAll + static void initFileInfo() { + initCommitInfoForRepairTests(BASE_FILE_INFO, LOG_FILE_INFO); + initDanglingDataFileList(); + } + + @BeforeEach + public void initWithCleanState() throws IOException { + boolean initialized = spark != null; + if (!initialized) { + SparkConf sparkConf = conf(); + SparkRDDWriteClient.registerClasses(sparkConf); + HoodieReadClient.addHoodieSupport(sparkConf); + spark = 
SparkSession.builder().config(sparkConf).getOrCreate(); + sqlContext = spark.sqlContext(); + jsc = new JavaSparkContext(spark.sparkContext()); + context = new HoodieSparkEngineContext(jsc); + } + initPath(); + metaClient = HoodieTestUtils.init(basePath, getTableType()); + backupTempDir = tempDir.resolve("backup"); + cleanUpDanglingDataFilesInFS(); + cleanUpBackupTempDir(); + HoodieTestCommitGenerator.setupTimelineInFS( + basePath, BASE_FILE_INFO, LOG_FILE_INFO, instantInfoMap); + allFileAbsolutePathList.clear(); + allFileAbsolutePathList.addAll(instantInfoMap.entrySet().stream() + .flatMap(e -> e.getValue().entrySet().stream() + .flatMap(partition -> partition.getValue().stream() + .map(fileInfo -> new Path( + new Path(basePath, partition.getKey()), fileInfo.getValue()).toString()) + .collect(Collectors.toList()) + .stream()) + .collect(Collectors.toList()) + .stream() + ) + .collect(Collectors.toList())); + } + + @AfterEach + public void cleanUp() throws IOException { + cleanUpDanglingDataFilesInFS(); + cleanUpBackupTempDir(); + } + + @AfterAll + public static synchronized void resetSpark() { + if (spark != null) { + spark.close(); + spark = null; + } + } + + private void cleanUpDanglingDataFilesInFS() { + FileSystem fs = metaClient.getFs(); + DANGLING_DATA_FILE_LIST.forEach( + relativeFilePath -> { + Path path = new Path(basePath, relativeFilePath); + try { + if (fs.exists(path)) { + fs.delete(path, false); + } + } catch (IOException e) { + throw new HoodieIOException("Unable to delete file: " + path); + } + } + ); + } + + private void cleanUpBackupTempDir() throws IOException { + FileSystem fs = metaClient.getFs(); + fs.delete(new Path(backupTempDir.toAbsolutePath().toString()), true); + } + + private static void initDanglingDataFileList() { + DANGLING_DATA_FILE_LIST.add( + new Path("2022/01/01", + getBaseFilename("000", UUID.randomUUID().toString())).toString()); + DANGLING_DATA_FILE_LIST.add( + new Path("2022/01/06", + getLogFilename("001", UUID.randomUUID().toString())).toString()); + } + + private Stream<Arguments> configPathParams() { + Object[][] data = new Object[][] { + {null, basePath, -1}, {basePath + "/backup", basePath, -1}, + {"/tmp/backup", basePath, 0} + }; + return Stream.of(data).map(Arguments::of); + } + + @Test + public void testCheckBackupPathAgainstBasePath() { + configPathParams().forEach(arguments -> { + Object[] args = arguments.get(); + String backupPath = (String) args[0]; + String basePath = (String) args[1]; + int expectedResult = (Integer) args[2]; + + HoodieRepairTool.Config config = new HoodieRepairTool.Config(); + config.backupPath = backupPath; + config.basePath = basePath; + HoodieRepairTool tool = new HoodieRepairTool(jsc, config); + assertEquals(expectedResult, tool.checkBackupPathAgainstBasePath()); + }); + } + + private Stream<Arguments> configPathParamsWithFS() throws IOException { + SecureRandom random = new SecureRandom(); + long randomLong = random.nextLong(); + String emptyBackupPath = "/tmp/empty_backup_" + randomLong; + FSUtils.createPathIfNotExists(metaClient.getFs(), new Path(emptyBackupPath)); + String nonEmptyBackupPath = "/tmp/nonempty_backup_" + randomLong; + FSUtils.createPathIfNotExists(metaClient.getFs(), new Path(nonEmptyBackupPath)); + FSUtils.createPathIfNotExists(metaClient.getFs(), new Path(nonEmptyBackupPath, ".hoodie")); + Object[][] data = new Object[][] { + {null, basePath, 0}, {"/tmp/backup", basePath, 0}, + {emptyBackupPath, basePath, 0}, {basePath + "/backup", basePath, -1}, + {nonEmptyBackupPath, basePath, -1}, + }; + 
return Stream.of(data).map(Arguments::of); + } + + @Test + public void testCheckBackupPathForRepair() throws IOException { + for (Arguments arguments: configPathParamsWithFS().collect(Collectors.toList())) { + Object[] args = arguments.get(); + String backupPath = (String) args[0]; + String basePath = (String) args[1]; + int expectedResult = (Integer) args[2]; + + HoodieRepairTool.Config config = new HoodieRepairTool.Config(); + config.backupPath = backupPath; + config.basePath = basePath; + HoodieRepairTool tool = new HoodieRepairTool(jsc, config); + assertEquals(expectedResult, tool.checkBackupPathForRepair()); + if (backupPath == null) { + // Backup path should be created if not provided + assertNotNull(config.backupPath); + } + } + } + + @Test + public void testRepairWithIntactInstants() throws IOException { + + testRepairToolWithMode( + Option.empty(), Option.empty(), HoodieRepairTool.Mode.REPAIR.toString(), + backupTempDir.toAbsolutePath().toString(), true, + allFileAbsolutePathList, Collections.emptyList()); + } + + @Test + public void testRepairWithBrokenInstants() throws IOException { + List<String> tableDanglingFileList = createDanglingDataFilesInFS(basePath); + String backupPath = backupTempDir.toAbsolutePath().toString(); + List<String> backupDanglingFileList = DANGLING_DATA_FILE_LIST.stream() + .map(filePath -> new Path(backupPath, filePath).toString()) + .collect(Collectors.toList()); + List<String> existingFileList = new ArrayList<>(allFileAbsolutePathList); + existingFileList.addAll(backupDanglingFileList); + + testRepairToolWithMode( + Option.empty(), Option.empty(), HoodieRepairTool.Mode.REPAIR.toString(), + backupPath, true, + existingFileList, tableDanglingFileList); + } + + @Test + public void testRepairWithOneBrokenInstant() throws IOException { + List<String> tableDanglingFileList = createDanglingDataFilesInFS(basePath); + String backupPath = backupTempDir.toAbsolutePath().toString(); + List<String> backupDanglingFileList = DANGLING_DATA_FILE_LIST + .subList(1, 2).stream() + .map(filePath -> new Path(backupPath, filePath).toString()) + .collect(Collectors.toList()); + List<String> existingFileList = new ArrayList<>(allFileAbsolutePathList); + existingFileList.addAll(backupDanglingFileList); + existingFileList.addAll(tableDanglingFileList.subList(0, 1)); + + testRepairToolWithMode( + Option.of("001"), Option.empty(), HoodieRepairTool.Mode.REPAIR.toString(), + backupPath, true, + existingFileList, tableDanglingFileList.subList(1, 2)); + } + + @Test + public void testDryRunWithBrokenInstants() throws IOException { + List<String> tableDanglingFileList = createDanglingDataFilesInFS(basePath); + String backupPath = backupTempDir.toAbsolutePath().toString(); + List<String> backupDanglingFileList = DANGLING_DATA_FILE_LIST.stream() + .map(filePath -> new Path(backupPath, filePath).toString()) + .collect(Collectors.toList()); + List<String> existingFileList = new ArrayList<>(allFileAbsolutePathList); + existingFileList.addAll(tableDanglingFileList); + + testRepairToolWithMode( + Option.empty(), Option.empty(), HoodieRepairTool.Mode.DRY_RUN.toString(), + backupPath, true, + existingFileList, backupDanglingFileList); + } + + @Test + public void testDryRunWithOneBrokenInstant() throws IOException { + List<String> tableDanglingFileList = createDanglingDataFilesInFS(basePath); + String backupPath = backupTempDir.toAbsolutePath().toString(); + List<String> backupDanglingFileList = DANGLING_DATA_FILE_LIST.stream() + .map(filePath -> new Path(backupPath, filePath).toString()) + 
.collect(Collectors.toList()); + List<String> existingFileList = new ArrayList<>(allFileAbsolutePathList); + existingFileList.addAll(tableDanglingFileList); + + testRepairToolWithMode( + Option.of("001"), Option.empty(), HoodieRepairTool.Mode.DRY_RUN.toString(), + backupPath, true, + existingFileList, backupDanglingFileList); + } + + @Test + public void testUndoWithNonExistentBackupPath() throws IOException { + String backupPath = backupTempDir.toAbsolutePath().toString(); + metaClient.getFs().delete(new Path(backupPath), true); + + testRepairToolWithMode( + Option.empty(), Option.empty(), HoodieRepairTool.Mode.UNDO.toString(), + backupPath, false, + allFileAbsolutePathList, Collections.emptyList()); + } + + @Test + public void testUndoWithExistingBackupPath() throws IOException { + String backupPath = backupTempDir.toAbsolutePath().toString(); + List<String> backupDanglingFileList = createDanglingDataFilesInFS(backupPath); + List<String> restoreDanglingFileList = DANGLING_DATA_FILE_LIST.stream() + .map(filePath -> new Path(basePath, filePath).toString()) + .collect(Collectors.toList()); + List<String> existingFileList = new ArrayList<>(allFileAbsolutePathList); + existingFileList.addAll(backupDanglingFileList); + existingFileList.addAll(restoreDanglingFileList); + + verifyFilesInFS(allFileAbsolutePathList, restoreDanglingFileList); + verifyFilesInFS(backupDanglingFileList, Collections.emptyList()); + testRepairToolWithMode( + Option.empty(), Option.empty(), HoodieRepairTool.Mode.UNDO.toString(), + backupPath, true, + existingFileList, Collections.emptyList()); + // Second run should fail + testRepairToolWithMode( + Option.empty(), Option.empty(), HoodieRepairTool.Mode.UNDO.toString(), + backupPath, false, + existingFileList, Collections.emptyList()); + } + + private void testRepairToolWithMode( + Option<String> startingInstantOption, Option<String> endingInstantOption, + String runningMode, String backupPath, boolean isRunSuccessful, + List<String> existFilePathList, List<String> nonExistFilePathList) throws IOException { + HoodieRepairTool.Config config = new HoodieRepairTool.Config(); + config.backupPath = backupPath; + config.basePath = basePath; + config.assumeDatePartitioning = true; + if (startingInstantOption.isPresent()) { + config.startingInstantTime = startingInstantOption.get(); + } + if (endingInstantOption.isPresent()) { + config.endingInstantTime = endingInstantOption.get(); + } + config.runningMode = runningMode; + HoodieRepairTool tool = new HoodieRepairTool(jsc, config); + assertEquals(isRunSuccessful, tool.run()); + verifyFilesInFS(existFilePathList, nonExistFilePathList); + } + + private void verifyFilesInFS( + List<String> existFilePathList, List<String> nonExistFilePathList) throws IOException { + FileSystem fs = metaClient.getFs(); + + for (String filePath : existFilePathList) { + assertTrue(fs.exists(new Path(filePath)), + String.format("File %s should exist but it's not in the file system", filePath)); + } + + for (String filePath : nonExistFilePathList) { + assertFalse(fs.exists(new Path(filePath)), + String.format("File %s should not exist but it's in the file system", filePath)); + } + } + + private List<String> createDanglingDataFilesInFS(String parentPath) { + FileSystem fs = metaClient.getFs(); + return DANGLING_DATA_FILE_LIST.stream().map(relativeFilePath -> { + Path path = new Path(parentPath, relativeFilePath); + try { + fs.mkdirs(path.getParent()); + if (!fs.exists(path)) { + fs.create(path, false); + } + } catch (IOException e) { + LOG.error("Error 
creating file: " + path); + } + return path.toString(); + }) + .collect(Collectors.toList()); + } + + @Override + public HoodieEngineContext context() { + return context; + } + + @Override + public SparkSession spark() { + return spark; + } + + @Override + public SQLContext sqlContext() { + return sqlContext; + } + + @Override + public JavaSparkContext jsc() { + return jsc; + } +}
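
The new tests above drive the tool programmatically instead of through spark-submit. Below is a minimal sketch of that entry point, using only the Config fields and Mode values referenced in this patch; the paths shown are illustrative placeholders, not defaults.

    // Sketch based on the usage in TestHoodieRepairTool; paths are placeholder values.
    import org.apache.hudi.utilities.HoodieRepairTool;
    import org.apache.spark.api.java.JavaSparkContext;

    public class RepairToolExample {
      public static boolean dryRun(JavaSparkContext jsc) {
        HoodieRepairTool.Config config = new HoodieRepairTool.Config();
        config.basePath = "/tmp/hoodie_table";            // table base path (placeholder)
        config.backupPath = "/tmp/hoodie_table_backup";   // dangling files are copied here before deletion
        config.runningMode = HoodieRepairTool.Mode.DRY_RUN.toString();  // only report dangling files
        HoodieRepairTool tool = new HoodieRepairTool(jsc, config);
        return tool.run();  // with this fix, run() returns whether the repair/dry-run/undo succeeded
      }
    }

DRY_RUN only reports dangling base and log files; REPAIR backs them up under backupPath and then deletes them from the table; UNDO copies previously backed-up files from backupPath back into the table base path.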