This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 3e1b10556bc83b0e697b7a2aac411ccad6094563
Author: wzhou-code <[email protected]>
AuthorDate: Fri May 24 14:54:56 2024 -0700

    IMPALA-13107: Don't start query on executor if instance number equals 0
    
    Under bad network conditions, the TExecPlanFragmentInfo in KRPC messages
    received by executors can be truncated due to KRPC failures, yet the
    truncation may not cause a thrift deserialization error. The invalid
    TExecPlanFragmentInfo then causes the Impala daemon to crash.
    To avoid the crash, this patch checks the number of instances in the
    TExecPlanFragmentInfo received on the executor. The query will not be
    started if the number of instances equals 0. It also adds DCHECKs on the
    coordinator side to make sure it does not send a TExecPlanFragmentInfo
    without any instances.
    
    Testing:
     - Passed core tests.
     - Passed exhaustive tests in debug build. The new DCHECKs were not
       hit.
    
    Change-Id: Ie92ee120f1e9369f8dc2512792a05b7f8be5f007
    Reviewed-on: http://gerrit.cloudera.org:8080/21458
    Reviewed-by: Wenzhe Zhou <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/runtime/coordinator-backend-state.cc |  5 +++++
 be/src/service/control-service.cc           | 11 ++++++++++-
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/be/src/runtime/coordinator-backend-state.cc 
b/be/src/runtime/coordinator-backend-state.cc
index e689c59b9..2ad027d15 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -131,6 +131,7 @@ void Coordinator::BackendState::SetRpcParams(const 
DebugOptions& debug_options,
   fragment_info->__isset.fragment_instance_ctxs = true;
   fragment_info->fragment_instance_ctxs.resize(
       backend_exec_params_.instance_params().size());
+  DCHECK_GT(fragment_info->fragment_instance_ctxs.size(), 0);
   for (int i = 0; i < backend_exec_params_.instance_params().size(); ++i) {
     TPlanFragmentInstanceCtx& instance_ctx = 
fragment_info->fragment_instance_ctxs[i];
     PlanFragmentInstanceCtxPB* instance_ctx_pb = 
request->add_fragment_instance_ctxs();
@@ -181,6 +182,10 @@ void Coordinator::BackendState::SetRpcParams(const 
DebugOptions& debug_options,
     // table construction.
     instance_ctx.__set_filters_produced(produced_it->second);
   }
+  DCHECK_GT(fragment_info->fragments.size(), 0);
+  DCHECK_EQ(fragment_info->fragments.size(), request->fragment_ctxs_size());
+  DCHECK_EQ(fragment_info->fragment_instance_ctxs.size(),
+      request->fragment_instance_ctxs_size());
 }
 
 void Coordinator::BackendState::SetExecError(
diff --git a/be/src/service/control-service.cc 
b/be/src/service/control-service.cc
index c046520e3..d20922458 100644
--- a/be/src/service/control-service.cc
+++ b/be/src/service/control-service.cc
@@ -150,7 +150,16 @@ void ControlService::ExecQueryFInstances(const 
ExecQueryFInstancesRequestPB* req
              << " coord=" << query_ctx.coord_hostname << ":"
              << query_ctx.coord_ip_address.port
              << " #instances=" << fragment_info.fragment_instance_ctxs.size();
-  Status resp_status = ExecEnv::GetInstance()->query_exec_mgr()->StartQuery(
+  Status resp_status;
+  if (UNLIKELY(fragment_info.fragments.size() == 0
+      || fragment_info.fragment_instance_ctxs.size() == 0)) {
+    resp_status = Status(Substitute("ExecQueryFInstances() failed: query_id=: 
$0, "
+        "no instance in TExecPlanFragmentInfo", PrintId(query_ctx.query_id)));
+    LOG(ERROR) << resp_status.msg().msg();
+    RespondAndReleaseRpc(resp_status, response, rpc_context);
+    return;
+  }
+  resp_status = ExecEnv::GetInstance()->query_exec_mgr()->StartQuery(
       request, query_ctx, fragment_info);
   if (!resp_status.ok()) {
     LOG(INFO) << "ExecQueryFInstances() failed: query_id=" << 
PrintId(query_ctx.query_id)

Reply via email to