This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b1a10471 IMPALA-11998: Fix potential invalid iterator issue in 
ImpalaServer::GetQueryRecord()
0b1a10471 is described below

commit 0b1a1047104b508ca72b84b1bf050dbcb339ff6f
Author: Eyizoha <[email protected]>
AuthorDate: Wed Mar 15 19:07:52 2023 +0800

    IMPALA-11998: Fix potential invalid iterator issue in 
ImpalaServer::GetQueryRecord()
    
    The current ImpalaServer::GetQueryRecord() may cause the caller to
    access an invalid iterator. Although the function holds query_log_lock_
    while it executes, the query_record it returns is not guaranteed to
    remain valid, because it is no longer protected by query_log_lock_
    once the function returns. If the corresponding entry in
    query_log_index_ is deleted at that point, query_record becomes an
    invalid iterator.
    
    To fix that, this patch uses shared_ptr instead of the original
    unique_ptr, so that query_record stays alive after GetQueryRecord()
    returns. In addition, a new argument retried_query_record is added to
    GetQueryRecord() so that query_log_lock_ is acquired only once when
    fetching both the original query record and the retried query record.
    
    Change-Id: I1279558b0d0b2a3ba31a496c0acb84ac9d99f2f1
    Reviewed-on: http://gerrit.cloudera.org:8080/19621
    Reviewed-by: Quanlong Huang <[email protected]>
    Reviewed-by: Wenzhe Zhou <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/service/impala-http-handler.cc | 18 +++----
 be/src/service/impala-server.cc       | 96 +++++++++++++++++------------------
 be/src/service/impala-server.h        | 22 +++++---
 3 files changed, 69 insertions(+), 67 deletions(-)

diff --git a/be/src/service/impala-http-handler.cc 
b/be/src/service/impala-http-handler.cc
index c1001f2e3..2545cd3ae 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -549,7 +549,7 @@ void ImpalaHttpHandler::QueryStateHandler(const 
Webserver::WebRequest& req,
   Value completed_queries(kArrayType);
   {
     lock_guard<mutex> l(server_->query_log_lock_);
-    for (const unique_ptr<ImpalaServer::QueryStateRecord>& log_entry :
+    for (const shared_ptr<ImpalaServer::QueryStateRecord>& log_entry :
         server_->query_log_) {
       // Don't show duplicated entries between in-flight and completed queries.
       if (in_flight_query_ids.find(log_entry->id) != 
in_flight_query_ids.end()) continue;
@@ -957,23 +957,21 @@ void ImpalaHttpHandler::QuerySummaryHandler(bool 
include_json_plan, bool include
   }
 
   if (!found) {
-    lock_guard<mutex> l(server_->query_log_lock_);
-    ImpalaServer::QueryLogIndex::const_iterator query_record =
-        server_->query_log_index_.find(query_id);
-    if (query_record == server_->query_log_index_.end()) {
+    shared_ptr<ImpalaServer::QueryStateRecord> query_record;
+    if (!server_->GetQueryRecord(query_id, &query_record).ok()) {
       const string& err = Substitute("Unknown query id: $0", 
PrintId(query_id));
       Value json_error(err.c_str(), document->GetAllocator());
       document->AddMember("error", json_error, document->GetAllocator());
       return;
     }
     if (include_json_plan || include_summary) {
-      summary = query_record->second->exec_summary;
+      summary = query_record->exec_summary;
     }
-    stmt = query_record->second->stmt;
-    plan = query_record->second->plan;
-    query_status = query_record->second->query_status;
+    stmt = query_record->stmt;
+    plan = query_record->plan;
+    query_status = query_record->query_status;
     if (include_json_plan) {
-      fragments = query_record->second->fragments;
+      fragments = query_record->fragments;
     }
   }
 
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 1331ab5b1..93ca35585 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -739,17 +739,29 @@ Status ImpalaServer::GetRuntimeProfileOutput(const 
string& user,
   return Status::OK();
 }
 
-Status ImpalaServer::GetQueryRecord(
-    const TUniqueId& query_id, QueryLogIndex::const_iterator* query_record) {
+Status ImpalaServer::GetQueryRecord(const TUniqueId& query_id,
+    shared_ptr<QueryStateRecord>* query_record,
+    shared_ptr<QueryStateRecord>* retried_query_record) {
   lock_guard<mutex> l(query_log_lock_);
-  *query_record = query_log_index_.find(query_id);
-  if (*query_record == query_log_index_.end()) {
+  auto iterator = query_log_index_.find(query_id);
+  if (iterator == query_log_index_.end()) {
     // Common error, so logging explicitly and eliding Status's stack trace.
     string err =
         strings::Substitute(LEGACY_INVALID_QUERY_HANDLE_TEMPLATE, 
PrintId(query_id));
     VLOG(1) << err;
     return Status::Expected(err);
   }
+  *query_record = *(iterator->second);
+  if (retried_query_record != nullptr && (*query_record)->was_retried) {
+    DCHECK((*query_record)->retried_query_id != nullptr);
+    iterator = query_log_index_.find(*(*query_record)->retried_query_id);
+
+    // The record of the retried query should always be later in the query log 
compared
+    // to the original query. Since the query log is a FIFO queue, this means 
that if the
+    // original query is in the log, then the retried query must be in the log 
as well.
+    DCHECK(iterator != query_log_index_.end());
+    *retried_query_record = *(iterator->second);
+  }
   return Status::OK();
 }
 
@@ -794,31 +806,22 @@ Status ImpalaServer::GetRuntimeProfileOutput(const 
TUniqueId& query_id,
   // The query was not found in the active query map, search the query log.
   {
     // Set the profile for the original query.
-    QueryLogIndex::const_iterator query_record;
-    RETURN_IF_ERROR(GetQueryRecord(query_id, &query_record));
-    RETURN_IF_ERROR(CheckProfileAccess(user, 
query_record->second->effective_user,
-        query_record->second->user_has_profile_access));
+    shared_ptr<QueryStateRecord> query_record;
+    shared_ptr<QueryStateRecord> retried_query_record;
+    RETURN_IF_ERROR(GetQueryRecord(query_id, &query_record, 
&retried_query_record));
+    RETURN_IF_ERROR(CheckProfileAccess(user, query_record->effective_user,
+        query_record->user_has_profile_access));
     RETURN_IF_ERROR(DecompressToProfile(format, query_record, 
original_profile));
 
     // Set the profile for the retried query.
-    if (query_record->second->was_retried) {
+    if (query_record->was_retried) {
       *was_retried = true;
-      DCHECK(query_record->second->retried_query_id != nullptr);
-      QueryLogIndex::const_iterator retried_query_record;
-
-      // The profile of the retried profile should always be earlier in the 
query log
-      // compared to the original profile. Since the query log is a FIFO 
queue, this
-      // means that if the original profile is in the log, then the retried 
profile
-      // must be in the log as well.
-      Status status =
-          GetQueryRecord(*query_record->second->retried_query_id, 
&retried_query_record);
-      DCHECK(status.ok());
-      RETURN_IF_ERROR(status);
+      DCHECK(retried_query_record != nullptr);
 
       // If the original profile was accessible by the user, then the retried 
profile
       // must be accessible by the user as well.
-      status = CheckProfileAccess(user, 
retried_query_record->second->effective_user,
-          retried_query_record->second->user_has_profile_access);
+      Status status = CheckProfileAccess(user, 
retried_query_record->effective_user,
+          retried_query_record->user_has_profile_access);
       DCHECK(status.ok());
       RETURN_IF_ERROR(status);
 
@@ -847,34 +850,34 @@ Status ImpalaServer::GetRuntimeProfileOutput(const 
TUniqueId& query_id,
 
   // The query was not found the active query map, search the query log.
   {
-    QueryLogIndex::const_iterator query_record = 
query_log_index_.find(query_id);
+    shared_ptr<QueryStateRecord> query_record;
     RETURN_IF_ERROR(GetQueryRecord(query_id, &query_record));
-    RETURN_IF_ERROR(CheckProfileAccess(user, 
query_record->second->effective_user,
-        query_record->second->user_has_profile_access));
+    RETURN_IF_ERROR(CheckProfileAccess(user, query_record->effective_user,
+        query_record->user_has_profile_access));
     RETURN_IF_ERROR(DecompressToProfile(format, query_record, profile));
   }
   return Status::OK();
 }
 
 Status ImpalaServer::DecompressToProfile(TRuntimeProfileFormat::type format,
-    QueryLogIndex::const_iterator query_record, RuntimeProfileOutput* profile) 
{
+    shared_ptr<QueryStateRecord> query_record, RuntimeProfileOutput* profile) {
   if (format == TRuntimeProfileFormat::BASE64) {
-    Base64Encode(query_record->second->compressed_profile, 
profile->string_output);
+    Base64Encode(query_record->compressed_profile, profile->string_output);
   } else if (format == TRuntimeProfileFormat::THRIFT) {
     RETURN_IF_ERROR(RuntimeProfile::DecompressToThrift(
-        query_record->second->compressed_profile, profile->thrift_output));
+        query_record->compressed_profile, profile->thrift_output));
   } else if (format == TRuntimeProfileFormat::JSON) {
     ObjectPool tmp_pool;
     RuntimeProfile* tmp_profile;
     RETURN_IF_ERROR(RuntimeProfile::DecompressToProfile(
-        query_record->second->compressed_profile, &tmp_pool, &tmp_profile));
+        query_record->compressed_profile, &tmp_pool, &tmp_profile));
     tmp_profile->ToJson(profile->json_output);
   } else {
     DCHECK_EQ(format, TRuntimeProfileFormat::STRING);
     ObjectPool tmp_pool;
     RuntimeProfile* tmp_profile;
     RETURN_IF_ERROR(RuntimeProfile::DecompressToProfile(
-        query_record->second->compressed_profile, &tmp_pool, &tmp_profile));
+        query_record->compressed_profile, &tmp_pool, &tmp_profile));
     tmp_profile->PrettyPrint(profile->string_output);
   }
   return Status::OK();
@@ -944,7 +947,6 @@ Status ImpalaServer::GetExecSummary(const TUniqueId& 
query_id, const string& use
   }
 
   // Look for the query in completed query log.
-  // IMPALA-5275: Don't create Status while holding query_log_lock_
   {
     string effective_user;
     bool user_has_profile_access = false;
@@ -952,22 +954,18 @@ Status ImpalaServer::GetExecSummary(const TUniqueId& 
query_id, const string& use
     TExecSummary exec_summary;
     TExecSummary retried_exec_summary;
     {
-      lock_guard<mutex> l(query_log_lock_);
-      QueryLogIndex::const_iterator query_record = 
query_log_index_.find(query_id);
-      is_query_missing = query_record == query_log_index_.end();
+      shared_ptr<QueryStateRecord> query_record;
+      shared_ptr<QueryStateRecord> retried_query_record;
+      is_query_missing =
+          !GetQueryRecord(query_id, &query_record, &retried_query_record).ok();
       if (!is_query_missing) {
-        effective_user = query_record->second->effective_user;
-        user_has_profile_access = 
query_record->second->user_has_profile_access;
-        exec_summary = query_record->second->exec_summary;
-        if (query_record->second->was_retried) {
+        effective_user = query_record->effective_user;
+        user_has_profile_access = query_record->user_has_profile_access;
+        exec_summary = query_record->exec_summary;
+        if (query_record->was_retried) {
           if (was_retried != nullptr) *was_retried = true;
-          DCHECK(query_record->second->retried_query_id != nullptr);
-          QueryLogIndex::const_iterator retried_query_record =
-              query_log_index_.find(*query_record->second->retried_query_id);
-          // The retried query ran later than the original query. We should be 
able to
-          // find it in the query log since we have found the original query.
-          DCHECK(retried_query_record != query_log_index_.end());
-          retried_exec_summary = retried_query_record->second->exec_summary;
+          DCHECK(retried_query_record != nullptr);
+          retried_exec_summary = retried_query_record->exec_summary;
         }
       }
     }
@@ -1059,10 +1057,10 @@ void ImpalaServer::ArchiveQuery(const QueryHandle& 
query_handle) {
   // 'fetch_rows_lock()' protects several fields in ClientReqestState that are 
read
   // during QueryStateRecord creation. There should be no contention on this 
lock because
   // the query has already been closed (e.g. no more results can be fetched).
-  unique_ptr<QueryStateRecord> record = nullptr;
+  shared_ptr<QueryStateRecord> record = nullptr;
   {
     lock_guard<mutex> l(*query_handle->fetch_rows_lock());
-    record = make_unique<QueryStateRecord>(*query_handle, 
move(compressed_profile));
+    record = make_shared<QueryStateRecord>(*query_handle, 
move(compressed_profile));
   }
   if (query_handle->GetCoordinator() != nullptr) {
     query_handle->GetCoordinator()->GetTExecSummary(&record->exec_summary);
@@ -1070,8 +1068,8 @@ void ImpalaServer::ArchiveQuery(const QueryHandle& 
query_handle) {
   {
     lock_guard<mutex> l(query_log_lock_);
     // Add record to the beginning of the log, and to the lookup index.
-    query_log_index_[query_handle->query_id()] = record.get();
-    query_log_.insert(query_log_.begin(), move(record));
+    query_log_.push_front(move(record));
+    query_log_index_[query_handle->query_id()] = &query_log_.front();
 
     if (FLAGS_query_log_size > -1 && FLAGS_query_log_size < query_log_.size()) 
{
       DCHECK_EQ(query_log_.size() - FLAGS_query_log_size, 1);
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index a6cb34182..87c318de6 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -1196,25 +1196,31 @@ class ImpalaServer : public ImpalaServiceIf,
 
   /// FIFO list of query records, which are written after the query finishes 
executing.
   /// Queries may briefly have entries in 'query_log_' and 'query_driver_map_'
-  /// while the query is being unregistered.
-  typedef std::list<std::unique_ptr<QueryStateRecord>> QueryLog;
+  /// while the query is being unregistered. To ensure that the records 
provided by
+  /// GetQueryRecord function are always valid, this list uses shared_ptr to 
hold
+  /// QueryStateRecord, preventing its lifecycle from ending prematurely due 
to removal
+  /// from the list.
+  typedef std::list<std::shared_ptr<QueryStateRecord>> QueryLog;
   QueryLog query_log_;
 
   /// Index that allows lookup via TUniqueId into the query log. The 
QueryStateRecord
-  /// value is owned by 'query_log_' so the entry in this index must be 
removed when
+  /// pointer is owned by 'query_log_' so the entry in this index must be 
removed when
   /// it is removed from 'query_log_'.
-  typedef boost::unordered_map<TUniqueId, QueryStateRecord*> QueryLogIndex;
+  typedef boost::unordered_map<TUniqueId, std::shared_ptr<QueryStateRecord>*>
+      QueryLogIndex;
   QueryLogIndex query_log_index_;
 
-  /// Sets the given QueryLogIndex for the given query_id. Returns an error 
Status if the
-  /// given query_id cannot be found in the QueryLogIndex.
+  /// Sets the given query_record (and retried_query_record too if given) for 
the given
+  /// query_id. Returns an error Status if the given query_id cannot be found 
in the
+  /// QueryLogIndex.
   Status GetQueryRecord(
-      const TUniqueId& query_id, QueryLogIndex::const_iterator* query_record);
+      const TUniqueId& query_id, std::shared_ptr<QueryStateRecord>* 
query_record,
+      std::shared_ptr<QueryStateRecord>* retried_query_record = nullptr);
 
   /// Decompresses the profile in the given QueryStateRecord into the 
specified format.
   /// The decompressed profile is added to the given RuntimeProfileOutput.
   Status DecompressToProfile(TRuntimeProfileFormat::type format,
-      QueryLogIndex::const_iterator query_record, RuntimeProfileOutput* 
profile);
+      std::shared_ptr<QueryStateRecord> query_record, RuntimeProfileOutput* 
profile);
 
   /// Logger for writing encoded query profiles, one per line with the 
following format:
   /// <ms-since-epoch> <query-id> <thrift query profile URL encoded and 
gzipped>

Reply via email to