This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a680b302 IMPALA-13478: Sync tuple cache files to disk asynchronously
2a680b302 is described below

commit 2a680b302e1c581deb4f7312323188d718c23bcb
Author: Joe McDonnell <[email protected]>
AuthorDate: Sun Dec 15 15:45:27 2024 -0800

    IMPALA-13478: Sync tuple cache files to disk asynchronously
    
    When a tuple cache entry is first being written, we want to
    sync the contents to disk. Currently, that happens on the
    fast path and delays the query results, sometimes significantly.
    This moves the Sync() call off of the fast path by passing
    the work to a thread pool. The threads in the pool open
    the file, sync it to disk, then close the file. If anything
    goes wrong, the cache entry is evicted.
    
    The tuple cache can generate writes very quickly, so this needs
    a backpressure mechanism to avoid overwhelming the disk. In
    particular, it needs to avoid accumulating dirty buffers to
    the point that the OS throttles new writes, delaying the query
    fast path. This implements a limit on outstanding writes (i.e.
    writes that have not been flushed to disk). To enforce it,
    writers now call UpdateWriteSize() to reserve space before
    writing. UpdateWriteSize() can fail if it hits the limit on
    outstanding writes or if this particular cache entry has hit
    the maximum size. When it fails, the writer should abort writing
    the cache entry.
    
    Since UpdateWriteSize() is updating the charge in the cache,
    the outstanding writes are being counted against the capacity,
    triggering evictions. This improves the tuple cache's adherence
    to the capacity limit.
    
    The outstanding writes limits is configured via the
    tuple_cache_outstanding_write_limit startup flag, which is
    either a specific size string (e.g. 1GB) or a percentage of
    the process memory limit. To avoid updating the cache charge
    very frequently, this has an update chunk size specified
    by tuple_cache_outstanding_write_chunk_bytes.
    
    This adds counters at the daemon level:
     - outstanding write bytes
     - number of writes halted due to backpressure
     - number of sync calls that fail (due to IO errors)
     - number of sync calls dropped due to queue backpressure
    The runtime profile adds a NumTupleCacheBackpressureHalted
    counter that is set when a write hits the outstanding write
    limit.
    
    This has a startup option to add randomness to the tuple cache
    keys to make it easy to test a scenario with no cache hits.
    
    Testing:
     - Added unit tests to tuple-cache-mgr-test
     - Testing with TPC-DS on a cluster with fast NVME SSDs showed
       a significant improvement in the first-run times due to the
       asynchronous syncs.
     - Testing with TPC-H on a system with a slow disk and zero cache
       hits showed improved behavior with the backpressure
    
    Change-Id: I646bb56300656d8b8ac613cb8fe2f85180b386d3
    Reviewed-on: http://gerrit.cloudera.org:8080/22215
    Reviewed-by: Joe McDonnell <[email protected]>
    Reviewed-by: Michael Smith <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/tuple-cache-node.cc           |  24 ++-
 be/src/exec/tuple-cache-node.h            |   3 +
 be/src/exec/tuple-file-read-write-test.cc |  16 +-
 be/src/exec/tuple-file-writer.cc          |  17 +-
 be/src/exec/tuple-file-writer.h           |  20 +-
 be/src/runtime/exec-env.cc                |   2 +-
 be/src/runtime/tuple-cache-mgr-test.cc    | 299 +++++++++++++++++++++++++++++-
 be/src/runtime/tuple-cache-mgr.cc         | 299 ++++++++++++++++++++++++++----
 be/src/runtime/tuple-cache-mgr.h          |  58 +++++-
 be/src/service/query-options.cc           |  13 +-
 common/thrift/generate_error_codes.py     |   8 +-
 common/thrift/metrics.json                |  40 ++++
 12 files changed, 723 insertions(+), 76 deletions(-)

diff --git a/be/src/exec/tuple-cache-node.cc b/be/src/exec/tuple-cache-node.cc
index f2ade640d..165f89f7a 100644
--- a/be/src/exec/tuple-cache-node.cc
+++ b/be/src/exec/tuple-cache-node.cc
@@ -60,6 +60,8 @@ Status TupleCacheNode::Prepare(RuntimeState* state) {
   num_hits_counter_ = ADD_COUNTER(runtime_profile(), "NumTupleCacheHits", 
TUnit::UNIT);
   num_halted_counter_ =
       ADD_COUNTER(runtime_profile(), "NumTupleCacheHalted", TUnit::UNIT);
+  num_backpressure_halted_counter_ =
+      ADD_COUNTER(runtime_profile(), "NumTupleCacheBackpressureHalted", 
TUnit::UNIT);
   num_skipped_counter_ =
       ADD_COUNTER(runtime_profile(), "NumTupleCacheSkipped", TUnit::UNIT);
 
@@ -128,7 +130,10 @@ Status TupleCacheNode::Open(RuntimeState* state) {
     }
   } else if (tuple_cache_mgr->IsAvailableForWrite(handle_)) {
     writer_ = make_unique<TupleFileWriter>(tuple_cache_mgr->GetPath(handle_),
-        mem_tracker(), runtime_profile(), tuple_cache_mgr->MaxSize());
+        mem_tracker(), runtime_profile(),
+        [this, tuple_cache_mgr] (size_t new_size) {
+            return tuple_cache_mgr->RequestWriteSize(&this->handle_, new_size);
+        });
     Status status = writer_->Open(state);
     if (!status.ok()) {
       LOG(WARNING) << "Could not write cache entry for "
@@ -282,18 +287,23 @@ Status TupleCacheNode::GetNext(
       // If there was an error or we exceeded the file size limit, stop 
caching but
       // continue reading from the child node.
       if (!status.ok()) {
-        if (writer_->ExceededMaxSize()) {
+        bool set_tombstone = false;
+        if (status.code() == 
TErrorCode::TUPLE_CACHE_ENTRY_SIZE_LIMIT_EXCEEDED) {
           VLOG_FILE << "Tuple Cache entry for " << combined_key_
                     << " hit the maximum file size: " << status.GetDetail();
           COUNTER_ADD(num_halted_counter_, 1);
-          tuple_cache_mgr->IncrementMetric(TupleCacheMgr::MetricType::HALTED);
-          writer_->Abort();
-          tuple_cache_mgr->AbortWrite(move(handle_), true);
+          set_tombstone = true;
+        } else if (status.code() ==
+            TErrorCode::TUPLE_CACHE_OUTSTANDING_WRITE_LIMIT_EXCEEDED) {
+          VLOG_FILE << "Tuple Cache entry for " << combined_key_
+                    << " hit the outstanding writes limit: " << 
status.GetDetail();
+          COUNTER_ADD(num_backpressure_halted_counter_, 1);
         } else {
+          // This is an unknown error (e.g. an IO error), so write a warning.
           LOG(WARNING) << "Unable to write cache file: " << status.GetDetail();
-          writer_->Abort();
-          tuple_cache_mgr->AbortWrite(move(handle_), false);
         }
+        writer_->Abort();
+        tuple_cache_mgr->AbortWrite(move(handle_), set_tombstone);
         writer_.reset();
       } else if (*eos) {
         // If we hit end of stream, then we can complete the cache entry
diff --git a/be/src/exec/tuple-cache-node.h b/be/src/exec/tuple-cache-node.h
index 6c7cb0af1..6adeaf58a 100644
--- a/be/src/exec/tuple-cache-node.h
+++ b/be/src/exec/tuple-cache-node.h
@@ -66,6 +66,9 @@ private:
   RuntimeProfile::Counter* num_hits_counter_ = nullptr;
   /// Number of results that were too large for the cache
   RuntimeProfile::Counter* num_halted_counter_ = nullptr;
+  /// Number of results that were halted due to backpressure (i.e. hitting the
+  /// outstanding writes limit)
+  RuntimeProfile::Counter* num_backpressure_halted_counter_ = nullptr;
   /// Number of results that skip the cache due to a tombstone
   RuntimeProfile::Counter* num_skipped_counter_ = nullptr;
 
diff --git a/be/src/exec/tuple-file-read-write-test.cc 
b/be/src/exec/tuple-file-read-write-test.cc
index 678065df8..f633bf718 100644
--- a/be/src/exec/tuple-file-read-write-test.cc
+++ b/be/src/exec/tuple-file-read-write-test.cc
@@ -246,7 +246,13 @@ TEST_F(TupleFileReadWriteTest, TestExceedMaxFileSize) {
   // Limit the file to 20 bytes
   size_t max_size = 20;
 
-  TupleFileWriter writer(path, tracker(), profile(), max_size);
+  TupleFileWriter writer(path, tracker(), profile(),
+     [max_size] (size_t requested_size) {
+       if (requested_size > max_size) {
+         return Status("exceed the maximum file size");
+       }
+       return Status::OK();
+     });
 
   Status status = writer.Open(runtime_state());
   ASSERT_OK(status);
@@ -279,7 +285,13 @@ TEST_F(TupleFileReadWriteTest, TestExactMaxFileSize) {
   // Now, run the same thing with the max size set to the number of bytes 
written.
   string path2 = Path("exact-max-size-file");
   filesystem::remove(path2);
-  TupleFileWriter limited_writer(path2, tracker(), profile(), max_size);
+  TupleFileWriter limited_writer(path2, tracker(), profile(),
+      [max_size] (size_t requested_size) {
+        if (requested_size > max_size) {
+          return Status("exceed the maximum file size");
+        }
+        return Status::OK();
+      });
 
   status = limited_writer.Open(runtime_state());
   ASSERT_OK(status);
diff --git a/be/src/exec/tuple-file-writer.cc b/be/src/exec/tuple-file-writer.cc
index 46af2bbf8..4fe1f0cfb 100644
--- a/be/src/exec/tuple-file-writer.cc
+++ b/be/src/exec/tuple-file-writer.cc
@@ -39,7 +39,8 @@ namespace impala {
 static const char* UNIQUE_PATH_SUFFIX = ".%%%%";
 
 TupleFileWriter::TupleFileWriter(
-    std::string path, MemTracker* parent, RuntimeProfile* profile, size_t 
max_file_size)
+    std::string path, MemTracker* parent, RuntimeProfile* profile,
+    RequestWriteSizeCb request_write_size_cb)
   : path_(move(path)),
     temp_suffix_(filesystem::unique_path(UNIQUE_PATH_SUFFIX).string()),
     tracker_(new MemTracker(-1, "TupleFileWriter", parent)),
@@ -49,7 +50,7 @@ TupleFileWriter::TupleFileWriter(
     serialize_timer_(profile ? ADD_TIMER(profile, "TupleCacheSerializeTime") : 
nullptr),
     bytes_written_(profile ?
         ADD_COUNTER(profile, "TupleCacheBytesWritten", TUnit::BYTES) : 
nullptr),
-    max_file_size_(max_file_size) {}
+    request_write_size_cb_(move(request_write_size_cb)) {}
 
 TupleFileWriter::~TupleFileWriter() {
   if (state_ != State::Uninitialized) {
@@ -135,11 +136,11 @@ Status TupleFileWriter::Write(RuntimeState* state, 
RowBatch* row_batch) {
   for (auto slice : slices) {
     num_bytes_to_write += slice.size();
   }
-  if (BytesWritten() + num_bytes_to_write > max_file_size_) {
-    exceeded_max_size_ = true;
-    return Status(
-        Substitute("Write of size $0 would cause $1 to exceed the maximum file 
size $2",
-            num_bytes_to_write, TempPath(), max_file_size_));
+
+  if (request_write_size_cb_ != nullptr) {
+    // If the request_write_size_cb is set, call it before writing to get 
permission.
+    // It will return a not-OK status if this writer should stop.
+    RETURN_IF_ERROR(request_write_size_cb_(BytesWritten() + 
num_bytes_to_write));
   }
 
   RETURN_IF_ERROR(DebugAction(state->query_options(), 
"TUPLE_FILE_WRITER_WRITE"));
@@ -187,7 +188,7 @@ Status TupleFileWriter::Commit(RuntimeState* state) {
 
   RETURN_IF_ERROR(DebugAction(state->query_options(), 
"TUPLE_FILE_WRITER_COMMIT"));
 
-  KUDU_RETURN_IF_ERROR(tmp_file_->Sync(), "Failed to sync cache file");
+  // Sync() is called asychnronously by TupleCacheMgr's sync thread pool.
   KUDU_RETURN_IF_ERROR(tmp_file_->Close(), "Failed to close cache file");
 
   std::string src = TempPath();
diff --git a/be/src/exec/tuple-file-writer.h b/be/src/exec/tuple-file-writer.h
index 209229472..5c34e427d 100644
--- a/be/src/exec/tuple-file-writer.h
+++ b/be/src/exec/tuple-file-writer.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <fstream>
+#include <functional>
 #include <limits>
 #include <memory>
 
@@ -37,6 +38,8 @@ class RowBatch;
 class RuntimeState;
 class TupleReadWriteTest;
 
+using RequestWriteSizeCb = std::function<Status (size_t)>;
+
 /// The TupleFileWriter is used to serialize a stream of RowBatches to a local 
file
 /// for the tuple cache. It uses the standard RowBatch serialization used for 
KRPC
 /// data streams (i.e. RowBatch::Serialize()). The files can be read back 
using the
@@ -48,9 +51,10 @@ class TupleReadWriteTest;
 /// Commit(), it runs Abort() and any associated file is deleted. The user can
 /// proactively call Abort() to delete any associated files, but it is not 
required.
 ///
-/// The TupleFileWriter enforces a maximum file size and will fail Write() 
calls that
-/// would exceed this limit. It provides a way for the caller to get how many 
bytes
-/// have been written for accounting purposes.
+/// The TupleFileWriter calls the request_write_size_cb requesting a new write 
size
+/// before each write and will fail Write() if this callback returns an error.
+/// It provides a way for the caller to get how many bytes have been written 
for
+/// accounting purposes.
 ///
 /// Currently, the TupleFileWriter does not embed the actual tuple layout into 
the
 /// file. It relies on the corresponding TupleFileReader reading with the same
@@ -59,7 +63,7 @@ class TupleReadWriteTest;
 class TupleFileWriter {
 public:
   TupleFileWriter(std::string path, MemTracker* parent, RuntimeProfile* 
profile,
-      size_t max_file_size = std::numeric_limits<size_t>::max());
+      RequestWriteSizeCb request_write_size_cb = nullptr);
   ~TupleFileWriter();
 
   Status Open(RuntimeState* state);
@@ -69,8 +73,6 @@ public:
   // call Write() or Commit().
   Status Write(RuntimeState* state, RowBatch* row_batch);
 
-  bool ExceededMaxSize() const { return exceeded_max_size_; }
-
   // Number of bytes written to file. Must be called before Commit/Abort.
   size_t BytesWritten() const;
 
@@ -102,10 +104,8 @@ private:
   RuntimeProfile::Counter* serialize_timer_;
   // Total bytes written
   RuntimeProfile::Counter* bytes_written_;
-  // Maximum size for the resulting file
-  size_t max_file_size_;
-  // True if the file reached the maximum size
-  bool exceeded_max_size_ = false;
+  // Callback to request an increase to the write size
+  RequestWriteSizeCb request_write_size_cb_;
 
   // This writes to a temporary file, only moving it into the final location 
with
   // Commit(). tmp_file_ is the file abstraction used for writing the 
temporary file.
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index b16275ed4..065f2b014 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -404,7 +404,7 @@ Status ExecEnv::Init() {
   }
 
   // Initialize the tuple cache
-  RETURN_IF_ERROR(tuple_cache_mgr_->Init());
+  RETURN_IF_ERROR(tuple_cache_mgr_->Init(bytes_limit));
 
   LOG(INFO) << "Admit memory limit: "
             << PrettyPrinter::Print(admit_mem_limit_, TUnit::BYTES);
diff --git a/be/src/runtime/tuple-cache-mgr-test.cc 
b/be/src/runtime/tuple-cache-mgr-test.cc
index 1576a3eea..2a9d10165 100644
--- a/be/src/runtime/tuple-cache-mgr-test.cc
+++ b/be/src/runtime/tuple-cache-mgr-test.cc
@@ -19,9 +19,11 @@
 #include <boost/filesystem.hpp>
 
 #include "gutil/strings/substitute.h"
+#include "kudu/util/env.h"
 #include "runtime/tuple-cache-mgr.h"
 #include "testutil/gtest-util.h"
 #include "util/filesystem-util.h"
+#include "util/time.h"
 
 #include "common/names.h"
 
@@ -47,12 +49,17 @@ public:
   }
 
   TupleCacheMgr GetCache(const string& cache_dir, const string& capacity = 
"1MB",
-      string eviction_policy = "LRU", uint8_t debug_pos = 0) {
+      string eviction_policy = "LRU", uint8_t debug_pos = 
TupleCacheMgr::NO_FILES,
+      uint32_t sync_pool_size = 0, uint32_t sync_pool_queue_depth = 1000,
+      string outstanding_write_limit_str = "1GB",
+      uint32_t outstanding_write_chunk_bytes = 0) {
     string cache_config;
     if (!cache_dir.empty()) {
       cache_config = Substitute("$0:$1", cache_dir, capacity);
     }
-    return TupleCacheMgr{move(cache_config), move(eviction_policy), &metrics_, 
debug_pos};
+    return TupleCacheMgr{move(cache_config), move(eviction_policy), &metrics_, 
debug_pos,
+        sync_pool_size, sync_pool_queue_depth, 
move(outstanding_write_limit_str),
+        outstanding_write_chunk_bytes};
   }
 
   TupleCacheMgr GetCache() {
@@ -60,11 +67,13 @@ public:
   }
 
   TupleCacheMgr GetFailAllocateCache() {
-    return GetCache(GetCacheDir(), "1MB", "LRU", TupleCacheMgr::FAIL_ALLOCATE);
+    return GetCache(GetCacheDir(), "1MB", "LRU",
+        TupleCacheMgr::FAIL_ALLOCATE | TupleCacheMgr::NO_FILES);
   }
 
   TupleCacheMgr GetFailInsertCache() {
-    return GetCache(GetCacheDir(), "1MB", "LRU", TupleCacheMgr::FAIL_INSERT);
+    return GetCache(GetCacheDir(), "1MB", "LRU",
+        TupleCacheMgr::FAIL_INSERT | TupleCacheMgr::NO_FILES);
   }
 
   std::string GetCacheDir() const { return cache_dir_; }
@@ -303,4 +312,286 @@ TEST_F(TupleCacheMgrTest, TestMaxSize) {
   EXPECT_EQ(1024, cache.MaxSize());
 }
 
+TEST_F(TupleCacheMgrTest, TestRequestWriteSize) {
+  FLAGS_cache_force_single_shard = true;
+  TupleCacheMgr cache = GetCache(GetCacheDir(), "1KB");
+  ASSERT_OK(cache.Init());
+
+  // Write 5 entries of 200 bytes each
+  for (int i = 0; i < 5; ++i) {
+    TupleCacheMgr::UniqueHandle handle = cache.Lookup(Substitute("a_key_$0", 
i), true);
+    EXPECT_TRUE(cache.IsAvailableForWrite(handle));
+    cache.CompleteWrite(move(handle), 200);
+  }
+
+  TupleCacheMgr::UniqueHandle handle = cache.Lookup("update_entry_then_abort", 
true);
+  EXPECT_TRUE(cache.IsAvailableForWrite(handle));
+  // Update to 200 bytes. This should evict one entry.
+  Status status = cache.RequestWriteSize(&handle, 200);
+  EXPECT_OK(status);
+  EXPECT_EQ(cache.tuple_cache_entries_evicted_->GetValue(), 1);
+  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 200);
+
+  // Update to 900. This will evict all the others
+  status = cache.RequestWriteSize(&handle, 900);
+  EXPECT_OK(status);
+  EXPECT_EQ(cache.tuple_cache_entries_evicted_->GetValue(), 5);
+  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 900);
+
+  // Update to MaxSize(). This will succeed.
+  status = cache.RequestWriteSize(&handle, cache.MaxSize());
+  EXPECT_OK(status);
+  EXPECT_EQ(cache.tuple_cache_entries_evicted_->GetValue(), 5);
+  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 
cache.MaxSize());
+
+  // Try to update to MaxSize() + 1. This will fail.
+  status = cache.RequestWriteSize(&handle, cache.MaxSize() + 1);
+  EXPECT_FALSE(status.ok());
+  EXPECT_EQ(status.code(), TErrorCode::TUPLE_CACHE_ENTRY_SIZE_LIMIT_EXCEEDED);
+  EXPECT_EQ(cache.tuple_cache_entries_evicted_->GetValue(), 5);
+  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 
cache.MaxSize());
+
+  // Need to test the three state transitions out of IN_PROGRESS
+  // Path #1: AbortWrite without tombstone
+  cache.AbortWrite(move(handle), /* tombstone */ false);
+  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 0);
+
+  // Path #2: AbortWrite with tombstone
+  handle = cache.Lookup("update_entry_then_tombstone", true);
+  EXPECT_TRUE(cache.IsAvailableForWrite(handle));
+  status = cache.RequestWriteSize(&handle, 900);
+  EXPECT_OK(status);
+  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 900);
+  cache.AbortWrite(move(handle), /* tombstone */ true);
+  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 0);
+
+  // Path #3: CompleteWrite
+  handle = cache.Lookup("update_entry_then_complete", true);
+  EXPECT_TRUE(cache.IsAvailableForWrite(handle));
+  status = cache.RequestWriteSize(&handle, 900);
+  EXPECT_OK(status);
+  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 900);
+  cache.CompleteWrite(move(handle), 900);
+  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 0);
+}
+
+TEST_F(TupleCacheMgrTest, TestOutstandingWriteLimit) {
+  FLAGS_cache_force_single_shard = true;
+  // Set up a cache with an outstanding write limit of 1KB
+  TupleCacheMgr cache = GetCache(GetCacheDir(), "1KB", "LRU", 0, 0, 0, "1KB");
+  ASSERT_OK(cache.Init());
+
+  // Open two handles
+  TupleCacheMgr::UniqueHandle handle1 = 
cache.Lookup("outstanding_write_limit_1", true);
+  EXPECT_TRUE(cache.IsAvailableForWrite(handle1));
+  TupleCacheMgr::UniqueHandle handle2 = 
cache.Lookup("outstanding_write_limit_2", true);
+  EXPECT_TRUE(cache.IsAvailableForWrite(handle2));
+
+  // UpdateWrite size to 512 bytes for each, so it is equal to the limit and 
succeeds.
+  Status status = cache.RequestWriteSize(&handle1, 512);
+  EXPECT_OK(status);
+  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 512);
+
+  status = cache.RequestWriteSize(&handle2, 512);
+  EXPECT_OK(status);
+  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 1024);
+
+  // Going one byte past the limit should fail
+  // This does not set exceeded_max_size
+  status = cache.RequestWriteSize(&handle1, 513);
+  EXPECT_FALSE(status.ok());
+  EXPECT_EQ(status.code(), 
TErrorCode::TUPLE_CACHE_OUTSTANDING_WRITE_LIMIT_EXCEEDED);
+  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 1024);
+
+  // Clean up
+  cache.AbortWrite(move(handle1), /* tombstone */ false);
+  cache.AbortWrite(move(handle2), /* tombstone */ false);
+}
+
+TEST_F(TupleCacheMgrTest, TestOutstandingWriteLimitConcurrent) {
+  FLAGS_cache_force_single_shard = true;
+  // Set up a cache with a low outstanding write limit of 1KB to make it easy 
to hit
+  // the limit.
+  TupleCacheMgr cache = GetCache(GetCacheDir(), "100KB", "LRU", 0, 0, 0, 
"1KB");
+  ASSERT_OK(cache.Init());
+
+  // This attempts to do 100 512-byte writes to the cache with 64 byte request 
chunks.
+  // The cache is big enough to fit all of the writes, so the only reason they 
should
+  // fail is when they hit the outstanding write limit.
+  vector<future<bool>> results;
+  results.reserve(100);
+  for (int i = 0; i < 100; ++i) {
+    results.emplace_back(async(launch::async, [&cache, i]() {
+      TupleCacheMgr::UniqueHandle handle = cache.Lookup(Substitute("write$0", 
i), true);
+      EXPECT_TRUE(cache.IsAvailableForWrite(handle));
+      // Write in 64 byte chunks, 8 chunks = 512 bytes
+      for (int num_chunks = 1; num_chunks <= 8; ++num_chunks) {
+        Status status = cache.RequestWriteSize(&handle, num_chunks * 64);
+        if (!status.ok()) {
+          cache.AbortWrite(move(handle), /* tombstone */ false);
+          return false;
+        }
+      }
+      cache.CompleteWrite(move(handle), 512);
+      return true;
+    }));
+  }
+
+  // Wait for all threads to complete and count the number of failures
+  uint32_t num_failures = 0;
+  for (auto& result : results) {
+    result.wait();
+    if (!result.get()) num_failures++;
+  }
+
+  // This test case has race conditions. We expect the failures to line up 
with the
+  // number of backpressure halted. We expect at least one thread to succeed.
+  // There are scenarios where all the threads can succeed, so this doesn't 
require
+  // num_failures > 0.
+  EXPECT_EQ(cache.tuple_cache_backpressure_halted_->GetValue(), num_failures);
+  EXPECT_LT(num_failures, 100);
+}
+
+TEST_F(TupleCacheMgrTest, TestOutstandingWriteChunkSize) {
+  FLAGS_cache_force_single_shard = true;
+  uint32_t chunk_size = 250;
+  // Set up a cache with an outstanding write limit of 1KB and a chunk size of 
250
+  // The chunk size is specifically not a clean divisor of 1KB.
+  TupleCacheMgr cache =
+    GetCache(GetCacheDir(), "1KB", "LRU", 0, 0, 0, "2KB", chunk_size);
+  ASSERT_OK(cache.Init());
+
+  TupleCacheMgr::UniqueHandle handle = 
cache.Lookup("outstanding_chunk_then_abort", true);
+  EXPECT_TRUE(cache.IsAvailableForWrite(handle));
+
+  // Update write size to 1, but this is counted as the chunk size
+  Status status = cache.RequestWriteSize(&handle, 1);
+  EXPECT_OK(status);
+  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 
chunk_size);
+
+  // Request write size to be equal to the chunk size. This doesn't change the 
outstanding
+  // write bytes.
+  status = cache.RequestWriteSize(&handle, chunk_size);
+  EXPECT_OK(status);
+  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 
chunk_size);
+
+  // Request write size to be one above the chunk size. This grabs a second 
chunk.
+  status = cache.RequestWriteSize(&handle, chunk_size + 1);
+  EXPECT_OK(status);
+  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 2 * 
chunk_size);
+
+  // The chunk size avoids conflicts with the MaxSize(). This requests a size 
that
+  // would round to larger than MaxSize (the chunk size is not a clean divisor 
of the
+  // cache size), but it does not result in an error. Instead, it reserves 
MaxSize().
+  status = cache.RequestWriteSize(&handle,
+      ((cache.MaxSize() / chunk_size) * chunk_size) + 1);
+  EXPECT_OK(status);
+  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 
cache.MaxSize());
+
+  // Request size can go all the way to MaxSize() even with chunk size.
+  status = cache.RequestWriteSize(&handle, cache.MaxSize());
+  EXPECT_OK(status);
+  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 
cache.MaxSize());
+
+  // Need to test the three state transitions out of IN_PROGRESS
+  // Path #1: AbortWrite without tombstone
+  cache.AbortWrite(move(handle), /* tombstone */ false);
+  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 0);
+
+  // Path #2: AbortWrite with tombstone
+  handle = cache.Lookup("outstanding_chunk_then_tombstone", true);
+  EXPECT_TRUE(cache.IsAvailableForWrite(handle));
+  status = cache.RequestWriteSize(&handle, chunk_size + 1);
+  EXPECT_OK(status);
+  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 2 * 
chunk_size);
+  cache.AbortWrite(move(handle), /* tombstone */ true);
+  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 0);
+
+  // Path #3: CompleteWrite
+  handle = cache.Lookup("outstanding_chunk_then_complete", true);
+  EXPECT_TRUE(cache.IsAvailableForWrite(handle));
+  status = cache.RequestWriteSize(&handle, chunk_size + 1);
+  EXPECT_OK(status);
+  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 2 * 
chunk_size);
+  cache.CompleteWrite(move(handle), chunk_size + 1);
+  EXPECT_EQ(cache.tuple_cache_outstanding_writes_bytes_->GetValue(), 0);
+}
+
+TEST_F(TupleCacheMgrTest, TestSyncToDisk) {
+  // Need the debug_pos to be zero so that DebugPos::NO_FILES is not set.
+  TupleCacheMgr cache =
+      GetCache(GetCacheDir(), "1KB", "LRU", /* debug_pos */ 0, /* 
sync_pool_size */ 10);
+  ASSERT_OK(cache.Init());
+
+  // Error case: If there is no file, then the thread doing sync will get an 
error
+  // when trying to open the file. This causes the entry to be evicted.
+  TupleCacheMgr::UniqueHandle handle = cache.Lookup("key_without_file", true);
+  EXPECT_TRUE(cache.IsAvailableForWrite(handle));
+  cache.CompleteWrite(move(handle), 100);
+  // Sleep a bit to let the thread pool process the entry
+  SleepForMs(100);
+  handle = cache.Lookup("key_without_file", false);
+  EXPECT_FALSE(cache.IsAvailableForRead(handle));
+
+  // Success case: If there is a file that can be synced to disk, everything 
behaves
+  // normally.
+  handle = cache.Lookup("key_with_file", true);
+  std::string file_path = cache.GetPath(handle);
+  std::unique_ptr<kudu::WritableFile> cache_file;
+  kudu::Status s = kudu::Env::Default()->NewWritableFile(file_path, 
&cache_file);
+  EXPECT_TRUE(s.ok());
+  std::string data("data");
+  cache_file->Append(Slice(data));
+  cache.CompleteWrite(move(handle), 100);
+  // Sleep a bit to let the thread pool process the entry
+  SleepForMs(100);
+  handle = cache.Lookup("key_with_file", false);
+  EXPECT_TRUE(cache.IsAvailableForRead(handle));
+}
+
+TEST_F(TupleCacheMgrTest, TestDroppedSyncs) {
+  // Need the debug_pos to be zero so that DebugPos::NO_FILES is not set.
+  // We set a small sync_pool_size (1) and the bare minimum 
sync_pool_queue_depth (1)
+  // to force some syncs to be dropped.
+  FLAGS_cache_force_single_shard = true;
+  TupleCacheMgr cache = GetCache(GetCacheDir(), "10KB", "LRU", /* debug_pos */ 
0,
+      /* sync_pool_size */ 1, /* sync_pool_queue_depth */ 1);
+  ASSERT_OK(cache.Init());
+
+  // Attempt to write entries to the cache concurrently to stress the sync 
pool.
+  // This uses many writers, but the writes are small and can all fit into the
+  // cache. The only reason something would fail to write to the cache is if 
the
+  // sync pool gets overwhelmed.
+  vector<future<bool>> results;
+  results.reserve(100);
+  for (int i = 0; i < 100; ++i) {
+    results.emplace_back(async(launch::async, [&cache, i]() {
+      TupleCacheMgr::UniqueHandle handle = cache.Lookup(Substitute("write$0", 
i), true);
+      EXPECT_TRUE(cache.IsAvailableForWrite(handle));
+      std::string file_path = cache.GetPath(handle);
+      std::unique_ptr<kudu::WritableFile> cache_file;
+      kudu::Status s = kudu::Env::Default()->NewWritableFile(file_path, 
&cache_file);
+      EXPECT_TRUE(s.ok());
+      std::string data("data");
+      cache_file->Append(Slice(data));
+      cache.CompleteWrite(move(handle), 100);
+      // CompleteWrite doesn't return status, so we can only tell if the sync 
failed
+      // by looking up the entry.
+      handle = cache.Lookup(Substitute("write$0", i), false);
+      return cache.IsAvailableForRead(handle);
+    }));
+  }
+
+  // Wait for all threads to complete and count the number of failures
+  uint32_t num_failures = 0;
+  for (auto& result : results) {
+    result.wait();
+    if (!result.get()) num_failures++;
+  }
+  // The sync pool should get overwhelmed and the number of dropped syncs 
should match
+  // the number of failures.
+  EXPECT_GT(cache.tuple_cache_dropped_sync_->GetValue(), 0);
+  EXPECT_EQ(cache.tuple_cache_dropped_sync_->GetValue(), num_failures);
+}
+
 } // namespace impala
diff --git a/be/src/runtime/tuple-cache-mgr.cc 
b/be/src/runtime/tuple-cache-mgr.cc
index ddda300ec..dbeb8ef68 100644
--- a/be/src/runtime/tuple-cache-mgr.cc
+++ b/be/src/runtime/tuple-cache-mgr.cc
@@ -19,6 +19,7 @@
 
 #include <boost/filesystem.hpp>
 
+#include "common/constant-strings.h"
 #include "common/logging.h"
 #include "exec/tuple-file-reader.h"
 #include "exec/tuple-text-file-reader.h"
@@ -54,6 +55,20 @@ DEFINE_string(tuple_cache_eviction_policy, "LRU",
 DEFINE_string(tuple_cache_debug_dump_dir, "",
     "Directory for dumping the intermediate query result tuples for debugging 
purpose.");
 
+DEFINE_uint32(tuple_cache_sync_pool_size, 10,
+    "(Advanced) Size of the thread pool syncing cache files to disk 
asynchronously. "
+    "If set to 0, cache files are flushed sychronously.");
+DEFINE_uint32(tuple_cache_sync_pool_queue_depth, 1000,
+    "(Advanced) Maximum queue depth for the thread pool syncing cache files to 
disk");
+
+static const string OUTSTANDING_WRITE_LIMIT_MSG =
+  "(Advanced) Limit on the size of outstanding tuple cache writes. " +
+  Substitute(MEM_UNITS_HELP_MSG, "the process memory limit");
+DEFINE_string(tuple_cache_outstanding_write_limit, "1GB",
+    OUTSTANDING_WRITE_LIMIT_MSG.c_str());
+DEFINE_uint32(tuple_cache_outstanding_write_chunk_bytes, 128 * 1024,
+    "(Advanced) Chunk size for incrementing the outstanding tuple cache write 
size");
+
 // Global feature flag for tuple caching. If false, enable_tuple_cache cannot 
be true
 // and the coordinator cannot produce plans with TupleCacheNodes. The 
tuple_cache
 // parameter also cannot be specified.
@@ -91,31 +106,47 @@ static string ConstructTupleCacheDebugDumpPath() {
 }
 
 TupleCacheMgr::TupleCacheMgr(MetricGroup* metrics)
-  : TupleCacheMgr(FLAGS_tuple_cache, FLAGS_tuple_cache_eviction_policy, 
metrics, 0) {}
-
-TupleCacheMgr::TupleCacheMgr(string cache_config, string eviction_policy_str,
-    MetricGroup* metrics, uint8_t debug_pos)
+  : TupleCacheMgr(FLAGS_tuple_cache, FLAGS_tuple_cache_eviction_policy, 
metrics,
+        /* debug_pos */ 0, FLAGS_tuple_cache_sync_pool_size,
+        FLAGS_tuple_cache_sync_pool_queue_depth,
+        FLAGS_tuple_cache_outstanding_write_limit,
+        FLAGS_tuple_cache_outstanding_write_chunk_bytes) {}
+
+TupleCacheMgr::TupleCacheMgr(
+    string cache_config, string eviction_policy_str, MetricGroup* metrics,
+    uint8_t debug_pos, uint32_t sync_pool_size, uint32_t sync_pool_queue_depth,
+    string outstanding_write_limit_str, uint32_t outstanding_write_chunk_bytes)
   : cache_config_(move(cache_config)),
     eviction_policy_str_(move(eviction_policy_str)),
+    outstanding_write_limit_str_(move(outstanding_write_limit_str)),
     cache_debug_dump_dir_(ConstructTupleCacheDebugDumpPath()),
     debug_pos_(debug_pos),
+    sync_pool_size_(sync_pool_size),
+    sync_pool_queue_depth_(sync_pool_queue_depth),
+    outstanding_write_chunk_bytes_(outstanding_write_chunk_bytes),
     tuple_cache_hits_(metrics->AddCounter("impala.tuple-cache.hits", 0)),
     tuple_cache_misses_(metrics->AddCounter("impala.tuple-cache.misses", 0)),
     tuple_cache_skipped_(metrics->AddCounter("impala.tuple-cache.skipped", 0)),
     tuple_cache_halted_(metrics->AddCounter("impala.tuple-cache.halted", 0)),
+    tuple_cache_backpressure_halted_(
+        metrics->AddCounter("impala.tuple-cache.backpressure-halted", 0)),
     tuple_cache_entries_evicted_(
         metrics->AddCounter("impala.tuple-cache.entries-evicted", 0)),
+    
tuple_cache_failed_sync_(metrics->AddCounter("impala.tuple-cache.failed-syncs", 
0)),
+    
tuple_cache_dropped_sync_(metrics->AddCounter("impala.tuple-cache.dropped-syncs",
 0)),
     tuple_cache_entries_in_use_(
         metrics->AddGauge("impala.tuple-cache.entries-in-use", 0)),
     tuple_cache_entries_in_use_bytes_(
         metrics->AddGauge("impala.tuple-cache.entries-in-use-bytes", 0)),
     tuple_cache_tombstones_in_use_(
         metrics->AddGauge("impala.tuple-cache.tombstones-in-use", 0)),
+    tuple_cache_outstanding_writes_bytes_(
+        metrics->AddGauge("impala.tuple-cache.outstanding-writes-bytes", 0)),
     tuple_cache_entry_size_stats_(metrics->RegisterMetric(
         new HistogramMetric(MetricDefs::Get("impala.tuple-cache.entry-sizes"),
             STATS_MAX_TUPLE_CACHE_ENTRY_SIZE, 3))) {}
 
-Status TupleCacheMgr::Init() {
+Status TupleCacheMgr::Init(int64_t process_bytes_limit) {
   if (cache_config_.empty()) {
     LOG(INFO) << "Tuple Cache is disabled.";
     return Status::OK();
@@ -183,12 +214,36 @@ Status TupleCacheMgr::Init() {
         eviction_policy_str_));
   }
 
+  // The outstanding write limit can either be a specific value, or it can be a
+  // percentage of the process bytes limit. If the process bytes limit is zero,
+  // a percentage is not allowed.
+  outstanding_write_limit_ = 
ParseUtil::ParseMemSpec(outstanding_write_limit_str_,
+      &is_percent, process_bytes_limit);
+  if (outstanding_write_limit_ <= 0) {
+    CLEAN_EXIT_WITH_ERROR(
+        Substitute("Invalid tuple cache outstanding write limit configuration: 
$0.",
+            FLAGS_tuple_cache_outstanding_write_limit));
+  }
+
+  // Setting sync_pool_size == 0 results in synchronous flushing to disk. This 
is
+  // mainly used for backend tests
+  if (sync_pool_size_ > 0) {
+    sync_thread_pool_.reset(new ThreadPool<string>("tuple-cache-mgr", 
"sync-worker",
+        sync_pool_size_, sync_pool_queue_depth_,
+        [this] (int thread_id, const string& filename) {
+          this->SyncFileToDisk(filename);
+        }));
+    RETURN_IF_ERROR(sync_thread_pool_->Init());
+  }
+
   cache_.reset(NewCache(policy, capacity, "Tuple_Cache"));
 
   RETURN_IF_ERROR(cache_->Init());
 
   LOG(INFO) << "Tuple Cache initialized at " << cache_dir_
-            << " with capacity " << PrettyPrinter::Print(capacity, 
TUnit::BYTES);
+            << " with capacity " << PrettyPrinter::Print(capacity, 
TUnit::BYTES)
+            << " and outstanding write limit: "
+            << PrettyPrinter::Print(outstanding_write_limit_, TUnit::BYTES);
   enabled_ = true;
   return Status::OK();
 }
@@ -207,36 +262,41 @@ Status TupleCacheMgr::Init() {
 //                             again, if found return it 
(IsAvailableForWrite()=false),
 //                             else create a new entry 
(IsAvailableForWrite()=true).
 //
-//                          entry found
-// Lookup(acquire_state=true)   --->      [ ... ] returns any of the states 
below
+//                         entry found
+// Lookup(acquire_state=true) ---> [ ... ] returns any of the states below
 //          |
 //          | entry absent: create new entry
-//          v               CompleteWrite
-//   [ IN_PROGRESS, false ]     --->      [ COMPLETE, true ]
+//          v            CompleteWrite                     SyncFileToDisk
+//   [ IN_PROGRESS, false ]   ---> [ COMPLETE_UNSYNCED, true ] ---> [ 
COMPLETE, true ]
 //          |
 //          | AbortWrite
-//          v              tombstone=true
-//   [ IN_PROGRESS, false ]     --->      [ TOMBSTONE, false ]
+//          v            tombstone=true
+//   [ IN_PROGRESS, false ]   ---> [ TOMBSTONE, false ]
 //          |
 //          | tombstone=false
 //          v
 //   [ IN_PROGRESS, false ] Scheduled for eviction, will be deleted once ref 
count=0.
 //
 
-enum class TupleCacheState { IN_PROGRESS, TOMBSTONE, COMPLETE };
+enum class TupleCacheState { IN_PROGRESS, TOMBSTONE, COMPLETE_UNSYNCED, 
COMPLETE };
 
 // An entry consists of a TupleCacheEntry followed by a C-string for the path.
 struct TupleCacheEntry {
   std::atomic<TupleCacheState> state{TupleCacheState::IN_PROGRESS};
-  size_t size = 0;
+  // Charge in the cache when there is a file associated with this entry. This 
is zero for
+  // TOMBSTONE and IN_PROGRESS before the first UpdateWriteSize call, but 
those states
+  // still have a base charge in the cache. During IN_PROGRESS, this is a 
reservation that
+  // exceeds the current size of the file.
+  size_t charge = 0;
 };
 
 struct TupleCacheMgr::Handle {
   Cache::UniqueHandle cache_handle;
   bool is_writer = false;
+  // Minimum charge to use if this entry becomes a TOMBSTONE
+  size_t base_charge = 0;
 };
 
-
 void TupleCacheMgr::HandleDeleter::operator()(Handle* ptr) const { delete ptr; 
}
 
 static uint8_t* getHandleData(const Cache* cache, TupleCacheMgr::Handle* 
handle) {
@@ -244,22 +304,37 @@ static uint8_t* getHandleData(const Cache* cache, 
TupleCacheMgr::Handle* handle)
   return cache->Value(handle->cache_handle).mutable_data();
 }
 
-static TupleCacheState GetState(const Cache* cache, TupleCacheMgr::Handle* 
handle) {
-  uint8_t* data = getHandleData(cache, handle);
+TupleCacheState TupleCacheMgr::GetState(TupleCacheMgr::Handle* handle) const {
+  uint8_t* data = getHandleData(cache_.get(), handle);
   return reinterpret_cast<TupleCacheEntry*>(data)->state;
 }
 
 // Returns true if state was updated.
-static bool UpdateState(const Cache* cache, TupleCacheMgr::Handle* handle,
+bool TupleCacheMgr::UpdateState(TupleCacheMgr::Handle* handle,
     TupleCacheState requiredState, TupleCacheState newState) {
-  uint8_t* data = getHandleData(cache, handle);
+  uint8_t* data = getHandleData(cache_.get(), handle);
   return reinterpret_cast<TupleCacheEntry*>(data)->
       state.compare_exchange_strong(requiredState, newState);
 }
 
-static void UpdateSize(Cache* cache, TupleCacheMgr::Handle* handle, size_t 
size) {
-  uint8_t* data = getHandleData(cache, handle);
-  reinterpret_cast<TupleCacheEntry*>(data)->size = size;
+size_t TupleCacheMgr::GetCharge(TupleCacheMgr::Handle* handle) const {
+  uint8_t* data = getHandleData(cache_.get(), handle);
+  return reinterpret_cast<TupleCacheEntry*>(data)->charge;
+}
+
+void TupleCacheMgr::UpdateWriteSize(TupleCacheMgr::Handle* handle,
+    size_t charge) {
+  uint8_t* data = getHandleData(cache_.get(), handle);
+  // We can only adjust the cache charge while an entry is IN_PROGRESS
+  DCHECK(TupleCacheState::IN_PROGRESS == GetState(handle));
+  TupleCacheEntry* entry = reinterpret_cast<TupleCacheEntry*>(data);
+  int64_t diff = charge - entry->charge;
+  entry->charge = charge;
+  cache_->UpdateCharge(handle->cache_handle, charge);
+  if (diff < 0) {
+    DCHECK_LE(-diff, tuple_cache_outstanding_writes_bytes_->GetValue());
+  }
+  tuple_cache_outstanding_writes_bytes_->Increment(diff);
 }
 
 static Cache::UniquePendingHandle CreateEntry(
@@ -282,8 +357,8 @@ static Cache::UniquePendingHandle CreateEntry(
 }
 
 // If the entry exists, the Handle pins it so it doesn't go away, but the 
entry may be in
-// any state (IN PROGRESS, TOMBSTONE, COMPLETE). If the entry doesn't exist and
-// acquire_write is true, it's created with the state IN_PROGRESS.
+// any state (IN PROGRESS, TOMBSTONE, COMPLETE_UNSYNCED, COMPLETE). If the 
entry doesn't
+// exist and acquire_write is true, it's created with the state IN_PROGRESS.
 TupleCacheMgr::UniqueHandle TupleCacheMgr::Lookup(
     const Slice& key, bool acquire_write) {
   if (!enabled_) return nullptr;
@@ -319,6 +394,7 @@ TupleCacheMgr::UniqueHandle TupleCacheMgr::Lookup(
       VLOG_FILE << "Tuple Cache Entry created for " << path;
       handle->cache_handle = move(chandle);
       handle->is_writer = true;
+      handle->base_charge = sizeof(TupleCacheEntry) + path.size() + 1;
     }
   }
 
@@ -327,45 +403,123 @@ TupleCacheMgr::UniqueHandle TupleCacheMgr::Lookup(
 
 bool TupleCacheMgr::IsAvailableForRead(UniqueHandle& handle) const {
   if (!handle || !handle->cache_handle) return false;
-  return TupleCacheState::COMPLETE == GetState(cache_.get(), handle.get());
+  TupleCacheState state = GetState(handle.get());
+  return TupleCacheState::COMPLETE_UNSYNCED == state ||
+      TupleCacheState::COMPLETE == state;
 }
 
 bool TupleCacheMgr::IsAvailableForWrite(UniqueHandle& handle) const {
   if (!handle || !handle->cache_handle) return false;
-  return handle->is_writer &&
-      TupleCacheState::IN_PROGRESS == GetState(cache_.get(), handle.get());
+  return handle->is_writer && TupleCacheState::IN_PROGRESS == 
GetState(handle.get());
 }
 
 void TupleCacheMgr::CompleteWrite(UniqueHandle handle, size_t size) {
   DCHECK(enabled_);
   DCHECK(handle != nullptr && handle->cache_handle != nullptr);
+  DCHECK(handle->is_writer);
   DCHECK_LE(size, MaxSize());
   DCHECK_GE(size, 0);
+  if (sync_pool_size_ > 0 &&
+      sync_thread_pool_->GetQueueSize() >= sync_pool_queue_depth_) {
+    // The sync_thread_pool_ has reached its max queue size. This should 
almost never
+    // happen, as the outstanding writes limit should kick in before this is 
overwhelmed.
+    // If it does happen, bail out.
+    AbortWrite(move(handle), false);
+    tuple_cache_dropped_sync_->Increment(1);
+    return;
+  }
   VLOG_FILE << "Tuple Cache: Complete " << GetPath(handle) << " (" << size << 
")";
-  CHECK(UpdateState(cache_.get(), handle.get(),
-      TupleCacheState::IN_PROGRESS, TupleCacheState::COMPLETE));
-  UpdateSize(cache_.get(), handle.get(), size);
-  cache_->UpdateCharge(handle->cache_handle, size);
+  UpdateWriteSize(handle.get(), size);
+  CHECK(UpdateState(handle.get(),
+      TupleCacheState::IN_PROGRESS, TupleCacheState::COMPLETE_UNSYNCED));
   tuple_cache_entries_in_use_bytes_->Increment(size);
   tuple_cache_entry_size_stats_->Update(size);
+  // When the sync_pool_size_ is 0, there is no thread pool and this does the 
sync
+  // directly. This is used for backend tests to avoid race conditions.
+  if (sync_pool_size_ > 0) {
+    // Offer the cache key to the thread pool.
+    bool success = 
sync_thread_pool_->Offer(cache_->Key(handle->cache_handle).ToString());
+    if (!success) {
+      // The queue is full, so evict this entry
+      VLOG_FILE << "Tuple Cache: Sync thread pool queue full. Evicting "
+                << GetPath(handle);
+      cache_->Erase(cache_->Key(handle->cache_handle));
+      tuple_cache_dropped_sync_->Increment(1);
+    }
+  } else {
+    SyncFileToDisk(cache_->Key(handle->cache_handle).ToString());
+  }
 }
 
 void TupleCacheMgr::AbortWrite(UniqueHandle handle, bool tombstone) {
   DCHECK(enabled_);
   DCHECK(handle != nullptr && handle->cache_handle != nullptr);
+  DCHECK(handle->is_writer);
   if (tombstone) {
     VLOG_FILE << "Tuple Cache: Tombstone " << GetPath(handle);
     tuple_cache_tombstones_in_use_->Increment(1);
-    CHECK(UpdateState(cache_.get(), handle.get(),
+    // We update the write size to 0 to remove the existing cache charge
+    // (and decrement the outstanding writes counter)
+    UpdateWriteSize(handle.get(), 0);
+    CHECK(UpdateState(handle.get(),
         TupleCacheState::IN_PROGRESS, TupleCacheState::TOMBSTONE));
+    // We want the tombstone cache entry to have a base charge, so set that now
+    // (without counting towards the outstanding writes).
+    cache_->UpdateCharge(handle->cache_handle, handle->base_charge);
   } else {
     // Remove the cache entry. Leaves state IN_PROGRESS so entry won't be 
reused until
     // successfully evicted.
-    DCHECK(TupleCacheState::IN_PROGRESS == GetState(cache_.get(), 
handle.get()));
+    DCHECK(TupleCacheState::IN_PROGRESS == GetState(handle.get()));
     cache_->Erase(cache_->Key(handle->cache_handle));
   }
 }
 
+Status TupleCacheMgr::RequestWriteSize(UniqueHandle* handle, size_t new_size) {
+  // The handle better be from a writer
+  DCHECK((*handle)->is_writer);
+
+  uint8_t* data = getHandleData(cache_.get(), handle->get());
+  size_t cur_charge = reinterpret_cast<TupleCacheEntry*>(data)->charge;
+  if (new_size > cur_charge) {
+    // Need to increase the charge, which can fail
+    // 1. There is a maximum size for any given entry
+    // 2. There is a maximum amount of outstanding writes (i.e. dirty buffers)
+    // The chunk size limits the frequency of incrementing the counter in the 
cache
+    // itself. The chunk size is disabled for unit tests to have exact counter 
values.
+    // The chunk size does not impact enforcement of the maximum entry size.
+
+    // An individual entry cannot exceed the MaxSize()
+    if (new_size > MaxSize()) {
+      tuple_cache_halted_->Increment(1);
+      return Status(TErrorCode::TUPLE_CACHE_ENTRY_SIZE_LIMIT_EXCEEDED, 
MaxSize());
+    }
+
+    size_t new_charge = new_size;
+    if (outstanding_write_chunk_bytes_ != 0) {
+      new_charge = ((new_size / outstanding_write_chunk_bytes_) + 1) *
+          outstanding_write_chunk_bytes_;
+      // The chunk size should not change the behavior of the MaxSize(), so 
limit the
+      // new_charge to MaxSize() if it would otherwise exceed it.
+      if (new_charge > MaxSize()) {
+        new_charge = MaxSize();
+      }
+    }
+    int64_t diff = new_charge - cur_charge;
+    DCHECK_GT(new_charge, cur_charge);
+    DCHECK_GE(new_charge, new_size);
+
+    // Limit the total outstanding writes to avoid excessive dirty buffers for 
the OS
+    if (tuple_cache_outstanding_writes_bytes_->GetValue() + diff >
+        outstanding_write_limit_) {
+      tuple_cache_backpressure_halted_->Increment(1);
+      return Status(TErrorCode::TUPLE_CACHE_OUTSTANDING_WRITE_LIMIT_EXCEEDED,
+          outstanding_write_limit_);
+    }
+    UpdateWriteSize(handle->get(), new_charge);
+  }
+  return Status::OK();
+}
+
 const char* TupleCacheMgr::GetPath(UniqueHandle& handle) const {
   DCHECK(enabled_);
   DCHECK(handle != nullptr && handle->cache_handle != nullptr);
@@ -426,9 +580,22 @@ void TupleCacheMgr::EvictedEntry(Slice key, Slice value) {
     DCHECK(tuple_cache_entries_in_use_bytes_ != nullptr);
     tuple_cache_entries_evicted_->Increment(1);
     tuple_cache_entries_in_use_->Increment(-1);
-    tuple_cache_entries_in_use_bytes_->Increment(-entry->size);
     DCHECK_GE(tuple_cache_entries_in_use_->GetValue(), 0);
-    DCHECK_GE(tuple_cache_entries_in_use_bytes_->GetValue(), 0);
+    // entries_in_use_bytes is incremented only when the entry reaches the
+    // COMPLETE_UNSYNCED state
+    if (TupleCacheState::COMPLETE_UNSYNCED == entry->state ||
+        TupleCacheState::COMPLETE == entry->state) {
+      tuple_cache_entries_in_use_bytes_->Increment(-entry->charge);
+      DCHECK_GE(tuple_cache_entries_in_use_bytes_->GetValue(), 0);
+    }
+    // Outstanding write bytes are accumulated during IN_PROGRESS, and remain 
set until
+    // the transition from COMPLETE_UNSYNCED to COMPLETE.
+    if (TupleCacheState::COMPLETE_UNSYNCED == entry->state ||
+        TupleCacheState::IN_PROGRESS == entry->state) {
+      DCHECK(tuple_cache_outstanding_writes_bytes_ != nullptr);
+      DCHECK_GE(tuple_cache_outstanding_writes_bytes_->GetValue(), 
entry->charge);
+      tuple_cache_outstanding_writes_bytes_->Increment(-entry->charge);
+    }
   } else {
     DCHECK(tuple_cache_tombstones_in_use_ != nullptr);
     tuple_cache_tombstones_in_use_->Increment(-1);
@@ -465,4 +632,66 @@ Status TupleCacheMgr::DeleteExistingFiles() const {
   }
   return Status::OK();
 }
+
+void TupleCacheMgr::SyncFileToDisk(const string& cache_key) {
+  Cache::UniqueHandle pos = cache_->Lookup(cache_key);
+  // The entry can be evicted while waiting to be synced to disk. If the entry 
no longer
+  // exists, there is nothing to do.
+  if (pos == nullptr) return;
+  UniqueHandle handle{new Handle()};
+  handle->cache_handle = move(pos);
+  // If the entry has a state other than COMPLETE_UNSYNCED, it could have been
+  // evicted and recreated. There is nothing to do.
+  if (TupleCacheState::COMPLETE_UNSYNCED != GetState(handle.get())) {
+    return;
+  }
+  bool success = true;
+  // Some unit tests don't create a real file when testing the TupleCacheMgr, 
so
+  // only do the sync if there is a backing file
+  bool has_backing_file = !(debug_pos_ & DebugPos::NO_FILES);
+  if (has_backing_file) {
+    // Open the cache file associated with this key, then call Sync() on it, 
and
+    // close it.
+    std::string file_path = GetPath(handle);
+    std::unique_ptr<kudu::RWFile> file_to_sync;
+    kudu::RWFileOptions opts;
+    opts.mode = kudu::Env::OpenMode::MUST_EXIST;
+    kudu::Status s = kudu::Env::Default()->NewRWFile(opts, file_path, 
&file_to_sync);
+    if (!s.ok()) {
+      LOG(WARNING) << Substitute("SyncFileToDisk: Failed to open file $0: $1", 
file_path,
+          s.ToString());
+      success = false;
+    } else {
+      s = file_to_sync->Sync();
+      if (!s.ok()) {
+        LOG(WARNING) << Substitute("SyncFileToDisk: Failed to sync file $0: 
$1",
+            file_path, s.ToString());
+        success = false;
+      }
+      // Close the file even if Sync() fails
+      s = file_to_sync->Close();
+      if (!s.ok()) {
+        LOG(WARNING) << Substitute("SyncFileToDisk: Failed to close file $0: 
$1",
+            file_path, s.ToString());
+        success = false;
+      }
+    }
+  }
+  if (success) {
+    bool update_succeeded = UpdateState(handle.get(),
+        TupleCacheState::COMPLETE_UNSYNCED, TupleCacheState::COMPLETE);
+    if (update_succeeded) {
+      
tuple_cache_outstanding_writes_bytes_->Increment(-GetCharge(handle.get()));
+    }
+    // Only crash for a failed state change on debug builds. The sync completed
+    // and the state change doesn't really impact external behavior. It isn't
+    // worth crashing on a release build.
+    DCHECK(update_succeeded);
+  } else {
+    // In case of any error, erase this cache entry
+    VLOG_FILE << "Tuple Cache: SyncFileToDisk failed. Evicting " << 
GetPath(handle);
+    cache_->Erase(cache_->Key(handle->cache_handle));
+    tuple_cache_failed_sync_->Increment(1);
+  }
+}
 } // namespace impala
diff --git a/be/src/runtime/tuple-cache-mgr.h b/be/src/runtime/tuple-cache-mgr.h
index 7a927f5b4..16dea0f3f 100644
--- a/be/src/runtime/tuple-cache-mgr.h
+++ b/be/src/runtime/tuple-cache-mgr.h
@@ -24,6 +24,7 @@
 #include "runtime/bufferpool/buffer-pool.h"
 #include "util/cache/cache.h"
 #include "util/metrics.h"
+#include "util/thread-pool.h"
 
 namespace impala {
 
@@ -33,6 +34,8 @@ class TupleReader;
 // Declaration of the debug tuple cache bad postfix constant.
 extern const char* DEBUG_TUPLE_CACHE_BAD_POSTFIX;
 
+enum class TupleCacheState;
+
 /// The TupleCacheMgr maintains per-daemon settings and metadata for the tuple 
cache.
 /// This it used by the various TupleCacheNodes from queries to lookup the 
cache
 /// entries or write cache entries. The TupleCacheMgr maintains the capacity 
constraint
@@ -58,14 +61,15 @@ public:
   ~TupleCacheMgr() = default;
 
   // Initialize the TupleCacheMgr. Must be called before any of the other APIs.
-  Status Init() WARN_UNUSED_RESULT;
+  // The process_bytes_limit is used to scale a percentage value for the 
outstanding
+  // writes limit. If it is set to 0, a percentage value is not allowed.
+  Status Init(int64_t process_bytes_limit = 0) WARN_UNUSED_RESULT;
 
   /// Enum for metric type.
   enum class MetricType {
     HIT,
     MISS,
     SKIPPED,
-    HALTED,
   };
 
   struct DebugDumpCacheMetaData {
@@ -108,6 +112,12 @@ public:
   // queries.
   void AbortWrite(UniqueHandle handle, bool tombstone);
 
+  // Request an increase to the outstanding write size. This should be called 
before
+  // writing more data to a tuple cache file. If the new_size exceeds the 
maximum size
+  // for a cache entry, this returns TUPLE_CACHE_ENTRY_SIZE_LIMIT_EXCEEDED. 
This returns
+  // TUPLE_CACHE_OUTSTANDING_WRITE_LIMIT_EXCEEDED if it hits the outstanding 
writes limit.
+  Status RequestWriteSize(UniqueHandle* handle, size_t new_size);
+
   /// Get path to read/write.
   const char* GetPath(UniqueHandle&) const;
 
@@ -126,9 +136,6 @@ public:
       case MetricType::SKIPPED:
         tuple_cache_skipped_->Increment(1);
         break;
-      case MetricType::HALTED:
-        tuple_cache_halted_->Increment(1);
-        break;
     }
   }
 
@@ -161,35 +168,69 @@ public:
   TupleCacheMgr& operator=(const TupleCacheMgr&) = delete;
 
   friend class TupleCacheMgrTest;
+  FRIEND_TEST(TupleCacheMgrTest, TestRequestWriteSize);
+  FRIEND_TEST(TupleCacheMgrTest, TestOutstandingWriteLimit);
+  FRIEND_TEST(TupleCacheMgrTest, TestOutstandingWriteLimitConcurrent);
+  FRIEND_TEST(TupleCacheMgrTest, TestOutstandingWriteChunkSize);
+  FRIEND_TEST(TupleCacheMgrTest, TestDroppedSyncs);
 
   // Constructor for tests
   enum DebugPos {
     FAIL_ALLOCATE = 1 << 0,
     FAIL_INSERT   = 1 << 1,
+    NO_FILES      = 1 << 2,
   };
-  TupleCacheMgr(string cache_config, string eviction_policy_str,
-      MetricGroup* metrics, uint8_t debug_pos);
+  TupleCacheMgr(string cache_config, string eviction_policy_str, MetricGroup* 
metrics,
+      uint8_t debug_pos, uint32_t sync_pool_size, uint32_t 
sync_pool_queue_depth,
+      string outstanding_write_limit_str, uint32_t 
outstanding_write_chunk_bytes);
 
   // Delete any existing files in the cache directory to start fresh
   Status DeleteExistingFiles() const;
 
+  // Sync file for cache key to disk
+  void SyncFileToDisk(const std::string& cache_key);
+
+  // Get the current state for a cache handle
+  TupleCacheState GetState(Handle* handle) const;
+
+  // Update a handle's state to newState, verifying that it matches the 
requredState.
+  // If the update fails, return false.
+  bool UpdateState(Handle* handle, TupleCacheState requiredState,
+      TupleCacheState newState);
+
+  // Get the current charge for this handle.
+  size_t GetCharge(Handle* handle) const;
+
+  // Update the current charge for this handle and adjust the outstanding 
writes
+  // counter accordingly.
+  void UpdateWriteSize(Handle* handle, size_t charge);
+
   const std::string cache_config_;
   const std::string eviction_policy_str_;
+  const std::string outstanding_write_limit_str_;
 
   std::string cache_dir_;
   std::string cache_debug_dump_dir_;
   bool enabled_ = false;
   uint8_t debug_pos_;
+  uint32_t sync_pool_size_;
+  uint32_t sync_pool_queue_depth_;
+  uint32_t outstanding_write_chunk_bytes_;
+  int64_t outstanding_write_limit_ = 0;
 
   /// Metrics for the tuple cache in the daemon level.
   IntCounter* tuple_cache_hits_;
   IntCounter* tuple_cache_misses_;
   IntCounter* tuple_cache_skipped_;
   IntCounter* tuple_cache_halted_;
+  IntCounter* tuple_cache_backpressure_halted_;
   IntCounter* tuple_cache_entries_evicted_;
+  IntCounter* tuple_cache_failed_sync_;
+  IntCounter* tuple_cache_dropped_sync_;
   IntGauge* tuple_cache_entries_in_use_;
   IntGauge* tuple_cache_entries_in_use_bytes_;
   IntGauge* tuple_cache_tombstones_in_use_;
+  IntGauge* tuple_cache_outstanding_writes_bytes_;
 
   /// Statistics for the tuple cache sizes allocated.
   HistogramMetric* tuple_cache_entry_size_stats_;
@@ -206,6 +247,9 @@ public:
   /// An in-memory presentation for metadata of tuple caches for debug 
verification.
   /// The key is the key of the tuple cache.
   std::unordered_map<std::string, DebugDumpCacheMetaData> 
debug_dump_caches_metadata_;
+
+  /// Thread pool for syncing files to disk
+  std::unique_ptr<ThreadPool<std::string>> sync_thread_pool_;
 };
 
 }
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 3717994d9..c55d017f9 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -38,6 +38,7 @@
 #include "thirdparty/datasketches/MurmurHash3.h"
 #include "util/debug-util.h"
 #include "util/parse-util.h"
+#include "util/time.h"
 
 DECLARE_int64(min_buffer_size);
 
@@ -55,6 +56,10 @@ DEFINE_bool_hidden(tuple_cache_ignore_query_options, false,
     "If true, don't compute TQueryOptionsHash for tuple caching to allow 
testing tuple "
     "caching failure modes.");
 
+DEFINE_bool_hidden(tuple_cache_query_options_random_seed, false,
+    "Inject randomness into the TQueryOptionsHash to force zero hits. This is 
for "
+    "testing only.");
+
 DEFINE_string_hidden(tuple_cache_exempt_query_options, "",
     "A comma-separated list of additional query options to exclude from the 
tuple cache "
     "key. Option names must be lower-case.");
@@ -1511,7 +1516,13 @@ TQueryOptionsHash impala::QueryOptionsResultHash(const 
TQueryOptions& query_opti
     exempt = Split(FLAGS_tuple_cache_exempt_query_options, ",", SkipEmpty());
   }
 
-  HashState hash{QUERY_OPTION_HASH_SEED, QUERY_OPTION_HASH_SEED};
+  uint64_t seed = QUERY_OPTION_HASH_SEED;
+  // To allow testing scenarios with zero hits, the random seed flags 
incorporates the
+  // current time into the query option hash.
+  if (FLAGS_tuple_cache_query_options_random_seed) {
+    seed = static_cast<uint64_t>(MonotonicNanos());
+  }
+  HashState hash{seed, seed};
 #define QUERY_OPT_FN(NAME, ENUM, LEVEL) \
   if (query_options.__isset.NAME && exempt.count(#NAME) == 0) \
     HashQueryOptionValue(query_options.NAME, hash);
diff --git a/common/thrift/generate_error_codes.py 
b/common/thrift/generate_error_codes.py
index 7d404d7e8..3390a9dfa 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -494,7 +494,13 @@ error_codes = (
 
   ("TUPLE_CACHE_INCONSISTENCY", 160, "Inconsistent tuple cache found: $0."),
 
-  ("OAUTH_VERIFY_FAILED", 161, "Error verifying OAuth Token: $0.")
+  ("OAUTH_VERIFY_FAILED", 161, "Error verifying OAuth Token: $0."),
+
+  ("TUPLE_CACHE_ENTRY_SIZE_LIMIT_EXCEEDED", 162, "Exceeded the maximum size 
for a tuple "
+   "cache entry ($0 bytes)"),
+
+  ("TUPLE_CACHE_OUTSTANDING_WRITE_LIMIT_EXCEEDED", 163, "Outstanding tuple 
cache writes "
+   "exceeded the limit ($0 bytes)")
 )
 
 import sys
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 8b3aece34..b58e3c55f 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -119,6 +119,46 @@
     "kind": "COUNTER",
     "key": "impala.tuple-cache.halted"
   },
+  {
+    "description": "The total number of Tuple Cache that halted writing due to 
backpressure",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Tuple Cache Halted Writing Due to Backpressure",
+    "units": "UNIT",
+    "kind": "COUNTER",
+    "key": "impala.tuple-cache.backpressure-halted"
+  },
+  {
+    "description": "The total number of Tuple Cache syncs to disk that failed",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Tuple Cache Failed Syncs",
+    "units": "UNIT",
+    "kind": "COUNTER",
+    "key": "impala.tuple-cache.failed-syncs"
+  },
+  {
+    "description": "The total number of Tuple Cache syncs to disk dropped due 
to backpressure",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Tuple Cache Dropped Syncs",
+    "units": "UNIT",
+    "kind": "COUNTER",
+    "key": "impala.tuple-cache.dropped-syncs"
+  },
+  {
+    "description": "The total outstanding bytes of Tuple Cache Writes",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Outstanding Tuple Cache Writes total bytes",
+    "units": "BYTES",
+    "kind": "GAUGE",
+    "key": "impala.tuple-cache.outstanding-writes-bytes"
+  },
   {
     "description": "The number of in-use Tuple Cache Entries",
     "contexts": [

Reply via email to