This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 40bb93fc4beb3b216aca942a01352feb535a6cb8
Author: Yida Wu <[email protected]>
AuthorDate: Tue Oct 1 11:55:46 2024 -0700

    IMPALA-12146: Fix incorrect host memory reserved when the executor quits 
abnormally
    
    Currently there is an issue where if an executor quits abnormally
    while running a query, its reserved memory may still remain in the
    coordinator's host stats.
    
    The aggregated remote memory reserved for a host is calculated from
    all available remote pool stats. The problem happens when the
    statestore sends a topic update to update the pool stats. Although
    the coordinator removes the remote stats for the terminated executor
    from the pool during the update, UpdateClusterAggregates() fails to
    reset the corresponding aggregated memory reserved for that host
    once all the remote stats for that host have been removed. This can
    leave a stale memory reserved value behind.
    
    To fix this, added logic to ensure that the memory reserved for
    that host is reset in the aggregated host stats when a topic
    deletion for the host is detected and the host no longer exists
    in any remote pool stats.
    
    Tests:
    Passed exhaustive tests.
    Added test case AdmissionControllerTest::EraseHostStats.
    Manually verified that the coordinator web UI correctly showed the
    reserved memory after the crashed executor recovered and rejoined.
    
    Change-Id: Ic6f6edd28c55904d63d0c494230ee2bf7a0f6cce
    Reviewed-on: http://gerrit.cloudera.org:8080/21896
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Michael Smith <[email protected]>
---
 be/src/scheduling/admission-controller-test.cc | 81 ++++++++++++++++++++++++++
 be/src/scheduling/admission-controller.cc      | 31 ++++++++--
 be/src/scheduling/admission-controller.h       | 11 +++-
 3 files changed, 117 insertions(+), 6 deletions(-)

diff --git a/be/src/scheduling/admission-controller-test.cc 
b/be/src/scheduling/admission-controller-test.cc
index 545002f1b..0281608ed 100644
--- a/be/src/scheduling/admission-controller-test.cc
+++ b/be/src/scheduling/admission-controller-test.cc
@@ -546,6 +546,87 @@ TEST_F(AdmissionControllerTest, Simple) {
   ASSERT_FALSE(coordinator_resource_limited);
 }
 
+/// Test removing hosts from AdmissionController, and check that the memory 
reserved in
+/// host stats for the specific host is updated correctly after the host is 
removed.
+TEST_F(AdmissionControllerTest, EraseHostStats) {
+  FLAGS_fair_scheduler_allocation_path = 
GetResourceFile("fair-scheduler-test2.xml");
+  FLAGS_llama_site_path = GetResourceFile("llama-site-test2.xml");
+
+  AdmissionController* admission_controller = MakeAdmissionController();
+
+  ASSERT_EQ(0, admission_controller->host_stats_.size());
+
+  TTopicDelta membership = MakeTopicDelta(false);
+
+  AddStatsToTopic(&membership, HOST_1, QUEUE_C, MakePoolStats(1000, 1, 0));
+  AddStatsToTopic(&membership, HOST_2, QUEUE_C, MakePoolStats(5000, 10, 0));
+  AddStatsToTopic(&membership, HOST_2, QUEUE_B, MakePoolStats(3000, 5, 0));
+
+  StatestoreSubscriber::TopicDeltaMap initial_topic_deltas;
+  initial_topic_deltas.emplace(Statestore::IMPALA_REQUEST_QUEUE_TOPIC, 
membership);
+  vector<TTopicDelta> outgoing_topic_updates;
+  admission_controller->UpdatePoolStats(initial_topic_deltas, 
&outgoing_topic_updates);
+
+  // Verify that the host stats were added
+  ASSERT_EQ(3, admission_controller->host_stats_.size());
+  ASSERT_EQ(1, admission_controller->host_stats_.count(HOST_0));
+  ASSERT_EQ(1, admission_controller->host_stats_.count(HOST_1));
+  ASSERT_EQ(1, admission_controller->host_stats_.count(HOST_2));
+  ASSERT_EQ(1000, admission_controller->host_stats_[HOST_1].mem_reserved);
+  ASSERT_EQ(8000, admission_controller->host_stats_[HOST_2].mem_reserved);
+
+  // Create an update that deletes HOST_1
+  TTopicDelta delete_update = MakeTopicDelta(true);
+  TTopicItem delete_item;
+  delete_item.key = "POOL:" + QUEUE_C + "!" + HOST_1;
+  delete_item.deleted = true;
+  delete_update.topic_entries.push_back(delete_item);
+
+  StatestoreSubscriber::TopicDeltaMap delete_topic_deltas;
+  delete_topic_deltas.emplace(Statestore::IMPALA_REQUEST_QUEUE_TOPIC, 
delete_update);
+
+  // Apply the delete update
+  admission_controller->UpdatePoolStats(delete_topic_deltas, 
&outgoing_topic_updates);
+
+  // Verify that HOST_1 mem_reserved was reset.
+  ASSERT_EQ(3, admission_controller->host_stats_.size());
+  ASSERT_EQ(8000, admission_controller->host_stats_[HOST_2].mem_reserved);
+  ASSERT_EQ(0, admission_controller->host_stats_[HOST_1].mem_reserved);
+
+  // Verify that the pool stats were updated accordingly
+  AdmissionController::PoolStats* pool_stats =
+      admission_controller->GetPoolStats(QUEUE_C);
+  ASSERT_EQ(5000, pool_stats->agg_mem_reserved_);
+  ASSERT_EQ(10, pool_stats->agg_num_running_);
+
+  // Remove HOST_2 in Queue C.
+  delete_update = MakeTopicDelta(true);
+  delete_item.key = "POOL:" + QUEUE_C + "!" + HOST_2;
+  delete_item.deleted = true;
+  delete_update.topic_entries.push_back(delete_item);
+  StatestoreSubscriber::TopicDeltaMap delete_topic_deltas2;
+  delete_topic_deltas2.emplace(Statestore::IMPALA_REQUEST_QUEUE_TOPIC, 
delete_update);
+  admission_controller->UpdatePoolStats(delete_topic_deltas2, 
&outgoing_topic_updates);
+
+  ASSERT_EQ(3, admission_controller->host_stats_.size());
+  ASSERT_EQ(3000, admission_controller->host_stats_[HOST_2].mem_reserved);
+  ASSERT_EQ(0, admission_controller->host_stats_[HOST_1].mem_reserved);
+
+  // Remove HOST_2 in Queue B.
+  TTopicDelta delete_update3 = MakeTopicDelta(true);
+  TTopicItem delete_item3;
+  delete_item3.key = "POOL:" + QUEUE_B + "!" + HOST_2;
+  delete_item3.deleted = true;
+  delete_update3.topic_entries.push_back(delete_item3);
+  StatestoreSubscriber::TopicDeltaMap delete_topic_deltas3;
+  delete_topic_deltas3.emplace(Statestore::IMPALA_REQUEST_QUEUE_TOPIC, 
delete_update3);
+  admission_controller->UpdatePoolStats(delete_topic_deltas3, 
&outgoing_topic_updates);
+
+  ASSERT_EQ(3, admission_controller->host_stats_.size());
+  ASSERT_EQ(0, admission_controller->host_stats_[HOST_2].mem_reserved);
+  ASSERT_EQ(0, admission_controller->host_stats_[HOST_1].mem_reserved);
+}
+
 /// Test CanAdmitRequest in the context of aggregated memory required to admit 
a query.
 TEST_F(AdmissionControllerTest, CanAdmitRequestMemory) {
   // Pass the paths of the configuration files as command line flags.
diff --git a/be/src/scheduling/admission-controller.cc 
b/be/src/scheduling/admission-controller.cc
index bb603383c..b274a2f2f 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -1714,6 +1714,7 @@ void AdmissionController::UpdatePoolStats(
 
     StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
         incoming_topic_deltas.find(request_queue_topic_name_);
+    set<string> pool_stats_removed_hosts;
     if (topic != incoming_topic_deltas.end()) {
       const TTopicDelta& delta = topic->second;
       // Delta and non-delta updates are handled the same way, except for a 
full update
@@ -1723,9 +1724,9 @@ void AdmissionController::UpdatePoolStats(
         VLOG_ROW << "Full impala-request-queue stats update";
         for (auto& entry : pool_stats_) entry.second.ClearRemoteStats();
       }
-      HandleTopicUpdates(delta.topic_entries);
+      HandleTopicUpdates(delta.topic_entries, pool_stats_removed_hosts);
     }
-    UpdateClusterAggregates();
+    UpdateClusterAggregates(pool_stats_removed_hosts);
     last_topic_update_time_ms_ = MonotonicMillis();
     pending_dequeue_ = true;
   }
@@ -1755,7 +1756,8 @@ void 
AdmissionController::PoolStats::UpdateRemoteStats(const string& host_id,
   }
 }
 
-void AdmissionController::HandleTopicUpdates(const vector<TTopicItem>& 
topic_updates) {
+void AdmissionController::HandleTopicUpdates(
+    const vector<TTopicItem>& topic_updates, set<string>& 
pool_stats_removed_nodes) {
   string topic_key_prefix;
   string topic_key_suffix;
   string pool_name;
@@ -1769,6 +1771,7 @@ void AdmissionController::HandleTopicUpdates(const 
vector<TTopicItem>& topic_upd
       if (topic_backend_id == host_id_) continue;
       if (item.deleted) {
         GetPoolStats(pool_name)->UpdateRemoteStats(topic_backend_id, nullptr);
+        pool_stats_removed_nodes.insert(topic_backend_id);
         continue;
       }
       TPoolStats remote_update;
@@ -1860,7 +1863,7 @@ void 
AdmissionController::PoolStats::UpdateAggregates(HostMemMap* host_mem_reser
   VLOG_ROW << "Updated: " << DebugString();
 }
 
-void AdmissionController::UpdateClusterAggregates() {
+void AdmissionController::UpdateClusterAggregates(const set<string>& 
removed_nodes) {
   // Recompute mem_reserved for all hosts.
   PoolStats::HostMemMap updated_mem_reserved;
   for (auto& entry : pool_stats_) 
entry.second.UpdateAggregates(&updated_mem_reserved);
@@ -1878,6 +1881,26 @@ void AdmissionController::UpdateClusterAggregates() {
       ++i;
     }
   }
+
+  // We know if any host stats were removed from the pool stats during 
statestore topic
+  // update by the set 'removed_nodes'. If a host was removed and no 
longer
+  // exists in any remote pool stats, we reset the mem_reserved to 0 for this 
host in
+  // the host stats.
+  for (const auto& host : removed_nodes) {
+    auto it = host_stats_.find(host);
+    if (it != host_stats_.end()
+        && updated_mem_reserved.find(host) == updated_mem_reserved.end()) {
+      int64_t old_mem_reserved = it->second.mem_reserved;
+      it->second.mem_reserved = 0;
+      if (VLOG_ROW_IS_ON) {
+        ss << endl
+           << "Mem_reserved reset to 0 for removed host: " << host
+           << " (old value=" << PrintBytes(old_mem_reserved) << ")";
+        ++i;
+      }
+    }
+  }
+
   if (i > 0) VLOG_ROW << ss.str();
 }
 
diff --git a/be/src/scheduling/admission-controller.h 
b/be/src/scheduling/admission-controller.h
index 296559c9c..ebc202faa 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -720,6 +720,7 @@ class AdmissionController {
     FRIEND_TEST(AdmissionControllerTest, GetMaxToDequeue);
     FRIEND_TEST(AdmissionControllerTest, QueryRejection);
     FRIEND_TEST(AdmissionControllerTest, TopNQueryCheck);
+    FRIEND_TEST(AdmissionControllerTest, EraseHostStats);
     friend class AdmissionControllerTest;
   };
 
@@ -935,13 +936,18 @@ class AdmissionController {
   /// Updates the remote stats with per-host topic_updates coming from the 
statestore.
   /// Removes remote stats identified by topic deletions coming from the
   /// statestore. Called by UpdatePoolStats(). Must hold admission_ctrl_lock_.
-  void HandleTopicUpdates(const std::vector<TTopicItem>& topic_updates);
+  /// Any remote host stats removed from the pool stats during the process 
will result in
+  /// the host being added to the pool_stats_removed_nodes set.
+  void HandleTopicUpdates(const std::vector<TTopicItem>& topic_updates,
+      std::set<std::string>& pool_stats_removed_nodes);
 
   /// Re-computes the per-pool aggregate stats and the per-host aggregates in 
host_stats_
   /// using each pool's remote_stats_ and local_stats_.
   /// Called by UpdatePoolStats() after handling updates and deletions.
   /// Must hold admission_ctrl_lock_.
-  void UpdateClusterAggregates();
+  /// If the host is present in removed_nodes and the host is not present in 
any remote
+  /// pool stats, the mem_reserved will be reset in the host_stats_ for that 
host.
+  void UpdateClusterAggregates(const std::set<std::string>& removed_nodes);
 
   /// Computes schedules for all executor groups that can run the query in 
'queue_node'.
   /// For subsequent calls schedules are only re-computed if the membership 
version inside
@@ -1224,6 +1230,7 @@ class AdmissionController {
   FRIEND_TEST(AdmissionControllerTest, DedicatedCoordScheduleState);
   FRIEND_TEST(AdmissionControllerTest, DedicatedCoordAdmissionChecks);
   FRIEND_TEST(AdmissionControllerTest, TopNQueryCheck);
+  FRIEND_TEST(AdmissionControllerTest, EraseHostStats);
   friend class AdmissionControllerTest;
 };
 

Reply via email to