This is an automated email from the ASF dual-hosted git repository.

jasonmfehr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit ec6585fa7e6126a38cbd66eeb6e62ddaad3551b2
Author: Michael Smith <[email protected]>
AuthorDate: Fri Sep 13 10:06:15 2024 -0700

    IMPALA-915: Support cancel queries in frontend

    Adds support to cancel a query during Frontend planning or metadata
    operations. Frontend planning is handled by createExecRequest, so
    registers Java Threads executing createExecRequest by their query ID and
    provides cancelExecRequest to interrupt the Thread for a particular query
    ID.

    Cancellation is implemented by setting a boolean for the thread, and
    calling Thread.interrupt to trigger InterruptedException from any wait
    calls. Several ignored wait calls are updated to check the boolean and
    throw an exception if the query has been cancelled, interrupting those
    operations.

    Adds periodic checks to the planning process to interrupt planning.
    They're primarily useful when planning is waiting on catalogd/HMS. If
    planning gets into an algorithmically complex operation, it will not be
    interrupted.

    Removes check_inflight, as we can now cancel a query before it's
    inflight. In the case that cancellation doesn't happen immediately -
    because we're in a busy frontend loop that can't be interrupted - /cancel
    will block until the frontend reaches an interruption point and returns
    to the backend to finalize the query.

    When analysis returns, cancellation is finalized in the backend. The
    /cancel_query request returns once the query is cancelled. Cancelling a
    request can no longer fail, so additional checks for whether the request
    has been cancelled before it started executing are added.

    Removes setting UpdateQueryStatus when GetExecRequest returns because
    that's already handled in ImpalaServer::Execute when it calls
    UnregisterQuery in response to an error, and constitutes an update race
    on the status with UnregisterQuery triggered by CancelQueryHandler. We
    want to use the status from CancelQueryHandler in this case as it
    provides more context (about who initiated the cancel); the result of
    GetExecRequest is just UserCancelledException. Avoids calling
    UnregisterQuery in Execute if the query is already finalized to avoid
    redundant "Invalid or unknown query handle" logs.

    Extends idle_query_statuses_ to save status for any query interrupted by
    an external process - cancelled by a user or timeout - so they can be
    handled consistently.

    Testing:
    - updates test_query_cancel_created to cancel a CREATED query
    - added tests to cancel a query while metadata loading is delayed
    - removes test_query_cancel_exception, as it no longer demonstrates
      relevant behavior; cancelling a query that will encounter an exception
      before the exception occurs is no different than other queries
    - ran query_test/test_cancellation.py in exhaustive mode
    - ran query_test/test_cancellation.py w/ DEFAULT_TEST_PROTOCOL=beeswax
    - updates cancellation tests that expect INVALID_QUERY_HANDLE to accept
      Cancelled, which is sometimes returned by interrupted query status.

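The cancellation scheme described in the message above (register the planning thread by query ID, set a boolean, call Thread.interrupt, and re-check the boolean where a wait used to swallow the interrupt) might look roughly like the sketch below. It is an illustration under stated assumptions, not the Canceller.java added by this commit: the class name CancelSketch, every method name, the String query IDs, and the sleep standing in for a catalogd/HMS wait are all hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the mechanism; NOT the Canceller.java from this commit.
final class CancelSketch {
  // Thrown at a check point once the query has been cancelled.
  static final class CancelledException extends Exception {
    CancelledException(String msg) { super(msg); }
  }

  // The planning thread and its cancellation flag.
  private static final class Entry {
    final Thread thread = Thread.currentThread();
    volatile boolean cancelled = false;
  }

  private static final ConcurrentHashMap<String, Entry> REGISTRY =
      new ConcurrentHashMap<>();
  private static final ThreadLocal<Entry> CURRENT = new ThreadLocal<>();

  // Called by the planning thread before it starts work for 'queryId'.
  static void register(String queryId) {
    Entry e = new Entry();
    CURRENT.set(e);
    REGISTRY.put(queryId, e);
  }

  // Called by the planning thread when planning ends, successfully or not.
  static void unregister(String queryId) {
    REGISTRY.remove(queryId);
    CURRENT.remove();
    Thread.interrupted();  // clear a pending interrupt so it cannot leak
  }

  // Called from another thread. Returns false if the query is not planning.
  static boolean cancel(String queryId) {
    Entry e = REGISTRY.get(queryId);
    if (e == null) return false;
    e.cancelled = true;    // set the flag first, then wake any blocked wait
    e.thread.interrupt();
    return true;
  }

  // Check point for places that would otherwise ignore an interrupt.
  static void throwIfCancelled() throws CancelledException {
    Entry e = CURRENT.get();
    if (e != null && e.cancelled) throw new CancelledException("query cancelled");
  }

  // Stand-in for planning that blocks on metadata for 'waitMs' milliseconds.
  static String plan(String queryId, long waitMs) {
    register(queryId);
    try {
      try {
        Thread.sleep(waitMs);
      } catch (InterruptedException ie) {
        throwIfCancelled();  // a formerly ignored wait now honours cancellation
        // Interrupted without cancellation: this sketch simply carries on.
      }
      throwIfCancelled();    // periodic check between planning steps
      return "planned";
    } catch (CancelledException ce) {
      return "cancelled";
    } finally {
      unregister(queryId);
    }
  }

  // Starts a long plan on a second thread and cancels it from this one.
  static String cancelWhilePlanning(String queryId) {
    String[] result = new String[1];
    Thread planner = new Thread(() -> result[0] = plan(queryId, 60_000));
    planner.start();
    try {
      while (!cancel(queryId)) Thread.sleep(1);  // wait for registration
      planner.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return result[0];
  }

  public static void main(String[] args) {
    System.out.println(plan("q1", 1));
    System.out.println(cancelWhilePlanning("q2"));
  }
}
```

Setting the flag before interrupting means a thread woken by the interrupt always observes the cancellation. As the message notes, a purely CPU-bound planning step that never reaches a check point would not be stopped by this approach.
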
    Change-Id: I0d25d4c7fb0b8dcc7dad9510db1e8dca220eeb86
    Reviewed-on: http://gerrit.cloudera.org:8080/21803
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/runtime/query-driver.cc                    |  15 +-
 be/src/runtime/query-driver.h                     |   3 +-
 be/src/runtime/runtime-state.h                    |   1 +
 be/src/service/child-query.cc                     |   2 +-
 be/src/service/client-request-state.cc            |  35 ++--
 be/src/service/client-request-state.h             |   3 +
 be/src/service/frontend.cc                        |   7 +-
 be/src/service/frontend.h                         |   3 +
 be/src/service/impala-beeswax-server.cc           |  14 +-
 be/src/service/impala-hs2-server.cc               |  11 +-
 be/src/service/impala-http-handler.cc             |   2 +-
 be/src/service/impala-server.cc                   | 101 +++++------
 be/src/service/impala-server.h                    |  37 ++--
 be/src/service/internal-server.cc                 |   2 +-
 .../impala/common/UserCancelledException.java     |  28 +++
 .../java/org/apache/impala/service/Canceller.java | 113 ++++++++++++
 .../java/org/apache/impala/service/Frontend.java  |  23 ++-
 .../org/apache/impala/service/JniFrontend.java    |  10 +-
 .../impala/analysis/ExprCardinalityTest.java      |   2 +-
 .../impala/analysis/StmtMetadataLoaderTest.java   |   4 +-
 tests/common/test_result_verifier.py              |  11 +-
 tests/query_test/test_kill_query.py               |   4 +-
 tests/util/cancel_util.py                         |   8 +-
 tests/util/web_pages_util.py                      |  60 +++++++
 tests/webserver/test_web_pages.py                 | 194 ++++++++-------------
 25 files changed, 438 insertions(+), 255 deletions(-)

diff --git a/be/src/runtime/query-driver.cc b/be/src/runtime/query-driver.cc
index 27880fecc..810bb9e43 100644
--- a/be/src/runtime/query-driver.cc
+++ b/be/src/runtime/query-driver.cc
@@ -91,8 +91,8 @@ Status QueryDriver::DoFrontendPlanning(const TQueryCtx& query_ctx, bool use_requ
   TExecRequest exec_request;
   RETURN_IF_ERROR(
       DebugAction(query_ctx.client_request.query_options, "FRONTEND_PLANNER"));
-  RETURN_IF_ERROR(client_request_state_->UpdateQueryStatus(
-      ExecEnv::GetInstance()->frontend()->GetExecRequest(query_ctx, &exec_request)));
+  RETURN_IF_ERROR(
+      ExecEnv::GetInstance()->frontend()->GetExecRequest(query_ctx, &exec_request));
 
   DumpTExecReq(exec_request, "internal", client_request_state_->query_id());
   if (use_request) exec_request_.reset(new TExecRequest(move(exec_request)));
@@ -492,18 +492,11 @@ void QueryDriver::HandleRetryFailure(Status* status, string* error_msg,
       Substitute("Failed to retry query $0", PrintId(request_state->query_id())));
   status->AddDetail(*error_msg);
   discard_result(request_state->UpdateQueryStatus(*status));
-  parent_server_->UnregisterQueryDiscardResult(retry_query_id, false, status);
+  parent_server_->UnregisterQueryDiscardResult(retry_query_id, status);
 }
 
 Status QueryDriver::Finalize(
-    QueryHandle* query_handle, bool check_inflight, const Status* cause) {
-  // If the query's not inflight yet, return an appropriate error. If the query
-  // has been finalized and removed from inflight_queries (but not yet removed
-  // from query_driver_map_) we want to fall-through to the next check.
-  if (check_inflight && !(*query_handle)->is_inflight() && !finalized_.Load()) {
-    return Status("Query not yet running");
-  }
-
+    QueryHandle* query_handle, const Status* cause) {
   if (!finalized_.CompareAndSwap(false, true)) {
     // Return error as-if the query was already unregistered, so that it appears to the
     // client as-if unregistration already happened. We don't need a distinct
diff --git a/be/src/runtime/query-driver.h b/be/src/runtime/query-driver.h
index 6219c885f..9ad0e3e10 100644
--- a/be/src/runtime/query-driver.h
+++ b/be/src/runtime/query-driver.h
@@ -183,9 +183,8 @@ class QueryDriver {
   /// This indicates that the query should no longer be considered registered from the
   /// client's point of view. Returns an INVALID_QUERY_HANDLE error if finalization
   /// already started. After this method has been called, finalized() will return true.
-  /// If 'check_inflight' is true and the query is not yet inflight, Finalize will error.
   /// 'cause' is passed to ClientRequestState::Finalize(Status).
-  Status Finalize(QueryHandle* query_handle, bool check_inflight, const Status* cause);
+  Status Finalize(QueryHandle* query_handle, const Status* cause);
 
   /// Delete this query from the given QueryDriverMap.
   Status Unregister(ImpalaServer::QueryDriverMap* query_driver_map) WARN_UNUSED_RESULT;
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 6b5f39ebb..4e9dbf3e1 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -450,6 +450,7 @@ class RuntimeState {
   RuntimeState(const RuntimeState&);
 };
 
+// Helper for any state that implements is_cancelled, such as ClientRequestState.
 #define RETURN_IF_CANCELLED(state) \
   do { \
     if (UNLIKELY((state)->is_cancelled())) return Status::CANCELLED; \
diff --git a/be/src/service/child-query.cc b/be/src/service/child-query.cc
index d2d783f59..1172055f0 100644
--- a/be/src/service/child-query.cc
+++ b/be/src/service/child-query.cc
@@ -167,7 +167,7 @@ void ChildQuery::Cancel() {
   if (!status.ok()) {
     LOG(ERROR) << "Failed to cancel child query: " << status.GetDetail();
   }
-  status = parent_server_->UnregisterQuery(query_id, true, &Status::CANCELLED);
+  status = parent_server_->UnregisterQuery(query_id, &Status::CANCELLED);
   if (!status.ok()) {
     LOG(ERROR) << "Failed to unregister child query: " << status.GetDetail();
   }
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index eefefe438..673047110 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -252,8 +252,6 @@ void ClientRequestState::SetBlacklistedExecutorAddresses(
 }
 
 Status ClientRequestState::Exec() {
-  MarkActive();
-
   const TExecRequest& exec_req = exec_request();
   profile_->AddChild(server_profile_);
   summary_profile_->AddInfoString("Query Type", PrintValue(stmt_type()));
@@ -289,6 +287,10 @@ Status ClientRequestState::Exec() {
         query_ctx_.client_request.query_options.mt_dop));
   }
 
+  // Don't start executing the query if Cancel() was called between planning and Exec().
+  RETURN_IF_CANCELLED(this);
+  MarkActive();
+
   switch (exec_req.stmt_type) {
     case TStmtType::QUERY:
     case TStmtType::DML:
@@ -619,11 +621,8 @@ Status ClientRequestState::ExecQueryOrDmlRequest(
         PrintTableList(query_exec_request.query_ctx.tables_missing_diskids));
   }
 
-  {
-    lock_guard<mutex> l(lock_);
-    // Don't start executing the query if Cancel() was called concurrently with Exec().
-    if (is_cancelled_) return Status::CANCELLED;
-  }
+  // Don't start executing the query if Cancel() was called concurrently with Exec().
+  RETURN_IF_CANCELLED(this);
   if (isAsync) {
     // Don't transition to PENDING inside the FinishExecQueryOrDmlRequest thread because
     // the query should be in the PENDING state before the Exec RPC returns.
@@ -1121,7 +1120,6 @@ Status ClientRequestState::ExecEventProcessorCmd() {
 }
 
 void ClientRequestState::Finalize(const Status* cause) {
-  UnRegisterCompletedRPCs();
   Cancel(cause, /*wait_until_finalized=*/true);
   MarkActive();
   // Make sure we join on wait_thread_ before we finish (and especially before this object
@@ -1295,8 +1293,7 @@ Status ClientRequestState::WaitInternal() {
     // further if the query has been cancelled. If so, return immediately as there will
     // be no query result available (IMPALA-11006).
     if (isCTAS) {
-      lock_guard<mutex> l(lock_);
-      if (is_cancelled_) return Status::CANCELLED;
+      RETURN_IF_CANCELLED(this);
     }
   }
 
@@ -1551,6 +1548,18 @@ Status ClientRequestState::FetchRowsInternal(const int32_t max_rows,
 }
 
 void ClientRequestState::Cancel(const Status* cause, bool wait_until_finalized) {
+  // If planning is not done, attempt to cancel query in the frontend.
+  if (!is_planning_done_.load()) {
+    Status status = frontend_->CancelExecRequest(query_id());
+    if (!status.ok()) {
+      LOG(ERROR) << "Error cancelling planning for query " << PrintId(query_id())
+                 << ": " << status;
+    }
+  }
+
+  // Clean up completed RPCs before cancelling backends.
+  UnRegisterCompletedRPCs();
+
   {
     lock_guard<mutex> lock(lock_);
     // If the query has reached a terminal state, no need to update the state.
@@ -1869,6 +1878,12 @@ void ClientRequestState::MarkActive() {
   ++ref_count_;
 }
 
+// Used by RETURN_IF_CANCELLED.
+bool ClientRequestState::is_cancelled() {
+  lock_guard<mutex> l(lock_);
+  return is_cancelled_;
+}
+
 std::optional<long> getIcebergSnapshotId(const TExecRequest& exec_req) {
   DCHECK(exec_req.__isset.catalog_op_request);
   DCHECK(exec_req.catalog_op_request.__isset.ddl_params);
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index 347f8cc9f..c4d06c533 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -830,6 +830,9 @@ class ClientRequestState {
   /// actively processed. Takes expiration_data_lock_.
   void MarkActive();
 
+  /// Returns true if request is cancelled. Acquires lock_ to avoid dirty reads.
+  bool is_cancelled();
+
   /// Sets up profile and pre-execution counters, creates the query schedule, and calls
   /// FinishExecQueryOrDmlRequest() which contains the core logic of executing a QUERY or
   /// DML execution request. When 'async' is true, spawn a thread to run
diff --git a/be/src/service/frontend.cc b/be/src/service/frontend.cc
index 50c701270..967f839e2 100644
--- a/be/src/service/frontend.cc
+++ b/be/src/service/frontend.cc
@@ -154,7 +154,8 @@ Frontend::Frontend() {
     {"validateSaml2Response", "([B)[B", &validate_saml2_response_id_},
     {"validateSaml2Bearer", "([B)Ljava/lang/String;", &validate_saml2_bearer_id_},
     {"abortKuduTransaction", "([B)V", &abort_kudu_txn_},
-    {"commitKuduTransaction", "([B)V", &commit_kudu_txn_}
+    {"commitKuduTransaction", "([B)V", &commit_kudu_txn_},
+    {"cancelExecRequest", "([B)V", &cancel_exec_request_id_}
   };
 
   JniMethodDescriptor staticMethods[] = {
@@ -322,6 +323,10 @@ Status Frontend::GetExecRequest(
   return JniUtil::CallJniMethod(fe_, create_exec_request_id_, query_ctx, result);
 }
 
+Status Frontend::CancelExecRequest(const TUniqueId& query_id) {
+  return JniUtil::CallJniMethod(fe_, cancel_exec_request_id_, query_id);
+}
+
 Status Frontend::GetExplainPlan(
     const TQueryCtx& query_ctx, string* explain_string) {
   return JniUtil::CallJniMethod(fe_, get_explain_plan_id_, query_ctx, explain_string);
diff --git a/be/src/service/frontend.h b/be/src/service/frontend.h
index e5780305d..cfb67f8ee 100644
--- a/be/src/service/frontend.h
+++ b/be/src/service/frontend.h
@@ -57,6 +57,8 @@ class Frontend {
   /// Call FE to get TExecRequest.
   Status GetExecRequest(const TQueryCtx& query_ctx, TExecRequest* result);
 
+  Status CancelExecRequest(const TUniqueId& query_id);
+
   /// Get the metrics from the catalog used by this frontend.
   Status GetCatalogMetrics(TGetCatalogMetricsResult* resp);
 
@@ -301,6 +303,7 @@ class Frontend {
   jmethodID convertTable; // JniFrontend.convertTable
   jmethodID get_secret_from_key_store_; // JniFrontend.getSecretFromKeyStore()
   jmethodID hive_legacy_timezone_convert_; // JniFrontend.hiveLegacyTimezoneConvert()
+  jmethodID cancel_exec_request_id_; // JniFrontend.cancelExecRequest()
 
   // Only used for testing.
   jmethodID build_test_descriptor_table_id_; // JniFrontend.buildTestDescriptorTable()
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index 15d907119..1fe84f97d 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -81,14 +81,14 @@ void ImpalaServer::query(beeswax::QueryHandle& beeswax_handle, const Query& quer
   // us to advance query state to FINISHED or EXCEPTION
   Status status = query_handle->WaitAsync();
   if (!status.ok()) {
-    discard_result(UnregisterQuery(query_handle->query_id(), false, &status));
+    discard_result(UnregisterQuery(query_handle->query_id(), &status));
     RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
   }
   // Once the query is running do a final check for session closure and add it to the
   // set of in-flight queries.
   status = SetQueryInflight(session, query_handle);
   if (!status.ok()) {
-    discard_result(UnregisterQuery(query_handle->query_id(), false, &status));
+    discard_result(UnregisterQuery(query_handle->query_id(), &status));
     RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
   }
   TUniqueIdToBeeswaxHandle(query_handle->query_id(), &beeswax_handle);
@@ -132,7 +132,7 @@ void ImpalaServer::executeAndWait(beeswax::QueryHandle& beeswax_handle,
   // set of in-flight queries.
   Status status = SetQueryInflight(session, query_handle);
   if (!status.ok()) {
-    discard_result(UnregisterQuery(query_handle->query_id(), false, &status));
+    discard_result(UnregisterQuery(query_handle->query_id(), &status));
     RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
   }
   // block until results are ready
@@ -142,7 +142,7 @@ void ImpalaServer::executeAndWait(beeswax::QueryHandle& beeswax_handle,
     status = query_handle->query_status();
   }
   if (!status.ok()) {
-    discard_result(UnregisterQuery(query_handle->query_id(), false, &status));
+    discard_result(UnregisterQuery(query_handle->query_id(), &status));
     RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
   }
 
@@ -206,7 +206,7 @@ void ImpalaServer::fetch(Results& query_results,
   VLOG_ROW << "fetch result: #results=" << query_results.data.size()
            << " has_more=" << (query_results.has_more ? "true" : "false");
   if (!status.ok()) {
-    discard_result(UnregisterQuery(query_id, false, &status));
+    discard_result(UnregisterQuery(query_id, &status));
     RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
   }
 }
@@ -295,7 +295,7 @@ void ImpalaServer::close(const beeswax::QueryHandle& beeswax_handle) {
   // Make query id available to the following RaiseBeeswaxException().
   ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_id);
 
-  RAISE_IF_ERROR(UnregisterQuery(query_id, true), SQLSTATE_GENERAL_ERROR);
+  RAISE_IF_ERROR(UnregisterQuery(query_id), SQLSTATE_GENERAL_ERROR);
 }
 
 beeswax::QueryState::type ImpalaServer::get_state(
@@ -702,7 +702,7 @@ Status ImpalaServer::CloseInsertInternal(
     Status query_status;
     query_handle->GetDmlStats(dml_result, &query_status);
-    RETURN_IF_ERROR(UnregisterQuery(query_id, true));
+    RETURN_IF_ERROR(UnregisterQuery(query_id));
     return query_status;
   }
 }
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index fe52c9cbf..5310c6f3c 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -172,7 +172,7 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
 
   Status exec_status = query_handle->Exec(*request);
   if (!exec_status.ok()) {
-    discard_result(UnregisterQuery(query_handle->query_id(), false, &exec_status));
+    discard_result(UnregisterQuery(query_handle->query_id(), &exec_status));
     status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
     status->__set_errorMessage(Substitute(
         QUERY_ERROR_FORMAT, PrintId(query_handle->query_id()), exec_status.GetDetail()));
@@ -184,7 +184,7 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
 
   Status inflight_status = SetQueryInflight(session, query_handle);
   if (!inflight_status.ok()) {
-    discard_result(UnregisterQuery(query_handle->query_id(), false, &inflight_status));
+    discard_result(UnregisterQuery(query_handle->query_id(), &inflight_status));
     status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
     status->__set_errorMessage(Substitute(QUERY_ERROR_FORMAT,
         PrintId(query_handle->query_id()), inflight_status.GetDetail()));
@@ -652,7 +652,7 @@ void ImpalaServer::ExecuteStatementCommon(TExecuteStatementResp& return_val,
   return;
 
  return_error:
-  discard_result(UnregisterQuery(query_handle->query_id(), false, &status));
+  discard_result(UnregisterQuery(query_handle->query_id(), &status));
   HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR);
 }
 
@@ -1003,8 +1003,7 @@ void ImpalaServer::CloseImpalaOperation(TCloseImpalaOperationResp& return_val,
   }
 
   // TODO: use timeout to get rid of unwanted query_handle.
-  HS2_RETURN_IF_ERROR(return_val, UnregisterQuery(query_id, true),
-      SQLSTATE_GENERAL_ERROR);
+  HS2_RETURN_IF_ERROR(return_val, UnregisterQuery(query_id), SQLSTATE_GENERAL_ERROR);
   return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
 }
 
@@ -1104,7 +1103,7 @@ void ImpalaServer::FetchResults(TFetchResultsResp& return_val,
     if (status.IsRecoverableError()) {
       DCHECK(fetch_first);
     } else {
-      discard_result(UnregisterQuery(query_id, false, &status));
+      discard_result(UnregisterQuery(query_id, &status));
     }
     HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR);
     return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index f12e08a6f..80cabca08 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -264,7 +264,7 @@ void ImpalaHttpHandler::CancelQueryHandler(const Webserver::WebRequest& req,
       " '$0' at $1", req.source_user, req.source_socket));
   // Web UI doesn't have access to secret so we can't validate it. We assume that
   // web UI is allowed to close queries.
-  status = server_->UnregisterQuery(unique_id, true, &cause);
+  status = server_->UnregisterQuery(unique_id, &cause, /* interrupted */ true);
   if (!status.ok()) {
     Value error(status.GetDetail(), document->GetAllocator());
     document->AddMember("error", error, document->GetAllocator());
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 9584405ad..572b9841c 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1316,8 +1316,9 @@ Status ImpalaServer::Execute(TQueryCtx* query_ctx,
 
   query_handle->query_driver()->IncludeInQueryLog(include_in_query_log);
 
-  if (!status.ok() && registered_query) {
-    UnregisterQueryDiscardResult((*query_handle)->query_id(), false, &status);
+  // Unregister query if it was registered and not yet finalized.
+  if (!status.ok() && registered_query && !query_handle->query_driver()->finalized()) {
+    UnregisterQueryDiscardResult((*query_handle)->query_id(), &status);
   }
   return status;
 }
@@ -1589,15 +1590,26 @@ void ImpalaServer::UpdateExecSummary(const QueryHandle& query_handle) const {
   query_handle->summary_profile()->AddInfoStringRedacted("Errors", join(errors, "\n"));
 }
 
-Status ImpalaServer::UnregisterQuery(const TUniqueId& query_id, bool check_inflight,
-    const Status* cause) {
+Status ImpalaServer::UnregisterQuery(
+    const TUniqueId& query_id, const Status* cause, bool interrupted) {
   VLOG_QUERY << "UnregisterQuery(): query_id=" << PrintId(query_id);
 
   QueryHandle query_handle;
-  RETURN_IF_ERROR(GetActiveQueryHandle(query_id, &query_handle));
+  // Skips updating RPCs since we'll finalize them right after, and this avoids
+  // acquiring a ClientRequestState lock.
+  RETURN_IF_ERROR(GetActiveQueryHandle(query_id, &query_handle, /* skip_rpcs */ true));
+
+  DebugActionNoFail(query_handle->query_options(), "FINALIZE_INFLIGHT_QUERY");
 
-  if (check_inflight) {
-    DebugActionNoFail(query_handle->query_options(), "FINALIZE_INFLIGHT_QUERY");
+  if (interrupted) {
+    // Register interrupted query status with the session.
+    shared_ptr<SessionState> session = query_handle->session();
+    lock_guard<mutex> l(session->lock);
+    if (!session->closed) {
+      lock_guard<mutex> l(interrupted_query_statuses_lock_);
+      interrupted_query_statuses_.emplace(query_id, *cause);
+      session->interrupted_queries.emplace_back(query_id);
+    }
   }
 
   // Do the work of unregistration that needs to be done synchronously. Once
@@ -1606,7 +1618,7 @@ Status ImpalaServer::UnregisterQuery(const TUniqueId& query_id, bool check_infli
   // unregistration work. Finalize() succeeds for the first thread to call it to avoid
   // multiple threads unregistering.
   RETURN_IF_ERROR(
-      query_handle.query_driver()->Finalize(&query_handle, check_inflight, cause));
+      query_handle.query_driver()->Finalize(&query_handle, cause));
 
   // Do the rest of the unregistration work in the background so that the client does
   // not need to wait for profile serialization, etc.
@@ -1634,8 +1646,8 @@ void ImpalaServer::FinishUnregisterQuery(const QueryHandle& query_handle) {
 }
 
 void ImpalaServer::UnregisterQueryDiscardResult(
-    const TUniqueId& query_id, bool check_inflight, const Status* cause) {
-  Status status = UnregisterQuery(query_id, check_inflight, cause);
+    const TUniqueId& query_id, const Status* cause, bool interrupted) {
+  Status status = UnregisterQuery(query_id, cause, interrupted);
   if (!status.ok()) {
     LOG(ERROR) << Substitute("Query de-registration for query_id=$0 failed: $1",
         PrintId(query_id), cause->GetDetail());
@@ -1768,28 +1780,31 @@ shared_ptr<QueryDriver> ImpalaServer::GetQueryDriver(
   return entry->second;
 }
 
+Status ImpalaServer::GetInterruptedStatus(const TUniqueId& query_id) {
+  lock_guard<mutex> l(interrupted_query_statuses_lock_);
+  auto it = interrupted_query_statuses_.find(query_id);
+  if (it != interrupted_query_statuses_.end()) {
+    return it->second;
+  }
+  return Status::Expected(TErrorCode::INVALID_QUERY_HANDLE, PrintId(query_id));
+}
+
 Status ImpalaServer::GetActiveQueryHandle(
-    const TUniqueId& query_id, QueryHandle* query_handle) {
+    const TUniqueId& query_id, QueryHandle* query_handle, bool skip_rpcs) {
   DCHECK(query_handle != nullptr);
   shared_ptr<QueryDriver> query_driver = GetQueryDriver(query_id);
   if (UNLIKELY(query_driver == nullptr)) {
-    {
-      lock_guard<mutex> l(idle_query_statuses_lock_);
-      auto it = idle_query_statuses_.find(query_id);
-      if (it != idle_query_statuses_.end()) {
-        return it->second;
-      }
-    }
-
-    Status err = Status::Expected(TErrorCode::INVALID_QUERY_HANDLE, PrintId(query_id));
+    Status err = GetInterruptedStatus(query_id);
     VLOG(1) << err.GetDetail();
     return err;
   }
   query_handle->SetHandle(query_driver, query_driver->GetActiveClientRequestState());
-  // Update RPC Stats before every call. This is done here to minimize the
-  // pending set size and keep the profile updated while the query is executing.
-  (*query_handle)->UnRegisterCompletedRPCs();
-  (*query_handle)->RegisterRPC();
+  if (!skip_rpcs) {
+    // Update RPC Stats before every call. This is done here to minimize the
+    // pending set size and keep the profile updated while the query is executing.
+    (*query_handle)->UnRegisterCompletedRPCs();
+    (*query_handle)->RegisterRPC();
+  }
   return Status::OK();
 }
 
@@ -1798,11 +1813,11 @@ Status ImpalaServer::GetQueryHandle(
   DCHECK(query_handle != nullptr);
   shared_ptr<QueryDriver> query_driver = GetQueryDriver(query_id, return_unregistered);
   if (UNLIKELY(query_driver == nullptr)) {
-    return Status::Expected(TErrorCode::INVALID_QUERY_HANDLE, PrintId(query_id));
+    return GetInterruptedStatus(query_id);
   }
   ClientRequestState* request_state = query_driver->GetClientRequestState(query_id);
   if (UNLIKELY(request_state == nullptr)) {
-    return Status::Expected(TErrorCode::INVALID_QUERY_HANDLE, PrintId(query_id));
+    return GetInterruptedStatus(query_id);
   }
   query_handle->SetHandle(query_driver, request_state);
   return Status::OK();
@@ -1815,7 +1830,7 @@ Status ImpalaServer::GetAllQueryHandles(const TUniqueId& query_id,
   DCHECK(original_query_handle != nullptr);
   shared_ptr<QueryDriver> query_driver = GetQueryDriver(query_id, return_unregistered);
   if (UNLIKELY(query_driver == nullptr)) {
-    return Status::Expected(TErrorCode::INVALID_QUERY_HANDLE, PrintId(query_id));
+    return GetInterruptedStatus(query_id);
   }
   active_query_handle->SetHandle(query_driver,
       query_driver->GetActiveClientRequestState());
@@ -1828,10 +1843,6 @@ Status ImpalaServer::CancelInternal(const TUniqueId& query_id) {
   VLOG_QUERY << "Cancel(): query_id=" << PrintId(query_id);
   QueryHandle query_handle;
   RETURN_IF_ERROR(GetActiveQueryHandle(query_id, &query_handle));
-  if (!query_handle->is_inflight()) {
-    // Error if the query is not yet inflight as we have no way to cleanly cancel it.
-    return Status("Query not yet running");
-  }
   query_handle->Cancel(/*cause=*/ nullptr);
   return Status::OK();
 }
@@ -1850,7 +1861,7 @@ Status ImpalaServer::KillQuery(
   if (status.code() != TErrorCode::INVALID_QUERY_HANDLE) {
     // The current impalad is the coordinator of the query.
     RETURN_IF_ERROR(status);
-    status = UnregisterQuery(query_id, true, &Status::CANCELLED);
+    status = UnregisterQuery(query_id, &Status::CANCELLED, true);
     if (status.ok() || status.code() == TErrorCode::INVALID_QUERY_HANDLE) {
       // There might be another thread that has already unregistered the query
       // before UnregisterQuery() and after CancelInternal(). In this case we are done.
@@ -1896,7 +1907,7 @@ Status ImpalaServer::CloseSessionInternal(const TUniqueId& session_id,
     DecrementSessionCount(session_state->connected_user);
   }
   unordered_set<TUniqueId> inflight_queries;
-  vector<TUniqueId> idled_queries;
+  vector<TUniqueId> interrupted_queries;
   {
     lock_guard<mutex> l(session_state->lock);
     DCHECK(!session_state->closed);
@@ -1904,18 +1915,18 @@ Status ImpalaServer::CloseSessionInternal(const TUniqueId& session_id,
     // Since closed is true, no more queries will be added to the inflight list.
     inflight_queries.insert(session_state->inflight_queries.begin(),
         session_state->inflight_queries.end());
-    idled_queries.swap(session_state->idled_queries);
+    interrupted_queries.swap(session_state->interrupted_queries);
   }
   // Unregister all open queries from this session.
   Status status = Status::Expected("Session closed");
   for (const TUniqueId& query_id: inflight_queries) {
     // TODO: deal with an error status
-    UnregisterQueryDiscardResult(query_id, false, &status);
+    UnregisterQueryDiscardResult(query_id, &status);
   }
   {
-    lock_guard<mutex> l(idle_query_statuses_lock_);
-    for (const TUniqueId& query_id: idled_queries) {
-      idle_query_statuses_.erase(query_id);
+    lock_guard<mutex> l(interrupted_query_statuses_lock_);
+    for (const TUniqueId& query_id: interrupted_queries) {
+      interrupted_query_statuses_.erase(query_id);
     }
   }
   // Reconfigure the poll period of session_maintenance_thread_ if necessary.
@@ -2129,7 +2140,7 @@ void ImpalaServer::CancelFromThreadPool(const CancellationWork& cancellation_wor
   }
 
   if (cancellation_work.unregister()) {
-    UnregisterQueryDiscardResult(cancellation_work.query_id(), true, &error);
+    UnregisterQueryDiscardResult(cancellation_work.query_id(), &error, true);
   } else {
     // Retry queries that would otherwise be cancelled due to an impalad leaving the
     // cluster. CancellationWorkCause::BACKEND_FAILED indicates that a backend running
@@ -2968,18 +2979,8 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) {
           preserved_status = crs->query_status();
         }
         preserved_status.MergeStatus(status);
-        {
-          shared_ptr<SessionState> session = crs->session();
-          lock_guard<mutex> l(session->lock);
-          if (!session->closed) {
-            lock_guard<mutex> l(idle_query_statuses_lock_);
-            idle_query_statuses_.emplace(
-                expiration_event->query_id, move(preserved_status));
-            session->idled_queries.emplace_back(expiration_event->query_id);
-          }
-        }
 
-        ExpireQuery(crs, status, true);
+        ExpireQuery(crs, preserved_status, true);
         expiration_event = queries_by_timestamp_.erase(expiration_event);
       } else {
         // Iterator is moved on in every other branch.
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index b404d7cae..df1802e05 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -187,7 +187,7 @@ class TQueryExecRequest;
 /// 1. session_state_map_lock_
 /// 2. query_expiration_lock_
 /// 3. SessionState::lock
-/// 4. idle_query_statuses_lock_
+/// 4. interrupted_query_statuses_lock_
 /// 5. ClientRequestState::fetch_rows_lock
 /// 6. ClientRequestState::lock
 /// 7. ClientRequestState::expiration_data_lock_
@@ -671,8 +671,8 @@ class ImpalaServer : public ImpalaServiceIf,
     /// inflight_queries. In that case we add it to prestopped_queries instead.
     std::set<TUniqueId> prestopped_queries;
 
-    /// Unregistered queries we need to clear from idle_query_statuses_ on closure.
-    std::vector<TUniqueId> idled_queries;
+    /// Unregistered queries we need to clear from interrupted_query_statuses_ on closure.
+    std::vector<TUniqueId> interrupted_queries;
 
     /// Total number of queries run as part of this session.
     int64_t total_queries;
@@ -828,16 +828,15 @@ class ImpalaServer : public ImpalaServiceIf,
   /// Starts the process of unregistering the query. The query is cancelled on the
   /// current thread, then asynchronously the query's entry is removed from
   /// query_driver_map_ and the session state's in-flight query list.
-  /// If check_inflight is true, then return an error if the query
-  /// is not yet in-flight. Otherwise, proceed even if the query isn't yet in-flight (for
-  /// cleaning up after an error on the query issuing path).
-  Status UnregisterQuery(const TUniqueId& query_id, bool check_inflight,
-      const Status* cause = NULL) WARN_UNUSED_RESULT;
+  /// If a query is interrupted by an external process, set 'interrupted' to save 'cause'
+  /// in the Session so clients can retrieve the cause after the query is unregistered.
+  Status UnregisterQuery(const TUniqueId& query_id, const Status* cause = NULL,
+      bool interrupted = false) WARN_UNUSED_RESULT;
 
   /// Delegates to UnregisterQuery. If UnregisterQuery returns an error Status, the
   /// status is logged and then discarded.
   void UnregisterQueryDiscardResult(
-      const TUniqueId& query_id, bool check_inflight, const Status* cause = NULL);
+      const TUniqueId& query_id, const Status* cause = NULL, bool interrupted = false);
 
   /// Unregisters the provided query, does all required finalization and removes it from
   /// 'query_driver_map_'.
@@ -1004,10 +1003,10 @@ class ImpalaServer : public ImpalaServiceIf,
 
   /// Returns the active QueryHandle for this query id. The QueryHandle contains the
   /// active ClientRequestState. Returns an error Status if the query id cannot be found.
-  /// If caller is an RPC thread, RPC context will be registered for time tracking
-  /// See QueryDriver for a description of active ClientRequestStates.
+  /// If caller is an RPC thread, RPC context will be registered for time tracking unless
+  /// skip_rpcs is true. See QueryDriver for a description of active ClientRequestStates.
   Status GetActiveQueryHandle(
-      const TUniqueId& query_id, QueryHandle* query_handle);
+      const TUniqueId& query_id, QueryHandle* query_handle, bool skip_rpcs = false);
 
   /// Similar to 'GetActiveQueryHandle' except it does not return the active handle, it
   /// returns the handle directly associated with the given query id. Returns an error
@@ -1032,6 +1031,9 @@ class ImpalaServer : public ImpalaServiceIf,
   std::shared_ptr<QueryDriver> GetQueryDriver(
       const TUniqueId& query_id, bool return_unregistered = false);
 
+  /// Returns status from interrupted_query_statuses_, or INVALID_QUERY_HANDLE.
+ Status GetInterruptedStatus(const TUniqueId& query_id); + /// Beeswax private methods /// Helper functions to translate between Beeswax and Impala structs @@ -1442,12 +1444,13 @@ class ImpalaServer : public ImpalaServiceIf, typedef boost::unordered_map<TUniqueId, std::shared_ptr<SessionState>> SessionStateMap; SessionStateMap session_state_map_; - /// Protects idle_query_statuses_; - std::mutex idle_query_statuses_lock_; + /// Protects interrupted_query_statuses_; + std::mutex interrupted_query_statuses_lock_; - /// A map of queries that were stopped due to idle timeout and the status they had when - /// unregistered. Used to return a more useful error when looking up unregistered IDs. - std::map<TUniqueId, Status> idle_query_statuses_; + /// A map of queries that were interrupted by an external process and the status they + /// had when unregistered. Used to return a more useful error when looking up + /// unregistered IDs. + std::map<TUniqueId, Status> interrupted_query_statuses_; /// Protects connection_to_sessions_map_. See "Locking" in the class comment for lock /// acquisition order. 
diff --git a/be/src/service/internal-server.cc b/be/src/service/internal-server.cc index dca054af0..b127fab1c 100644 --- a/be/src/service/internal-server.cc +++ b/be/src/service/internal-server.cc @@ -280,7 +280,7 @@ void ImpalaServer::CloseQuery(const TUniqueId& query_id) { return; } - UnregisterQueryDiscardResult(query_handle->query_id(), false); + UnregisterQueryDiscardResult(query_handle->query_id()); } // ImpalaServer::CloseQuery void ImpalaServer::GetConnectionContextList( diff --git a/fe/src/main/java/org/apache/impala/common/UserCancelledException.java b/fe/src/main/java/org/apache/impala/common/UserCancelledException.java new file mode 100644 index 000000000..608c5d34e --- /dev/null +++ b/fe/src/main/java/org/apache/impala/common/UserCancelledException.java @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.common; + +/** + * Thrown to interrupt Frontend execution when cancellation is requested. 
+ */ +public class UserCancelledException extends ImpalaException { + static public final String MSG = "Query cancelled by user request"; + public UserCancelledException() { + super(MSG); + } +} diff --git a/fe/src/main/java/org/apache/impala/service/Canceller.java b/fe/src/main/java/org/apache/impala/service/Canceller.java new file mode 100644 index 000000000..a51d17796 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/service/Canceller.java @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.service; + +import static org.apache.impala.util.TUniqueIdUtil.PrintId; + +import com.google.common.base.Preconditions; + +import org.apache.impala.common.Pair; +import org.apache.impala.common.UserCancelledException; +import org.apache.impala.thrift.TUniqueId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; + +/** + * Global Java thread cancellation handler. 
Query-related operations register their thread + * for a given query ID, allowing a Cancel call to interrupt that thread when requested. + * + * Uses a separate UserCancelledException to interrupt execution, as that has a more + * specific meaning than InterruptedException (which our code sometimes ignores). + */ +public final class Canceller { + private final static Logger LOG = LoggerFactory.getLogger(Canceller.class); + + // Registers threads by their query ID so the threads can be interrupted on + // cancellation, along with a thread-local flag to indicate cancellation. + private final static ConcurrentHashMap<TUniqueId, Pair<Thread, AtomicBoolean>> + queryThreads_ = new ConcurrentHashMap<>(); + // Registers that the current thread has been cancelled and should clean up. + private final static ThreadLocal<AtomicBoolean> cancelled_ = new ThreadLocal<>(); + + private Canceller() { /* Private constructor to prevent instantiation. */ } + + public static class Registration implements AutoCloseable { + private final TUniqueId queryId_; + + public Registration(TUniqueId queryId) { queryId_ = queryId; } + + /** + * Removes association of current thread with queryId. + */ + public void close() { + LOG.trace("unregister {}", PrintId(queryId_)); + Pair<Thread, AtomicBoolean> curr = queryThreads_.remove(queryId_); + Preconditions.checkState(curr.first == Thread.currentThread()); + Preconditions.checkState(curr.second == cancelled_.get()); + cancelled_.remove(); + } + } + + /** + * Associates the current thread with queryId. + */ + public static Registration register(TUniqueId queryId) { + if (queryId == null) return null; + LOG.trace("register {}", PrintId(queryId)); + cancelled_.set(new AtomicBoolean(false)); + queryThreads_.put(queryId, new Pair<>(Thread.currentThread(), cancelled_.get())); + return new Registration(queryId); + } + + /** + * Cancels thread associated with queryId. 
+ */ + public static void cancel(TUniqueId queryId) { + if (queryId == null) return; + Pair<Thread, AtomicBoolean> queryPair = queryThreads_.get(queryId); + if (queryPair == null) { + LOG.info( + "Unable to cancel request: thread for query {} not found", PrintId(queryId)); + return; + } + + Thread queryThread = queryPair.first; + LOG.debug( + "Cancelling request: thread {} for query {}", queryThread, PrintId(queryId)); + queryPair.second.set(true); + queryThread.interrupt(); + } + + /** + * Throws UserCancelledException if the current thread is cancelled. + */ + public static void throwIfCancelled() throws UserCancelledException { + if (isCancelled()) { throw new UserCancelledException(); } + } + + private static boolean isCancelled() { + AtomicBoolean cancelled = cancelled_.get(); + return cancelled != null && cancelled.get(); + } +} diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index 17ab88f38..434d4e9d6 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -137,7 +137,6 @@ import org.apache.impala.catalog.FeTable; import org.apache.impala.catalog.Function; import org.apache.impala.catalog.local.LocalCatalog; import org.apache.impala.catalog.IcebergPositionDeleteTable; -import org.apache.impala.catalog.ImpaladCatalog; import org.apache.impala.catalog.ImpaladTableUsageTracker; import org.apache.impala.catalog.MaterializedViewHdfsTable; import org.apache.impala.catalog.MetaStoreClientPool; @@ -149,6 +148,7 @@ import org.apache.impala.catalog.Type; import org.apache.impala.catalog.local.InconsistentMetadataFetchException; import org.apache.impala.catalog.iceberg.IcebergMetadataTable; import org.apache.impala.common.AnalysisException; +import org.apache.impala.common.UserCancelledException; import org.apache.impala.common.FileSystemUtil; import org.apache.impala.common.ImpalaException; import 
org.apache.impala.common.ImpalaRuntimeException; @@ -168,6 +168,7 @@ import org.apache.impala.planner.HdfsScanNode; import org.apache.impala.planner.PlanFragment; import org.apache.impala.planner.Planner; import org.apache.impala.planner.ScanNode; +import org.apache.impala.service.Frontend; import org.apache.impala.thrift.CatalogLookupStatus; import org.apache.impala.thrift.TAlterDbParams; import org.apache.impala.thrift.TBackendGflags; @@ -270,9 +271,6 @@ public class Frontend { // Max time to wait for a catalog update notification. public static final long MAX_CATALOG_UPDATE_WAIT_TIME_MS = 2 * 1000; - // TODO: Make the reload interval configurable. - private static final int AUTHORIZATION_POLICY_RELOAD_INTERVAL_SECS = 5 * 60; - // Maximum number of times to retry a query if it fails due to inconsistent metadata. private static final int INCONSISTENT_METADATA_NUM_RETRIES = (BackendConfig.INSTANCE != null) ? @@ -1299,7 +1297,7 @@ public class Frontend { * of the pendingCheckTasks. */ private void filterUnaccessibleElements(List<Future<Boolean>> pendingCheckTasks, - List<?> checkList) throws InternalException { + List<?> checkList) throws UserCancelledException, InternalException { int failedCheckTasks = 0; int index = 0; Iterator<?> iter = checkList.iterator(); @@ -1311,6 +1309,7 @@ public class Frontend { if (!pendingCheckTasks.get(index).get()) iter.remove(); index++; } catch (ExecutionException | InterruptedException e) { + Canceller.throwIfCancelled(); failedCheckTasks++; LOG.error("Encountered an error checking access", e); break; @@ -1481,7 +1480,7 @@ public class Frontend { * accessible to 'user'. */ public List<? extends FeDb> getDbs(PatternMatcher matcher, User user) - throws InternalException { + throws UserCancelledException, InternalException { List<? 
extends FeDb> dbs = getCatalog().getDbs(matcher); boolean needsAuthChecks = authzFactory_.getAuthorizationConfig().isEnabled() @@ -1912,7 +1911,7 @@ public class Frontend { * * @see ImpaladCatalog#isReady(), CatalogdMetaProvider#isReady() */ - public void waitForCatalog() { + public void waitForCatalog() throws UserCancelledException { LOG.info("Waiting for first catalog update from the statestore."); int numTries = 0; long startTimeMs = System.currentTimeMillis(); @@ -2051,7 +2050,8 @@ public class Frontend { throws ImpalaException { // Timeline of important events in the planning process, used for debugging // and profiling. - try (FrontendProfile.Scope scope = FrontendProfile.createNewWithScope()) { + try (FrontendProfile.Scope scope = FrontendProfile.createNewWithScope(); + Canceller.Registration reg = Canceller.register(planCtx.queryCtx_.query_id)) { EventSequence timeline = new EventSequence("Query Compilation"); // a wrapper of the getTExecRequest is in the factory so the implementation // can handle various planner fallback execution logic (e.g. allowing one @@ -2402,6 +2402,8 @@ public class Frontend { Preconditions.checkState( !clientSetRequestPool || !queryOptions.getRequest_pool().isEmpty()); + Canceller.throwIfCancelled(); + boolean coordOnlyRequestPool = false; if (clientSetRequestPool && RequestPoolService.getInstance() != null) { @@ -2449,6 +2451,8 @@ public class Frontend { + executorGroupSetsToUse, numExecutorGroupSets); + Canceller.throwIfCancelled(); + TExecRequest req = null; // Capture the current state. 
@@ -2473,6 +2477,7 @@ public class Frontend { expectedTotalCores(executorGroupSetsToUse.get(numExecutorGroupSets - 1)); int i = 0; while (i < numExecutorGroupSets) { + Canceller.throwIfCancelled(); boolean isLastEG = (i == numExecutorGroupSets - 1); boolean skipResourceCheckingAtLastEG = isLastEG && BackendConfig.INSTANCE.isSkipResourceCheckingOnLastExecutorGroupSet(); @@ -2491,6 +2496,7 @@ public class Frontend { String retryMsg = ""; while (true) { + Canceller.throwIfCancelled(); try { req = doCreateExecRequest(compilerFactory, planCtx, warnings, timeline); markTimelineRetries(attempt, retryMsg, timeline); @@ -2511,6 +2517,7 @@ public class Frontend { INCONSISTENT_METADATA_NUM_RETRIES); } } + Canceller.throwIfCancelled(); // Counters about this group set. int availableCores = expectedTotalCores(group_set); diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java b/fe/src/main/java/org/apache/impala/service/JniFrontend.java index 0edc351c9..8a4dca548 100644 --- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java +++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java @@ -44,6 +44,7 @@ import org.apache.impala.catalog.FeDataSource; import org.apache.impala.catalog.FeDb; import org.apache.impala.catalog.FeTable; import org.apache.impala.catalog.Function; +import org.apache.impala.common.UserCancelledException; import org.apache.impala.common.FileSystemUtil; import org.apache.impala.common.ImpalaException; import org.apache.impala.common.InternalException; @@ -187,6 +188,13 @@ public class JniFrontend { } } + public void cancelExecRequest(byte[] thriftQueryId) throws ImpalaException { + Preconditions.checkNotNull(frontend_); + TUniqueId queryId = new TUniqueId(); + JniUtil.deserializeThrift(protocolFactory_, queryId, thriftQueryId); + Canceller.cancel(queryId); + } + // Deserialize and merge each thrift catalog update into a single merged update public byte[] updateCatalogCache(byte[] req) throws ImpalaException, TException { 
Preconditions.checkNotNull(frontend_); @@ -656,7 +664,7 @@ public class JniFrontend { frontend_.getCatalog().setIsReady(true); } - public void waitForCatalog() { + public void waitForCatalog() throws UserCancelledException { Preconditions.checkNotNull(frontend_); frontend_.waitForCatalog(); } diff --git a/fe/src/test/java/org/apache/impala/analysis/ExprCardinalityTest.java b/fe/src/test/java/org/apache/impala/analysis/ExprCardinalityTest.java index de786ae48..66fd76fb4 100644 --- a/fe/src/test/java/org/apache/impala/analysis/ExprCardinalityTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/ExprCardinalityTest.java @@ -116,7 +116,7 @@ public class ExprCardinalityTest { */ @Test - public void testMetadata() throws DatabaseNotFoundException, InternalException { + public void testMetadata() throws ImpalaException { Catalog catalog = session_.catalog(); Db db = catalog.getDb("functional"); StmtMetadataLoader mdLoader = diff --git a/fe/src/test/java/org/apache/impala/analysis/StmtMetadataLoaderTest.java b/fe/src/test/java/org/apache/impala/analysis/StmtMetadataLoaderTest.java index f5a4ef93d..e40a26d9f 100644 --- a/fe/src/test/java/org/apache/impala/analysis/StmtMetadataLoaderTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/StmtMetadataLoaderTest.java @@ -114,7 +114,7 @@ public class StmtMetadataLoaderTest { // Assume tables in the stmt are not acid tables. 
private void validateUncached(ParsedStatement stmt, Frontend fe, int expectedNumLoadRequests, int expectedNumCatalogUpdates, - String[] expectedDbs, String[] expectedTables) throws InternalException { + String[] expectedDbs, String[] expectedTables) throws ImpalaException { EventSequence timeline = new EventSequence("Test Timeline"); StmtMetadataLoader mdLoader = new StmtMetadataLoader(fe, Catalog.DEFAULT_DB, timeline); @@ -132,7 +132,7 @@ public class StmtMetadataLoaderTest { } private void validateCached(ParsedStatement stmt, Frontend fe, - String[] expectedDbs, String[] expectedTables) throws InternalException { + String[] expectedDbs, String[] expectedTables) throws ImpalaException { EventSequence timeline = new EventSequence("Test Timeline"); StmtMetadataLoader mdLoader = new StmtMetadataLoader(fe, Catalog.DEFAULT_DB, timeline); diff --git a/tests/common/test_result_verifier.py b/tests/common/test_result_verifier.py index 3a9045a81..d44b046a6 100644 --- a/tests/common/test_result_verifier.py +++ b/tests/common/test_result_verifier.py @@ -840,6 +840,9 @@ def error_msg_startswith(actual_msg, expected_msg="", query_id=None): - Otherwise, it checks if the `query_id` part in the actual error message matches the format using the regular expression. + `expected_msg` may also be an array of strings, in which case the function checks + whether the actual error message starts with any of the strings in the array. + NOTE: Messages of errors such as "Invalid session id" do not contain a query id since such an error may occur before a query id is generated. 
""" @@ -848,13 +851,17 @@ def error_msg_startswith(actual_msg, expected_msg="", query_id=None): start = actual_msg.find(ERROR_PROMPT) if start == -1: return False - return actual_msg.startswith(expected_msg, start + len(ERROR_PROMPT)) + start += len(ERROR_PROMPT) else: ERROR_PROMPT = "Query " + QUERY_ID_REGEX + " failed:\n" m = re.search(ERROR_PROMPT, actual_msg) if m is None: return False - return actual_msg.startswith(expected_msg, m.end()) + start = m.end() + for msg in expected_msg if isinstance(expected_msg, list) else [expected_msg]: + if actual_msg.startswith(msg, start): + return True + return False def error_msg_equal(msg1, msg2): diff --git a/tests/query_test/test_kill_query.py b/tests/query_test/test_kill_query.py index c456c3fe5..4a60aa40e 100644 --- a/tests/query_test/test_kill_query.py +++ b/tests/query_test/test_kill_query.py @@ -73,11 +73,11 @@ class TestKillQuery(ImpalaTestSuite): assert_kill_ok(client, query_id_to_kill) assert_kill_error( client, - "Could not find query on any coordinator", + ["Could not find query on any coordinator", "Cancelled"], query_id=query_id_to_kill, ) assert_kill_error( client, - "Could not find query on any coordinator", + ["Could not find query on any coordinator", "Cancelled"], query_id=query_id_to_kill, ) diff --git a/tests/util/cancel_util.py b/tests/util/cancel_util.py index c98ffc013..2977b143b 100644 --- a/tests/util/cancel_util.py +++ b/tests/util/cancel_util.py @@ -64,17 +64,13 @@ class QueryToKill: # will be "Cancelled". 
assert error_msg_startswith( str(self.exc), - "Invalid or unknown query handle", - self.client.handle_id(self.handle), - ) or error_msg_startswith( - str(self.exc), - "Cancelled", + ["Invalid or unknown query handle", "Cancelled"], self.client.handle_id(self.handle), ) try: self.client.fetch(self.sql, self.handle) except Exception as ex: - assert "Invalid or unknown query handle" in str(ex) + assert "Invalid or unknown query handle" in str(ex) or "Cancelled" in str(ex) finally: self.client.close() diff --git a/tests/util/web_pages_util.py b/tests/util/web_pages_util.py index a8b6b37a5..4b294d893 100644 --- a/tests/util/web_pages_util.py +++ b/tests/util/web_pages_util.py @@ -16,8 +16,11 @@ # under the License. from __future__ import absolute_import, division, print_function +from multiprocessing import Process, Queue import json +import requests +from tests.util.retry import retry def get_num_completed_backends(service, query_id): """Get the number of completed backends for the given query_id from the @@ -53,3 +56,60 @@ def get_mem_admitted_backends_debug_page(cluster, ac_process=None): else: ret['executor'].append(parse_mem_value(backend['mem_admitted'])) return ret + + +def cancel(impalad, query_id): + """Cancel a query via /cancel_query on impalad.""" + cancel_query_url = "http://{0}:{1}/cancel_query?json&query_id={2}"\ + .format(impalad.webserver_interface, impalad.webserver_port, query_id) + response = requests.post(cancel_query_url) + assert response.status_code == requests.codes.ok + response_json = json.loads(response.text) + assert response_json['contents'] == "Query cancellation successful" + + +def wait_for_state(impalad, state): + """Wait for the inflight query to reach 'state' on impalad. 
If state=None, wait for + no inflight queries.""" + def is_state(): + in_flight_queries = impalad.get_debug_webpage_json('queries')['in_flight_queries'] + if state: + return len(in_flight_queries) > 0 and in_flight_queries[0]['state'] == state + else: + return len(in_flight_queries) <= 0 + + assert retry(is_state) + + +def assert_query_stopped(impalad, query_id): + """Assert all queries are complete, and query_id is in completed_queries, on impalad.""" + response_json = impalad.get_debug_webpage_json('queries') + assert response_json['num_in_flight_queries'] == 0 + assert response_json['num_waiting_queries'] == 0 + + expected_queries = [q for q in response_json['completed_queries'] + if q['query_id'] == query_id] + assert len(expected_queries) == 1 + + +def run(client, query, queue): + """Execute query and put results or errors into queue.""" + try: + queue.put(str(client.execute(query))) + except Exception as ex: + queue.put(str(ex)) + + +def start(client, query): + """Execute a query in a separate process. 
Returns (process, queue).""" + queue = Queue() + proc = Process(target=run, args=(client, query, queue)) + proc.start() + return proc, queue + + +def join(proc, queue): + """Returns result from queue returned by 'start', and waits for proc to complete.""" + result = queue.get() + proc.join() + return result diff --git a/tests/webserver/test_web_pages.py b/tests/webserver/test_web_pages.py index 8ae846235..bcb88d2f4 100644 --- a/tests/webserver/test_web_pages.py +++ b/tests/webserver/test_web_pages.py @@ -18,17 +18,18 @@ from __future__ import absolute_import, division, print_function from tests.common.environ import ImpalaTestClusterFlagsDetector from tests.common.file_utils import grep_dir -from tests.common.skip import SkipIfBuildType +from tests.common.skip import SkipIfBuildType, SkipIfCatalogV2 from tests.common.impala_cluster import ImpalaCluster -from tests.common.impala_connection import FINISHED, RUNNING, MinimalHS2Connection -from tests.common.impala_test_suite import IMPALAD_HS2_HOST_PORT, ImpalaTestSuite +from tests.common.impala_connection import FINISHED, RUNNING +from tests.common.impala_test_suite import ImpalaTestSuite +from tests.common.test_vector import HS2 from tests.util.filesystem_utils import supports_storage_ids from tests.util.parse_util import parse_duration_string_ms -from tests.common.test_vector import HS2 +from tests.util.web_pages_util import ( + cancel, wait_for_state, assert_query_stopped, start, join) from datetime import datetime -from multiprocessing import Process, Queue from prometheus_client.parser import text_string_to_metric_families -from time import sleep, time +from multiprocessing import Process import itertools import json import logging @@ -1062,134 +1063,75 @@ class TestWebPage(ImpalaTestSuite): assert found, "Query {} not found in response_json\n{}".format( query_id, json.dumps(response_json, sort_keys=True, indent=4)) - def try_until(self, desc, run, check, timeout=10, interval=0.1): - start_time = time() - while 
(time() - start_time < timeout): - result = run() - if check(result): - return result - sleep(interval) - assert False, "Timed out waiting for " + desc - - def get_queries(self): - responses = self.get_and_check_status( - self.QUERIES_URL + "?json", ports_to_test=[25000]) - assert len(responses) == 1 - response_json = json.loads(responses[0].text) - return response_json - + # CatalogV2 doesn't have the delay loading metadata after invalidate, so this test + # is only applicable to CatalogV1. test_query_cancel_load_tables is sufficient for V2. + @SkipIfCatalogV2.catalog_v1_test() @pytest.mark.execute_serially - def test_query_cancel_created(self): - """Tests that if we cancel a query in the CREATED state, it still finishes and we can - cancel it.""" - # Use MinimalHS2Connection because it has simpler concurrency than hs2_client. - with MinimalHS2Connection(IMPALAD_HS2_HOST_PORT) as client: - delay_created_action = "impalad_load_tables_delay:SLEEP@1000" - client.set_configuration(dict(debug_action=delay_created_action)) - self._run_test_query_cancel_created(client) - - def _run_test_query_cancel_created(self, client): - query = "select count(*) from functional_parquet.alltypes" - - response_json = self.try_until("test baseline", self.get_queries, - lambda resp: resp['num_in_flight_queries'] == 0) + def test_query_cancel_load_metadata(self): + """Tests that we can cancel a query in the CREATED state while catalogd loads + metadata. Invalidate metadata introduces a delay while catalogd loads metadata.""" + impalad = ImpalaCluster.get_e2e_test_cluster().impalads[0].service + wait_for_state(impalad, None) + result = self.execute_query("invalidate metadata functional_parquet.alltypes") + assert result.success + # Start the query completely async. The server doesn't return a response until # the query has exited the CREATED state, so we need to get the query ID another way. 
- proc = Process(target=lambda cli, q: cli.execute_async(q), args=(client, query)) - proc.start() - - response_json = self.try_until("query creation", self.get_queries, - lambda resp: resp['num_in_flight_queries'] > 0) - assert len(response_json['in_flight_queries']) == 1 - assert response_json['in_flight_queries'][0]['state'] == 'CREATED' - query_id = response_json['in_flight_queries'][0]['query_id'] - - cancel_query_url = "{0}cancel_query?json&query_id={1}".format(self.ROOT_URL.format - ("25000"), query_id) - response = requests.get(cancel_query_url) - assert response.status_code == requests.codes.ok - response_json = json.loads(response.text) - assert response_json['error'] == "Query not yet running\n" - - # Wait for query to start running. It should finish soon after. - proc.join() - response_json = self.try_until("query finished", self.get_queries, - lambda resp: resp['in_flight_queries'][0]['state'] == 'FINISHED') - assert response_json['num_in_flight_queries'] == 1 - - # We never fetch results for the async query, so it stays in-flight until cancelled. - response = requests.get(cancel_query_url) - assert response.status_code == requests.codes.ok - response_json = json.loads(response.text) - assert response_json['contents'] == "Query cancellation successful" - - # Cancel request can return before cancellation is finalized. Retry for slow - # environments like ASAN. 
- response_json = self.try_until("query cancellation", self.get_queries, - lambda resp: resp['num_in_flight_queries'] == 0) - assert response_json['num_waiting_queries'] == 0 - - expected_queries = [q for q in response_json['completed_queries'] - if q['query_id'] == query_id] - assert len(expected_queries) == 1 + proc, queue = start(self.client, "select count(*) from functional_parquet.alltypes") + wait_for_state(impalad, 'CREATED') + + in_flight_queries = impalad.get_debug_webpage_json('queries')['in_flight_queries'] + assert len(in_flight_queries) == 1 + assert in_flight_queries[0]['state'] == 'CREATED' + query_id = in_flight_queries[0]['query_id'] + cancel(impalad, query_id) + + # Verify query was cancelled. Cancel and fetch requests can return before cancellation + # is finalized, so wait for the original request to return. + assert "UserCancelledException: Query cancelled by user request" in join(proc, queue) + wait_for_state(impalad, None) + assert_query_stopped(impalad, query_id) + + response = impalad.read_debug_webpage( + "query_profile_plain_text?query_id={}".format(query_id)) + assert "Cancelled from Impala's debug web interface by user: " \ + "'anonymous' at" in response @pytest.mark.execute_serially - def test_query_cancel_exception(self): - """Tests that if we cancel a query in the CREATED state and it has an exception, we - can cancel it.""" - # Use MinimalHS2Connection because it has simpler concurrency than hs2_client. 
- with MinimalHS2Connection(IMPALAD_HS2_HOST_PORT) as client: - delay_created_action = "impalad_load_tables_delay:SLEEP@1000" - client.set_configuration(dict(debug_action=delay_created_action)) - self._test_query_cancel_exception(client) - - def _test_query_cancel_exception(self, client): - # Trigger UDF ERROR: Cannot divide decimal by zero - query = "select *, 1.0/0 from functional_parquet.alltypes limit 10" - - response_json = self.try_until("test baseline", self.get_queries, - lambda resp: resp['num_in_flight_queries'] == 0) - - def run(queue, client, query): - queue.put(client.execute_async(query)) + def test_query_cancel_load_tables(self): + """Tests that we can cancel a query in the CREATED state while loading tables.""" + impalad = ImpalaCluster.get_e2e_test_cluster().impalads[0].service + wait_for_state(impalad, None) + delay_created_action = "impalad_load_tables_delay:SLEEP@5000" + self.client.set_configuration({'debug_action': delay_created_action}) # Start the query completely async. The server doesn't return a response until # the query has exited the CREATED state, so we need to get the query ID another way. - queue = Queue() - proc = Process(target=run, args=(queue, client, query)) - proc.start() - - response_json = self.try_until("query creation", self.get_queries, - lambda resp: resp['num_in_flight_queries'] > 0) - assert len(response_json['in_flight_queries']) == 1 - assert response_json['in_flight_queries'][0]['state'] == 'CREATED' - query_id = response_json['in_flight_queries'][0]['query_id'] - - cancel_query_url = "{0}cancel_query?json&query_id={1}".format(self.ROOT_URL.format - ("25000"), query_id) - response = requests.get(cancel_query_url) - assert response.status_code == requests.codes.ok - response_json = json.loads(response.text) - assert response_json['error'] == "Query not yet running\n" - - # Fetch query results. 
- query_handle = queue.get() - proc.join() - assert query_handle - try: - client.fetch(query, query_handle) - except Exception as e: - re.match("UDF ERROR: Cannot divide decimal by zero", str(e)) - - # Cancel and fetch requests can return before cancellation is finalized. Retry for - # slow environments like ASAN. - response_json = self.try_until("query failure", self.get_queries, - lambda resp: resp['num_in_flight_queries'] == 0) - assert response_json['num_waiting_queries'] == 0 - - expected_queries = [q for q in response_json['completed_queries'] - if q['query_id'] == query_id] - assert len(expected_queries) == 1 + proc, queue = start(self.client, "select count(*) from functional_parquet.alltypes") + wait_for_state(impalad, 'CREATED') + + in_flight_queries = impalad.get_debug_webpage_json('queries')['in_flight_queries'] + assert len(in_flight_queries) == 1 + assert in_flight_queries[0]['state'] == 'CREATED' + query_id = in_flight_queries[0]['query_id'] + + # Call cancel multiple times to ensure it's idempotent. + procs = [Process(target=cancel, args=(impalad, query_id)) for _ in range(3)] + for proc in procs: + proc.start() + for proc in procs: + proc.join() + + # Verify query was cancelled. Cancel and fetch requests can return before cancellation + # is finalized, so wait for the original request to return and retry get_queries. + assert "UserCancelledException: Query cancelled by user request" in join(proc, queue) + wait_for_state(impalad, None) + assert_query_stopped(impalad, query_id) + + response = impalad.read_debug_webpage( + "query_profile_plain_text?query_id={}".format(query_id)) + assert "Cancelled from Impala's debug web interface by user: " \ + "'anonymous' at" in response @pytest.mark.execute_serially def test_hadoop_varz_page(self):
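Note on the cancellation pattern: the commit message describes registering the planning thread by query ID, setting a boolean on cancel, and checking that boolean at periodic points in planning. The following is a minimal Python sketch of that idea, not the Impala implementation. All names here (register, cancel, throw_if_cancelled, plan, UserCancelled) are hypothetical, and threading.Event stands in for the Java pairing of an AtomicBoolean with Thread.interrupt: setting the event both records the cancellation and wakes a thread blocked in wait().

```python
import threading
from contextlib import contextmanager


class UserCancelled(Exception):
    """Raised at a checkpoint once cancellation has been requested."""


# Hypothetical registry: query id -> cancellation flag of the registered worker.
_query_flags = {}
_registry_lock = threading.Lock()
# Per-thread pointer to the flag of the query this thread is working on.
_current = threading.local()


@contextmanager
def register(query_id):
    """Associate the current thread with query_id for the duration of the block."""
    flag = threading.Event()
    with _registry_lock:
        _query_flags[query_id] = flag
    _current.flag = flag
    try:
        yield flag
    finally:
        # Always unregister, whether planning finished or was cancelled.
        with _registry_lock:
            _query_flags.pop(query_id, None)
        _current.flag = None


def cancel(query_id):
    """Request cancellation. Returns False if no thread is registered."""
    with _registry_lock:
        flag = _query_flags.get(query_id)
    if flag is None:
        return False
    flag.set()  # records the request and wakes any wait() on the flag
    return True


def throw_if_cancelled():
    """Checkpoint: raise if the current thread's query has been cancelled."""
    flag = getattr(_current, "flag", None)
    if flag is not None and flag.is_set():
        raise UserCancelled("Query cancelled by user request")


def plan(query_id, started, steps=50, wait_s=0.1):
    """Stand-in for planning: a loop that waits and checks between steps."""
    with register(query_id) as flag:
        started.set()
        for _ in range(steps):
            throw_if_cancelled()
            flag.wait(wait_s)  # simulated wait on metadata; returns early on cancel
        throw_if_cancelled()
        return "planned"
```

As in the commit, a cancel request only takes effect when the worker reaches a checkpoint or a wait. A long computation with no checkpoint inside it would run to completion before the cancellation is noticed.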
