This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 0b1a10471 IMPALA-11998: Fix potential invalid iterator issue in
ImpalaServer::GetQueryRecord()
0b1a10471 is described below
commit 0b1a1047104b508ca72b84b1bf050dbcb339ff6f
Author: Eyizoha <[email protected]>
AuthorDate: Wed Mar 15 19:07:52 2023 +0800
IMPALA-11998: Fix potential invalid iterator issue in
ImpalaServer::GetQueryRecord()
The current ImpalaServer::GetQueryRecord() may cause the caller to
access an invalid iterator. Although the function holds query_log_lock_
while it executes, the query_record it returns is not guaranteed to stay
valid, because it is no longer protected by query_log_lock_ once the
function returns. If the corresponding record in query_log_index_ is
deleted at that point, query_record becomes an invalid iterator.
To fix that, this patch uses shared_ptr instead of the original
unique_ptr to extend the lifetime of query_record after GetQueryRecord()
returns. In addition, a new argument retried_query_record is added to
GetQueryRecord(). The motivation is to acquire query_log_lock_ only once
when getting both the original query record and the retried query record.
Change-Id: I1279558b0d0b2a3ba31a496c0acb84ac9d99f2f1
Reviewed-on: http://gerrit.cloudera.org:8080/19621
Reviewed-by: Quanlong Huang <[email protected]>
Reviewed-by: Wenzhe Zhou <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/service/impala-http-handler.cc | 18 +++----
be/src/service/impala-server.cc | 96 +++++++++++++++++------------------
be/src/service/impala-server.h | 22 +++++---
3 files changed, 69 insertions(+), 67 deletions(-)
diff --git a/be/src/service/impala-http-handler.cc
b/be/src/service/impala-http-handler.cc
index c1001f2e3..2545cd3ae 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -549,7 +549,7 @@ void ImpalaHttpHandler::QueryStateHandler(const
Webserver::WebRequest& req,
Value completed_queries(kArrayType);
{
lock_guard<mutex> l(server_->query_log_lock_);
- for (const unique_ptr<ImpalaServer::QueryStateRecord>& log_entry :
+ for (const shared_ptr<ImpalaServer::QueryStateRecord>& log_entry :
server_->query_log_) {
// Don't show duplicated entries between in-flight and completed queries.
if (in_flight_query_ids.find(log_entry->id) !=
in_flight_query_ids.end()) continue;
@@ -957,23 +957,21 @@ void ImpalaHttpHandler::QuerySummaryHandler(bool
include_json_plan, bool include
}
if (!found) {
- lock_guard<mutex> l(server_->query_log_lock_);
- ImpalaServer::QueryLogIndex::const_iterator query_record =
- server_->query_log_index_.find(query_id);
- if (query_record == server_->query_log_index_.end()) {
+ shared_ptr<ImpalaServer::QueryStateRecord> query_record;
+ if (!server_->GetQueryRecord(query_id, &query_record).ok()) {
const string& err = Substitute("Unknown query id: $0",
PrintId(query_id));
Value json_error(err.c_str(), document->GetAllocator());
document->AddMember("error", json_error, document->GetAllocator());
return;
}
if (include_json_plan || include_summary) {
- summary = query_record->second->exec_summary;
+ summary = query_record->exec_summary;
}
- stmt = query_record->second->stmt;
- plan = query_record->second->plan;
- query_status = query_record->second->query_status;
+ stmt = query_record->stmt;
+ plan = query_record->plan;
+ query_status = query_record->query_status;
if (include_json_plan) {
- fragments = query_record->second->fragments;
+ fragments = query_record->fragments;
}
}
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 1331ab5b1..93ca35585 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -739,17 +739,29 @@ Status ImpalaServer::GetRuntimeProfileOutput(const
string& user,
return Status::OK();
}
-Status ImpalaServer::GetQueryRecord(
- const TUniqueId& query_id, QueryLogIndex::const_iterator* query_record) {
+Status ImpalaServer::GetQueryRecord(const TUniqueId& query_id,
+ shared_ptr<QueryStateRecord>* query_record,
+ shared_ptr<QueryStateRecord>* retried_query_record) {
lock_guard<mutex> l(query_log_lock_);
- *query_record = query_log_index_.find(query_id);
- if (*query_record == query_log_index_.end()) {
+ auto iterator = query_log_index_.find(query_id);
+ if (iterator == query_log_index_.end()) {
// Common error, so logging explicitly and eliding Status's stack trace.
string err =
strings::Substitute(LEGACY_INVALID_QUERY_HANDLE_TEMPLATE,
PrintId(query_id));
VLOG(1) << err;
return Status::Expected(err);
}
+ *query_record = *(iterator->second);
+ if (retried_query_record != nullptr && (*query_record)->was_retried) {
+ DCHECK((*query_record)->retried_query_id != nullptr);
+ iterator = query_log_index_.find(*(*query_record)->retried_query_id);
+
+ // The record of the retried query should always be later in the query log
compared
+ // to the original query. Since the query log is a FIFO queue, this means
that if the
+ // original query is in the log, then the retried query must be in the log
as well.
+ DCHECK(iterator != query_log_index_.end());
+ *retried_query_record = *(iterator->second);
+ }
return Status::OK();
}
@@ -794,31 +806,22 @@ Status ImpalaServer::GetRuntimeProfileOutput(const
TUniqueId& query_id,
// The query was not found in the active query map, search the query log.
{
// Set the profile for the original query.
- QueryLogIndex::const_iterator query_record;
- RETURN_IF_ERROR(GetQueryRecord(query_id, &query_record));
- RETURN_IF_ERROR(CheckProfileAccess(user,
query_record->second->effective_user,
- query_record->second->user_has_profile_access));
+ shared_ptr<QueryStateRecord> query_record;
+ shared_ptr<QueryStateRecord> retried_query_record;
+ RETURN_IF_ERROR(GetQueryRecord(query_id, &query_record,
&retried_query_record));
+ RETURN_IF_ERROR(CheckProfileAccess(user, query_record->effective_user,
+ query_record->user_has_profile_access));
RETURN_IF_ERROR(DecompressToProfile(format, query_record,
original_profile));
// Set the profile for the retried query.
- if (query_record->second->was_retried) {
+ if (query_record->was_retried) {
*was_retried = true;
- DCHECK(query_record->second->retried_query_id != nullptr);
- QueryLogIndex::const_iterator retried_query_record;
-
- // The profile of the retried profile should always be earlier in the
query log
- // compared to the original profile. Since the query log is a FIFO
queue, this
- // means that if the original profile is in the log, then the retried
profile
- // must be in the log as well.
- Status status =
- GetQueryRecord(*query_record->second->retried_query_id,
&retried_query_record);
- DCHECK(status.ok());
- RETURN_IF_ERROR(status);
+ DCHECK(retried_query_record != nullptr);
// If the original profile was accessible by the user, then the retried
profile
// must be accessible by the user as well.
- status = CheckProfileAccess(user,
retried_query_record->second->effective_user,
- retried_query_record->second->user_has_profile_access);
+ Status status = CheckProfileAccess(user,
retried_query_record->effective_user,
+ retried_query_record->user_has_profile_access);
DCHECK(status.ok());
RETURN_IF_ERROR(status);
@@ -847,34 +850,34 @@ Status ImpalaServer::GetRuntimeProfileOutput(const
TUniqueId& query_id,
// The query was not found the active query map, search the query log.
{
- QueryLogIndex::const_iterator query_record =
query_log_index_.find(query_id);
+ shared_ptr<QueryStateRecord> query_record;
RETURN_IF_ERROR(GetQueryRecord(query_id, &query_record));
- RETURN_IF_ERROR(CheckProfileAccess(user,
query_record->second->effective_user,
- query_record->second->user_has_profile_access));
+ RETURN_IF_ERROR(CheckProfileAccess(user, query_record->effective_user,
+ query_record->user_has_profile_access));
RETURN_IF_ERROR(DecompressToProfile(format, query_record, profile));
}
return Status::OK();
}
Status ImpalaServer::DecompressToProfile(TRuntimeProfileFormat::type format,
- QueryLogIndex::const_iterator query_record, RuntimeProfileOutput* profile)
{
+ shared_ptr<QueryStateRecord> query_record, RuntimeProfileOutput* profile) {
if (format == TRuntimeProfileFormat::BASE64) {
- Base64Encode(query_record->second->compressed_profile,
profile->string_output);
+ Base64Encode(query_record->compressed_profile, profile->string_output);
} else if (format == TRuntimeProfileFormat::THRIFT) {
RETURN_IF_ERROR(RuntimeProfile::DecompressToThrift(
- query_record->second->compressed_profile, profile->thrift_output));
+ query_record->compressed_profile, profile->thrift_output));
} else if (format == TRuntimeProfileFormat::JSON) {
ObjectPool tmp_pool;
RuntimeProfile* tmp_profile;
RETURN_IF_ERROR(RuntimeProfile::DecompressToProfile(
- query_record->second->compressed_profile, &tmp_pool, &tmp_profile));
+ query_record->compressed_profile, &tmp_pool, &tmp_profile));
tmp_profile->ToJson(profile->json_output);
} else {
DCHECK_EQ(format, TRuntimeProfileFormat::STRING);
ObjectPool tmp_pool;
RuntimeProfile* tmp_profile;
RETURN_IF_ERROR(RuntimeProfile::DecompressToProfile(
- query_record->second->compressed_profile, &tmp_pool, &tmp_profile));
+ query_record->compressed_profile, &tmp_pool, &tmp_profile));
tmp_profile->PrettyPrint(profile->string_output);
}
return Status::OK();
@@ -944,7 +947,6 @@ Status ImpalaServer::GetExecSummary(const TUniqueId&
query_id, const string& use
}
// Look for the query in completed query log.
- // IMPALA-5275: Don't create Status while holding query_log_lock_
{
string effective_user;
bool user_has_profile_access = false;
@@ -952,22 +954,18 @@ Status ImpalaServer::GetExecSummary(const TUniqueId&
query_id, const string& use
TExecSummary exec_summary;
TExecSummary retried_exec_summary;
{
- lock_guard<mutex> l(query_log_lock_);
- QueryLogIndex::const_iterator query_record =
query_log_index_.find(query_id);
- is_query_missing = query_record == query_log_index_.end();
+ shared_ptr<QueryStateRecord> query_record;
+ shared_ptr<QueryStateRecord> retried_query_record;
+ is_query_missing =
+ !GetQueryRecord(query_id, &query_record, &retried_query_record).ok();
if (!is_query_missing) {
- effective_user = query_record->second->effective_user;
- user_has_profile_access =
query_record->second->user_has_profile_access;
- exec_summary = query_record->second->exec_summary;
- if (query_record->second->was_retried) {
+ effective_user = query_record->effective_user;
+ user_has_profile_access = query_record->user_has_profile_access;
+ exec_summary = query_record->exec_summary;
+ if (query_record->was_retried) {
if (was_retried != nullptr) *was_retried = true;
- DCHECK(query_record->second->retried_query_id != nullptr);
- QueryLogIndex::const_iterator retried_query_record =
- query_log_index_.find(*query_record->second->retried_query_id);
- // The retried query ran later than the original query. We should be
able to
- // find it in the query log since we have found the original query.
- DCHECK(retried_query_record != query_log_index_.end());
- retried_exec_summary = retried_query_record->second->exec_summary;
+ DCHECK(retried_query_record != nullptr);
+ retried_exec_summary = retried_query_record->exec_summary;
}
}
}
@@ -1059,10 +1057,10 @@ void ImpalaServer::ArchiveQuery(const QueryHandle&
query_handle) {
// 'fetch_rows_lock()' protects several fields in ClientReqestState that are
read
// during QueryStateRecord creation. There should be no contention on this
lock because
// the query has already been closed (e.g. no more results can be fetched).
- unique_ptr<QueryStateRecord> record = nullptr;
+ shared_ptr<QueryStateRecord> record = nullptr;
{
lock_guard<mutex> l(*query_handle->fetch_rows_lock());
- record = make_unique<QueryStateRecord>(*query_handle,
move(compressed_profile));
+ record = make_shared<QueryStateRecord>(*query_handle,
move(compressed_profile));
}
if (query_handle->GetCoordinator() != nullptr) {
query_handle->GetCoordinator()->GetTExecSummary(&record->exec_summary);
@@ -1070,8 +1068,8 @@ void ImpalaServer::ArchiveQuery(const QueryHandle&
query_handle) {
{
lock_guard<mutex> l(query_log_lock_);
// Add record to the beginning of the log, and to the lookup index.
- query_log_index_[query_handle->query_id()] = record.get();
- query_log_.insert(query_log_.begin(), move(record));
+ query_log_.push_front(move(record));
+ query_log_index_[query_handle->query_id()] = &query_log_.front();
if (FLAGS_query_log_size > -1 && FLAGS_query_log_size < query_log_.size())
{
DCHECK_EQ(query_log_.size() - FLAGS_query_log_size, 1);
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index a6cb34182..87c318de6 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -1196,25 +1196,31 @@ class ImpalaServer : public ImpalaServiceIf,
/// FIFO list of query records, which are written after the query finishes
executing.
/// Queries may briefly have entries in 'query_log_' and 'query_driver_map_'
- /// while the query is being unregistered.
- typedef std::list<std::unique_ptr<QueryStateRecord>> QueryLog;
+ /// while the query is being unregistered. To ensure that the records
provided by
+ /// GetQueryRecord function are always valid, this list uses shared_ptr to
hold
+ /// QueryStateRecord, preventing its lifecycle from ending prematurely due
to removal
+ /// from the list.
+ typedef std::list<std::shared_ptr<QueryStateRecord>> QueryLog;
QueryLog query_log_;
/// Index that allows lookup via TUniqueId into the query log. The
QueryStateRecord
- /// value is owned by 'query_log_' so the entry in this index must be
removed when
+ /// pointer is owned by 'query_log_' so the entry in this index must be
removed when
/// it is removed from 'query_log_'.
- typedef boost::unordered_map<TUniqueId, QueryStateRecord*> QueryLogIndex;
+ typedef boost::unordered_map<TUniqueId, std::shared_ptr<QueryStateRecord>*>
+ QueryLogIndex;
QueryLogIndex query_log_index_;
- /// Sets the given QueryLogIndex for the given query_id. Returns an error
Status if the
- /// given query_id cannot be found in the QueryLogIndex.
+ /// Sets the given query_record (and retried_query_record too if given) for
the given
+ /// query_id. Returns an error Status if the given query_id cannot be found
in the
+ /// QueryLogIndex.
Status GetQueryRecord(
- const TUniqueId& query_id, QueryLogIndex::const_iterator* query_record);
+ const TUniqueId& query_id, std::shared_ptr<QueryStateRecord>*
query_record,
+ std::shared_ptr<QueryStateRecord>* retried_query_record = nullptr);
/// Decompresses the profile in the given QueryStateRecord into the
specified format.
/// The decompressed profile is added to the given RuntimeProfileOutput.
Status DecompressToProfile(TRuntimeProfileFormat::type format,
- QueryLogIndex::const_iterator query_record, RuntimeProfileOutput*
profile);
+ std::shared_ptr<QueryStateRecord> query_record, RuntimeProfileOutput*
profile);
/// Logger for writing encoded query profiles, one per line with the
following format:
/// <ms-since-epoch> <query-id> <thrift query profile URL encoded and
gzipped>