This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 77d80aeda IMPALA-11812: Deduplicate column schema in hmsPartitions
77d80aeda is described below
commit 77d80aeda653b3aecb8bc41bf867cc5a84ba1245
Author: stiga-huang <[email protected]>
AuthorDate: Tue Dec 27 12:24:44 2022 +0800
IMPALA-11812: Deduplicate column schema in hmsPartitions
A list of HMS Partitions will be created in many workloads in catalogd,
e.g. table loading, bulk altering partitions by ComputeStats or
AlterTableRecoverPartitions, etc. Currently, each hmsPartition holds a
unique list of column schema, i.e. a List<FieldSchema>. This results in
lots of FieldSchema instances if the table is wide and lots of
partitions need to be loaded/operated. Though the strings of column
names and comments are interned, the FieldSchema objects could still
occupy the majority of the heap. See the histogram in JIRA description.
In reality, the hmsPartition instances of a table can share the
table-level column schema since Impala doesn't respect the partition
level schema.
This patch replaces the column list in the StorageDescriptor of each
hmsPartition with the table-level column list to remove the duplications. Also add
some progress logs in batch HMS operations, and avoid misleading logs
when event-processor is disabled.
Tests:
- Ran exhaustive tests
- Add tests on wide table operations that hit OOM errors without this
fix.
Change-Id: I511ecca0ace8bea4c24a19a54fb0a75390e50c4d
Reviewed-on: http://gerrit.cloudera.org:8080/19391
Reviewed-by: Aman Sinha <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
.../java/org/apache/impala/catalog/HdfsTable.java | 13 ++--
.../impala/catalog/events/MetastoreEvents.java | 2 +
.../impala/catalog/local/DirectMetaProvider.java | 2 +-
.../apache/impala/service/CatalogOpExecutor.java | 42 ++++++----
.../java/org/apache/impala/util/MetaStoreUtil.java | 86 +++++++++++++++-----
.../functional/functional_schema_template.sql | 11 +++
.../datasets/functional/schema_constraints.csv | 3 +
tests/common/custom_cluster_test_suite.py | 7 +-
tests/custom_cluster/test_wide_table_operations.py | 91 ++++++++++++++++++++++
9 files changed, 212 insertions(+), 45 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 2ffdf8db0..99a34a877 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -1269,7 +1269,7 @@ public class HdfsTable extends Table implements FeFsTable
{
// Load all partitions from Hive Metastore, including file metadata.
List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions =
MetaStoreUtil.fetchAllPartitions(
- client, db_.getName(), name_, NUM_PARTITION_FETCH_RETRIES);
+ client, msTbl, NUM_PARTITION_FETCH_RETRIES);
LOG.info("Fetched partition metadata from the Metastore: " +
getFullName());
storageMetadataLoadTime_ = loadAllPartitions(client, msPartitions,
msTbl);
allPartitionsLdContext.stop();
@@ -1574,11 +1574,11 @@ public class HdfsTable extends Table implements
FeFsTable {
if (partitionsToUpdate != null) {
partitionList = MetaStoreUtil
.fetchPartitionsByName(client,
Lists.newArrayList(partitionsToUpdate),
- db_.getName(), name_);
+ msTable_);
} else {
partitionList =
MetaStoreUtil.fetchAllPartitions(
- client_, db_.getName(), name_, NUM_PARTITION_FETCH_RETRIES);
+ client_, msTable_, NUM_PARTITION_FETCH_RETRIES);
}
LOG.debug("Time taken to fetch all partitions of table {}: {} msec",
getFullName(),
sw.stop().elapsed(TimeUnit.MILLISECONDS));
@@ -1838,6 +1838,8 @@ public class HdfsTable extends Table implements FeFsTable
{
}
addVirtualColumns();
isSchemaLoaded_ = true;
+ LOG.info("Loaded {} columns from HMS. Actual columns: {}",
+ nonPartFieldSchemas_.size() + numClusteringCols_, colsByPos_.size());
}
/**
@@ -1858,7 +1860,7 @@ public class HdfsTable extends Table implements FeFsTable
{
// Load partition metadata from Hive Metastore.
List<Partition> msPartitions = new ArrayList<>(
MetaStoreUtil.fetchPartitionsByName(
- client, Lists.newArrayList(partitionNames), db_.getName(), name_));
+ client, Lists.newArrayList(partitionNames), msTable_));
return loadPartitionsFromMetastore(msPartitions, inprogressPartBuilders,
partitionToEventId, client);
}
@@ -2756,8 +2758,7 @@ public class HdfsTable extends Table implements FeFsTable
{
List<Partition> hmsPartitions;
Map<Partition, HdfsPartition> hmsPartToHdfsPart = new HashMap<>();
try {
- hmsPartitions = client.getPartitionsByNames(getDb().getName(),
- getName(), partNames);
+ hmsPartitions = MetaStoreUtil.fetchPartitionsByName(client, partNames,
msTable_);
for (Partition partition : hmsPartitions) {
List<LiteralExpr> partExprs =
getTypeCompatiblePartValues(partition.getValues());
HdfsPartition hdfsPartition = getPartition(partExprs);
diff --git
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 4bfb2c8a4..44d09150f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -74,6 +74,7 @@ import org.apache.impala.service.CatalogOpExecutor;
import org.apache.impala.thrift.TPartitionKeyValue;
import org.apache.impala.thrift.TTableName;
import org.apache.impala.util.AcidUtils;
+import org.apache.impala.util.MetaStoreUtil;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
@@ -1873,6 +1874,7 @@ public class MetastoreEvents {
// it is possible that the added partitions is empty in certain cases.
See
// IMPALA-8847 for example
msTbl_ = addPartitionMessage_.getTableObj();
+ MetaStoreUtil.replaceSchemaFromTable(addedPartitions_, msTbl_);
partitionKeyVals_ = new ArrayList<>(addedPartitions_.size());
for (Partition part : addedPartitions_) {
partitionKeyVals_.add(getTPartitionSpecFromHmsPartition(msTbl_,
part));
diff --git
a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
index 20ce56ada..90bdba126 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
@@ -236,7 +236,7 @@ class DirectMetaProvider implements MetaProvider {
List<Partition> parts;
try (MetaStoreClient c = msClientPool_.getClient()) {
parts = MetaStoreUtil.fetchPartitionsByName(
- c.getHiveClient(), partNames, tableImpl.dbName_,
tableImpl.tableName_);
+ c.getHiveClient(), partNames, tableImpl.msTable_);
}
// HMS may return fewer partition objects than requested, and the
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index ed896c42a..6f3fa24e9 100755
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -174,7 +174,6 @@ import
org.apache.impala.thrift.TAlterTableSetTblPropertiesParams;
import org.apache.impala.thrift.TAlterTableType;
import org.apache.impala.thrift.TAlterTableUnSetTblPropertiesParams;
import org.apache.impala.thrift.TAlterTableUpdateStatsParams;
-import org.apache.impala.thrift.TBucketInfo;
import org.apache.impala.thrift.TBucketType;
import org.apache.impala.thrift.TCatalogObject;
import org.apache.impala.thrift.TCatalogObjectType;
@@ -4127,7 +4126,8 @@ public class CatalogOpExecutor {
if (allHmsPartitionsToAdd.isEmpty()) return null;
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
- Map<String, Long> partitionToEventId = Maps.newHashMap();
+ Map<String, Long> partitionToEventId =
catalog_.isEventProcessingActive() ?
+ Maps.newHashMap() : null;
List<Partition> addedHmsPartitions =
addHmsPartitionsInTransaction(msClient,
tbl, allHmsPartitionsToAdd, partitionToEventId, ifNotExists);
// Handle HDFS cache. This is done in a separate round bacause we have
to apply
@@ -4142,7 +4142,7 @@ public class CatalogOpExecutor {
List<Partition> difference = computeDifference(allHmsPartitionsToAdd,
addedHmsPartitions);
addedHmsPartitions.addAll(
- getPartitionsFromHms(msTbl, msClient, tableName, difference));
+ getPartitionsFromHms(msTbl, msClient, difference));
}
addHdfsPartitions(msClient, tbl, addedHmsPartitions, partitionToEventId);
}
@@ -4720,16 +4720,21 @@ public class CatalogOpExecutor {
*/
private List<Partition> addHmsPartitions(MetaStoreClient msClient,
Table tbl, List<Partition> allHmsPartitionsToAdd,
- Map<String, Long> partitionToEventId, boolean ifNotExists)
+ @Nullable Map<String, Long> partitionToEventId, boolean ifNotExists)
throws ImpalaRuntimeException, CatalogException {
long eventId = getCurrentEventId(msClient);
List<Partition> addedHmsPartitions = Lists
.newArrayListWithCapacity(allHmsPartitionsToAdd.size());
+ long numDone = 0;
for (List<Partition> hmsSublist :
Lists.partition(allHmsPartitionsToAdd, MAX_PARTITION_UPDATES_PER_RPC))
{
try {
- List<Partition> addedPartitions = msClient.getHiveClient()
- .add_partitions(hmsSublist, ifNotExists, true);
+ List<Partition> addedPartitions = MetaStoreUtil.addPartitions(
+ msClient.getHiveClient(), tbl.getMetaStoreTable(),
+ hmsSublist, ifNotExists, true);
+ numDone += hmsSublist.size();
+ LOG.info("Added {}/{} partitions in HMS for table {}", numDone,
+ allHmsPartitionsToAdd.size(), tbl.getFullName());
org.apache.hadoop.hive.metastore.api.Table msTbl =
tbl.getMetaStoreTable();
List<NotificationEvent> events =
getNextMetastoreEventsIfEnabled(eventId,
event -> AddPartitionEvent.ADD_PARTITION_EVENT_TYPE
@@ -4748,6 +4753,7 @@ public class CatalogOpExecutor {
// add_partitions call above.
addedHmsPartitions.addAll(addedPartitions);
} else {
+ Preconditions.checkNotNull(partitionToEventId);
addedHmsPartitions.addAll(partitionToEventSubMap.keySet());
// we cannot keep a mapping of Partition to event ids because the
// partition objects are changed later in the cachePartitions code
path.
@@ -4815,8 +4821,7 @@ public class CatalogOpExecutor {
*/
private List<Partition> getPartitionsFromHms(
org.apache.hadoop.hive.metastore.api.Table msTbl, MetaStoreClient
msClient,
- TableName tableName, List<Partition> hmsPartitions)
- throws ImpalaException {
+ List<Partition> hmsPartitions) throws ImpalaException {
List<String> partitionCols = Lists.newArrayList();
for (FieldSchema fs: msTbl.getPartitionKeys())
partitionCols.add(fs.getName());
@@ -4827,8 +4832,8 @@ public class CatalogOpExecutor {
partitionNames.add(partName);
}
try {
- return msClient.getHiveClient().getPartitionsByNames(tableName.getDb(),
- tableName.getTbl(), partitionNames);
+ return MetaStoreUtil.fetchPartitionsByName(msClient.getHiveClient(),
+ partitionNames, msTbl);
} catch (TException e) {
throw new ImpalaRuntimeException("Metadata inconsistency has occured.
Please run "
+ "'invalidate metadata <tablename>' to resolve the problem.", e);
@@ -5693,7 +5698,8 @@ public class CatalogOpExecutor {
}
// Add partitions to metastore.
- Map<String, Long> partitionToEventId = Maps.newHashMap();
+ Map<String, Long> partitionToEventId = catalog_.isEventProcessingActive() ?
+ Maps.newHashMap() : null;
String annotation = String.format("Recovering %d partitions for %s",
hmsPartitions.size(), tbl.getFullName());
try (ThreadNameAnnotator tna = new ThreadNameAnnotator(annotation);
@@ -5703,6 +5709,7 @@ public class CatalogOpExecutor {
addHdfsPartitions(msClient, tbl, addedPartitions, partitionToEventId);
// Handle HDFS cache.
if (cachePoolName != null) {
+ int numDone = 0;
for (List<Partition> hmsSublist :
Lists.partition(addedPartitions, MAX_PARTITION_UPDATES_PER_RPC)) {
for (Partition partition: hmsSublist) {
@@ -5713,6 +5720,9 @@ public class CatalogOpExecutor {
// Update the partition metadata to include the cache directive id.
MetastoreShim.alterPartitions(msClient.getHiveClient(),
tableName.getDb(),
tableName.getTbl(), hmsSublist);
+ numDone += hmsSublist.size();
+ LOG.info("Updated cache directive id for {}/{} partitions for table
{}",
+ numDone, addedPartitions.size(), tableName);
}
}
} catch (TException e) {
@@ -5784,7 +5794,7 @@ public class CatalogOpExecutor {
partition.setDbName(tableName.getDb());
partition.setTableName(tableName.getTbl());
partition.setValues(partitionSpecValues);
- StorageDescriptor sd = msTbl.getSd().deepCopy();
+ StorageDescriptor sd =
MetaStoreUtil.shallowCopyStorageDescriptor(msTbl.getSd());
sd.setLocation(location);
partition.setSd(sd);
return partition;
@@ -6131,6 +6141,7 @@ public class CatalogOpExecutor {
String dbName = tbl.getDb().getName();
String tableName = tbl.getName();
+ int numDone = 0;
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
// Apply the updates in batches of 'MAX_PARTITION_UPDATES_PER_RPC'.
for (List<Partition> msPartitionsSubList : Iterables.partition(
@@ -6144,6 +6155,9 @@ public class CatalogOpExecutor {
MetastoreShim.alterPartitions(msClient.getHiveClient(), dbName,
tableName,
msPartitionsSubList);
}
+ numDone += msPartitionsSubList.size();
+ LOG.info("HMS alterPartitions done on {}/{} partitions of table {}",
numDone,
+ msPartitionToBuilders.size(), tbl.getFullName());
// Mark the corresponding HdfsPartition objects as dirty
for (Partition msPartition : msPartitionsSubList) {
HdfsPartition.Builder partBuilder =
msPartitionToBuilders.get(msPartition);
@@ -6497,9 +6511,7 @@ public class CatalogOpExecutor {
partition.setDbName(tblName.getDb());
partition.setTableName(tblName.getTbl());
partition.setValues(MetaStoreUtil.getPartValsFromName(msTbl,
partName));
- partition.setParameters(new HashMap<String, String>());
- partition.setSd(msTbl.getSd().deepCopy());
-
partition.getSd().setSerdeInfo(msTbl.getSd().getSerdeInfo().deepCopy());
+
partition.setSd(MetaStoreUtil.shallowCopyStorageDescriptor(msTbl.getSd()));
partition.getSd().setLocation(msTbl.getSd().getLocation() + "/"
+ partName);
addCatalogServiceIdentifiers(msTbl, partition);
MetastoreShim.updatePartitionStatsFast(partition, msTbl,
warehouse);
diff --git a/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
b/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
index 6cd6ada23..0e667b4b4 100644
--- a/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
@@ -17,16 +17,11 @@
package org.apache.impala.util;
-import java.util.ArrayList;
-import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Collection;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.Warehouse;
@@ -35,7 +30,6 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.impala.catalog.CatalogException;
@@ -156,16 +150,17 @@ public class MetaStoreUtil {
* generally mean the connection is broken or has timed out. The HiveClient
supports
* configuring retires at the connection level so it can be enabled
independently.
*/
- public static List<org.apache.hadoop.hive.metastore.api.Partition>
fetchAllPartitions(
- IMetaStoreClient client, String dbName, String tblName, int numRetries)
- throws MetaException, TException {
+ public static List<Partition> fetchAllPartitions(IMetaStoreClient client,
Table msTbl,
+ int numRetries) throws TException {
+ String dbName = msTbl.getDbName();
+ String tblName = msTbl.getTableName();
Preconditions.checkArgument(numRetries >= 0);
int retryAttempt = 0;
while (true) {
try {
// First, get all partition names that currently exist.
List<String> partNames = client.listPartitionNames(dbName, tblName,
(short) -1);
- return MetaStoreUtil.fetchPartitionsByName(client, partNames, dbName,
tblName);
+ return MetaStoreUtil.fetchPartitionsByName(client, partNames, msTbl);
} catch (MetaException e) {
// Only retry for MetaExceptions, since TExceptions could indicate a
broken
// connection which we can't recover from by retrying.
@@ -184,31 +179,80 @@ public class MetaStoreUtil {
/**
* Given a List of partition names, fetches the matching Partitions from the
HMS
* in batches. Each batch will contain at most 'maxPartsPerRpc' partitions.
- * Returns a List containing all fetched Partitions.
- * Will throw a MetaException if any partitions in 'partNames' do not exist.
+ * The partition-level schema will be replaced with the table's to reduce
memory
+ * footprint (IMPALA-11812).
+ * @return a List containing all fetched Partitions.
+ * @throws MetaException if any partitions in 'partNames' do not exist.
+ * @throws TException for RPC failures.
*/
- public static List<Partition> fetchPartitionsByName(
- IMetaStoreClient client, List<String> partNames, String dbName, String
tblName)
- throws MetaException, TException {
- if (LOG.isTraceEnabled()) {
- LOG.trace(String.format("Fetching %d partitions for: %s.%s using
partition " +
- "batch size: %d", partNames.size(), dbName, tblName,
maxPartitionsPerRpc_));
- }
+ public static List<Partition> fetchPartitionsByName(IMetaStoreClient client,
+ List<String> partNames, Table msTbl) throws TException {
+ LOG.info("Fetching {} partitions for: {}.{} using partition batch size:
{}",
+ partNames.size(), msTbl.getDbName(), msTbl.getTableName(),
+ maxPartitionsPerRpc_);
List<org.apache.hadoop.hive.metastore.api.Partition> fetchedPartitions =
Lists.newArrayList();
// Fetch the partitions in batches.
+ int numDone = 0;
for (int i = 0; i < partNames.size(); i += maxPartitionsPerRpc_) {
// Get a subset of partition names to fetch.
List<String> partsToFetch =
partNames.subList(i, Math.min(i + maxPartitionsPerRpc_,
partNames.size()));
// Fetch these partitions from the metastore.
- fetchedPartitions.addAll(
- client.getPartitionsByNames(dbName, tblName, partsToFetch));
+ List<Partition> partitions = client.getPartitionsByNames(
+ msTbl.getDbName(), msTbl.getTableName(), partsToFetch);
+ replaceSchemaFromTable(partitions, msTbl);
+ fetchedPartitions.addAll(partitions);
+ numDone += partitions.size();
+ LOG.info("Fetched {}/{} partitions for table {}.{}", numDone,
partNames.size(),
+ msTbl.getDbName(), msTbl.getTableName());
}
return fetchedPartitions;
}
+ /**
+ * A wrapper for HMS add_partitions API to replace the partition-level
schema with
+ * table's to save memory.
+ * @return a List containing all added Partitions.
+ */
+ public static List<Partition> addPartitions(IMetaStoreClient client, Table
tbl,
+ List<Partition> partitions, boolean ifNotExists, boolean needResults)
+ throws TException {
+ List<Partition> addedPartitions = client.add_partitions(partitions,
+ ifNotExists, needResults);
+ replaceSchemaFromTable(addedPartitions, tbl);
+ return addedPartitions;
+ }
+
+ /**
+ * Replace the column list in the given partitions with the table level
schema to save
+ * memory for wide tables (IMPALA-11812). Note that Impala never use the
partition
+ * level schema.
+ */
+ public static void replaceSchemaFromTable(List<Partition> partitions, Table
msTbl) {
+ for (Partition p : partitions) {
+ p.getSd().setCols(msTbl.getSd().getCols());
+ }
+ }
+
+ /**
+ * Shallow copy the given StorageDescriptor.
+ */
+ public static StorageDescriptor
shallowCopyStorageDescriptor(StorageDescriptor other) {
+ return new StorageDescriptor(
+ other.getCols(),
+ other.getLocation(),
+ other.getInputFormat(),
+ other.getOutputFormat(),
+ other.isCompressed(),
+ other.getNumBuckets(),
+ other.getSerdeInfo(),
+ other.getBucketCols(),
+ other.getSortCols(),
+ other.getParameters());
+ }
+
/**
* Checks that a given 'property' is short enough for HMS to handle. If not,
throws an
* 'AnalysisException' with 'name' as its prefix.
diff --git a/testdata/datasets/functional/functional_schema_template.sql
b/testdata/datasets/functional/functional_schema_template.sql
index 08cb1a4aa..8ac045275 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -2388,6 +2388,17 @@ select * from functional.{table_name};
---- DATASET
functional
---- BASE_TABLE_NAME
+widetable_2000_cols_partitioned
+---- PARTITION_COLUMNS
+p int
+---- COLUMNS
+`${IMPALA_HOME}/testdata/common/widetable.py --get_columns -n 2000
+---- ROW_FORMAT
+delimited fields terminated by ',' escaped by '\\'
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
avro_decimal_tbl
---- COLUMNS
name STRING
diff --git a/testdata/datasets/functional/schema_constraints.csv
b/testdata/datasets/functional/schema_constraints.csv
index 45d770e85..ad5abee48 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -361,3 +361,6 @@ table_name:alltypestiny_negative, constraint:restrict_to,
table_format:orc/def/b
table_name:insert_only_minor_compacted, constraint:restrict_to,
table_format:parquet/none/none
table_name:insert_only_major_and_minor_compacted, constraint:restrict_to,
table_format:parquet/none/none
+
+# The table is used in large scale metadata test. File format doesn't matter
so restrict to text only
+table_name:widetable_2000_cols_partitioned, constraint:restrict_to,
table_format:text/none/none
diff --git a/tests/common/custom_cluster_test_suite.py
b/tests/common/custom_cluster_test_suite.py
index 6e8205532..65a2fe8d5 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -43,6 +43,7 @@ ADMISSIOND_ARGS = 'admissiond_args'
KUDU_ARGS = 'kudu_args'
# Additional args passed to the start-impala-cluster script.
START_ARGS = 'start_args'
+JVM_ARGS = 'jvm_args'
HIVE_CONF_DIR = 'hive_conf_dir'
CLUSTER_SIZE = "cluster_size"
# Default query options passed to the impala daemon command line. Handled
separately from
@@ -105,7 +106,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
@staticmethod
def with_args(impalad_args=None, statestored_args=None, catalogd_args=None,
- start_args=None, default_query_options=None,
+ start_args=None, default_query_options=None, jvm_args=None,
impala_log_dir=None, hive_conf_dir=None, cluster_size=None,
num_exclusive_coordinators=None, kudu_args=None,
statestored_timeout_s=None,
impalad_timeout_s=None, expect_cores=None, reset_ranger=False):
@@ -119,6 +120,8 @@ class CustomClusterTestSuite(ImpalaTestSuite):
func.func_dict[CATALOGD_ARGS] = catalogd_args
if start_args is not None:
func.func_dict[START_ARGS] = start_args.split()
+ if jvm_args is not None:
+ func.func_dict[JVM_ARGS] = jvm_args
if hive_conf_dir is not None:
func.func_dict[HIVE_CONF_DIR] = hive_conf_dir
if kudu_args is not None:
@@ -144,7 +147,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
def setup_method(self, method):
cluster_args = list()
- for arg in [IMPALAD_ARGS, STATESTORED_ARGS, CATALOGD_ARGS,
ADMISSIOND_ARGS]:
+ for arg in [IMPALAD_ARGS, STATESTORED_ARGS, CATALOGD_ARGS,
ADMISSIOND_ARGS, JVM_ARGS]:
if arg in method.func_dict:
cluster_args.append("--%s=%s " % (arg, method.func_dict[arg]))
if START_ARGS in method.func_dict:
diff --git a/tests/custom_cluster/test_wide_table_operations.py
b/tests/custom_cluster/test_wide_table_operations.py
new file mode 100644
index 000000000..e1081aff1
--- /dev/null
+++ b/tests/custom_cluster/test_wide_table_operations.py
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import pytest
+from subprocess import call
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.skip import SkipIf
+
+TBL_NAME = "widetable_2000_cols_partitioned"
+NUM_PARTS = 50000
+
+
[email protected]_hdfs
+class TestWideTableOperations(CustomClusterTestSuite):
+ @classmethod
+ def get_workload(cls):
+ return 'functional-query'
+
+ @classmethod
+ def setup_class(cls):
+ if cls.exploration_strategy() != 'exhaustive':
+ pytest.skip('runs only in exhaustive since it takes more than 20 mins')
+ super(TestWideTableOperations, cls).setup_class()
+
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args(
+ jvm_args="-Xmx2g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath="
+ + os.getenv("LOG_DIR", "/tmp"))
+ def test_wide_table_operations(self, vector, unique_database):
+ """Regression test for IMPALA-11812. Test DDL/DML operations on wide table.
+ Use a small heap size (2GB) to make sure memory consumption is optimized.
+ Each FieldSchema instance takes 24 bytes in a small heap (<32GB). Without
the fix,
+ catalogd will hold at least 50,000 (parts) * 2,000 (cols) = 100,000,000
FieldSchema
+ instances in memory for execDdl or table loading, which already takes more
than 2GB
+ and will results in OOM failures."""
+ # Create partition dirs and files locally
+ tmp_dir = "/tmp/" + TBL_NAME
+ os.mkdir(tmp_dir)
+ for i in range(NUM_PARTS):
+ part_dir = tmp_dir + "/p=" + str(i)
+ data_file = part_dir + "/data.txt"
+ os.mkdir(part_dir)
+ with open(data_file, 'w') as local_file:
+ local_file.write("true")
+ # Upload files to HDFS
+ hdfs_dir = self._get_table_location("functional." + TBL_NAME, vector)
+ call(["hdfs", "dfs", "-rm", "-r", "-skipTrash", hdfs_dir])
+ # Use 1 replica to save space, 8 threads to speed up
+ call(["hdfs", "dfs", "-Ddfs.replication=1", "-put", "-t", "8", tmp_dir,
hdfs_dir])
+ # Create a new table so we don't need to drop partitions at the end.
+ # It will be dropped when 'unique_database' is dropped.
+ create_tbl_ddl =\
+ "create external table {db}.{tbl} like functional.{tbl} " \
+ "location '{location}'".format(
+ db=unique_database, tbl=TBL_NAME, location=hdfs_dir)
+ self.execute_query_expect_success(
+ self.client, create_tbl_ddl.format(db=unique_database, tbl=TBL_NAME))
+
+ # Recover partitions first. This takes 10mins for 50k partitions.
+ recover_stmt = "alter table {db}.{tbl} recover partitions"
+ # Invalidate the table to test initial metadata loading
+ invalidate_stmt = "invalidate metadata {db}.{tbl}"
+ # Test initial table loading and get all partitions
+ show_parts_stmt = "show partitions {db}.{tbl}"
+ try:
+ self.execute_query_expect_success(
+ self.client, recover_stmt.format(db=unique_database, tbl=TBL_NAME))
+ self.execute_query_expect_success(
+ self.client, invalidate_stmt.format(db=unique_database,
tbl=TBL_NAME))
+ res = self.execute_query_expect_success(
+ self.client, show_parts_stmt.format(db=unique_database,
tbl=TBL_NAME))
+ # Last line is 'Total'
+ assert len(res.data) == NUM_PARTS + 1
+ finally:
+ call(["rm", "-rf", tmp_dir])
+ call(["hdfs", "dfs", "-rm", "-r", "-skipTrash", hdfs_dir])