This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b35ddc8c IMPALA-13051: Speed up, refactor query log tests
3b35ddc8c is described below

commit 3b35ddc8ca7b0e540fc16c413a170a25e164462b
Author: Michael Smith <[email protected]>
AuthorDate: Thu Apr 25 15:45:59 2024 -0700

    IMPALA-13051: Speed up, refactor query log tests
    
    Sets faster default shutdown_grace_period_s and shutdown_deadline_s when
    impalad_graceful_shutdown=True in tests. Impala waits until grace period
    has passed and all queries are stopped (or deadline is exceeded) before
    flushing the query log, so grace period of 0 is sufficient. Adds them in
    setup_method to reduce duplication in test declarations.
    
    Re-uses TQueryTableColumn Thrift definitions for testing.
    
    Moves waiting for query log table to exist to setup_method rather than
    as a side-effect of get_client.
    
    Refactors workload management code to reduce if-clause nesting.
    
    Adds functional query workload tests for both the sys.impala_query_log
    and the sys.impala_query_live tables to assert the names and order of
    the individual columns within each table.
    
    Renames the python tests for the sys.impala_query_log table removing the
    unnecessary "_query_log_table_" string from the name of each test.
    
    Change-Id: I1127ef041a3e024bf2b262767d56ec5f29bf3855
    Reviewed-on: http://gerrit.cloudera.org:8080/21358
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Riza Suminto <[email protected]>
---
 be/src/service/workload-management.cc              | 202 +++++-----
 .../QueryTest/workload-management-live.test        |  56 +++
 .../queries/QueryTest/workload-management-log.test |  56 +++
 tests/common/custom_cluster_test_suite.py          |   9 +-
 tests/custom_cluster/test_query_live.py            |  13 +-
 tests/custom_cluster/test_query_log.py             | 248 ++++++------
 tests/util/workload_management.py                  | 423 +++++++--------------
 7 files changed, 492 insertions(+), 515 deletions(-)

diff --git a/be/src/service/workload-management.cc 
b/be/src/service/workload-management.cc
index 15527fa97..a968c25af 100644
--- a/be/src/service/workload-management.cc
+++ b/be/src/service/workload-management.cc
@@ -395,113 +395,113 @@ void ImpalaServer::CompletedQueriesThread() {
         });
     completed_queries_ticker_->ResetWakeupGuard();
 
-    if (!completed_queries_.empty()) {
-      if (MaxRecordsExceeded(completed_queries_.size())) {
-        ImpaladMetrics::COMPLETED_QUERIES_MAX_RECORDS_WRITES->Increment(1L);
-      } else {
-        ImpaladMetrics::COMPLETED_QUERIES_SCHEDULED_WRITES->Increment(1L);
-      }
-
-      MonotonicStopWatch timer;
-      timer.Start();
+    if (completed_queries_.empty()) continue;
 
-      // Copy all completed queries to a temporary list so that inserts to the
-      // completed_queries list are not blocked while generating and running 
an insert
-      // SQL statement for the completed queries.
-      list<CompletedQuery> queries_to_insert;
-      queries_to_insert.splice(queries_to_insert.cend(), completed_queries_);
-      completed_queries_lock_.unlock();
+    if (MaxRecordsExceeded(completed_queries_.size())) {
+      ImpaladMetrics::COMPLETED_QUERIES_MAX_RECORDS_WRITES->Increment(1L);
+    } else {
+      ImpaladMetrics::COMPLETED_QUERIES_SCHEDULED_WRITES->Increment(1L);
+    }
 
-      string sql;
-      uint32_t max_row_size = 0;
-
-      for (auto iter = queries_to_insert.begin(); iter != 
queries_to_insert.end();
-          iter++) {
-        if (iter->insert_attempts_count >= 
FLAGS_query_log_max_insert_attempts) {
-          LOG(ERROR) << "could not write completed query table=\"" << 
log_table_name <<
-              "\" query_id=\"" << PrintId(iter->query->base_state->id) << "\"";
-          iter = queries_to_insert.erase(iter);
-          ImpaladMetrics::COMPLETED_QUERIES_QUEUED->Increment(-1);
-          continue;
-        }
-
-        // Increment the count of attempts to insert this query into the 
completed
-        // queries table.
-        iter->insert_attempts_count += 1;
-
-        const string row = QueryStateToSql(iter->query.get());
-        if (row.size() > max_row_size) {
-          max_row_size = row.size();
-        }
-
-        StrAppend(&sql, move(row), ",");
+    MonotonicStopWatch timer;
+    timer.Start();
+
+    // Copy all completed queries to a temporary list so that inserts to the
+    // completed_queries list are not blocked while generating and running an 
insert
+    // SQL statement for the completed queries.
+    list<CompletedQuery> queries_to_insert;
+    queries_to_insert.splice(queries_to_insert.cend(), completed_queries_);
+    completed_queries_lock_.unlock();
+
+    string sql;
+    uint32_t max_row_size = 0;
+
+    for (auto iter = queries_to_insert.begin(); iter != 
queries_to_insert.end();
+        iter++) {
+      if (iter->insert_attempts_count >= FLAGS_query_log_max_insert_attempts) {
+        LOG(ERROR) << "could not write completed query table=\"" << 
log_table_name <<
+            "\" query_id=\"" << PrintId(iter->query->base_state->id) << "\"";
+        iter = queries_to_insert.erase(iter);
+        ImpaladMetrics::COMPLETED_QUERIES_QUEUED->Increment(-1);
+        continue;
       }
 
-      DCHECK(ImpaladMetrics::COMPLETED_QUERIES_QUEUED->GetValue() >=
-          queries_to_insert.size());
+      // Increment the count of attempts to insert this query into the 
completed
+      // queries table.
+      iter->insert_attempts_count += 1;
 
-      // In the case where queries_to_insert only contains records that have 
exceeded
-      // the max insert attempts, sql will be empty.
-      if (LIKELY(!sql.empty())) {
-        // Remove the last comma and determine the final sql statement length.
-        sql.pop_back();
-        const size_t final_sql_len = _insert_dml.size() + sql.size();
-
-        uint64_t gather_time = timer.Reset();
-        TUniqueId tmp_query_id;
-
-        // Build query options to ensure the query is not rejected.
-        InternalServer::QueryOptionMap opts = insert_query_opts;
-
-        if (UNLIKELY(final_sql_len > numeric_limits<int32_t>::max())) {
-          LOG(ERROR) << "Completed queries table insert sql statement of 
length '" <<
-              final_sql_len << "' was longer than the maximum of '" <<
-              numeric_limits<int32_t>::max() << "', skipping";
-          continue; // NOTE: early loop continuation
-        }
-
-        // Set max_statement_length_bytes based on actual query, and at least 
the minimum.
-        opts[TImpalaQueryOptions::MAX_STATEMENT_LENGTH_BYTES] = std::to_string(
-            max<size_t>(MIN_MAX_STATEMENT_LENGTH_BYTES, final_sql_len));
-        // Set statement_expression_limit based on actual query, and at least 
the minimum.
-        opts[TImpalaQueryOptions::STATEMENT_EXPRESSION_LIMIT] = std::to_string(
-            max<size_t>(MIN_STATEMENT_EXPRESSION_LIMIT,
-                queries_to_insert.size() * 
_TQueryTableColumn_VALUES_TO_NAMES.size()));
-        opts[TImpalaQueryOptions::MAX_ROW_SIZE] = std::to_string(max_row_size);
-
-        // Execute the insert dml.
-        const Status ret_status = internal_server_->ExecuteIgnoreResults(
-            FLAGS_workload_mgmt_user, StrCat(_insert_dml, sql), opts, false,
-            &tmp_query_id);
-
-        uint64_t exec_time = timer.ElapsedTime();
-        ImpaladMetrics::COMPLETED_QUERIES_WRITE_DURATIONS->Update(
-            gather_time + exec_time);
-        if (ret_status.ok()) {
-          LOG(INFO) << "wrote completed queries table=\"" << log_table_name << 
"\" "
-              "record_count=" << queries_to_insert.size() << " "
-              "bytes=" << PrettyPrinter::PrintBytes(sql.size()) << " "
-              "gather_time=" << PrettyPrinter::Print(gather_time, 
TUnit::TIME_NS) << " "
-              "exec_time=" << PrettyPrinter::Print(exec_time, TUnit::TIME_NS);
-          ImpaladMetrics::COMPLETED_QUERIES_QUEUED->Increment(
-              queries_to_insert.size() * -1);
-          DCHECK(ImpaladMetrics::COMPLETED_QUERIES_QUEUED->GetValue() >= 0);
-          ImpaladMetrics::COMPLETED_QUERIES_WRITTEN->Increment(
-              queries_to_insert.size());
-        } else {
-          LOG(WARNING) << "failed to write completed queries table=\"" << 
log_table_name
-              << "\" record_count=" << queries_to_insert.size() << " "
-              "bytes=" << PrettyPrinter::PrintBytes(sql.size()) << " "
-              "gather_time=" << PrettyPrinter::Print(gather_time, 
TUnit::TIME_NS) << " "
-              "exec_time=" << PrettyPrinter::Print(exec_time, TUnit::TIME_NS);
-          LOG(WARNING) << ret_status.GetDetail();
-          
ImpaladMetrics::COMPLETED_QUERIES_FAIL->Increment(queries_to_insert.size());
-          completed_queries_lock_.lock();
-          completed_queries_.splice(
-              completed_queries_.cend(), queries_to_insert);
-          completed_queries_lock_.unlock();
-        }
+      const string row = QueryStateToSql(iter->query.get());
+      if (row.size() > max_row_size) {
+        max_row_size = row.size();
       }
+
+      StrAppend(&sql, move(row), ",");
+    }
+
+    DCHECK(ImpaladMetrics::COMPLETED_QUERIES_QUEUED->GetValue() >=
+        queries_to_insert.size());
+
+    // In the case where queries_to_insert only contains records that have 
exceeded
+    // the max insert attempts, sql will be empty.
+    if (UNLIKELY(sql.empty())) continue;
+
+    // Remove the last comma and determine the final sql statement length.
+    sql.pop_back();
+    const size_t final_sql_len = _insert_dml.size() + sql.size();
+
+    uint64_t gather_time = timer.Reset();
+    TUniqueId tmp_query_id;
+
+    // Build query options to ensure the query is not rejected.
+    InternalServer::QueryOptionMap opts = insert_query_opts;
+
+    if (UNLIKELY(final_sql_len > numeric_limits<int32_t>::max())) {
+      LOG(ERROR) << "Completed queries table insert sql statement of length '" 
<<
+          final_sql_len << "' was longer than the maximum of '" <<
+          numeric_limits<int32_t>::max() << "', skipping";
+      continue; // NOTE: early loop continuation
+    }
+
+    // Set max_statement_length_bytes based on actual query, and at least the 
minimum.
+    opts[TImpalaQueryOptions::MAX_STATEMENT_LENGTH_BYTES] = std::to_string(
+        max<size_t>(MIN_MAX_STATEMENT_LENGTH_BYTES, final_sql_len));
+    // Set statement_expression_limit based on actual query, and at least the 
minimum.
+    opts[TImpalaQueryOptions::STATEMENT_EXPRESSION_LIMIT] = std::to_string(
+        max<size_t>(MIN_STATEMENT_EXPRESSION_LIMIT,
+            queries_to_insert.size() * 
_TQueryTableColumn_VALUES_TO_NAMES.size()));
+    opts[TImpalaQueryOptions::MAX_ROW_SIZE] = std::to_string(max_row_size);
+
+    // Execute the insert dml.
+    const Status ret_status = internal_server_->ExecuteIgnoreResults(
+        FLAGS_workload_mgmt_user, StrCat(_insert_dml, sql), opts, false,
+        &tmp_query_id);
+
+    uint64_t exec_time = timer.ElapsedTime();
+    ImpaladMetrics::COMPLETED_QUERIES_WRITE_DURATIONS->Update(
+        gather_time + exec_time);
+    if (ret_status.ok()) {
+      LOG(INFO) << "wrote completed queries table=\"" << log_table_name << "\" 
"
+          "record_count=" << queries_to_insert.size() << " "
+          "bytes=" << PrettyPrinter::PrintBytes(sql.size()) << " "
+          "gather_time=" << PrettyPrinter::Print(gather_time, TUnit::TIME_NS) 
<< " "
+          "exec_time=" << PrettyPrinter::Print(exec_time, TUnit::TIME_NS);
+      ImpaladMetrics::COMPLETED_QUERIES_QUEUED->Increment(
+          queries_to_insert.size() * -1);
+      DCHECK(ImpaladMetrics::COMPLETED_QUERIES_QUEUED->GetValue() >= 0);
+      ImpaladMetrics::COMPLETED_QUERIES_WRITTEN->Increment(
+          queries_to_insert.size());
+    } else {
+      LOG(WARNING) << "failed to write completed queries table=\"" << 
log_table_name
+          << "\" record_count=" << queries_to_insert.size() << " "
+          "bytes=" << PrettyPrinter::PrintBytes(sql.size()) << " "
+          "gather_time=" << PrettyPrinter::Print(gather_time, TUnit::TIME_NS) 
<< " "
+          "exec_time=" << PrettyPrinter::Print(exec_time, TUnit::TIME_NS);
+      LOG(WARNING) << ret_status.GetDetail();
+      
ImpaladMetrics::COMPLETED_QUERIES_FAIL->Increment(queries_to_insert.size());
+      completed_queries_lock_.lock();
+      completed_queries_.splice(
+          completed_queries_.cend(), queries_to_insert);
+      completed_queries_lock_.unlock();
     }
   }
 } // ImpalaServer::CompletedQueriesThread
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/workload-management-live.test
 
b/testdata/workloads/functional-query/queries/QueryTest/workload-management-live.test
new file mode 100644
index 000000000..8d4ef0c13
--- /dev/null
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/workload-management-live.test
@@ -0,0 +1,56 @@
+====
+---- QUERY
+describe sys.impala_query_live
+---- RESULTS
+'cluster_id','string',''
+'query_id','string',''
+'session_id','string',''
+'session_type','string',''
+'hiveserver2_protocol_version','string',''
+'db_user','string',''
+'db_user_connection','string',''
+'db_name','string',''
+'impala_coordinator','string',''
+'query_status','string',''
+'query_state','string',''
+'impala_query_end_state','string',''
+'query_type','string',''
+'network_address','string',''
+'start_time_utc','timestamp',''
+'total_time_ms','decimal(18,3)',''
+'query_opts_config','string',''
+'resource_pool','string',''
+'per_host_mem_estimate','bigint',''
+'dedicated_coord_mem_estimate','bigint',''
+'per_host_fragment_instances','string',''
+'backends_count','int',''
+'admission_result','string',''
+'cluster_memory_admitted','bigint',''
+'executor_group','string',''
+'executor_groups','string',''
+'exec_summary','string',''
+'num_rows_fetched','bigint',''
+'row_materialization_rows_per_sec','bigint',''
+'row_materialization_time_ms','decimal(18,3)',''
+'compressed_bytes_spilled','bigint',''
+'event_planning_finished','decimal(18,3)',''
+'event_submit_for_admission','decimal(18,3)',''
+'event_completed_admission','decimal(18,3)',''
+'event_all_backends_started','decimal(18,3)',''
+'event_rows_available','decimal(18,3)',''
+'event_first_row_fetched','decimal(18,3)',''
+'event_last_row_fetched','decimal(18,3)',''
+'event_unregister_query','decimal(18,3)',''
+'read_io_wait_total_ms','decimal(18,3)',''
+'read_io_wait_mean_ms','decimal(18,3)',''
+'bytes_read_cache_total','bigint',''
+'bytes_read_total','bigint',''
+'pernode_peak_mem_min','bigint',''
+'pernode_peak_mem_max','bigint',''
+'pernode_peak_mem_mean','bigint',''
+'sql','string',''
+'plan','string',''
+'tables_queried','string',''
+---- TYPES
+string,string,string
+====
\ No newline at end of file
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/workload-management-log.test
 
b/testdata/workloads/functional-query/queries/QueryTest/workload-management-log.test
new file mode 100644
index 000000000..0009df034
--- /dev/null
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/workload-management-log.test
@@ -0,0 +1,56 @@
+====
+---- QUERY
+describe sys.impala_query_log
+---- RESULTS
+'cluster_id','string','','true'
+'query_id','string','','true'
+'session_id','string','','true'
+'session_type','string','','true'
+'hiveserver2_protocol_version','string','','true'
+'db_user','string','','true'
+'db_user_connection','string','','true'
+'db_name','string','','true'
+'impala_coordinator','string','','true'
+'query_status','string','','true'
+'query_state','string','','true'
+'impala_query_end_state','string','','true'
+'query_type','string','','true'
+'network_address','string','','true'
+'start_time_utc','timestamp','','true'
+'total_time_ms','decimal(18,3)','','true'
+'query_opts_config','string','','true'
+'resource_pool','string','','true'
+'per_host_mem_estimate','bigint','','true'
+'dedicated_coord_mem_estimate','bigint','','true'
+'per_host_fragment_instances','string','','true'
+'backends_count','int','','true'
+'admission_result','string','','true'
+'cluster_memory_admitted','bigint','','true'
+'executor_group','string','','true'
+'executor_groups','string','','true'
+'exec_summary','string','','true'
+'num_rows_fetched','bigint','','true'
+'row_materialization_rows_per_sec','bigint','','true'
+'row_materialization_time_ms','decimal(18,3)','','true'
+'compressed_bytes_spilled','bigint','','true'
+'event_planning_finished','decimal(18,3)','','true'
+'event_submit_for_admission','decimal(18,3)','','true'
+'event_completed_admission','decimal(18,3)','','true'
+'event_all_backends_started','decimal(18,3)','','true'
+'event_rows_available','decimal(18,3)','','true'
+'event_first_row_fetched','decimal(18,3)','','true'
+'event_last_row_fetched','decimal(18,3)','','true'
+'event_unregister_query','decimal(18,3)','','true'
+'read_io_wait_total_ms','decimal(18,3)','','true'
+'read_io_wait_mean_ms','decimal(18,3)','','true'
+'bytes_read_cache_total','bigint','','true'
+'bytes_read_total','bigint','','true'
+'pernode_peak_mem_min','bigint','','true'
+'pernode_peak_mem_max','bigint','','true'
+'pernode_peak_mem_mean','bigint','','true'
+'sql','string','','true'
+'plan','string','','true'
+'tables_queried','string','','true'
+---- TYPES
+string,string,string,string
+====
\ No newline at end of file
diff --git a/tests/common/custom_cluster_test_suite.py 
b/tests/common/custom_cluster_test_suite.py
index 9a8626391..914d56603 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -157,6 +157,13 @@ class CustomClusterTestSuite(ImpalaTestSuite):
 
   def setup_method(self, method):
     cluster_args = list()
+    if method.__dict__.get(IMPALAD_GRACEFUL_SHUTDOWN, False):
+      # IMPALA-13051: Add faster default graceful shutdown options before 
processing
+      # explicit args. Impala doesn't start graceful shutdown until the grace 
period has
+      # passed, and most tests that use graceful shutdown are testing flushing 
the query
+      # log, which doesn't start until after the grace period has passed.
+      cluster_args.append(
+          "--impalad=--shutdown_grace_period_s=0 --shutdown_deadline_s=15")
     for arg in [IMPALAD_ARGS, STATESTORED_ARGS, CATALOGD_ARGS, 
ADMISSIOND_ARGS, JVM_ARGS]:
       if arg in method.__dict__:
         cluster_args.append("--%s=%s " % (arg, method.__dict__[arg]))
@@ -218,7 +225,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
       super(CustomClusterTestSuite, self).setup_class()
 
   def teardown_method(self, method):
-    if method is not None and method.__dict__.get(IMPALAD_GRACEFUL_SHUTDOWN, 
False):
+    if method.__dict__.get(IMPALAD_GRACEFUL_SHUTDOWN, False):
       for impalad in self.cluster.impalads:
         impalad.kill(SIGRTMIN)
       for impalad in self.cluster.impalads:
diff --git a/tests/custom_cluster/test_query_live.py 
b/tests/custom_cluster/test_query_live.py
index f315f402b..a93aca3ae 100644
--- a/tests/custom_cluster/test_query_live.py
+++ b/tests/custom_cluster/test_query_live.py
@@ -31,6 +31,10 @@ from time import sleep
 class TestQueryLive(CustomClusterTestSuite):
   """Tests to assert the query live table is correctly populated."""
 
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
   def setup_method(self, method):
     super(TestQueryLive, self).setup_method(method)
     create_match = self.assert_impalad_log_contains("INFO", 
r'\]\s+(\w+:\w+)\]\s+'
@@ -74,6 +78,13 @@ class TestQueryLive(CustomClusterTestSuite):
         assert False, "did not find host {}".format(host)
     assert len(actual_hosts) == 0, "did not find all expected hosts"
 
+  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
+                                                 
"--cluster_id=test_query_live",
+                                    catalogd_args="--enable_workload_mgmt")
+  def test_table_structure(self, vector):
+    """Asserts that the live table has the expected columns."""
+    self.run_test_case('QueryTest/workload-management-live', vector)
+
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                  
"--cluster_id=test_query_live",
                                     catalogd_args="--enable_workload_mgmt")
@@ -371,8 +382,6 @@ class TestQueryLive(CustomClusterTestSuite):
 
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                  
"--query_log_write_interval_s=1 "
-                                                 "--shutdown_grace_period_s=10 
"
-                                                 "--shutdown_deadline_s=60 "
                                                  
"--cluster_id=test_query_live",
                                     catalogd_args="--enable_workload_mgmt",
                                     impalad_graceful_shutdown=True,
diff --git a/tests/custom_cluster/test_query_log.py 
b/tests/custom_cluster/test_query_log.py
index a5c1f87a2..4a8852509 100644
--- a/tests/custom_cluster/test_query_log.py
+++ b/tests/custom_cluster/test_query_log.py
@@ -34,8 +34,7 @@ from tests.common.impala_test_suite import 
IMPALAD_HS2_HOST_PORT
 from tests.common.test_dimensions import create_single_exec_option_dimension
 from tests.common.test_vector import ImpalaTestDimension
 from tests.util.retry import retry
-from tests.util.workload_management import assert_query, 
COMPRESSED_BYTES_SPILLED, \
-    BYTES_READ_CACHE_TOTAL
+from tests.util.workload_management import assert_query
 from time import sleep, time
 
 
@@ -55,23 +54,19 @@ class TestQueryLogTableBase(CustomClusterTestSuite):
     cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('protocol',
         cls.PROTOCOL_BEESWAX, cls.PROTOCOL_HS2))
 
-  def get_client(self, protocol, query_table_name=QUERY_TBL):
-    """Retrieves the default Impala client for the specified protocol. This 
client is
-       automatically closed after the test completes. Also ensures the 
completed queries
-       table has been successfully created by checking the logs to verify the 
create
-       table sql has finished."""
-
+  def setup_method(self, method):
+    super(TestQueryLogTableBase, self).setup_method(method)
     # These tests run very quickly and can actually complete before Impala has 
finished
     # creating the completed queries table. Thus, to make these tests more 
robust, this
     # code checks to make sure the table create has finished before returning.
-    create_re = r'\]\s+(\w+:\w+)\]\s+Analyzing query: CREATE TABLE IF NOT 
EXISTS {}' \
-        .format(query_table_name)
-    create_match = self.assert_impalad_log_contains("INFO", create_re)
-
-    finish_re = r'Query successfully unregistered: query_id={}' \
-        .format(create_match.group(1))
-    self.assert_impalad_log_contains("INFO", finish_re)
+    create_match = self.assert_impalad_log_contains("INFO", 
r'\]\s+(\w+:\w+)\]\s+'
+        r'Analyzing query: CREATE TABLE IF NOT EXISTS 
{}'.format(self.QUERY_TBL))
+    self.assert_impalad_log_contains("INFO", r'Query successfully 
unregistered: '
+        r'query_id={}'.format(create_match.group(1)))
 
+  def get_client(self, protocol):
+    """Retrieves the default Impala client for the specified protocol. This 
client is
+       automatically closed after the test completes."""
     if protocol == self.PROTOCOL_BEESWAX:
       return self.client
     elif protocol == self.PROTOCOL_HS2:
@@ -89,25 +84,38 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
     cls.ImpalaTestMatrix.add_constraint(lambda v:
         v.get_value('protocol') == 'beeswax')
 
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
   CACHE_DIR = tempfile.mkdtemp(prefix="cache_dir")
   MAX_SQL_PLAN_LEN = 2000
   LOG_DIR_MAX_WRITES = tempfile.mkdtemp(prefix="max_writes")
   FLUSH_MAX_RECORDS_CLUSTER_ID = "test_query_log_max_records_" + 
str(int(time()))
   FLUSH_MAX_RECORDS_QUERY_COUNT = 30
-  OTHER_TBL = "completed_queries_table_{0}".format(int(time()))
 
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                  
"--query_log_write_interval_s=1 "
                                                  "--cluster_id=test_max_select 
"
-                                                 "--shutdown_grace_period_s=10 
"
-                                                 "--shutdown_deadline_s=60 "
                                                  
"--query_log_max_sql_length={0} "
                                                  
"--query_log_max_plan_length={0}"
                                                  .format(MAX_SQL_PLAN_LEN),
                                     catalogd_args="--enable_workload_mgmt",
                                     impalad_graceful_shutdown=True)
-  def test_query_log_table_lower_max_sql_plan(self, vector):
-    """Asserts that lower limits on the sql and plan columns in the completed 
queries
+  def test_table_structure(self, vector):
+    """Asserts that the log table has the expected columns."""
+    self.run_test_case('QueryTest/workload-management-log', vector)
+
+  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
+                                                 
"--query_log_write_interval_s=1 "
+                                                 "--cluster_id=test_max_select 
"
+                                                 
"--query_log_max_sql_length={0} "
+                                                 
"--query_log_max_plan_length={0}"
+                                                 .format(MAX_SQL_PLAN_LEN),
+                                    catalogd_args="--enable_workload_mgmt",
+                                    impalad_graceful_shutdown=True)
+  def test_lower_max_sql_plan(self, vector):
+    """Asserts that length limits on the sql and plan columns in the completed 
queries
        table are respected."""
     client = self.get_client(vector.get_value('protocol'))
     rand_long_str = "".join(choice(string.ascii_letters) for _ in
@@ -139,12 +147,10 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
 
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                  
"--query_log_write_interval_s=1 "
-                                                 "--cluster_id=test_max_select 
"
-                                                 "--shutdown_grace_period_s=10 
"
-                                                 "--shutdown_deadline_s=60",
+                                                 
"--cluster_id=test_max_select",
                                     catalogd_args="--enable_workload_mgmt",
                                     impalad_graceful_shutdown=True)
-  def test_query_log_table_over_max_sql_plan(self, vector):
+  def test_sql_plan_too_long(self, vector):
     """Asserts that very long queries have their corresponding plan and sql 
columns
        shortened in the completed queries table."""
     client = self.get_client(vector.get_value('protocol'))
@@ -177,15 +183,13 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                  
"--query_log_write_interval_s=1 "
                                                  
"--cluster_id=test_query_hist_1 "
-                                                 "--shutdown_grace_period_s=10 
"
-                                                 "--shutdown_deadline_s=60 "
                                                  "--query_log_size=0 "
                                                  "--query_log_size_in_bytes=0",
                                     catalogd_args="--enable_workload_mgmt",
                                     impalad_graceful_shutdown=True)
-  def test_query_log_table_no_query_log_select(self, vector):
+  def test_no_query_log(self, vector):
     """Asserts queries are written to the completed queries table when the 
in-memory
-    query log is turned off."""
+       query log queue is turned off."""
     client = self.get_client(vector.get_value('protocol'))
 
     # Run a select query.
@@ -208,14 +212,12 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                  
"--query_log_write_interval_s=1 "
                                                  
"--cluster_id=test_query_hist_2 "
-                                                 "--shutdown_grace_period_s=10 
"
-                                                 "--shutdown_deadline_s=60 "
                                                  "--always_use_data_cache "
                                                  
"--data_cache={0}:5GB".format(CACHE_DIR),
                                     catalogd_args="--enable_workload_mgmt",
                                     impalad_graceful_shutdown=True,
                                     cluster_size=1)
-  def test_query_log_table_query_cache(self, vector):
+  def test_query_data_cache(self, vector):
     """Asserts the values written to the query log table match the values from 
the
        query profile. Specifically focuses on the data cache metrics."""
     client = self.get_client(vector.get_value('protocol'))
@@ -248,17 +250,15 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
     # for this test to not actually assert anything different than other 
tests. Thus, an
     # additional assert is needed to ensure that there actually was data read 
from the
     # cache.
-    assert data[BYTES_READ_CACHE_TOTAL] != "0", "bytes read from cache total 
was " \
+    assert data["BYTES_READ_CACHE_TOTAL"] != "0", "bytes read from cache total 
was " \
         "zero, test did not assert anything"
 
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
-                                                 
"--query_log_write_interval_s=5 "
-                                                 "--shutdown_grace_period_s=10 
"
-                                                 "--shutdown_deadline_s=60 ",
+                                                 
"--query_log_write_interval_s=5",
                                     impala_log_dir=LOG_DIR_MAX_WRITES,
                                     catalogd_args="--enable_workload_mgmt",
                                     impalad_graceful_shutdown=True)
-  def test_query_log_max_attempts_exceeded(self, vector):
+  def test_max_attempts_exceeded(self, vector):
     """Asserts that completed queries are only attempted 3 times to be 
inserted into the
        completed queries table. This test deletes the completed queries table 
thus it must
        not come last otherwise the table stays deleted. Subsequent tests will 
re-create
@@ -303,8 +303,6 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                  "--query_log_max_queued={0} "
                                                  
"--query_log_write_interval_s=9999 "
-                                                 "--shutdown_grace_period_s=10 
"
-                                                 "--shutdown_deadline_s=60 "
                                                  "--cluster_id={1}"
                                                  
.format(FLUSH_MAX_RECORDS_QUERY_COUNT,
                                                  FLUSH_MAX_RECORDS_CLUSTER_ID),
@@ -312,7 +310,7 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
                                     default_query_options=[
                                       ('statement_expression_limit', 1024)],
                                     impalad_graceful_shutdown=True)
-  def test_query_log_flush_max_records(self, vector):
+  def test_flush_on_queued_count_exceeded(self, vector):
     """Asserts that queries that have completed are written to the query log 
table when
        the maximum number of queued records it reached. Also verifies that 
writing
        completed queries is not limited by default 
statement_expression_limit."""
@@ -369,46 +367,12 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
         "impala-server.completed-queries.queued") == 2
 
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
-                                                 
"--query_log_write_interval_s=1 "
-                                                 "--shutdown_grace_period_s=10 
"
-                                                 "--shutdown_deadline_s=30 "
-                                                 
"--blacklisted_dbs=information_schema "
-                                                 "--query_log_table_name={0}"
-                                                 .format(OTHER_TBL),
-                                    catalogd_args="--enable_workload_mgmt "
-                                                  
"--blacklisted_dbs=information_schema",
-                                    impalad_graceful_shutdown=True)
-  def test_query_log_table_different_table(self, vector):
-    """Asserts that the completed queries table can be renamed."""
-
-    client = self.get_client(vector.get_value('protocol'),
-        "{}.{}".format(self.WM_DB, self.OTHER_TBL))
-
-    try:
-      res = client.execute("show tables in {0}".format(self.WM_DB))
-      assert res.success
-      assert len(res.data) > 0, "could not find any tables in database {0}" \
-          .format(self.WM_DB)
-
-      tbl_found = False
-      for tbl in res.data:
-        if tbl.startswith(self.OTHER_TBL):
-          tbl_found = True
-          break
-      assert tbl_found, "could not find table '{0}' in database '{1}'" \
-          .format(self.OTHER_TBL, self.WM_DB)
-    finally:
-      client.execute("drop table {0}.{1} purge".format(self.WM_DB, 
self.OTHER_TBL))
-
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
-                                                 
"--query_log_write_interval_s=1 "
-                                                 "--shutdown_grace_period_s=10 
"
-                                                 "--shutdown_deadline_s=60",
+                                                 
"--query_log_write_interval_s=1",
                                     cluster_size=3,
                                     num_exclusive_coordinators=2,
                                     catalogd_args="--enable_workload_mgmt",
                                     impalad_graceful_shutdown=True)
-  def test_query_log_table_query_select_dedicate_coordinator(self, vector):
+  def test_dedicated_coordinator_no_mt_dop(self, vector):
     """Asserts the values written to the query log table match the values from 
the
        query profile when dedicated coordinators are used."""
     client = self.get_client(vector.get_value('protocol'))
@@ -430,14 +394,12 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
       client2.close()
 
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
-                                                 
"--query_log_write_interval_s=1 "
-                                                 "--shutdown_grace_period_s=10 
"
-                                                 "--shutdown_deadline_s=60",
+                                                 
"--query_log_write_interval_s=1",
                                     cluster_size=3,
                                     num_exclusive_coordinators=2,
                                     catalogd_args="--enable_workload_mgmt",
                                     impalad_graceful_shutdown=True)
-  def test_query_log_table_query_select_mt_dop(self, vector):
+  def test_dedicated_coordinator_with_mt_dop(self, vector):
     """Asserts the values written to the query log table match the values from 
the
        query profile when dedicated coordinators are used along with an MT_DOP 
setting
        greater than 0."""
@@ -461,6 +423,49 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
       client2.close()
 
 
+class TestQueryLogOtherTable(TestQueryLogTableBase):
+  """Tests to assert that query_log_table_name works with non-default value."""
+
+  OTHER_TBL = "completed_queries_table_{0}".format(int(time()))
+  # Used in TestQueryLogTableBase.setup_method
+  QUERY_TBL = "{0}.{1}".format(TestQueryLogTableBase.WM_DB, OTHER_TBL)
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestQueryLogOtherTable, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        v.get_value('protocol') == 'beeswax')
+
+  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
+                                                 
"--query_log_write_interval_s=1 "
+                                                 
"--blacklisted_dbs=information_schema "
+                                                 "--query_log_table_name={0}"
+                                                 .format(OTHER_TBL),
+                                    catalogd_args="--enable_workload_mgmt "
+                                                  
"--blacklisted_dbs=information_schema",
+                                    impalad_graceful_shutdown=True)
+  def test_renamed_log_table(self, vector):
+    """Asserts that the completed queries table can be renamed."""
+
+    client = self.get_client(vector.get_value('protocol'))
+
+    try:
+      res = client.execute("show tables in {0}".format(self.WM_DB))
+      assert res.success
+      assert len(res.data) > 0, "could not find any tables in database {0}" \
+          .format(self.WM_DB)
+
+      tbl_found = False
+      for tbl in res.data:
+        if tbl.startswith(self.OTHER_TBL):
+          tbl_found = True
+          break
+      assert tbl_found, "could not find table '{0}' in database '{1}'" \
+          .format(self.OTHER_TBL, self.WM_DB)
+    finally:
+      client.execute("drop table {0}.{1} purge".format(self.WM_DB, 
self.OTHER_TBL))
+
+
 class TestQueryLogTableHS2(TestQueryLogTableBase):
   """Tests to assert the query log table is correctly populated when using the 
HS2
      client protocol."""
@@ -475,14 +480,12 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
 
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                  
"--query_log_write_interval_s=1 "
-                                                 "--cluster_id={} "
-                                                 "--shutdown_grace_period_s=10 
"
-                                                 "--shutdown_deadline_s=60"
+                                                 "--cluster_id={}"
                                                  
.format(HS2_OPERATIONS_CLUSTER_ID),
                                     catalogd_args="--enable_workload_mgmt",
                                     cluster_size=2,
                                     impalad_graceful_shutdown=True)
-  def test_query_log_table_hs2_operations(self, vector):
+  def test_hs2_metadata_operations(self, vector):
     """Certain HS2 operations appear to Impala as a special kind of query. 
Specifically,
        these operations have a type of unknown and a normally invalid sql 
syntax. This
        test asserts those queries are not written to the completed queries 
table since
@@ -605,13 +608,11 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
 
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                  
"--query_log_write_interval_s=1 "
-                                                 
"--cluster_id=test_query_hist_mult "
-                                                 "--shutdown_grace_period_s=10 
"
-                                                 "--shutdown_deadline_s=60",
+                                                 
"--cluster_id=test_query_hist_mult",
                                     catalogd_args="--enable_workload_mgmt",
                                     cluster_size=2,
                                     impalad_graceful_shutdown=True)
-  def test_query_log_table_query_multiple(self, vector):
+  def test_query_multiple_tables(self, vector):
     """Asserts the values written to the query log table match the values from 
the
        query profile for a query that reads from multiple tables."""
     client = self.get_client(vector.get_value('protocol'))
@@ -636,12 +637,10 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
 
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                  
"--query_log_write_interval_s=1 "
-                                                 
"--cluster_id=test_query_hist_3 "
-                                                 "--shutdown_grace_period_s=10 
"
-                                                 "--shutdown_deadline_s=60",
+                                                 
"--cluster_id=test_query_hist_3",
                                     catalogd_args="--enable_workload_mgmt",
                                     impalad_graceful_shutdown=True)
-  def test_query_log_table_query_insert_select(self, vector, unique_database,
+  def test_insert_select(self, vector, unique_database,
       unique_name):
     """Asserts the values written to the query log table match the values from 
the
        query profile for a query that insert selects."""
@@ -670,12 +669,10 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
       client2.close()
 
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
-                                                 
"--query_log_write_interval_s=15 "
-                                                 "--shutdown_grace_period_s=10 
"
-                                                 "--shutdown_deadline_s=60",
+                                                 
"--query_log_write_interval_s=15",
                                     catalogd_args="--enable_workload_mgmt",
                                     impalad_graceful_shutdown=True)
-  def test_query_log_table_flush_interval(self, vector):
+  def test_flush_on_interval(self, vector):
     """Asserts that queries that have completed are written to the query log 
table
        after the specified write interval elapses."""
 
@@ -694,10 +691,10 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
 
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                  
"--query_log_write_interval_s=9999 "
-                                                 "--shutdown_grace_period_s=1 "
-                                                 "--shutdown_deadline_s=30",
+                                                 "--shutdown_grace_period_s=0 "
+                                                 "--shutdown_deadline_s=15",
                                     catalogd_args="--enable_workload_mgmt")
-  def test_query_log_table_flush_on_shutdown(self, vector):
+  def test_flush_on_shutdown(self, vector):
     """Asserts that queries that have completed but are not yet written to the 
query
        log table are flushed to the table before the coordinator exits. 
Graceful shutdown
        for 2nd coordinator not needed because query_log_write_interval_s is 
very long."""
@@ -734,7 +731,7 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
 
         return success
 
-      assert retry(func=assert_func, max_attempts=5, sleep_time_s=5)
+      assert retry(func=assert_func, max_attempts=5, sleep_time_s=3)
     finally:
       client2.close()
 
@@ -745,12 +742,10 @@ class TestQueryLogTableAll(TestQueryLogTableBase):
 
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                  
"--query_log_write_interval_s=1 "
-                                                 
"--cluster_id=test_query_hist_2 "
-                                                 "--shutdown_grace_period_s=10 
"
-                                                 "--shutdown_deadline_s=60",
+                                                 
"--cluster_id=test_query_hist_2",
                                     catalogd_args="--enable_workload_mgmt",
                                     impalad_graceful_shutdown=True)
-  def test_query_log_table_ddl(self, vector, unique_database, unique_name):
+  def test_ddl(self, vector, unique_database, unique_name):
     """Asserts the values written to the query log table match the values from 
the
        query profile for a DDL query."""
     create_tbl_sql = "create table {0}.{1} (id INT, product_name STRING) " \
@@ -773,12 +768,10 @@ class TestQueryLogTableAll(TestQueryLogTableBase):
 
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                  
"--query_log_write_interval_s=1 "
-                                                 
"--cluster_id=test_query_hist_3 "
-                                                 "--shutdown_grace_period_s=10 
"
-                                                 "--shutdown_deadline_s=60",
+                                                 
"--cluster_id=test_query_hist_3",
                                     catalogd_args="--enable_workload_mgmt",
                                     impalad_graceful_shutdown=True)
-  def test_query_log_table_dml(self, vector, unique_database, unique_name):
+  def test_dml(self, vector, unique_database, unique_name):
     """Asserts the values written to the query log table match the values from 
the
        query profile for a DML query."""
     tbl_name = "{0}.{1}".format(unique_database, unique_name)
@@ -808,12 +801,10 @@ class TestQueryLogTableAll(TestQueryLogTableBase):
 
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                  
"--query_log_write_interval_s=1 "
-                                                 
"--cluster_id=test_query_hist_2 "
-                                                 "--shutdown_grace_period_s=10 
"
-                                                 "--shutdown_deadline_s=60 ",
+                                                 
"--cluster_id=test_query_hist_2",
                                     catalogd_args="--enable_workload_mgmt",
                                     impalad_graceful_shutdown=True)
-  def test_query_log_table_invalid_query(self, vector):
+  def test_invalid_query(self, vector):
     """Asserts correct values are written to the completed queries table for a 
failed
        query. The query profile is used as the source of expected values."""
     client = self.get_client(vector.get_value('protocol'))
@@ -841,12 +832,10 @@ class TestQueryLogTableAll(TestQueryLogTableBase):
         expected_cluster_id="test_query_hist_2", impalad=impalad, 
query_id=result.data[0])
 
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
-                                                 
"--query_log_write_interval_s=1 "
-                                                 "--shutdown_grace_period_s=10 
"
-                                                 "--shutdown_deadline_s=60",
+                                                 
"--query_log_write_interval_s=1",
                                     catalogd_args="--enable_workload_mgmt",
                                     impalad_graceful_shutdown=True)
-  def test_query_log_ignored_sqls(self, vector):
+  def test_ignored_sqls_not_written(self, vector):
     """Asserts that expected queries are not written to the query log table."""
     client = self.get_client(vector.get_value('protocol'))
 
@@ -935,12 +924,10 @@ class TestQueryLogTableAll(TestQueryLogTableBase):
         "impala-server.completed-queries.failure") == 0
 
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
-                                                 
"--query_log_write_interval_s=1 "
-                                                 "--shutdown_grace_period_s=10 
"
-                                                 "--shutdown_deadline_s=60",
+                                                 
"--query_log_write_interval_s=1",
                                     catalogd_args="--enable_workload_mgmt",
                                     impalad_graceful_shutdown=True)
-  def test_query_log_table_sql_injection(self, vector):
+  def test_sql_injection_attempts(self, vector):
     client = self.get_client(vector.get_value('protocol'))
     impalad = self.cluster.get_first_impalad()
 
@@ -958,14 +945,21 @@ class TestQueryLogTableAll(TestQueryLogTableBase):
 
     # Attempt to cause an error using multiline comments.
     sql3_str = "select 1' /* foo"
-    self.__run_sql_inject(impalad, client, sql3_str, "multiline comments", 10, 
False)
+    self.__run_sql_inject(impalad, client, sql3_str, "multiline comments", 11, 
False)
 
     # Attempt to cause an error using single line comments.
     sql4_str = "select 1' -- foo"
-    self.__run_sql_inject(impalad, client, sql4_str, "single line comments", 
13, False)
+    self.__run_sql_inject(impalad, client, sql4_str, "single line comments", 
15, False)
 
   def __run_sql_inject(self, impalad, client, sql, test_case, expected_writes,
                        expect_success=True):
+    # Capture coordinators "now" so we match only queries in this test case.
+    start_time = None
+    if not expect_success:
+      utc_timestamp = self.execute_query('select utc_timestamp()')
+      assert len(utc_timestamp.data) == 1
+      start_time = utc_timestamp.data[0]
+
     sql_result = None
     try:
       sql_result = client.execute(sql)
@@ -993,11 +987,11 @@ class TestQueryLogTableAll(TestQueryLogTableBase):
                                         .format(sql_result.query_id, test_case)
       assert sql_verify.data[0] == sql, test_case
     else:
+      assert start_time is not None
       esc_sql = sql.replace("'", "\\'")
       sql_verify = client.execute("select sql from {0} where sql='{1}' "
-                                  "and start_time_utc > "
-                                  "date_sub(utc_timestamp(), interval 25 
seconds);"
-                                  .format(self.QUERY_TBL, esc_sql))
+                                  "and start_time_utc > '{2}'"
+                                  .format(self.QUERY_TBL, esc_sql, start_time))
       assert sql_verify.success, test_case
       assert len(sql_verify.data) == 1, "did not find query '{0}' in query log 
" \
                                         "table for test case '{1}" \
@@ -1018,13 +1012,11 @@ class 
TestQueryLogTableBufferPool(TestQueryLogTableBase):
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
                                                  
"--query_log_write_interval_s=1 "
                                                  
"--cluster_id=test_query_hist_1 "
-                                                 "--shutdown_grace_period_s=10 
"
-                                                 "--shutdown_deadline_s=60 "
                                                  "--scratch_dirs={0}:5G"
                                                  .format(SCRATCH_DIR),
                                     catalogd_args="--enable_workload_mgmt",
                                     impalad_graceful_shutdown=True)
-  def test_query_log_table_query_select(self, vector):
+  def test_select(self, vector):
     """Asserts the values written to the query log table match the values from 
the
        query profile. If the buffer_pool_limit parameter is not None, then 
this test
        requires that the query spills to disk to assert that the spill metrics 
are correct
@@ -1066,5 +1058,5 @@ class TestQueryLogTableBufferPool(TestQueryLogTableBase):
       # potential for this test to not actually assert anything different than 
other
       # tests. Thus, an additional assert is needed to ensure that there 
actually was
       # data that was spilled.
-      assert data[COMPRESSED_BYTES_SPILLED] != "0", "compressed bytes spilled 
total " \
+      assert data["COMPRESSED_BYTES_SPILLED"] != "0", "compressed bytes 
spilled total " \
           "was zero, test did not assert anything"
diff --git a/tests/util/workload_management.py 
b/tests/util/workload_management.py
index 772041aa7..876d6c239 100644
--- a/tests/util/workload_management.py
+++ b/tests/util/workload_management.py
@@ -23,59 +23,9 @@ import requests
 from datetime import datetime
 from tests.util.assert_time import assert_time_str, convert_to_milliseconds
 from tests.util.memory import assert_byte_str, convert_to_bytes
+from SystemTables.ttypes import TQueryTableColumn
 
 DEDICATED_COORD_SAFETY_BUFFER_BYTES = 104857600
-EXPECTED_QUERY_COLS = 49
-
-CLUSTER_ID = "CLUSTER_ID"
-QUERY_ID = "QUERY_ID"
-SESSION_ID = "SESSION_ID"
-SESSION_TYPE = "SESSION_TYPE"
-HIVESERVER2_PROTOCOL_VERSION = "HIVESERVER2_PROTOCOL_VERSION"
-DB_USER = "DB_USER"
-DB_USER_CONNECTION = "DB_USER_CONNECTION"
-DB_NAME = "DB_NAME"
-IMPALA_COORDINATOR = "IMPALA_COORDINATOR"
-QUERY_STATUS = "QUERY_STATUS"
-QUERY_STATE = "QUERY_STATE"
-IMPALA_QUERY_END_STATE = "IMPALA_QUERY_END_STATE"
-QUERY_TYPE = "QUERY_TYPE"
-NETWORK_ADDRESS = "NETWORK_ADDRESS"
-START_TIME_UTC = "START_TIME_UTC"
-TOTAL_TIME_MS = "TOTAL_TIME_MS"
-QUERY_OPTS_CONFIG = "QUERY_OPTS_CONFIG"
-RESOURCE_POOL = "RESOURCE_POOL"
-PER_HOST_MEM_ESTIMATE = "PER_HOST_MEM_ESTIMATE"
-DEDICATED_COORD_MEM_ESTIMATE = "DEDICATED_COORD_MEM_ESTIMATE"
-PER_HOST_FRAGMENT_INSTANCES = "PER_HOST_FRAGMENT_INSTANCES"
-BACKENDS_COUNT = "BACKENDS_COUNT"
-ADMISSION_RESULT = "ADMISSION_RESULT"
-CLUSTER_MEMORY_ADMITTED = "CLUSTER_MEMORY_ADMITTED"
-EXECUTOR_GROUP = "EXECUTOR_GROUP"
-EXECUTOR_GROUPS = "EXECUTOR_GROUPS"
-EXEC_SUMMARY = "EXEC_SUMMARY"
-NUM_ROWS_FETCHED = "NUM_ROWS_FETCHED"
-ROW_MATERIALIZATION_ROWS_PER_SEC = "ROW_MATERIALIZATION_ROWS_PER_SEC"
-ROW_MATERIALIZATION_TIME_MS = "ROW_MATERIALIZATION_TIME_MS"
-COMPRESSED_BYTES_SPILLED = "COMPRESSED_BYTES_SPILLED"
-EVENT_PLANNING_FINISHED = "EVENT_PLANNING_FINISHED"
-EVENT_SUBMIT_FOR_ADMISSION = "EVENT_SUBMIT_FOR_ADMISSION"
-EVENT_COMPLETED_ADMISSION = "EVENT_COMPLETED_ADMISSION"
-EVENT_ALL_BACKENDS_STARTED = "EVENT_ALL_BACKENDS_STARTED"
-EVENT_ROWS_AVAILABLE = "EVENT_ROWS_AVAILABLE"
-EVENT_FIRST_ROW_FETCHED = "EVENT_FIRST_ROW_FETCHED"
-EVENT_LAST_ROW_FETCHED = "EVENT_LAST_ROW_FETCHED"
-EVENT_UNREGISTER_QUERY = "EVENT_UNREGISTER_QUERY"
-READ_IO_WAIT_TOTAL_MS = "READ_IO_WAIT_TOTAL_MS"
-READ_IO_WAIT_MEAN_MS = "READ_IO_WAIT_MEAN_MS"
-BYTES_READ_CACHE_TOTAL = "BYTES_READ_CACHE_TOTAL"
-BYTES_READ_TOTAL = "BYTES_READ_TOTAL"
-PERNODE_PEAK_MEM_MIN = "PERNODE_PEAK_MEM_MIN"
-PERNODE_PEAK_MEM_MAX = "PERNODE_PEAK_MEM_MAX"
-PERNODE_PEAK_MEM_MEAN = "PERNODE_PEAK_MEM_MEAN"
-SQL = "SQL"
-PLAN = "PLAN"
-TABLES_QUERIED = "TABLES_QUERIED"
 
 
 def round_to_3(val):
@@ -83,7 +33,6 @@ def round_to_3(val):
   # pylint: disable=round-builtin
   return round(val, 3)
 
-
 def assert_query(query_tbl, client, expected_cluster_id, raw_profile=None, 
impalad=None,
     query_id=None, max_mem_for_admission=None, max_row_size=None):
   """Helper function to assert that the values in the completed query log table
@@ -123,126 +72,102 @@ def assert_query(query_tbl, client, expected_cluster_id, 
raw_profile=None, impal
 
   # Assert the expected columns were included.
   assert len(sql_results.data) == 1
-  assert len(sql_results.column_labels) == EXPECTED_QUERY_COLS
+  assert len(sql_results.column_labels) == 
len(TQueryTableColumn._VALUES_TO_NAMES)
   data = sql_results.data[0].split("\t")
   assert len(data) == len(sql_results.column_labels)
 
+  def column_val(index):
+    name = TQueryTableColumn._VALUES_TO_NAMES[index]
+    assert sql_results.column_labels[index] == name
+    ret_data[name] = data[index]
+    return data[index]
+
   # Cluster ID
-  index = 0
-  assert sql_results.column_labels[index] == CLUSTER_ID
-  ret_data[CLUSTER_ID] = data[index]
-  assert data[index] == expected_cluster_id, "cluster id incorrect"
+  assert column_val(TQueryTableColumn.CLUSTER_ID) == expected_cluster_id,\
+      "cluster id incorrect"
 
   # Query ID
-  index += 1
-  assert sql_results.column_labels[index] == QUERY_ID
-  ret_data[QUERY_ID] = data[index]
-  assert data[index] == query_id
+  assert column_val(TQueryTableColumn.QUERY_ID) == query_id
 
   # Session ID
-  index += 1
-  assert sql_results.column_labels[index] == SESSION_ID
-  ret_data[SESSION_ID] = data[index]
   session_id = re.search(r'\n\s+Session ID:\s+(.*)\n', profile_text)
   assert session_id is not None
-  assert data[index] == session_id.group(1), "session id incorrect"
+  assert column_val(TQueryTableColumn.SESSION_ID) == session_id.group(1),\
+      "session id incorrect"
 
   # Session Type
-  index += 1
-  assert sql_results.column_labels[index] == SESSION_TYPE
-  ret_data[SESSION_TYPE] = data[index]
   session_type = re.search(r'\n\s+Session Type:\s+(.*)\n', profile_text)
   assert session_type is not None
-  assert data[index] == session_type.group(1), "session type incorrect"
+  assert column_val(TQueryTableColumn.SESSION_TYPE) == session_type.group(1),\
+      "session type incorrect"
 
   # HS2 Protocol Version
-  index += 1
-  assert sql_results.column_labels[index] == HIVESERVER2_PROTOCOL_VERSION
-  ret_data[HIVESERVER2_PROTOCOL_VERSION] = data[index]
+  value = column_val(TQueryTableColumn.HIVESERVER2_PROTOCOL_VERSION)
   if session_type.group(1) == "HIVESERVER2":
     hs2_ver = re.search(r'\n\s+HiveServer2 Protocol Version:\s+(.*)', 
profile_text)
     assert hs2_ver is not None
-    assert data[index] == 
"HIVE_CLI_SERVICE_PROTOCOL_{0}".format(hs2_ver.group(1))
+    assert value == "HIVE_CLI_SERVICE_PROTOCOL_{0}".format(hs2_ver.group(1))
   else:
-    assert data[index] == ""
+    assert value == ""
 
   # Database User
-  index += 1
-  assert sql_results.column_labels[index] == DB_USER
-  ret_data[DB_USER] = data[index]
   user = re.search(r'\n\s+User:\s+(.*?)\n', profile_text)
   assert user is not None
-  assert data[index] == user.group(1), "db user incorrect"
+  assert column_val(TQueryTableColumn.DB_USER) == user.group(1), "db user 
incorrect"
 
   # Connected Database User
-  index += 1
-  assert sql_results.column_labels[index] == DB_USER_CONNECTION
-  ret_data[DB_USER_CONNECTION] = data[index]
   db_user = re.search(r'\n\s+Connected User:\s+(.*?)\n', profile_text)
   assert db_user is not None
-  assert data[index] == db_user.group(1), "db user connection incorrect"
+  assert column_val(TQueryTableColumn.DB_USER_CONNECTION) == db_user.group(1),\
+      "db user connection incorrect"
 
   # Database Name
-  index += 1
-  assert sql_results.column_labels[index] == DB_NAME
-  ret_data[DB_NAME] = data[index]
   default_db = re.search(r'\n\s+Default Db:\s+(.*?)\n', profile_text)
   assert default_db is not None
-  assert data[index] == default_db.group(1), "database name incorrect"
+  assert column_val(TQueryTableColumn.DB_NAME) == default_db.group(1),\
+      "database name incorrect"
 
   # Coordinator
-  index += 1
-  assert sql_results.column_labels[index] == IMPALA_COORDINATOR
-  ret_data[IMPALA_COORDINATOR] = data[index]
   coordinator = re.search(r'\n\s+Coordinator:\s+(.*?)\n', profile_text)
   assert coordinator is not None
-  assert data[index] == coordinator.group(1), "impala coordinator incorrect"
+  assert column_val(TQueryTableColumn.IMPALA_COORDINATOR) == 
coordinator.group(1),\
+      "impala coordinator incorrect"
 
   # Query Status (can be multiple lines if the query errored)
-  index += 1
-  assert sql_results.column_labels[index] == QUERY_STATUS
-  ret_data[QUERY_STATUS] = data[index]
   query_status = re.search(r'\n\s+Query Status:\s+(.*?)\n\s+Impala Version', 
profile_text,
       re.DOTALL)
   assert query_status is not None
-  assert data[index] == query_status.group(1), "query status incorrect"
+  assert column_val(TQueryTableColumn.QUERY_STATUS) == query_status.group(1),\
+      "query status incorrect"
 
   # Query State
-  index += 1
-  assert sql_results.column_labels[index] == QUERY_STATE
-  ret_data[QUERY_STATE] = data[index]
   query_state = re.search(r'\n\s+Query State:\s+(.*?)\n', profile_text)
   assert query_state is not None
   query_state_value = query_state.group(1)
-  assert data[index] == query_state_value, "query state incorrect"
+  assert column_val(TQueryTableColumn.QUERY_STATE) == query_state_value,\
+      "query state incorrect"
 
   # Impala Query End State
-  index += 1
-  assert sql_results.column_labels[index] == IMPALA_QUERY_END_STATE
-  ret_data[IMPALA_QUERY_END_STATE] = data[index]
   impala_query_state = re.search(r'\n\s+Impala Query State:\s+(.*?)\n', 
profile_text)
   assert impala_query_state is not None
-  assert data[index] == impala_query_state.group(1), "impala query end state 
incorrect"
+  assert column_val(TQueryTableColumn.IMPALA_QUERY_END_STATE) \
+      == impala_query_state.group(1), "impala query end state incorrect"
 
   # Query Type
-  index += 1
-  assert sql_results.column_labels[index] == QUERY_TYPE
-  ret_data[QUERY_TYPE] = data[index]
+  value = column_val(TQueryTableColumn.QUERY_TYPE)
   if query_state_value == "EXCEPTION":
-    assert data[index] == "UNKNOWN", "query type incorrect"
+    assert value == "UNKNOWN", "query type incorrect"
   else:
     query_type = re.search(r'\n\s+Query Type:\s+(.*?)\n', profile_text)
     assert query_type is not None
-    assert data[index] == query_type.group(1), "query type incorrect"
+    assert value == query_type.group(1), "query type incorrect"
     query_type = query_type.group(1)
 
   # Client Network Address
-  index += 1
-  assert sql_results.column_labels[index] == NETWORK_ADDRESS
-  ret_data[NETWORK_ADDRESS] = data[index]
   network_address = re.search(r'\n\s+Network Address:\s+(.*?)\n', profile_text)
   assert network_address is not None
-  assert data[index] == network_address.group(1), "network address incorrect"
+  assert column_val(TQueryTableColumn.NETWORK_ADDRESS) == 
network_address.group(1),\
+      "network address incorrect"
 
   # offset from UTC
   utc_now = datetime.utcnow().replace(microsecond=0, second=0)
@@ -250,15 +175,12 @@ def assert_query(query_tbl, client, expected_cluster_id, 
raw_profile=None, impal
   utc_offset = utc_now - local_now
 
   # Start Time
-  index += 1
-  assert sql_results.column_labels[index] == START_TIME_UTC
-  ret_data[START_TIME_UTC] = data[index]
   start_time = re.search(r'\n\s+Start Time:\s+(.*?)\n', profile_text)
   assert start_time is not None
   start_time_obj = datetime.strptime(start_time.group(1)[:-3], "%Y-%m-%d 
%H:%M:%S.%f")
   start_time_obj_utc = start_time_obj + utc_offset
-  assert data[index][:-3] == start_time_obj_utc.strftime("%Y-%m-%d 
%H:%M:%S.%f"), \
-      "start time incorrect"
+  assert column_val(TQueryTableColumn.START_TIME_UTC)[:-3] \
+      == start_time_obj_utc.strftime("%Y-%m-%d %H:%M:%S.%f"), "start time 
incorrect"
 
   # End Time (not in table, but needed for duration calculation)
   end_time = re.search(r'\n\s+End Time:\s+(.*?)\n', profile_text)
@@ -266,79 +188,69 @@ def assert_query(query_tbl, client, expected_cluster_id, 
raw_profile=None, impal
   end_time_obj = datetime.strptime(end_time.group(1)[:-3], "%Y-%m-%d 
%H:%M:%S.%f")
 
   # Query Duration (allow values that are within 1 second)
-  index += 1
-  assert sql_results.column_labels[index] == TOTAL_TIME_MS
-  ret_data[TOTAL_TIME_MS] = data[index]
+  value = column_val(TQueryTableColumn.TOTAL_TIME_MS)
   duration = end_time_obj - start_time_obj
   min_allowed = round_to_3(duration.total_seconds() * 1000 * 0.999)
   max_allowed = round_to_3(duration.total_seconds() * 1000 * 1.001)
-  assert min_allowed <= float(data[index]) <= max_allowed, "total time 
incorrect"
+  assert min_allowed <= float(value) <= max_allowed, "total time incorrect"
 
   # Query Options Set By Configuration
-  index += 1
-  assert sql_results.column_labels[index] == QUERY_OPTS_CONFIG
-  ret_data[QUERY_OPTS_CONFIG] = data[index]
+  value = column_val(TQueryTableColumn.QUERY_OPTS_CONFIG)
   if query_state_value == "EXCEPTION":
-    assert data[index] != "", "query options set by config incorrect"
+    assert value != "", "query options set by config incorrect"
   else:
     query_opts = re.search(r'\n\s+Query Options \(set by 
configuration\):\s+(.*?)\n',
         profile_text)
     assert query_opts is not None
-    assert data[index] == query_opts.group(1), "query opts set by config 
incorrect"
+    assert value == query_opts.group(1), "query opts set by config incorrect"
 
   # Resource Pool
-  index += 1
-  assert sql_results.column_labels[index] == RESOURCE_POOL
-  ret_data[RESOURCE_POOL] = data[index]
+  value = column_val(TQueryTableColumn.RESOURCE_POOL)
   if query_state_value == "EXCEPTION":
-    assert data[index] == "", "resource pool incorrect"
+    assert value == "", "resource pool incorrect"
   else:
     if query_type != "DDL":
       req_pool = re.search(r'\n\s+Request Pool:\s+(.*?)\n', profile_text)
       assert req_pool is not None
-      assert data[index] == req_pool.group(1), "request pool incorrect"
+      assert value == req_pool.group(1), "request pool incorrect"
     else:
-      assert data[index] == "", "request pool not empty"
+      assert value == "", "request pool not empty"
 
   # Per-host Memory Estimate
-  index += 1
-  assert sql_results.column_labels[index] == PER_HOST_MEM_ESTIMATE
-  ret_data[PER_HOST_MEM_ESTIMATE] = data[index]
+  value = column_val(TQueryTableColumn.PER_HOST_MEM_ESTIMATE)
   if query_state_value == "EXCEPTION" or query_type == "DDL":
-    assert data[index] == "0", "per-host memory estimate incorrect"
+    assert value == "0", "per-host memory estimate incorrect"
   else:
     # First check the Estimated Per-Host Mem from the query profile. This 
value may not
     # match though because certain query options can cause this value to 
diverge from the
     # per-host memory estimate stored in the query history table.
     est_perhost_mem = re.search(r'\n\s+Estimated Per-Host Mem:\s+(\d+)\n', 
profile_text)
     assert est_perhost_mem is not None
-    if est_perhost_mem.group(1) != data[index]:
+    if est_perhost_mem.group(1) != value:
       # The profile and db values diverged, use the Per-Host Resource 
Estimates field from
       # the query profile as the expected value. Since query profile value is 
an estimate,
       # it's not as good to use, but it's all we have available.
       perhost_mem_est = re.search(r'\nPer-Host Resource 
Estimates:\s+Memory\=(.*?)\n',
           profile_text)
       assert perhost_mem_est is not None
-      assert_byte_str(expected_str=perhost_mem_est.group(1), 
actual_bytes=data[index],
+      assert_byte_str(expected_str=perhost_mem_est.group(1), 
actual_bytes=value,
           msg="per-host memory estimate incorrect", unit_combined=True)
 
   # Dedicated Coordinator Memory Estimate
   # This value is different because it is the minimum of the query option
   # MAX_MEM_ESTIMATE_FOR_ADMISSION or a calculation that includes a 100mb 
buffer.
   # Thus, callers must specify if the query being asserted had that option set.
-  index += 1
-  assert sql_results.column_labels[index] == DEDICATED_COORD_MEM_ESTIMATE
-  ret_data[DEDICATED_COORD_MEM_ESTIMATE] = data[index]
+  value = column_val(TQueryTableColumn.DEDICATED_COORD_MEM_ESTIMATE)
   if query_state_value == "EXCEPTION" or query_type == "DDL":
-    assert data[index] == "0", "dedicated coordinator memory estimate 
incorrect"
+    assert value == "0", "dedicated coordinator memory estimate incorrect"
   elif query_type == "DML":
-    assert data[index] == str(DEDICATED_COORD_SAFETY_BUFFER_BYTES), \
+    assert value == str(DEDICATED_COORD_SAFETY_BUFFER_BYTES), \
         "dedicated coordinator memory estimate incorrect"
   else:
     if max_mem_for_admission is not None:
       # The MAX_MEM_ESTIMATE_FOR_ADMISSION query option was specified, thus 
that should
       # be the value that was written to the database.
-      assert str(max_mem_for_admission) == data[index], \
+      assert str(max_mem_for_admission) == value, \
           "dedicated coordinator memory estimate incorrect"
     else:
       root_mem = re.search(r'\n\nF\d+:PLAN FRAGMENT.*?mem-estimate=(\S+?) mem',
@@ -346,79 +258,67 @@ def assert_query(query_tbl, client, expected_cluster_id, 
raw_profile=None, impal
       assert root_mem is not None, "dedicated coordinator memory estimate 
incorrect"
       buffer = DEDICATED_COORD_SAFETY_BUFFER_BYTES
       assert_byte_str(expected_str=root_mem.group(1),
-          actual_bytes=int(data[index]) - buffer,
+          actual_bytes=int(value) - buffer,
           msg="dedicated coordinator memory estimate incorrect", 
unit_combined=True)
 
   # Per-Host Fragment Instances
-  index += 1
-  assert sql_results.column_labels[index] == PER_HOST_FRAGMENT_INSTANCES
-  ret_data[PER_HOST_FRAGMENT_INSTANCES] = data[index]
+  value = column_val(TQueryTableColumn.PER_HOST_FRAGMENT_INSTANCES)
   if query_state_value == "EXCEPTION" or query_type == "DDL":
-    assert data[index] == "", "per-host fragment instances incorrect"
+    assert value == "", "per-host fragment instances incorrect"
   else:
     perhost_frags = re.search(r'\n\s+Per Host Number of Fragment 
Instances:\s+(.*?)\n',
         profile_text)
     assert perhost_frags is not None
     expected = ",".join(sorted(perhost_frags.group(1).replace("(", "=")
         .replace(")", "").split(" ")))
-    assert data[index] == expected, ('per-host fragment instances incorrect.'
-        ' expected="{0}" actual="{1}"').format(expected, data[index])
+    assert value == expected, ('per-host fragment instances incorrect.'
+        ' expected="{0}" actual="{1}"').format(expected, value)
 
   # Backends Count
-  index += 1
-  assert sql_results.column_labels[index] == BACKENDS_COUNT
-  ret_data[BACKENDS_COUNT] = data[index]
+  value = column_val(TQueryTableColumn.BACKENDS_COUNT)
   num_bck = re.search(r'\n\s+\- NumBackends:\s+(\d+)', profile_text)
   if query_state_value == "EXCEPTION" or query_type == "DDL":
     assert num_bck is None
-    assert data[index] == "0", "backends count incorrect"
+    assert value == "0", "backends count incorrect"
   else:
     assert num_bck is not None
-    assert data[index] == num_bck.group(1), "backends count incorrect"
+    assert value == num_bck.group(1), "backends count incorrect"
 
   # Admission Result
-  index += 1
-  assert sql_results.column_labels[index] == ADMISSION_RESULT
-  ret_data[ADMISSION_RESULT] = data[index]
+  value = column_val(TQueryTableColumn.ADMISSION_RESULT)
   adm_result = re.search(r'\n\s+Admission result:\s+(.*?)\n', profile_text)
   if query_state_value == "EXCEPTION" or query_type == "DDL":
     assert adm_result is None
-    assert data[index] == "", "admission result incorrect"
+    assert value == "", "admission result incorrect"
   else:
     assert adm_result is not None
-    assert data[index] == adm_result.group(1), "admission result incorrect"
+    assert value == adm_result.group(1), "admission result incorrect"
 
   # Cluster Memory Admitted
-  index += 1
-  assert sql_results.column_labels[index] == CLUSTER_MEMORY_ADMITTED
-  ret_data[CLUSTER_MEMORY_ADMITTED] = data[index]
+  value = column_val(TQueryTableColumn.CLUSTER_MEMORY_ADMITTED)
   clust_mem = re.search(r'\n\s+Cluster Memory Admitted:\s+(.*?)\n', 
profile_text)
   if query_state_value == "EXCEPTION":
     assert clust_mem is None
   else:
     if query_type != "DDL":
       assert clust_mem is not None
-      assert_byte_str(expected_str=clust_mem.group(1), 
actual_bytes=data[index],
+      assert_byte_str(expected_str=clust_mem.group(1), actual_bytes=value,
           msg="cluster memory admitted incorrect")
     else:
-      assert data[index] == "0", "cluster memory not zero"
+      assert value == "0", "cluster memory not zero"
 
   # Executor Group
-  index += 1
-  assert sql_results.column_labels[index] == EXECUTOR_GROUP
-  ret_data[EXECUTOR_GROUP] = data[index]
+  value = column_val(TQueryTableColumn.EXECUTOR_GROUP)
   exec_group = re.search(r'\n\s+Executor Group:\s+(.*?)\n', profile_text)
   if query_state_value == "EXCEPTION" or query_type == "DDL":
     assert exec_group is None
-    assert data[index] == "", "executor group should not have been found"
+    assert value == "", "executor group should not have been found"
   else:
     assert exec_group is not None
-    assert data[index] == exec_group.group(1), "executor group incorrect"
+    assert value == exec_group.group(1), "executor group incorrect"
 
   # Executor Groups
-  index += 1
-  assert sql_results.column_labels[index] == EXECUTOR_GROUPS
-  ret_data[EXECUTOR_GROUPS] = data[index]
+  value = column_val(TQueryTableColumn.EXECUTOR_GROUPS)
   exec_groups = re.search(r'\n\s+(Executor group \d+:.*?)\n\s+PlannerInfo', 
profile_text,
       re.DOTALL)
   if query_state_value == "EXCEPTION":
@@ -426,42 +326,36 @@ def assert_query(query_tbl, client, expected_cluster_id, 
raw_profile=None, impal
   else:
     assert exec_groups is not None
     dedent_str = re.sub(r'^\s{6}', '', exec_groups.group(1), 
flags=re.MULTILINE)
-    assert data[index] == dedent_str, "executor groups incorrect"
+    assert value == dedent_str, "executor groups incorrect"
 
   # Exec Summary
-  index += 1
-  assert sql_results.column_labels[index] == EXEC_SUMMARY
-  ret_data[EXEC_SUMMARY] = data[index]
+  value = column_val(TQueryTableColumn.EXEC_SUMMARY)
   exec_sum = re.search(r'\n\s+ExecSummary:\s*\n(.*)\n\s+Errors', profile_text, 
re.DOTALL)
   if query_state_value == "EXCEPTION" or query_type == "DDL":
     assert exec_sum is None
-    assert data[index] == ""
+    assert value == ""
   else:
     assert exec_sum is not None
-    assert data[index] == exec_sum.group(1)
+    assert value == exec_sum.group(1)
 
   # Rows Fetched
-  index += 1
-  assert sql_results.column_labels[index] == NUM_ROWS_FETCHED
-  ret_data[NUM_ROWS_FETCHED] = data[index]
+  value = column_val(TQueryTableColumn.NUM_ROWS_FETCHED)
   rows_fetched = re.search(r'\n\s+\-\s+NumRowsFetched:\s+\S+\s+\((\d+)\)', 
profile_text)
   if query_state_value == "EXCEPTION":
     assert rows_fetched is None
   else:
     assert rows_fetched is not None
-    assert data[index] == rows_fetched.group(1)
+    assert value == rows_fetched.group(1)
 
   # Row Materialization Rate
-  index += 1
-  assert sql_results.column_labels[index] == ROW_MATERIALIZATION_ROWS_PER_SEC
-  ret_data[ROW_MATERIALIZATION_ROWS_PER_SEC] = data[index]
+  value = column_val(TQueryTableColumn.ROW_MATERIALIZATION_ROWS_PER_SEC)
   if query_state_value == "EXCEPTION" or query_type == "DDL" or query_type == 
'DML':
-    assert data[index] == "0", "row materialization rate incorrect"
+    assert value == "0", "row materialization rate incorrect"
   else:
     row_mat = re.search(r'\n\s+\-\s+RowMaterializationRate:\s+(\S+)\s+([MK])?',
         profile_text)
     assert row_mat is not None
-    tolerance = int(data[index]) * 0.005
+    tolerance = int(value) * 0.005
     expected_row_mat = 0
     if row_mat.group(2) == "K":
       expected_row_mat = int(float(row_mat.group(1)) * 1000)
@@ -469,13 +363,11 @@ def assert_query(query_tbl, client, expected_cluster_id, 
raw_profile=None, impal
       expected_row_mat = int(float(row_mat.group(1)) * 1000000)
     else:
       expected_row_mat = int(float(row_mat.group(1)))
-    assert expected_row_mat - tolerance <= int(data[index]) \
+    assert expected_row_mat - tolerance <= int(value) \
         <= expected_row_mat + tolerance, "row materialization rate incorrect"
 
   # Row Materialization Time
-  index += 1
-  assert sql_results.column_labels[index] == ROW_MATERIALIZATION_TIME_MS
-  ret_data[ROW_MATERIALIZATION_TIME_MS] = data[index]
+  value = column_val(TQueryTableColumn.ROW_MATERIALIZATION_TIME_MS)
   row_mat_tmr = re.search(r'\n\s+\-\s+RowMaterializationTimer:\s+(.*?)\n', 
profile_text)
   if query_state_value == "EXCEPTION":
     assert row_mat_tmr is None
@@ -484,17 +376,15 @@ def assert_query(query_tbl, client, expected_cluster_id, 
raw_profile=None, impal
     assert row_mat_tmr.group(1) == "0.000ns", "row materialization timer 
incorrect"
   else:
     assert row_mat_tmr is not None
-    assert_time_str(row_mat_tmr.group(1), data[index],
+    assert_time_str(row_mat_tmr.group(1), value,
         "row materialization time incorrect")
 
   # Compressed Bytes Spilled
-  index += 1
-  assert sql_results.column_labels[index] == COMPRESSED_BYTES_SPILLED
-  ret_data[COMPRESSED_BYTES_SPILLED] = data[index]
   scratch_bytes_total = 0
   for sbw in re.findall(r'\n\s+\-\s+ScratchBytesWritten:.*?\((\d+)\)', 
profile_text):
     scratch_bytes_total += int(sbw)
-  assert int(data[index]) == scratch_bytes_total
+  assert int(column_val(TQueryTableColumn.COMPRESSED_BYTES_SPILLED)) \
+      == scratch_bytes_total
 
   # Parse out only the query timeline.
   timeline = re.search(r'\n\s+Query Timeline:(.*?)\n\s+Frontend', 
profile_text, re.DOTALL)
@@ -502,97 +392,79 @@ def assert_query(query_tbl, client, expected_cluster_id, 
raw_profile=None, impal
   timeline = timeline.group(1)
 
   # Event Timeline Planning Finished
-  index += 1
-  assert sql_results.column_labels[index] == EVENT_PLANNING_FINISHED
-  ret_data[EVENT_PLANNING_FINISHED] = data[index]
+  value = column_val(TQueryTableColumn.EVENT_PLANNING_FINISHED)
   if query_state_value == "EXCEPTION":
-    assert data[index] == "0.000", "planning finished event incorrect"
+    assert value == "0.000", "planning finished event incorrect"
   else:
     event = re.search(r'\n\s+\-\s+Planning finished:\s+(\S+)', timeline)
     assert event is not None, "planning finished event missing"
-    assert_time_str(event.group(1), data[index], "planning finished event 
incorrect")
+    assert_time_str(event.group(1), value, "planning finished event incorrect")
 
   # Event Timeline Submit for Admission
-  index += 1
-  assert sql_results.column_labels[index] == EVENT_SUBMIT_FOR_ADMISSION
-  ret_data[EVENT_SUBMIT_FOR_ADMISSION] = data[index]
+  value = column_val(TQueryTableColumn.EVENT_SUBMIT_FOR_ADMISSION)
   if query_state_value == "EXCEPTION" or query_type == "DDL":
-    assert data[index] == "0.000", "submit for admission event incorrect"
+    assert value == "0.000", "submit for admission event incorrect"
   else:
     event = re.search(r'\n\s+\-\s+Submit for admission:\s+(\S+)', timeline)
     assert event is not None, "submit for admission event missing"
-    assert_time_str(event.group(1), data[index], "submit for admission event 
incorrect")
+    assert_time_str(event.group(1), value, "submit for admission event 
incorrect")
 
   # Event Timeline Completed Admission
-  index += 1
-  assert sql_results.column_labels[index] == EVENT_COMPLETED_ADMISSION
-  ret_data[EVENT_COMPLETED_ADMISSION] = data[index]
+  value = column_val(TQueryTableColumn.EVENT_COMPLETED_ADMISSION)
   if query_state_value == "EXCEPTION" or query_type == "DDL":
-    assert data[index] == "0.000", "completed admission event incorrect"
+    assert value == "0.000", "completed admission event incorrect"
   else:
     event = re.search(r'\n\s+\-\s+Completed admission:\s+(\S+)', timeline)
     assert event is not None, "completed admission event missing"
-    assert_time_str(event.group(1), data[index], "completed admission event 
incorrect")
+    assert_time_str(event.group(1), value, "completed admission event 
incorrect")
 
   # Event Timeline All Backends Started
-  index += 1
-  assert sql_results.column_labels[index] == EVENT_ALL_BACKENDS_STARTED
-  ret_data[EVENT_ALL_BACKENDS_STARTED] = data[index]
+  value = column_val(TQueryTableColumn.EVENT_ALL_BACKENDS_STARTED)
   if query_state_value == "EXCEPTION" or query_type == "DDL":
-    assert data[index] == "0.000", "all backends started event incorrect"
+    assert value == "0.000", "all backends started event incorrect"
   else:
     event = re.search(r'\n\s+\-\s+All \d+ execution backends \(\d+ fragment 
instances\)'
         r' started:\s+(\S+)', timeline)
     assert event is not None, "all backends started event missing"
-    assert_time_str(event.group(1), data[index], "all backends started event 
incorrect")
+    assert_time_str(event.group(1), value, "all backends started event 
incorrect")
 
   # Event Timeline Rows Available
-  index += 1
-  assert sql_results.column_labels[index] == EVENT_ROWS_AVAILABLE
-  ret_data[EVENT_ROWS_AVAILABLE] = data[index]
+  value = column_val(TQueryTableColumn.EVENT_ROWS_AVAILABLE)
   if query_state_value == "EXCEPTION" or query_type == "DML":
-    assert data[index] == "0.000", "rows available event incorrect"
+    assert value == "0.000", "rows available event incorrect"
   else:
     event = re.search(r'\n\s+\-\s+Rows available:\s+(\S+)', timeline)
     assert event is not None, "rows available event missing"
-    assert_time_str(event.group(1), data[index], "rows available event 
incorrect")
+    assert_time_str(event.group(1), value, "rows available event incorrect")
 
   # Event Timeline First Row Fetched
-  index += 1
-  assert sql_results.column_labels[index] == EVENT_FIRST_ROW_FETCHED
-  ret_data[EVENT_FIRST_ROW_FETCHED] = data[index]
+  value = column_val(TQueryTableColumn.EVENT_FIRST_ROW_FETCHED)
   if query_state_value == "EXCEPTION" or query_type == "DDL" or query_type == 
"DML":
-    assert data[index] == "0.000", "first row fetched event incorrect"
+    assert value == "0.000", "first row fetched event incorrect"
   else:
     event = re.search(r'\n\s+\-\s+First row fetched:\s+(\S+)', timeline)
     assert event is not None, "first row fetched event missing"
-    assert_time_str(event.group(1), data[index], "first row fetched event 
incorrect")
+    assert_time_str(event.group(1), value, "first row fetched event incorrect")
 
   # Event Timeline Last Row Fetched
-  index += 1
-  assert sql_results.column_labels[index] == EVENT_LAST_ROW_FETCHED
-  ret_data[EVENT_LAST_ROW_FETCHED] = data[index]
+  value = column_val(TQueryTableColumn.EVENT_LAST_ROW_FETCHED)
   if query_state_value == "EXCEPTION" or query_type == "DDL":
-    assert data[index] == "0.000", "last row fetched event incorrect"
+    assert value == "0.000", "last row fetched event incorrect"
   else:
     event = re.search(r'\n\s+\-\s+Last row fetched:\s+(\S+)', timeline)
     assert event is not None, "last row fetched event missing"
-    assert_time_str(event.group(1), data[index], "last row fetched event 
incorrect")
+    assert_time_str(event.group(1), value, "last row fetched event incorrect")
 
   # Event Timeline Unregister Query
-  index += 1
-  assert sql_results.column_labels[index] == EVENT_UNREGISTER_QUERY
-  ret_data[EVENT_UNREGISTER_QUERY] = data[index]
+  value = column_val(TQueryTableColumn.EVENT_UNREGISTER_QUERY)
   event = re.search(r'\n\s+\-\s+Unregister query:\s+(\S+)', timeline)
   assert event is not None, "unregister query event missing"
-  assert_time_str(event.group(1), data[index], "unregister query event 
incorrect")
+  assert_time_str(event.group(1), value, "unregister query event incorrect")
 
   # Read IO Wait Total
-  index += 1
-  assert sql_results.column_labels[index] == READ_IO_WAIT_TOTAL_MS
-  ret_data[READ_IO_WAIT_TOTAL_MS] = data[index]
+  value = column_val(TQueryTableColumn.READ_IO_WAIT_TOTAL_MS)
   total_read_wait = 0
-  if (query_state_value != "EXCEPTION" and query_type == "QUERY") or 
data[index] != "0":
+  if (query_state_value != "EXCEPTION" and query_type == "QUERY") or value != 
"0":
     re_wait_time = re.compile(r'^\s+\-\s+ScannerIoWaitTime:\s+(.*?)$')
     read_waits = assert_scan_node_metrics(re_wait_time, profile_lines)
     for r in read_waits:
@@ -600,47 +472,41 @@ def assert_query(query_tbl, client, expected_cluster_id, 
raw_profile=None, impal
 
     tolerance = total_read_wait * 0.001
 
-    assert total_read_wait - tolerance <= float(data[index]) <= \
+    assert total_read_wait - tolerance <= float(value) <= \
         total_read_wait + tolerance, "read io wait time total incorrect"
   else:
-    assert data[index] == "0.000"
+    assert value == "0.000"
 
   # Read IO Wait Average
-  index += 1
-  assert sql_results.column_labels[index] == READ_IO_WAIT_MEAN_MS
-  ret_data[READ_IO_WAIT_MEAN_MS] = data[index]
+  value = column_val(TQueryTableColumn.READ_IO_WAIT_MEAN_MS)
   if (query_state_value != "EXCEPTION" and query_type == "QUERY"
-      and len(read_waits) != 0) or data[index] != "0.000":
+      and len(read_waits) != 0) or value != "0.000":
     avg_read_wait = round_to_3(float(total_read_wait / len(read_waits)))
-    assert avg_read_wait - tolerance <= float(data[index]) <= avg_read_wait + 
tolerance, \
+    assert avg_read_wait - tolerance <= float(value) <= avg_read_wait + 
tolerance, \
         "read io wait time average incorrect"
   else:
-    assert data[index] == "0.000"
+    assert value == "0.000"
 
   # Total Bytes Read From Cache
-  index += 1
-  assert sql_results.column_labels[index] == BYTES_READ_CACHE_TOTAL
-  ret_data[BYTES_READ_CACHE_TOTAL] = data[index]
-  if (query_state_value != "EXCEPTION" and query_type == "QUERY") or 
data[index] != "0":
+  value = column_val(TQueryTableColumn.BYTES_READ_CACHE_TOTAL)
+  if (query_state_value != "EXCEPTION" and query_type == "QUERY") or value != 
"0":
     re_cache_read = re.compile(r'^\s+\-\s+DataCacheHitBytes:\s+.*?\((\d+)\)$')
     read_from_cache = assert_scan_node_metrics(re_cache_read, profile_lines)
 
     total_read = 0
     for r in read_from_cache:
       total_read += int(r)
-    assert total_read == int(data[index]), "bytes read from cache total 
incorrect"
+    assert total_read == int(value), "bytes read from cache total incorrect"
   else:
-    assert data[index] == "0"
+    assert value == "0"
 
   # Total Bytes Read
-  index += 1
-  assert sql_results.column_labels[index] == BYTES_READ_TOTAL
-  ret_data[BYTES_READ_TOTAL] = data[index]
+  value = column_val(TQueryTableColumn.BYTES_READ_TOTAL)
   bytes_read = re.search(r'\n\s+\-\s+TotalBytesRead:\s+.*?\((\d+)\)\n', 
profile_text)
   if query_state_value != "EXCEPTION" and query_type == "QUERY":
     assert bytes_read is not None, "total bytes read missing"
   if bytes_read is not None:
-    assert data[index] == bytes_read.group(1), "total bytes read incorrect"
+    assert value == bytes_read.group(1), "total bytes read incorrect"
 
   # Calculate all peak memory usage stats by scraping the query profile.
   peak_mem_cnt = 0
@@ -660,64 +526,55 @@ def assert_query(query_tbl, client, expected_cluster_id, 
raw_profile=None, impal
     assert peak_mem_cnt > 0, "did not find per node peak memory usage"
 
   # Per Node Peak Memory Usage Min
-  index += 1
-  assert sql_results.column_labels[index] == PERNODE_PEAK_MEM_MIN
-  ret_data[PERNODE_PEAK_MEM_MIN] = data[index]
+  value = column_val(TQueryTableColumn.PERNODE_PEAK_MEM_MIN)
   tolerance = int(min_peak_mem * 0.005)
-  assert min_peak_mem - tolerance <= int(data[index]) <= min_peak_mem + 
tolerance, \
+  assert min_peak_mem - tolerance <= int(value) <= min_peak_mem + tolerance, \
       "pernode peak memory minimum incorrect"
 
   # Per Node Peak Memory Usage Max
-  index += 1
-  assert sql_results.column_labels[index] == PERNODE_PEAK_MEM_MAX
-  ret_data[PERNODE_PEAK_MEM_MAX] = data[index]
+  value = column_val(TQueryTableColumn.PERNODE_PEAK_MEM_MAX)
   tolerance = int(max_peak_mem * 0.005)
-  assert max_peak_mem - tolerance <= int(data[index]) <= max_peak_mem + 
tolerance, \
+  assert max_peak_mem - tolerance <= int(value) <= max_peak_mem + tolerance, \
       "pernode peak memory maximum incorrect"
 
   # Per Node Peak Memory Usage Mean
-  index += 1
-  assert sql_results.column_labels[index] == PERNODE_PEAK_MEM_MEAN
-  ret_data[PERNODE_PEAK_MEM_MEAN] = data[index]
+  value = column_val(TQueryTableColumn.PERNODE_PEAK_MEM_MEAN)
   mean_peak_mem = 0
   if peak_mem_cnt > 0:
     mean_peak_mem = int(total_peak_mem / peak_mem_cnt)
   tolerance = int(max_peak_mem * 0.005)
-  assert mean_peak_mem - tolerance <= int(data[index]) <= mean_peak_mem + 
tolerance, \
+  assert mean_peak_mem - tolerance <= int(value) <= mean_peak_mem + tolerance, \
       "pernode peak memory mean incorrect"
 
   # SQL statement
-  index += 1
-  assert sql_results.column_labels[index] == SQL
-  ret_data[SQL] = data[index]
   sql_stmt = re.search(r'\n\s+Sql Statement:\s+(.*?)\n', profile_text)
   assert sql_stmt is not None
-  assert data[index] == sql_stmt.group(1), "sql incorrect"
+  assert column_val(TQueryTableColumn.SQL) == sql_stmt.group(1), "sql incorrect"
 
   # Query Plan
-  index += 1
-  assert sql_results.column_labels[index] == PLAN
-  ret_data[PLAN] = data[index]
+  value = column_val(TQueryTableColumn.PLAN)
   plan = re.search(r'\n\s+Plan:\s*\n(.*)\n\s+Estimated Per-Host Mem', 
profile_text,
       re.DOTALL)
   if query_state_value == "EXCEPTION" or query_type == "DDL":
     assert plan is None
-    assert data[index] == ""
+    assert value == ""
   else:
     assert plan is not None
-    assert data[index] == plan.group(1)
+    assert value == plan.group(1)
 
   # Tables Queried
-  index += 1
-  assert sql_results.column_labels[index] == TABLES_QUERIED
-  ret_data[TABLES_QUERIED] = data[index]
+  value = column_val(TQueryTableColumn.TABLES_QUERIED)
   tables = re.search(r'\n\s+Tables Queried:\s+(.*?)\n', profile_text)
   if query_state_value == "EXCEPTION" or query_type == "DDL":
     assert tables is None
-    assert data[index] == ""
+    assert value == ""
   else:
     assert tables is not None
-    assert data[index] == tables.group(1)
+    assert value == tables.group(1)
+
+  # Assert all entries have been tested and added to ret_data
+  for i in range(len(TQueryTableColumn._VALUES_TO_NAMES)):
+    assert TQueryTableColumn._VALUES_TO_NAMES[i] in ret_data
 
   return ret_data
 # function assert_query


Reply via email to