This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 831585c2f52934f9367639eff47e13afcf0fc7b2
Author: Joe McDonnell <[email protected]>
AuthorDate: Mon Jun 17 20:25:00 2024 -0700

    IMPALA-12906: Incorporate scan range information into the tuple cache key
    
    This change is accomplishing two things:
    1. It incorporates scan range information into the tuple
       cache key.
    2. It reintroduces deterministic scheduling as an option
       for mt_dop and uses it for HdfsScanNodes that feed
       into a TupleCacheNode.
    
    The combination of these two things solves several problems:
    1. When a table is modified, the list of scan ranges will
       change, and this will naturally change the cache keys.
    2. When accessing a partitioned table, two queries may have
       different predicates on the partition columns. Since the
       predicates can be satisfied via partition pruning, they are
       not included at runtime. This means that two queries
       may have identical compile-time keys with only the scan
       ranges being different due to different partition pruning.
    3. Each fragment instance processes different scan ranges, so
       each will have a unique cache key.
    
    To incorporate scan range information, this introduces a new
    per-fragment-instance cache key. At compile time, the planner
    now keeps track of which HdfsScanNodes feed into a TupleCacheNode.
    This is passed over to the runtime as a list of plan node ids
    that contain scan ranges. At runtime, the fragment instance
    walks through the list of plan nodes ids and hashes any scan ranges
    associated with them. This hash is the per-fragment-instance
    cache key. The combination of the compile-time cache key produced
    by the planner and the per-fragment-instance cache key is a unique
    identifier of the result.
    
    Deterministic scheduling for mt_dop was removed via IMPALA-9655
    with the introduction of the shared queue. This revives some of
    the pre-IMPALA-9655 scheduling logic as an option. Since the
    TupleCacheNode knows which HdfsScanNodes feed into it, the
    TupleCacheNode turns on deterministic scheduling for all of those
    HdfsScanNodes. Since this only applies to HdfsScanNodes that feed
    into a TupleCacheNode, it means that any HdfsScanNode that doesn't
    feed into a TupleCacheNode continues using the current algorithm.
    The pre-IMPALA-9655 code is modified to make it more deterministic
    about how it assigns scan ranges to instances.
    
    Testing:
     - Added custom cluster tests for the scan range information
       including modifying a table, selecting from a partitioned
       table, and verifying that fragment instances have unique
       keys
     - Added basic frontend test to verify that deterministic scheduling
       gets set on the HdfsScanNode that feed into the TupleCacheNode.
     - Restored the pre-IMPALA-9655 backend test to cover the LPT code
    
    Change-Id: Ibe298fff0f644ce931a2aa934ebb98f69aab9d34
    Reviewed-on: http://gerrit.cloudera.org:8080/21541
    Reviewed-by: Michael Smith <[email protected]>
    Tested-by: Michael Smith <[email protected]>
    Reviewed-by: Yida Wu <[email protected]>
---
 be/src/exec/hdfs-scan-node-base.cc                 |  47 ++--
 be/src/exec/hdfs-scan-node-base.h                  |  10 +
 be/src/exec/hdfs-scan-node-mt.cc                   |  14 +-
 be/src/exec/tuple-cache-node.cc                    |  56 ++++-
 be/src/exec/tuple-cache-node.h                     |  15 +-
 be/src/scheduling/scheduler-test.cc                | 201 ++++++++++++++-
 be/src/scheduling/scheduler.cc                     | 157 +++++++++++-
 be/src/scheduling/scheduler.h                      |  14 +-
 common/thrift/PlanNodes.thrift                     |  15 +-
 .../impala/common/ThriftSerializationCtx.java      |  13 +-
 .../org/apache/impala/planner/HdfsScanNode.java    |  44 +++-
 .../org/apache/impala/planner/TupleCacheInfo.java  |  46 +++-
 .../org/apache/impala/planner/TupleCacheNode.java  |  27 ++-
 .../org/apache/impala/planner/TupleCacheTest.java  |  19 ++
 tests/custom_cluster/test_tuple_cache.py           | 270 ++++++++++++++++++++-
 15 files changed, 879 insertions(+), 69 deletions(-)

diff --git a/be/src/exec/hdfs-scan-node-base.cc 
b/be/src/exec/hdfs-scan-node-base.cc
index 4ccd656a6..51046243c 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -254,7 +254,11 @@ Status 
HdfsScanPlanNode::ProcessScanRangesAndInitSharedState(FragmentState* stat
   int64_t total_splits = 0;
   const vector<const PlanFragmentInstanceCtxPB*>& instance_ctx_pbs =
       state->instance_ctx_pbs();
-  for (auto ctx : instance_ctx_pbs) {
+  auto instance_ctxs = state->instance_ctxs();
+  DCHECK_EQ(instance_ctxs.size(), instance_ctx_pbs.size());
+  for (int i = 0; i < instance_ctxs.size(); ++i) {
+    auto ctx = instance_ctx_pbs[i];
+    auto instance_ctx = instance_ctxs[i];
     auto ranges = ctx->per_node_scan_ranges().find(tnode_->node_id);
     if (ranges == ctx->per_node_scan_ranges().end()) continue;
     for (const ScanRangeParamsPB& params : ranges->second.scan_ranges()) {
@@ -307,6 +311,7 @@ Status 
HdfsScanPlanNode::ProcessScanRangesAndInitSharedState(FragmentState* stat
         file_desc->is_encrypted = split.is_encrypted();
         file_desc->is_erasure_coded = split.is_erasure_coded();
         file_desc->file_metadata = file_metadata;
+        file_desc->fragment_instance_id = instance_ctx->fragment_instance_id;
         if (file_metadata) {
           DCHECK(file_metadata->iceberg_metadata() != nullptr);
           switch (file_metadata->iceberg_metadata()->file_format()) {
@@ -375,22 +380,32 @@ Status 
HdfsScanPlanNode::ProcessScanRangesAndInitSharedState(FragmentState* stat
   // Distribute the work evenly for issuing initial scan ranges.
   DCHECK(shared_state_.use_mt_scan_node_ || instance_ctx_pbs.size() == 1)
       << "Non MT scan node should only have a single instance.";
-  auto instance_ctxs = state->instance_ctxs();
-  DCHECK_EQ(instance_ctxs.size(), instance_ctx_pbs.size());
-  int files_per_instance = file_descs.size() / instance_ctxs.size();
-  int remainder = file_descs.size() % instance_ctxs.size();
-  int num_lists = min(file_descs.size(), instance_ctxs.size());
-  auto fd_it = file_descs.begin();
-  for (int i = 0; i < num_lists; ++i) {
-    vector<HdfsFileDesc*>* curr_file_list =
-        &shared_state_
-             
.file_assignment_per_instance_[instance_ctxs[i]->fragment_instance_id];
-    for (int j = 0; j < files_per_instance + (i < remainder); ++j) {
-      curr_file_list->push_back(fd_it->second);
-      ++fd_it;
+  if (tnode_->hdfs_scan_node.deterministic_scanrange_assignment) {
+    // If using deterministic scan range assignment, there is no need to 
rebalance
+    // the scan ranges. The scan ranges stay with their original fragment 
instance.
+    for (auto& fd : file_descs) {
+      const TUniqueId& instance_id = fd.second->fragment_instance_id;
+      
shared_state_.file_assignment_per_instance_[instance_id].push_back(fd.second);
+    }
+  } else {
+    // When not using the deterministic scan range assignment, the scan ranges 
are
+    // balanced round robin across fragment instances for the purpose of 
issuing
+    // initial scan ranges.
+    int files_per_instance = file_descs.size() / instance_ctxs.size();
+    int remainder = file_descs.size() % instance_ctxs.size();
+    int num_lists = min(file_descs.size(), instance_ctxs.size());
+    auto fd_it = file_descs.begin();
+    for (int i = 0; i < num_lists; ++i) {
+      vector<HdfsFileDesc*>* curr_file_list =
+          &shared_state_
+              
.file_assignment_per_instance_[instance_ctxs[i]->fragment_instance_id];
+      for (int j = 0; j < files_per_instance + (i < remainder); ++j) {
+        curr_file_list->push_back(fd_it->second);
+        ++fd_it;
+      }
     }
+    DCHECK(fd_it == file_descs.end());
   }
-  DCHECK(fd_it == file_descs.end());
   return Status::OK();
 }
 
@@ -470,6 +485,8 @@ HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const 
HdfsScanPlanNode& pno
     disks_accessed_bitmap_(TUnit::UNIT, 0),
     active_hdfs_read_thread_counter_(TUnit::UNIT, 0),
     shared_state_(const_cast<ScanRangeSharedState*>(&(pnode.shared_state_))),
+    deterministic_scanrange_assignment_(
+        hdfs_scan_node.deterministic_scanrange_assignment),
     file_metadata_utils_(this) {}
 
 HdfsScanNodeBase::~HdfsScanNodeBase() {}
diff --git a/be/src/exec/hdfs-scan-node-base.h 
b/be/src/exec/hdfs-scan-node-base.h
index 0517b1f3c..8802b7ff3 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -103,6 +103,10 @@ struct HdfsFileDesc {
   /// Whether file is erasure coded.
   bool is_erasure_coded = false;
 
+  /// Fragment instance assignment from the scheduler (used for maintaining
+  /// deterministic assignments when that mode is enabled).
+  TUniqueId fragment_instance_id;
+
   /// Some useful typedefs for creating HdfsFileDesc related data structures.
   /// This is a pair for partition ID and filename which uniquely identifies a 
file.
   typedef pair<int64_t, std::string> PartitionFileKey;
@@ -819,6 +823,12 @@ class HdfsScanNodeBase : public ScanNode {
   /// Pointer to the scan range related state that is shared across all node 
instances.
   ScanRangeSharedState* shared_state_ = nullptr;
 
+  /// Whether mt_dop uses deterministic scan range assignment
+  /// If true, each fragment instance has its own list of scan ranges.
+  /// If false, the fragment instances get scan ranges from the shared queue.
+  /// Not used for mt_dop=0.
+  bool deterministic_scanrange_assignment_;
+
   /// Utility class for handling file metadata.
   FileMetadataUtils file_metadata_utils_;
 
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
index 85aada3c8..1d3077d72 100644
--- a/be/src/exec/hdfs-scan-node-mt.cc
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -54,7 +54,10 @@ Status HdfsScanNodeMt::Open(RuntimeState* state) {
   ScopedOpenEventAdder ea(this);
   RETURN_IF_ERROR(HdfsScanNodeBase::Open(state));
   DCHECK(!initial_ranges_issued_.Load());
-  shared_state_->AddCancellationHook(state);
+  // The cancellation hook is only needed when using the shared queue
+  if (!deterministic_scanrange_assignment_) {
+    shared_state_->AddCancellationHook(state);
+  }
   RETURN_IF_ERROR(IssueInitialScanRanges(state));
   return Status::OK();
 }
@@ -150,6 +153,11 @@ Status HdfsScanNodeMt::AddDiskIoRanges(
       << "Don't call AddScanRanges() after all ranges finished.";
   DCHECK_GT(shared_state_->RemainingScanRangeSubmissions(), 0);
   DCHECK_GT(ranges.size(), 0);
+  if (deterministic_scanrange_assignment_) {
+    // Use independent scan ranges for different fragment instances
+    // Same code as non-mt-dop
+    return reader_context_->AddScanRanges(ranges, enqueue_location);
+  }
   bool at_front = false;
   if (enqueue_location == EnqueueLocation::HEAD) {
     at_front = true;
@@ -160,6 +168,10 @@ Status HdfsScanNodeMt::AddDiskIoRanges(
 
 Status HdfsScanNodeMt::GetNextScanRangeToRead(
     io::ScanRange** scan_range, bool* needs_buffers) {
+  // With deterministic scan ranges, use the same code as non-mt-dop
+  if (deterministic_scanrange_assignment_) {
+    return reader_context_->GetNextUnstartedRange(scan_range, needs_buffers);
+  }
   RETURN_IF_ERROR(shared_state_->GetNextScanRange(runtime_state_, scan_range));
   if (*scan_range != nullptr) {
     RETURN_IF_ERROR(reader_context_->StartScanRange(*scan_range, 
needs_buffers));
diff --git a/be/src/exec/tuple-cache-node.cc b/be/src/exec/tuple-cache-node.cc
index fd5dea2be..c140c1b9d 100644
--- a/be/src/exec/tuple-cache-node.cc
+++ b/be/src/exec/tuple-cache-node.cc
@@ -25,6 +25,7 @@
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/tuple-cache-mgr.h"
+#include "util/hash-util.h"
 #include "util/runtime-profile-counters.h"
 #include "util/runtime-profile.h"
 
@@ -41,8 +42,7 @@ Status TupleCachePlanNode::CreateExecNode(
 
 TupleCacheNode::TupleCacheNode(
     ObjectPool* pool, const TupleCachePlanNode& pnode, const DescriptorTbl& 
descs)
-    : ExecNode(pool, pnode, descs)
-    , subtree_hash_(pnode.tnode_->tuple_cache_node.subtree_hash) {
+  : ExecNode(pool, pnode, descs) {
 }
 
 TupleCacheNode::~TupleCacheNode() = default;
@@ -54,6 +54,14 @@ Status TupleCacheNode::Prepare(RuntimeState* state) {
       ADD_COUNTER(runtime_profile(), "NumTupleCacheHalted", TUnit::UNIT);
   num_skipped_counter_ =
       ADD_COUNTER(runtime_profile(), "NumTupleCacheSkipped", TUnit::UNIT);
+
+  // Compute the combined cache key by computing the fragment instance key and
+  // fusing it with the compile time key.
+  ComputeFragmentInstanceKey(state);
+  combined_key_ = plan_node().tnode_->tuple_cache_node.compile_time_key + "_" +
+      std::to_string(fragment_instance_key_);
+  runtime_profile()->AddInfoString("Combined Key", combined_key_);
+
   return Status::OK();
 }
 
@@ -69,7 +77,7 @@ Status TupleCacheNode::Open(RuntimeState* state) {
   }
 
   TupleCacheMgr* tuple_cache_mgr = ExecEnv::GetInstance()->tuple_cache_mgr();
-  handle_ = tuple_cache_mgr->Lookup(subtree_hash_, true);
+  handle_ = tuple_cache_mgr->Lookup(combined_key_, true);
   if (tuple_cache_mgr->IsAvailableForRead(handle_)) {
     reader_ = make_unique<TupleFileReader>(
         tuple_cache_mgr->GetPath(handle_), mem_tracker(), runtime_profile());
@@ -101,7 +109,7 @@ Status TupleCacheNode::Open(RuntimeState* state) {
       // - the query requests caching but cache is disabled via startup option
       // - another fragment is currently writing this cache entry
       // - the cache entry is a tombstone to prevent retries for too large 
entries
-      VLOG_FILE << "Tuple Cache: skipped for " << subtree_hash_;
+      VLOG_FILE << "Tuple Cache: skipped for " << combined_key_;
       COUNTER_ADD(num_skipped_counter_, 1);
       tuple_cache_mgr->IncrementMetric(TupleCacheMgr::MetricType::SKIPPED);
     }
@@ -161,7 +169,7 @@ Status TupleCacheNode::GetNext(
       // continue reading from the child node.
       if (!status.ok()) {
         if (writer_->ExceededMaxSize()) {
-          VLOG_FILE << "Tuple Cache entry for " << subtree_hash_
+          VLOG_FILE << "Tuple Cache entry for " << combined_key_
                     << " hit the maximum file size: " << status.GetDetail();
           COUNTER_ADD(num_halted_counter_, 1);
           tuple_cache_mgr->IncrementMetric(TupleCacheMgr::MetricType::HALTED);
@@ -230,9 +238,45 @@ void TupleCacheNode::Close(RuntimeState* state) {
 
 void TupleCacheNode::DebugString(int indentation_level, stringstream* out) 
const {
   *out << string(indentation_level * 2, ' ');
-  *out << "TupleCacheNode(" << subtree_hash_;
+  *out << "TupleCacheNode(" << combined_key_;
   ExecNode::DebugString(indentation_level, out);
   *out << ")";
 }
 
+void TupleCacheNode::ComputeFragmentInstanceKey(const RuntimeState* state) {
+  const PlanFragmentInstanceCtxPB& ctx = state->instance_ctx_pb();
+  uint32_t hash = 0;
+  for (int32_t node_id : 
plan_node().tnode_->tuple_cache_node.input_scan_node_ids) {
+    auto ranges = ctx.per_node_scan_ranges().find(node_id);
+    if (ranges == ctx.per_node_scan_ranges().end()) continue;
+    for (const ScanRangeParamsPB& params : ranges->second.scan_ranges()) {
+      // This only supports HDFS right now
+      DCHECK(params.scan_range().has_hdfs_file_split());
+      const HdfsFileSplitPB& split = params.scan_range().hdfs_file_split();
+      if (split.has_relative_path() && !split.relative_path().empty()) {
+        hash = HashUtil::Hash(
+            split.relative_path().data(), split.relative_path().length(), 
hash);
+        DCHECK(split.has_partition_path_hash());
+        int32_t partition_path_hash = split.partition_path_hash();
+        hash = HashUtil::Hash(&partition_path_hash, 
sizeof(partition_path_hash), hash);
+      } else if (split.has_absolute_path() && !split.absolute_path().empty()) {
+        hash = HashUtil::Hash(
+            split.absolute_path().data(), split.absolute_path().length(), 
hash);
+      } else {
+        DCHECK("Either relative_path or absolute_path must be set");
+      }
+      DCHECK(split.has_offset());
+      int64_t offset = split.offset();
+      hash = HashUtil::Hash(&offset, sizeof(offset), hash);
+      DCHECK(split.has_length());
+      int64_t length = split.length();
+      hash = HashUtil::Hash(&length, sizeof(length), hash);
+      DCHECK(split.has_mtime());
+      int64_t mtime = split.mtime();
+      hash = HashUtil::Hash(&mtime, sizeof(mtime), hash);
+    }
+  }
+  fragment_instance_key_ = hash;
+}
+
 }
diff --git a/be/src/exec/tuple-cache-node.h b/be/src/exec/tuple-cache-node.h
index 772202125..19ff7eac6 100644
--- a/be/src/exec/tuple-cache-node.h
+++ b/be/src/exec/tuple-cache-node.h
@@ -35,7 +35,7 @@ class TupleCachePlanNode : public PlanNode {
 
 /// Node that caches rows produced by a child node.
 ///
-/// If the subtree_hash_ matches an existing cache entry, returns result rows 
from the
+/// If the combined_key_ matches an existing cache entry, returns result rows 
from the
 /// cache rather than from the child. Otherwise reads results from the child, 
writes them
 /// to cache, and returns them.
 
@@ -52,7 +52,14 @@ class TupleCacheNode : public ExecNode {
   void Close(RuntimeState* state) override;
   void DebugString(int indentation_level, std::stringstream* out) const 
override;
 private:
-  const std::string subtree_hash_;
+  // Fragment instance cache key. This is calculated at runtime by combining 
information
+  // about the input nodes for this fragment. It currently focuses on hashing 
the
+  // scan ranges from scan nodes. In future, it will need to handle exchanges.
+  uint32_t fragment_instance_key_;
+
+  // This is a string containing the compile time key and the 
fragment_instance_key_.
+  // This combination is unique for a given fragment instance.
+  std::string combined_key_;
 
   /// Number of results that were found in the tuple cache
   RuntimeProfile::Counter* num_hits_counter_ = nullptr;
@@ -68,6 +75,10 @@ private:
 
   void ReleaseResult();
 
+  // Construct the fragment instance part of the cache key by hashing 
information about
+  // inputs to this fragment (e.g. scan ranges).
+  void ComputeFragmentInstanceKey(const RuntimeState *state);
+
   /// Reader/Writer for caching
   TupleCacheMgr::UniqueHandle handle_;
   std::unique_ptr<TupleFileReader> reader_;
diff --git a/be/src/scheduling/scheduler-test.cc 
b/be/src/scheduling/scheduler-test.cc
index d29f4e7f7..b79728b1c 100644
--- a/be/src/scheduling/scheduler-test.cc
+++ b/be/src/scheduling/scheduler-test.cc
@@ -779,11 +779,11 @@ TEST_F(SchedulerTest, TestMultipleFinstances) {
 
   // Test handling of the single instance case - all ranges go to the same 
instance.
   vector<vector<ScanRangeParamsPB>> fs_one_instance =
-      Scheduler::AssignRangesToInstances(1, fs_ranges);
+      Scheduler::AssignRangesToInstances(1, &fs_ranges);
   ASSERT_EQ(1, fs_one_instance.size());
   EXPECT_EQ(NUM_RANGES, fs_one_instance[0].size());
   vector<vector<ScanRangeParamsPB>> kudu_one_instance =
-      Scheduler::AssignRangesToInstances(1, kudu_ranges);
+      Scheduler::AssignRangesToInstances(1, &kudu_ranges);
   ASSERT_EQ(1, kudu_one_instance.size());
   EXPECT_EQ(NUM_RANGES, kudu_one_instance[0].size());
 
@@ -791,7 +791,7 @@ TEST_F(SchedulerTest, TestMultipleFinstances) {
   for (int attempt = 0; attempt < 20; ++attempt) {
     std::shuffle(fs_ranges.begin(), fs_ranges.end(), rng_);
     vector<vector<ScanRangeParamsPB>> range_per_instance =
-        Scheduler::AssignRangesToInstances(NUM_RANGES, fs_ranges);
+        Scheduler::AssignRangesToInstances(NUM_RANGES, &fs_ranges);
     EXPECT_EQ(NUM_RANGES, range_per_instance.size());
     // Confirm each range is present and each instance got exactly one range.
     for (int i = 0; i < NUM_RANGES; ++i) {
@@ -804,7 +804,7 @@ TEST_F(SchedulerTest, TestMultipleFinstances) {
   for (int attempt = 0; attempt < 20; ++attempt) {
     std::shuffle(fs_ranges.begin(), fs_ranges.end(), rng_);
     vector<vector<ScanRangeParamsPB>> range_per_instance =
-        Scheduler::AssignRangesToInstances(4, fs_ranges);
+        Scheduler::AssignRangesToInstances(4, &fs_ranges);
     EXPECT_EQ(4, range_per_instance.size());
     for (int i = 0; i < range_per_instance.size(); ++i) {
       EXPECT_EQ(4, range_per_instance[i].size()) << i;
@@ -814,7 +814,7 @@ TEST_F(SchedulerTest, TestMultipleFinstances) {
   for (int attempt = 0; attempt < 20; ++attempt) {
     std::shuffle(kudu_ranges.begin(), kudu_ranges.end(), rng_);
     vector<vector<ScanRangeParamsPB>> range_per_instance =
-        Scheduler::AssignRangesToInstances(4, kudu_ranges);
+        Scheduler::AssignRangesToInstances(4, &kudu_ranges);
     EXPECT_EQ(4, range_per_instance.size());
     for (const auto& instance_ranges : range_per_instance) {
       EXPECT_EQ(4, instance_ranges.size());
@@ -824,4 +824,195 @@ TEST_F(SchedulerTest, TestMultipleFinstances) {
     }
   }
 }
+
+// This tests the pre-IMPALA-9655 LPT scheduling code that is now used for 
tuple caching
+// This is equivalent to the TestMultipleFinstances test before IMPALA-9655.
+TEST_F(SchedulerTest, TestMultipleFinstancesLPT) {
+  const int NUM_RANGES = 16;
+  std::vector<ScanRangeParamsPB> fs_ranges(NUM_RANGES);
+  std::vector<ScanRangeParamsPB> kudu_ranges(NUM_RANGES);
+  // Create ranges with lengths 1, 2, ..., etc.
+  for (int i = 0; i < NUM_RANGES; ++i) {
+    *fs_ranges[i].mutable_scan_range()->mutable_hdfs_file_split() = 
HdfsFileSplitPB();
+    fs_ranges[i].mutable_scan_range()->mutable_hdfs_file_split()->set_length(i 
+ 1);
+    kudu_ranges[i].mutable_scan_range()->set_kudu_scan_token("fake token");
+  }
+
+  // Test handling of the single instance case - all ranges go to the same 
instance.
+  vector<vector<ScanRangeParamsPB>> fs_one_instance =
+      Scheduler::AssignRangesToInstances(1, &fs_ranges, /* use_lpt */ true);
+  ASSERT_EQ(1, fs_one_instance.size());
+  EXPECT_EQ(NUM_RANGES, fs_one_instance[0].size());
+  vector<vector<ScanRangeParamsPB>> kudu_one_instance =
+      Scheduler::AssignRangesToInstances(1, &kudu_ranges, /* use_lpt */ true);
+  ASSERT_EQ(1, kudu_one_instance.size());
+  EXPECT_EQ(NUM_RANGES, kudu_one_instance[0].size());
+
+  // Ensure that each executor gets one range regardless of input order.
+  for (int attempt = 0; attempt < 20; ++attempt) {
+    std::shuffle(fs_ranges.begin(), fs_ranges.end(), rng_);
+    vector<vector<ScanRangeParamsPB>> range_per_instance =
+        Scheduler::AssignRangesToInstances(NUM_RANGES, &fs_ranges, /* use_lpt 
*/ true);
+    EXPECT_EQ(NUM_RANGES, range_per_instance.size());
+    // Confirm each range is present and each instance got exactly one range.
+    vector<int> range_length_count(NUM_RANGES);
+    for (const auto& instance_ranges : range_per_instance) {
+      ASSERT_EQ(1, instance_ranges.size());
+      
++range_length_count[instance_ranges[0].scan_range().hdfs_file_split().length()
+          - 1];
+    }
+    for (int i = 0; i < NUM_RANGES; ++i) {
+      EXPECT_EQ(1, range_length_count[i]) << i;
+    }
+  }
+
+  // Test load balancing FS ranges across 4 instances. We should get an even 
assignment
+  // across the instances regardless of input order.
+  for (int attempt = 0; attempt < 20; ++attempt) {
+    std::shuffle(fs_ranges.begin(), fs_ranges.end(), rng_);
+    vector<vector<ScanRangeParamsPB>> range_per_instance =
+        Scheduler::AssignRangesToInstances(4, &fs_ranges, /* use_lpt */ true);
+    EXPECT_EQ(4, range_per_instance.size());
+    // Ensure we got a range of each length in the output.
+    vector<int> range_length_count(NUM_RANGES);
+    for (const auto& instance_ranges : range_per_instance) {
+      EXPECT_EQ(4, instance_ranges.size());
+      int64_t instance_bytes = 0;
+      for (const auto& range : instance_ranges) {
+        instance_bytes += range.scan_range().hdfs_file_split().length();
+        ++range_length_count[range.scan_range().hdfs_file_split().length() - 
1];
+      }
+      // Expect each instance to get sum([1, 2, ..., 16]) / 4 bytes when 
things are
+      // distributed evenly.
+      EXPECT_EQ(34, instance_bytes);
+    }
+    for (int i = 0; i < NUM_RANGES; ++i) {
+      EXPECT_EQ(1, range_length_count[i]) << i;
+    }
+  }
+
+  // Test load balancing Kudu ranges across 4 instances. We should get an even 
assignment
+  // across the instances regardless of input order. We don't know the size of 
each Kudu
+  // range, so we just need to check the # of ranges.
+  for (int attempt = 0; attempt < 20; ++attempt) {
+    std::shuffle(kudu_ranges.begin(), kudu_ranges.end(), rng_);
+    vector<vector<ScanRangeParamsPB>> range_per_instance =
+        Scheduler::AssignRangesToInstances(4, &kudu_ranges, /* use_lpt */ 
true);
+    EXPECT_EQ(4, range_per_instance.size());
+    for (const auto& instance_ranges : range_per_instance) {
+      EXPECT_EQ(4, instance_ranges.size());
+      for (const auto& range : instance_ranges) {
+        EXPECT_TRUE(range.scan_range().has_kudu_scan_token());
+      }
+    }
+  }
+}
+
+// This tests the ScanRangeComparator to verify that it is consistent.
+TEST_F(SchedulerTest, TestScanRangeComparator) {
+  // Test comparisons for HDFS ranges
+  // Start with two ranges a and b that are identical
+  ScanRangeParamsPB a;
+  *a.mutable_scan_range()->mutable_hdfs_file_split() = HdfsFileSplitPB();
+  HdfsFileSplitPB* a_hdfs = a.mutable_scan_range()->mutable_hdfs_file_split();
+  a_hdfs->set_relative_path("aaaa.txt");
+  a_hdfs->set_offset(0);
+  a_hdfs->set_length(512);
+  a_hdfs->set_partition_id(10);
+  a_hdfs->set_file_length(1024);
+  a_hdfs->set_file_compression(CompressionTypePB::LZ4);
+  a_hdfs->set_mtime(12345);
+  a_hdfs->set_is_erasure_coded(false);
+  a_hdfs->set_partition_path_hash(11111);
+  a_hdfs->set_absolute_path("absolute_path");
+  a_hdfs->set_is_encrypted(false);
+
+  ScanRangeParamsPB b = a;
+  HdfsFileSplitPB* b_hdfs = b.mutable_scan_range()->mutable_hdfs_file_split();
+  EXPECT_FALSE(Scheduler::ScanRangeComparator(a, b));
+  EXPECT_FALSE(Scheduler::ScanRangeComparator(b, a));
+
+  // Some fields don't matter, so changing them should have no influence on the
+  // comparator.
+  a_hdfs->set_partition_id(1);
+  a_hdfs->set_file_compression(CompressionTypePB::LZO);
+  a_hdfs->set_is_erasure_coded(true);
+  a_hdfs->set_is_encrypted(true);
+  a_hdfs->set_absolute_path("other absolute path");
+  EXPECT_FALSE(Scheduler::ScanRangeComparator(a, b));
+  EXPECT_FALSE(Scheduler::ScanRangeComparator(b, a));
+
+  // ScanRangeComparator checks fields in a specific order to bail out early.
+  // This is testing them in the opposite order of their predence, starting 
with the
+  // lowest predence field and moving up to the highest predence field. Each 
field
+  // overrides the lower predence field, flipping the comparison.
+  // The last thing it compares is relative path. Make a > b based on relative 
path.
+  a_hdfs->set_relative_path("bbbb.txt");
+  EXPECT_TRUE(Scheduler::ScanRangeComparator(a, b));
+  EXPECT_FALSE(Scheduler::ScanRangeComparator(b, a));
+
+  // 2nd to last is partition_path_hash. Make b > a. This takes precedence 
over the
+  // relative_path.
+  b_hdfs->set_partition_path_hash(22222);
+  EXPECT_TRUE(Scheduler::ScanRangeComparator(b, a));
+  EXPECT_FALSE(Scheduler::ScanRangeComparator(a, b));
+
+  // 3rd to last is file_length. Make a > b.
+  a_hdfs->set_file_length(1025);
+  EXPECT_TRUE(Scheduler::ScanRangeComparator(a, b));
+  EXPECT_FALSE(Scheduler::ScanRangeComparator(b, a));
+
+  // 4th to last is offset. Make b > a.
+  b_hdfs->set_offset(1);
+  EXPECT_TRUE(Scheduler::ScanRangeComparator(b, a));
+  EXPECT_FALSE(Scheduler::ScanRangeComparator(a, b));
+
+  // 5th to last field checked is mtime. Make a > b.
+  a_hdfs->set_mtime(12346);
+  EXPECT_TRUE(Scheduler::ScanRangeComparator(a, b));
+  EXPECT_FALSE(Scheduler::ScanRangeComparator(b, a));
+
+  // Length is used by ScanRangeWeight and is the first field checked. Make b 
> a.
+  b_hdfs->set_length(513);
+  EXPECT_TRUE(Scheduler::ScanRangeComparator(b, a));
+  EXPECT_FALSE(Scheduler::ScanRangeComparator(a, b));
+
+  // Test comparison for Kudu ranges
+  ScanRangeParamsPB a_kudu;
+  a_kudu.mutable_scan_range()->set_kudu_scan_token("abc");
+  ScanRangeParamsPB b_kudu = a_kudu;
+  EXPECT_FALSE(Scheduler::ScanRangeComparator(a_kudu, b_kudu));
+  EXPECT_FALSE(Scheduler::ScanRangeComparator(b_kudu, a_kudu));
+
+  // Set the kudu scan token to make a > b
+  a_kudu.mutable_scan_range()->set_kudu_scan_token("bcd");
+  EXPECT_TRUE(Scheduler::ScanRangeComparator(a_kudu, b_kudu));
+  EXPECT_FALSE(Scheduler::ScanRangeComparator(b_kudu, a_kudu));
+
+  // Test comparison for HBase ranges
+  ScanRangeParamsPB a_hbase;
+  *a_hbase.mutable_scan_range()->mutable_hbase_key_range() = HBaseKeyRangePB();
+  HBaseKeyRangePB* a_hbase_keyrange =
+      a_hbase.mutable_scan_range()->mutable_hbase_key_range();
+  a_hbase_keyrange->set_startkey("aaa");
+  a_hbase_keyrange->set_stopkey("fff");
+
+  ScanRangeParamsPB b_hbase = a_hbase;
+  HBaseKeyRangePB* b_hbase_keyrange =
+      b_hbase.mutable_scan_range()->mutable_hbase_key_range();
+  EXPECT_FALSE(Scheduler::ScanRangeComparator(a_hbase, b_hbase));
+  EXPECT_FALSE(Scheduler::ScanRangeComparator(b_hbase, a_hbase));
+
+  // Again, set fields differently from the lowest precedence field to the
+  // highest predence field.
+  // The last field ScanRangeComparator compares is stopkey. Make a > b.
+  a_hbase_keyrange->set_stopkey("ggg");
+  EXPECT_TRUE(Scheduler::ScanRangeComparator(a_hbase, b_hbase));
+  EXPECT_FALSE(Scheduler::ScanRangeComparator(b_hbase, a_hbase));
+
+  // Set startkey so that b > a. This takes precedence over stopkey.
+  b_hbase_keyrange->set_startkey("bbb");
+  EXPECT_TRUE(Scheduler::ScanRangeComparator(b_hbase, a_hbase));
+  EXPECT_FALSE(Scheduler::ScanRangeComparator(a_hbase, b_hbase));
+}
 } // end namespace impala
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index e13c437e5..4373334ca 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -572,6 +572,103 @@ Status Scheduler::ComputeFragmentExecParams(const 
ExecutorConfig& executor_confi
   return CheckEffectiveInstanceCount(fragment_state, state);
 }
 
+/// Returns a numeric weight that is proportional to the estimated processing 
time for
+/// the scan range represented by 'params'. Weights from different scan node
+/// implementations, e.g. FS vs Kudu, are not comparable.
+static int64_t ScanRangeWeight(const ScanRangeParamsPB& params) {
+  if (params.scan_range().has_hdfs_file_split()) {
+    return params.scan_range().hdfs_file_split().length();
+  } else {
+    // Give equal weight to each Kudu and Hbase split.
+    // TODO: implement more accurate logic for Kudu and Hbase
+    return 1;
+  }
+}
+
+bool Scheduler::ScanRangeComparator(const ScanRangeParamsPB& a,
+    const ScanRangeParamsPB& b) {
+  // We are ordering by weight largest to smallest. Return quickly if the 
weights are
+  // different.
+  int64_t a_weight = ScanRangeWeight(a), b_weight = ScanRangeWeight(b);
+  if (a_weight != b_weight) return a_weight > b_weight;
+  // The weights are the same, so we need to break ties to make this 
deterministic.
+  if (a.scan_range().has_hdfs_file_split()) {
+    // HDFS scan ranges should be compared against other HDFS scan ranges.
+    if (!b.scan_range().has_hdfs_file_split()) {
+      DCHECK(false) << "HDFS scan ranges can only be compared against other 
HDFS ranges";
+      return false;
+    }
+    // Break ties by comparing various fields of the HDFS split. The ordering 
here is
+    // arbitrary, so this starts with cheap checks and moves to more expensive 
checks.
+    const HdfsFileSplitPB& a_hdfs_split = a.scan_range().hdfs_file_split();
+    const HdfsFileSplitPB& b_hdfs_split = b.scan_range().hdfs_file_split();
+    if (a_hdfs_split.mtime() != b_hdfs_split.mtime()) {
+      return a_hdfs_split.mtime() > b_hdfs_split.mtime();
+    }
+    if (a_hdfs_split.offset() != b_hdfs_split.offset()) {
+      return a_hdfs_split.offset() > b_hdfs_split.offset();
+    }
+    if (a_hdfs_split.file_length() != b_hdfs_split.file_length()) {
+      return a_hdfs_split.file_length() > b_hdfs_split.file_length();
+    }
+    if (a_hdfs_split.partition_path_hash() != 
b_hdfs_split.partition_path_hash()) {
+      return a_hdfs_split.partition_path_hash() > 
b_hdfs_split.partition_path_hash();
+    }
+    if (a_hdfs_split.relative_path() != b_hdfs_split.relative_path()) {
+      return a_hdfs_split.relative_path() > b_hdfs_split.relative_path();
+    }
+  } else if (a.scan_range().has_kudu_scan_token()) {
+    // Kudu scan ranges should be compared against other Kudu scan ranges
+    if (!b.scan_range().has_kudu_scan_token()) {
+      DCHECK(false) << "Kudu scan ranges can only be compared against other 
Kudu ranges";
+      return false;
+    }
+    // Break ties by comparing the kudu scan token
+    return a.scan_range().kudu_scan_token() > b.scan_range().kudu_scan_token();
+  } else if (a.scan_range().has_hbase_key_range()) {
+    // HBase scan ranges should be compared against other HBase scan ranges
+    if (!b.scan_range().has_hbase_key_range()) {
+      DCHECK(false)
+          << "HBase scan ranges can only be compared against other HBase 
ranges";
+      return false;
+    }
+    const HBaseKeyRangePB& a_hbase_range = a.scan_range().hbase_key_range();
+    const HBaseKeyRangePB& b_hbase_range = b.scan_range().hbase_key_range();
+    if (a_hbase_range.startkey() != b_hbase_range.startkey()) {
+      return a_hbase_range.startkey() > b_hbase_range.startkey();
+    }
+    if (a_hbase_range.stopkey() != b_hbase_range.stopkey()) {
+      return a_hbase_range.stopkey() > b_hbase_range.stopkey();
+    }
+  }
+  return false;
+}
+
+/// Helper class used in CreateScanInstances() to track the amount of work 
assigned
+/// to each instance so far.
+struct InstanceAssignment {
+  // The weight assigned so far.
+  int64_t weight;
+
+  // The index of the instance in 'per_instance_ranges'
+  int instance_idx;
+
+  // Comparator for use in a heap as part of the longest processing time algo.
+  // Invert the comparison order because the *_heap functions implement a 
max-heap
+  // and we want to assign to the least-loaded instance first.
+  bool operator<(InstanceAssignment& other) const {
+    if (weight == other.weight) {
+      // To make this deterministic, break ties by comparing the instance idxs
+      // (which are unique). Like the weight, this is also inverted to put the 
lower
+      // indexes first as this is a max heap. That matches the order that we 
use when
+      // constructing the initial list, so there is no need to call 
make_heap().
+      return instance_idx > other.instance_idx;
+    } else {
+      return weight > other.weight;
+    }
+  }
+};
+
 // Maybe the easiest way to understand the objective of this algorithm is as a
 // generalization of two simpler instance creation algorithms that decide how 
many
 // instances of a fragment to create on each node, given a set of nodes that 
were
@@ -694,8 +791,22 @@ void Scheduler::CreateCollocatedAndScanInstances(const 
ExecutorConfig& executor_
       if (assignment_it == sra.end()) continue;
       auto scan_ranges_it = assignment_it->second.find(scan_node_id);
       if (scan_ranges_it == assignment_it->second.end()) continue;
+      const TPlanNode& scan_node = state->GetNode(scan_node_id);
+      // For mt_dop, there are two scheduling modes. The first is the normal 
mode
+      // that uses a shared queue. For that mode, there is no reason to do 
anything
+      // special about assigning scan ranges to fragment instances, because 
they will
+      // all be placed in a single queue at runtime. The other is deterministic
+      // scheduling that does not use the shared queue. In this mode, no 
rebalancing
+      // takes place at runtime, so balancing the scan ranges among the 
fragment
+      // instances is important. For this mode, use the longest processing 
time (LPT)
+      // algorithm. The deterministic mode is used for tuple caching.
+      bool use_lpt = scan_node.__isset.hdfs_scan_node
+          && scan_node.hdfs_scan_node.__isset.use_mt_scan_node
+          && scan_node.hdfs_scan_node.use_mt_scan_node
+          && 
scan_node.hdfs_scan_node.__isset.deterministic_scanrange_assignment
+          && scan_node.hdfs_scan_node.deterministic_scanrange_assignment;
       per_scan_per_instance_ranges.back() =
-          AssignRangesToInstances(max_num_instances, scan_ranges_it->second);
+          AssignRangesToInstances(max_num_instances, &scan_ranges_it->second, 
use_lpt);
       DCHECK_LE(per_scan_per_instance_ranges.back().size(), max_num_instances);
     }
 
@@ -746,18 +857,48 @@ void Scheduler::CreateCollocatedAndScanInstances(const 
ExecutorConfig& executor_
 }
 
 vector<vector<ScanRangeParamsPB>> Scheduler::AssignRangesToInstances(
-    int max_num_instances, vector<ScanRangeParamsPB>& ranges) {
+    int max_num_instances, vector<ScanRangeParamsPB>* ranges, bool use_lpt) {
   DCHECK_GT(max_num_instances, 0);
-  int num_instances = min(max_num_instances, static_cast<int>(ranges.size()));
+  int num_instances = min(max_num_instances, static_cast<int>(ranges->size()));
   vector<vector<ScanRangeParamsPB>> per_instance_ranges(num_instances);
   if (num_instances < 2) {
     // Short-circuit the assignment algorithm for the single instance case.
-    per_instance_ranges[0] = ranges;
+    per_instance_ranges[0] = *ranges;
   } else {
-    int idx = 0;
-    for (auto& range : ranges) {
-      per_instance_ranges[idx].push_back(range);
-      idx = (idx + 1 == num_instances) ? 0 : idx + 1;
+    if (use_lpt) {
+      // We need to assign scan ranges to instances. We would like the 
assignment to be
+      // as even as possible, so that each instance does about the same amount 
of work.
+      // Use longest-processing time (LPT) algorithm, which is a good 
approximation of the
+      // optimal solution (there is a theoretic bound of ~4/3 of the optimal 
solution
+      // in the worst case). It also guarantees that at least one scan range 
is assigned
+      // to each instance.
+      // The LPT algorithm is straightforward:
+      // 1. sort the scan ranges to be assigned by descending weight.
+      // 2. assign each item to the instance with the least weight assigned so 
far.
+      vector<InstanceAssignment> instance_heap;
+      instance_heap.reserve(num_instances);
+      for (int i = 0; i < num_instances; ++i) {
+        instance_heap.emplace_back(InstanceAssignment{0, i});
+      }
+      // The instance_heap vector was created in sorted order, so this is 
already a heap
+      // without needing a make_heap() call.
+      DCHECK(std::is_heap(instance_heap.begin(), instance_heap.end()));
+      std::sort(ranges->begin(), ranges->end(), ScanRangeComparator);
+      for (ScanRangeParamsPB& range : *ranges) {
+        per_instance_ranges[instance_heap[0].instance_idx].push_back(range);
+        instance_heap[0].weight += ScanRangeWeight(range);
+        pop_heap(instance_heap.begin(), instance_heap.end());
+        push_heap(instance_heap.begin(), instance_heap.end());
+      }
+    } else {
+      // When not using LPT, a simple round-robin is sufficient. The use case 
for non-LPT
+      // is when the ranges will be placed in a shared queue anyway, so the 
balancing
+      // is not as crucial.
+      int idx = 0;
+      for (auto& range : *ranges) {
+        per_instance_ranges[idx].push_back(range);
+        idx = (idx + 1 == num_instances) ? 0 : idx + 1;
+      }
     }
   }
   return per_instance_ranges;
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index 9abcab40b..d50142ac0 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -410,8 +410,11 @@ class Scheduler {
   /// to at most 'max_num_instances' fragment instances running on the same 
host.
   /// 'max_num_ranges' must be positive. Only returns non-empty vectors: if 
there are not
   /// enough ranges to create 'max_num_instances', fewer instances are 
assigned ranges.
+  /// 'use_lpt' determines whether this assigns scan ranges via the Longest 
Processing
+  /// Time algorithm. If false, this uses round-robin (which is cheaper).
   static std::vector<std::vector<ScanRangeParamsPB>> AssignRangesToInstances(
-      int max_num_instances, std::vector<ScanRangeParamsPB>& ranges);
+      int max_num_instances, std::vector<ScanRangeParamsPB>* ranges,
+      bool use_lpt = false);
 
   /// For each instance of fragment_state's input fragment, create a collocated
   /// instance for fragment_state's fragment.
@@ -482,12 +485,21 @@ class Scheduler {
             > state->query_options().max_fs_writers);
   }
 
+  /// Comparator to order scan ranges for scheduling. This uses 
ScanRangeWeight(), but it
+  /// also compares other fields so that it is deterministic for scan ranges 
with the
+  /// same weight. This comparator only works when both scan ranges have the 
same storage
+  /// type: HDFS vs HDFS, Kudu vs Kudu, HBase vs HBase. Otherwise, it asserts.
+  static bool ScanRangeComparator(const ScanRangeParamsPB& a,
+      const ScanRangeParamsPB& b);
+
   friend class impala::test::SchedulerWrapper;
   FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentDeterministicNonCached);
   FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentRandomNonCached);
   FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentRandomDiskLocal);
   FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentRandomRemote);
   FRIEND_TEST(SchedulerTest, TestMultipleFinstances);
+  FRIEND_TEST(SchedulerTest, TestMultipleFinstancesLPT);
+  FRIEND_TEST(SchedulerTest, TestScanRangeComparator);
 };
 
 }
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index fa798c721..ba50fb9a7 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -350,6 +350,11 @@ struct THdfsScanNode {
   // For IcebergScanNodes that are the left children of an IcebergDeleteNode 
this stores
   // the node ID of the right child.
   14: optional Types.TPlanNodeId deleteFileScanNodeId
+
+  // Whether mt_dop should use deterministic scan range assignment. If true,
+  // each fragment instance has its own list of scan ranges. If false,
+  // the fragment instances use a shared queue.
+  15: optional bool deterministic_scanrange_assignment
 }
 
 struct TDataSourceScanNode {
@@ -711,9 +716,13 @@ struct TIcebergMetadataScanNode {
 }
 
 struct TTupleCacheNode {
-  // Cache key that includes a hashed representation of the entire subtree 
below
-  // this point in the plan.
-  1: required string subtree_hash
+  // Compile-time cache key that includes a hashed representation of the 
entire subtree
+  // below this point in the plan.
+  1: required string compile_time_key
+  // To calculate the per-fragment key, this keeps track of scan nodes that 
feed
+  // into this node. The TupleCacheNode will hash the scan ranges for its 
fragment
+  // at runtime.
+  2: required list<i32> input_scan_node_ids;
 }
 
 // See PipelineMembership in the frontend for details.
diff --git 
a/fe/src/main/java/org/apache/impala/common/ThriftSerializationCtx.java 
b/fe/src/main/java/org/apache/impala/common/ThriftSerializationCtx.java
index af4a508a1..be39266ca 100644
--- a/fe/src/main/java/org/apache/impala/common/ThriftSerializationCtx.java
+++ b/fe/src/main/java/org/apache/impala/common/ThriftSerializationCtx.java
@@ -19,8 +19,8 @@ package org.apache.impala.common;
 
 import org.apache.impala.analysis.SlotId;
 import org.apache.impala.analysis.TupleId;
-import org.apache.impala.catalog.FeTable;
 import org.apache.impala.planner.TupleCacheInfo;
+import org.apache.impala.planner.HdfsScanNode;
 
 /**
  * The Thrift serialization functions need to adjust output based on whether 
the
@@ -80,13 +80,14 @@ public class ThriftSerializationCtx {
   }
 
   /**
-   * registerTable() should be called for any table that is referenced from a 
PlanNode
-   * that participates in tuple caching. In practice, this is only used for 
HDFS tables
-   * at the moment.
+   * registerInputScanNode() is used to keep track of which HdfsScanNodes feed 
into a
+   * particular location for tuple caching. Tuple caching only supports HDFS 
tables at
+   * the moment, so this is limited to HdfsScanNode. See TupleCacheInfo for 
more
+   * information about how this is used.
    */
-  public void registerTable(FeTable table) {
+  public void registerInputScanNode(HdfsScanNode hdfsScanNode) {
     if (isTupleCache()) {
-      tupleCacheInfo_.registerTable(table);
+      tupleCacheInfo_.registerInputScanNode(hdfsScanNode);
     }
   }
 
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 593168bc6..f5a52f3ac 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -280,6 +280,11 @@ public class HdfsScanNode extends ScanNode {
   // Set in computeNodeResourceProfile().
   private boolean useMtScanNode_;
 
+  // True if this scan node should do deterministic assignment of scan ranges 
to
+  // fragment instances when using mt_dop. If false, this uses the shared 
queue.
+  // This has no impact on mt_dop=0.
+  private boolean deterministicScanRangeAssignment_ = false;
+
   // True if this is a scan that only returns partition keys and is only 
required to
   // return at least one of each of the distinct values of the partition keys.
   private final boolean isPartitionKeyScan_;
@@ -1930,8 +1935,8 @@ public class HdfsScanNode extends ScanNode {
   protected void toThrift(TPlanNode msg, ThriftSerializationCtx serialCtx) {
     msg.hdfs_scan_node = new THdfsScanNode(serialCtx.translateTupleId(
         desc_.getId()).asInt(), new HashSet<>());
-    // Register the table for this scan node so tuple caching knows about it.
-    serialCtx.registerTable(desc_.getTable());
+    // Register this scan node as an input for tuple caching.
+    serialCtx.registerInputScanNode(this);
     if (replicaPreference_ != null) {
       msg.hdfs_scan_node.setReplica_preference(replicaPreference_);
     }
@@ -1951,6 +1956,14 @@ public class HdfsScanNode extends ScanNode {
       msg.hdfs_scan_node.setSkip_header_line_count(skipHeaderLineCount_);
     }
     msg.hdfs_scan_node.setUse_mt_scan_node(useMtScanNode_);
+    // The reason we skip setting the deterministic scan range assignment 
field for
+    // tuple cache is that has not been set yet at this point. At the point we 
are
+    // computing the tuple cache key, it is always false and has no 
information.
+    // Even if it was set, it would not tell us anything about the result set.
+    if (!serialCtx.isTupleCache()) {
+      msg.hdfs_scan_node.setDeterministic_scanrange_assignment(
+          deterministicScanRangeAssignment_);
+    }
     Preconditions.checkState((optimizedAggSmap_ == null) == (countStarSlot_ == 
null));
     if (countStarSlot_ != null) {
       
msg.hdfs_scan_node.setCount_star_slot_offset(countStarSlot_.getByteOffset());
@@ -1977,8 +1990,12 @@ public class HdfsScanNode extends ScanNode {
     }
     msg.hdfs_scan_node.setIs_partition_key_scan(isPartitionKeyScan_);
 
-    for (HdfsFileFormat format : fileFormats_) {
-      msg.hdfs_scan_node.addToFile_formats(format.toThrift());
+    // Since the tuple cache will hash information about the scan ranges at 
runtime,
+    // it is not necessary to include the file formats in the hash.
+    if (!serialCtx.isTupleCache()) {
+      for (HdfsFileFormat format : fileFormats_) {
+        msg.hdfs_scan_node.addToFile_formats(format.toThrift());
+      }
     }
 
     if (!overlapPredicateDescs_.isEmpty()) {
@@ -2044,6 +2061,15 @@ public class HdfsScanNode extends ScanNode {
             0, PrintUtils.printBytes(0)));
       }
 
+      // Add information about whether this uses deterministic scan range 
scheduling
+      // To avoid polluting the explain output, only add this if mt_dop>0 and
+      // deterministic scan range scheduling is enabled.
+      if (useMtScanNode_ && deterministicScanRangeAssignment_) {
+        output.append(detailPrefix)
+          .append(String.format("deterministic scan range assignment: %b\n",
+              deterministicScanRangeAssignment_));
+      }
+
       if (!conjuncts_.isEmpty()) {
         output.append(detailPrefix)
           .append(String.format("predicates: %s\n",
@@ -2669,4 +2695,14 @@ public class HdfsScanNode extends ScanNode {
 
   @Override
   public boolean isTupleCachingImplemented() { return true; }
+
+  public void setDeterministicScanRangeAssignment(boolean enabled) {
+    // Deterministic scan range assignment only applies when using mt_dop,
+    // but we set it unconditionally for simplicity.
+    deterministicScanRangeAssignment_ = enabled;
+  }
+
+  public boolean usesDeterministicScanRangeAssignment() {
+    return deterministicScanRangeAssignment_;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java 
b/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
index 63f867cb2..fa065fa29 100644
--- a/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
+++ b/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
@@ -17,8 +17,10 @@
 
 package org.apache.impala.planner;
 
+import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -107,6 +109,15 @@ public class TupleCacheInfo {
   private final IdGenerator<SlotId> translatedSlotIdGenerator_ =
       SlotId.createGenerator();
 
+  // This tracks all the HdfsScanNodes that are inputs. This is used for 
several
+  // purposes:
+  // 1. Input scan nodes need to use deterministic scan range scheduling.
+  // 2. At runtime, the tuple cache needs to hash the input scan ranges, so 
this
+  //    provides information about which scan nodes feed in.
+  // 3. In future, when tuple caching moves past exchanges, the exchange will 
need
+  //    to hash the scan ranges of input scan nodes to generate the key.
+  private final List<HdfsScanNode> inputScanNodes_ = new 
ArrayList<HdfsScanNode>();
+
   // These fields accumulate partial results until finalizeHash() is called.
   private Hasher hasher_ = Hashing.murmur3_128().newHasher();
 
@@ -181,6 +192,9 @@ public class TupleCacheInfo {
       // and each contribution would be clear.
       hashTraceBuilder_.append(child.getHashTrace());
 
+      // Merge the child's inputScanNodes_
+      inputScanNodes_.addAll(child.inputScanNodes_);
+
       // Incorporate the child's tuple references. This is creating a new 
translation
       // of TupleIds, because it will be incorporating multiple children.
       for (TupleId id : child.tupleTranslationMap_.keySet()) {
@@ -241,6 +255,9 @@ public class TupleCacheInfo {
       tupleTranslationMap_.put(id, translatedTupleIdGenerator_.getNextId());
 
       TupleDescriptor tupleDesc = descriptorTable_.getTupleDesc(id);
+      // This matches the behavior of DescriptorTable::toThrift() and skips
+      // non-materialized tuple descriptors. See comment in 
DescriptorTable::toThrift().
+      if (!tupleDesc.isMaterialized()) return;
       if (incorporateIntoHash) {
         // Incorporate the tupleDescriptor into the hash
         boolean needs_table_id =
@@ -250,8 +267,9 @@ public class TupleCacheInfo {
         hashThrift(thriftTupleDesc);
       }
 
-      // Go through the tuple's slots and add them
-      for (SlotDescriptor slotDesc : tupleDesc.getSlots()) {
+      // Go through the tuple's slots and add them. This matches the behavior 
of
+      // DescriptorTable::toThrift() and only serializes the materialized 
slots.
+      for (SlotDescriptor slotDesc : tupleDesc.getMaterializedSlots()) {
         // Assign a translated slot id and it to the map
         slotTranslationMap_.put(slotDesc.getId(), 
translatedSlotIdGenerator_.getNextId());
 
@@ -274,7 +292,7 @@ public class TupleCacheInfo {
    * designed to be called by scan nodes via the ThriftSerializationCtx. In 
future,
    * this will store information about the table's scan ranges.
    */
-  public void registerTable(FeTable tbl) {
+  private void registerTable(FeTable tbl) {
     Preconditions.checkState(!(tbl instanceof FeView),
         "registerTable() only applies to base tables");
     Preconditions.checkState(tbl != null, "Invalid null argument to 
registerTable()");
@@ -284,6 +302,28 @@ public class TupleCacheInfo {
     hashThrift(tblName);
   }
 
+  /**
+   * registerInputScanNode() is used to keep track of which HdfsScanNodes feed 
into a
+   * particular location for tuple caching. Tuple caching only supports HDFS 
tables at
+   * the moment, so this is limited to HdfsScanNode. Tuple caching uses this 
for
+   * multiple things:
+   * 1. HdfsScanNodes that feed into a TupleCacheNode need to be marked to use
+   *    deterministic scheduling.
+   * 2. Each fragment instance needs to construct the fragment instance 
specific key
+   *    based on the scan ranges it will process. To construct that, it needs 
to know
+   *    which HdfsScanNodes feed into it.
+   * 3. There will be future uses when tuple caching extends past exchanges.
+   *
+   * Since this has all the information needed, it also calls registerTable() 
under
+   * the covers.
+   */
+  public void registerInputScanNode(HdfsScanNode hdfsScanNode) {
+    registerTable(hdfsScanNode.getTupleDesc().getTable());
+    inputScanNodes_.add(hdfsScanNode);
+  }
+
+  public List<HdfsScanNode> getInputScanNodes() { return inputScanNodes_; }
+
   /**
    * getLocalTupleId() converts a global TupleId to a local TupleId (i.e an id 
that is
    * not influenced by the structure of the rest of the query). Most users 
should access
diff --git a/fe/src/main/java/org/apache/impala/planner/TupleCacheNode.java 
b/fe/src/main/java/org/apache/impala/planner/TupleCacheNode.java
index 1ef7f944d..d93145668 100644
--- a/fe/src/main/java/org/apache/impala/planner/TupleCacheNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/TupleCacheNode.java
@@ -17,6 +17,10 @@
 
 package org.apache.impala.planner;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.thrift.TTupleCacheNode;
@@ -37,8 +41,9 @@ import com.google.common.hash.HashCode;
  */
 public class TupleCacheNode extends PlanNode {
 
-  protected String subtreeHash_;
+  protected String compileTimeKey_;
   protected String hashTrace_;
+  protected final List<Integer> inputScanNodeIds_ = new ArrayList<Integer>();
 
   public TupleCacheNode(PlanNodeId id, PlanNode child) {
     super(id, "TUPLE CACHE");
@@ -48,8 +53,13 @@ public class TupleCacheNode extends PlanNode {
 
     TupleCacheInfo childCacheInfo = child.getTupleCacheInfo();
     Preconditions.checkState(childCacheInfo.isEligible());
-    subtreeHash_ = childCacheInfo.getHashString();
+    compileTimeKey_ = childCacheInfo.getHashString();
     hashTrace_ = childCacheInfo.getHashTrace();
+    for (HdfsScanNode scanNode : childCacheInfo.getInputScanNodes()) {
+      // Inputs into the tuple cache need to use deterministic scan range 
assignment
+      scanNode.setDeterministicScanRangeAssignment(true);
+      inputScanNodeIds_.add(scanNode.getId().asInt());
+    }
   }
 
   @Override
@@ -75,7 +85,8 @@ public class TupleCacheNode extends PlanNode {
     Preconditions.checkState(!hasLimit(),
         "TupleCacheNode does not enforce limits itself and cannot have a limit 
set.");
     TTupleCacheNode tupleCacheNode = new TTupleCacheNode();
-    tupleCacheNode.setSubtree_hash(subtreeHash_);
+    tupleCacheNode.setCompile_time_key(compileTimeKey_);
+    tupleCacheNode.setInput_scan_node_ids(inputScanNodeIds_);
     msg.setTuple_cache_node(tupleCacheNode);
   }
 
@@ -101,22 +112,24 @@ public class TupleCacheNode extends PlanNode {
       TExplainLevel detailLevel) {
     StringBuilder output = new StringBuilder();
     output.append(String.format("%s%s:%s\n", prefix, id_.toString(), 
displayName_));
-    output.append(detailPrefix + "cache key: " + subtreeHash_ + "\n");
+    output.append(detailPrefix + "cache key: " + compileTimeKey_ + "\n");
 
     // For debuggability, always print the hash trace until the cache key 
calculation
     // matures. Print trace in chunks to avoid excessive wrapping and padding 
in
     // impala-shell. There are other explain lines at VERBOSE level that are
     // over 100 chars long so we limit the key chunk length similarly here.
     final int keyFormatWidth = 100;
-    for(int idx = 0; idx < hashTrace_.length(); idx += keyFormatWidth) {
+    for (int idx = 0; idx < hashTrace_.length(); idx += keyFormatWidth) {
       int stop_idx = Math.min(hashTrace_.length(), idx + keyFormatWidth);
       output.append(detailPrefix + "[" + hashTrace_.substring(idx, stop_idx) + 
"]\n");
     }
+    List<String> input_scan_node_ids_strs =
+        
inputScanNodeIds_.stream().map(Object::toString).collect(Collectors.toList());
+    output.append(detailPrefix + "input scan node ids: " +
+        String.join(",", input_scan_node_ids_strs) + "\n");
     return output.toString();
   }
 
-  public String getSubtreeHash() { return subtreeHash_; }
-
   @Override
   public void computeProcessingCost(TQueryOptions queryOptions) {
     processingCost_ = ProcessingCost.basicCost(getDisplayLabel(), 
getCardinality(), 0);
diff --git a/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java 
b/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
index 40424f662..1bce808b6 100644
--- a/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java
@@ -151,6 +151,25 @@ public class TupleCacheTest extends PlannerTestBase {
         "functional.alltypes build where probe1.id = build.id");
   }
 
+  @Test
+  public void testDeterministicScheduling() {
+    // Verify that the HdfsScanNode that feeds into a TupleCacheNode uses 
deterministic
+    // scan range scheduling. When there are more ways for locations to be 
cache
+    // ineligible, this test will be expanded to cover the case where scan 
nodes don't
+    // use deterministic scheduling.
+    List<PlanNode> cacheEligibleNodes =
+        getCacheEligibleNodes("select id from functional.alltypes where 
int_col = 500");
+    for (PlanNode node : cacheEligibleNodes) {
+      // The HdfsScanNode for this query will have determinstic scan range 
assignment set
+      // This test uses mt_dop=0, so the value wouldn't matter for execution, 
but it
+      // still verifies that it is set properly.
+      if (node instanceof HdfsScanNode) {
+        HdfsScanNode hdfsScanNode = (HdfsScanNode) node;
+        assertTrue(hdfsScanNode.usesDeterministicScanRangeAssignment());
+      }
+    }
+  }
+
   protected List<PlanNode> getCacheEligibleNodes(String query) {
     List<PlanFragment> plan = getPlan(query);
     PlanNode planRoot = plan.get(0).getPlanRoot();
diff --git a/tests/custom_cluster/test_tuple_cache.py 
b/tests/custom_cluster/test_tuple_cache.py
index af65655af..df5a3aea3 100644
--- a/tests/custom_cluster/test_tuple_cache.py
+++ b/tests/custom_cluster/test_tuple_cache.py
@@ -22,8 +22,10 @@ import random
 import string
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.test_vector import ImpalaTestDimension
 
 TABLE_LAYOUT = 'name STRING, age INT, address STRING'
+CACHE_START_ARGS = "--tuple_cache_dir=/tmp --log_level=2"
 
 
 # Generates a random table entry of at least 15 bytes.
@@ -42,15 +44,27 @@ def assertCounters(profile, num_hits, num_halted, 
num_skipped):
   assert "NumTupleCacheSkipped: {0} ".format(num_skipped) in profile
 
 
-class TestTupleCache(CustomClusterTestSuite):
+def get_cache_keys(profile):
+  cache_keys = []
+  for line in profile.splitlines():
+    if "Combined Key:" in line:
+      key = line.split(":")[1].strip()
+      cache_keys.append(key)
+  return cache_keys
+
+
+def assert_deterministic_scan(profile):
+  assert "deterministic scan range assignment: true" in profile
+
+
+class TestTupleCacheBase(CustomClusterTestSuite):
   @classmethod
   def get_workload(cls):
     return 'functional-query'
 
-  CACHE_START_ARGS = "--tuple_cache_dir=/tmp --log_level=2"
-
-  def cached_query(self, query):
-    return self.execute_query(query, {"ENABLE_TUPLE_CACHE": "TRUE", "MT_DOP": 
"1"})
+  def cached_query(self, query, mt_dop=1):
+    return self.execute_query(query,
+        {"ENABLE_TUPLE_CACHE": "TRUE", "MT_DOP": str(mt_dop)})
 
   def cached_query_w_debugaction(self, query, debugaction):
     query_opts = {
@@ -63,10 +77,20 @@ class TestTupleCache(CustomClusterTestSuite):
   # Generates a table containing at least <scale> KB of data.
   def create_table(self, fq_table, scale=1):
     self.cached_query("CREATE TABLE {0} ({1})".format(fq_table, TABLE_LAYOUT))
+    # To make the rows distinct, we keep using a different seed for table_value
+    global_index = 0
     for _ in range(scale):
-      values = [table_value(i) for i in range(70)]
+      values = [table_value(i) for i in range(global_index, global_index + 70)]
       self.cached_query("INSERT INTO {0} VALUES ({1})".format(
           fq_table, "), (".join(values)))
+      global_index += 70
+
+  # Helper function to get a tuple cache metric from a single impalad.
+  def get_tuple_cache_metric(self, impalaservice, suffix):
+    return impalaservice.get_metric_value('impala.tuple-cache.' + suffix)
+
+
+class TestTupleCache(TestTupleCacheBase):
 
   @CustomClusterTestSuite.with_args(cluster_size=1)
   @pytest.mark.execute_serially
@@ -150,7 +174,8 @@ class TestTupleCache(CustomClusterTestSuite):
     # Case 1: fail during Open()
     result = self.cached_query_w_debugaction(query, 
"TUPLE_FILE_READER_OPEN:[email protected]")
     assert result.success
-    assert result.data == result1.data
+    # Do an unordered compare (the rows are unique)
+    assert set(result.data) == set(result1.data)
     # Not a hit
     assertCounters(result.runtime_profile, num_hits=0, num_halted=0, 
num_skipped=1)
 
@@ -158,7 +183,8 @@ class TestTupleCache(CustomClusterTestSuite):
     result = self.cached_query_w_debugaction(query,
         "TUPLE_FILE_READER_FIRST_GETNEXT:[email protected]")
     assert result.success
-    assert result.data == result1.data
+    # Do an unordered compare (the rows are unique)
+    assert set(result.data) == set(result1.data)
     # Technically, this is a hit
     assertCounters(result.runtime_profile, num_hits=1, num_halted=0, 
num_skipped=0)
 
@@ -173,3 +199,231 @@ class TestTupleCache(CustomClusterTestSuite):
       hit_error = True
 
     assert hit_error
+
+
+class TestTupleCacheRuntimeKeys(TestTupleCacheBase):
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestTupleCacheRuntimeKeys, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('mt_dop', *[0, 1, 
2]))
+
+  @CustomClusterTestSuite.with_args(
+    start_args=CACHE_START_ARGS, cluster_size=1)
+  @pytest.mark.execute_serially
+  def test_scan_range_basics(self, vector, unique_database):
+    """
+    This tests that adding/removing files to a table results in different keys.
+    This runs on a single node with mt_dop=0 or mt_dop=1, so it is the simplest
+    test.
+    """
+    mt_dop = vector.get_value('mt_dop')
+    # To keep this simple, we skip mt_dop > 1.
+    if mt_dop > 1:
+      pytest.skip()
+    fq_table = "{0}.scan_range_basics_mtdop{1}".format(unique_database, mt_dop)
+    query = "SELECT * from {0}".format(fq_table)
+
+    # Create an empty table
+    self.create_table(fq_table, scale=0)
+
+    # When there are no scan ranges, then fragment instance key is 0. This is
+    # somewhat of a toy case and we probably want to avoid caching in this
+    # case. Nonetheless, it is a good sanity check.
+    empty_result = self.cached_query(query, mt_dop=mt_dop)
+    cache_keys = get_cache_keys(empty_result.runtime_profile)
+    assert len(cache_keys) == 1
+    empty_table_compile_key, empty_table_finst_key = cache_keys[0].split("_")
+    assert empty_table_finst_key == "0"
+    assert len(empty_result.data) == 0
+    if mt_dop > 0:
+      assert_deterministic_scan(empty_result.runtime_profile)
+
+    # Insert a row, which creates a file / scan range
+    self.cached_query("INSERT INTO {0} VALUES ({1})".format(
+        fq_table, table_value(0)))
+
+    # Now, there is a scan range, so the fragment instance key should be 
non-zero.
+    one_file_result = self.cached_query(query, mt_dop=mt_dop)
+    cache_keys = get_cache_keys(one_file_result.runtime_profile)
+    assert len(cache_keys) == 1
+    one_file_compile_key, one_file_finst_key = cache_keys[0].split("_")
+    assert one_file_finst_key != "0"
+    # This should be a cache miss
+    assertCounters(one_file_result.runtime_profile, 0, 0, 0)
+    assert len(one_file_result.data) == 1
+    if mt_dop > 0:
+      assert_deterministic_scan(one_file_result.runtime_profile)
+
+    # The new scan range did not change the compile-time key
+    assert empty_table_compile_key == one_file_compile_key
+
+    # Insert another row, which creates a file / scan range
+    self.cached_query("INSERT INTO {0} VALUES ({1})".format(
+        fq_table, table_value(1)))
+
+    # There is a second scan range, so the fragment instance key should change 
again
+    two_files_result = self.cached_query(query, mt_dop=mt_dop)
+    cache_keys = get_cache_keys(two_files_result.runtime_profile)
+    assert len(cache_keys) == 1
+    two_files_compile_key, two_files_finst_key = cache_keys[0].split("_")
+    assert two_files_finst_key != "0"
+    assertCounters(two_files_result.runtime_profile, 0, 0, 0)
+    assert len(two_files_result.data) == 2
+    assert one_file_finst_key != two_files_finst_key
+    overlapping_rows = 
set(one_file_result.data).intersection(set(two_files_result.data))
+    assert len(overlapping_rows) == 1
+    if mt_dop > 0:
+      assert_deterministic_scan(two_files_result.runtime_profile)
+
+    # The new scan range did not change the compile-time key
+    assert one_file_compile_key == two_files_compile_key
+
+    # Invalidate metadata and rerun the last query. The keys should stay the 
same.
+    self.cached_query("invalidate metadata")
+    rerun_two_files_result = self.cached_query(query, mt_dop=mt_dop)
+    # Verify that this is a cache hit
+    assertCounters(rerun_two_files_result.runtime_profile, 1, 0, 0)
+    cache_keys = get_cache_keys(rerun_two_files_result.runtime_profile)
+    assert len(cache_keys) == 1
+    rerun_two_files_compile_key, rerun_two_files_finst_key = 
cache_keys[0].split("_")
+    assert rerun_two_files_finst_key == two_files_finst_key
+    assert rerun_two_files_compile_key == two_files_compile_key
+    assert rerun_two_files_result.data == two_files_result.data
+
+  @CustomClusterTestSuite.with_args(
+    start_args=CACHE_START_ARGS, cluster_size=1)
+  @pytest.mark.execute_serially
+  def test_scan_range_partitioned(self, vector, unique_database):
+    """
+    This tests a basic partitioned case where the query is identical except 
that
+    it operates on different partitions (and thus different scan ranges)
+    This runs on a single node with mt_dop=0 or mt_dop=1 to keep it simple.
+    """
+    mt_dop = vector.get_value('mt_dop')
+    # To keep this simple, we skip mt_dop > 1.
+    if mt_dop > 1:
+      pytest.skip()
+    year2009_result = self.cached_query(
+        "select * from functional.alltypes where year=2009", mt_dop=mt_dop)
+    cache_keys = get_cache_keys(year2009_result.runtime_profile)
+    assert len(cache_keys) == 1
+    year2009_compile_key, year2009_finst_key = cache_keys[0].split("_")
+
+    year2010_result = self.cached_query(
+        "select * from functional.alltypes where year=2010", mt_dop=mt_dop)
+    cache_keys = get_cache_keys(year2010_result.runtime_profile)
+    assert len(cache_keys) == 1
+    year2010_compile_key, year2010_finst_key = cache_keys[0].split("_")
+    # This should be a cache miss
+    assertCounters(year2010_result.runtime_profile, 0, 0, 0)
+
+    # The year=X predicate is on a partition column, so it is enforced by 
pruning
+    # partitions and doesn't carry through to execution. The compile keys for
+    # the two queries are the same, but the fragment instance keys are 
different due
+    # to the different scan ranges from different partitions.
+    assert year2009_compile_key == year2010_compile_key
+    assert year2009_finst_key != year2010_finst_key
+    # Verify that the results are completely different
+    year2009_result_set = set(year2009_result.data)
+    year2010_result_set = set(year2010_result.data)
+    overlapping_rows = year2009_result_set.intersection(year2010_result_set)
+    assert len(overlapping_rows) == 0
+    assert year2009_result.data[0].find("2009") != -1
+    assert year2009_result.data[0].find("2010") == -1
+    assert year2010_result.data[0].find("2010") != -1
+    assert year2010_result.data[0].find("2009") == -1
+
+  @CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS)
+  @pytest.mark.execute_serially
+  def test_scan_range_distributed(self, vector, unique_database):
+    """
+    This tests the distributed case where there are multiple fragment instances
+    processing different scan ranges. Each fragment instance should have a
+    distinct cache key. When adding a scan range, at least one fragment 
instance
+    cache key should change.
+    """
+
+    mt_dop = vector.get_value('mt_dop')
+    fq_table = "{0}.scan_range_basics_mtdop{1}".format(unique_database, mt_dop)
+    query = "SELECT * from {0}".format(fq_table)
+
+    # Create a table with several files so that we always have enough work for 
multiple
+    # fragment instances
+    self.create_table(fq_table, scale=20)
+
+    # We run a simple select. This is running with multiple impalads, so there 
are
+    # always multiple fragment instances
+    before_result = self.cached_query(query, mt_dop=mt_dop)
+    cache_keys = get_cache_keys(before_result.runtime_profile)
+    expected_num_keys = 3 * max(mt_dop, 1)
+    assert len(cache_keys) == expected_num_keys
+    # Every cache key should be distinct, as the fragment instances are 
processing
+    # different data
+    unique_cache_keys = set(cache_keys)
+    assert len(unique_cache_keys) == expected_num_keys
+    # Every cache key has the same compile key
+    unique_compile_keys = set([key.split("_")[0] for key in unique_cache_keys])
+    assert len(unique_compile_keys) == 1
+    # Verify the cache metrics for each impalad. Since we started from 
scratch, the
+    # number of entries in the cache should be the same as the number of cache 
keys.
+    for impalad in self.cluster.impalads:
+      entries_in_use = self.get_tuple_cache_metric(impalad.service, 
"entries-in-use")
+      assert entries_in_use == max(mt_dop, 1)
+    if mt_dop > 0:
+      assert_deterministic_scan(before_result.runtime_profile)
+
+    # Insert another row, which creates a file / scan range
+    # This uses a very large seed for table_value() to get a unique row that 
isn't
+    # already in the table.
+    self.cached_query("INSERT INTO {0} VALUES ({1})".format(
+        fq_table, table_value(1000000)))
+
+    # Rerun the query with the extra scan range
+    after_insert_result = self.cached_query(query, mt_dop=mt_dop)
+    cache_keys = get_cache_keys(after_insert_result.runtime_profile)
+    expected_num_keys = 3 * max(mt_dop, 1)
+    assert len(cache_keys) == expected_num_keys
+    # Every cache key should be distinct, as the fragment instances are 
processing
+    # different data
+    after_insert_unique_cache_keys = set(cache_keys)
+    assert len(after_insert_unique_cache_keys) == expected_num_keys
+    # Every cache key has the same compile key
+    unique_compile_keys = \
+        set([key.split("_")[0] for key in after_insert_unique_cache_keys])
+    assert len(unique_compile_keys) == 1
+    # Verify the cache metrics. We can do a more exact bound by looking at the 
total
+    # across all impalads. The lower bound for this is the number of unique 
cache
+    # keys across both queries we ran. The upper bound for the number of 
entries is
+    # double the expected number from the first run of the query.
+    #
+    # This is not the exact number, because cache key X could have run on 
executor 1
+    # for the first query and on executor 2 for the second query. Even though 
it would
+    # appear as a single unique cache key, it is two different cache entries 
in different
+    # executors.
+    all_cache_keys = unique_cache_keys.union(after_insert_unique_cache_keys)
+    total_entries_in_use = 0
+    for impalad in self.cluster.impalads:
+      entries_in_use = self.get_tuple_cache_metric(impalad.service, 
"entries-in-use")
+      assert entries_in_use >= max(mt_dop, 1)
+      assert entries_in_use <= (2 * max(mt_dop, 1))
+      total_entries_in_use += entries_in_use
+    assert total_entries_in_use >= len(all_cache_keys)
+    if mt_dop > 0:
+      assert_deterministic_scan(after_insert_result.runtime_profile)
+
+    # The extra scan range means that at least one fragment instance key 
changed
+    # Since scheduling can change completely with the addition of a single 
scan range,
+    # we can't assert that only one cache key changes.
+    changed_cache_keys = unique_cache_keys.symmetric_difference(
+        after_insert_unique_cache_keys)
+    assert len(changed_cache_keys) != 0
+
+    # Each row is distinct, so that makes it easy to verify that the results 
overlap
+    # except the second result contains one more row than the first result.
+    before_result_set = set(before_result.data)
+    after_insert_result_set = set(after_insert_result.data)
+    assert len(before_result_set) == 70 * 20
+    assert len(before_result_set) + 1 == len(after_insert_result_set)
+    different_rows = 
before_result_set.symmetric_difference(after_insert_result_set)
+    assert len(different_rows) == 1

Reply via email to