This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 017175558 IMPALA-12123: Fix crash triggered by incomplete HDFS cache reads
017175558 is described below

commit 017175558341204bc32dd7998b245a12995234d7
Author: Joe McDonnell <[email protected]>
AuthorDate: Wed May 10 15:26:44 2023 -0700

    IMPALA-12123: Fix crash triggered by incomplete HDFS cache reads
    
    When using HDFS caching, the HDFS cache may not have the full
    buffer in memory and can return an incomplete buffer. In this
    case, the code falls back to the ordinary read path. However,
    the ScanRange cache_ structure is still populated, and
    ScanRange::ReadSubRanges() tries to read from it. This can
    crash, because the buffer is too short (and may already have
    been freed).
    
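    This changes ScanRange::ReadFromCache() to null out the cache_
    data structure (both data and len) when the read from the HDFS
    cache is incomplete, so the fallback read path does not use it.
    It also adds a DCHECK in ScanRange::ReadSubRanges() that reads
    from cache_ stay within cache_.len.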
    
    Testing:
     - Reproduced the crash stack manually by putting a Parquet
       file with a page index in the HDFS cache and forcing it
       down the incomplete-read code path.
     - Modified disk-io-mgr-test and CacheReaderTestStub to
       simulate the incomplete read case. Without this fix, the
       test hits a DCHECK or crashes.
    
    Change-Id: I51d8be6c03716badee81675447ed94ae6249b21b
    Reviewed-on: http://gerrit.cloudera.org:8080/19869
    Reviewed-by: Michael Smith <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Zoltan Borok-Nagy <[email protected]>
---
 be/src/runtime/io/cache-reader-test-stub.h | 74 ++++++++++++++++++++++--------
 be/src/runtime/io/disk-io-mgr-test.cc      | 55 +++++++++++-----------
 be/src/runtime/io/scan-range.cc            |  5 ++
 3 files changed, 87 insertions(+), 47 deletions(-)

diff --git a/be/src/runtime/io/cache-reader-test-stub.h b/be/src/runtime/io/cache-reader-test-stub.h
index 00be66530..e195c8a2c 100644
--- a/be/src/runtime/io/cache-reader-test-stub.h
+++ b/be/src/runtime/io/cache-reader-test-stub.h
@@ -17,45 +17,79 @@
 
 #pragma once
 
-#include "runtime/io/file-reader.h"
+#include "runtime/io/local-file-reader.h"
 #include "runtime/io/request-ranges.h"
 
 namespace impala {
 namespace io {
 
-/// Only for testing the code path when reading from the cache is successful.
-/// Takes a pointer to a buffer in its constructor, also the length of this buffer.
-/// CachedFile() simply returns the pointer and length.
-/// Invoking ReadFromPos() on it results in an error.
-class CacheReaderTestStub : public FileReader {
+// Test scenarios:
+// VALID_BUFFER - Successful HDFS cache read returns the whole buffer / length
+// FALLBACK_NULL_BUFFER - Failed HDFS cache read returns null / length = 0
+// FALLBACK_INCOMPLETE_BUFFER - Failed HDFS cache read returns invalid buffer
+//   with a short length
+enum HdfsCachingScenario {
+    VALID_BUFFER,
+    FALLBACK_NULL_BUFFER,
+    FALLBACK_INCOMPLETE_BUFFER
+};
+
+/// This simulates reads from HDFS caching for the successful path (VALID_BUFFER)
+/// and some unsuccessful paths (FALLBACK*) that should fall back to the normal
+/// file read path.
+/// Takes a pointer to a buffer, the length of the buffer, and an indicator of
+/// what scenario to test. The scenario determines what CachedFile() returns.
+/// See the description of HdfsCachingScenario above.
+/// All other methods fall through to the LocalFileReader, so fallback reads
+/// can succeed for applicable scenarios. Passing Open() and Close() through
+/// is harmless for the successful cache scenario.
+class CacheReaderTestStub : public LocalFileReader {
 public:
-  CacheReaderTestStub(ScanRange* scan_range, uint8_t* cache, int64_t length) :
-    FileReader(scan_range),
+  CacheReaderTestStub(ScanRange* scan_range, uint8_t* cache, int64_t length,
+                      HdfsCachingScenario scenario) :
+    LocalFileReader(scan_range),
     cache_(cache),
-    length_(length) {
+    length_(length),
+    scenario_(scenario){
   }
 
   ~CacheReaderTestStub() {}
 
-  virtual Status Open() override {
-    return Status::OK();
-  }
-
-  virtual Status ReadFromPos(DiskQueue* queue, int64_t file_offset, uint8_t* 
buffer,
-      int64_t bytes_to_read, int64_t* bytes_read, bool* eof) override {
-    DCHECK(false);
-    return Status("Not implemented");
+  virtual Status ReadFromPos(DiskQueue* queue, int64_t file_offset,
+      uint8_t* buffer, int64_t bytes_to_read, int64_t* bytes_read, bool* eof) 
override {
+    // This should not be reached for the VALID_BUFFER scenario, because
+    // the reads will come from the cached buffer.
+    DCHECK_NE(scenario_, VALID_BUFFER);
+    return LocalFileReader::ReadFromPos(queue, file_offset, buffer, 
bytes_to_read,
+        bytes_read, eof);
   }
 
   virtual void CachedFile(uint8_t** data, int64_t* length) override {
-    *length = length_;
-    *data = cache_;
+    switch (scenario_) {
+    case VALID_BUFFER:
+      *length = length_;
+      *data = cache_;
+      break;
+    case FALLBACK_NULL_BUFFER:
+      *length = 0;
+      *data = nullptr;
+      break;
+    case FALLBACK_INCOMPLETE_BUFFER:
+      // Use a fake too-short length and fake non-null buffer
+      // The buffer should not be dereferenced in this scenario, so
+      // it is useful for it to be invalid.
+      *length = 1;
+      *data = (uint8_t *) 0x1;
+      break;
+    default:
+      DCHECK(false) << "Invalid HdfsCachingScenario value";
+    }
   }
 
-  virtual void Close() override {}
 private:
   uint8_t* cache_ = nullptr;
   int64_t length_ = 0;
+  HdfsCachingScenario scenario_;
 };
 
 }
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
index f50fb7efa..7eb8984b1 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -323,7 +323,7 @@ class DiskIoMgrTest : public testing::Test {
       vector<ScanRange::SubRange> sub_ranges = {});
 
   void CachedReadsTestBody(const char* data, const char* expected,
-      bool fake_cache, vector<ScanRange::SubRange> sub_ranges = {});
+      HdfsCachingScenario scenario, vector<ScanRange::SubRange> sub_ranges = 
{});
 
   /// Convenience function to get a reference to the buffer pool.
   BufferPool* buffer_pool() const { return 
ExecEnv::GetInstance()->buffer_pool(); }
@@ -1050,7 +1050,7 @@ TEST_F(DiskIoMgrTest, MemScarcity) {
 }
 
 void DiskIoMgrTest::CachedReadsTestBody(const char* data, const char* expected,
-    bool fake_cache, vector<ScanRange::SubRange> sub_ranges) {
+    HdfsCachingScenario scenario, vector<ScanRange::SubRange> sub_ranges) {
   const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
   uint8_t* cached_data = reinterpret_cast<uint8_t*>(const_cast<char*>(data));
   int len = strlen(data);
@@ -1073,10 +1073,8 @@ void DiskIoMgrTest::CachedReadsTestBody(const char* data, const char* expected,
     ScanRange* complete_range =
         InitRange(&pool_, tmp_file, 0, strlen(data), 0, stat_val.st_mtime, 
nullptr, true,
             sub_ranges);
-    if (fake_cache) {
-      SetReaderStub(complete_range, make_unique<CacheReaderTestStub>(
-          complete_range, cached_data, len));
-    }
+    SetReaderStub(complete_range, 
make_unique<CacheReaderTestStub>(complete_range,
+        cached_data, len, scenario));
 
     // Issue some reads before the async ones are issued
     ValidateSyncRead(&io_mgr, reader.get(), &read_client, complete_range, 
expected);
@@ -1088,9 +1086,8 @@ void DiskIoMgrTest::CachedReadsTestBody(const char* data, const char* expected,
       ScanRange* range = InitRange(&pool_, tmp_file, 0, len, disk_id, 
stat_val.st_mtime,
           nullptr, true, sub_ranges);
       ranges.push_back(range);
-      if (fake_cache) {
-        SetReaderStub(range, make_unique<CacheReaderTestStub>(range, 
cached_data, len));
-      }
+      SetReaderStub(range, make_unique<CacheReaderTestStub>(range, cached_data,
+          len, scenario));
     }
     ASSERT_OK(reader->AddScanRanges(ranges, EnqueueLocation::TAIL));
 
@@ -1124,10 +1121,11 @@ void DiskIoMgrTest::CachedReadsTestBody(const char* data, const char* expected,
 TEST_F(DiskIoMgrTest, CachedReads) {
   InitRootReservation(LARGE_RESERVATION_LIMIT);
   const char* data = "abcdefghijklm";
-  // Don't fake the cache, i.e. test the fallback mechanism
-  CachedReadsTestBody(data, data, false);
-  // Fake the test with a file reader stub.
-  CachedReadsTestBody(data, data, true);
+  // Test the fallback mechanism for the cache
+  CachedReadsTestBody(data, data, FALLBACK_NULL_BUFFER);
+  CachedReadsTestBody(data, data, FALLBACK_INCOMPLETE_BUFFER);
+  // Test the success path for the cache
+  CachedReadsTestBody(data, data, VALID_BUFFER);
 }
 
 // Test when some scan ranges are marked as being cached and there
@@ -1139,10 +1137,11 @@ TEST_F(DiskIoMgrTest, CachedReadsSubRanges) {
 
   // first iteration tests the fallback mechanism with sub-ranges
   // second iteration fakes a cache
-  for (bool fake_cache : {false, true}) {
-    CachedReadsTestBody(data, data, fake_cache, {{0, data_len}});
-    CachedReadsTestBody(data, "bc", fake_cache, {{1, 2}});
-    CachedReadsTestBody(data, "abchilm", fake_cache, {{0, 3}, {7, 2}, {11, 
2}});
+  for (HdfsCachingScenario scenario :
+       {VALID_BUFFER, FALLBACK_NULL_BUFFER, FALLBACK_INCOMPLETE_BUFFER}) {
+    CachedReadsTestBody(data, data, scenario, {{0, data_len}});
+    CachedReadsTestBody(data, "bc", scenario, {{1, 2}});
+    CachedReadsTestBody(data, "abchilm", scenario, {{0, 3}, {7, 2}, {11, 2}});
   }
 }
 
@@ -1635,18 +1634,19 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferSubRanges) {
   // Reader doesn't need to provide client if it's providing buffers.
   unique_ptr<RequestContext> reader = io_mgr->RegisterContext();
 
-  auto test_case = [&](bool fake_cache, const char* expected_result,
+  auto test_case = [&](HdfsCachingScenario scenario, const char* 
expected_result,
       vector<ScanRange::SubRange> sub_ranges) {
     int result_len = strlen(expected_result);
     vector<uint8_t> client_buffer(result_len);
     ScanRange* range = pool_.Add(new ScanRange);
-    int cache_options = fake_cache ? BufferOpts::USE_HDFS_CACHE : 
BufferOpts::NO_CACHING;
+    // Note: Even though this specifies HDFS caching, some scenarios will fall 
back
+    // to doing regular reads.
+    int cache_options = BufferOpts::USE_HDFS_CACHE;
     range->Reset(ScanRange::FileInfo{tmp_file, nullptr, stat_val.st_mtime}, 
data_len, 0,
         0, true, BufferOpts::ReadInto(cache_options, client_buffer.data(), 
result_len),
         move(sub_ranges));
-    if (fake_cache) {
-      SetReaderStub(range, make_unique<CacheReaderTestStub>(range, cache, 
data_len));
-    }
+    SetReaderStub(range, make_unique<CacheReaderTestStub>(range, cache, 
data_len,
+        scenario));
     bool needs_buffers;
     ASSERT_OK(reader->StartScanRange(range, &needs_buffers));
     ASSERT_FALSE(needs_buffers);
@@ -1663,11 +1663,12 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferSubRanges) {
     range->ReturnBuffer(move(io_buffer));
   };
 
-  for (bool fake_cache : {false, true}) {
-    test_case(fake_cache, data, {{0, data_len}});
-    test_case(fake_cache, data, {{0, 15}, {15, data_len - 15}});
-    test_case(fake_cache, "quick fox", {{4, 5}, {15, 4}});
-    test_case(fake_cache, "the brown dog", {{0, 3}, {9, 6}, {data_len - 4, 
4}});
+  for (HdfsCachingScenario scenario :
+       {VALID_BUFFER, FALLBACK_NULL_BUFFER, FALLBACK_INCOMPLETE_BUFFER}) {
+    test_case(scenario, data, {{0, data_len}});
+    test_case(scenario, data, {{0, 15}, {15, data_len - 15}});
+    test_case(scenario, "quick fox", {{4, 5}, {15, 4}});
+    test_case(scenario, "the brown dog", {{0, 3}, {9, 6}, {data_len - 4, 4}});
   }
 
   io_mgr->UnregisterContext(reader.get());
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index 5dd5eae8f..ab44c83ba 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -342,6 +342,7 @@ Status ScanRange::ReadSubRanges(
         buffer_desc->buffer_len() - buffer_desc->len());
 
     if (cache_.data != nullptr) {
+      DCHECK_LE(offset + bytes_to_read, cache_.len);
       memcpy(buffer_desc->buffer_ + buffer_desc->len(),
           cache_.data + offset, bytes_to_read);
     } else {
@@ -637,6 +638,10 @@ Status ScanRange::ReadFromCache(
   if (cache_.len < len()) {
     VLOG_QUERY << "Error reading file from HDFS cache: " << file_ << ". 
Expected "
       << len() << " bytes, but read " << cache_.len << ". Switching to disk 
read path.";
+    // Null out the cache buffer to avoid any interactions when this falls
+    // back to the regular read path.
+    cache_.len = 0;
+    cache_.data = nullptr;
     // Close the scan range. 'read_succeeded' is still false, so the caller 
will fall back
     // to non-cached read of this scan range.
     file_reader_->Close();

Reply via email to