This is an automated email from the ASF dual-hosted git repository.
michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 1008decc0 IMPALA-14447: Parallelize table loading in getMissingTables()
1008decc0 is described below
commit 1008decc0780fc4da9a3d35cafc5c93f9f3574e5
Author: Riza Suminto <[email protected]>
AuthorDate: Wed Sep 17 14:05:20 2025 -0700
IMPALA-14447: Parallelize table loading in getMissingTables()
StmtMetadataLoader.getMissingTables() loads missing tables serially. In
local catalog mode, loading a large number of tables serially can incur
significant round-trip latency to CatalogD. This patch parallelizes
table loading by using an executor service to look up and gather all
non-null FeTables from the given TableName set.
Modify LocalCatalog.loadDbs() and LocalDb.loadTableNames() slightly to
make them thread-safe. Change FrontendProfile.Scope to support nested
scopes that reference the same FrontendProfile instance.
Add a new flag, max_stmt_metadata_loader_threads, to control the maximum
number of threads used for loading table metadata during query
compilation. It defaults to 8 threads per query compilation.
If local catalog mode is disabled, there is only one table to load,
max_stmt_metadata_loader_threads is set to 1, or a
RejectedExecutionException is raised, fall back to loading tables
serially.
Testing:
Ran and passed a few tests, such as test_catalogd_ha.py,
test_concurrent_ddls.py, and test_observability.py.
Added FE test CatalogdMetaProviderTest.testProfileParallelLoad.
Manually ran the following query and observed parallel loading by
setting the log level to TRACE in CatalogdMetaProvider.java.
use functional;
select count(*) from alltypesnopart
union select count(*) from alltypessmall
union select count(*) from alltypestiny
union select count(*) from alltypesagg;
Change-Id: I97a5165844ae846b28338d62e93a20121488d79f
Reviewed-on: http://gerrit.cloudera.org:8080/23436
Reviewed-by: Quanlong Huang <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/util/backend-gflag-util.cc | 6 +
common/thrift/BackendGflags.thrift | 2 +
.../java/org/apache/impala/analysis/Analyzer.java | 4 +
.../apache/impala/analysis/StmtMetadataLoader.java | 134 +++++++++++++++++++--
.../apache/impala/catalog/local/LocalCatalog.java | 46 ++++---
.../org/apache/impala/catalog/local/LocalDb.java | 30 ++---
.../org/apache/impala/service/BackendConfig.java | 4 +
.../org/apache/impala/service/FrontendProfile.java | 21 +++-
.../catalog/local/CatalogdMetaProviderTest.java | 51 +++++++-
9 files changed, 248 insertions(+), 50 deletions(-)
diff --git a/be/src/util/backend-gflag-util.cc
b/be/src/util/backend-gflag-util.cc
index d90df897c..afdc92c1a 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -302,6 +302,10 @@ DEFINE_int32(iceberg_catalog_num_threads, 16,
"Maximum number of threads to use for Iceberg catalog operations. These
threads are "
"shared among concurrent Iceberg catalog operation (ie.,
ExpireSnapshot).");
+DEFINE_int32(max_stmt_metadata_loader_threads, 8,
+ "Maximum number of threads to use for loading table metadata during query "
+ "compilation.");
+
// These coefficients have not been determined empirically. The write
coefficient
// matches the coefficient for a broadcast sender in DataStreamSink. The read
// coefficient matches the coefficient for an exchange receiver in
ExchandeNode.
@@ -358,6 +362,7 @@ DEFINE_validator(query_cpu_count_divisor,
&ValidatePositiveDouble);
DEFINE_validator(min_processing_per_thread, &ValidatePositiveInt64);
DEFINE_validator(query_cpu_root_factor, &ValidatePositiveDouble);
DEFINE_validator(iceberg_catalog_num_threads, &ValidatePositiveInt32);
+DEFINE_validator(max_stmt_metadata_loader_threads, &ValidatePositiveInt32);
DEFINE_validator(tuple_cache_cost_coefficient_write_bytes,
&ValidateNonnegativeDouble);
DEFINE_validator(tuple_cache_cost_coefficient_write_rows,
&ValidateNonnegativeDouble);
DEFINE_validator(tuple_cache_cost_coefficient_read_bytes,
&ValidateNonnegativeDouble);
@@ -590,6 +595,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
cfg.__set_tuple_cache_cost_coefficient_read_rows(
FLAGS_tuple_cache_cost_coefficient_read_rows);
cfg.__set_min_jdbc_scan_cardinality(FLAGS_min_jdbc_scan_cardinality);
+
cfg.__set_max_stmt_metadata_loader_threads(FLAGS_max_stmt_metadata_loader_threads);
return Status::OK();
}
diff --git a/common/thrift/BackendGflags.thrift
b/common/thrift/BackendGflags.thrift
index 07f7e65f3..d8bc737f0 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -359,4 +359,6 @@ struct TBackendGflags {
164: required double tuple_cache_cost_coefficient_read_rows
165: required i32 min_jdbc_scan_cardinality
+
+ 166: required i32 max_stmt_metadata_loader_threads
}
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index fdfe2bd21..fd5b2e1b6 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -1568,6 +1568,10 @@ public class Analyzer {
// Add paths rooted at a table with an unqualified and fully-qualified
table name.
List<TableName> candidateTbls = Path.getCandidateTables(rawPath,
getDefaultDb());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Candidate tables to lookup in catalog/cache for {} {}: {}",
pathType,
+ ToSqlUtils.getPathSql(rawPath), candidateTbls);
+ }
for (int tblNameIdx = 0; tblNameIdx < candidateTbls.size();
++tblNameIdx) {
TableName tblName = candidateTbls.get(tblNameIdx);
FeTable tbl = null;
diff --git
a/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java
b/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java
index 01d974aa0..7ef9290c4 100644
--- a/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java
@@ -19,12 +19,23 @@ package org.apache.impala.analysis;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.concurrent.TimeUnit;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import javax.annotation.Nullable;
import org.apache.impala.authorization.TableMask;
import org.apache.impala.authorization.User;
@@ -36,11 +47,15 @@ import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.FeView;
import org.apache.impala.catalog.MaterializedViewHdfsTable;
import org.apache.impala.catalog.Table;
+import org.apache.impala.catalog.local.InconsistentMetadataFetchException;
+import org.apache.impala.catalog.local.LocalCatalogException;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.InternalException;
+import org.apache.impala.common.Pair;
import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.service.Frontend;
+import org.apache.impala.service.FrontendProfile;
import org.apache.impala.thrift.TUniqueId;
import org.apache.impala.util.AcidUtils;
import org.apache.impala.util.EventSequence;
@@ -50,6 +65,8 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Loads all table and view metadata relevant for a single SQL statement and
returns the
@@ -70,8 +87,9 @@ public class StmtMetadataLoader {
private final TUniqueId queryId_;
// Results of the loading process. See StmtTableCache.
- private final Set<String> dbs_ = new HashSet<>();
- private final Map<TableName, FeTable> loadedOrFailedTbls_ = new HashMap<>();
+ // Use thread-safe collection for parallel stream in getMissingTables().
+ private final Set<String> dbs_ = Collections.synchronizedSet(new
HashSet<>());
+ private final Map<TableName, FeTable> loadedOrFailedTbls_ = new
ConcurrentHashMap<>();
// Metrics for the metadata load.
// Number of prioritizedLoad() RPCs issued to the catalogd.
@@ -310,6 +328,91 @@ public class StmtMetadataLoader {
needsAnyTableMasksInQuery_);
}
+ /**
+ * Loads the table 'tblName' from 'catalog' and returns it. If the table or
its
+ * database does not exist, returns null. If the table is already in
+ * 'loadedOrFailedTbls_', returns null. Everything called from here needs to
be
+ * reentrant.
+ */
+ private @Nullable Pair<TableName, FeTable> loadFeTable(
+ FeCatalog catalog, TableName tblName) {
+ if (loadedOrFailedTbls_.containsKey(tblName)) return null;
+ FeDb db = catalog.getDb(tblName.getDb());
+ if (db == null) return null;
+ dbs_.add(tblName.getDb());
+ FeTable tbl = db.getTable(tblName.getTbl());
+ if (tbl == null) return null;
+ return Pair.create(tblName, tbl);
+ }
+
+ /**
+ * Load 'tbls' serially.
+ */
+ private List<Pair<TableName, FeTable>> serialTableLoad(
+ final FeCatalog catalog, Set<TableName> tbls) {
+ LOG.trace("Serial load {}", tbls);
+ return tbls.stream()
+ .map(tblName -> loadFeTable(catalog, tblName))
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Load 'tbls' using executor service to speed up table loadings from
CatalogD in case
+ * of local catalog mode.
+ */
+ private List<Pair<TableName, FeTable>> parallelTableLoad(final FeCatalog
catalog,
+ Set<TableName> tbls, int maxThreads) throws InternalException {
+ LOG.trace("Parallel load {} with {} max threads", tbls, maxThreads);
+ FrontendProfile profile = FrontendProfile.getCurrentOrNull();
+ String queryIdStr =
+ queryId_ == null ? "<unknown_query_id>" :
TUniqueIdUtil.PrintId(queryId_);
+ List<Pair<TableName, FeTable>> tables = new ArrayList<>();
+ ExecutorService executorService = null;
+ try {
+ executorService = Executors.newFixedThreadPool(maxThreads,
+ new ThreadFactoryBuilder()
+ .setNameFormat("MissingTableLoaderThread-" + queryIdStr + "-%d")
+ .build());
+ // Transform tbls to a list of tasks.
+ List<Callable<Pair<TableName, FeTable>>> tasks =
+ tbls.stream()
+ .map(tblName -> (Callable<Pair<TableName, FeTable>>) () -> {
+ try (FrontendProfile.Scope s =
+ FrontendProfile.newScopeWithExistingProfile(profile))
{
+ return loadFeTable(catalog, tblName);
+ }
+ })
+ .collect(Collectors.toList());
+ // Invoke all tasks and wait for them to finish.
+ List<Future<Pair<TableName, FeTable>>> futures =
executorService.invokeAll(tasks);
+ for (Future<Pair<TableName, FeTable>> future : futures) {
+ Pair<TableName, FeTable> nameTblPair = future.get();
+ if (nameTblPair != null) tables.add(nameTblPair);
+ }
+ } catch (ExecutionException ex) {
+ // Unwrap and rethrow the cause if it is one of the declared exceptions.
+ // TODO: Any other exceptions to propagate?
+ Throwables.propagateIfPossible(ex.getCause(),
+ InconsistentMetadataFetchException.class,
LocalCatalogException.class);
+ Throwables.propagateIfPossible(ex.getCause(), InternalException.class);
+ // Otherwise, wrap as InternalException and rethrow.
+ throw new InternalException(
+ "Caught exception during parallel table loading of " + tbls,
ex.getCause());
+ } catch (InterruptedException ex) {
+ // Wrap as InternalException and rethrow.
+ throw new InternalException(
+ "ExecutorService interrupted during parallel table loading of " +
tbls, ex);
+ } catch (RejectedExecutionException ex) {
+ // Parallel load rejected, fall back to serial load.
+ tables = serialTableLoad(catalog, tbls);
+ } finally {
+ if (executorService != null) executorService.shutdown();
+ }
+
+ return tables;
+ }
+
/**
* Determines whether the 'tbls' are loaded in the given catalog or not.
Adds the names
* of referenced databases that exist to 'dbs_', and loaded tables to
@@ -319,17 +422,24 @@ public class StmtMetadataLoader {
* Path.getCandidateTables(). Non-existent tables are ignored and not
returned or
* added to 'loadedOrFailedTbls_'.
*/
- private Set<TableName> getMissingTables(FeCatalog catalog, Set<TableName>
tbls,
- Map<TableName, FeTable> missingTblsSnapshot) {
+ private Set<TableName> getMissingTables(final FeCatalog catalog,
Set<TableName> tbls,
+ Map<TableName, FeTable> missingTblsSnapshot) throws InternalException {
Set<TableName> missingTbls = new HashSet<>();
+ if (tbls.isEmpty()) return missingTbls;
Set<TableName> viewTbls = new HashSet<>();
- for (TableName tblName: tbls) {
- if (loadedOrFailedTbls_.containsKey(tblName)) continue;
- FeDb db = catalog.getDb(tblName.getDb());
- if (db == null) continue;
- dbs_.add(tblName.getDb());
- FeTable tbl = db.getTable(tblName.getTbl());
- if (tbl == null) continue;
+
+ int maxThreads =
+ Math.min(tbls.size(),
BackendConfig.INSTANCE.getMaxStmtMetadataLoaderThreads());
+ List<Pair<TableName, FeTable>> tables = null;
+ if (maxThreads > 1 &&
BackendConfig.INSTANCE.getBackendCfg().use_local_catalog) {
+ tables = parallelTableLoad(catalog, tbls, maxThreads);
+ } else {
+ tables = serialTableLoad(catalog, tbls);
+ }
+
+ for (Pair<TableName, FeTable> nameTblPair : tables) {
+ TableName tblName = nameTblPair.first;
+ FeTable tbl = nameTblPair.second;
if (!tbl.isLoaded()
|| (tbl instanceof FeIncompleteTable
&& ((FeIncompleteTable)
tbl).isLoadFailedByRecoverableError())) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
index 5301e48b2..07849d4bd 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
@@ -57,6 +57,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
/**
@@ -78,7 +79,8 @@ public class LocalCatalog implements FeCatalog {
private final static Logger LOG =
LoggerFactory.getLogger(LocalCatalog.class);
private final MetaProvider metaProvider_;
- private Map<String, FeDb> dbs_ = new HashMap<>();
+ // Mapping of Db name to FeDb. Not cleared once populated.
+ private ImmutableMap<String, FeDb> dbs_ = null;
private Map<String, HdfsCachePool> hdfsCachePools_ = null;
private String nullPartitionKeyValue_;
// Catalog service id when MetaProvider is CatalogdMetaProvider
@@ -101,27 +103,33 @@ public class LocalCatalog implements FeCatalog {
return Catalog.filterCatalogObjectsByPattern(dbs_.values(), matcher);
}
+ /**
+ * Populate dbs_ if it is empty. This method is synchronized to avoid
+ * multiple threads trying to populate dbs_ at the same time.
+ */
private void loadDbs() {
- if (!dbs_.isEmpty()) return;
- Map<String, FeDb> dbs = new HashMap<>();
- List<String> names;
- try {
- names = metaProvider_.loadDbList();
- } catch (TException e) {
- throw new LocalCatalogException("Unable to load database names", e);
- }
- for (String dbName : names) {
- dbName = dbName.toLowerCase();
- if (dbs_.containsKey(dbName)) {
- dbs.put(dbName, dbs_.get(dbName));
- } else {
- dbs.put(dbName, new LocalDb(this, dbName));
+ // Do nothing if dbs_ is already populated.
+ if (dbs_ != null) return;
+
+ synchronized (this) {
+ // Check again to avoid redundant work.
+ if (dbs_ != null) return;
+ Map<String, FeDb> dbs = new HashMap<>();
+ List<String> names;
+ try {
+ names = metaProvider_.loadDbList();
+ } catch (TException e) {
+ throw new LocalCatalogException("Unable to load database names", e);
+ }
+ for (String dbName : names) {
+ String lowerDbName = dbName.toLowerCase();
+ dbs.putIfAbsent(lowerDbName, new LocalDb(this, lowerDbName));
}
- }
- Db bdb = BuiltinsDb.getInstance();
- dbs.put(bdb.getName(), bdb);
- dbs_ = dbs;
+ Db bdb = BuiltinsDb.getInstance();
+ dbs.putIfAbsent(bdb.getName(), bdb);
+ dbs_ = ImmutableMap.copyOf(dbs);
+ }
}
@Override
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java
b/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java
index 8669af064..f19d79f73 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalDb.java
@@ -19,11 +19,11 @@ package org.apache.impala.catalog.local;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.stream.Collectors;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.PrincipalType;
@@ -38,7 +38,6 @@ import org.apache.impala.catalog.FeTable;
import org.apache.impala.catalog.Function;
import org.apache.impala.catalog.Function.CompareMode;
import org.apache.impala.catalog.TableLoadingException;
-import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.common.Pair;
import org.apache.impala.thrift.TBriefTableMeta;
@@ -56,7 +55,6 @@ import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* Database instance loaded from {@link LocalCatalog}.
*
@@ -74,7 +72,7 @@ public class LocalDb implements FeDb {
* Map from lower-cased table name to table object. Values will be
* null for tables which have not yet been loaded.
*/
- private Map<String, FeTable> tables_;
+ private ConcurrentHashMap<String, FeTable> tables_;
/**
* Map of function name to list of signatures for that function name.
@@ -198,17 +196,21 @@ public class LocalDb implements FeDb {
*/
private void loadTableNames() {
if (tables_ != null) return;
- Map<String, FeTable> newMap = new HashMap<>();
- try {
- MetaProvider metaProvider = catalog_.getMetaProvider();
- for (TBriefTableMeta meta : metaProvider.loadTableList(name_)) {
- newMap.put(meta.getName(), new LocalIncompleteTable(this, meta));
+ synchronized (this) {
+ if (tables_ != null) return;
+ ConcurrentHashMap<String, FeTable> newMap = new ConcurrentHashMap<>();
+ try {
+ MetaProvider metaProvider = catalog_.getMetaProvider();
+ for (TBriefTableMeta meta : metaProvider.loadTableList(name_)) {
+ newMap.put(meta.getName(), new LocalIncompleteTable(this, meta));
+ }
+ } catch (TException e) {
+ throw new LocalCatalogException(
+ String.format("Could not load table names for database '%s' from
HMS", name_),
+ e);
}
- } catch (TException e) {
- throw new LocalCatalogException(String.format(
- "Could not load table names for database '%s' from HMS", name_), e);
+ tables_ = newMap;
}
- tables_ = newMap;
}
@Override
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 871d9d1b5..b2239168c 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -620,4 +620,8 @@ public class BackendConfig {
public double getTupleCacheCostCoefficientReadRows() {
return backendCfg_.tuple_cache_cost_coefficient_read_rows;
}
+
+ public int getMaxStmtMetadataLoaderThreads() {
+ return backendCfg_.max_stmt_metadata_loader_threads;
+ }
}
diff --git a/fe/src/main/java/org/apache/impala/service/FrontendProfile.java
b/fe/src/main/java/org/apache/impala/service/FrontendProfile.java
index 6eec58394..343369809 100644
--- a/fe/src/main/java/org/apache/impala/service/FrontendProfile.java
+++ b/fe/src/main/java/org/apache/impala/service/FrontendProfile.java
@@ -92,13 +92,24 @@ public class FrontendProfile {
/**
* Create a new profile, setting it as the current thread-local profile for
the
* length of the current scope. This is meant to be used in a
try-with-resources
- * statement. Supports at most one scope per thread. No nested scopes are
currently
- * allowed.
+ * statement. Nested scopes are only supported if they use the same
FrontendProfile
+ * instance.
*/
public static Scope createNewWithScope() {
return new Scope(new FrontendProfile());
}
+ /**
+ * Create a new scope with an existing profile. This is meant to be used in a
+ * try-with-resources statement. Nested scopes are only supported if they
use the same
+ * FrontendProfile instance.
+ * @param profile existing profile from main thread to use in the new scope.
+ * @return the new scope.
+ */
+ public static Scope newScopeWithExistingProfile(FrontendProfile profile) {
+ return new Scope(profile);
+ }
+
/**
* Get the profile attached to the current thread, throw
IllegalStateException if there
* is none.
@@ -255,7 +266,11 @@ public class FrontendProfile {
private Scope(FrontendProfile profile) {
oldThreadLocalValue_ = THREAD_LOCAL.get();
// TODO: remove when allowing nested scopes.
- Preconditions.checkState(oldThreadLocalValue_ == null);
+ if (oldThreadLocalValue_ != null) {
+ Preconditions.checkState(oldThreadLocalValue_ == profile,
+ "Nested FrontendProfile scopes only supported if they use the same
"
+ + "FrontendProfile instance");
+ }
THREAD_LOCAL.set(profile);
}
diff --git
a/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
index 64845affc..1c36223da 100644
---
a/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
+++
b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
@@ -34,8 +34,8 @@ import java.util.stream.Collectors;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.impala.analysis.TableName;
import org.apache.impala.catalog.FileDescriptor;
-import org.apache.impala.catalog.HdfsPartition;
import org.apache.impala.catalog.local.CatalogdMetaProvider.SizeOfWeigher;
import org.apache.impala.catalog.local.MetaProvider.PartitionMetadata;
import org.apache.impala.catalog.local.MetaProvider.PartitionRef;
@@ -61,7 +61,6 @@ import org.apache.impala.thrift.TTable;
import org.apache.impala.util.ListMap;
import org.apache.impala.util.TByteBuffer;
import org.apache.thrift.TConfiguration;
-import org.apache.thrift.transport.TTransportException;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
@@ -389,6 +388,54 @@ public class CatalogdMetaProviderTest {
assertTrue(counters.containsKey("CatalogFetch.StorageLoad.Time"));
}
+ @Test
+ public void testProfileParallelLoad() throws Exception {
+ FrontendProfile profile;
+ // All of these table has not been loaded yet.
+ List<TableName> tables = new ArrayList<>();
+ tables.add(new TableName("functional", "alltypesnopart"));
+ tables.add(new TableName("functional", "alltypessmall"));
+ tables.add(new TableName("functional", "alltypestiny"));
+ tables.add(new TableName("functional", "alltypesagg"));
+ try (FrontendProfile.Scope scope = FrontendProfile.createNewWithScope()) {
+ profile = FrontendProfile.getCurrent();
+ assertNotNull(profile);
+ tables.parallelStream().forEach(t -> {
+ try (FrontendProfile.Scope s =
+ FrontendProfile.newScopeWithExistingProfile(profile)) {
+ // Load the table. This will create a Table miss.
+ Pair<Table, TableMetaRef> pair = provider_.loadTable(t.getDb(),
t.getTbl());
+ // Load all partition ids. This will create a PartitionLists miss.
+ List<PartitionRef> allRefs =
provider_.loadPartitionList(pair.second);
+ // Load all partitions. This will create one partition miss per
partition.
+ loadPartitions(pair.second, allRefs);
+ } catch (Exception e) { throw new RuntimeException(e); }
+ });
+ }
+ TRuntimeProfileNode prof = profile.emitAsThrift();
+ Map<String, TCounter> counters = Maps.uniqueIndex(prof.counters,
TCounter::getName);
+ assertEquals(prof.counters.toString(), 16, counters.size());
+ assertEquals(0, counters.get("CatalogFetch.Tables.Hits").getValue());
+ assertEquals(4, counters.get("CatalogFetch.Tables.Misses").getValue());
+ assertEquals(4, counters.get("CatalogFetch.Tables.Requests").getValue());
+ assertTrue(counters.containsKey("CatalogFetch.Tables.Time"));
+ assertEquals(0,
counters.get("CatalogFetch.PartitionLists.Hits").getValue());
+ assertEquals(4,
counters.get("CatalogFetch.PartitionLists.Misses").getValue());
+ assertEquals(4,
counters.get("CatalogFetch.PartitionLists.Requests").getValue());
+ assertTrue(counters.containsKey("CatalogFetch.PartitionLists.Time"));
+ assertEquals(0, counters.get("CatalogFetch.Partitions.Hits").getValue());
+ assertEquals(20,
counters.get("CatalogFetch.Partitions.Misses").getValue());
+ assertEquals(20,
counters.get("CatalogFetch.Partitions.Requests").getValue());
+ assertTrue(counters.containsKey("CatalogFetch.Partitions.Time"));
+ assertTrue(counters.containsKey("CatalogFetch.RPCs.Bytes"));
+ assertTrue(counters.containsKey("CatalogFetch.RPCs.Time"));
+ // 3 RPCs per table: one for fetching table, one for partition list,
+ // and the other one for fetching partitions.
+ assertEquals(12, counters.get("CatalogFetch.RPCs.Requests").getValue());
+ // Should contains StorageLoad.Time since we have loaded partitions from
catalogd.
+ assertTrue(counters.containsKey("CatalogFetch.StorageLoad.Time"));
+ }
+
@Test
public void testPiggybackSuccess() throws Exception {
// TODO: investigate the cause of flakiness (IMPALA-8794)