This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 29e418679380591ea7b8e4bdb797ccdbb0964a3d Author: Riza Suminto <[email protected]> AuthorDate: Sun Apr 21 15:17:37 2024 -0700 IMPALA-13024: Ignore slots if using default pool and empty group Slot based admission should not be enabled when using default pool. There is a bug where coordinator-only query still does slot based admission because executor group name set to ClusterMembershipMgr::EMPTY_GROUP_NAME ("empty group (using coordinator only)"). This patch adds check to recognize coordinator-only query in default pool and skip slot based admission for it. Testing: - Add BE test AdmissionControllerTest.CanAdmitRequestSlotsDefault. - In test_executor_groups.py, split test_coordinator_concurrency to test_coordinator_concurrency_default and test_coordinator_concurrency_two_exec_group_cluster to show the behavior change. - Pass core tests in ASAN build. Change-Id: I0b08dea7ba0c78ac6b98c7a0b148df8fb036c4d0 Reviewed-on: http://gerrit.cloudera.org:8080/21340 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/scheduling/admission-controller-test.cc | 64 ++++++++++++++++++++++++++ be/src/scheduling/admission-controller.cc | 9 ++-- be/src/scheduling/admission-controller.h | 1 + be/src/scheduling/cluster-membership-mgr.cc | 5 +- be/src/scheduling/cluster-membership-mgr.h | 3 ++ be/src/scheduling/request-pool-service.cc | 5 +- be/src/scheduling/request-pool-service.h | 3 ++ tests/common/impala_connection.py | 6 ++- tests/custom_cluster/test_executor_groups.py | 50 ++++++++++++++++---- 9 files changed, 128 insertions(+), 18 deletions(-) diff --git a/be/src/scheduling/admission-controller-test.cc b/be/src/scheduling/admission-controller-test.cc index babc848fe..d096bb90f 100644 --- a/be/src/scheduling/admission-controller-test.cc +++ b/be/src/scheduling/admission-controller-test.cc @@ -541,6 +541,13 @@ TEST_F(AdmissionControllerTest, CanAdmitRequestSlots) { SetHostsInScheduleState( *schedule, 2, false, 
MEGABYTE, 200L * MEGABYTE, slots_per_query, slots_per_host); } + + // Coordinator only schedule at EMPTY_GROUP_NAME. + ScheduleState* coordinator_only_schedule = MakeScheduleState(QUEUE_D, config_d, + host_count, 30L * MEGABYTE, ClusterMembershipMgr::EMPTY_GROUP_NAME); + SetHostsInScheduleState(*coordinator_only_schedule, 1, true, MEGABYTE, 200L * MEGABYTE, + slots_per_query, slots_per_host); + vector<NetworkAddressPB> host_addrs = GetHostAddrs(*default_group_schedule); string not_admitted_reason; bool coordinator_resource_limited = false; @@ -572,6 +579,63 @@ TEST_F(AdmissionControllerTest, CanAdmitRequestSlots) { "Not enough admission control slots available on host host1:25000. Needed 4 " "slots but 15/16 are already in use."); ASSERT_FALSE(coordinator_resource_limited); + // Assert that coordinator-only schedule also not admitted. + ASSERT_FALSE(admission_controller->CanAdmitRequest(*coordinator_only_schedule, config_d, + true, &not_admitted_reason, nullptr, coordinator_resource_limited)); + EXPECT_STR_CONTAINS(not_admitted_reason, + "Not enough admission control slots available on host host0:25000. Needed 4 " + "slots but 15/16 are already in use."); + ASSERT_TRUE(coordinator_resource_limited); +} + +/// Test CanAdmitRequest() ignore slots mechanism in default pool setup. +TEST_F(AdmissionControllerTest, CanAdmitRequestSlotsDefault) { + // Pass the paths of the configuration files as command line flags. + FLAGS_fair_scheduler_allocation_path = GetResourceFile("fair-scheduler-empty.xml"); + FLAGS_llama_site_path = GetResourceFile("llama-site-empty.xml"); + + AdmissionController* admission_controller = MakeAdmissionController(); + RequestPoolService* request_pool_service = admission_controller->request_pool_service_; + + // Get the PoolConfig for "default-pool". + TPoolConfig pool_config; + ASSERT_OK(request_pool_service->GetPoolConfig( + RequestPoolService::DEFAULT_POOL_NAME, &pool_config)); + + // Create ScheduleStates to run on "default-pool" on 12 hosts. 
+ // Running both distributed and coordinator-only schedule. + int64_t host_count = 12; + int64_t slots_per_host = 16; + int64_t slots_per_query = 4; + // Distributed query schedule. + ScheduleState* default_pool_schedule = MakeScheduleState( + RequestPoolService::DEFAULT_POOL_NAME, pool_config, host_count, 30L * MEGABYTE); + SetHostsInScheduleState(*default_pool_schedule, 2, false, MEGABYTE, 200L * MEGABYTE, + slots_per_query, slots_per_host); + // Coordinator only schedule at EMPTY_GROUP_NAME. + ScheduleState* coordinator_only_schedule = + MakeScheduleState(RequestPoolService::DEFAULT_POOL_NAME, pool_config, host_count, + 30L * MEGABYTE, ClusterMembershipMgr::EMPTY_GROUP_NAME); + SetHostsInScheduleState(*coordinator_only_schedule, 1, true, MEGABYTE, 200L * MEGABYTE, + slots_per_query, slots_per_host); + + vector<NetworkAddressPB> host_addrs = GetHostAddrs(*default_pool_schedule); + string not_admitted_reason; + bool coordinator_resource_limited = false; + + // Simulate that all slots are being used. + // All schedules should be admitted. 
+ SetSlotsInUse(admission_controller, host_addrs, slots_per_host); + ASSERT_TRUE(admission_controller->CanAdmitRequest(*default_pool_schedule, pool_config, + true, &not_admitted_reason, nullptr, coordinator_resource_limited)) + << not_admitted_reason; + ASSERT_FALSE(coordinator_resource_limited); + ASSERT_EQ(coordinator_only_schedule->executor_group(), + ClusterMembershipMgr::EMPTY_GROUP_NAME); + ASSERT_TRUE(admission_controller->CanAdmitRequest(*coordinator_only_schedule, + pool_config, true, &not_admitted_reason, nullptr, coordinator_resource_limited)) + << not_admitted_reason; + ASSERT_FALSE(coordinator_resource_limited); } /// Tests that query rejection works as expected by calling RejectForSchedule() and diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc index af14a7010..a85e3ba47 100644 --- a/be/src/scheduling/admission-controller.cc +++ b/be/src/scheduling/admission-controller.cc @@ -998,13 +998,16 @@ bool AdmissionController::CanAdmitRequest(const ScheduleState& state, // Can't admit if: // (a) There are already queued requests (and this is not admitting from the queue). // (b) The resource pool is already at the maximum number of requests. - // (c) One of the executors in 'schedule' is already at its maximum number of requests - // (when not using the default executor group). + // (c) One of the executors or coordinator in 'schedule' is already at its maximum + // admission slots (when not using the default executor group). // (d) There are not enough memory resources available for the query. 
const int64_t max_requests = GetMaxRequestsForPool(pool_cfg); PoolStats* pool_stats = GetPoolStats(state); bool default_group = state.executor_group() == ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME; + bool default_coordinator_only = + state.executor_group() == ClusterMembershipMgr::EMPTY_GROUP_NAME + && state.request_pool() == RequestPoolService::DEFAULT_POOL_NAME; if (!admit_from_queue && pool_stats->local_stats().num_queued > 0) { *not_admitted_reason = Substitute(QUEUED_QUEUE_NOT_EMPTY, pool_stats->local_stats().num_queued, GetStalenessDetailLocked(" ")); @@ -1017,7 +1020,7 @@ bool AdmissionController::CanAdmitRequest(const ScheduleState& state, max_requests, GetStalenessDetailLocked(" ")); return false; } - if (!default_group + if (!default_group && !default_coordinator_only && !HasAvailableSlots( state, pool_cfg, not_admitted_reason, coordinator_resource_limited)) { // All non-default executor groups are also limited by the number of running queries diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h index f72f5b8a0..9c4db809d 100644 --- a/be/src/scheduling/admission-controller.h +++ b/be/src/scheduling/admission-controller.h @@ -1216,6 +1216,7 @@ class AdmissionController { FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestMemory); FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestCount); FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestSlots); + FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestSlotsDefault); FRIEND_TEST(AdmissionControllerTest, GetMaxToDequeue); FRIEND_TEST(AdmissionControllerTest, QueryRejection); FRIEND_TEST(AdmissionControllerTest, DedicatedCoordScheduleState); diff --git a/be/src/scheduling/cluster-membership-mgr.cc b/be/src/scheduling/cluster-membership-mgr.cc index 615a385f2..a296b2f35 100644 --- a/be/src/scheduling/cluster-membership-mgr.cc +++ b/be/src/scheduling/cluster-membership-mgr.cc @@ -69,8 +69,6 @@ void RemoveExecutorAndGroup(const BackendDescriptorPB& be_desc, namespace 
impala { -static const string EMPTY_GROUP_NAME("empty group (using coordinator only)"); - static const string LIVE_EXEC_GROUP_KEY("cluster-membership.executor-groups.total"); static const string HEALTHY_EXEC_GROUP_KEY( "cluster-membership.executor-groups.total-healthy"); @@ -83,6 +81,9 @@ static const string HEALTHY_EXEC_GROUP_KEY_FORMAT( static const string TOTAL_BACKENDS_KEY_FORMAT( "cluster-membership.group-set.backends.total.$0"); +const string ClusterMembershipMgr::EMPTY_GROUP_NAME( + "empty group (using coordinator only)"); + ClusterMembershipMgr::ClusterMembershipMgr( string local_backend_id, StatestoreSubscriber* subscriber, MetricGroup* metrics) : empty_exec_group_(EMPTY_GROUP_NAME), diff --git a/be/src/scheduling/cluster-membership-mgr.h b/be/src/scheduling/cluster-membership-mgr.h index 8c82b9972..bfbb7e453 100644 --- a/be/src/scheduling/cluster-membership-mgr.h +++ b/be/src/scheduling/cluster-membership-mgr.h @@ -78,6 +78,9 @@ class ClusterMembershipMgr { /// Maps executor group names to executor groups. typedef std::unordered_map<std::string, ExecutorGroup> ExecutorGroups; + /// Empty group name when query use coordinator only. + static const std::string EMPTY_GROUP_NAME; + // A snapshot of the current cluster membership. The ClusterMembershipMgr maintains a // consistent copy of this and updates it atomically when the membership changes. // Clients can obtain an immutable copy. Class instances can be created through the diff --git a/be/src/scheduling/request-pool-service.cc b/be/src/scheduling/request-pool-service.cc index dd3d2ff83..bae449611 100644 --- a/be/src/scheduling/request-pool-service.cc +++ b/be/src/scheduling/request-pool-service.cc @@ -78,9 +78,6 @@ DEFINE_bool(disable_pool_max_requests, false, "Disables all per-pool limits on t "maximum number of running requests."); -// Pool name used when the configuration files are not specified. 
-static const string DEFAULT_POOL_NAME = "default-pool"; - static const string RESOLVE_POOL_METRIC_NAME = "request-pool-service.resolve-pool-duration-ms"; static const string ERROR_USER_NOT_ALLOWED_IN_POOL = "Request from user '$0' with " @@ -88,6 +85,8 @@ static const string ERROR_USER_NOT_ALLOWED_IN_POOL = "Request from user '$0' wit static const string ERROR_USER_NOT_SPECIFIED = "User must be specified because " "-require_username=true."; +const string RequestPoolService::DEFAULT_POOL_NAME = "default-pool"; + RequestPoolService::RequestPoolService(MetricGroup* metrics) : resolve_pool_ms_metric_(NULL) { DCHECK(metrics != NULL); diff --git a/be/src/scheduling/request-pool-service.h b/be/src/scheduling/request-pool-service.h index 5fa097947..a92d8ee77 100644 --- a/be/src/scheduling/request-pool-service.h +++ b/be/src/scheduling/request-pool-service.h @@ -35,6 +35,9 @@ namespace impala { /// RequestPoolService, called via JNI. class RequestPoolService { public: + // Pool name used when the configuration files are not specified. + static const std::string DEFAULT_POOL_NAME; + /// Initializes the JNI method stubs if configuration files are specified. If any /// method can't be found, or if there is any further error, the constructor will /// terminate the process. diff --git a/tests/common/impala_connection.py b/tests/common/impala_connection.py index 3f8f2a9e0..b008a05cb 100644 --- a/tests/common/impala_connection.py +++ b/tests/common/impala_connection.py @@ -64,6 +64,7 @@ def log_sql_stmt(sql_stmt): LOG.info("-- {0}".format(line)) LOG.info("-- [...]") + # Common wrapper around the internal types of HS2/Beeswax operation/query handles. 
class OperationHandle(object): def __init__(self, handle, sql_stmt): @@ -88,7 +89,7 @@ class ImpalaConnection(with_metaclass(abc.ABCMeta, object)): @abc.abstractmethod def set_configuration_option(self, name, value): - """Sets a configuraiton option name to the given value""" + """Sets a configuration option name to the given value""" pass def set_configuration(self, config_option_dict): @@ -258,7 +259,7 @@ class BeeswaxConnection(ImpalaConnection): LOG.info("-- getting log for operation: %s" % operation_handle) return self.__beeswax_client.get_log(operation_handle.get_handle().log_context) - def fetch(self, sql_stmt, operation_handle, max_rows = -1): + def fetch(self, sql_stmt, operation_handle, max_rows=-1): LOG.info("-- fetching results from: %s" % operation_handle) return self.__beeswax_client.fetch_results( sql_stmt, operation_handle.get_handle(), max_rows) @@ -532,6 +533,7 @@ def create_connection(host_port, use_kerberos=False, protocol='beeswax', c.set_configuration_option("client_identifier", tests.common.current_node) return c + def create_ldap_connection(host_port, user, password, use_ssl=False): return BeeswaxConnection(host_port=host_port, user=user, password=password, use_ssl=use_ssl) diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py index de3491f20..b22fd15ae 100644 --- a/tests/custom_cluster/test_executor_groups.py +++ b/tests/custom_cluster/test_executor_groups.py @@ -44,6 +44,9 @@ CPU_TEST_QUERY = "select * from tpcds_parquet.store_sales where ss_item_sk = 1 l GROUPING_TEST_QUERY = ("select ss_item_sk from tpcds_parquet.store_sales" " group by (ss_item_sk) order by ss_item_sk limit 10") +# A scan query that is recognized as trivial and scheduled to run at coordinator only. +SINGLE_NODE_SCAN_QUERY = "select * from functional.alltypestiny" + # TPC-DS Q1 to test slightly more complex query. 
TPCDS_Q1 = """ with customer_total_return as ( @@ -72,6 +75,8 @@ limit 100 DEFAULT_RESOURCE_POOL = "default-pool" +DEBUG_ACTION_DELAY_SCAN = "HDFS_SCANNER_THREAD_OBTAINED_RANGE:SLEEP@1000" + class TestExecutorGroups(CustomClusterTestSuite): """This class contains tests that exercise the logic related to scaling clusters up and @@ -393,19 +398,48 @@ class TestExecutorGroups(CustomClusterTestSuite): @pytest.mark.execute_serially @CustomClusterTestSuite.with_args(impalad_args="-admission_control_slots=1") - def test_coordinator_concurrency(self): - """Tests that the command line flag to limit the coordinator concurrency works as - expected.""" - QUERY = "select sleep(1000)" + def test_coordinator_concurrency_default(self): + """Tests that concurrent coordinator-only queries ignore slot based admission + in non multiple executor group set setup.""" # Add group with more slots than coordinator self._add_executor_group("group2", 2, admission_control_slots=3) - # Try to run two queries and observe that one gets queued + # Try to run two queries and observe that the second one is admitted immediately. client = self.client - q1 = client.execute_async(QUERY) + client.set_configuration_option('debug_action', DEBUG_ACTION_DELAY_SCAN) + q1 = client.execute_async(SINGLE_NODE_SCAN_QUERY) client.wait_for_admission_control(q1) - q2 = client.execute_async(QUERY) - self._assert_eventually_in_profile(q2, "Initial admission queue reason") + q2 = client.execute_async(SINGLE_NODE_SCAN_QUERY) + self._assert_eventually_in_profile(q2, "Admitted immediately") + # Assert other info strings in query profiles. 
+ self._assert_eventually_in_profile(q1, "Admitted immediately") + self._assert_eventually_in_profile(q1, "empty group (using coordinator only)") + client.cancel(q1) + self._assert_eventually_in_profile(q2, "empty group (using coordinator only)") + client.cancel(q2) + + @pytest.mark.execute_serially + def test_coordinator_concurrency_two_exec_group_cluster(self): + """Tests that concurrent coordinator-only queries respect slot based admission + in multiple executor group set setup.""" + coordinator_test_args = "-admission_control_slots=1" + self._setup_two_coordinator_two_exec_group_cluster(coordinator_test_args) + # Create fresh clients + self.create_impala_clients() + # Try to run two queries and observe that the second one gets queued. + client = self.client + client.set_configuration_option('debug_action', DEBUG_ACTION_DELAY_SCAN) + q1 = client.execute_async(SINGLE_NODE_SCAN_QUERY) + client.wait_for_admission_control(q1) + q2 = client.execute_async(SINGLE_NODE_SCAN_QUERY) + self._assert_eventually_in_profile(q2, + "Not enough admission control slots available") + # Assert other info strings in query profiles. + self._assert_eventually_in_profile(q1, "Admitted immediately") + self._assert_eventually_in_profile(q1, "empty group (using coordinator only)") + # Cancel q1 so that q2 is then admitted and has 'Executor Group' info string. client.cancel(q1) + self._assert_eventually_in_profile(q2, "Admitted (queued)") + self._assert_eventually_in_profile(q2, "empty group (using coordinator only)") client.cancel(q2) @pytest.mark.execute_serially
