This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit ccb8eac10a4ffdce61dd8fb1c359969b6ba2c77e
Author: Riza Suminto <[email protected]>
AuthorDate: Wed Jun 4 11:33:26 2025 -0700

    IMPALA-14075: Add CatalogOpExecutor.icebergExecutorService_
    
    Before this patch, Impala executed the EXPIRE_SNAPSHOTS operation on a
    single thread. This can be very slow on cloud storage systems,
    especially when the operation needs to remove many files.
    
    This patch adds CatalogOpExecutor.icebergExecutorService_ to parallelize
    Iceberg API calls that accept an ExecutorService, such as
    ExpireSnapshots.executeDeleteWith(). The number of threads in this
    executor service is controlled by the new CatalogD flag
    --iceberg_catalog_num_threads, which defaults to 16 (the same as the
    default value of --num_metadata_loading_threads).
    
    Rename ValidateMinProcessingPerThread to ValidatePositiveInt64 to match
    the naming of the other validators in backend-gflag-util.cc, and add
    ValidatePositiveInt32 to validate --iceberg_catalog_num_threads.
    
    Testing:
    - Reduce the sleep time between insert queries from 5s to 1s in
      test_expire_snapshots and test_describe_history_params to speed up
      the tests.
    - Manually verify that 'IcebergCatalogThread' threads are visible on
      the /jvm-threadz page of CatalogD.
    - Pass test_iceberg.py.
    
    Change-Id: I6dcbf1e406e1732ef8829eb0cd627d932291d485
    Reviewed-on: http://gerrit.cloudera.org:8080/22980
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/util/backend-gflag-util.cc                          | 14 ++++++++++++--
 common/thrift/BackendGflags.thrift                         |  2 ++
 .../main/java/org/apache/impala/service/BackendConfig.java |  4 ++++
 .../java/org/apache/impala/service/CatalogOpExecutor.java  | 13 +++++++++++--
 .../apache/impala/service/IcebergCatalogOpExecutor.java    |  6 ++++--
 tests/query_test/test_iceberg.py                           |  8 ++++----
 6 files changed, 37 insertions(+), 10 deletions(-)

diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index c37b6afc5..ce52d7dca 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -293,6 +293,10 @@ DEFINE_int64_hidden(data_stream_sender_buffer_size_used_by_planner, -1,
     "With default -1 the planner uses the old logic that is different"
     "than how the backend actually works (see IMPALA-12594)");
 
+DEFINE_int32(iceberg_catalog_num_threads, 16,
+    "Maximum number of threads to use for Iceberg catalog operations. These 
threads are "
+    "shared among concurrent Iceberg catalog operation (ie., 
ExpireSnapshot).");
+
 using strings::Substitute;
 
 namespace impala {
@@ -308,7 +312,7 @@ static bool ValidatePositiveDouble(const char* flagname, double value) {
   return false;
 }
 
-static bool ValidateMinProcessingPerThread(const char* flagname, int64_t 
value) {
+static bool ValidatePositiveInt64(const char* flagname, int64_t value) {
   if (0 < value) {
     return true;
   }
@@ -317,9 +321,14 @@ static bool ValidateMinProcessingPerThread(const char* flagname, int64_t value)
   return false;
 }
 
+static bool ValidatePositiveInt32(const char* flagname, int32_t value) {
+  return ValidatePositiveInt64(flagname, value);
+}
+
 DEFINE_validator(query_cpu_count_divisor, &ValidatePositiveDouble);
-DEFINE_validator(min_processing_per_thread, &ValidateMinProcessingPerThread);
+DEFINE_validator(min_processing_per_thread, &ValidatePositiveInt64);
 DEFINE_validator(query_cpu_root_factor, &ValidatePositiveDouble);
+DEFINE_validator(iceberg_catalog_num_threads, &ValidatePositiveInt32);
 
 Status GetConfigFromCommand(const string& flag_cmd, string& result) {
   result.clear();
@@ -534,6 +543,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
   cfg.__set_max_outstanding_events_on_executors(
       FLAGS_max_outstanding_events_on_executors);
   
cfg.__set_consolidate_grant_revoke_requests(FLAGS_consolidate_grant_revoke_requests);
+  cfg.__set_iceberg_catalog_num_threads(FLAGS_iceberg_catalog_num_threads);
   return Status::OK();
 }
 
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index f1f93877c..9f909b724 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -339,4 +339,6 @@ struct TBackendGflags {
   153: required i32 max_outstanding_events_on_executors
 
   154: required bool consolidate_grant_revoke_requests
+
+  155: required i32 iceberg_catalog_num_threads
 }
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index d9d84719e..0a4ca616f 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -577,4 +577,8 @@ public class BackendConfig {
   public boolean consolidateGrantRevokeRequests() {
     return backendCfg_.consolidate_grant_revoke_requests;
   }
+
+  public int icebergCatalogNumThreads() {
+    return backendCfg_.iceberg_catalog_num_threads;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index c9a986cb0..1534f271a 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -30,6 +30,7 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
@@ -49,6 +50,8 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
@@ -408,6 +411,7 @@ public class CatalogOpExecutor {
   private final AuthorizationConfig authzConfig_;
   private final AuthorizationManager authzManager_;
   private final HiveJavaFunctionFactory hiveJavaFuncFactory_;
+  private final ExecutorService icebergExecutorService_;
 
   // A singleton monitoring class that keeps track of the catalog operations.
   private final CatalogOperationTracker catalogOpTracker_ =
@@ -425,6 +429,10 @@ public class CatalogOpExecutor {
     authzConfig_ = Preconditions.checkNotNull(authzConfig);
     authzManager_ = Preconditions.checkNotNull(authzManager);
     hiveJavaFuncFactory_ = Preconditions.checkNotNull(hiveJavaFuncFactory);
+    icebergExecutorService_ =
+        
Executors.newFixedThreadPool(BackendConfig.INSTANCE.icebergCatalogNumThreads(),
+                new 
ThreadFactoryBuilder().setNameFormat("IcebergCatalogThread-%d")
+                .build());
   }
 
   public CatalogServiceCatalog getCatalog() { return catalog_; }
@@ -1611,8 +1619,9 @@ public class CatalogOpExecutor {
             addSummary(response, rollbackSummary);
           } else if (setExecuteParams.isSetExpire_snapshots_params()) {
             String expireSummary =
-                IcebergCatalogOpExecutor.alterTableExecuteExpireSnapshots(
-                    iceTxn, setExecuteParams.getExpire_snapshots_params());
+                
IcebergCatalogOpExecutor.alterTableExecuteExpireSnapshots(iceTxn,
+                    setExecuteParams.getExpire_snapshots_params(),
+                    icebergExecutorService_);
             addSummary(response, expireSummary);
           } else {
             // Cannot happen, but throw just in case.
diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index 1d1a50e8d..5faf6aef4 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Collections;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.BaseTable;
@@ -211,11 +212,12 @@ public class IcebergCatalogOpExecutor {
    * TableProperties.MIN_SNAPSHOTS_TO_KEEP table property manages how many 
snapshots
    * should be retained even when all snapshots are selected by 
expireOlderThan().
    */
-  public static String alterTableExecuteExpireSnapshots(
-      Transaction txn, TAlterTableExecuteExpireSnapshotsParams params) {
+  public static String alterTableExecuteExpireSnapshots(Transaction txn,
+      TAlterTableExecuteExpireSnapshotsParams params, ExecutorService 
executors) {
     ExpireSnapshots expireApi = txn.expireSnapshots();
     Preconditions.checkState(params.isSetOlder_than_millis());
     expireApi.expireOlderThan(params.older_than_millis);
+    expireApi.executeDeleteWith(executors);
     expireApi.commit();
     return "Snapshots have been expired.";
   }
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 0bcbf3a94..141808c67 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -92,9 +92,9 @@ class TestIcebergTable(IcebergTestSuite):
         ts_0 = datetime.datetime.now()
         insert_q = "insert into {0} values (1)".format(tbl_name)
         ts_1 = self.execute_query_ts(impalad_client, insert_q)
-        time.sleep(5)
+        time.sleep(1)
         impalad_client.execute(insert_q)
-        time.sleep(5)
+        time.sleep(1)
         ts_2 = self.execute_query_ts(impalad_client, insert_q)
         impalad_client.execute(insert_q)
 
@@ -472,9 +472,9 @@ class TestIcebergTable(IcebergTestSuite):
           .format(tbl_name))
       insert_q = "insert into {0} values (1)".format(tbl_name)
       ts_1 = self.execute_query_ts(impalad_client, insert_q)
-      time.sleep(5)
+      time.sleep(1)
       ts_2 = self.execute_query_ts(impalad_client, insert_q)
-      time.sleep(5)
+      time.sleep(1)
       ts_3 = self.execute_query_ts(impalad_client, insert_q)
       # Describe history without predicate
       data = impalad_client.execute("DESCRIBE HISTORY {0}".format(tbl_name))

Reply via email to