This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit b3e9c4a65fa63da6f373c9ecec41fe4247e5e7d8 Author: Yida Wu <[email protected]> AuthorDate: Wed Nov 2 20:34:54 2022 -0700 IMPALA-7969: Always admit trivial queries immediately The idea of a trivial query is to allow certain queries to bypass admission control, thereby accelerating query execution even when the server's resources are at capacity. This benefits queries that require a fast response while consuming minimal resources. This patch adds support for trivial query detection and allows immediate admission of trivial queries. We define trivial queries as a subset of coordinator-only queries that return no more than one row. The definition is as below: - Must have PLAN ROOT SINK as the root - Can contain UNION and EMPTYSET nodes only - Results cannot be over one row Examples of a trivial query: - select 1; - select * from table limit 0; - select * from table limit 0 union all select 1; - select 1, (2 + 3); Also, we restrict the execution parallelism of trivial queries: each resource pool can execute no more than three trivial queries at the same time. If the maximum parallelism is reached, the admission controller tries to admit the trivial query via the normal process. More precisely, if the cluster is running with a global admission controller, the max parallelism for trivial queries is three per resource pool; but if there is no global admission controller, each coordinator admits trivial queries based on its own local variable, so the max parallelism is three per coordinator per resource pool in this case. As this is the first patch, we try to keep the trivial query definition as simple as possible; it could be extended in the future. Added query option enable_trivial_query_for_admission to control whether the trivial query policy is enabled. Tests: Passed exhaustive tests. Added test_trivial_query and test_trivial_query_low_mem. 
Change-Id: I2a729764e3055d7eb11900c96c82ff53eb261f91 Reviewed-on: http://gerrit.cloudera.org:8080/19214 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/scheduling/admission-controller-test.cc | 4 +- be/src/scheduling/admission-controller.cc | 74 ++++++-- be/src/scheduling/admission-controller.h | 42 ++++- be/src/scheduling/schedule-state.cc | 5 + be/src/scheduling/schedule-state.h | 3 + be/src/service/query-options.cc | 4 + be/src/service/query-options.h | 5 +- common/thrift/ImpalaService.thrift | 3 + common/thrift/Query.thrift | 6 + .../main/java/org/apache/impala/analysis/Expr.java | 12 ++ .../java/org/apache/impala/planner/Planner.java | 4 + .../apache/impala/planner/TrivialQueryChecker.java | 113 ++++++++++++ tests/common/resource_pool_config.py | 1 + tests/custom_cluster/test_admission_controller.py | 196 ++++++++++++++++++++- tests/custom_cluster/test_session_expiration.py | 3 + tests/custom_cluster/test_shell_interactive.py | 1 + 16 files changed, 444 insertions(+), 32 deletions(-) diff --git a/be/src/scheduling/admission-controller-test.cc b/be/src/scheduling/admission-controller-test.cc index 9732a7e5d..85e2073e4 100644 --- a/be/src/scheduling/admission-controller-test.cc +++ b/be/src/scheduling/admission-controller-test.cc @@ -743,7 +743,7 @@ TEST_F(AdmissionControllerTest, PoolStats) { CheckPoolStatsEmpty(pool_stats); // Show that Admit and Release leave stats at zero. 
- pool_stats->AdmitQueryAndMemory(*schedule_state); + pool_stats->AdmitQueryAndMemory(*schedule_state, false); ASSERT_EQ(1, pool_stats->agg_num_running()); ASSERT_EQ(1, pool_stats->metrics()->agg_num_running->GetValue()); int64_t mem_to_release = 0; @@ -752,7 +752,7 @@ TEST_F(AdmissionControllerTest, PoolStats) { admission_controller->GetMemToAdmit(*schedule_state, backend_state.second); } pool_stats->ReleaseMem(mem_to_release); - pool_stats->ReleaseQuery(0); + pool_stats->ReleaseQuery(0, false); CheckPoolStatsEmpty(pool_stats); } diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc index 1f0c80dfd..dc2b53e10 100644 --- a/be/src/scheduling/admission-controller.cc +++ b/be/src/scheduling/admission-controller.cc @@ -68,6 +68,7 @@ namespace impala { const int64_t AdmissionController::PoolStats::HISTOGRAM_NUM_OF_BINS = 128; const int64_t AdmissionController::PoolStats::HISTOGRAM_BIN_SIZE = 1024L * 1024L * 1024L; const double AdmissionController::PoolStats::EMA_MULTIPLIER = 0.2; +const int AdmissionController::PoolStats::MAX_NUM_TRIVIAL_QUERY_RUNNING = 3; /// Convenience method. 
string PrintBytes(int64_t value) { @@ -147,6 +148,8 @@ const string POOL_CLAMP_MEM_LIMIT_QUERY_OPTION_METRIC_KEY_FORMAT = const string AdmissionController::PROFILE_INFO_KEY_ADMISSION_RESULT = "Admission result"; const string AdmissionController::PROFILE_INFO_VAL_ADMIT_IMMEDIATELY = "Admitted immediately"; +const string AdmissionController::PROFILE_INFO_VAL_ADMIT_TRIVIAL = + "Admitted as a trivial query"; const string AdmissionController::PROFILE_INFO_VAL_QUEUED = "Queued"; const string AdmissionController::PROFILE_INFO_VAL_CANCELLED_IN_QUEUE = "Cancelled (queued)"; @@ -335,6 +338,7 @@ string AdmissionController::PoolStats::DebugString() const { ss << "agg_num_queued=" << agg_num_queued_ << ", "; ss << "agg_mem_reserved=" << PrintBytes(agg_mem_reserved_) << ", "; ss << " local_host(local_mem_admitted=" << PrintBytes(local_mem_admitted_) << ", "; + ss << "local_trivial_running=" << local_trivial_running_ << ", "; ss << DebugPoolStats(local_stats_) << ")"; return ss.str(); } @@ -676,7 +680,8 @@ Status AdmissionController::Init() { return status; } -void AdmissionController::PoolStats::AdmitQueryAndMemory(const ScheduleState& state) { +void AdmissionController::PoolStats::AdmitQueryAndMemory( + const ScheduleState& state, bool is_trivial) { int64_t cluster_mem_admitted = state.GetClusterMemoryToAdmit(); DCHECK_GT(cluster_mem_admitted, 0); local_mem_admitted_ += cluster_mem_admitted; @@ -689,9 +694,11 @@ void AdmissionController::PoolStats::AdmitQueryAndMemory(const ScheduleState& st metrics_.local_num_admitted_running->Increment(1L); metrics_.total_admitted->Increment(1L); + if (is_trivial) ++local_trivial_running_; } -void AdmissionController::PoolStats::ReleaseQuery(int64_t peak_mem_consumption) { +void AdmissionController::PoolStats::ReleaseQuery( + int64_t peak_mem_consumption, bool is_trivial) { // Update stats tracking the number of running and admitted queries. 
agg_num_running_ -= 1; metrics_.agg_num_running->Increment(-1L); @@ -702,6 +709,10 @@ void AdmissionController::PoolStats::ReleaseQuery(int64_t peak_mem_consumption) metrics_.total_released->Increment(1L); DCHECK_GE(local_stats_.num_admitted_running, 0); DCHECK_GE(agg_num_running_, 0); + if (is_trivial) { + --local_trivial_running_; + DCHECK_GE(local_trivial_running_, 0); + } // Update the 'peak_mem_histogram' based on the given peak memory consumption of the // query, if provided. @@ -773,14 +784,15 @@ void AdmissionController::UpdateStatsOnReleaseForBackends(const UniqueIdPB& quer pools_for_updates_.insert(running_query.request_pool); } -void AdmissionController::UpdateStatsOnAdmission(const ScheduleState& state) { +void AdmissionController::UpdateStatsOnAdmission( + const ScheduleState& state, bool is_trivial) { for (const auto& entry : state.per_backend_schedule_states()) { const NetworkAddressPB& host_addr = entry.first; int64_t mem_to_admit = GetMemToAdmit(state, entry.second); UpdateHostStats(host_addr, mem_to_admit, 1, entry.second.exec_params->slots_to_use()); } PoolStats* pool_stats = GetPoolStats(state); - pool_stats->AdmitQueryAndMemory(state); + pool_stats->AdmitQueryAndMemory(state, is_trivial); pools_for_updates_.insert(state.request_pool()); } @@ -950,6 +962,13 @@ bool AdmissionController::HasAvailableSlots(const ScheduleState& state, return true; } +bool AdmissionController::CanAdmitTrivialRequest(const ScheduleState& state) { + PoolStats* pool_stats = GetPoolStats(state); + DCHECK(pool_stats != nullptr); + return pool_stats->local_trivial_running() + 1 + <= PoolStats::MAX_NUM_TRIVIAL_QUERY_RUNNING; +} + bool AdmissionController::CanAdmitRequest(const ScheduleState& state, const TPoolConfig& pool_cfg, bool admit_from_queue, string* not_admitted_reason, string* not_admitted_details, bool& coordinator_resource_limited) { @@ -959,7 +978,6 @@ bool AdmissionController::CanAdmitRequest(const ScheduleState& state, // (c) One of the executors in 'schedule' 
is already at its maximum number of requests // (when not using the default executor group). // (d) There are not enough memory resources available for the query. - const int64_t max_requests = GetMaxRequestsForPool(pool_cfg); PoolStats* pool_stats = GetPoolStats(state); bool default_group = @@ -1222,9 +1240,10 @@ Status AdmissionController::SubmitForAdmission(const AdmissionRequest& request, stats->UpdateConfigMetrics(queue_node->pool_cfg); bool unused_bool; + bool is_trivial = false; bool must_reject = !FindGroupToAdmitOrReject(membership_snapshot, queue_node->pool_cfg, - /* admit_from_queue=*/false, stats, queue_node, unused_bool); + /* admit_from_queue=*/false, stats, queue_node, unused_bool, &is_trivial); if (must_reject) { AdmissionOutcome outcome = admit_outcome->Set(AdmissionOutcome::REJECTED); if (outcome != AdmissionOutcome::REJECTED) { @@ -1246,16 +1265,18 @@ Status AdmissionController::SubmitForAdmission(const AdmissionRequest& request, DCHECK(queue_node->admitted_schedule->query_schedule_pb().get() != nullptr); const string& group_name = queue_node->admitted_schedule->executor_group(); VLOG(3) << "Can admit to group " << group_name << " (or cancelled)"; - DCHECK_EQ(stats->local_stats().num_queued, 0); + if (!is_trivial) DCHECK_EQ(stats->local_stats().num_queued, 0); AdmissionOutcome outcome = admit_outcome->Set(AdmissionOutcome::ADMITTED); if (outcome != AdmissionOutcome::ADMITTED) { DCHECK_ENUM_EQ(outcome, AdmissionOutcome::CANCELLED); - VLOG_QUERY << "Ready to be " << PROFILE_INFO_VAL_ADMIT_IMMEDIATELY + VLOG_QUERY << "Ready to be " + << (is_trivial ? 
PROFILE_INFO_VAL_ADMIT_TRIVIAL : + PROFILE_INFO_VAL_ADMIT_IMMEDIATELY) << " but already cancelled, query id=" << PrintId(request.query_id); return Status::CANCELLED; } VLOG_QUERY << "Admitting query id=" << PrintId(request.query_id); - AdmitQuery(queue_node, false); + AdmitQuery(queue_node, false /* was_queued */, is_trivial); stats->UpdateWaitTime(0); VLOG_RPC << "Final: " << stats->DebugString(); *schedule_result = move(queue_node->admitted_schedule->query_schedule_pb()); @@ -1429,7 +1450,7 @@ void AdmissionController::ReleaseQuery(const UniqueIdPB& query_id, DCHECK_EQ(num_released_backends_.at(query_id), 0) << PrintId(query_id); num_released_backends_.erase(num_released_backends_.find(query_id)); PoolStats* stats = GetPoolStats(running_query.request_pool); - stats->ReleaseQuery(peak_mem_consumption); + stats->ReleaseQuery(peak_mem_consumption, running_query.is_trivial); // No need to update the Host Stats as they should have been updated in // ReleaseQueryBackends. pools_for_updates_.insert(running_query.request_pool); @@ -1848,7 +1869,7 @@ Status AdmissionController::ComputeGroupScheduleStates( bool AdmissionController::FindGroupToAdmitOrReject( ClusterMembershipMgr::SnapshotPtr membership_snapshot, const TPoolConfig& pool_config, bool admit_from_queue, PoolStats* pool_stats, QueueNode* queue_node, - bool& coordinator_resource_limited) { + bool& coordinator_resource_limited, bool* is_trivial) { // Check for rejection based on current cluster size const string& pool_name = pool_stats->name(); string rejection_reason; @@ -1890,9 +1911,21 @@ bool AdmissionController::FindGroupToAdmitOrReject( << " dedicated_coord_mem_estimate=" << PrintBytes(state->GetDedicatedCoordMemoryEstimate()) << " max_requests=" << max_requests << " max_queued=" << max_queued - << " max_mem=" << PrintBytes(max_mem); + << " max_mem=" << PrintBytes(max_mem) << " is_trivial_query=" + << PrettyPrinter::Print(state->GetIsTrivialQuery(), TUnit::NONE); VLOG_QUERY << "Stats: " << 
pool_stats->DebugString(); + if (state->GetIsTrivialQuery() && CanAdmitTrivialRequest(*state)) { + // The trivial query is supposed to be a subset of the coord-only query, + // so the executor group should be an empty group. + DCHECK_EQ(&executor_group, cluster_membership_mgr_->GetEmptyExecutorGroup()); + VLOG_QUERY << "Admitted by trivial query policy."; + queue_node->admitted_schedule = std::move(group_state.state); + DCHECK(is_trivial != nullptr); + if (is_trivial != nullptr) *is_trivial = true; + return true; + } + // Query is rejected if the rejection check fails on *any* group. if (RejectForSchedule(*state, pool_config, &rejection_reason)) { DCHECK(!rejection_reason.empty()); @@ -2029,10 +2062,11 @@ void AdmissionController::DequeueLoop() { && queue_node->admit_outcome->Get() == AdmissionOutcome::CANCELLED; bool coordinator_resource_limited = false; + bool is_trivial = false; bool is_rejected = !is_cancelled && !FindGroupToAdmitOrReject(membership_snapshot, pool_config, - /* admit_from_queue=*/true, stats, queue_node, - coordinator_resource_limited); + /* admit_from_queue=*/true, stats, queue_node, + coordinator_resource_limited, &is_trivial); if (!is_cancelled && !is_rejected && queue_node->admitted_schedule.get() == nullptr) { @@ -2089,7 +2123,7 @@ void AdmissionController::DequeueLoop() { DCHECK(!is_cancelled); DCHECK(!is_rejected); DCHECK(queue_node->admitted_schedule != nullptr); - AdmitQuery(queue_node, true); + AdmitQuery(queue_node, true /* was_queued */, is_trivial); } pools_for_updates_.insert(pool_name); } @@ -2161,7 +2195,7 @@ AdmissionController::PoolStats* AdmissionController::GetPoolStats( return &it->second; } -void AdmissionController::AdmitQuery(QueueNode* node, bool was_queued) { +void AdmissionController::AdmitQuery(QueueNode* node, bool was_queued, bool is_trivial) { ScheduleState* state = node->admitted_schedule.get(); VLOG_RPC << "For Query " << PrintId(state->query_id()) << " per_backend_mem_limit set to: " @@ -2173,11 +2207,12 @@ void 
AdmissionController::AdmitQuery(QueueNode* node, bool was_queued) { << " coord_backend_mem_to_admit set to: " << PrintBytes(state->coord_backend_mem_to_admit()); // Update memory and number of queries. - UpdateStatsOnAdmission(*state); + UpdateStatsOnAdmission(*state, is_trivial); UpdateExecGroupMetric(state->executor_group(), 1); // Update summary profile. - const string& admission_result = - was_queued ? PROFILE_INFO_VAL_ADMIT_QUEUED : PROFILE_INFO_VAL_ADMIT_IMMEDIATELY; + const string& admission_result = was_queued ? + PROFILE_INFO_VAL_ADMIT_QUEUED : + (is_trivial ? PROFILE_INFO_VAL_ADMIT_TRIVIAL : PROFILE_INFO_VAL_ADMIT_IMMEDIATELY); state->summary_profile()->AddInfoString( PROFILE_INFO_KEY_ADMISSION_RESULT, admission_result); state->summary_profile()->AddInfoString( @@ -2212,6 +2247,7 @@ void AdmissionController::AdmitQuery(QueueNode* node, bool was_queued) { RunningQuery& running_query = it->second[state->query_id()]; running_query.request_pool = state->request_pool(); running_query.executor_group = state->executor_group(); + running_query.is_trivial = is_trivial; for (const auto& entry : state->per_backend_schedule_states()) { BackendAllocation& allocation = running_query.per_backend_resources[entry.first]; allocation.slots_to_use = entry.second.exec_params->slots_to_use(); diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h index fec28d5d4..758c92f05 100644 --- a/be/src/scheduling/admission-controller.h +++ b/be/src/scheduling/admission-controller.h @@ -328,6 +328,7 @@ class AdmissionController { // Profile info strings static const std::string PROFILE_INFO_KEY_ADMISSION_RESULT; static const std::string PROFILE_INFO_VAL_ADMIT_IMMEDIATELY; + static const std::string PROFILE_INFO_VAL_ADMIT_TRIVIAL; static const std::string PROFILE_INFO_VAL_QUEUED; static const std::string PROFILE_INFO_VAL_CANCELLED_IN_QUEUE; static const std::string PROFILE_INFO_VAL_ADMIT_QUEUED; @@ -546,23 +547,30 @@ class AdmissionController { }; 
PoolStats(AdmissionController* parent, const std::string& name) - : name_(name), parent_(parent), agg_num_running_(0), agg_num_queued_(0), - agg_mem_reserved_(0), local_mem_admitted_(0), wait_time_ms_ema_(0.0) { + : name_(name), + parent_(parent), + agg_num_running_(0), + agg_num_queued_(0), + agg_mem_reserved_(0), + local_trivial_running_(0), + local_mem_admitted_(0), + wait_time_ms_ema_(0.0) { peak_mem_histogram_.resize(HISTOGRAM_NUM_OF_BINS, 0); InitMetrics(); } int64_t agg_num_running() const { return agg_num_running_; } int64_t agg_num_queued() const { return agg_num_queued_; } + int64_t local_trivial_running() const { return local_trivial_running_; } int64_t EffectiveMemReserved() const { return std::max(agg_mem_reserved_, local_mem_admitted_); } // ADMISSION LIFECYCLE METHODS /// Updates the pool stats when the request represented by 'state' is admitted. - void AdmitQueryAndMemory(const ScheduleState& state); + void AdmitQueryAndMemory(const ScheduleState& state, bool is_trivial); /// Updates the pool stats except the memory admitted stat. - void ReleaseQuery(int64_t peak_mem_consumption); + void ReleaseQuery(int64_t peak_mem_consumption, bool is_trivial); /// Releases the specified memory from the pool stats. void ReleaseMem(int64_t mem_to_release); /// Updates the pool stats when the request represented by 'state' is queued. @@ -630,6 +638,9 @@ class AdmissionController { const std::string& name() const { return name_; } + /// The max number of running trivial queries that can be allowed at the same time. + static const int MAX_NUM_TRIVIAL_QUERY_RUNNING; + private: const std::string name_; AdmissionController* parent_; @@ -648,6 +659,13 @@ class AdmissionController { /// other hosts. Updated only by UpdateAggregates(). int64_t agg_mem_reserved_; + /// Number of running trivial queries in this pool that have been admitted by this + /// local coordinator. 
The purpose of it is to control the concurrency of running + /// trivial queries in case they may consume too many resources because trivial + /// queries bypass the normal admission control procedure. + /// Updated only in AdmitQueryAndMemory() and ReleaseQuery(). + int64_t local_trivial_running_; + /// Memory in this pool (across all nodes) that is needed for requests that have been /// admitted by this local coordinator. Updated only on Admit() and Release(). Stored /// separately from the other 'local' stats in local_stats_ because it is not sent @@ -855,6 +873,9 @@ class AdmissionController { /// Map from backend addresses to the resouces this query was allocated on them. When /// backends are released, they are removed from this map. std::unordered_map<NetworkAddressPB, BackendAllocation> per_backend_resources; + + /// Indicate whether the query is admitted as a trivial query. + bool is_trivial; }; /// Map from host id to a map from query id of currently running queries to information @@ -933,9 +954,12 @@ class AdmissionController { /// true and keeps queue_node->admitted_schedule unset if the query cannot be admitted /// now, but also does not need to be rejected. If the query must be rejected, this /// method returns false and sets queue_node->not_admitted_reason. + /// The is_trivial is set to true when is_trivial is not null if the query is admitted + /// as a trivial query. bool FindGroupToAdmitOrReject(ClusterMembershipMgr::SnapshotPtr membership_snapshot, const TPoolConfig& pool_config, bool admit_from_queue, PoolStats* pool_stats, - QueueNode* queue_node, bool& coordinator_resource_limited); + QueueNode* queue_node, bool& coordinator_resource_limited, + bool* is_trivial = nullptr); /// Dequeues the queued queries when notified by dequeue_cv_ and admits them if they /// have not been cancelled yet. 
@@ -952,6 +976,10 @@ class AdmissionController { bool admit_from_queue, string* not_admitted_reason, string* not_admitted_details, bool& coordinator_resource_limited); + /// Returns true if the query can be admitted as a trivial query, therefore it can + /// bypass the admission control immediately. + bool CanAdmitTrivialRequest(const ScheduleState& state); + /// Returns true if all executors can accommodate the largest initial reservation of /// any executor and the backend running the coordinator fragment can accommodate its /// own initial reservation. Otherwise, returns false with the details about the memory @@ -988,7 +1016,7 @@ class AdmissionController { /// Updates the memory admitted and the num of queries running for each backend in /// 'state'. Also updates the stats of its associated resource pool. Used only when /// the 'state' is admitted. - void UpdateStatsOnAdmission(const ScheduleState& state); + void UpdateStatsOnAdmission(const ScheduleState& state, bool is_trivial); /// Updates the memory admitted and the num of queries running for each backend in /// 'state' which have been release/completed. The list of completed backends is @@ -1057,7 +1085,7 @@ class AdmissionController { /// Sets the per host mem limit and mem admitted in the schedule and does the necessary /// accounting and logging on successful submission. /// Caller must hold 'admission_ctrl_lock_'. - void AdmitQuery(QueueNode* node, bool was_queued); + void AdmitQuery(QueueNode* node, bool was_queued, bool is_trivial); /// Same as PoolToJson() but requires 'admission_ctrl_lock_' to be held by the caller. 
/// Is a helper method used by both PoolToJson() and AllPoolsToJson() diff --git a/be/src/scheduling/schedule-state.cc b/be/src/scheduling/schedule-state.cc index 79859876a..e9243c2f3 100644 --- a/be/src/scheduling/schedule-state.cc +++ b/be/src/scheduling/schedule-state.cc @@ -218,6 +218,11 @@ int64_t ScheduleState::GetDedicatedCoordMemoryEstimate() const { return request_.dedicated_coord_mem_estimate; } +bool ScheduleState::GetIsTrivialQuery() const { + DCHECK(request_.__isset.is_trivial_query); + return request_.is_trivial_query; +} + void ScheduleState::IncNumScanRanges(int64_t delta) { query_schedule_pb_->set_num_scan_ranges(query_schedule_pb_->num_scan_ranges() + delta); } diff --git a/be/src/scheduling/schedule-state.h b/be/src/scheduling/schedule-state.h index 233b25633..50e9af012 100644 --- a/be/src/scheduling/schedule-state.h +++ b/be/src/scheduling/schedule-state.h @@ -177,6 +177,9 @@ class ScheduleState { /// dedicated coordinator. int64_t GetDedicatedCoordMemoryEstimate() const; + /// Return whether the request is a trivial query. + bool GetIsTrivialQuery() const; + /// Helper methods used by scheduler to populate this ScheduleState. 
void IncNumScanRanges(int64_t delta); diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc index 4579a9612..e9c72c4ed 100644 --- a/be/src/service/query-options.cc +++ b/be/src/service/query-options.cc @@ -598,6 +598,10 @@ Status impala::SetQueryOption(const string& key, const string& value, query_options->__set_max_mem_estimate_for_admission(mem_spec_val.value); break; } + case TImpalaQueryOptions::ENABLE_TRIVIAL_QUERY_FOR_ADMISSION: { + query_options->__set_enable_trivial_query_for_admission(IsTrue(value)); + break; + } case TImpalaQueryOptions::THREAD_RESERVATION_LIMIT: { int32_t int32_t_val = 0; RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckInclusiveLowerBound<int32_t>( diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h index d21038491..402582bf8 100644 --- a/be/src/service/query-options.h +++ b/be/src/service/query-options.h @@ -50,7 +50,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type> // time we add or remove a query option to/from the enum TImpalaQueryOptions. 
#define QUERY_OPTS_TABLE \ DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(), \ - TImpalaQueryOptions::STRINGIFY_MAP_KEYS + 1); \ + TImpalaQueryOptions::ENABLE_TRIVIAL_QUERY_FOR_ADMISSION + 1); \ REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \ QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR) \ REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS) \ @@ -282,7 +282,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type> disable_codegen_cache, DISABLE_CODEGEN_CACHE, TQueryOptionLevel::ADVANCED) \ QUERY_OPT_FN(codegen_cache_mode, CODEGEN_CACHE_MODE, TQueryOptionLevel::DEVELOPMENT) \ QUERY_OPT_FN(stringify_map_keys, STRINGIFY_MAP_KEYS, TQueryOptionLevel::ADVANCED) \ -; + QUERY_OPT_FN(enable_trivial_query_for_admission, ENABLE_TRIVIAL_QUERY_FOR_ADMISSION, \ + TQueryOptionLevel::REGULAR); /// Enforce practical limits on some query options to avoid undesired query state. static const int64_t SPILLABLE_BUFFER_LIMIT = 1LL << 40; // 1 TB diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift index 49a7595be..add37998b 100644 --- a/common/thrift/ImpalaService.thrift +++ b/common/thrift/ImpalaService.thrift @@ -757,6 +757,9 @@ enum TImpalaQueryOptions { // Convert non-string map keys to string to produce valid JSON. STRINGIFY_MAP_KEYS = 151 + + // Enable immediate admission for trivial queries. + ENABLE_TRIVIAL_QUERY_FOR_ADMISSION = 152 } // The summary of a DML statement. 
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift index 0bf150e14..80230f5f9 100644 --- a/common/thrift/Query.thrift +++ b/common/thrift/Query.thrift @@ -615,6 +615,9 @@ struct TQueryOptions { // See comment in ImpalaService.thrift 152: optional bool stringify_map_keys = false; + + // See comment in ImpalaService.thrift + 153: optional bool enable_trivial_query_for_admission = true; } // Impala currently has three types of sessions: Beeswax, HiveServer2 and external @@ -889,5 +892,8 @@ struct TQueryExecRequest { // fragment will run on a dedicated coordinator. Set by the planner and used by // admission control. 12: optional i64 dedicated_coord_mem_estimate; + + // Indicate whether the request is a trivial query. Used by admission control. + 13: optional bool is_trivial_query } diff --git a/fe/src/main/java/org/apache/impala/analysis/Expr.java b/fe/src/main/java/org/apache/impala/analysis/Expr.java index 722d99df4..44c478efb 100644 --- a/fe/src/main/java/org/apache/impala/analysis/Expr.java +++ b/fe/src/main/java/org/apache/impala/analysis/Expr.java @@ -363,6 +363,18 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl } }; + // Returns true if an Expr is a builtin sleep function. 
+ public static final com.google.common.base.Predicate<Expr> IS_FN_SLEEP = + new com.google.common.base.Predicate<Expr>() { + @Override + public boolean apply(Expr arg) { + return arg instanceof FunctionCallExpr + && ((FunctionCallExpr) arg).getFnName().isBuiltin() + && ((FunctionCallExpr) arg).getFnName().getFunction() != null + && ((FunctionCallExpr) arg).getFnName().getFunction().equals("sleep"); + } + }; + // id that's unique across the entire query statement and is assigned by // Analyzer.registerConjuncts(); only assigned for the top-level terms of a // conjunction, and therefore null for most Exprs diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java index 9c9056206..2e077fe31 100644 --- a/fe/src/main/java/org/apache/impala/planner/Planner.java +++ b/fe/src/main/java/org/apache/impala/planner/Planner.java @@ -421,6 +421,9 @@ public class Planner { // profiles bottom-up since a fragment's profile may depend on its descendants. PlanFragment rootFragment = planRoots.get(0); List<PlanFragment> allFragments = rootFragment.getNodesPostOrder(); + boolean trivial = + TrivialQueryChecker.IsTrivial(rootFragment, queryOptions, isQueryStmt); + for (PlanFragment fragment: allFragments) { // Compute the per-node, per-sink and aggregate profiles for the fragment. 
fragment.computeResourceProfile(planCtx.getRootAnalyzer()); @@ -450,6 +453,7 @@ public class Planner { maxPerHostPeakResources = MIN_PER_HOST_RESOURCES.max(maxPerHostPeakResources); request.setPer_host_mem_estimate(maxPerHostPeakResources.getMemEstimateBytes()); + request.setIs_trivial_query(trivial); request.setMax_per_host_min_mem_reservation( maxPerHostPeakResources.getMinMemReservationBytes()); request.setMax_per_host_thread_reservation( diff --git a/fe/src/main/java/org/apache/impala/planner/TrivialQueryChecker.java b/fe/src/main/java/org/apache/impala/planner/TrivialQueryChecker.java new file mode 100644 index 000000000..bcc6dc13a --- /dev/null +++ b/fe/src/main/java/org/apache/impala/planner/TrivialQueryChecker.java @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.impala.planner; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.impala.analysis.Expr; +import org.apache.impala.analysis.FunctionCallExpr; +import org.apache.impala.thrift.TQueryOptions; +import org.apache.impala.catalog.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * Check whether a query is trivial or not. A trivial query is allowed for an immediate + * admission even in the case that the admission pool runs out of the memory or hits the + * limit of the request number. + * + */ +public class TrivialQueryChecker { + private final static Logger LOG = LoggerFactory.getLogger(TrivialQueryChecker.class); + + /** + * Check whether query meets the general requirements that a trivial query must have. + */ + private static boolean PassedMustHave(PlanFragment rootFragment) { + List<PlanFragment> allFragments = rootFragment.getNodesPostOrder(); + if (allFragments.size() != 1) return false; + if (!(rootFragment.getSink() instanceof PlanRootSink)) return false; + PlanNode planRoot = rootFragment.getPlanRoot(); + if (planRoot.numNodes() != 1) return false; + if (planRoot instanceof UnionNode) { + // The trivial query would return 0 or 1 row, otherwise return false. + if (((UnionNode) planRoot).constExprLists_.size() > 1 + || ((UnionNode) planRoot).resultExprLists_.size() > 0) { + return false; + } + } else if (!(rootFragment.getPlanRoot() instanceof EmptySetNode)) { + return false; + } + return true; + } + + /** + * A helper method to check whether there is a sleep function expression inside the + * expression list. Used by PassedSpecialCheck() only. 
+ */ + private static boolean HasFunctionSleep(List<Expr> exprList) { + if (exprList == null) return false; + List<FunctionCallExpr> sleepFuncList = new ArrayList<>(); + for (Expr expr : exprList) { + if (expr == null) continue; + expr.collectAll(Expr.IS_FN_SLEEP, sleepFuncList); + if (sleepFuncList.size() > 0) return true; + } + return false; + } + + /** + * Check whether the query meets the special requirements of a trivial query. + * Should only be called after passing PassedMustHave(). + */ + private static boolean PassedSpecialCheck(PlanFragment rootFragment) { + // If contains sleep function, we don't consider it to be trivial, because it can + // sleep for a long time and doesn't meet the original idea of the setting of + // trivial queries. + // Also a lot of testcases use sleep for testing, it is better to treat it as a + // normal query. + PlanNode planRoot = rootFragment.getPlanRoot(); + Preconditions.checkArgument(planRoot.numNodes() == 1); + if (planRoot instanceof UnionNode) { + UnionNode unionNode = (UnionNode) planRoot; + Preconditions.checkArgument(unionNode.resultExprLists_.size() == 0); + for (List<Expr> constList : unionNode.constExprLists_) { + if (HasFunctionSleep(constList)) return false; + } + } else { + // Must be an EmptySetNode if it is not a UnionNode. + Preconditions.checkArgument(rootFragment.getPlanRoot() instanceof EmptySetNode); + } + return true; + } + + /** + * Returns whether the query is trivial. Used for admission controller. 
+ */ + public static boolean IsTrivial( + PlanFragment rootFragment, TQueryOptions queryOptions, boolean isQueryStmt) { + if (!queryOptions.isEnable_trivial_query_for_admission() || !isQueryStmt) { + return false; + } + if (!PassedMustHave(rootFragment)) return false; + return PassedSpecialCheck(rootFragment); + } +} diff --git a/tests/common/resource_pool_config.py b/tests/common/resource_pool_config.py index 47a37b5f3..ad18c5916 100644 --- a/tests/common/resource_pool_config.py +++ b/tests/common/resource_pool_config.py @@ -65,6 +65,7 @@ class ResourcePoolConfig(object): metric_key = "admission-controller.{0}.root.{1}".format(metric_str, pool_name) start_time = time() while (time() - start_time < timeout): + client.execute("set enable_trivial_query_for_admission=false") handle = client.execute_async("select 'wait_for_config_change'") client.close_query(handle) current_val = str(self.ac_service.get_metric_value(metric_key)) diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py index 213772ad7..445e0d208 100644 --- a/tests/custom_cluster/test_admission_controller.py +++ b/tests/custom_cluster/test_admission_controller.py @@ -314,6 +314,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): # Also try setting a valid pool client.set_configuration({'request_pool': 'root.queueB'}) + client.execute('set enable_trivial_query_for_admission=false') result = client.execute("select 1") # Query should execute in queueB which doesn't have a default mem limit set in the # llama-site.xml, so it should inherit the value from the default process query @@ -325,6 +326,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): # queueA allows only 1 running query and has a queue timeout of 50ms, so the # second concurrent query should time out quickly. 
client.set_configuration({'request_pool': 'root.queueA'}) + client.execute('set enable_trivial_query_for_admission=false') handle = client.execute_async("select sleep(1000)") # Wait for query to clear admission control and get accounted for client.wait_for_admission_control(handle) @@ -344,6 +346,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): # proc/pool default. client.execute("set mem_limit=31337") client.execute("set abort_on_error=1") + client.execute('set enable_trivial_query_for_admission=false') result = client.execute("select 1") self.__check_query_options(result.runtime_profile, ['MEM_LIMIT=31337', 'ABORT_ON_ERROR=1', 'QUERY_TIMEOUT_S=5', @@ -353,6 +356,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): # config overlay sent with the query RPC. mem_limit is a pool-level override and # max_io_buffers has no proc/pool default. client.set_configuration({'request_pool': 'root.queueA', 'mem_limit': '12345'}) + client.execute('set enable_trivial_query_for_admission=false') result = client.execute("select 1") self.__check_query_options(result.runtime_profile, ['MEM_LIMIT=12345', 'QUERY_TIMEOUT_S=5', 'REQUEST_POOL=root.queueA', @@ -364,6 +368,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): # abort on error, because it's back to being the default. 
client.execute('set mem_limit=""') client.execute('set abort_on_error=""') + client.execute('set enable_trivial_query_for_admission=false') client.set_configuration({'request_pool': 'root.queueA'}) result = client.execute("select 1") self.__check_query_options(result.runtime_profile, @@ -756,6 +761,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): try: client.set_configuration_option("debug_action", "AC_BEFORE_ADMISSION:SLEEP@2000") client.set_configuration_option("mem_limit", self.PROC_MEM_TEST_LIMIT + 1) + client.set_configuration_option('enable_trivial_query_for_admission', 'false') handle = client.execute_async("select 1") sleep(1) client.close_query(handle) @@ -764,6 +770,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): client.clear_configuration() client.set_configuration_option("debug_action", "AC_BEFORE_ADMISSION:SLEEP@2000") + client.set_configuration_option('enable_trivial_query_for_admission', 'false') handle = client.execute_async("select 2") sleep(1) client.close_query(handle) @@ -772,6 +779,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): client.set_configuration_option("debug_action", "CRS_BEFORE_COORD_STARTS:SLEEP@2000") + client.set_configuration_option('enable_trivial_query_for_admission', 'false') handle = client.execute_async("select 3") sleep(1) client.close_query(handle) @@ -779,6 +787,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): "Cancelled right after starting the coordinator query id=") client.set_configuration_option("debug_action", "CRS_AFTER_COORD_STARTS:SLEEP@2000") + client.set_configuration_option('enable_trivial_query_for_admission', 'false') handle = client.execute_async("select 4") sleep(1) client.close_query(handle) @@ -789,6 +798,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): handle = client.execute_async("select sleep(10000)") client.set_configuration_option("debug_action", 
"AC_AFTER_ADMISSION_OUTCOME:SLEEP@2000") + client.set_configuration_option('enable_trivial_query_for_admission', 'false') queued_query_handle = client.execute_async("select 5") sleep(1) assert client.get_state(queued_query_handle) == QueryState.COMPILED @@ -805,6 +815,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): self.get_ac_log_name(), 'INFO', "Dequeued cancelled query=") client.clear_configuration() + client.set_configuration_option('enable_trivial_query_for_admission', 'false') handle = client.execute_async("select sleep(10000)") queued_query_handle = client.execute_async("select 6") sleep(1) @@ -832,6 +843,8 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): pool_max_mem=1024 * 1024 * 1024), statestored_args=_STATESTORED_ARGS) def test_queue_reasons_num_queries(self): + self.client.set_configuration_option('enable_trivial_query_for_admission', 'false') + """Test that queue details appear in the profile when queued based on num_queries.""" # Run a bunch of queries - one should get admitted immediately, the rest should # be dequeued one-by-one. @@ -864,6 +877,8 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): pool_max_mem=10 * 1024 * 1024), statestored_args=_STATESTORED_ARGS) def test_queue_reasons_memory(self): + self.client.set_configuration_option('enable_trivial_query_for_admission', 'false') + """Test that queue details appear in the profile when queued based on memory.""" # Run a bunch of queries with mem_limit set so that only one can be admitted at a # time- one should get admitted immediately, the rest should be dequeued one-by-one. 
@@ -905,6 +920,8 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): queue_wait_timeout_ms=1000), statestored_args=_STATESTORED_ARGS) def test_timeout_reason_host_memory(self): + self.client.set_configuration_option('enable_trivial_query_for_admission', 'false') + """Test that queue details appear in the profile when queued and then timed out due to a small 2MB host memory limit configuration.""" # Run a bunch of queries with mem_limit set so that only one can be admitted @@ -937,6 +954,8 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): queue_wait_timeout_ms=1000), statestored_args=_STATESTORED_ARGS) def test_timeout_reason_pool_memory(self): + self.client.set_configuration_option('enable_trivial_query_for_admission', 'false') + """Test that queue details appear in the profile when queued and then timed out due to a small 2MB pool memory limit configuration.""" # Run a bunch of queries with mem_limit set so that only one can be admitted @@ -1094,11 +1113,13 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): pool_name = "invalidTestPool" config_str = "max-query-mem-limit" self.client.set_configuration_option('request_pool', pool_name) + self.client.set_configuration_option('enable_trivial_query_for_admission', 'false') # Setup to queue a query. 
sleep_query_handle = self.client.execute_async("select sleep(10000)") self.client.wait_for_admission_control(sleep_query_handle) self._wait_for_change_to_profile(sleep_query_handle, "Admission result: Admitted immediately") + self.client.execute("set enable_trivial_query_for_admission=false") queued_query_handle = self.client.execute_async("select 2") self._wait_for_change_to_profile(queued_query_handle, "Admission result: Queued") @@ -1135,6 +1156,175 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): self.wait_for_state(queued_query_handle, QueryState.EXCEPTION, 20), self.close_query(queued_query_handle) + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1, + pool_max_mem=1024 * 1024 * 1024), + statestored_args=_STATESTORED_ARGS) + def test_trivial_query(self): + self.client.execute("set enable_trivial_query_for_admission=false") + + # Test the second request does need to queue when trivial query is disabled. + sleep_query_handle = self.client.execute_async("select sleep(10000)") + self.client.wait_for_admission_control(sleep_query_handle) + self._wait_for_change_to_profile(sleep_query_handle, + "Admission result: Admitted immediately") + trivial_query_handle = self.client.execute_async("select 2") + self._wait_for_change_to_profile(trivial_query_handle, "Admission result: Queued") + self.client.close_query(sleep_query_handle) + self.client.close_query(trivial_query_handle) + + self.client.execute("set enable_trivial_query_for_admission=true") + # Test when trivial query is enabled, all trivial queries should be + # admitted immediately. + sleep_query_handle = self.client.execute_async("select sleep(10000)") + self.client.wait_for_admission_control(sleep_query_handle) + self._wait_for_change_to_profile(sleep_query_handle, + "Admission result: Admitted immediately") + # Test the trivial queries. 
+ self._test_trivial_queries_suc() + # Test the queries that are not trivial. + self._test_trivial_queries_negative() + self.client.close_query(sleep_query_handle) + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1, + pool_max_mem=1), + statestored_args=_STATESTORED_ARGS) + def test_trivial_query_low_mem(self): + # Test whether it will fail for a normal query. + failed_query_handle = self.client.execute_async( + "select * from functional_parquet.alltypes limit 100") + self.wait_for_state(failed_query_handle, QueryState.EXCEPTION, 20) + self.client.close_query(failed_query_handle) + # Test it should pass all the trivial queries. + self._test_trivial_queries_suc() + + class MultiTrivialRunThread(threading.Thread): + def __init__(self, admit_obj, sql, expect_err=False): + super(self.__class__, self).__init__() + self.admit_obj = admit_obj + self.sql = sql + self.error = None + self.expect_err = expect_err + + def run(self): + try: + self._test_multi_trivial_query_runs() + except Exception as e: + LOG.exception(e) + self.error = e + raise e + + def _test_multi_trivial_query_runs(self): + timeout = 10 + admit_obj = self.admit_obj + client = admit_obj.cluster.impalads[0].service.create_beeswax_client() + for i in range(100): + handle = client.execute_async(self.sql) + if not self.expect_err: + assert client.wait_for_finished_timeout(handle, timeout) + else: + if not client.wait_for_finished_timeout(handle, timeout): + self.error = Exception("Wait timeout " + str(timeout) + " seconds.") + break + result = client.fetch(self.sql, handle) + assert result.success + client.close_query(handle) + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=100000, + pool_max_mem=1024 * 1024 * 1024), + statestored_args=_STATESTORED_ARGS) + def test_trivial_query_multi_runs(self): + threads = [] + # Test mixed 
trivial and non-trivial queries workload, and should successfully run + # for all. + # Test the case when the number of trivial queries is over the maximum parallelism, + # which is three. + for i in range(5): + thread_instance = self.MultiTrivialRunThread(self, "select 1") + threads.append(thread_instance) + # Runs non-trivial queries below. + for i in range(2): + thread_instance = self.MultiTrivialRunThread(self, "select sleep(1)") + threads.append(thread_instance) + for thread_instance in threads: + thread_instance.start() + for thread_instance in threads: + thread_instance.join() + for thread_instance in threads: + if thread_instance.error is not None: + raise thread_instance.error + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=100000, + pool_max_mem=1024 * 1024 * 1024), + statestored_args=_STATESTORED_ARGS) + def test_trivial_query_multi_runs_fallback(self): + threads = [] + # Test the case when the number of trivial queries is over the maximum parallelism, + # which is three, other trivial queries should fall back to normal process and + # blocked by the long sleep query in our testcase, then leads to a timeout error. 
+ long_query_handle = self.client.execute_async("select sleep(100000)") + for i in range(5): + thread_instance = self.MultiTrivialRunThread(self, "select 1", True) + threads.append(thread_instance) + for thread_instance in threads: + thread_instance.start() + for thread_instance in threads: + thread_instance.join() + has_error = False + for thread_instance in threads: + if thread_instance.error is not None: + assert "Wait timeout" in str(thread_instance.error) + has_error = True + assert has_error + self.client.close_query(long_query_handle) + + def _test_trivial_queries_suc(self): + self._test_trivial_queries_helper("select 1") + self._test_trivial_queries_helper( + "select * from functional_parquet.alltypes limit 0") + self._test_trivial_queries_helper("select 1, (2 + 3)") + self._test_trivial_queries_helper( + "select id from functional_parquet.alltypes limit 0 union all select 1") + self._test_trivial_queries_helper( + "select 1 union all select id from functional_parquet.alltypes limit 0") + + # Test the cases that do not fit for trivial queries. + def _test_trivial_queries_negative(self): + self._test_trivial_queries_helper("select 1 union all select 2", False) + self._test_trivial_queries_helper( + "select * from functional_parquet.alltypes limit 1", False) + + # Cases when the query contains function sleep(). 
+ self._test_trivial_queries_helper( + "select 1 union all select sleep(1)", False) + self._test_trivial_queries_helper( + "select 1 from functional.alltypes limit 0 union all select sleep(1)", + False) + self._test_trivial_queries_helper( + "select a from (select 1 a, sleep(1)) s", False) + self._test_trivial_queries_helper("select sleep(1)", False) + self._test_trivial_queries_helper("select ISTRUE(sleep(1))", False) + self._test_trivial_queries_helper( + "select 1 from functional.alltypes limit 0 " + "union all select ISTRUE(sleep(1))", + False) + + def _test_trivial_queries_helper(self, sql, expect_trivial=True): + trivial_query_handle = self.client.execute_async(sql) + if expect_trivial: + expect_msg = "Admission result: Admitted as a trivial query" + else: + expect_msg = "Admission result: Queued" + self._wait_for_change_to_profile(trivial_query_handle, expect_msg) + self.client.close_query(trivial_query_handle) + def _wait_for_change_to_profile( self, query_handle, search_string, timeout=20, client=None): if client is None: @@ -1161,7 +1351,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): # Ensure that the query has started executing. self.wait_for_admission_control(long_query_resp.operationHandle) # Submit another query. - queued_query_resp = self.execute_statement("select 1") + queued_query_resp = self.execute_statement("select sleep(1)") # Wait until the query is queued. self.wait_for_operation_state(queued_query_resp.operationHandle, TCLIService.TOperationState.PENDING_STATE) @@ -1193,6 +1383,8 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): STALE_TOPIC_THRESHOLD_MS), statestored_args=_STATESTORED_ARGS) def test_statestore_outage(self): + self.client.set_configuration_option('enable_trivial_query_for_admission', 'false') + """Test behaviour with a failed statestore. 
Queries should continue to be admitted but we should generate diagnostics about the stale topic.""" self.cluster.statestored.kill() @@ -1270,7 +1462,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): # With a new client, execute a query and observe that it gets queued and ultimately # succeeds. client = self.create_impala_client() - result = self.execute_query_expect_success(client, "select 1") + result = self.execute_query_expect_success(client, "select sleep(1)") start_cluster_thread.join() profile = result.runtime_profile reasons = self.__extract_init_queue_reasons([profile]) diff --git a/tests/custom_cluster/test_session_expiration.py b/tests/custom_cluster/test_session_expiration.py index 05f81a2f6..fb9771c83 100644 --- a/tests/custom_cluster/test_session_expiration.py +++ b/tests/custom_cluster/test_session_expiration.py @@ -103,6 +103,9 @@ class TestSessionExpiration(CustomClusterTestSuite): impalad = self.cluster.get_any_impalad() client = impalad.service.create_beeswax_client() client.execute("SET IDLE_SESSION_TIMEOUT=3") + # Disable the trivial query policy, otherwise "select 1" would be admitted as a + # trivial query. + client.execute("set enable_trivial_query_for_admission=false") client.execute_async("select sleep(10000)") queued_handle = client.execute_async("select 1") impalad.service.wait_for_metric_value( diff --git a/tests/custom_cluster/test_shell_interactive.py b/tests/custom_cluster/test_shell_interactive.py index 361a1264e..f9ed9c8bb 100644 --- a/tests/custom_cluster/test_shell_interactive.py +++ b/tests/custom_cluster/test_shell_interactive.py @@ -45,6 +45,7 @@ class TestShellInteractive(CustomClusterTestSuite): for vector in\ [ImpalaTestVector([value]) for value in create_client_protocol_dimension()]: proc = spawn_shell(get_shell_cmd(vector)) + proc.sendline("set enable_trivial_query_for_admission=false;") # Check with only live_summary set to true. 
proc.expect("{0}] default>".format(get_impalad_port(vector))) proc.sendline("set live_summary=true;")
