This is an automated email from the ASF dual-hosted git repository.
dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 01b8b4525 IMPALA-13620: Refresh compute_table_stats.py script
01b8b4525 is described below
commit 01b8b45252d50e4887278ae60b6bcf37c68440bb
Author: Riza Suminto <[email protected]>
AuthorDate: Tue Dec 17 11:53:43 2024 -0800
IMPALA-13620: Refresh compute_table_stats.py script
This patch refreshes the compute_table_stats.py script with the following
changes:
- Default parallelism to IMPALA_BUILD_THREADS (or to the CPU count if that
  variable is unset) when the --parallelism argument is not set.
- Change its default connection to hs2, leveraging existing
ImpylaHS2Connection.
- Change OptionParser to ArgumentParser.
- Use impala-python3 to run the script.
- Add --exclude_table_names to skip running COMPUTE STATS on certain
tables/views.
- Make continue_on_error False by default (it was previously True); pass
  --continue_on_error to keep going after a failed COMPUTE STATS.
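This patch also improves query handle logging in ImpylaHS2Connection by
logging the query id (when available) instead of the handle object.
A collect_profile_and_log argument is added to control whether to pull the
logs and runtime profile at the end of __fetch_results(), and a use_ssl
argument is added to ImpylaHS2Connection and create_connection(). The default
behavior remains unchanged.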
Skip COMPUTE STATS for functional_kudu.alltypesagg and
functional_kudu.manynulls because it is invalid to run COMPUTE STATS
over a view.
Customize hive-site.xml to set datanucleus.connectionPool.maxPoolSize
to 30 and hikaricp.connectionTimeout to 60000 ms. Also set hive.log.dir
to ${IMPALA_CLUSTER_LOGS_DIR}/hive in run-hive-server.sh for both the
Metastore and HiveServer2.
Testing:
Repeatedly ran compute-table-stats.sh from a cold state and confirmed that
no error occurs. The following commands do so from an active minicluster:
cd $IMPALA_HOME
./bin/start-impala-cluster.py --kill
./testdata/bin/kill-hive-server.sh
./testdata/bin/run-hive-server.sh
./bin/start-impala-cluster.py
./testdata/bin/compute-table-stats.sh > /tmp/compute-stats.txt 2>&1
grep error /tmp/compute-stats.txt
Core tests ran and passed.
Change-Id: I1ebf02f95b957e7dda3a30622b87e8fca3197699
Reviewed-on: http://gerrit.cloudera.org:8080/22231
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
fe/src/test/resources/hive-site.xml.py | 2 +
testdata/bin/compute-table-stats.sh | 15 ++-
testdata/bin/run-hive-server.sh | 6 +-
tests/common/impala_connection.py | 64 ++++++++-----
tests/util/compute_table_stats.py | 165 ++++++++++++++++++++++-----------
5 files changed, 167 insertions(+), 85 deletions(-)
diff --git a/fe/src/test/resources/hive-site.xml.py
b/fe/src/test/resources/hive-site.xml.py
index 9b9c2dbfd..137b55d28 100644
--- a/fe/src/test/resources/hive-site.xml.py
+++ b/fe/src/test/resources/hive-site.xml.py
@@ -220,8 +220,10 @@ CONFIG.update({
'datanucleus.autoCreateSchema': 'false',
'datanucleus.fixedDatastore': 'false',
'datanucleus.metadata.validate': 'false',
+ 'datanucleus.connectionPool.maxPoolSize': 30,
'javax.jdo.option.ConnectionUserName': 'hiveuser',
'javax.jdo.option.ConnectionPassword': 'password',
+ 'hikaricp.connectionTimeout': 60000,
})
if db_type == 'postgres':
CONFIG.update({
diff --git a/testdata/bin/compute-table-stats.sh
b/testdata/bin/compute-table-stats.sh
index 40001eaa4..76e1ee757 100755
--- a/testdata/bin/compute-table-stats.sh
+++ b/testdata/bin/compute-table-stats.sh
@@ -26,9 +26,10 @@ setup_report_build_error
. ${IMPALA_HOME}/bin/impala-config.sh > /dev/null 2>&1
# TODO: We need a better way of managing how these get set. See IMPALA-4346
-IMPALAD=${IMPALAD:-localhost:21000}
+IMPALAD_HS2=${IMPALAD_HS2:-localhost:21050}
-COMPUTE_STATS_SCRIPT="${IMPALA_HOME}/tests/util/compute_table_stats.py
--impalad=${IMPALAD}"
+COMPUTE_STATS_SCRIPT="${IMPALA_HOME}/tests/util/compute_table_stats.py\
+ --impalad=${IMPALAD_HS2}"
# Run compute stats over as many of the tables used in the Planner tests as
possible.
${COMPUTE_STATS_SCRIPT} --db_names=functional\
@@ -46,10 +47,8 @@ if [ "${TARGET_FILESYSTEM}" = "hdfs" ]; then
fi
${COMPUTE_STATS_SCRIPT} --db_names=tpch,tpch_parquet,tpch_orc_def \
--table_names=customer,lineitem,nation,orders,part,partsupp,region,supplier
-${COMPUTE_STATS_SCRIPT} --db_names=tpch_nested_parquet,tpcds,tpcds_parquet
-${COMPUTE_STATS_SCRIPT} --db_names=functional_kudu,tpch_kudu
+${COMPUTE_STATS_SCRIPT}
--db_names="tpch_nested_parquet,tpch_kudu,tpcds,tpcds_parquet,\
+ tpcds_partitioned_parquet_snap"
+${COMPUTE_STATS_SCRIPT} --db_names=functional_kudu \
+ --exclude_table_names="alltypesagg,manynulls"
-# Compute tables of tpcds_partitioned_parquet_snap serially
-# due to large number of partitions in some of the fact tables.
-${COMPUTE_STATS_SCRIPT} --db_names=tpcds_partitioned_parquet_snap \
- --parallelism=1
diff --git a/testdata/bin/run-hive-server.sh b/testdata/bin/run-hive-server.sh
index f64d4cdd5..dccf4e6a2 100755
--- a/testdata/bin/run-hive-server.sh
+++ b/testdata/bin/run-hive-server.sh
@@ -159,7 +159,8 @@ export
KUDU_SKIP_HMS_PLUGIN_VALIDATION=${KUDU_SKIP_HMS_PLUGIN_VALIDATION:-1}
# To debug log4j2 loading issues, add to HADOOP_CLIENT_OPTS:
# -Dorg.apache.logging.log4j.simplelog.StatusLogger.level=TRACE
if [[ ${START_METASTORE} -eq 1 && -z $HMS_PID ]]; then
- HADOOP_CLIENT_OPTS="-Xmx2024m -Dhive.log.file=hive-metastore.log" hive \
+ HADOOP_CLIENT_OPTS="-Xmx2024m -Dhive.log.dir=${LOGDIR} \
+ -Dhive.log.file=hive-metastore.log" hive \
--service metastore -p $HIVE_METASTORE_PORT >>
${LOGDIR}/hive-metastore.out 2>&1 &
# Wait for the Metastore to come up because HiveServer2 relies on it being
live.
@@ -194,7 +195,8 @@ if [[ ${START_HIVESERVER} -eq 1 && -z $HS2_PID ]]; then
# Starts a HiveServer2 instance on the port specified by the
HIVE_SERVER2_THRIFT_PORT
# environment variable. HADOOP_HEAPSIZE should be set to at least 2048 to
avoid OOM
# when loading ORC tables like widerow.
- HADOOP_CLIENT_OPTS="-Xmx2048m -Dhive.log.file=hive-server2.log" hive \
+ HADOOP_CLIENT_OPTS="-Xmx2048m -Dhive.log.dir=${LOGDIR} \
+ -Dhive.log.file=hive-server2.log" hive \
--service hiveserver2 >> ${LOGDIR}/hive-server2.out 2>&1 &
# Wait for the HiveServer2 service to come up because callers of this script
diff --git a/tests/common/impala_connection.py
b/tests/common/impala_connection.py
index 2ebaacc2a..0bdf2a8ed 100644
--- a/tests/common/impala_connection.py
+++ b/tests/common/impala_connection.py
@@ -32,7 +32,7 @@ from RuntimeProfile.ttypes import TRuntimeProfileFormat
from tests.beeswax.impala_beeswax import ImpalaBeeswaxClient
-LOG = logging.getLogger('impala_connection')
+LOG = logging.getLogger(__name__)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# All logging needs to be either executable SQL or a SQL comment (prefix with
--).
@@ -274,10 +274,12 @@ class ImpylaHS2Connection(ImpalaConnection):
TODO: implement support for kerberos, SSL, etc.
"""
def __init__(self, host_port, use_kerberos=False, is_hive=False,
- use_http_transport=False, http_path=""):
+ use_http_transport=False, http_path="", use_ssl=False,
+ collect_profile_and_log=True):
self.__host_port = host_port
self.__use_http_transport = use_http_transport
self.__http_path = http_path
+ self.__use_ssl = use_ssl
if use_kerberos:
raise NotImplementedError("Kerberos support not yet implemented")
# Impyla connection and cursor is initialised in connect(). We need to
reuse the same
@@ -289,6 +291,7 @@ class ImpylaHS2Connection(ImpalaConnection):
# Query options to send along with each query.
self.__query_options = {}
self._is_hive = is_hive
+ self._collect_profile_and_log = not is_hive and collect_profile_and_log
def set_configuration_option(self, name, value):
self.__query_options[name] = str(value)
@@ -309,7 +312,8 @@ class ImpylaHS2Connection(ImpalaConnection):
conn_kwargs['auth_mechanism'] = 'PLAIN'
self.__impyla_conn = impyla.connect(host=host, port=int(port),
use_http_transport=self.__use_http_transport,
- http_path=self.__http_path,
**conn_kwargs)
+ http_path=self.__http_path,
+ use_ssl=self.__use_ssl, **conn_kwargs)
# Get the default query options for the session before any modifications
are made.
self.__cursor = self.__impyla_conn.cursor(convert_types=False)
self.__default_query_options = {}
@@ -345,7 +349,7 @@ class ImpylaHS2Connection(ImpalaConnection):
return self.__cursor.fetchall()
def close_query(self, operation_handle):
- LOG.info("-- closing query for operation handle:
{0}".format(operation_handle))
+ self.log_handle(operation_handle, 'closing query for operation')
operation_handle.get_handle().close_operation()
def execute(self, sql_stmt, user=None,
profile_format=TRuntimeProfileFormat.STRING,
@@ -392,57 +396,68 @@ class ImpylaHS2Connection(ImpalaConnection):
raise
def cancel(self, operation_handle):
- LOG.info("-- canceling operation: {0}".format(operation_handle))
+ self.log_handle(operation_handle, 'canceling operation')
cursor = operation_handle.get_handle()
return cursor.cancel_operation(reset_state=False)
def get_query_id(self, operation_handle):
- """Return the string representation of the query id."""
- guid_bytes = \
- operation_handle.get_handle()._last_operation.handle.operationId.guid
+ """Return the string representation of the query id.
+ Return empty string if handle is already canceled or closed."""
+ last_op = operation_handle.get_handle()._last_operation
+ if last_op is None:
+ return ""
+ guid_bytes = last_op.handle.operationId.guid
# hex_codec works on bytes, so this needs to a decode() to get back to a
string
hi_str = codecs.encode(guid_bytes[7::-1], 'hex_codec').decode()
lo_str = codecs.encode(guid_bytes[16:7:-1], 'hex_codec').decode()
return "{0}:{1}".format(hi_str, lo_str)
+ def handle_id_for_logging(self, operation_handle):
+ query_id = self.get_query_id(operation_handle)
+ return query_id if query_id else str(operation_handle)
+
+ def log_handle(self, handle, message):
+ handle_id = self.handle_id_for_logging(handle)
+ LOG.info("-- {0}: {1}".format(handle_id, message))
+
def get_state(self, operation_handle):
- LOG.info("-- getting state for operation: {0}".format(operation_handle))
+ handle_id = self.handle_id_for_logging(operation_handle)
+ LOG.info("-- getting state for operation: {0}".format(handle_id))
cursor = operation_handle.get_handle()
return cursor.status()
def state_is_finished(self, operation_handle):
- LOG.info("-- checking finished state for operation:
{0}".format(operation_handle))
+ self.log_handle(operation_handle, 'checking finished state for operation')
cursor = operation_handle.get_handle()
# cursor.status contains a string representation of one of
# TCLIService.TOperationState.
return cursor.status() == "FINISHED_STATE"
def get_exec_summary(self, operation_handle):
- LOG.info("-- getting exec summary operation: {0}".format(operation_handle))
+ self.log_handle(operation_handle, 'getting exec summary operation')
cursor = operation_handle.get_handle()
# summary returned is thrift, not string.
return cursor.get_summary()
def get_runtime_profile(self, operation_handle, profile_format):
- LOG.info("-- getting runtime profile operation:
{0}".format(operation_handle))
+ self.log_handle(operation_handle, 'getting runtime profile operation')
cursor = operation_handle.get_handle()
return cursor.get_profile(profile_format=profile_format)
def wait_for_finished_timeout(self, operation_handle, timeout):
- LOG.info("-- waiting for query to reach FINISHED state:
{0}".format(operation_handle))
+ self.log_handle(operation_handle, 'waiting for query to reach FINISHED
state')
raise NotImplementedError("Not yet implemented for HS2 - states differ
from beeswax")
def wait_for_admission_control(self, operation_handle):
- LOG.info("-- waiting for completion of the admission control processing of
the "
- "query: {0}".format(operation_handle))
+ self.log_handle(operation_handle, 'waiting for completion of the admission
control')
raise NotImplementedError("Not yet implemented for HS2 - states differ
from beeswax")
def get_admission_result(self, operation_handle):
- LOG.info("-- getting the admission result: {0}".format(operation_handle))
+ self.log_handle(operation_handle, 'getting the admission result')
raise NotImplementedError("Not yet implemented for HS2 - states differ
from beeswax")
def get_log(self, operation_handle):
- LOG.info("-- getting log for operation: {0}".format(operation_handle))
+ self.log_handle(operation_handle, 'getting log for operation')
# HS2 includes non-error log messages that we need to filter out.
cursor = operation_handle.get_handle()
lines = [line for line in cursor.get_log().split('\n')
@@ -450,7 +465,7 @@ class ImpylaHS2Connection(ImpalaConnection):
return '\n'.join(lines)
def fetch(self, sql_stmt, handle, max_rows=-1):
- LOG.info("-- fetching results from: {0}".format(handle))
+ self.log_handle(handle, 'fetching results')
return self.__fetch_results(handle, max_rows)
def __fetch_results(self, handle, max_rows=-1,
@@ -471,7 +486,7 @@ class ImpylaHS2Connection(ImpalaConnection):
else:
result_tuples = cursor.fetchmany(max_rows)
- if not self._is_hive:
+ if not self._is_hive and self._collect_profile_and_log:
log = self.get_log(handle)
profile = self.get_runtime_profile(handle, profile_format=profile_format)
else:
@@ -520,16 +535,19 @@ class ImpylaHS2ResultSet(object):
def create_connection(host_port, use_kerberos=False, protocol='beeswax',
- is_hive=False):
+ is_hive=False, use_ssl=False, collect_profile_and_log=True):
if protocol == 'beeswax':
- c = BeeswaxConnection(host_port=host_port, use_kerberos=use_kerberos)
+ c = BeeswaxConnection(host_port=host_port, use_kerberos=use_kerberos,
+ use_ssl=use_ssl)
elif protocol == 'hs2':
c = ImpylaHS2Connection(host_port=host_port, use_kerberos=use_kerberos,
- is_hive=is_hive)
+ is_hive=is_hive, use_ssl=use_ssl,
+ collect_profile_and_log=collect_profile_and_log)
else:
assert protocol == 'hs2-http'
c = ImpylaHS2Connection(host_port=host_port, use_kerberos=use_kerberos,
- is_hive=is_hive, use_http_transport=True, http_path='cliservice')
+ is_hive=is_hive, use_http_transport=True, http_path='cliservice',
+ use_ssl=use_ssl, collect_profile_and_log=collect_profile_and_log)
# A hook in conftest sets tests.common.current_node. Skip for Hive
connections since
# Hive cannot modify client_identifier at runtime.
diff --git a/tests/util/compute_table_stats.py
b/tests/util/compute_table_stats.py
index bb25e6c67..206a70479 100755
--- a/tests/util/compute_table_stats.py
+++ b/tests/util/compute_table_stats.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env impala-python
+#!/usr/bin/env impala-python3
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -21,14 +21,22 @@
from __future__ import absolute_import, division, print_function
from contextlib import contextmanager
-from optparse import OptionParser
+from argparse import ArgumentParser
import logging
import multiprocessing
import multiprocessing.pool
+import os
-from tests.beeswax.impala_beeswax import ImpalaBeeswaxClient
+from tests.common.impala_connection import create_connection
-def compute_stats_table(client_factory, db, table, continue_on_error):
+logging.basicConfig(level=logging.INFO,
+ format='%(asctime)s %(threadName)s: %(message)s')
+LOG = logging.getLogger(__name__)
+DEFAULT_PARALLELISM = int(
+ os.environ.get('IMPALA_BUILD_THREADS', multiprocessing.cpu_count()))
+
+
+def compute_stats_table(client_factory, db, table):
"""
Runs 'compute stats' on a given table. If continue_on_error is
True, exceptions computing statistics are swallowed.
@@ -36,17 +44,27 @@ def compute_stats_table(client_factory, db, table,
continue_on_error):
with client_factory() as impala_client:
db_table = "%s.%s" % (db, table)
statement = "compute stats %s" % (db_table,)
- logging.info('Executing: %s', statement)
+ LOG.info('Executing: %s', statement)
try:
result = impala_client.execute(statement)
- logging.info(" %s -> %s", db_table, ' '.join(result.data).strip())
- except:
- logging.exception(' Failed on table %s', db_table)
- if not continue_on_error:
- raise
+ LOG.info(" %s -> %s", db_table, ' '.join(result.data).strip())
+ return db_table
+ except Exception as e:
+ LOG.exception(' Failed on table %s', db_table)
+ raise e
+
+
+def log_completion(completed, total_tables, error=None):
+ if error:
+ LOG.error("Completed COMPUTE STATS for %d/%d tables with error.",
+ completed, total_tables, exc_info=error)
+ else:
+ LOG.info("Completed COMPUTE STATS for %d/%d tables.", completed,
total_tables)
+
def compute_stats(client_factory, db_names=None, table_names=None,
- continue_on_error=False, parallelism=multiprocessing.cpu_count()):
+ exclude_table_names=None, continue_on_error=False,
+ parallelism=DEFAULT_PARALLELISM):
"""
Runs COMPUTE STATS over the selected tables. The target tables can be
filtered by
specifying a list of databases and/or table names. If no filters are
specified this will
@@ -55,68 +73,111 @@ def compute_stats(client_factory, db_names=None,
table_names=None,
parallelism controls the size of the thread pool to which compute_stats
is sent.
"""
- logging.info("Enumerating databases and tables for compute stats.")
+ LOG.info("Enumerating databases and tables for compute stats. "
+ "db_names={} table_names={} exclude_table_names={}
parallelism={}.".format(
+ str(db_names), str(table_names), str(exclude_table_names),
parallelism
+ ))
pool = multiprocessing.pool.ThreadPool(processes=parallelism)
futures = []
+
with client_factory() as impala_client:
+ db_table_map = {}
+ total_tables = 0
all_dbs = set(name.split('\t')[0].lower() for name
in impala_client.execute("show databases").data)
selected_dbs = all_dbs if db_names is None else set(db_names)
for db in all_dbs.intersection(selected_dbs):
- all_tables =\
- set([t.lower() for t in impala_client.execute("show tables in %s" %
db).data])
+ all_tables = set(
+ [t.lower() for t in impala_client.execute("show tables in %s" %
db).data])
selected_tables = all_tables if table_names is None else set(table_names)
- for table in all_tables.intersection(selected_tables):
+ excluded_tables = (set() if exclude_table_names is None
+ else set(exclude_table_names))
+ tables_to_compute = (all_tables.intersection(selected_tables)
+ - excluded_tables)
+ db_table_map[db] = tables_to_compute
+ total_tables += len(tables_to_compute)
+
+ for db, tables in db_table_map.items():
+ for table in tables:
# Submit command to threadpool
- futures.append(pool.apply_async(compute_stats_table,
- (client_factory, db, table, continue_on_error,)))
+ futures.append(
+ pool.apply_async(compute_stats_table, (client_factory, db, table,)))
+
# Wait for all stats commands to finish
+ completed = 0
for f in futures:
- f.get()
+ try:
+ f.get()
+ completed += 1
+ except Exception as e:
+ if not continue_on_error:
+ log_completion(completed, total_tables, e)
+ raise e
+ log_completion(completed, total_tables)
+
if __name__ == "__main__":
- logging.basicConfig(level=logging.INFO, format='%(asctime)s %(threadName)s:
%(message)s')
- parser = OptionParser()
- parser.add_option("--continue_on_error", dest="continue_on_error",
- action="store_true", default=True, help="If True, continue
"\
- "if there is an error executing the compute stats
statement.")
- parser.add_option("--stop_on_error", dest="continue_on_error",
- action="store_false", default=True, help="If True, stop "\
- "if there is an error executing the compute stats
statement.")
- parser.add_option("--impalad", dest="impalad", default="localhost:21000",
- help="Impala daemon <host:port> to connect to.")
- parser.add_option("--use_kerberos", action="store_true", default=False,
- help="Compute stats on a kerberized cluster.")
- parser.add_option("--use_ssl", action="store_true", default=False,
- help="Compute stats on a cluster with SSL enabled.")
- parser.add_option("--parallelism", type=int,
default=multiprocessing.cpu_count(),
- help="Number of parallel compute stats commands.")
- parser.add_option("--db_names", dest="db_names", default=None,
- help="Comma-separated list of database names for which to
compute "\
- "stats. Can be used in conjunction with the --table_names
flag. "\
- "If not specified, compute stats will run on tables from
all "\
- "databases.")
- parser.add_option("--table_names", dest="table_names", default=None,
- help="Comma-separated list of table names to compute stats
over. A"\
- " substring comparison is done. If no tables are specified
stats "\
- "are computed across all tables.")
- options, args = parser.parse_args()
+ parser = ArgumentParser()
+ group_continuation_opt = parser.add_mutually_exclusive_group()
+ group_continuation_opt.add_argument(
+ "--continue_on_error", dest="continue_on_error", action="store_true",
+ help="If True, continue if there is an error executing the compute stats
statement.")
+ group_continuation_opt.add_argument(
+ "--stop_on_error", dest="continue_on_error", action="store_false",
+ help="If True, stop if there is an error executing the compute stats
statement.")
+ parser.add_argument(
+ "--impalad", dest="impalad", default="localhost:21050",
+ help="Impala daemon <host:hs2_port> to connect to.")
+ parser.add_argument(
+ "--use_kerberos", action="store_true", default=False,
+ help="Compute stats on a kerberized cluster.")
+ parser.add_argument(
+ "--use_ssl", action="store_true", default=False,
+ help="Compute stats on a cluster with SSL enabled.")
+ parser.add_argument(
+ "--parallelism", type=int, default=DEFAULT_PARALLELISM,
+ help="Number of parallel compute stats commands.")
+ parser.add_argument(
+ "--db_names", dest="db_names", default=None, help=(
+ "Comma-separated list of database names for which to compute stats. "
+ "Can be used in conjunction with the --table_names or
--exclude_table_names flag. "
+ "If not specified, compute stats will run on tables from all
databases."))
+ group_selection_opt = parser.add_mutually_exclusive_group()
+ group_selection_opt.add_argument(
+ "--table_names", dest="table_names", default=None, help=(
+ "Comma-separated list of table names to compute stats over. "
+ "A substring comparison is done. If no tables are specified stats are
computed "
+ "across all tables. Can not be used in conjunction with
--exclude_table_names."))
+ group_selection_opt.add_argument(
+ "--exclude_table_names", dest="exclude_table_names", default=None, help=(
+ "Comma-separated list of table names to exclude compute stats. "
+ "A substring comparison is done. If no tables are specified stats are
computed "
+ "across all tables. Can not be used in conjunction with --table_names."))
+ args = parser.parse_args()
+
table_names = None
- if options.table_names is not None:
- table_names = [name.lower().strip() for name in
options.table_names.split(',')]
+ if args.table_names is not None:
+ table_names = [name.lower().strip() for name in
args.table_names.split(',')]
+
+ exclude_table_names = None
+ if args.exclude_table_names is not None:
+ exclude_table_names = [name.lower().strip()
+ for name in args.exclude_table_names.split(',')]
db_names = None
- if options.db_names is not None:
- db_names = [name.lower().strip() for name in options.db_names.split(',')]
+ if args.db_names is not None:
+ db_names = [name.lower().strip() for name in args.db_names.split(',')]
@contextmanager
def client_factory():
- impala_client = ImpalaBeeswaxClient(options.impalad,
- use_kerberos=options.use_kerberos, use_ssl=options.use_ssl)
+ impala_client = create_connection(args.impalad,
+ use_kerberos=args.use_kerberos, use_ssl=args.use_ssl, protocol='hs2',
+ collect_profile_and_log=False)
impala_client.connect()
yield impala_client
- impala_client.close_connection()
+ impala_client.close()
compute_stats(client_factory, db_names=db_names, table_names=table_names,
- continue_on_error=options.continue_on_error,
parallelism=options.parallelism)
+ exclude_table_names=exclude_table_names,
+ continue_on_error=args.continue_on_error,
parallelism=args.parallelism)