This is an automated email from the ASF dual-hosted git repository.
michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 53a452af6 IMPALA-13254: Optimize REFRESH for Iceberg tables
53a452af6 is described below
commit 53a452af669b022fd2ab19eb58a5dc5ea29aed78
Author: fulili <[email protected]>
AuthorDate: Tue Aug 20 06:42:38 2024 +0000
IMPALA-13254: Optimize REFRESH for Iceberg tables
Considering that an Iceberg table is a collection of immutable
content files, the current code logic has been simplified. The
optimized process is as follows:
1. For existing ContentFiles, directly reuse the existing file
descriptors.
2. For newly added ContentFiles on file systems that do not support
block locations, directly create file descriptors.
3. For newly added ContentFiles on file systems that support block
locations, choose between using a listLocatedStatus operation or
calling getFileBlockLocations one by one, based on the number of files.
A simple performance comparison test has been conducted in a
single-node environment. The test used the following data tables:
- non_partitioned_table: No partitions, containing 10,000 files
- partitioned_table_1: Contains 10,000 partitions, each with 1 file
- partitioned_table_2: Contains 300 partitions, each with 300 files
and scenarios tested:
- FULL: Perform REFRESH after executing INVALIDATE METADATA
- ADD_1_FILES: Insert 1 file using Hive and then perform REFRESH
- ADD_101_FILES: Insert 101 files using Hive and then perform REFRESH
The test results of the new version are as follows:
+------------------------+----------+-------------+----------------+
| Table | FULL | ADD_1_FILES | ADD_101_FILES |
+------------------------+----------+-------------+----------------+
| non_partitioned_table | 356.389ms| 40.015ms | 302.435ms |
| partitioned_table_1 | 288.798ms| 26.667ms | 33.035ms |
| partitioned_table_2 | 1s436ms | 237.057ms | 225.749ms |
+------------------------+----------+-------------+----------------+
The test results of the old version are as follows:
+------------------------+----------+-------------+----------------+
| Table | FULL | ADD_1_FILES | ADD_101_FILES |
+------------------------+----------+-------------+----------------+
| non_partitioned_table | 338ms | 57.156ms | 12s903ms |
| partitioned_table_1 | 281ms | 40.525ms | 12s743ms |
| partitioned_table_2 | 1s397ms | 336.965ms | 1m57s |
+------------------------+----------+-------------+----------------+
It can be observed that when the number of newly added files exceeds
iceberg_reload_new_files_threshold, REFRESH performance improves
significantly, while there is no noticeable change in other scenarios.
Change-Id: I8c99a28eb16275efdff52e0ea2711c0c6036719
Reviewed-on: http://gerrit.cloudera.org:8080/21608
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/common/global-flags.cc | 6 +-
.../impala/catalog/IcebergFileMetadataLoader.java | 255 +++++++++------------
.../impala/catalog/FileMetadataLoaderTest.java | 40 ++--
3 files changed, 141 insertions(+), 160 deletions(-)
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 072ed4dc4..7eaa7dd2b 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -417,9 +417,9 @@ DEFINE_int64(update_catalogd_rpc_resend_interval_ms, 100,
"(Advanced) Interval (
"statestore has failed to send the RPC to the subscriber.");
DEFINE_int32(iceberg_reload_new_files_threshold, 100, "(Advanced) If during a
table "
- "refresh the number of new files are greater than this, catalogd will
completely "
- "reload all file metadata. If number of new files are less or equal to
this, "
- "catalogd will only load the metadata of the newly added files.");
+ "refresh the number of new files are greater than this, catalogd will use
a "
+ "recursive file listing to load file metadata. If number of new files are
less or "
+ "equal to this, catalogd will load the file metadata one by one.");
DEFINE_bool(iceberg_allow_datafiles_in_table_location_only, true, "If true,
Impala "
"does not allow Iceberg data file locations outside of the table directory
during "
diff --git
a/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java
b/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java
index 00e2c99d7..b9ceff848 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java
@@ -23,23 +23,22 @@ import static
org.apache.impala.catalog.ParallelFileMetadataLoader.getPoolSize;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -53,10 +52,10 @@ import
org.apache.impala.catalog.HdfsPartition.FileDescriptor;
import org.apache.impala.catalog.iceberg.GroupedContentFiles;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.Reference;
+import org.apache.impala.common.Pair;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.util.ListMap;
-
import org.apache.impala.util.ThreadNameAnnotator;
import org.slf4j.Logger;
@@ -69,8 +68,6 @@ public class IcebergFileMetadataLoader extends
FileMetadataLoader {
private final static Logger LOG = LoggerFactory.getLogger(
IcebergFileMetadataLoader.class);
- private static final Configuration CONF = new Configuration();
-
// Default value of 'newFilesThreshold_' if the given parameter or startup
flag have
// invalid value.
private final int NEW_FILES_THRESHOLD_DEFAULT = 100;
@@ -81,6 +78,7 @@ public class IcebergFileMetadataLoader extends
FileMetadataLoader {
private final GroupedContentFiles icebergFiles_;
private final boolean canDataBeOutsideOfTableLocation_;
+ private boolean useParallelListing_;
public IcebergFileMetadataLoader(Path partDir, boolean recursive,
List<FileDescriptor> oldFds, ListMap<TNetworkAddress> hostIndex,
@@ -111,146 +109,144 @@ public class IcebergFileMetadataLoader extends
FileMetadataLoader {
@Override
public void load() throws CatalogException, IOException {
- if (!shouldReuseOldFds()) {
- super.load();
- } else {
- try {
- reloadWithOldFds();
- } finally {
- FileMetadataLoader.TOTAL_TASKS.decrementAndGet();
+ String msg = String.format("Refreshing Iceberg file metadata from path
%s", partDir_);
+ LOG.trace(msg);
+ try (ThreadNameAnnotator tna = new ThreadNameAnnotator(msg)) {
+ loadInternal();
+ } finally {
+ FileMetadataLoader.TOTAL_TASKS.decrementAndGet();
+ }
+ }
+
+ private void loadInternal() throws CatalogException, IOException {
+ loadedFds_ = new ArrayList<>();
+ loadStats_ = new LoadStats(partDir_);
+
+ // Process the existing Fd ContentFile and return the newly added
ContentFile
+ Iterable<ContentFile<?>> newContentFiles = loadContentFilesWithOldFds();
+ // Iterate through all the newContentFiles, determine if StorageIds are
supported,
+ // and use different handling methods accordingly.
+ // This considers that different ContentFiles are on different FileSystems
+ List<Pair<FileSystem, ContentFile<?>>> filesSupportsStorageIds =
Lists.newArrayList();
+ FileSystem fsForTable = FileSystemUtil.getFileSystemForPath(partDir_);
+ FileSystem defaultFs = FileSystemUtil.getDefaultFileSystem();
+ for (ContentFile<?> contentFile : newContentFiles) {
+ FileSystem fsForPath = fsForTable;
+ // If canDataBeOutsideOfTableLocation is not true, we assume that the
file system
+ // for all ContentFiles is the same as fsForTable
+ if (canDataBeOutsideOfTableLocation_) {
+ Path path = new Path(contentFile.path().toString());
+ fsForPath = path.toUri().getScheme() != null ?
+ FileSystemUtil.getFileSystemForPath(path) : defaultFs;
+ }
+ // If the specific fs does not support StorageIds, then
+ // we create FileDescriptor directly
+ if (FileSystemUtil.supportsStorageIds(fsForPath)) {
+ filesSupportsStorageIds.add(Pair.create(fsForPath, contentFile));
+ } else {
+ loadedFds_.add(createFd(fsForPath, contentFile, null, null));
+ ++loadStats_.loadedFiles;
}
}
+ // If the number of filesSupportsStorageIds are greater than
newFilesThreshold,
+ // we will use a recursive file listing to load file metadata. If number
of new
+ // files are less or equal to this, we will load the metadata of the newly
added
+ // files one by one
+ useParallelListing_ = filesSupportsStorageIds.size() > newFilesThreshold_;
+ Reference<Long> numUnknownDiskIds = new Reference<>(0L);
+ Map<Path, FileStatus> nameToFileStatus = Collections.emptyMap();
+ if (useParallelListing_) {
+ nameToFileStatus = parallelListing(filesSupportsStorageIds);
+ }
+ for (Pair<FileSystem, ContentFile<?>> contentFileInfo :
filesSupportsStorageIds) {
+ Path path = FileSystemUtil.createFullyQualifiedPath(
+ new Path(contentFileInfo.getSecond().path().toString()));
+ FileStatus stat = nameToFileStatus.get(path);
+ loadedFds_.add(createFd(contentFileInfo.getFirst(),
contentFileInfo.getSecond(),
+ stat, numUnknownDiskIds));
+ }
+ loadStats_.loadedFiles += filesSupportsStorageIds.size();
+ loadStats_.unknownDiskIds += numUnknownDiskIds.getRef();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(loadStats_.debugString());
+ }
+ }
+
+ @VisibleForTesting
+ boolean useParallelListing() {
+ return useParallelListing_;
}
/**
* Iceberg tables are a collection of immutable, uniquely identifiable data
files,
* which means we can safely reuse the old FDs.
*/
- private void reloadWithOldFds() throws IOException {
- loadStats_ = new LoadStats(partDir_);
- FileSystem fs = partDir_.getFileSystem(CONF);
-
- String msg = String.format("Refreshing Iceberg file metadata from path %s
" +
- "while reusing old file descriptors", partDir_);
- LOG.trace(msg);
- try (ThreadNameAnnotator tna = new ThreadNameAnnotator(msg)) {
- loadedFds_ = new ArrayList<>();
- Reference<Long> numUnknownDiskIds = new Reference<>(0L);
- for (ContentFile<?> contentFile : icebergFiles_.getAllContentFiles()) {
- FileDescriptor fd = getOldFd(contentFile);
- if (fd == null) {
- fd = getFileDescriptor(fs, contentFile, numUnknownDiskIds);
- } else {
- ++loadStats_.skippedFiles;
- }
+ private Iterable<ContentFile<?>> loadContentFilesWithOldFds() throws
IOException {
+ if (forceRefreshLocations || oldFdsByPath_.isEmpty()) {
+ return icebergFiles_.getAllContentFiles();
+ }
+ List<ContentFile<?>> newContentFiles = Lists.newArrayList();
+ for (ContentFile<?> contentFile : icebergFiles_.getAllContentFiles()) {
+ FileDescriptor fd = getOldFd(contentFile);
+ if (fd == null) {
+ newContentFiles.add(contentFile);
+ } else {
+ ++loadStats_.skippedFiles;
loadedFds_.add(Preconditions.checkNotNull(fd));
}
- Preconditions.checkState(loadStats_.loadedFiles <= newFilesThreshold_);
- loadStats_.unknownDiskIds += numUnknownDiskIds.getRef();
- if (LOG.isTraceEnabled()) {
- LOG.trace(loadStats_.debugString());
- }
}
+ return newContentFiles;
}
- private FileDescriptor getFileDescriptor(FileSystem fs, ContentFile<?>
contentFile,
- Reference<Long> numUnknownDiskIds) throws IOException {
- Path fileLoc = FileSystemUtil.createFullyQualifiedPath(
- new Path(contentFile.path().toString()));
- FileStatus stat;
- if (FileSystemUtil.supportsStorageIds(fs)) {
- stat = Utils.createLocatedFileStatus(fileLoc, fs);
- } else {
- // For OSS service (e.g. S3A, COS, OSS, etc), we create FileStatus
ourselves.
- stat = Utils.createFileStatus(contentFile, fileLoc);
+ private FileDescriptor createFd(FileSystem fs, ContentFile<?> contentFile,
+ FileStatus stat, Reference<Long> numUnknownDiskIds) throws IOException {
+ if (stat == null) {
+ Path fileLoc = FileSystemUtil.createFullyQualifiedPath(
+ new Path(contentFile.path().toString()));
+ if (FileSystemUtil.supportsStorageIds(fs)) {
+ stat = Utils.createLocatedFileStatus(fileLoc, fs);
+ } else {
+ // For OSS service (e.g. S3A, COS, OSS, etc), we create FileStatus
ourselves.
+ stat = Utils.createFileStatus(contentFile, fileLoc);
+ }
}
- return getFileDescriptor(fs, FileSystemUtil.supportsStorageIds(fs),
- numUnknownDiskIds, stat);
- }
- /**
- * Throw exception if the path fails to relativize based on the location of
the Iceberg
- * tables, and files is not allowed outside the table location.
- */
- @Override
- protected FileDescriptor getFileDescriptor(FileSystem fs, boolean
listWithLocations,
- Reference<Long> numUnknownDiskIds, FileStatus fileStatus) throws
IOException {
String absPath = null;
- String relPath =
FileSystemUtil.relativizePathNoThrow(fileStatus.getPath(), partDir_);
+ String relPath = FileSystemUtil.relativizePathNoThrow(stat.getPath(),
partDir_);
if (relPath == null) {
if (canDataBeOutsideOfTableLocation_) {
- absPath = fileStatus.getPath().toString();
+ absPath = stat.getPath().toString();
} else {
throw new IOException(String.format("Failed to load Iceberg datafile
%s, because "
- + "it's outside of the table location",
fileStatus.getPath().toUri()));
+ + "it's outside of the table location", stat.getPath().toUri()));
}
}
-
- String path = Strings.isNullOrEmpty(relPath) ? absPath : relPath;
- FileDescriptor fd = oldFdsByPath_.get(path);
- if (listWithLocations || forceRefreshLocations || fd == null ||
- fd.isChanged(fileStatus)) {
- fd = createFd(fs, fileStatus, relPath, numUnknownDiskIds, absPath);
- ++loadStats_.loadedFiles;
- } else {
- ++loadStats_.skippedFiles;
- }
- return fd;
+ return createFd(fs, stat, relPath, numUnknownDiskIds, absPath);
}
/**
- * Return file status list based on the data and delete files of the Iceberg
tables.
+ * Using a thread pool to perform parallel List operations on the
FileSystem, this takes
+ * into account the situation where multiple FileSystems exist within the
ContentFiles.
*/
- @Override
- protected List<FileStatus> getFileStatuses(FileSystem fs, boolean
listWithLocations)
- throws IOException {
- if (icebergFiles_.isEmpty()) return null;
- // For the FSs in 'FileSystemUtil#SCHEME_SUPPORT_STORAGE_IDS' (e.g. HDFS,
Ozone,
- // Alluxio, etc.) we ensure the file with block location information, so
we're going
- // to get the block information through 'FileSystemUtil.listFiles'.
- Map<Path, FileStatus> nameToFileStatus = Collections.emptyMap();
- if (listWithLocations) nameToFileStatus = parallelListing(fs);
- List<FileStatus> stats = Lists.newLinkedList();
- for (ContentFile<?> contentFile : icebergFiles_.getAllContentFiles()) {
- Path path = FileSystemUtil.createFullyQualifiedPath(
- new Path(contentFile.path().toString()));
- // If data is in the table location, then we can get LocatedFileStatus
from
- // 'nameToFileStatus'. If 'nameToFileStatus' does not include the
ContentFile, we
- // try to get LocatedFileStatus based on the specific fs(StorageIds are
supported)
- // of the actual ContentFile. If the specific fs does not support
StorageIds, then
- // we create FileStatus directly by the method
- //
'org.apache.impala.catalog.IcebergFileMetadataLoader.createFileStatus'.
- if (nameToFileStatus.containsKey(path)) {
- stats.add(nameToFileStatus.get(path));
- } else {
- FileSystem fsForPath = FileSystemUtil.getFileSystemForPath(path);
- if (FileSystemUtil.supportsStorageIds(fsForPath)) {
- stats.add(Utils.createLocatedFileStatus(path, fsForPath));
- } else {
- // To avoid the cost of directory listing on OSS service (e.g. S3A,
COS, OSS,
- // etc), we create FileStatus ourselves.
- stats.add(Utils.createFileStatus(contentFile, path));
- }
- }
- }
- return stats;
- }
-
- private Map<Path, FileStatus> parallelListing(FileSystem fs) throws
IOException {
+ private Map<Path, FileStatus> parallelListing(
+ Iterable<Pair<FileSystem, ContentFile<?>>> contentFiles) throws
IOException {
+ final Set<Path> partitionPaths = collectPartitionPaths(contentFiles);
+ if (partitionPaths.size() == 0) return Collections.emptyMap();
String logPrefix = "Parallel Iceberg file metadata listing";
- int poolSize = getPoolSize(icebergFiles_.size(), fs);
+ // Use the file system type of the table's root path as
+ // the basis for determining the pool size.
+ int poolSize = getPoolSize(partitionPaths.size(),
+ FileSystemUtil.getFileSystemForPath(partDir_));
ExecutorService pool = createPool(poolSize, logPrefix);
TOTAL_THREADS.addAndGet(poolSize);
- final Set<Path> partitionPaths;
Map<Path, FileStatus> nameToFileStatus = Maps.newConcurrentMap();
try (ThreadNameAnnotator tna = new ThreadNameAnnotator(logPrefix)) {
- partitionPaths = icebergFilesByPartition();
TOTAL_TASKS.addAndGet(partitionPaths.size());
List<Future<Void>> tasks =
partitionPaths.stream()
.map(path -> pool.submit(() -> {
try {
- return listingTask(fs, path, nameToFileStatus);
+ return listingTask(path, nameToFileStatus);
} finally {
TOTAL_TASKS.decrementAndGet();
}
@@ -266,14 +262,17 @@ public class IcebergFileMetadataLoader extends
FileMetadataLoader {
return nameToFileStatus;
}
- private Set<Path> icebergFilesByPartition() {
- return
StreamSupport.stream(icebergFiles_.getAllContentFiles().spliterator(), false)
- .map(contentFile -> new
Path(String.valueOf(contentFile.path())).getParent())
+ private Set<Path> collectPartitionPaths(
+ Iterable<Pair<FileSystem, ContentFile<?>>> contentFiles) {
+ return StreamSupport.stream(contentFiles.spliterator(), false)
+ .map(contentFile ->
+ new
Path(String.valueOf(contentFile.getSecond().path())).getParent())
.collect(Collectors.toSet());
}
- private Void listingTask(FileSystem fs, Path partitionPath,
+ private Void listingTask(Path partitionPath,
Map<Path, FileStatus> nameToFileStatus) throws IOException {
+ FileSystem fs = FileSystemUtil.getFileSystemForPath(partitionPath);
RemoteIterator<? extends FileStatus> remoteIterator =
FileSystemUtil.listFiles(fs, partitionPath, recursive_, debugAction_);
Map<Path, FileStatus> perThreadMapping = new HashMap<>();
@@ -285,34 +284,6 @@ public class IcebergFileMetadataLoader extends
FileMetadataLoader {
return null;
}
- @VisibleForTesting
- boolean shouldReuseOldFds() throws IOException {
- if (oldFdsByPath_ == null || oldFdsByPath_.isEmpty()) return false;
- if (forceRefreshLocations) return false;
-
- int oldFdsSize = oldFdsByPath_.size();
- int iceContentFilesSize = icebergFiles_.size();
-
- if (iceContentFilesSize - oldFdsSize > newFilesThreshold_) {
- LOG.trace("There are at least {} new files under path {}.",
- iceContentFilesSize - oldFdsSize, partDir_);
- return false;
- }
-
- int newFiles = 0;
- for (ContentFile<?> contentFile : icebergFiles_.getAllContentFiles()) {
- if (getOldFd(contentFile) == null) {
- ++newFiles;
- if (newFiles > newFilesThreshold_) {
- LOG.trace("There are at least {} new files under path {}.",
newFiles, partDir_);
- return false;
- }
- }
- }
- LOG.trace("There are only {} new files under path {}.", newFiles,
partDir_);
- return true;
- }
-
FileDescriptor getOldFd(ContentFile<?> contentFile) throws IOException {
Path contentFilePath = FileSystemUtil.createFullyQualifiedPath(
new Path(contentFile.path().toString()));
diff --git
a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
index a0f5129e2..af9c6b62e 100644
--- a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
@@ -163,8 +163,8 @@ public class FileMetadataLoaderTest {
"functional_parquet", "iceberg_partitioned",
/* oldFds = */ fml1.getLoadedFds(),
/* canDataBeOutsideOfTableLocation = */ false);
- assertTrue(fml1Refresh.shouldReuseOldFds());
fml1Refresh.load();
+ assertFalse(fml1Refresh.useParallelListing());
assertEquals(0, fml1Refresh.getStats().loadedFiles);
assertEquals(20, fml1Refresh.getStats().skippedFiles);
assertEquals(20, fml1Refresh.getLoadedFds().size());
@@ -186,8 +186,8 @@ public class FileMetadataLoaderTest {
"functional_parquet", "iceberg_non_partitioned",
/* oldFds = */ fml2.getLoadedFds(),
/* canDataBeOutsideOfTableLocation = */ false);
- assertTrue(fml2Refresh.shouldReuseOldFds());
fml2Refresh.load();
+ assertFalse(fml2Refresh.useParallelListing());
assertEquals(0, fml2Refresh.getStats().loadedFiles);
assertEquals(20, fml2Refresh.getStats().skippedFiles);
assertEquals(20, fml2Refresh.getLoadedFds().size());
@@ -213,8 +213,8 @@ public class FileMetadataLoaderTest {
"functional_parquet", "iceberg_partitioned",
/* oldFds = */ fml1.getLoadedFds().subList(0, 10),
/* canDataBeOutsideOfTableLocation = */ false);
- assertTrue(fml1Refresh.shouldReuseOldFds());
fml1Refresh.load();
+ assertFalse(fml1Refresh.useParallelListing());
assertEquals(10, fml1Refresh.getStats().loadedFiles);
assertEquals(10, fml1Refresh.getStats().skippedFiles);
assertEquals(20, fml1Refresh.getLoadedFds().size());
@@ -229,8 +229,8 @@ public class FileMetadataLoaderTest {
"functional_parquet", "iceberg_non_partitioned",
/* oldFds = */ fml2.getLoadedFds().subList(0, 10),
/* canDataBeOutsideOfTableLocation = */ false);
- assertTrue(fml2Refresh.shouldReuseOldFds());
fml2Refresh.load();
+ assertFalse(fml2Refresh.useParallelListing());
assertEquals(10, fml2Refresh.getStats().loadedFiles);
assertEquals(10, fml2Refresh.getStats().skippedFiles);
assertEquals(20, fml2Refresh.getLoadedFds().size());
@@ -245,26 +245,34 @@ public class FileMetadataLoaderTest {
/* canDataBeOutsideOfTableLocation = */ false);
fml1.load();
+ IcebergFileMetadataLoader fml1ForceRefresh =
getLoaderForIcebergTable(catalog,
+ "functional_parquet", "iceberg_partitioned",
+ /* oldFds = */ fml1.getLoadedFds().subList(0, 10),
+ /* canDataBeOutsideOfTableLocation = */ false, 10);
+ fml1ForceRefresh.setForceRefreshBlockLocations(true);
+ fml1ForceRefresh.load();
+ assertTrue(fml1ForceRefresh.useParallelListing());
+
IcebergFileMetadataLoader fml1Refresh = getLoaderForIcebergTable(catalog,
"functional_parquet", "iceberg_partitioned",
/* oldFds = */ fml1.getLoadedFds().subList(0, 10),
- /* canDataBeOutsideOfTableLocation = */ false);
- assertTrue(fml1Refresh.shouldReuseOldFds());
- fml1Refresh.setForceRefreshBlockLocations(true);
- assertFalse(fml1Refresh.shouldReuseOldFds());
+ /* canDataBeOutsideOfTableLocation = */ false, 10);
fml1Refresh.setForceRefreshBlockLocations(false);
- assertTrue(fml1Refresh.shouldReuseOldFds());
+ fml1Refresh.load();
+ assertFalse(fml1Refresh.useParallelListing());
IcebergFileMetadataLoader fml1Refresh10 = getLoaderForIcebergTable(catalog,
"functional_parquet", "iceberg_partitioned",
/* oldFds = */ fml1.getLoadedFds().subList(0, 10),
/* canDataBeOutsideOfTableLocation = */ false, 10);
- assertTrue(fml1Refresh10.shouldReuseOldFds());
+ fml1Refresh10.load();
+ assertFalse(fml1Refresh10.useParallelListing());
IcebergFileMetadataLoader fml1Refresh9 = getLoaderForIcebergTable(catalog,
"functional_parquet", "iceberg_partitioned",
/* oldFds = */ fml1.getLoadedFds().subList(0, 10),
/* canDataBeOutsideOfTableLocation = */ false, 9);
- assertFalse(fml1Refresh9.shouldReuseOldFds());
+ fml1Refresh9.load();
+ assertTrue(fml1Refresh9.useParallelListing());
IcebergFileMetadataLoader fml2 = getLoaderForIcebergTable(catalog,
@@ -281,12 +289,14 @@ public class FileMetadataLoaderTest {
"functional_parquet", "iceberg_non_partitioned",
/* oldFds = */ fml2.getLoadedFds().subList(0, 10),
/* canDataBeOutsideOfTableLocation = */ false, 10);
- assertTrue(fml2Refresh10.shouldReuseOldFds());
+ fml2Refresh10.load();
+ assertFalse(fml2Refresh10.useParallelListing());
IcebergFileMetadataLoader fml2Refresh9 = getLoaderForIcebergTable(catalog,
"functional_parquet", "iceberg_non_partitioned",
/* oldFds = */ fml2.getLoadedFds().subList(0, 10),
/* canDataBeOutsideOfTableLocation = */ false, 9);
- assertFalse(fml2Refresh9.shouldReuseOldFds());
+ fml2Refresh9.load();
+ assertTrue(fml2Refresh9.useParallelListing());
}
@Test
@@ -303,8 +313,8 @@ public class FileMetadataLoaderTest {
"functional_parquet", "iceberg_multiple_storage_locations",
/* oldFds = */ fml1.getLoadedFds().subList(0, 1),
/* canDataBeOutsideOfTableLocation = */ true);
- assertTrue(fml1Refresh1.shouldReuseOldFds());
fml1Refresh1.load();
+ assertFalse(fml1Refresh1.useParallelListing());
assertEquals(5, fml1Refresh1.getStats().loadedFiles);
assertEquals(1, fml1Refresh1.getStats().skippedFiles);
assertEquals(6, fml1Refresh1.getLoadedFds().size());
@@ -313,8 +323,8 @@ public class FileMetadataLoaderTest {
"functional_parquet", "iceberg_multiple_storage_locations",
/* oldFds = */ fml1.getLoadedFds().subList(0, 5),
/* canDataBeOutsideOfTableLocation = */ true);
- assertTrue(fml1Refresh5.shouldReuseOldFds());
fml1Refresh5.load();
+ assertFalse(fml1Refresh5.useParallelListing());
assertEquals(1, fml1Refresh5.getStats().loadedFiles);
assertEquals(5, fml1Refresh5.getStats().skippedFiles);
assertEquals(6, fml1Refresh5.getLoadedFds().size());