This is an automated email from the ASF dual-hosted git repository. csringhofer pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit cee2d01f52a75ca8d2c8eb9eb0f57d39d4db9c9b Author: Michael Smith <[email protected]> AuthorDate: Thu May 8 16:56:04 2025 -0700 IMPALA-12162: Use thread pool to collect checksums Refactors ParallelFileMetadataLoader to be usable for multiple types of metadata. Uses it to collect checksums for new files in parallel. Testing: adds test that multiple loading threads are used and checksum does not take too long. Change-Id: I314621104e4757620c0a90d41dd6875bf8855b51 Reviewed-on: http://gerrit.cloudera.org:8080/22872 Reviewed-by: Riza Suminto <[email protected]> Reviewed-by: Quanlong Huang <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../impala/catalog/ParallelFileMetadataLoader.java | 139 ++++++++++++--------- .../apache/impala/service/CatalogOpExecutor.java | 78 ++++++++---- .../java/org/apache/impala/util/DebugUtils.java | 4 + tests/query_test/test_insert.py | 23 ++++ 4 files changed, 162 insertions(+), 82 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java index 385f3298a..39d42380a 100644 --- a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java +++ b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java @@ -22,13 +22,16 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import javax.annotation.Nullable; + import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidTxnList; @@ -71,15 +74,24 @@ public class ParallelFileMetadataLoader { private static final int MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG = 100; private final String logPrefix_; - private final Map<String, FileMetadataLoader> loaders_; - private final Map<String, List<HdfsPartition.Builder>> partsByPath_; private final FileSystem fs_; + // Loaders run in parallel and return a result Object. + private final List<Pair<String, Callable<Object>>> loaders_; + // Updaters are run sequentially after the loaders finish. They process the result + // Object returned by the corresponding loader. + private final List<Consumer<Object>> updaters_; + + /** + * Constructs a loader for file descriptors and metadata stats. + */ public ParallelFileMetadataLoader(FileSystem fs, Collection<Builder> partBuilders, ValidWriteIdList writeIdList, ValidTxnList validTxnList, boolean isRecursive, @Nullable ListMap<TNetworkAddress> hostIndex, String debugAction, String logPrefix) { + fs_ = fs; + logPrefix_ = logPrefix; if (writeIdList != null || validTxnList != null) { // make sure that both either both writeIdList and validTxnList are set or both // of them are not. @@ -87,72 +99,82 @@ public class ParallelFileMetadataLoader { } // Group the partitions by their path (multiple partitions may point to the same // path). - partsByPath_ = Maps.newHashMap(); + Map<String, List<HdfsPartition.Builder>> partsByPath = Maps.newHashMap(); for (HdfsPartition.Builder p : partBuilders) { - partsByPath_.computeIfAbsent(p.getLocation(), (path) -> new ArrayList<>()) - .add(p); + partsByPath.computeIfAbsent(p.getLocation(), (path) -> new ArrayList<>()).add(p); } - // Create a FileMetadataLoader for each path. - loaders_ = Maps.newHashMap(); - for (Map.Entry<String, List<HdfsPartition.Builder>> e : partsByPath_.entrySet()) { - List<FileDescriptor> oldFds = e.getValue().get(0).getFileDescriptors(); - FileMetadataLoader loader; - HdfsFileFormat format = e.getValue().get(0).getFileFormat(); + // Create a loader (FileMetadataLoader) and updater for each path. + loaders_ = new ArrayList<>(partsByPath.size()); + updaters_ = new ArrayList<>(partsByPath.size()); + for (Map.Entry<String, List<HdfsPartition.Builder>> e : partsByPath.entrySet()) { + List<HdfsPartition.Builder> builders = e.getValue(); + List<FileDescriptor> oldFds = builders.get(0).getFileDescriptors(); + HdfsFileFormat format = builders.get(0).getFileFormat(); Preconditions.checkState(!HdfsFileFormat.ICEBERG.equals(format)); - loader = new FileMetadataLoader(e.getKey(), isRecursive, oldFds, hostIndex, - validTxnList, writeIdList, format); + FileMetadataLoader loader = new FileMetadataLoader(e.getKey(), isRecursive, oldFds, + hostIndex, validTxnList, writeIdList, format); // If there is a cached partition mapped to this path, we recompute the block // locations even if the underlying files have not changed. // This is done to keep the cached block metadata up to date. - boolean hasCachedPartition = Iterables.any(e.getValue(), + boolean hasCachedPartition = Iterables.any(builders, HdfsPartition.Builder::isMarkedCached); loader.setForceRefreshBlockLocations(hasCachedPartition); loader.setDebugAction(debugAction); - loaders_.put(e.getKey(), loader); + + loaders_.add(new Pair<>(e.getKey(), () -> { loader.load(); return loader; })); + + updaters_.add((result) -> + updatePartBuilders((FileMetadataLoader) result, builders)); } - this.logPrefix_ = logPrefix; - this.fs_ = fs; } /** - * Loads the file metadata for the given list of Partitions in the constructor. If the - * load is successful also set the fileDescriptors in the HdfsPartition.Builders. - * @throws TableLoadingException + * Store the loaded FDs from loader into the partitions via builders. */ - void load() throws TableLoadingException { - loadInternal(); - - // Store the loaded FDs into the partitions. - for (Map.Entry<String, List<HdfsPartition.Builder>> e : partsByPath_.entrySet()) { - FileMetadataLoader loader = loaders_.get(e.getKey()); - - for (HdfsPartition.Builder partBuilder : e.getValue()) { - // Checks if we can reuse the old file descriptors. Partition builders in the list - // may have different old file descriptors. We need to verify them one by one. - if ((!loader.hasFilesChangedCompareTo(partBuilder.getFileDescriptors()))) { - LOG.trace("Detected files unchanged on partition {}", - partBuilder.getPartitionName()); - continue; - } - partBuilder.clearFileDescriptors(); - List<FileDescriptor> deleteDescriptors = loader.getLoadedDeleteDeltaFds(); - if (deleteDescriptors != null && !deleteDescriptors.isEmpty()) { - partBuilder.setInsertFileDescriptors(loader.getLoadedInsertDeltaFds()); - partBuilder.setDeleteFileDescriptors(loader.getLoadedDeleteDeltaFds()); - } else { - partBuilder.setFileDescriptors(loader.getLoadedFds()); - } - partBuilder.setFileMetadataStats(loader.getFileMetadataStats()); + private static void updatePartBuilders(FileMetadataLoader loader, + List<HdfsPartition.Builder> builders) { + for (HdfsPartition.Builder partBuilder : builders) { + // Checks if we can reuse the old file descriptors. Partition builders in the + // list may have different file descriptors. We need to verify them one by one. + if ((!loader.hasFilesChangedCompareTo(partBuilder.getFileDescriptors()))) { + LOG.trace("Detected files unchanged on partition {}", + partBuilder.getPartitionName()); + continue; + } + partBuilder.clearFileDescriptors(); + List<FileDescriptor> deleteDescriptors = loader.getLoadedDeleteDeltaFds(); + if (deleteDescriptors != null && !deleteDescriptors.isEmpty()) { + partBuilder.setInsertFileDescriptors(loader.getLoadedInsertDeltaFds()); + partBuilder.setDeleteFileDescriptors(loader.getLoadedDeleteDeltaFds()); + } else { + partBuilder.setFileDescriptors(loader.getLoadedFds()); } + partBuilder.setFileMetadataStats(loader.getFileMetadataStats()); } } /** - * Call 'load()' in parallel on all of the loaders. If any loaders fail, throws - * an exception. However, any successful loaders are guaranteed to complete - * before any exception is thrown. + * Constructs a loader from provided loaders and updaters. + */ + public ParallelFileMetadataLoader(FileSystem fs, String logPrefix, + List<Pair<String, Callable<Object>>> loaders, + List<Consumer<Object>> updaters) { + Preconditions.checkNotNull(loaders); + Preconditions.checkNotNull(updaters); + Preconditions.checkArgument(loaders.size() == updaters.size()); + fs_ = fs; + logPrefix_ = logPrefix; + loaders_ = loaders; + updaters_ = updaters; + } + + /** + * Loads the file metadata for the given list of Partitions in the constructor. If the + * load is successful also set the fileDescriptors in the HdfsPartition.Builders. Any + * successful loaders are guaranteed to complete before any exception is thrown. + * @throws TableLoadingException */ - private void loadInternal() throws TableLoadingException { + public void load() throws TableLoadingException { if (loaders_.isEmpty()) return; int failedLoadTasks = 0; @@ -161,21 +183,22 @@ public class ParallelFileMetadataLoader { TOTAL_THREADS.addAndGet(poolSize); try (ThreadNameAnnotator tna = new ThreadNameAnnotator(logPrefix_)) { TOTAL_TABLES.incrementAndGet(); - List<Pair<FileMetadataLoader, Future<Void>>> futures = - new ArrayList<>(loaders_.size()); - for (FileMetadataLoader loader : loaders_.values()) { - futures.add(new Pair<>( - loader, pool.submit(() -> { loader.load(); return null; }))); + List<Pair<String, Future<Object>>> futures = new ArrayList<>(loaders_.size()); + for (Pair<String, Callable<Object>> loader : loaders_) { + futures.add(new Pair<>(loader.first, pool.submit(loader.second))); } // Wait for the loaders to finish. + Preconditions.checkState(futures.size() == updaters_.size()); for (int i = 0; i < futures.size(); i++) { + Pair<String, Future<Object>> future = futures.get(i); try { - futures.get(i).second.get(); + Object result = future.second.get(); + updaters_.get(i).accept(result); } catch (ExecutionException | InterruptedException e) { if (++failedLoadTasks <= MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG) { - LOG.error(logPrefix_ + " encountered an error loading data for path " + - futures.get(i).first.getPartDir(), e); + LOG.error("{} encountered an error loading data for path {}", + logPrefix_, future.first, e); } } } @@ -187,8 +210,8 @@ public class ParallelFileMetadataLoader { if (failedLoadTasks > 0) { int errorsNotLogged = failedLoadTasks - MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG; if (errorsNotLogged > 0) { - LOG.error(logPrefix_ + " error loading {} paths. Only the first {} errors " + - "were logged", failedLoadTasks, MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG); + LOG.error("{} error loading {} paths. Only the first {} errors were logged", + logPrefix_, failedLoadTasks, MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG); } throw new TableLoadingException(logPrefix_ + ": failed to load " + failedLoadTasks + " paths. Check the catalog server log for more details."); diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index 9e1683098..ed5c1e8cb 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -48,8 +48,10 @@ import java.util.Map.Entry; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; +import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; import javax.annotation.Nullable; import org.apache.hadoop.fs.FSDataInputStream; @@ -125,6 +127,7 @@ import org.apache.impala.catalog.HiveStorageDescriptorFactory; import org.apache.impala.catalog.IncompleteTable; import org.apache.impala.catalog.KuduColumn; import org.apache.impala.catalog.KuduTable; +import org.apache.impala.catalog.ParallelFileMetadataLoader; import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient; import org.apache.impala.catalog.PartitionNotFoundException; import org.apache.impala.catalog.PartitionStatsUtil; @@ -7345,8 +7348,8 @@ public class CatalogOpExecutor { final FeFsTable feFsTable = (FeFsTable) table; // Collect file checksums (and ACID dir path) before taking table lock. - Map<String, List<FileMetadata>> fileMetadata = getFileMetadata( - feFsTable, update.getUpdated_partitions(), catalogTimeline); + Map<String, List<FileMetadata>> fileMetadata = getFileMetadata(feFsTable, + update.getUpdated_partitions(), update.getDebug_action(), catalogTimeline); tryWriteLock(table, "updating the catalog", catalogTimeline); final Timer.Context context @@ -7864,45 +7867,72 @@ public class CatalogOpExecutor { } } + private FileSystem getFileSystemOrNull(FeFsTable table) { + try { + return table.getFileSystem(); + } catch (CatalogException e) { + LOG.warn("Failed to get FileSystem for table {}", table.getFullName(), e); + } + return null; + } + /** * Returns a map of partition name to list of file metadata: name, checksum, and * acidDirPath. Logs errors on individual files. */ private Map<String, List<FileMetadata>> getFileMetadata(FeFsTable table, - Map<String, TUpdatedPartition> updatedPartitions, EventSequence catalogTimeline) { + Map<String, TUpdatedPartition> updatedPartitions, String debugAction, + EventSequence catalogTimeline) { if (!shouldGenerateInsertEvents(table)) return null; boolean isPartitioned = table.isPartitioned(); boolean isTransactional = AcidUtils.isTransactionalTable(table); // Get table file system with table location. - FileSystem tableFs = null; - try { - tableFs = table.getFileSystem(); - } catch (CatalogException e) { - LOG.warn("Failed to get FileSystem for table {}", table.getFullName(), e); - } + final FileSystem tableFs = getFileSystemOrNull(table); + // Create a loader and updater for each path. + List<Pair<String, Callable<Object>>> loaders = new ArrayList<>(); + List<Consumer<Object>> updaters = new ArrayList<>(); Map<String, List<FileMetadata>> fileMetadata = Maps.newHashMap(); + int numFiles = 0; for (Map.Entry<String, TUpdatedPartition> e : updatedPartitions.entrySet()) { + numFiles += e.getValue().getFiles().size(); List<FileMetadata> files = new ArrayList<>(e.getValue().getFiles().size()); for (String file : e.getValue().getFiles()) { - FileChecksum checksum = null; - String acidDirPath = null; - try { - Path filePath = new Path(file); - FileSystem fs = (isPartitioned || tableFs == null) ? - FeFsTable.getFileSystem(filePath) : tableFs; - checksum = fs.getFileChecksum(filePath); - if (isTransactional) { - acidDirPath = AcidUtils.getFirstLevelAcidDirPath(filePath, fs); + loaders.add(new Pair<>(file, () -> { + FileChecksum checksum = null; + String acidDirPath = null; + try { + Path filePath = new Path(file); + FileSystem fs = (isPartitioned || tableFs == null) ? + FeFsTable.getFileSystem(filePath) : tableFs; + if (debugAction != null) { + DebugUtils.executeDebugAction( + debugAction, DebugUtils.LOAD_FILE_CHECKSUMS_DELAY); + } + checksum = fs.getFileChecksum(filePath); + if (isTransactional) { + acidDirPath = AcidUtils.getFirstLevelAcidDirPath(filePath, fs); + } + } catch (CatalogException | IOException ex) { + LOG.error("Failed to collect insert metadata for {} in table {}", + file, table.getFullName(), ex); } - } catch (CatalogException | IOException ex) { - LOG.error("Failed to collect insert metadata for {} in table {}", - file, table.getFullName(), ex); - } - files.add(new FileMetadata(file, checksum, acidDirPath)); + return new FileMetadata(file, checksum, acidDirPath); + })); + updaters.add((result) -> files.add((FileMetadata) result)); + fileMetadata.put(e.getKey(), files); } - fileMetadata.put(e.getKey(), files); + } + + String logPrefix = String.format("Loading file checksums for %s paths for table %s", + numFiles, table.getFullName()); + try { + new ParallelFileMetadataLoader( + table.getFileSystem(), logPrefix, loaders, updaters).load(); + } catch (CatalogException e) { + LOG.error("Failed to collect insert metadata for table {}", + table.getFullName(), e); } catalogTimeline.markEvent("Collected file checksums"); return fileMetadata; diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java b/fe/src/main/java/org/apache/impala/util/DebugUtils.java index 17b82b9f9..89e9d978c 100644 --- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java +++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java @@ -87,6 +87,10 @@ public class DebugUtils { // debug action label for introducing delay in loading table metadata. public static final String LOAD_TABLES_DELAY = "impalad_load_tables_delay"; + // debug action label for introducing delay in loading file checksums. + public static final String LOAD_FILE_CHECKSUMS_DELAY = + "catalogd_load_file_checksums_delay"; + // debug action label for introducing delay in HMS alter_table rename RPC. public static final String TABLE_RENAME_DELAY = "catalogd_table_rename_delay"; diff --git a/tests/query_test/test_insert.py b/tests/query_test/test_insert.py index 0a639c2cf..0a8ef01da 100644 --- a/tests/query_test/test_insert.py +++ b/tests/query_test/test_insert.py @@ -172,6 +172,29 @@ class TestInsertQueries(TestInsertBase): test_file_vars={'$ORIGINAL_DB': ImpalaTestSuite .get_db_name_from_format(vector.get_value('table_format'))}) + @pytest.mark.execute_serially + def test_parallel_checksum(self, vector, unique_database): + """Test that checksum is calculated in parallel when inserting into a table + with multiple files.""" + # Ensure source table is loaded into catalogd. + self.execute_query("describe functional.alltypesaggmultifilesnopart") + + exec_options = vector.get_value('exec_option') + exec_options['debug_action'] = 'catalogd_load_file_checksums_delay:SLEEP@3000' + handle = self.execute_query_async("create table {0}.test as select * from " + "functional.alltypesaggmultifilesnopart".format(unique_database), exec_options) + + # Test file has 4 files, so work can be distributed across 3 executors. This results + # in writing 3 new files in 3 threads. + catalogd = ImpalaCluster.get_e2e_test_cluster().catalogd.service + catalogd.wait_for_metric_value( + "catalog-server.metadata.file.num-loading-threads", 3) + catalogd.wait_for_metric_value( + "catalog-server.metadata.table.num-loading-file-metadata", 1) + + # Stop the query and cleanup. + self.close_query(handle) + class TestInsertQueriesWithDefaultFormat(TestInsertBase):
