This is an automated email from the ASF dual-hosted git repository. yangzhg pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new e5c2bb9699 [fix](remote)Fix bug for Cache Reader (#11629) e5c2bb9699 is described below commit e5c2bb969979d320308e9bac1850d29bf7a3ae20 Author: pengxiangyu <diablo...@163.com> AuthorDate: Fri Aug 12 13:40:32 2022 +0800 [fix](remote)Fix bug for Cache Reader (#11629) --- be/src/io/cache/sub_file_cache.cpp | 24 ++++++++++++++++++++---- be/src/io/cache/whole_file_cache.cpp | 22 +++++++++++++++++++--- be/src/olap/rowset/beta_rowset.cpp | 14 +++++++++++--- be/src/olap/rowset/beta_rowset.h | 2 ++ be/src/olap/rowset/segment_v2/segment.cpp | 6 +++--- be/src/olap/rowset/segment_v2/segment.h | 5 +++-- be/test/olap/rowset/segment_v2/segment_test.cpp | 4 ++-- be/test/tools/benchmark_tool.cpp | 2 +- 8 files changed, 61 insertions(+), 18 deletions(-) diff --git a/be/src/io/cache/sub_file_cache.cpp b/be/src/io/cache/sub_file_cache.cpp index 519fa29730..356d21e0e4 100644 --- a/be/src/io/cache/sub_file_cache.cpp +++ b/be/src/io/cache/sub_file_cache.cpp @@ -52,7 +52,16 @@ Status SubFileCache::read_at(size_t offset, Slice result, size_t* bytes_read) { } } if (need_download) { - std::lock_guard<std::shared_mutex> wrlock(_cache_map_lock); + std::unique_lock<std::shared_mutex> wrlock(_cache_map_lock); + bool cache_dir_exist = false; + RETURN_NOT_OK_STATUS_WITH_WARN( + io::global_local_filesystem()->exists(_cache_dir, &cache_dir_exist), + fmt::format("Check local cache dir exist failed. {}", _cache_dir.native())); + if (!cache_dir_exist) { + RETURN_NOT_OK_STATUS_WITH_WARN( + io::global_local_filesystem()->create_directory(_cache_dir), + fmt::format("Create local cache dir failed. {}", _cache_dir.native())); + } for (vector<size_t>::const_iterator iter = need_cache_offsets.cbegin(); iter != need_cache_offsets.cend(); ++iter) { if (_cache_file_readers.find(*iter) == _cache_file_readers.end() || @@ -145,8 +154,15 @@ Status SubFileCache::_generate_cache_reader(size_t offset, size_t req_size) { file_writer->append(file_slice), fmt::format("Write local cache file failed: {}", cache_file.native())); RETURN_NOT_OK_STATUS_WITH_WARN( - io::global_local_filesystem()->create_file(cache_done_file, &file_writer), + file_writer->close(), + fmt::format("Close local cache file failed: {}", cache_file.native())); + io::FileWriterPtr done_file_writer; + RETURN_NOT_OK_STATUS_WITH_WARN( + io::global_local_filesystem()->create_file(cache_done_file, &done_file_writer), fmt::format("Create local done file failed: {}", cache_done_file.native())); + RETURN_NOT_OK_STATUS_WITH_WARN( + done_file_writer->close(), + fmt::format("Close local done file failed: {}", cache_done_file.native())); } io::FileReaderSPtr cache_reader; RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cache_file, &cache_reader)); @@ -181,7 +197,7 @@ Status SubFileCache::clean_timeout_cache() { } } if (timeout_keys.size() > 0) { - std::lock_guard<std::shared_mutex> wrlock(_cache_map_lock); + std::unique_lock<std::shared_mutex> wrlock(_cache_map_lock); for (std::vector<size_t>::const_iterator iter = timeout_keys.cbegin(); iter != timeout_keys.cend(); ++iter) { RETURN_IF_ERROR(_clean_cache_internal(*iter)); @@ -192,7 +208,7 @@ Status SubFileCache::clean_timeout_cache() { } Status SubFileCache::clean_all_cache() { - std::lock_guard<std::shared_mutex> wrlock(_cache_map_lock); + std::unique_lock<std::shared_mutex> wrlock(_cache_map_lock); for (std::map<size_t, int64_t>::const_iterator iter = _last_match_times.cbegin(); iter != _last_match_times.cend(); ++iter) { RETURN_IF_ERROR(_clean_cache_internal(iter->first)); diff --git a/be/src/io/cache/whole_file_cache.cpp b/be/src/io/cache/whole_file_cache.cpp index 012ae3e983..426b3f175c 100644 --- a/be/src/io/cache/whole_file_cache.cpp +++ b/be/src/io/cache/whole_file_cache.cpp @@ -53,7 +53,7 @@ Status WholeFileCache::read_at(size_t offset, Slice result, size_t* bytes_read) } Status WholeFileCache::_generate_cache_reader(size_t offset, size_t req_size) { - std::lock_guard<std::shared_mutex> wrlock(_cache_lock); + std::unique_lock<std::shared_mutex> wrlock(_cache_lock); Path cache_file = _cache_dir / WHOLE_FILE_CACHE_NAME; Path cache_done_file = _cache_dir / WHOLE_FILE_CACHE_DONE_NAME; bool done_file_exist = false; @@ -61,6 +61,15 @@ Status WholeFileCache::_generate_cache_reader(size_t offset, size_t req_size) { io::global_local_filesystem()->exists(cache_done_file, &done_file_exist), "Check local cache done file exist failed."); if (!done_file_exist) { + bool cache_dir_exist = false; + RETURN_NOT_OK_STATUS_WITH_WARN( + io::global_local_filesystem()->exists(_cache_dir, &cache_dir_exist), + fmt::format("Check local cache dir exist failed. {}", _cache_dir.native())); + if (!cache_dir_exist) { + RETURN_NOT_OK_STATUS_WITH_WARN( + io::global_local_filesystem()->create_directory(_cache_dir), + fmt::format("Create local cache dir failed. {}", _cache_dir.native())); + } bool cache_file_exist = false; RETURN_NOT_OK_STATUS_WITH_WARN( io::global_local_filesystem()->exists(cache_file, &cache_file_exist), @@ -91,8 +100,15 @@ Status WholeFileCache::_generate_cache_reader(size_t offset, size_t req_size) { file_writer->append(file_slice), fmt::format("Write local cache file failed: {}", cache_file.native())); RETURN_NOT_OK_STATUS_WITH_WARN( - io::global_local_filesystem()->create_file(cache_done_file, &file_writer), + file_writer->close(), + fmt::format("Close local cache file failed: {}", cache_file.native())); + io::FileWriterPtr done_file_writer; + RETURN_NOT_OK_STATUS_WITH_WARN( + io::global_local_filesystem()->create_file(cache_done_file, &done_file_writer), fmt::format("Create local done file failed: {}", cache_done_file.native())); + RETURN_NOT_OK_STATUS_WITH_WARN( + done_file_writer->close(), + fmt::format("Close local done file failed: {}", cache_done_file.native())); } RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cache_file, &_cache_file_reader)); _cache_file_size = _cache_file_reader->size(); @@ -115,7 +131,7 @@ Status WholeFileCache::clean_all_cache() { } Status WholeFileCache::_clean_cache_internal() { - std::lock_guard<std::shared_mutex> wrlock(_cache_lock); + std::unique_lock<std::shared_mutex> wrlock(_cache_lock); _cache_file_reader.reset(); _cache_file_size = 0; Path cache_file = _cache_dir / WHOLE_FILE_CACHE_NAME; diff --git a/be/src/olap/rowset/beta_rowset.cpp b/be/src/olap/rowset/beta_rowset.cpp index 24801e4c7f..ebe9ce4e27 100644 --- a/be/src/olap/rowset/beta_rowset.cpp +++ b/be/src/olap/rowset/beta_rowset.cpp @@ -41,6 +41,11 @@ std::string BetaRowset::segment_file_path(int segment_id) { return remote_segment_path(_rowset_meta->tablet_id(), rowset_id(), segment_id); } +std::string BetaRowset::segment_cache_path(int segment_id) { + // {root_path}/data/{shard_id}/{tablet_id}/{schema_hash}/{rowset_id}_{seg_num} + return fmt::format("{}/{}_{}", _tablet_path, rowset_id().to_string(), segment_id); +} + std::string BetaRowset::local_segment_path(const std::string& tablet_path, const RowsetId& rowset_id, int segment_id) { // {root_path}/data/{shard_id}/{tablet_id}/{schema_hash}/{rowset_id}_{seg_num}.dat @@ -83,8 +88,9 @@ Status BetaRowset::load_segments(std::vector<segment_v2::SegmentSharedPtr>* segm } for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { auto seg_path = segment_file_path(seg_id); + auto cache_path = segment_cache_path(seg_id); std::shared_ptr<segment_v2::Segment> segment; - auto s = segment_v2::Segment::open(fs, seg_path, seg_id, _schema, &segment); + auto s = segment_v2::Segment::open(fs, seg_path, cache_path, seg_id, _schema, &segment); if (!s.ok()) { LOG(WARNING) << "failed to open segment. " << seg_path << " under rowset " << unique_id() << " : " << s.to_string(); @@ -102,7 +108,8 @@ Status BetaRowset::load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* se return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); } auto seg_path = segment_file_path(seg_id); - auto s = segment_v2::Segment::open(fs, seg_path, seg_id, _schema, segment); + auto cache_path = segment_cache_path(seg_id); + auto s = segment_v2::Segment::open(fs, seg_path, cache_path, seg_id, _schema, segment); if (!s.ok()) { LOG(WARNING) << "failed to open segment. " << seg_path << " under rowset " << unique_id() << " : " << s.to_string(); @@ -257,8 +264,9 @@ bool BetaRowset::check_current_rowset_segment() { } for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { auto seg_path = segment_file_path(seg_id); + auto cache_path = segment_cache_path(seg_id); std::shared_ptr<segment_v2::Segment> segment; - auto s = segment_v2::Segment::open(fs, seg_path, seg_id, _schema, &segment); + auto s = segment_v2::Segment::open(fs, seg_path, cache_path, seg_id, _schema, &segment); if (!s.ok()) { LOG(WARNING) << "segment can not be opened. file=" << seg_path; return false; diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h index e36538a413..c5b23e4fe8 100644 --- a/be/src/olap/rowset/beta_rowset.h +++ b/be/src/olap/rowset/beta_rowset.h @@ -44,6 +44,8 @@ public: std::string segment_file_path(int segment_id); + std::string segment_cache_path(int segment_id); + static std::string local_segment_path(const std::string& tablet_path, const RowsetId& rowset_id, int segment_id); diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index d042e4f7ef..18c7e2797f 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -40,15 +40,15 @@ namespace segment_v2 { using io::FileCacheManager; -Status Segment::open(io::FileSystem* fs, const std::string& path, uint32_t segment_id, - TabletSchemaSPtr tablet_schema, std::shared_ptr<Segment>* output) { +Status Segment::open(io::FileSystem* fs, const std::string& path, const std::string& cache_path, + uint32_t segment_id, TabletSchemaSPtr tablet_schema, + std::shared_ptr<Segment>* output) { std::shared_ptr<Segment> segment(new Segment(segment_id, tablet_schema)); io::FileReaderSPtr file_reader; RETURN_IF_ERROR(fs->open_file(path, &file_reader)); if (config::file_cache_type.empty()) { segment->_file_reader = std::move(file_reader); } else { - std::string cache_path = path.substr(0, path.size() - 4); io::FileReaderSPtr cache_reader = FileCacheManager::instance()->new_file_cache( cache_path, config::file_cache_alive_time_sec, file_reader, config::file_cache_type); diff --git a/be/src/olap/rowset/segment_v2/segment.h b/be/src/olap/rowset/segment_v2/segment.h index c7d27495a0..6a67b2aa9b 100644 --- a/be/src/olap/rowset/segment_v2/segment.h +++ b/be/src/olap/rowset/segment_v2/segment.h @@ -61,8 +61,9 @@ using SegmentSharedPtr = std::shared_ptr<Segment>; // change finished, client should disable all cached Segment for old TabletSchema. class Segment : public std::enable_shared_from_this<Segment> { public: - static Status open(io::FileSystem* fs, const std::string& path, uint32_t segment_id, - TabletSchemaSPtr tablet_schema, std::shared_ptr<Segment>* output); + static Status open(io::FileSystem* fs, const std::string& path, const std::string& cache_path, + uint32_t segment_id, TabletSchemaSPtr tablet_schema, + std::shared_ptr<Segment>* output); ~Segment(); diff --git a/be/test/olap/rowset/segment_v2/segment_test.cpp b/be/test/olap/rowset/segment_v2/segment_test.cpp index 4438d7f0b8..df73e8f8f7 100644 --- a/be/test/olap/rowset/segment_v2/segment_test.cpp +++ b/be/test/olap/rowset/segment_v2/segment_test.cpp @@ -181,7 +181,7 @@ protected: EXPECT_EQ("", writer.max_encoded_key().to_string()); } - st = Segment::open(fs, path, 0, query_schema, res); + st = Segment::open(fs, path, "", 0, query_schema, res); EXPECT_TRUE(st.ok()); EXPECT_EQ(nrows, (*res)->num_rows()); } @@ -947,7 +947,7 @@ TEST_F(SegmentReaderWriterTest, TestStringDict) { { std::shared_ptr<Segment> segment; - st = Segment::open(fs, fname, 0, tablet_schema, &segment); + st = Segment::open(fs, fname, "", 0, tablet_schema, &segment); EXPECT_TRUE(st.ok()); EXPECT_EQ(4096, segment->num_rows()); Schema schema(tablet_schema); diff --git a/be/test/tools/benchmark_tool.cpp b/be/test/tools/benchmark_tool.cpp index 3b51659178..b5c1280796 100644 --- a/be/test/tools/benchmark_tool.cpp +++ b/be/test/tools/benchmark_tool.cpp @@ -364,7 +364,7 @@ public: writer.finalize(&file_size, &index_size); file_writer->close(); - Segment::open(fs, path, seg_id, &_tablet_schema, res); + Segment::open(fs, path, "", seg_id, &_tablet_schema, res); } std::vector<std::vector<std::string>> generate_dataset(int rows_number) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org