This is an automated email from the ASF dual-hosted git repository.
michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new ba32d7089 IMPALA-13012: Lower default query_log_max_queued
ba32d7089 is described below
commit ba32d70891fd68c5c1234ed543b74c51661bf272
Author: Michael Smith <[email protected]>
AuthorDate: Wed Apr 17 16:54:00 2024 -0700
IMPALA-13012: Lower default query_log_max_queued
Lowers the query_log_max_queued default from 10000 to 5000 so that
query_log_max_queued * num_columns(49) < statement_expression_limit
(5000 * 49 = 245000 < 250000), to avoid triggering errors such as
AnalysisException: Exceeded the statement expression limit (250000)
Statement has 370039 expressions.
Also raises statement_expression_limit on the insert statement, sized to
the actual query, to avoid the same error if query_log_max_queued is set
to a larger value.
Logs the time taken to write to the completed queries table to help with
debugging, and adds the histogram
"impala-server.completed-queries.write-durations".
Fixes InternalServer so it uses 'default_query_options'; its methods now
take a QueryOptionMap of explicitly set options instead of a TQueryOptions
struct.
Change-Id: I6535675307d88cb65ba7d908f3c692e0cf3259d7
Reviewed-on: http://gerrit.cloudera.org:8080/21351
Reviewed-by: Michael Smith <[email protected]>
Tested-by: Michael Smith <[email protected]>
Reviewed-by: Riza Suminto <[email protected]>
---
be/src/service/impala-server.h | 7 ++--
be/src/service/internal-server-test.cc | 36 ++++++++----------
be/src/service/internal-server.cc | 24 +++++-------
be/src/service/internal-server.h | 10 +++--
be/src/service/query-options.cc | 7 +++-
be/src/service/query-options.h | 6 +++
be/src/service/workload-management-flags.cc | 2 +-
be/src/service/workload-management.cc | 57 +++++++++++++++++++----------
be/src/util/impalad-metrics.cc | 6 +++
be/src/util/impalad-metrics.h | 4 ++
common/thrift/SystemTables.thrift | 2 +
common/thrift/metrics.json | 12 +++++-
tests/custom_cluster/test_query_log.py | 13 ++++---
13 files changed, 115 insertions(+), 71 deletions(-)
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index e3562959d..b1ef8fa4d 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -410,18 +410,17 @@ class ImpalaServer : public ImpalaServiceIf,
/// InternalServer methods, see internal-server.h for details
virtual Status OpenSession(const std::string& user_name, TUniqueId&
new_session_id,
- const TQueryOptions& query_opts = TQueryOptions());
+ const QueryOptionMap& query_opts = {});
virtual bool CloseSession(const impala::TUniqueId& session_id);
virtual Status ExecuteIgnoreResults(const std::string& user_name,
- const std::string& sql, const TQueryOptions& query_opts =
TQueryOptions(),
+ const std::string& sql, const QueryOptionMap& query_opts = {},
const bool persist_in_db = true, TUniqueId* query_id = nullptr);
virtual Status ExecuteAndFetchAllText(const std::string& user_name,
const std::string& sql, query_results& results, results_columns* columns
= nullptr,
TUniqueId* query_id = nullptr);
virtual Status SubmitAndWait(const std::string& user_name, const
std::string& sql,
TUniqueId& new_session_id, TUniqueId& new_query_id,
- const TQueryOptions& query_opts = TQueryOptions(),
- const bool persist_in_db = true);
+ const QueryOptionMap& query_opts = {}, const bool persist_in_db = true);
virtual Status WaitForResults(TUniqueId& query_id);
virtual Status SubmitQuery(const std::string& sql, const impala::TUniqueId&
session_id,
TUniqueId& new_query_id, const bool persist_in_db = true);
diff --git a/be/src/service/internal-server-test.cc
b/be/src/service/internal-server-test.cc
index c1cb1ab79..ecc184283 100644
--- a/be/src/service/internal-server-test.cc
+++ b/be/src/service/internal-server-test.cc
@@ -130,7 +130,7 @@ class DatabaseTest {
TUniqueId query_id;
EXPECT_OK(impala_server_->ExecuteIgnoreResults("impala", StrCat("create
database ",
database_name_, " comment 'Temporary database created and managed by
"
- "internal-server-test'"), TQueryOptions(), false, &query_id));
+ "internal-server-test'"), {}, false, &query_id));
assertQueryState(query_id, QUERY_STATE_SUCCESS);
if (create_table) {
@@ -138,7 +138,7 @@ class DatabaseTest {
EXPECT_OK(impala_server_->ExecuteIgnoreResults("impala",
StrCat("create table ",
table_name_, "(id INT, name STRING, first_sold TIMESTAMP, "
"last_sold TIMESTAMP, price DECIMAL(30, 2)) partitioned by
(category INT)"),
- TQueryOptions(), false, &query_id));
+ {}, false, &query_id));
assertQueryState(query_id, QUERY_STATE_SUCCESS);
// Insert some products that have a last_sold time.
@@ -162,7 +162,7 @@ class DatabaseTest {
}
}
- EXPECT_OK(impala_server_->ExecuteIgnoreResults("impala", sql1,
TQueryOptions(),
+ EXPECT_OK(impala_server_->ExecuteIgnoreResults("impala", sql1, {},
false, &query_id));
assertQueryState(query_id, QUERY_STATE_SUCCESS);
@@ -183,7 +183,7 @@ class DatabaseTest {
}
}
- EXPECT_OK(impala_server_->ExecuteIgnoreResults("impala", sql2,
TQueryOptions(),
+ EXPECT_OK(impala_server_->ExecuteIgnoreResults("impala", sql2, {},
false, &query_id));
assertQueryState(query_id, QUERY_STATE_SUCCESS);
}
@@ -221,13 +221,11 @@ TEST(InternalServerTest, QueryTimeout) {
DatabaseTest db_test = DatabaseTest(impala_server_, "query_timeout", true,
5);
InternalServer* fixture = impala_server_.get();
- TQueryOptions query_opts;
- query_opts.__set_fetch_rows_timeout_ms(1);
-
TUniqueId session_id;
TUniqueId query_id;
- ASSERT_OK(fixture->OpenSession("impala", session_id, query_opts));
+ ASSERT_OK(fixture->OpenSession("impala", session_id,
+ {{TImpalaQueryOptions::FETCH_ROWS_TIMEOUT_MS, "1"}}));
// Run a query that will execute for longer than the configured exec timeout.
ASSERT_OK(fixture->SubmitQuery(StrCat("select * from ",
db_test.GetTableName(),
@@ -249,12 +247,10 @@ TEST(InternalServerTest, QueryTimeout) {
// Asserts the expected error is returned when a query option is set to an
invalid value.
TEST(InternalServerTest, InvalidQueryOption) {
InternalServer* fixture = impala_server_.get();
- TQueryOptions query_opts;
-
- query_opts.__set_mem_limit_executors(-2);
TUniqueId session_id;
- Status stat = fixture->OpenSession("impala", session_id, query_opts);
+ Status stat = fixture->OpenSession("impala", session_id,
+ {{TImpalaQueryOptions::MEM_LIMIT_EXECUTORS, "-2"}});
ASSERT_FALSE(stat.ok());
ASSERT_EQ("Failed to parse query option 'MEM_LIMIT_EXECUTORS': -2",
stat.msg().msg());
@@ -281,7 +277,7 @@ TEST(InternalServerTest, MultipleQueriesMultipleSessions) {
// Insert a record into the test table using a new session.
ASSERT_OK(fixture->ExecuteIgnoreResults("impala", StrCat("insert into ",
test_table_name, "(id,first_name,last_name) VALUES
(1,'test','person1')"),
- TQueryOptions(), false, &query_id));
+ {}, false, &query_id));
assertQueryState(query_id, QUERY_STATE_SUCCESS);
// Select a record from the test table using a new session.
@@ -317,10 +313,8 @@ TEST(InternalServerTest, RetryFailedQuery) {
StrCat("IMPALA_SERVICE_POOL:127.0.0.1:",FLAGS_krpc_port,
":ExecQueryFInstances:FAIL"));
- TQueryOptions query_opts;
- query_opts.__set_retry_failed_queries(true);
-
- ASSERT_OK(fixture->OpenSession("impala", session_id, query_opts));
+ ASSERT_OK(fixture->OpenSession("impala", session_id,
+ {{TImpalaQueryOptions::RETRY_FAILED_QUERIES, "true"}}));
// Run a query that will fail and get automatically retried.
ASSERT_OK(fixture->SubmitQuery("select 1", session_id, query_id));
@@ -415,7 +409,7 @@ TEST(InternalServerTest, MissingClosingQuote) {
const string expected_msg = "ParseException: Unmatched string literal";
res = fixture->ExecuteIgnoreResults("impala",StrCat( "select * from ",
- db_test.GetTableName(), " where name = 'foo"), TQueryOptions(), false,
&query_id);
+ db_test.GetTableName(), " where name = 'foo"), {}, false, &query_id);
EXPECT_EQ(TErrorCode::GENERAL, res.code());
EXPECT_EQ(expected_msg, res.msg().msg().substr(0, expected_msg.length()));
EXPECT_EQ(TUniqueId(), query_id);
@@ -429,7 +423,7 @@ TEST(InternalServerTest, SyntaxError) {
const string expected_msg = "ParseException: Syntax error in line 1";
res = fixture->ExecuteIgnoreResults("impala", StrCat("select * from ",
- db_test.GetTableName(), "; select"), TQueryOptions(), false, &query_id);
+ db_test.GetTableName(), "; select"), {}, false, &query_id);
EXPECT_EQ(TErrorCode::GENERAL, res.code());
EXPECT_EQ(expected_msg, res.msg().msg().substr(0, expected_msg.length()));
EXPECT_EQ(TUniqueId(), query_id);
@@ -441,7 +435,7 @@ TEST(InternalServerTest, UnclosedComment) {
Status res;
const string expected_msg = "ParseException: Syntax error in line 1";
- res = fixture->ExecuteIgnoreResults("impala", "select 1 /*foo",
TQueryOptions(), false,
+ res = fixture->ExecuteIgnoreResults("impala", "select 1 /*foo", {}, false,
&query_id);
EXPECT_EQ(TErrorCode::GENERAL, res.code());
EXPECT_EQ(expected_msg, res.msg().msg().substr(0, expected_msg.length()));
@@ -459,7 +453,7 @@ TEST(InternalServerTest, TableNotExist) {
ASSERT_OK(fixture->ExecuteIgnoreResults("impala", StrCat("drop table ",
db_test.GetTableName(), " purge")));
res = fixture->ExecuteIgnoreResults("impala", StrCat("select * from ",
- db_test.GetTableName()), TQueryOptions(), false, &query_id);
+ db_test.GetTableName()), {}, false, &query_id);
EXPECT_EQ(TErrorCode::GENERAL, res.code());
EXPECT_EQ(expected_msg, res.msg().msg().substr(0, expected_msg.length()));
EXPECT_EQ(TUniqueId(), query_id);
diff --git a/be/src/service/internal-server.cc
b/be/src/service/internal-server.cc
index 829b79225..19c0ae5db 100644
--- a/be/src/service/internal-server.cc
+++ b/be/src/service/internal-server.cc
@@ -34,7 +34,7 @@ using namespace std;
namespace impala {
Status ImpalaServer::OpenSession(const string& user_name, TUniqueId&
new_session_id,
- const TQueryOptions& query_opts) {
+ const QueryOptionMap& query_opts) {
shared_ptr<ThriftServer::ConnectionContext> conn_ctx =
make_shared<ThriftServer::ConnectionContext>();
conn_ctx->connection_id = RandomUniqueID();
@@ -58,14 +58,11 @@ Status ImpalaServer::OpenSession(const string& user_name,
TUniqueId& new_session
{
lock_guard<mutex> l(session_state_map_lock_);
session_state = session_state_map_[new_session_id];
- std::map<string, string> query_opts_map;
- TQueryOptionsToMap(query_opts, &query_opts_map);
- for (auto iter=query_opts_map.cbegin(); iter!=query_opts_map.cend();
iter++) {
- if (!iter->second.empty()) {
- RETURN_IF_ERROR(SetQueryOption(iter->first, iter->second,
+ }
+
+ for (const auto& iter : query_opts) {
+ RETURN_IF_ERROR(SetQueryOption(iter.first, iter.second,
&session_state->set_query_options,
&session_state->set_query_options_mask));
- }
- }
}
MarkSessionActive(session_state);
@@ -102,7 +99,7 @@ bool ImpalaServer::CloseSession(const TUniqueId& session_id)
{
} // ImpalaServer::CloseSession
Status ImpalaServer::ExecuteIgnoreResults(const string& user_name, const
string& sql,
- const TQueryOptions& query_opts, const bool persist_in_db, TUniqueId*
query_id) {
+ const QueryOptionMap& query_opts, const bool persist_in_db, TUniqueId*
query_id) {
TUniqueId session_id;
TUniqueId internal_query_id;
Status result;
@@ -130,7 +127,7 @@ Status ImpalaServer::ExecuteAndFetchAllText(const
std::string& user_name,
TUniqueId internal_query_id;
Status result;
- result = SubmitAndWait(user_name, sql, session_id, internal_query_id,
TQueryOptions());
+ result = SubmitAndWait(user_name, sql, session_id, internal_query_id);
if (query_id != nullptr) {
*query_id = internal_query_id;
@@ -150,7 +147,7 @@ Status ImpalaServer::ExecuteAndFetchAllText(const
std::string& user_name,
} // ImpalaServer::ExecuteAndFetchAllText
Status ImpalaServer::SubmitAndWait(const string& user_name, const string& sql,
- TUniqueId& new_session_id, TUniqueId& new_query_id, const TQueryOptions&
query_opts,
+ TUniqueId& new_session_id, TUniqueId& new_query_id, const QueryOptionMap&
query_opts,
const bool persist_in_db) {
RETURN_IF_ERROR(OpenSession(user_name, new_session_id, query_opts));
@@ -200,11 +197,8 @@ Status ImpalaServer::SubmitQuery(const string& sql, const
TUniqueId& session_id,
session_state->ToThrift(session_state->session_id, &query_context.session);
- QueryOptionsMask set_query_options_mask;
query_context.client_request.query_options = session_state->QueryOptions();
- set_query_options_mask = session_state->set_query_options_mask;
-
- AddPoolConfiguration(&query_context, ~set_query_options_mask);
+ AddPoolConfiguration(&query_context, ~session_state->set_query_options_mask);
QueryHandle query_handle;
RETURN_IF_ERROR(Execute(&query_context, session_state, &query_handle,
nullptr,
diff --git a/be/src/service/internal-server.h b/be/src/service/internal-server.h
index aedaeb058..65e78bdee 100644
--- a/be/src/service/internal-server.h
+++ b/be/src/service/internal-server.h
@@ -17,6 +17,7 @@
#pragma once
+#include <map>
#include <memory>
#include <string>
#include <vector>
@@ -59,6 +60,8 @@ namespace impala {
public:
virtual ~InternalServer() {}
+ using QueryOptionMap = std::map<TImpalaQueryOptions::type, string>;
+
/// Creates and registers a new connection and session.
///
/// Parameters:
@@ -72,7 +75,7 @@ namespace impala {
/// Return:
/// `impala::Status` Indicates the result of opening the new session.
virtual Status OpenSession(const std::string& user_name, TUniqueId&
new_session_id,
- const TQueryOptions& query_opts = TQueryOptions()) = 0;
+ const QueryOptionMap& query_opts = {}) = 0;
/// Closes a given session cleaning up all associated resources.
///
@@ -105,7 +108,7 @@ namespace impala {
/// `impala::Status` Indicates the result of submitting the query and
waiting for
/// it to return.
virtual Status ExecuteIgnoreResults(const std::string& user_name,
- const std::string& sql, const TQueryOptions& query_opts =
TQueryOptions(),
+ const std::string& sql, const QueryOptionMap& query_opts = {},
const bool persist_in_db = true, TUniqueId* query_id = nullptr) = 0;
/// Creates a new session under the specified user and submits a query
under that
@@ -162,8 +165,7 @@ namespace impala {
/// `impala::Status` Indicates the result of submitting and waiting
for the query.
virtual Status SubmitAndWait(const std::string& user_name, const
std::string& sql,
TUniqueId& new_session_id, TUniqueId& new_query_id,
- const TQueryOptions& query_opts = TQueryOptions(),
- const bool persist_in_db = true) = 0;
+ const QueryOptionMap& query_opts = {}, const bool persist_in_db =
true) = 0;
/// Waits until the given query has results available.
///
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 10b4f3fbc..7f52e943c 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -231,9 +231,13 @@ Status impala::SetQueryOption(const string& key, const
string& value,
if (option_int < 0) {
return Status(Substitute("Invalid query option: $0", key));
}
+ return SetQueryOption(static_cast<TImpalaQueryOptions::type>(option_int),
+ value, query_options, set_query_options_mask);
+}
+Status impala::SetQueryOption(TImpalaQueryOptions::type option, const string&
value,
+ TQueryOptions* query_options, QueryOptionsMask* set_query_options_mask) {
QueryConstants qc;
- TImpalaQueryOptions::type option =
static_cast<TImpalaQueryOptions::type>(option_int);
if (value.empty()) {
ResetQueryOption(option, query_options);
@@ -1286,6 +1290,7 @@ Status impala::SetQueryOption(const string& key, const
string& value,
break;
}
default:
+ string key = to_string(option);
if (IsRemovedQueryOption(key)) {
LOG(WARNING) << "Ignoring attempt to set removed query option '" <<
key << "'";
return Status::OK();
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 127f1ddb2..e9e23a38a 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -371,6 +371,12 @@ void OverlayQueryOptions(const TQueryOptions& src, const
QueryOptionsMask& mask,
Status SetQueryOption(const std::string& key, const std::string& value,
TQueryOptions* query_options, QueryOptionsMask* set_query_options_mask);
+/// Set the option/value pair in TQueryOptions. It will override existing setting in
+/// query_options. The bit corresponding to 'option' in set_query_options_mask
+/// is set. An empty string value will reset the option to its default value.
+Status SetQueryOption(TImpalaQueryOptions::type option, const std::string&
value,
+ TQueryOptions* query_options, QueryOptionsMask* set_query_options_mask);
+
/// Validates the query options after they have all been set. Returns a Status
indicating
/// the results of running the validation rules. The majority of the query
options
/// validation is done in SetQueryOption. However, more complex validations
rules (e.g.
diff --git a/be/src/service/workload-management-flags.cc
b/be/src/service/workload-management-flags.cc
index 81ec53bf1..b71bef2cc 100644
--- a/be/src/service/workload-management-flags.cc
+++ b/be/src/service/workload-management-flags.cc
@@ -79,7 +79,7 @@ DEFINE_int32_hidden(query_log_write_timeout_s, 0, "Specifies
the query timeout i
"seconds for inserts to the query log table. A value less than 1 indicates
to use "
"the same value as the query_log_write_interval_s flag.");
-DEFINE_int32(query_log_max_queued, 10000, "Maximum number of records that can
be queued "
+DEFINE_int32(query_log_max_queued, 5000, "Maximum number of records that can
be queued "
"before they are written to the impala query log table. This flag operates
"
"independently of the 'query_log_write_interval_s' flag. If the number of
queued "
"records reaches this value, the records will be written to the query log
table no "
diff --git a/be/src/service/workload-management.cc
b/be/src/service/workload-management.cc
index ca570e787..cf88365d3 100644
--- a/be/src/service/workload-management.cc
+++ b/be/src/service/workload-management.cc
@@ -45,8 +45,11 @@
#include "service/internal-server.h"
#include "service/query-state-record.h"
#include "util/debug-util.h"
+#include "util/histogram-metric.h"
#include "util/impalad-metrics.h"
#include "util/metrics.h"
+#include "util/pretty-printer.h"
+#include "util/stopwatch.h"
#include "util/string-util.h"
#include "util/thread.h"
#include "util/ticker.h"
@@ -78,7 +81,7 @@ static const string DB = "sys";
/// Default query options that will be provided on all queries that insert
rows into the
/// completed queries table. See the initialization code in the
/// ImpalaServer::CompletedQueriesThread function for details on which options
are set.
-static TQueryOptions insert_query_opts;
+static InternalServer::QueryOptionMap insert_query_opts;
/// Non-values portion of the sql DML to insert records into the completed
queries table.
/// Generates the first portion of the DML that inserts records into the
completed queries
@@ -98,11 +101,11 @@ static inline bool MaxRecordsExceeded(size_t record_count)
noexcept {
/// Sets up the sys database generating and executing the necessary DML
statements.
static const Status SetupDb(InternalServer* server) {
- insert_query_opts.__set_sync_ddl(true);
+ insert_query_opts[TImpalaQueryOptions::SYNC_DDL] = "true";
RETURN_IF_ERROR(server->ExecuteIgnoreResults(FLAGS_workload_mgmt_user,
StrCat("CREATE DATABASE IF NOT EXISTS ", DB, " COMMENT "
"'System database for Impala introspection'"), insert_query_opts,
false));
- insert_query_opts.__set_sync_ddl(false);
+ insert_query_opts[TImpalaQueryOptions::SYNC_DDL] = "false";
return Status::OK();
} // function SetupDb
@@ -116,7 +119,7 @@ static string GetColumnName(const FieldDefinition& field) {
/// Sets up the query table by generating and executing the necessary DML
statements.
static const Status SetupTable(InternalServer* server, const string&
table_name,
bool is_system_table = false) {
- insert_query_opts.__set_sync_ddl(true);
+ insert_query_opts[TImpalaQueryOptions::SYNC_DDL] = "true";
StringStreamPop create_table_sql;
create_table_sql << "CREATE TABLE IF NOT EXISTS " << table_name << "(";
@@ -157,7 +160,7 @@ static const Status SetupTable(InternalServer* server,
const string& table_name,
RETURN_IF_ERROR(server->ExecuteIgnoreResults(FLAGS_workload_mgmt_user,
create_table_sql.str(), insert_query_opts, false));
- insert_query_opts.__set_sync_ddl(false);
+ insert_query_opts[TImpalaQueryOptions::SYNC_DDL] = "false";
LOG(INFO) << "Completed " << table_name << " initialization.
write_interval=\"" <<
FLAGS_query_log_write_interval_s << "s\"";
@@ -320,11 +323,12 @@ void ImpalaServer::CompletedQueriesThread() {
}
// Setup default query options.
- insert_query_opts.__set_timezone("UTC");
- insert_query_opts.__set_query_timeout_s((FLAGS_query_log_write_timeout_s < 1
?
- FLAGS_query_log_write_interval_s : FLAGS_query_log_write_timeout_s));
+ insert_query_opts[TImpalaQueryOptions::TIMEZONE] = "UTC";
+ insert_query_opts[TImpalaQueryOptions::QUERY_TIMEOUT_S] = std::to_string(
+ FLAGS_query_log_write_timeout_s < 1 ?
+ FLAGS_query_log_write_interval_s : FLAGS_query_log_write_timeout_s);
if (!FLAGS_query_log_request_pool.empty()) {
- insert_query_opts.__set_request_pool(FLAGS_query_log_request_pool);
+ insert_query_opts[TImpalaQueryOptions::REQUEST_POOL] =
FLAGS_query_log_request_pool;
}
// Fully qualified table name based on startup flags.
@@ -388,9 +392,6 @@ void ImpalaServer::CompletedQueriesThread() {
});
completed_queries_ticker_->ResetWakeupGuard();
- // transfer all currently queued completed queries to another list for
processing
- // so that the completed queries queue is not blocked while creating and
executing the
- // DML to insert into the query log table
if (!completed_queries_.empty()) {
if (MaxRecordsExceeded(completed_queries_.size())) {
ImpaladMetrics::COMPLETED_QUERIES_MAX_RECORDS_WRITES->Increment(1L);
@@ -398,6 +399,9 @@ void ImpalaServer::CompletedQueriesThread() {
ImpaladMetrics::COMPLETED_QUERIES_SCHEDULED_WRITES->Increment(1L);
}
+ MonotonicStopWatch timer;
+ timer.Start();
+
// Copy all completed queries to a temporary list so that inserts to the
// completed_queries list are not blocked while generating and running
an insert
// SQL statement for the completed queries.
@@ -440,10 +444,11 @@ void ImpalaServer::CompletedQueriesThread() {
sql.pop_back();
const size_t final_sql_len = _insert_dml.size() + sql.size();
+ uint64_t gather_time = timer.Reset();
TUniqueId tmp_query_id;
// Build query options to ensure the query is not rejected.
- TQueryOptions opts = insert_query_opts;
+ InternalServer::QueryOptionMap opts = insert_query_opts;
if (UNLIKELY(final_sql_len > numeric_limits<int32_t>::max())) {
LOG(ERROR) << "Completed queries table insert sql statement of
length '" <<
@@ -452,26 +457,40 @@ void ImpalaServer::CompletedQueriesThread() {
continue; // NOTE: early loop continuation
}
- opts.__set_max_statement_length_bytes(final_sql_len < 1024 ? 1024 :
- final_sql_len);
- opts.__set_max_row_size(max_row_size);
+ // Set max_statement_length_bytes based on actual query, and at least
the minimum.
+ opts[TImpalaQueryOptions::MAX_STATEMENT_LENGTH_BYTES] = std::to_string(
+ max<size_t>(MIN_MAX_STATEMENT_LENGTH_BYTES, final_sql_len));
+ // Set statement_expression_limit based on actual query, and at least
the minimum.
+ opts[TImpalaQueryOptions::STATEMENT_EXPRESSION_LIMIT] = std::to_string(
+ max<size_t>(MIN_STATEMENT_EXPRESSION_LIMIT,
+ queries_to_insert.size() *
_TQueryTableColumn_VALUES_TO_NAMES.size()));
+ opts[TImpalaQueryOptions::MAX_ROW_SIZE] = std::to_string(max_row_size);
// Execute the insert dml.
const Status ret_status = internal_server_->ExecuteIgnoreResults(
FLAGS_workload_mgmt_user, StrCat(_insert_dml, sql), opts, false,
&tmp_query_id);
+ uint64_t exec_time = timer.ElapsedTime();
+ ImpaladMetrics::COMPLETED_QUERIES_WRITE_DURATIONS->Update(
+ gather_time + exec_time);
if (ret_status.ok()) {
LOG(INFO) << "wrote completed queries table=\"" << log_table_name <<
"\" "
- "record_count=\"" << queries_to_insert.size() << "\"";
+ "record_count=" << queries_to_insert.size() << " "
+ "bytes=" << PrettyPrinter::PrintBytes(sql.size()) << " "
+ "gather_time=" << PrettyPrinter::Print(gather_time,
TUnit::TIME_NS) << " "
+ "exec_time=" << PrettyPrinter::Print(exec_time, TUnit::TIME_NS);
ImpaladMetrics::COMPLETED_QUERIES_QUEUED->Increment(
queries_to_insert.size() * -1);
DCHECK(ImpaladMetrics::COMPLETED_QUERIES_QUEUED->GetValue() >= 0);
ImpaladMetrics::COMPLETED_QUERIES_WRITTEN->Increment(
queries_to_insert.size());
} else {
- LOG(WARNING) << "failed to write completed queries table=\"" <<
- log_table_name << "\" record_count=\"" <<
queries_to_insert.size() << "\"";
+ LOG(WARNING) << "failed to write completed queries table=\"" <<
log_table_name
+ << "\" record_count=" << queries_to_insert.size() << " "
+ "bytes=" << PrettyPrinter::PrintBytes(sql.size()) << " "
+ "gather_time=" << PrettyPrinter::Print(gather_time,
TUnit::TIME_NS) << " "
+ "exec_time=" << PrettyPrinter::Print(exec_time, TUnit::TIME_NS);
LOG(WARNING) << ret_status.GetDetail();
ImpaladMetrics::COMPLETED_QUERIES_FAIL->Increment(queries_to_insert.size());
completed_queries_lock_.lock();
diff --git a/be/src/util/impalad-metrics.cc b/be/src/util/impalad-metrics.cc
index 2079755a4..2e85d76b2 100644
--- a/be/src/util/impalad-metrics.cc
+++ b/be/src/util/impalad-metrics.cc
@@ -176,6 +176,8 @@ const char*
ImpaladMetricKeys::COMPLETED_QUERIES_SCHEDULED_WRITES =
"impala-server.completed-queries.scheduled-writes";
const char* ImpaladMetricKeys::COMPLETED_QUERIES_MAX_RECORDS_WRITES =
"impala-server.completed-queries.max-records-writes";
+const char* ImpaladMetricKeys::COMPLETED_QUERIES_WRITE_DURATIONS =
+ "impala-server.completed-queries.write-durations";
const char* ImpaladMetricKeys::DEBUG_ACTION_NUM_FAIL =
"impala.debug_action.fail";
const char* ImpaladMetricKeys::QUERY_LOG_EST_TOTAL_BYTES =
"impala-server.query-log-est-total-bytes";
@@ -272,6 +274,7 @@ StringProperty* ImpaladMetrics::ACTIVE_CATALOGD_ADDRESS =
nullptr;
// Histograms
HistogramMetric* ImpaladMetrics::QUERY_DURATIONS = nullptr;
HistogramMetric* ImpaladMetrics::DDL_DURATIONS = nullptr;
+HistogramMetric* ImpaladMetrics::COMPLETED_QUERIES_WRITE_DURATIONS = nullptr;
// Other
StatsMetric<uint64_t, StatsType::MEAN>*
@@ -459,6 +462,9 @@ void ImpaladMetrics::CreateMetrics(MetricGroup* m) {
MetricDefs::Get(ImpaladMetricKeys::QUERY_DURATIONS), FIVE_HOURS_IN_MS,
3));
DDL_DURATIONS = m->RegisterMetric(new HistogramMetric(
MetricDefs::Get(ImpaladMetricKeys::DDL_DURATIONS), FIVE_HOURS_IN_MS, 3));
+ COMPLETED_QUERIES_WRITE_DURATIONS = m->RegisterMetric(new HistogramMetric(
+ MetricDefs::Get(ImpaladMetricKeys::COMPLETED_QUERIES_WRITE_DURATIONS),
+ FIVE_HOURS_IN_MS, 3));
// Initialize Hedged read metrics
HEDGED_READ_OPS = m->AddCounter(ImpaladMetricKeys::HEDGED_READ_OPS, 0);
diff --git a/be/src/util/impalad-metrics.h b/be/src/util/impalad-metrics.h
index c30ff0e53..2dcf2c568 100644
--- a/be/src/util/impalad-metrics.h
+++ b/be/src/util/impalad-metrics.h
@@ -284,6 +284,9 @@ class ImpaladMetricKeys {
/// Number of writes to the query log table that happened because the max
queued
/// completed queries records was reached.
static const char* COMPLETED_QUERIES_MAX_RECORDS_WRITES;
+
+ /// Time spent writing completed queries to the query log table.
+ static const char* COMPLETED_QUERIES_WRITE_DURATIONS;
};
/// Global impalad-wide metrics. This is useful for objects that want to
update metrics
@@ -383,6 +386,7 @@ class ImpaladMetrics {
// Histograms
static HistogramMetric* QUERY_DURATIONS;
static HistogramMetric* DDL_DURATIONS;
+ static HistogramMetric* COMPLETED_QUERIES_WRITE_DURATIONS;
// Other
static StatsMetric<uint64_t, StatsType::MEAN>*
IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO;
diff --git a/common/thrift/SystemTables.thrift
b/common/thrift/SystemTables.thrift
index a67282609..a66e27cc5 100644
--- a/common/thrift/SystemTables.thrift
+++ b/common/thrift/SystemTables.thrift
@@ -20,6 +20,8 @@ namespace java org.apache.impala.thrift
# Must be kept in-sync with workload-management-fields.cc
# Used as column names, so do not change existing enums.
+# When adding new columns, review the default for query_log_max_queued to
maintain
+# query_log_max_queued * len(TQueryTableColumn) <
statement_expression_limit(250k)
enum TQueryTableColumn {
CLUSTER_ID
QUERY_ID
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 5aee94d49..d77415dfc 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -3899,9 +3899,19 @@
"contexts": [
"IMPALAD"
],
- "label": "Max Records Hit",
+ "label": "Completed Queries Max Records Hit",
"units": "NONE",
"kind": "COUNTER",
"key": "impala-server.completed-queries.max-records-writes"
+ },
+ {
+ "description": "Time spent writing completed queries to the query log
table.",
+ "contexts": [
+ "IMPALAD"
+ ],
+ "label": "Completed Queries Write Duration Distribution",
+ "units": "TIME_NS",
+ "kind": "HISTOGRAM",
+ "key": "impala-server.completed-queries.write-durations"
}
]
diff --git a/tests/custom_cluster/test_query_log.py
b/tests/custom_cluster/test_query_log.py
index 9c393f60e..a81f92ca5 100644
--- a/tests/custom_cluster/test_query_log.py
+++ b/tests/custom_cluster/test_query_log.py
@@ -18,7 +18,6 @@
from __future__ import absolute_import, division, print_function
import os
-import pytest
import string
import tempfile
@@ -94,7 +93,7 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
MAX_SQL_PLAN_LEN = 2000
LOG_DIR_MAX_WRITES = tempfile.mkdtemp(prefix="max_writes")
FLUSH_MAX_RECORDS_CLUSTER_ID = "test_query_log_max_records_" +
str(int(time()))
- FLUSH_MAX_RECORDS_QUERY_COUNT = 2
+ FLUSH_MAX_RECORDS_QUERY_COUNT = 30
OTHER_TBL = "completed_queries_table_{0}".format(int(time()))
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
@@ -310,10 +309,13 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
.format(FLUSH_MAX_RECORDS_QUERY_COUNT,
FLUSH_MAX_RECORDS_CLUSTER_ID),
catalogd_args="--enable_workload_mgmt",
+ default_query_options=[
+ ('statement_expression_limit', 1024)],
impalad_graceful_shutdown=True)
def test_query_log_flush_max_records(self, vector):
"""Asserts that queries that have completed are written to the query log
table when
- the maximum number of queued records it reached."""
+ the maximum number of queued records is reached. Also verifies that writing
+ completed queries is not limited by the default statement_expression_limit."""
impalad = self.cluster.get_first_impalad()
client = self.get_client(vector.get_value('protocol'))
@@ -341,7 +343,8 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
impalad.service.wait_for_metric_value(
"impala-server.completed-queries.max-records-writes", 1, 60)
self.cluster.get_first_impalad().service.wait_for_metric_value(
- "impala-server.completed-queries.written", 3, 60)
+ "impala-server.completed-queries.written",
+ self.FLUSH_MAX_RECORDS_QUERY_COUNT + 1, 60)
# Force Impala to process the inserts to the completed queries table.
client.execute("refresh " + self.QUERY_TBL)
@@ -352,7 +355,7 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
.format(self.QUERY_TBL, rand_str))
assert res.success
assert 1 == len(res.data)
- assert "3" == res.data[0]
+ assert str(self.FLUSH_MAX_RECORDS_QUERY_COUNT + 1) == res.data[0]
impalad.service.wait_for_metric_value(
"impala-server.completed-queries.queued", 2, 60)