This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit ccb8eac10a4ffdce61dd8fb1c359969b6ba2c77e Author: Riza Suminto <[email protected]> AuthorDate: Wed Jun 4 11:33:26 2025 -0700 IMPALA-14075: Add CatalogOpExecutor.icebergExecutorService_ Before this patch, Impala executed the EXPIRE_SNAPSHOTS operation on a single thread. This can be very slow on cloud storage systems, especially if the operation needs to remove many files. This patch adds CatalogOpExecutor.icebergExecutorService_ to parallelize Iceberg API calls that support passing an ExecutorService, such as ExpireSnapshots.executeDeleteWith(). The number of threads for this executor service is controlled by the CatalogD flag --iceberg_catalog_num_threads. It defaults to 16, the same as the default value of --num_metadata_loading_threads. Rename ValidateMinProcessingPerThread to ValidatePositiveInt64 to match the other validators in backend-gflag-util.cc. Testing: - Lower the sleep time between insert queries from 5s to 1s in test_expire_snapshots and test_describe_history_params to speed up the tests. - Manually verify that 'IcebergCatalogThread' threads are visible on the /jvm-threadz page of CatalogD. - Pass test_iceberg.py. 
Change-Id: I6dcbf1e406e1732ef8829eb0cd627d932291d485 Reviewed-on: http://gerrit.cloudera.org:8080/22980 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/util/backend-gflag-util.cc | 14 ++++++++++++-- common/thrift/BackendGflags.thrift | 2 ++ .../main/java/org/apache/impala/service/BackendConfig.java | 4 ++++ .../java/org/apache/impala/service/CatalogOpExecutor.java | 13 +++++++++++-- .../apache/impala/service/IcebergCatalogOpExecutor.java | 6 ++++-- tests/query_test/test_iceberg.py | 8 ++++---- 6 files changed, 37 insertions(+), 10 deletions(-) diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc index c37b6afc5..ce52d7dca 100644 --- a/be/src/util/backend-gflag-util.cc +++ b/be/src/util/backend-gflag-util.cc @@ -293,6 +293,10 @@ DEFINE_int64_hidden(data_stream_sender_buffer_size_used_by_planner, -1, "With default -1 the planner uses the old logic that is different" "than how the backend actually works (see IMPALA-12594)"); +DEFINE_int32(iceberg_catalog_num_threads, 16, + "Maximum number of threads to use for Iceberg catalog operations. 
These threads are " + "shared among concurrent Iceberg catalog operation (ie., ExpireSnapshot)."); + using strings::Substitute; namespace impala { @@ -308,7 +312,7 @@ static bool ValidatePositiveDouble(const char* flagname, double value) { return false; } -static bool ValidateMinProcessingPerThread(const char* flagname, int64_t value) { +static bool ValidatePositiveInt64(const char* flagname, int64_t value) { if (0 < value) { return true; } @@ -317,9 +321,14 @@ static bool ValidateMinProcessingPerThread(const char* flagname, int64_t value) return false; } +static bool ValidatePositiveInt32(const char* flagname, int32_t value) { + return ValidatePositiveInt64(flagname, value); +} + DEFINE_validator(query_cpu_count_divisor, &ValidatePositiveDouble); -DEFINE_validator(min_processing_per_thread, &ValidateMinProcessingPerThread); +DEFINE_validator(min_processing_per_thread, &ValidatePositiveInt64); DEFINE_validator(query_cpu_root_factor, &ValidatePositiveDouble); +DEFINE_validator(iceberg_catalog_num_threads, &ValidatePositiveInt32); Status GetConfigFromCommand(const string& flag_cmd, string& result) { result.clear(); @@ -534,6 +543,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) { cfg.__set_max_outstanding_events_on_executors( FLAGS_max_outstanding_events_on_executors); cfg.__set_consolidate_grant_revoke_requests(FLAGS_consolidate_grant_revoke_requests); + cfg.__set_iceberg_catalog_num_threads(FLAGS_iceberg_catalog_num_threads); return Status::OK(); } diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift index f1f93877c..9f909b724 100644 --- a/common/thrift/BackendGflags.thrift +++ b/common/thrift/BackendGflags.thrift @@ -339,4 +339,6 @@ struct TBackendGflags { 153: required i32 max_outstanding_events_on_executors 154: required bool consolidate_grant_revoke_requests + + 155: required i32 iceberg_catalog_num_threads } diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java 
b/fe/src/main/java/org/apache/impala/service/BackendConfig.java index d9d84719e..0a4ca616f 100644 --- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java +++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java @@ -577,4 +577,8 @@ public class BackendConfig { public boolean consolidateGrantRevokeRequests() { return backendCfg_.consolidate_grant_revoke_requests; } + + public int icebergCatalogNumThreads() { + return backendCfg_.iceberg_catalog_num_threads; + } } diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index c9a986cb0..1534f271a 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -30,6 +30,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; @@ -49,6 +50,8 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; @@ -408,6 +411,7 @@ public class CatalogOpExecutor { private final AuthorizationConfig authzConfig_; private final AuthorizationManager authzManager_; private final HiveJavaFunctionFactory hiveJavaFuncFactory_; + private final ExecutorService icebergExecutorService_; // A singleton monitoring class that keeps track of the catalog operations. 
private final CatalogOperationTracker catalogOpTracker_ = @@ -425,6 +429,10 @@ public class CatalogOpExecutor { authzConfig_ = Preconditions.checkNotNull(authzConfig); authzManager_ = Preconditions.checkNotNull(authzManager); hiveJavaFuncFactory_ = Preconditions.checkNotNull(hiveJavaFuncFactory); + icebergExecutorService_ = + Executors.newFixedThreadPool(BackendConfig.INSTANCE.icebergCatalogNumThreads(), + new ThreadFactoryBuilder().setNameFormat("IcebergCatalogThread-%d") + .build()); } public CatalogServiceCatalog getCatalog() { return catalog_; } @@ -1611,8 +1619,9 @@ public class CatalogOpExecutor { addSummary(response, rollbackSummary); } else if (setExecuteParams.isSetExpire_snapshots_params()) { String expireSummary = - IcebergCatalogOpExecutor.alterTableExecuteExpireSnapshots( - iceTxn, setExecuteParams.getExpire_snapshots_params()); + IcebergCatalogOpExecutor.alterTableExecuteExpireSnapshots(iceTxn, + setExecuteParams.getExpire_snapshots_params(), + icebergExecutorService_); addSummary(response, expireSummary); } else { // Cannot happen, but throw just in case. diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java index 1d1a50e8d..5faf6aef4 100644 --- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Collections; +import java.util.concurrent.ExecutorService; import org.apache.iceberg.AppendFiles; import org.apache.iceberg.BaseTable; @@ -211,11 +212,12 @@ public class IcebergCatalogOpExecutor { * TableProperties.MIN_SNAPSHOTS_TO_KEEP table property manages how many snapshots * should be retained even when all snapshots are selected by expireOlderThan(). 
*/ - public static String alterTableExecuteExpireSnapshots( - Transaction txn, TAlterTableExecuteExpireSnapshotsParams params) { + public static String alterTableExecuteExpireSnapshots(Transaction txn, + TAlterTableExecuteExpireSnapshotsParams params, ExecutorService executors) { ExpireSnapshots expireApi = txn.expireSnapshots(); Preconditions.checkState(params.isSetOlder_than_millis()); expireApi.expireOlderThan(params.older_than_millis); + expireApi.executeDeleteWith(executors); expireApi.commit(); return "Snapshots have been expired."; } diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py index 0bcbf3a94..141808c67 100644 --- a/tests/query_test/test_iceberg.py +++ b/tests/query_test/test_iceberg.py @@ -92,9 +92,9 @@ class TestIcebergTable(IcebergTestSuite): ts_0 = datetime.datetime.now() insert_q = "insert into {0} values (1)".format(tbl_name) ts_1 = self.execute_query_ts(impalad_client, insert_q) - time.sleep(5) + time.sleep(1) impalad_client.execute(insert_q) - time.sleep(5) + time.sleep(1) ts_2 = self.execute_query_ts(impalad_client, insert_q) impalad_client.execute(insert_q) @@ -472,9 +472,9 @@ class TestIcebergTable(IcebergTestSuite): .format(tbl_name)) insert_q = "insert into {0} values (1)".format(tbl_name) ts_1 = self.execute_query_ts(impalad_client, insert_q) - time.sleep(5) + time.sleep(1) ts_2 = self.execute_query_ts(impalad_client, insert_q) - time.sleep(5) + time.sleep(1) ts_3 = self.execute_query_ts(impalad_client, insert_q) # Describe history without predicate data = impalad_client.execute("DESCRIBE HISTORY {0}".format(tbl_name))
