This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch branch-4.4.0
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 7ca9798949ffd31b703c610e4eca20d0e0181ab3
Author: Riza Suminto <[email protected]>
AuthorDate: Sun Apr 21 15:17:37 2024 -0700

    IMPALA-13024: Ignore slots if using default pool and empty group
    
    Slot-based admission should not be enabled when using the default pool.
    There is a bug where a coordinator-only query still does slot-based
    admission because the executor group name is set to
    ClusterMembershipMgr::EMPTY_GROUP_NAME ("empty group (using coordinator
    only)"). This patch adds a check to recognize a coordinator-only query
    in the default pool and skip slot-based admission for it.
    
    Testing:
    - Add BE test AdmissionControllerTest.CanAdmitRequestSlotsDefault.
    - In test_executor_groups.py, split test_coordinator_concurrency to
      test_coordinator_concurrency_default and
      test_coordinator_concurrency_two_exec_group_cluster to show the
      behavior change.
    - Pass core tests in ASAN build.
    
    Change-Id: I0b08dea7ba0c78ac6b98c7a0b148df8fb036c4d0
    Reviewed-on: http://gerrit.cloudera.org:8080/21340
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
    (cherry picked from commit 29e418679380591ea7b8e4bdb797ccdbb0964a3d)
---
 be/src/scheduling/admission-controller-test.cc | 64 ++++++++++++++++++++++++++
 be/src/scheduling/admission-controller.cc      |  9 ++--
 be/src/scheduling/admission-controller.h       |  1 +
 be/src/scheduling/cluster-membership-mgr.cc    |  5 +-
 be/src/scheduling/cluster-membership-mgr.h     |  3 ++
 be/src/scheduling/request-pool-service.cc      |  5 +-
 be/src/scheduling/request-pool-service.h       |  3 ++
 tests/common/impala_connection.py              |  6 ++-
 tests/custom_cluster/test_executor_groups.py   | 50 ++++++++++++++++----
 9 files changed, 128 insertions(+), 18 deletions(-)

diff --git a/be/src/scheduling/admission-controller-test.cc 
b/be/src/scheduling/admission-controller-test.cc
index babc848fe..d096bb90f 100644
--- a/be/src/scheduling/admission-controller-test.cc
+++ b/be/src/scheduling/admission-controller-test.cc
@@ -541,6 +541,13 @@ TEST_F(AdmissionControllerTest, CanAdmitRequestSlots) {
     SetHostsInScheduleState(
         *schedule, 2, false, MEGABYTE, 200L * MEGABYTE, slots_per_query, 
slots_per_host);
   }
+
+  // Coordinator only schedule at EMPTY_GROUP_NAME.
+  ScheduleState* coordinator_only_schedule = MakeScheduleState(QUEUE_D, 
config_d,
+      host_count, 30L * MEGABYTE, ClusterMembershipMgr::EMPTY_GROUP_NAME);
+  SetHostsInScheduleState(*coordinator_only_schedule, 1, true, MEGABYTE, 200L 
* MEGABYTE,
+      slots_per_query, slots_per_host);
+
   vector<NetworkAddressPB> host_addrs = GetHostAddrs(*default_group_schedule);
   string not_admitted_reason;
   bool coordinator_resource_limited = false;
@@ -572,6 +579,63 @@ TEST_F(AdmissionControllerTest, CanAdmitRequestSlots) {
       "Not enough admission control slots available on host host1:25000. 
Needed 4 "
       "slots but 15/16 are already in use.");
   ASSERT_FALSE(coordinator_resource_limited);
+  // Assert that coordinator-only schedule also not admitted.
+  
ASSERT_FALSE(admission_controller->CanAdmitRequest(*coordinator_only_schedule, 
config_d,
+      true, &not_admitted_reason, nullptr, coordinator_resource_limited));
+  EXPECT_STR_CONTAINS(not_admitted_reason,
+      "Not enough admission control slots available on host host0:25000. 
Needed 4 "
+      "slots but 15/16 are already in use.");
+  ASSERT_TRUE(coordinator_resource_limited);
+}
+
+/// Test CanAdmitRequest() ignore slots mechanism in default pool setup.
+TEST_F(AdmissionControllerTest, CanAdmitRequestSlotsDefault) {
+  // Pass the paths of the configuration files as command line flags.
+  FLAGS_fair_scheduler_allocation_path = 
GetResourceFile("fair-scheduler-empty.xml");
+  FLAGS_llama_site_path = GetResourceFile("llama-site-empty.xml");
+
+  AdmissionController* admission_controller = MakeAdmissionController();
+  RequestPoolService* request_pool_service = 
admission_controller->request_pool_service_;
+
+  // Get the PoolConfig for "default-pool".
+  TPoolConfig pool_config;
+  ASSERT_OK(request_pool_service->GetPoolConfig(
+      RequestPoolService::DEFAULT_POOL_NAME, &pool_config));
+
+  // Create ScheduleStates to run on "default-pool" on 12 hosts.
+  // Running both distributed and coordinator-only schedule.
+  int64_t host_count = 12;
+  int64_t slots_per_host = 16;
+  int64_t slots_per_query = 4;
+  // Distributed query schedule.
+  ScheduleState* default_pool_schedule = MakeScheduleState(
+      RequestPoolService::DEFAULT_POOL_NAME, pool_config, host_count, 30L * 
MEGABYTE);
+  SetHostsInScheduleState(*default_pool_schedule, 2, false, MEGABYTE, 200L * 
MEGABYTE,
+      slots_per_query, slots_per_host);
+  // Coordinator only schedule at EMPTY_GROUP_NAME.
+  ScheduleState* coordinator_only_schedule =
+      MakeScheduleState(RequestPoolService::DEFAULT_POOL_NAME, pool_config, 
host_count,
+          30L * MEGABYTE, ClusterMembershipMgr::EMPTY_GROUP_NAME);
+  SetHostsInScheduleState(*coordinator_only_schedule, 1, true, MEGABYTE, 200L 
* MEGABYTE,
+      slots_per_query, slots_per_host);
+
+  vector<NetworkAddressPB> host_addrs = GetHostAddrs(*default_pool_schedule);
+  string not_admitted_reason;
+  bool coordinator_resource_limited = false;
+
+  // Simulate that all slots are being used.
+  // All schedules should be admitted.
+  SetSlotsInUse(admission_controller, host_addrs, slots_per_host);
+  ASSERT_TRUE(admission_controller->CanAdmitRequest(*default_pool_schedule, 
pool_config,
+      true, &not_admitted_reason, nullptr, coordinator_resource_limited))
+      << not_admitted_reason;
+  ASSERT_FALSE(coordinator_resource_limited);
+  ASSERT_EQ(coordinator_only_schedule->executor_group(),
+      ClusterMembershipMgr::EMPTY_GROUP_NAME);
+  ASSERT_TRUE(admission_controller->CanAdmitRequest(*coordinator_only_schedule,
+      pool_config, true, &not_admitted_reason, nullptr, 
coordinator_resource_limited))
+      << not_admitted_reason;
+  ASSERT_FALSE(coordinator_resource_limited);
 }
 
 /// Tests that query rejection works as expected by calling 
RejectForSchedule() and
diff --git a/be/src/scheduling/admission-controller.cc 
b/be/src/scheduling/admission-controller.cc
index af14a7010..a85e3ba47 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -998,13 +998,16 @@ bool AdmissionController::CanAdmitRequest(const 
ScheduleState& state,
   // Can't admit if:
   //  (a) There are already queued requests (and this is not admitting from 
the queue).
   //  (b) The resource pool is already at the maximum number of requests.
-  //  (c) One of the executors in 'schedule' is already at its maximum number 
of requests
-  //      (when not using the default executor group).
+  //  (c) One of the executors or coordinator in 'schedule' is already at its 
maximum
+  //      admission slots (when not using the default executor group).
   //  (d) There are not enough memory resources available for the query.
   const int64_t max_requests = GetMaxRequestsForPool(pool_cfg);
   PoolStats* pool_stats = GetPoolStats(state);
   bool default_group =
       state.executor_group() == ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME;
+  bool default_coordinator_only =
+      state.executor_group() == ClusterMembershipMgr::EMPTY_GROUP_NAME
+      && state.request_pool() == RequestPoolService::DEFAULT_POOL_NAME;
   if (!admit_from_queue && pool_stats->local_stats().num_queued > 0) {
     *not_admitted_reason = Substitute(QUEUED_QUEUE_NOT_EMPTY,
         pool_stats->local_stats().num_queued, GetStalenessDetailLocked(" "));
@@ -1017,7 +1020,7 @@ bool AdmissionController::CanAdmitRequest(const 
ScheduleState& state,
         max_requests, GetStalenessDetailLocked(" "));
     return false;
   }
-  if (!default_group
+  if (!default_group && !default_coordinator_only
       && !HasAvailableSlots(
           state, pool_cfg, not_admitted_reason, coordinator_resource_limited)) 
{
     // All non-default executor groups are also limited by the number of 
running queries
diff --git a/be/src/scheduling/admission-controller.h 
b/be/src/scheduling/admission-controller.h
index f72f5b8a0..9c4db809d 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -1216,6 +1216,7 @@ class AdmissionController {
   FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestMemory);
   FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestCount);
   FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestSlots);
+  FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestSlotsDefault);
   FRIEND_TEST(AdmissionControllerTest, GetMaxToDequeue);
   FRIEND_TEST(AdmissionControllerTest, QueryRejection);
   FRIEND_TEST(AdmissionControllerTest, DedicatedCoordScheduleState);
diff --git a/be/src/scheduling/cluster-membership-mgr.cc 
b/be/src/scheduling/cluster-membership-mgr.cc
index 615a385f2..a296b2f35 100644
--- a/be/src/scheduling/cluster-membership-mgr.cc
+++ b/be/src/scheduling/cluster-membership-mgr.cc
@@ -69,8 +69,6 @@ void RemoveExecutorAndGroup(const BackendDescriptorPB& 
be_desc,
 
 namespace impala {
 
-static const string EMPTY_GROUP_NAME("empty group (using coordinator only)");
-
 static const string 
LIVE_EXEC_GROUP_KEY("cluster-membership.executor-groups.total");
 static const string HEALTHY_EXEC_GROUP_KEY(
     "cluster-membership.executor-groups.total-healthy");
@@ -83,6 +81,9 @@ static const string HEALTHY_EXEC_GROUP_KEY_FORMAT(
 static const string TOTAL_BACKENDS_KEY_FORMAT(
     "cluster-membership.group-set.backends.total.$0");
 
+const string ClusterMembershipMgr::EMPTY_GROUP_NAME(
+    "empty group (using coordinator only)");
+
 ClusterMembershipMgr::ClusterMembershipMgr(
     string local_backend_id, StatestoreSubscriber* subscriber, MetricGroup* 
metrics)
   : empty_exec_group_(EMPTY_GROUP_NAME),
diff --git a/be/src/scheduling/cluster-membership-mgr.h 
b/be/src/scheduling/cluster-membership-mgr.h
index 8c82b9972..bfbb7e453 100644
--- a/be/src/scheduling/cluster-membership-mgr.h
+++ b/be/src/scheduling/cluster-membership-mgr.h
@@ -78,6 +78,9 @@ class ClusterMembershipMgr {
   /// Maps executor group names to executor groups.
   typedef std::unordered_map<std::string, ExecutorGroup> ExecutorGroups;
 
+  /// Empty group name when query use coordinator only.
+  static const std::string EMPTY_GROUP_NAME;
+
   // A snapshot of the current cluster membership. The ClusterMembershipMgr 
maintains a
   // consistent copy of this and updates it atomically when the membership 
changes.
   // Clients can obtain an immutable copy. Class instances can be created 
through the
diff --git a/be/src/scheduling/request-pool-service.cc 
b/be/src/scheduling/request-pool-service.cc
index dd3d2ff83..bae449611 100644
--- a/be/src/scheduling/request-pool-service.cc
+++ b/be/src/scheduling/request-pool-service.cc
@@ -78,9 +78,6 @@ DEFINE_bool(disable_pool_max_requests, false, "Disables all 
per-pool limits on t
     "maximum number of running requests.");
 
 
-// Pool name used when the configuration files are not specified.
-static const string DEFAULT_POOL_NAME = "default-pool";
-
 static const string RESOLVE_POOL_METRIC_NAME = 
"request-pool-service.resolve-pool-duration-ms";
 
 static const string ERROR_USER_NOT_ALLOWED_IN_POOL = "Request from user '$0' 
with "
@@ -88,6 +85,8 @@ static const string ERROR_USER_NOT_ALLOWED_IN_POOL = "Request 
from user '$0' wit
 static const string ERROR_USER_NOT_SPECIFIED = "User must be specified because 
"
     "-require_username=true.";
 
+const string RequestPoolService::DEFAULT_POOL_NAME = "default-pool";
+
 RequestPoolService::RequestPoolService(MetricGroup* metrics) :
     resolve_pool_ms_metric_(NULL) {
   DCHECK(metrics != NULL);
diff --git a/be/src/scheduling/request-pool-service.h 
b/be/src/scheduling/request-pool-service.h
index 5fa097947..a92d8ee77 100644
--- a/be/src/scheduling/request-pool-service.h
+++ b/be/src/scheduling/request-pool-service.h
@@ -35,6 +35,9 @@ namespace impala {
 /// RequestPoolService, called via JNI.
 class RequestPoolService {
  public:
+  // Pool name used when the configuration files are not specified.
+  static const std::string DEFAULT_POOL_NAME;
+
   /// Initializes the JNI method stubs if configuration files are specified. 
If any
   /// method can't be found, or if there is any further error, the constructor 
will
   /// terminate the process.
diff --git a/tests/common/impala_connection.py 
b/tests/common/impala_connection.py
index 3f8f2a9e0..b008a05cb 100644
--- a/tests/common/impala_connection.py
+++ b/tests/common/impala_connection.py
@@ -64,6 +64,7 @@ def log_sql_stmt(sql_stmt):
       LOG.info("-- {0}".format(line))
     LOG.info("-- [...]")
 
+
 # Common wrapper around the internal types of HS2/Beeswax operation/query 
handles.
 class OperationHandle(object):
   def __init__(self, handle, sql_stmt):
@@ -88,7 +89,7 @@ class ImpalaConnection(with_metaclass(abc.ABCMeta, object)):
 
   @abc.abstractmethod
   def set_configuration_option(self, name, value):
-    """Sets a configuraiton option name to the given value"""
+    """Sets a configuration option name to the given value"""
     pass
 
   def set_configuration(self, config_option_dict):
@@ -258,7 +259,7 @@ class BeeswaxConnection(ImpalaConnection):
     LOG.info("-- getting log for operation: %s" % operation_handle)
     return 
self.__beeswax_client.get_log(operation_handle.get_handle().log_context)
 
-  def fetch(self, sql_stmt, operation_handle, max_rows = -1):
+  def fetch(self, sql_stmt, operation_handle, max_rows=-1):
     LOG.info("-- fetching results from: %s" % operation_handle)
     return self.__beeswax_client.fetch_results(
         sql_stmt, operation_handle.get_handle(), max_rows)
@@ -532,6 +533,7 @@ def create_connection(host_port, use_kerberos=False, 
protocol='beeswax',
     c.set_configuration_option("client_identifier", tests.common.current_node)
   return c
 
+
 def create_ldap_connection(host_port, user, password, use_ssl=False):
   return BeeswaxConnection(host_port=host_port, user=user, password=password,
                            use_ssl=use_ssl)
diff --git a/tests/custom_cluster/test_executor_groups.py 
b/tests/custom_cluster/test_executor_groups.py
index de3491f20..b22fd15ae 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -44,6 +44,9 @@ CPU_TEST_QUERY = "select * from tpcds_parquet.store_sales 
where ss_item_sk = 1 l
 GROUPING_TEST_QUERY = ("select ss_item_sk from tpcds_parquet.store_sales"
     " group by (ss_item_sk) order by ss_item_sk limit 10")
 
+# A scan query that is recognized as trivial and scheduled to run at 
coordinator only.
+SINGLE_NODE_SCAN_QUERY = "select * from functional.alltypestiny"
+
 # TPC-DS Q1 to test slightly more complex query.
 TPCDS_Q1 = """
 with customer_total_return as (
@@ -72,6 +75,8 @@ limit 100
 
 DEFAULT_RESOURCE_POOL = "default-pool"
 
+DEBUG_ACTION_DELAY_SCAN = "HDFS_SCANNER_THREAD_OBTAINED_RANGE:SLEEP@1000"
+
 
 class TestExecutorGroups(CustomClusterTestSuite):
   """This class contains tests that exercise the logic related to scaling 
clusters up and
@@ -393,19 +398,48 @@ class TestExecutorGroups(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(impalad_args="-admission_control_slots=1")
-  def test_coordinator_concurrency(self):
-    """Tests that the command line flag to limit the coordinator concurrency 
works as
-    expected."""
-    QUERY = "select sleep(1000)"
+  def test_coordinator_concurrency_default(self):
+    """Tests that concurrent coordinator-only queries ignore slot based 
admission
+    in non multiple executor group set setup."""
     # Add group with more slots than coordinator
     self._add_executor_group("group2", 2, admission_control_slots=3)
-    # Try to run two queries and observe that one gets queued
+    # Try to run two queries and observe that the second one is admitted 
immediately.
     client = self.client
-    q1 = client.execute_async(QUERY)
+    client.set_configuration_option('debug_action', DEBUG_ACTION_DELAY_SCAN)
+    q1 = client.execute_async(SINGLE_NODE_SCAN_QUERY)
     client.wait_for_admission_control(q1)
-    q2 = client.execute_async(QUERY)
-    self._assert_eventually_in_profile(q2, "Initial admission queue reason")
+    q2 = client.execute_async(SINGLE_NODE_SCAN_QUERY)
+    self._assert_eventually_in_profile(q2, "Admitted immediately")
+    # Assert other info strings in query profiles.
+    self._assert_eventually_in_profile(q1, "Admitted immediately")
+    self._assert_eventually_in_profile(q1, "empty group (using coordinator 
only)")
+    client.cancel(q1)
+    self._assert_eventually_in_profile(q2, "empty group (using coordinator 
only)")
+    client.cancel(q2)
+
+  @pytest.mark.execute_serially
+  def test_coordinator_concurrency_two_exec_group_cluster(self):
+    """Tests that concurrent coordinator-only queries respect slot based 
admission
+    in multiple executor group set setup."""
+    coordinator_test_args = "-admission_control_slots=1"
+    self._setup_two_coordinator_two_exec_group_cluster(coordinator_test_args)
+    # Create fresh clients
+    self.create_impala_clients()
+    # Try to run two queries and observe that the second one gets queued.
+    client = self.client
+    client.set_configuration_option('debug_action', DEBUG_ACTION_DELAY_SCAN)
+    q1 = client.execute_async(SINGLE_NODE_SCAN_QUERY)
+    client.wait_for_admission_control(q1)
+    q2 = client.execute_async(SINGLE_NODE_SCAN_QUERY)
+    self._assert_eventually_in_profile(q2,
+        "Not enough admission control slots available")
+    # Assert other info strings in query profiles.
+    self._assert_eventually_in_profile(q1, "Admitted immediately")
+    self._assert_eventually_in_profile(q1, "empty group (using coordinator 
only)")
+    # Cancel q1 so that q2 is then admitted and has 'Executor Group' info 
string.
     client.cancel(q1)
+    self._assert_eventually_in_profile(q2, "Admitted (queued)")
+    self._assert_eventually_in_profile(q2, "empty group (using coordinator 
only)")
     client.cancel(q2)
 
   @pytest.mark.execute_serially

Reply via email to