This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 77a87bb10 IMPALA-12737: Refactor the Workload Management 
Initialization Process.
77a87bb10 is described below

commit 77a87bb103362ebafb0624f95d1a413417763d66
Author: jasonmfehr <[email protected]>
AuthorDate: Thu Aug 8 11:15:33 2024 -0700

    IMPALA-12737: Refactor the Workload Management Initialization Process.
    
    The workload management initialization process creates the two tables
    "sys.impala_query_log" and "sys.impala_query_live" during coordinator
    startup.
    
    The current design for this init process is to create both tables on
    each coordinator at every startup by running create database and
    create table if not exists DDLs. This design causes unnecessary DDLs
    to execute which delays coordinator startup and introduces the
    potential for unnecessary startup failures should the DDLs fail.
    
    This patch splits the initialization code into its own file and adds
    version tracking to the individual fields in the workload management
    tables. This patch also adds schema version checks on the workload
    management tables and only runs DDLs for the db tables if necessary.
    
    Additionally, versioning of workload management table schemas is
    introduced. The only allowed schema version in this patch is 1.0.0.
    Future patches that need to modify the workload management table
    schema will expand this list of allowed versions.
    
    Since this patch is a refactor and does not change functionality,
    testing was accomplished by running existing workload management
    unit and python tests.
    
    Change-Id: Id645f94c8da73b91c13a23d7ac0ea026425f0f96
    Reviewed-on: http://gerrit.cloudera.org:8080/21653
    Reviewed-by: Riza Suminto <[email protected]>
    Reviewed-by: Michael Smith <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/service/CMakeLists.txt                |   3 +
 be/src/service/impala-server.cc              |   4 +-
 be/src/service/impala-server.h               |  22 +-
 be/src/service/workload-management-fields.cc | 106 +++++-----
 be/src/service/workload-management-flags.cc  |   7 +
 be/src/service/workload-management-init.cc   | 290 +++++++++++++++++++++++++++
 be/src/service/workload-management-test.cc   | 132 ++++++++++++
 be/src/service/workload-management.cc        | 185 +++--------------
 be/src/service/workload-management.h         |  49 +++--
 be/src/util/CMakeLists.txt                   |   3 +
 be/src/util/version-util-test.cc             | 130 ++++++++++++
 be/src/util/version-util.cc                  |  65 ++++++
 be/src/util/version-util.h                   |  44 ++++
 tests/custom_cluster/test_query_live.py      |   8 +-
 tests/custom_cluster/test_query_log.py       |  10 +-
 15 files changed, 798 insertions(+), 260 deletions(-)

diff --git a/be/src/service/CMakeLists.txt b/be/src/service/CMakeLists.txt
index 7f5206d3d..c8e386df1 100644
--- a/be/src/service/CMakeLists.txt
+++ b/be/src/service/CMakeLists.txt
@@ -45,6 +45,7 @@ add_library(Service
   query-result-set.cc
   query-state-record.cc
   workload-management.cc
+  workload-management-init.cc
   workload-management-fields.cc
   workload-management-flags.cc
 )
@@ -140,6 +141,7 @@ add_library(ServiceTests STATIC
   impala-server-test.cc
   query-options-test.cc
   query-state-record-test.cc
+  workload-management-test.cc
 )
 add_dependencies(ServiceTests gen-deps)
 
@@ -149,4 +151,5 @@ ADD_UNIFIED_BE_LSAN_TEST(hs2-util-test 
"StitchNullsTest.*:PrintTColumnValueTest.
 ADD_UNIFIED_BE_LSAN_TEST(query-options-test QueryOptions.*)
 ADD_UNIFIED_BE_LSAN_TEST(impala-server-test ImpalaServerTest.*)
 ADD_UNIFIED_BE_LSAN_TEST(query-state-record-test QueryStateRecordTest.*)
+ADD_UNIFIED_BE_LSAN_TEST(workload-management-test WorkloadManagementTest.*)
 ADD_BE_LSAN_TEST(internal-server-test)
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 4a77680a3..126f3535b 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -3217,7 +3217,9 @@ Status ImpalaServer::Start(int32_t beeswax_port, int32_t 
hs2_port,
 
     internal_server_ = shared_from_this();
 
-    RETURN_IF_ERROR(InitWorkloadManagement());
+    ABORT_IF_ERROR(Thread::Create("impala-server", "completed-queries",
+      bind<void>(&ImpalaServer::InitWorkloadManagement, this),
+    &workload_management_thread_));
   }
   LOG(INFO) << "Initialized coordinator/executor Impala server on "
             << 
TNetworkAddressToString(exec_env_->configured_backend_address());
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 197fb3330..f2b86c329 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -1148,9 +1148,11 @@ class ImpalaServer : public ImpalaServiceIf,
   /// current query ids to the admissiond.
   [[noreturn]] void AdmissionHeartbeatThread();
 
-  /// If workload management is enabled, starts workload management threads.
-  /// (implemented in workload-management.cc)
-  Status InitWorkloadManagement();
+  /// Checks if workload management is enabled, and starts the init process if 
it is
+  /// enabled. Does not return until coordinator shutdown. Returns immediately 
if
+  /// workload management is not enabled.
+  /// (implemented in workload-management-init.cc)
+  void InitWorkloadManagement();
 
   /// Blocks until running workload management threads are shut down.
   /// (implemented in workload-management.cc)
@@ -1158,7 +1160,8 @@ class ImpalaServer : public ImpalaServiceIf,
 
   /// Periodically writes out completed queries (if configured)
   /// (implemented in workload-management.cc)
-  void CompletedQueriesThread();
+  void WorkloadManagementWorker(InternalServer::QueryOptionMap& 
insert_query_opts,
+      const std::string log_table_name);
 
   /// Returns a list of completed queries that have not yet been written to 
storage.
   /// Acquires completed_queries_lock_ to make a copy of completed_queries_ 
state.
@@ -1693,12 +1696,11 @@ class ImpalaServer : public ImpalaServiceIf,
 
   /// Tracks the state of the thread that drains the completed queries queue 
to the table.
   /// The associated lock must be held before reading/modifying this variable.
-  impala::workload_management::ThreadState completed_queries_thread_state_ =
-      impala::workload_management::NOT_STARTED;
-  std::mutex completed_queries_threadstate_mu_;
+  ThreadState workload_mgmt_thread_state_ = NOT_STARTED;
+  std::mutex workload_mgmt_threadstate_mu_;
 
-  /// Thread that runs CompletedQueriesThread().
-  std::unique_ptr<Thread> completed_queries_thread_;
+  /// Thread that runs Workload Management.
+  std::unique_ptr<Thread> workload_management_thread_;
 
   /// Ticker that wakes up the completed_queried_thread at set intervals to 
process the
   /// queued completed queries. Uses the completed_queries_lock_ to synchonize 
access to
@@ -1706,7 +1708,7 @@ class ImpalaServer : public ImpalaServiceIf,
   std::unique_ptr<TickerSecondsBool> completed_queries_ticker_;
 
   /// Queue of completed queries and the lock to synchronize access to it.
-  std::list<impala::workload_management::CompletedQuery> completed_queries_;
+  std::list<CompletedQuery> completed_queries_;
   std::mutex completed_queries_lock_;
 
 };
diff --git a/be/src/service/workload-management-fields.cc 
b/be/src/service/workload-management-fields.cc
index 4cff8d2c6..cdac6d945 100644
--- a/be/src/service/workload-management-fields.cc
+++ b/be/src/service/workload-management-fields.cc
@@ -25,18 +25,16 @@
 
 #include <algorithm>
 #include <string>
-#include <utility>
 
-#include <boost/algorithm/string.hpp>
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <gutil/strings/substitute.h>
 
 #include "common/compiler-util.h"
+#include "gen-cpp/SystemTables_types.h"
 #include "gen-cpp/Types_types.h"
 #include "runtime/exec-env.h"
 #include "service/query-options.h"
-#include "service/query-state-record.h"
 #include "util/debug-util.h"
 #include "util/network-util.h"
 #include "util/sql-util.h"
@@ -49,8 +47,6 @@ using strings::Substitute;
 
 namespace impala {
 
-namespace workload_management {
-
 /// Helper type for event timeline timestamp functions.
 using _event_compare_pred = function<bool(const string& comp)>;
 
@@ -98,25 +94,25 @@ const array<FieldDefinition, NumQueryTableColumns> 
FIELD_DEFINITIONS{{
     FieldDefinition(TQueryTableColumn::CLUSTER_ID, TPrimitiveType::STRING,
         [](FieldParserContext& ctx){
           ctx.sql << "'" << ctx.cluster_id << "'";
-        }),
+        }, VERSION_1_0_0),
 
     // Query Id
     FieldDefinition(TQueryTableColumn::QUERY_ID, TPrimitiveType::STRING,
         [](FieldParserContext& ctx){
           ctx.sql << "'" << PrintId(ctx.record->base_state->id) << "'";
-        }),
+        }, VERSION_1_0_0),
 
     // Session Id
     FieldDefinition(TQueryTableColumn::SESSION_ID, TPrimitiveType::STRING,
         [](FieldParserContext& ctx){
           ctx.sql << "'" << PrintId(ctx.record->session_id) << "'";
-        }),
+        }, VERSION_1_0_0),
 
     // Session Type
     FieldDefinition(TQueryTableColumn::SESSION_TYPE, TPrimitiveType::STRING,
         [](FieldParserContext& ctx){
           ctx.sql << "'" << ctx.record->session_type << "'";
-        }),
+        }, VERSION_1_0_0),
 
     // Hiveserver2 Protocol Version
     FieldDefinition(TQueryTableColumn::HIVESERVER2_PROTOCOL_VERSION,
@@ -126,32 +122,32 @@ const array<FieldDefinition, NumQueryTableColumns> 
FIELD_DEFINITIONS{{
             ctx.sql << ctx.record->hiveserver2_protocol_version;
           }
           ctx.sql << "'";
-        }),
+        }, VERSION_1_0_0),
 
     // Effective User
     FieldDefinition(TQueryTableColumn::DB_USER, TPrimitiveType::STRING,
         [](FieldParserContext& ctx){
           ctx.sql << "'" << ctx.record->base_state->effective_user << "'";
-        }),
+        }, VERSION_1_0_0),
 
     // DB User
     FieldDefinition(TQueryTableColumn::DB_USER_CONNECTION, 
TPrimitiveType::STRING,
         [](FieldParserContext& ctx){
           ctx.sql << "'" << ctx.record->db_user_connection << "'";
-        }),
+        }, VERSION_1_0_0),
 
     // Default DB
     FieldDefinition(TQueryTableColumn::DB_NAME, TPrimitiveType::STRING,
         [](FieldParserContext& ctx){
           ctx.sql << "'" << ctx.record->base_state->default_db << "'";
-        }),
+        }, VERSION_1_0_0),
 
     // Impala Coordinator
     FieldDefinition(TQueryTableColumn::IMPALA_COORDINATOR, 
TPrimitiveType::STRING,
         [](FieldParserContext& ctx){
           ctx.sql << "'" <<TNetworkAddressToString(
               ExecEnv::GetInstance()->configured_backend_address()) << "'";
-        }),
+        }, VERSION_1_0_0),
 
     // Query Status
     FieldDefinition(TQueryTableColumn::QUERY_STATUS, TPrimitiveType::STRING,
@@ -163,31 +159,31 @@ const array<FieldDefinition, NumQueryTableColumns> 
FIELD_DEFINITIONS{{
             ctx.sql << 
EscapeSql(ctx.record->base_state->query_status.msg().msg());
           }
           ctx.sql << "'";
-        }),
+        }, VERSION_1_0_0),
 
     // Query State
     FieldDefinition(TQueryTableColumn::QUERY_STATE, TPrimitiveType::STRING,
         [](FieldParserContext& ctx){
           ctx.sql << "'" << ctx.record->base_state->query_state << "'";
-        }),
+        }, VERSION_1_0_0),
 
     // Impala Query End State
     FieldDefinition(TQueryTableColumn::IMPALA_QUERY_END_STATE, 
TPrimitiveType::STRING,
         [](FieldParserContext& ctx){
           ctx.sql << "'" << ctx.record->impala_query_end_state << "'";
-        }),
+        }, VERSION_1_0_0),
 
     // Query Type
     FieldDefinition(TQueryTableColumn::QUERY_TYPE, TPrimitiveType::STRING,
         [](FieldParserContext& ctx){
           ctx.sql << "'" << ctx.record->base_state->stmt_type << "'";
-        }),
+        }, VERSION_1_0_0),
 
     // Client Network Address
     FieldDefinition(TQueryTableColumn::NETWORK_ADDRESS, TPrimitiveType::STRING,
         [](FieldParserContext& ctx){
           ctx.sql << "'" << 
TNetworkAddressToString(ctx.record->client_address) << "'";
-        }),
+        }, VERSION_1_0_0),
 
     // Query Start Time in UTC
     // Required
@@ -195,39 +191,39 @@ const array<FieldDefinition, NumQueryTableColumns> 
FIELD_DEFINITIONS{{
         [](FieldParserContext& ctx){
           ctx.sql << "UNIX_MICROS_TO_UTC_TIMESTAMP(" <<
               ctx.record->base_state->start_time_us << ")";
-        }),
+        }, VERSION_1_0_0),
 
     // Query Duration
     FieldDefinition(TQueryTableColumn::TOTAL_TIME_MS, TPrimitiveType::DECIMAL,
         [](FieldParserContext& ctx){
           _write_decimal(ctx, (ctx.record->base_state->end_time_us -
               ctx.record->base_state->start_time_us), MICROS_TO_MILLIS);
-        }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
+        }, VERSION_1_0_0, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
 
     // Query Options set by Configuration
     FieldDefinition(TQueryTableColumn::QUERY_OPTS_CONFIG, 
TPrimitiveType::STRING,
         [](FieldParserContext& ctx){
           const string opts_str = DebugQueryOptions(ctx.record->query_options);
           ctx.sql << "'" << EscapeSql(opts_str) << "'";
-        }),
+        }, VERSION_1_0_0),
 
     // Resource Pool
     FieldDefinition(TQueryTableColumn::RESOURCE_POOL, TPrimitiveType::STRING,
         [](FieldParserContext& ctx){
           ctx.sql << "'" << EscapeSql(ctx.record->base_state->resource_pool) 
<< "'";
-        }),
+        }, VERSION_1_0_0),
 
     // Per-host Memory Estimate
     FieldDefinition(TQueryTableColumn::PER_HOST_MEM_ESTIMATE, 
TPrimitiveType::BIGINT,
         [](FieldParserContext& ctx){
           ctx.sql << ctx.record->per_host_mem_estimate;
-        }),
+        }, VERSION_1_0_0),
 
     // Dedicated Coordinator Memory Estimate
     FieldDefinition(TQueryTableColumn::DEDICATED_COORD_MEM_ESTIMATE,
         TPrimitiveType::BIGINT, [](FieldParserContext& ctx){
           ctx.sql << ctx.record->dedicated_coord_mem_estimate;
-        }),
+        }, VERSION_1_0_0),
 
     // Per-Host Fragment Instances
     FieldDefinition(TQueryTableColumn::PER_HOST_FRAGMENT_INSTANCES,
@@ -243,7 +239,7 @@ const array<FieldDefinition, NumQueryTableColumns> 
FIELD_DEFINITIONS{{
           }
 
           ctx.sql << "'";
-        }),
+        }, VERSION_1_0_0),
 
     // Backends Count
     FieldDefinition(TQueryTableColumn::BACKENDS_COUNT, TPrimitiveType::INT,
@@ -253,133 +249,133 @@ const array<FieldDefinition, NumQueryTableColumns> 
FIELD_DEFINITIONS{{
           } else {
             ctx.sql << ctx.record->per_host_state.size();
           }
-        }),
+        }, VERSION_1_0_0),
 
     // Admission Result
     FieldDefinition(TQueryTableColumn::ADMISSION_RESULT, 
TPrimitiveType::STRING,
         [](FieldParserContext& ctx){
           ctx.sql << "'" << ctx.record->admission_result << "'";
-        }),
+        }, VERSION_1_0_0),
 
     // Cluster Memory Admitted
     FieldDefinition(TQueryTableColumn::CLUSTER_MEMORY_ADMITTED, 
TPrimitiveType::BIGINT,
         [](FieldParserContext& ctx){
           ctx.sql << ctx.record->base_state->cluster_mem_est;
-        }),
+        }, VERSION_1_0_0),
 
     // Executor Group
     FieldDefinition(TQueryTableColumn::EXECUTOR_GROUP, TPrimitiveType::STRING,
         [](FieldParserContext& ctx){
           ctx.sql << "'" << ctx.record->executor_group << "'";
-        }),
+        }, VERSION_1_0_0),
 
     // Executor Groups
     FieldDefinition(TQueryTableColumn::EXECUTOR_GROUPS, TPrimitiveType::STRING,
         [](FieldParserContext& ctx){
           ctx.sql << "'" << EscapeSql(ctx.record->executor_groups) << "'";
-        }),
+        }, VERSION_1_0_0),
 
     // Exec Summary (also known as the operator summary)
     FieldDefinition(TQueryTableColumn::EXEC_SUMMARY, TPrimitiveType::STRING,
         [](FieldParserContext& ctx){
           ctx.sql << "'" << EscapeSql(ctx.record->exec_summary) << "'";
-        }),
+        }, VERSION_1_0_0),
 
     // Number of rows fetched
     FieldDefinition(TQueryTableColumn::NUM_ROWS_FETCHED, 
TPrimitiveType::BIGINT,
         [](FieldParserContext& ctx){
           ctx.sql << ctx.record->base_state->num_rows_fetched;
-        }),
+        }, VERSION_1_0_0),
 
     // Row Materialization Rate
     FieldDefinition(TQueryTableColumn::ROW_MATERIALIZATION_ROWS_PER_SEC,
         TPrimitiveType::BIGINT, [](FieldParserContext& ctx){
           ctx.sql << ctx.record->row_materialization_rate;
-        }),
+        }, VERSION_1_0_0),
 
     // Row Materialization Time
     FieldDefinition(TQueryTableColumn::ROW_MATERIALIZATION_TIME_MS,
         TPrimitiveType::DECIMAL, [](FieldParserContext& ctx){
           _write_decimal(ctx, ctx.record->row_materialization_time, 
NANOS_TO_MILLIS);
-        }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
+        }, VERSION_1_0_0, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
 
     // Compressed Bytes Spilled to Disk
     FieldDefinition(TQueryTableColumn::COMPRESSED_BYTES_SPILLED, 
TPrimitiveType::BIGINT,
         [](FieldParserContext& ctx){
           ctx.sql << ctx.record->compressed_bytes_spilled;
-        }),
+        }, VERSION_1_0_0),
 
     // Events Timeline Planning Finished
     FieldDefinition(TQueryTableColumn::EVENT_PLANNING_FINISHED, 
TPrimitiveType::DECIMAL,
         [](FieldParserContext& ctx){
           _write_event(ctx, PLANNING_FINISHED);
-        }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
+        }, VERSION_1_0_0, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
 
     // Events Timeline Submit for Admission
     FieldDefinition(TQueryTableColumn::EVENT_SUBMIT_FOR_ADMISSION,
         TPrimitiveType::DECIMAL, [](FieldParserContext& ctx){
           _write_event(ctx, SUBMIT_FOR_ADMISSION);
-        }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
+        }, VERSION_1_0_0, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
 
     // Events Timeline Completed Admission
     FieldDefinition(TQueryTableColumn::EVENT_COMPLETED_ADMISSION,
         TPrimitiveType::DECIMAL, [](FieldParserContext& ctx){
           _write_event(ctx, COMPLETED_ADMISSION);
-        }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
+        }, VERSION_1_0_0, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
 
     // Events Timeline All Execution Backends Started
     FieldDefinition(TQueryTableColumn::EVENT_ALL_BACKENDS_STARTED,
         TPrimitiveType::DECIMAL, [](FieldParserContext& ctx){
           _write_event(ctx, ALL_BACKENDS_STARTED);
-        }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
+        }, VERSION_1_0_0, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
 
     // Events Timeline Rows Available
     FieldDefinition(TQueryTableColumn::EVENT_ROWS_AVAILABLE, 
TPrimitiveType::DECIMAL,
         [](FieldParserContext& ctx){
           _write_event(ctx, ROWS_AVAILABLE);
-        }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
+        }, VERSION_1_0_0, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
 
     // Events Timeline First Row Fetched
     FieldDefinition(TQueryTableColumn::EVENT_FIRST_ROW_FETCHED, 
TPrimitiveType::DECIMAL,
         [](FieldParserContext& ctx){
           _write_event(ctx, FIRST_ROW_FETCHED);
-        }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
+        }, VERSION_1_0_0, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
 
     // Events Timeline Last Row Fetched
     FieldDefinition(TQueryTableColumn::EVENT_LAST_ROW_FETCHED, 
TPrimitiveType::DECIMAL,
         [](FieldParserContext& ctx){
           _write_event(ctx, LAST_ROW_FETCHED);
-        }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
+        }, VERSION_1_0_0, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
 
     // Events Timeline Unregister Query
     FieldDefinition(TQueryTableColumn::EVENT_UNREGISTER_QUERY, 
TPrimitiveType::DECIMAL,
         [](FieldParserContext& ctx){
           _write_event(ctx, UNREGISTER_QUERY);
-        }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
+        }, VERSION_1_0_0, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
 
     // Read IO Wait Time Total
     FieldDefinition(TQueryTableColumn::READ_IO_WAIT_TOTAL_MS, 
TPrimitiveType::DECIMAL,
         [](FieldParserContext& ctx){
           _write_decimal(ctx, ctx.record->read_io_wait_time_total, 
NANOS_TO_MILLIS);
-        }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
+        }, VERSION_1_0_0, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
 
     // Read IO Wait Time Mean
     FieldDefinition(TQueryTableColumn::READ_IO_WAIT_MEAN_MS, 
TPrimitiveType::DECIMAL,
         [](FieldParserContext& ctx){
           _write_decimal(ctx, ctx.record->read_io_wait_time_mean, 
NANOS_TO_MILLIS);
-        }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
+        }, VERSION_1_0_0, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
 
     // Bytes Read from the Data Cache Total
     FieldDefinition(TQueryTableColumn::BYTES_READ_CACHE_TOTAL, 
TPrimitiveType::BIGINT,
         [](FieldParserContext& ctx){
           ctx.sql << ctx.record->bytes_read_cache_total;
-        }),
+        }, VERSION_1_0_0),
 
     // Bytes Read Total
     FieldDefinition(TQueryTableColumn::BYTES_READ_TOTAL, 
TPrimitiveType::BIGINT,
         [](FieldParserContext& ctx){
           ctx.sql << ctx.record->bytes_read_total;
-        }),
+        }, VERSION_1_0_0),
 
     // Per-Node Peak Memory Usage Min
     FieldDefinition(TQueryTableColumn::PERNODE_PEAK_MEM_MIN, 
TPrimitiveType::BIGINT,
@@ -392,7 +388,7 @@ const array<FieldDefinition, NumQueryTableColumns> 
FIELD_DEFINITIONS{{
           } else {
             ctx.sql << 0;
           }
-        }),
+        }, VERSION_1_0_0),
 
     // Per-Node Peak Memory Usage Max
     FieldDefinition(TQueryTableColumn::PERNODE_PEAK_MEM_MAX, 
TPrimitiveType::BIGINT,
@@ -405,7 +401,7 @@ const array<FieldDefinition, NumQueryTableColumns> 
FIELD_DEFINITIONS{{
           } else {
             ctx.sql << max_elem->second.peak_memory_usage;
           }
-        }),
+        }, VERSION_1_0_0),
 
     // Per-Node Peak Memory Usage Mean
     FieldDefinition(TQueryTableColumn::PERNODE_PEAK_MEM_MEAN, 
TPrimitiveType::BIGINT,
@@ -421,14 +417,14 @@ const array<FieldDefinition, NumQueryTableColumns> 
FIELD_DEFINITIONS{{
           }
 
           ctx.sql << calc_mean;
-        }),
+        }, VERSION_1_0_0),
 
     // SQL Statement
     FieldDefinition(TQueryTableColumn::SQL, TPrimitiveType::STRING,
         [](FieldParserContext& ctx){
           ctx.sql << "'" <<
               EscapeSql(ctx.record->redacted_sql, 
FLAGS_query_log_max_sql_length) << "'";
-        }),
+        }, VERSION_1_0_0),
 
     // Query Plan
     FieldDefinition(TQueryTableColumn::PLAN, TPrimitiveType::STRING,
@@ -436,16 +432,14 @@ const array<FieldDefinition, NumQueryTableColumns> 
FIELD_DEFINITIONS{{
           ctx.sql << "'"
               << EscapeSql(ctx.record->base_state->plan, 
FLAGS_query_log_max_plan_length)
               << "'";
-        }),
+        }, VERSION_1_0_0),
 
     // Tables Queried
     FieldDefinition(TQueryTableColumn::TABLES_QUERIED, TPrimitiveType::STRING,
         [](FieldParserContext& ctx){
           ctx.sql << "'" << PrintTableList(ctx.record->tables) << "'";
-        }),
+        }, VERSION_1_0_0),
 
     }}; // FIELDS_PARSERS const array
 
-} //namespace workload_management
-
 } // namespace impala
diff --git a/be/src/service/workload-management-flags.cc 
b/be/src/service/workload-management-flags.cc
index b71bef2cc..a911407d5 100644
--- a/be/src/service/workload-management-flags.cc
+++ b/be/src/service/workload-management-flags.cc
@@ -170,3 +170,10 @@ DEFINE_string_hidden(query_log_table_props, "", "Comma 
separated list of additio
     "Iceberg table properties in the format 'key'='value' to apply when 
creating the "
     "query log table. Only applies when the table is being created. After 
table "
     "creation, this property does nothing");
+
+DEFINE_string_hidden(workload_mgmt_schema_version, "1.0.0", "Schema version of 
the "
+    "workload management table.");
+
+DEFINE_validator(workload_mgmt_schema_version, [](const char* name, const 
string& val) {
+  return !val.empty();
+});
diff --git a/be/src/service/workload-management-init.cc 
b/be/src/service/workload-management-init.cc
new file mode 100644
index 000000000..c35052092
--- /dev/null
+++ b/be/src/service/workload-management-init.cc
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// This file contains the code for the initialization process for workload 
management.
+/// The init process handles:
+///   1. Checking the state of the workload management db and tables.
+///   2. Creating the db/tables if necessary.
+///   3. Starting the workload management thread which  runs the completed 
queries
+///      processing loop.
+
+#include "service/workload-management.h"
+
+#include <mutex>
+
+#include <boost/algorithm/string/case_conv.hpp>
+#include <boost/algorithm/string/predicate.hpp>
+#include <boost/algorithm/string/trim.hpp>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gutil/strings/strcat.h>
+
+#include "common/status.h"
+#include "gen-cpp/CatalogObjects_constants.h"
+#include "gen-cpp/SystemTables_types.h"
+#include "gen-cpp/TCLIService_types.h"
+#include "gen-cpp/Types_types.h"
+#include "kudu/util/version_util.h"
+#include "service/impala-server.h"
+
+using namespace std;
+using namespace impala;
+using boost::algorithm::starts_with;
+using boost::algorithm::trim_copy;
+using kudu::Version;
+using kudu::ParseVersion;
+
+DECLARE_bool(enable_workload_mgmt);
+DECLARE_int32(query_log_write_interval_s);
+DECLARE_int32(query_log_write_timeout_s);
+DECLARE_string(query_log_request_pool);
+DECLARE_string(query_log_table_location);
+DECLARE_string(query_log_table_name);
+DECLARE_string(query_log_table_props);
+DECLARE_string(workload_mgmt_user);
+DECLARE_string(workload_mgmt_schema_version);
+
+namespace impala {
+
+/// Name of the database where all workload management tables will be stored.
+static const string DB = "sys";
+
+/// Sets up the sys database generating and executing the necessary DML 
statements.
+static void _setupDb(InternalServer* server,
+    InternalServer::QueryOptionMap& insert_query_opts) {
+  insert_query_opts[TImpalaQueryOptions::SYNC_DDL] = "true";
+  ABORT_IF_ERROR(server->ExecuteIgnoreResults(FLAGS_workload_mgmt_user,
+      StrCat("CREATE DATABASE IF NOT EXISTS ", DB, " COMMENT "
+      "'System database for Impala introspection'"), insert_query_opts, 
false));
+  insert_query_opts[TImpalaQueryOptions::SYNC_DDL] = "false";
+} // function _setupDb
+
+/// Appends all relevant fields to a create or alter table sql statement.
+static void _appendCols(StringStreamPop& stream,
+    std::function<bool(const FieldDefinition& item)> shouldIncludeCol) {
+  bool match = false;
+
+  for (const auto& field : FIELD_DEFINITIONS) {
+   if (shouldIncludeCol(field)) {
+    match = true;
+    stream << field.db_column << " " << field.db_column_type;
+
+      if (field.db_column_type == TPrimitiveType::DECIMAL) {
+        stream << "(" << field.precision << "," << field.scale << ")";
+      }
+
+      stream << ",";
+   }
+  }
+
+  DCHECK_EQ(match, true);
+  stream.move_back();
+} // function _appendCols
+
+/// Sets up the query table by generating and executing the necessary DML 
statements.
+static void _setupTable(InternalServer* server, const string& table_name,
+    InternalServer::QueryOptionMap& insert_query_opts, const Version& 
target_version,
+    bool is_system_table = false) {
+  insert_query_opts[TImpalaQueryOptions::SYNC_DDL] = "true";
+
+  StringStreamPop create_table_sql;
+  create_table_sql << "CREATE ";
+  // System tables do not have anything to purge, and must not be managed 
tables.
+  if (is_system_table) create_table_sql << "EXTERNAL ";
+  create_table_sql << "TABLE IF NOT EXISTS " << table_name << "(";
+
+  _appendCols(create_table_sql, [target_version](const FieldDefinition& f){
+      return f.schema_version <= target_version;});
+
+  create_table_sql << ") ";
+
+  if (!is_system_table) {
+    create_table_sql << "PARTITIONED BY SPEC(identity(cluster_id), 
HOUR(start_time_utc)) "
+        << "STORED AS iceberg ";
+
+    if (!FLAGS_query_log_table_location.empty()) {
+      create_table_sql << "LOCATION '" << FLAGS_query_log_table_location << "' 
";
+    }
+  }
+
+  create_table_sql << "TBLPROPERTIES ('schema_version'='" << 
target_version.ToString()
+      << "','format-version'='2'";
+
+  if (is_system_table) {
+    create_table_sql << ",'"
+                     << g_CatalogObjects_constants.TBL_PROP_SYSTEM_TABLE 
<<"'='true'";
+  } else if (!FLAGS_query_log_table_props.empty()) {
+    create_table_sql << "," << FLAGS_query_log_table_props;
+  }
+
+  create_table_sql << ")";
+
+  VLOG(2) << "Creating workload management table '" << table_name
+      << "' on schema version '" << target_version.ToString() << "'";
+  ABORT_IF_ERROR(server->ExecuteIgnoreResults(FLAGS_workload_mgmt_user,
+      create_table_sql.str(), insert_query_opts, false));
+
+  insert_query_opts[TImpalaQueryOptions::SYNC_DDL] = "false";
+
+  LOG(INFO) << "Completed " << table_name << " initialization. 
write_interval=\"" <<
+      FLAGS_query_log_write_interval_s << "s\"";
+} // function _setupTable
+
+static Version _retrieveSchemaVersion(InternalServer* server, const string 
table_name,
+     const InternalServer::QueryOptionMap& insert_query_opts) {
+
+  vector<apache::hive::service::cli::thrift::TRow> query_results;
+
+  const Status describe_table = 
server->ExecuteAndFetchAllHS2(FLAGS_workload_mgmt_user,
+      StrCat("DESCRIBE EXTENDED ", table_name), query_results, 
insert_query_opts, false);
+
+  // If an error, ignore the error and run as if workload management has never
+  // executed. Since all the DDLs use the "if not exists" clause, extra runs 
of the
+  // DDLs will not cause any harm.
+  if (describe_table.ok()) {
+    const string SCHEMA_VER_PROP_NAME = "schema_version";
+
+    // Table exists, search for its schema_version table property.
+    for(auto& res : query_results) {
+      if (starts_with(res.colVals[1].stringVal.value, SCHEMA_VER_PROP_NAME) ){
+        const string schema_ver = trim_copy(res.colVals[2].stringVal.value);
+        Version parsed_schema_ver;
+
+        VLOG(2) << "Actual current workload management schema version of the '"
+            << table_name << "' table is '" << schema_ver << "'";
+
+        if(!ParseVersion(schema_ver, &parsed_schema_ver).ok()) {
+          ABORT_WITH_ERROR(StrCat("Invalid actual workload management schema 
version '",
+              schema_ver, "' for table '", table_name, "'"));
+        }
+
+        return parsed_schema_ver;
+      }
+    }
+
+    // If the for loop does not find the schema_version table property, then 
it has been
+    // removed outside of the workload management code.
+    ABORT_WITH_ERROR(StrCat("Table '", table_name, "' is missing required 
property '",
+        SCHEMA_VER_PROP_NAME, "'"));
+  }
+
+  return NO_TABLE_EXISTS;
+} // _retrieveSchemaVersion
+
+/// Aborts with error if the target_ver is less than the actual_ver.
+static void _errorIfDowngrade(const Version target_ver, const Version 
actual_ver,
+    const string table_name) {
+  if (target_ver < actual_ver) {
+      ABORT_WITH_ERROR(StrCat("Target schema version '", target_ver.ToString(),
+          " of the '", table_name, "' table is lower than the actual schema 
version '",
+          actual_ver.ToString(), "'. Downgrades are not supported. The target 
schema "
+          "version must be greater than or equal to the actual schema 
version."));
+    }
+} // _errorIfDowngrade
+
+void ImpalaServer::InitWorkloadManagement() {
+  if (!FLAGS_enable_workload_mgmt) {
+    return;
+  }
+
+  // Fully qualified table name based on startup flags.
+  const string log_table_name = StrCat(DB, ".", FLAGS_query_log_table_name);
+
+  // Verify FIELD_DEFINITIONS includes all QueryTableColumns.
+  DCHECK_EQ(_TQueryTableColumn_VALUES_TO_NAMES.size(), 
FIELD_DEFINITIONS.size());
+  for (const auto& field : FIELD_DEFINITIONS) {
+    // Verify all fields match their column position.
+    DCHECK_EQ(FIELD_DEFINITIONS[field.db_column].db_column, field.db_column);
+  }
+
+  // Ensure a valid schema version was specified on the command line flag.
+  Version target_schema_version;
+  if (!ParseVersion(FLAGS_workload_mgmt_schema_version,
+      &target_schema_version).ok()) {
+    ABORT_WITH_ERROR(StrCat("Invalid workload management schema version '",
+        FLAGS_workload_mgmt_schema_version, "'"));
+  }
+  VLOG(2) << "Target workload management schema version is '"
+      << target_schema_version.ToString() << "'";
+
+  if (target_schema_version != VERSION_1_0_0) {
+    ABORT_WITH_ERROR(StrCat("Workload management schema version '",
+        FLAGS_workload_mgmt_schema_version, "' does not match any valid 
version"));
+  }
+
+  // Setup default query options that will be provided on all queries that 
insert rows
+  // into the completed queries table.
+  InternalServer::QueryOptionMap insert_query_opts;
+
+  insert_query_opts[TImpalaQueryOptions::TIMEZONE] = "UTC";
+  insert_query_opts[TImpalaQueryOptions::QUERY_TIMEOUT_S] = std::to_string(
+      FLAGS_query_log_write_timeout_s < 1 ?
+      FLAGS_query_log_write_interval_s : FLAGS_query_log_write_timeout_s);
+  if (!FLAGS_query_log_request_pool.empty()) {
+    insert_query_opts[TImpalaQueryOptions::REQUEST_POOL] = 
FLAGS_query_log_request_pool;
+  }
+
+  {
+    lock_guard<mutex> l(workload_mgmt_threadstate_mu_);
+    workload_mgmt_thread_state_ = INITIALIZING;
+  }
+
+  Version parsed_actual_schema_version;
+
+  // Create and/or update the completed queries table if needed.
+  parsed_actual_schema_version = _retrieveSchemaVersion(internal_server_.get(),
+      log_table_name, insert_query_opts);
+
+  if (parsed_actual_schema_version == NO_TABLE_EXISTS) {
+    // First time setting up workload management.
+    // Setup the sys database.
+    _setupDb(internal_server_.get(), insert_query_opts);
+
+    // Create the query log table at the target schema version.
+    _setupTable(internal_server_.get(), log_table_name, insert_query_opts,
+        VERSION_1_0_0);
+
+    parsed_actual_schema_version = VERSION_1_0_0;
+  } else {
+    _errorIfDowngrade(target_schema_version, parsed_actual_schema_version,
+        log_table_name);
+  }
+
+  // Create and/or update the live queries table if needed.
+  // Determine the live queries table name.
+  string live_table_name = StrCat(DB, ".",
+      to_string(TSystemTableName::IMPALA_QUERY_LIVE));
+  boost::algorithm::to_lower(live_table_name);
+
+  parsed_actual_schema_version = _retrieveSchemaVersion(internal_server_.get(),
+      live_table_name, insert_query_opts);
+
+  if (parsed_actual_schema_version == NO_TABLE_EXISTS) {
+    // First time setting up workload management.
+    // Create the query live table on the target schema version.
+    _setupTable(internal_server_.get(), live_table_name, insert_query_opts,
+        target_schema_version, true);
+  } else {
+    _errorIfDowngrade(target_schema_version, parsed_actual_schema_version,
+        live_table_name);
+  }
+
+  LOG(INFO) << "Completed workload management initialization";
+  WorkloadManagementWorker(insert_query_opts, log_table_name);
+} // ImpalaServer::InitWorkloadManagement
+
+} // namespace impala
diff --git a/be/src/service/workload-management-test.cc 
b/be/src/service/workload-management-test.cc
new file mode 100644
index 000000000..6b7c5cd7b
--- /dev/null
+++ b/be/src/service/workload-management-test.cc
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "service/workload-management.h"
+
+#include <sstream>
+#include <string>
+
+#include <gutil/strings/strcat.h>
+
+#include "gen-cpp/SystemTables_types.h"
+#include "testutil/gtest-util.h"
+
+using std::stringstream;
+using std::string;
+
+namespace impala {
+
+static constexpr int8_t EXPECTED_DECIMAL_PRECISION = 18;
+static constexpr int8_t EXPECTED_DECIMAL_SCALE = 3;
+
+// Builds and returns a string that can be used to identify the column name 
during a test
+// failure situation.
+static string _eMsg(const string& expected_name) {
+  return StrCat("for field: ", expected_name);
+}
+
+// Asserts the common fields on a FieldDefinition instance.
+static void _assertCol(const string& expected_name,
+    const TPrimitiveType::type& expected_type, const string& expected_version,
+    const FieldDefinition& actual){
+  stringstream actual_col_name;
+  actual_col_name << actual.db_column;
+
+  stringstream actual_col_type;
+  actual_col_type << actual.db_column_type;
+
+  EXPECT_EQ(expected_name, actual_col_name.str()) << _eMsg(expected_name);
+  EXPECT_EQ(to_string(expected_type), actual_col_type.str()) << 
_eMsg(expected_name);
+  EXPECT_EQ(expected_version, actual.schema_version.ToString()) << 
_eMsg(expected_name);
+}
+
+// Asserts the common fields and the decimal related fields on a 
FieldDefinition instance.
+static void _assertColDecimal(const string& expected_name,
+    const string& expected_version, const FieldDefinition& actual){
+  _assertCol(expected_name, TPrimitiveType::DECIMAL, expected_version, actual);
+
+  EXPECT_EQ(EXPECTED_DECIMAL_PRECISION, actual.precision) << 
_eMsg(expected_name);
+  EXPECT_EQ(EXPECTED_DECIMAL_SCALE, actual.scale) << _eMsg(expected_name);
+}
+
+TEST(WorkloadManagementTest, CheckColumnNames) {
+  EXPECT_EQ(49, FIELD_DEFINITIONS.size());
+
+  _assertCol("CLUSTER_ID", TPrimitiveType::STRING, "1.0.0", 
FIELD_DEFINITIONS[0]);
+  _assertCol("QUERY_ID", TPrimitiveType::STRING, "1.0.0", 
FIELD_DEFINITIONS[1]);
+  _assertCol("SESSION_ID", TPrimitiveType::STRING, "1.0.0", 
FIELD_DEFINITIONS[2]);
+  _assertCol("SESSION_TYPE", TPrimitiveType::STRING, "1.0.0", 
FIELD_DEFINITIONS[3]);
+  _assertCol("HIVESERVER2_PROTOCOL_VERSION", TPrimitiveType::STRING, "1.0.0",
+      FIELD_DEFINITIONS[4]);
+  _assertCol("DB_USER", TPrimitiveType::STRING, "1.0.0", FIELD_DEFINITIONS[5]);
+  _assertCol("DB_USER_CONNECTION", TPrimitiveType::STRING, "1.0.0", 
FIELD_DEFINITIONS[6]);
+  _assertCol("DB_NAME", TPrimitiveType::STRING, "1.0.0", FIELD_DEFINITIONS[7]);
+  _assertCol("IMPALA_COORDINATOR", TPrimitiveType::STRING, "1.0.0", 
FIELD_DEFINITIONS[8]);
+  _assertCol("QUERY_STATUS", TPrimitiveType::STRING, "1.0.0", 
FIELD_DEFINITIONS[9]);
+  _assertCol("QUERY_STATE", TPrimitiveType::STRING, "1.0.0", 
FIELD_DEFINITIONS[10]);
+  _assertCol("IMPALA_QUERY_END_STATE", TPrimitiveType::STRING, "1.0.0",
+      FIELD_DEFINITIONS[11]);
+  _assertCol("QUERY_TYPE", TPrimitiveType::STRING, "1.0.0", 
FIELD_DEFINITIONS[12]);
+  _assertCol("NETWORK_ADDRESS", TPrimitiveType::STRING, "1.0.0", 
FIELD_DEFINITIONS[13]);
+  _assertCol("START_TIME_UTC", TPrimitiveType::TIMESTAMP, "1.0.0", 
FIELD_DEFINITIONS[14]);
+  _assertColDecimal("TOTAL_TIME_MS", "1.0.0", FIELD_DEFINITIONS[15]);
+  _assertCol("QUERY_OPTS_CONFIG", TPrimitiveType::STRING, "1.0.0", 
FIELD_DEFINITIONS[16]);
+  _assertCol("RESOURCE_POOL", TPrimitiveType::STRING, "1.0.0", 
FIELD_DEFINITIONS[17]);
+  _assertCol("PER_HOST_MEM_ESTIMATE", TPrimitiveType::BIGINT, "1.0.0",
+      FIELD_DEFINITIONS[18]);
+  _assertCol("DEDICATED_COORD_MEM_ESTIMATE", TPrimitiveType::BIGINT, "1.0.0",
+      FIELD_DEFINITIONS[19]);
+  _assertCol("PER_HOST_FRAGMENT_INSTANCES", TPrimitiveType::STRING, "1.0.0",
+      FIELD_DEFINITIONS[20]);
+  _assertCol("BACKENDS_COUNT", TPrimitiveType::INT, "1.0.0", 
FIELD_DEFINITIONS[21]);
+  _assertCol("ADMISSION_RESULT", TPrimitiveType::STRING, "1.0.0", 
FIELD_DEFINITIONS[22]);
+  _assertCol("CLUSTER_MEMORY_ADMITTED", TPrimitiveType::BIGINT, "1.0.0",
+      FIELD_DEFINITIONS[23]);
+  _assertCol("EXECUTOR_GROUP", TPrimitiveType::STRING, "1.0.0", 
FIELD_DEFINITIONS[24]);
+  _assertCol("EXECUTOR_GROUPS", TPrimitiveType::STRING, "1.0.0", 
FIELD_DEFINITIONS[25]);
+  _assertCol("EXEC_SUMMARY", TPrimitiveType::STRING, "1.0.0", 
FIELD_DEFINITIONS[26]);
+  _assertCol("NUM_ROWS_FETCHED", TPrimitiveType::BIGINT, "1.0.0", 
FIELD_DEFINITIONS[27]);
+  _assertCol("ROW_MATERIALIZATION_ROWS_PER_SEC", TPrimitiveType::BIGINT, 
"1.0.0",
+      FIELD_DEFINITIONS[28]);
+  _assertColDecimal("ROW_MATERIALIZATION_TIME_MS", "1.0.0", 
FIELD_DEFINITIONS[29]);
+  _assertCol("COMPRESSED_BYTES_SPILLED", TPrimitiveType::BIGINT, "1.0.0",
+      FIELD_DEFINITIONS[30]);
+  _assertColDecimal("EVENT_PLANNING_FINISHED", "1.0.0", FIELD_DEFINITIONS[31]);
+  _assertColDecimal("EVENT_SUBMIT_FOR_ADMISSION", "1.0.0", 
FIELD_DEFINITIONS[32]);
+  _assertColDecimal("EVENT_COMPLETED_ADMISSION", "1.0.0", 
FIELD_DEFINITIONS[33]);
+  _assertColDecimal("EVENT_ALL_BACKENDS_STARTED", "1.0.0", 
FIELD_DEFINITIONS[34]);
+  _assertColDecimal("EVENT_ROWS_AVAILABLE", "1.0.0", FIELD_DEFINITIONS[35]);
+  _assertColDecimal("EVENT_FIRST_ROW_FETCHED", "1.0.0", FIELD_DEFINITIONS[36]);
+  _assertColDecimal("EVENT_LAST_ROW_FETCHED", "1.0.0", FIELD_DEFINITIONS[37]);
+  _assertColDecimal("EVENT_UNREGISTER_QUERY", "1.0.0", FIELD_DEFINITIONS[38]);
+  _assertColDecimal("READ_IO_WAIT_TOTAL_MS", "1.0.0", FIELD_DEFINITIONS[39]);
+  _assertColDecimal("READ_IO_WAIT_MEAN_MS", "1.0.0", FIELD_DEFINITIONS[40]);
+  _assertCol("BYTES_READ_CACHE_TOTAL", TPrimitiveType::BIGINT, "1.0.0",
+      FIELD_DEFINITIONS[41]);
+  _assertCol("BYTES_READ_TOTAL", TPrimitiveType::BIGINT, "1.0.0", 
FIELD_DEFINITIONS[42]);
+  _assertCol("PERNODE_PEAK_MEM_MIN", TPrimitiveType::BIGINT, "1.0.0",
+      FIELD_DEFINITIONS[43]);
+  _assertCol("PERNODE_PEAK_MEM_MAX", TPrimitiveType::BIGINT, "1.0.0",
+      FIELD_DEFINITIONS[44]);
+  _assertCol("PERNODE_PEAK_MEM_MEAN", TPrimitiveType::BIGINT, "1.0.0",
+      FIELD_DEFINITIONS[45]);
+  _assertCol("SQL", TPrimitiveType::STRING, "1.0.0", FIELD_DEFINITIONS[46]);
+  _assertCol("PLAN", TPrimitiveType::STRING, "1.0.0", FIELD_DEFINITIONS[47]);
+  _assertCol("TABLES_QUERIED", TPrimitiveType::STRING, "1.0.0", 
FIELD_DEFINITIONS[48]);
+}
+
+} // namespace impala
diff --git a/be/src/service/workload-management.cc 
b/be/src/service/workload-management.cc
index a968c25af..f0b15c794 100644
--- a/be/src/service/workload-management.cc
+++ b/be/src/service/workload-management.cc
@@ -23,10 +23,8 @@
 #include <memory>
 #include <mutex>
 #include <string>
-#include <thread>
 #include <utility>
 
-#include <boost/algorithm/string/case_conv.hpp>
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <gutil/strings/strcat.h>
@@ -34,9 +32,7 @@
 #include "common/compiler-util.h"
 #include "common/logging.h"
 #include "common/status.h"
-#include "gen-cpp/CatalogObjects_constants.h"
 #include "gen-cpp/Frontend_types.h"
-#include "gen-cpp/Query_types.h"
 #include "gen-cpp/SystemTables_types.h"
 #include "gen-cpp/Types_types.h"
 #include "runtime/query-driver.h"
@@ -47,47 +43,24 @@
 #include "util/debug-util.h"
 #include "util/histogram-metric.h"
 #include "util/impalad-metrics.h"
-#include "util/metrics.h"
 #include "util/pretty-printer.h"
 #include "util/stopwatch.h"
 #include "util/string-util.h"
-#include "util/thread.h"
 #include "util/ticker.h"
 
 using namespace impala;
-using namespace impala::workload_management;
 using namespace std;
 
 DECLARE_bool(enable_workload_mgmt);
-DECLARE_string(query_log_table_name);
-DECLARE_string(query_log_table_location);
 DECLARE_int32(query_log_write_interval_s);
-DECLARE_int32(query_log_write_timeout_s);
 DECLARE_int32(query_log_max_queued);
 DECLARE_string(workload_mgmt_user);
-DECLARE_int32(query_log_max_sql_length);
-DECLARE_int32(query_log_max_plan_length);
 DECLARE_int32(query_log_shutdown_timeout_s);
 DECLARE_string(cluster_id);
 DECLARE_int32(query_log_max_insert_attempts);
-DECLARE_string(query_log_request_pool);
-DECLARE_string(query_log_table_props);
 
 namespace impala {
 
-/// Name of the database where all workload management tables will be stored.
-static const string DB = "sys";
-
-/// Default query options that will be provided on all queries that insert 
rows into the
-/// completed queries table. See the initialization code in the
-/// ImpalaServer::CompletedQueriesThread function for details on which options 
are set.
-static InternalServer::QueryOptionMap insert_query_opts;
-
-/// Non-values portion of the sql DML to insert records into the completed 
queries table.
-/// Generates the first portion of the DML that inserts records into the 
completed queries
-/// table. This portion of the statement is constant and thus is only 
generated once.
-static string _insert_dml;
-
 /// Determine if the maximum number of queued completed queries has been 
exceeded.
 ///
 /// Return:
@@ -99,78 +72,6 @@ static inline bool MaxRecordsExceeded(size_t record_count) 
noexcept {
   return FLAGS_query_log_max_queued > 0 && record_count > 
FLAGS_query_log_max_queued;
 } // function MaxRecordsExceeded
 
-/// Sets up the sys database generating and executing the necessary DML 
statements.
-static const Status SetupDb(InternalServer* server) {
-  insert_query_opts[TImpalaQueryOptions::SYNC_DDL] = "true";
-  RETURN_IF_ERROR(server->ExecuteIgnoreResults(FLAGS_workload_mgmt_user,
-      StrCat("CREATE DATABASE IF NOT EXISTS ", DB, " COMMENT "
-      "'System database for Impala introspection'"), insert_query_opts, 
false));
-  insert_query_opts[TImpalaQueryOptions::SYNC_DDL] = "false";
-  return Status::OK();
-} // function SetupDb
-
-/// Returns column name as lower-case to match common SQL style.
-static string GetColumnName(const FieldDefinition& field) {
-  std::string column_name = to_string(field.db_column);
-  boost::algorithm::to_lower(column_name);
-  return column_name;
-}
-
-/// Sets up the query table by generating and executing the necessary DML 
statements.
-static const Status SetupTable(InternalServer* server, const string& 
table_name,
-    bool is_system_table = false) {
-  insert_query_opts[TImpalaQueryOptions::SYNC_DDL] = "true";
-
-  StringStreamPop create_table_sql;
-  create_table_sql << "CREATE ";
-  // System tables do not have anything to purge, and must not be managed 
tables.
-  if (is_system_table) create_table_sql << "EXTERNAL ";
-  create_table_sql << "TABLE IF NOT EXISTS " << table_name << "(";
-
-  for (const auto& field : FIELD_DEFINITIONS) {
-    create_table_sql << GetColumnName(field) << " " << field.db_column_type;
-
-    if (field.db_column_type == TPrimitiveType::DECIMAL) {
-      create_table_sql << "(" << field.precision << "," << field.scale << ")";
-    }
-
-    create_table_sql << ",";
-  }
-  create_table_sql.move_back();
-
-  create_table_sql << ") ";
-
-  if (!is_system_table) {
-    create_table_sql << "PARTITIONED BY SPEC(identity(cluster_id), 
HOUR(start_time_utc)) "
-        << "STORED AS iceberg ";
-
-    if (!FLAGS_query_log_table_location.empty()) {
-      create_table_sql << "LOCATION '" << FLAGS_query_log_table_location << "' 
";
-    }
-  }
-
-  create_table_sql << "TBLPROPERTIES 
('schema_version'='1.0.0','format-version'='2'";
-
-  if (is_system_table) {
-    create_table_sql << ",'"
-                     << g_CatalogObjects_constants.TBL_PROP_SYSTEM_TABLE 
<<"'='true'";
-  } else if (!FLAGS_query_log_table_props.empty()) {
-    create_table_sql << "," << FLAGS_query_log_table_props;
-  }
-
-  create_table_sql << ")";
-
-  RETURN_IF_ERROR(server->ExecuteIgnoreResults(FLAGS_workload_mgmt_user,
-      create_table_sql.str(), insert_query_opts, false));
-
-  insert_query_opts[TImpalaQueryOptions::SYNC_DDL] = "false";
-
-  LOG(INFO) << "Completed " << table_name << " initialization. 
write_interval=\"" <<
-      FLAGS_query_log_write_interval_s << "s\"";
-
-  return Status::OK();
-} // function SetupTable
-
 /// Iterates through the list of field in `FIELDS_PARSERS` executing each 
parser for the
 /// given `QueryStateExpanded` object. This function builds the 
`FieldParserContext`
 /// object that is passed to each parser.
@@ -206,33 +107,16 @@ size_t ImpalaServer::NumLiveQueries() {
   return live_queries + completed_queries_.size();
 }
 
-Status ImpalaServer::InitWorkloadManagement() {
-  // Verify FIELD_DEFINITIONS includes all QueryTableColumns.
-  DCHECK_EQ(_TQueryTableColumn_VALUES_TO_NAMES.size(), 
FIELD_DEFINITIONS.size());
-  for (const auto& field : FIELD_DEFINITIONS) {
-    // Verify all fields match their column position.
-    DCHECK_EQ(FIELD_DEFINITIONS[field.db_column].db_column, field.db_column);
-  }
-
-  if (FLAGS_enable_workload_mgmt) {
-    return Thread::Create("impala-server", "completed-queries",
-      bind<void>(&ImpalaServer::CompletedQueriesThread, this),
-      &completed_queries_thread_);
-  }
-
-  return Status::OK();
-} // ImpalaServer::InitWorkloadManagement
-
 void ImpalaServer::ShutdownWorkloadManagement() {
-  unique_lock<mutex> l(completed_queries_threadstate_mu_);
+  unique_lock<mutex> l(workload_mgmt_threadstate_mu_);
   // If the completed queries thread is not yet running, then we don't need to 
give it a
   // chance to flush the in-memory queue to the completed queries table.
-  if (completed_queries_thread_state_ == RUNNING) {
-    completed_queries_thread_state_ = SHUTTING_DOWN;
+  if (workload_mgmt_thread_state_ == RUNNING) {
+    workload_mgmt_thread_state_ = SHUTTING_DOWN;
     completed_queries_cv_.notify_all();
     completed_queries_shutdown_cv_.wait_for(l,
         chrono::seconds(FLAGS_query_log_shutdown_timeout_s),
-        [this]{ return completed_queries_thread_state_ == SHUTDOWN; });
+        [this]{ return workload_mgmt_thread_state_ == SHUTDOWN; });
   }
 } // ImpalaServer::ShutdownWorkloadManagement
 
@@ -312,49 +196,22 @@ static string get_insert_prefix(const string& table_name) 
{
   StringStreamPop fields;
   fields << "INSERT INTO " << table_name << "(";
   for (const auto& field : FIELD_DEFINITIONS) {
-    fields << GetColumnName(field) << ",";
+    fields << field.db_column << ",";
   }
   fields.move_back();
   fields << ") VALUES ";
   return fields.str();
 }
 
-void ImpalaServer::CompletedQueriesThread() {
-  {
-    lock_guard<mutex> l(completed_queries_threadstate_mu_);
-    completed_queries_thread_state_ = INITIALIZING;
-  }
-
-  // Setup default query options.
-  insert_query_opts[TImpalaQueryOptions::TIMEZONE] = "UTC";
-  insert_query_opts[TImpalaQueryOptions::QUERY_TIMEOUT_S] = std::to_string(
-      FLAGS_query_log_write_timeout_s < 1 ?
-      FLAGS_query_log_write_interval_s : FLAGS_query_log_write_timeout_s);
-  if (!FLAGS_query_log_request_pool.empty()) {
-    insert_query_opts[TImpalaQueryOptions::REQUEST_POOL] = 
FLAGS_query_log_request_pool;
-  }
-
-  // Fully qualified table name based on startup flags.
-  const string log_table_name = StrCat(DB, ".", FLAGS_query_log_table_name);
-
-  // Non-values portion of the completed queries insert dml. Does not change 
across
-  // queries.
-  _insert_dml = get_insert_prefix(log_table_name);
-
-  // The initialization code only works when run in a separate thread for 
reasons unknown.
-  ABORT_IF_ERROR(SetupDb(internal_server_.get()));
-  ABORT_IF_ERROR(SetupTable(internal_server_.get(), log_table_name));
-  std::string live_table_name = to_string(TSystemTableName::IMPALA_QUERY_LIVE);
-  boost::algorithm::to_lower(live_table_name);
-  ABORT_IF_ERROR(SetupTable(internal_server_.get(),
-      StrCat(DB, ".", live_table_name), true));
+void ImpalaServer::WorkloadManagementWorker(
+    InternalServer::QueryOptionMap& insert_query_opts, const string 
log_table_name) {
 
   {
-    lock_guard<mutex> l(completed_queries_threadstate_mu_);
+    lock_guard<mutex> l(workload_mgmt_threadstate_mu_);
     // This condition will evaluate to false only if a clean shutdown was 
initiated while
     // the previous function was running.
-    if (LIKELY(completed_queries_thread_state_ == INITIALIZING)) {
-      completed_queries_thread_state_ = RUNNING;
+    if (LIKELY(workload_mgmt_thread_state_ == INITIALIZING)) {
+      workload_mgmt_thread_state_ = RUNNING;
     } else {
       return; // Note: early return
     }
@@ -365,15 +222,19 @@ void ImpalaServer::CompletedQueriesThread() {
         "completed-queries-ticker"));
   }
 
+  // Non-values portion of the sql DML to insert records into the completed 
queries
+  // tables. This portion of the statement is constant and thus is only 
generated once.
+  const string insert_dml_prefix = get_insert_prefix(log_table_name);
+
   while (true) {
     // Exit this thread if a shutdown was initiated.
     {
-      lock_guard<mutex> l(completed_queries_threadstate_mu_);
+      lock_guard<mutex> l(workload_mgmt_threadstate_mu_);
 
-      DCHECK(completed_queries_thread_state_ != SHUTDOWN);
+      DCHECK(workload_mgmt_thread_state_ != SHUTDOWN);
 
-      if (UNLIKELY(completed_queries_thread_state_ == SHUTTING_DOWN)) {
-        completed_queries_thread_state_ = SHUTDOWN;
+      if (UNLIKELY(workload_mgmt_thread_state_ == SHUTTING_DOWN)) {
+        workload_mgmt_thread_state_ = SHUTDOWN;
         completed_queries_shutdown_cv_.notify_all();
         return; // Note: early return
       }
@@ -385,13 +246,13 @@ void ImpalaServer::CompletedQueriesThread() {
     unique_lock<mutex> l(completed_queries_lock_);
     completed_queries_cv_.wait(l,
         [this]{
-          lock_guard<mutex> l2(completed_queries_threadstate_mu_);
+          lock_guard<mutex> l2(workload_mgmt_threadstate_mu_);
           // To guard against spurious wakeups, this predicate ensures there 
are completed
           // queries queued up before waking up the thread.
           return (completed_queries_ticker_->WakeupGuard()()
               && !completed_queries_.empty())
               || MaxRecordsExceeded(completed_queries_.size())
-              || UNLIKELY(completed_queries_thread_state_ == SHUTTING_DOWN);
+              || UNLIKELY(workload_mgmt_thread_state_ == SHUTTING_DOWN);
         });
     completed_queries_ticker_->ResetWakeupGuard();
 
@@ -447,7 +308,7 @@ void ImpalaServer::CompletedQueriesThread() {
 
     // Remove the last comma and determine the final sql statement length.
     sql.pop_back();
-    const size_t final_sql_len = _insert_dml.size() + sql.size();
+    const size_t final_sql_len = insert_dml_prefix.size() + sql.size();
 
     uint64_t gather_time = timer.Reset();
     TUniqueId tmp_query_id;
@@ -473,7 +334,7 @@ void ImpalaServer::CompletedQueriesThread() {
 
     // Execute the insert dml.
     const Status ret_status = internal_server_->ExecuteIgnoreResults(
-        FLAGS_workload_mgmt_user, StrCat(_insert_dml, sql), opts, false,
+        FLAGS_workload_mgmt_user, StrCat(insert_dml_prefix, sql), opts, false,
         &tmp_query_id);
 
     uint64_t exec_time = timer.ElapsedTime();
@@ -504,7 +365,7 @@ void ImpalaServer::CompletedQueriesThread() {
       completed_queries_lock_.unlock();
     }
   }
-} // ImpalaServer::CompletedQueriesThread
+} // ImpalaServer::WorkloadManagementWorker
 
 vector<shared_ptr<QueryStateExpanded>> ImpalaServer::GetCompletedQueries() {
   lock_guard<mutex> l(completed_queries_lock_);
diff --git a/be/src/service/workload-management.h 
b/be/src/service/workload-management.h
index 8e196249d..868cebebd 100644
--- a/be/src/service/workload-management.h
+++ b/be/src/service/workload-management.h
@@ -22,17 +22,14 @@
 #include <string>
 #include <utility>
 
-#include <gflags/gflags.h>
-
 #include "gen-cpp/SystemTables_types.h"
-#include "gen-cpp/Types_types.h"
+#include "kudu/util/version_util.h"
 #include "service/query-state-record.h"
 #include "util/string-util.h"
+#include "util/version-util.h"
 
 namespace impala {
 
-namespace workload_management {
-
 /// Struct defining the context for generating the sql DML that inserts 
records into the
 /// completed queries table.
 struct FieldParserContext {
@@ -44,6 +41,10 @@ struct FieldParserContext {
       StringStreamPop& s) : record(rec), cluster_id(cluster_id), sql(s) {}
 }; // struct FieldParserContext
 
+/// Constants for all possible schema versions.
+const kudu::Version NO_TABLE_EXISTS = constructVersion(0, 0, 0);
+const kudu::Version VERSION_1_0_0 = constructVersion(1, 0, 0);
+
 /// Type of a function that retrieves one piece of information from the 
context and writes
 /// it to the SQL statement that inserts rows into the completed queries table.
 using FieldParser = void (*)(FieldParserContext&);
@@ -51,17 +52,31 @@ using FieldParser = void (*)(FieldParserContext&);
 /// Contains all necessary information for the definition and parsing of a 
single field
 /// in workload management.
 struct FieldDefinition {
-  const TQueryTableColumn::type db_column;
-  const TPrimitiveType::type db_column_type;
-  const FieldParser parser;
-  const int16_t precision;
-  const int16_t scale;
-
-  FieldDefinition(const TQueryTableColumn::type db_col,
-      const TPrimitiveType::type db_col_type, const FieldParser fp,
-      const int16_t precision = 0, const int16_t scale = 0) :
-      db_column(std::move(db_col)), db_column_type(std::move(db_col_type)),
-      parser(std::move(fp)), precision(precision), scale(scale) {}
+  public:
+    // Name of the database column.
+    const TQueryTableColumn::type db_column;
+
+    // Type of the database column.
+    const TPrimitiveType::type db_column_type;
+
+    // Function that will extract the column value from the provided 
FieldParseContext and
+    // will write that value into a sql statement,
+    const FieldParser parser;
+
+    // Specifies the first schema version where the column appears.
+    const kudu::Version schema_version;
+
+    // When column type is decimal, specifies the precision and scale for the 
column.
+    const int16_t precision;
+    const int16_t scale;
+
+    FieldDefinition(const TQueryTableColumn::type db_col,
+        const TPrimitiveType::type db_col_type, const FieldParser fp,
+        const kudu::Version schema_ver, const int16_t precision = 0,
+        const int16_t scale = 0) :
+        db_column(std::move(db_col)), db_column_type(std::move(db_col_type)),
+        parser(std::move(fp)), schema_version(std::move(schema_ver)),
+        precision(precision), scale(scale) { }
 }; // struct FieldDefinition
 
 /// Number of query table columns
@@ -100,6 +115,4 @@ struct CompletedQuery {
   }
 };
 
-} //namespace workload_management
-
 } // namespace impala
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index dfd51a919..16a6e0fe9 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -113,6 +113,7 @@ set(UTIL_SRCS
   tuple-row-compare.cc
   uid-util.cc
   url-parser.cc
+  version-util.cc
   ${SQUEASEL_SRC_DIR}/squeasel.c
   webserver.cc
   zip-util.cc
@@ -235,6 +236,7 @@ add_library(UtilTests STATIC
   time-test.cc
   tuple-row-compare-test.cc
   uid-util-test.cc
+  version-util-test.cc
   zip-util-test.cc
 )
 add_dependencies(UtilTests gen-deps)
@@ -299,6 +301,7 @@ ADD_UNIFIED_BE_LSAN_TEST(ticker-test "TickerTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(time-test "TimeTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(tuple-row-compare-test "TupleRowCompareTest.*")
 ADD_UNIFIED_BE_LSAN_TEST(uid-util-test "UidUtil.*")
+ADD_UNIFIED_BE_LSAN_TEST(version-util-test "VersionUtilTest.*")
 # Using standalone webserver-test for now, nonstandard main() passes in a port.
 ADD_BE_LSAN_TEST(webserver-test)
 TARGET_LINK_LIBRARIES(webserver-test mini_kdc)
diff --git a/be/src/util/version-util-test.cc b/be/src/util/version-util-test.cc
new file mode 100644
index 000000000..6761ed8c3
--- /dev/null
+++ b/be/src/util/version-util-test.cc
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/version-util.h"
+
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/version_util.h"
+
+namespace kudu {
+
+// Assert the overloaded equality operators '==' and '!='.
+TEST(VersionUtilTest, EqualityOperators) {
+  Version lhs;
+  Version rhs;
+
+  ASSERT_OK(ParseVersion("1.0.0", &lhs));
+  ASSERT_OK(ParseVersion("1.0.0", &rhs));
+  ASSERT_TRUE(lhs == rhs);
+  ASSERT_FALSE(lhs != rhs);
+
+  ASSERT_OK(ParseVersion("1.0.0-SNAPSHOT", &lhs));
+  ASSERT_OK(ParseVersion("1.0.0-SNAPSHOT", &rhs));
+  ASSERT_TRUE(lhs == rhs);
+  ASSERT_FALSE(lhs != rhs);
+
+  ASSERT_OK(ParseVersion("1.1.0", &lhs));
+  ASSERT_OK(ParseVersion("1.0.0", &rhs));
+  ASSERT_FALSE(lhs == rhs);
+  ASSERT_TRUE(lhs != rhs);
+
+  ASSERT_OK(ParseVersion("1.0.1", &lhs));
+  ASSERT_OK(ParseVersion("1.0.0", &rhs));
+  ASSERT_FALSE(lhs == rhs);
+  ASSERT_TRUE(lhs != rhs);
+
+  ASSERT_OK(ParseVersion("1.0.1-SNAPSHOT", &lhs));
+  ASSERT_OK(ParseVersion("1.0.0-RELEASE", &rhs));
+  ASSERT_FALSE(lhs == rhs);
+  ASSERT_TRUE(lhs != rhs);
+
+  ASSERT_OK(ParseVersion("1.0.0-SNAPSHOT", &lhs));
+  ASSERT_OK(ParseVersion("1.0.0", &rhs));
+  ASSERT_FALSE(lhs == rhs);
+  ASSERT_TRUE(lhs != rhs);
+}
+
+// Assert the overloaded less than,and less than or equal to, and greater than 
operators.
+TEST(VersionUtilTest, OperatorLessThan) {
+  Version lhs;
+  Version rhs;
+
+  ASSERT_OK(ParseVersion("2.0.0", &lhs));
+  ASSERT_OK(ParseVersion("1.0.0", &rhs));
+  ASSERT_FALSE(lhs < rhs);
+  ASSERT_FALSE(lhs <= rhs);
+  ASSERT_TRUE(lhs > rhs);
+
+  ASSERT_OK(ParseVersion("1.0.0", &lhs));
+  ASSERT_OK(ParseVersion("1.0.0", &rhs));
+  ASSERT_FALSE(lhs < rhs);
+  ASSERT_TRUE(lhs <= rhs);
+  ASSERT_FALSE(lhs > rhs);
+
+  ASSERT_OK(ParseVersion("1.0.0-SNAPSHOT", &lhs));
+  ASSERT_OK(ParseVersion("1.0.0-SNAPSHOT", &rhs));
+  ASSERT_FALSE(lhs < rhs);
+  ASSERT_TRUE(lhs <= rhs);
+  ASSERT_FALSE(lhs > rhs);
+
+  ASSERT_OK(ParseVersion("1.0.0", &lhs));
+  ASSERT_OK(ParseVersion("2.0.0", &rhs));
+  ASSERT_TRUE(lhs < rhs);
+  ASSERT_TRUE(lhs <= rhs);
+  ASSERT_FALSE(lhs > rhs);
+
+  ASSERT_OK(ParseVersion("1.0.0", &lhs));
+  ASSERT_OK(ParseVersion("1.1.0", &rhs));
+  ASSERT_TRUE(lhs < rhs);
+  ASSERT_TRUE(lhs <= rhs);
+  ASSERT_FALSE(lhs > rhs);
+
+  ASSERT_OK(ParseVersion("1.0.0", &lhs));
+  ASSERT_OK(ParseVersion("1.0.1", &rhs));
+  ASSERT_TRUE(lhs < rhs);
+  ASSERT_TRUE(lhs <= rhs);
+  ASSERT_FALSE(lhs > rhs);
+
+  ASSERT_OK(ParseVersion("1.0.0-RELEASE", &lhs));
+  ASSERT_OK(ParseVersion("1.0.0", &rhs));
+  ASSERT_TRUE(lhs < rhs);
+  ASSERT_TRUE(lhs <= rhs);
+  ASSERT_FALSE(lhs > rhs);
+
+  ASSERT_OK(ParseVersion("1.0.0", &lhs));
+  ASSERT_OK(ParseVersion("1.0.0-RELEASE", &rhs));
+  ASSERT_FALSE(lhs < rhs);
+  ASSERT_FALSE(lhs <= rhs);
+  ASSERT_TRUE(lhs > rhs);
+
+  // Asserts the example provided in the comments in version-util.h.
+  // 1.0.0-RELEASE, 1.0.0-SNAPSHOT, 1.0.0, 1.0.1-SNAPSHOT
+  ASSERT_OK(ParseVersion("1.0.0-RELEASE", &lhs));
+  ASSERT_OK(ParseVersion("1.0.0-SNAPSHOT", &rhs));
+  ASSERT_TRUE(lhs < rhs);
+
+  ASSERT_OK(ParseVersion("1.0.0-SNAPSHOT", &lhs));
+  ASSERT_OK(ParseVersion("1.0.0", &rhs));
+  ASSERT_TRUE(lhs < rhs);
+
+  ASSERT_OK(ParseVersion("1.0.0", &lhs));
+  ASSERT_OK(ParseVersion("1.0.1-SNAPSHOT", &rhs));
+  ASSERT_TRUE(lhs < rhs);
+}
+
+} // namespace impala
diff --git a/be/src/util/version-util.cc b/be/src/util/version-util.cc
new file mode 100644
index 000000000..6982540e3
--- /dev/null
+++ b/be/src/util/version-util.cc
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/version-util.h"
+
+#include "kudu/util/version_util.h"
+
+namespace kudu {
+
+bool operator<(const Version& lhs, const Version& rhs) {
+  bool num_compare = lhs.major < rhs.major || lhs.minor < rhs.minor
+      || lhs.maintenance < rhs.maintenance;
+
+  if (num_compare) {
+    return true;
+  }
+
+  if (lhs.extra_delimiter == rhs.extra_delimiter) {
+    return lhs.extra < rhs.extra;
+  }
+
+  return lhs.extra_delimiter.has_value();
+}
+
+bool operator<=(const Version& lhs, const Version& rhs) {
+  return lhs == rhs || lhs < rhs;
+}
+
+bool operator>(const Version& lhs, const Version& rhs) {
+  return !(lhs <= rhs);
+}
+
+bool operator!=(const Version& lhs, const Version& rhs) {
+  return !(lhs == rhs);
+}
+
+} // namespace kudu
+
+namespace impala {
+
+kudu::Version constructVersion(int maj, int min, int maint) {
+  kudu::Version v;
+
+  v.major = maj;
+  v.minor = min;
+  v.maintenance = maint;
+
+  return v;
+}
+
+} // namespace impala
diff --git a/be/src/util/version-util.h b/be/src/util/version-util.h
new file mode 100644
index 000000000..7175c5626
--- /dev/null
+++ b/be/src/util/version-util.h
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "kudu/util/version_util.h"
+
+namespace kudu {
+
+// Adds additional comparison operators to the kudu::Version class.
+
+// Compare two Version objects. Versions that contain an extra component sort 
before
+// (less than) versions that do not contain an extra component. The extra 
component is
+// sorted alphabetically without modifying the case. Thus '-SNAPSHOT' sorts as 
greater
+// than '-RELEASE'.
+// Example sort order (from least to greatest):
+//   1.0.0-RELEASE, 1.0.0-SNAPSHOT, 1.0.0, 1.0.1-SNAPSHOT.
+bool operator<(const Version& lhs, const Version& rhs);
+bool operator<=(const Version& lhs, const Version& rhs);
+bool operator>(const Version& lhs, const Version& rhs);
+bool operator!=(const Version& lhs, const Version& rhs);
+
+} // namespace kudu
+
+namespace impala {
+
+// Constructor function that allows for setting the values of some struct 
members.
+kudu::Version constructVersion(int maj, int min, int maint);
+
+}
diff --git a/tests/custom_cluster/test_query_live.py 
b/tests/custom_cluster/test_query_live.py
index 8820f4270..3a2595cd0 100644
--- a/tests/custom_cluster/test_query_live.py
+++ b/tests/custom_cluster/test_query_live.py
@@ -37,12 +37,8 @@ class TestQueryLive(CustomClusterTestSuite):
 
   def setup_method(self, method):
     super(TestQueryLive, self).setup_method(method)
-    create_match = self.assert_impalad_log_contains("INFO", 
r'\]\s+(\w+:\w+)\]\s+'
-        r'Analyzing query: CREATE EXTERNAL TABLE IF NOT EXISTS 
sys.impala_query_live',
-        timeout_s=60)
-    self.assert_impalad_log_contains("INFO", r'Query successfully 
unregistered: '
-        r'query_id={}'.format(create_match.group(1)),
-        timeout_s=60)
+    self.assert_impalad_log_contains("INFO", r'Completed workload management '
+        r'initialization', timeout_s=120)
 
   def assert_describe_extended(self):
     describe_ext_result = self.execute_query('describe extended 
sys.impala_query_live')
diff --git a/tests/custom_cluster/test_query_log.py 
b/tests/custom_cluster/test_query_log.py
index 037104ca3..bce0a0eab 100644
--- a/tests/custom_cluster/test_query_log.py
+++ b/tests/custom_cluster/test_query_log.py
@@ -59,12 +59,8 @@ class TestQueryLogTableBase(CustomClusterTestSuite):
     # These tests run very quickly and can actually complete before Impala has 
finished
     # creating the completed queries table. Thus, to make these tests more 
robust, this
     # code checks to make sure the table create has finished before returning.
-    create_match = self.assert_impalad_log_contains("INFO", 
r'\]\s+(\w+:\w+)\]\s+'
-        r'Analyzing query: CREATE TABLE IF NOT EXISTS 
{}'.format(self.QUERY_TBL),
-        timeout_s=60)
-    self.assert_impalad_log_contains("INFO", r'Query successfully 
unregistered: '
-        r'query_id={}'.format(create_match.group(1)),
-        timeout_s=60)
+    self.assert_impalad_log_contains("INFO", r'Completed workload management '
+        r'initialization', timeout_s=120)
 
   def get_client(self, protocol):
     """Retrieves the default Impala client for the specified protocol. This 
client is
@@ -314,7 +310,7 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
                                     impalad_graceful_shutdown=True)
   def test_flush_on_queued_count_exceeded(self, vector):
     """Asserts that queries that have completed are written to the query log 
table when
-       the maximum number of queued records it reached. Also verifies that 
writing
+       the maximum number of queued records is reached. Also verifies that 
writing
        completed queries is not limited by default 
statement_expression_limit."""
 
     impalad = self.cluster.get_first_impalad()

Reply via email to