This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e5c2bb9699 [fix](remote)Fix bug for Cache Reader (#11629)
e5c2bb9699 is described below

commit e5c2bb969979d320308e9bac1850d29bf7a3ae20
Author: pengxiangyu <diablo...@163.com>
AuthorDate: Fri Aug 12 13:40:32 2022 +0800

    [fix](remote)Fix bug for Cache Reader (#11629)
---
 be/src/io/cache/sub_file_cache.cpp              | 24 ++++++++++++++++++++----
 be/src/io/cache/whole_file_cache.cpp            | 22 +++++++++++++++++++---
 be/src/olap/rowset/beta_rowset.cpp              | 14 +++++++++++---
 be/src/olap/rowset/beta_rowset.h                |  2 ++
 be/src/olap/rowset/segment_v2/segment.cpp       |  6 +++---
 be/src/olap/rowset/segment_v2/segment.h         |  5 +++--
 be/test/olap/rowset/segment_v2/segment_test.cpp |  4 ++--
 be/test/tools/benchmark_tool.cpp                |  2 +-
 8 files changed, 61 insertions(+), 18 deletions(-)

diff --git a/be/src/io/cache/sub_file_cache.cpp b/be/src/io/cache/sub_file_cache.cpp
index 519fa29730..356d21e0e4 100644
--- a/be/src/io/cache/sub_file_cache.cpp
+++ b/be/src/io/cache/sub_file_cache.cpp
@@ -52,7 +52,16 @@ Status SubFileCache::read_at(size_t offset, Slice result, size_t* bytes_read) {
         }
     }
     if (need_download) {
-        std::lock_guard<std::shared_mutex> wrlock(_cache_map_lock);
+        std::unique_lock<std::shared_mutex> wrlock(_cache_map_lock);
+        bool cache_dir_exist = false;
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                io::global_local_filesystem()->exists(_cache_dir, 
&cache_dir_exist),
+                fmt::format("Check local cache dir exist failed. {}", 
_cache_dir.native()));
+        if (!cache_dir_exist) {
+            RETURN_NOT_OK_STATUS_WITH_WARN(
+                    
io::global_local_filesystem()->create_directory(_cache_dir),
+                    fmt::format("Create local cache dir failed. {}", 
_cache_dir.native()));
+        }
         for (vector<size_t>::const_iterator iter = need_cache_offsets.cbegin();
              iter != need_cache_offsets.cend(); ++iter) {
             if (_cache_file_readers.find(*iter) == _cache_file_readers.end() ||
@@ -145,8 +154,15 @@ Status SubFileCache::_generate_cache_reader(size_t offset, size_t req_size) {
                 file_writer->append(file_slice),
                 fmt::format("Write local cache file failed: {}", 
cache_file.native()));
         RETURN_NOT_OK_STATUS_WITH_WARN(
-                io::global_local_filesystem()->create_file(cache_done_file, 
&file_writer),
+                file_writer->close(),
+                fmt::format("Close local cache file failed: {}", 
cache_file.native()));
+        io::FileWriterPtr done_file_writer;
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                io::global_local_filesystem()->create_file(cache_done_file, 
&done_file_writer),
                 fmt::format("Create local done file failed: {}", 
cache_done_file.native()));
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                done_file_writer->close(),
+                fmt::format("Close local done file failed: {}", 
cache_done_file.native()));
     }
     io::FileReaderSPtr cache_reader;
     RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cache_file, 
&cache_reader));
@@ -181,7 +197,7 @@ Status SubFileCache::clean_timeout_cache() {
         }
     }
     if (timeout_keys.size() > 0) {
-        std::lock_guard<std::shared_mutex> wrlock(_cache_map_lock);
+        std::unique_lock<std::shared_mutex> wrlock(_cache_map_lock);
         for (std::vector<size_t>::const_iterator iter = timeout_keys.cbegin();
              iter != timeout_keys.cend(); ++iter) {
             RETURN_IF_ERROR(_clean_cache_internal(*iter));
@@ -192,7 +208,7 @@ Status SubFileCache::clean_timeout_cache() {
 }
 
 Status SubFileCache::clean_all_cache() {
-    std::lock_guard<std::shared_mutex> wrlock(_cache_map_lock);
+    std::unique_lock<std::shared_mutex> wrlock(_cache_map_lock);
     for (std::map<size_t, int64_t>::const_iterator iter = 
_last_match_times.cbegin();
          iter != _last_match_times.cend(); ++iter) {
         RETURN_IF_ERROR(_clean_cache_internal(iter->first));
diff --git a/be/src/io/cache/whole_file_cache.cpp b/be/src/io/cache/whole_file_cache.cpp
index 012ae3e983..426b3f175c 100644
--- a/be/src/io/cache/whole_file_cache.cpp
+++ b/be/src/io/cache/whole_file_cache.cpp
@@ -53,7 +53,7 @@ Status WholeFileCache::read_at(size_t offset, Slice result, size_t* bytes_read)
 }
 
 Status WholeFileCache::_generate_cache_reader(size_t offset, size_t req_size) {
-    std::lock_guard<std::shared_mutex> wrlock(_cache_lock);
+    std::unique_lock<std::shared_mutex> wrlock(_cache_lock);
     Path cache_file = _cache_dir / WHOLE_FILE_CACHE_NAME;
     Path cache_done_file = _cache_dir / WHOLE_FILE_CACHE_DONE_NAME;
     bool done_file_exist = false;
@@ -61,6 +61,15 @@ Status WholeFileCache::_generate_cache_reader(size_t offset, size_t req_size) {
             io::global_local_filesystem()->exists(cache_done_file, 
&done_file_exist),
             "Check local cache done file exist failed.");
     if (!done_file_exist) {
+        bool cache_dir_exist = false;
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                io::global_local_filesystem()->exists(_cache_dir, 
&cache_dir_exist),
+                fmt::format("Check local cache dir exist failed. {}", 
_cache_dir.native()));
+        if (!cache_dir_exist) {
+            RETURN_NOT_OK_STATUS_WITH_WARN(
+                    
io::global_local_filesystem()->create_directory(_cache_dir),
+                    fmt::format("Create local cache dir failed. {}", 
_cache_dir.native()));
+        }
         bool cache_file_exist = false;
         RETURN_NOT_OK_STATUS_WITH_WARN(
                 io::global_local_filesystem()->exists(cache_file, 
&cache_file_exist),
@@ -91,8 +100,15 @@ Status WholeFileCache::_generate_cache_reader(size_t offset, size_t req_size) {
                 file_writer->append(file_slice),
                 fmt::format("Write local cache file failed: {}", 
cache_file.native()));
         RETURN_NOT_OK_STATUS_WITH_WARN(
-                io::global_local_filesystem()->create_file(cache_done_file, 
&file_writer),
+                file_writer->close(),
+                fmt::format("Close local cache file failed: {}", 
cache_file.native()));
+        io::FileWriterPtr done_file_writer;
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                io::global_local_filesystem()->create_file(cache_done_file, 
&done_file_writer),
                 fmt::format("Create local done file failed: {}", 
cache_done_file.native()));
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                done_file_writer->close(),
+                fmt::format("Close local done file failed: {}", 
cache_done_file.native()));
     }
     RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cache_file, 
&_cache_file_reader));
     _cache_file_size = _cache_file_reader->size();
@@ -115,7 +131,7 @@ Status WholeFileCache::clean_all_cache() {
 }
 
 Status WholeFileCache::_clean_cache_internal() {
-    std::lock_guard<std::shared_mutex> wrlock(_cache_lock);
+    std::unique_lock<std::shared_mutex> wrlock(_cache_lock);
     _cache_file_reader.reset();
     _cache_file_size = 0;
     Path cache_file = _cache_dir / WHOLE_FILE_CACHE_NAME;
diff --git a/be/src/olap/rowset/beta_rowset.cpp b/be/src/olap/rowset/beta_rowset.cpp
index 24801e4c7f..ebe9ce4e27 100644
--- a/be/src/olap/rowset/beta_rowset.cpp
+++ b/be/src/olap/rowset/beta_rowset.cpp
@@ -41,6 +41,11 @@ std::string BetaRowset::segment_file_path(int segment_id) {
     return remote_segment_path(_rowset_meta->tablet_id(), rowset_id(), 
segment_id);
 }
 
+std::string BetaRowset::segment_cache_path(int segment_id) {
+    // {root_path}/data/{shard_id}/{tablet_id}/{schema_hash}/{rowset_id}_{seg_num}
+    return fmt::format("{}/{}_{}", _tablet_path, rowset_id().to_string(), 
segment_id);
+}
+
 std::string BetaRowset::local_segment_path(const std::string& tablet_path,
                                            const RowsetId& rowset_id, int 
segment_id) {
     // {root_path}/data/{shard_id}/{tablet_id}/{schema_hash}/{rowset_id}_{seg_num}.dat
@@ -83,8 +88,9 @@ Status BetaRowset::load_segments(std::vector<segment_v2::SegmentSharedPtr>* segm
     }
     for (int seg_id = 0; seg_id < num_segments(); ++seg_id) {
         auto seg_path = segment_file_path(seg_id);
+        auto cache_path = segment_cache_path(seg_id);
         std::shared_ptr<segment_v2::Segment> segment;
-        auto s = segment_v2::Segment::open(fs, seg_path, seg_id, _schema, 
&segment);
+        auto s = segment_v2::Segment::open(fs, seg_path, cache_path, seg_id, 
_schema, &segment);
         if (!s.ok()) {
             LOG(WARNING) << "failed to open segment. " << seg_path << " under 
rowset "
                          << unique_id() << " : " << s.to_string();
@@ -102,7 +108,8 @@ Status BetaRowset::load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* se
         return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
     }
     auto seg_path = segment_file_path(seg_id);
-    auto s = segment_v2::Segment::open(fs, seg_path, seg_id, _schema, segment);
+    auto cache_path = segment_cache_path(seg_id);
+    auto s = segment_v2::Segment::open(fs, seg_path, cache_path, seg_id, 
_schema, segment);
     if (!s.ok()) {
         LOG(WARNING) << "failed to open segment. " << seg_path << " under 
rowset " << unique_id()
                      << " : " << s.to_string();
@@ -257,8 +264,9 @@ bool BetaRowset::check_current_rowset_segment() {
     }
     for (int seg_id = 0; seg_id < num_segments(); ++seg_id) {
         auto seg_path = segment_file_path(seg_id);
+        auto cache_path = segment_cache_path(seg_id);
         std::shared_ptr<segment_v2::Segment> segment;
-        auto s = segment_v2::Segment::open(fs, seg_path, seg_id, _schema, 
&segment);
+        auto s = segment_v2::Segment::open(fs, seg_path, cache_path, seg_id, 
_schema, &segment);
         if (!s.ok()) {
             LOG(WARNING) << "segment can not be opened. file=" << seg_path;
             return false;
diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h
index e36538a413..c5b23e4fe8 100644
--- a/be/src/olap/rowset/beta_rowset.h
+++ b/be/src/olap/rowset/beta_rowset.h
@@ -44,6 +44,8 @@ public:
 
     std::string segment_file_path(int segment_id);
 
+    std::string segment_cache_path(int segment_id);
+
     static std::string local_segment_path(const std::string& tablet_path, 
const RowsetId& rowset_id,
                                           int segment_id);
 
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp
index d042e4f7ef..18c7e2797f 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -40,15 +40,15 @@ namespace segment_v2 {
 
 using io::FileCacheManager;
 
-Status Segment::open(io::FileSystem* fs, const std::string& path, uint32_t 
segment_id,
-                     TabletSchemaSPtr tablet_schema, std::shared_ptr<Segment>* 
output) {
+Status Segment::open(io::FileSystem* fs, const std::string& path, const 
std::string& cache_path,
+                     uint32_t segment_id, TabletSchemaSPtr tablet_schema,
+                     std::shared_ptr<Segment>* output) {
     std::shared_ptr<Segment> segment(new Segment(segment_id, tablet_schema));
     io::FileReaderSPtr file_reader;
     RETURN_IF_ERROR(fs->open_file(path, &file_reader));
     if (config::file_cache_type.empty()) {
         segment->_file_reader = std::move(file_reader);
     } else {
-        std::string cache_path = path.substr(0, path.size() - 4);
         io::FileReaderSPtr cache_reader = 
FileCacheManager::instance()->new_file_cache(
                 cache_path, config::file_cache_alive_time_sec, file_reader,
                 config::file_cache_type);
diff --git a/be/src/olap/rowset/segment_v2/segment.h b/be/src/olap/rowset/segment_v2/segment.h
index c7d27495a0..6a67b2aa9b 100644
--- a/be/src/olap/rowset/segment_v2/segment.h
+++ b/be/src/olap/rowset/segment_v2/segment.h
@@ -61,8 +61,9 @@ using SegmentSharedPtr = std::shared_ptr<Segment>;
 // change finished, client should disable all cached Segment for old 
TabletSchema.
 class Segment : public std::enable_shared_from_this<Segment> {
 public:
-    static Status open(io::FileSystem* fs, const std::string& path, uint32_t 
segment_id,
-                       TabletSchemaSPtr tablet_schema, 
std::shared_ptr<Segment>* output);
+    static Status open(io::FileSystem* fs, const std::string& path, const 
std::string& cache_path,
+                       uint32_t segment_id, TabletSchemaSPtr tablet_schema,
+                       std::shared_ptr<Segment>* output);
 
     ~Segment();
 
diff --git a/be/test/olap/rowset/segment_v2/segment_test.cpp b/be/test/olap/rowset/segment_v2/segment_test.cpp
index 4438d7f0b8..df73e8f8f7 100644
--- a/be/test/olap/rowset/segment_v2/segment_test.cpp
+++ b/be/test/olap/rowset/segment_v2/segment_test.cpp
@@ -181,7 +181,7 @@ protected:
             EXPECT_EQ("", writer.max_encoded_key().to_string());
         }
 
-        st = Segment::open(fs, path, 0, query_schema, res);
+        st = Segment::open(fs, path, "", 0, query_schema, res);
         EXPECT_TRUE(st.ok());
         EXPECT_EQ(nrows, (*res)->num_rows());
     }
@@ -947,7 +947,7 @@ TEST_F(SegmentReaderWriterTest, TestStringDict) {
 
     {
         std::shared_ptr<Segment> segment;
-        st = Segment::open(fs, fname, 0, tablet_schema, &segment);
+        st = Segment::open(fs, fname, "", 0, tablet_schema, &segment);
         EXPECT_TRUE(st.ok());
         EXPECT_EQ(4096, segment->num_rows());
         Schema schema(tablet_schema);
diff --git a/be/test/tools/benchmark_tool.cpp b/be/test/tools/benchmark_tool.cpp
index 3b51659178..b5c1280796 100644
--- a/be/test/tools/benchmark_tool.cpp
+++ b/be/test/tools/benchmark_tool.cpp
@@ -364,7 +364,7 @@ public:
         writer.finalize(&file_size, &index_size);
         file_writer->close();
 
-        Segment::open(fs, path, seg_id, &_tablet_schema, res);
+        Segment::open(fs, path, "", seg_id, &_tablet_schema, res);
     }
 
     std::vector<std::vector<std::string>> generate_dataset(int rows_number) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to