This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 3eca9da0dd99aeffc21f546e45609dee19b0d01f Author: wuwenchi <wuwenchi...@hotmail.com> AuthorDate: Thu Apr 18 15:57:59 2024 +0800 [refactor](filesystem)refactor `filesystem` interface (#33361) 1. Rename `list` to `globList`. The path of this `list` needs to have a wildcard character, and the corresponding hdfs interface is `globStatus`, so the modified name is `globList`. 2. If you only need to view files based on paths, you can use the `listFiles` operation. 3. Merge `listLocatedFiles` function into `listFiles` function. --- .../java/org/apache/doris/backup/Repository.java | 8 +- .../org/apache/doris/common/util/BrokerUtil.java | 2 +- .../doris/datasource/hive/HMSTransaction.java | 36 ++++--- .../doris/datasource/hive/HiveMetaStoreCache.java | 71 +++++++------ .../main/java/org/apache/doris/fs/FileSystem.java | 61 +++++------ .../java/org/apache/doris/fs/FileSystemUtil.java | 70 +++++++++++++ .../org/apache/doris/fs/LocalDfsFileSystem.java | 81 +++------------ .../apache/doris/fs/remote/BrokerFileSystem.java | 20 ++-- .../apache/doris/fs/remote/RemoteFileSystem.java | 76 ++++++++++---- .../org/apache/doris/fs/remote/S3FileSystem.java | 2 +- .../apache/doris/fs/remote/dfs/DFSFileSystem.java | 114 +-------------------- .../org/apache/doris/backup/BrokerStorageTest.java | 2 +- .../org/apache/doris/backup/RepositoryTest.java | 8 +- .../org/apache/doris/fs/obj/S3FileSystemTest.java | 4 +- 14 files changed, 253 insertions(+), 302 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java index a1ede5b373a..0f21027f2e8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java @@ -245,7 +245,7 @@ public class Repository implements Writable { String repoInfoFilePath = assembleRepoInfoFilePath(); // check if the repo is already exist in remote List<RemoteFile> remoteFiles = 
Lists.newArrayList(); - Status st = fileSystem.list(repoInfoFilePath, remoteFiles); + Status st = fileSystem.globList(repoInfoFilePath, remoteFiles); if (!st.ok()) { return st; } @@ -417,7 +417,7 @@ public class Repository implements Writable { String listPath = Joiner.on(PATH_DELIMITER).join(location, joinPrefix(PREFIX_REPO, name), PREFIX_SNAPSHOT_DIR) + "*"; List<RemoteFile> result = Lists.newArrayList(); - Status st = fileSystem.list(listPath, result); + Status st = fileSystem.globList(listPath, result); if (!st.ok()) { return st; } @@ -595,7 +595,7 @@ public class Repository implements Writable { public Status download(String remoteFilePath, String localFilePath) { // 0. list to get to full name(with checksum) List<RemoteFile> remoteFiles = Lists.newArrayList(); - Status status = fileSystem.list(remoteFilePath + "*", remoteFiles); + Status status = fileSystem.globList(remoteFilePath + "*", remoteFiles); if (!status.ok()) { return status; } @@ -759,7 +759,7 @@ public class Repository implements Writable { LOG.debug("assemble infoFilePath: {}, snapshot: {}", infoFilePath, snapshotName); } List<RemoteFile> results = Lists.newArrayList(); - Status st = fileSystem.list(infoFilePath + "*", results); + Status st = fileSystem.globList(infoFilePath + "*", results); if (!st.ok()) { info.add(snapshotName); info.add(FeConstants.null_string); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java index a4d8186d1df..d32a04331b5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java @@ -88,7 +88,7 @@ public class BrokerUtil { try { RemoteFileSystem fileSystem = FileSystemFactory.get( brokerDesc.getName(), brokerDesc.getStorageType(), brokerDesc.getProperties()); - Status st = fileSystem.list(path, rfiles, false); + Status st = fileSystem.globList(path, rfiles, false); if 
(!st.ok()) { throw new UserException(st.getErrMsg()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java index 0e668e0eda5..7a1d4389096 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java @@ -25,6 +25,7 @@ import org.apache.doris.backup.Status; import org.apache.doris.common.Pair; import org.apache.doris.common.profile.SummaryProfile; import org.apache.doris.fs.FileSystem; +import org.apache.doris.fs.FileSystemUtil; import org.apache.doris.fs.remote.RemoteFile; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.THivePartitionUpdate; @@ -238,6 +239,7 @@ public class HMSTransaction implements Transaction { hmsCommitter.doCommit(); } catch (Throwable t) { LOG.warn("Failed to commit for {}.{}, abort it.", dbName, tbName); + hmsCommitter.abort(); hmsCommitter.rollback(); throw t; } finally { @@ -564,7 +566,7 @@ public class HMSTransaction implements Transaction { private DeleteRecursivelyResult doRecursiveDeleteFiles(Path directory, boolean deleteEmptyDir) { List<RemoteFile> allFiles = new ArrayList<>(); Set<String> allDirs = new HashSet<>(); - Status statusFile = fs.listFiles(directory.toString(), allFiles); + Status statusFile = fs.listFiles(directory.toString(), true, allFiles); Status statusDir = fs.listDirectories(directory.toString(), allDirs); if (!statusFile.ok() || !statusDir.ok()) { ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder(); @@ -1329,7 +1331,7 @@ public class HMSTransaction implements Transaction { doNothing(); } - public void rollback() { + public void abort() { cancelUnStartedAsyncFileSystemTask(); undoUpdateStatisticsTasks(); undoAddPartitionsTask(); @@ -1337,6 +1339,10 @@ public class HMSTransaction implements Transaction { 
runDirectoryClearUpTasksForAbort(); runRenameDirTasksForAbort(); } + + public void rollback() { + //delete write path + } } public Status wrapperRenameDirWithProfileSummary(String origFilePath, @@ -1366,22 +1372,24 @@ public class HMSTransaction implements Transaction { } public void wrapperAsyncRenameWithProfileSummary(Executor executor, - List<CompletableFuture<?>> renameFileFutures, - AtomicBoolean cancelled, - String origFilePath, - String destFilePath, - List<String> fileNames) { - fs.asyncRename(executor, renameFileFutures, cancelled, origFilePath, destFilePath, fileNames); + List<CompletableFuture<?>> renameFileFutures, + AtomicBoolean cancelled, + String origFilePath, + String destFilePath, + List<String> fileNames) { + FileSystemUtil.asyncRenameFiles( + fs, executor, renameFileFutures, cancelled, origFilePath, destFilePath, fileNames); summaryProfile.ifPresent(profile -> profile.addRenameFileCnt(fileNames.size())); } public void wrapperAsyncRenameDirWithProfileSummary(Executor executor, - List<CompletableFuture<?>> renameFileFutures, - AtomicBoolean cancelled, - String origFilePath, - String destFilePath, - Runnable runWhenPathNotExist) { - fs.asyncRenameDir(executor, renameFileFutures, cancelled, origFilePath, destFilePath, runWhenPathNotExist); + List<CompletableFuture<?>> renameFileFutures, + AtomicBoolean cancelled, + String origFilePath, + String destFilePath, + Runnable runWhenPathNotExist) { + FileSystemUtil.asyncRenameDir( + fs, executor, renameFileFutures, cancelled, origFilePath, destFilePath, runWhenPathNotExist); summaryProfile.ifPresent(SummaryProfile::incRenameDirCnt); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index 2690d82db78..fcebd67954e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -40,7 +40,6 @@ import org.apache.doris.datasource.FileSplit; import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo; import org.apache.doris.datasource.property.PropertyConverter; import org.apache.doris.fs.FileSystemCache; -import org.apache.doris.fs.RemoteFiles; import org.apache.doris.fs.remote.RemoteFile; import org.apache.doris.fs.remote.RemoteFileSystem; import org.apache.doris.metric.GaugeMetric; @@ -80,7 +79,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.io.FileNotFoundException; import java.net.URI; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; @@ -355,16 +353,17 @@ public class HiveMetaStoreCache { new FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity( location, bindBrokerName), jobConf, bindBrokerName)); result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location, jobConf)); - try { - // For Tez engine, it may generate subdirectoies for "union" query. - // So there may be files and directories in the table directory at the same time. eg: - // /user/hive/warehouse/region_tmp_union_all2/000000_0 - // /user/hive/warehouse/region_tmp_union_all2/1 - // /user/hive/warehouse/region_tmp_union_all2/2 - // So we need to recursively list data location. - // https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31 - RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, true); - for (RemoteFile remoteFile : locatedFiles.files()) { + // For Tez engine, it may generate subdirectoies for "union" query. + // So there may be files and directories in the table directory at the same time. 
eg: + // /user/hive/warehouse/region_tmp_union_all2/000000_0 + // /user/hive/warehouse/region_tmp_union_all2/1 + // /user/hive/warehouse/region_tmp_union_all2/2 + // So we need to recursively list data location. + // https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31 + List<RemoteFile> remoteFiles = new ArrayList<>(); + Status status = fs.listFiles(location, true, remoteFiles); + if (status.ok()) { + for (RemoteFile remoteFile : remoteFiles) { String srcPath = remoteFile.getPath().toString(); LocationPath locationPath = new LocationPath(srcPath, catalog.getProperties()); Path convertedPath = locationPath.toScanRangeLocation(); @@ -373,15 +372,13 @@ public class HiveMetaStoreCache { } result.addFile(remoteFile); } - } catch (Exception e) { + } else if (status.getErrCode().equals(ErrCode.NOT_FOUND)) { // User may manually remove partition under HDFS, in this case, // Hive doesn't aware that the removed partition is missing. // Here is to support this case without throw an exception. 
- if (e.getCause() instanceof FileNotFoundException) { - LOG.warn(String.format("File %s not exist.", location)); - } else { - throw e; - } + LOG.warn(String.format("File %s not exist.", location)); + } else { + throw new RuntimeException(status.getErrMsg()); } // Must copy the partitionValues to avoid concurrent modification of key and value result.setPartitionValues(Lists.newArrayList(partitionValues)); @@ -807,17 +804,22 @@ public class HiveMetaStoreCache { new FileSystemCache.FileSystemCacheKey( LocationPath.getFSIdentity(location, bindBrokerName), jobConf, bindBrokerName)); - RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false); - if (delta.isDeleteDelta()) { - List<String> deleteDeltaFileNames = locatedFiles.files().stream().map(f -> f.getName()).filter( - name -> name.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)) - .collect(Collectors.toList()); - deleteDeltas.add(new DeleteDeltaInfo(location, deleteDeltaFileNames)); - continue; - } - locatedFiles.files().stream().filter( - f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)) + List<RemoteFile> remoteFiles = new ArrayList<>(); + Status status = fs.listFiles(location, false, remoteFiles); + if (status.ok()) { + if (delta.isDeleteDelta()) { + List<String> deleteDeltaFileNames = remoteFiles.stream().map(f -> f.getName()).filter( + name -> name.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)) + .collect(Collectors.toList()); + deleteDeltas.add(new DeleteDeltaInfo(location, deleteDeltaFileNames)); + continue; + } + remoteFiles.stream().filter( + f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)) .forEach(fileCacheValue::addFile); + } else { + throw new RuntimeException(status.getErrMsg()); + } } // base @@ -827,10 +829,15 @@ public class HiveMetaStoreCache { new FileSystemCache.FileSystemCacheKey( LocationPath.getFSIdentity(location, bindBrokerName), jobConf, bindBrokerName)); - RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false); - 
locatedFiles.files().stream().filter( - f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)) - .forEach(fileCacheValue::addFile); + List<RemoteFile> remoteFiles = new ArrayList<>(); + Status status = fs.listFiles(location, false, remoteFiles); + if (status.ok()) { + remoteFiles.stream().filter( + f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)) + .forEach(fileCacheValue::addFile); + } else { + throw new RuntimeException(status.getErrMsg()); + } } fileCacheValue.setAcidInfo(new AcidInfo(partition.getPath(), deleteDeltas)); fileCacheValues.add(fileCacheValue); diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java index 369fc917d77..94f5c420438 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java @@ -18,15 +18,11 @@ package org.apache.doris.fs; import org.apache.doris.backup.Status; -import org.apache.doris.common.UserException; import org.apache.doris.fs.remote.RemoteFile; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; /** * File system interface. 
@@ -50,49 +46,44 @@ public interface FileSystem { Status rename(String origFilePath, String destFilePath); - default Status renameDir(String origFilePath, - String destFilePath, - Runnable runWhenPathNotExist) { - throw new UnsupportedOperationException("Unsupported operation rename dir on current file system."); + default Status renameDir(String origFilePath, String destFilePath) { + return renameDir(origFilePath, destFilePath, () -> {}); } - default void asyncRename(Executor executor, - List<CompletableFuture<?>> renameFileFutures, - AtomicBoolean cancelled, - String origFilePath, + default Status renameDir(String origFilePath, String destFilePath, - List<String> fileNames) { - throw new UnsupportedOperationException("Unsupported operation async rename on current file system."); - } - - default void asyncRenameDir(Executor executor, - List<CompletableFuture<?>> renameFileFutures, - AtomicBoolean cancelled, - String origFilePath, - String destFilePath, - Runnable runWhenPathNotExist) { - throw new UnsupportedOperationException("Unsupported operation async rename dir on current file system."); + Runnable runWhenPathNotExist) { + throw new UnsupportedOperationException("Unsupported operation rename dir on current file system."); } Status delete(String remotePath); Status makeDir(String remotePath); - RemoteFiles listLocatedFiles(String remotePath, boolean onlyFiles, boolean recursive) throws UserException; - - // List files in remotePath - // The remote file name will only contain file name only(Not full path) - default Status list(String remotePath, List<RemoteFile> result) { - return list(remotePath, result, true); + Status listFiles(String remotePath, boolean recursive, List<RemoteFile> result); + + /** + * List files in remotePath by wildcard <br/> + * The {@link RemoteFile}'name will only contain file name (Not full path) + * @param remotePath remote path + * @param result All eligible files under the path + * @return + */ + default Status globList(String 
remotePath, List<RemoteFile> result) { + return globList(remotePath, result, true); } - Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly); - - default Status listFiles(String remotePath, List<RemoteFile> result) { - throw new UnsupportedOperationException("Unsupported operation list files on current file system."); - } + /** + * List files in remotePath by wildcard <br/> + * @param remotePath remote path + * @param result All eligible files under the path + * @param fileNameOnly for {@link RemoteFile}'name: whether the full path is included.<br/> + * true: only contains file name, false: contains full path<br/> + * @return + */ + Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly); default Status listDirectories(String remotePath, Set<String> result) { - throw new UnsupportedOperationException("Unsupported operation list directores on current file system."); + throw new UnsupportedOperationException("Unsupported operation list directories on current file system."); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemUtil.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemUtil.java new file mode 100644 index 00000000000..0d4c13c7651 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemUtil.java @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs; + +import org.apache.doris.backup.Status; + +import org.apache.hadoop.fs.Path; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; + +public class FileSystemUtil { + + public static void asyncRenameFiles(FileSystem fs, + Executor executor, + List<CompletableFuture<?>> renameFileFutures, + AtomicBoolean cancelled, + String origFilePath, + String destFilePath, + List<String> fileNames) { + for (String fileName : fileNames) { + Path source = new Path(origFilePath, fileName); + Path target = new Path(destFilePath, fileName); + renameFileFutures.add(CompletableFuture.runAsync(() -> { + if (cancelled.get()) { + return; + } + Status status = fs.rename(source.toString(), target.toString()); + if (!status.ok()) { + throw new RuntimeException(status.getErrMsg()); + } + }, executor)); + } + } + + public static void asyncRenameDir(FileSystem fs, + Executor executor, + List<CompletableFuture<?>> renameFileFutures, + AtomicBoolean cancelled, + String origFilePath, + String destFilePath, + Runnable runWhenPathNotExist) { + renameFileFutures.add(CompletableFuture.runAsync(() -> { + if (cancelled.get()) { + return; + } + Status status = fs.renameDir(origFilePath, destFilePath, runWhenPathNotExist); + if (!status.ok()) { + throw new RuntimeException(status.getErrMsg()); + } + }, executor)); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/LocalDfsFileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/LocalDfsFileSystem.java index 0faf1916db0..57a10ed1109 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/LocalDfsFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/LocalDfsFileSystem.java @@ -18,7 +18,6 @@ package org.apache.doris.fs; import org.apache.doris.backup.Status; -import org.apache.doris.common.UserException; import org.apache.doris.fs.remote.RemoteFile; import com.google.common.collect.ImmutableSet; @@ -30,14 +29,12 @@ import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; public class LocalDfsFileSystem implements FileSystem { @@ -111,46 +108,6 @@ public class LocalDfsFileSystem implements FileSystem { return rename(origFilePath, destFilePath); } - @Override - public void asyncRename(Executor executor, - List<CompletableFuture<?>> renameFileFutures, - AtomicBoolean cancelled, - String origFilePath, - String destFilePath, - List<String> fileNames) { - for (String fileName : fileNames) { - Path source = new Path(origFilePath, fileName); - Path target = new Path(destFilePath, fileName); - renameFileFutures.add(CompletableFuture.runAsync(() -> { - if (cancelled.get()) { - return; - } - Status status = rename(source.toString(), target.toString()); - if (!status.ok()) { - throw new RuntimeException(status.getErrMsg()); - } - }, executor)); - } - } - - @Override - public void asyncRenameDir(Executor executor, - List<CompletableFuture<?>> renameFileFutures, - AtomicBoolean cancelled, - String origFilePath, - String destFilePath, - Runnable runWhenPathNotExist) { - renameFileFutures.add(CompletableFuture.runAsync(() -> { - if 
(cancelled.get()) { - return; - } - Status status = renameDir(origFilePath, destFilePath, runWhenPathNotExist); - if (!status.ok()) { - throw new RuntimeException(status.getErrMsg()); - } - }, executor)); - } - @Override public Status delete(String remotePath) { try { @@ -172,14 +129,9 @@ public class LocalDfsFileSystem implements FileSystem { } @Override - public RemoteFiles listLocatedFiles(String remotePath, boolean onlyFiles, boolean recursive) throws UserException { - return null; - } - - @Override - public Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly) { + public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) { try { - FileStatus[] locatedFileStatusRemoteIterator = fs.listStatus(new Path(remotePath)); + FileStatus[] locatedFileStatusRemoteIterator = fs.globStatus(new Path(remotePath)); if (locatedFileStatusRemoteIterator == null) { return Status.OK; } @@ -197,23 +149,20 @@ public class LocalDfsFileSystem implements FileSystem { } @Override - public Status listFiles(String remotePath, List<RemoteFile> result) { - RemoteIterator<LocatedFileStatus> iterator; + public Status listFiles(String remotePath, boolean recursive, List<RemoteFile> result) { try { - Path dirPath = new Path(remotePath); - iterator = fs.listFiles(dirPath, true); - while (iterator.hasNext()) { - LocatedFileStatus next = iterator.next(); - String location = next.getPath().toString(); - String child = location.substring(dirPath.toString().length()); - while (child.startsWith("/")) { - child = child.substring(1); - } - if (!child.contains("/")) { - result.add(new RemoteFile(location, next.isFile(), next.getLen(), next.getBlockSize())); - } + Path locatedPath = new Path(remotePath); + RemoteIterator<LocatedFileStatus> locatedFiles = fs.listFiles(locatedPath, recursive); + while (locatedFiles.hasNext()) { + LocatedFileStatus fileStatus = locatedFiles.next(); + RemoteFile location = new RemoteFile( + fileStatus.getPath(), 
fileStatus.isDirectory(), fileStatus.getLen(), + fileStatus.getBlockSize(), fileStatus.getModificationTime(), fileStatus.getBlockLocations()); + result.add(location); } - } catch (IOException e) { + } catch (FileNotFoundException e) { + return new Status(Status.ErrCode.NOT_FOUND, e.getMessage()); + } catch (Exception e) { return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage()); } return Status.OK; diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java index 6e3d20a2df0..5b9ee8aaeca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java @@ -27,7 +27,6 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.util.BrokerUtil; import org.apache.doris.datasource.property.PropertyConverter; -import org.apache.doris.fs.RemoteFiles; import org.apache.doris.fs.operations.BrokerFileOperations; import org.apache.doris.fs.operations.OpParams; import org.apache.doris.service.FrontendOptions; @@ -69,7 +68,6 @@ import java.nio.file.FileVisitOption; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -568,11 +566,11 @@ public class BrokerFileSystem extends RemoteFileSystem { } @Override - public RemoteFiles listLocatedFiles(String remotePath, boolean onlyFiles, boolean recursive) throws UserException { + public Status listFiles(String remotePath, boolean recursive, List<RemoteFile> result) { // get a proper broker Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker(); if (pair == null) { - throw new UserException("failed to get broker client"); + return new Status(Status.ErrCode.BAD_CONNECTION, "failed to get broker client"); } 
TPaloBrokerService.Client client = pair.first; TNetworkAddress address = pair.second; @@ -582,14 +580,14 @@ public class BrokerFileSystem extends RemoteFileSystem { try { TBrokerListPathRequest req = new TBrokerListPathRequest(TBrokerVersion.VERSION_ONE, remotePath, recursive, properties); - req.setOnlyFiles(onlyFiles); + req.setOnlyFiles(true); TBrokerListResponse response = client.listLocatedFiles(req); TBrokerOperationStatus operationStatus = response.getOpStatus(); if (operationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) { - throw new UserException("failed to listLocatedFiles, remote path: " + remotePath + ". msg: " - + operationStatus.getMessage() + ", broker: " + BrokerUtil.printBroker(name, address)); + return new Status(Status.ErrCode.COMMON_ERROR, + "failed to listLocatedFiles, remote path: " + remotePath + ". msg: " + + operationStatus.getMessage() + ", broker: " + BrokerUtil.printBroker(name, address)); } - List<RemoteFile> result = new ArrayList<>(); List<TBrokerFileStatus> fileStatus = response.getFiles(); for (TBrokerFileStatus tFile : fileStatus) { org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(tFile.path); @@ -598,10 +596,10 @@ public class BrokerFileSystem extends RemoteFileSystem { result.add(file); } LOG.info("finished to listLocatedFiles, remote path {}. get files: {}", remotePath, result); - return new RemoteFiles(result); + return Status.OK; } catch (TException e) { needReturn = false; - throw new UserException("failed to listLocatedFiles, remote path: " + return new Status(Status.ErrCode.COMMON_ERROR, "failed to listLocatedFiles, remote path: " + remotePath + ". 
msg: " + e.getMessage() + ", broker: " + BrokerUtil.printBroker(name, address)); } finally { if (needReturn) { @@ -651,7 +649,7 @@ public class BrokerFileSystem extends RemoteFileSystem { // List files in remotePath @Override - public Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly) { + public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) { // get a proper broker Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker(); if (pair == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java index ffe63f20ac7..311532794f1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java @@ -18,17 +18,21 @@ package org.apache.doris.fs.remote; import org.apache.doris.analysis.StorageBackend; +import org.apache.doris.backup.Status; import org.apache.doris.common.UserException; import org.apache.doris.fs.PersistentFileSystem; -import org.apache.doris.fs.RemoteFiles; +import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; -import java.io.IOException; -import java.util.ArrayList; +import java.io.FileNotFoundException; +import java.util.Arrays; import java.util.List; +import java.util.Set; public abstract class RemoteFileSystem extends PersistentFileSystem { // this field will be visited by multi-threads, better use volatile qualifier @@ -43,26 +47,62 @@ public abstract class RemoteFileSystem extends PersistentFileSystem { } @Override - public RemoteFiles listLocatedFiles(String remotePath, boolean onlyFiles, boolean recursive) throws UserException { - org.apache.hadoop.fs.FileSystem fileSystem 
= nativeFileSystem(remotePath); + public Status listFiles(String remotePath, boolean recursive, List<RemoteFile> result) { try { + org.apache.hadoop.fs.FileSystem fileSystem = nativeFileSystem(remotePath); Path locatedPath = new Path(remotePath); - RemoteIterator<LocatedFileStatus> locatedFiles = onlyFiles ? fileSystem.listFiles(locatedPath, recursive) - : fileSystem.listLocatedStatus(locatedPath); - return getFileLocations(locatedFiles); - } catch (IOException e) { - throw new UserException("Failed to list located status for path: " + remotePath, e); + RemoteIterator<LocatedFileStatus> locatedFiles = fileSystem.listFiles(locatedPath, recursive); + while (locatedFiles.hasNext()) { + LocatedFileStatus fileStatus = locatedFiles.next(); + RemoteFile location = new RemoteFile( + fileStatus.getPath(), fileStatus.isDirectory(), fileStatus.getLen(), + fileStatus.getBlockSize(), fileStatus.getModificationTime(), fileStatus.getBlockLocations()); + result.add(location); + } + } catch (FileNotFoundException e) { + return new Status(Status.ErrCode.NOT_FOUND, e.getMessage()); + } catch (Exception e) { + return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage()); } + return Status.OK; } - private RemoteFiles getFileLocations(RemoteIterator<LocatedFileStatus> locatedFiles) throws IOException { - List<RemoteFile> locations = new ArrayList<>(); - while (locatedFiles.hasNext()) { - LocatedFileStatus fileStatus = locatedFiles.next(); - RemoteFile location = new RemoteFile(fileStatus.getPath(), fileStatus.isDirectory(), fileStatus.getLen(), - fileStatus.getBlockSize(), fileStatus.getModificationTime(), fileStatus.getBlockLocations()); - locations.add(location); + @Override + public Status listDirectories(String remotePath, Set<String> result) { + try { + FileSystem fileSystem = nativeFileSystem(remotePath); + FileStatus[] fileStatuses = fileSystem.listStatus(new Path(remotePath)); + result.addAll( + Arrays.stream(fileStatuses) + .filter(FileStatus::isDirectory) + .map(file -> 
file.getPath().toString() + "/") + .collect(ImmutableSet.toImmutableSet())); + } catch (Exception e) { + return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage()); + } + return Status.OK; + } + + @Override + public Status renameDir(String origFilePath, + String destFilePath, + Runnable runWhenPathNotExist) { + Status status = exists(destFilePath); + if (status.ok()) { + return new Status(Status.ErrCode.COMMON_ERROR, "Destination directory already exists: " + destFilePath); } - return new RemoteFiles(locations); + + String targetParent = new Path(destFilePath).getParent().toString(); + status = exists(targetParent); + if (Status.ErrCode.NOT_FOUND.equals(status.getErrCode())) { + status = makeDir(targetParent); + } + if (!status.ok()) { + return new Status(Status.ErrCode.COMMON_ERROR, status.getErrMsg()); + } + + runWhenPathNotExist.run(); + + return rename(origFilePath, destFilePath); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java index db0062cf98a..5771c65224b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java @@ -72,7 +72,7 @@ public class S3FileSystem extends ObjFileSystem { // broker file pattern glob is too complex, so we use hadoop directly @Override - public Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly) { + public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) { try { FileSystem s3AFileSystem = nativeFileSystem(remotePath); Path pathPattern = new Path(remotePath); diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java index 7e3032ca807..7f31f8eed49 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java @@ -31,15 +31,12 @@ import org.apache.doris.fs.remote.RemoteFileSystem; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -54,14 +51,9 @@ import java.nio.ByteBuffer; import java.nio.file.FileVisitOption; import java.nio.file.Files; import java.nio.file.Paths; -import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; public class DFSFileSystem extends RemoteFileSystem { @@ -395,70 +387,6 @@ public class DFSFileSystem extends RemoteFileSystem { return Status.OK; } - @Override - public void asyncRename( - Executor executor, - List<CompletableFuture<?>> renameFileFutures, - AtomicBoolean cancelled, - String origFilePath, - String destFilePath, - List<String> fileNames) { - - for (String fileName : fileNames) { - Path source = new Path(origFilePath, fileName); - Path target = new Path(destFilePath, fileName); - renameFileFutures.add(CompletableFuture.runAsync(() -> { - if (cancelled.get()) { - return; - } - Status status = rename(source.toString(), target.toString()); - if (!status.ok()) { - throw new RuntimeException(status.getErrMsg()); - } - }, executor)); - } - } - - public Status renameDir(String origFilePath, - String 
destFilePath, - Runnable runWhenPathNotExist) { - Status status = exists(destFilePath); - if (status.ok()) { - throw new RuntimeException("Destination directory already exists: " + destFilePath); - } - - String targetParent = new Path(destFilePath).getParent().toString(); - status = exists(targetParent); - if (Status.ErrCode.NOT_FOUND.equals(status.getErrCode())) { - status = makeDir(targetParent); - } - if (!status.ok()) { - throw new RuntimeException(status.getErrMsg()); - } - - runWhenPathNotExist.run(); - - return rename(origFilePath, destFilePath); - } - - @Override - public void asyncRenameDir(Executor executor, - List<CompletableFuture<?>> renameFileFutures, - AtomicBoolean cancelled, - String origFilePath, - String destFilePath, - Runnable runWhenPathNotExist) { - renameFileFutures.add(CompletableFuture.runAsync(() -> { - if (cancelled.get()) { - return; - } - Status status = renameDir(origFilePath, destFilePath, runWhenPathNotExist); - if (!status.ok()) { - throw new RuntimeException(status.getErrMsg()); - } - }, executor)); - } - @Override public Status delete(String remotePath) { try { @@ -486,7 +414,7 @@ public class DFSFileSystem extends RemoteFileSystem { * @return Status.OK if success. 
*/ @Override - public Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly) { + public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) { try { URI pathUri = URI.create(remotePath); FileSystem fileSystem = nativeFileSystem(remotePath); @@ -528,44 +456,4 @@ public class DFSFileSystem extends RemoteFileSystem { } return Status.OK; } - - @Override - public Status listFiles(String remotePath, List<RemoteFile> result) { - RemoteIterator<LocatedFileStatus> iterator; - try { - FileSystem fileSystem = nativeFileSystem(remotePath); - Path dirPath = new Path(remotePath); - iterator = fileSystem.listFiles(dirPath, true); - while (iterator.hasNext()) { - LocatedFileStatus next = iterator.next(); - String location = next.getPath().toString(); - String child = location.substring(dirPath.toString().length()); - while (child.startsWith("/")) { - child = child.substring(1); - } - if (!child.contains("/")) { - result.add(new RemoteFile(location, next.isFile(), next.getLen(), next.getBlockSize())); - } - } - } catch (Exception e) { - return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage()); - } - return Status.OK; - } - - @Override - public Status listDirectories(String remotePath, Set<String> result) { - try { - FileSystem fileSystem = nativeFileSystem(remotePath); - FileStatus[] fileStatuses = fileSystem.listStatus(new Path(remotePath)); - result.addAll( - Arrays.stream(fileStatuses) - .filter(FileStatus::isDirectory) - .map(file -> file.getPath().toString() + "/") - .collect(ImmutableSet.toImmutableSet())); - } catch (Exception e) { - return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage()); - } - return Status.OK; - } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/BrokerStorageTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/BrokerStorageTest.java index 3e59a5552fc..704bc17c538 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/backup/BrokerStorageTest.java +++ 
b/fe/fe-core/src/test/java/org/apache/doris/backup/BrokerStorageTest.java @@ -176,7 +176,7 @@ public class BrokerStorageTest { Assert.assertEquals(Status.OK, fileSystem.directUpload(content, listPath + ".1")); Assert.assertEquals(Status.OK, fileSystem.directUpload(content, listPath + ".2")); Assert.assertEquals(Status.OK, fileSystem.directUpload(content, listPath + ".3")); - Assert.assertEquals(Status.OK, fileSystem.list(bucket + basePath + "_list/*", result)); + Assert.assertEquals(Status.OK, fileSystem.globList(bucket + basePath + "_list/*", result)); Assert.assertEquals(3, result.size()); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/RepositoryTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/RepositoryTest.java index 597324fde7f..a49d7e4328e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/backup/RepositoryTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/backup/RepositoryTest.java @@ -106,7 +106,7 @@ public class RepositoryTest { public void testInit() { new Expectations() { { - fileSystem.list(anyString, (List<RemoteFile>) any); + fileSystem.globList(anyString, (List<RemoteFile>) any); minTimes = 0; result = new Delegate<Status>() { public Status list(String remotePath, List<RemoteFile> result) { @@ -180,7 +180,7 @@ public class RepositoryTest { public void testListSnapshots() { new Expectations() { { - fileSystem.list(anyString, (List<RemoteFile>) any); + fileSystem.globList(anyString, (List<RemoteFile>) any); minTimes = 0; result = new Delegate() { public Status list(String remotePath, List<RemoteFile> result) { @@ -250,7 +250,7 @@ public class RepositoryTest { new Expectations() { { - fileSystem.list(anyString, (List<RemoteFile>) any); + fileSystem.globList(anyString, (List<RemoteFile>) any); minTimes = 0; result = new Delegate() { public Status list(String remotePath, List<RemoteFile> result) { @@ -285,7 +285,7 @@ public class RepositoryTest { public void testGetSnapshotInfo() { new Expectations() { { - 
fileSystem.list(anyString, (List<RemoteFile>) any); + fileSystem.globList(anyString, (List<RemoteFile>) any); minTimes = 0; result = new Delegate() { public Status list(String remotePath, List<RemoteFile> result) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/fs/obj/S3FileSystemTest.java b/fe/fe-core/src/test/java/org/apache/doris/fs/obj/S3FileSystemTest.java index 81247b54371..d5983bb3ab5 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/fs/obj/S3FileSystemTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/fs/obj/S3FileSystemTest.java @@ -108,7 +108,7 @@ public class S3FileSystemTest { fileSystem = new S3FileSystem(mockedStorage); new MockUp<S3FileSystem>(S3FileSystem.class) { @Mock - public Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly) { + public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) { try { S3URI uri = S3URI.create(remotePath, false); ListObjectsV2Request.Builder requestBuilder = ListObjectsV2Request.builder().bucket(uri.getBucket()); @@ -225,7 +225,7 @@ public class S3FileSystemTest { Assertions.assertEquals(Status.OK, fileSystem.directUpload(content, listPath + ".1")); Assertions.assertEquals(Status.OK, fileSystem.directUpload(content, listPath + ".2")); Assertions.assertEquals(Status.OK, fileSystem.directUpload(content, listPath + ".3")); - Assertions.assertEquals(Status.OK, fileSystem.list(bucket + basePath + "_list/*", result)); + Assertions.assertEquals(Status.OK, fileSystem.globList(bucket + basePath + "_list/*", result)); Assertions.assertEquals(3, result.size()); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org