This is an automated email from the ASF dual-hosted git repository.

airborne pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 83d4911aae0 [test](inverted index)Add fault injection cases for index writing (#39649) 83d4911aae0 is described below commit 83d4911aae00531b756ba1598af517d57e208b21 Author: qiye <jianliang5...@gmail.com> AuthorDate: Fri Nov 8 13:54:10 2024 +0800 [test](inverted index)Add fault injection cases for index writing (#39649) ## Proposed changes add fault injection case for inverted index writer --- be/src/olap/compaction.cpp | 56 +++- be/src/olap/inverted_index_parser.cpp | 1 + be/src/olap/olap_server.cpp | 3 + .../char_filter/char_filter_factory.h | 1 + .../segment_v2/inverted_index_compaction.cpp | 16 +- .../segment_v2/inverted_index_file_writer.cpp | 211 +++++++------ .../rowset/segment_v2/inverted_index_file_writer.h | 4 +- .../segment_v2/inverted_index_fs_directory.cpp | 182 +++++++++-- .../rowset/segment_v2/inverted_index_writer.cpp | 95 +++++- be/src/olap/task/index_builder.cpp | 87 +++++- ...dex_compaction_exception_fault_injection.groovy | 341 +++++++++++++++++++++ ...inverted_index_exception_fault_injection.groovy | 301 ++++++++++++++++++ ...st_build_index_exception_fault_injection.groovy | 263 ++++++++++++++++ 13 files changed, 1419 insertions(+), 142 deletions(-) diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 85e0a94c874..a581bce72c2 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -516,6 +516,8 @@ Status Compaction::do_inverted_index_compaction() { auto src_segment_num = src_seg_to_id_map.size(); auto dest_segment_num = dest_segment_num_rows.size(); + DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_dest_segment_num_is_zero", + { dest_segment_num = 0; }) if (dest_segment_num <= 0) { LOG(INFO) << "skip doing index compaction due to no output segments" << ". tablet=" << _tablet->tablet_id() << ", input row number=" << _input_row_num @@ -590,14 +592,17 @@ Status Compaction::do_inverted_index_compaction() { const auto& [rowset_id, seg_id] = m.first; auto find_it = rs_id_to_rowset_map.find(rowset_id); + DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_find_rowset_error", + { find_it = rs_id_to_rowset_map.end(); }) if (find_it == rs_id_to_rowset_map.end()) [[unlikely]] { - DCHECK(false) << _tablet->tablet_id() << ' ' << rowset_id; + // DCHECK(false) << _tablet->tablet_id() << ' ' << rowset_id; return Status::InternalError("cannot find rowset. 
tablet_id={} rowset_id={}", _tablet->tablet_id(), rowset_id.to_string()); } auto* rowset = find_it->second; - const auto& fs = rowset->rowset_meta()->fs(); + auto fs = rowset->rowset_meta()->fs(); + DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_get_fs_error", { fs = nullptr; }) if (!fs) { return Status::InternalError("get fs failed, resource_id={}", rowset->rowset_meta()->resource_id()); @@ -645,6 +650,13 @@ Status Compaction::do_inverted_index_compaction() { for (auto&& column_uniq_id : ctx.columns_to_do_index_compaction) { auto col = _cur_tablet_schema->column_by_uid(column_uniq_id); const auto* index_meta = _cur_tablet_schema->inverted_index(col); + DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_can_not_find_index_meta", + { index_meta = nullptr; }) + if (index_meta == nullptr) { + status = Status::Error<INVERTED_INDEX_COMPACTION_ERROR>( + fmt::format("Can not find index_meta for col {}", col.name())); + break; + } std::vector<lucene::store::Directory*> dest_index_dirs(dest_segment_num); try { @@ -707,10 +719,13 @@ void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) { is_continue = true; break; } - const auto& properties = tablet_index->properties(); + auto properties = tablet_index->properties(); if (!first_properties.has_value()) { first_properties = properties; } else { + DBUG_EXECUTE_IF( + "Compaction::do_inverted_index_compaction_index_properties_different", + { properties.emplace("dummy_key", "dummy_value"); }) if (properties != first_properties.value()) { is_continue = true; break; @@ -722,6 +737,8 @@ void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) { } auto has_inverted_index = [&](const RowsetSharedPtr& src_rs) { auto* rowset = static_cast<BetaRowset*>(src_rs.get()); + DBUG_EXECUTE_IF("Compaction::construct_skip_inverted_index_is_skip_index_compaction", + { rowset->set_skip_index_compaction(col_unique_id); }) if (rowset->is_skip_index_compaction(col_unique_id)) { LOG(WARNING) << "tablet[" << _tablet->tablet_id() << "] rowset[" << rowset->rowset_id() << "] column_unique_id[" << col_unique_id @@ -729,7 +746,9 @@ void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) { return false; } - const auto& fs = rowset->rowset_meta()->fs(); + auto fs = rowset->rowset_meta()->fs(); + DBUG_EXECUTE_IF("Compaction::construct_skip_inverted_index_get_fs_error", + { fs = nullptr; }) if (!fs) { LOG(WARNING) << "get fs failed, resource_id=" << rowset->rowset_meta()->resource_id(); @@ -737,6 +756,8 @@ void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) { } const auto* index_meta = rowset->tablet_schema()->inverted_index(col_unique_id); + DBUG_EXECUTE_IF("Compaction::construct_skip_inverted_index_index_meta_nullptr", + { index_meta = nullptr; }) if (index_meta == nullptr) { LOG(WARNING) << "tablet[" << _tablet->tablet_id() << "] column_unique_id[" << col_unique_id << "] index meta is null, will skip index compaction"; @@ -746,6 +767,9 @@ void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) { for (auto i = 0; i < rowset->num_segments(); i++) { // TODO: inverted_index_path auto seg_path = rowset->segment_path(i); + DBUG_EXECUTE_IF("Compaction::construct_skip_inverted_index_seg_path_nullptr", { + seg_path = ResultError(Status::Error<ErrorCode::INTERNAL_ERROR>("error")); + }) if (!seg_path) { LOG(WARNING) << seg_path.error(); return false; @@ -763,6 +787,16 @@ void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) { auto st = 
inverted_index_file_reader->init( config::inverted_index_read_buffer_size, open_idx_file_cache); index_file_path = inverted_index_file_reader->get_index_file_path(index_meta); + DBUG_EXECUTE_IF( + "Compaction::construct_skip_inverted_index_index_file_reader_init_" + "status_not_ok", + { + st = Status::Error<ErrorCode::INTERNAL_ERROR>( + "debug point: " + "construct_skip_inverted_index_index_file_reader_init_" + "status_" + "not_ok"); + }) if (!st.ok()) { LOG(WARNING) << "init index " << index_file_path << " error:" << st; return false; @@ -770,6 +804,14 @@ void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) { // check index meta auto result = inverted_index_file_reader->open(index_meta); + DBUG_EXECUTE_IF( + "Compaction::construct_skip_inverted_index_index_file_reader_open_" + "error", + { + result = ResultError( + Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( + "CLuceneError occur when open idx file")); + }) if (!result.has_value()) { LOG(WARNING) << "open index " << index_file_path << " error:" << result.error(); @@ -779,6 +821,12 @@ void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) { std::vector<std::string> files; reader->list(&files); reader->close(); + DBUG_EXECUTE_IF( + "Compaction::construct_skip_inverted_index_index_reader_close_error", + { _CLTHROWA(CL_ERR_IO, "debug point: reader close error"); }) + + DBUG_EXECUTE_IF("Compaction::construct_skip_inverted_index_index_files_count", + { files.clear(); }) // why is 3? // slice type index file at least has 3 files: null_bitmap, segments_N, segments.gen diff --git a/be/src/olap/inverted_index_parser.cpp b/be/src/olap/inverted_index_parser.cpp index a9ed7ec062e..f7e511970d9 100644 --- a/be/src/olap/inverted_index_parser.cpp +++ b/be/src/olap/inverted_index_parser.cpp @@ -128,6 +128,7 @@ std::string get_parser_ignore_above_value_from_properties( std::string get_parser_stopwords_from_properties( const std::map<std::string, std::string>& properties) { + DBUG_EXECUTE_IF("inverted_index_parser.get_parser_stopwords_from_properties", { return ""; }) if (properties.find(INVERTED_INDEX_PARSER_STOPWORDS_KEY) != properties.end()) { return properties.at(INVERTED_INDEX_PARSER_STOPWORDS_KEY); } else { diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 020d151d16b..8fae8887d7a 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -78,6 +78,7 @@ #include "runtime/memory/cache_manager.h" #include "runtime/memory/global_memory_arbitrator.h" #include "util/countdown_latch.h" +#include "util/debug_points.h" #include "util/doris_metrics.h" #include "util/mem_info.h" #include "util/thread.h" @@ -1134,6 +1135,8 @@ Status StorageEngine::submit_seg_compaction_task(std::shared_ptr<SegcompactionWo Status StorageEngine::process_index_change_task(const TAlterInvertedIndexReq& request) { auto tablet_id = request.tablet_id; TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id); + DBUG_EXECUTE_IF("StorageEngine::process_index_change_task_tablet_nullptr", + { tablet = nullptr; }) if (tablet == nullptr) { LOG(WARNING) << "tablet: " << tablet_id << " not exist"; return Status::InternalError("tablet not exist, tablet_id={}.", tablet_id); diff --git a/be/src/olap/rowset/segment_v2/inverted_index/char_filter/char_filter_factory.h b/be/src/olap/rowset/segment_v2/inverted_index/char_filter/char_filter_factory.h index 561054863d7..bebbea58f72 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index/char_filter/char_filter_factory.h +++ 
b/be/src/olap/rowset/segment_v2/inverted_index/char_filter/char_filter_factory.h @@ -27,6 +27,7 @@ class CharFilterFactory { public: template <typename... Args> static lucene::analysis::CharFilter* create(const std::string& name, Args&&... args) { + DBUG_EXECUTE_IF("CharFilterFactory::create_return_nullptr", { return nullptr; }) if (name == INVERTED_INDEX_CHAR_FILTER_CHAR_REPLACE) { return new CharReplaceCharFilter(std::forward<Args>(args)...); } diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp index 7d1b348b95b..88a8f241722 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp @@ -44,10 +44,16 @@ Status compact_column(int64_t index_id, bool can_use_ram_dir = true; lucene::store::Directory* dir = DorisFSDirectoryFactory::getDirectory( io::global_local_filesystem(), tmp_path.data(), can_use_ram_dir); + DBUG_EXECUTE_IF("compact_column_getDirectory_error", { + _CLTHROWA(CL_ERR_IO, "debug point: compact_column_getDirectory_error in index compaction"); + }) lucene::analysis::SimpleAnalyzer<char> analyzer; auto* index_writer = _CLNEW lucene::index::IndexWriter(dir, &analyzer, true /* create */, true /* closeDirOnShutdown */); - + DBUG_EXECUTE_IF("compact_column_create_index_writer_error", { + _CLTHROWA(CL_ERR_IO, + "debug point: compact_column_create_index_writer_error in index compaction"); + }) DCHECK_EQ(src_index_dirs.size(), trans_vec.size()); std::vector<lucene::store::Directory*> tmp_src_index_dirs(src_index_dirs.size()); for (size_t i = 0; i < tmp_src_index_dirs.size(); ++i) { @@ -55,8 +61,16 @@ Status compact_column(int64_t index_id, } index_writer->indexCompaction(tmp_src_index_dirs, dest_index_dirs, trans_vec, dest_segment_num_rows); + DBUG_EXECUTE_IF("compact_column_indexCompaction_error", { + _CLTHROWA(CL_ERR_IO, + "debug point: compact_column_indexCompaction_error in index compaction"); + }) index_writer->close(); + DBUG_EXECUTE_IF("compact_column_index_writer_close_error", { + _CLTHROWA(CL_ERR_IO, + "debug point: compact_column_index_writer_close_error in index compaction"); + }) _CLDELETE(index_writer); // NOTE: need to ref_cnt-- for dir, // when index_writer is destroyed, if closeDir is set, dir will be close diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp index 70c1e55d1e8..5599faa351d 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp @@ -17,6 +17,8 @@ #include "olap/rowset/segment_v2/inverted_index_file_writer.h" +#include <glog/logging.h> + #include <filesystem> #include "common/status.h" @@ -44,11 +46,13 @@ Result<DorisFSDirectory*> InvertedIndexFileWriter::open(const TabletIndex* index index_meta->get_index_suffix()); bool exists = false; auto st = local_fs->exists(local_fs_index_path, &exists); + DBUG_EXECUTE_IF("InvertedIndexFileWriter::open_local_fs_exists_error", + { st = Status::Error<ErrorCode::IO_ERROR>("debug point: no such file error"); }) if (!st.ok()) { LOG(ERROR) << "index_path:" << local_fs_index_path << " exists error:" << st; return ResultError(st); } - + DBUG_EXECUTE_IF("InvertedIndexFileWriter::open_local_fs_exists_true", { exists = true; }) if (exists) { LOG(ERROR) << "try to init a directory:" << local_fs_index_path << " already exists"; return ResultError( @@ -75,6 +79,8 @@ Result<DorisFSDirectory*> 
InvertedIndexFileWriter::open(const TabletIndex* index } Status InvertedIndexFileWriter::delete_index(const TabletIndex* index_meta) { + DBUG_EXECUTE_IF("InvertedIndexFileWriter::delete_index_index_meta_nullptr", + { index_meta = nullptr; }); if (!index_meta) { return Status::Error<ErrorCode::INVALID_ARGUMENT>("Index metadata is null."); } @@ -84,6 +90,8 @@ Status InvertedIndexFileWriter::delete_index(const TabletIndex* index_meta) { // Check if the specified index exists auto index_it = _indices_dirs.find(std::make_pair(index_id, index_suffix)); + DBUG_EXECUTE_IF("InvertedIndexFileWriter::delete_index_indices_dirs_reach_end", + { index_it = _indices_dirs.end(); }) if (index_it == _indices_dirs.end()) { std::ostringstream errMsg; errMsg << "No inverted index with id " << index_id << " and suffix " << index_suffix @@ -136,7 +144,7 @@ Status InvertedIndexFileWriter::close() { }) if (_storage_format == InvertedIndexStorageFormatPB::V1) { try { - _total_file_size = write_v1(); + RETURN_IF_ERROR(write_v1()); for (const auto& entry : _indices_dirs) { const auto& dir = entry.second; // delete index path, which contains separated inverted index files @@ -151,7 +159,7 @@ Status InvertedIndexFileWriter::close() { } } else { try { - _total_file_size = write_v2(); + RETURN_IF_ERROR(write_v2()); for (const auto& entry : _indices_dirs) { const auto& dir = entry.second; // delete index path, which contains separated inverted index files @@ -198,7 +206,12 @@ void InvertedIndexFileWriter::copyFile(const char* fileName, lucene::store::Dire int64_t bufferLength) { lucene::store::IndexInput* tmp = nullptr; CLuceneError err; - if (!dir->openInput(fileName, tmp, err)) { + auto open = dir->openInput(fileName, tmp, err); + DBUG_EXECUTE_IF("InvertedIndexFileWriter::copyFile_openInput_error", { + open = false; + err.set(CL_ERR_IO, "debug point: copyFile_openInput_error"); + }); + if (!open) { throw err; } @@ -214,6 +227,7 @@ void InvertedIndexFileWriter::copyFile(const char* fileName, lucene::store::Dire output->writeBytes(buffer, len); remainder -= len; } + DBUG_EXECUTE_IF("InvertedIndexFileWriter::copyFile_remainder_is_not_zero", { remainder = 10; }); if (remainder != 0) { std::ostringstream errMsg; errMsg << "Non-zero remainder length after copying: " << remainder << " (id: " << fileName @@ -224,6 +238,8 @@ void InvertedIndexFileWriter::copyFile(const char* fileName, lucene::store::Dire int64_t end_ptr = output->getFilePointer(); int64_t diff = end_ptr - start_ptr; + DBUG_EXECUTE_IF("InvertedIndexFileWriter::copyFile_diff_not_equals_length", + { diff = length - 10; }); if (diff != length) { std::ostringstream errMsg; errMsg << "Difference in the output file offsets " << diff @@ -234,7 +250,7 @@ void InvertedIndexFileWriter::copyFile(const char* fileName, lucene::store::Dire input->close(); } -int64_t InvertedIndexFileWriter::write_v1() { +Status InvertedIndexFileWriter::write_v1() { int64_t total_size = 0; for (const auto& entry : _indices_dirs) { const int64_t index_id = entry.first.first; @@ -267,6 +283,8 @@ int64_t InvertedIndexFileWriter::write_v1() { // write file entries to ram directory to get header length lucene::store::RAMDirectory ram_dir; auto* out_idx = ram_dir.createOutput(idx_name.c_str()); + DBUG_EXECUTE_IF("InvertedIndexFileWriter::write_v1_ram_output_is_nullptr", + { out_idx = nullptr; }) if (out_idx == nullptr) { LOG(WARNING) << "Write compound file error: RAMDirectory output is nullptr."; _CLTHROWA(CL_ERR_IO, "Create RAMDirectory output error"); @@ -300,6 +318,8 @@ int64_t 
InvertedIndexFileWriter::write_v1() { out_dir->set_file_writer_opts(_opts); auto* out = out_dir->createOutput(idx_name.c_str()); + DBUG_EXECUTE_IF("InvertedIndexFileWriter::write_v1_out_dir_createOutput_nullptr", + { out = nullptr; }); if (out == nullptr) { LOG(WARNING) << "Write compound file error: CompoundDirectory output is nullptr."; _CLTHROWA(CL_ERR_IO, "Create CompoundDirectory output error"); @@ -351,106 +371,125 @@ int64_t InvertedIndexFileWriter::write_v1() { auto* new_index_info = _file_info.add_index_info(); *new_index_info = index_info; } catch (CLuceneError& err) { - LOG(ERROR) << "CLuceneError occur when close idx file " - << InvertedIndexDescriptor::get_index_file_path_v1(_index_path_prefix, - index_id, index_suffix) + auto index_path = InvertedIndexDescriptor::get_index_file_path_v1( + _index_path_prefix, index_id, index_suffix); + LOG(ERROR) << "CLuceneError occur when write_v1 idx file " << index_path << " error msg: " << err.what(); - throw err; + return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( + "CLuceneError occur when write_v1 idx file: {}, error msg: {}", index_path, + err.what()); } } - return total_size; + _total_file_size = total_size; + return Status::OK(); } -int64_t InvertedIndexFileWriter::write_v2() { - // Create the output stream to write the compound file - int64_t current_offset = headerLength(); - +Status InvertedIndexFileWriter::write_v2() { io::Path index_path {InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix)}; + std::unique_ptr<lucene::store::IndexOutput> compound_file_output; + try { + // Create the output stream to write the compound file + int64_t current_offset = headerLength(); - auto* out_dir = DorisFSDirectoryFactory::getDirectory(_fs, index_path.parent_path().c_str()); - out_dir->set_file_writer_opts(_opts); + io::Path index_path {InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix)}; - std::unique_ptr<lucene::store::IndexOutput> compound_file_output; + auto* out_dir = + DorisFSDirectoryFactory::getDirectory(_fs, index_path.parent_path().c_str()); + out_dir->set_file_writer_opts(_opts); - DCHECK(_idx_v2_writer != nullptr) << "inverted index file writer v2 is nullptr"; - compound_file_output = std::unique_ptr<lucene::store::IndexOutput>( - out_dir->createOutputV2(_idx_v2_writer.get())); + std::unique_ptr<lucene::store::IndexOutput> compound_file_output; - // Write the version number - compound_file_output->writeInt(InvertedIndexStorageFormatPB::V2); + DCHECK(_idx_v2_writer != nullptr) << "inverted index file writer v2 is nullptr"; + compound_file_output = std::unique_ptr<lucene::store::IndexOutput>( + out_dir->createOutputV2(_idx_v2_writer.get())); - // Write the number of indices - const auto numIndices = static_cast<uint32_t>(_indices_dirs.size()); - compound_file_output->writeInt(numIndices); + // Write the version number + compound_file_output->writeInt(InvertedIndexStorageFormatPB::V2); - std::vector<std::tuple<std::string, int64_t, int64_t, CL_NS(store)::Directory*>> - file_metadata; // Store file name, offset, file length, and corresponding directory + // Write the number of indices + const auto numIndices = static_cast<uint32_t>(_indices_dirs.size()); + compound_file_output->writeInt(numIndices); - // First, write all index information and file metadata - for (const auto& entry : _indices_dirs) { - const int64_t index_id = entry.first.first; - const auto& index_suffix = entry.first.second; - const auto& dir = entry.second; - std::vector<std::string> files; - dir->list(&files); + 
std::vector<std::tuple<std::string, int64_t, int64_t, CL_NS(store)::Directory*>> + file_metadata; // Store file name, offset, file length, and corresponding directory - auto it = std::find(files.begin(), files.end(), DorisFSDirectory::WRITE_LOCK_FILE); - if (it != files.end()) { - files.erase(it); - } - // sort file list by file length - std::vector<std::pair<std::string, int64_t>> sorted_files; - for (const auto& file : files) { - sorted_files.emplace_back(file, dir->fileLength(file.c_str())); - } + // First, write all index information and file metadata + for (const auto& entry : _indices_dirs) { + const int64_t index_id = entry.first.first; + const auto& index_suffix = entry.first.second; + const auto& dir = entry.second; + std::vector<std::string> files; + dir->list(&files); + + auto it = std::find(files.begin(), files.end(), DorisFSDirectory::WRITE_LOCK_FILE); + if (it != files.end()) { + files.erase(it); + } + // sort file list by file length + std::vector<std::pair<std::string, int64_t>> sorted_files; + for (const auto& file : files) { + sorted_files.emplace_back(file, dir->fileLength(file.c_str())); + } + + std::sort( + sorted_files.begin(), sorted_files.end(), + [](const std::pair<std::string, int64_t>& a, + const std::pair<std::string, int64_t>& b) { return (a.second < b.second); }); - std::sort(sorted_files.begin(), sorted_files.end(), - [](const std::pair<std::string, int64_t>& a, - const std::pair<std::string, int64_t>& b) { return (a.second < b.second); }); - - int32_t file_count = sorted_files.size(); - - // Write the index ID and the number of files - compound_file_output->writeLong(index_id); - compound_file_output->writeInt(static_cast<int32_t>(index_suffix.length())); - compound_file_output->writeBytes(reinterpret_cast<const uint8_t*>(index_suffix.data()), - index_suffix.length()); - compound_file_output->writeInt(file_count); - - // Calculate the offset for each file and write the file metadata - for (const auto& file : sorted_files) { - int64_t file_length = dir->fileLength(file.first.c_str()); - compound_file_output->writeInt(static_cast<int32_t>(file.first.length())); - compound_file_output->writeBytes(reinterpret_cast<const uint8_t*>(file.first.data()), - file.first.length()); - compound_file_output->writeLong(current_offset); - compound_file_output->writeLong(file_length); - - file_metadata.emplace_back(file.first, current_offset, file_length, dir.get()); - current_offset += file_length; // Update the data offset + int32_t file_count = sorted_files.size(); + + // Write the index ID and the number of files + compound_file_output->writeLong(index_id); + compound_file_output->writeInt(static_cast<int32_t>(index_suffix.length())); + compound_file_output->writeBytes(reinterpret_cast<const uint8_t*>(index_suffix.data()), + index_suffix.length()); + compound_file_output->writeInt(file_count); + + // Calculate the offset for each file and write the file metadata + for (const auto& file : sorted_files) { + int64_t file_length = dir->fileLength(file.first.c_str()); + compound_file_output->writeInt(static_cast<int32_t>(file.first.length())); + compound_file_output->writeBytes( + reinterpret_cast<const uint8_t*>(file.first.data()), file.first.length()); + compound_file_output->writeLong(current_offset); + compound_file_output->writeLong(file_length); + + file_metadata.emplace_back(file.first, current_offset, file_length, dir.get()); + current_offset += file_length; // Update the data offset + } } - } - const int64_t buffer_length = 16384; - uint8_t 
header_buffer[buffer_length]; + const int64_t buffer_length = 16384; + uint8_t header_buffer[buffer_length]; - // Next, write the file data - for (const auto& info : file_metadata) { - const std::string& file = std::get<0>(info); - auto* dir = std::get<3>(info); + // Next, write the file data + for (const auto& info : file_metadata) { + const std::string& file = std::get<0>(info); + auto* dir = std::get<3>(info); - // Write the actual file data - copyFile(file.c_str(), dir, compound_file_output.get(), header_buffer, buffer_length); - } + // Write the actual file data + copyFile(file.c_str(), dir, compound_file_output.get(), header_buffer, buffer_length); + } - out_dir->close(); - // NOTE: need to decrease ref count, but not to delete here, - // because index cache may get the same directory from DIRECTORIES - _CLDECDELETE(out_dir) - auto compound_file_size = compound_file_output->getFilePointer(); - compound_file_output->close(); - _file_info.set_index_size(compound_file_size); - return compound_file_size; + out_dir->close(); + // NOTE: need to decrease ref count, but not to delete here, + // because index cache may get the same directory from DIRECTORIES + _CLDECDELETE(out_dir) + _total_file_size = compound_file_output->getFilePointer(); + compound_file_output->close(); + _file_info.set_index_size(_total_file_size); + } catch (CLuceneError& err) { + LOG(ERROR) << "CLuceneError occur when close idx file " << index_path + << " error msg: " << err.what(); + if (compound_file_output) { + compound_file_output->close(); + compound_file_output.reset(); + } + return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( + "CLuceneError occur when close idx file: {}, error msg: {}", index_path.c_str(), + err.what()); + } + return Status::OK(); } -} // namespace doris::segment_v2 \ No newline at end of file +} // namespace doris::segment_v2 diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h index ccd6953cdd7..31e287d6dd3 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h @@ -64,8 +64,8 @@ public: Status delete_index(const TabletIndex* index_meta); Status initialize(InvertedIndexDirectoryMap& indices_dirs); ~InvertedIndexFileWriter() = default; - int64_t write_v2(); - int64_t write_v1(); + Status write_v2(); + Status write_v1(); Status close(); int64_t headerLength(); const InvertedIndexFileInfo* get_index_file_info() const { diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp index f752c530020..ded71c8a6cc 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp @@ -183,7 +183,10 @@ DorisFSDirectory::FSIndexInput::SharedHandle::SharedHandle(const char* path) { DorisFSDirectory::FSIndexInput::SharedHandle::~SharedHandle() { if (_reader) { - if (_reader->close().ok()) { + auto st = _reader->close(); + DBUG_EXECUTE_IF("FSIndexInput::~SharedHandle_reader_close_error", + { st = Status::Error<doris::ErrorCode::NOT_FOUND>("failed to close"); }); + if (st.ok()) { _reader = nullptr; } } @@ -238,10 +241,17 @@ void DorisFSDirectory::FSIndexInput::readInternal(uint8_t* b, const int32_t len) Slice result {b, (size_t)len}; size_t bytes_read = 0; - if (!_handle->_reader->read_at(_pos, result, &bytes_read, &_io_ctx).ok()) { + auto st = _handle->_reader->read_at(_pos, result, 
&bytes_read, &_io_ctx); + DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexInput::readInternal_reader_read_at_error", { + st = Status::InternalError( + "debug point: DorisFSDirectory::FSIndexInput::readInternal_reader_read_at_error"); + }) + if (!st.ok()) { _CLTHROWA(CL_ERR_IO, "read past EOF"); } bufferLength = len; + DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexInput::readInternal_bytes_read_error", + { bytes_read = len + 10; }) if (bytes_read != len) { _CLTHROWA(CL_ERR_IO, "read error"); } @@ -313,6 +323,10 @@ void DorisFSDirectory::FSIndexOutput::flushBuffer(const uint8_t* b, const int32_ _CLTHROWA(CL_ERR_IO, "writer append data when flushBuffer error"); } } else { + DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexOutput::flushBuffer_writer_is_nullptr", + { _writer = nullptr; }) + DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexOutput::flushBuffer_b_is_nullptr", + { b = nullptr; }) if (_writer == nullptr) { LOG(WARNING) << "File writer is nullptr in DorisFSDirectory::FSIndexOutput, " "ignore flush."; @@ -327,8 +341,7 @@ void DorisFSDirectory::FSIndexOutput::close() { try { BufferedIndexOutput::close(); DBUG_EXECUTE_IF( - "DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_" - "close", + "DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close", { _CLTHROWA(CL_ERR_IO, "debug point: test throw error in bufferedindexoutput close"); @@ -342,6 +355,10 @@ void DorisFSDirectory::FSIndexOutput::close() { _writer.reset(nullptr); _CLTHROWA(err.number(), err.what()); } + DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexOutput.set_writer_nullptr", { + LOG(WARNING) << "Dbug execute, set _writer to nullptr"; + _writer = nullptr; + }) if (_writer) { auto ret = _writer->close(); DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error", @@ -353,6 +370,7 @@ void DorisFSDirectory::FSIndexOutput::close() { } } else { LOG(WARNING) << "File writer is nullptr, ignore finalize and close."; + _CLTHROWA(CL_ERR_IO, "close file writer error, _writer = nullptr"); } _writer.reset(nullptr); } @@ -364,13 +382,9 @@ int64_t DorisFSDirectory::FSIndexOutput::length() const { void DorisFSDirectory::FSIndexOutputV2::init(io::FileWriter* file_writer) { _index_v2_file_writer = file_writer; - DBUG_EXECUTE_IF( - "DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_" - "init", - { - _CLTHROWA(CL_ERR_IO, - "debug point: test throw error in fsindexoutput init mock error"); - }) + DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init", { + _CLTHROWA(CL_ERR_IO, "debug point: test throw error in fsindexoutput init mock error"); + }) } DorisFSDirectory::FSIndexOutputV2::~FSIndexOutputV2() {} @@ -393,6 +407,10 @@ void DorisFSDirectory::FSIndexOutputV2::flushBuffer(const uint8_t* b, const int3 _CLTHROWA(CL_ERR_IO, "writer append data when flushBuffer error"); } } else { + DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexOutputV2::flushBuffer_file_writer_is_nullptr", + { _index_v2_file_writer = nullptr; }) + DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexOutputV2::flushBuffer_b_is_nullptr", + { b = nullptr; }) if (_index_v2_file_writer == nullptr) { LOG(WARNING) << "File writer is nullptr in DorisFSDirectory::FSIndexOutputV2, " "ignore flush."; @@ -408,8 +426,7 @@ void DorisFSDirectory::FSIndexOutputV2::close() { try { BufferedIndexOutput::close(); DBUG_EXECUTE_IF( - "DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_" - "close", + "DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close", { 
_CLTHROWA(CL_ERR_IO, "debug point: test throw error in bufferedindexoutput close"); @@ -422,6 +439,10 @@ void DorisFSDirectory::FSIndexOutputV2::close() { } _CLTHROWA(err.number(), err.what()); } + DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexOutput.set_writer_nullptr", { + LOG(WARNING) << "Dbug execute, set _index_v2_file_writer to nullptr"; + _index_v2_file_writer = nullptr; + }) if (_index_v2_file_writer) { auto ret = _index_v2_file_writer->close(); DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error", @@ -480,7 +501,16 @@ bool DorisFSDirectory::list(std::vector<std::string>* names) const { priv_getFN(fl, ""); std::vector<io::FileInfo> files; bool exists; - LOG_AND_THROW_IF_ERROR(_fs->list(fl, true, &files, &exists), "List file IO error"); + auto st = _fs->list(fl, true, &files, &exists); + DBUG_EXECUTE_IF("DorisFSDirectory::list_status_is_not_ok", { + st = Status::Error<ErrorCode::INTERNAL_ERROR>( + "debug point: DorisFSDirectory::list_status_is_not_ok"); + }) + LOG_AND_THROW_IF_ERROR(st, "List file IO error"); + DBUG_EXECUTE_IF("DorisFSDirectory::list_directory_not_exists", { exists = false; }) + if (!exists) { + LOG_AND_THROW_IF_ERROR(st, fmt::format("Directory {} is not exist", fl)); + } for (auto& file : files) { names->push_back(file.file_name); } @@ -492,7 +522,12 @@ bool DorisFSDirectory::fileExists(const char* name) const { char fl[CL_MAX_DIR]; priv_getFN(fl, name); bool exists = false; - LOG_AND_THROW_IF_ERROR(_fs->exists(fl, &exists), "File exists IO error"); + auto st = _fs->exists(fl, &exists); + DBUG_EXECUTE_IF("DorisFSDirectory::fileExists_status_is_not_ok", { + st = Status::Error<ErrorCode::INTERNAL_ERROR>( + "debug point: DorisFSDirectory::fileExists_status_is_not_ok"); + }) + LOG_AND_THROW_IF_ERROR(st, "File exists IO error"); return exists; } @@ -518,7 +553,12 @@ void DorisFSDirectory::touchFile(const char* name) { snprintf(buffer, CL_MAX_DIR, "%s%s%s", directory.c_str(), PATH_DELIMITERA, name); io::FileWriterPtr tmp_writer; - LOG_AND_THROW_IF_ERROR(_fs->create_file(buffer, &tmp_writer), "Touch file IO error"); + auto st = _fs->create_file(buffer, &tmp_writer); + DBUG_EXECUTE_IF("DorisFSDirectory::touchFile_status_is_not_ok", { + st = Status::Error<ErrorCode::INTERNAL_ERROR>( + "debug point: DorisFSDirectory::touchFile_status_is_not_ok"); + }) + LOG_AND_THROW_IF_ERROR(st, "Touch file IO error"); } int64_t DorisFSDirectory::fileLength(const char* name) const { @@ -532,6 +572,10 @@ int64_t DorisFSDirectory::fileLength(const char* name) const { if (st.code() == ErrorCode::NOT_FOUND) { _CLTHROWA(CL_ERR_FileNotFound, "File does not exist"); } + DBUG_EXECUTE_IF("DorisFSDirectory::fileLength_status_is_not_ok", { + st = Status::Error<ErrorCode::INTERNAL_ERROR>( + "debug point: DorisFSDirectory::fileLength_status_is_not_ok"); + }) LOG_AND_THROW_IF_ERROR(st, "Get file size IO error"); return size; } @@ -544,13 +588,21 @@ bool DorisFSDirectory::openInput(const char* name, lucene::store::IndexInput*& r return FSIndexInput::open(_fs, fl, ret, error, bufferSize); } -void DorisFSDirectory::close() {} +void DorisFSDirectory::close() { + DBUG_EXECUTE_IF("DorisFSDirectory::close_close_with_error", + { _CLTHROWA(CL_ERR_IO, "debug_point: close DorisFSDirectory error"); }) +} bool DorisFSDirectory::doDeleteFile(const char* name) { CND_PRECONDITION(directory[0] != 0, "directory is not open"); char fl[CL_MAX_DIR]; priv_getFN(fl, name); - LOG_AND_THROW_IF_ERROR(_fs->delete_file(fl), "Delete file IO error"); + auto st = _fs->delete_file(fl); + 
DBUG_EXECUTE_IF("DorisFSDirectory::doDeleteFile_status_is_not_ok", { + st = Status::Error<ErrorCode::INTERNAL_ERROR>( + "debug point: DorisFSDirectory::doDeleteFile_status_is_not_ok"); + }) + LOG_AND_THROW_IF_ERROR(st, "Delete file IO error"); return true; } @@ -558,8 +610,12 @@ bool DorisFSDirectory::deleteDirectory() { CND_PRECONDITION(directory[0] != 0, "directory is not open"); char fl[CL_MAX_DIR]; priv_getFN(fl, ""); - LOG_AND_THROW_IF_ERROR(_fs->delete_directory(fl), - fmt::format("Delete directory {} IO error", fl)); + auto st = _fs->delete_directory(fl); + DBUG_EXECUTE_IF("DorisFSDirectory::deleteDirectory_throw_is_not_directory", { + st = Status::Error<ErrorCode::NOT_FOUND>( + fmt::format("debug point: {} is not a directory", fl)); + }) + LOG_AND_THROW_IF_ERROR(st, fmt::format("Delete directory {} IO error", fl)); return true; } @@ -573,11 +629,26 @@ void DorisFSDirectory::renameFile(const char* from, const char* to) { priv_getFN(nu, to); bool exists = false; - LOG_AND_THROW_IF_ERROR(_fs->exists(nu, &exists), "File exists IO error"); + auto st = _fs->exists(nu, &exists); + DBUG_EXECUTE_IF("DorisFSDirectory::renameFile_exists_status_is_not_ok", { + st = Status::Error<ErrorCode::INTERNAL_ERROR>( + "debug point: DorisFSDirectory::renameFile_exists_status_is_not_ok"); + }) + LOG_AND_THROW_IF_ERROR(st, "File exists IO error"); if (exists) { - LOG_AND_THROW_IF_ERROR(_fs->delete_directory(nu), fmt::format("Delete {} IO error", nu)); + st = _fs->delete_directory(nu); + DBUG_EXECUTE_IF("DorisFSDirectory::renameFile_delete_status_is_not_ok", { + st = Status::Error<ErrorCode::INTERNAL_ERROR>( + "debug point: DorisFSDirectory::renameFile_delete_status_is_not_ok"); + }) + LOG_AND_THROW_IF_ERROR(st, fmt::format("Delete {} IO error", nu)); } - LOG_AND_THROW_IF_ERROR(_fs->rename(old, nu), fmt::format("Rename {} to {} IO error", old, nu)); + st = _fs->rename(old, nu); + DBUG_EXECUTE_IF("DorisFSDirectory::renameFile_rename_status_is_not_ok", { + st = Status::Error<ErrorCode::INTERNAL_ERROR>( + "debug point: DorisFSDirectory::renameFile_rename_status_is_not_ok"); + }) + LOG_AND_THROW_IF_ERROR(st, fmt::format("Rename {} to {} IO error", old, nu)); } lucene::store::IndexOutput* DorisFSDirectory::createOutput(const char* name) { @@ -585,11 +656,31 @@ lucene::store::IndexOutput* DorisFSDirectory::createOutput(const char* name) { char fl[CL_MAX_DIR]; priv_getFN(fl, name); bool exists = false; - LOG_AND_THROW_IF_ERROR(_fs->exists(fl, &exists), "Create output file exists IO error"); + auto st = _fs->exists(fl, &exists); + DBUG_EXECUTE_IF("DorisFSDirectory::createOutput_exists_status_is_not_ok", { + st = Status::Error<ErrorCode::INTERNAL_ERROR>( + "debug point: DorisFSDirectory::createOutput_exists_status_is_not_ok"); + }) + LOG_AND_THROW_IF_ERROR(st, "Create output file exists IO error"); if (exists) { - LOG_AND_THROW_IF_ERROR(_fs->delete_file(fl), - fmt::format("Create output delete file {} IO error", fl)); - LOG_AND_THROW_IF_ERROR(_fs->exists(fl, &exists), "Create output file exists IO error"); + st = _fs->delete_file(fl); + DBUG_EXECUTE_IF("DorisFSDirectory::createOutput_delete_status_is_not_ok", { + st = Status::Error<ErrorCode::INTERNAL_ERROR>( + "debug point: DorisFSDirectory::createOutput_delete_status_is_not_ok"); + }) + LOG_AND_THROW_IF_ERROR(st, fmt::format("Create output delete file {} IO error", fl)); + st = _fs->exists(fl, &exists); + DBUG_EXECUTE_IF("DorisFSDirectory::createOutput_exists_after_delete_status_is_not_ok", { + st = Status::Error<ErrorCode::INTERNAL_ERROR>( + "debug point: " + 
"DorisFSDirectory::createOutput_exists_after_delete_status_is_not_ok"); + }) + LOG_AND_THROW_IF_ERROR(st, "Create output file exists IO error"); + DBUG_EXECUTE_IF("DorisFSDirectory::createOutput_exists_after_delete_error", + { exists = true; }) + if (exists) { + _CLTHROWA(CL_ERR_IO, fmt::format("File {} should not exist", fl).c_str()); + } assert(!exists); } auto* ret = _CLNEW FSIndexOutput(); @@ -653,6 +744,10 @@ bool DorisRAMFSDirectory::fileExists(const char* name) const { int64_t DorisRAMFSDirectory::fileModified(const char* name) const { std::lock_guard<std::mutex> wlock(_this_lock); auto* f = filesMap->get((char*)name); + DBUG_EXECUTE_IF("DorisRAMFSDirectory::fileModified_file_not_found", { f = nullptr; }) + if (f == nullptr) { + _CLTHROWA(CL_ERR_IO, fmt::format("NOT FOUND File {}.", name).c_str()); + } return f->getLastModified(); } @@ -661,6 +756,10 @@ void DorisRAMFSDirectory::touchFile(const char* name) { { std::lock_guard<std::mutex> wlock(_this_lock); file = filesMap->get((char*)name); + DBUG_EXECUTE_IF("DorisRAMFSDirectory::touchFile_file_not_found", { file = nullptr; }) + if (file == nullptr) { + _CLTHROWA(CL_ERR_IO, fmt::format("NOT FOUND File {}.", name).c_str()); + } } const uint64_t ts1 = file->getLastModified(); uint64_t ts2 = lucene::util::Misc::currentTimeMillis(); @@ -677,6 +776,10 @@ void DorisRAMFSDirectory::touchFile(const char* name) { int64_t DorisRAMFSDirectory::fileLength(const char* name) const { std::lock_guard<std::mutex> wlock(_this_lock); auto* f = filesMap->get((char*)name); + DBUG_EXECUTE_IF("DorisRAMFSDirectory::fileLength_file_not_found", { f = nullptr; }) + if (f == nullptr) { + _CLTHROWA(CL_ERR_IO, fmt::format("NOT FOUND File {}.", name).c_str()); + } return f->getLength(); } @@ -684,6 +787,7 @@ bool DorisRAMFSDirectory::openInput(const char* name, lucene::store::IndexInput* CLuceneError& error, int32_t bufferSize) { std::lock_guard<std::mutex> wlock(_this_lock); auto* file = filesMap->get((char*)name); + DBUG_EXECUTE_IF("DorisRAMFSDirectory::openInput_file_not_found", { file = nullptr; }) if (file == nullptr) { error.set(CL_ERR_IO, "[DorisRAMCompoundDirectory::open] The requested file does not exist."); @@ -695,6 +799,8 @@ bool DorisRAMFSDirectory::openInput(const char* name, lucene::store::IndexInput* void DorisRAMFSDirectory::close() { DorisFSDirectory::close(); + DBUG_EXECUTE_IF("DorisRAMFSDirectory::close_close_with_error", + { _CLTHROWA(CL_ERR_IO, "debug_point: close DorisRAMFSDirectory error"); }) } bool DorisRAMFSDirectory::doDeleteFile(const char* name) { @@ -730,6 +836,7 @@ void DorisRAMFSDirectory::renameFile(const char* from, const char* to) { sizeInBytes -= itr1->second->sizeInBytes; filesMap->removeitr(itr1); } + DBUG_EXECUTE_IF("DorisRAMFSDirectory::renameFile_itr_filesMap_end", { itr = filesMap->end(); }) if (itr == filesMap->end()) { char tmp[1024]; snprintf(tmp, 1024, "cannot rename %s, file does not exist", from); @@ -752,6 +859,8 @@ lucene::store::IndexOutput* DorisRAMFSDirectory::createOutput(const char* name) // get the actual pointer to the output name char* n = nullptr; auto itr = filesMap->find(const_cast<char*>(name)); + DBUG_EXECUTE_IF("DorisRAMFSDirectory::createOutput_itr_filesMap_end", + { itr = filesMap->end(); }) if (itr != filesMap->end()) { n = itr->first; lucene::store::RAMFile* rf = itr->second; @@ -784,6 +893,7 @@ DorisFSDirectory* DorisFSDirectoryFactory::getDirectory(const io::FileSystemSPtr const char* _file, bool can_use_ram_dir, lucene::store::LockFactory* lock_factory) { DorisFSDirectory* dir = nullptr; + 
DBUG_EXECUTE_IF("DorisFSDirectoryFactory::getDirectory_file_is_nullptr", { _file = nullptr; }); if (!_file || !*_file) { _CLTHROWA(CL_ERR_IO, "Invalid directory"); } @@ -797,10 +907,22 @@ DorisFSDirectory* DorisFSDirectoryFactory::getDirectory(const io::FileSystemSPtr dir = _CLNEW DorisRAMFSDirectory(); } else { bool exists = false; - LOG_AND_THROW_IF_ERROR(_fs->exists(file, &exists), "Get directory exists IO error"); + auto st = _fs->exists(file, &exists); + DBUG_EXECUTE_IF("DorisFSDirectoryFactory::getDirectory_exists_status_is_not_ok", { + st = Status::Error<ErrorCode::INTERNAL_ERROR>( + "debug point: DorisFSDirectoryFactory::getDirectory_exists_status_is_not_ok"); + }) + LOG_AND_THROW_IF_ERROR(st, "Get directory exists IO error"); if (!exists) { - LOG_AND_THROW_IF_ERROR(_fs->create_directory(file), - "Get directory create directory IO error"); + st = _fs->create_directory(file); + DBUG_EXECUTE_IF( + "DorisFSDirectoryFactory::getDirectory_create_directory_status_is_not_ok", { + st = Status::Error<ErrorCode::INTERNAL_ERROR>( + "debug point: " + "DorisFSDirectoryFactory::getDirectory_create_directory_status_is_" + "not_ok"); + }) + LOG_AND_THROW_IF_ERROR(st, "Get directory create directory IO error"); } dir = _CLNEW DorisFSDirectory(); } diff --git a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp index 50874d0db5c..29fe4609e59 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp @@ -118,6 +118,12 @@ public: Status init() override { try { + DBUG_EXECUTE_IF("InvertedIndexColumnWriter::init_field_type_not_supported", { + return Status::Error<doris::ErrorCode::INVERTED_INDEX_NOT_SUPPORTED>( + "Field type not supported"); + }) + DBUG_EXECUTE_IF("InvertedIndexColumnWriter::init_inverted_index_writer_init_error", + { _CLTHROWA(CL_ERR_IO, "debug point: init index error"); }) if constexpr (field_is_slice_type(field_type)) { return init_fulltext_index(); } else if constexpr (field_is_numeric_type(field_type)) { @@ -141,6 +147,8 @@ public: void close_on_error() override { try { + DBUG_EXECUTE_IF("InvertedIndexColumnWriter::close_on_error_throw_exception", + { _CLTHROWA(CL_ERR_IO, "debug point: close on error"); }) if (_index_writer) { _index_writer->close(); } @@ -160,6 +168,9 @@ public: _bkd_writer = std::make_shared<lucene::util::bkd::bkd_writer>( max_doc, DIMS, DIMS, value_length, MAX_LEAF_COUNT, MAXMBSortInHeap, total_point_count, true, config::max_depth_in_bkd_tree); + DBUG_EXECUTE_IF("InvertedIndexColumnWriter::init_bkd_index_throw_error", { + _CLTHROWA(CL_ERR_IllegalArgument, "debug point: create bkd_writer error"); + }) return open_index_directory(); } @@ -174,6 +185,10 @@ public: } Status open_index_directory() { + DBUG_EXECUTE_IF("InvertedIndexColumnWriter::open_index_directory_error", { + return Status::Error<ErrorCode::INTERNAL_ERROR>( + "debug point: open_index_directory_error"); + }) _dir = DORIS_TRY(_index_file_writer->open(_index_meta)); return Status::OK(); } @@ -183,6 +198,12 @@ public: bool close_dir_on_shutdown = true; auto index_writer = std::make_unique<lucene::index::IndexWriter>( _dir, _analyzer.get(), create_index, close_dir_on_shutdown); + DBUG_EXECUTE_IF("InvertedIndexColumnWriter::create_index_writer_setRAMBufferSizeMB_error", + { index_writer->setRAMBufferSizeMB(-100); }) + DBUG_EXECUTE_IF("InvertedIndexColumnWriter::create_index_writer_setMaxBufferedDocs_error", + { index_writer->setMaxBufferedDocs(1); }) + 
DBUG_EXECUTE_IF("InvertedIndexColumnWriter::create_index_writer_setMergeFactor_error", + { index_writer->setMergeFactor(1); }) index_writer->setRAMBufferSizeMB(config::inverted_index_ram_buffer_size); index_writer->setMaxBufferedDocs(config::inverted_index_max_buffered_docs); index_writer->setMaxFieldLength(MAX_FIELD_LEN); @@ -247,6 +268,8 @@ public: try { _index_writer->addDocument(_doc.get()); + DBUG_EXECUTE_IF("InvertedIndexColumnWriterImpl::add_document_throw_error", + { _CLTHROWA(CL_ERR_IO, "debug point: add_document io error"); }) } catch (const CLuceneError& e) { close_on_error(); return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( @@ -258,6 +281,8 @@ public: Status add_null_document() { try { _index_writer->addNullDocument(_doc.get()); + DBUG_EXECUTE_IF("InvertedIndexColumnWriterImpl::add_null_document_throw_error", + { _CLTHROWA(CL_ERR_IO, "debug point: add_null_document io error"); }) } catch (const CLuceneError& e) { close_on_error(); return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( @@ -270,6 +295,10 @@ public: _null_bitmap.addRange(_rid, _rid + count); _rid += count; if constexpr (field_is_slice_type(field_type)) { + DBUG_EXECUTE_IF("InvertedIndexColumnWriterImpl::add_nulls_field_nullptr", + { _field = nullptr; }) + DBUG_EXECUTE_IF("InvertedIndexColumnWriterImpl::add_nulls_index_writer_nullptr", + { _index_writer = nullptr; }) if (_field == nullptr || _index_writer == nullptr) { LOG(ERROR) << "field or index writer is null in inverted index writer."; return Status::InternalError( @@ -288,17 +317,30 @@ public: return Status::OK(); } - void new_inverted_index_field(const char* field_value_data, size_t field_value_size) { - if (_parser_type != InvertedIndexParserType::PARSER_UNKNOWN && - _parser_type != InvertedIndexParserType::PARSER_NONE) { - new_char_token_stream(field_value_data, field_value_size, _field); - } else { - new_field_char_value(field_value_data, field_value_size, _field); + Status new_inverted_index_field(const char* field_value_data, size_t field_value_size) { + try { + if (_parser_type != InvertedIndexParserType::PARSER_UNKNOWN && + _parser_type != InvertedIndexParserType::PARSER_NONE) { + new_char_token_stream(field_value_data, field_value_size, _field); + } else { + new_field_char_value(field_value_data, field_value_size, _field); + } + } catch (const CLuceneError& e) { + return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( + "CLuceneError create new index field error: {}", e.what()); } + return Status::OK(); } void new_char_token_stream(const char* s, size_t len, lucene::document::Field* field) { _char_string_reader->init(s, len, false); + DBUG_EXECUTE_IF( + "InvertedIndexColumnWriterImpl::new_char_token_stream__char_string_reader_init_" + "error", + { + _CLTHROWA(CL_ERR_UnsupportedOperation, + "UnsupportedOperationException: CLStream::init"); + }) auto* stream = _analyzer->reusableTokenStream(field->name(), _char_string_reader.get()); field->setValue(stream); } @@ -316,6 +358,10 @@ public: Status add_values(const std::string fn, const void* values, size_t count) override { if constexpr (field_is_slice_type(field_type)) { + DBUG_EXECUTE_IF("InvertedIndexColumnWriterImpl::add_values_field_is_nullptr", + { _field = nullptr; }) + DBUG_EXECUTE_IF("InvertedIndexColumnWriterImpl::add_values_index_writer_is_nullptr", + { _index_writer = nullptr; }) if (_field == nullptr || _index_writer == nullptr) { LOG(ERROR) << "field or index writer is null in inverted index writer."; return Status::InternalError( @@ -329,7 +375,7 @@ public: 
(_parser_type != InvertedIndexParserType::PARSER_NONE && v->empty())) { RETURN_IF_ERROR(add_null_document()); } else { - new_inverted_index_field(v->get_data(), v->get_size()); + RETURN_IF_ERROR(new_inverted_index_field(v->get_data(), v->get_size())); RETURN_IF_ERROR(add_document()); } ++v; @@ -343,12 +389,17 @@ public: Status add_array_values(size_t field_size, const void* value_ptr, const uint8_t* null_map, const uint8_t* offsets_ptr, size_t count) override { + DBUG_EXECUTE_IF("InvertedIndexColumnWriterImpl::add_array_values_count_is_zero", + { count = 0; }) if (count == 0) { // no values to add inverted index return Status::OK(); } const auto* offsets = reinterpret_cast<const uint64_t*>(offsets_ptr); if constexpr (field_is_slice_type(field_type)) { + DBUG_EXECUTE_IF( + "InvertedIndexColumnWriterImpl::add_array_values_index_writer_is_nullptr", + { _index_writer = nullptr; }) if (_index_writer == nullptr) { LOG(ERROR) << "index writer is null in inverted index writer."; return Status::InternalError("index writer is null in inverted index writer"); @@ -374,7 +425,15 @@ public: continue; } else { // now we temp create field . later make a pool - if (Status st = create_field(&new_field); st != Status::OK()) { + Status st = create_field(&new_field); + DBUG_EXECUTE_IF( + "InvertedIndexColumnWriterImpl::add_array_values_create_field_" + "error", + { + st = Status::Error<ErrorCode::INTERNAL_ERROR>( + "debug point: add_array_values_create_field_error"); + }) + if (st != Status::OK()) { LOG(ERROR) << "create field " << string(_field_name.begin(), _field_name.end()) << " error:" << st; @@ -426,7 +485,14 @@ public: // avoid to add doc which without any field which may make threadState init skip // init fieldDataArray, then will make error with next doc with fields in // resetCurrentFieldData - if (Status st = create_field(&new_field); st != Status::OK()) { + Status st = create_field(&new_field); + DBUG_EXECUTE_IF( + "InvertedIndexColumnWriterImpl::add_array_values_create_field_error_2", + { + st = Status::Error<ErrorCode::INTERNAL_ERROR>( + "debug point: add_array_values_create_field_error_2"); + }) + if (st != Status::OK()) { LOG(ERROR) << "create field " << string(_field_name.begin(), _field_name.end()) << " error:" << st; @@ -460,6 +526,11 @@ public: Status add_array_values(size_t field_size, const CollectionValue* values, size_t count) override { if constexpr (field_is_slice_type(field_type)) { + DBUG_EXECUTE_IF("InvertedIndexColumnWriterImpl::add_array_values_field_is_nullptr", + { _field = nullptr; }) + DBUG_EXECUTE_IF( + "InvertedIndexColumnWriterImpl::add_array_values_index_writer_is_nullptr", + { _index_writer = nullptr; }) if (_field == nullptr || _index_writer == nullptr) { LOG(ERROR) << "field or index writer is null in inverted index writer."; return Status::InternalError( @@ -478,7 +549,7 @@ public: item_data_ptr = (uint8_t*)item_data_ptr + field_size; } auto value = join(strings, " "); - new_inverted_index_field(value.c_str(), value.length()); + RETURN_IF_ERROR(new_inverted_index_field(value.c_str(), value.length())); _rid++; RETURN_IF_ERROR(add_document()); values++; @@ -668,6 +739,8 @@ Status InvertedIndexColumnWriter::create(const Field* field, bool single_field = true; if (type == FieldType::OLAP_FIELD_TYPE_ARRAY) { const auto* array_typeinfo = dynamic_cast<const ArrayTypeInfo*>(typeinfo); + DBUG_EXECUTE_IF("InvertedIndexColumnWriter::create_array_typeinfo_is_nullptr", + { array_typeinfo = nullptr; }) if (array_typeinfo != nullptr) { typeinfo = array_typeinfo->item_type_info(); 
type = typeinfo->type(); @@ -678,6 +751,8 @@ Status InvertedIndexColumnWriter::create(const Field* field, } } + DBUG_EXECUTE_IF("InvertedIndexColumnWriter::create_unsupported_type_for_inverted_index", + { type = FieldType::OLAP_FIELD_TYPE_FLOAT; }) switch (type) { #define M(TYPE) \ case TYPE: \ diff --git a/be/src/olap/task/index_builder.cpp b/be/src/olap/task/index_builder.cpp index 5e44f049f47..d7c450d7e44 100644 --- a/be/src/olap/task/index_builder.cpp +++ b/be/src/olap/task/index_builder.cpp @@ -68,8 +68,11 @@ Status IndexBuilder::update_inverted_index_info() { _output_rowsets.reserve(_input_rowsets.size()); _pending_rs_guards.reserve(_input_rowsets.size()); for (auto&& input_rowset : _input_rowsets) { - if (!input_rowset->is_local()) [[unlikely]] { - DCHECK(false) << _tablet->tablet_id() << ' ' << input_rowset->rowset_id(); + bool is_local_rowset = input_rowset->is_local(); + DBUG_EXECUTE_IF("IndexBuilder::update_inverted_index_info_is_local_rowset", + { is_local_rowset = false; }) + if (!is_local_rowset) [[unlikely]] { + // DCHECK(false) << _tablet->tablet_id() << ' ' << input_rowset->rowset_id(); return Status::InternalError("should be local rowset. tablet_id={} rowset_id={}", _tablet->tablet_id(), input_rowset->rowset_id().to_string()); @@ -81,6 +84,9 @@ Status IndexBuilder::update_inverted_index_info() { size_t total_index_size = 0; auto* beta_rowset = reinterpret_cast<BetaRowset*>(input_rowset.get()); auto size_st = beta_rowset->get_inverted_index_size(&total_index_size); + DBUG_EXECUTE_IF("IndexBuilder::update_inverted_index_info_size_st_not_ok", { + size_st = Status::Error<ErrorCode::INIT_FAILED>("debug point: get fs failed"); + }) if (!size_st.ok() && !size_st.is<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>() && !size_st.is<ErrorCode::NOT_FOUND>()) { return size_st; @@ -229,6 +235,11 @@ Status IndexBuilder::update_inverted_index_info() { std::string {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)}, output_rs_tablet_schema->get_inverted_index_storage_format()); auto st = idx_file_reader->init(); + DBUG_EXECUTE_IF( + "IndexBuilder::update_inverted_index_info_index_file_reader_init_not_ok", { + st = Status::Error<ErrorCode::INIT_FAILED>( + "debug point: reader init error"); + }) if (!st.ok() && !st.is<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>()) { return st; } @@ -262,8 +273,11 @@ Status IndexBuilder::update_inverted_index_info() { Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta, std::vector<segment_v2::SegmentSharedPtr>& segments) { - if (!output_rowset_meta->is_local()) [[unlikely]] { - DCHECK(false) << _tablet->tablet_id() << ' ' << output_rowset_meta->rowset_id(); + bool is_local_rowset = output_rowset_meta->is_local(); + DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_is_local_rowset", + { is_local_rowset = false; }) + if (!is_local_rowset) [[unlikely]] { + // DCHECK(false) << _tablet->tablet_id() << ' ' << output_rowset_meta->rowset_id(); return Status::InternalError("should be local rowset. 
tablet_id={} rowset_id={}", _tablet->tablet_id(), output_rowset_meta->rowset_id().to_string()); @@ -280,6 +294,8 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta for (auto& seg_ptr : segments) { auto idx_file_reader_iter = _inverted_index_file_readers.find( std::make_pair(output_rowset_meta->rowset_id().to_string(), seg_ptr->id())); + DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_can_not_find_reader_drop_op", + { idx_file_reader_iter = _inverted_index_file_readers.end(); }) if (idx_file_reader_iter == _inverted_index_file_readers.end()) { LOG(ERROR) << "idx_file_reader_iter" << output_rowset_meta->rowset_id() << ":" << seg_ptr->id() << " cannot be found"; @@ -350,6 +366,8 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta InvertedIndexStorageFormatPB::V2) { auto idx_file_reader_iter = _inverted_index_file_readers.find( std::make_pair(output_rowset_meta->rowset_id().to_string(), seg_ptr->id())); + DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_can_not_find_reader", + { idx_file_reader_iter = _inverted_index_file_readers.end(); }) if (idx_file_reader_iter == _inverted_index_file_readers.end()) { LOG(ERROR) << "idx_file_reader_iter" << output_rowset_meta->rowset_id() << ":" << seg_ptr->id() << " cannot be found"; @@ -395,7 +413,11 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta } auto column = output_rowset_schema->column(column_idx); // variant column is not support for building index - if (!InvertedIndexColumnWriter::check_support_inverted_index(column)) { + auto is_support_inverted_index = + InvertedIndexColumnWriter::check_support_inverted_index(column); + DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_support_inverted_index", + { is_support_inverted_index = false; }) + if (!is_support_inverted_index) { continue; } DCHECK(output_rowset_schema->has_inverted_index_with_index_id(index_id)); @@ -408,6 +430,12 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta RETURN_IF_ERROR(segment_v2::InvertedIndexColumnWriter::create( field.get(), &inverted_index_builder, inverted_index_file_writer.get(), index_meta)); + DBUG_EXECUTE_IF( + "IndexBuilder::handle_single_rowset_index_column_writer_create_error", { + _CLTHROWA(CL_ERR_IO, + "debug point: " + "handle_single_rowset_index_column_writer_create_error"); + }) } catch (const std::exception& e) { return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( "CLuceneError occured: {}", e.what()); @@ -438,6 +466,10 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta std::make_shared<Schema>(output_rowset_schema->columns(), return_columns); std::unique_ptr<RowwiseIterator> iter; auto res = seg_ptr->new_iterator(schema, read_options, &iter); + DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_create_iterator_error", { + res = Status::Error<ErrorCode::INTERNAL_ERROR>( + "debug point: handle_single_rowset_create_iterator_error"); + }) if (!res.ok()) { LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << res.to_string(); @@ -448,7 +480,7 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta output_rowset_schema->create_block(return_columns)); while (true) { auto status = iter->next_batch(block.get()); - DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset", { + DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_iterator_next_batch_error", { status = Status::Error<ErrorCode::SCHEMA_CHANGE_INFO_INVALID>( "next_batch fault 
injection"); }); @@ -463,8 +495,15 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta } // write inverted index data - if (_write_inverted_index_data(output_rowset_schema, iter->data_id(), - block.get()) != Status::OK()) { + status = _write_inverted_index_data(output_rowset_schema, iter->data_id(), + block.get()); + DBUG_EXECUTE_IF( + "IndexBuilder::handle_single_rowset_write_inverted_index_data_error", { + status = Status::Error<ErrorCode::INTERNAL_ERROR>( + "debug point: " + "handle_single_rowset_write_inverted_index_data_error"); + }) + if (!status.ok()) { return Status::Error<ErrorCode::SCHEMA_CHANGE_INFO_INVALID>( "failed to write block."); } @@ -477,6 +516,10 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta if (_inverted_index_builders[writer_sign]) { RETURN_IF_ERROR(_inverted_index_builders[writer_sign]->finish()); } + DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_index_build_finish_error", { + _CLTHROWA(CL_ERR_IO, + "debug point: handle_single_rowset_index_build_finish_error"); + }) } catch (const std::exception& e) { return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( "CLuceneError occured: {}", e.what()); @@ -487,6 +530,10 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta } for (auto&& [seg_id, inverted_index_file_writer] : _inverted_index_file_writers) { auto st = inverted_index_file_writer->close(); + DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_file_writer_close_error", { + st = Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( + "debug point: handle_single_rowset_file_writer_close_error"); + }) if (!st.ok()) { LOG(ERROR) << "close inverted_index_writer error:" << st; return st; @@ -516,6 +563,8 @@ Status IndexBuilder::_write_inverted_index_data(TabletSchemaSPtr tablet_schema, auto index_id = inverted_index.index_id; auto column_name = inverted_index.columns[0]; auto column_idx = tablet_schema->field_index(column_name); + DBUG_EXECUTE_IF("IndexBuilder::_write_inverted_index_data_column_idx_is_negative", + { column_idx = -1; }) if (column_idx < 0) { if (!inverted_index.column_unique_ids.empty()) { auto column_unique_id = inverted_index.column_unique_ids[0]; @@ -532,6 +581,10 @@ Status IndexBuilder::_write_inverted_index_data(TabletSchemaSPtr tablet_schema, auto writer_sign = std::make_pair(segment_idx, index_id); std::unique_ptr<Field> field(FieldFactory::create(column)); auto converted_result = _olap_data_convertor->convert_column_data(i); + DBUG_EXECUTE_IF("IndexBuilder::_write_inverted_index_data_convert_column_data_error", { + converted_result.first = Status::Error<ErrorCode::INTERNAL_ERROR>( + "debug point: _write_inverted_index_data_convert_column_data_error"); + }) if (converted_result.first != Status::OK()) { LOG(WARNING) << "failed to convert block, errcode: " << converted_result.first; return converted_result.first; @@ -583,6 +636,9 @@ Status IndexBuilder::_add_nullable(const std::string& column_name, field->get_sub_field(0)->size(), reinterpret_cast<const void*>(data), reinterpret_cast<const uint8_t*>(nested_null_map), offsets_ptr, num_rows)); } + DBUG_EXECUTE_IF("IndexBuilder::_add_nullable_add_array_values_error", { + _CLTHROWA(CL_ERR_IO, "debug point: _add_nullable_add_array_values_error"); + }) } catch (const std::exception& e) { return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>( "CLuceneError occured: {}", e.what()); @@ -608,6 +664,8 @@ Status IndexBuilder::_add_nullable(const std::string& column_name, } *ptr += 
field->size() * step; offset += step; + DBUG_EXECUTE_IF("IndexBuilder::_add_nullable_throw_exception", + { _CLTHROWA(CL_ERR_IO, "debug point: _add_nullable_throw_exception"); }) } while (offset < num_rows); } catch (const std::exception& e) { return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>("CLuceneError occured: {}", @@ -640,6 +698,8 @@ Status IndexBuilder::_add_data(const std::string& column_name, RETURN_IF_ERROR(_inverted_index_builders[index_writer_sign]->add_values( column_name, *ptr, num_rows)); } + DBUG_EXECUTE_IF("IndexBuilder::_add_data_throw_exception", + { _CLTHROWA(CL_ERR_IO, "debug point: _add_data_throw_exception"); }) } catch (const std::exception& e) { return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>("CLuceneError occured: {}", e.what()); @@ -665,6 +725,8 @@ Status IndexBuilder::handle_inverted_index_data() { Status IndexBuilder::do_build_inverted_index() { LOG(INFO) << "begin to do_build_inverted_index, tablet=" << _tablet->tablet_id() << ", is_drop_op=" << _is_drop_op; + DBUG_EXECUTE_IF("IndexBuilder::do_build_inverted_index_alter_inverted_indexes_empty", + { _alter_inverted_indexes.clear(); }) if (_alter_inverted_indexes.empty()) { return Status::OK(); } @@ -731,6 +793,10 @@ Status IndexBuilder::do_build_inverted_index() { // modify rowsets in memory st = modify_rowsets(); + DBUG_EXECUTE_IF("IndexBuilder::do_build_inverted_index_modify_rowsets_status_error", { + st = Status::Error<ErrorCode::DELETE_VERSION_ERROR>( + "debug point: do_build_inverted_index_modify_rowsets_status_error"); + }) if (!st.ok()) { LOG(WARNING) << "failed to modify rowsets in memory. " << "tablet=" << _tablet->tablet_id() << ", error=" << st; @@ -788,7 +854,10 @@ Status IndexBuilder::modify_rowsets(const Merger::Statistics* stats) { void IndexBuilder::gc_output_rowset() { for (auto&& output_rowset : _output_rowsets) { - if (!output_rowset->is_local()) { + auto is_local_rowset = output_rowset->is_local(); + DBUG_EXECUTE_IF("IndexBuilder::gc_output_rowset_is_local_rowset", + { is_local_rowset = false; }) + if (!is_local_rowset) { _tablet->record_unused_remote_rowset(output_rowset->rowset_id(), output_rowset->rowset_meta()->resource_id(), output_rowset->num_segments()); diff --git a/regression-test/suites/fault_injection_p0/test_index_compaction_exception_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_index_compaction_exception_fault_injection.groovy new file mode 100644 index 00000000000..ac3cd8125a8 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_index_compaction_exception_fault_injection.groovy @@ -0,0 +1,341 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
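All three suites added below follow the same cycle: arm a BE debug point, run the workload that reaches the injected branch, verify the observable state, then disarm. A minimal sketch of that cycle, using the regression-framework helpers the suites themselves call (suite, sql, logger, GetDebugPoint()); the table name demo_tbl is hypothetical, and the debug point shown is one of the write-path points added in this patch:

    import org.codehaus.groovy.runtime.IOGroovyMethods

    suite("fault_injection_cycle_sketch", "nonConcurrent") {
        // The string must match a DBUG_EXECUTE_IF marker compiled into the BE;
        // this one is added in inverted_index_writer.cpp by this patch.
        def debugPoint = "InvertedIndexColumnWriterImpl::add_document_throw_error"
        try {
            // Arm the fault on every backend before the workload runs.
            GetDebugPoint().enableDebugPointForAllBEs(debugPoint)
            // demo_tbl is a hypothetical table with an inverted index, so the
            // INSERT drives the index writer through the injected branch.
            sql """ INSERT INTO demo_tbl VALUES (1, "andy") """
        } catch (Exception e) {
            // The injected fault is expected to surface here as an error.
            logger.info("expected failure: ${e}")
        } finally {
            // Always disarm, otherwise later suites inherit the fault.
            GetDebugPoint().disableDebugPointForAllBEs(debugPoint)
        }
    }

The three suites differ only in which points they arm and in what they verify: rowset counts for compaction, row counts and MATCH queries for writes and index builds.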
+ +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("test_index_compaction_exception_fault_injection", "nonConcurrent") { + def isCloudMode = isCloudMode() + def tableName = "test_index_compaction_exception_fault_injection_dups" + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + def changed_variables = sql "show variables where Changed = 1" + logger.info("changed variables: " + changed_variables.toString()) + // sql "UNSET GLOBAL VARIABLE ALL;" + sql "SET global enable_match_without_inverted_index = false" + + boolean disableAutoCompaction = false + + def set_be_config = { key, value -> + for (String backend_id: backendId_to_backendIP.keySet()) { + def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value) + logger.info("update config: code=" + code + ", out=" + out + ", err=" + err) + } + } + + def trigger_full_compaction_on_tablets = { tablets -> + for (def tablet : tablets) { + String tablet_id = tablet.TabletId + String backend_id = tablet.BackendId + int times = 1 + + String compactionStatus; + do{ + def (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + ++times + sleep(2000) + compactionStatus = parseJson(out.trim()).status.toLowerCase(); + } while (compactionStatus!="success" && times<=10 && compactionStatus!="e-6010") + + + if (compactionStatus == "fail") { + assertEquals(disableAutoCompaction, false) + logger.info("Compaction was done automatically!") + } + if (disableAutoCompaction && compactionStatus!="e-6010") { + assertEquals("success", compactionStatus) + } + } + } + + def wait_full_compaction_done = { tablets -> + for (def tablet in tablets) { + boolean running = true + do { + Thread.sleep(1000) + String tablet_id = tablet.TabletId + String backend_id = tablet.BackendId + def (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } while (running) + } + } + + def get_rowset_count = { tablets -> + int rowsetCount = 0 + for (def tablet in tablets) { + def (code, out, err) = curl("GET", tablet.CompactionStatus) + logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def tabletJson = parseJson(out.trim()) + assert tabletJson.rowsets instanceof List + rowsetCount +=((List<String>) tabletJson.rowsets).size() + } + return rowsetCount + } + + def check_config = { String key, String value -> + for (String backend_id: backendId_to_backendIP.keySet()) { + def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id)) + logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def configList = parseJson(out.trim()) + assert configList instanceof List + for (Object ele in (List) configList) { + assert ele instanceof List<String> + if (((List<String>) ele)[0] == key) { + assertEquals(value, ((List<String>) ele)[2]) + } + } + } + 
} + + def insert_data = { -> + sql """ INSERT INTO ${tableName} VALUES (1, "andy", 10, [89, 80, 98], ["football", "basketball"], "andy is good at sports", ["andy has a good heart", "andy is so nice"]); """ + sql """ INSERT INTO ${tableName} VALUES (1, "bason", 11, [79, 85, 97], ["singing", "dancing"], "bason is good at singing", ["bason is very clever", "bason is very healthy"]); """ + sql """ INSERT INTO ${tableName} VALUES (2, "andy", 10, [89, 80, 98], ["football", "basketball"], "andy is good at sports", ["andy has a good heart", "andy is so nice"]); """ + sql """ INSERT INTO ${tableName} VALUES (2, "bason", 11, [79, 85, 97], ["singing", "dancing"], "bason is good at singing", ["bason is very clever", "bason is very healthy"]); """ + sql """ INSERT INTO ${tableName} VALUES (3, "andy", 10, [89, 80, 98], ["football", "basketball"], "andy is good at sports", ["andy has a good heart", "andy is so nice"]); """ + sql """ INSERT INTO ${tableName} VALUES (3, "bason", 11, [79, 85, 97], ["singing", "dancing"], "bason is good at singing", ["bason is very clever", "bason is very healthy"]); """ + } + + def run_sql = { -> + def result = sql_return_maparray "SELECT * FROM ${tableName} WHERE name MATCH 'bason'" + assertEquals(3, result.size()) + assertEquals(1, result[0]['id']) + assertEquals("bason", result[0]['name']) + assertEquals(2, result[1]['id']) + assertEquals("bason", result[1]['name']) + assertEquals(3, result[2]['id']) + assertEquals("bason", result[2]['name']) + + result = sql_return_maparray "SELECT * FROM ${tableName} WHERE age = 11" + assertEquals(3, result.size()) + assertEquals(1, result[0]['id']) + assertEquals("bason", result[0]['name']) + assertEquals(2, result[1]['id']) + assertEquals("bason", result[1]['name']) + assertEquals(3, result[2]['id']) + assertEquals("bason", result[2]['name']) + + result = sql_return_maparray "SELECT * FROM ${tableName} WHERE description MATCH 'singing'" + assertEquals(3, result.size()) + assertEquals("bason", result[0]['name']) + assertEquals("bason is good at singing", result[0]['description']) + assertEquals("bason", result[1]['name']) + assertEquals("bason is good at singing", result[1]['description']) + assertEquals("bason", result[2]['name']) + assertEquals("bason is good at singing", result[2]['description']) + + result = sql_return_maparray "SELECT * FROM ${tableName} WHERE array_contains(scores, 79)" + assertEquals(3, result.size()) + assertEquals("bason", result[0]['name']) + assertEquals("[79, 85, 97]", result[0]['scores']) + assertEquals("bason", result[1]['name']) + assertEquals("[79, 85, 97]", result[1]['scores']) + assertEquals("bason", result[2]['name']) + assertEquals("[79, 85, 97]", result[2]['scores']) + + result = sql_return_maparray "SELECT * FROM ${tableName} WHERE array_contains(hobbies, 'dancing')" + assertEquals(3, result.size()) + assertEquals("bason", result[0]['name']) + assertEquals('["singing", "dancing"]', result[0]['hobbies']) + assertEquals("bason", result[1]['name']) + assertEquals('["singing", "dancing"]', result[1]['hobbies']) + assertEquals("bason", result[2]['name']) + assertEquals('["singing", "dancing"]', result[2]['hobbies']) + + result = sql_return_maparray "SELECT * FROM ${tableName} WHERE array_contains(evaluation, 'bason is very clever')" + assertEquals(3, result.size()) + assertEquals("bason", result[0]['name']) + assertEquals('["bason is very clever", "bason is very healthy"]', result[0]['evaluation']) + assertEquals("bason", result[1]['name']) + assertEquals('["bason is very clever", "bason is very 
healthy"]', result[1]['evaluation']) + assertEquals("bason", result[2]['name']) + assertEquals('["bason is very clever", "bason is very healthy"]', result[2]['evaluation']) + } + + // define debug points array + def debug_points_abnormal_compaction = [ + "compact_column_getDirectory_error", + "compact_column_create_index_writer_error", + "compact_column_indexCompaction_error", + "compact_column_index_writer_close_error", + "compact_column_src_index_dirs_close_error", + "Compaction::do_inverted_index_compaction_find_rowset_error", + "Compaction::do_inverted_index_compaction_get_fs_error", + "Compaction::do_inverted_index_compaction_index_file_reader_init_error", + // "Compaction::do_inverted_index_compaction_file_size_status_not_ok", // v2 do not do index compaction + "Compaction::do_inverted_index_compaction_can_not_find_index_meta", + "Compaction::do_inverted_index_compaction_index_properties_different", + "Compaction::do_inverted_index_compaction_index_file_writer_close_not_ok", + "Compaction::construct_skip_inverted_index_index_reader_close_error" + ] + + def debug_points_normal_compaction = [ + "compact_column_local_tmp_dir_delete_error", + // "Compaction::do_inverted_index_compaction_dest_segment_num_is_zero", // query result not match without inverted index + "Compaction::do_inverted_index_compaction_index_file_reader_init_not_found", + "Compaction::construct_skip_inverted_index_is_skip_index_compaction", + "Compaction::construct_skip_inverted_index_get_fs_error", + "Compaction::construct_skip_inverted_index_index_meta_nullptr", + "Compaction::construct_skip_inverted_index_seg_path_nullptr", + "Compaction::construct_skip_inverted_index_index_file_reader_init_status_not_ok", + "Compaction::construct_skip_inverted_index_index_file_reader_exist_status_not_ok", + "Compaction::construct_skip_inverted_index_index_file_reader_exist_false", + "Compaction::construct_skip_inverted_index_index_file_reader_open_error", + "Compaction::construct_skip_inverted_index_index_files_count" + ] + + def run_test = { tablets, debug_point, abnormal -> + insert_data.call() + + run_sql.call() + + int replicaNum = 1 + def dedup_tablets = deduplicate_tablets(tablets) + if (dedup_tablets.size() > 0) { + replicaNum = Math.round(tablets.size() / dedup_tablets.size()) + if (replicaNum != 1 && replicaNum != 3) { + assert(false) + } + } + + // before full compaction, there are 7 rowsets. + int rowsetCount = get_rowset_count.call(tablets); + assert (rowsetCount == 7 * replicaNum) + + // debug point, enable it, triger full compaction, wait full compaction done, and disable the debug point + try { + GetDebugPoint().enableDebugPointForAllBEs(debug_point) + logger.info("trigger_full_compaction_on_tablets with fault injection: ${debug_point}") + trigger_full_compaction_on_tablets.call(tablets) + wait_full_compaction_done.call(tablets) + } finally { + GetDebugPoint().disableDebugPointForAllBEs(debug_point) + } + + if (abnormal) { + // after fault injection, there are still 7 rowsets. + rowsetCount = get_rowset_count.call(tablets); + assert (rowsetCount == 7 * replicaNum) + + logger.info("trigger_full_compaction_on_tablets normally") + // trigger full compactions for all tablets in ${tableName} + // this time, index compaction of some columns will be skipped because of the fault injection + trigger_full_compaction_on_tablets.call(tablets) + + // wait for full compaction done + wait_full_compaction_done.call(tablets) + } + + // after full compaction, there is only 1 rowset. 
+ rowsetCount = get_rowset_count.call(tablets); + if (isCloudMode) { + assert (rowsetCount == (1 + 1) * replicaNum) + } else { + assert (rowsetCount == 1 * replicaNum) + } + + run_sql.call() + } + + def create_and_test_table = { table_name, key_type, debug_points, is_abnormal -> + debug_points.each { debug_point -> + sql """ DROP TABLE IF EXISTS ${table_name}; """ + sql """ + CREATE TABLE ${table_name} ( + `id` int(11) NULL, + `name` varchar(255) NULL, + `age` int(11) NULL, + `scores` array<int> NULL, + `hobbies` array<text> NULL, + `description` text NULL, + `evaluation` array<text> NULL, + index index_name (name) using inverted, + index index_age (age) using inverted, + index index_scores (scores) using inverted, + index index_hobbies (hobbies) using inverted, + index index_description (description) using inverted properties("parser" = "english"), + index index_evaluation (evaluation) using inverted + ) ENGINE=OLAP + ${key_type} KEY(`id`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "inverted_index_storage_format" = "V1", + "disable_auto_compaction" = "true" + ); + """ + + def tablets = sql_return_maparray """ show tablets from ${table_name}; """ + run_test.call(tablets, debug_point, is_abnormal) + } + } + + boolean invertedIndexCompactionEnable = false + boolean has_update_be_config = false + try { + String backend_id; + backend_id = backendId_to_backendIP.keySet()[0] + def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id)) + + logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def configList = parseJson(out.trim()) + assert configList instanceof List + + for (Object ele in (List) configList) { + assert ele instanceof List<String> + if (((List<String>) ele)[0] == "inverted_index_compaction_enable") { + invertedIndexCompactionEnable = Boolean.parseBoolean(((List<String>) ele)[2]) + logger.info("inverted_index_compaction_enable: ${((List<String>) ele)[2]}") + } + if (((List<String>) ele)[0] == "disable_auto_compaction") { + disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2]) + logger.info("disable_auto_compaction: ${((List<String>) ele)[2]}") + } + } + set_be_config.call("inverted_index_compaction_enable", "true") + has_update_be_config = true + // check updated config + check_config.call("inverted_index_compaction_enable", "true"); + + // duplicated key table + create_and_test_table.call(tableName, "DUPLICATE", debug_points_abnormal_compaction, true) + create_and_test_table.call(tableName, "DUPLICATE", debug_points_normal_compaction, false) + + // unique key table + tableName = "test_index_compaction_exception_fault_injection_unique" + create_and_test_table.call(tableName, "UNIQUE", debug_points_abnormal_compaction, true) + create_and_test_table.call(tableName, "UNIQUE", debug_points_normal_compaction, false) + + } finally { + if (has_update_be_config) { + set_be_config.call("inverted_index_compaction_enable", invertedIndexCompactionEnable.toString()) + } + sql "SET global enable_match_without_inverted_index = true" + } + +} diff --git a/regression-test/suites/fault_injection_p0/test_write_inverted_index_exception_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_write_inverted_index_exception_fault_injection.groovy new file mode 100644 index 00000000000..6deb96bfdea --- /dev/null +++ 
b/regression-test/suites/fault_injection_p0/test_write_inverted_index_exception_fault_injection.groovy @@ -0,0 +1,301 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("test_write_inverted_index_exception_fault_injection", "nonConcurrent") { + def tableNamePrefix = "test_write_inverted_index_exception_fault_injection" + + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + def set_be_config = { key, value -> + for (String backend_id: backendId_to_backendIP.keySet()) { + def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value) + logger.info("update config: code=" + code + ", out=" + out + ", err=" + err) + } + } + + def check_config = { String key, String value -> + for (String backend_id: backendId_to_backendIP.keySet()) { + def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id)) + logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def configList = parseJson(out.trim()) + assert configList instanceof List + for (Object ele in (List) configList) { + assert ele instanceof List<String> + if (((List<String>) ele)[0] == key) { + assertEquals(value, ((List<String>) ele)[2]) + } + } + } + } + + def changed_variables = sql "show variables where Changed = 1" + logger.info("changed variables: " + changed_variables.toString()) + // sql "UNSET GLOBAL VARIABLE ALL;" + + sql "SET global enable_match_without_inverted_index = false" + boolean inverted_index_ram_dir_enable = true + boolean has_update_be_config = false + + def creata_table = { String tableName, String format -> + sql "DROP TABLE IF EXISTS ${tableName}" + + sql """ + CREATE TABLE ${tableName} ( + `id` int(11) NULL, + `name` varchar(255) NULL, + `age` int(11) NULL, + `scores` array<int> NULL, + `hobbies` array<text> NULL, + `description` text NULL, + `evaluation` array<text> NULL, + index index_name (name) using inverted, + index index_age (age) using inverted, + index index_scores (scores) using inverted, + index index_hobbies (hobbies) using inverted, + index index_description (description) using inverted properties("parser" = "english"), + index index_evaluation (evaluation) using inverted + ) ENGINE=OLAP + DUPLICATE KEY(`id`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "inverted_index_storage_format" = "${format}", + "disable_auto_compaction" = "true" + ); + """ + } + + def run_insert = { String tableName -> + sql """ INSERT INTO ${tableName} 
VALUES (1, "andy", 10, [89, 80, 98], ["football", "basketball"], "andy is good at sports", ["andy has a good heart", "andy is so nice"]); """ + sql """ INSERT INTO ${tableName} VALUES (1, "bason", 11, [79, 85, 97], ["singing", "dancing"], "bason is good at singing", ["bason is very clever", "bason is very healthy"]); """ + sql """ INSERT INTO ${tableName} VALUES (2, "andy", 10, [89, 80, 98], ["football", "basketball"], "andy is good at sports", ["andy has a good heart", "andy is so nice"]); """ + sql """ INSERT INTO ${tableName} VALUES (2, "bason", 11, [79, 85, 97], ["singing", "dancing"], "bason is good at singing", ["bason is very clever", "bason is very healthy"]); """ + sql """ INSERT INTO ${tableName} VALUES (3, "andy", 10, [89, 80, 98], ["football", "basketball"], "andy is good at sports", ["andy has a good heart", "andy is so nice"]); """ + sql """ INSERT INTO ${tableName} VALUES (3, "bason", 11, [79, 85, 97], ["singing", "dancing"], "bason is good at singing", ["bason is very clever", "bason is very healthy"]); """ + } + + def check_count = { String tableName, int count -> + def result = sql "SELECT COUNT(*) FROM ${tableName}" + assertEquals(count, result[0][0]) + } + + def run_select = { String tableName, boolean normal -> + def result = sql_return_maparray "SELECT * FROM ${tableName} WHERE name MATCH 'andy'" + assertEquals(3, result.size()) + assertEquals(1, result[0]['id']) + assertEquals("andy", result[0]['name']) + assertEquals(2, result[1]['id']) + assertEquals("andy", result[1]['name']) + assertEquals(3, result[2]['id']) + assertEquals("andy", result[2]['name']) + + result = sql_return_maparray "SELECT * FROM ${tableName} WHERE age < 11" + assertEquals(3, result.size()) + assertEquals("andy", result[0]['name']) + assertEquals(2, result[1]['id']) + assertEquals("andy", result[1]['name']) + assertEquals(3, result[2]['id']) + assertEquals("andy", result[2]['name']) + + result = sql_return_maparray "SELECT * FROM ${tableName} WHERE description MATCH 'sports'" + assertEquals(3, result.size()) + assertEquals("andy", result[0]['name']) + assertEquals("andy is good at sports", result[0]['description']) + assertEquals("andy", result[1]['name']) + assertEquals("andy is good at sports", result[1]['description']) + assertEquals("andy", result[2]['name']) + assertEquals("andy is good at sports", result[2]['description']) + + if (normal) { + result = sql_return_maparray "SELECT * FROM ${tableName} WHERE array_contains(scores, 79)" + assertEquals(3, result.size()) + assertEquals("bason", result[0]['name']) + assertEquals("[79, 85, 97]", result[0]['scores']) + assertEquals("bason", result[1]['name']) + assertEquals("[79, 85, 97]", result[1]['scores']) + assertEquals("bason", result[2]['name']) + assertEquals("[79, 85, 97]", result[2]['scores']) + + result = sql_return_maparray "SELECT * FROM ${tableName} WHERE array_contains(hobbies, 'football')" + assertEquals(3, result.size()) + assertEquals("andy", result[0]['name']) + assertEquals('["football", "basketball"]', result[0]['hobbies']) + assertEquals("andy", result[1]['name']) + assertEquals('["football", "basketball"]', result[1]['hobbies']) + assertEquals("andy", result[2]['name']) + assertEquals('["football", "basketball"]', result[2]['hobbies']) + + result = sql_return_maparray "SELECT * FROM ${tableName} WHERE array_contains(evaluation, 'andy is so nice')" + assertEquals(3, result.size()) + assertEquals("andy", result[0]['name']) + assertEquals('["andy has a good heart", "andy is so nice"]', result[0]['evaluation']) + 
assertEquals("andy", result[1]['name']) + assertEquals('["andy has a good heart", "andy is so nice"]', result[1]['evaluation']) + assertEquals("andy", result[2]['name']) + assertEquals('["andy has a good heart", "andy is so nice"]', result[2]['evaluation']) + } else { + result = sql_return_maparray "SELECT * FROM ${tableName} WHERE array_contains(scores, 79)" + assertEquals(0, result.size()) + + result = sql_return_maparray "SELECT * FROM ${tableName} WHERE array_contains(hobbies, 'football')" + assertEquals(0, result.size()) + + result = sql_return_maparray "SELECT * FROM ${tableName} WHERE array_contains(evaluation, 'andy is so nice')" + assertEquals(0, result.size()) + } + } + + // define debug points array + def debug_points = [ + "inverted_index_parser.get_parser_stopwords_from_properties", + "CharFilterFactory::create_return_nullptr", + "InvertedIndexFileWriter::open_local_fs_exists_error", + "InvertedIndexFileWriter::open_local_fs_exists_true", + "InvertedIndexFileWriter::delete_index_index_meta_nullptr", + "InvertedIndexFileWriter::delete_index_indices_dirs_reach_end", + "InvertedIndexFileWriter::copyFile_openInput_error", + "InvertedIndexFileWriter::copyFile_remainder_is_not_zero", + "InvertedIndexFileWriter::copyFile_diff_not_equals_length", + "InvertedIndexFileWriter::write_v1_ram_output_is_nullptr", + "InvertedIndexFileWriter::write_v1_out_dir_createOutput_nullptr", + "FSIndexInput::~SharedHandle_reader_close_error", + "DorisFSDirectory::FSIndexInput::readInternal_reader_read_at_error", + "DorisFSDirectory::FSIndexInput::readInternal_bytes_read_error", + "DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init", + "DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_destructor", + "DorisFSDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer", + "DorisFSDirectory::FSIndexOutput::flushBuffer_writer_is_nullptr", + "DorisFSDirectory::FSIndexOutput::flushBuffer_b_is_nullptr", + "DorisFSDirectory::FSIndexOutput.set_writer_nullptr", + "DorisFSDirectory::FSIndexOutput._set_writer_close_status_error", + "DorisFSDirectory::FSIndexOutputV2::flushBuffer_file_writer_is_nullptr", + "DorisFSDirectory::FSIndexOutputV2::flushBuffer_b_is_nullptr", + "DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close", + "DorisFSDirectory::list_status_is_not_ok", + "DorisFSDirectory::list_directory_not_exists", + "DorisFSDirectory::fileExists_status_is_not_ok", + "DorisFSDirectory::touchFile_status_is_not_ok", + "DorisFSDirectory::fileLength_status_is_not_ok", + //"DorisFSDirectory::close_close_with_error", // will block the process, off now + "DorisFSDirectory::doDeleteFile_status_is_not_ok", + "DorisFSDirectory::deleteDirectory_throw_is_not_directory", + "DorisFSDirectory::renameFile_exists_status_is_not_ok", + "DorisFSDirectory::renameFile_delete_status_is_not_ok", + "DorisFSDirectory::renameFile_rename_status_is_not_ok", + "DorisFSDirectory::createOutput_exists_status_is_not_ok", + "DorisFSDirectory::createOutput_delete_status_is_not_ok", + "DorisFSDirectory::createOutput_exists_after_delete_status_is_not_ok", + "DorisFSDirectory::createOutput_exists_after_delete_error", + "DorisRAMFSDirectory::fileModified_file_not_found", + "DorisRAMFSDirectory::touchFile_file_not_found", + "DorisRAMFSDirectory::fileLength_file_not_found", + "DorisRAMFSDirectory::openInput_file_not_found", + "DorisRAMFSDirectory::close_close_with_error", + "DorisRAMFSDirectory::createOutput_itr_filesMap_end", + 
"DorisFSDirectoryFactory::getDirectory_file_is_nullptr", + "DorisFSDirectoryFactory::getDirectory_exists_status_is_not_ok", + "DorisFSDirectoryFactory::getDirectory_create_directory_status_is_not_ok", + "InvertedIndexColumnWriter::init_field_type_not_supported", + "InvertedIndexColumnWriter::init_inverted_index_writer_init_error", + "InvertedIndexColumnWriter::close_on_error_throw_exception", + "InvertedIndexColumnWriter::init_bkd_index_throw_error", + "InvertedIndexColumnWriter::create_chinese_analyzer_throw_error", + "InvertedIndexColumnWriter::open_index_directory_error", + "InvertedIndexColumnWriter::create_index_writer_setRAMBufferSizeMB_error", + "InvertedIndexColumnWriter::create_index_writer_setMaxBufferedDocs_error", + "InvertedIndexColumnWriter::create_index_writer_setMergeFactor_error", + "InvertedIndexColumnWriterImpl::add_document_throw_error", + "InvertedIndexColumnWriterImpl::add_null_document_throw_error", + "InvertedIndexColumnWriterImpl::add_nulls_field_nullptr", + "InvertedIndexColumnWriterImpl::add_nulls_index_writer_nullptr", + "InvertedIndexColumnWriterImpl::new_char_token_stream__char_string_reader_init_error", + "InvertedIndexColumnWriterImpl::add_values_field_is_nullptr", + "InvertedIndexColumnWriterImpl::add_values_index_writer_is_nullptr", + "InvertedIndexColumnWriterImpl::add_array_values_count_is_zero", + "InvertedIndexColumnWriterImpl::add_array_values_index_writer_is_nullptr", + "InvertedIndexColumnWriterImpl::add_array_values_create_field_error", + "InvertedIndexColumnWriterImpl::add_array_values_create_field_error_2", + "InvertedIndexColumnWriterImpl::add_array_values_field_is_nullptr", + "InvertedIndexWriter._throw_clucene_error_in_fulltext_writer_close", + "InvertedIndexColumnWriter::create_array_typeinfo_is_nullptr", + "InvertedIndexColumnWriter::create_unsupported_type_for_inverted_index" + ] + + def inverted_index_storage_format = ["v1", "v2"] + + try { + String backend_id; + backend_id = backendId_to_backendIP.keySet()[0] + def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id)) + + logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def configList = parseJson(out.trim()) + assert configList instanceof List + + for (Object ele in (List) configList) { + assert ele instanceof List<String> + if (((List<String>) ele)[0] == "inverted_index_ram_dir_enable") { + invertedIndexCompactionEnable = Boolean.parseBoolean(((List<String>) ele)[2]) + logger.info("inverted_index_ram_dir_enable: ${((List<String>) ele)[2]}") + } + } + set_be_config.call("inverted_index_ram_dir_enable", "false") + has_update_be_config = true + // check updated config + check_config.call("inverted_index_ram_dir_enable", "false"); + inverted_index_storage_format.each { format -> + def tableName = "${tableNamePrefix}_${format}" + creata_table("${tableName}", format) + + // for each debug point, enable it, run the insert, check the count, and disable the debug point + // catch any exceptions and disable the debug point + debug_points.each { debug_point -> + try { + GetDebugPoint().enableDebugPointForAllBEs(debug_point) + run_insert("${tableName}") + check_count("${tableName}", 6) + // if debug_point equals InvertedIndexColumnWriterImpl::add_array_values_count_is_zero, run_select(false) + // else run_select(true) + if (debug_point == "InvertedIndexColumnWriterImpl::add_array_values_count_is_zero") { + run_select("${tableName}", false) + } else { + run_select("${tableName}", 
true) + } + sql "TRUNCATE TABLE ${tableName}" + } catch (Exception e) { + log.error("Caught exception: ${e}") + check_count("${tableName}", 0) + } finally { + GetDebugPoint().disableDebugPointForAllBEs(debug_point) + } + } + } + } finally { + if (has_update_be_config) { + set_be_config.call("inverted_index_ram_dir_enable", inverted_index_ram_dir_enable.toString()) + } + sql "SET global enable_match_without_inverted_index = true" + } + +} diff --git a/regression-test/suites/fault_injection_p2/test_build_index_exception_fault_injection.groovy b/regression-test/suites/fault_injection_p2/test_build_index_exception_fault_injection.groovy new file mode 100644 index 00000000000..94cf4d6c15c --- /dev/null +++ b/regression-test/suites/fault_injection_p2/test_build_index_exception_fault_injection.groovy @@ -0,0 +1,263 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("test_build_index_exception_fault_injection", "nonConcurrent,p2") { + if (isCloudMode()) { + return + } + def tableNamePrefix = "test_build_index_exception_fault_injection" + + def changed_variables = sql "show variables where Changed = 1" + logger.info("changed variables: " + changed_variables.toString()) + // sql "UNSET GLOBAL VARIABLE ALL;" + sql "SET global enable_match_without_inverted_index = false" + + // prepare test table + def timeout = 60000 + def delta_time = 1000 + def alter_res = "null" + def useTime = 0 + + def wait_for_latest_op_on_table_finish = { table_name, OpTimeout -> + for(int t = delta_time; t <= OpTimeout; t += delta_time){ + alter_res = sql """SHOW ALTER TABLE COLUMN WHERE TableName = "${table_name}" ORDER BY CreateTime DESC LIMIT 1;""" + alter_res = alter_res.toString() + if(alter_res.contains("FINISHED")) { + sleep(3000) // wait change table state to normal + logger.info(table_name + " latest alter job finished, detail: " + alter_res) + break + } + useTime = t + sleep(delta_time) + } + assertTrue(useTime <= OpTimeout, "wait_for_latest_op_on_table_finish timeout") + } + + def wait_for_last_build_index_on_table_finish = { table_name, OpTimeout -> + for(int t = delta_time; t <= OpTimeout; t += delta_time){ + alter_res = sql """SHOW BUILD INDEX WHERE TableName = "${table_name}" ORDER BY JobId """ + + if (alter_res.size() > 0) { + def last_job_state = alter_res[alter_res.size()-1][7]; + if (last_job_state == "FINISHED" || last_job_state == "CANCELLED") { + logger.info(table_name + " last index job finished, state: " + last_job_state + ", detail: " + alter_res[alter_res.size()-1]) + return last_job_state; + } + } + useTime = t + sleep(delta_time) + } + logger.info("wait_for_last_build_index_on_table_finish debug: " + alter_res) + assertTrue(useTime <= OpTimeout, "wait_for_last_build_index_on_table_finish 
timeout") + return "wait_timeout" + } + + def creata_table = { String tableName, String format -> + sql "DROP TABLE IF EXISTS ${tableName}" + + sql """ + CREATE TABLE ${tableName} ( + `id` int(11) NULL, + `name` varchar(255) NULL, + `age` int(11) NULL, + `scores` array<int> NULL, + `hobbies` array<text> NULL, + `description` text NULL, + `evaluation` array<text> NULL + ) ENGINE=OLAP + DUPLICATE KEY(`id`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "inverted_index_storage_format" = "${format}", + "disable_auto_compaction" = "true" + ); + """ + } + + def create_index = { String tableName -> + sql "CREATE INDEX idx_name ON ${tableName} (name) USING INVERTED" + wait_for_latest_op_on_table_finish("${tableName}", timeout) + sql "CREATE INDEX idx_age ON ${tableName} (age) USING INVERTED" + wait_for_latest_op_on_table_finish("${tableName}", timeout) + sql "CREATE INDEX idx_scores ON ${tableName} (scores) USING INVERTED" + wait_for_latest_op_on_table_finish("${tableName}", timeout) + sql "CREATE INDEX idx_hobbies ON ${tableName} (hobbies) USING INVERTED" + wait_for_latest_op_on_table_finish("${tableName}", timeout) + sql "CREATE INDEX idx_description ON ${tableName} (description) USING INVERTED properties(\"parser\" = \"english\")" + wait_for_latest_op_on_table_finish("${tableName}", timeout) + sql "CREATE INDEX idx_evaluation ON ${tableName} (evaluation) USING INVERTED" + wait_for_latest_op_on_table_finish("${tableName}", timeout) + } + + def build_index = { String tableName -> + sql "BUILD INDEX idx_name ON ${tableName}" + wait_for_last_build_index_on_table_finish("${tableName}", timeout) + sql "BUILD INDEX idx_age ON ${tableName}" + wait_for_last_build_index_on_table_finish("${tableName}", timeout) + sql "BUILD INDEX idx_scores ON ${tableName}" + wait_for_last_build_index_on_table_finish("${tableName}", timeout) + sql "BUILD INDEX idx_hobbies ON ${tableName}" + wait_for_last_build_index_on_table_finish("${tableName}", timeout) + sql "BUILD INDEX idx_description ON ${tableName}" + wait_for_last_build_index_on_table_finish("${tableName}", timeout) + sql "BUILD INDEX idx_evaluation ON ${tableName}" + wait_for_last_build_index_on_table_finish("${tableName}", timeout) + } + + def run_insert = { String tableName -> + sql """ INSERT INTO ${tableName} VALUES (1, "andy", 10, [89, 80, 98], ["football", "basketball"], "andy is good at sports", ["andy has a good heart", "andy is so nice"]); """ + sql """ INSERT INTO ${tableName} VALUES (1, "bason", 11, [79, 85, 97], ["singing", "dancing"], "bason is good at singing", ["bason is very clever", "bason is very healthy"]); """ + sql """ INSERT INTO ${tableName} VALUES (2, "andy", 10, [89, 80, 98], ["football", "basketball"], "andy is good at sports", ["andy has a good heart", "andy is so nice"]); """ + sql """ INSERT INTO ${tableName} VALUES (2, "bason", 11, [79, 85, 97], ["singing", "dancing"], "bason is good at singing", ["bason is very clever", "bason is very healthy"]); """ + sql """ INSERT INTO ${tableName} VALUES (3, "andy", 10, [89, 80, 98], ["football", "basketball"], "andy is good at sports", ["andy has a good heart", "andy is so nice"]); """ + sql """ INSERT INTO ${tableName} VALUES (3, "bason", 11, [79, 85, 97], ["singing", "dancing"], "bason is good at singing", ["bason is very clever", "bason is very healthy"]); """ + } + + def check_count = { String tableName, int count -> + def result = sql "SELECT COUNT(*) FROM ${tableName}" + assertEquals(count, result[0][0]) + } 
+ + def run_select = { String tableName, boolean normal -> + def result + + if (normal) { + result = sql_return_maparray "SELECT * FROM ${tableName} WHERE name MATCH 'andy'" + assertEquals(3, result.size()) + assertEquals(1, result[0]['id']) + assertEquals("andy", result[0]['name']) + assertEquals(2, result[1]['id']) + assertEquals("andy", result[1]['name']) + assertEquals(3, result[2]['id']) + assertEquals("andy", result[2]['name']) + + result = sql_return_maparray "SELECT * FROM ${tableName} WHERE description MATCH 'sports'" + assertEquals(3, result.size()) + assertEquals("andy", result[0]['name']) + assertEquals("andy is good at sports", result[0]['description']) + assertEquals("andy", result[1]['name']) + assertEquals("andy is good at sports", result[1]['description']) + assertEquals("andy", result[2]['name']) + assertEquals("andy is good at sports", result[2]['description']) + } else { + try { + result = sql_return_maparray "SELECT * FROM ${tableName} WHERE name MATCH 'andy'" + assertEquals(0, result.size()) + } catch (Exception e) { + log.error("Caught exception: ${e}") + assertContains(e.toString(), "[E-6001]match_any not support execute_match") + } + try { + result = sql_return_maparray "SELECT * FROM ${tableName} WHERE description MATCH 'sports'" + assertEquals(0, result.size()) + } catch (Exception e) { + log.error("Caught exception: ${e}") + assertContains(e.toString(), "[E-6001]match_any not support execute_match") + } + } + + result = sql_return_maparray "SELECT * FROM ${tableName} WHERE age < 11" + assertEquals(3, result.size()) + assertEquals("andy", result[0]['name']) + assertEquals(2, result[1]['id']) + assertEquals("andy", result[1]['name']) + assertEquals(3, result[2]['id']) + assertEquals("andy", result[2]['name']) + + result = sql_return_maparray "SELECT * FROM ${tableName} WHERE array_contains(scores, 79)" + assertEquals(3, result.size()) + assertEquals("bason", result[0]['name']) + assertEquals("[79, 85, 97]", result[0]['scores']) + assertEquals("bason", result[1]['name']) + assertEquals("[79, 85, 97]", result[1]['scores']) + assertEquals("bason", result[2]['name']) + assertEquals("[79, 85, 97]", result[2]['scores']) + + result = sql_return_maparray "SELECT * FROM ${tableName} WHERE array_contains(hobbies, 'football')" + assertEquals(3, result.size()) + assertEquals("andy", result[0]['name']) + assertEquals('["football", "basketball"]', result[0]['hobbies']) + assertEquals("andy", result[1]['name']) + assertEquals('["football", "basketball"]', result[1]['hobbies']) + assertEquals("andy", result[2]['name']) + assertEquals('["football", "basketball"]', result[2]['hobbies']) + + result = sql_return_maparray "SELECT * FROM ${tableName} WHERE array_contains(evaluation, 'andy is so nice')" + assertEquals(3, result.size()) + assertEquals("andy", result[0]['name']) + assertEquals('["andy has a good heart", "andy is so nice"]', result[0]['evaluation']) + assertEquals("andy", result[1]['name']) + assertEquals('["andy has a good heart", "andy is so nice"]', result[1]['evaluation']) + assertEquals("andy", result[2]['name']) + assertEquals('["andy has a good heart", "andy is so nice"]', result[2]['evaluation']) + } + + // define debug points array + def debug_points = [ + "IndexBuilder::update_inverted_index_info_is_local_rowset", + "IndexBuilder::update_inverted_index_info_size_st_not_ok", + "IndexBuilder::update_inverted_index_info_index_file_reader_init_not_ok", + "IndexBuilder::handle_single_rowset_is_local_rowset", + 
"IndexBuilder::handle_single_rowset_can_not_find_reader_drop_op", + "IndexBuilder::handle_single_rowset_can_not_find_reader", + "IndexBuilder::handle_single_rowset_support_inverted_index", + "IndexBuilder::handle_single_rowset_index_column_writer_create_error", + "IndexBuilder::handle_single_rowset_create_iterator_error", + "IndexBuilder::handle_single_rowset", + "IndexBuilder::handle_single_rowset_iterator_next_batch_error", + "IndexBuilder::handle_single_rowset_write_inverted_index_data_error", + "IndexBuilder::handle_single_rowset_index_build_finish_error", + "IndexBuilder::handle_single_rowset_file_writer_close_error", + // "IndexBuilder::_write_inverted_index_data_column_idx_is_negative" // skip build index + "IndexBuilder::_write_inverted_index_data_convert_column_data_error", + "IndexBuilder::_add_nullable_add_array_values_error", + "IndexBuilder::_add_nullable_throw_exception", + "IndexBuilder::_add_data_throw_exception", + "IndexBuilder::do_build_inverted_index_alter_inverted_indexes_empty", + "IndexBuilder::do_build_inverted_index_modify_rowsets_status_error", + "IndexBuilder::gc_output_rowset_is_local_rowset" + ] + + def inverted_index_storage_format = ["v1", "v2"] + inverted_index_storage_format.each { format -> + def tableName = "${tableNamePrefix}_${format}" + + // for each debug point, enable it, run the insert, check the count, and disable the debug point + // catch any exceptions and disable the debug point + debug_points.each { debug_point -> + try { + GetDebugPoint().enableDebugPointForAllBEs(debug_point) + creata_table("${tableName}", format) + run_insert("${tableName}") + create_index("${tableName}") + build_index("${tableName}") + check_count("${tableName}", 6) + run_select("${tableName}", false) + } catch (Exception e) { + log.error("Caught exception: ${e}") + } finally { + GetDebugPoint().disableDebugPointForAllBEs(debug_point) + } + } + } + + sql "SET global enable_match_without_inverted_index = true" +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org