This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 0bee563073ef04b04446a2d1acf279ac4bc596e2
Author: Riza Suminto <[email protected]>
AuthorDate: Wed Jul 19 15:25:17 2023 -0700

    IMPALA-12300: Turn CheckEffectiveInstanceCount to print warning
    
    Scheduler::CheckEffectiveInstanceCount was added to check consistency
    between FE planning and BE scheduling if COMPUTE_PROCESSING_COST=true.
    This consistency can be broken if there is a cluster membership
    change (a new executor comes online) between FE planning and BE
    scheduling. For example, with an executor group of size 10 and a 90%
    health threshold, the admission controller is allowed to run a query
    when only 9 executors are available. If the 10th executor comes online
    between FE planning and BE scheduling, CheckEffectiveInstanceCount can
    fail and return an error.
    
    This patch turns two error statuses in CheckEffectiveInstanceCount into
    warnings, reported either in the query profile as an InfoString or as a
    WARNING log. The MAX_FRAGMENT_INSTANCES_PER_NODE violation check still
    returns an error.
    
    Testing:
    - Add test_75_percent_availability
    - Pass test_executors.py
    
    Change-Id: Ieaf6a46c4f12dbf8b03d1618c2f090ab4f2ac665
    Reviewed-on: http://gerrit.cloudera.org:8080/20231
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/scheduling/scheduler.cc               | 42 +++++++++++--------
 be/src/scheduling/scheduler.h                |  2 +-
 tests/custom_cluster/test_executor_groups.py | 61 ++++++++++++++++++++++++++--
 3 files changed, 83 insertions(+), 22 deletions(-)

diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 64e8f9ce5..d5ebed49d 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -61,6 +61,7 @@ namespace impala {
 static const string 
LOCAL_ASSIGNMENTS_KEY("simple-scheduler.local-assignments.total");
 static const string ASSIGNMENTS_KEY("simple-scheduler.assignments.total");
 static const string SCHEDULER_INIT_KEY("simple-scheduler.initialized");
+static const string SCHEDULER_WARNING_KEY("Scheduler Warning");
 
 static const vector<TPlanNodeType::type> 
SCAN_NODE_TYPES{TPlanNodeType::HDFS_SCAN_NODE,
     TPlanNodeType::HBASE_SCAN_NODE, TPlanNodeType::DATA_SOURCE_NODE,
@@ -267,17 +268,23 @@ Status Scheduler::ComputeFragmentExecParams(
 }
 
 Status Scheduler::CheckEffectiveInstanceCount(
-    const FragmentScheduleState* fragment_state, const ScheduleState* state) {
+    const FragmentScheduleState* fragment_state, ScheduleState* state) {
   // These checks are only intended if COMPUTE_PROCESSING_COST=true.
   if (!state->query_options().compute_processing_cost) return Status::OK();
 
   int effective_instance_count = 
fragment_state->fragment.effective_instance_count;
   if (effective_instance_count < fragment_state->instance_states.size()) {
-    return Status(
-        Substitute("$0 scheduled $1 instances, higher than the effective count 
($2). "
-                   "Consider running the query with 
COMPUTE_PROCESSING_COST=false.",
-            fragment_state->fragment.display_name, 
fragment_state->instance_states.size(),
-            effective_instance_count));
+    if (state->summary_profile()->GetInfoString(SCHEDULER_WARNING_KEY) == 
nullptr) {
+      state->summary_profile()->AddInfoString(SCHEDULER_WARNING_KEY,
+          "Cluster membership might changed between planning and scheduling");
+    }
+
+    string warn_message = Substitute(
+        "$0 scheduled instance count ($1) is higher than its effective count 
($2)",
+        fragment_state->fragment.display_name, 
fragment_state->instance_states.size(),
+        effective_instance_count);
+    state->summary_profile()->AppendInfoString(SCHEDULER_WARNING_KEY, 
warn_message);
+    LOG(WARNING) << warn_message;
   }
 
   DCHECK(!fragment_state->instance_states.empty());
@@ -308,24 +315,23 @@ Status Scheduler::CheckEffectiveInstanceCount(
   QueryConstants qc;
   if (largest_inst_per_host > qc.MAX_FRAGMENT_INSTANCES_PER_NODE) {
     return Status(Substitute(
-        "$0 scheduled $1 instances, higher than maximum instances per node 
($2). "
-        "Consider running the query with COMPUTE_PROCESSING_COST=false.",
+        "$0 scheduled instance count ($1) is higher than maximum instances per 
node"
+        " ($2), indicating a planner bug. Consider running the query with"
+        " COMPUTE_PROCESSING_COST=false.",
         fragment_state->fragment.display_name, largest_inst_per_host,
         qc.MAX_FRAGMENT_INSTANCES_PER_NODE));
   }
 
   int planned_inst_per_host = ceil((float)effective_instance_count / num_host);
   if (largest_inst_per_host > planned_inst_per_host) {
-    stringstream err_msg;
-    err_msg << fragment_state->fragment.display_name
-            << " has imbalance number of instance to host assignment."
-            << " Consider running the query with 
COMPUTE_PROCESSING_COST=false."
-            << " Host " << 
fragment_state->instance_states[largest_inst_idx].host
-            << " has " << largest_inst_per_host << " instances assigned."
-            << " effective_instance_count=" << effective_instance_count
-            << " planned_inst_per_host=" << planned_inst_per_host
-            << " num_host=" << num_host;
-    return Status(err_msg.str());
+    LOG(WARNING) << fragment_state->fragment.display_name
+                 << " has imbalance number of instance to host assignment."
+                 << " Consider running the query with 
COMPUTE_PROCESSING_COST=false."
+                 << " Host " << 
fragment_state->instance_states[largest_inst_idx].host
+                 << " has " << largest_inst_per_host << " instances assigned."
+                 << " effective_instance_count=" << effective_instance_count
+                 << " planned_inst_per_host=" << planned_inst_per_host
+                 << " num_host=" << num_host;
   }
   return Status::OK();
 }
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index 600a74851..ee82b4997 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -444,7 +444,7 @@ class Scheduler {
   /// instance_states size match with effective_instance_count. Fragment with 
UnionNode or
   /// ScanNode or one where IsExceedMaxFsWriters equals true is not checked.
   static Status CheckEffectiveInstanceCount(
-      const FragmentScheduleState* fragment_state, const ScheduleState* state);
+      const FragmentScheduleState* fragment_state, ScheduleState* state);
 
   /// Check if sink_fragment_state has hdfs_table_sink AND ref_fragment_state 
scheduled
   /// to exceed max_fs_writers query option.
diff --git a/tests/custom_cluster/test_executor_groups.py 
b/tests/custom_cluster/test_executor_groups.py
index dd8693d47..42c900b47 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -826,7 +826,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
     # The path to resources directory which contains the admission control 
config files.
     RESOURCES_DIR = os.path.join(os.environ['IMPALA_HOME'], "fe", "src", 
"test",
                                  "resources")
-    # Define two group sets: tiny, small and large
+    # Define three group sets: tiny, small and large
     fs_allocation_path = os.path.join(RESOURCES_DIR, 
"fair-scheduler-3-groups.xml")
     # Define the min-query-mem-limit, max-query-mem-limit,
     # max-query-cpu-core-per-node-limit and 
max-query-cpu-core-coordinator-limit
@@ -844,8 +844,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
         "-llama_site_path %s "
         "%s ")
 
-    # Start with a regular admission config, multiple pools, no resource 
limits,
-    # and query_cpu_count_divisor=2.
+    # Start with a regular admission config, multiple pools, no resource 
limits.
     self._restart_coordinators(num_coordinators=1,
         extra_args=extra_args_template % (fs_allocation_path, llama_site_path,
           coordinator_test_args))
@@ -1384,3 +1383,59 @@ class TestExecutorGroups(CustomClusterTestSuite):
         QUERY, {'request_pool': 'queue1'}, "Executor Group: 
root.queue1-group2")
     self.client.close()
     second_coord_client.close()
+
+  @pytest.mark.execute_serially
+  def test_75_percent_availability(self):
+    """Test query planning and execution when only 75% of executor is up.
+    This test will run query over 8 node executor group at its healthy 
threshold (6) and
+    start the other 2 executor after query is planned.
+    """
+    coordinator_test_args = ''
+    # The path to resources directory which contains the admission control 
config files.
+    RESOURCES_DIR = os.path.join(os.environ['IMPALA_HOME'], "fe", "src", 
"test",
+                                 "resources")
+
+    # Reuse cluster configuration from _setup_three_exec_group_cluster, but 
only start
+    # root.large executor groups.
+    fs_allocation_path = os.path.join(RESOURCES_DIR, 
"fair-scheduler-3-groups.xml")
+    llama_site_path = os.path.join(RESOURCES_DIR, "llama-site-3-groups.xml")
+
+    # extra args template to start coordinator
+    extra_args_template = ("-vmodule admission-controller=3 "
+        "-admission_control_slots=8 "
+        "-expected_executor_group_sets=root.large:8 "
+        "-fair_scheduler_allocation_path %s "
+        "-llama_site_path %s "
+        "%s ")
+
+    # Start with a regular admission config, multiple pools, no resource 
limits.
+    self._restart_coordinators(num_coordinators=1,
+        extra_args=extra_args_template % (fs_allocation_path, llama_site_path,
+          coordinator_test_args))
+
+    # Create fresh client
+    self.create_impala_clients()
+    # Start root.large exec group with 8 admission slots and 6 executors.
+    self._add_executor_group("group", 6, num_executors=6, 
admission_control_slots=8,
+                             resource_pool="root.large", 
extra_args="-mem_limit=2g")
+    assert self._get_num_executor_groups(only_healthy=False) == 1
+    assert self._get_num_executor_groups(only_healthy=False,
+                                         exec_group_set_prefix="root.large") 
== 1
+
+    # Run query and let it compile, but delay admission for 5s
+    handle = self.execute_query_async(CPU_TEST_QUERY, {
+      "COMPUTE_PROCESSING_COST": "true",
+      "DEBUG_ACTION": "AC_BEFORE_ADMISSION:SLEEP@5000"})
+
+    # Start the next 2 executors.
+    self._add_executors("group", 6, num_executors=2, 
resource_pool="root.large",
+        extra_args="-mem_limit=2g", expected_num_impalads=9)
+
+    self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60)
+    profile = self.client.get_runtime_profile(handle)
+    assert "F00:PLAN FRAGMENT [RANDOM] hosts=6 instances=12" in profile, 
profile
+    assert ("Scheduler Warning: Cluster membership might changed between 
planning and "
+        "scheduling, F00 scheduled instance count (16) is higher than its 
effective "
+        "count (12)") in profile, profile
+    assert "00:SCAN HDFS               8     16" in profile, profile
+    self.client.close_query(handle)

Reply via email to