This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 40bb93fc4beb3b216aca942a01352feb535a6cb8 Author: Yida Wu <[email protected]> AuthorDate: Tue Oct 1 11:55:46 2024 -0700 IMPALA-12146: Fix incorrect host memory reserved when the executor quits abnormally Currently, if an executor quits abnormally while running a query, its reserved memory may remain in the coordinator's host stats. The aggregated remote memory reserved is computed from all available remote pool stats. The problem occurs when the statestore sends a topic update to refresh the pool stats: although the coordinator removes the terminated executor's remote stats from the pool during the update, UpdateClusterAggregates() fails to reset the corresponding aggregated memory reserved for that host once all of the host's remote stats have been removed. This can leave a stale memory-reserved value behind. To fix this, this patch adds logic that resets the host's memory-reserved stats in the aggregated host stats when a topic deletion for the host is detected and the host no longer appears in any remote pool stats. Tests: Passed exhaustive tests. Added test case AdmissionControllerTest::EraseHostStats. Manually verified that the coordinator web UI correctly showed the reserved memory after the crashed executor recovered and rejoined. 
Change-Id: Ic6f6edd28c55904d63d0c494230ee2bf7a0f6cce Reviewed-on: http://gerrit.cloudera.org:8080/21896 Tested-by: Impala Public Jenkins <[email protected]> Reviewed-by: Michael Smith <[email protected]> --- be/src/scheduling/admission-controller-test.cc | 81 ++++++++++++++++++++++++++ be/src/scheduling/admission-controller.cc | 31 ++++++++-- be/src/scheduling/admission-controller.h | 11 +++- 3 files changed, 117 insertions(+), 6 deletions(-) diff --git a/be/src/scheduling/admission-controller-test.cc b/be/src/scheduling/admission-controller-test.cc index 545002f1b..0281608ed 100644 --- a/be/src/scheduling/admission-controller-test.cc +++ b/be/src/scheduling/admission-controller-test.cc @@ -546,6 +546,87 @@ TEST_F(AdmissionControllerTest, Simple) { ASSERT_FALSE(coordinator_resource_limited); } +/// Test that removing hosts from AdmissionController, and check if the memory reserved in +/// host stats for the specific host is updated correctly after the host is removed. +TEST_F(AdmissionControllerTest, EraseHostStats) { + FLAGS_fair_scheduler_allocation_path = GetResourceFile("fair-scheduler-test2.xml"); + FLAGS_llama_site_path = GetResourceFile("llama-site-test2.xml"); + + AdmissionController* admission_controller = MakeAdmissionController(); + + ASSERT_EQ(0, admission_controller->host_stats_.size()); + + TTopicDelta membership = MakeTopicDelta(false); + + AddStatsToTopic(&membership, HOST_1, QUEUE_C, MakePoolStats(1000, 1, 0)); + AddStatsToTopic(&membership, HOST_2, QUEUE_C, MakePoolStats(5000, 10, 0)); + AddStatsToTopic(&membership, HOST_2, QUEUE_B, MakePoolStats(3000, 5, 0)); + + StatestoreSubscriber::TopicDeltaMap initial_topic_deltas; + initial_topic_deltas.emplace(Statestore::IMPALA_REQUEST_QUEUE_TOPIC, membership); + vector<TTopicDelta> outgoing_topic_updates; + admission_controller->UpdatePoolStats(initial_topic_deltas, &outgoing_topic_updates); + + // Verify that the host stats were added + ASSERT_EQ(3, admission_controller->host_stats_.size()); + 
ASSERT_EQ(1, admission_controller->host_stats_.count(HOST_0)); + ASSERT_EQ(1, admission_controller->host_stats_.count(HOST_1)); + ASSERT_EQ(1, admission_controller->host_stats_.count(HOST_2)); + ASSERT_EQ(1000, admission_controller->host_stats_[HOST_1].mem_reserved); + ASSERT_EQ(8000, admission_controller->host_stats_[HOST_2].mem_reserved); + + // Create an update that deletes HOST_1 + TTopicDelta delete_update = MakeTopicDelta(true); + TTopicItem delete_item; + delete_item.key = "POOL:" + QUEUE_C + "!" + HOST_1; + delete_item.deleted = true; + delete_update.topic_entries.push_back(delete_item); + + StatestoreSubscriber::TopicDeltaMap delete_topic_deltas; + delete_topic_deltas.emplace(Statestore::IMPALA_REQUEST_QUEUE_TOPIC, delete_update); + + // Apply the delete update + admission_controller->UpdatePoolStats(delete_topic_deltas, &outgoing_topic_updates); + + // Verify that HOST_1 mem_reserved was reset. + ASSERT_EQ(3, admission_controller->host_stats_.size()); + ASSERT_EQ(8000, admission_controller->host_stats_[HOST_2].mem_reserved); + ASSERT_EQ(0, admission_controller->host_stats_[HOST_1].mem_reserved); + + // Verify that the pool stats were updated accordingly + AdmissionController::PoolStats* pool_stats = + admission_controller->GetPoolStats(QUEUE_C); + ASSERT_EQ(5000, pool_stats->agg_mem_reserved_); + ASSERT_EQ(10, pool_stats->agg_num_running_); + + // Remove HOST_2 in Queue C. + delete_update = MakeTopicDelta(true); + delete_item.key = "POOL:" + QUEUE_C + "!" 
+ HOST_2; + delete_item.deleted = true; + delete_update.topic_entries.push_back(delete_item); + StatestoreSubscriber::TopicDeltaMap delete_topic_deltas2; + delete_topic_deltas2.emplace(Statestore::IMPALA_REQUEST_QUEUE_TOPIC, delete_update); + admission_controller->UpdatePoolStats(delete_topic_deltas2, &outgoing_topic_updates); + + ASSERT_EQ(3, admission_controller->host_stats_.size()); + ASSERT_EQ(3000, admission_controller->host_stats_[HOST_2].mem_reserved); + ASSERT_EQ(0, admission_controller->host_stats_[HOST_1].mem_reserved); + + // Remove HOST_2 in Queue B. + TTopicDelta delete_update3 = MakeTopicDelta(true); + TTopicItem delete_item3; + delete_item3.key = "POOL:" + QUEUE_B + "!" + HOST_2; + delete_item3.deleted = true; + delete_update3.topic_entries.push_back(delete_item3); + StatestoreSubscriber::TopicDeltaMap delete_topic_deltas3; + delete_topic_deltas3.emplace(Statestore::IMPALA_REQUEST_QUEUE_TOPIC, delete_update3); + admission_controller->UpdatePoolStats(delete_topic_deltas3, &outgoing_topic_updates); + + ASSERT_EQ(3, admission_controller->host_stats_.size()); + ASSERT_EQ(0, admission_controller->host_stats_[HOST_2].mem_reserved); + ASSERT_EQ(0, admission_controller->host_stats_[HOST_1].mem_reserved); +} + /// Test CanAdmitRequest in the context of aggregated memory required to admit a query. TEST_F(AdmissionControllerTest, CanAdmitRequestMemory) { // Pass the paths of the configuration files as command line flags. 
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc index bb603383c..b274a2f2f 100644 --- a/be/src/scheduling/admission-controller.cc +++ b/be/src/scheduling/admission-controller.cc @@ -1714,6 +1714,7 @@ void AdmissionController::UpdatePoolStats( StatestoreSubscriber::TopicDeltaMap::const_iterator topic = incoming_topic_deltas.find(request_queue_topic_name_); + set<string> pool_stats_removed_hosts; if (topic != incoming_topic_deltas.end()) { const TTopicDelta& delta = topic->second; // Delta and non-delta updates are handled the same way, except for a full update @@ -1723,9 +1724,9 @@ void AdmissionController::UpdatePoolStats( VLOG_ROW << "Full impala-request-queue stats update"; for (auto& entry : pool_stats_) entry.second.ClearRemoteStats(); } - HandleTopicUpdates(delta.topic_entries); + HandleTopicUpdates(delta.topic_entries, pool_stats_removed_hosts); } - UpdateClusterAggregates(); + UpdateClusterAggregates(pool_stats_removed_hosts); last_topic_update_time_ms_ = MonotonicMillis(); pending_dequeue_ = true; } @@ -1755,7 +1756,8 @@ void AdmissionController::PoolStats::UpdateRemoteStats(const string& host_id, } } -void AdmissionController::HandleTopicUpdates(const vector<TTopicItem>& topic_updates) { +void AdmissionController::HandleTopicUpdates( + const vector<TTopicItem>& topic_updates, set<string>& pool_stats_removed_nodes) { string topic_key_prefix; string topic_key_suffix; string pool_name; @@ -1769,6 +1771,7 @@ void AdmissionController::HandleTopicUpdates(const vector<TTopicItem>& topic_upd if (topic_backend_id == host_id_) continue; if (item.deleted) { GetPoolStats(pool_name)->UpdateRemoteStats(topic_backend_id, nullptr); + pool_stats_removed_nodes.insert(topic_backend_id); continue; } TPoolStats remote_update; @@ -1860,7 +1863,7 @@ void AdmissionController::PoolStats::UpdateAggregates(HostMemMap* host_mem_reser VLOG_ROW << "Updated: " << DebugString(); } -void AdmissionController::UpdateClusterAggregates() { 
+void AdmissionController::UpdateClusterAggregates(const set<string>& removed_nodes) { // Recompute mem_reserved for all hosts. PoolStats::HostMemMap updated_mem_reserved; for (auto& entry : pool_stats_) entry.second.UpdateAggregates(&updated_mem_reserved); @@ -1878,6 +1881,26 @@ void AdmissionController::UpdateClusterAggregates() { ++i; } } + + // We know if any host stats were removed from the pool stats during statestore topic + // update by the set pool_stats_removed_nodes. If a host was removed and no longer + // exists in any remote pool stats, we reset the mem_reserved to 0 for this host in + // the host stats. + for (const auto& host : removed_nodes) { + auto it = host_stats_.find(host); + if (it != host_stats_.end() + && updated_mem_reserved.find(host) == updated_mem_reserved.end()) { + int64_t old_mem_reserved = it->second.mem_reserved; + it->second.mem_reserved = 0; + if (VLOG_ROW_IS_ON) { + ss << endl + << "Mem_reserved reset to 0 for removed host: " << host + << " (old value=" << PrintBytes(old_mem_reserved) << ")"; + ++i; + } + } + } + if (i > 0) VLOG_ROW << ss.str(); } diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h index 296559c9c..ebc202faa 100644 --- a/be/src/scheduling/admission-controller.h +++ b/be/src/scheduling/admission-controller.h @@ -720,6 +720,7 @@ class AdmissionController { FRIEND_TEST(AdmissionControllerTest, GetMaxToDequeue); FRIEND_TEST(AdmissionControllerTest, QueryRejection); FRIEND_TEST(AdmissionControllerTest, TopNQueryCheck); + FRIEND_TEST(AdmissionControllerTest, EraseHostStats); friend class AdmissionControllerTest; }; @@ -935,13 +936,18 @@ class AdmissionController { /// Updates the remote stats with per-host topic_updates coming from the statestore. /// Removes remote stats identified by topic deletions coming from the /// statestore. Called by UpdatePoolStats(). Must hold admission_ctrl_lock_. 
- void HandleTopicUpdates(const std::vector<TTopicItem>& topic_updates); + /// Any remote host stats removed from the pool stats during the process will result in + /// the host being added to the pool_stats_removed_nodes set. + void HandleTopicUpdates(const std::vector<TTopicItem>& topic_updates, + std::set<std::string>& pool_stats_removed_nodes); /// Re-computes the per-pool aggregate stats and the per-host aggregates in host_stats_ /// using each pool's remote_stats_ and local_stats_. /// Called by UpdatePoolStats() after handling updates and deletions. /// Must hold admission_ctrl_lock_. - void UpdateClusterAggregates(); + /// If the host is present in removed_nodes and the host is not present in any remote + /// pool stats, the mem_reserved will be reset in the host_stats_ for that host. + void UpdateClusterAggregates(const std::set<std::string>& removed_nodes); /// Computes schedules for all executor groups that can run the query in 'queue_node'. /// For subsequent calls schedules are only re-computed if the membership version inside @@ -1224,6 +1230,7 @@ class AdmissionController { FRIEND_TEST(AdmissionControllerTest, DedicatedCoordScheduleState); FRIEND_TEST(AdmissionControllerTest, DedicatedCoordAdmissionChecks); FRIEND_TEST(AdmissionControllerTest, TopNQueryCheck); + FRIEND_TEST(AdmissionControllerTest, EraseHostStats); friend class AdmissionControllerTest; };
