This is an automated email from the ASF dual-hosted git repository. wzhou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 0bee563073ef04b04446a2d1acf279ac4bc596e2 Author: Riza Suminto <[email protected]> AuthorDate: Wed Jul 19 15:25:17 2023 -0700 IMPALA-12300: Turn CheckEffectiveInstanceCount to print warning Scheduler::CheckEffectiveInstanceCount was added to check consistency between FE planning and BE scheduling if COMPUTE_PROCESSING_COST=true. This consistency can be broken if there is a cluster membership change (a new executor comes online) between FE planning and BE scheduling. Say, in an executor group of size 10 with a 90% health threshold, the admission-controller is allowed to run a query when only 9 executors are available. If the 10th executor comes online during the time between FE planning and BE scheduling, CheckEffectiveInstanceCount can fail and return an error. This patch turns two error statuses in CheckEffectiveInstanceCount into warnings, emitted either to the query profile as an InfoString or to the WARNING log. The MAX_FRAGMENT_INSTANCES_PER_NODE violation check still returns an error. Testing: - Add test_75_percent_availability - Pass test_executors.py Change-Id: Ieaf6a46c4f12dbf8b03d1618c2f090ab4f2ac665 Reviewed-on: http://gerrit.cloudera.org:8080/20231 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/scheduling/scheduler.cc | 42 +++++++++++-------- be/src/scheduling/scheduler.h | 2 +- tests/custom_cluster/test_executor_groups.py | 61 ++++++++++++++++++++++++++-- 3 files changed, 83 insertions(+), 22 deletions(-) diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc index 64e8f9ce5..d5ebed49d 100644 --- a/be/src/scheduling/scheduler.cc +++ b/be/src/scheduling/scheduler.cc @@ -61,6 +61,7 @@ namespace impala { static const string LOCAL_ASSIGNMENTS_KEY("simple-scheduler.local-assignments.total"); static const string ASSIGNMENTS_KEY("simple-scheduler.assignments.total"); static const string SCHEDULER_INIT_KEY("simple-scheduler.initialized"); +static const string SCHEDULER_WARNING_KEY("Scheduler Warning"); static const 
vector<TPlanNodeType::type> SCAN_NODE_TYPES{TPlanNodeType::HDFS_SCAN_NODE, TPlanNodeType::HBASE_SCAN_NODE, TPlanNodeType::DATA_SOURCE_NODE, @@ -267,17 +268,23 @@ Status Scheduler::ComputeFragmentExecParams( } Status Scheduler::CheckEffectiveInstanceCount( - const FragmentScheduleState* fragment_state, const ScheduleState* state) { + const FragmentScheduleState* fragment_state, ScheduleState* state) { // These checks are only intended if COMPUTE_PROCESSING_COST=true. if (!state->query_options().compute_processing_cost) return Status::OK(); int effective_instance_count = fragment_state->fragment.effective_instance_count; if (effective_instance_count < fragment_state->instance_states.size()) { - return Status( - Substitute("$0 scheduled $1 instances, higher than the effective count ($2). " - "Consider running the query with COMPUTE_PROCESSING_COST=false.", - fragment_state->fragment.display_name, fragment_state->instance_states.size(), - effective_instance_count)); + if (state->summary_profile()->GetInfoString(SCHEDULER_WARNING_KEY) == nullptr) { + state->summary_profile()->AddInfoString(SCHEDULER_WARNING_KEY, + "Cluster membership might changed between planning and scheduling"); + } + + string warn_message = Substitute( + "$0 scheduled instance count ($1) is higher than its effective count ($2)", + fragment_state->fragment.display_name, fragment_state->instance_states.size(), + effective_instance_count); + state->summary_profile()->AppendInfoString(SCHEDULER_WARNING_KEY, warn_message); + LOG(WARNING) << warn_message; } DCHECK(!fragment_state->instance_states.empty()); @@ -308,24 +315,23 @@ Status Scheduler::CheckEffectiveInstanceCount( QueryConstants qc; if (largest_inst_per_host > qc.MAX_FRAGMENT_INSTANCES_PER_NODE) { return Status(Substitute( - "$0 scheduled $1 instances, higher than maximum instances per node ($2). 
" - "Consider running the query with COMPUTE_PROCESSING_COST=false.", + "$0 scheduled instance count ($1) is higher than maximum instances per node" + " ($2), indicating a planner bug. Consider running the query with" + " COMPUTE_PROCESSING_COST=false.", fragment_state->fragment.display_name, largest_inst_per_host, qc.MAX_FRAGMENT_INSTANCES_PER_NODE)); } int planned_inst_per_host = ceil((float)effective_instance_count / num_host); if (largest_inst_per_host > planned_inst_per_host) { - stringstream err_msg; - err_msg << fragment_state->fragment.display_name - << " has imbalance number of instance to host assignment." - << " Consider running the query with COMPUTE_PROCESSING_COST=false." - << " Host " << fragment_state->instance_states[largest_inst_idx].host - << " has " << largest_inst_per_host << " instances assigned." - << " effective_instance_count=" << effective_instance_count - << " planned_inst_per_host=" << planned_inst_per_host - << " num_host=" << num_host; - return Status(err_msg.str()); + LOG(WARNING) << fragment_state->fragment.display_name + << " has imbalance number of instance to host assignment." + << " Consider running the query with COMPUTE_PROCESSING_COST=false." + << " Host " << fragment_state->instance_states[largest_inst_idx].host + << " has " << largest_inst_per_host << " instances assigned." + << " effective_instance_count=" << effective_instance_count + << " planned_inst_per_host=" << planned_inst_per_host + << " num_host=" << num_host; } return Status::OK(); } diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h index 600a74851..ee82b4997 100644 --- a/be/src/scheduling/scheduler.h +++ b/be/src/scheduling/scheduler.h @@ -444,7 +444,7 @@ class Scheduler { /// instance_states size match with effective_instance_count. Fragment with UnionNode or /// ScanNode or one where IsExceedMaxFsWriters equals true is not checked. 
static Status CheckEffectiveInstanceCount( - const FragmentScheduleState* fragment_state, const ScheduleState* state); + const FragmentScheduleState* fragment_state, ScheduleState* state); /// Check if sink_fragment_state has hdfs_table_sink AND ref_fragment_state scheduled /// to exceed max_fs_writers query option. diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py index dd8693d47..42c900b47 100644 --- a/tests/custom_cluster/test_executor_groups.py +++ b/tests/custom_cluster/test_executor_groups.py @@ -826,7 +826,7 @@ class TestExecutorGroups(CustomClusterTestSuite): # The path to resources directory which contains the admission control config files. RESOURCES_DIR = os.path.join(os.environ['IMPALA_HOME'], "fe", "src", "test", "resources") - # Define two group sets: tiny, small and large + # Define three group sets: tiny, small and large fs_allocation_path = os.path.join(RESOURCES_DIR, "fair-scheduler-3-groups.xml") # Define the min-query-mem-limit, max-query-mem-limit, # max-query-cpu-core-per-node-limit and max-query-cpu-core-coordinator-limit @@ -844,8 +844,7 @@ class TestExecutorGroups(CustomClusterTestSuite): "-llama_site_path %s " "%s ") - # Start with a regular admission config, multiple pools, no resource limits, - # and query_cpu_count_divisor=2. + # Start with a regular admission config, multiple pools, no resource limits. self._restart_coordinators(num_coordinators=1, extra_args=extra_args_template % (fs_allocation_path, llama_site_path, coordinator_test_args)) @@ -1384,3 +1383,59 @@ class TestExecutorGroups(CustomClusterTestSuite): QUERY, {'request_pool': 'queue1'}, "Executor Group: root.queue1-group2") self.client.close() second_coord_client.close() + + @pytest.mark.execute_serially + def test_75_percent_availability(self): + """Test query planning and execution when only 75% of executor is up. 
+ This test will run query over 8 node executor group at its healthy threshold (6) and + start the other 2 executor after query is planned. + """ + coordinator_test_args = '' + # The path to resources directory which contains the admission control config files. + RESOURCES_DIR = os.path.join(os.environ['IMPALA_HOME'], "fe", "src", "test", + "resources") + + # Reuse cluster configuration from _setup_three_exec_group_cluster, but only start + # root.large executor groups. + fs_allocation_path = os.path.join(RESOURCES_DIR, "fair-scheduler-3-groups.xml") + llama_site_path = os.path.join(RESOURCES_DIR, "llama-site-3-groups.xml") + + # extra args template to start coordinator + extra_args_template = ("-vmodule admission-controller=3 " + "-admission_control_slots=8 " + "-expected_executor_group_sets=root.large:8 " + "-fair_scheduler_allocation_path %s " + "-llama_site_path %s " + "%s ") + + # Start with a regular admission config, multiple pools, no resource limits. + self._restart_coordinators(num_coordinators=1, + extra_args=extra_args_template % (fs_allocation_path, llama_site_path, + coordinator_test_args)) + + # Create fresh client + self.create_impala_clients() + # Start root.large exec group with 8 admission slots and 6 executors. + self._add_executor_group("group", 6, num_executors=6, admission_control_slots=8, + resource_pool="root.large", extra_args="-mem_limit=2g") + assert self._get_num_executor_groups(only_healthy=False) == 1 + assert self._get_num_executor_groups(only_healthy=False, + exec_group_set_prefix="root.large") == 1 + + # Run query and let it compile, but delay admission for 5s + handle = self.execute_query_async(CPU_TEST_QUERY, { + "COMPUTE_PROCESSING_COST": "true", + "DEBUG_ACTION": "AC_BEFORE_ADMISSION:SLEEP@5000"}) + + # Start the next 2 executors. 
+ self._add_executors("group", 6, num_executors=2, resource_pool="root.large", + extra_args="-mem_limit=2g", expected_num_impalads=9) + + self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60) + profile = self.client.get_runtime_profile(handle) + assert "F00:PLAN FRAGMENT [RANDOM] hosts=6 instances=12" in profile, profile + assert ("Scheduler Warning: Cluster membership might changed between planning and " + "scheduling, F00 scheduled instance count (16) is higher than its effective " + "count (12)") in profile, profile + assert "00:SCAN HDFS 8 16" in profile, profile + self.client.close_query(handle)
