This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 77a87bb10 IMPALA-12737: Refactor the Workload Management
Initialization Process.
77a87bb10 is described below
commit 77a87bb103362ebafb0624f95d1a413417763d66
Author: jasonmfehr <[email protected]>
AuthorDate: Thu Aug 8 11:15:33 2024 -0700
IMPALA-12737: Refactor the Workload Management Initialization Process.
The workload management initialization process creates the two tables
"sys.impala_query_log" and "sys.impala_query_live" during coordinator
startup.
The current design for this init process is to create both tables on
each coordinator at every startup by running create database and
create table if not exists DDLs. This design causes unnecessary DDLs
to execute which delays coordinator startup and introduces the
potential for unnecessary startup failures should the DDLs fail.
This patch splits the initialization code into its own file and adds
version tracking to the individual fields in the workload management
tables. This patch also adds schema version checks on the workload
management tables and only runs DDLs for the db tables if necessary.
Additionally, versioning of workload management table schemas is
introduced. The only allowed schema version in this patch is 1.0.0.
Future patches that need to modify the workload management table
schema will expand this list of allowed versions.
Since this patch is a refactor and does not change functionality,
testing was accomplished by running existing workload management
unit and python tests.
Change-Id: Id645f94c8da73b91c13a23d7ac0ea026425f0f96
Reviewed-on: http://gerrit.cloudera.org:8080/21653
Reviewed-by: Riza Suminto <[email protected]>
Reviewed-by: Michael Smith <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/service/CMakeLists.txt | 3 +
be/src/service/impala-server.cc | 4 +-
be/src/service/impala-server.h | 22 +-
be/src/service/workload-management-fields.cc | 106 +++++-----
be/src/service/workload-management-flags.cc | 7 +
be/src/service/workload-management-init.cc | 290 +++++++++++++++++++++++++++
be/src/service/workload-management-test.cc | 132 ++++++++++++
be/src/service/workload-management.cc | 185 +++--------------
be/src/service/workload-management.h | 49 +++--
be/src/util/CMakeLists.txt | 3 +
be/src/util/version-util-test.cc | 130 ++++++++++++
be/src/util/version-util.cc | 65 ++++++
be/src/util/version-util.h | 44 ++++
tests/custom_cluster/test_query_live.py | 8 +-
tests/custom_cluster/test_query_log.py | 10 +-
15 files changed, 798 insertions(+), 260 deletions(-)
diff --git a/be/src/service/CMakeLists.txt b/be/src/service/CMakeLists.txt
index 7f5206d3d..c8e386df1 100644
--- a/be/src/service/CMakeLists.txt
+++ b/be/src/service/CMakeLists.txt
@@ -45,6 +45,7 @@ add_library(Service
query-result-set.cc
query-state-record.cc
workload-management.cc
+ workload-management-init.cc
workload-management-fields.cc
workload-management-flags.cc
)
@@ -140,6 +141,7 @@ add_library(ServiceTests STATIC
impala-server-test.cc
query-options-test.cc
query-state-record-test.cc
+ workload-management-test.cc
)
add_dependencies(ServiceTests gen-deps)
@@ -149,4 +151,5 @@ ADD_UNIFIED_BE_LSAN_TEST(hs2-util-test
"StitchNullsTest.*:PrintTColumnValueTest.
ADD_UNIFIED_BE_LSAN_TEST(query-options-test QueryOptions.*)
ADD_UNIFIED_BE_LSAN_TEST(impala-server-test ImpalaServerTest.*)
ADD_UNIFIED_BE_LSAN_TEST(query-state-record-test QueryStateRecordTest.*)
+ADD_UNIFIED_BE_LSAN_TEST(workload-management-test WorkloadManagementTest.*)
ADD_BE_LSAN_TEST(internal-server-test)
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 4a77680a3..126f3535b 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -3217,7 +3217,9 @@ Status ImpalaServer::Start(int32_t beeswax_port, int32_t
hs2_port,
internal_server_ = shared_from_this();
- RETURN_IF_ERROR(InitWorkloadManagement());
+ ABORT_IF_ERROR(Thread::Create("impala-server", "completed-queries",
+ bind<void>(&ImpalaServer::InitWorkloadManagement, this),
+ &workload_management_thread_));
}
LOG(INFO) << "Initialized coordinator/executor Impala server on "
<<
TNetworkAddressToString(exec_env_->configured_backend_address());
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 197fb3330..f2b86c329 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -1148,9 +1148,11 @@ class ImpalaServer : public ImpalaServiceIf,
/// current query ids to the admissiond.
[[noreturn]] void AdmissionHeartbeatThread();
- /// If workload management is enabled, starts workload management threads.
- /// (implemented in workload-management.cc)
- Status InitWorkloadManagement();
+ /// Checks if workload management is enabled, and starts the init process if
it is
+ /// enabled. Does not return until coordinator shutdown. Returns immediately
if
+ /// workload management is not enabled.
+ /// (implemented in workload-management-init.cc)
+ void InitWorkloadManagement();
/// Blocks until running workload management threads are shut down.
/// (implemented in workload-management.cc)
@@ -1158,7 +1160,8 @@ class ImpalaServer : public ImpalaServiceIf,
/// Periodically writes out completed queries (if configured)
/// (implemented in workload-management.cc)
- void CompletedQueriesThread();
+ void WorkloadManagementWorker(InternalServer::QueryOptionMap&
insert_query_opts,
+ const std::string log_table_name);
/// Returns a list of completed queries that have not yet been written to
storage.
/// Acquires completed_queries_lock_ to make a copy of completed_queries_
state.
@@ -1693,12 +1696,11 @@ class ImpalaServer : public ImpalaServiceIf,
/// Tracks the state of the thread that drains the completed queries queue
to the table.
/// The associated lock must be held before reading/modifying this variable.
- impala::workload_management::ThreadState completed_queries_thread_state_ =
- impala::workload_management::NOT_STARTED;
- std::mutex completed_queries_threadstate_mu_;
+ ThreadState workload_mgmt_thread_state_ = NOT_STARTED;
+ std::mutex workload_mgmt_threadstate_mu_;
- /// Thread that runs CompletedQueriesThread().
- std::unique_ptr<Thread> completed_queries_thread_;
+ /// Thread that runs Workload Management.
+ std::unique_ptr<Thread> workload_management_thread_;
/// Ticker that wakes up the completed_queried_thread at set intervals to
process the
/// queued completed queries. Uses the completed_queries_lock_ to synchonize
access to
@@ -1706,7 +1708,7 @@ class ImpalaServer : public ImpalaServiceIf,
std::unique_ptr<TickerSecondsBool> completed_queries_ticker_;
/// Queue of completed queries and the lock to synchronize access to it.
- std::list<impala::workload_management::CompletedQuery> completed_queries_;
+ std::list<CompletedQuery> completed_queries_;
std::mutex completed_queries_lock_;
};
diff --git a/be/src/service/workload-management-fields.cc
b/be/src/service/workload-management-fields.cc
index 4cff8d2c6..cdac6d945 100644
--- a/be/src/service/workload-management-fields.cc
+++ b/be/src/service/workload-management-fields.cc
@@ -25,18 +25,16 @@
#include <algorithm>
#include <string>
-#include <utility>
-#include <boost/algorithm/string.hpp>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <gutil/strings/substitute.h>
#include "common/compiler-util.h"
+#include "gen-cpp/SystemTables_types.h"
#include "gen-cpp/Types_types.h"
#include "runtime/exec-env.h"
#include "service/query-options.h"
-#include "service/query-state-record.h"
#include "util/debug-util.h"
#include "util/network-util.h"
#include "util/sql-util.h"
@@ -49,8 +47,6 @@ using strings::Substitute;
namespace impala {
-namespace workload_management {
-
/// Helper type for event timeline timestamp functions.
using _event_compare_pred = function<bool(const string& comp)>;
@@ -98,25 +94,25 @@ const array<FieldDefinition, NumQueryTableColumns>
FIELD_DEFINITIONS{{
FieldDefinition(TQueryTableColumn::CLUSTER_ID, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" << ctx.cluster_id << "'";
- }),
+ }, VERSION_1_0_0),
// Query Id
FieldDefinition(TQueryTableColumn::QUERY_ID, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" << PrintId(ctx.record->base_state->id) << "'";
- }),
+ }, VERSION_1_0_0),
// Session Id
FieldDefinition(TQueryTableColumn::SESSION_ID, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" << PrintId(ctx.record->session_id) << "'";
- }),
+ }, VERSION_1_0_0),
// Session Type
FieldDefinition(TQueryTableColumn::SESSION_TYPE, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" << ctx.record->session_type << "'";
- }),
+ }, VERSION_1_0_0),
// Hiveserver2 Protocol Version
FieldDefinition(TQueryTableColumn::HIVESERVER2_PROTOCOL_VERSION,
@@ -126,32 +122,32 @@ const array<FieldDefinition, NumQueryTableColumns>
FIELD_DEFINITIONS{{
ctx.sql << ctx.record->hiveserver2_protocol_version;
}
ctx.sql << "'";
- }),
+ }, VERSION_1_0_0),
// Effective User
FieldDefinition(TQueryTableColumn::DB_USER, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" << ctx.record->base_state->effective_user << "'";
- }),
+ }, VERSION_1_0_0),
// DB User
FieldDefinition(TQueryTableColumn::DB_USER_CONNECTION,
TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" << ctx.record->db_user_connection << "'";
- }),
+ }, VERSION_1_0_0),
// Default DB
FieldDefinition(TQueryTableColumn::DB_NAME, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" << ctx.record->base_state->default_db << "'";
- }),
+ }, VERSION_1_0_0),
// Impala Coordinator
FieldDefinition(TQueryTableColumn::IMPALA_COORDINATOR,
TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" <<TNetworkAddressToString(
ExecEnv::GetInstance()->configured_backend_address()) << "'";
- }),
+ }, VERSION_1_0_0),
// Query Status
FieldDefinition(TQueryTableColumn::QUERY_STATUS, TPrimitiveType::STRING,
@@ -163,31 +159,31 @@ const array<FieldDefinition, NumQueryTableColumns>
FIELD_DEFINITIONS{{
ctx.sql <<
EscapeSql(ctx.record->base_state->query_status.msg().msg());
}
ctx.sql << "'";
- }),
+ }, VERSION_1_0_0),
// Query State
FieldDefinition(TQueryTableColumn::QUERY_STATE, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" << ctx.record->base_state->query_state << "'";
- }),
+ }, VERSION_1_0_0),
// Impala Query End State
FieldDefinition(TQueryTableColumn::IMPALA_QUERY_END_STATE,
TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" << ctx.record->impala_query_end_state << "'";
- }),
+ }, VERSION_1_0_0),
// Query Type
FieldDefinition(TQueryTableColumn::QUERY_TYPE, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" << ctx.record->base_state->stmt_type << "'";
- }),
+ }, VERSION_1_0_0),
// Client Network Address
FieldDefinition(TQueryTableColumn::NETWORK_ADDRESS, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" <<
TNetworkAddressToString(ctx.record->client_address) << "'";
- }),
+ }, VERSION_1_0_0),
// Query Start Time in UTC
// Required
@@ -195,39 +191,39 @@ const array<FieldDefinition, NumQueryTableColumns>
FIELD_DEFINITIONS{{
[](FieldParserContext& ctx){
ctx.sql << "UNIX_MICROS_TO_UTC_TIMESTAMP(" <<
ctx.record->base_state->start_time_us << ")";
- }),
+ }, VERSION_1_0_0),
// Query Duration
FieldDefinition(TQueryTableColumn::TOTAL_TIME_MS, TPrimitiveType::DECIMAL,
[](FieldParserContext& ctx){
_write_decimal(ctx, (ctx.record->base_state->end_time_us -
ctx.record->base_state->start_time_us), MICROS_TO_MILLIS);
- }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
+ }, VERSION_1_0_0, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Query Options set by Configuration
FieldDefinition(TQueryTableColumn::QUERY_OPTS_CONFIG,
TPrimitiveType::STRING,
[](FieldParserContext& ctx){
const string opts_str = DebugQueryOptions(ctx.record->query_options);
ctx.sql << "'" << EscapeSql(opts_str) << "'";
- }),
+ }, VERSION_1_0_0),
// Resource Pool
FieldDefinition(TQueryTableColumn::RESOURCE_POOL, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" << EscapeSql(ctx.record->base_state->resource_pool)
<< "'";
- }),
+ }, VERSION_1_0_0),
// Per-host Memory Estimate
FieldDefinition(TQueryTableColumn::PER_HOST_MEM_ESTIMATE,
TPrimitiveType::BIGINT,
[](FieldParserContext& ctx){
ctx.sql << ctx.record->per_host_mem_estimate;
- }),
+ }, VERSION_1_0_0),
// Dedicated Coordinator Memory Estimate
FieldDefinition(TQueryTableColumn::DEDICATED_COORD_MEM_ESTIMATE,
TPrimitiveType::BIGINT, [](FieldParserContext& ctx){
ctx.sql << ctx.record->dedicated_coord_mem_estimate;
- }),
+ }, VERSION_1_0_0),
// Per-Host Fragment Instances
FieldDefinition(TQueryTableColumn::PER_HOST_FRAGMENT_INSTANCES,
@@ -243,7 +239,7 @@ const array<FieldDefinition, NumQueryTableColumns>
FIELD_DEFINITIONS{{
}
ctx.sql << "'";
- }),
+ }, VERSION_1_0_0),
// Backends Count
FieldDefinition(TQueryTableColumn::BACKENDS_COUNT, TPrimitiveType::INT,
@@ -253,133 +249,133 @@ const array<FieldDefinition, NumQueryTableColumns>
FIELD_DEFINITIONS{{
} else {
ctx.sql << ctx.record->per_host_state.size();
}
- }),
+ }, VERSION_1_0_0),
// Admission Result
FieldDefinition(TQueryTableColumn::ADMISSION_RESULT,
TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" << ctx.record->admission_result << "'";
- }),
+ }, VERSION_1_0_0),
// Cluster Memory Admitted
FieldDefinition(TQueryTableColumn::CLUSTER_MEMORY_ADMITTED,
TPrimitiveType::BIGINT,
[](FieldParserContext& ctx){
ctx.sql << ctx.record->base_state->cluster_mem_est;
- }),
+ }, VERSION_1_0_0),
// Executor Group
FieldDefinition(TQueryTableColumn::EXECUTOR_GROUP, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" << ctx.record->executor_group << "'";
- }),
+ }, VERSION_1_0_0),
// Executor Groups
FieldDefinition(TQueryTableColumn::EXECUTOR_GROUPS, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" << EscapeSql(ctx.record->executor_groups) << "'";
- }),
+ }, VERSION_1_0_0),
// Exec Summary (also known as the operator summary)
FieldDefinition(TQueryTableColumn::EXEC_SUMMARY, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" << EscapeSql(ctx.record->exec_summary) << "'";
- }),
+ }, VERSION_1_0_0),
// Number of rows fetched
FieldDefinition(TQueryTableColumn::NUM_ROWS_FETCHED,
TPrimitiveType::BIGINT,
[](FieldParserContext& ctx){
ctx.sql << ctx.record->base_state->num_rows_fetched;
- }),
+ }, VERSION_1_0_0),
// Row Materialization Rate
FieldDefinition(TQueryTableColumn::ROW_MATERIALIZATION_ROWS_PER_SEC,
TPrimitiveType::BIGINT, [](FieldParserContext& ctx){
ctx.sql << ctx.record->row_materialization_rate;
- }),
+ }, VERSION_1_0_0),
// Row Materialization Time
FieldDefinition(TQueryTableColumn::ROW_MATERIALIZATION_TIME_MS,
TPrimitiveType::DECIMAL, [](FieldParserContext& ctx){
_write_decimal(ctx, ctx.record->row_materialization_time,
NANOS_TO_MILLIS);
- }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
+ }, VERSION_1_0_0, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Compressed Bytes Spilled to Disk
FieldDefinition(TQueryTableColumn::COMPRESSED_BYTES_SPILLED,
TPrimitiveType::BIGINT,
[](FieldParserContext& ctx){
ctx.sql << ctx.record->compressed_bytes_spilled;
- }),
+ }, VERSION_1_0_0),
// Events Timeline Planning Finished
FieldDefinition(TQueryTableColumn::EVENT_PLANNING_FINISHED,
TPrimitiveType::DECIMAL,
[](FieldParserContext& ctx){
_write_event(ctx, PLANNING_FINISHED);
- }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
+ }, VERSION_1_0_0, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Events Timeline Submit for Admission
FieldDefinition(TQueryTableColumn::EVENT_SUBMIT_FOR_ADMISSION,
TPrimitiveType::DECIMAL, [](FieldParserContext& ctx){
_write_event(ctx, SUBMIT_FOR_ADMISSION);
- }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
+ }, VERSION_1_0_0, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Events Timeline Completed Admission
FieldDefinition(TQueryTableColumn::EVENT_COMPLETED_ADMISSION,
TPrimitiveType::DECIMAL, [](FieldParserContext& ctx){
_write_event(ctx, COMPLETED_ADMISSION);
- }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
+ }, VERSION_1_0_0, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Events Timeline All Execution Backends Started
FieldDefinition(TQueryTableColumn::EVENT_ALL_BACKENDS_STARTED,
TPrimitiveType::DECIMAL, [](FieldParserContext& ctx){
_write_event(ctx, ALL_BACKENDS_STARTED);
- }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
+ }, VERSION_1_0_0, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Events Timeline Rows Available
FieldDefinition(TQueryTableColumn::EVENT_ROWS_AVAILABLE,
TPrimitiveType::DECIMAL,
[](FieldParserContext& ctx){
_write_event(ctx, ROWS_AVAILABLE);
- }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
+ }, VERSION_1_0_0, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Events Timeline First Row Fetched
FieldDefinition(TQueryTableColumn::EVENT_FIRST_ROW_FETCHED,
TPrimitiveType::DECIMAL,
[](FieldParserContext& ctx){
_write_event(ctx, FIRST_ROW_FETCHED);
- }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
+ }, VERSION_1_0_0, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Events Timeline Last Row Fetched
FieldDefinition(TQueryTableColumn::EVENT_LAST_ROW_FETCHED,
TPrimitiveType::DECIMAL,
[](FieldParserContext& ctx){
_write_event(ctx, LAST_ROW_FETCHED);
- }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
+ }, VERSION_1_0_0, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Events Timeline Unregister Query
FieldDefinition(TQueryTableColumn::EVENT_UNREGISTER_QUERY,
TPrimitiveType::DECIMAL,
[](FieldParserContext& ctx){
_write_event(ctx, UNREGISTER_QUERY);
- }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
+ }, VERSION_1_0_0, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Read IO Wait Time Total
FieldDefinition(TQueryTableColumn::READ_IO_WAIT_TOTAL_MS,
TPrimitiveType::DECIMAL,
[](FieldParserContext& ctx){
_write_decimal(ctx, ctx.record->read_io_wait_time_total,
NANOS_TO_MILLIS);
- }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
+ }, VERSION_1_0_0, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Read IO Wait Time Mean
FieldDefinition(TQueryTableColumn::READ_IO_WAIT_MEAN_MS,
TPrimitiveType::DECIMAL,
[](FieldParserContext& ctx){
_write_decimal(ctx, ctx.record->read_io_wait_time_mean,
NANOS_TO_MILLIS);
- }, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
+ }, VERSION_1_0_0, DURATION_DECIMAL_PRECISION, DURATION_DECIMAL_SCALE),
// Bytes Read from the Data Cache Total
FieldDefinition(TQueryTableColumn::BYTES_READ_CACHE_TOTAL,
TPrimitiveType::BIGINT,
[](FieldParserContext& ctx){
ctx.sql << ctx.record->bytes_read_cache_total;
- }),
+ }, VERSION_1_0_0),
// Bytes Read Total
FieldDefinition(TQueryTableColumn::BYTES_READ_TOTAL,
TPrimitiveType::BIGINT,
[](FieldParserContext& ctx){
ctx.sql << ctx.record->bytes_read_total;
- }),
+ }, VERSION_1_0_0),
// Per-Node Peak Memory Usage Min
FieldDefinition(TQueryTableColumn::PERNODE_PEAK_MEM_MIN,
TPrimitiveType::BIGINT,
@@ -392,7 +388,7 @@ const array<FieldDefinition, NumQueryTableColumns>
FIELD_DEFINITIONS{{
} else {
ctx.sql << 0;
}
- }),
+ }, VERSION_1_0_0),
// Per-Node Peak Memory Usage Max
FieldDefinition(TQueryTableColumn::PERNODE_PEAK_MEM_MAX,
TPrimitiveType::BIGINT,
@@ -405,7 +401,7 @@ const array<FieldDefinition, NumQueryTableColumns>
FIELD_DEFINITIONS{{
} else {
ctx.sql << max_elem->second.peak_memory_usage;
}
- }),
+ }, VERSION_1_0_0),
// Per-Node Peak Memory Usage Mean
FieldDefinition(TQueryTableColumn::PERNODE_PEAK_MEM_MEAN,
TPrimitiveType::BIGINT,
@@ -421,14 +417,14 @@ const array<FieldDefinition, NumQueryTableColumns>
FIELD_DEFINITIONS{{
}
ctx.sql << calc_mean;
- }),
+ }, VERSION_1_0_0),
// SQL Statement
FieldDefinition(TQueryTableColumn::SQL, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" <<
EscapeSql(ctx.record->redacted_sql,
FLAGS_query_log_max_sql_length) << "'";
- }),
+ }, VERSION_1_0_0),
// Query Plan
FieldDefinition(TQueryTableColumn::PLAN, TPrimitiveType::STRING,
@@ -436,16 +432,14 @@ const array<FieldDefinition, NumQueryTableColumns>
FIELD_DEFINITIONS{{
ctx.sql << "'"
<< EscapeSql(ctx.record->base_state->plan,
FLAGS_query_log_max_plan_length)
<< "'";
- }),
+ }, VERSION_1_0_0),
// Tables Queried
FieldDefinition(TQueryTableColumn::TABLES_QUERIED, TPrimitiveType::STRING,
[](FieldParserContext& ctx){
ctx.sql << "'" << PrintTableList(ctx.record->tables) << "'";
- }),
+ }, VERSION_1_0_0),
}}; // FIELDS_PARSERS const array
-} //namespace workload_management
-
} // namespace impala
diff --git a/be/src/service/workload-management-flags.cc
b/be/src/service/workload-management-flags.cc
index b71bef2cc..a911407d5 100644
--- a/be/src/service/workload-management-flags.cc
+++ b/be/src/service/workload-management-flags.cc
@@ -170,3 +170,10 @@ DEFINE_string_hidden(query_log_table_props, "", "Comma
separated list of additio
"Iceberg table properties in the format 'key'='value' to apply when
creating the "
"query log table. Only applies when the table is being created. After
table "
"creation, this property does nothing");
+
+DEFINE_string_hidden(workload_mgmt_schema_version, "1.0.0", "Schema version of
the "
+ "workload management table.");
+
+DEFINE_validator(workload_mgmt_schema_version, [](const char* name, const
string& val) {
+ return !val.empty();
+});
diff --git a/be/src/service/workload-management-init.cc
b/be/src/service/workload-management-init.cc
new file mode 100644
index 000000000..c35052092
--- /dev/null
+++ b/be/src/service/workload-management-init.cc
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// This file contains the code for the initialization process for workload
management.
+/// The init process handles:
+/// 1. Checking the state of the workload management db and tables.
+/// 2. Creating the db/tables if necessary.
+/// 3. Starting the workload management thread which runs the completed
queries
+/// processing loop.
+
+#include "service/workload-management.h"
+
+#include <mutex>
+
+#include <boost/algorithm/string/case_conv.hpp>
+#include <boost/algorithm/string/predicate.hpp>
+#include <boost/algorithm/string/trim.hpp>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gutil/strings/strcat.h>
+
+#include "common/status.h"
+#include "gen-cpp/CatalogObjects_constants.h"
+#include "gen-cpp/SystemTables_types.h"
+#include "gen-cpp/TCLIService_types.h"
+#include "gen-cpp/Types_types.h"
+#include "kudu/util/version_util.h"
+#include "service/impala-server.h"
+
+using namespace std;
+using namespace impala;
+using boost::algorithm::starts_with;
+using boost::algorithm::trim_copy;
+using kudu::Version;
+using kudu::ParseVersion;
+
+DECLARE_bool(enable_workload_mgmt);
+DECLARE_int32(query_log_write_interval_s);
+DECLARE_int32(query_log_write_timeout_s);
+DECLARE_string(query_log_request_pool);
+DECLARE_string(query_log_table_location);
+DECLARE_string(query_log_table_name);
+DECLARE_string(query_log_table_props);
+DECLARE_string(workload_mgmt_user);
+DECLARE_string(workload_mgmt_schema_version);
+
+namespace impala {
+
+/// Name of the database where all workload management tables will be stored.
+static const string DB = "sys";
+
+/// Sets up the sys database generating and executing the necessary DML
statements.
+static void _setupDb(InternalServer* server,
+ InternalServer::QueryOptionMap& insert_query_opts) {
+ insert_query_opts[TImpalaQueryOptions::SYNC_DDL] = "true";
+ ABORT_IF_ERROR(server->ExecuteIgnoreResults(FLAGS_workload_mgmt_user,
+ StrCat("CREATE DATABASE IF NOT EXISTS ", DB, " COMMENT "
+ "'System database for Impala introspection'"), insert_query_opts,
false));
+ insert_query_opts[TImpalaQueryOptions::SYNC_DDL] = "false";
+} // function _setupDb
+
+/// Appends all relevant fields to a create or alter table sql statement.
+static void _appendCols(StringStreamPop& stream,
+ std::function<bool(const FieldDefinition& item)> shouldIncludeCol) {
+ bool match = false;
+
+ for (const auto& field : FIELD_DEFINITIONS) {
+ if (shouldIncludeCol(field)) {
+ match = true;
+ stream << field.db_column << " " << field.db_column_type;
+
+ if (field.db_column_type == TPrimitiveType::DECIMAL) {
+ stream << "(" << field.precision << "," << field.scale << ")";
+ }
+
+ stream << ",";
+ }
+ }
+
+ DCHECK_EQ(match, true);
+ stream.move_back();
+} // function _appendCols
+
+/// Sets up the query table by generating and executing the necessary DML
statements.
+static void _setupTable(InternalServer* server, const string& table_name,
+ InternalServer::QueryOptionMap& insert_query_opts, const Version&
target_version,
+ bool is_system_table = false) {
+ insert_query_opts[TImpalaQueryOptions::SYNC_DDL] = "true";
+
+ StringStreamPop create_table_sql;
+ create_table_sql << "CREATE ";
+ // System tables do not have anything to purge, and must not be managed
tables.
+ if (is_system_table) create_table_sql << "EXTERNAL ";
+ create_table_sql << "TABLE IF NOT EXISTS " << table_name << "(";
+
+ _appendCols(create_table_sql, [target_version](const FieldDefinition& f){
+ return f.schema_version <= target_version;});
+
+ create_table_sql << ") ";
+
+ if (!is_system_table) {
+ create_table_sql << "PARTITIONED BY SPEC(identity(cluster_id),
HOUR(start_time_utc)) "
+ << "STORED AS iceberg ";
+
+ if (!FLAGS_query_log_table_location.empty()) {
+ create_table_sql << "LOCATION '" << FLAGS_query_log_table_location << "'
";
+ }
+ }
+
+ create_table_sql << "TBLPROPERTIES ('schema_version'='" <<
target_version.ToString()
+ << "','format-version'='2'";
+
+ if (is_system_table) {
+ create_table_sql << ",'"
+ << g_CatalogObjects_constants.TBL_PROP_SYSTEM_TABLE
<<"'='true'";
+ } else if (!FLAGS_query_log_table_props.empty()) {
+ create_table_sql << "," << FLAGS_query_log_table_props;
+ }
+
+ create_table_sql << ")";
+
+ VLOG(2) << "Creating workload management table '" << table_name
+ << "' on schema version '" << target_version.ToString() << "'";
+ ABORT_IF_ERROR(server->ExecuteIgnoreResults(FLAGS_workload_mgmt_user,
+ create_table_sql.str(), insert_query_opts, false));
+
+ insert_query_opts[TImpalaQueryOptions::SYNC_DDL] = "false";
+
+ LOG(INFO) << "Completed " << table_name << " initialization.
write_interval=\"" <<
+ FLAGS_query_log_write_interval_s << "s\"";
+} // function _setupTable
+
+static Version _retrieveSchemaVersion(InternalServer* server, const string
table_name,
+ const InternalServer::QueryOptionMap& insert_query_opts) {
+
+ vector<apache::hive::service::cli::thrift::TRow> query_results;
+
+ const Status describe_table =
server->ExecuteAndFetchAllHS2(FLAGS_workload_mgmt_user,
+ StrCat("DESCRIBE EXTENDED ", table_name), query_results,
insert_query_opts, false);
+
+ // If an error, ignore the error and run as if workload management has never
+ // executed. Since all the DDLs use the "if not exists" clause, extra runs
of the
+ // DDLs will not cause any harm.
+ if (describe_table.ok()) {
+ const string SCHEMA_VER_PROP_NAME = "schema_version";
+
+ // Table exists, search for its schema_version table property.
+ for(auto& res : query_results) {
+ if (starts_with(res.colVals[1].stringVal.value, SCHEMA_VER_PROP_NAME) ){
+ const string schema_ver = trim_copy(res.colVals[2].stringVal.value);
+ Version parsed_schema_ver;
+
+ VLOG(2) << "Actual current workload management schema version of the '"
+ << table_name << "' table is '" << schema_ver << "'";
+
+ if(!ParseVersion(schema_ver, &parsed_schema_ver).ok()) {
+ ABORT_WITH_ERROR(StrCat("Invalid actual workload management schema
version '",
+ schema_ver, "' for table '", table_name, "'"));
+ }
+
+ return parsed_schema_ver;
+ }
+ }
+
+ // If the for loop does not find the schema_version table property, then
it has been
+ // removed outside of the workload management code.
+ ABORT_WITH_ERROR(StrCat("Table '", table_name, "' is missing required
property '",
+ SCHEMA_VER_PROP_NAME, "'"));
+ }
+
+ return NO_TABLE_EXISTS;
+} // _retrieveSchemaVersion
+
+/// Aborts with error if the target_ver is less than the actual_ver.
+static void _errorIfDowngrade(const Version target_ver, const Version
actual_ver,
+ const string table_name) {
+ if (target_ver < actual_ver) {
+ ABORT_WITH_ERROR(StrCat("Target schema version '", target_ver.ToString(),
+ " of the '", table_name, "' table is lower than the actual schema
version '",
+ actual_ver.ToString(), "'. Downgrades are not supported. The target
schema "
+ "version must be greater than or equal to the actual schema
version."));
+ }
+} // _errorIfDowngrade
+
+void ImpalaServer::InitWorkloadManagement() {
+ if (!FLAGS_enable_workload_mgmt) {
+ return;
+ }
+
+ // Fully qualified table name based on startup flags.
+ const string log_table_name = StrCat(DB, ".", FLAGS_query_log_table_name);
+
+ // Verify FIELD_DEFINITIONS includes all QueryTableColumns.
+ DCHECK_EQ(_TQueryTableColumn_VALUES_TO_NAMES.size(),
FIELD_DEFINITIONS.size());
+ for (const auto& field : FIELD_DEFINITIONS) {
+ // Verify all fields match their column position.
+ DCHECK_EQ(FIELD_DEFINITIONS[field.db_column].db_column, field.db_column);
+ }
+
+ // Ensure a valid schema version was specified on the command line flag.
+ Version target_schema_version;
+ if (!ParseVersion(FLAGS_workload_mgmt_schema_version,
+ &target_schema_version).ok()) {
+ ABORT_WITH_ERROR(StrCat("Invalid workload management schema version '",
+ FLAGS_workload_mgmt_schema_version, "'"));
+ }
+ VLOG(2) << "Target workload management schema version is '"
+ << target_schema_version.ToString() << "'";
+
+ if (target_schema_version != VERSION_1_0_0) {
+ ABORT_WITH_ERROR(StrCat("Workload management schema version '",
+ FLAGS_workload_mgmt_schema_version, "' does not match any valid
version"));
+ }
+
+ // Setup default query options that will be provided on all queries that
insert rows
+ // into the completed queries table.
+ InternalServer::QueryOptionMap insert_query_opts;
+
+ insert_query_opts[TImpalaQueryOptions::TIMEZONE] = "UTC";
+ insert_query_opts[TImpalaQueryOptions::QUERY_TIMEOUT_S] = std::to_string(
+ FLAGS_query_log_write_timeout_s < 1 ?
+ FLAGS_query_log_write_interval_s : FLAGS_query_log_write_timeout_s);
+ if (!FLAGS_query_log_request_pool.empty()) {
+ insert_query_opts[TImpalaQueryOptions::REQUEST_POOL] =
FLAGS_query_log_request_pool;
+ }
+
+ {
+ lock_guard<mutex> l(workload_mgmt_threadstate_mu_);
+ workload_mgmt_thread_state_ = INITIALIZING;
+ }
+
+ Version parsed_actual_schema_version;
+
+ // Create and/or update the completed queries table if needed.
+ parsed_actual_schema_version = _retrieveSchemaVersion(internal_server_.get(),
+ log_table_name, insert_query_opts);
+
+ if (parsed_actual_schema_version == NO_TABLE_EXISTS) {
+ // First time setting up workload management.
+ // Setup the sys database.
+ _setupDb(internal_server_.get(), insert_query_opts);
+
+ // Create the query log table at the target schema version.
+ _setupTable(internal_server_.get(), log_table_name, insert_query_opts,
+ VERSION_1_0_0);
+
+ parsed_actual_schema_version = VERSION_1_0_0;
+ } else {
+ _errorIfDowngrade(target_schema_version, parsed_actual_schema_version,
+ log_table_name);
+ }
+
+ // Create and/or update the live queries table if needed.
+ // Determine the live queries table name.
+ string live_table_name = StrCat(DB, ".",
+ to_string(TSystemTableName::IMPALA_QUERY_LIVE));
+ boost::algorithm::to_lower(live_table_name);
+
+ parsed_actual_schema_version = _retrieveSchemaVersion(internal_server_.get(),
+ live_table_name, insert_query_opts);
+
+ if (parsed_actual_schema_version == NO_TABLE_EXISTS) {
+ // First time setting up workload management.
+ // Create the query live table on the target schema version.
+ _setupTable(internal_server_.get(), live_table_name, insert_query_opts,
+ target_schema_version, true);
+ } else {
+ _errorIfDowngrade(target_schema_version, parsed_actual_schema_version,
+ live_table_name);
+ }
+
+ LOG(INFO) << "Completed workload management initialization";
+ WorkloadManagementWorker(insert_query_opts, log_table_name);
+} // ImpalaServer::InitWorkloadManagement
+
+} // namespace impala
diff --git a/be/src/service/workload-management-test.cc
b/be/src/service/workload-management-test.cc
new file mode 100644
index 000000000..6b7c5cd7b
--- /dev/null
+++ b/be/src/service/workload-management-test.cc
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "service/workload-management.h"
+
+#include <sstream>
+#include <string>
+
+#include <gutil/strings/strcat.h>
+
+#include "gen-cpp/SystemTables_types.h"
+#include "testutil/gtest-util.h"
+
+using std::stringstream;
+using std::string;
+
+namespace impala {
+
+static constexpr int8_t EXPECTED_DECIMAL_PRECISION = 18;
+static constexpr int8_t EXPECTED_DECIMAL_SCALE = 3;
+
+// Builds and returns a string that can be used to identify the column name
during a test
+// failure situation.
// Builds the suffix appended to every assertion message so a test failure
// names the column that was being checked.
static std::string _eMsg(const std::string& expected_name) {
  return "for field: " + expected_name;
}
+
+// Asserts the common fields on a FieldDefinition instance.
+static void _assertCol(const string& expected_name,
+ const TPrimitiveType::type& expected_type, const string& expected_version,
+ const FieldDefinition& actual){
+ stringstream actual_col_name;
+ actual_col_name << actual.db_column;
+
+ stringstream actual_col_type;
+ actual_col_type << actual.db_column_type;
+
+ EXPECT_EQ(expected_name, actual_col_name.str()) << _eMsg(expected_name);
+ EXPECT_EQ(to_string(expected_type), actual_col_type.str()) <<
_eMsg(expected_name);
+ EXPECT_EQ(expected_version, actual.schema_version.ToString()) <<
_eMsg(expected_name);
+}
+
+// Asserts the common fields and the decimal related fields on a
FieldDefinition instance.
+static void _assertColDecimal(const string& expected_name,
+ const string& expected_version, const FieldDefinition& actual){
+ _assertCol(expected_name, TPrimitiveType::DECIMAL, expected_version, actual);
+
+ EXPECT_EQ(EXPECTED_DECIMAL_PRECISION, actual.precision) <<
_eMsg(expected_name);
+ EXPECT_EQ(EXPECTED_DECIMAL_SCALE, actual.scale) << _eMsg(expected_name);
+}
+
// Verifies the name, type, and introducing schema version of every column in
// FIELD_DEFINITIONS, by ordinal position. Any column added, removed, or
// re-ordered in workload management must be reflected here.
TEST(WorkloadManagementTest, CheckColumnNames) {
  // Guard against columns being added without updating this test.
  EXPECT_EQ(49, FIELD_DEFINITIONS.size());

  _assertCol("CLUSTER_ID", TPrimitiveType::STRING, "1.0.0", FIELD_DEFINITIONS[0]);
  _assertCol("QUERY_ID", TPrimitiveType::STRING, "1.0.0", FIELD_DEFINITIONS[1]);
  _assertCol("SESSION_ID", TPrimitiveType::STRING, "1.0.0", FIELD_DEFINITIONS[2]);
  _assertCol("SESSION_TYPE", TPrimitiveType::STRING, "1.0.0", FIELD_DEFINITIONS[3]);
  _assertCol("HIVESERVER2_PROTOCOL_VERSION", TPrimitiveType::STRING, "1.0.0",
      FIELD_DEFINITIONS[4]);
  _assertCol("DB_USER", TPrimitiveType::STRING, "1.0.0", FIELD_DEFINITIONS[5]);
  _assertCol("DB_USER_CONNECTION", TPrimitiveType::STRING, "1.0.0", FIELD_DEFINITIONS[6]);
  _assertCol("DB_NAME", TPrimitiveType::STRING, "1.0.0", FIELD_DEFINITIONS[7]);
  _assertCol("IMPALA_COORDINATOR", TPrimitiveType::STRING, "1.0.0", FIELD_DEFINITIONS[8]);
  _assertCol("QUERY_STATUS", TPrimitiveType::STRING, "1.0.0", FIELD_DEFINITIONS[9]);
  _assertCol("QUERY_STATE", TPrimitiveType::STRING, "1.0.0", FIELD_DEFINITIONS[10]);
  _assertCol("IMPALA_QUERY_END_STATE", TPrimitiveType::STRING, "1.0.0",
      FIELD_DEFINITIONS[11]);
  _assertCol("QUERY_TYPE", TPrimitiveType::STRING, "1.0.0", FIELD_DEFINITIONS[12]);
  _assertCol("NETWORK_ADDRESS", TPrimitiveType::STRING, "1.0.0", FIELD_DEFINITIONS[13]);
  _assertCol("START_TIME_UTC", TPrimitiveType::TIMESTAMP, "1.0.0", FIELD_DEFINITIONS[14]);
  _assertColDecimal("TOTAL_TIME_MS", "1.0.0", FIELD_DEFINITIONS[15]);
  _assertCol("QUERY_OPTS_CONFIG", TPrimitiveType::STRING, "1.0.0", FIELD_DEFINITIONS[16]);
  _assertCol("RESOURCE_POOL", TPrimitiveType::STRING, "1.0.0", FIELD_DEFINITIONS[17]);
  _assertCol("PER_HOST_MEM_ESTIMATE", TPrimitiveType::BIGINT, "1.0.0",
      FIELD_DEFINITIONS[18]);
  _assertCol("DEDICATED_COORD_MEM_ESTIMATE", TPrimitiveType::BIGINT, "1.0.0",
      FIELD_DEFINITIONS[19]);
  _assertCol("PER_HOST_FRAGMENT_INSTANCES", TPrimitiveType::STRING, "1.0.0",
      FIELD_DEFINITIONS[20]);
  _assertCol("BACKENDS_COUNT", TPrimitiveType::INT, "1.0.0", FIELD_DEFINITIONS[21]);
  _assertCol("ADMISSION_RESULT", TPrimitiveType::STRING, "1.0.0", FIELD_DEFINITIONS[22]);
  _assertCol("CLUSTER_MEMORY_ADMITTED", TPrimitiveType::BIGINT, "1.0.0",
      FIELD_DEFINITIONS[23]);
  _assertCol("EXECUTOR_GROUP", TPrimitiveType::STRING, "1.0.0", FIELD_DEFINITIONS[24]);
  _assertCol("EXECUTOR_GROUPS", TPrimitiveType::STRING, "1.0.0", FIELD_DEFINITIONS[25]);
  _assertCol("EXEC_SUMMARY", TPrimitiveType::STRING, "1.0.0", FIELD_DEFINITIONS[26]);
  _assertCol("NUM_ROWS_FETCHED", TPrimitiveType::BIGINT, "1.0.0", FIELD_DEFINITIONS[27]);
  _assertCol("ROW_MATERIALIZATION_ROWS_PER_SEC", TPrimitiveType::BIGINT, "1.0.0",
      FIELD_DEFINITIONS[28]);
  _assertColDecimal("ROW_MATERIALIZATION_TIME_MS", "1.0.0", FIELD_DEFINITIONS[29]);
  _assertCol("COMPRESSED_BYTES_SPILLED", TPrimitiveType::BIGINT, "1.0.0",
      FIELD_DEFINITIONS[30]);
  // Query lifecycle event timestamps are all decimals with the shared
  // precision/scale.
  _assertColDecimal("EVENT_PLANNING_FINISHED", "1.0.0", FIELD_DEFINITIONS[31]);
  _assertColDecimal("EVENT_SUBMIT_FOR_ADMISSION", "1.0.0", FIELD_DEFINITIONS[32]);
  _assertColDecimal("EVENT_COMPLETED_ADMISSION", "1.0.0", FIELD_DEFINITIONS[33]);
  _assertColDecimal("EVENT_ALL_BACKENDS_STARTED", "1.0.0", FIELD_DEFINITIONS[34]);
  _assertColDecimal("EVENT_ROWS_AVAILABLE", "1.0.0", FIELD_DEFINITIONS[35]);
  _assertColDecimal("EVENT_FIRST_ROW_FETCHED", "1.0.0", FIELD_DEFINITIONS[36]);
  _assertColDecimal("EVENT_LAST_ROW_FETCHED", "1.0.0", FIELD_DEFINITIONS[37]);
  _assertColDecimal("EVENT_UNREGISTER_QUERY", "1.0.0", FIELD_DEFINITIONS[38]);
  _assertColDecimal("READ_IO_WAIT_TOTAL_MS", "1.0.0", FIELD_DEFINITIONS[39]);
  _assertColDecimal("READ_IO_WAIT_MEAN_MS", "1.0.0", FIELD_DEFINITIONS[40]);
  _assertCol("BYTES_READ_CACHE_TOTAL", TPrimitiveType::BIGINT, "1.0.0",
      FIELD_DEFINITIONS[41]);
  _assertCol("BYTES_READ_TOTAL", TPrimitiveType::BIGINT, "1.0.0", FIELD_DEFINITIONS[42]);
  _assertCol("PERNODE_PEAK_MEM_MIN", TPrimitiveType::BIGINT, "1.0.0",
      FIELD_DEFINITIONS[43]);
  _assertCol("PERNODE_PEAK_MEM_MAX", TPrimitiveType::BIGINT, "1.0.0",
      FIELD_DEFINITIONS[44]);
  _assertCol("PERNODE_PEAK_MEM_MEAN", TPrimitiveType::BIGINT, "1.0.0",
      FIELD_DEFINITIONS[45]);
  _assertCol("SQL", TPrimitiveType::STRING, "1.0.0", FIELD_DEFINITIONS[46]);
  _assertCol("PLAN", TPrimitiveType::STRING, "1.0.0", FIELD_DEFINITIONS[47]);
  _assertCol("TABLES_QUERIED", TPrimitiveType::STRING, "1.0.0", FIELD_DEFINITIONS[48]);
}
+
+} // namespace impala
diff --git a/be/src/service/workload-management.cc
b/be/src/service/workload-management.cc
index a968c25af..f0b15c794 100644
--- a/be/src/service/workload-management.cc
+++ b/be/src/service/workload-management.cc
@@ -23,10 +23,8 @@
#include <memory>
#include <mutex>
#include <string>
-#include <thread>
#include <utility>
-#include <boost/algorithm/string/case_conv.hpp>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <gutil/strings/strcat.h>
@@ -34,9 +32,7 @@
#include "common/compiler-util.h"
#include "common/logging.h"
#include "common/status.h"
-#include "gen-cpp/CatalogObjects_constants.h"
#include "gen-cpp/Frontend_types.h"
-#include "gen-cpp/Query_types.h"
#include "gen-cpp/SystemTables_types.h"
#include "gen-cpp/Types_types.h"
#include "runtime/query-driver.h"
@@ -47,47 +43,24 @@
#include "util/debug-util.h"
#include "util/histogram-metric.h"
#include "util/impalad-metrics.h"
-#include "util/metrics.h"
#include "util/pretty-printer.h"
#include "util/stopwatch.h"
#include "util/string-util.h"
-#include "util/thread.h"
#include "util/ticker.h"
using namespace impala;
-using namespace impala::workload_management;
using namespace std;
DECLARE_bool(enable_workload_mgmt);
-DECLARE_string(query_log_table_name);
-DECLARE_string(query_log_table_location);
DECLARE_int32(query_log_write_interval_s);
-DECLARE_int32(query_log_write_timeout_s);
DECLARE_int32(query_log_max_queued);
DECLARE_string(workload_mgmt_user);
-DECLARE_int32(query_log_max_sql_length);
-DECLARE_int32(query_log_max_plan_length);
DECLARE_int32(query_log_shutdown_timeout_s);
DECLARE_string(cluster_id);
DECLARE_int32(query_log_max_insert_attempts);
-DECLARE_string(query_log_request_pool);
-DECLARE_string(query_log_table_props);
namespace impala {
-/// Name of the database where all workload management tables will be stored.
-static const string DB = "sys";
-
-/// Default query options that will be provided on all queries that insert
rows into the
-/// completed queries table. See the initialization code in the
-/// ImpalaServer::CompletedQueriesThread function for details on which options
are set.
-static InternalServer::QueryOptionMap insert_query_opts;
-
-/// Non-values portion of the sql DML to insert records into the completed
queries table.
-/// Generates the first portion of the DML that inserts records into the
completed queries
-/// table. This portion of the statement is constant and thus is only
generated once.
-static string _insert_dml;
-
/// Determine if the maximum number of queued completed queries has been
exceeded.
///
/// Return:
@@ -99,78 +72,6 @@ static inline bool MaxRecordsExceeded(size_t record_count)
noexcept {
return FLAGS_query_log_max_queued > 0 && record_count >
FLAGS_query_log_max_queued;
} // function MaxRecordsExceeded
-/// Sets up the sys database generating and executing the necessary DML
statements.
-static const Status SetupDb(InternalServer* server) {
- insert_query_opts[TImpalaQueryOptions::SYNC_DDL] = "true";
- RETURN_IF_ERROR(server->ExecuteIgnoreResults(FLAGS_workload_mgmt_user,
- StrCat("CREATE DATABASE IF NOT EXISTS ", DB, " COMMENT "
- "'System database for Impala introspection'"), insert_query_opts,
false));
- insert_query_opts[TImpalaQueryOptions::SYNC_DDL] = "false";
- return Status::OK();
-} // function SetupDb
-
-/// Returns column name as lower-case to match common SQL style.
-static string GetColumnName(const FieldDefinition& field) {
- std::string column_name = to_string(field.db_column);
- boost::algorithm::to_lower(column_name);
- return column_name;
-}
-
-/// Sets up the query table by generating and executing the necessary DML
statements.
-static const Status SetupTable(InternalServer* server, const string&
table_name,
- bool is_system_table = false) {
- insert_query_opts[TImpalaQueryOptions::SYNC_DDL] = "true";
-
- StringStreamPop create_table_sql;
- create_table_sql << "CREATE ";
- // System tables do not have anything to purge, and must not be managed
tables.
- if (is_system_table) create_table_sql << "EXTERNAL ";
- create_table_sql << "TABLE IF NOT EXISTS " << table_name << "(";
-
- for (const auto& field : FIELD_DEFINITIONS) {
- create_table_sql << GetColumnName(field) << " " << field.db_column_type;
-
- if (field.db_column_type == TPrimitiveType::DECIMAL) {
- create_table_sql << "(" << field.precision << "," << field.scale << ")";
- }
-
- create_table_sql << ",";
- }
- create_table_sql.move_back();
-
- create_table_sql << ") ";
-
- if (!is_system_table) {
- create_table_sql << "PARTITIONED BY SPEC(identity(cluster_id),
HOUR(start_time_utc)) "
- << "STORED AS iceberg ";
-
- if (!FLAGS_query_log_table_location.empty()) {
- create_table_sql << "LOCATION '" << FLAGS_query_log_table_location << "'
";
- }
- }
-
- create_table_sql << "TBLPROPERTIES
('schema_version'='1.0.0','format-version'='2'";
-
- if (is_system_table) {
- create_table_sql << ",'"
- << g_CatalogObjects_constants.TBL_PROP_SYSTEM_TABLE
<<"'='true'";
- } else if (!FLAGS_query_log_table_props.empty()) {
- create_table_sql << "," << FLAGS_query_log_table_props;
- }
-
- create_table_sql << ")";
-
- RETURN_IF_ERROR(server->ExecuteIgnoreResults(FLAGS_workload_mgmt_user,
- create_table_sql.str(), insert_query_opts, false));
-
- insert_query_opts[TImpalaQueryOptions::SYNC_DDL] = "false";
-
- LOG(INFO) << "Completed " << table_name << " initialization.
write_interval=\"" <<
- FLAGS_query_log_write_interval_s << "s\"";
-
- return Status::OK();
-} // function SetupTable
-
/// Iterates through the list of field in `FIELDS_PARSERS` executing each
parser for the
/// given `QueryStateExpanded` object. This function builds the
`FieldParserContext`
/// object that is passed to each parser.
@@ -206,33 +107,16 @@ size_t ImpalaServer::NumLiveQueries() {
return live_queries + completed_queries_.size();
}
-Status ImpalaServer::InitWorkloadManagement() {
- // Verify FIELD_DEFINITIONS includes all QueryTableColumns.
- DCHECK_EQ(_TQueryTableColumn_VALUES_TO_NAMES.size(),
FIELD_DEFINITIONS.size());
- for (const auto& field : FIELD_DEFINITIONS) {
- // Verify all fields match their column position.
- DCHECK_EQ(FIELD_DEFINITIONS[field.db_column].db_column, field.db_column);
- }
-
- if (FLAGS_enable_workload_mgmt) {
- return Thread::Create("impala-server", "completed-queries",
- bind<void>(&ImpalaServer::CompletedQueriesThread, this),
- &completed_queries_thread_);
- }
-
- return Status::OK();
-} // ImpalaServer::InitWorkloadManagement
-
void ImpalaServer::ShutdownWorkloadManagement() {
- unique_lock<mutex> l(completed_queries_threadstate_mu_);
+ unique_lock<mutex> l(workload_mgmt_threadstate_mu_);
// If the completed queries thread is not yet running, then we don't need to
give it a
// chance to flush the in-memory queue to the completed queries table.
- if (completed_queries_thread_state_ == RUNNING) {
- completed_queries_thread_state_ = SHUTTING_DOWN;
+ if (workload_mgmt_thread_state_ == RUNNING) {
+ workload_mgmt_thread_state_ = SHUTTING_DOWN;
completed_queries_cv_.notify_all();
completed_queries_shutdown_cv_.wait_for(l,
chrono::seconds(FLAGS_query_log_shutdown_timeout_s),
- [this]{ return completed_queries_thread_state_ == SHUTDOWN; });
+ [this]{ return workload_mgmt_thread_state_ == SHUTDOWN; });
}
} // ImpalaServer::ShutdownWorkloadManagement
@@ -312,49 +196,22 @@ static string get_insert_prefix(const string& table_name)
{
StringStreamPop fields;
fields << "INSERT INTO " << table_name << "(";
for (const auto& field : FIELD_DEFINITIONS) {
- fields << GetColumnName(field) << ",";
+ fields << field.db_column << ",";
}
fields.move_back();
fields << ") VALUES ";
return fields.str();
}
-void ImpalaServer::CompletedQueriesThread() {
- {
- lock_guard<mutex> l(completed_queries_threadstate_mu_);
- completed_queries_thread_state_ = INITIALIZING;
- }
-
- // Setup default query options.
- insert_query_opts[TImpalaQueryOptions::TIMEZONE] = "UTC";
- insert_query_opts[TImpalaQueryOptions::QUERY_TIMEOUT_S] = std::to_string(
- FLAGS_query_log_write_timeout_s < 1 ?
- FLAGS_query_log_write_interval_s : FLAGS_query_log_write_timeout_s);
- if (!FLAGS_query_log_request_pool.empty()) {
- insert_query_opts[TImpalaQueryOptions::REQUEST_POOL] =
FLAGS_query_log_request_pool;
- }
-
- // Fully qualified table name based on startup flags.
- const string log_table_name = StrCat(DB, ".", FLAGS_query_log_table_name);
-
- // Non-values portion of the completed queries insert dml. Does not change
across
- // queries.
- _insert_dml = get_insert_prefix(log_table_name);
-
- // The initialization code only works when run in a separate thread for
reasons unknown.
- ABORT_IF_ERROR(SetupDb(internal_server_.get()));
- ABORT_IF_ERROR(SetupTable(internal_server_.get(), log_table_name));
- std::string live_table_name = to_string(TSystemTableName::IMPALA_QUERY_LIVE);
- boost::algorithm::to_lower(live_table_name);
- ABORT_IF_ERROR(SetupTable(internal_server_.get(),
- StrCat(DB, ".", live_table_name), true));
+void ImpalaServer::WorkloadManagementWorker(
+ InternalServer::QueryOptionMap& insert_query_opts, const string
log_table_name) {
{
- lock_guard<mutex> l(completed_queries_threadstate_mu_);
+ lock_guard<mutex> l(workload_mgmt_threadstate_mu_);
// This condition will evaluate to false only if a clean shutdown was
initiated while
// the previous function was running.
- if (LIKELY(completed_queries_thread_state_ == INITIALIZING)) {
- completed_queries_thread_state_ = RUNNING;
+ if (LIKELY(workload_mgmt_thread_state_ == INITIALIZING)) {
+ workload_mgmt_thread_state_ = RUNNING;
} else {
return; // Note: early return
}
@@ -365,15 +222,19 @@ void ImpalaServer::CompletedQueriesThread() {
"completed-queries-ticker"));
}
+ // Non-values portion of the sql DML to insert records into the completed
queries
+ // tables. This portion of the statement is constant and thus is only
generated once.
+ const string insert_dml_prefix = get_insert_prefix(log_table_name);
+
while (true) {
// Exit this thread if a shutdown was initiated.
{
- lock_guard<mutex> l(completed_queries_threadstate_mu_);
+ lock_guard<mutex> l(workload_mgmt_threadstate_mu_);
- DCHECK(completed_queries_thread_state_ != SHUTDOWN);
+ DCHECK(workload_mgmt_thread_state_ != SHUTDOWN);
- if (UNLIKELY(completed_queries_thread_state_ == SHUTTING_DOWN)) {
- completed_queries_thread_state_ = SHUTDOWN;
+ if (UNLIKELY(workload_mgmt_thread_state_ == SHUTTING_DOWN)) {
+ workload_mgmt_thread_state_ = SHUTDOWN;
completed_queries_shutdown_cv_.notify_all();
return; // Note: early return
}
@@ -385,13 +246,13 @@ void ImpalaServer::CompletedQueriesThread() {
unique_lock<mutex> l(completed_queries_lock_);
completed_queries_cv_.wait(l,
[this]{
- lock_guard<mutex> l2(completed_queries_threadstate_mu_);
+ lock_guard<mutex> l2(workload_mgmt_threadstate_mu_);
// To guard against spurious wakeups, this predicate ensures there
are completed
// queries queued up before waking up the thread.
return (completed_queries_ticker_->WakeupGuard()()
&& !completed_queries_.empty())
|| MaxRecordsExceeded(completed_queries_.size())
- || UNLIKELY(completed_queries_thread_state_ == SHUTTING_DOWN);
+ || UNLIKELY(workload_mgmt_thread_state_ == SHUTTING_DOWN);
});
completed_queries_ticker_->ResetWakeupGuard();
@@ -447,7 +308,7 @@ void ImpalaServer::CompletedQueriesThread() {
// Remove the last comma and determine the final sql statement length.
sql.pop_back();
- const size_t final_sql_len = _insert_dml.size() + sql.size();
+ const size_t final_sql_len = insert_dml_prefix.size() + sql.size();
uint64_t gather_time = timer.Reset();
TUniqueId tmp_query_id;
@@ -473,7 +334,7 @@ void ImpalaServer::CompletedQueriesThread() {
// Execute the insert dml.
const Status ret_status = internal_server_->ExecuteIgnoreResults(
- FLAGS_workload_mgmt_user, StrCat(_insert_dml, sql), opts, false,
+ FLAGS_workload_mgmt_user, StrCat(insert_dml_prefix, sql), opts, false,
&tmp_query_id);
uint64_t exec_time = timer.ElapsedTime();
@@ -504,7 +365,7 @@ void ImpalaServer::CompletedQueriesThread() {
completed_queries_lock_.unlock();
}
}
-} // ImpalaServer::CompletedQueriesThread
+} // ImpalaServer::WorkloadManagementWorker
vector<shared_ptr<QueryStateExpanded>> ImpalaServer::GetCompletedQueries() {
lock_guard<mutex> l(completed_queries_lock_);
diff --git a/be/src/service/workload-management.h
b/be/src/service/workload-management.h
index 8e196249d..868cebebd 100644
--- a/be/src/service/workload-management.h
+++ b/be/src/service/workload-management.h
@@ -22,17 +22,14 @@
#include <string>
#include <utility>
-#include <gflags/gflags.h>
-
#include "gen-cpp/SystemTables_types.h"
-#include "gen-cpp/Types_types.h"
+#include "kudu/util/version_util.h"
#include "service/query-state-record.h"
#include "util/string-util.h"
+#include "util/version-util.h"
namespace impala {
-namespace workload_management {
-
/// Struct defining the context for generating the sql DML that inserts
records into the
/// completed queries table.
struct FieldParserContext {
@@ -44,6 +41,10 @@ struct FieldParserContext {
StringStreamPop& s) : record(rec), cluster_id(cluster_id), sql(s) {}
}; // struct FieldParserContext
+/// Constants for all possible schema versions.
+const kudu::Version NO_TABLE_EXISTS = constructVersion(0, 0, 0);
+const kudu::Version VERSION_1_0_0 = constructVersion(1, 0, 0);
+
/// Type of a function that retrieves one piece of information from the
context and writes
/// it to the SQL statement that inserts rows into the completed queries table.
using FieldParser = void (*)(FieldParserContext&);
@@ -51,17 +52,31 @@ using FieldParser = void (*)(FieldParserContext&);
/// Contains all necessary information for the definition and parsing of a
single field
/// in workload management.
struct FieldDefinition {
- const TQueryTableColumn::type db_column;
- const TPrimitiveType::type db_column_type;
- const FieldParser parser;
- const int16_t precision;
- const int16_t scale;
-
- FieldDefinition(const TQueryTableColumn::type db_col,
- const TPrimitiveType::type db_col_type, const FieldParser fp,
- const int16_t precision = 0, const int16_t scale = 0) :
- db_column(std::move(db_col)), db_column_type(std::move(db_col_type)),
- parser(std::move(fp)), precision(precision), scale(scale) {}
+ public:
+ // Name of the database column.
+ const TQueryTableColumn::type db_column;
+
+ // Type of the database column.
+ const TPrimitiveType::type db_column_type;
+
+  // Function that will extract the column value from the provided
+  // FieldParserContext and will write that value into a sql statement.
+ const FieldParser parser;
+
+ // Specifies the first schema version where the column appears.
+ const kudu::Version schema_version;
+
+ // When column type is decimal, specifies the precision and scale for the
column.
+ const int16_t precision;
+ const int16_t scale;
+
+ FieldDefinition(const TQueryTableColumn::type db_col,
+ const TPrimitiveType::type db_col_type, const FieldParser fp,
+ const kudu::Version schema_ver, const int16_t precision = 0,
+ const int16_t scale = 0) :
+ db_column(std::move(db_col)), db_column_type(std::move(db_col_type)),
+ parser(std::move(fp)), schema_version(std::move(schema_ver)),
+ precision(precision), scale(scale) { }
}; // struct FieldDefinition
/// Number of query table columns
@@ -100,6 +115,4 @@ struct CompletedQuery {
}
};
-} //namespace workload_management
-
} // namespace impala
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index dfd51a919..16a6e0fe9 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -113,6 +113,7 @@ set(UTIL_SRCS
tuple-row-compare.cc
uid-util.cc
url-parser.cc
+ version-util.cc
${SQUEASEL_SRC_DIR}/squeasel.c
webserver.cc
zip-util.cc
@@ -235,6 +236,7 @@ add_library(UtilTests STATIC
time-test.cc
tuple-row-compare-test.cc
uid-util-test.cc
+ version-util-test.cc
zip-util-test.cc
)
add_dependencies(UtilTests gen-deps)
@@ -299,6 +301,7 @@ ADD_UNIFIED_BE_LSAN_TEST(ticker-test "TickerTest.*")
ADD_UNIFIED_BE_LSAN_TEST(time-test "TimeTest.*")
ADD_UNIFIED_BE_LSAN_TEST(tuple-row-compare-test "TupleRowCompareTest.*")
ADD_UNIFIED_BE_LSAN_TEST(uid-util-test "UidUtil.*")
+ADD_UNIFIED_BE_LSAN_TEST(version-util-test "VersionUtilTest.*")
# Using standalone webserver-test for now, nonstandard main() passes in a port.
ADD_BE_LSAN_TEST(webserver-test)
TARGET_LINK_LIBRARIES(webserver-test mini_kdc)
diff --git a/be/src/util/version-util-test.cc b/be/src/util/version-util-test.cc
new file mode 100644
index 000000000..6761ed8c3
--- /dev/null
+++ b/be/src/util/version-util-test.cc
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/version-util.h"
+
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/version_util.h"
+
+namespace kudu {
+
+// Assert the overloaded equality operators '==' and '!='.
+TEST(VersionUtilTest, EqualityOperators) {
+ Version lhs;
+ Version rhs;
+
+ ASSERT_OK(ParseVersion("1.0.0", &lhs));
+ ASSERT_OK(ParseVersion("1.0.0", &rhs));
+ ASSERT_TRUE(lhs == rhs);
+ ASSERT_FALSE(lhs != rhs);
+
+ ASSERT_OK(ParseVersion("1.0.0-SNAPSHOT", &lhs));
+ ASSERT_OK(ParseVersion("1.0.0-SNAPSHOT", &rhs));
+ ASSERT_TRUE(lhs == rhs);
+ ASSERT_FALSE(lhs != rhs);
+
+ ASSERT_OK(ParseVersion("1.1.0", &lhs));
+ ASSERT_OK(ParseVersion("1.0.0", &rhs));
+ ASSERT_FALSE(lhs == rhs);
+ ASSERT_TRUE(lhs != rhs);
+
+ ASSERT_OK(ParseVersion("1.0.1", &lhs));
+ ASSERT_OK(ParseVersion("1.0.0", &rhs));
+ ASSERT_FALSE(lhs == rhs);
+ ASSERT_TRUE(lhs != rhs);
+
+ ASSERT_OK(ParseVersion("1.0.1-SNAPSHOT", &lhs));
+ ASSERT_OK(ParseVersion("1.0.0-RELEASE", &rhs));
+ ASSERT_FALSE(lhs == rhs);
+ ASSERT_TRUE(lhs != rhs);
+
+ ASSERT_OK(ParseVersion("1.0.0-SNAPSHOT", &lhs));
+ ASSERT_OK(ParseVersion("1.0.0", &rhs));
+ ASSERT_FALSE(lhs == rhs);
+ ASSERT_TRUE(lhs != rhs);
+}
+
+// Asserts the overloaded less than, less than or equal to, and greater than
+// operators.
+TEST(VersionUtilTest, OperatorLessThan) {
+ Version lhs;
+ Version rhs;
+
+ ASSERT_OK(ParseVersion("2.0.0", &lhs));
+ ASSERT_OK(ParseVersion("1.0.0", &rhs));
+ ASSERT_FALSE(lhs < rhs);
+ ASSERT_FALSE(lhs <= rhs);
+ ASSERT_TRUE(lhs > rhs);
+
+ ASSERT_OK(ParseVersion("1.0.0", &lhs));
+ ASSERT_OK(ParseVersion("1.0.0", &rhs));
+ ASSERT_FALSE(lhs < rhs);
+ ASSERT_TRUE(lhs <= rhs);
+ ASSERT_FALSE(lhs > rhs);
+
+ ASSERT_OK(ParseVersion("1.0.0-SNAPSHOT", &lhs));
+ ASSERT_OK(ParseVersion("1.0.0-SNAPSHOT", &rhs));
+ ASSERT_FALSE(lhs < rhs);
+ ASSERT_TRUE(lhs <= rhs);
+ ASSERT_FALSE(lhs > rhs);
+
+ ASSERT_OK(ParseVersion("1.0.0", &lhs));
+ ASSERT_OK(ParseVersion("2.0.0", &rhs));
+ ASSERT_TRUE(lhs < rhs);
+ ASSERT_TRUE(lhs <= rhs);
+ ASSERT_FALSE(lhs > rhs);
+
+ ASSERT_OK(ParseVersion("1.0.0", &lhs));
+ ASSERT_OK(ParseVersion("1.1.0", &rhs));
+ ASSERT_TRUE(lhs < rhs);
+ ASSERT_TRUE(lhs <= rhs);
+ ASSERT_FALSE(lhs > rhs);
+
+ ASSERT_OK(ParseVersion("1.0.0", &lhs));
+ ASSERT_OK(ParseVersion("1.0.1", &rhs));
+ ASSERT_TRUE(lhs < rhs);
+ ASSERT_TRUE(lhs <= rhs);
+ ASSERT_FALSE(lhs > rhs);
+
+ ASSERT_OK(ParseVersion("1.0.0-RELEASE", &lhs));
+ ASSERT_OK(ParseVersion("1.0.0", &rhs));
+ ASSERT_TRUE(lhs < rhs);
+ ASSERT_TRUE(lhs <= rhs);
+ ASSERT_FALSE(lhs > rhs);
+
+ ASSERT_OK(ParseVersion("1.0.0", &lhs));
+ ASSERT_OK(ParseVersion("1.0.0-RELEASE", &rhs));
+ ASSERT_FALSE(lhs < rhs);
+ ASSERT_FALSE(lhs <= rhs);
+ ASSERT_TRUE(lhs > rhs);
+
+ // Asserts the example provided in the comments in version-util.h.
+ // 1.0.0-RELEASE, 1.0.0-SNAPSHOT, 1.0.0, 1.0.1-SNAPSHOT
+ ASSERT_OK(ParseVersion("1.0.0-RELEASE", &lhs));
+ ASSERT_OK(ParseVersion("1.0.0-SNAPSHOT", &rhs));
+ ASSERT_TRUE(lhs < rhs);
+
+ ASSERT_OK(ParseVersion("1.0.0-SNAPSHOT", &lhs));
+ ASSERT_OK(ParseVersion("1.0.0", &rhs));
+ ASSERT_TRUE(lhs < rhs);
+
+ ASSERT_OK(ParseVersion("1.0.0", &lhs));
+ ASSERT_OK(ParseVersion("1.0.1-SNAPSHOT", &rhs));
+ ASSERT_TRUE(lhs < rhs);
+}
+
+} // namespace kudu
diff --git a/be/src/util/version-util.cc b/be/src/util/version-util.cc
new file mode 100644
index 000000000..6982540e3
--- /dev/null
+++ b/be/src/util/version-util.cc
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/version-util.h"
+
+#include "kudu/util/version_util.h"
+
+namespace kudu {
+
+bool operator<(const Version& lhs, const Version& rhs) {
+ bool num_compare = lhs.major < rhs.major || lhs.minor < rhs.minor
+ || lhs.maintenance < rhs.maintenance;
+
+ if (num_compare) {
+ return true;
+ }
+
+ if (lhs.extra_delimiter == rhs.extra_delimiter) {
+ return lhs.extra < rhs.extra;
+ }
+
+ return lhs.extra_delimiter.has_value();
+}
+
+bool operator<=(const Version& lhs, const Version& rhs) {
+ return lhs == rhs || lhs < rhs;
+}
+
+bool operator>(const Version& lhs, const Version& rhs) {
+ return !(lhs <= rhs);
+}
+
// Inequality, defined as the negation of the equality operator supplied by
// kudu's version_util.
bool operator!=(const Version& lhs, const Version& rhs) {
  return !(lhs == rhs);
}
+
+} // namespace kudu
+
+namespace impala {
+
+kudu::Version constructVersion(int maj, int min, int maint) {
+ kudu::Version v;
+
+ v.major = maj;
+ v.minor = min;
+ v.maintenance = maint;
+
+ return v;
+}
+
+} // namespace impala
diff --git a/be/src/util/version-util.h b/be/src/util/version-util.h
new file mode 100644
index 000000000..7175c5626
--- /dev/null
+++ b/be/src/util/version-util.h
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "kudu/util/version_util.h"
+
+namespace kudu {
+
+// Adds additional comparison operators to the kudu::Version class.
+
+// Compare two Version objects. Versions that contain an extra component sort
before
+// (less than) versions that do not contain an extra component. The extra
component is
+// sorted alphabetically without modifying the case. Thus '-SNAPSHOT' sorts as
greater
+// than '-RELEASE'.
+// Example sort order (from least to greatest):
+// 1.0.0-RELEASE, 1.0.0-SNAPSHOT, 1.0.0, 1.0.1-SNAPSHOT.
+bool operator<(const Version& lhs, const Version& rhs);
+bool operator<=(const Version& lhs, const Version& rhs);
+bool operator>(const Version& lhs, const Version& rhs);
+bool operator!=(const Version& lhs, const Version& rhs);
+
+} // namespace kudu
+
+namespace impala {
+
+// Builds a kudu::Version with the given major/minor/maintenance components;
+// all other members are default-initialized.
+kudu::Version constructVersion(int maj, int min, int maint);
+
+}
diff --git a/tests/custom_cluster/test_query_live.py
b/tests/custom_cluster/test_query_live.py
index 8820f4270..3a2595cd0 100644
--- a/tests/custom_cluster/test_query_live.py
+++ b/tests/custom_cluster/test_query_live.py
@@ -37,12 +37,8 @@ class TestQueryLive(CustomClusterTestSuite):
def setup_method(self, method):
super(TestQueryLive, self).setup_method(method)
- create_match = self.assert_impalad_log_contains("INFO",
r'\]\s+(\w+:\w+)\]\s+'
- r'Analyzing query: CREATE EXTERNAL TABLE IF NOT EXISTS
sys.impala_query_live',
- timeout_s=60)
- self.assert_impalad_log_contains("INFO", r'Query successfully
unregistered: '
- r'query_id={}'.format(create_match.group(1)),
- timeout_s=60)
+ self.assert_impalad_log_contains("INFO", r'Completed workload management '
+ r'initialization', timeout_s=120)
def assert_describe_extended(self):
describe_ext_result = self.execute_query('describe extended
sys.impala_query_live')
diff --git a/tests/custom_cluster/test_query_log.py
b/tests/custom_cluster/test_query_log.py
index 037104ca3..bce0a0eab 100644
--- a/tests/custom_cluster/test_query_log.py
+++ b/tests/custom_cluster/test_query_log.py
@@ -59,12 +59,8 @@ class TestQueryLogTableBase(CustomClusterTestSuite):
# These tests run very quickly and can actually complete before Impala has
finished
# creating the completed queries table. Thus, to make these tests more
robust, this
# code checks to make sure the table create has finished before returning.
- create_match = self.assert_impalad_log_contains("INFO",
r'\]\s+(\w+:\w+)\]\s+'
- r'Analyzing query: CREATE TABLE IF NOT EXISTS
{}'.format(self.QUERY_TBL),
- timeout_s=60)
- self.assert_impalad_log_contains("INFO", r'Query successfully
unregistered: '
- r'query_id={}'.format(create_match.group(1)),
- timeout_s=60)
+ self.assert_impalad_log_contains("INFO", r'Completed workload management '
+ r'initialization', timeout_s=120)
def get_client(self, protocol):
"""Retrieves the default Impala client for the specified protocol. This
client is
@@ -314,7 +310,7 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
impalad_graceful_shutdown=True)
def test_flush_on_queued_count_exceeded(self, vector):
"""Asserts that queries that have completed are written to the query log
table when
- the maximum number of queued records it reached. Also verifies that
writing
+ the maximum number of queued records is reached. Also verifies that
writing
completed queries is not limited by default
statement_expression_limit."""
impalad = self.cluster.get_first_impalad()