This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new ef174d3aa IMPALA-12162: Checksum files before lock in INSERT
ef174d3aa is described below

commit ef174d3aa5405043fa5084cac83bafcdc1afd473
Author: Michael Smith <[email protected]>
AuthorDate: Thu May 8 16:13:11 2025 -0700

    IMPALA-12162: Checksum files before lock in INSERT
    
    Collect file metadata (file checksums and the ACID directory path) before
    acquiring the table lock. The table lock doesn't prevent files from being
    deleted from the underlying filesystem, and collecting this metadata can
    take time, blocking other operations that depend on the table lock.
    
    Fires InsertEvents with partial data if there are errors collecting
    checksum or acidDirPath on individual files to provide best-effort
    information. Hive defaults to empty string for these values when not
    specified.
    
    IMPALA-10254 has been resolved, so this also removes the special-case
    handling for FeIcebergTable and the associated TODO.
    
    Change-Id: I18f9686f5d53cf1e7c384684c25427fb5353e2af
    Reviewed-on: http://gerrit.cloudera.org:8080/22871
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../apache/impala/service/CatalogOpExecutor.java   | 129 +++++++++++++--------
 1 file changed, 82 insertions(+), 47 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 1ce5bd01a..9e1683098 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -7342,6 +7342,11 @@ public class CatalogOpExecutor {
       throw new InternalException("Unexpected table type: " +
           update.getTarget_table());
     }
+    final FeFsTable feFsTable = (FeFsTable) table;
+
+    // Collect file checksums (and ACID dir path) before taking table lock.
+    Map<String, List<FileMetadata>> fileMetadata = getFileMetadata(
+        feFsTable, update.getUpdated_partitions(), catalogTimeline);
 
     tryWriteLock(table, "updating the catalog", catalogTimeline);
     final Timer.Context context
@@ -7392,7 +7397,7 @@ public class CatalogOpExecutor {
                   .build());
         }
       }
-      Collection<? extends FeFsPartition> parts = 
((FeFsTable)table).loadAllPartitions();
+      Collection<? extends FeFsPartition> parts = 
feFsTable.loadAllPartitions();
       List<FeFsPartition> affectedExistingPartitions = new ArrayList<>();
       List<org.apache.hadoop.hive.metastore.api.Partition> 
hmsPartitionsStatsUnset =
           Lists.newArrayList();
@@ -7454,7 +7459,7 @@ public class CatalogOpExecutor {
       // Before commit fire insert events if external event processing is
       // enabled. This is best-effort. Any errors in it should not fail the 
INSERT.
       try {
-        createInsertEvents((FeFsTable) table, update.getUpdated_partitions(),
+        createInsertEvents(feFsTable, fileMetadata,
             addedPartitionNames, update.is_overwrite, tblTxn, catalogTimeline);
       } catch (Exception e) {
         LOG.error("Failed to fire insert events for table {}", 
table.getFullName(), e);
@@ -7709,14 +7714,13 @@ public class CatalogOpExecutor {
    * does, see:
    * 
https://github.com/apache/hive/blob/25892ea409/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java#L3251
    * @param table The target table.
-   * @param updatedPartitions All affected partitions with the list of new 
files
-   *                          inserted.
+   * @param updatedPartitions All affected partitions with FileMetadata for 
new files.
    * @param addedPartitionNames List of new partitions created during the 
insert.
    * @param isInsertOverwrite Indicates if the operation was an insert 
overwrite.
    * @param tblTxn Contains the transactionId and the writeId for the insert.
    */
   private void createInsertEvents(FeFsTable table,
-      Map<String, TUpdatedPartition> updatedPartitions,
+      Map<String, List<FileMetadata>> updatedPartitions,
       Map<String, List<String>> addedPartitionNames,
       boolean isInsertOverwrite, TblTransaction tblTxn, EventSequence 
catalogTimeline)
       throws CatalogException, MetaException {
@@ -7796,11 +7800,11 @@ public class CatalogOpExecutor {
    */
   private void prepareInsertEventData(FeFsTable table,
       String partName, List<String> partVals,
-      Map<String, TUpdatedPartition> updatedPartitions,
+      Map<String, List<FileMetadata>> updatedPartitions,
       boolean isInsertOverwrite, boolean isPartitioned,
       List<InsertEventRequestData> insertEventReqDatas,
-      List<List<String>> insertEventPartVals) throws CatalogException {
-    List<String> newFiles = updatedPartitions.get(partName).getFiles();
+      List<List<String>> insertEventPartVals) {
+    List<FileMetadata> newFiles = updatedPartitions.get(partName);
     if (!newFiles.isEmpty() || isInsertOverwrite) {
       LOG.info("{} new files detected for table {}{}",
           newFiles.size(), table.getFullName(),
@@ -7843,50 +7847,81 @@ public class CatalogOpExecutor {
     return BackendConfig.INSTANCE.isInsertEventsEnabled();
   }
 
+  private class FileMetadata {
+    String filename;
+    FileChecksum checksum;
+    String acidDirPath;
+
+    FileMetadata(String filename, FileChecksum checksum, String acidDirPath) {
+      this.filename = filename;
+      this.checksum = checksum;
+      this.acidDirPath = acidDirPath;
+    }
+
+    String getChecksum() {
+      return checksum == null ? "" :
+          StringUtils.byteToHexString(checksum.getBytes(), 0, 
checksum.getLength());
+    }
+  }
+
+  /**
+   * Returns a map of partition name to list of file metadata: name, checksum, 
and
+   * acidDirPath. Logs errors on individual files.
+   */
+  private Map<String, List<FileMetadata>> getFileMetadata(FeFsTable table,
+      Map<String, TUpdatedPartition> updatedPartitions, EventSequence 
catalogTimeline) {
+    if (!shouldGenerateInsertEvents(table)) return null;
+    boolean isPartitioned = table.isPartitioned();
+    boolean isTransactional = AcidUtils.isTransactionalTable(table);
+
+    // Get table file system with table location.
+    FileSystem tableFs = null;
+    try {
+      tableFs = table.getFileSystem();
+    } catch (CatalogException e) {
+      LOG.warn("Failed to get FileSystem for table {}", table.getFullName(), 
e);
+    }
+
+    Map<String, List<FileMetadata>> fileMetadata = Maps.newHashMap();
+    for (Map.Entry<String, TUpdatedPartition> e : 
updatedPartitions.entrySet()) {
+      List<FileMetadata> files = new 
ArrayList<>(e.getValue().getFiles().size());
+      for (String file : e.getValue().getFiles()) {
+        FileChecksum checksum = null;
+        String acidDirPath = null;
+        try {
+          Path filePath = new Path(file);
+          FileSystem fs = (isPartitioned || tableFs == null) ?
+              FeFsTable.getFileSystem(filePath) : tableFs;
+          checksum = fs.getFileChecksum(filePath);
+          if (isTransactional) {
+            acidDirPath = AcidUtils.getFirstLevelAcidDirPath(filePath, fs);
+          }
+        } catch (CatalogException | IOException ex) {
+          LOG.error("Failed to collect insert metadata for {} in table {}",
+              file, table.getFullName(), ex);
+        }
+        files.add(new FileMetadata(file, checksum, acidDirPath));
+      }
+      fileMetadata.put(e.getKey(), files);
+    }
+    catalogTimeline.markEvent("Collected file checksums");
+    return fileMetadata;
+  }
+
   private InsertEventRequestData makeInsertEventData(FeFsTable tbl, 
List<String> partVals,
-      List<String> newFiles, boolean isInsertOverwrite) throws 
CatalogException {
+      List<FileMetadata> newFiles, boolean isInsertOverwrite) {
     Preconditions.checkNotNull(newFiles);
     Preconditions.checkNotNull(partVals);
     InsertEventRequestData insertEventRequestData = new InsertEventRequestData(
-        Lists.newArrayListWithCapacity(
-            newFiles.size()));
+      new ArrayList<>(newFiles.size()));
     boolean isTransactional = AcidUtils.isTransactionalTable(tbl);
-    // in case of unpartitioned table, partVals will be empty
-    boolean isPartitioned = !partVals.isEmpty();
-    if (isPartitioned) {
-      MetastoreShim.setPartitionVal(insertEventRequestData, partVals);
-    }
-    // Get table file system with table location.
-    FileSystem tableFs = tbl.getFileSystem();
-    FileSystem fs;
-    for (String file : newFiles) {
-      try {
-        Path filePath = new Path(file);
-        if (!isPartitioned) {
-          fs = tableFs;
-        } else {
-          // Partitions may be in different file systems.
-          fs = FeFsTable.getFileSystem(filePath);
-        }
-        FileChecksum checkSum = fs.getFileChecksum(filePath);
-        String checksumStr = checkSum == null ? ""
-            : StringUtils.byteToHexString(checkSum.getBytes(), 0, 
checkSum.getLength());
-        insertEventRequestData.addToFilesAdded(file);
-        insertEventRequestData.addToFilesAddedChecksum(checksumStr);
-        if (isTransactional) {
-          String acidDirPath = AcidUtils.getFirstLevelAcidDirPath(filePath, 
fs);
-          if (acidDirPath != null) {
-            MetastoreShim.addToSubDirectoryList(insertEventRequestData, 
acidDirPath);
-          }
-        }
-        insertEventRequestData.setReplace(isInsertOverwrite);
-      } catch (IOException e) {
-        if (tbl instanceof FeIcebergTable) {
-          // TODO IMPALA-10254: load data files via Iceberg API. Currently we 
load
-          // Iceberg data files via file listing, so we might see files being 
written.
-          continue;
-        }
-        throw new CatalogException("Could not get the file checksum for file " 
+ file, e);
+    MetastoreShim.setPartitionVal(insertEventRequestData, partVals);
+    insertEventRequestData.setReplace(isInsertOverwrite);
+    for (FileMetadata metadata : newFiles) {
+      insertEventRequestData.addToFilesAdded(metadata.filename);
+      insertEventRequestData.addToFilesAddedChecksum(metadata.getChecksum());
+      if (isTransactional) {
+        MetastoreShim.addToSubDirectoryList(insertEventRequestData, 
metadata.acidDirPath);
       }
     }
     return insertEventRequestData;

Reply via email to