This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b3e9c4a65fa63da6f373c9ecec41fe4247e5e7d8
Author: Yida Wu <[email protected]>
AuthorDate: Wed Nov 2 20:34:54 2022 -0700

    IMPALA-7969: Always admit trivial queries immediately
    
    The idea of a trivial query is to allow certain queries to bypass
    admission control, thereby accelerating query execution even when
    server resources are at capacity. This benefits queries that
    require a fast response while consuming minimal resources.
    
    This patch adds support for trivial query detection and allows
    immediate admission of trivial queries. We define trivial queries
    as a subset of coordinator-only queries that return no more than
    one row. The definition is as below:
      - Must have PLAN ROOT SINK as the root
      - Can contain UNION and EMPTYSET nodes only
      - Results cannot exceed one row
    
    Examples of a trivial query:
      - select 1;
      - select * from table limit 0;
      - select * from table limit 0 union all select 1;
      - select 1, (2 + 3);
    
    Also, we restrict the execution parallelism of trivial queries:
    each resource pool can execute no more than three trivial queries
    at the same time. If the maximum parallelism is reached, the
    admission controller tries to admit the trivial query via the
    normal process. More precisely, if the cluster is running with a
    global admission controller, the max parallelism for trivial
    queries is three per resource pool. If there is no global
    admission controller, each coordinator admits trivial queries
    based on its own local counter, so the max parallelism is three
    per coordinator per resource pool in that case.
    
    As this is the first patch, we try to keep the trivial query
    definition as simple as possible; it could be extended in the
    future.
    
    Added query option enable_trivial_query_for_admission to control
    whether the trivial query policy is enabled.
    
    Tests:
    Passed exhaustive tests.
    Added test_trivial_query and test_trivial_query_low_mem.
    
    Change-Id: I2a729764e3055d7eb11900c96c82ff53eb261f91
    Reviewed-on: http://gerrit.cloudera.org:8080/19214
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/scheduling/admission-controller-test.cc     |   4 +-
 be/src/scheduling/admission-controller.cc          |  74 ++++++--
 be/src/scheduling/admission-controller.h           |  42 ++++-
 be/src/scheduling/schedule-state.cc                |   5 +
 be/src/scheduling/schedule-state.h                 |   3 +
 be/src/service/query-options.cc                    |   4 +
 be/src/service/query-options.h                     |   5 +-
 common/thrift/ImpalaService.thrift                 |   3 +
 common/thrift/Query.thrift                         |   6 +
 .../main/java/org/apache/impala/analysis/Expr.java |  12 ++
 .../java/org/apache/impala/planner/Planner.java    |   4 +
 .../apache/impala/planner/TrivialQueryChecker.java | 113 ++++++++++++
 tests/common/resource_pool_config.py               |   1 +
 tests/custom_cluster/test_admission_controller.py  | 196 ++++++++++++++++++++-
 tests/custom_cluster/test_session_expiration.py    |   3 +
 tests/custom_cluster/test_shell_interactive.py     |   1 +
 16 files changed, 444 insertions(+), 32 deletions(-)

diff --git a/be/src/scheduling/admission-controller-test.cc 
b/be/src/scheduling/admission-controller-test.cc
index 9732a7e5d..85e2073e4 100644
--- a/be/src/scheduling/admission-controller-test.cc
+++ b/be/src/scheduling/admission-controller-test.cc
@@ -743,7 +743,7 @@ TEST_F(AdmissionControllerTest, PoolStats) {
   CheckPoolStatsEmpty(pool_stats);
 
   // Show that Admit and Release leave stats at zero.
-  pool_stats->AdmitQueryAndMemory(*schedule_state);
+  pool_stats->AdmitQueryAndMemory(*schedule_state, false);
   ASSERT_EQ(1, pool_stats->agg_num_running());
   ASSERT_EQ(1, pool_stats->metrics()->agg_num_running->GetValue());
   int64_t mem_to_release = 0;
@@ -752,7 +752,7 @@ TEST_F(AdmissionControllerTest, PoolStats) {
         admission_controller->GetMemToAdmit(*schedule_state, 
backend_state.second);
   }
   pool_stats->ReleaseMem(mem_to_release);
-  pool_stats->ReleaseQuery(0);
+  pool_stats->ReleaseQuery(0, false);
   CheckPoolStatsEmpty(pool_stats);
 }
 
diff --git a/be/src/scheduling/admission-controller.cc 
b/be/src/scheduling/admission-controller.cc
index 1f0c80dfd..dc2b53e10 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -68,6 +68,7 @@ namespace impala {
 const int64_t AdmissionController::PoolStats::HISTOGRAM_NUM_OF_BINS = 128;
 const int64_t AdmissionController::PoolStats::HISTOGRAM_BIN_SIZE = 1024L * 
1024L * 1024L;
 const double AdmissionController::PoolStats::EMA_MULTIPLIER = 0.2;
+const int AdmissionController::PoolStats::MAX_NUM_TRIVIAL_QUERY_RUNNING = 3;
 
 /// Convenience method.
 string PrintBytes(int64_t value) {
@@ -147,6 +148,8 @@ const string 
POOL_CLAMP_MEM_LIMIT_QUERY_OPTION_METRIC_KEY_FORMAT =
 const string AdmissionController::PROFILE_INFO_KEY_ADMISSION_RESULT = 
"Admission result";
 const string AdmissionController::PROFILE_INFO_VAL_ADMIT_IMMEDIATELY =
     "Admitted immediately";
+const string AdmissionController::PROFILE_INFO_VAL_ADMIT_TRIVIAL =
+    "Admitted as a trivial query";
 const string AdmissionController::PROFILE_INFO_VAL_QUEUED = "Queued";
 const string AdmissionController::PROFILE_INFO_VAL_CANCELLED_IN_QUEUE =
     "Cancelled (queued)";
@@ -335,6 +338,7 @@ string AdmissionController::PoolStats::DebugString() const {
   ss << "agg_num_queued=" << agg_num_queued_ << ", ";
   ss << "agg_mem_reserved=" << PrintBytes(agg_mem_reserved_) << ", ";
   ss << " local_host(local_mem_admitted=" << PrintBytes(local_mem_admitted_) 
<< ", ";
+  ss << "local_trivial_running=" << local_trivial_running_ << ", ";
   ss << DebugPoolStats(local_stats_) << ")";
   return ss.str();
 }
@@ -676,7 +680,8 @@ Status AdmissionController::Init() {
   return status;
 }
 
-void AdmissionController::PoolStats::AdmitQueryAndMemory(const ScheduleState& 
state) {
+void AdmissionController::PoolStats::AdmitQueryAndMemory(
+    const ScheduleState& state, bool is_trivial) {
   int64_t cluster_mem_admitted = state.GetClusterMemoryToAdmit();
   DCHECK_GT(cluster_mem_admitted, 0);
   local_mem_admitted_ += cluster_mem_admitted;
@@ -689,9 +694,11 @@ void 
AdmissionController::PoolStats::AdmitQueryAndMemory(const ScheduleState& st
   metrics_.local_num_admitted_running->Increment(1L);
 
   metrics_.total_admitted->Increment(1L);
+  if (is_trivial) ++local_trivial_running_;
 }
 
-void AdmissionController::PoolStats::ReleaseQuery(int64_t 
peak_mem_consumption) {
+void AdmissionController::PoolStats::ReleaseQuery(
+    int64_t peak_mem_consumption, bool is_trivial) {
   // Update stats tracking the number of running and admitted queries.
   agg_num_running_ -= 1;
   metrics_.agg_num_running->Increment(-1L);
@@ -702,6 +709,10 @@ void AdmissionController::PoolStats::ReleaseQuery(int64_t 
peak_mem_consumption)
   metrics_.total_released->Increment(1L);
   DCHECK_GE(local_stats_.num_admitted_running, 0);
   DCHECK_GE(agg_num_running_, 0);
+  if (is_trivial) {
+    --local_trivial_running_;
+    DCHECK_GE(local_trivial_running_, 0);
+  }
 
   // Update the 'peak_mem_histogram' based on the given peak memory 
consumption of the
   // query, if provided.
@@ -773,14 +784,15 @@ void 
AdmissionController::UpdateStatsOnReleaseForBackends(const UniqueIdPB& quer
   pools_for_updates_.insert(running_query.request_pool);
 }
 
-void AdmissionController::UpdateStatsOnAdmission(const ScheduleState& state) {
+void AdmissionController::UpdateStatsOnAdmission(
+    const ScheduleState& state, bool is_trivial) {
   for (const auto& entry : state.per_backend_schedule_states()) {
     const NetworkAddressPB& host_addr = entry.first;
     int64_t mem_to_admit = GetMemToAdmit(state, entry.second);
     UpdateHostStats(host_addr, mem_to_admit, 1, 
entry.second.exec_params->slots_to_use());
   }
   PoolStats* pool_stats = GetPoolStats(state);
-  pool_stats->AdmitQueryAndMemory(state);
+  pool_stats->AdmitQueryAndMemory(state, is_trivial);
   pools_for_updates_.insert(state.request_pool());
 }
 
@@ -950,6 +962,13 @@ bool AdmissionController::HasAvailableSlots(const 
ScheduleState& state,
   return true;
 }
 
+bool AdmissionController::CanAdmitTrivialRequest(const ScheduleState& state) {
+  PoolStats* pool_stats = GetPoolStats(state);
+  DCHECK(pool_stats != nullptr);
+  return pool_stats->local_trivial_running() + 1
+      <= PoolStats::MAX_NUM_TRIVIAL_QUERY_RUNNING;
+}
+
 bool AdmissionController::CanAdmitRequest(const ScheduleState& state,
     const TPoolConfig& pool_cfg, bool admit_from_queue, string* 
not_admitted_reason,
     string* not_admitted_details, bool& coordinator_resource_limited) {
@@ -959,7 +978,6 @@ bool AdmissionController::CanAdmitRequest(const 
ScheduleState& state,
   //  (c) One of the executors in 'schedule' is already at its maximum number 
of requests
   //      (when not using the default executor group).
   //  (d) There are not enough memory resources available for the query.
-
   const int64_t max_requests = GetMaxRequestsForPool(pool_cfg);
   PoolStats* pool_stats = GetPoolStats(state);
   bool default_group =
@@ -1222,9 +1240,10 @@ Status AdmissionController::SubmitForAdmission(const 
AdmissionRequest& request,
     stats->UpdateConfigMetrics(queue_node->pool_cfg);
 
     bool unused_bool;
+    bool is_trivial = false;
     bool must_reject =
         !FindGroupToAdmitOrReject(membership_snapshot, queue_node->pool_cfg,
-            /* admit_from_queue=*/false, stats, queue_node, unused_bool);
+            /* admit_from_queue=*/false, stats, queue_node, unused_bool, 
&is_trivial);
     if (must_reject) {
       AdmissionOutcome outcome = 
admit_outcome->Set(AdmissionOutcome::REJECTED);
       if (outcome != AdmissionOutcome::REJECTED) {
@@ -1246,16 +1265,18 @@ Status AdmissionController::SubmitForAdmission(const 
AdmissionRequest& request,
       DCHECK(queue_node->admitted_schedule->query_schedule_pb().get() != 
nullptr);
       const string& group_name = 
queue_node->admitted_schedule->executor_group();
       VLOG(3) << "Can admit to group " << group_name << " (or cancelled)";
-      DCHECK_EQ(stats->local_stats().num_queued, 0);
+      if (!is_trivial) DCHECK_EQ(stats->local_stats().num_queued, 0);
       AdmissionOutcome outcome = 
admit_outcome->Set(AdmissionOutcome::ADMITTED);
       if (outcome != AdmissionOutcome::ADMITTED) {
         DCHECK_ENUM_EQ(outcome, AdmissionOutcome::CANCELLED);
-        VLOG_QUERY << "Ready to be " << PROFILE_INFO_VAL_ADMIT_IMMEDIATELY
+        VLOG_QUERY << "Ready to be "
+                   << (is_trivial ? PROFILE_INFO_VAL_ADMIT_TRIVIAL :
+                                    PROFILE_INFO_VAL_ADMIT_IMMEDIATELY)
                    << " but already cancelled, query id=" << 
PrintId(request.query_id);
         return Status::CANCELLED;
       }
       VLOG_QUERY << "Admitting query id=" << PrintId(request.query_id);
-      AdmitQuery(queue_node, false);
+      AdmitQuery(queue_node, false /* was_queued */, is_trivial);
       stats->UpdateWaitTime(0);
       VLOG_RPC << "Final: " << stats->DebugString();
       *schedule_result = 
move(queue_node->admitted_schedule->query_schedule_pb());
@@ -1429,7 +1450,7 @@ void AdmissionController::ReleaseQuery(const UniqueIdPB& 
query_id,
     DCHECK_EQ(num_released_backends_.at(query_id), 0) << PrintId(query_id);
     num_released_backends_.erase(num_released_backends_.find(query_id));
     PoolStats* stats = GetPoolStats(running_query.request_pool);
-    stats->ReleaseQuery(peak_mem_consumption);
+    stats->ReleaseQuery(peak_mem_consumption, running_query.is_trivial);
     // No need to update the Host Stats as they should have been updated in
     // ReleaseQueryBackends.
     pools_for_updates_.insert(running_query.request_pool);
@@ -1848,7 +1869,7 @@ Status AdmissionController::ComputeGroupScheduleStates(
 bool AdmissionController::FindGroupToAdmitOrReject(
     ClusterMembershipMgr::SnapshotPtr membership_snapshot, const TPoolConfig& 
pool_config,
     bool admit_from_queue, PoolStats* pool_stats, QueueNode* queue_node,
-    bool& coordinator_resource_limited) {
+    bool& coordinator_resource_limited, bool* is_trivial) {
   // Check for rejection based on current cluster size
   const string& pool_name = pool_stats->name();
   string rejection_reason;
@@ -1890,9 +1911,21 @@ bool AdmissionController::FindGroupToAdmitOrReject(
                << " dedicated_coord_mem_estimate="
                << PrintBytes(state->GetDedicatedCoordMemoryEstimate())
                << " max_requests=" << max_requests << " max_queued=" << 
max_queued
-               << " max_mem=" << PrintBytes(max_mem);
+               << " max_mem=" << PrintBytes(max_mem) << " is_trivial_query="
+               << PrettyPrinter::Print(state->GetIsTrivialQuery(), 
TUnit::NONE);
     VLOG_QUERY << "Stats: " << pool_stats->DebugString();
 
+    if (state->GetIsTrivialQuery() && CanAdmitTrivialRequest(*state)) {
+      // The trivial query is supposed to be a subset of the coord-only query,
+      // so the executor group should be an empty group.
+      DCHECK_EQ(&executor_group, 
cluster_membership_mgr_->GetEmptyExecutorGroup());
+      VLOG_QUERY << "Admitted by trivial query policy.";
+      queue_node->admitted_schedule = std::move(group_state.state);
+      DCHECK(is_trivial != nullptr);
+      if (is_trivial != nullptr) *is_trivial = true;
+      return true;
+    }
+
     // Query is rejected if the rejection check fails on *any* group.
     if (RejectForSchedule(*state, pool_config, &rejection_reason)) {
       DCHECK(!rejection_reason.empty());
@@ -2029,10 +2062,11 @@ void AdmissionController::DequeueLoop() {
             && queue_node->admit_outcome->Get() == AdmissionOutcome::CANCELLED;
 
         bool coordinator_resource_limited = false;
+        bool is_trivial = false;
         bool is_rejected = !is_cancelled
             && !FindGroupToAdmitOrReject(membership_snapshot, pool_config,
-                /* admit_from_queue=*/true, stats, queue_node,
-                coordinator_resource_limited);
+                   /* admit_from_queue=*/true, stats, queue_node,
+                   coordinator_resource_limited, &is_trivial);
 
         if (!is_cancelled && !is_rejected
             && queue_node->admitted_schedule.get() == nullptr) {
@@ -2089,7 +2123,7 @@ void AdmissionController::DequeueLoop() {
         DCHECK(!is_cancelled);
         DCHECK(!is_rejected);
         DCHECK(queue_node->admitted_schedule != nullptr);
-        AdmitQuery(queue_node, true);
+        AdmitQuery(queue_node, true /* was_queued */, is_trivial);
       }
       pools_for_updates_.insert(pool_name);
     }
@@ -2161,7 +2195,7 @@ AdmissionController::PoolStats* 
AdmissionController::GetPoolStats(
   return &it->second;
 }
 
-void AdmissionController::AdmitQuery(QueueNode* node, bool was_queued) {
+void AdmissionController::AdmitQuery(QueueNode* node, bool was_queued, bool 
is_trivial) {
   ScheduleState* state = node->admitted_schedule.get();
   VLOG_RPC << "For Query " << PrintId(state->query_id())
            << " per_backend_mem_limit set to: "
@@ -2173,11 +2207,12 @@ void AdmissionController::AdmitQuery(QueueNode* node, 
bool was_queued) {
            << " coord_backend_mem_to_admit set to: "
            << PrintBytes(state->coord_backend_mem_to_admit());
   // Update memory and number of queries.
-  UpdateStatsOnAdmission(*state);
+  UpdateStatsOnAdmission(*state, is_trivial);
   UpdateExecGroupMetric(state->executor_group(), 1);
   // Update summary profile.
-  const string& admission_result =
-      was_queued ? PROFILE_INFO_VAL_ADMIT_QUEUED : 
PROFILE_INFO_VAL_ADMIT_IMMEDIATELY;
+  const string& admission_result = was_queued ?
+      PROFILE_INFO_VAL_ADMIT_QUEUED :
+      (is_trivial ? PROFILE_INFO_VAL_ADMIT_TRIVIAL : 
PROFILE_INFO_VAL_ADMIT_IMMEDIATELY);
   state->summary_profile()->AddInfoString(
       PROFILE_INFO_KEY_ADMISSION_RESULT, admission_result);
   state->summary_profile()->AddInfoString(
@@ -2212,6 +2247,7 @@ void AdmissionController::AdmitQuery(QueueNode* node, 
bool was_queued) {
   RunningQuery& running_query = it->second[state->query_id()];
   running_query.request_pool = state->request_pool();
   running_query.executor_group = state->executor_group();
+  running_query.is_trivial = is_trivial;
   for (const auto& entry : state->per_backend_schedule_states()) {
     BackendAllocation& allocation = 
running_query.per_backend_resources[entry.first];
     allocation.slots_to_use = entry.second.exec_params->slots_to_use();
diff --git a/be/src/scheduling/admission-controller.h 
b/be/src/scheduling/admission-controller.h
index fec28d5d4..758c92f05 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -328,6 +328,7 @@ class AdmissionController {
   // Profile info strings
   static const std::string PROFILE_INFO_KEY_ADMISSION_RESULT;
   static const std::string PROFILE_INFO_VAL_ADMIT_IMMEDIATELY;
+  static const std::string PROFILE_INFO_VAL_ADMIT_TRIVIAL;
   static const std::string PROFILE_INFO_VAL_QUEUED;
   static const std::string PROFILE_INFO_VAL_CANCELLED_IN_QUEUE;
   static const std::string PROFILE_INFO_VAL_ADMIT_QUEUED;
@@ -546,23 +547,30 @@ class AdmissionController {
     };
 
     PoolStats(AdmissionController* parent, const std::string& name)
-      : name_(name), parent_(parent), agg_num_running_(0), agg_num_queued_(0),
-        agg_mem_reserved_(0), local_mem_admitted_(0), wait_time_ms_ema_(0.0) {
+      : name_(name),
+        parent_(parent),
+        agg_num_running_(0),
+        agg_num_queued_(0),
+        agg_mem_reserved_(0),
+        local_trivial_running_(0),
+        local_mem_admitted_(0),
+        wait_time_ms_ema_(0.0) {
       peak_mem_histogram_.resize(HISTOGRAM_NUM_OF_BINS, 0);
       InitMetrics();
     }
 
     int64_t agg_num_running() const { return agg_num_running_; }
     int64_t agg_num_queued() const { return agg_num_queued_; }
+    int64_t local_trivial_running() const { return local_trivial_running_; }
     int64_t EffectiveMemReserved() const {
       return std::max(agg_mem_reserved_, local_mem_admitted_);
     }
 
     // ADMISSION LIFECYCLE METHODS
     /// Updates the pool stats when the request represented by 'state' is 
admitted.
-    void AdmitQueryAndMemory(const ScheduleState& state);
+    void AdmitQueryAndMemory(const ScheduleState& state, bool is_trivial);
     /// Updates the pool stats except the memory admitted stat.
-    void ReleaseQuery(int64_t peak_mem_consumption);
+    void ReleaseQuery(int64_t peak_mem_consumption, bool is_trivial);
     /// Releases the specified memory from the pool stats.
     void ReleaseMem(int64_t mem_to_release);
     /// Updates the pool stats when the request represented by 'state' is 
queued.
@@ -630,6 +638,9 @@ class AdmissionController {
 
     const std::string& name() const { return name_; }
 
+    /// The max number of running trivial queries that can be allowed at the 
same time.
+    static const int MAX_NUM_TRIVIAL_QUERY_RUNNING;
+
    private:
     const std::string name_;
     AdmissionController* parent_;
@@ -648,6 +659,13 @@ class AdmissionController {
     /// other hosts. Updated only by UpdateAggregates().
     int64_t agg_mem_reserved_;
 
+    /// Number of running trivial queries in this pool that have been admitted 
by this
+    /// local coordinator. The purpose of it is to control the concurrency of 
running
+    /// trivial queries in case they may consume too many resources because 
trivial
+    /// queries bypass the normal admission control procedure.
+    /// Updated only in AdmitQueryAndMemory() and ReleaseQuery().
+    int64_t local_trivial_running_;
+
     /// Memory in this pool (across all nodes) that is needed for requests 
that have been
     /// admitted by this local coordinator. Updated only on Admit() and 
Release(). Stored
     /// separately from the other 'local' stats in local_stats_ because it is 
not sent
@@ -855,6 +873,9 @@ class AdmissionController {
     /// Map from backend addresses to the resouces this query was allocated on 
them. When
     /// backends are released, they are removed from this map.
     std::unordered_map<NetworkAddressPB, BackendAllocation> 
per_backend_resources;
+
+    /// Indicate whether the query is admitted as a trivial query.
+    bool is_trivial;
   };
 
   /// Map from host id to a map from query id of currently running queries to 
information
@@ -933,9 +954,12 @@ class AdmissionController {
   /// true and keeps queue_node->admitted_schedule unset if the query cannot 
be admitted
   /// now, but also does not need to be rejected. If the query must be 
rejected, this
   /// method returns false and sets queue_node->not_admitted_reason.
+  /// The is_trivial is set to true when is_trivial is not null if the query 
is admitted
+  /// as a trivial query.
   bool FindGroupToAdmitOrReject(ClusterMembershipMgr::SnapshotPtr 
membership_snapshot,
       const TPoolConfig& pool_config, bool admit_from_queue, PoolStats* 
pool_stats,
-      QueueNode* queue_node, bool& coordinator_resource_limited);
+      QueueNode* queue_node, bool& coordinator_resource_limited,
+      bool* is_trivial = nullptr);
 
   /// Dequeues the queued queries when notified by dequeue_cv_ and admits them 
if they
   /// have not been cancelled yet.
@@ -952,6 +976,10 @@ class AdmissionController {
       bool admit_from_queue, string* not_admitted_reason, string* 
not_admitted_details,
       bool& coordinator_resource_limited);
 
+  /// Returns true if the query can be admitted as a trivial query, therefore 
it can
+  /// bypass the admission control immediately.
+  bool CanAdmitTrivialRequest(const ScheduleState& state);
+
   /// Returns true if all executors can accommodate the largest initial 
reservation of
   /// any executor and the backend running the coordinator fragment can 
accommodate its
   /// own initial reservation. Otherwise, returns false with the details about 
the memory
@@ -988,7 +1016,7 @@ class AdmissionController {
   /// Updates the memory admitted and the num of queries running for each 
backend in
   /// 'state'. Also updates the stats of its associated resource pool. Used 
only when
   /// the 'state' is admitted.
-  void UpdateStatsOnAdmission(const ScheduleState& state);
+  void UpdateStatsOnAdmission(const ScheduleState& state, bool is_trivial);
 
   /// Updates the memory admitted and the num of queries running for each 
backend in
   /// 'state' which have been release/completed. The list of completed 
backends is
@@ -1057,7 +1085,7 @@ class AdmissionController {
   /// Sets the per host mem limit and mem admitted in the schedule and does 
the necessary
   /// accounting and logging on successful submission.
   /// Caller must hold 'admission_ctrl_lock_'.
-  void AdmitQuery(QueueNode* node, bool was_queued);
+  void AdmitQuery(QueueNode* node, bool was_queued, bool is_trivial);
 
   /// Same as PoolToJson() but requires 'admission_ctrl_lock_' to be held by 
the caller.
   /// Is a helper method used by both PoolToJson() and AllPoolsToJson()
diff --git a/be/src/scheduling/schedule-state.cc 
b/be/src/scheduling/schedule-state.cc
index 79859876a..e9243c2f3 100644
--- a/be/src/scheduling/schedule-state.cc
+++ b/be/src/scheduling/schedule-state.cc
@@ -218,6 +218,11 @@ int64_t ScheduleState::GetDedicatedCoordMemoryEstimate() 
const {
   return request_.dedicated_coord_mem_estimate;
 }
 
+bool ScheduleState::GetIsTrivialQuery() const {
+  DCHECK(request_.__isset.is_trivial_query);
+  return request_.is_trivial_query;
+}
+
 void ScheduleState::IncNumScanRanges(int64_t delta) {
   
query_schedule_pb_->set_num_scan_ranges(query_schedule_pb_->num_scan_ranges() + 
delta);
 }
diff --git a/be/src/scheduling/schedule-state.h 
b/be/src/scheduling/schedule-state.h
index 233b25633..50e9af012 100644
--- a/be/src/scheduling/schedule-state.h
+++ b/be/src/scheduling/schedule-state.h
@@ -177,6 +177,9 @@ class ScheduleState {
   /// dedicated coordinator.
   int64_t GetDedicatedCoordMemoryEstimate() const;
 
+  /// Return whether the request is a trivial query.
+  bool GetIsTrivialQuery() const;
+
   /// Helper methods used by scheduler to populate this ScheduleState.
   void IncNumScanRanges(int64_t delta);
 
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 4579a9612..e9c72c4ed 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -598,6 +598,10 @@ Status impala::SetQueryOption(const string& key, const 
string& value,
         
query_options->__set_max_mem_estimate_for_admission(mem_spec_val.value);
         break;
       }
+      case TImpalaQueryOptions::ENABLE_TRIVIAL_QUERY_FOR_ADMISSION: {
+        query_options->__set_enable_trivial_query_for_admission(IsTrue(value));
+        break;
+      }
       case TImpalaQueryOptions::THREAD_RESERVATION_LIMIT: {
         int32_t int32_t_val = 0;
         
RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckInclusiveLowerBound<int32_t>(
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index d21038491..402582bf8 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -50,7 +50,7 @@ typedef std::unordered_map<string, 
beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE                                                       
          \
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),                       
          \
-      TImpalaQueryOptions::STRINGIFY_MAP_KEYS + 1);                            
        \
+      TImpalaQueryOptions::ENABLE_TRIVIAL_QUERY_FOR_ADMISSION + 1);            
          \
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, 
ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)     
          \
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)   
          \
@@ -282,7 +282,8 @@ typedef std::unordered_map<string, 
beeswax::TQueryOptionLevel::type>
       disable_codegen_cache, DISABLE_CODEGEN_CACHE, 
TQueryOptionLevel::ADVANCED)         \
   QUERY_OPT_FN(codegen_cache_mode, CODEGEN_CACHE_MODE, 
TQueryOptionLevel::DEVELOPMENT)   \
   QUERY_OPT_FN(stringify_map_keys, STRINGIFY_MAP_KEYS, 
TQueryOptionLevel::ADVANCED)      \
-;
+  QUERY_OPT_FN(enable_trivial_query_for_admission, 
ENABLE_TRIVIAL_QUERY_FOR_ADMISSION,   \
+      TQueryOptionLevel::REGULAR);
 
 /// Enforce practical limits on some query options to avoid undesired query 
state.
 static const int64_t SPILLABLE_BUFFER_LIMIT = 1LL << 40; // 1 TB
diff --git a/common/thrift/ImpalaService.thrift 
b/common/thrift/ImpalaService.thrift
index 49a7595be..add37998b 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -757,6 +757,9 @@ enum TImpalaQueryOptions {
 
   // Convert non-string map keys to string to produce valid JSON.
   STRINGIFY_MAP_KEYS = 151
+
+  // Enable immediate admission for trivial queries.
+  ENABLE_TRIVIAL_QUERY_FOR_ADMISSION = 152
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 0bf150e14..80230f5f9 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -615,6 +615,9 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   152: optional bool stringify_map_keys = false;
+
+  // See comment in ImpalaService.thrift
+  153: optional bool enable_trivial_query_for_admission = true;
 }
 
 // Impala currently has three types of sessions: Beeswax, HiveServer2 and 
external
@@ -889,5 +892,8 @@ struct TQueryExecRequest {
   // fragment will run on a dedicated coordinator. Set by the planner and used 
by
   // admission control.
   12: optional i64 dedicated_coord_mem_estimate;
+
+  // Indicate whether the request is a trivial query. Used by admission 
control.
+  13: optional bool is_trivial_query
 }
 
diff --git a/fe/src/main/java/org/apache/impala/analysis/Expr.java 
b/fe/src/main/java/org/apache/impala/analysis/Expr.java
index 722d99df4..44c478efb 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Expr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Expr.java
@@ -363,6 +363,18 @@ abstract public class Expr extends TreeNode<Expr> 
implements ParseNode, Cloneabl
         }
       };
 
+  // Returns true if an Expr is a builtin sleep function.
+  public static final com.google.common.base.Predicate<Expr> IS_FN_SLEEP =
+      new com.google.common.base.Predicate<Expr>() {
+        @Override
+        public boolean apply(Expr arg) {
+          return arg instanceof FunctionCallExpr
+              && ((FunctionCallExpr) arg).getFnName().isBuiltin()
+              && ((FunctionCallExpr) arg).getFnName().getFunction() != null
+              && ((FunctionCallExpr) 
arg).getFnName().getFunction().equals("sleep");
+        }
+      };
+
   // id that's unique across the entire query statement and is assigned by
   // Analyzer.registerConjuncts(); only assigned for the top-level terms of a
   // conjunction, and therefore null for most Exprs
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java 
b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 9c9056206..2e077fe31 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -421,6 +421,9 @@ public class Planner {
     // profiles bottom-up since a fragment's profile may depend on its 
descendants.
     PlanFragment rootFragment = planRoots.get(0);
     List<PlanFragment> allFragments = rootFragment.getNodesPostOrder();
+    boolean trivial =
+        TrivialQueryChecker.IsTrivial(rootFragment, queryOptions, isQueryStmt);
+
     for (PlanFragment fragment: allFragments) {
       // Compute the per-node, per-sink and aggregate profiles for the 
fragment.
       fragment.computeResourceProfile(planCtx.getRootAnalyzer());
@@ -450,6 +453,7 @@ public class Planner {
     maxPerHostPeakResources = 
MIN_PER_HOST_RESOURCES.max(maxPerHostPeakResources);
 
     
request.setPer_host_mem_estimate(maxPerHostPeakResources.getMemEstimateBytes());
+    request.setIs_trivial_query(trivial);
     request.setMax_per_host_min_mem_reservation(
         maxPerHostPeakResources.getMinMemReservationBytes());
     request.setMax_per_host_thread_reservation(
diff --git 
a/fe/src/main/java/org/apache/impala/planner/TrivialQueryChecker.java 
b/fe/src/main/java/org/apache/impala/planner/TrivialQueryChecker.java
new file mode 100644
index 000000000..bcc6dc13a
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/planner/TrivialQueryChecker.java
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.planner;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.impala.analysis.Expr;
+import org.apache.impala.analysis.FunctionCallExpr;
+import org.apache.impala.thrift.TQueryOptions;
+import org.apache.impala.catalog.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Checks whether a query is trivial. A trivial query is admitted immediately
+ * even when the admission pool has run out of memory or has reached its
+ * limit on the number of requests.
+ *
+ */
+public class TrivialQueryChecker {
+  private final static Logger LOG = 
LoggerFactory.getLogger(TrivialQueryChecker.class);
+
+  /**
+   * Check whether query meets the general requirements that a trivial query 
must have.
+   */
+  private static boolean PassedMustHave(PlanFragment rootFragment) {
+    List<PlanFragment> allFragments = rootFragment.getNodesPostOrder();
+    if (allFragments.size() != 1) return false;
+    if (!(rootFragment.getSink() instanceof PlanRootSink)) return false;
+    PlanNode planRoot = rootFragment.getPlanRoot();
+    if (planRoot.numNodes() != 1) return false;
+    if (planRoot instanceof UnionNode) {
+      // A trivial query returns at most one row; otherwise it is not trivial.
+      if (((UnionNode) planRoot).constExprLists_.size() > 1
+          || ((UnionNode) planRoot).resultExprLists_.size() > 0) {
+        return false;
+      }
+    } else if (!(rootFragment.getPlanRoot() instanceof EmptySetNode)) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * A helper method to check whether there is a sleep function expression 
inside the
+   * expression list. Used by PassedSpecialCheck() only.
+   */
+  private static boolean HasFunctionSleep(List<Expr> exprList) {
+    if (exprList == null) return false;
+    List<FunctionCallExpr> sleepFuncList = new ArrayList<>();
+    for (Expr expr : exprList) {
+      if (expr == null) continue;
+      expr.collectAll(Expr.IS_FN_SLEEP, sleepFuncList);
+      if (sleepFuncList.size() > 0) return true;
+    }
+    return false;
+  }
+
+  /**
+   * Check whether the query meets the special requirements of a trivial query.
+   * Should only be called after passing PassedMustHave().
+   */
+  private static boolean PassedSpecialCheck(PlanFragment rootFragment) {
+    // A query that calls sleep() is not considered trivial, because it can
+    // sleep for a long time, which goes against the original idea behind
+    // trivial queries.
+    // Also, many test cases use sleep() for testing, so it is better to treat
+    // such a query as a normal query.
+    PlanNode planRoot = rootFragment.getPlanRoot();
+    Preconditions.checkArgument(planRoot.numNodes() == 1);
+    if (planRoot instanceof UnionNode) {
+      UnionNode unionNode = (UnionNode) planRoot;
+      Preconditions.checkArgument(unionNode.resultExprLists_.size() == 0);
+      for (List<Expr> constList : unionNode.constExprLists_) {
+        if (HasFunctionSleep(constList)) return false;
+      }
+    } else {
+      // Must be an EmptySetNode if it is not a UnionNode.
+      Preconditions.checkArgument(rootFragment.getPlanRoot() instanceof 
EmptySetNode);
+    }
+    return true;
+  }
+
+  /**
+   * Returns whether the query is trivial. Used for admission controller.
+   */
+  public static boolean IsTrivial(
+      PlanFragment rootFragment, TQueryOptions queryOptions, boolean 
isQueryStmt) {
+    if (!queryOptions.isEnable_trivial_query_for_admission() || !isQueryStmt) {
+      return false;
+    }
+    if (!PassedMustHave(rootFragment)) return false;
+    return PassedSpecialCheck(rootFragment);
+  }
+}
diff --git a/tests/common/resource_pool_config.py 
b/tests/common/resource_pool_config.py
index 47a37b5f3..ad18c5916 100644
--- a/tests/common/resource_pool_config.py
+++ b/tests/common/resource_pool_config.py
@@ -65,6 +65,7 @@ class ResourcePoolConfig(object):
     metric_key = "admission-controller.{0}.root.{1}".format(metric_str, 
pool_name)
     start_time = time()
     while (time() - start_time < timeout):
+      client.execute("set enable_trivial_query_for_admission=false")
       handle = client.execute_async("select 'wait_for_config_change'")
       client.close_query(handle)
       current_val = str(self.ac_service.get_metric_value(metric_key))
diff --git a/tests/custom_cluster/test_admission_controller.py 
b/tests/custom_cluster/test_admission_controller.py
index 213772ad7..445e0d208 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -314,6 +314,7 @@ class TestAdmissionController(TestAdmissionControllerBase, 
HS2TestSuite):
 
       # Also try setting a valid pool
       client.set_configuration({'request_pool': 'root.queueB'})
+      client.execute('set enable_trivial_query_for_admission=false')
       result = client.execute("select 1")
       # Query should execute in queueB which doesn't have a default mem limit 
set in the
       # llama-site.xml, so it should inherit the value from the default 
process query
@@ -325,6 +326,7 @@ class TestAdmissionController(TestAdmissionControllerBase, 
HS2TestSuite):
       # queueA allows only 1 running query and has a queue timeout of 50ms, so 
the
       # second concurrent query should time out quickly.
       client.set_configuration({'request_pool': 'root.queueA'})
+      client.execute('set enable_trivial_query_for_admission=false')
       handle = client.execute_async("select sleep(1000)")
       # Wait for query to clear admission control and get accounted for
       client.wait_for_admission_control(handle)
@@ -344,6 +346,7 @@ class TestAdmissionController(TestAdmissionControllerBase, 
HS2TestSuite):
       # proc/pool default.
       client.execute("set mem_limit=31337")
       client.execute("set abort_on_error=1")
+      client.execute('set enable_trivial_query_for_admission=false')
       result = client.execute("select 1")
       self.__check_query_options(result.runtime_profile,
           ['MEM_LIMIT=31337', 'ABORT_ON_ERROR=1', 'QUERY_TIMEOUT_S=5',
@@ -353,6 +356,7 @@ class TestAdmissionController(TestAdmissionControllerBase, 
HS2TestSuite):
       # config overlay sent with the query RPC. mem_limit is a pool-level 
override and
       # max_io_buffers has no proc/pool default.
       client.set_configuration({'request_pool': 'root.queueA', 'mem_limit': 
'12345'})
+      client.execute('set enable_trivial_query_for_admission=false')
       result = client.execute("select 1")
       self.__check_query_options(result.runtime_profile,
           ['MEM_LIMIT=12345', 'QUERY_TIMEOUT_S=5', 'REQUEST_POOL=root.queueA',
@@ -364,6 +368,7 @@ class TestAdmissionController(TestAdmissionControllerBase, 
HS2TestSuite):
       # abort on error, because it's back to being the default.
       client.execute('set mem_limit=""')
       client.execute('set abort_on_error=""')
+      client.execute('set enable_trivial_query_for_admission=false')
       client.set_configuration({'request_pool': 'root.queueA'})
       result = client.execute("select 1")
       self.__check_query_options(result.runtime_profile,
@@ -756,6 +761,7 @@ class TestAdmissionController(TestAdmissionControllerBase, 
HS2TestSuite):
     try:
       client.set_configuration_option("debug_action", 
"AC_BEFORE_ADMISSION:SLEEP@2000")
       client.set_configuration_option("mem_limit", self.PROC_MEM_TEST_LIMIT + 
1)
+      client.set_configuration_option('enable_trivial_query_for_admission', 
'false')
       handle = client.execute_async("select 1")
       sleep(1)
       client.close_query(handle)
@@ -764,6 +770,7 @@ class TestAdmissionController(TestAdmissionControllerBase, 
HS2TestSuite):
       client.clear_configuration()
 
       client.set_configuration_option("debug_action", 
"AC_BEFORE_ADMISSION:SLEEP@2000")
+      client.set_configuration_option('enable_trivial_query_for_admission', 
'false')
       handle = client.execute_async("select 2")
       sleep(1)
       client.close_query(handle)
@@ -772,6 +779,7 @@ class TestAdmissionController(TestAdmissionControllerBase, 
HS2TestSuite):
 
       client.set_configuration_option("debug_action",
           "CRS_BEFORE_COORD_STARTS:SLEEP@2000")
+      client.set_configuration_option('enable_trivial_query_for_admission', 
'false')
       handle = client.execute_async("select 3")
       sleep(1)
       client.close_query(handle)
@@ -779,6 +787,7 @@ class TestAdmissionController(TestAdmissionControllerBase, 
HS2TestSuite):
           "Cancelled right after starting the coordinator query id=")
 
       client.set_configuration_option("debug_action", 
"CRS_AFTER_COORD_STARTS:SLEEP@2000")
+      client.set_configuration_option('enable_trivial_query_for_admission', 
'false')
       handle = client.execute_async("select 4")
       sleep(1)
       client.close_query(handle)
@@ -789,6 +798,7 @@ class TestAdmissionController(TestAdmissionControllerBase, 
HS2TestSuite):
       handle = client.execute_async("select sleep(10000)")
       client.set_configuration_option("debug_action",
           "AC_AFTER_ADMISSION_OUTCOME:SLEEP@2000")
+      client.set_configuration_option('enable_trivial_query_for_admission', 
'false')
       queued_query_handle = client.execute_async("select 5")
       sleep(1)
       assert client.get_state(queued_query_handle) == QueryState.COMPILED
@@ -805,6 +815,7 @@ class TestAdmissionController(TestAdmissionControllerBase, 
HS2TestSuite):
           self.get_ac_log_name(), 'INFO', "Dequeued cancelled query=")
       client.clear_configuration()
 
+      client.set_configuration_option('enable_trivial_query_for_admission', 
'false')
       handle = client.execute_async("select sleep(10000)")
       queued_query_handle = client.execute_async("select 6")
       sleep(1)
@@ -832,6 +843,8 @@ class TestAdmissionController(TestAdmissionControllerBase, 
HS2TestSuite):
           pool_max_mem=1024 * 1024 * 1024),
       statestored_args=_STATESTORED_ARGS)
   def test_queue_reasons_num_queries(self):
+    self.client.set_configuration_option('enable_trivial_query_for_admission', 
'false')
+
     """Test that queue details appear in the profile when queued based on 
num_queries."""
     # Run a bunch of queries - one should get admitted immediately, the rest 
should
     # be dequeued one-by-one.
@@ -864,6 +877,8 @@ class TestAdmissionController(TestAdmissionControllerBase, 
HS2TestSuite):
           pool_max_mem=10 * 1024 * 1024),
       statestored_args=_STATESTORED_ARGS)
   def test_queue_reasons_memory(self):
+    self.client.set_configuration_option('enable_trivial_query_for_admission', 
'false')
+
     """Test that queue details appear in the profile when queued based on 
memory."""
     # Run a bunch of queries with mem_limit set so that only one can be 
admitted at a
     # time- one should get admitted immediately, the rest should be dequeued 
one-by-one.
@@ -905,6 +920,8 @@ class TestAdmissionController(TestAdmissionControllerBase, 
HS2TestSuite):
         queue_wait_timeout_ms=1000),
       statestored_args=_STATESTORED_ARGS)
   def test_timeout_reason_host_memory(self):
+    self.client.set_configuration_option('enable_trivial_query_for_admission', 
'false')
+
     """Test that queue details appear in the profile when queued and then 
timed out
     due to a small 2MB host memory limit configuration."""
     # Run a bunch of queries with mem_limit set so that only one can be 
admitted
@@ -937,6 +954,8 @@ class TestAdmissionController(TestAdmissionControllerBase, 
HS2TestSuite):
         queue_wait_timeout_ms=1000),
       statestored_args=_STATESTORED_ARGS)
   def test_timeout_reason_pool_memory(self):
+    self.client.set_configuration_option('enable_trivial_query_for_admission', 
'false')
+
     """Test that queue details appear in the profile when queued and then 
timed out
     due to a small 2MB pool memory limit configuration."""
     # Run a bunch of queries with mem_limit set so that only one can be 
admitted
@@ -1094,11 +1113,13 @@ class 
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     pool_name = "invalidTestPool"
     config_str = "max-query-mem-limit"
     self.client.set_configuration_option('request_pool', pool_name)
+    self.client.set_configuration_option('enable_trivial_query_for_admission', 
'false')
     # Setup to queue a query.
     sleep_query_handle = self.client.execute_async("select sleep(10000)")
     self.client.wait_for_admission_control(sleep_query_handle)
     self._wait_for_change_to_profile(sleep_query_handle,
                                       "Admission result: Admitted immediately")
+    self.client.execute("set enable_trivial_query_for_admission=false")
     queued_query_handle = self.client.execute_async("select 2")
     self._wait_for_change_to_profile(queued_query_handle, "Admission result: 
Queued")
 
@@ -1135,6 +1156,175 @@ class 
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     self.wait_for_state(queued_query_handle, QueryState.EXCEPTION, 20),
     self.close_query(queued_query_handle)
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1,
+      pool_max_mem=1024 * 1024 * 1024),
+    statestored_args=_STATESTORED_ARGS)
+  def test_trivial_query(self):
+    self.client.execute("set enable_trivial_query_for_admission=false")
+
+    # The second request must queue when trivial queries are disabled.
+    sleep_query_handle = self.client.execute_async("select sleep(10000)")
+    self.client.wait_for_admission_control(sleep_query_handle)
+    self._wait_for_change_to_profile(sleep_query_handle,
+                                      "Admission result: Admitted immediately")
+    trivial_query_handle = self.client.execute_async("select 2")
+    self._wait_for_change_to_profile(trivial_query_handle, "Admission result: 
Queued")
+    self.client.close_query(sleep_query_handle)
+    self.client.close_query(trivial_query_handle)
+
+    self.client.execute("set enable_trivial_query_for_admission=true")
+    # Test when trivial query is enabled, all trivial queries should be
+    # admitted immediately.
+    sleep_query_handle = self.client.execute_async("select sleep(10000)")
+    self.client.wait_for_admission_control(sleep_query_handle)
+    self._wait_for_change_to_profile(sleep_query_handle,
+                                      "Admission result: Admitted immediately")
+    # Test the trivial queries.
+    self._test_trivial_queries_suc()
+    # Test the queries that are not trivial.
+    self._test_trivial_queries_negative()
+    self.client.close_query(sleep_query_handle)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1,
+      pool_max_mem=1),
+    statestored_args=_STATESTORED_ARGS)
+  def test_trivial_query_low_mem(self):
+    # Test whether it will fail for a normal query.
+    failed_query_handle = self.client.execute_async(
+            "select * from functional_parquet.alltypes limit 100")
+    self.wait_for_state(failed_query_handle, QueryState.EXCEPTION, 20)
+    self.client.close_query(failed_query_handle)
+    # Test it should pass all the trivial queries.
+    self._test_trivial_queries_suc()
+
+  class MultiTrivialRunThread(threading.Thread):
+    def __init__(self, admit_obj, sql, expect_err=False):
+        super(self.__class__, self).__init__()
+        self.admit_obj = admit_obj
+        self.sql = sql
+        self.error = None
+        self.expect_err = expect_err
+
+    def run(self):
+        try:
+          self._test_multi_trivial_query_runs()
+        except Exception as e:
+          LOG.exception(e)
+          self.error = e
+          raise e
+
+    def _test_multi_trivial_query_runs(self):
+      timeout = 10
+      admit_obj = self.admit_obj
+      client = admit_obj.cluster.impalads[0].service.create_beeswax_client()
+      for i in range(100):
+        handle = client.execute_async(self.sql)
+        if not self.expect_err:
+          assert client.wait_for_finished_timeout(handle, timeout)
+        else:
+          if not client.wait_for_finished_timeout(handle, timeout):
+            self.error = Exception("Wait timeout " + str(timeout) + " 
seconds.")
+            break
+        result = client.fetch(self.sql, handle)
+        assert result.success
+        client.close_query(handle)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args=impalad_admission_ctrl_flags(max_requests=1, 
max_queued=100000,
+      pool_max_mem=1024 * 1024 * 1024),
+    statestored_args=_STATESTORED_ARGS)
+  def test_trivial_query_multi_runs(self):
+    threads = []
+    # Test a mixed workload of trivial and non-trivial queries; all of them
+    # should run successfully.
+    # Also covers the case where the number of trivial queries exceeds the
+    # maximum parallelism, which is three.
+    for i in range(5):
+      thread_instance = self.MultiTrivialRunThread(self, "select 1")
+      threads.append(thread_instance)
+    # Runs non-trivial queries below.
+    for i in range(2):
+      thread_instance = self.MultiTrivialRunThread(self, "select sleep(1)")
+      threads.append(thread_instance)
+    for thread_instance in threads:
+      thread_instance.start()
+    for thread_instance in threads:
+      thread_instance.join()
+    for thread_instance in threads:
+      if thread_instance.error is not None:
+        raise thread_instance.error
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args=impalad_admission_ctrl_flags(max_requests=1, 
max_queued=100000,
+      pool_max_mem=1024 * 1024 * 1024),
+    statestored_args=_STATESTORED_ARGS)
+  def test_trivial_query_multi_runs_fallback(self):
+    threads = []
+    # Test that when the number of trivial queries exceeds the maximum
+    # parallelism (three), the extra ones fall back to normal admission, are
+    # blocked behind the long sleep query, and end in a wait timeout.
+    long_query_handle = self.client.execute_async("select sleep(100000)")
+    for i in range(5):
+      thread_instance = self.MultiTrivialRunThread(self, "select 1", True)
+      threads.append(thread_instance)
+    for thread_instance in threads:
+      thread_instance.start()
+    for thread_instance in threads:
+      thread_instance.join()
+    has_error = False
+    for thread_instance in threads:
+      if thread_instance.error is not None:
+        assert "Wait timeout" in str(thread_instance.error)
+        has_error = True
+    assert has_error
+    self.client.close_query(long_query_handle)
+
+  def _test_trivial_queries_suc(self):
+    self._test_trivial_queries_helper("select 1")
+    self._test_trivial_queries_helper(
+            "select * from functional_parquet.alltypes limit 0")
+    self._test_trivial_queries_helper("select 1, (2 + 3)")
+    self._test_trivial_queries_helper(
+            "select id from functional_parquet.alltypes limit 0 union all 
select 1")
+    self._test_trivial_queries_helper(
+            "select 1 union all select id from functional_parquet.alltypes 
limit 0")
+
+  # Test the cases that do not fit for trivial queries.
+  def _test_trivial_queries_negative(self):
+    self._test_trivial_queries_helper("select 1 union all select 2", False)
+    self._test_trivial_queries_helper(
+            "select * from functional_parquet.alltypes limit 1", False)
+
+    # Cases when the query contains function sleep().
+    self._test_trivial_queries_helper(
+            "select 1 union all select sleep(1)", False)
+    self._test_trivial_queries_helper(
+            "select 1 from functional.alltypes limit 0 union all select 
sleep(1)",
+            False)
+    self._test_trivial_queries_helper(
+            "select a from (select 1 a, sleep(1)) s", False)
+    self._test_trivial_queries_helper("select sleep(1)", False)
+    self._test_trivial_queries_helper("select ISTRUE(sleep(1))", False)
+    self._test_trivial_queries_helper(
+            "select 1 from functional.alltypes limit 0 "
+            "union all select ISTRUE(sleep(1))",
+            False)
+
+  def _test_trivial_queries_helper(self, sql, expect_trivial=True):
+    trivial_query_handle = self.client.execute_async(sql)
+    if expect_trivial:
+      expect_msg = "Admission result: Admitted as a trivial query"
+    else:
+      expect_msg = "Admission result: Queued"
+    self._wait_for_change_to_profile(trivial_query_handle, expect_msg)
+    self.client.close_query(trivial_query_handle)
+
   def _wait_for_change_to_profile(
       self, query_handle, search_string, timeout=20, client=None):
     if client is None:
@@ -1161,7 +1351,7 @@ class 
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     # Ensure that the query has started executing.
     self.wait_for_admission_control(long_query_resp.operationHandle)
     # Submit another query.
-    queued_query_resp = self.execute_statement("select 1")
+    queued_query_resp = self.execute_statement("select sleep(1)")
     # Wait until the query is queued.
     self.wait_for_operation_state(queued_query_resp.operationHandle,
             TCLIService.TOperationState.PENDING_STATE)
@@ -1193,6 +1383,8 @@ class 
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
           STALE_TOPIC_THRESHOLD_MS),
       statestored_args=_STATESTORED_ARGS)
   def test_statestore_outage(self):
+    self.client.set_configuration_option('enable_trivial_query_for_admission', 
'false')
+
     """Test behaviour with a failed statestore. Queries should continue to be 
admitted
     but we should generate diagnostics about the stale topic."""
     self.cluster.statestored.kill()
@@ -1270,7 +1462,7 @@ class 
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     # With a new client, execute a query and observe that it gets queued and 
ultimately
     # succeeds.
     client = self.create_impala_client()
-    result = self.execute_query_expect_success(client, "select 1")
+    result = self.execute_query_expect_success(client, "select sleep(1)")
     start_cluster_thread.join()
     profile = result.runtime_profile
     reasons = self.__extract_init_queue_reasons([profile])
diff --git a/tests/custom_cluster/test_session_expiration.py 
b/tests/custom_cluster/test_session_expiration.py
index 05f81a2f6..fb9771c83 100644
--- a/tests/custom_cluster/test_session_expiration.py
+++ b/tests/custom_cluster/test_session_expiration.py
@@ -103,6 +103,9 @@ class TestSessionExpiration(CustomClusterTestSuite):
     impalad = self.cluster.get_any_impalad()
     client = impalad.service.create_beeswax_client()
     client.execute("SET IDLE_SESSION_TIMEOUT=3")
+    # Disable trivial query admission; otherwise "select 1" would be admitted
+    # as a trivial query instead of being queued.
+    client.execute("set enable_trivial_query_for_admission=false")
     client.execute_async("select sleep(10000)")
     queued_handle = client.execute_async("select 1")
     impalad.service.wait_for_metric_value(
diff --git a/tests/custom_cluster/test_shell_interactive.py 
b/tests/custom_cluster/test_shell_interactive.py
index 361a1264e..f9ed9c8bb 100644
--- a/tests/custom_cluster/test_shell_interactive.py
+++ b/tests/custom_cluster/test_shell_interactive.py
@@ -45,6 +45,7 @@ class TestShellInteractive(CustomClusterTestSuite):
     for vector in\
         [ImpalaTestVector([value]) for value in 
create_client_protocol_dimension()]:
       proc = spawn_shell(get_shell_cmd(vector))
+      proc.sendline("set enable_trivial_query_for_admission=false;")
       # Check with only live_summary set to true.
       proc.expect("{0}] default>".format(get_impalad_port(vector)))
       proc.sendline("set live_summary=true;")

Reply via email to