This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 831585c2f52934f9367639eff47e13afcf0fc7b2
Author: Joe McDonnell <[email protected]>
AuthorDate: Mon Jun 17 20:25:00 2024 -0700

    IMPALA-12906: Incorporate scan range information into the tuple cache key

    This change accomplishes two things:
    1. It incorporates scan range information into the tuple cache key.
    2. It reintroduces deterministic scheduling as an option for mt_dop and uses
       it for HdfsScanNodes that feed into a TupleCacheNode.

    The combination of these two things solves several problems:
    1. When a table is modified, the list of scan ranges will change, and this
       will naturally change the cache keys.
    2. When accessing a partitioned table, two queries may have different
       predicates on the partition columns. Since the predicates can be
       satisfied via partition pruning, they are not included at runtime. This
       means that two queries may have identical compile-time keys with only the
       scan ranges being different due to different partition pruning.
    3. Each fragment instance processes different scan ranges, so each will have
       a unique cache key.

    To incorporate scan range information, this introduces a new
    per-fragment-instance cache key. At compile time, the planner now keeps
    track of which HdfsScanNodes feed into a TupleCacheNode. This is passed over
    to the runtime as a list of plan node ids that contain scan ranges. At
    runtime, the fragment instance walks through the list of plan node ids and
    hashes any scan ranges associated with them. This hash is the
    per-fragment-instance cache key. The combination of the compile-time cache
    key produced by the planner and the per-fragment-instance cache key is a
    unique identifier of the result.

    Deterministic scheduling for mt_dop was removed via IMPALA-9655 with the
    introduction of the shared queue. This revives some of the pre-IMPALA-9655
    scheduling logic as an option. Since the TupleCacheNode knows which
    HdfsScanNodes feed into it, the TupleCacheNode turns on deterministic
    scheduling for all of those HdfsScanNodes. Any HdfsScanNode that doesn't
    feed into a TupleCacheNode continues using the current algorithm. The
    pre-IMPALA-9655 code is modified to make it more deterministic about how it
    assigns scan ranges to instances.

    Testing:
     - Added custom cluster tests for the scan range information, including
       modifying a table, selecting from a partitioned table, and verifying
       that fragment instances have unique keys
     - Added a basic frontend test to verify that deterministic scheduling gets
       set on the HdfsScanNodes that feed into the TupleCacheNode
     - Restored the pre-IMPALA-9655 backend test to cover the LPT code

    Change-Id: Ibe298fff0f644ce931a2aa934ebb98f69aab9d34
    Reviewed-on: http://gerrit.cloudera.org:8080/21541
    Reviewed-by: Michael Smith <[email protected]>
    Tested-by: Michael Smith <[email protected]>
    Reviewed-by: Yida Wu <[email protected]>
---
 be/src/exec/hdfs-scan-node-base.cc                |  47 ++--
 be/src/exec/hdfs-scan-node-base.h                 |  10 +
 be/src/exec/hdfs-scan-node-mt.cc                  |  14 +-
 be/src/exec/tuple-cache-node.cc                   |  56 ++++-
 be/src/exec/tuple-cache-node.h                    |  15 +-
 be/src/scheduling/scheduler-test.cc               | 201 ++++++++++++++-
 be/src/scheduling/scheduler.cc                    | 157 +++++++++++-
 be/src/scheduling/scheduler.h                     |  14 +-
 common/thrift/PlanNodes.thrift                    |  15 +-
 .../impala/common/ThriftSerializationCtx.java     |  13 +-
 .../org/apache/impala/planner/HdfsScanNode.java   |  44 +++-
 .../org/apache/impala/planner/TupleCacheInfo.java |  46 +++-
 .../org/apache/impala/planner/TupleCacheNode.java |  27 ++-
 .../org/apache/impala/planner/TupleCacheTest.java |  19 ++
 tests/custom_cluster/test_tuple_cache.py          | 270 ++++++++++++++++++++-
 15 files changed, 879 insertions(+), 69 deletions(-)

diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc index 4ccd656a6..51046243c 100644 --- a/be/src/exec/hdfs-scan-node-base.cc +++ b/be/src/exec/hdfs-scan-node-base.cc @@ -254,7 +254,11 @@ Status HdfsScanPlanNode::ProcessScanRangesAndInitSharedState(FragmentState* stat int64_t total_splits = 0; const vector<const PlanFragmentInstanceCtxPB*>& instance_ctx_pbs = state->instance_ctx_pbs(); - for (auto ctx : instance_ctx_pbs) { + auto instance_ctxs = state->instance_ctxs(); + DCHECK_EQ(instance_ctxs.size(), instance_ctx_pbs.size()); + for (int i = 0; i < instance_ctxs.size(); ++i) { + auto ctx = instance_ctx_pbs[i]; + auto instance_ctx = instance_ctxs[i]; auto ranges = ctx->per_node_scan_ranges().find(tnode_->node_id); if (ranges == ctx->per_node_scan_ranges().end()) continue; for (const ScanRangeParamsPB& params : ranges->second.scan_ranges()) { @@ -307,6 +311,7 @@ Status HdfsScanPlanNode::ProcessScanRangesAndInitSharedState(FragmentState* stat file_desc->is_encrypted = split.is_encrypted(); file_desc->is_erasure_coded = split.is_erasure_coded(); file_desc->file_metadata = file_metadata; + file_desc->fragment_instance_id = instance_ctx->fragment_instance_id; if (file_metadata) { DCHECK(file_metadata->iceberg_metadata() != nullptr); switch (file_metadata->iceberg_metadata()->file_format()) { @@ -375,22 +380,32 @@ Status HdfsScanPlanNode::ProcessScanRangesAndInitSharedState(FragmentState* stat // Distribute the work evenly for issuing initial scan ranges. DCHECK(shared_state_.use_mt_scan_node_ || instance_ctx_pbs.size() == 1) << "Non MT scan node should only have a single instance."; - auto instance_ctxs = state->instance_ctxs(); - DCHECK_EQ(instance_ctxs.size(), instance_ctx_pbs.size()); - int files_per_instance = file_descs.size() / instance_ctxs.size(); - int remainder = file_descs.size() % instance_ctxs.size(); - int num_lists = min(file_descs.size(), instance_ctxs.size()); - auto fd_it = file_descs.begin(); - for (int i = 0; i < num_lists; ++i) { - vector<HdfsFileDesc*>* curr_file_list = - &shared_state_ - .file_assignment_per_instance_[instance_ctxs[i]->fragment_instance_id]; - for (int j = 0; j < files_per_instance + (i < remainder); ++j) { - curr_file_list->push_back(fd_it->second); - ++fd_it; + if (tnode_->hdfs_scan_node.deterministic_scanrange_assignment) { + // If using deterministic scan range assignment, there is no need to rebalance + // the scan ranges.
The scan ranges stay with their original fragment instance. + for (auto& fd : file_descs) { + const TUniqueId& instance_id = fd.second->fragment_instance_id; + shared_state_.file_assignment_per_instance_[instance_id].push_back(fd.second); + } + } else { + // When not using the deterministic scan range assignment, the scan ranges are + // balanced round robin across fragment instances for the purpose of issuing + // initial scan ranges. + int files_per_instance = file_descs.size() / instance_ctxs.size(); + int remainder = file_descs.size() % instance_ctxs.size(); + int num_lists = min(file_descs.size(), instance_ctxs.size()); + auto fd_it = file_descs.begin(); + for (int i = 0; i < num_lists; ++i) { + vector<HdfsFileDesc*>* curr_file_list = + &shared_state_ + .file_assignment_per_instance_[instance_ctxs[i]->fragment_instance_id]; + for (int j = 0; j < files_per_instance + (i < remainder); ++j) { + curr_file_list->push_back(fd_it->second); + ++fd_it; + } } + DCHECK(fd_it == file_descs.end()); } - DCHECK(fd_it == file_descs.end()); return Status::OK(); } @@ -470,6 +485,8 @@ HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const HdfsScanPlanNode& pno disks_accessed_bitmap_(TUnit::UNIT, 0), active_hdfs_read_thread_counter_(TUnit::UNIT, 0), shared_state_(const_cast<ScanRangeSharedState*>(&(pnode.shared_state_))), + deterministic_scanrange_assignment_( + hdfs_scan_node.deterministic_scanrange_assignment), file_metadata_utils_(this) {} HdfsScanNodeBase::~HdfsScanNodeBase() {} diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h index 0517b1f3c..8802b7ff3 100644 --- a/be/src/exec/hdfs-scan-node-base.h +++ b/be/src/exec/hdfs-scan-node-base.h @@ -103,6 +103,10 @@ struct HdfsFileDesc { /// Whether file is erasure coded. bool is_erasure_coded = false; + /// Fragment instance assignment from the scheduler (used for maintaining + /// deterministic assignments when that mode is enabled). + TUniqueId fragment_instance_id; + /// Some useful typedefs for creating HdfsFileDesc related data structures. /// This is a pair for partition ID and filename which uniquely identifies a file. typedef pair<int64_t, std::string> PartitionFileKey; @@ -819,6 +823,12 @@ class HdfsScanNodeBase : public ScanNode { /// Pointer to the scan range related state that is shared across all node instances. ScanRangeSharedState* shared_state_ = nullptr; + /// Whether mt_dop uses deterministic scan range assignment + /// If true, each fragment instance has its own list of scan ranges. + /// If false, the fragment instances get scan ranges from the shared queue. + /// Not used for mt_dop=0. + bool deterministic_scanrange_assignment_; + /// Utility class for handling file metadata. 
FileMetadataUtils file_metadata_utils_; diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc index 85aada3c8..1d3077d72 100644 --- a/be/src/exec/hdfs-scan-node-mt.cc +++ b/be/src/exec/hdfs-scan-node-mt.cc @@ -54,7 +54,10 @@ Status HdfsScanNodeMt::Open(RuntimeState* state) { ScopedOpenEventAdder ea(this); RETURN_IF_ERROR(HdfsScanNodeBase::Open(state)); DCHECK(!initial_ranges_issued_.Load()); - shared_state_->AddCancellationHook(state); + // The cancellation hook is only needed when using the shared queue + if (!deterministic_scanrange_assignment_) { + shared_state_->AddCancellationHook(state); + } RETURN_IF_ERROR(IssueInitialScanRanges(state)); return Status::OK(); } @@ -150,6 +153,11 @@ Status HdfsScanNodeMt::AddDiskIoRanges( << "Don't call AddScanRanges() after all ranges finished."; DCHECK_GT(shared_state_->RemainingScanRangeSubmissions(), 0); DCHECK_GT(ranges.size(), 0); + if (deterministic_scanrange_assignment_) { + // Use independent scan ranges for different fragment instances + // Same code as non-mt-dop + return reader_context_->AddScanRanges(ranges, enqueue_location); + } bool at_front = false; if (enqueue_location == EnqueueLocation::HEAD) { at_front = true; @@ -160,6 +168,10 @@ Status HdfsScanNodeMt::AddDiskIoRanges( Status HdfsScanNodeMt::GetNextScanRangeToRead( io::ScanRange** scan_range, bool* needs_buffers) { + // With deterministic scan ranges, use the same code as non-mt-dop + if (deterministic_scanrange_assignment_) { + return reader_context_->GetNextUnstartedRange(scan_range, needs_buffers); + } RETURN_IF_ERROR(shared_state_->GetNextScanRange(runtime_state_, scan_range)); if (*scan_range != nullptr) { RETURN_IF_ERROR(reader_context_->StartScanRange(*scan_range, needs_buffers)); diff --git a/be/src/exec/tuple-cache-node.cc b/be/src/exec/tuple-cache-node.cc index fd5dea2be..c140c1b9d 100644 --- a/be/src/exec/tuple-cache-node.cc +++ b/be/src/exec/tuple-cache-node.cc @@ -25,6 +25,7 @@ #include "runtime/row-batch.h" #include "runtime/runtime-state.h" #include "runtime/tuple-cache-mgr.h" +#include "util/hash-util.h" #include "util/runtime-profile-counters.h" #include "util/runtime-profile.h" @@ -41,8 +42,7 @@ Status TupleCachePlanNode::CreateExecNode( TupleCacheNode::TupleCacheNode( ObjectPool* pool, const TupleCachePlanNode& pnode, const DescriptorTbl& descs) - : ExecNode(pool, pnode, descs) - , subtree_hash_(pnode.tnode_->tuple_cache_node.subtree_hash) { + : ExecNode(pool, pnode, descs) { } TupleCacheNode::~TupleCacheNode() = default; @@ -54,6 +54,14 @@ Status TupleCacheNode::Prepare(RuntimeState* state) { ADD_COUNTER(runtime_profile(), "NumTupleCacheHalted", TUnit::UNIT); num_skipped_counter_ = ADD_COUNTER(runtime_profile(), "NumTupleCacheSkipped", TUnit::UNIT); + + // Compute the combined cache key by computing the fragment instance key and + // fusing it with the compile time key. 
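+ // The combined key built below takes the form "<compile_time_key>_<fragment_instance_key>".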
+ ComputeFragmentInstanceKey(state); + combined_key_ = plan_node().tnode_->tuple_cache_node.compile_time_key + "_" + + std::to_string(fragment_instance_key_); + runtime_profile()->AddInfoString("Combined Key", combined_key_); + return Status::OK(); } @@ -69,7 +77,7 @@ Status TupleCacheNode::Open(RuntimeState* state) { } TupleCacheMgr* tuple_cache_mgr = ExecEnv::GetInstance()->tuple_cache_mgr(); - handle_ = tuple_cache_mgr->Lookup(subtree_hash_, true); + handle_ = tuple_cache_mgr->Lookup(combined_key_, true); if (tuple_cache_mgr->IsAvailableForRead(handle_)) { reader_ = make_unique<TupleFileReader>( tuple_cache_mgr->GetPath(handle_), mem_tracker(), runtime_profile()); @@ -101,7 +109,7 @@ Status TupleCacheNode::Open(RuntimeState* state) { // - the query requests caching but cache is disabled via startup option // - another fragment is currently writing this cache entry // - the cache entry is a tombstone to prevent retries for too large entries - VLOG_FILE << "Tuple Cache: skipped for " << subtree_hash_; + VLOG_FILE << "Tuple Cache: skipped for " << combined_key_; COUNTER_ADD(num_skipped_counter_, 1); tuple_cache_mgr->IncrementMetric(TupleCacheMgr::MetricType::SKIPPED); } @@ -161,7 +169,7 @@ Status TupleCacheNode::GetNext( // continue reading from the child node. if (!status.ok()) { if (writer_->ExceededMaxSize()) { - VLOG_FILE << "Tuple Cache entry for " << subtree_hash_ + VLOG_FILE << "Tuple Cache entry for " << combined_key_ << " hit the maximum file size: " << status.GetDetail(); COUNTER_ADD(num_halted_counter_, 1); tuple_cache_mgr->IncrementMetric(TupleCacheMgr::MetricType::HALTED); @@ -230,9 +238,45 @@ void TupleCacheNode::Close(RuntimeState* state) { void TupleCacheNode::DebugString(int indentation_level, stringstream* out) const { *out << string(indentation_level * 2, ' '); - *out << "TupleCacheNode(" << subtree_hash_; + *out << "TupleCacheNode(" << combined_key_; ExecNode::DebugString(indentation_level, out); *out << ")"; } +void TupleCacheNode::ComputeFragmentInstanceKey(const RuntimeState* state) { + const PlanFragmentInstanceCtxPB& ctx = state->instance_ctx_pb(); + uint32_t hash = 0; + for (int32_t node_id : plan_node().tnode_->tuple_cache_node.input_scan_node_ids) { + auto ranges = ctx.per_node_scan_ranges().find(node_id); + if (ranges == ctx.per_node_scan_ranges().end()) continue; + for (const ScanRangeParamsPB& params : ranges->second.scan_ranges()) { + // This only supports HDFS right now + DCHECK(params.scan_range().has_hdfs_file_split()); + const HdfsFileSplitPB& split = params.scan_range().hdfs_file_split(); + if (split.has_relative_path() && !split.relative_path().empty()) { + hash = HashUtil::Hash( + split.relative_path().data(), split.relative_path().length(), hash); + DCHECK(split.has_partition_path_hash()); + int32_t partition_path_hash = split.partition_path_hash(); + hash = HashUtil::Hash(&partition_path_hash, sizeof(partition_path_hash), hash); + } else if (split.has_absolute_path() && !split.absolute_path().empty()) { + hash = HashUtil::Hash( + split.absolute_path().data(), split.absolute_path().length(), hash); + } else { + DCHECK(false) << "Either relative_path or absolute_path must be set"; + } + DCHECK(split.has_offset()); + int64_t offset = split.offset(); + hash = HashUtil::Hash(&offset, sizeof(offset), hash); + DCHECK(split.has_length()); + int64_t length = split.length(); + hash = HashUtil::Hash(&length, sizeof(length), hash); + DCHECK(split.has_mtime()); + int64_t mtime = split.mtime(); + hash = HashUtil::Hash(&mtime, sizeof(mtime), hash); + } + }
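+ // At this point 'hash' covers the file path, offset, length, and mtime of every scan range assigned to this fragment instance (plus the partition path hash for relative paths).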
fragment_instance_key_ = hash; +} + } diff --git a/be/src/exec/tuple-cache-node.h b/be/src/exec/tuple-cache-node.h index 772202125..19ff7eac6 100644 --- a/be/src/exec/tuple-cache-node.h +++ b/be/src/exec/tuple-cache-node.h @@ -35,7 +35,7 @@ class TupleCachePlanNode : public PlanNode { /// Node that caches rows produced by a child node. /// -/// If the subtree_hash_ matches an existing cache entry, returns result rows from the +/// If the combined_key_ matches an existing cache entry, returns result rows from the /// cache rather than from the child. Otherwise reads results from the child, writes them /// to cache, and returns them. @@ -52,7 +52,14 @@ class TupleCacheNode : public ExecNode { void Close(RuntimeState* state) override; void DebugString(int indentation_level, std::stringstream* out) const override; private: - const std::string subtree_hash_; + // Fragment instance cache key. This is calculated at runtime by combining information + // about the input nodes for this fragment. It currently focuses on hashing the + // scan ranges from scan nodes. In future, it will need to handle exchanges. + uint32_t fragment_instance_key_; + + // This is a string containing the compile time key and the fragment_instance_key_. + // This combination is unique for a given fragment instance. + std::string combined_key_; /// Number of results that were found in the tuple cache RuntimeProfile::Counter* num_hits_counter_ = nullptr; @@ -68,6 +75,10 @@ private: void ReleaseResult(); + // Construct the fragment instance part of the cache key by hashing information about + // inputs to this fragment (e.g. scan ranges). + void ComputeFragmentInstanceKey(const RuntimeState *state); + /// Reader/Writer for caching TupleCacheMgr::UniqueHandle handle_; std::unique_ptr<TupleFileReader> reader_; diff --git a/be/src/scheduling/scheduler-test.cc b/be/src/scheduling/scheduler-test.cc index d29f4e7f7..b79728b1c 100644 --- a/be/src/scheduling/scheduler-test.cc +++ b/be/src/scheduling/scheduler-test.cc @@ -779,11 +779,11 @@ TEST_F(SchedulerTest, TestMultipleFinstances) { // Test handling of the single instance case - all ranges go to the same instance. vector<vector<ScanRangeParamsPB>> fs_one_instance = - Scheduler::AssignRangesToInstances(1, fs_ranges); + Scheduler::AssignRangesToInstances(1, &fs_ranges); ASSERT_EQ(1, fs_one_instance.size()); EXPECT_EQ(NUM_RANGES, fs_one_instance[0].size()); vector<vector<ScanRangeParamsPB>> kudu_one_instance = - Scheduler::AssignRangesToInstances(1, kudu_ranges); + Scheduler::AssignRangesToInstances(1, &kudu_ranges); ASSERT_EQ(1, kudu_one_instance.size()); EXPECT_EQ(NUM_RANGES, kudu_one_instance[0].size()); @@ -791,7 +791,7 @@ TEST_F(SchedulerTest, TestMultipleFinstances) { for (int attempt = 0; attempt < 20; ++attempt) { std::shuffle(fs_ranges.begin(), fs_ranges.end(), rng_); vector<vector<ScanRangeParamsPB>> range_per_instance = - Scheduler::AssignRangesToInstances(NUM_RANGES, fs_ranges); + Scheduler::AssignRangesToInstances(NUM_RANGES, &fs_ranges); EXPECT_EQ(NUM_RANGES, range_per_instance.size()); // Confirm each range is present and each instance got exactly one range. 
for (int i = 0; i < NUM_RANGES; ++i) { @@ -804,7 +804,7 @@ TEST_F(SchedulerTest, TestMultipleFinstances) { for (int attempt = 0; attempt < 20; ++attempt) { std::shuffle(fs_ranges.begin(), fs_ranges.end(), rng_); vector<vector<ScanRangeParamsPB>> range_per_instance = - Scheduler::AssignRangesToInstances(4, fs_ranges); + Scheduler::AssignRangesToInstances(4, &fs_ranges); EXPECT_EQ(4, range_per_instance.size()); for (int i = 0; i < range_per_instance.size(); ++i) { EXPECT_EQ(4, range_per_instance[i].size()) << i; @@ -814,7 +814,7 @@ TEST_F(SchedulerTest, TestMultipleFinstances) { for (int attempt = 0; attempt < 20; ++attempt) { std::shuffle(kudu_ranges.begin(), kudu_ranges.end(), rng_); vector<vector<ScanRangeParamsPB>> range_per_instance = - Scheduler::AssignRangesToInstances(4, kudu_ranges); + Scheduler::AssignRangesToInstances(4, &kudu_ranges); EXPECT_EQ(4, range_per_instance.size()); for (const auto& instance_ranges : range_per_instance) { EXPECT_EQ(4, instance_ranges.size()); @@ -824,4 +824,195 @@ TEST_F(SchedulerTest, TestMultipleFinstances) { } } } + +// This tests the pre-IMPALA-9655 LPT scheduling code that is now used for tuple caching +// This is equivalent to the TestMultipleFinstances test before IMPALA-9655. +TEST_F(SchedulerTest, TestMultipleFinstancesLPT) { + const int NUM_RANGES = 16; + std::vector<ScanRangeParamsPB> fs_ranges(NUM_RANGES); + std::vector<ScanRangeParamsPB> kudu_ranges(NUM_RANGES); + // Create ranges with lengths 1, 2, ..., etc. + for (int i = 0; i < NUM_RANGES; ++i) { + *fs_ranges[i].mutable_scan_range()->mutable_hdfs_file_split() = HdfsFileSplitPB(); + fs_ranges[i].mutable_scan_range()->mutable_hdfs_file_split()->set_length(i + 1); + kudu_ranges[i].mutable_scan_range()->set_kudu_scan_token("fake token"); + } + + // Test handling of the single instance case - all ranges go to the same instance. + vector<vector<ScanRangeParamsPB>> fs_one_instance = + Scheduler::AssignRangesToInstances(1, &fs_ranges, /* use_lpt */ true); + ASSERT_EQ(1, fs_one_instance.size()); + EXPECT_EQ(NUM_RANGES, fs_one_instance[0].size()); + vector<vector<ScanRangeParamsPB>> kudu_one_instance = + Scheduler::AssignRangesToInstances(1, &kudu_ranges, /* use_lpt */ true); + ASSERT_EQ(1, kudu_one_instance.size()); + EXPECT_EQ(NUM_RANGES, kudu_one_instance[0].size()); + + // Ensure that each executor gets one range regardless of input order. + for (int attempt = 0; attempt < 20; ++attempt) { + std::shuffle(fs_ranges.begin(), fs_ranges.end(), rng_); + vector<vector<ScanRangeParamsPB>> range_per_instance = + Scheduler::AssignRangesToInstances(NUM_RANGES, &fs_ranges, /* use_lpt */ true); + EXPECT_EQ(NUM_RANGES, range_per_instance.size()); + // Confirm each range is present and each instance got exactly one range. + vector<int> range_length_count(NUM_RANGES); + for (const auto& instance_ranges : range_per_instance) { + ASSERT_EQ(1, instance_ranges.size()); + ++range_length_count[instance_ranges[0].scan_range().hdfs_file_split().length() + - 1]; + } + for (int i = 0; i < NUM_RANGES; ++i) { + EXPECT_EQ(1, range_length_count[i]) << i; + } + } + + // Test load balancing FS ranges across 4 instances. We should get an even assignment + // across the instances regardless of input order. 
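+ // The lengths 1..16 sum to 136, so an even split across 4 instances gives each instance 34 bytes; the EXPECT_EQ(34, instance_bytes) check below relies on this.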
+ for (int attempt = 0; attempt < 20; ++attempt) { + std::shuffle(fs_ranges.begin(), fs_ranges.end(), rng_); + vector<vector<ScanRangeParamsPB>> range_per_instance = + Scheduler::AssignRangesToInstances(4, &fs_ranges, /* use_lpt */ true); + EXPECT_EQ(4, range_per_instance.size()); + // Ensure we got a range of each length in the output. + vector<int> range_length_count(NUM_RANGES); + for (const auto& instance_ranges : range_per_instance) { + EXPECT_EQ(4, instance_ranges.size()); + int64_t instance_bytes = 0; + for (const auto& range : instance_ranges) { + instance_bytes += range.scan_range().hdfs_file_split().length(); + ++range_length_count[range.scan_range().hdfs_file_split().length() - 1]; + } + // Expect each instance to get sum([1, 2, ..., 16]) / 4 bytes when things are + // distributed evenly. + EXPECT_EQ(34, instance_bytes); + } + for (int i = 0; i < NUM_RANGES; ++i) { + EXPECT_EQ(1, range_length_count[i]) << i; + } + } + + // Test load balancing Kudu ranges across 4 instances. We should get an even assignment + // across the instances regardless of input order. We don't know the size of each Kudu + // range, so we just need to check the # of ranges. + for (int attempt = 0; attempt < 20; ++attempt) { + std::shuffle(kudu_ranges.begin(), kudu_ranges.end(), rng_); + vector<vector<ScanRangeParamsPB>> range_per_instance = + Scheduler::AssignRangesToInstances(4, &kudu_ranges, /* use_lpt */ true); + EXPECT_EQ(4, range_per_instance.size()); + for (const auto& instance_ranges : range_per_instance) { + EXPECT_EQ(4, instance_ranges.size()); + for (const auto& range : instance_ranges) { + EXPECT_TRUE(range.scan_range().has_kudu_scan_token()); + } + } + } +} + +// This tests the ScanRangeComparator to verify that it is consistent. +TEST_F(SchedulerTest, TestScanRangeComparator) { + // Test comparisons for HDFS ranges + // Start with two ranges a and b that are identical + ScanRangeParamsPB a; + *a.mutable_scan_range()->mutable_hdfs_file_split() = HdfsFileSplitPB(); + HdfsFileSplitPB* a_hdfs = a.mutable_scan_range()->mutable_hdfs_file_split(); + a_hdfs->set_relative_path("aaaa.txt"); + a_hdfs->set_offset(0); + a_hdfs->set_length(512); + a_hdfs->set_partition_id(10); + a_hdfs->set_file_length(1024); + a_hdfs->set_file_compression(CompressionTypePB::LZ4); + a_hdfs->set_mtime(12345); + a_hdfs->set_is_erasure_coded(false); + a_hdfs->set_partition_path_hash(11111); + a_hdfs->set_absolute_path("absolute_path"); + a_hdfs->set_is_encrypted(false); + + ScanRangeParamsPB b = a; + HdfsFileSplitPB* b_hdfs = b.mutable_scan_range()->mutable_hdfs_file_split(); + EXPECT_FALSE(Scheduler::ScanRangeComparator(a, b)); + EXPECT_FALSE(Scheduler::ScanRangeComparator(b, a)); + + // Some fields don't matter, so changing them should have no influence on the + // comparator. + a_hdfs->set_partition_id(1); + a_hdfs->set_file_compression(CompressionTypePB::LZO); + a_hdfs->set_is_erasure_coded(true); + a_hdfs->set_is_encrypted(true); + a_hdfs->set_absolute_path("other absolute path"); + EXPECT_FALSE(Scheduler::ScanRangeComparator(a, b)); + EXPECT_FALSE(Scheduler::ScanRangeComparator(b, a)); + + // ScanRangeComparator checks fields in a specific order to bail out early. + // This is testing them in the opposite order of their precedence, starting with the + // lowest precedence field and moving up to the highest precedence field. Each field + // overrides the lower precedence field, flipping the comparison. + // The last thing it compares is relative path. Make a > b based on relative path.
+ a_hdfs->set_relative_path("bbbb.txt"); + EXPECT_TRUE(Scheduler::ScanRangeComparator(a, b)); + EXPECT_FALSE(Scheduler::ScanRangeComparator(b, a)); + + // 2nd to last is partition_path_hash. Make b > a. This takes precedence over the + // relative_path. + b_hdfs->set_partition_path_hash(22222); + EXPECT_TRUE(Scheduler::ScanRangeComparator(b, a)); + EXPECT_FALSE(Scheduler::ScanRangeComparator(a, b)); + + // 3rd to last is file_length. Make a > b. + a_hdfs->set_file_length(1025); + EXPECT_TRUE(Scheduler::ScanRangeComparator(a, b)); + EXPECT_FALSE(Scheduler::ScanRangeComparator(b, a)); + + // 4th to last is offset. Make b > a. + b_hdfs->set_offset(1); + EXPECT_TRUE(Scheduler::ScanRangeComparator(b, a)); + EXPECT_FALSE(Scheduler::ScanRangeComparator(a, b)); + + // 5th to last field checked is mtime. Make a > b. + a_hdfs->set_mtime(12346); + EXPECT_TRUE(Scheduler::ScanRangeComparator(a, b)); + EXPECT_FALSE(Scheduler::ScanRangeComparator(b, a)); + + // Length is used by ScanRangeWeight and is the first field checked. Make b > a. + b_hdfs->set_length(513); + EXPECT_TRUE(Scheduler::ScanRangeComparator(b, a)); + EXPECT_FALSE(Scheduler::ScanRangeComparator(a, b)); + + // Test comparison for Kudu ranges + ScanRangeParamsPB a_kudu; + a_kudu.mutable_scan_range()->set_kudu_scan_token("abc"); + ScanRangeParamsPB b_kudu = a_kudu; + EXPECT_FALSE(Scheduler::ScanRangeComparator(a_kudu, b_kudu)); + EXPECT_FALSE(Scheduler::ScanRangeComparator(b_kudu, a_kudu)); + + // Set the kudu scan token to make a > b + a_kudu.mutable_scan_range()->set_kudu_scan_token("bcd"); + EXPECT_TRUE(Scheduler::ScanRangeComparator(a_kudu, b_kudu)); + EXPECT_FALSE(Scheduler::ScanRangeComparator(b_kudu, a_kudu)); + + // Test comparison for HBase ranges + ScanRangeParamsPB a_hbase; + *a_hbase.mutable_scan_range()->mutable_hbase_key_range() = HBaseKeyRangePB(); + HBaseKeyRangePB* a_hbase_keyrange = + a_hbase.mutable_scan_range()->mutable_hbase_key_range(); + a_hbase_keyrange->set_startkey("aaa"); + a_hbase_keyrange->set_stopkey("fff"); + + ScanRangeParamsPB b_hbase = a_hbase; + HBaseKeyRangePB* b_hbase_keyrange = + b_hbase.mutable_scan_range()->mutable_hbase_key_range(); + EXPECT_FALSE(Scheduler::ScanRangeComparator(a_hbase, b_hbase)); + EXPECT_FALSE(Scheduler::ScanRangeComparator(b_hbase, a_hbase)); + + // Again, set fields differently from the lowest precedence field to the + // highest precedence field. + // The last field ScanRangeComparator compares is stopkey. Make a > b. + a_hbase_keyrange->set_stopkey("ggg"); + EXPECT_TRUE(Scheduler::ScanRangeComparator(a_hbase, b_hbase)); + EXPECT_FALSE(Scheduler::ScanRangeComparator(b_hbase, a_hbase)); + + // Set startkey so that b > a. This takes precedence over stopkey. + b_hbase_keyrange->set_startkey("bbb"); + EXPECT_TRUE(Scheduler::ScanRangeComparator(b_hbase, a_hbase)); + EXPECT_FALSE(Scheduler::ScanRangeComparator(a_hbase, b_hbase)); +} } // end namespace impala diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc index e13c437e5..4373334ca 100644 --- a/be/src/scheduling/scheduler.cc +++ b/be/src/scheduling/scheduler.cc @@ -572,6 +572,103 @@ Status Scheduler::ComputeFragmentExecParams(const ExecutorConfig& executor_confi return CheckEffectiveInstanceCount(fragment_state, state); } +/// Returns a numeric weight that is proportional to the estimated processing time for +/// the scan range represented by 'params'. Weights from different scan node +/// implementations, e.g. FS vs Kudu, are not comparable.
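+/// For an HDFS split, the weight is the split length in bytes; Kudu and HBase splits currently all get a weight of 1.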
+static int64_t ScanRangeWeight(const ScanRangeParamsPB& params) { + if (params.scan_range().has_hdfs_file_split()) { + return params.scan_range().hdfs_file_split().length(); + } else { + // Give equal weight to each Kudu and HBase split. + // TODO: implement more accurate logic for Kudu and HBase + return 1; + } +} + +bool Scheduler::ScanRangeComparator(const ScanRangeParamsPB& a, + const ScanRangeParamsPB& b) { + // We are ordering by weight largest to smallest. Return quickly if the weights are + // different. + int64_t a_weight = ScanRangeWeight(a), b_weight = ScanRangeWeight(b); + if (a_weight != b_weight) return a_weight > b_weight; + // The weights are the same, so we need to break ties to make this deterministic. + if (a.scan_range().has_hdfs_file_split()) { + // HDFS scan ranges should be compared against other HDFS scan ranges. + if (!b.scan_range().has_hdfs_file_split()) { + DCHECK(false) << "HDFS scan ranges can only be compared against other HDFS ranges"; + return false; + } + // Break ties by comparing various fields of the HDFS split. The ordering here is + // arbitrary, so this starts with cheap checks and moves to more expensive checks. + const HdfsFileSplitPB& a_hdfs_split = a.scan_range().hdfs_file_split(); + const HdfsFileSplitPB& b_hdfs_split = b.scan_range().hdfs_file_split(); + if (a_hdfs_split.mtime() != b_hdfs_split.mtime()) { + return a_hdfs_split.mtime() > b_hdfs_split.mtime(); + } + if (a_hdfs_split.offset() != b_hdfs_split.offset()) { + return a_hdfs_split.offset() > b_hdfs_split.offset(); + } + if (a_hdfs_split.file_length() != b_hdfs_split.file_length()) { + return a_hdfs_split.file_length() > b_hdfs_split.file_length(); + } + if (a_hdfs_split.partition_path_hash() != b_hdfs_split.partition_path_hash()) { + return a_hdfs_split.partition_path_hash() > b_hdfs_split.partition_path_hash(); + } + if (a_hdfs_split.relative_path() != b_hdfs_split.relative_path()) { + return a_hdfs_split.relative_path() > b_hdfs_split.relative_path(); + } + } else if (a.scan_range().has_kudu_scan_token()) { + // Kudu scan ranges should be compared against other Kudu scan ranges + if (!b.scan_range().has_kudu_scan_token()) { + DCHECK(false) << "Kudu scan ranges can only be compared against other Kudu ranges"; + return false; + } + // Break ties by comparing the kudu scan token + return a.scan_range().kudu_scan_token() > b.scan_range().kudu_scan_token(); + } else if (a.scan_range().has_hbase_key_range()) { + // HBase scan ranges should be compared against other HBase scan ranges + if (!b.scan_range().has_hbase_key_range()) { + DCHECK(false) + << "HBase scan ranges can only be compared against other HBase ranges"; + return false; + } + const HBaseKeyRangePB& a_hbase_range = a.scan_range().hbase_key_range(); + const HBaseKeyRangePB& b_hbase_range = b.scan_range().hbase_key_range(); + if (a_hbase_range.startkey() != b_hbase_range.startkey()) { + return a_hbase_range.startkey() > b_hbase_range.startkey(); + } + if (a_hbase_range.stopkey() != b_hbase_range.stopkey()) { + return a_hbase_range.stopkey() > b_hbase_range.stopkey(); + } + } + return false; +} + +/// Helper class used in CreateScanInstances() to track the amount of work assigned +/// to each instance so far. +struct InstanceAssignment { + // The weight assigned so far. + int64_t weight; + + // The index of the instance in 'per_instance_ranges' + int instance_idx; + + // Comparator for use in a heap as part of the longest processing time algo.
+ // Invert the comparison order because the *_heap functions implement a max-heap + // and we want to assign to the least-loaded instance first. + bool operator<(InstanceAssignment& other) const { + if (weight == other.weight) { + // To make this deterministic, break ties by comparing the instance idxs + // (which are unique). Like the weight, this is also inverted to put the lower + // indexes first as this is a max heap. That matches the order that we use when + // constructing the initial list, so there is no need to call make_heap(). + return instance_idx > other.instance_idx; + } else { + return weight > other.weight; + } + } +}; + // Maybe the easiest way to understand the objective of this algorithm is as a // generalization of two simpler instance creation algorithms that decide how many // instances of a fragment to create on each node, given a set of nodes that were @@ -694,8 +791,22 @@ void Scheduler::CreateCollocatedAndScanInstances(const ExecutorConfig& executor_ if (assignment_it == sra.end()) continue; auto scan_ranges_it = assignment_it->second.find(scan_node_id); if (scan_ranges_it == assignment_it->second.end()) continue; + const TPlanNode& scan_node = state->GetNode(scan_node_id); + // For mt_dop, there are two scheduling modes. The first is the normal mode + // that uses a shared queue. For that mode, there is no reason to do anything + // special about assigning scan ranges to fragment instances, because they will + // all be placed in a single queue at runtime. The other is deterministic + // scheduling that does not use the shared queue. In this mode, no rebalancing + // takes place at runtime, so balancing the scan ranges among the fragment + // instances is important. For this mode, use the longest processing time (LPT) + // algorithm. The deterministic mode is used for tuple caching. + bool use_lpt = scan_node.__isset.hdfs_scan_node + && scan_node.hdfs_scan_node.__isset.use_mt_scan_node + && scan_node.hdfs_scan_node.use_mt_scan_node + && scan_node.hdfs_scan_node.__isset.deterministic_scanrange_assignment + && scan_node.hdfs_scan_node.deterministic_scanrange_assignment; per_scan_per_instance_ranges.back() = - AssignRangesToInstances(max_num_instances, scan_ranges_it->second); + AssignRangesToInstances(max_num_instances, &scan_ranges_it->second, use_lpt); DCHECK_LE(per_scan_per_instance_ranges.back().size(), max_num_instances); } @@ -746,18 +857,48 @@ void Scheduler::CreateCollocatedAndScanInstances(const ExecutorConfig& executor_ } vector<vector<ScanRangeParamsPB>> Scheduler::AssignRangesToInstances( - int max_num_instances, vector<ScanRangeParamsPB>& ranges) { + int max_num_instances, vector<ScanRangeParamsPB>* ranges, bool use_lpt) { DCHECK_GT(max_num_instances, 0); - int num_instances = min(max_num_instances, static_cast<int>(ranges.size())); + int num_instances = min(max_num_instances, static_cast<int>(ranges->size())); vector<vector<ScanRangeParamsPB>> per_instance_ranges(num_instances); if (num_instances < 2) { // Short-circuit the assignment algorithm for the single instance case. - per_instance_ranges[0] = ranges; + per_instance_ranges[0] = *ranges; } else { - int idx = 0; - for (auto& range : ranges) { - per_instance_ranges[idx].push_back(range); - idx = (idx + 1 == num_instances) ? 0 : idx + 1; + if (use_lpt) { + // We need to assign scan ranges to instances. We would like the assignment to be + // as even as possible, so that each instance does about the same amount of work. 
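+ // As an illustration (example weights, not taken from the code or tests): LPT spreads weights {5, 4, 3, 3, 3} across two instances as {5, 3} = 8 and {4, 3, 3} = 10.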
+ // Use longest-processing time (LPT) algorithm, which is a good approximation of the + // optimal solution (there is a theoretic bound of ~4/3 of the optimal solution + // in the worst case). It also guarantees that at least one scan range is assigned + // to each instance. + // The LPT algorithm is straightforward: + // 1. sort the scan ranges to be assigned by descending weight. + // 2. assign each item to the instance with the least weight assigned so far. + vector<InstanceAssignment> instance_heap; + instance_heap.reserve(num_instances); + for (int i = 0; i < num_instances; ++i) { + instance_heap.emplace_back(InstanceAssignment{0, i}); + } + // The instance_heap vector was created in sorted order, so this is already a heap + // without needing a make_heap() call. + DCHECK(std::is_heap(instance_heap.begin(), instance_heap.end())); + std::sort(ranges->begin(), ranges->end(), ScanRangeComparator); + for (ScanRangeParamsPB& range : *ranges) { + per_instance_ranges[instance_heap[0].instance_idx].push_back(range); + instance_heap[0].weight += ScanRangeWeight(range); + pop_heap(instance_heap.begin(), instance_heap.end()); + push_heap(instance_heap.begin(), instance_heap.end()); + } + } else { + // When not using LPT, a simple round-robin is sufficient. The use case for non-LPT + // is when the ranges will be placed in a shared queue anyway, so the balancing + // is not as crucial. + int idx = 0; + for (auto& range : *ranges) { + per_instance_ranges[idx].push_back(range); + idx = (idx + 1 == num_instances) ? 0 : idx + 1; + } } } return per_instance_ranges; diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h index 9abcab40b..d50142ac0 100644 --- a/be/src/scheduling/scheduler.h +++ b/be/src/scheduling/scheduler.h @@ -410,8 +410,11 @@ class Scheduler { /// to at most 'max_num_instances' fragment instances running on the same host. /// 'max_num_ranges' must be positive. Only returns non-empty vectors: if there are not /// enough ranges to create 'max_num_instances', fewer instances are assigned ranges. + /// 'use_lpt' determines whether this assigns scan ranges via the Longest Processing + /// Time algorithm. If false, this uses round-robin (which is cheaper). static std::vector<std::vector<ScanRangeParamsPB>> AssignRangesToInstances( - int max_num_instances, std::vector<ScanRangeParamsPB>& ranges); + int max_num_instances, std::vector<ScanRangeParamsPB>* ranges, + bool use_lpt = false); /// For each instance of fragment_state's input fragment, create a collocated /// instance for fragment_state's fragment. @@ -482,12 +485,21 @@ class Scheduler { > state->query_options().max_fs_writers); } + /// Comparator to order scan ranges for scheduling. This uses ScanRangeWeight(), but it + /// also compares other fields so that it is deterministic for scan ranges with the + /// same weight. This comparator only works when both scan ranges have the same storage + /// type: HDFS vs HDFS, Kudu vs Kudu, HBase vs HBase. Otherwise, it asserts. 
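+ /// Returns true when 'a' should be ordered before 'b' in the descending-weight sort, i.e. when 'a' is heavier or wins the tie-break.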
+ static bool ScanRangeComparator(const ScanRangeParamsPB& a, + const ScanRangeParamsPB& b); + friend class impala::test::SchedulerWrapper; FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentDeterministicNonCached); FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentRandomNonCached); FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentRandomDiskLocal); FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentRandomRemote); FRIEND_TEST(SchedulerTest, TestMultipleFinstances); + FRIEND_TEST(SchedulerTest, TestMultipleFinstancesLPT); + FRIEND_TEST(SchedulerTest, TestScanRangeComparator); }; } diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift index fa798c721..ba50fb9a7 100644 --- a/common/thrift/PlanNodes.thrift +++ b/common/thrift/PlanNodes.thrift @@ -350,6 +350,11 @@ struct THdfsScanNode { // For IcebergScanNodes that are the left children of an IcebergDeleteNode this stores // the node ID of the right child. 14: optional Types.TPlanNodeId deleteFileScanNodeId + + // Whether mt_dop should use deterministic scan range assignment. If true, + // each fragment instance has its own list of scan ranges. If false, + // the fragment instances use a shared queue. + 15: optional bool deterministic_scanrange_assignment } struct TDataSourceScanNode { @@ -711,9 +716,13 @@ struct TIcebergMetadataScanNode { } struct TTupleCacheNode { - // Cache key that includes a hashed representation of the entire subtree below - // this point in the plan. - 1: required string subtree_hash + // Compile-time cache key that includes a hashed representation of the entire subtree + // below this point in the plan. + 1: required string compile_time_key + // To calculate the per-fragment key, this keeps track of scan nodes that feed + // into this node. The TupleCacheNode will hash the scan ranges for its fragment + // at runtime. + 2: required list<i32> input_scan_node_ids; } // See PipelineMembership in the frontend for details. diff --git a/fe/src/main/java/org/apache/impala/common/ThriftSerializationCtx.java b/fe/src/main/java/org/apache/impala/common/ThriftSerializationCtx.java index af4a508a1..be39266ca 100644 --- a/fe/src/main/java/org/apache/impala/common/ThriftSerializationCtx.java +++ b/fe/src/main/java/org/apache/impala/common/ThriftSerializationCtx.java @@ -19,8 +19,8 @@ package org.apache.impala.common; import org.apache.impala.analysis.SlotId; import org.apache.impala.analysis.TupleId; -import org.apache.impala.catalog.FeTable; import org.apache.impala.planner.TupleCacheInfo; +import org.apache.impala.planner.HdfsScanNode; /** * The Thrift serialization functions need to adjust output based on whether the @@ -80,13 +80,14 @@ public class ThriftSerializationCtx { } /** - * registerTable() should be called for any table that is referenced from a PlanNode - * that participates in tuple caching. In practice, this is only used for HDFS tables - * at the moment. + * registerInputScanNode() is used to keep track of which HdfsScanNodes feed into a + * particular location for tuple caching. Tuple caching only supports HDFS tables at + * the moment, so this is limited to HdfsScanNode. See TupleCacheInfo for more + * information about how this is used. 
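+ * HdfsScanNode.toThrift() calls this once for each HDFS scan node it serializes.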
*/ - public void registerTable(FeTable table) { + public void registerInputScanNode(HdfsScanNode hdfsScanNode) { if (isTupleCache()) { - tupleCacheInfo_.registerTable(table); + tupleCacheInfo_.registerInputScanNode(hdfsScanNode); } } diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java index 593168bc6..f5a52f3ac 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java @@ -280,6 +280,11 @@ public class HdfsScanNode extends ScanNode { // Set in computeNodeResourceProfile(). private boolean useMtScanNode_; + // True if this scan node should do deterministic assignment of scan ranges to + // fragment instances when using mt_dop. If false, this uses the shared queue. + // This has no impact on mt_dop=0. + private boolean deterministicScanRangeAssignment_ = false; + // True if this is a scan that only returns partition keys and is only required to // return at least one of each of the distinct values of the partition keys. private final boolean isPartitionKeyScan_; @@ -1930,8 +1935,8 @@ public class HdfsScanNode extends ScanNode { protected void toThrift(TPlanNode msg, ThriftSerializationCtx serialCtx) { msg.hdfs_scan_node = new THdfsScanNode(serialCtx.translateTupleId( desc_.getId()).asInt(), new HashSet<>()); - // Register the table for this scan node so tuple caching knows about it. - serialCtx.registerTable(desc_.getTable()); + // Register this scan node as an input for tuple caching. + serialCtx.registerInputScanNode(this); if (replicaPreference_ != null) { msg.hdfs_scan_node.setReplica_preference(replicaPreference_); } @@ -1951,6 +1956,14 @@ public class HdfsScanNode extends ScanNode { msg.hdfs_scan_node.setSkip_header_line_count(skipHeaderLineCount_); } msg.hdfs_scan_node.setUse_mt_scan_node(useMtScanNode_); + // The reason we skip setting the deterministic scan range assignment field for + // tuple cache is that it has not been set yet at this point. At the point we are + // computing the tuple cache key, it is always false and has no information. + // Even if it was set, it would not tell us anything about the result set. + if (!serialCtx.isTupleCache()) { + msg.hdfs_scan_node.setDeterministic_scanrange_assignment( + deterministicScanRangeAssignment_); + } Preconditions.checkState((optimizedAggSmap_ == null) == (countStarSlot_ == null)); if (countStarSlot_ != null) { msg.hdfs_scan_node.setCount_star_slot_offset(countStarSlot_.getByteOffset()); @@ -1977,8 +1990,12 @@ public class HdfsScanNode extends ScanNode { } msg.hdfs_scan_node.setIs_partition_key_scan(isPartitionKeyScan_); - for (HdfsFileFormat format : fileFormats_) { - msg.hdfs_scan_node.addToFile_formats(format.toThrift()); + // Since the tuple cache will hash information about the scan ranges at runtime, + // it is not necessary to include the file formats in the hash. + if (!serialCtx.isTupleCache()) { + for (HdfsFileFormat format : fileFormats_) { + msg.hdfs_scan_node.addToFile_formats(format.toThrift()); + } } if (!overlapPredicateDescs_.isEmpty()) { @@ -2044,6 +2061,15 @@ public class HdfsScanNode extends ScanNode { 0, PrintUtils.printBytes(0))); } + // Add information about whether this uses deterministic scan range scheduling. + // To avoid polluting the explain output, only add this if mt_dop>0 and + // deterministic scan range scheduling is enabled.
+ if (useMtScanNode_ && deterministicScanRangeAssignment_) { + output.append(detailPrefix) + .append(String.format("deterministic scan range assignment: %b\n", + deterministicScanRangeAssignment_)); + } + if (!conjuncts_.isEmpty()) { output.append(detailPrefix) .append(String.format("predicates: %s\n", @@ -2669,4 +2695,14 @@ public class HdfsScanNode extends ScanNode { @Override public boolean isTupleCachingImplemented() { return true; } + + public void setDeterministicScanRangeAssignment(boolean enabled) { + // Deterministic scan range assignment only applies when using mt_dop, + // but we set it unconditionally for simplicity. + deterministicScanRangeAssignment_ = enabled; + } + + public boolean usesDeterministicScanRangeAssignment() { + return deterministicScanRangeAssignment_; + } } diff --git a/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java b/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java index 63f867cb2..fa065fa29 100644 --- a/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java +++ b/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java @@ -17,8 +17,10 @@ package org.apache.impala.planner; +import java.util.ArrayList; import java.util.EnumSet; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -107,6 +109,15 @@ public class TupleCacheInfo { private final IdGenerator<SlotId> translatedSlotIdGenerator_ = SlotId.createGenerator(); + // This tracks all the HdfsScanNodes that are inputs. This is used for several + // purposes: + // 1. Input scan nodes need to use deterministic scan range scheduling. + // 2. At runtime, the tuple cache needs to hash the input scan ranges, so this + // provides information about which scan nodes feed in. + // 3. In future, when tuple caching moves past exchanges, the exchange will need + // to hash the scan ranges of input scan nodes to generate the key. + private final List<HdfsScanNode> inputScanNodes_ = new ArrayList<HdfsScanNode>(); + // These fields accumulate partial results until finalizeHash() is called. private Hasher hasher_ = Hashing.murmur3_128().newHasher(); @@ -181,6 +192,9 @@ public class TupleCacheInfo { // and each contribution would be clear. hashTraceBuilder_.append(child.getHashTrace()); + // Merge the child's inputScanNodes_ + inputScanNodes_.addAll(child.inputScanNodes_); + // Incorporate the child's tuple references. This is creating a new translation // of TupleIds, because it will be incorporating multiple children. for (TupleId id : child.tupleTranslationMap_.keySet()) { @@ -241,6 +255,9 @@ public class TupleCacheInfo { tupleTranslationMap_.put(id, translatedTupleIdGenerator_.getNextId()); TupleDescriptor tupleDesc = descriptorTable_.getTupleDesc(id); + // This matches the behavior of DescriptorTable::toThrift() and skips + // non-materialized tuple descriptors. See comment in DescriptorTable::toThrift(). + if (!tupleDesc.isMaterialized()) return; if (incorporateIntoHash) { // Incorporate the tupleDescriptor into the hash boolean needs_table_id = @@ -250,8 +267,9 @@ public class TupleCacheInfo { hashThrift(thriftTupleDesc); } - // Go through the tuple's slots and add them - for (SlotDescriptor slotDesc : tupleDesc.getSlots()) { + // Go through the tuple's slots and add them. This matches the behavior of + // DescriptorTable::toThrift() and only serializes the materialized slots. 
+ for (SlotDescriptor slotDesc : tupleDesc.getMaterializedSlots()) { // Assign a translated slot id and add it to the map slotTranslationMap_.put(slotDesc.getId(), translatedSlotIdGenerator_.getNextId()); @@ -274,7 +292,7 @@ public class TupleCacheInfo { * designed to be called by scan nodes via the ThriftSerializationCtx. In future, * this will store information about the table's scan ranges. */ - public void registerTable(FeTable tbl) { + private void registerTable(FeTable tbl) { Preconditions.checkState(!(tbl instanceof FeView), "registerTable() only applies to base tables"); Preconditions.checkState(tbl != null, "Invalid null argument to registerTable()"); @@ -284,6 +302,28 @@ public class TupleCacheInfo { hashThrift(tblName); } + /** + * registerInputScanNode() is used to keep track of which HdfsScanNodes feed into a + * particular location for tuple caching. Tuple caching only supports HDFS tables at + * the moment, so this is limited to HdfsScanNode. Tuple caching uses this for + * multiple things: + * 1. HdfsScanNodes that feed into a TupleCacheNode need to be marked to use + * deterministic scheduling. + * 2. Each fragment instance needs to construct the fragment instance specific key + * based on the scan ranges it will process. To construct that, it needs to know + * which HdfsScanNodes feed into it. + * 3. There will be future uses when tuple caching extends past exchanges. + * + * Since this has all the information needed, it also calls registerTable() under + * the covers. + */ + public void registerInputScanNode(HdfsScanNode hdfsScanNode) { + registerTable(hdfsScanNode.getTupleDesc().getTable()); + inputScanNodes_.add(hdfsScanNode); + } + + public List<HdfsScanNode> getInputScanNodes() { return inputScanNodes_; } + /** * getLocalTupleId() converts a global TupleId to a local TupleId (i.e. an id that is * not influenced by the structure of the rest of the query).
Most users should access diff --git a/fe/src/main/java/org/apache/impala/planner/TupleCacheNode.java b/fe/src/main/java/org/apache/impala/planner/TupleCacheNode.java index 1ef7f944d..d93145668 100644 --- a/fe/src/main/java/org/apache/impala/planner/TupleCacheNode.java +++ b/fe/src/main/java/org/apache/impala/planner/TupleCacheNode.java @@ -17,6 +17,10 @@ package org.apache.impala.planner; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + import org.apache.impala.analysis.Analyzer; import org.apache.impala.common.ImpalaException; import org.apache.impala.thrift.TTupleCacheNode; @@ -37,8 +41,9 @@ import com.google.common.hash.HashCode; */ public class TupleCacheNode extends PlanNode { - protected String subtreeHash_; + protected String compileTimeKey_; protected String hashTrace_; + protected final List<Integer> inputScanNodeIds_ = new ArrayList<Integer>(); public TupleCacheNode(PlanNodeId id, PlanNode child) { super(id, "TUPLE CACHE"); @@ -48,8 +53,13 @@ public class TupleCacheNode extends PlanNode { TupleCacheInfo childCacheInfo = child.getTupleCacheInfo(); Preconditions.checkState(childCacheInfo.isEligible()); - subtreeHash_ = childCacheInfo.getHashString(); + compileTimeKey_ = childCacheInfo.getHashString(); hashTrace_ = childCacheInfo.getHashTrace(); + for (HdfsScanNode scanNode : childCacheInfo.getInputScanNodes()) { + // Inputs into the tuple cache need to use deterministic scan range assignment + scanNode.setDeterministicScanRangeAssignment(true); + inputScanNodeIds_.add(scanNode.getId().asInt()); + } } @Override @@ -75,7 +85,8 @@ public class TupleCacheNode extends PlanNode { Preconditions.checkState(!hasLimit(), "TupleCacheNode does not enforce limits itself and cannot have a limit set."); TTupleCacheNode tupleCacheNode = new TTupleCacheNode(); - tupleCacheNode.setSubtree_hash(subtreeHash_); + tupleCacheNode.setCompile_time_key(compileTimeKey_); + tupleCacheNode.setInput_scan_node_ids(inputScanNodeIds_); msg.setTuple_cache_node(tupleCacheNode); } @@ -101,22 +112,24 @@ public class TupleCacheNode extends PlanNode { TExplainLevel detailLevel) { StringBuilder output = new StringBuilder(); output.append(String.format("%s%s:%s\n", prefix, id_.toString(), displayName_)); - output.append(detailPrefix + "cache key: " + subtreeHash_ + "\n"); + output.append(detailPrefix + "cache key: " + compileTimeKey_ + "\n"); // For debuggability, always print the hash trace until the cache key calculation // matures. Print trace in chunks to avoid excessive wrapping and padding in // impala-shell. There are other explain lines at VERBOSE level that are // over 100 chars long so we limit the key chunk length similarly here. 
final int keyFormatWidth = 100; - for(int idx = 0; idx < hashTrace_.length(); idx += keyFormatWidth) { + for (int idx = 0; idx < hashTrace_.length(); idx += keyFormatWidth) { int stop_idx = Math.min(hashTrace_.length(), idx + keyFormatWidth); output.append(detailPrefix + "[" + hashTrace_.substring(idx, stop_idx) + "]\n"); } + List<String> input_scan_node_ids_strs = + inputScanNodeIds_.stream().map(Object::toString).collect(Collectors.toList()); + output.append(detailPrefix + "input scan node ids: " + + String.join(",", input_scan_node_ids_strs) + "\n"); return output.toString(); } - public String getSubtreeHash() { return subtreeHash_; } - @Override public void computeProcessingCost(TQueryOptions queryOptions) { processingCost_ = ProcessingCost.basicCost(getDisplayLabel(), getCardinality(), 0); diff --git a/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java b/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java index 40424f662..1bce808b6 100644 --- a/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java +++ b/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java @@ -151,6 +151,25 @@ public class TupleCacheTest extends PlannerTestBase { "functional.alltypes build where probe1.id = build.id"); } + @Test + public void testDeterministicScheduling() { + // Verify that the HdfsScanNode that feeds into a TupleCacheNode uses deterministic + // scan range scheduling. When there are more ways for locations to be cache + // ineligible, this test will be expanded to cover the case where scan nodes don't + // use deterministic scheduling. + List<PlanNode> cacheEligibleNodes = + getCacheEligibleNodes("select id from functional.alltypes where int_col = 500"); + for (PlanNode node : cacheEligibleNodes) { + // The HdfsScanNode for this query will have deterministic scan range assignment set. + // This test uses mt_dop=0, so the value wouldn't matter for execution, but it + // still verifies that it is set properly. + if (node instanceof HdfsScanNode) { + HdfsScanNode hdfsScanNode = (HdfsScanNode) node; + assertTrue(hdfsScanNode.usesDeterministicScanRangeAssignment()); + } + } + } + protected List<PlanNode> getCacheEligibleNodes(String query) { List<PlanFragment> plan = getPlan(query); PlanNode planRoot = plan.get(0).getPlanRoot(); diff --git a/tests/custom_cluster/test_tuple_cache.py b/tests/custom_cluster/test_tuple_cache.py index af65655af..df5a3aea3 100644 --- a/tests/custom_cluster/test_tuple_cache.py +++ b/tests/custom_cluster/test_tuple_cache.py @@ -22,8 +22,10 @@ import random import string from tests.common.custom_cluster_test_suite import CustomClusterTestSuite +from tests.common.test_vector import ImpalaTestDimension TABLE_LAYOUT = 'name STRING, age INT, address STRING' +CACHE_START_ARGS = "--tuple_cache_dir=/tmp --log_level=2" # Generates a random table entry of at least 15 bytes.
@@ -42,15 +44,27 @@ def assertCounters(profile, num_hits, num_halted, num_skipped): assert "NumTupleCacheSkipped: {0} ".format(num_skipped) in profile -class TestTupleCache(CustomClusterTestSuite): +def get_cache_keys(profile): + cache_keys = [] + for line in profile.splitlines(): + if "Combined Key:" in line: + key = line.split(":")[1].strip() + cache_keys.append(key) + return cache_keys + + +def assert_deterministic_scan(profile): + assert "deterministic scan range assignment: true" in profile + + +class TestTupleCacheBase(CustomClusterTestSuite): @classmethod def get_workload(cls): return 'functional-query' - CACHE_START_ARGS = "--tuple_cache_dir=/tmp --log_level=2" - - def cached_query(self, query): - return self.execute_query(query, {"ENABLE_TUPLE_CACHE": "TRUE", "MT_DOP": "1"}) + def cached_query(self, query, mt_dop=1): + return self.execute_query(query, + {"ENABLE_TUPLE_CACHE": "TRUE", "MT_DOP": str(mt_dop)}) def cached_query_w_debugaction(self, query, debugaction): query_opts = { @@ -63,10 +77,20 @@ class TestTupleCache(CustomClusterTestSuite): # Generates a table containing at least <scale> KB of data. def create_table(self, fq_table, scale=1): self.cached_query("CREATE TABLE {0} ({1})".format(fq_table, TABLE_LAYOUT)) + # To make the rows distinct, we keep using a different seed for table_value + global_index = 0 for _ in range(scale): - values = [table_value(i) for i in range(70)] + values = [table_value(i) for i in range(global_index, global_index + 70)] self.cached_query("INSERT INTO {0} VALUES ({1})".format( fq_table, "), (".join(values))) + global_index += 70 + + # Helper function to get a tuple cache metric from a single impalad. + def get_tuple_cache_metric(self, impalaservice, suffix): + return impalaservice.get_metric_value('impala.tuple-cache.' + suffix) + + +class TestTupleCache(TestTupleCacheBase): @CustomClusterTestSuite.with_args(cluster_size=1) @pytest.mark.execute_serially @@ -150,7 +174,8 @@ class TestTupleCache(CustomClusterTestSuite): # Case 1: fail during Open() result = self.cached_query_w_debugaction(query, "TUPLE_FILE_READER_OPEN:[email protected]") assert result.success - assert result.data == result1.data + # Do an unordered compare (the rows are unique) + assert set(result.data) == set(result1.data) # Not a hit assertCounters(result.runtime_profile, num_hits=0, num_halted=0, num_skipped=1) @@ -158,7 +183,8 @@ class TestTupleCache(CustomClusterTestSuite): result = self.cached_query_w_debugaction(query, "TUPLE_FILE_READER_FIRST_GETNEXT:[email protected]") assert result.success - assert result.data == result1.data + # Do an unordered compare (the rows are unique) + assert set(result.data) == set(result1.data) # Technically, this is a hit assertCounters(result.runtime_profile, num_hits=1, num_halted=0, num_skipped=0) @@ -173,3 +199,231 @@ class TestTupleCache(CustomClusterTestSuite): hit_error = True assert hit_error + + +class TestTupleCacheRuntimeKeys(TestTupleCacheBase): + + @classmethod + def add_test_dimensions(cls): + super(TestTupleCacheRuntimeKeys, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('mt_dop', *[0, 1, 2])) + + @CustomClusterTestSuite.with_args( + start_args=CACHE_START_ARGS, cluster_size=1) + @pytest.mark.execute_serially + def test_scan_range_basics(self, vector, unique_database): + """ + This tests that adding/removing files to a table results in different keys. + This runs on a single node with mt_dop=0 or mt_dop=1, so it is the simplest + test. 
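+ Cache keys reported in the profile have the form "<compile_time_key>_<fragment_instance_key>", which is why the assertions below split each key on "_".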
+
+
+class TestTupleCacheRuntimeKeys(TestTupleCacheBase):
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestTupleCacheRuntimeKeys, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('mt_dop', *[0, 1, 2]))
+
+  @CustomClusterTestSuite.with_args(
+      start_args=CACHE_START_ARGS, cluster_size=1)
+  @pytest.mark.execute_serially
+  def test_scan_range_basics(self, vector, unique_database):
+    """
+    This tests that adding or removing files in a table results in different keys.
+    This runs on a single node with mt_dop=0 or mt_dop=1, so it is the simplest
+    test.
+    """
+    mt_dop = vector.get_value('mt_dop')
+    # To keep this simple, we skip mt_dop > 1.
+    if mt_dop > 1:
+      pytest.skip()
+    fq_table = "{0}.scan_range_basics_mtdop{1}".format(unique_database, mt_dop)
+    query = "SELECT * from {0}".format(fq_table)
+
+    # Create an empty table.
+    self.create_table(fq_table, scale=0)
+
+    # When there are no scan ranges, the fragment instance key is 0. This is
+    # somewhat of a toy case and we probably want to avoid caching in this
+    # case. Nonetheless, it is a good sanity check.
+    empty_result = self.cached_query(query, mt_dop=mt_dop)
+    cache_keys = get_cache_keys(empty_result.runtime_profile)
+    assert len(cache_keys) == 1
+    empty_table_compile_key, empty_table_finst_key = cache_keys[0].split("_")
+    assert empty_table_finst_key == "0"
+    assert len(empty_result.data) == 0
+    if mt_dop > 0:
+      assert_deterministic_scan(empty_result.runtime_profile)
+
+    # Insert a row, which creates a file / scan range.
+    self.cached_query("INSERT INTO {0} VALUES ({1})".format(
+        fq_table, table_value(0)))
+
+    # Now there is a scan range, so the fragment instance key should be non-zero.
+    one_file_result = self.cached_query(query, mt_dop=mt_dop)
+    cache_keys = get_cache_keys(one_file_result.runtime_profile)
+    assert len(cache_keys) == 1
+    one_file_compile_key, one_file_finst_key = cache_keys[0].split("_")
+    assert one_file_finst_key != "0"
+    # This should be a cache miss.
+    assertCounters(one_file_result.runtime_profile, 0, 0, 0)
+    assert len(one_file_result.data) == 1
+    if mt_dop > 0:
+      assert_deterministic_scan(one_file_result.runtime_profile)
+
+    # The new scan range did not change the compile-time key.
+    assert empty_table_compile_key == one_file_compile_key
+
+    # Insert another row, which creates a file / scan range.
+    self.cached_query("INSERT INTO {0} VALUES ({1})".format(
+        fq_table, table_value(1)))
+
+    # There is a second scan range, so the fragment instance key should change again.
+    two_files_result = self.cached_query(query, mt_dop=mt_dop)
+    cache_keys = get_cache_keys(two_files_result.runtime_profile)
+    assert len(cache_keys) == 1
+    two_files_compile_key, two_files_finst_key = cache_keys[0].split("_")
+    assert two_files_finst_key != "0"
+    assertCounters(two_files_result.runtime_profile, 0, 0, 0)
+    assert len(two_files_result.data) == 2
+    assert one_file_finst_key != two_files_finst_key
+    overlapping_rows = \
+        set(one_file_result.data).intersection(set(two_files_result.data))
+    assert len(overlapping_rows) == 1
+    if mt_dop > 0:
+      assert_deterministic_scan(two_files_result.runtime_profile)
+
+    # The new scan range did not change the compile-time key.
+    assert one_file_compile_key == two_files_compile_key
+
+    # Invalidate metadata and rerun the last query. The keys should stay the same.
+    self.cached_query("invalidate metadata")
+    rerun_two_files_result = self.cached_query(query, mt_dop=mt_dop)
+    # Verify that this is a cache hit.
+    assertCounters(rerun_two_files_result.runtime_profile, 1, 0, 0)
+    cache_keys = get_cache_keys(rerun_two_files_result.runtime_profile)
+    assert len(cache_keys) == 1
+    rerun_two_files_compile_key, rerun_two_files_finst_key = cache_keys[0].split("_")
+    assert rerun_two_files_finst_key == two_files_finst_key
+    assert rerun_two_files_compile_key == two_files_compile_key
+    assert rerun_two_files_result.data == two_files_result.data
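The assertions in test_scan_range_basics lean on the per-fragment-instance key being a hash over the instance's scan ranges, with 0 reserved for the no-scan-range case. A rough Python model of that behavior; the hash function, the (file, offset, length) encoding, and the key width are all invented for illustration and do not match the backend's actual format:

    import hashlib

    def model_fragment_instance_key(scan_ranges):
      # No scan ranges => key of 0, matching the empty-table case above.
      if not scan_ranges:
        return "0"
      h = hashlib.sha256()
      for file_name, offset, length in sorted(scan_ranges):
        h.update("{0}:{1}:{2};".format(file_name, offset, length).encode())
      return h.hexdigest()[:16]

    empty = model_fragment_instance_key([])
    one_file = model_fragment_instance_key([("f1.parq", 0, 100)])
    two_files = model_fragment_instance_key(
        [("f1.parq", 0, 100), ("f2.parq", 0, 100)])
    # Mirrors the test: adding a file changes the key; only empty input gives "0".
    assert empty == "0" and one_file != "0" and one_file != two_files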
+
+  @CustomClusterTestSuite.with_args(
+      start_args=CACHE_START_ARGS, cluster_size=1)
+  @pytest.mark.execute_serially
+  def test_scan_range_partitioned(self, vector, unique_database):
+    """
+    This tests a basic partitioned case where the query is identical except that
+    it operates on different partitions (and thus different scan ranges).
+    This runs on a single node with mt_dop=0 or mt_dop=1 to keep it simple.
+    """
+    mt_dop = vector.get_value('mt_dop')
+    # To keep this simple, we skip mt_dop > 1.
+    if mt_dop > 1:
+      pytest.skip()
+    year2009_result = self.cached_query(
+        "select * from functional.alltypes where year=2009", mt_dop=mt_dop)
+    cache_keys = get_cache_keys(year2009_result.runtime_profile)
+    assert len(cache_keys) == 1
+    year2009_compile_key, year2009_finst_key = cache_keys[0].split("_")
+
+    year2010_result = self.cached_query(
+        "select * from functional.alltypes where year=2010", mt_dop=mt_dop)
+    cache_keys = get_cache_keys(year2010_result.runtime_profile)
+    assert len(cache_keys) == 1
+    year2010_compile_key, year2010_finst_key = cache_keys[0].split("_")
+    # This should be a cache miss.
+    assertCounters(year2010_result.runtime_profile, 0, 0, 0)
+
+    # The year=X predicate is on a partition column, so it is enforced by pruning
+    # partitions and doesn't carry through to execution. The compile keys for
+    # the two queries are the same, but the fragment instance keys are different due
+    # to the different scan ranges from different partitions.
+    assert year2009_compile_key == year2010_compile_key
+    assert year2009_finst_key != year2010_finst_key
+    # Verify that the results are completely different.
+    year2009_result_set = set(year2009_result.data)
+    year2010_result_set = set(year2010_result.data)
+    overlapping_rows = year2009_result_set.intersection(year2010_result_set)
+    assert len(overlapping_rows) == 0
+    assert year2009_result.data[0].find("2009") != -1
+    assert year2009_result.data[0].find("2010") == -1
+    assert year2010_result.data[0].find("2010") != -1
+    assert year2010_result.data[0].find("2009") == -1
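The crux of test_scan_range_partitioned: partition pruning happens at planning time, so the pruned year predicate leaves no trace in the compile-time key and only the fragment-instance half of the combined key differs. A toy restatement with invented key values:

    # Hypothetical combined keys; real keys are long hash strings.
    key_year2009 = "8c1fa2_5d0977"
    key_year2010 = "8c1fa2_e4b103"
    assert key_year2009.split("_")[0] == key_year2010.split("_")[0]  # same compile key
    assert key_year2009.split("_")[1] != key_year2010.split("_")[1]  # scan ranges differ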
+
+  @CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS)
+  @pytest.mark.execute_serially
+  def test_scan_range_distributed(self, vector, unique_database):
+    """
+    This tests the distributed case where there are multiple fragment instances
+    processing different scan ranges. Each fragment instance should have a
+    distinct cache key. When adding a scan range, at least one fragment instance
+    cache key should change.
+    """
+
+    mt_dop = vector.get_value('mt_dop')
+    fq_table = "{0}.scan_range_basics_mtdop{1}".format(unique_database, mt_dop)
+    query = "SELECT * from {0}".format(fq_table)
+
+    # Create a table with several files so that we always have enough work for
+    # multiple fragment instances.
+    self.create_table(fq_table, scale=20)
+
+    # We run a simple select. This is running with multiple impalads, so there are
+    # always multiple fragment instances.
+    before_result = self.cached_query(query, mt_dop=mt_dop)
+    cache_keys = get_cache_keys(before_result.runtime_profile)
+    expected_num_keys = 3 * max(mt_dop, 1)
+    assert len(cache_keys) == expected_num_keys
+    # Every cache key should be distinct, as the fragment instances are processing
+    # different data.
+    unique_cache_keys = set(cache_keys)
+    assert len(unique_cache_keys) == expected_num_keys
+    # Every cache key has the same compile key.
+    unique_compile_keys = set([key.split("_")[0] for key in unique_cache_keys])
+    assert len(unique_compile_keys) == 1
+    # Verify the cache metrics for each impalad. Since we started from scratch, the
+    # number of entries in the cache should be the same as the number of cache keys.
+    for impalad in self.cluster.impalads:
+      entries_in_use = self.get_tuple_cache_metric(impalad.service, "entries-in-use")
+      assert entries_in_use == max(mt_dop, 1)
+    if mt_dop > 0:
+      assert_deterministic_scan(before_result.runtime_profile)
+
+    # Insert another row, which creates a file / scan range.
+    # This uses a very large seed for table_value() to get a unique row that isn't
+    # already in the table.
+    self.cached_query("INSERT INTO {0} VALUES ({1})".format(
+        fq_table, table_value(1000000)))
+
+    # Rerun the query with the extra scan range.
+    after_insert_result = self.cached_query(query, mt_dop=mt_dop)
+    cache_keys = get_cache_keys(after_insert_result.runtime_profile)
+    expected_num_keys = 3 * max(mt_dop, 1)
+    assert len(cache_keys) == expected_num_keys
+    # Every cache key should be distinct, as the fragment instances are processing
+    # different data.
+    after_insert_unique_cache_keys = set(cache_keys)
+    assert len(after_insert_unique_cache_keys) == expected_num_keys
+    # Every cache key has the same compile key.
+    unique_compile_keys = \
+        set([key.split("_")[0] for key in after_insert_unique_cache_keys])
+    assert len(unique_compile_keys) == 1
+    # Verify the cache metrics. We can compute a tighter bound by looking at the
+    # total across all impalads. The lower bound for this is the number of unique
+    # cache keys across both queries we ran. The upper bound for the number of
+    # entries is double the expected number from the first run of the query.
+    # (A worked example of these bounds follows this test.)
+    #
+    # This is not an exact number, because cache key X could have run on executor 1
+    # for the first query and on executor 2 for the second query. Even though it
+    # would appear as a single unique cache key, it is two different cache entries
+    # in different executors.
+    all_cache_keys = unique_cache_keys.union(after_insert_unique_cache_keys)
+    total_entries_in_use = 0
+    for impalad in self.cluster.impalads:
+      entries_in_use = self.get_tuple_cache_metric(impalad.service, "entries-in-use")
+      assert entries_in_use >= max(mt_dop, 1)
+      assert entries_in_use <= (2 * max(mt_dop, 1))
+      total_entries_in_use += entries_in_use
+    assert total_entries_in_use >= len(all_cache_keys)
+    if mt_dop > 0:
+      assert_deterministic_scan(after_insert_result.runtime_profile)
+
+    # The extra scan range means that at least one fragment instance key changed.
+    # Since scheduling can change completely with the addition of a single scan
+    # range, we can't assert that only one cache key changes.
+    changed_cache_keys = unique_cache_keys.symmetric_difference(
+        after_insert_unique_cache_keys)
+    assert len(changed_cache_keys) != 0
+
+    # Each row is distinct, which makes it easy to verify that the results overlap
+    # except that the second result contains one more row than the first.
+    before_result_set = set(before_result.data)
+    after_insert_result_set = set(after_insert_result.data)
+    assert len(before_result_set) == 70 * 20
+    assert len(before_result_set) + 1 == len(after_insert_result_set)
+    different_rows = before_result_set.symmetric_difference(after_insert_result_set)
+    assert len(different_rows) == 1
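To make the entries-in-use bounds in test_scan_range_distributed concrete: the expected_num_keys = 3 * max(mt_dop, 1) computation implies three impalads. A small worked sketch; the cluster size comes from that computation, while the hit/miss split is an assumption chosen for illustration:

    num_impalads = 3
    mt_dop = 2
    keys_per_query = num_impalads * max(mt_dop, 1)  # 6 combined keys per run

    # Each impalad holds at least one run's worth of entries (2) and at most
    # two runs' worth (4), depending on how many of its second-run keys hit.
    per_impalad_min = max(mt_dop, 1)
    per_impalad_max = 2 * max(mt_dop, 1)
    assert per_impalad_min * num_impalads == 6    # lower bound on the total
    assert per_impalad_max * num_impalads == 12   # upper bound on the total

    # The union of keys across both runs (between 6 and 12 keys) is also a lower
    # bound on total entries, since every distinct key was written somewhere.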
