This is an automated email from the ASF dual-hosted git repository.
joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 6121c4f7d IMPALA-12905: Disk-based tuple caching
6121c4f7d is described below
commit 6121c4f7d61fb9f2341cf14e1be3404325fb35b9
Author: Michael Smith <[email protected]>
AuthorDate: Tue Mar 12 16:58:35 2024 -0700
IMPALA-12905: Disk-based tuple caching
This implements on-disk caching for the tuple cache. The
TupleCacheNode uses the TupleFileWriter and TupleFileReader
to write and read back tuples from local files. The file format
uses RowBatch's standard serialization used for KRPC data streams.
The TupleCacheMgr is the daemon-level structure that coordinates
the state machine for cache entries, including eviction. When a
writer is adding an entry, it inserts an IN_PROGRESS entry before
starting to write data. This does not count towards cache capacity,
because the total size is not known yet. This IN_PROGRESS entry
prevents other writers from concurrently writing the same entry.
If the write is successful, the entry transitions to the COMPLETE
state and updates the total size of the entry. If the write is
unsuccessful and a new execution might succeed, then the entry is
removed. If the write is unsuccessful and won't succeed later
(e.g. if the total size of the entry exceeds the max size of an
entry), then it transitions to the TOMBSTONE state. TOMBSTONE
entries avoid the overhead of trying to write entries that are
too large.
Given these states, when a TupleCacheNode is doing its initial
Lookup() call, one of three things can happen:
1. It can find a COMPLETE entry and read it.
2. It can find an IN_PROGRESS/TOMBSTONE entry, which means it
cannot read or write the entry.
3. It finds no entry and inserts its own IN_PROGRESS entry
to start a write.
The tuple cache is configured using the tuple_cache parameter,
which is a combination of the cache directory and the capacity
similar to the data_cache parameter. For example, /data/0:100GB
uses directory /data/0 for the cache with a total capacity of
100GB. This currently supports a single directory, but it can
be expanded to multiple directories later if needed. The cache
eviction policy can be specified via the tuple_cache_eviction_policy
parameter, which currently supports LRU or LIRS. The tuple_cache
parameter cannot be specified if allow_tuple_caching=false.
This contains contributions from Michael Smith, Yida Wu,
and Joe McDonnell.
Testing:
- This adds basic custom cluster tests for the tuple cache.
Change-Id: I13a65c4c0559cad3559d5f714a074dd06e9cc9bf
Reviewed-on: http://gerrit.cloudera.org:8080/21171
Reviewed-by: Michael Smith <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
Reviewed-by: Kurt Deschler <[email protected]>
---
be/src/exec/CMakeLists.txt | 4 +
be/src/exec/parquet/parquet-page-reader.cc | 1 -
be/src/exec/tuple-cache-node.cc | 161 +++++++++++-
be/src/exec/tuple-cache-node.h | 38 ++-
be/src/exec/tuple-file-read-write-test.cc | 296 ++++++++++++++++++++++
be/src/exec/tuple-file-reader.cc | 184 ++++++++++++++
be/src/exec/tuple-file-reader.h | 75 ++++++
be/src/exec/tuple-file-writer.cc | 202 +++++++++++++++
be/src/exec/tuple-file-writer.h | 122 +++++++++
be/src/runtime/CMakeLists.txt | 3 +
be/src/runtime/exec-env.cc | 5 +
be/src/runtime/exec-env.h | 6 +
be/src/runtime/tuple-cache-mgr-test.cc | 306 ++++++++++++++++++++++
be/src/runtime/tuple-cache-mgr.cc | 394 +++++++++++++++++++++++++++++
be/src/runtime/tuple-cache-mgr.h | 176 +++++++++++++
bin/start-impala-cluster.py | 31 +++
common/thrift/metrics.json | 90 +++++++
tests/custom_cluster/test_tuple_cache.py | 175 +++++++++++++
18 files changed, 2253 insertions(+), 16 deletions(-)
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 63c9ac595..429f93c44 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -107,6 +107,8 @@ add_library(Exec
text-converter.cc
topn-node.cc
tuple-cache-node.cc
+ tuple-file-reader.cc
+ tuple-file-writer.cc
union-node.cc
unnest-node.cc
)
@@ -123,6 +125,7 @@ add_library(ExecTests STATIC
hash-table-test.cc
incr-stats-util-test.cc
read-write-util-test.cc
+ tuple-file-read-write-test.cc
zigzag-test.cc
)
add_dependencies(ExecTests gen-deps)
@@ -132,6 +135,7 @@ ADD_UNIFIED_BE_LSAN_TEST(zigzag-test ZigzagTest.*)
ADD_UNIFIED_BE_LSAN_TEST(hash-table-test HashTableTest.*)
ADD_UNIFIED_BE_LSAN_TEST(delimited-text-parser-test DelimitedTextParser.*)
ADD_UNIFIED_BE_LSAN_TEST(read-write-util-test ReadWriteUtil.*)
+ADD_UNIFIED_BE_LSAN_TEST(tuple-file-read-write-test TupleFileReadWriteTest.*)
# Exception to unified be tests: Custom main with global Frontend object
ADD_BE_LSAN_TEST(row-batch-list-test)
ADD_BE_LSAN_TEST(scratch-tuple-batch-test)
diff --git a/be/src/exec/parquet/parquet-page-reader.cc
b/be/src/exec/parquet/parquet-page-reader.cc
index 3caa3ebda..1d64d6e5e 100644
--- a/be/src/exec/parquet/parquet-page-reader.cc
+++ b/be/src/exec/parquet/parquet-page-reader.cc
@@ -235,7 +235,6 @@ std::ostream& operator<<(std::ostream& out, const
ParquetPageReader::State state
case ParquetPageReader::State::ToReadHeader: out << "ToReadHeader"; break;
case ParquetPageReader::State::ToReadData: out << "ToReadData"; break;
}
-
return out;
}
diff --git a/be/src/exec/tuple-cache-node.cc b/be/src/exec/tuple-cache-node.cc
index a63f64765..fd5dea2be 100644
--- a/be/src/exec/tuple-cache-node.cc
+++ b/be/src/exec/tuple-cache-node.cc
@@ -19,13 +19,16 @@
#include "exec/tuple-cache-node.h"
#include "exec/exec-node-util.h"
+#include "exec/tuple-file-reader.h"
+#include "exec/tuple-file-writer.h"
+#include "runtime/exec-env.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
+#include "runtime/tuple-cache-mgr.h"
#include "util/runtime-profile-counters.h"
+#include "util/runtime-profile.h"
-// Global feature flag for tuple caching. If false, enable_tuple_cache cannot
be true
-// and the coordinator cannot produce plans with TupleCacheNodes.
-DEFINE_bool(allow_tuple_caching, false, "If false, tuple caching cannot be
used.");
+#include "common/names.h"
namespace impala {
@@ -42,8 +45,20 @@ TupleCacheNode::TupleCacheNode(
, subtree_hash_(pnode.tnode_->tuple_cache_node.subtree_hash) {
}
+TupleCacheNode::~TupleCacheNode() = default;
+
+Status TupleCacheNode::Prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(ExecNode::Prepare(state));
+ num_hits_counter_ = ADD_COUNTER(runtime_profile(), "NumTupleCacheHits",
TUnit::UNIT);
+ num_halted_counter_ =
+ ADD_COUNTER(runtime_profile(), "NumTupleCacheHalted", TUnit::UNIT);
+ num_skipped_counter_ =
+ ADD_COUNTER(runtime_profile(), "NumTupleCacheSkipped", TUnit::UNIT);
+ return Status::OK();
+}
+
Status TupleCacheNode::Open(RuntimeState* state) {
- SCOPED_TIMER(runtime_profile_->total_time_counter());
+ SCOPED_TIMER(runtime_profile()->total_time_counter());
ScopedOpenEventAdder ea(this);
RETURN_IF_ERROR(ExecNode::Open(state));
@@ -53,14 +68,59 @@ Status TupleCacheNode::Open(RuntimeState* state) {
return Status("Invalid tuple caching configuration:
enable_tuple_cache=false");
}
- RETURN_IF_ERROR(child(0)->Open(state));
+ TupleCacheMgr* tuple_cache_mgr = ExecEnv::GetInstance()->tuple_cache_mgr();
+ handle_ = tuple_cache_mgr->Lookup(subtree_hash_, true);
+ if (tuple_cache_mgr->IsAvailableForRead(handle_)) {
+ reader_ = make_unique<TupleFileReader>(
+ tuple_cache_mgr->GetPath(handle_), mem_tracker(), runtime_profile());
+ Status status = reader_->Open(state);
+ // Clear reader if it's not usable
+ if (!status.ok()) {
+ LOG(WARNING) << "Could not read cache entry for "
+ << tuple_cache_mgr->GetPath(handle_);
+ reader_.reset();
+ }
+ } else if (tuple_cache_mgr->IsAvailableForWrite(handle_)) {
+ writer_ = make_unique<TupleFileWriter>(tuple_cache_mgr->GetPath(handle_),
+ mem_tracker(), runtime_profile(), tuple_cache_mgr->MaxSize());
+ Status status = writer_->Open(state);
+ if (!status.ok()) {
+ LOG(WARNING) << "Could not write cache entry for "
+ << tuple_cache_mgr->GetPath(handle_);
+ tuple_cache_mgr->AbortWrite(move(handle_), false);
+ writer_.reset();
+ }
+ }
+
+ if (reader_) {
+ COUNTER_ADD(num_hits_counter_, 1);
+ tuple_cache_mgr->IncrementMetric(TupleCacheMgr::MetricType::HIT);
+ } else {
+ if (!writer_) {
+ // May be skipped due to any of:
+ // - the query requests caching but cache is disabled via startup option
+ // - another fragment is currently writing this cache entry
+ // - the cache entry is a tombstone to prevent retries for too large
entries
+ VLOG_FILE << "Tuple Cache: skipped for " << subtree_hash_;
+ COUNTER_ADD(num_skipped_counter_, 1);
+ tuple_cache_mgr->IncrementMetric(TupleCacheMgr::MetricType::SKIPPED);
+ }
+ tuple_cache_mgr->IncrementMetric(TupleCacheMgr::MetricType::MISS);
+ // No reader, so open the child.
+ RETURN_IF_ERROR(child(0)->Open(state));
+ }
+ // Claim reservation after the child has been opened to reduce the peak
reservation
+ // requirement.
+ if (!buffer_pool_client()->is_registered()) {
+ RETURN_IF_ERROR(ClaimBufferReservation(state));
+ }
return Status::OK();
}
Status TupleCacheNode::GetNext(
RuntimeState* state, RowBatch* output_row_batch, bool* eos) {
- SCOPED_TIMER(runtime_profile_->total_time_counter());
+ SCOPED_TIMER(runtime_profile()->total_time_counter());
ScopedGetNextEventAdder ea(this, eos);
RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
RETURN_IF_CANCELLED(state);
@@ -70,7 +130,65 @@ Status TupleCacheNode::GetNext(
// which can happen in a subplan.
int num_rows_before = output_row_batch->num_rows();
- RETURN_IF_ERROR(child(0)->GetNext(state, output_row_batch, eos));
+ // If we have a Reader, return the next batch from it.
+ // Else GetNext from child, write to Writer, and return the batch.
+ if (reader_) {
+ Status status = reader_->GetNext(state, buffer_pool_client(),
output_row_batch, eos);
+ if (status.ok()) {
+ cached_rowbatch_returned_to_caller_ = true;
+ } else {
+ // If we have returned a cached row batch to the caller, then it is not
safe
+ // to try to get any rows from the child as they could be duplicates. Any
+ // error needs to end the query.
+ if (cached_rowbatch_returned_to_caller_) return status;
+
+ // We haven't returned a RowBatch to the caller yet, so we can recover
by aborting
+ // the read from the cache and fetching from the child. We won't try to
write to
+ // the cache.
+ LOG(WARNING) << "Unable to read cache file: " << status.GetDetail()
+ << "Falling back to regular non-cached path.";
+ reader_.reset();
+ // If reader_ is set, then the child was never opened and needs to be
opened now
+ RETURN_IF_ERROR(child(0)->Open(state));
+ RETURN_IF_ERROR(child(0)->GetNext(state, output_row_batch, eos));
+ }
+ } else {
+ RETURN_IF_ERROR(child(0)->GetNext(state, output_row_batch, eos));
+ if (writer_) {
+ Status status = writer_->Write(state, output_row_batch);
+ TupleCacheMgr* tuple_cache_mgr =
ExecEnv::GetInstance()->tuple_cache_mgr();
+ // If there was an error or we exceeded the file size limit, stop
caching but
+ // continue reading from the child node.
+ if (!status.ok()) {
+ if (writer_->ExceededMaxSize()) {
+ VLOG_FILE << "Tuple Cache entry for " << subtree_hash_
+ << " hit the maximum file size: " << status.GetDetail();
+ COUNTER_ADD(num_halted_counter_, 1);
+ tuple_cache_mgr->IncrementMetric(TupleCacheMgr::MetricType::HALTED);
+ writer_->Abort();
+ tuple_cache_mgr->AbortWrite(move(handle_), true);
+ } else {
+ LOG(WARNING) << "Unable to write cache file: " << status.GetDetail();
+ writer_->Abort();
+ tuple_cache_mgr->AbortWrite(move(handle_), false);
+ }
+ writer_.reset();
+ } else if (*eos) {
+ // If we hit end of stream, then we can complete the cache entry
+ // If the child did not reach end of stream, then it clearly isn't the
complete
+ // result set. This is currently the only way a cache entry can be
completed.
+ size_t bytes_written = writer_->BytesWritten();
+ Status status = writer_->Commit(state);
+ if (status.ok()) {
+ tuple_cache_mgr->CompleteWrite(move(handle_), bytes_written);
+ } else {
+ writer_->Abort();
+ tuple_cache_mgr->AbortWrite(move(handle_), false);
+ }
+ writer_.reset();
+ }
+ }
+ }
// Note: TupleCacheNode does not alter its child's output (or the equivalent
// output from the cache), so it does not enforce its own limit on the
output.
@@ -83,7 +201,34 @@ Status TupleCacheNode::GetNext(
return Status::OK();
}
-void TupleCacheNode::DebugString(int indentation_level, std::stringstream*
out) const {
+void TupleCacheNode::ReleaseResult() {
+ reader_.reset();
+ writer_.reset();
+ handle_.reset();
+}
+
+Status TupleCacheNode::Reset(RuntimeState* state, RowBatch* row_batch) {
+ // Reset() is not supported.
+ DCHECK(false) << "Internal error: Tuple cache nodes should not appear in
subplans.";
+ return Status("Internal error: Tuple cache nodes should not appear in
subplans.");
+}
+
+void TupleCacheNode::Close(RuntimeState* state) {
+ if (is_closed()) return;
+ // If we reach this point with an open writer_, then this cache entry is
invalid. We
+ // will delete the file and abort the write. This can happen if the query is
cancelled,
+ // if the query hits an error, or if a parent node has a limit and doesn't
complete
+ // fetching. This is intentionally restrictive.
+ if (writer_) {
+ TupleCacheMgr* tuple_cache_mgr = ExecEnv::GetInstance()->tuple_cache_mgr();
+ writer_->Abort();
+ tuple_cache_mgr->AbortWrite(move(handle_), false);
+ }
+ ReleaseResult();
+ ExecNode::Close(state);
+}
+
+void TupleCacheNode::DebugString(int indentation_level, stringstream* out)
const {
*out << string(indentation_level * 2, ' ');
*out << "TupleCacheNode(" << subtree_hash_;
ExecNode::DebugString(indentation_level, out);
diff --git a/be/src/exec/tuple-cache-node.h b/be/src/exec/tuple-cache-node.h
index 43401bb97..772202125 100644
--- a/be/src/exec/tuple-cache-node.h
+++ b/be/src/exec/tuple-cache-node.h
@@ -20,9 +20,13 @@
#include <string>
#include "exec/exec-node.h"
+#include "runtime/tuple-cache-mgr.h"
namespace impala {
+class TupleFileReader;
+class TupleFileWriter;
+
class TupleCachePlanNode : public PlanNode {
public:
Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
@@ -31,23 +35,43 @@ class TupleCachePlanNode : public PlanNode {
/// Node that caches rows produced by a child node.
///
-/// This is currently a stub implementation that simply returns the RowBatch
produced
-/// by its child node.
-///
-/// FUTURE:
-/// This node looks up the subtree_hash_ in the tuple cache. If an entry
exists, this
-/// reads rows from the cache rather than executing its child node. If the
entry does not
-/// exist, this will read rows from its child, write them to the cache, and
returns them.
+/// If the subtree_hash_ matches an existing cache entry, returns result rows
from the
+/// cache rather than from the child. Otherwise reads results from the child,
writes them
+/// to cache, and returns them.
+
class TupleCacheNode : public ExecNode {
public:
TupleCacheNode(ObjectPool* pool, const TupleCachePlanNode& pnode,
const DescriptorTbl& descs);
+ ~TupleCacheNode();
+ Status Prepare(RuntimeState* state) override;
Status Open(RuntimeState* state) override;
Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
+ Status Reset(RuntimeState* state, RowBatch* row_batch) override;
+ void Close(RuntimeState* state) override;
void DebugString(int indentation_level, std::stringstream* out) const
override;
private:
const std::string subtree_hash_;
+
+ /// Number of results that were found in the tuple cache
+ RuntimeProfile::Counter* num_hits_counter_ = nullptr;
+ /// Number of results that were too large for the cache
+ RuntimeProfile::Counter* num_halted_counter_ = nullptr;
+ /// Number of results that skip the cache due to a tombstone
+ RuntimeProfile::Counter* num_skipped_counter_ = nullptr;
+
+ /// Whether any RowBatch from a cache file has been returned to a caller
+ /// It is possible to recover from an error reading a cache file if no
+ /// cached RowBatch has been returned to a caller.
+ bool cached_rowbatch_returned_to_caller_ = false;
+
+ void ReleaseResult();
+
+ /// Reader/Writer for caching
+ TupleCacheMgr::UniqueHandle handle_;
+ std::unique_ptr<TupleFileReader> reader_;
+ std::unique_ptr<TupleFileWriter> writer_;
};
}
diff --git a/be/src/exec/tuple-file-read-write-test.cc
b/be/src/exec/tuple-file-read-write-test.cc
new file mode 100644
index 000000000..678065df8
--- /dev/null
+++ b/be/src/exec/tuple-file-read-write-test.cc
@@ -0,0 +1,296 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <boost/filesystem.hpp>
+#include <gtest/gtest.h>
+
+#include "exec/tuple-file-reader.h"
+#include "exec/tuple-file-writer.h"
+#include "runtime/bufferpool/buffer-pool.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/row-batch.h"
+#include "runtime/test-env.h"
+#include "runtime/tuple.h"
+#include "runtime/tuple-row.h"
+#include "service/frontend.h"
+#include "testutil/desc-tbl-builder.h"
+#include "util/filesystem-util.h"
+
+#include "common/names.h"
+
+namespace filesystem = boost::filesystem;
+
+namespace impala {
+
+class TupleFileReadWriteTest : public ::testing::Test {
+public:
+ TupleFileReadWriteTest()
+ : mem_pool_(&tracker_), fe_(new Frontend()), test_env_(new TestEnv) {}
+
+ ~TupleFileReadWriteTest() {
+ mem_pool_.FreeAll();
+ }
+
+ void SetUp() override {
+ tmp_dir_ = "/tmp" / boost::filesystem::unique_path();
+ ASSERT_OK(FileSystemUtil::RemoveAndCreateDirectory(tmp_dir_.string()));
+
+ test_env_->SetBufferPoolArgs(1, 1024);
+ ASSERT_OK(test_env_->Init());
+ ASSERT_OK(test_env_->CreateQueryState(0, nullptr, &runtime_state_));
+ MemTracker* client_tracker =
+ pool_.Add(new MemTracker(-1, "client",
runtime_state_->instance_mem_tracker()));
+ ASSERT_OK(test_env_->exec_env()->buffer_pool()->RegisterClient(
+ "TupleFileReadWriteTest", nullptr,
runtime_state_->instance_buffer_reservation(),
+ client_tracker, std::numeric_limits<int64>::max(),
+ runtime_state_->runtime_profile(), &buffer_pool_client_));
+
+ // Create nullable row descriptor
+ DescriptorTblBuilder builder(fe_.get(), &pool_);
+ builder.DeclareTuple() << TYPE_INT;
+ DescriptorTbl* desc_tbl = builder.Build();
+
+ vector<bool> nullable_tuples = {false};
+ vector<TTupleId> tuple_id = {static_cast<TupleId>(0)};
+ nullable_row_desc_ = RowDescriptor{*desc_tbl, tuple_id, nullable_tuples};
+ }
+
+ void TearDown() override {
+ if (buffer_pool_client_.is_registered()) {
+
test_env_->exec_env()->buffer_pool()->DeregisterClient(&buffer_pool_client_);
+ }
+ ASSERT_OK(FileSystemUtil::RemovePaths({tmp_dir_.string()}));
+ }
+
+ string Path(string filename) const {
+ return (tmp_dir_ / filename).string();
+ }
+
+ string TempPath(const TupleFileWriter& writer) const {
+ return writer.TempPath();
+ }
+
+ unique_ptr<RowBatch> CreateBatch(int size = 10) {
+ unique_ptr<RowBatch> batch =
+ make_unique<RowBatch>(nullable_row_desc(), size, tracker());
+ batch->AddRows(size);
+ for (int i = 0; i < size; i++) {
+ TupleRow* row = batch->GetRow(i);
+ Tuple* tuple_mem = Tuple::Create(sizeof(char) + sizeof(int32_t),
mem_pool());
+ *reinterpret_cast<int32_t *>(tuple_mem->GetSlot(1)) = i;
+ tuple_mem->SetNotNull(NullIndicatorOffset(0, 1));
+ row->SetTuple(0, tuple_mem);
+ }
+ batch->CommitRows(size);
+ return batch;
+ }
+
+ MemTracker* tracker() { return &tracker_; }
+
+ MemPool* mem_pool() { return &mem_pool_; }
+
+ RowDescriptor* nullable_row_desc() { return &nullable_row_desc_; }
+
+ RuntimeState* runtime_state() { return runtime_state_; }
+
+ RuntimeProfile* profile() { return runtime_state_->runtime_profile(); }
+
+ BufferPool::ClientHandle* buffer_pool_client() { return
&buffer_pool_client_; }
+
+private:
+ MemTracker tracker_;
+ MemPool mem_pool_;
+ RowDescriptor nullable_row_desc_;
+
+ ObjectPool pool_;
+ scoped_ptr<Frontend> fe_;
+
+ // The temporary runtime environment used for the test.
+ boost::scoped_ptr<TestEnv> test_env_;
+ RuntimeState* runtime_state_;
+ BufferPool::ClientHandle buffer_pool_client_;
+ filesystem::path tmp_dir_;
+};
+
+TEST_F(TupleFileReadWriteTest, TestBadRead) {
+ TupleFileReader reader(Path("no-file"), tracker(), profile());
+ EXPECT_FALSE(reader.Open(runtime_state()).ok());
+}
+
+TEST_F(TupleFileReadWriteTest, TestWriteRead) {
+ string path = Path("a-file");
+ filesystem::remove(path);
+ TupleFileWriter writer(path, tracker(), profile());
+
+ Status status = writer.Open(runtime_state());
+ ASSERT_OK(status);
+
+ unique_ptr<RowBatch> batch = CreateBatch();
+ status = writer.Write(runtime_state(), batch.get());
+ ASSERT_OK(status);
+
+ // Reading before commit will fail.
+ TupleFileReader failed_reader(path, tracker(), profile());
+ EXPECT_FALSE(failed_reader.Open(runtime_state()).ok());
+
+ status = writer.Commit(runtime_state());
+ ASSERT_OK(status);
+ ASSERT_FALSE(filesystem::exists(TempPath(writer)));
+ ASSERT_TRUE(filesystem::exists(path));
+
+ TupleFileReader reader(path, tracker(), profile());
+ EXPECT_OK(reader.Open(runtime_state()));
+
+ RowBatch output_row_batch(nullable_row_desc(), 10, tracker());
+ bool eos = false;
+ status =
+ reader.GetNext(runtime_state(), buffer_pool_client(), &output_row_batch,
&eos);
+ EXPECT_OK(status);
+ EXPECT_TRUE(eos);
+ EXPECT_EQ(batch->num_rows(), output_row_batch.num_rows());
+}
+
+TEST_F(TupleFileReadWriteTest, TestEmptyWrite) {
+ string path = Path("empty-file");
+ filesystem::remove(path);
+ TupleFileWriter writer(path, tracker(), profile());
+
+ Status status = writer.Open(runtime_state());
+ ASSERT_OK(status);
+
+ RowBatch batch(nullable_row_desc(), 1, tracker());
+ status = writer.Write(runtime_state(), &batch);
+ ASSERT_OK(status);
+ status = writer.Commit(runtime_state());
+ ASSERT_OK(status);
+ ASSERT_TRUE(filesystem::exists(path));
+
+ // Read back the empty file and make sure the reader can handle it
+ TupleFileReader reader(path, tracker(), profile());
+ EXPECT_OK(reader.Open(runtime_state()));
+
+ RowBatch output_row_batch(nullable_row_desc(), 1, tracker());
+ bool eos = false;
+ status =
+ reader.GetNext(runtime_state(), buffer_pool_client(), &output_row_batch,
&eos);
+ EXPECT_OK(status);
+ EXPECT_TRUE(eos);
+ EXPECT_EQ(batch.num_rows(), output_row_batch.num_rows());
+}
+
+TEST_F(TupleFileReadWriteTest, TestDeletedFile) {
+ string path = Path("missing-file");
+ filesystem::remove(path);
+ TupleFileWriter writer(path, tracker(), profile());
+
+ Status status = writer.Open(runtime_state());
+ ASSERT_OK(status);
+
+ unique_ptr<RowBatch> batch = CreateBatch();
+ status = writer.Write(runtime_state(), batch.get());
+ ASSERT_OK(status);
+
+ filesystem::remove(TempPath(writer));
+ status = writer.Commit(runtime_state());
+ ASSERT_FALSE(status.ok());
+ ASSERT_FALSE(filesystem::exists(TempPath(writer)));
+ ASSERT_FALSE(filesystem::exists(path));
+
+ // Even if the writer is in a bad state, it can still call Abort().
+ writer.Abort();
+}
+
+TEST_F(TupleFileReadWriteTest, TestDestructorNoOpen) {
+ // Simple test to make sure we can destroy a TupleFileWriter that we never
opened.
+ string path = Path("no-open-file");
+ filesystem::remove(path);
+ TupleFileWriter writer(path, tracker(), profile());
+}
+
+TEST_F(TupleFileReadWriteTest, TestDestructorAbort) {
+ // If neither Abort() nor Commit() are called explicitly, then the
destructor for
+ // TupleFileWriter will run Abort().
+ string path = Path("destructor-file");
+ filesystem::remove(path);
+ unique_ptr<TupleFileWriter> writer =
+ make_unique<TupleFileWriter>(path, tracker(), profile());
+
+ Status status = writer->Open(runtime_state());
+ ASSERT_OK(status);
+
+ string temp_path = TempPath(*writer);
+ ASSERT_TRUE(filesystem::exists(temp_path));
+
+ writer.reset();
+ ASSERT_FALSE(filesystem::exists(temp_path));
+ ASSERT_FALSE(filesystem::exists(path));
+}
+
+TEST_F(TupleFileReadWriteTest, TestExceedMaxFileSize) {
+ string path = Path("exceed-max-size-file");
+ filesystem::remove(path);
+ // Limit the file to 20 bytes
+ size_t max_size = 20;
+
+ TupleFileWriter writer(path, tracker(), profile(), max_size);
+
+ Status status = writer.Open(runtime_state());
+ ASSERT_OK(status);
+
+ unique_ptr<RowBatch> batch = CreateBatch();
+ status = writer.Write(runtime_state(), batch.get());
+ ASSERT_FALSE(status.ok());
+ ASSERT_TRUE(status.GetDetail().find("exceed the maximum file size") != -1);
+}
+
+TEST_F(TupleFileReadWriteTest, TestExactMaxFileSize) {
+ // This tests that reaching exactly equal to the max file size is allowed.
+
+ // Run without a limit first to get the exact size that we will write.
+ string path = Path("no-limit-file");
+ filesystem::remove(path);
+ TupleFileWriter writer(path, tracker(), profile());
+
+ Status status = writer.Open(runtime_state());
+ ASSERT_OK(status);
+
+ unique_ptr<RowBatch> batch = CreateBatch();
+ status = writer.Write(runtime_state(), batch.get());
+ ASSERT_OK(status);
+
+ // Store the number of bytes written
+ size_t max_size = writer.BytesWritten();
+ writer.Abort();
+
+ // Now, run the same thing with the max size set to the number of bytes
written.
+ string path2 = Path("exact-max-size-file");
+ filesystem::remove(path2);
+ TupleFileWriter limited_writer(path2, tracker(), profile(), max_size);
+
+ status = limited_writer.Open(runtime_state());
+ ASSERT_OK(status);
+
+ status = limited_writer.Write(runtime_state(), batch.get());
+ ASSERT_OK(status);
+
+ // We reached exactly the max size, and everything is ok.
+ EXPECT_EQ(limited_writer.BytesWritten(), max_size);
+ status = limited_writer.Commit(runtime_state());
+ ASSERT_OK(status);
+}
+
+} // namespace impala
diff --git a/be/src/exec/tuple-file-reader.cc b/be/src/exec/tuple-file-reader.cc
new file mode 100644
index 000000000..02dd7f31c
--- /dev/null
+++ b/be/src/exec/tuple-file-reader.cc
@@ -0,0 +1,184 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/tuple-file-reader.h"
+
+#include "gutil/strings/substitute.h"
+#include "kudu/util/array_view.h"
+#include "kudu/util/env.h"
+#include "kudu/util/slice.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
+#include "util/debug-util.h"
+#include "util/kudu-status-util.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+TupleFileReader::TupleFileReader(
+ const std::string& path, MemTracker* parent, RuntimeProfile* profile)
+ : path_(path),
+ tracker_(new MemTracker(-1, "TupleFileReader", parent)),
+ read_timer_(profile ? ADD_TIMER(profile, "TupleCacheReadTime") : nullptr),
+ deserialize_timer_(
+ profile ? ADD_TIMER(profile, "TupleCacheDeserializeTime") : nullptr) {}
+
+TupleFileReader::~TupleFileReader() {
+ // MemTracker expects an explicit close.
+ tracker_->Close();
+}
+
+Status TupleFileReader::Open(RuntimeState *state) {
+ kudu::RWFileOptions opts;
+ opts.mode = kudu::Env::OpenMode::MUST_EXIST;
+ KUDU_RETURN_IF_ERROR(
+ kudu::Env::Default()->NewRWFile(opts, path_, &reader_),
+ "Failed to open tuple cache file");
+ // Get the file size
+ KUDU_RETURN_IF_ERROR(reader_->Size(&file_size_),
+ "Failed to get size for the tuple cache file");
+ RETURN_IF_ERROR(DebugAction(state->query_options(),
"TUPLE_FILE_READER_OPEN"));
+ return Status::OK();
+}
+
+Status TupleFileReader::GetNext(RuntimeState *state,
+ BufferPool::ClientHandle* bpclient, RowBatch* output_row_batch, bool* eos)
{
+ if (offset_ == file_size_) {
+ *eos = true;
+ return Status::OK();
+ }
+ // Only the first rowbatch starts at the zero offset of the file.
+ // We use this for injecting errors for testing purposes.
+ bool first_rowbatch = offset_ == 0;
+ *eos = false;
+ SCOPED_TIMER(read_timer_);
+ // Each block starts with the sizes of the variable chunks of data:
+ // 1. The header size
+ // 2. The tuple data size
+ // 3. The tuple offsets size
+ size_t header_len;
+ size_t tuple_data_len;
+ size_t tuple_offsets_len;
+
+ vector<kudu::Slice> chunk_lens_slices = {
+ kudu::Slice(reinterpret_cast<const char*>(&header_len),
sizeof(header_len)),
+ kudu::Slice(reinterpret_cast<const char*>(&tuple_data_len),
+ sizeof(tuple_data_len)),
+ kudu::Slice(reinterpret_cast<const char*>(&tuple_offsets_len),
+ sizeof(tuple_offsets_len))};
+
+ KUDU_RETURN_IF_ERROR(reader_->ReadV(offset_,
+ kudu::ArrayView<kudu::Slice>(chunk_lens_slices)),
+ "Failed to read cache file");
+
+ if (header_len == 0 || tuple_data_len == 0 || tuple_offsets_len == 0) {
+ string err_msg = Substitute("Invalid data lengths at offset $0 in $1: "
+ "header_len=$2, tuple_data_len=$3, tuple_offsets_len=$4", offset_,
path_,
+ header_len, tuple_data_len, tuple_offsets_len);
+ DCHECK(false) << err_msg;
+ return Status(Substitute("Invalid tuple cache file: $0", err_msg));
+ }
+ offset_ += sizeof(header_len) + sizeof(tuple_data_len) +
sizeof(tuple_offsets_len);
+
+ // Now, we know the total size of the variable-length data, and we can read
+ // it in a single chunk.
+ size_t varlen_size = header_len + tuple_data_len + tuple_offsets_len;
+
+ // Sanity check: The varlen_size shouldn't be larger than the rest of the
file.
+ // This protects us from doing very large memory allocations if any of the
lengths
+ // are bogus.
+ if (offset_ + varlen_size > file_size_) {
+ string err_msg = Substitute("Invalid data lengths at offset $0 in $1
exceed "
+ "file size $2: header_len=$3, tuple_data_len=$4, tuple_offsets_len=$5",
+ offset_, path_, file_size_, header_len, tuple_data_len,
tuple_offsets_len);
+ DCHECK(false) << err_msg;
+ return Status(Substitute("Invalid tuple cache file: $0", err_msg));
+ }
+ std::unique_ptr<char []> varlen_data(new char[varlen_size]);
+
+ kudu::Slice header_slice = kudu::Slice(varlen_data.get(), header_len);
+ kudu::Slice tuple_data_slice =
+ kudu::Slice(varlen_data.get() + header_len, tuple_data_len);
+ kudu::Slice tuple_offsets_slice =
+ kudu::Slice(varlen_data.get() + header_len + tuple_data_len,
tuple_offsets_len);
+ std::vector<kudu::Slice> varlen_data_slices =
+ { header_slice, tuple_data_slice, tuple_offsets_slice };
+
+ KUDU_RETURN_IF_ERROR(reader_->ReadV(offset_,
+ kudu::ArrayView<kudu::Slice>(varlen_data_slices)),
+ "Failed to read tuple cache file");
+ offset_ += varlen_size;
+
+ RowBatchHeaderPB header;
+ // The header is at the start of the varlen data
+ if (!header.ParseFromArray(header_slice.data(), header_slice.size())) {
+ return Status(TErrorCode::INTERNAL_ERROR,
+ "Could not deserialize RowBatchHeaderPB from disk cache");
+ }
+
+ std::unique_ptr<RowBatch> row_batch;
+ {
+ SCOPED_TIMER(deserialize_timer_);
+ RETURN_IF_ERROR(RowBatch::FromProtobuf(output_row_batch->row_desc(),
header,
+ tuple_offsets_slice, tuple_data_slice, tracker_.get(), bpclient,
&row_batch));
+ }
+
+ DCHECK_EQ(output_row_batch->num_rows(), 0);
+ if (output_row_batch->capacity() < row_batch->num_rows()) {
+ string err_msg =
+ Substitute("Too many rows ($0) for the output row batch (capacity $1)",
+ row_batch->num_rows(), output_row_batch->capacity());
+ DCHECK(false) << err_msg;
+ return Status(Substitute("Invalid tuple cache file: $0", err_msg));
+ }
+
+ // For testing, we inject an error as late as possible in this function.
Nothing
+ // after this point returns not-OK status. If we can recover from this point,
+ // we can recover from previous points. There are two different cases: either
+ // we want an error on the first row batch produced or we want an error on
+ // a subsequent row batch.
+ if (first_rowbatch) {
+ RETURN_IF_ERROR(
+ DebugAction(state->query_options(),
"TUPLE_FILE_READER_FIRST_GETNEXT"));
+ } else {
+ RETURN_IF_ERROR(
+ DebugAction(state->query_options(),
"TUPLE_FILE_READER_SECOND_GETNEXT"));
+ }
+ // Set eos after any possibility of a not-OK status
+ if (offset_ == file_size_) {
+ *eos = true;
+ }
+ output_row_batch->AddRows(row_batch->num_rows());
+ for (int row = 0; row < row_batch->num_rows(); row++) {
+ TupleRow* src = row_batch->GetRow(row);
+ TupleRow* dest = output_row_batch->GetRow(row);
+ // if the input row is shorter than the output row, make sure not to leave
+ // uninitialized Tuple* around
+ output_row_batch->ClearRow(dest);
+ // this works as expected if rows from input_batch form a prefix of
+ // rows in output_batch
+ row_batch->CopyRow(src, dest);
+ }
+ output_row_batch->CommitRows(row_batch->num_rows());
+ row_batch->TransferResourceOwnership(output_row_batch);
+
+ return Status::OK();
+}
+
+} // namespace impala
diff --git a/be/src/exec/tuple-file-reader.h b/be/src/exec/tuple-file-reader.h
new file mode 100644
index 000000000..9e60c3ec1
--- /dev/null
+++ b/be/src/exec/tuple-file-reader.h
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <fstream>
+#include <memory>
+
+#include "runtime/bufferpool/buffer-pool.h"
+#include "util/runtime-profile.h"
+
+#include "common/status.h"
+
+namespace kudu {
+ class RWFile;
+}
+
+namespace impala {
+
+class RowBatch;
+class RuntimeState;
+
+/// The TupleFileReader reads tuple files produced by the TupleFileWriter.
This is used
+/// by the tuple cache for reading content from the cache on local disk. The
file format
+/// is based on the standard RowBatch serialization used for KRPC data
streams. The
+/// reader relies on the caller using the same RowBatch layout and batch size
as the
+/// writer.
+///
+/// In future, the file format should contain enough metadata to dump the
contents
+/// with no extra information.
+///
+/// See tuple-file-writer.h for more information.
+class TupleFileReader {
+public:
+ TupleFileReader(const std::string& path, MemTracker* parent, RuntimeProfile*
profile);
+
+ ~TupleFileReader();
+
+ // Open the file. This returns an error if there is any issue accessing the
file
+ Status Open(RuntimeState *state);
+
+ // Read a row batch from the file. Sets eos=true if it reaches the end of
the file.
+ Status GetNext(RuntimeState *state, BufferPool::ClientHandle* bpclient,
+ RowBatch* output_row_batch, bool* eos);
+
+ private:
+ // Path to read.
+ std::string path_;
+ // MemTracker for deserializing protobuf.
+ std::shared_ptr<MemTracker> tracker_;
+ // Total read time by the reader.
+ RuntimeProfile::Counter* read_timer_;
+ // Total time spent on deserialization.
+ RuntimeProfile::Counter* deserialize_timer_;
+
+ std::unique_ptr<kudu::RWFile> reader_;
+ size_t offset_ = 0;
+ size_t file_size_ = 0;
+};
+
+}
diff --git a/be/src/exec/tuple-file-writer.cc b/be/src/exec/tuple-file-writer.cc
new file mode 100644
index 000000000..e68d0ae30
--- /dev/null
+++ b/be/src/exec/tuple-file-writer.cc
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/tuple-file-writer.h"
+
+#include <boost/filesystem.hpp>
+
+#include "common/logging.h"
+#include "gutil/strings/substitute.h"
+#include "kudu/util/array_view.h"
+#include "kudu/util/env.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/outbound-row-batch.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
+#include "util/debug-util.h"
+#include "util/kudu-status-util.h"
+
+#include "common/names.h"
+
+namespace filesystem = boost::filesystem;
+
+namespace impala {
+
+static const char* UNIQUE_PATH_SUFFIX = ".%%%%";
+
+TupleFileWriter::TupleFileWriter(
+ std::string path, MemTracker* parent, RuntimeProfile* profile, size_t
max_file_size)
+ : path_(move(path)),
+ temp_suffix_(filesystem::unique_path(UNIQUE_PATH_SUFFIX).string()),
+ tracker_(new MemTracker(-1, "TupleFileWriter", parent)),
+ write_timer_(profile ? ADD_TIMER(profile, "TupleCacheWriteTime") :
nullptr),
+ serialize_timer_(profile ? ADD_TIMER(profile, "TupleCacheSerializeTime") :
nullptr),
+ max_file_size_(max_file_size) {}
+
+TupleFileWriter::~TupleFileWriter() {
+ if (state_ != State::Uninitialized) {
+ if (tmp_file_ != nullptr) {
+ DCHECK_EQ(state_, State::InProgress);
+ Abort();
+ }
+
+ // The final state is either committed or aborted. In either case, the
temporary file
+ // should be gone.
+ DCHECK(state_ == State::Committed || state_ == State::Aborted);
+ DCHECK(!kudu::Env::Default()->FileExists(TempPath()));
+ }
+ // MemTracker expects an explicit close.
+ if (tracker_) tracker_->Close();
+}
+
+Status TupleFileWriter::Open(RuntimeState* state) {
+ DCHECK_EQ(state_, State::Uninitialized);
+ VLOG_FILE << "Tuple Cache: Opening " << TempPath() << " for writing";
+ KUDU_RETURN_IF_ERROR(kudu::Env::Default()->NewWritableFile(TempPath(),
&tmp_file_),
+ "Failed to create tuple cache file");
+ RETURN_IF_ERROR(DebugAction(state->query_options(),
"TUPLE_FILE_WRITER_OPEN"));
+ state_ = State::InProgress;
+ return Status::OK();
+}
+
+Status TupleFileWriter::Write(RuntimeState* state, RowBatch* row_batch) {
+ DCHECK_EQ(state_, State::InProgress);
+ SCOPED_TIMER(write_timer_);
+ // serialize and write row batch
+ OutboundRowBatch out(make_shared<CharMemTrackerAllocator>(tracker_));
+ {
+ SCOPED_TIMER(serialize_timer_);
+ // Passing in nullptr for 'compression_scrtach' disables compression.
+ RETURN_IF_ERROR(row_batch->Serialize(&out, /* compression_scratch */
nullptr));
+ }
+
+ if (out.header()->num_rows() == 0) {
+ DCHECK_EQ(out.header()->uncompressed_size(), 0);
+ return Status::OK();
+ }
+
+ // Collect all of the pieces that we would want to write, then determine if
writing
+ // them would exceed the max file size.
+ std::string header_buf;
+ if (!out.header()->SerializeToString(&header_buf)) {
+ return Status(TErrorCode::INTERNAL_ERROR,
+ "Could not serialize RowBatchHeaderPB to string");
+ }
+ size_t header_len = header_buf.size();
+ DCHECK_GT(header_len, 0);
+ kudu::Slice tuple_data = out.TupleDataAsSlice();
+ kudu::Slice tuple_offsets = out.TupleOffsetsAsSlice();
+ size_t tuple_data_len = tuple_data.size();
+ size_t tuple_offsets_len = tuple_offsets.size();
+ DCHECK_GT(tuple_data_len, 0);
+ DCHECK_GT(tuple_offsets_len, 0);
+
+ // We write things in this order (sizes first, then the variable-sized data):
+ // 1. The size of the header
+ // 2. The size of the tuple data
+ // 3. The size of the tuple offsets
+ // 4. The serialized header
+ // 5. The tuple data
+ // 6. The tuple offsets
+ std::vector<kudu::Slice> slices = {
+ kudu::Slice(reinterpret_cast<const char*>(&header_len),
sizeof(header_len)),
+ kudu::Slice(reinterpret_cast<const char*>(&tuple_data_len),
+ sizeof(tuple_data_len)),
+ kudu::Slice(reinterpret_cast<const char*>(&tuple_offsets_len),
+ sizeof(tuple_offsets_len)),
+ kudu::Slice(header_buf),
+ kudu::Slice(tuple_data),
+ kudu::Slice(tuple_offsets)};
+
+ // Enforce the max file size
+ size_t num_bytes_to_write = 0;
+ for (auto slice : slices) {
+ num_bytes_to_write += slice.size();
+ }
+ if (BytesWritten() + num_bytes_to_write > max_file_size_) {
+ exceeded_max_size_ = true;
+ return Status(
+ Substitute("Write of size $0 would cause $1 to exceed the maximum file
size $2",
+ num_bytes_to_write, TempPath(), max_file_size_));
+ }
+
+ RETURN_IF_ERROR(DebugAction(state->query_options(),
"TUPLE_FILE_WRITER_WRITE"));
+
+ KUDU_RETURN_IF_ERROR(
+ tmp_file_->AppendV(kudu::ArrayView<const kudu::Slice>(slices)),
+ "Failed to write to cache file");
+
+ return Status::OK();
+}
+
+size_t TupleFileWriter::BytesWritten() const {
+ DCHECK(tmp_file_ != nullptr);
+ return tmp_file_->Size();
+}
+
+std::string TupleFileWriter::TempPath() const {
+ return path_ + temp_suffix_;
+}
+
+void TupleFileWriter::Abort() {
+ DCHECK_EQ(state_, State::InProgress);
+ DCHECK(tmp_file_ != nullptr);
+
+ kudu::Status status = tmp_file_->Close();
+ if (!status.ok()) {
+ LOG(WARNING) <<
+ Substitute("Failed to close file $0: $1", TempPath(),
status.ToString());
+ }
+
+ // Delete the file.
+ status = kudu::Env::Default()->DeleteFile(TempPath());
+ if (!status.ok()) {
+ LOG(WARNING) <<
+ Substitute("Failed to unlink $0: $1", TempPath(), status.ToString());
+ }
+ tmp_file_.reset();
+ state_ = State::Aborted;
+}
+
+Status TupleFileWriter::Commit(RuntimeState* state) {
+ DCHECK_EQ(state_, State::InProgress);
+ DCHECK(tmp_file_ != nullptr);
+
+ RETURN_IF_ERROR(DebugAction(state->query_options(),
"TUPLE_FILE_WRITER_COMMIT"));
+
+ KUDU_RETURN_IF_ERROR(tmp_file_->Sync(), "Failed to sync cache file");
+ KUDU_RETURN_IF_ERROR(tmp_file_->Close(), "Failed to close cache file");
+
+ std::string src = TempPath();
+ KUDU_RETURN_IF_ERROR(kudu::Env::Default()->RenameFile(src, path_),
+ "Failed to rename cache file");
+ tmp_file_.reset();
+ state_ = State::Committed;
+ return Status::OK();
+}
+
+std::ostream& operator<<(std::ostream& out, const TupleFileWriter::State&
state) {
+ switch (state) {
+ case TupleFileWriter::State::Uninitialized: out << "Uninitialized"; break;
+ case TupleFileWriter::State::InProgress: out << "InProgress"; break;
+ case TupleFileWriter::State::Committed: out << "Committed"; break;
+ case TupleFileWriter::State::Aborted: out << "Aborted"; break;
+ }
+ return out;
+}
+
+} // namespace impala
diff --git a/be/src/exec/tuple-file-writer.h b/be/src/exec/tuple-file-writer.h
new file mode 100644
index 000000000..b855fc384
--- /dev/null
+++ b/be/src/exec/tuple-file-writer.h
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <fstream>
+#include <limits>
+#include <memory>
+
+#include "util/runtime-profile.h"
+
+#include "common/status.h"
+
+namespace kudu {
+ class WritableFile;
+}
+
+namespace impala {
+
+class MemTracker;
+class RowBatch;
+class RuntimeState;
+class TupleReadWriteTest;
+
+/// The TupleFileWriter is used to serialize a stream of RowBatches to a local
file
+/// for the tuple cache. It uses the standard RowBatch serialization used for
KRPC
+/// data streams (i.e. RowBatch::Serialize()). The files can be read back
using the
+/// TupleFileReader.
+///
+/// The TupleFileWriter writes in a temporary location and then moves the file
+/// into its final location when Commit() is called. Commit() is the only way
that a
+/// file will persist over time. If the TupleFileWriter is destructed without
calling
+/// Commit(), it runs Abort() and any associated file is deleted. The user can
+/// proactively call Abort() to delete any associated files, but it is not
required.
+///
+/// The TupleFileWriter enforces a maximum file size and will fail Write()
calls that
+/// would exceed this limit. It provides a way for the caller to get how many
bytes
+/// have been written for accounting purposes.
+///
+/// Currently, the TupleFileWriter does not embed the actual tuple layout into
the
+/// file. It relies on the corresponding TupleFileReader reading with the same
+/// tuple layout. This will be modified later to embed a representation of the
tuple
+/// layout into the file.
+class TupleFileWriter {
+public:
+ TupleFileWriter(std::string path, MemTracker* parent, RuntimeProfile*
profile,
+ size_t max_file_size = std::numeric_limits<size_t>::max());
+ ~TupleFileWriter();
+
+ Status Open(RuntimeState* state);
+
+ // Writes a row batch to file. This holds no references to memory from the
RowBatch.
+ // If Write() returns a non-OK Status, it is not recoverable and the caller
should not
+ // call Write() or Commit().
+ Status Write(RuntimeState* state, RowBatch* row_batch);
+
+ bool ExceededMaxSize() const { return exceeded_max_size_; }
+
+ // Number of bytes written to file. Must be called before Commit/Abort.
+ size_t BytesWritten() const;
+
+ // Stop writing and delete any written data.
+ void Abort();
+
+ // Ensure data is available for future reads.
+ Status Commit(RuntimeState* state);
+
+protected:
+ friend class TupleFileReadWriteTest;
+
+ std::string TempPath() const;
+
+private:
+ // Destination path
+ std::string path_;
+ // Suffix for temporary filename during writing.
+ std::string temp_suffix_;
+ // MemTracker for OutboundRowBatches.
+ std::shared_ptr<MemTracker> tracker_;
+ // Total write time by the writer.
+ RuntimeProfile::Counter* write_timer_;
+ // Total time spent on serialization.
+ RuntimeProfile::Counter* serialize_timer_;
+ // Maximum size for the resulting file
+ size_t max_file_size_;
+ // True if the file reached the maximum size
+ bool exceeded_max_size_ = false;
+
+ // This writes to a temporary file, only moving it into the final location
with
+ // Commit(). tmp_file_ is the file abstraction used for writing the
temporary file.
+ std::unique_ptr<kudu::WritableFile> tmp_file_;
+
+ // The writer starts as UNINIT. Once it transitions to IN_PROGRESS in
Open(), it will
+ // be ABORTED unless the caller runs Commit().
+ enum class State {
+ Uninitialized,
+ InProgress,
+ Committed,
+ Aborted
+ };
+
+ friend
+ std::ostream& operator<<(std::ostream& out, const TupleFileWriter::State&
state);
+
+ State state_ = State::Uninitialized;
+};
+
+}
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 8d9f08eae..fb9f8c7c6 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -90,6 +90,7 @@ add_library(Runtime
timestamp-parse-util.cc
timestamp-value.cc
tuple.cc
+ tuple-cache-mgr.cc
tuple-row.cc
tmp-file-mgr.cc
)
@@ -118,6 +119,7 @@ add_library(RuntimeTests STATIC
thread-resource-mgr-test.cc
timestamp-test.cc
tmp-file-mgr-test.cc
+ tuple-cache-mgr-test.cc
)
add_dependencies(RuntimeTests gen-deps)
@@ -143,6 +145,7 @@ ADD_UNIFIED_BE_LSAN_TEST(decimal-test DecimalTest.*)
ADD_BE_LSAN_TEST(buffered-tuple-stream-test)
ADD_UNIFIED_BE_LSAN_TEST(hdfs-fs-cache-test "HdfsFsCacheTest.*")
ADD_UNIFIED_BE_LSAN_TEST(tmp-file-mgr-test "TmpFileMgrTest.*")
+ADD_UNIFIED_BE_LSAN_TEST(tuple-cache-mgr-test "TupleCacheMgrTest.*")
ADD_UNIFIED_BE_LSAN_TEST(row-batch-serialize-test "RowBatchSerializeTest.*")
# Exception to unified be tests: Custom main function with global Frontend
object
ADD_UNIFIED_BE_LSAN_TEST(runtime-filter-test "RuntimeFilterTest.*")
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index fb1ed7381..506547413 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -43,6 +43,7 @@
#include "runtime/query-exec-mgr.h"
#include "runtime/thread-resource-mgr.h"
#include "runtime/tmp-file-mgr.h"
+#include "runtime/tuple-cache-mgr.h"
#include "scheduling/admission-controller.h"
#include "scheduling/cluster-membership-mgr.h"
#include "scheduling/request-pool-service.h"
@@ -243,6 +244,7 @@ ExecEnv::ExecEnv(int krpc_port, int subscriber_port, int
webserver_port,
frontend_(external_fe ? nullptr : new Frontend()),
async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8,
10000)),
query_exec_mgr_(new QueryExecMgr()),
+ tuple_cache_mgr_(new TupleCacheMgr(metrics_.get())),
rpc_metrics_(metrics_->GetOrCreateChildGroup("rpc")),
enable_webserver_(FLAGS_enable_webserver && webserver_port > 0),
external_fe_(external_fe),
@@ -390,6 +392,9 @@ Status ExecEnv::Init() {
LOG(INFO) << "CodeGen Cache is disabled.";
}
+ // Initialize the tuple cache
+ RETURN_IF_ERROR(tuple_cache_mgr_->Init());
+
LOG(INFO) << "Admit memory limit: "
<< PrettyPrinter::Print(admit_mem_limit_, TUnit::BYTES);
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 3a87df656..bee6e4d94 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -70,6 +70,7 @@ class StatestoreSubscriber;
class SystemStateInfo;
class ThreadResourceMgr;
class TmpFileMgr;
+class TupleCacheMgr;
class Webserver;
class CodeGenCache;
class TCatalogRegistration;
@@ -163,6 +164,8 @@ class ExecEnv {
CodeGenCache* codegen_cache() const { return codegen_cache_.get(); }
bool codegen_cache_enabled() const { return codegen_cache_ != nullptr; }
+ TupleCacheMgr* tuple_cache_mgr() const { return tuple_cache_mgr_.get(); }
+
const TNetworkAddress& configured_backend_address() const {
return configured_backend_address_;
}
@@ -270,6 +273,9 @@ class ExecEnv {
/// Singleton cache for codegen functions.
boost::scoped_ptr<CodeGenCache> codegen_cache_;
+ /// Singleton cache for tuple caching
+ boost::scoped_ptr<TupleCacheMgr> tuple_cache_mgr_;
+
/// Not owned by this class
ImpalaServer* impala_server_ = nullptr;
MetricGroup* rpc_metrics_ = nullptr;
diff --git a/be/src/runtime/tuple-cache-mgr-test.cc
b/be/src/runtime/tuple-cache-mgr-test.cc
new file mode 100644
index 000000000..e15644e05
--- /dev/null
+++ b/be/src/runtime/tuple-cache-mgr-test.cc
@@ -0,0 +1,306 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <future>
+#include <boost/filesystem.hpp>
+
+#include "gutil/strings/substitute.h"
+#include "runtime/tuple-cache-mgr.h"
+#include "testutil/gtest-util.h"
+#include "util/filesystem-util.h"
+
+#include "common/names.h"
+
+namespace filesystem = boost::filesystem;
+using std::async;
+using std::future;
+using std::launch;
+
+DECLARE_bool(cache_force_single_shard);
+
+namespace impala {
+
+class TupleCacheMgrTest : public ::testing::Test {
+public:
+
+ void SetUp() override {
+ cache_dir_ = ("/tmp" / boost::filesystem::unique_path()).string();
+ ASSERT_OK(FileSystemUtil::RemoveAndCreateDirectory(cache_dir_));
+ }
+
+ void TearDown() override {
+ ASSERT_OK(FileSystemUtil::RemovePaths({cache_dir_}));
+ }
+
+ TupleCacheMgr GetCache(string cache_dir, string capacity = "1MB",
+ string eviction_policy = "LRU", uint8_t debug_pos = 0) {
+ string cache_config;
+ if (!cache_dir.empty()) {
+ cache_config = Substitute("$0:$1", cache_dir, capacity);
+ }
+ return TupleCacheMgr{cache_config, eviction_policy, &metrics_, debug_pos};
+ }
+
+ TupleCacheMgr GetCache() {
+ return GetCache(GetCacheDir());
+ }
+
+ TupleCacheMgr GetFailAllocateCache() {
+ return GetCache(GetCacheDir(), "1MB", "LRU", TupleCacheMgr::FAIL_ALLOCATE);
+ }
+
+ TupleCacheMgr GetFailInsertCache() {
+ return GetCache(GetCacheDir(), "1MB", "LRU", TupleCacheMgr::FAIL_INSERT);
+ }
+
+ std::string GetCacheDir() const { return cache_dir_; }
+
+ private:
+ std::string cache_dir_;
+ MetricGroup metrics_{"tuple-cache-test"};
+};
+
+TEST_F(TupleCacheMgrTest, Disabled) {
+ TupleCacheMgr cache = GetCache("");
+ ASSERT_OK(cache.Init());
+ TupleCacheMgr::UniqueHandle handle = cache.Lookup("a_key");
+ EXPECT_FALSE(cache.IsAvailableForRead(handle));
+ EXPECT_FALSE(cache.IsAvailableForWrite(handle));
+}
+
+TEST_F(TupleCacheMgrTest, TestMiss) {
+ TupleCacheMgr cache = GetCache();
+ ASSERT_OK(cache.Init());
+ TupleCacheMgr::UniqueHandle handle = cache.Lookup("a_key");
+ EXPECT_FALSE(cache.IsAvailableForRead(handle));
+ EXPECT_FALSE(cache.IsAvailableForWrite(handle));
+}
+
+TEST_F(TupleCacheMgrTest, TestMissAcquire) {
+ TupleCacheMgr cache = GetCache();
+ ASSERT_OK(cache.Init());
+ TupleCacheMgr::UniqueHandle handle = cache.Lookup("a_key", true);
+ EXPECT_FALSE(cache.IsAvailableForRead(handle));
+ EXPECT_TRUE(cache.IsAvailableForWrite(handle));
+}
+
+TEST_F(TupleCacheMgrTest, TestFailAllocate) {
+ TupleCacheMgr cache = GetFailAllocateCache();
+ ASSERT_OK(cache.Init());
+ TupleCacheMgr::UniqueHandle handle = cache.Lookup("a_key", true);
+ EXPECT_FALSE(cache.IsAvailableForRead(handle));
+ EXPECT_FALSE(cache.IsAvailableForWrite(handle));
+}
+
+TEST_F(TupleCacheMgrTest, TestFailInsert) {
+ TupleCacheMgr cache = GetFailInsertCache();
+ ASSERT_OK(cache.Init());
+ TupleCacheMgr::UniqueHandle handle = cache.Lookup("a_key", true);
+ EXPECT_FALSE(cache.IsAvailableForRead(handle));
+ EXPECT_FALSE(cache.IsAvailableForWrite(handle));
+}
+
+TEST_F(TupleCacheMgrTest, TestHit) {
+ TupleCacheMgr cache = GetCache();
+ ASSERT_OK(cache.Init());
+ TupleCacheMgr::UniqueHandle handle = cache.Lookup("a_key", true);
+ EXPECT_TRUE(cache.IsAvailableForWrite(handle));
+ cache.CompleteWrite(move(handle), 100);
+
+ handle = cache.Lookup("a_key", true);
+ EXPECT_TRUE(cache.IsAvailableForRead(handle));
+ EXPECT_FALSE(cache.IsAvailableForWrite(handle));
+ std::string expected_loc =
+ (filesystem::path(GetCacheDir()) / "tuple-cache-a_key").string();
+ std::string actual_loc = cache.GetPath(handle);
+ EXPECT_EQ(expected_loc, actual_loc);
+}
+
+TEST_F(TupleCacheMgrTest, TestTombstone) {
+ // Create and immediately tombstone an entry.
+ TupleCacheMgr cache = GetCache();
+ ASSERT_OK(cache.Init());
+ TupleCacheMgr::UniqueHandle handle = cache.Lookup("tombstone_key", true);
+ EXPECT_FALSE(cache.IsAvailableForRead(handle));
+ EXPECT_TRUE(cache.IsAvailableForWrite(handle));
+ cache.AbortWrite(move(handle), true);
+
+ // Subsequent lookups should find a tombstone.
+ handle = cache.Lookup("tombstone_key", true);
+ EXPECT_FALSE(cache.IsAvailableForRead(handle));
+ EXPECT_FALSE(cache.IsAvailableForWrite(handle));
+}
+
+TEST_F(TupleCacheMgrTest, TestConcurrentWrite) {
+ TupleCacheMgr cache = GetCache();
+ ASSERT_OK(cache.Init());
+ // Attempt to IsAvailableForWrite many times concurrently. Successes will be
returned.
+ vector<future<TupleCacheMgr::UniqueHandle>> results;
+ results.reserve(100);
+ for (int i = 0; i < 100; ++i) {
+ results.emplace_back(async(launch::async, [&cache]() {
+ TupleCacheMgr::UniqueHandle handle = cache.Lookup("concurrent_key",
true);
+ EXPECT_FALSE(cache.IsAvailableForRead(handle));
+ if (cache.IsAvailableForWrite(handle)) {
+ return handle;
+ }
+ return TupleCacheMgr::UniqueHandle{nullptr};
+ }));
+ }
+
+ // Wait for all threads to complete so we don't abort and allow another
success.
+ for (auto& result : results) {
+ result.wait();
+ }
+
+ int successes = 0;
+ for (auto& result : results) {
+ if (TupleCacheMgr::UniqueHandle handle = result.get(); handle) {
+ ++successes;
+ cache.AbortWrite(move(handle), false);
+ }
+ }
+ // Only one Acquire should succeed.
+ EXPECT_EQ(1, successes);
+}
+
+TEST_F(TupleCacheMgrTest, TestConcurrentTombstone) {
+ TupleCacheMgr cache = GetCache();
+ ASSERT_OK(cache.Init());
+ // Attempt to IsAvailableForWrite many times concurrently. Returns successes.
+ vector<future<bool>> results;
+ results.reserve(100);
+ for (int i = 0; i < 100; ++i) {
+ results.emplace_back(async(launch::async, [&cache]() {
+ TupleCacheMgr::UniqueHandle handle = cache.Lookup("concurrent_key",
true);
+ EXPECT_FALSE(cache.IsAvailableForRead(handle));
+ if (cache.IsAvailableForWrite(handle)) {
+ // Immediately tombstone for other threads.
+ cache.AbortWrite(move(handle), true);
+ return true;
+ }
+ return false;
+ }));
+ }
+
+ int successes = 0;
+ for (auto& result : results) {
+ result.wait();
+ if (result.get()) ++successes;
+ }
+ // Only one Acquire should succeed.
+ EXPECT_EQ(1, successes);
+}
+
+TEST_F(TupleCacheMgrTest, TestConcurrentAbort) {
+ TupleCacheMgr cache = GetCache();
+ ASSERT_OK(cache.Init());
+ // Attempt to IsAvailableForWrite many times concurrently. Returns successes.
+ vector<future<bool>> results;
+ results.reserve(100);
+ for (int i = 0; i < 100; ++i) {
+ results.emplace_back(async(launch::async, [&cache]() {
+ TupleCacheMgr::UniqueHandle handle = cache.Lookup("concurrent_key",
true);
+ EXPECT_FALSE(cache.IsAvailableForRead(handle));
+ if (cache.IsAvailableForWrite(handle)) {
+ // Immediately abort for other threads.
+ cache.AbortWrite(move(handle), false);
+ return true;
+ }
+ return false;
+ }));
+ }
+
+ int successes = 0;
+ for (auto& result : results) {
+ result.wait();
+ if (result.get()) ++successes;
+ }
+ // Multiple Acquires should succeed. This is somewhat probabilistic, but
odds of
+ // every other thread completing between the first
IsAvailableForWrite/AbortWrite
+ // seem very low.
+ EXPECT_GT(successes, 1);
+}
+
+TEST_F(TupleCacheMgrTest, TestConcurrentComplete) {
+ TupleCacheMgr cache = GetCache();
+ ASSERT_OK(cache.Init());
+ // Attempt to write many times concurrently. Returns successful reads of
that write.
+ vector<future<bool>> results;
+ results.reserve(100);
+ for (int i = 0; i < 100; ++i) {
+ results.emplace_back(async(launch::async, [&cache]() {
+ TupleCacheMgr::UniqueHandle handle = cache.Lookup("concurrent_key",
true);
+ if (cache.IsAvailableForRead(handle)) {
+ return true;
+ }
+ if (cache.IsAvailableForWrite(handle)) {
+ // Immediately complete for other threads.
+ cache.CompleteWrite(move(handle), 10);
+ }
+ return false;
+ }));
+ }
+
+ int successes = 0;
+ for (auto& result : results) {
+ result.wait();
+ if (result.get()) ++successes;
+ }
+ // At least one Read should succeed. This is somewhat probabilistic, but
odds of
+ // every other thread completing between the first
IsAvailableForWrite/CompleteWrite
+ // seem very low.
+ EXPECT_GT(successes, 0);
+}
+
+TEST_F(TupleCacheMgrTest, TestConcurrentEviction) {
+ FLAGS_cache_force_single_shard = true;
+ TupleCacheMgr cache = GetCache(GetCacheDir(), "1KB");
+ ASSERT_OK(cache.Init());
+ // Add many entries concurrently.
+ vector<future<void>> results;
+ results.reserve(100);
+ for (int i = 0; i < 100; ++i) {
+ results.emplace_back(async(launch::async, [&cache, i]() {
+ TupleCacheMgr::UniqueHandle handle =
+ cache.Lookup(Substitute("concurrent_key$0", i), true);
+ EXPECT_FALSE(cache.IsAvailableForRead(handle));
+ EXPECT_TRUE(cache.IsAvailableForWrite(handle));
+ // Keep the 1st key in the cache.
+ cache.Lookup("concurrent_key0");
+ cache.CompleteWrite(move(handle), 20+i/2);
+ }));
+ }
+
+ for (auto& result : results) {
+ result.wait();
+ }
+ // The 1st key should still be in cache, 2nd should have been evicted.
+ TupleCacheMgr::UniqueHandle handle0 = cache.Lookup("concurrent_key0");
+ EXPECT_TRUE(cache.IsAvailableForRead(handle0));
+ TupleCacheMgr::UniqueHandle handle1 = cache.Lookup("concurrent_key1");
+ EXPECT_FALSE(cache.IsAvailableForRead(handle1));
+}
+
+TEST_F(TupleCacheMgrTest, TestMaxSize) {
+ FLAGS_cache_force_single_shard = true;
+ TupleCacheMgr cache = GetCache(GetCacheDir(), "1KB");
+ ASSERT_OK(cache.Init());
+ EXPECT_EQ(1024, cache.MaxSize());
+}
+
+} // namespace impala
diff --git a/be/src/runtime/tuple-cache-mgr.cc
b/be/src/runtime/tuple-cache-mgr.cc
new file mode 100644
index 000000000..e6b5d5a5f
--- /dev/null
+++ b/be/src/runtime/tuple-cache-mgr.cc
@@ -0,0 +1,394 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/tuple-cache-mgr.h"
+
+#include <boost/filesystem.hpp>
+
+#include "common/logging.h"
+#include "gutil/strings/split.h"
+#include "gutil/strings/substitute.h"
+#include "kudu/util/env.h"
+#include "kudu/util/path_util.h"
+#include "util/filesystem-util.h"
+#include "util/histogram-metric.h"
+#include "util/kudu-status-util.h"
+#include "util/parse-util.h"
+#include "util/pretty-printer.h"
+#include "util/test-info.h"
+
+#include "common/names.h"
+
+namespace filesystem = boost::filesystem;
+
+using kudu::JoinPathSegments;
+using strings::SkipEmpty;
+using strings::Split;
+
+// We ensure each impalad process gets a unique directory because currently
cache keys
+// can be shared across instances but represent different scan ranges.
+DEFINE_string(tuple_cache, "", "The configuration string for the tuple cache. "
+ "The default is the empty string, which disables the tuple cache. The
configuration "
+ "string is expected to be a directory followed by a ':' and a capacity
quota. "
+ "For example, /data/0:1TB means the cache may use 1TB in /data/0. Please
note that "
+ "each Impala daemon on a host must have a unique cache directory.");
+DEFINE_string(tuple_cache_eviction_policy, "LRU",
+ "(Advanced) The cache eviction policy to use for the tuple cache. "
+ "Either 'LRU' (default) or 'LIRS' (experimental)");
+// Global feature flag for tuple caching. If false, enable_tuple_cache cannot
be true
+// and the coordinator cannot produce plans with TupleCacheNodes. The
tuple_cache
+// parameter also cannot be specified.
+DEFINE_bool(allow_tuple_caching, false, "If false, tuple caching cannot be
used.");
+
+namespace impala {
+
+// Minimum tuple cache capacity is 64MB.
+static const int64_t MIN_TUPLE_CACHE_CAPACITY_BYTES = 64L << 20;
+static const char* MIN_TUPLE_CACHE_CAPACITY_STR = "64MB";
+
+// Maximum tuple cache entry size for the purposes of sizing the histogram of
+// entry sizes. This does not currently constrain the actual entries.
+static constexpr int64_t STATS_MAX_TUPLE_CACHE_ENTRY_SIZE = 128L << 20;
+
+static const char* CACHE_FILE_PREFIX = "tuple-cache-";
+
+TupleCacheMgr::TupleCacheMgr(MetricGroup* metrics)
+ : TupleCacheMgr(FLAGS_tuple_cache, FLAGS_tuple_cache_eviction_policy,
metrics, 0) {}
+
+TupleCacheMgr::TupleCacheMgr(string cache_config, string eviction_policy_str,
+ MetricGroup* metrics, uint8_t debug_pos)
+ : cache_config_(move(cache_config)),
+ eviction_policy_str_(move(eviction_policy_str)),
+ debug_pos_(debug_pos),
+ tuple_cache_hits_(metrics->AddCounter("impala.tuple-cache.hits", 0)),
+ tuple_cache_misses_(metrics->AddCounter("impala.tuple-cache.misses", 0)),
+ tuple_cache_skipped_(metrics->AddCounter("impala.tuple-cache.skipped", 0)),
+ tuple_cache_halted_(metrics->AddCounter("impala.tuple-cache.halted", 0)),
+ tuple_cache_entries_evicted_(
+ metrics->AddCounter("impala.tuple-cache.entries-evicted", 0)),
+ tuple_cache_entries_in_use_(
+ metrics->AddGauge("impala.tuple-cache.entries-in-use", 0)),
+ tuple_cache_entries_in_use_bytes_(
+ metrics->AddGauge("impala.tuple-cache.entries-in-use-bytes", 0)),
+ tuple_cache_tombstones_in_use_(
+ metrics->AddGauge("impala.tuple-cache.tombstones-in-use", 0)),
+ tuple_cache_entry_size_stats_(metrics->RegisterMetric(
+ new HistogramMetric(MetricDefs::Get("impala.tuple-cache.entry-sizes"),
+ STATS_MAX_TUPLE_CACHE_ENTRY_SIZE, 3))) {}
+
+Status TupleCacheMgr::Init() {
+ if (cache_config_.empty()) {
+ LOG(INFO) << "Tuple Cache is disabled.";
+ return Status::OK();
+ }
+
+ // The expected form of the configuration string is: dir1:capacity
+ // Example: /tmp/data1:1TB
+ vector<string> all_cache_configs = Split(cache_config_, ":", SkipEmpty());
+ if (all_cache_configs.size() != 2) {
+ return Status(Substitute("Malformed data cache configuration $0",
cache_config_));
+ }
+
+ // Parse the capacity string to make sure it's well-formed
+ bool is_percent = false;
+ int64_t capacity = ParseUtil::ParseMemSpec(all_cache_configs[1],
&is_percent, 0);
+ if (is_percent) {
+ return Status(Substitute("Malformed tuple cache capacity configuration $0",
+ all_cache_configs[1]));
+ }
+ // For normal execution (not backend tests), impose a minimum size on the
+ // cache of 64MB.
+ if (!TestInfo::is_be_test() && capacity < MIN_TUPLE_CACHE_CAPACITY_BYTES) {
+ return Status(Substitute(
+ "Tuple cache capacity $0 is less than the minimum allowed capacity $1",
+ all_cache_configs[1], MIN_TUPLE_CACHE_CAPACITY_STR));
+ }
+
+ vector<string> cache_dirs = Split(all_cache_configs[0], ",", SkipEmpty());
+ if (cache_dirs.size() > 1) {
+ return Status(Substitute("Malformed tuple cache directory $0. "
+ "The tuple cache only supports a single directory.",
all_cache_configs[0]));
+ }
+ cache_dir_ = cache_dirs[0];
+
+ // Verify the validity of the path specified.
+ if (!FileSystemUtil::IsCanonicalPath(cache_dir_)) {
+ return Status(Substitute("$0 is not a canonical path", cache_dir_));
+ }
+ RETURN_IF_ERROR(FileSystemUtil::VerifyIsDirectory(cache_dir_));
+
+ // Verify we can create a file in the cache directory
+ filesystem::path path =
+ filesystem::path(cache_dir_) / Substitute("$0test", CACHE_FILE_PREFIX);
+ RETURN_IF_ERROR(FileSystemUtil::CreateFile(path.string()));
+
+ // Remove any existing cache files from the cache directory. This ensures
that the
+ // usage starts at zero. This also removes the test file we just wrote.
+ RETURN_IF_ERROR(DeleteExistingFiles());
+
+ // Check that there is enough space available for the specified cache size
+ uint64_t available_bytes;
+ RETURN_IF_ERROR(FileSystemUtil::GetSpaceAvailable(cache_dir_,
&available_bytes));
+ if (available_bytes < capacity) {
+ string err = Substitute("Insufficient space for $0. Required $1. Only $2
is "
+ "available", cache_dir_, PrettyPrinter::PrintBytes(capacity),
+ PrettyPrinter::PrintBytes(available_bytes));
+ LOG(ERROR) << err;
+ return Status(err);
+ }
+
+ // Verify the cache eviction policy
+ Cache::EvictionPolicy policy =
Cache::ParseEvictionPolicy(eviction_policy_str_);
+ if (policy != Cache::EvictionPolicy::LRU && policy !=
Cache::EvictionPolicy::LIRS) {
+ return Status(Substitute("Unsupported tuple cache eviction policy: $0",
+ eviction_policy_str_));
+ }
+
+ cache_.reset(NewCache(policy, capacity, "Tuple_Cache"));
+
+ RETURN_IF_ERROR(cache_->Init());
+
+ LOG(INFO) << "Tuple Cache initialized at " << cache_dir_
+ << " with capacity " << PrettyPrinter::Print(capacity,
TUnit::BYTES);
+ enabled_ = true;
+ return Status::OK();
+}
+
+// TupleCacheState tracks what operations can be performed on a cache entry,
+// and is stored as part of a UniqueHandle. The diagram below outlines
transitions
+// between states and what operations are allowed. State transitions are
atomic. State is
+// represented by [TupleCacheState, IsAvailableForRead()]. Cache lookup,
insert,
+// erase all rely on internal locking of the private Cache object.
+//
+// Lookup(acquire_state=false): If an entry exists return it; else return an
empty entry.
+// IsAvailableForWrite() always returns false.
+//
+// Lookup(acquire_state=true): If an entry exists return it
(IsAvailableForWrite()=false);
+// else acquire creation_lock_ and check if an
entry exists
+// again, if found return it
(IsAvailableForWrite()=false),
+// else create a new entry
(IsAvailableForWrite()=true).
+//
+// entry found
+// Lookup(acquire_state=true) ---> [ ... ] returns any of the states
below
+// |
+// | entry absent: create new entry
+// v CompleteWrite
+// [ IN_PROGRESS, false ] ---> [ COMPLETE, true ]
+// |
+// | AbortWrite
+// v tombstone=true
+// [ IN_PROGRESS, false ] ---> [ TOMBSTONE, false ]
+// |
+// | tombstone=false
+// v
+// [ IN_PROGRESS, false ] Scheduled for eviction, will be deleted once ref
count=0.
+//
+
+enum class TupleCacheState { IN_PROGRESS, TOMBSTONE, COMPLETE };
+
+// An entry consists of a TupleCacheEntry followed by a C-string for the path.
+struct TupleCacheEntry {
+ std::atomic<TupleCacheState> state{TupleCacheState::IN_PROGRESS};
+ size_t size = 0;
+};
+
+struct TupleCacheMgr::Handle {
+ Cache::UniqueHandle cache_handle;
+ bool is_writer = false;
+};
+
+
+void TupleCacheMgr::HandleDeleter::operator()(Handle* ptr) const { delete ptr;
}
+
+static uint8_t* getHandleData(const Cache* cache, TupleCacheMgr::Handle*
handle) {
+ DCHECK(handle->cache_handle != nullptr);
+ return cache->Value(handle->cache_handle).mutable_data();
+}
+
+static TupleCacheState GetState(const Cache* cache, TupleCacheMgr::Handle*
handle) {
+ uint8_t* data = getHandleData(cache, handle);
+ return reinterpret_cast<TupleCacheEntry*>(data)->state;
+}
+
+// Returns true if state was updated.
+static bool UpdateState(const Cache* cache, TupleCacheMgr::Handle* handle,
+ TupleCacheState requiredState, TupleCacheState newState) {
+ uint8_t* data = getHandleData(cache, handle);
+ return reinterpret_cast<TupleCacheEntry*>(data)->
+ state.compare_exchange_strong(requiredState, newState);
+}
+
+static void UpdateSize(Cache* cache, TupleCacheMgr::Handle* handle, size_t
size) {
+ uint8_t* data = getHandleData(cache, handle);
+ reinterpret_cast<TupleCacheEntry*>(data)->size = size;
+}
+
+static Cache::UniquePendingHandle CreateEntry(
+ Cache* cache, const Slice& key, const string& path) {
+ Cache::UniquePendingHandle pending =
+ cache->Allocate(key, sizeof(TupleCacheEntry) + path.size() + 1);
+ if (!pending) {
+ return pending;
+ }
+
+ // buf is used to store a TupleCacheEntry, followed by the path.
+ uint8_t* buf = cache->MutableValue(&pending);
+ new (buf) TupleCacheEntry{};
+ char* path_s = reinterpret_cast<char*>(buf + sizeof(TupleCacheEntry));
+ memcpy(path_s, path.data(), path.size());
+ // Null terminate because UniquePendingHandle doesn't provide access to size.
+ path_s[path.size()] = '\0';
+
+ return pending;
+}
+
+// If the entry exists, the Handle pins it so it doesn't go away, but the
entry may be in
+// any state (IN PROGRESS, TOMBSTONE, COMPLETE). If the entry doesn't exist and
+// acquire_write is true, it's created with the state IN_PROGRESS.
+TupleCacheMgr::UniqueHandle TupleCacheMgr::Lookup(
+ const Slice& key, bool acquire_write) {
+ if (!enabled_) return nullptr;
+
+ UniqueHandle handle{new Handle()};
+ if (Cache::UniqueHandle pos = cache_->Lookup(key); pos) {
+ handle->cache_handle = move(pos);
+ } else if (acquire_write) {
+ lock_guard<mutex> guard(creation_lock_);
+ // Retry lookup under the creation lock in case another thread added an
entry.
+ if (Cache::UniqueHandle pos = cache_->Lookup(key); pos) {
+ handle->cache_handle = move(pos);
+ } else {
+ // Entry not found, create a new one.
+ filesystem::path path =
+ filesystem::path(cache_dir_) / (CACHE_FILE_PREFIX + key.ToString());
+ Cache::UniquePendingHandle pending = CreateEntry(cache_.get(), key,
path.string());
+ if (UNLIKELY(!pending || debug_pos_ & DebugPos::FAIL_ALLOCATE)) {
+ VLOG_FILE << "Tuple Cache: CreateEntry failed for " << path;
+ return handle;
+ }
+
+ // Insert into the cache. If immediately evicted, evict_callback_
handles cleanup
+ // and decrements the counter.
+ tuple_cache_entries_in_use_->Increment(1);
+ Cache::UniqueHandle chandle = cache_->Insert(move(pending), this);
+ if (UNLIKELY(!chandle || debug_pos_ & DebugPos::FAIL_INSERT)) {
+ // This shouldn't happen normally because the cache is recency-based
and initial
+ // handle is small.
+ LOG(WARNING) << "Tuple Cache Entry was immediately evicted";
+ return handle;
+ }
+ VLOG_FILE << "Tuple Cache Entry created for " << path;
+ handle->cache_handle = move(chandle);
+ handle->is_writer = true;
+ }
+ }
+
+ return handle;
+}
+
+bool TupleCacheMgr::IsAvailableForRead(UniqueHandle& handle) const {
+ if (!handle || !handle->cache_handle) return false;
+ return TupleCacheState::COMPLETE == GetState(cache_.get(), handle.get());
+}
+
+bool TupleCacheMgr::IsAvailableForWrite(UniqueHandle& handle) const {
+ if (!handle || !handle->cache_handle) return false;
+ return handle->is_writer &&
+ TupleCacheState::IN_PROGRESS == GetState(cache_.get(), handle.get());
+}
+
+void TupleCacheMgr::CompleteWrite(UniqueHandle handle, size_t size) {
+ DCHECK(enabled_);
+ DCHECK(handle != nullptr && handle->cache_handle != nullptr);
+ DCHECK_LE(size, MaxSize());
+ DCHECK_GE(size, 0);
+ VLOG_FILE << "Tuple Cache: Complete " << GetPath(handle) << " (" << size <<
")";
+ CHECK(UpdateState(cache_.get(), handle.get(),
+ TupleCacheState::IN_PROGRESS, TupleCacheState::COMPLETE));
+ UpdateSize(cache_.get(), handle.get(), size);
+ cache_->UpdateCharge(handle->cache_handle, size);
+ tuple_cache_entries_in_use_bytes_->Increment(size);
+ tuple_cache_entry_size_stats_->Update(size);
+}
+
+void TupleCacheMgr::AbortWrite(UniqueHandle handle, bool tombstone) {
+ DCHECK(enabled_);
+ DCHECK(handle != nullptr && handle->cache_handle != nullptr);
+ if (tombstone) {
+ VLOG_FILE << "Tuple Cache: Tombstone " << GetPath(handle);
+ tuple_cache_tombstones_in_use_->Increment(1);
+ CHECK(UpdateState(cache_.get(), handle.get(),
+ TupleCacheState::IN_PROGRESS, TupleCacheState::TOMBSTONE));
+ } else {
+ // Remove the cache entry. Leaves state IN_PROGRESS so entry won't be
reused until
+ // successfully evicted.
+ DCHECK(TupleCacheState::IN_PROGRESS == GetState(cache_.get(),
handle.get()));
+ cache_->Erase(cache_->Key(handle->cache_handle));
+ }
+}
+
+const char* TupleCacheMgr::GetPath(UniqueHandle& handle) const {
+ DCHECK(enabled_);
+ DCHECK(handle != nullptr && handle->cache_handle != nullptr);
+ uint8_t* data = getHandleData(cache_.get(), handle.get());
+ return reinterpret_cast<const char*>(data + sizeof(TupleCacheEntry));
+}
+
+void TupleCacheMgr::EvictedEntry(Slice key, Slice value) {
+ const TupleCacheEntry* entry = reinterpret_cast<const
TupleCacheEntry*>(value.data());
+ if (TupleCacheState::TOMBSTONE != entry->state) {
+ DCHECK(tuple_cache_entries_evicted_ != nullptr);
+ DCHECK(tuple_cache_entries_in_use_ != nullptr);
+ DCHECK(tuple_cache_entries_in_use_bytes_ != nullptr);
+ tuple_cache_entries_evicted_->Increment(1);
+ tuple_cache_entries_in_use_->Increment(-1);
+ tuple_cache_entries_in_use_bytes_->Increment(-entry->size);
+ DCHECK_GE(tuple_cache_entries_in_use_->GetValue(), 0);
+ DCHECK_GE(tuple_cache_entries_in_use_bytes_->GetValue(), 0);
+ } else {
+ DCHECK(tuple_cache_tombstones_in_use_ != nullptr);
+ tuple_cache_tombstones_in_use_->Increment(-1);
+ DCHECK_GE(tuple_cache_tombstones_in_use_->GetValue(), 0);
+ }
+ // Retrieve the path following TupleCacheEntry.
+ value.remove_prefix(sizeof(TupleCacheEntry));
+ value.truncate(value.size() - 1);
+ VLOG_FILE << "Tuple Cache: Evict " << key << " at " << value.ToString();
+ // Delete file on disk.
+ kudu::Status status = kudu::Env::Default()->DeleteFile(value.ToString());
+ if (!status.ok()) {
+ LOG(WARNING) <<
+ Substitute("Failed to unlink $0: $1", value.ToString(),
status.ToString());
+ }
+}
+
+Status TupleCacheMgr::DeleteExistingFiles() const {
+ vector<string> entries;
+ RETURN_IF_ERROR(FileSystemUtil::Directory::GetEntryNames(cache_dir_,
&entries, 0,
+ FileSystemUtil::Directory::EntryType::DIR_ENTRY_REG));
+ for (const string& entry : entries) {
+ if (entry.find(CACHE_FILE_PREFIX) == 0) {
+ const string file_path = JoinPathSegments(cache_dir_, entry);
+ KUDU_RETURN_IF_ERROR(kudu::Env::Default()->DeleteFile(file_path),
+ Substitute("Failed to delete old cache file $0", file_path));
+ LOG(INFO) << Substitute("Deleted old cache file $0", file_path);
+ }
+ }
+ return Status::OK();
+}
+
+} // namespace impala
diff --git a/be/src/runtime/tuple-cache-mgr.h b/be/src/runtime/tuple-cache-mgr.h
new file mode 100644
index 000000000..abed2ae40
--- /dev/null
+++ b/be/src/runtime/tuple-cache-mgr.h
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <mutex>
+
+#include "common/status.h"
+#include "runtime/bufferpool/buffer-pool.h"
+#include "util/cache/cache.h"
+#include "util/metrics.h"
+
+namespace impala {
+
+class HistogramMetric;
+class TupleReader;
+
+
+/// The TupleCacheMgr maintains per-daemon settings and metadata for the tuple
cache.
+/// This it used by the various TupleCacheNodes from queries to lookup the
cache
+/// entries or write cache entries. The TupleCacheMgr maintains the capacity
constraint
+/// by evicting entries as needed. Unlike the data cache, the tuple cache
maintains
+/// individual entries in separate files and the files have the cache key
incorporated
+/// into the file name.
+///
+/// There are a couple unique features for the TupleCacheMgr that makes it
distinct from
+/// other caches:
+/// 1. It inserts an entry into the cache immediately, even before it knows
the total
+/// size of the entry. It updates the size of the cache entry later when
the entry
+/// is completed. This allows the cache to avoid multiple writers trying to
create the
+/// same entry.
+/// 2. When a cache entry is aborted due to its size, the cache keeps a
tombstone record
+/// to prevent future writers from trying to write that entry. This reduces
the
+/// overhead of the tuple cache by avoiding writing entries that won't be
useful
+/// to the cache.
+/// For more information about the exact state transitions, see the diagram of
the states
+/// in tuple-cache-mgr.cc.
+class TupleCacheMgr : public Cache::EvictionCallback {
+public:
+ TupleCacheMgr(MetricGroup* metrics);
+ ~TupleCacheMgr() = default;
+
+ // Initialize the TupleCacheMgr. Must be called before any of the other APIs.
+ Status Init() WARN_UNUSED_RESULT;
+
+ /// Enum for metric type.
+ enum class MetricType {
+ HIT,
+ MISS,
+ SKIPPED,
+ HALTED,
+ };
+
+ struct Handle;
+ class HandleDeleter {
+ public:
+ void operator()(Handle*) const;
+ };
+
+ // UniqueHandle -- a wrapper around opaque Handle structure to facilitate
deletion.
+ typedef std::unique_ptr<Handle, HandleDeleter> UniqueHandle;
+
+ // Following methods need to be thread-safe. They primarily rely on Cache
locking.
+
+ // UniqueHandle lifecycle:
+ // 1. Lookup(acquire_write=true). Abandon caching if null.
+ // 2a. If IsAvailableForRead, start reading. END
+ // 2b. Else if IsAvailableForWrite, start writing.
+ // 3. If successful, CompleteWrite. Else AbortWrite. If repeating is also
expected to
+ // result in failure, set tombstone. END
+
+ /// Get a handle for the key. If acquire_write and no entry exists, takes a
creation
+ /// lock and creates a new entry. Null if cache is unavailable.
+ UniqueHandle Lookup(const Slice& key, bool acquire_write = false);
+
+ /// Check if entry is complete and ready to read.
+ bool IsAvailableForRead(UniqueHandle&) const;
+
+ /// Check if entry is ready to write. Always false if acquire_write=false.
+ bool IsAvailableForWrite(UniqueHandle&) const;
+
+ // Register results are complete and available.
+ void CompleteWrite(UniqueHandle handle, size_t size);
+
+ // Abort writing. If tombstone is true, IsAvailableForWrite will be false
for future
+ // queries.
+ void AbortWrite(UniqueHandle handle, bool tombstone);
+
+ /// Get path to read/write.
+ const char* GetPath(UniqueHandle&) const;
+
+ /// Max size for a single cache entry.
+ int MaxSize() const { return cache_->MaxCharge(); }
+
+ /// For metrics increment.
+ void IncrementMetric(MetricType type) {
+ switch (type) {
+ case MetricType::HIT:
+ tuple_cache_hits_->Increment(1);
+ break;
+ case MetricType::MISS:
+ tuple_cache_misses_->Increment(1);
+ break;
+ case MetricType::SKIPPED:
+ tuple_cache_skipped_->Increment(1);
+ break;
+ case MetricType::HALTED:
+ tuple_cache_halted_->Increment(1);
+ break;
+ }
+ }
+
+ /// Callback invoked when evicting an entry from the cache. 'key' is the
cache key
+ /// of the entry being evicted and 'value' contains the cache entry which is
the
+ /// meta-data of where the cached data is stored.
+ virtual void EvictedEntry(kudu::Slice key, kudu::Slice value) override;
+
+ private:
+ // Disallow copy and assign
+ TupleCacheMgr(const TupleCacheMgr&) = delete;
+ TupleCacheMgr& operator=(const TupleCacheMgr&) = delete;
+
+ friend class TupleCacheMgrTest;
+
+ // Constructor for tests
+ enum DebugPos {
+ FAIL_ALLOCATE = 1 << 0,
+ FAIL_INSERT = 1 << 1,
+ };
+ TupleCacheMgr(string cache_config, string eviction_policy_str,
+ MetricGroup* metrics, uint8_t debug_pos);
+
+ // Delete any existing files in the cache directory to start fresh
+ Status DeleteExistingFiles() const;
+
+ const std::string cache_config_;
+ const std::string eviction_policy_str_;
+
+ std::string cache_dir_;
+ bool enabled_ = false;
+ uint8_t debug_pos_;
+
+ /// Metrics for the tuple cache in the daemon level.
+ IntCounter* tuple_cache_hits_;
+ IntCounter* tuple_cache_misses_;
+ IntCounter* tuple_cache_skipped_;
+ IntCounter* tuple_cache_halted_;
+ IntCounter* tuple_cache_entries_evicted_;
+ IntGauge* tuple_cache_entries_in_use_;
+ IntGauge* tuple_cache_entries_in_use_bytes_;
+ IntGauge* tuple_cache_tombstones_in_use_;
+
+ /// Statistics for the tuple cache sizes allocated.
+ HistogramMetric* tuple_cache_entry_size_stats_;
+
+ /// The instance of the cache.
+ mutable std::mutex creation_lock_;
+ std::unique_ptr<Cache> cache_;
+};
+
+}
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index a05bc04f0..5efe2188d 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -177,6 +177,16 @@ parser.add_option("--disable_tuple_caching",
default=False, action="store_true",
help="If true, sets the tuple caching feature flag "
"(allow_tuple_caching) to false. This defaults to false to
enable "
"tuple caching in the development environment")
+parser.add_option("--tuple_cache_dir", dest="tuple_cache_dir",
+ default=os.environ.get("TUPLE_CACHE_DIR", None),
+ help="Specifies a base directory for the result tuple
cache.")
+parser.add_option("--tuple_cache_capacity", dest="tuple_cache_capacity",
+ default=os.environ.get("TUPLE_CACHE_CAPACITY", "1GB"),
+ help="This specifies the maximum storage usage of the tuple
cache "
+ "each Impala daemon can use.")
+parser.add_option("--tuple_cache_eviction_policy",
dest="tuple_cache_eviction_policy",
+ default="LRU", help="This specifies the cache eviction
policy to use "
+ "for the tuple cache.")
# For testing: list of comma-separated delays, in milliseconds, that delay
impalad catalog
# replica initialization. The ith delay is applied to the ith impalad.
@@ -574,6 +584,27 @@ def build_impalad_arg_lists(cluster_size,
num_coordinators, use_exclusive_coordi
tracing_args=tracing_args)
args = "{tracing_args} {args}".format(tracing_args=tracing_args,
args=args)
+ if options.tuple_cache_dir:
+ # create the base directory
+ tuple_cache_path = \
+ os.path.join(options.tuple_cache_dir,
"impala-tuplecache-{0}".format(str(i)))
+ # Try creating the directory if it doesn't exist already. May raise
exception.
+ if not os.path.exists(tuple_cache_path):
+ os.makedirs(tuple_cache_path)
+ if options.docker_network is None:
+ tuple_cache_path_arg = tuple_cache_path
+ else:
+ # The cache directory will always be mounted at the same path inside
the
+ # container. Reuses the data cache dedicated mount.
+ tuple_cache_path_arg = DATA_CACHE_CONTAINER_PATH
+
+ args = "-tuple_cache={dir}:{cap} {args}".format(
+ dir=tuple_cache_path_arg, cap=options.tuple_cache_capacity,
args=args)
+
+ # Add the eviction policy
+ args = "-tuple_cache_eviction_policy={policy} {args}".format(
+ policy=options.tuple_cache_eviction_policy, args=args)
+
if options.enable_admission_service:
args = "{args} -admission_service_host={host}".format(
args=args, host=admissiond_host)
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 5cbd4786a..5aee94d49 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -79,6 +79,96 @@
"kind": "HISTOGRAM",
"key": "impala.codegen-cache.entry-sizes"
},
+ {
+ "description": "The total number of cache hits in the Tuple Cache",
+ "contexts": [
+ "IMPALAD"
+ ],
+ "label": "Tuple Cache Hits",
+ "units": "UNIT",
+ "kind": "COUNTER",
+ "key": "impala.tuple-cache.hits"
+ },
+ {
+ "description": "The total number of cache misses in the Tuple Cache",
+ "contexts": [
+ "IMPALAD"
+ ],
+ "label": "Tuple Cache Misses",
+ "units": "UNIT",
+ "kind": "COUNTER",
+ "key": "impala.tuple-cache.misses"
+ },
+ {
+ "description": "The total number of Tuple Cache that skipped writing",
+ "contexts": [
+ "IMPALAD"
+ ],
+ "label": "Tuple Cache Skipped Writing",
+ "units": "UNIT",
+ "kind": "COUNTER",
+ "key": "impala.tuple-cache.skipped"
+ },
+ {
+ "description": "The total number of Tuple Cache that halted writing",
+ "contexts": [
+ "IMPALAD"
+ ],
+ "label": "Tuple Cache Halted Writing",
+ "units": "UNIT",
+ "kind": "COUNTER",
+ "key": "impala.tuple-cache.halted"
+ },
+ {
+ "description": "The number of in-use Tuple Cache Entries",
+ "contexts": [
+ "IMPALAD"
+ ],
+ "label": "In-use Tuple Cache Entries",
+ "units": "UNIT",
+ "kind": "GAUGE",
+ "key": "impala.tuple-cache.entries-in-use"
+ },
+ {
+ "description": "The total bytes of in-use Tuple Cache Entries",
+ "contexts": [
+ "IMPALAD"
+ ],
+ "label": "In-use Tuple Cache Entries total bytes",
+ "units": "UNIT",
+ "kind": "GAUGE",
+ "key": "impala.tuple-cache.entries-in-use-bytes"
+ },
+ {
+ "description": "The number of in-use Tuple Cache Tombstones",
+ "contexts": [
+ "IMPALAD"
+ ],
+ "label": "In-use Tuple Cache Tombstones",
+ "units": "UNIT",
+ "kind": "GAUGE",
+ "key": "impala.tuple-cache.tombstones-in-use"
+ },
+ {
+ "description": "The number of evicted Tuple Cache Entries",
+ "contexts": [
+ "IMPALAD"
+ ],
+ "label": "Evicted Tuple Cache Entries",
+ "units": "UNIT",
+ "kind": "COUNTER",
+ "key": "impala.tuple-cache.entries-evicted"
+ },
+ {
+ "description": "Statistics for Tuple Cache Entries sizes.",
+ "contexts": [
+ "IMPALAD"
+ ],
+ "label": "Tuple Cache Entries Sizes.",
+ "units": "BYTES",
+ "kind": "HISTOGRAM",
+ "key": "impala.tuple-cache.entry-sizes"
+ },
{
"description": "Resource Pool $0 Configured Max Mem Resources",
"contexts": [
diff --git a/tests/custom_cluster/test_tuple_cache.py
b/tests/custom_cluster/test_tuple_cache.py
new file mode 100644
index 000000000..af65655af
--- /dev/null
+++ b/tests/custom_cluster/test_tuple_cache.py
@@ -0,0 +1,175 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import, division, print_function
+
+import pytest
+import random
+import string
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+TABLE_LAYOUT = 'name STRING, age INT, address STRING'
+
+
+# Generates a random table entry of at least 15 bytes.
+def table_value(seed):
+ r = random.Random(seed)
+ name = "".join([r.choice(string.ascii_letters) for _ in range(r.randint(5,
20))])
+ age = r.randint(1, 90)
+ address = "{0} {1}".format(r.randint(1, 9999),
+ "".join([r.choice(string.ascii_letters) for _ in range(r.randint(4,
12))]))
+ return '"{0}", {1}, "{2}"'.format(name, age, address)
+
+
+def assertCounters(profile, num_hits, num_halted, num_skipped):
+ assert "NumTupleCacheHits: {0} ".format(num_hits) in profile
+ assert "NumTupleCacheHalted: {0} ".format(num_halted) in profile
+ assert "NumTupleCacheSkipped: {0} ".format(num_skipped) in profile
+
+
+class TestTupleCache(CustomClusterTestSuite):
+ @classmethod
+ def get_workload(cls):
+ return 'functional-query'
+
+ CACHE_START_ARGS = "--tuple_cache_dir=/tmp --log_level=2"
+
+ def cached_query(self, query):
+ return self.execute_query(query, {"ENABLE_TUPLE_CACHE": "TRUE", "MT_DOP":
"1"})
+
+ def cached_query_w_debugaction(self, query, debugaction):
+ query_opts = {
+ "ENABLE_TUPLE_CACHE": "TRUE",
+ "MT_DOP": "1",
+ "DEBUG_ACTION": debugaction
+ }
+ return self.execute_query(query, query_opts)
+
+ # Generates a table containing at least <scale> KB of data.
+ def create_table(self, fq_table, scale=1):
+ self.cached_query("CREATE TABLE {0} ({1})".format(fq_table, TABLE_LAYOUT))
+ for _ in range(scale):
+ values = [table_value(i) for i in range(70)]
+ self.cached_query("INSERT INTO {0} VALUES ({1})".format(
+ fq_table, "), (".join(values)))
+
+ @CustomClusterTestSuite.with_args(cluster_size=1)
+ @pytest.mark.execute_serially
+ def test_cache_disabled(self, vector, unique_database):
+ fq_table = "{0}.cache_disabled".format(unique_database)
+ self.create_table(fq_table)
+ result1 = self.cached_query("SELECT * from {0}".format(fq_table))
+ result2 = self.cached_query("SELECT * from {0}".format(fq_table))
+
+ assert result1.success
+ assert result2.success
+ assert result1.data == result2.data
+ assertCounters(result1.runtime_profile, num_hits=0, num_halted=0,
num_skipped=1)
+ assertCounters(result2.runtime_profile, num_hits=0, num_halted=0,
num_skipped=1)
+
+ @CustomClusterTestSuite.with_args(
+ start_args=CACHE_START_ARGS, cluster_size=1)
+ @pytest.mark.execute_serially
+ def test_create_and_select(self, vector, unique_database):
+ fq_table = "{0}.create_and_select".format(unique_database)
+ self.create_table(fq_table)
+ result1 = self.cached_query("SELECT * from {0}".format(fq_table))
+ result2 = self.cached_query("SELECT * from {0}".format(fq_table))
+
+ assert result1.success
+ assert result2.success
+ assert result1.data == result2.data
+ assertCounters(result1.runtime_profile, num_hits=0, num_halted=0,
num_skipped=0)
+ assertCounters(result2.runtime_profile, num_hits=1, num_halted=0,
num_skipped=0)
+
+ @CustomClusterTestSuite.with_args(
+ start_args=CACHE_START_ARGS + " --tuple_cache_capacity=64MB",
cluster_size=1,
+ impalad_args="--cache_force_single_shard")
+ @pytest.mark.execute_serially
+ def test_cache_halted_select(self, vector, unique_database):
+ # The cache is set to the minimum cache size, so run a SQL that produces
enough
+ # data to exceed the cache size and halt caching.
+ big_enough_query = "SELECT o_comment from tpch.orders"
+ result1 = self.cached_query(big_enough_query)
+ result2 = self.cached_query(big_enough_query)
+
+ assert result1.success
+ assert result2.success
+ assert result1.data == result2.data
+ assertCounters(result1.runtime_profile, num_hits=0, num_halted=1,
num_skipped=0)
+ assertCounters(result2.runtime_profile, num_hits=0, num_halted=0,
num_skipped=1)
+
+ @CustomClusterTestSuite.with_args(
+ start_args=CACHE_START_ARGS, cluster_size=1)
+ @pytest.mark.execute_serially
+ def test_failpoints(self, vector, unique_database):
+ fq_table = "{0}.create_and_select".format(unique_database)
+ # Scale 20 gets us enough rows to force multiple RowBatches (needed for the
+ # the reader GetNext() cases).
+ self.create_table(fq_table, scale=20)
+ query = "SELECT * from {0}".format(fq_table)
+
+ # Fail when writing cache entry. All of these are handled and will not
fail the
+ # query.
+ # Case 1: fail during Open()
+ result = self.cached_query_w_debugaction(query,
"TUPLE_FILE_WRITER_OPEN:[email protected]")
+ assert result.success
+ assertCounters(result.runtime_profile, num_hits=0, num_halted=0,
num_skipped=1)
+
+ # Case 2: fail during Write()
+ result = self.cached_query_w_debugaction(query,
"TUPLE_FILE_WRITER_WRITE:[email protected]")
+ assert result.success
+ assertCounters(result.runtime_profile, num_hits=0, num_halted=0,
num_skipped=0)
+
+ # Case 3: fail during Commit()
+ result = self.cached_query_w_debugaction(query,
"TUPLE_FILE_WRITER_COMMIT:[email protected]")
+ assert result.success
+ assertCounters(result.runtime_profile, num_hits=0, num_halted=0,
num_skipped=0)
+
+ # Now, successfully add a cache entry
+ result1 = self.cached_query(query)
+ assert result1.success
+ assertCounters(result1.runtime_profile, num_hits=0, num_halted=0,
num_skipped=0)
+
+ # Fail when reading a cache entry
+ # Case 1: fail during Open()
+ result = self.cached_query_w_debugaction(query,
"TUPLE_FILE_READER_OPEN:[email protected]")
+ assert result.success
+ assert result.data == result1.data
+ # Not a hit
+ assertCounters(result.runtime_profile, num_hits=0, num_halted=0,
num_skipped=1)
+
+ # Case 2: fail during the first GetNext() call
+ result = self.cached_query_w_debugaction(query,
+ "TUPLE_FILE_READER_FIRST_GETNEXT:[email protected]")
+ assert result.success
+ assert result.data == result1.data
+ # Technically, this is a hit
+ assertCounters(result.runtime_profile, num_hits=1, num_halted=0,
num_skipped=0)
+
+ # Case 3: fail during the second GetNext() call
+ # This one must fail for correctness, as it cannot fall back to the child
if it
+ # has already returned cached rows
+ hit_error = False
+ try:
+ result = self.cached_query_w_debugaction(query,
+ "TUPLE_FILE_READER_SECOND_GETNEXT:[email protected]")
+ except Exception:
+ hit_error = True
+
+ assert hit_error