csun5285 commented on code in PR #42328: URL: https://github.com/apache/doris/pull/42328#discussion_r1839673652
########## be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp: ########## @@ -256,120 +256,32 @@ Status InvertedIndexFileWriter::write_v1() { const int64_t index_id = entry.first.first; const auto& index_suffix = entry.first.second; try { - const auto& directory = entry.second; - std::vector<std::string> files; - directory->list(&files); - // remove write.lock file - auto it = std::find(files.begin(), files.end(), DorisFSDirectory::WRITE_LOCK_FILE); - if (it != files.end()) { - files.erase(it); - } + const auto& directory = entry.second.get(); - std::vector<FileInfo> sorted_files; - for (auto file : files) { - FileInfo file_info; - file_info.filename = file; - file_info.filesize = directory->fileLength(file.c_str()); - sorted_files.emplace_back(std::move(file_info)); - } - sort_files(sorted_files); - - int32_t file_count = sorted_files.size(); - - io::Path cfs_path(InvertedIndexDescriptor::get_index_file_path_v1( - _index_path_prefix, index_id, index_suffix)); - auto idx_path = cfs_path.parent_path(); - std::string idx_name = cfs_path.filename(); - // write file entries to ram directory to get header length - lucene::store::RAMDirectory ram_dir; - auto* out_idx = ram_dir.createOutput(idx_name.c_str()); - DBUG_EXECUTE_IF("InvertedIndexFileWriter::write_v1_ram_output_is_nullptr", - { out_idx = nullptr; }) - if (out_idx == nullptr) { - LOG(WARNING) << "Write compound file error: RAMDirectory output is nullptr."; - _CLTHROWA(CL_ERR_IO, "Create RAMDirectory output error"); - } + // Prepare sorted file list + auto sorted_files = prepare_sorted_files(directory); + + // Calculate header length + auto [header_length, header_file_count] = + calculate_header_length(sorted_files, directory); + + // Create output stream + auto [out_dir, output] = create_output_stream_v1(index_id, index_suffix); - std::unique_ptr<lucene::store::IndexOutput> ram_output(out_idx); - ram_output->writeVInt(file_count); - // write file entries in ram directory - // number of files, which 
data are in header - int header_file_count = 0; - int64_t header_file_length = 0; - const int64_t buffer_length = 16384; - uint8_t ram_buffer[buffer_length]; - for (auto file : sorted_files) { - ram_output->writeString(file.filename); // file name - ram_output->writeLong(0); // data offset - ram_output->writeLong(file.filesize); // file length - header_file_length += file.filesize; - if (header_file_length <= DorisFSDirectory::MAX_HEADER_DATA_SIZE) { - copyFile(file.filename.c_str(), directory.get(), ram_output.get(), ram_buffer, - buffer_length); - header_file_count++; - } - } - auto header_len = ram_output->getFilePointer(); - ram_output->close(); - ram_dir.deleteFile(idx_name.c_str()); - ram_dir.close(); - - auto* out_dir = DorisFSDirectoryFactory::getDirectory(_fs, idx_path.c_str()); - out_dir->set_file_writer_opts(_opts); - - auto* out = out_dir->createOutput(idx_name.c_str()); - DBUG_EXECUTE_IF("InvertedIndexFileWriter::write_v1_out_dir_createOutput_nullptr", - { out = nullptr; }); - if (out == nullptr) { - LOG(WARNING) << "Write compound file error: CompoundDirectory output is nullptr."; - _CLTHROWA(CL_ERR_IO, "Create CompoundDirectory output error"); - } - std::unique_ptr<lucene::store::IndexOutput> output(out); size_t start = output->getFilePointer(); - output->writeVInt(file_count); - // write file entries - int64_t data_offset = header_len; - uint8_t header_buffer[buffer_length]; - for (int i = 0; i < sorted_files.size(); ++i) { - auto file = sorted_files[i]; - output->writeString(file.filename); // FileName - // DataOffset - if (i < header_file_count) { - // file data write in header, so we set its offset to -1. 
- output->writeLong(-1); - } else { - output->writeLong(data_offset); - } - output->writeLong(file.filesize); // FileLength - if (i < header_file_count) { - // append data - copyFile(file.filename.c_str(), directory.get(), output.get(), header_buffer, - buffer_length); - } else { - data_offset += file.filesize; - } - } - // write rest files' data - uint8_t data_buffer[buffer_length]; - for (int i = header_file_count; i < sorted_files.size(); ++i) { - auto file = sorted_files[i]; - copyFile(file.filename.c_str(), directory.get(), output.get(), data_buffer, - buffer_length); - } - out_dir->close(); - // NOTE: need to decrease ref count, but not to delete here, - // because index cache may get the same directory from DIRECTORIES - _CLDECDELETE(out_dir) + // Write header and data + write_header_and_data_v1(output.get(), sorted_files, directory, header_length, + header_file_count); + + // Close and clean up + finalize_output_dir(out_dir); + + // Collect file information auto compound_file_size = output->getFilePointer() - start; output->close(); - //LOG(INFO) << (idx_path / idx_name).c_str() << " size:" << compound_file_size; total_size += compound_file_size; - InvertedIndexFileInfo_IndexInfo index_info; - index_info.set_index_id(index_id); - index_info.set_index_suffix(index_suffix); - index_info.set_index_file_size(compound_file_size); - auto* new_index_info = _file_info.add_index_info(); - *new_index_info = index_info; + add_index_info(index_id, index_suffix, compound_file_size); + } catch (CLuceneError& err) { auto index_path = InvertedIndexDescriptor::get_index_file_path_v1( Review Comment: finalize_output_dir(out_dir); -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org