This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit cee2d01f52a75ca8d2c8eb9eb0f57d39d4db9c9b
Author: Michael Smith <[email protected]>
AuthorDate: Thu May 8 16:56:04 2025 -0700

    IMPALA-12162: Use thread pool to collect checksums
    
    Refactors ParallelFileMetadataLoader to be usable for multiple types of
    metadata. Uses it to collect checksums for new files in parallel.
    
    Testing: adds a test that multiple loading threads are used and that
    checksum collection does not take too long.
    
    Change-Id: I314621104e4757620c0a90d41dd6875bf8855b51
    Reviewed-on: http://gerrit.cloudera.org:8080/22872
    Reviewed-by: Riza Suminto <[email protected]>
    Reviewed-by: Quanlong Huang <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../impala/catalog/ParallelFileMetadataLoader.java | 139 ++++++++++++---------
 .../apache/impala/service/CatalogOpExecutor.java   |  78 ++++++++----
 .../java/org/apache/impala/util/DebugUtils.java    |   4 +
 tests/query_test/test_insert.py                    |  23 ++++
 4 files changed, 162 insertions(+), 82 deletions(-)

diff --git 
a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java 
b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
index 385f3298a..39d42380a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
@@ -22,13 +22,16 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
 import javax.annotation.Nullable;
+
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidTxnList;
@@ -71,15 +74,24 @@ public class ParallelFileMetadataLoader {
   private static final int MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG = 100;
 
   private final String logPrefix_;
-  private final Map<String, FileMetadataLoader> loaders_;
-  private final Map<String, List<HdfsPartition.Builder>> partsByPath_;
   private final FileSystem fs_;
 
+  // Loaders run in parallel and return a result Object.
+  private final List<Pair<String, Callable<Object>>> loaders_;
+  // Updaters are run sequentially after the loaders finish. They process the 
result
+  // Object returned by the corresponding loader.
+  private final List<Consumer<Object>> updaters_;
+
+  /**
+   * Constructs a loader for file descriptors and metadata stats.
+   */
   public ParallelFileMetadataLoader(FileSystem fs,
       Collection<Builder> partBuilders,
       ValidWriteIdList writeIdList, ValidTxnList validTxnList, boolean 
isRecursive,
       @Nullable ListMap<TNetworkAddress> hostIndex, String debugAction,
       String logPrefix) {
+    fs_ = fs;
+    logPrefix_ = logPrefix;
     if (writeIdList != null || validTxnList != null) {
      // make sure that either both writeIdList and validTxnList are set or
      // both of them are not.
@@ -87,72 +99,82 @@ public class ParallelFileMetadataLoader {
     }
     // Group the partitions by their path (multiple partitions may point to 
the same
     // path).
-    partsByPath_ = Maps.newHashMap();
+    Map<String, List<HdfsPartition.Builder>> partsByPath = Maps.newHashMap();
     for (HdfsPartition.Builder p : partBuilders) {
-      partsByPath_.computeIfAbsent(p.getLocation(), (path) -> new 
ArrayList<>())
-          .add(p);
+      partsByPath.computeIfAbsent(p.getLocation(), (path) -> new 
ArrayList<>()).add(p);
     }
-    // Create a FileMetadataLoader for each path.
-    loaders_ = Maps.newHashMap();
-    for (Map.Entry<String, List<HdfsPartition.Builder>> e : 
partsByPath_.entrySet()) {
-      List<FileDescriptor> oldFds = e.getValue().get(0).getFileDescriptors();
-      FileMetadataLoader loader;
-      HdfsFileFormat format = e.getValue().get(0).getFileFormat();
+    // Create a loader (FileMetadataLoader) and updater for each path.
+    loaders_ = new ArrayList<>(partsByPath.size());
+    updaters_ = new ArrayList<>(partsByPath.size());
+    for (Map.Entry<String, List<HdfsPartition.Builder>> e : 
partsByPath.entrySet()) {
+      List<HdfsPartition.Builder> builders = e.getValue();
+      List<FileDescriptor> oldFds = builders.get(0).getFileDescriptors();
+      HdfsFileFormat format = builders.get(0).getFileFormat();
       Preconditions.checkState(!HdfsFileFormat.ICEBERG.equals(format));
-      loader = new FileMetadataLoader(e.getKey(), isRecursive, oldFds, 
hostIndex,
-          validTxnList, writeIdList, format);
+      FileMetadataLoader loader = new FileMetadataLoader(e.getKey(), 
isRecursive, oldFds,
+          hostIndex, validTxnList, writeIdList, format);
       // If there is a cached partition mapped to this path, we recompute the 
block
       // locations even if the underlying files have not changed.
       // This is done to keep the cached block metadata up to date.
-      boolean hasCachedPartition = Iterables.any(e.getValue(),
+      boolean hasCachedPartition = Iterables.any(builders,
           HdfsPartition.Builder::isMarkedCached);
       loader.setForceRefreshBlockLocations(hasCachedPartition);
       loader.setDebugAction(debugAction);
-      loaders_.put(e.getKey(), loader);
+
+      loaders_.add(new Pair<>(e.getKey(), () -> { loader.load(); return 
loader; }));
+
+      updaters_.add((result) ->
+          updatePartBuilders((FileMetadataLoader) result, builders));
     }
-    this.logPrefix_ = logPrefix;
-    this.fs_ = fs;
   }
 
   /**
-   * Loads the file metadata for the given list of Partitions in the 
constructor. If the
-   * load is successful also set the fileDescriptors in the 
HdfsPartition.Builders.
-   * @throws TableLoadingException
+   * Store the loaded FDs from loader into the partitions via builders.
    */
-  void load() throws TableLoadingException {
-    loadInternal();
-
-    // Store the loaded FDs into the partitions.
-    for (Map.Entry<String, List<HdfsPartition.Builder>> e : 
partsByPath_.entrySet()) {
-      FileMetadataLoader loader = loaders_.get(e.getKey());
-
-      for (HdfsPartition.Builder partBuilder : e.getValue()) {
-        // Checks if we can reuse the old file descriptors. Partition builders 
in the list
-        // may have different old file descriptors. We need to verify them one 
by one.
-        if 
((!loader.hasFilesChangedCompareTo(partBuilder.getFileDescriptors()))) {
-          LOG.trace("Detected files unchanged on partition {}",
-              partBuilder.getPartitionName());
-          continue;
-        }
-        partBuilder.clearFileDescriptors();
-        List<FileDescriptor> deleteDescriptors = 
loader.getLoadedDeleteDeltaFds();
-        if (deleteDescriptors != null && !deleteDescriptors.isEmpty()) {
-          
partBuilder.setInsertFileDescriptors(loader.getLoadedInsertDeltaFds());
-          
partBuilder.setDeleteFileDescriptors(loader.getLoadedDeleteDeltaFds());
-        } else {
-          partBuilder.setFileDescriptors(loader.getLoadedFds());
-        }
-        partBuilder.setFileMetadataStats(loader.getFileMetadataStats());
+  private static void updatePartBuilders(FileMetadataLoader loader,
+      List<HdfsPartition.Builder> builders) {
+    for (HdfsPartition.Builder partBuilder : builders) {
+      // Checks if we can reuse the old file descriptors. Partition builders 
in the
+      // list may have different file descriptors. We need to verify them one 
by one.
+      if 
((!loader.hasFilesChangedCompareTo(partBuilder.getFileDescriptors()))) {
+        LOG.trace("Detected files unchanged on partition {}",
+            partBuilder.getPartitionName());
+        continue;
+      }
+      partBuilder.clearFileDescriptors();
+      List<FileDescriptor> deleteDescriptors = 
loader.getLoadedDeleteDeltaFds();
+      if (deleteDescriptors != null && !deleteDescriptors.isEmpty()) {
+        partBuilder.setInsertFileDescriptors(loader.getLoadedInsertDeltaFds());
+        partBuilder.setDeleteFileDescriptors(loader.getLoadedDeleteDeltaFds());
+      } else {
+        partBuilder.setFileDescriptors(loader.getLoadedFds());
       }
+      partBuilder.setFileMetadataStats(loader.getFileMetadataStats());
     }
   }
 
   /**
-   * Call 'load()' in parallel on all of the loaders. If any loaders fail, 
throws
-   * an exception. However, any successful loaders are guaranteed to complete
-   * before any exception is thrown.
+   * Constructs a loader from provided loaders and updaters.
+   */
+  public ParallelFileMetadataLoader(FileSystem fs, String logPrefix,
+      List<Pair<String, Callable<Object>>> loaders,
+      List<Consumer<Object>> updaters) {
+    Preconditions.checkNotNull(loaders);
+    Preconditions.checkNotNull(updaters);
+    Preconditions.checkArgument(loaders.size() == updaters.size());
+    fs_ = fs;
+    logPrefix_ = logPrefix;
+    loaders_ = loaders;
+    updaters_ = updaters;
+  }
+
+  /**
+   * Loads the file metadata for the given list of Partitions in the 
constructor. If the
+   * load is successful also set the fileDescriptors in the 
HdfsPartition.Builders. Any
+   * successful loaders are guaranteed to complete before any exception is 
thrown.
+   * @throws TableLoadingException
    */
-  private void loadInternal() throws TableLoadingException {
+  public void load() throws TableLoadingException {
     if (loaders_.isEmpty()) return;
 
     int failedLoadTasks = 0;
@@ -161,21 +183,22 @@ public class ParallelFileMetadataLoader {
     TOTAL_THREADS.addAndGet(poolSize);
     try (ThreadNameAnnotator tna = new ThreadNameAnnotator(logPrefix_)) {
       TOTAL_TABLES.incrementAndGet();
-      List<Pair<FileMetadataLoader, Future<Void>>> futures =
-          new ArrayList<>(loaders_.size());
-      for (FileMetadataLoader loader : loaders_.values()) {
-        futures.add(new Pair<>(
-            loader, pool.submit(() -> { loader.load(); return null; })));
+      List<Pair<String, Future<Object>>> futures = new 
ArrayList<>(loaders_.size());
+      for (Pair<String, Callable<Object>> loader : loaders_) {
+        futures.add(new Pair<>(loader.first, pool.submit(loader.second)));
       }
 
       // Wait for the loaders to finish.
+      Preconditions.checkState(futures.size() == updaters_.size());
       for (int i = 0; i < futures.size(); i++) {
+        Pair<String, Future<Object>> future = futures.get(i);
         try {
-          futures.get(i).second.get();
+          Object result = future.second.get();
+          updaters_.get(i).accept(result);
         } catch (ExecutionException | InterruptedException e) {
           if (++failedLoadTasks <= MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG) {
-            LOG.error(logPrefix_ + " encountered an error loading data for 
path " +
-                futures.get(i).first.getPartDir(), e);
+            LOG.error("{} encountered an error loading data for path {}",
+                logPrefix_, future.first, e);
           }
         }
       }
@@ -187,8 +210,8 @@ public class ParallelFileMetadataLoader {
     if (failedLoadTasks > 0) {
       int errorsNotLogged = failedLoadTasks - 
MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG;
       if (errorsNotLogged > 0) {
-        LOG.error(logPrefix_ + " error loading {} paths. Only the first {} 
errors " +
-            "were logged", failedLoadTasks, 
MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG);
+        LOG.error("{} error loading {} paths. Only the first {} errors were 
logged",
+            logPrefix_, failedLoadTasks, 
MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG);
       }
       throw new TableLoadingException(logPrefix_ + ": failed to load " + 
failedLoadTasks
           + " paths. Check the catalog server log for more details.");
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 9e1683098..ed5c1e8cb 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -48,8 +48,10 @@ import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
 
 import javax.annotation.Nullable;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -125,6 +127,7 @@ import 
org.apache.impala.catalog.HiveStorageDescriptorFactory;
 import org.apache.impala.catalog.IncompleteTable;
 import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.KuduTable;
+import org.apache.impala.catalog.ParallelFileMetadataLoader;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.catalog.PartitionNotFoundException;
 import org.apache.impala.catalog.PartitionStatsUtil;
@@ -7345,8 +7348,8 @@ public class CatalogOpExecutor {
     final FeFsTable feFsTable = (FeFsTable) table;
 
     // Collect file checksums (and ACID dir path) before taking table lock.
-    Map<String, List<FileMetadata>> fileMetadata = getFileMetadata(
-        feFsTable, update.getUpdated_partitions(), catalogTimeline);
+    Map<String, List<FileMetadata>> fileMetadata = getFileMetadata(feFsTable,
+        update.getUpdated_partitions(), update.getDebug_action(), 
catalogTimeline);
 
     tryWriteLock(table, "updating the catalog", catalogTimeline);
     final Timer.Context context
@@ -7864,45 +7867,72 @@ public class CatalogOpExecutor {
     }
   }
 
+  private FileSystem getFileSystemOrNull(FeFsTable table) {
+    try {
+      return table.getFileSystem();
+    } catch (CatalogException e) {
+      LOG.warn("Failed to get FileSystem for table {}", table.getFullName(), 
e);
+    }
+    return null;
+  }
+
   /**
    * Returns a map of partition name to list of file metadata: name, checksum, 
and
    * acidDirPath. Logs errors on individual files.
    */
   private Map<String, List<FileMetadata>> getFileMetadata(FeFsTable table,
-      Map<String, TUpdatedPartition> updatedPartitions, EventSequence 
catalogTimeline) {
+      Map<String, TUpdatedPartition> updatedPartitions, String debugAction,
+      EventSequence catalogTimeline) {
     if (!shouldGenerateInsertEvents(table)) return null;
     boolean isPartitioned = table.isPartitioned();
     boolean isTransactional = AcidUtils.isTransactionalTable(table);
 
     // Get table file system with table location.
-    FileSystem tableFs = null;
-    try {
-      tableFs = table.getFileSystem();
-    } catch (CatalogException e) {
-      LOG.warn("Failed to get FileSystem for table {}", table.getFullName(), 
e);
-    }
+    final FileSystem tableFs = getFileSystemOrNull(table);
 
+    // Create a loader and updater for each path.
+    List<Pair<String, Callable<Object>>> loaders = new ArrayList<>();
+    List<Consumer<Object>> updaters = new ArrayList<>();
     Map<String, List<FileMetadata>> fileMetadata = Maps.newHashMap();
+    int numFiles = 0;
     for (Map.Entry<String, TUpdatedPartition> e : 
updatedPartitions.entrySet()) {
+      numFiles += e.getValue().getFiles().size();
       List<FileMetadata> files = new 
ArrayList<>(e.getValue().getFiles().size());
       for (String file : e.getValue().getFiles()) {
-        FileChecksum checksum = null;
-        String acidDirPath = null;
-        try {
-          Path filePath = new Path(file);
-          FileSystem fs = (isPartitioned || tableFs == null) ?
-              FeFsTable.getFileSystem(filePath) : tableFs;
-          checksum = fs.getFileChecksum(filePath);
-          if (isTransactional) {
-            acidDirPath = AcidUtils.getFirstLevelAcidDirPath(filePath, fs);
+        loaders.add(new Pair<>(file, () -> {
+          FileChecksum checksum = null;
+          String acidDirPath = null;
+          try {
+            Path filePath = new Path(file);
+            FileSystem fs = (isPartitioned || tableFs == null) ?
+                FeFsTable.getFileSystem(filePath) : tableFs;
+            if (debugAction != null) {
+              DebugUtils.executeDebugAction(
+                  debugAction, DebugUtils.LOAD_FILE_CHECKSUMS_DELAY);
+            }
+            checksum = fs.getFileChecksum(filePath);
+            if (isTransactional) {
+              acidDirPath = AcidUtils.getFirstLevelAcidDirPath(filePath, fs);
+            }
+          } catch (CatalogException | IOException ex) {
+            LOG.error("Failed to collect insert metadata for {} in table {}",
+                file, table.getFullName(), ex);
           }
-        } catch (CatalogException | IOException ex) {
-          LOG.error("Failed to collect insert metadata for {} in table {}",
-              file, table.getFullName(), ex);
-        }
-        files.add(new FileMetadata(file, checksum, acidDirPath));
+          return new FileMetadata(file, checksum, acidDirPath);
+        }));
+        updaters.add((result) -> files.add((FileMetadata) result));
+        fileMetadata.put(e.getKey(), files);
       }
-      fileMetadata.put(e.getKey(), files);
+    }
+
+    String logPrefix = String.format("Loading file checksums for %s paths for 
table %s",
+        numFiles, table.getFullName());
+    try {
+      new ParallelFileMetadataLoader(
+          table.getFileSystem(), logPrefix, loaders, updaters).load();
+    } catch (CatalogException e) {
+      LOG.error("Failed to collect insert metadata for table {}",
+          table.getFullName(), e);
     }
     catalogTimeline.markEvent("Collected file checksums");
     return fileMetadata;
diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java 
b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
index 17b82b9f9..89e9d978c 100644
--- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
@@ -87,6 +87,10 @@ public class DebugUtils {
   // debug action label for introducing delay in loading table metadata.
   public static final String LOAD_TABLES_DELAY = "impalad_load_tables_delay";
 
+  // debug action label for introducing delay in loading file checksums.
+  public static final String LOAD_FILE_CHECKSUMS_DELAY =
+      "catalogd_load_file_checksums_delay";
+
   // debug action label for introducing delay in HMS alter_table rename RPC.
   public static final String TABLE_RENAME_DELAY = 
"catalogd_table_rename_delay";
 
diff --git a/tests/query_test/test_insert.py b/tests/query_test/test_insert.py
index 0a639c2cf..0a8ef01da 100644
--- a/tests/query_test/test_insert.py
+++ b/tests/query_test/test_insert.py
@@ -172,6 +172,29 @@ class TestInsertQueries(TestInsertBase):
           test_file_vars={'$ORIGINAL_DB': ImpalaTestSuite
           .get_db_name_from_format(vector.get_value('table_format'))})
 
+  @pytest.mark.execute_serially
+  def test_parallel_checksum(self, vector, unique_database):
+    """Test that checksum is calculated in parallel when inserting into a table
+    with multiple files."""
+    # Ensure source table is loaded into catalogd.
+    self.execute_query("describe functional.alltypesaggmultifilesnopart")
+
+    exec_options = vector.get_value('exec_option')
+    exec_options['debug_action'] = 
'catalogd_load_file_checksums_delay:SLEEP@3000'
+    handle = self.execute_query_async("create table {0}.test as select * from "
+        "functional.alltypesaggmultifilesnopart".format(unique_database), 
exec_options)
+
+    # Test file has 4 files, so work can be distributed across 3 executors. 
This results
+    # in writing 3 new files in 3 threads.
+    catalogd = ImpalaCluster.get_e2e_test_cluster().catalogd.service
+    catalogd.wait_for_metric_value(
+        "catalog-server.metadata.file.num-loading-threads", 3)
+    catalogd.wait_for_metric_value(
+        "catalog-server.metadata.table.num-loading-file-metadata", 1)
+
+    # Stop the query and cleanup.
+    self.close_query(handle)
+
 
 class TestInsertQueriesWithDefaultFormat(TestInsertBase):
 

Reply via email to