This is an automated email from the ASF dual-hosted git repository. boroknagyz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 711797e7fbda6f30fc49d91e30ad6ab31a4f4a69 Author: Zoltan Borok-Nagy <[email protected]> AuthorDate: Fri Aug 29 09:37:20 2025 +0200 IMPALA-14349: Encode FileDescriptors in time in loading Iceberg Tables With this patch we create Iceberg file descriptors from LocatedFileStatus objects during IcebergFileMetadataLoader's parallelListing(). This has the following benefits: * We parallelize the creation of Iceberg file descriptor objects * We don't need to maintain a large hash map with all the LocatedFileStatus objects at once. Now we only need to keep a few LocatedFileStatus objects per partition in memory while we are converting them to Iceberg file descriptors. I.e., the GC is free to destroy the LocatedFileStatus objects we don't use anymore. This patch retires startup flag 'iceberg_reload_new_files_threshold'. Since IMPALA-13254 we only list partitions that have new data files, and we load them in parallel, i.e. efficient incremental table loading is already covered. From that point the startup flag only added unnecessary code complexity. 
Measurements I created two tables (from tpcds.store_sales) to measure table loading times for large tables: Table #1: PARTITIONED BY SPEC(ss_item_sk, BUCKET(5, ss_sold_time_sk)) partitions: 107818 files: 754726 Table #2: PARTITIONED BY SPEC(ss_item_sk) partitions: 18000 files: 504224 Time taken in IcebergFileMetadataLoader.load() during full table reload: +----------+-------+------+---------+ | | Base | New | Speedup | +----------+-------+------+---------+ | Table #1 | 17.3s | 8.1s | 2.14 | | Table #2 | 7.8s | 4.3s | 1.8 | +----------+-------+------+---------+ I measured incremental table loading only for Table #2 (since there are more files per partition this is the worst scenario for the new code, as it only uses file listings, and each new file was created in a separate partition) Time taken in IcebergFileMetadataLoader.load() during incremental table reload: +------------+------+------+---------+ | #new files | Base | New | Speedup | +------------+------+------+---------+ | 1 | 1.4s | 1.6s | 0.9 | | 100 | 1.5s | 1.9s | 0.8 | | 200 | 1.5s | 1.5s | 1 | +------------+------+------+---------+ We lose a few tenths of a second, but I think the simplified code justifies it. 
Testing: * some tests were updated because we don't have startup flag 'iceberg_reload_new_files_threshold' anymore Change-Id: Ia1c2a7119d76db7ce7c43caec2ccb122a014851b Reviewed-on: http://gerrit.cloudera.org:8080/23363 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/common/global-flags.cc | 6 +- be/src/util/backend-gflag-util.cc | 2 - common/thrift/BackendGflags.thrift | 2 - .../apache/impala/catalog/FileMetadataLoader.java | 7 +- .../impala/catalog/IcebergFileMetadataLoader.java | 234 ++++++++++----------- .../org/apache/impala/catalog/IcebergTable.java | 6 +- .../impala/catalog/ParallelFileMetadataLoader.java | 2 +- .../org/apache/impala/service/BackendConfig.java | 4 - .../impala/catalog/FileMetadataLoaderTest.java | 80 +------ 9 files changed, 125 insertions(+), 218 deletions(-) diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc index e7751e0ab..ee2d5533a 100644 --- a/be/src/common/global-flags.cc +++ b/be/src/common/global-flags.cc @@ -416,11 +416,6 @@ DEFINE_int64(update_catalogd_rpc_resend_interval_ms, 100, "(Advanced) Interval ( "with which the statestore resends the update catalogd RPC to a subscriber if the " "statestore has failed to send the RPC to the subscriber."); -DEFINE_int32(iceberg_reload_new_files_threshold, 100, "(Advanced) If during a table " "refresh the number of new files are greater than this, catalogd will use a " "recursive file listing to load file metadata. 
If number of new files are less or " - "equal to this, catalogd will load the file metadata one by one."); - DEFINE_bool(iceberg_allow_datafiles_in_table_location_only, true, "If true, Impala " "does not allow Iceberg data file locations outside of the table directory during " "reads"); @@ -500,6 +495,7 @@ REMOVED_FLAG(enable_partitioned_aggregation); REMOVED_FLAG(enable_partitioned_hash_join); REMOVED_FLAG(enable_phj_probe_side_filtering); REMOVED_FLAG(enable_rm); +REMOVED_FLAG(iceberg_reload_new_files_threshold); REMOVED_FLAG(kerberos_reinit_interval); REMOVED_FLAG(ldap_manual_config); REMOVED_FLAG(llama_addresses); diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc index 90385f0f4..39dec440f 100644 --- a/be/src/util/backend-gflag-util.cc +++ b/be/src/util/backend-gflag-util.cc @@ -109,7 +109,6 @@ DECLARE_bool(enable_reload_events); DECLARE_string(geospatial_library); DECLARE_string(file_metadata_reload_properties); DECLARE_string(java_weigher); -DECLARE_int32(iceberg_reload_new_files_threshold); DECLARE_bool(enable_skipping_older_events); DECLARE_bool(enable_json_scanner); DECLARE_bool(iceberg_allow_datafiles_in_table_location_only); @@ -494,7 +493,6 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) { cfg.__set_thrift_rpc_max_message_size(ThriftInternalRpcMaxMessageSize()); cfg.__set_scan_range_cost_factor(FLAGS_scan_range_cost_factor); cfg.__set_use_jamm_weigher(FLAGS_java_weigher == "jamm"); - cfg.__set_iceberg_reload_new_files_threshold(FLAGS_iceberg_reload_new_files_threshold); cfg.__set_enable_skipping_older_events(FLAGS_enable_skipping_older_events); cfg.__set_enable_json_scanner(FLAGS_enable_json_scanner); cfg.__set_iceberg_allow_datafiles_in_table_location_only( diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift index 445a40dc2..b186009c8 100644 --- a/common/thrift/BackendGflags.thrift +++ b/common/thrift/BackendGflags.thrift @@ -260,8 +260,6 @@ struct TBackendGflags { 114: 
required bool use_jamm_weigher - 115: required i32 iceberg_reload_new_files_threshold - 116: required bool enable_skipping_older_events; 117: required bool enable_json_scanner diff --git a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java index fce49a9fe..749721e6a 100644 --- a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java +++ b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java @@ -307,14 +307,13 @@ public class FileMetadataLoader { protected FileDescriptor createFd(FileSystem fs, FileStatus fileStatus, String relPath, Reference<Long> numUnknownDiskIds, String absPath) throws IOException { - if (!FileSystemUtil.supportsStorageIds(fs)) { - return FileDescriptor.createWithNoBlocks(fileStatus, relPath, absPath); - } BlockLocation[] locations; if (fileStatus instanceof LocatedFileStatus) { locations = ((LocatedFileStatus) fileStatus).getBlockLocations(); - } else { + } else if (FileSystemUtil.supportsStorageIds(fs)) { locations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()); + } else { + return FileDescriptor.createWithNoBlocks(fileStatus, relPath, absPath); } return FileDescriptor.create(fileStatus, relPath, locations, hostIndex_, fileStatus.isEncrypted(), fileStatus.isErasureCoded(), numUnknownDiskIds, diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java index fbfb95fa9..c51904ccf 100644 --- a/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java +++ b/fe/src/main/java/org/apache/impala/catalog/IcebergFileMetadataLoader.java @@ -17,32 +17,30 @@ package org.apache.impala.catalog; +import static org.apache.impala.catalog.ParallelFileMetadataLoader. 
+ MAX_HDFS_PARTITIONS_PARALLEL_LOAD; import static org.apache.impala.catalog.ParallelFileMetadataLoader.TOTAL_THREADS; import static org.apache.impala.catalog.ParallelFileMetadataLoader.createPool; -import static org.apache.impala.catalog.ParallelFileMetadataLoader.getPoolSize; -import com.google.common.annotations.VisibleForTesting; +import com.codahale.metrics.Clock; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import java.io.IOException; -import java.io.PrintWriter; -import java.io.StringWriter; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import java.util.stream.StreamSupport; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.iceberg.ContentFile; @@ -50,9 +48,9 @@ import org.apache.impala.catalog.FeFsTable.FileMetadataStats; import org.apache.impala.catalog.FeIcebergTable.Utils; import org.apache.impala.catalog.iceberg.GroupedContentFiles; import org.apache.impala.common.FileSystemUtil; +import org.apache.impala.common.PrintUtils; import org.apache.impala.common.Reference; import org.apache.impala.common.Pair; -import org.apache.impala.service.BackendConfig; import org.apache.impala.thrift.TNetworkAddress; import org.apache.impala.util.IcebergUtil; import org.apache.impala.util.ListMap; @@ -68,43 +66,20 @@ public class IcebergFileMetadataLoader extends FileMetadataLoader { private final static Logger LOG = LoggerFactory.getLogger( IcebergFileMetadataLoader.class); - // Default value of 
'newFilesThreshold_' if the given parameter or startup flag have - // invalid value. - private final int NEW_FILES_THRESHOLD_DEFAULT = 100; - private final org.apache.iceberg.Table iceTbl_; - - // If there are more new files than 'newFilesThreshold_', we should fall back - // to regular file metadata loading. - private final int newFilesThreshold_; - + private final Path tablePath_; private final GroupedContentFiles icebergFiles_; private final boolean requiresDataFilesInTableLocation_; - private boolean useParallelListing_; public IcebergFileMetadataLoader(org.apache.iceberg.Table iceTbl, Iterable<IcebergFileDescriptor> oldFds, ListMap<TNetworkAddress> hostIndex, GroupedContentFiles icebergFiles, boolean requiresDataFilesInTableLocation) { - this(iceTbl, oldFds, hostIndex, icebergFiles, requiresDataFilesInTableLocation, - BackendConfig.INSTANCE.icebergReloadNewFilesThreshold()); - } - - public IcebergFileMetadataLoader(org.apache.iceberg.Table iceTbl, - Iterable<IcebergFileDescriptor> oldFds, ListMap<TNetworkAddress> hostIndex, - GroupedContentFiles icebergFiles, boolean requiresDataFilesInTableLocation, - int newFilesThresholdParam) { super(iceTbl.location(), true, oldFds, hostIndex, null, null, HdfsFileFormat.ICEBERG); iceTbl_ = iceTbl; + tablePath_ = FileSystemUtil.createFullyQualifiedPath(new Path(iceTbl.location())); icebergFiles_ = icebergFiles; requiresDataFilesInTableLocation_ = requiresDataFilesInTableLocation; - if (newFilesThresholdParam >= 0) { - newFilesThreshold_ = newFilesThresholdParam; - } else { - newFilesThreshold_ = NEW_FILES_THRESHOLD_DEFAULT; - LOG.warn("Ignoring invalid new files threshold: {} " + - "using value: {}", newFilesThresholdParam, newFilesThreshold_); - } } @Override @@ -127,18 +102,17 @@ public class IcebergFileMetadataLoader extends FileMetadataLoader { } private void loadInternal() throws CatalogException, IOException { - Path partPath = FileSystemUtil.createFullyQualifiedPath(new Path(partDir_)); loadedFds_ = new 
ArrayList<>(); loadStats_ = new LoadStats(partDir_); fileMetadataStats_ = new FileMetadataStats(); // Process the existing Fd ContentFile and return the newly added ContentFile - Iterable<ContentFile<?>> newContentFiles = loadContentFilesWithOldFds(partPath); + Iterable<ContentFile<?>> newContentFiles = loadContentFilesWithOldFds(tablePath_); // Iterate through all the newContentFiles, determine if StorageIds are supported, // and use different handling methods accordingly. // This considers that different ContentFiles are on different FileSystems - List<Pair<FileSystem, ContentFile<?>>> filesSupportsStorageIds = Lists.newArrayList(); - FileSystem fsForTable = FileSystemUtil.getFileSystemForPath(partPath); + List<ContentFile<?>> filesSupportsStorageIds = Lists.newArrayList(); + FileSystem fsForTable = FileSystemUtil.getFileSystemForPath(tablePath_); FileSystem defaultFs = FileSystemUtil.getDefaultFileSystem(); for (ContentFile<?> contentFile : newContentFiles) { FileSystem fsForPath = fsForTable; @@ -152,56 +126,28 @@ public class IcebergFileMetadataLoader extends FileMetadataLoader { // If the specific fs does not support StorageIds, then // we create FileDescriptor directly if (FileSystemUtil.supportsStorageIds(fsForPath)) { - filesSupportsStorageIds.add(Pair.create(fsForPath, contentFile)); + filesSupportsStorageIds.add(contentFile); } else { - IcebergFileDescriptor fd = createFd(fsForPath, contentFile, null, partPath, null); - loadedFds_.add(fd); - fileMetadataStats_.accumulate(fd); - ++loadStats_.loadedFiles; + IcebergFileDescriptor fd = createNonLocatedFd(fsForPath, contentFile, tablePath_); + registerNewlyLoadedFd(fd); } } - // If the number of filesSupportsStorageIds are greater than newFilesThreshold, - // we will use a recursive file listing to load file metadata. 
If number of new - // files are less or equal to this, we will load the metadata of the newly added - // files one by one - useParallelListing_ = filesSupportsStorageIds.size() > newFilesThreshold_; - Reference<Long> numUnknownDiskIds = new Reference<>(0L); - Map<Path, FileStatus> nameToFileStatus = Collections.emptyMap(); - if (useParallelListing_) { - nameToFileStatus = parallelListing(filesSupportsStorageIds); - } - for (Pair<FileSystem, ContentFile<?>> contentFileInfo : filesSupportsStorageIds) { - Path path = FileSystemUtil.createFullyQualifiedPath( - new Path(contentFileInfo.getSecond().path().toString())); - FileStatus stat = nameToFileStatus.get(path); - loadFdFromStorage(contentFileInfo, stat, partPath, numUnknownDiskIds); + AtomicLong numUnknownDiskIds = new AtomicLong(); + List<IcebergFileDescriptor> newFds = parallelListing(filesSupportsStorageIds, + numUnknownDiskIds); + for (IcebergFileDescriptor fd : newFds) { + registerNewlyLoadedFd(fd); } - loadStats_.unknownDiskIds += numUnknownDiskIds.getRef(); + loadStats_.unknownDiskIds += numUnknownDiskIds.get(); if (LOG.isTraceEnabled()) { LOG.trace(loadStats_.debugString()); } } - private void loadFdFromStorage(Pair<FileSystem, ContentFile<?>> contentFileInfo, - FileStatus stat, Path partPath, Reference<Long> numUnknownDiskIds) - throws CatalogException { - try { - IcebergFileDescriptor fd = createFd(contentFileInfo.getFirst(), - contentFileInfo.getSecond(), stat, partPath, numUnknownDiskIds); - loadedFds_.add(fd); - ++loadStats_.loadedFiles; - fileMetadataStats_.accumulate(fd); - } catch (IOException e) { - StringWriter w = new StringWriter(); - e.printStackTrace(new PrintWriter(w)); - LOG.warn(String.format("Failed to load Iceberg content file: '%s' Caused by: %s", - contentFileInfo.getSecond().path().toString(), w)); - } - } - - @VisibleForTesting - boolean useParallelListing() { - return useParallelListing_; + private void registerNewlyLoadedFd(IcebergFileDescriptor fd) { + loadedFds_.add(fd); + 
fileMetadataStats_.accumulate(fd); + ++loadStats_.loadedFiles; } /** @@ -227,20 +173,38 @@ public class IcebergFileMetadataLoader extends FileMetadataLoader { return newContentFiles; } - private IcebergFileDescriptor createFd(FileSystem fs, ContentFile<?> contentFile, + private IcebergFileDescriptor createNonLocatedFd(FileSystem fs, + ContentFile<?> contentFile, Path partPath) throws CatalogException, IOException { + Path fileLoc = FileSystemUtil.createFullyQualifiedPath( + new Path(contentFile.path().toString())); + // For OSS service (e.g. S3A, COS, OSS, etc), we create FileStatus ourselves. + FileStatus stat = Utils.createFileStatus(contentFile, fileLoc); + + Pair<String, String> absPathRelPath = getAbsPathRelPath(partPath, stat); + String absPath = absPathRelPath.first; + String relPath = absPathRelPath.second; + + return IcebergFileDescriptor.cloneWithFileMetadata( + createFd(fs, stat, relPath, null, absPath), + IcebergUtil.createIcebergMetadata(iceTbl_, contentFile)); + } + + private IcebergFileDescriptor createLocatedFd(ContentFile<?> contentFile, FileStatus stat, Path partPath, Reference<Long> numUnknownDiskIds) throws CatalogException, IOException { - if (stat == null) { - Path fileLoc = FileSystemUtil.createFullyQualifiedPath( - new Path(contentFile.path().toString())); - if (FileSystemUtil.supportsStorageIds(fs)) { - stat = Utils.createLocatedFileStatus(fileLoc, fs); - } else { - // For OSS service (e.g. S3A, COS, OSS, etc), we create FileStatus ourselves. 
- stat = Utils.createFileStatus(contentFile, fileLoc); - } - } + Preconditions.checkState(stat instanceof LocatedFileStatus); + + Pair<String, String> absPathRelPath = getAbsPathRelPath(partPath, stat); + String absPath = absPathRelPath.first; + String relPath = absPathRelPath.second; + return IcebergFileDescriptor.cloneWithFileMetadata( + createFd(null, stat, relPath, numUnknownDiskIds, absPath), + IcebergUtil.createIcebergMetadata(iceTbl_, contentFile)); + } + + Pair<String, String> getAbsPathRelPath(Path partPath, FileStatus stat) + throws TableLoadingException { String absPath = null; String relPath = FileSystemUtil.relativizePathNoThrow(stat.getPath(), partPath); if (relPath == null) { @@ -251,70 +215,100 @@ public class IcebergFileMetadataLoader extends FileMetadataLoader { absPath = stat.getPath().toString(); } } - - return IcebergFileDescriptor.cloneWithFileMetadata( - createFd(fs, stat, relPath, numUnknownDiskIds, absPath), - IcebergUtil.createIcebergMetadata(iceTbl_, contentFile)); + return new Pair<>(absPath, relPath); } /** * Using a thread pool to perform parallel List operations on the FileSystem, this takes * into account the situation where multiple FileSystems exist within the ContentFiles. 
*/ - private Map<Path, FileStatus> parallelListing( - Iterable<Pair<FileSystem, ContentFile<?>>> contentFiles) throws IOException { - final Set<Path> partitionPaths = collectPartitionPaths(contentFiles); - if (partitionPaths.size() == 0) return Collections.emptyMap(); + private List<IcebergFileDescriptor> parallelListing( + List<ContentFile<?>> contentFiles, + AtomicLong numUnknownDiskIds) throws IOException { + final Map<Path, List<ContentFile<?>>> partitionPaths = + collectPartitionPaths(contentFiles); + if (partitionPaths.isEmpty()) return Collections.emptyList(); + List<IcebergFileDescriptor> ret = new ArrayList<>(); String logPrefix = "Parallel Iceberg file metadata listing"; - // Use the file system type of the table's root path as - // the basis for determining the pool size. - int poolSize = getPoolSize(partitionPaths.size(), - FileSystemUtil.getFileSystemForPath(partDir_)); + int poolSize = getPoolSize(partitionPaths.size()); ExecutorService pool = createPool(poolSize, logPrefix); TOTAL_THREADS.addAndGet(poolSize); - Map<Path, FileStatus> nameToFileStatus = Maps.newConcurrentMap(); try (ThreadNameAnnotator tna = new ThreadNameAnnotator(logPrefix)) { TOTAL_TASKS.addAndGet(partitionPaths.size()); - List<Future<Void>> tasks = - partitionPaths.stream() - .map(path -> pool.submit(() -> { + List<Future<List<IcebergFileDescriptor>>> tasks = + partitionPaths.entrySet().stream() + .map(entry -> pool.submit(() -> { try { - return listingTask(path, nameToFileStatus); + return createFdsForPartition(entry.getKey(), entry.getValue(), + numUnknownDiskIds); } finally { TOTAL_TASKS.decrementAndGet(); } })) .collect(Collectors.toList()); - for (Future<Void> task : tasks) { task.get(); } + for (Future<List<IcebergFileDescriptor>> task : tasks) { + ret.addAll(task.get()); + } } catch (ExecutionException | InterruptedException e) { throw new IOException(String.format("%s: failed to load paths.", logPrefix), e); } finally { TOTAL_THREADS.addAndGet(-poolSize); pool.shutdown(); } 
- return nameToFileStatus; + return ret; } - private Set<Path> collectPartitionPaths( - Iterable<Pair<FileSystem, ContentFile<?>>> contentFiles) { - return StreamSupport.stream(contentFiles.spliterator(), false) - .map(contentFile -> - new Path(String.valueOf(contentFile.getSecond().path())).getParent()) - .collect(Collectors.toSet()); + private Map<Path, List<ContentFile<?>>> collectPartitionPaths( + List<ContentFile<?>> contentFiles) { + final Clock clock = Clock.defaultClock(); + long startTime = clock.getTick(); + Map<Path, List<ContentFile<?>>> ret = contentFiles.stream() + .collect(Collectors.groupingBy( + cf -> new Path(String.valueOf(cf.path())).getParent(), + HashMap::new, + Collectors.toList() + )); + long duration = clock.getTick() - startTime; + LOG.info("Collected {} Iceberg content files into {} partitions. Duration: {}", + contentFiles.size(), ret.size(), PrintUtils.printTimeNs(duration)); + return ret; } - private Void listingTask(Path partitionPath, - Map<Path, FileStatus> nameToFileStatus) throws IOException { + /** + * Returns thread pool size for listing files in parallel from storage systems that + * provide block location information. + */ + private static int getPoolSize(int numLoaders) { + return Math.min(numLoaders, MAX_HDFS_PARTITIONS_PARALLEL_LOAD); + } + + private List<IcebergFileDescriptor> createFdsForPartition(Path partitionPath, + List<ContentFile<?>> contentFiles, AtomicLong numUnknownDiskIds) + throws IOException, CatalogException { FileSystem fs = FileSystemUtil.getFileSystemForPath(partitionPath); RemoteIterator<? 
extends FileStatus> remoteIterator = FileSystemUtil.listFiles(fs, partitionPath, recursive_, debugAction_); - Map<Path, FileStatus> perThreadMapping = new HashMap<>(); + Map<Path, FileStatus> pathToFileStatus = new HashMap<>(); while (remoteIterator.hasNext()) { FileStatus status = remoteIterator.next(); - perThreadMapping.put(status.getPath(), status); + pathToFileStatus.put(status.getPath(), status); + } + List<IcebergFileDescriptor> ret = new ArrayList<>(); + Reference<Long> localNumUnknownDiskIds = new Reference<>(0L); + for (ContentFile<?> contentFile : contentFiles) { + Path path = FileSystemUtil.createFullyQualifiedPath( + new Path(contentFile.path().toString())); + FileStatus stat = pathToFileStatus.get(path); + if (stat == null) { + LOG.warn(String.format( + "Failed to load Iceberg content file: '%s', Not found on storage", + contentFile.path().toString())); + continue; + } + ret.add(createLocatedFd(contentFile, stat, tablePath_, localNumUnknownDiskIds)); } - nameToFileStatus.putAll(perThreadMapping); - return null; + numUnknownDiskIds.addAndGet(localNumUnknownDiskIds.getRef()); + return ret; } IcebergFileDescriptor getOldFd(ContentFile<?> contentFile, Path partPath) diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java index 6a71f007b..a55068306 100644 --- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java @@ -47,6 +47,7 @@ import org.apache.impala.analysis.IcebergPartitionSpec; import org.apache.impala.analysis.IcebergPartitionTransform; import org.apache.impala.catalog.iceberg.GroupedContentFiles; import org.apache.impala.common.ImpalaRuntimeException; +import org.apache.impala.common.PrintUtils; import org.apache.impala.service.BackendConfig; import org.apache.impala.thrift.CatalogLookupStatus; import org.apache.impala.thrift.TAlterTableUpdateStatsParams; @@ -513,7 +514,7 @@ public class 
IcebergTable extends Table implements FeIcebergTable { ((BaseTable)icebergApiTable_).operations().current().metadataFileLocation(); GroupedContentFiles icebergFiles = IcebergUtil.getIcebergFiles(this, new ArrayList<>(), /*timeTravelSpec=*/null); - catalogTimeline.markEvent("Loaded Iceberg files"); + catalogTimeline.markEvent("Loaded Iceberg content file list"); // We use IcebergFileMetadataLoader directly to load file metadata, so we don't // want 'hdfsTable_' to do any file loading. hdfsTable_.setSkipIcebergFileMetadataLoading(true); @@ -527,6 +528,7 @@ public class IcebergTable extends Table implements FeIcebergTable { getHostIndex(), Preconditions.checkNotNull(icebergFiles), Utils.requiresDataFilesInTableLocation(this)); loader.load(); + catalogTimeline.markEvent("Loaded Iceberg file descriptors"); fileStore_ = new IcebergContentFileStore( icebergApiTable_, loader.getLoadedIcebergFds(), icebergFiles); partitionStats_ = Utils.loadPartitionStats(this, icebergFiles); @@ -539,6 +541,8 @@ public class IcebergTable extends Table implements FeIcebergTable { } finally { storageMetadataLoadTime_ = ctxStorageLdTime.stop(); } + LOG.info("Loaded file and block metadata for {}. 
Time taken: {}", + getFullName(), PrintUtils.printTimeNs(storageMetadataLoadTime_)); } private boolean canSkipReload() { diff --git a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java index 39d42380a..1d5a0c6b6 100644 --- a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java +++ b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java @@ -62,7 +62,7 @@ public class ParallelFileMetadataLoader { private final static Logger LOG = LoggerFactory.getLogger( ParallelFileMetadataLoader.class); - private static final int MAX_HDFS_PARTITIONS_PARALLEL_LOAD = + public static final int MAX_HDFS_PARTITIONS_PARALLEL_LOAD = BackendConfig.INSTANCE.maxHdfsPartsParallelLoad(); private static final int MAX_NON_HDFS_PARTITIONS_PARALLEL_LOAD = BackendConfig.INSTANCE.maxNonHdfsPartsParallelLoad(); diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java index a2b44df2b..a53e8b0cd 100644 --- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java +++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java @@ -432,10 +432,6 @@ public class BackendConfig { return backendCfg_.use_jamm_weigher; } - public int icebergReloadNewFilesThreshold() { - return backendCfg_.iceberg_reload_new_files_threshold; - } - public boolean icebergAllowDatafileInTableLocationOnly() { return backendCfg_.iceberg_allow_datafiles_in_table_location_only; } diff --git a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java index 75188e1e5..27d095d26 100644 --- a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java +++ b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java @@ -212,7 +212,6 @@ public class FileMetadataLoaderTest { /* oldFds = */ 
fml1.getLoadedIcebergFds(), /* requiresDataFilesInTableLocation = */ true); fml1Refresh.load(); - assertFalse(fml1Refresh.useParallelListing()); assertEquals(0, fml1Refresh.getStats().loadedFiles); assertEquals(20, fml1Refresh.getStats().skippedFiles); assertEquals(20, fml1Refresh.getLoadedFds().size()); @@ -235,7 +234,6 @@ public class FileMetadataLoaderTest { /* oldFds = */ fml2.getLoadedIcebergFds(), /* requiresDataFilesInTableLocation = */ true); fml2Refresh.load(); - assertFalse(fml2Refresh.useParallelListing()); assertEquals(0, fml2Refresh.getStats().loadedFiles); assertEquals(20, fml2Refresh.getStats().skippedFiles); assertEquals(20, fml2Refresh.getLoadedFds().size()); @@ -262,7 +260,6 @@ public class FileMetadataLoaderTest { /* oldFds = */ fml1.getLoadedIcebergFds().subList(0, 10), /* requiresDataFilesInTableLocation = */ true); fml1Refresh.load(); - assertFalse(fml1Refresh.useParallelListing()); assertEquals(10, fml1Refresh.getStats().loadedFiles); assertEquals(10, fml1Refresh.getStats().skippedFiles); assertEquals(20, fml1Refresh.getLoadedFds().size()); @@ -278,75 +275,11 @@ public class FileMetadataLoaderTest { /* oldFds = */ fml2.getLoadedIcebergFds().subList(0, 10), /* requiresDataFilesInTableLocation = */ true); fml2Refresh.load(); - assertFalse(fml2Refresh.useParallelListing()); assertEquals(10, fml2Refresh.getStats().loadedFiles); assertEquals(10, fml2Refresh.getStats().skippedFiles); assertEquals(20, fml2Refresh.getLoadedFds().size()); } - @Test - public void testIcebergNewFilesThreshold() throws IOException, CatalogException { - CatalogServiceCatalog catalog = CatalogServiceTestCatalog.create(); - IcebergFileMetadataLoader fml1 = getLoaderForIcebergTable(catalog, - "functional_parquet", "iceberg_partitioned", - /* oldFds = */ Collections.emptyList(), - /* requiresDataFilesInTableLocation = */ true); - fml1.load(); - - IcebergFileMetadataLoader fml1ForceRefresh = getLoaderForIcebergTable(catalog, - "functional_parquet", "iceberg_partitioned", - /* 
oldFds = */ fml1.getLoadedIcebergFds().subList(0, 10), - /* requiresDataFilesInTableLocation = */ true, 10); - fml1ForceRefresh.setForceRefreshBlockLocations(true); - fml1ForceRefresh.load(); - assertTrue(fml1ForceRefresh.useParallelListing()); - - IcebergFileMetadataLoader fml1Refresh = getLoaderForIcebergTable(catalog, - "functional_parquet", "iceberg_partitioned", - /* oldFds = */ fml1.getLoadedIcebergFds().subList(0, 10), - /* requiresDataFilesInTableLocation = */ true, 10); - fml1Refresh.setForceRefreshBlockLocations(false); - fml1Refresh.load(); - assertFalse(fml1Refresh.useParallelListing()); - - IcebergFileMetadataLoader fml1Refresh10 = getLoaderForIcebergTable(catalog, - "functional_parquet", "iceberg_partitioned", - /* oldFds = */ fml1.getLoadedIcebergFds().subList(0, 10), - /* requiresDataFilesInTableLocation = */ true, 10); - fml1Refresh10.load(); - assertFalse(fml1Refresh10.useParallelListing()); - IcebergFileMetadataLoader fml1Refresh9 = getLoaderForIcebergTable(catalog, - "functional_parquet", "iceberg_partitioned", - /* oldFds = */ fml1.getLoadedIcebergFds().subList(0, 10), - /* requiresDataFilesInTableLocation = */ true, 9); - fml1Refresh9.load(); - assertTrue(fml1Refresh9.useParallelListing()); - - - IcebergFileMetadataLoader fml2 = getLoaderForIcebergTable(catalog, - "functional_parquet", "iceberg_non_partitioned", - /* oldFds = */ Collections.emptyList(), - /* requiresDataFilesInTableLocation = */ true); - fml2.load(); - - IcebergFileMetadataLoader fml2Refresh = getLoaderForIcebergTable(catalog, - "functional_parquet", "iceberg_non_partitioned", - /* oldFds = */ fml2.getLoadedIcebergFds().subList(0, 10), - /* requiresDataFilesInTableLocation = */ true); - IcebergFileMetadataLoader fml2Refresh10 = getLoaderForIcebergTable(catalog, - "functional_parquet", "iceberg_non_partitioned", - /* oldFds = */ fml2.getLoadedIcebergFds().subList(0, 10), - /* requiresDataFilesInTableLocation = */ true, 10); - fml2Refresh10.load(); - 
assertFalse(fml2Refresh10.useParallelListing()); - IcebergFileMetadataLoader fml2Refresh9 = getLoaderForIcebergTable(catalog, - "functional_parquet", "iceberg_non_partitioned", - /* oldFds = */ fml2.getLoadedIcebergFds().subList(0, 10), - /* requiresDataFilesInTableLocation = */ true, 9); - fml2Refresh9.load(); - assertTrue(fml2Refresh9.useParallelListing()); - } - @Test public void testIcebergMultipleStorageLocations() throws IOException, CatalogException { CatalogServiceCatalog catalog = CatalogServiceTestCatalog.create(); @@ -362,7 +295,6 @@ public class FileMetadataLoaderTest { /* oldFds = */ fml1.getLoadedIcebergFds().subList(0, 1), /* requiresDataFilesInTableLocation = */ false); fml1Refresh1.load(); - assertFalse(fml1Refresh1.useParallelListing()); assertEquals(5, fml1Refresh1.getStats().loadedFiles); assertEquals(1, fml1Refresh1.getStats().skippedFiles); assertEquals(6, fml1Refresh1.getLoadedFds().size()); @@ -372,7 +304,6 @@ public class FileMetadataLoaderTest { /* oldFds = */ fml1.getLoadedIcebergFds().subList(0, 5), /* requiresDataFilesInTableLocation = */ false); fml1Refresh5.load(); - assertFalse(fml1Refresh5.useParallelListing()); assertEquals(1, fml1Refresh5.getStats().loadedFiles); assertEquals(5, fml1Refresh5.getStats().skippedFiles); assertEquals(6, fml1Refresh5.getLoadedFds().size()); @@ -382,15 +313,6 @@ public class FileMetadataLoaderTest { CatalogServiceCatalog catalog, String dbName, String tblName, List<IcebergFileDescriptor> oldFds, boolean requiresDataFilesInTableLocation) throws CatalogException { - return getLoaderForIcebergTable(catalog, dbName, tblName, oldFds, - requiresDataFilesInTableLocation, -1); - } - - private IcebergFileMetadataLoader getLoaderForIcebergTable( - CatalogServiceCatalog catalog, String dbName, String tblName, - List<IcebergFileDescriptor> oldFds, boolean requiresDataFilesInTableLocation, - int newFilesThreshold) - throws CatalogException { ListMap<TNetworkAddress> hostIndex = new ListMap<>(); FeIcebergTable iceT = 
(FeIcebergTable)catalog.getOrLoadTable( dbName, tblName, "test", null); @@ -398,7 +320,7 @@ public class FileMetadataLoaderTest { GroupedContentFiles iceFiles = IcebergUtil.getIcebergFiles(iceT, /*predicates=*/Collections.emptyList(), /*timeTravelSpec=*/null); return new IcebergFileMetadataLoader(iceT.getIcebergApiTable(), - oldFds, hostIndex, iceFiles, requiresDataFilesInTableLocation, newFilesThreshold); + oldFds, hostIndex, iceFiles, requiresDataFilesInTableLocation); } private FileMetadataLoader getLoaderForAcidTable(
