This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch branch-4.4.1 in repository https://gitbox.apache.org/repos/asf/impala.git
commit 0ba5403ea3718e102b4e92b7ff8259de58a16c88 Author: wzhou-code <[email protected]> AuthorDate: Fri May 24 14:54:56 2024 -0700 IMPALA-13107: Don't start query on executor if instance number equals 0 Under bad network conditions, the TExecPlanFragmentInfo carried in KRPC messages received by executors could be truncated due to KRPC failures, and the truncation does not necessarily cause a Thrift deserialization error. The invalid TExecPlanFragmentInfo then causes the Impala daemon to crash. To avoid the crash, this patch checks the number of fragments and instances in the received TExecPlanFragmentInfo on the executor. The query will not be started if either number equals 0. Also adds DCHECKs on the coordinator side to make sure it does not send a TExecPlanFragmentInfo without any instances. Testing: - Passed core tests. - Passed exhaustive tests in a debug build. The new DCHECKs were not hit. Change-Id: Ie92ee120f1e9369f8dc2512792a05b7f8be5f007 Reviewed-on: http://gerrit.cloudera.org:8080/21458 Reviewed-by: Wenzhe Zhou <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/runtime/coordinator-backend-state.cc | 5 +++++ be/src/service/control-service.cc | 11 ++++++++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc index e689c59b9..2ad027d15 100644 --- a/be/src/runtime/coordinator-backend-state.cc +++ b/be/src/runtime/coordinator-backend-state.cc @@ -131,6 +131,7 @@ void Coordinator::BackendState::SetRpcParams(const DebugOptions& debug_options, fragment_info->__isset.fragment_instance_ctxs = true; fragment_info->fragment_instance_ctxs.resize( backend_exec_params_.instance_params().size()); + DCHECK_GT(fragment_info->fragment_instance_ctxs.size(), 0); for (int i = 0; i < backend_exec_params_.instance_params().size(); ++i) { TPlanFragmentInstanceCtx& instance_ctx = fragment_info->fragment_instance_ctxs[i]; PlanFragmentInstanceCtxPB* instance_ctx_pb = request->add_fragment_instance_ctxs(); @@ -181,6 +182,10 @@ void 
Coordinator::BackendState::SetRpcParams(const DebugOptions& debug_options, // table construction. instance_ctx.__set_filters_produced(produced_it->second); } + DCHECK_GT(fragment_info->fragments.size(), 0); + DCHECK_EQ(fragment_info->fragments.size(), request->fragment_ctxs_size()); + DCHECK_EQ(fragment_info->fragment_instance_ctxs.size(), + request->fragment_instance_ctxs_size()); } void Coordinator::BackendState::SetExecError( diff --git a/be/src/service/control-service.cc b/be/src/service/control-service.cc index c046520e3..d20922458 100644 --- a/be/src/service/control-service.cc +++ b/be/src/service/control-service.cc @@ -150,7 +150,16 @@ void ControlService::ExecQueryFInstances(const ExecQueryFInstancesRequestPB* req << " coord=" << query_ctx.coord_hostname << ":" << query_ctx.coord_ip_address.port << " #instances=" << fragment_info.fragment_instance_ctxs.size(); - Status resp_status = ExecEnv::GetInstance()->query_exec_mgr()->StartQuery( + Status resp_status; + if (UNLIKELY(fragment_info.fragments.size() == 0 + || fragment_info.fragment_instance_ctxs.size() == 0)) { + resp_status = Status(Substitute("ExecQueryFInstances() failed: query_id=: $0, " + "no instance in TExecPlanFragmentInfo", PrintId(query_ctx.query_id))); + LOG(ERROR) << resp_status.msg().msg(); + RespondAndReleaseRpc(resp_status, response, rpc_context); + return; + } + resp_status = ExecEnv::GetInstance()->query_exec_mgr()->StartQuery( request, query_ctx, fragment_info); if (!resp_status.ok()) { LOG(INFO) << "ExecQueryFInstances() failed: query_id=" << PrintId(query_ctx.query_id)
