This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch branch-4.4.1
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 0ba5403ea3718e102b4e92b7ff8259de58a16c88
Author: wzhou-code <[email protected]>
AuthorDate: Fri May 24 14:54:56 2024 -0700

    IMPALA-13107: Don't start query on executor if instance number equals 0
    
    Under bad network conditions, the TExecPlanFragmentInfo in KRPC
    messages received by executors can be truncated due to KRPC failures,
    and the truncation does not always cause a Thrift deserialization
    error. The resulting invalid TExecPlanFragmentInfo causes the Impala
    daemon to crash.
    To avoid the crash, this patch checks the number of fragments and
    fragment instances in the received TExecPlanFragmentInfo on the
    executor. The query is not started if either number equals 0. It also
    adds DCHECKs on the coordinator side to make sure the coordinator does
    not send a TExecPlanFragmentInfo without any instance.
    
    Testing:
     - Passed core tests.
     - Passed exhaustive tests in debug build. The new DCHECKs were not
       hit.
    
    Change-Id: Ie92ee120f1e9369f8dc2512792a05b7f8be5f007
    Reviewed-on: http://gerrit.cloudera.org:8080/21458
    Reviewed-by: Wenzhe Zhou <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/runtime/coordinator-backend-state.cc |  5 +++++
 be/src/service/control-service.cc           | 11 ++++++++++-
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/be/src/runtime/coordinator-backend-state.cc 
b/be/src/runtime/coordinator-backend-state.cc
index e689c59b9..2ad027d15 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -131,6 +131,7 @@ void Coordinator::BackendState::SetRpcParams(const 
DebugOptions& debug_options,
   fragment_info->__isset.fragment_instance_ctxs = true;
   fragment_info->fragment_instance_ctxs.resize(
       backend_exec_params_.instance_params().size());
+  DCHECK_GT(fragment_info->fragment_instance_ctxs.size(), 0);
   for (int i = 0; i < backend_exec_params_.instance_params().size(); ++i) {
     TPlanFragmentInstanceCtx& instance_ctx = 
fragment_info->fragment_instance_ctxs[i];
     PlanFragmentInstanceCtxPB* instance_ctx_pb = 
request->add_fragment_instance_ctxs();
@@ -181,6 +182,10 @@ void Coordinator::BackendState::SetRpcParams(const 
DebugOptions& debug_options,
     // table construction.
     instance_ctx.__set_filters_produced(produced_it->second);
   }
+  DCHECK_GT(fragment_info->fragments.size(), 0);
+  DCHECK_EQ(fragment_info->fragments.size(), request->fragment_ctxs_size());
+  DCHECK_EQ(fragment_info->fragment_instance_ctxs.size(),
+      request->fragment_instance_ctxs_size());
 }
 
 void Coordinator::BackendState::SetExecError(
diff --git a/be/src/service/control-service.cc 
b/be/src/service/control-service.cc
index c046520e3..d20922458 100644
--- a/be/src/service/control-service.cc
+++ b/be/src/service/control-service.cc
@@ -150,7 +150,16 @@ void ControlService::ExecQueryFInstances(const 
ExecQueryFInstancesRequestPB* req
              << " coord=" << query_ctx.coord_hostname << ":"
              << query_ctx.coord_ip_address.port
              << " #instances=" << fragment_info.fragment_instance_ctxs.size();
-  Status resp_status = ExecEnv::GetInstance()->query_exec_mgr()->StartQuery(
+  Status resp_status;
+  if (UNLIKELY(fragment_info.fragments.size() == 0
+      || fragment_info.fragment_instance_ctxs.size() == 0)) {
+    resp_status = Status(Substitute("ExecQueryFInstances() failed: query_id=: 
$0, "
+        "no instance in TExecPlanFragmentInfo", PrintId(query_ctx.query_id)));
+    LOG(ERROR) << resp_status.msg().msg();
+    RespondAndReleaseRpc(resp_status, response, rpc_context);
+    return;
+  }
+  resp_status = ExecEnv::GetInstance()->query_exec_mgr()->StartQuery(
       request, query_ctx, fragment_info);
   if (!resp_status.ok()) {
     LOG(INFO) << "ExecQueryFInstances() failed: query_id=" << 
PrintId(query_ctx.query_id)

Reply via email to