This is an automated email from the ASF dual-hosted git repository. jacktengg pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new c90825da938 [refactor](spill) Delete not used code (#36218) c90825da938 is described below commit c90825da93856e9c82529e6d53e2eb9d00c2d8ba Author: TengJianPing <18241664+jackte...@users.noreply.github.com> AuthorDate: Thu Jun 13 16:32:14 2024 +0800 [refactor](spill) Delete not used code (#36218) ## Proposed changes Issue Number: close #xxx <!--Describe your changes.--> --- be/src/pipeline/exec/aggregation_sink_operator.h | 1 - be/src/runtime/block_spill_manager.cpp | 147 ------- be/src/runtime/block_spill_manager.h | 64 --- be/src/runtime/exec_env.h | 3 - be/src/runtime/exec_env_init.cpp | 5 - be/src/vec/core/block_spill_reader.cpp | 161 ------- be/src/vec/core/block_spill_reader.h | 86 ---- be/src/vec/core/block_spill_writer.cpp | 161 ------- be/src/vec/core/block_spill_writer.h | 95 ----- be/test/vec/core/block_spill_test.cpp | 507 ----------------------- 10 files changed, 1230 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index add6712453f..10a81199140 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -20,7 +20,6 @@ #include <stdint.h> #include "pipeline/exec/operator.h" -#include "runtime/block_spill_manager.h" #include "runtime/exec_env.h" namespace doris::pipeline { diff --git a/be/src/runtime/block_spill_manager.cpp b/be/src/runtime/block_spill_manager.cpp deleted file mode 100644 index 8c2438f19b3..00000000000 --- a/be/src/runtime/block_spill_manager.cpp +++ /dev/null @@ -1,147 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "runtime/block_spill_manager.h" - -#include <fmt/format.h> -#include <glog/logging.h> - -#include <algorithm> -#include <boost/uuid/random_generator.hpp> -#include <boost/uuid/uuid_io.hpp> -#include <numeric> -#include <random> - -#include "io/fs/file_system.h" -#include "io/fs/local_file_system.h" -#include "util/time.h" -#include "vec/core/block_spill_reader.h" -#include "vec/core/block_spill_writer.h" - -namespace doris { -static const std::string BLOCK_SPILL_DIR = "spill"; -static const std::string BLOCK_SPILL_GC_DIR = "spill_gc"; -BlockSpillManager::BlockSpillManager(const std::vector<StorePath>& paths) : _store_paths(paths) {} - -Status BlockSpillManager::init() { - for (const auto& path : _store_paths) { - auto dir = fmt::format("{}/{}", path.path, BLOCK_SPILL_GC_DIR); - bool exists = true; - RETURN_IF_ERROR(io::global_local_filesystem()->exists(dir, &exists)); - if (!exists) { - RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(dir)); - } - - dir = fmt::format("{}/{}", path.path, BLOCK_SPILL_DIR); - RETURN_IF_ERROR(io::global_local_filesystem()->exists(dir, &exists)); - if (!exists) { - RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(dir)); - } else { - auto suffix = ToStringFromUnixMillis(UnixMillis()); - auto gc_dir = fmt::format("{}/{}/{}", path.path, BLOCK_SPILL_GC_DIR, suffix); - RETURN_IF_ERROR(io::global_local_filesystem()->rename(dir, gc_dir)); - RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(dir)); - } - } - - return Status::OK(); -} - -void BlockSpillManager::gc(int64_t max_file_count) { - if (max_file_count < 1) { - return; - } - bool exists = true; - int64_t count = 0; - for (const auto& path : _store_paths) { - std::string gc_root_dir = fmt::format("{}/{}", path.path, BLOCK_SPILL_GC_DIR); - - std::error_code ec; - exists = std::filesystem::exists(gc_root_dir, ec); - if (ec || !exists) { - continue; - } - std::vector<io::FileInfo> dirs; - auto st = io::global_local_filesystem()->list(gc_root_dir, false, &dirs, &exists); - if (!st.ok()) { - continue; - } - for (const auto& dir : dirs) { - if (dir.is_file) { - continue; - } - std::string abs_dir = fmt::format("{}/{}", gc_root_dir, dir.file_name); - std::vector<io::FileInfo> files; - st = io::global_local_filesystem()->list(abs_dir, true, &files, &exists); - if (!st.ok()) { - continue; - } - if (files.empty()) { - static_cast<void>(io::global_local_filesystem()->delete_directory(abs_dir)); - if (count++ == max_file_count) { - return; - } - continue; - } - for (const auto& file : files) { - auto abs_file_path = fmt::format("{}/{}", abs_dir, file.file_name); - static_cast<void>(io::global_local_filesystem()->delete_file(abs_file_path)); - if (count++ == max_file_count) { - return; - } - } - } - } -} - -Status BlockSpillManager::get_writer(int32_t batch_size, vectorized::BlockSpillWriterUPtr& writer, - RuntimeProfile* profile) { - int64_t id; - std::vector<int> indices(_store_paths.size()); - std::iota(indices.begin(), indices.end(), 0); - std::shuffle(indices.begin(), indices.end(), std::mt19937 {std::random_device {}()}); - - std::string path = _store_paths[indices[0]].path + "/" + BLOCK_SPILL_DIR; - std::string unique_name = boost::uuids::to_string(boost::uuids::random_generator()()); - path += "/" + unique_name; - { - std::lock_guard<std::mutex> l(lock_); - id = id_++; - id_to_file_paths_[id] = path; - } - - writer.reset(new vectorized::BlockSpillWriter(id, batch_size, path, profile)); - return writer->open(); -} - -Status BlockSpillManager::get_reader(int64_t stream_id, vectorized::BlockSpillReaderUPtr& reader, - RuntimeProfile* profile, bool delete_after_read) { - std::string path; - { - std::lock_guard<std::mutex> l(lock_); - CHECK(id_to_file_paths_.end() != id_to_file_paths_.find(stream_id)); - path = id_to_file_paths_[stream_id]; - } - reader.reset(new vectorized::BlockSpillReader(stream_id, path, profile, delete_after_read)); - return reader->open(); -} - -void BlockSpillManager::remove(int64_t stream_id) { - std::lock_guard<std::mutex> l(lock_); - id_to_file_paths_.erase(stream_id); -} -} // namespace doris diff --git a/be/src/runtime/block_spill_manager.h b/be/src/runtime/block_spill_manager.h deleted file mode 100644 index 5601d58aed1..00000000000 --- a/be/src/runtime/block_spill_manager.h +++ /dev/null @@ -1,64 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include <stdint.h> - -#include <memory> -#include <mutex> -#include <string> -#include <unordered_map> -#include <vector> - -#include "common/status.h" -#include "olap/options.h" -#include "vec/core/block_spill_reader.h" -#include "vec/core/block_spill_writer.h" - -namespace doris { -class RuntimeProfile; - -namespace vectorized { - -using BlockSpillWriterUPtr = std::unique_ptr<BlockSpillWriter>; -using BlockSpillReaderUPtr = std::unique_ptr<BlockSpillReader>; -} // namespace vectorized - -class BlockSpillManager { -public: - BlockSpillManager(const std::vector<StorePath>& paths); - - Status init(); - - Status get_writer(int32_t batch_size, vectorized::BlockSpillWriterUPtr& writer, - RuntimeProfile* profile); - - Status get_reader(int64_t stream_id, vectorized::BlockSpillReaderUPtr& reader, - RuntimeProfile* profile, bool delete_after_read = true); - - void remove(int64_t streamid_); - - void gc(int64_t max_file_count); - -private: - std::vector<StorePath> _store_paths; - std::mutex lock_; - int64_t id_ = 0; - std::unordered_map<int64_t, std::string> id_to_file_paths_; -}; -} // namespace doris \ No newline at end of file diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index ef6671d96bc..2bba483249d 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -81,7 +81,6 @@ class LoadStreamMapPool; class StreamLoadExecutor; class RoutineLoadTaskExecutor; class SmallFileMgr; -class BlockSpillManager; class BackendServiceClient; class TPaloBrokerServiceClient; class PBackendService_Stub; @@ -218,7 +217,6 @@ public: LoadChannelMgr* load_channel_mgr() { return _load_channel_mgr; } std::shared_ptr<NewLoadStreamMgr> new_load_stream_mgr() { return _new_load_stream_mgr; } SmallFileMgr* small_file_mgr() { return _small_file_mgr; } - BlockSpillManager* block_spill_mgr() { return _block_spill_mgr; } doris::vectorized::SpillStreamManager* spill_stream_mgr() { return _spill_stream_mgr; } GroupCommitMgr* group_commit_mgr() { return _group_commit_mgr; } @@ -395,7 +393,6 @@ private: HeartbeatFlags* _heartbeat_flags = nullptr; vectorized::ScannerScheduler* _scanner_scheduler = nullptr; - BlockSpillManager* _block_spill_mgr = nullptr; // To save meta info of external file, such as parquet footer. FileMetaCache* _file_meta_cache = nullptr; std::unique_ptr<MemTableMemoryLimiter> _memtable_memory_limiter; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 8d654c8d09b..50ef300412c 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -57,7 +57,6 @@ #include "pipeline/pipeline_tracing.h" #include "pipeline/task_queue.h" #include "pipeline/task_scheduler.h" -#include "runtime/block_spill_manager.h" #include "runtime/broker_mgr.h" #include "runtime/cache/result_cache.h" #include "runtime/client_cache.h" @@ -293,7 +292,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths, _routine_load_task_executor = new RoutineLoadTaskExecutor(this); RETURN_IF_ERROR(_routine_load_task_executor->init()); _small_file_mgr = new SmallFileMgr(this, config::small_file_dir); - _block_spill_mgr = new BlockSpillManager(store_paths); _group_commit_mgr = new GroupCommitMgr(this); _memtable_memory_limiter = std::make_unique<MemTableMemoryLimiter>(); _load_stream_map_pool = std::make_unique<LoadStreamMapPool>(); @@ -571,8 +569,6 @@ Status ExecEnv::_init_mem_env() { << PrettyPrinter::print(inverted_index_cache_limit, TUnit::BYTES) << ", origin config value: " << config::inverted_index_query_cache_limit; - RETURN_IF_ERROR(_block_spill_mgr->init()); - return Status::OK(); } @@ -676,7 +672,6 @@ void ExecEnv::destroy() { SAFE_DELETE(_load_channel_mgr); SAFE_DELETE(_spill_stream_mgr); - SAFE_DELETE(_block_spill_mgr); SAFE_DELETE(_inverted_index_query_cache); SAFE_DELETE(_inverted_index_searcher_cache); SAFE_DELETE(_lookup_connection_cache); diff --git a/be/src/vec/core/block_spill_reader.cpp b/be/src/vec/core/block_spill_reader.cpp deleted file mode 100644 index fe1ee13e830..00000000000 --- a/be/src/vec/core/block_spill_reader.cpp +++ /dev/null @@ -1,161 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "vec/core/block_spill_reader.h" - -#include <gen_cpp/Types_types.h> -#include <gen_cpp/data.pb.h> -#include <glog/logging.h> -#include <unistd.h> - -#include <algorithm> - -#include "io/file_factory.h" -#include "io/fs/file_reader.h" -#include "io/fs/local_file_system.h" -#include "runtime/block_spill_manager.h" -#include "runtime/exec_env.h" -#include "util/slice.h" -#include "vec/core/block.h" - -namespace doris { -namespace io { -class FileSystem; -} // namespace io - -namespace vectorized { -void BlockSpillReader::_init_profile() { - read_time_ = ADD_TIMER(profile_, "ReadTime"); - deserialize_time_ = ADD_TIMER(profile_, "DeserializeTime"); - read_bytes_ = ADD_COUNTER(profile_, "ReadBytes", TUnit::BYTES); - read_block_num_ = ADD_COUNTER(profile_, "ReadBlockNum", TUnit::UNIT); -} - -Status BlockSpillReader::open() { - io::FileSystemProperties system_properties; - system_properties.system_type = TFileType::FILE_LOCAL; - - io::FileDescription file_description; - file_description.path = file_path_; - file_reader_ = DORIS_TRY(FileFactory::create_file_reader(system_properties, file_description, - io::FileReaderOptions::DEFAULT)); - - size_t file_size = file_reader_->size(); - - Slice result((char*)&block_count_, sizeof(size_t)); - - // read block count - size_t bytes_read = 0; - RETURN_IF_ERROR(file_reader_->read_at(file_size - sizeof(size_t), result, &bytes_read)); - - // read max sub block size - result.data = (char*)&max_sub_block_size_; - RETURN_IF_ERROR(file_reader_->read_at(file_size - sizeof(size_t) * 2, result, &bytes_read)); - - size_t buff_size = std::max(block_count_ * sizeof(size_t), max_sub_block_size_); - read_buff_.reset(new char[buff_size]); - - // read block start offsets - size_t read_offset = file_size - (block_count_ + 2) * sizeof(size_t); - result.data = read_buff_.get(); - result.size = block_count_ * sizeof(size_t); - - RETURN_IF_ERROR(file_reader_->read_at(read_offset, result, &bytes_read)); - DCHECK(bytes_read == block_count_ * sizeof(size_t)); - - block_start_offsets_.resize(block_count_ + 1); - for (size_t i = 0; i < block_count_; ++i) { - block_start_offsets_[i] = *(size_t*)(result.data + i * sizeof(size_t)); - } - block_start_offsets_[block_count_] = file_size - (block_count_ + 2) * sizeof(size_t); - - return Status::OK(); -} - -void BlockSpillReader::seek(size_t block_index) { - DCHECK(file_reader_ != nullptr); - DCHECK_LT(block_index, block_count_); - read_block_index_ = block_index; -} - -// The returned block is owned by BlockSpillReader and is -// destroyed when reading next block. -Status BlockSpillReader::read(Block* block, bool* eos) { - DCHECK(file_reader_); - block->clear(); - - if (read_block_index_ >= block_count_) { - *eos = true; - return Status::OK(); - } - - size_t bytes_to_read = - block_start_offsets_[read_block_index_ + 1] - block_start_offsets_[read_block_index_]; - - if (bytes_to_read == 0) { - ++read_block_index_; - COUNTER_UPDATE(read_block_num_, 1); - return Status::OK(); - } - Slice result(read_buff_.get(), bytes_to_read); - - size_t bytes_read = 0; - - { - SCOPED_TIMER(read_time_); - RETURN_IF_ERROR(file_reader_->read_at(block_start_offsets_[read_block_index_], result, - &bytes_read)); - } - DCHECK(bytes_read == bytes_to_read); - COUNTER_UPDATE(read_bytes_, bytes_read); - COUNTER_UPDATE(read_block_num_, 1); - - if (bytes_read > 0) { - PBlock pb_block; - BlockUPtr new_block = nullptr; - { - SCOPED_TIMER(deserialize_time_); - if (!pb_block.ParseFromArray(result.data, result.size)) { - return Status::InternalError("Failed to read spilled block"); - } - new_block = Block::create_unique(); - RETURN_IF_ERROR(new_block->deserialize(pb_block)); - } - block->swap(*new_block); - } else { - block->clear_column_data(); - } - - ++read_block_index_; - - return Status::OK(); -} - -Status BlockSpillReader::close() { - if (!file_reader_) { - return Status::OK(); - } - ExecEnv::GetInstance()->block_spill_mgr()->remove(stream_id_); - file_reader_.reset(); - if (delete_after_read_) { - static_cast<void>(io::global_local_filesystem()->delete_file(file_path_)); - } - return Status::OK(); -} - -} // namespace vectorized -} // namespace doris diff --git a/be/src/vec/core/block_spill_reader.h b/be/src/vec/core/block_spill_reader.h deleted file mode 100644 index d982d586a1a..00000000000 --- a/be/src/vec/core/block_spill_reader.h +++ /dev/null @@ -1,86 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include <stddef.h> -#include <stdint.h> - -#include <memory> -#include <string> -#include <vector> - -#include "common/status.h" -#include "io/fs/file_reader_writer_fwd.h" -#include "util/runtime_profile.h" - -namespace doris { -namespace vectorized { -class Block; - -// Read data spilled to local file. -class BlockSpillReader { -public: - BlockSpillReader(int64_t stream_id, const std::string& file_path, RuntimeProfile* profile, - bool delete_after_read = true) - : stream_id_(stream_id), - file_path_(file_path), - delete_after_read_(delete_after_read), - profile_(profile) { - _init_profile(); - } - - ~BlockSpillReader() { static_cast<void>(close()); } - - Status open(); - - Status close(); - - Status read(Block* block, bool* eos); - - void seek(size_t block_index); - - int64_t get_id() const { return stream_id_; } - - std::string get_path() const { return file_path_; } - - size_t block_count() const { return block_count_; } - -private: - void _init_profile(); - - int64_t stream_id_; - std::string file_path_; - bool delete_after_read_; - io::FileReaderSPtr file_reader_; - - size_t block_count_ = 0; - size_t read_block_index_ = 0; - size_t max_sub_block_size_ = 0; - std::unique_ptr<char[]> read_buff_; - std::vector<size_t> block_start_offsets_; - - RuntimeProfile* profile_ = nullptr; - RuntimeProfile::Counter* read_time_ = nullptr; - RuntimeProfile::Counter* deserialize_time_ = nullptr; - RuntimeProfile::Counter* read_bytes_ = nullptr; - RuntimeProfile::Counter* read_block_num_ = nullptr; -}; - -using BlockSpillReaderUPtr = std::unique_ptr<BlockSpillReader>; -} // namespace vectorized -} // namespace doris diff --git a/be/src/vec/core/block_spill_writer.cpp b/be/src/vec/core/block_spill_writer.cpp deleted file mode 100644 index 92fe34a3eb0..00000000000 --- a/be/src/vec/core/block_spill_writer.cpp +++ /dev/null @@ -1,161 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "vec/core/block_spill_writer.h" - -#include <gen_cpp/Metrics_types.h> -#include <gen_cpp/Types_types.h> -#include <gen_cpp/data.pb.h> -#include <gen_cpp/segment_v2.pb.h> -#include <unistd.h> - -#include <algorithm> - -#include "agent/be_exec_version_manager.h" -#include "io/file_factory.h" -#include "runtime/exec_env.h" -#include "runtime/thread_context.h" -#include "vec/columns/column.h" -#include "vec/core/column_with_type_and_name.h" - -namespace doris { -namespace vectorized { -void BlockSpillWriter::_init_profile() { - write_bytes_counter_ = ADD_COUNTER(profile_, "WriteBytes", TUnit::BYTES); - write_timer_ = ADD_TIMER(profile_, "WriteTime"); - serialize_timer_ = ADD_TIMER(profile_, "SerializeTime"); - write_blocks_num_ = ADD_COUNTER(profile_, "WriteBlockNum", TUnit::UNIT); -} - -Status BlockSpillWriter::open() { - file_writer_ = DORIS_TRY(FileFactory::create_file_writer( - TFileType::FILE_LOCAL, ExecEnv::GetInstance(), {}, {}, file_path_, - { - .write_file_cache = false, - .sync_file_data = false, - })); - is_open_ = true; - return Status::OK(); -} - -Status BlockSpillWriter::close() { - if (!is_open_) { - return Status::OK(); - } - - is_open_ = false; - - tmp_block_.clear_column_data(); - - meta_.append((const char*)&max_sub_block_size_, sizeof(max_sub_block_size_)); - meta_.append((const char*)&written_blocks_, sizeof(written_blocks_)); - - Status status; - // meta: block1 offset, block2 offset, ..., blockn offset, n - { - SCOPED_TIMER(write_timer_); - status = file_writer_->append(meta_); - } - if (!status.ok()) { - unlink(file_path_.c_str()); - return status; - } - - RETURN_IF_ERROR(file_writer_->close()); - file_writer_.reset(); - return Status::OK(); -} - -Status BlockSpillWriter::write(const Block& block) { - auto rows = block.rows(); - // file format: block1, block2, ..., blockn, meta - if (rows <= batch_size_) { - return _write_internal(block); - } else { - if (is_first_write_) { - is_first_write_ = false; - tmp_block_ = block.clone_empty(); - } - - const auto& src_data = block.get_columns_with_type_and_name(); - - for (size_t row_idx = 0; row_idx < rows;) { - tmp_block_.clear_column_data(); - - auto& dst_data = tmp_block_.get_columns_with_type_and_name(); - - size_t block_rows = std::min(rows - row_idx, batch_size_); - RETURN_IF_CATCH_EXCEPTION({ - for (size_t col_idx = 0; col_idx < block.columns(); ++col_idx) { - dst_data[col_idx].column->assume_mutable()->insert_range_from( - *src_data[col_idx].column, row_idx, block_rows); - } - }); - - RETURN_IF_ERROR(_write_internal(tmp_block_)); - - row_idx += block_rows; - } - return Status::OK(); - } -} -Status BlockSpillWriter::_write_internal(const Block& block) { - size_t uncompressed_bytes = 0, compressed_bytes = 0; - size_t written_bytes = 0; - - Status status; - std::string buff; - - if (block.rows() > 0) { - PBlock pblock; - { - SCOPED_TIMER(serialize_timer_); - status = block.serialize(BeExecVersionManager::get_newest_version(), &pblock, - &uncompressed_bytes, &compressed_bytes, - segment_v2::CompressionTypePB::NO_COMPRESSION); - if (!status.ok()) { - unlink(file_path_.c_str()); - return status; - } - pblock.SerializeToString(&buff); - } - - { - SCOPED_TIMER(write_timer_); - status = file_writer_->append(buff); - written_bytes = buff.size(); - } - - if (!status.ok()) { - unlink(file_path_.c_str()); - return status; - } - } - - max_sub_block_size_ = std::max(max_sub_block_size_, written_bytes); - - meta_.append((const char*)&total_written_bytes_, sizeof(size_t)); - COUNTER_UPDATE(write_bytes_counter_, written_bytes); - COUNTER_UPDATE(write_blocks_num_, 1); - total_written_bytes_ += written_bytes; - ++written_blocks_; - - return Status::OK(); -} - -} // namespace vectorized -} // namespace doris diff --git a/be/src/vec/core/block_spill_writer.h b/be/src/vec/core/block_spill_writer.h deleted file mode 100644 index 86533a99966..00000000000 --- a/be/src/vec/core/block_spill_writer.h +++ /dev/null @@ -1,95 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include <stddef.h> -#include <stdint.h> - -#include <memory> -#include <string> - -#include "common/status.h" -#include "io/fs/file_writer.h" -#include "util/runtime_profile.h" -#include "vec/core/block.h" - -namespace doris { -namespace vectorized { - -// Write a block to a local file. -// -// The block may be logically splitted to small sub blocks in the single file, -// which can be read back one small block at a time by BlockSpillReader::read. -// -// Split to small blocks is necessary for Sort node, which need to merge multiple -// spilled big sorted blocks into a bigger sorted block. A small block is read from each -// spilled block file each time. -class BlockSpillWriter { -public: - BlockSpillWriter(int64_t id, size_t batch_size, const std::string& file_path, - RuntimeProfile* profile) - : stream_id_(id), batch_size_(batch_size), file_path_(file_path), profile_(profile) { - _init_profile(); - } - - ~BlockSpillWriter() { - if (nullptr != file_writer_ && file_writer_->state() != io::FileWriter::State::CLOSED) { - std::ignore = file_writer_->close(); - } - } - - Status open(); - - Status close(); - - Status write(const Block& block); - - int64_t get_id() const { return stream_id_; } - - size_t get_written_bytes() const { return total_written_bytes_; } - -private: - void _init_profile(); - - Status _write_internal(const Block& block); - -private: - bool is_open_ = false; - int64_t stream_id_; - size_t batch_size_; - size_t max_sub_block_size_ = 0; - std::string file_path_; - std::unique_ptr<doris::io::FileWriter> file_writer_; - - size_t written_blocks_ = 0; - size_t total_written_bytes_ = 0; - std::string meta_; - - bool is_first_write_ = true; - Block tmp_block_; - - RuntimeProfile* profile_ = nullptr; - RuntimeProfile::Counter* write_bytes_counter_ = nullptr; - RuntimeProfile::Counter* serialize_timer_ = nullptr; - RuntimeProfile::Counter* write_timer_ = nullptr; - RuntimeProfile::Counter* write_blocks_num_ = nullptr; -}; - -using BlockSpillWriterUPtr = std::unique_ptr<BlockSpillWriter>; -} // namespace vectorized -} // namespace doris diff --git a/be/test/vec/core/block_spill_test.cpp b/be/test/vec/core/block_spill_test.cpp deleted file mode 100644 index af30479e10a..00000000000 --- a/be/test/vec/core/block_spill_test.cpp +++ /dev/null @@ -1,507 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include <gen_cpp/PaloInternalService_types.h> -#include <gtest/gtest-message.h> -#include <gtest/gtest-test-part.h> -#include <stdint.h> -#include <unistd.h> - -#include <cmath> -#include <iostream> -#include <memory> -#include <string> -#include <utility> -#include <vector> - -#include "common/status.h" -#include "gtest/gtest_pred_impl.h" -#include "io/fs/local_file_system.h" -#include "olap/options.h" -#include "runtime/block_spill_manager.h" -#include "runtime/exec_env.h" -#include "runtime/runtime_state.h" -#include "util/bitmap_value.h" -#include "vec/columns/column.h" -#include "vec/columns/column_complex.h" -#include "vec/columns/column_decimal.h" -#include "vec/columns/column_nullable.h" -#include "vec/columns/column_string.h" -#include "vec/columns/column_vector.h" -#include "vec/common/string_ref.h" -#include "vec/core/block.h" -#include "vec/core/block_spill_reader.h" -#include "vec/core/block_spill_writer.h" -#include "vec/core/column_with_type_and_name.h" -#include "vec/core/types.h" -#include "vec/data_types/data_type.h" -#include "vec/data_types/data_type_bitmap.h" -#include "vec/data_types/data_type_decimal.h" -#include "vec/data_types/data_type_nullable.h" -#include "vec/data_types/data_type_number.h" -#include "vec/data_types/data_type_string.h" - -namespace doris { -class RuntimeProfile; - -static const uint32_t MAX_PATH_LEN = 1024; - -static const std::string TMP_DATA_DIR = "block_spill_test"; - -std::string test_data_dir; -std::shared_ptr<BlockSpillManager> block_spill_manager; - -class TestBlockSpill : public testing::Test { -public: - TestBlockSpill() : runtime_state_(TQueryGlobals()) { - profile_ = runtime_state_.runtime_profile(); - } - static void SetUpTestSuite() { - char buffer[MAX_PATH_LEN]; - EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr); - test_data_dir = std::string(buffer) + "/" + TMP_DATA_DIR; - std::cout << "test data dir: " << test_data_dir << "\n"; - auto st = io::global_local_filesystem()->delete_directory(test_data_dir); - ASSERT_TRUE(st.ok()) << st; - st = io::global_local_filesystem()->create_directory(test_data_dir); - ASSERT_TRUE(st.ok()) << st; - - std::vector<StorePath> paths; - paths.emplace_back(test_data_dir, -1); - block_spill_manager = std::make_shared<BlockSpillManager>(paths); - static_cast<void>(block_spill_manager->init()); - } - - static void TearDownTestSuite() { - static_cast<void>(io::global_local_filesystem()->delete_directory(test_data_dir)); - } - -protected: - void SetUp() { - env_ = ExecEnv::GetInstance(); - env_->_block_spill_mgr = block_spill_manager.get(); - } - - void TearDown() {} - -private: - ExecEnv* env_ = nullptr; - RuntimeState runtime_state_; - RuntimeProfile* profile_; -}; - -TEST_F(TestBlockSpill, TestInt) { - int batch_size = 3; // rows in a block - int batch_num = 3; - int total_rows = batch_size * batch_num; - auto col1 = vectorized::ColumnVector<int>::create(); - auto col2 = vectorized::ColumnVector<int>::create(); - auto& data1 = col1->get_data(); - for (int i = 0; i < total_rows; ++i) { - data1.push_back(i); - } - auto& data2 = col2->get_data(); - data2.push_back(0); - - vectorized::DataTypePtr data_type(std::make_shared<vectorized::DataTypeInt32>()); - vectorized::ColumnWithTypeAndName type_and_name1(col1->get_ptr(), data_type, - "spill_block_test_int"); - vectorized::Block block1({type_and_name1}); - - vectorized::ColumnWithTypeAndName type_and_name2(col2->get_ptr(), data_type, - "spill_block_test_int"); - vectorized::Block block2({type_and_name2}); - - vectorized::BlockSpillWriterUPtr spill_block_writer; - static_cast<void>(block_spill_manager->get_writer(batch_size, spill_block_writer, profile_)); - static_cast<void>(spill_block_writer->write(block1)); - static_cast<void>(spill_block_writer->write(block2)); - static_cast<void>(spill_block_writer->close()); - - vectorized::BlockSpillReaderUPtr spill_block_reader; - static_cast<void>(block_spill_manager->get_reader(spill_block_writer->get_id(), - spill_block_reader, profile_)); - - vectorized::Block block_read; - bool eos = false; - - for (int i = 0; i < batch_num; ++i) { - static_cast<void>(spill_block_reader->read(&block_read, &eos)); - EXPECT_EQ(block_read.rows(), batch_size); - auto column = block_read.get_by_position(0).column; - auto* real_column = (vectorized::ColumnVector<int>*)column.get(); - for (size_t j = 0; j < batch_size; ++j) { - EXPECT_EQ(real_column->get_int(j), j + i * batch_size); - } - } - - static_cast<void>(spill_block_reader->read(&block_read, &eos)); - static_cast<void>(spill_block_reader->close()); - - EXPECT_EQ(block_read.rows(), 1); - auto column = block_read.get_by_position(0).column; - auto* real_column = (vectorized::ColumnVector<int>*)column.get(); - EXPECT_EQ(real_column->get_int(0), 0); -} - -TEST_F(TestBlockSpill, TestIntNullable) { - int batch_size = 3; // rows in a block - int batch_num = 3; - int total_rows = batch_size * batch_num + 1; - auto vec = vectorized::ColumnVector<int>::create(); - auto nullable_vec = vectorized::make_nullable(std::move(vec)); - auto* raw_nullable_vec = (vectorized::ColumnNullable*)nullable_vec.get(); - for (int i = 0; i < total_rows; ++i) { - if ((i + 1) % batch_size == 0) { - raw_nullable_vec->insert_data(nullptr, 0); - } else { - raw_nullable_vec->insert_data((const char*)&i, 4); - } - } - auto data_type = vectorized::make_nullable(std::make_shared<vectorized::DataTypeInt32>()); - vectorized::ColumnWithTypeAndName type_and_name(nullable_vec->get_ptr(), data_type, - "spill_block_test_int_nullable"); - vectorized::Block block({type_and_name}); - - vectorized::BlockSpillWriterUPtr spill_block_writer; - static_cast<void>(block_spill_manager->get_writer(batch_size, spill_block_writer, profile_)); - static_cast<void>(spill_block_writer->write(block)); - static_cast<void>(spill_block_writer->close()); - - vectorized::BlockSpillReaderUPtr spill_block_reader; - static_cast<void>(block_spill_manager->get_reader(spill_block_writer->get_id(), - spill_block_reader, profile_)); - - vectorized::Block block_read; - bool eos = false; - - for (int i = 0; i < batch_num; ++i) { - static_cast<void>(spill_block_reader->read(&block_read, &eos)); - - EXPECT_EQ(block_read.rows(), batch_size); - auto column = block_read.get_by_position(0).column; - auto* real_column = (vectorized::ColumnNullable*)column.get(); - const auto& int_column = - (const vectorized::ColumnVector<int>&)(real_column->get_nested_column()); - for (size_t j = 0; j < batch_size; ++j) { - if ((j + 1) % batch_size == 0) { - ASSERT_TRUE(real_column->is_null_at(j)); - } else { - EXPECT_EQ(int_column.get_int(j), j + i * batch_size); - } - } - } - - static_cast<void>(spill_block_reader->read(&block_read, &eos)); - static_cast<void>(spill_block_reader->close()); - - EXPECT_EQ(block_read.rows(), 1); - auto column = block_read.get_by_position(0).column; - auto* real_column = (vectorized::ColumnNullable*)column.get(); - const auto& int_column = - (const vectorized::ColumnVector<int>&)(real_column->get_nested_column()); - EXPECT_EQ(int_column.get_int(0), batch_size * 3); -} -TEST_F(TestBlockSpill, TestString) { - int batch_size = 3; // rows in a block - int batch_num = 3; - int total_rows = batch_size * batch_num + 1; - auto strcol = vectorized::ColumnString::create(); - for (int i = 0; i < total_rows; ++i) { - std::string is = std::to_string(i); - strcol->insert_data(is.c_str(), is.size()); - } - vectorized::DataTypePtr string_type(std::make_shared<vectorized::DataTypeString>()); - vectorized::ColumnWithTypeAndName test_string(strcol->get_ptr(), string_type, - "spill_block_test_string"); - vectorized::Block block({test_string}); - - vectorized::BlockSpillWriterUPtr spill_block_writer; - static_cast<void>(block_spill_manager->get_writer(batch_size, spill_block_writer, profile_)); - Status st = spill_block_writer->write(block); - static_cast<void>(spill_block_writer->close()); - EXPECT_TRUE(st.ok()); - - vectorized::BlockSpillReaderUPtr spill_block_reader; - static_cast<void>(block_spill_manager->get_reader(spill_block_writer->get_id(), - spill_block_reader, profile_)); - - vectorized::Block block_read; - bool eos = false; - - for (int i = 0; i < batch_num; ++i) { - st = spill_block_reader->read(&block_read, &eos); - EXPECT_TRUE(st.ok()); - - EXPECT_EQ(block_read.rows(), batch_size); - auto column = block_read.get_by_position(0).column; - auto* real_column = (vectorized::ColumnString*)column.get(); - for (size_t j = 0; j < batch_size; ++j) { - EXPECT_EQ(real_column->get_data_at(j), StringRef(std::to_string(j + i * batch_size))); - } - } - - static_cast<void>(spill_block_reader->read(&block_read, &eos)); - static_cast<void>(spill_block_reader->close()); - - EXPECT_EQ(block_read.rows(), 1); - auto column = block_read.get_by_position(0).column; - auto* real_column = (vectorized::ColumnString*)column.get(); - EXPECT_EQ(real_column->get_data_at(0), StringRef(std::to_string(batch_size * 3))); -} -TEST_F(TestBlockSpill, TestStringNullable) { - int batch_size = 3; // rows in a block - int batch_num = 3; - int total_rows = batch_size * batch_num + 1; - auto strcol = vectorized::ColumnString::create(); - auto nullable_vec = vectorized::make_nullable(std::move(strcol)); - auto* raw_nullable_vec = (vectorized::ColumnNullable*)nullable_vec.get(); - for (int i = 0; i < total_rows; ++i) { - if ((i + 1) % batch_size == 0) { - raw_nullable_vec->insert_data(nullptr, 0); - } else { - std::string is = std::to_string(i); - raw_nullable_vec->insert_data(is.c_str(), is.size()); - } - } - auto data_type = vectorized::make_nullable(std::make_shared<vectorized::DataTypeString>()); - vectorized::ColumnWithTypeAndName type_and_name(nullable_vec->get_ptr(), data_type, - "spill_block_test_string_nullable"); - vectorized::Block block({type_and_name}); - - vectorized::BlockSpillWriterUPtr spill_block_writer; - static_cast<void>(block_spill_manager->get_writer(batch_size, spill_block_writer, profile_)); - Status st = spill_block_writer->write(block); - static_cast<void>(spill_block_writer->close()); - EXPECT_TRUE(st.ok()); - - vectorized::BlockSpillReaderUPtr spill_block_reader; - static_cast<void>(block_spill_manager->get_reader(spill_block_writer->get_id(), - spill_block_reader, profile_)); - - vectorized::Block block_read; - bool eos = false; - - for (int i = 0; i < batch_num; ++i) { - st = spill_block_reader->read(&block_read, &eos); - EXPECT_TRUE(st.ok()); - - EXPECT_EQ(block_read.rows(), batch_size); - auto column = block_read.get_by_position(0).column; - auto* real_column = (vectorized::ColumnNullable*)column.get(); - const auto& string_column = - (const vectorized::ColumnString&)(real_column->get_nested_column()); - for (size_t j = 0; j < batch_size; ++j) { - if ((j + 1) % batch_size == 0) { - ASSERT_TRUE(real_column->is_null_at(j)); - } else { - EXPECT_EQ(string_column.get_data_at(j), - StringRef(std::to_string(j + i * batch_size))); - } - } - } - - st = spill_block_reader->read(&block_read, &eos); - static_cast<void>(spill_block_reader->close()); - EXPECT_TRUE(st.ok()); - - EXPECT_EQ(block_read.rows(), 1); - auto column = block_read.get_by_position(0).column; - auto* real_column = (vectorized::ColumnNullable*)column.get(); - const auto& string_column = (const vectorized::ColumnString&)(real_column->get_nested_column()); - EXPECT_EQ(string_column.get_data_at(0), StringRef(std::to_string(batch_size * 3))); -} -TEST_F(TestBlockSpill, TestDecimal) { - int batch_size = 3; // rows in a block - int batch_num = 3; - int total_rows = batch_size * batch_num + 1; - - vectorized::DataTypePtr decimal_data_type(doris::vectorized::create_decimal(27, 9, true)); - auto decimal_column = decimal_data_type->create_column(); - auto& decimal_data = ((vectorized::ColumnDecimal<vectorized::Decimal<vectorized::Int128>>*) - decimal_column.get()) - ->get_data(); - for (int i = 0; i < total_rows; ++i) { - __int128_t value = __int128_t(i * pow(10, 9) + i * pow(10, 8)); - decimal_data.push_back(value); - } - vectorized::ColumnWithTypeAndName test_decimal(decimal_column->get_ptr(), decimal_data_type, - "spill_block_test_decimal"); - vectorized::Block block({test_decimal}); - - vectorized::BlockSpillWriterUPtr spill_block_writer; - static_cast<void>(block_spill_manager->get_writer(batch_size, spill_block_writer, profile_)); - auto st = spill_block_writer->write(block); - static_cast<void>(spill_block_writer->close()); - EXPECT_TRUE(st.ok()); - - vectorized::BlockSpillReaderUPtr spill_block_reader; - static_cast<void>(block_spill_manager->get_reader(spill_block_writer->get_id(), - spill_block_reader, profile_)); - - vectorized::Block block_read; - bool eos = false; - - for (int i = 0; i < batch_num; ++i) { - st = spill_block_reader->read(&block_read, &eos); - EXPECT_TRUE(st.ok()); - - EXPECT_EQ(block_read.rows(), batch_size); - auto column = block_read.get_by_position(0).column; - auto* real_column = - (vectorized::ColumnDecimal<vectorized::Decimal<vectorized::Int128>>*)column.get(); - for (size_t j = 0; j < batch_size; ++j) { - __int128_t value = __int128_t((j + i * batch_size) * (pow(10, 9) + pow(10, 8))); - EXPECT_EQ(real_column->get_element(j).value, value); - } - } - - st = spill_block_reader->read(&block_read, &eos); - static_cast<void>(spill_block_reader->close()); - EXPECT_TRUE(st.ok()); - - EXPECT_EQ(block_read.rows(), 1); - auto column = block_read.get_by_position(0).column; - auto* real_column = - (vectorized::ColumnDecimal<vectorized::Decimal<vectorized::Int128>>*)column.get(); - EXPECT_EQ(real_column->get_element(0).value, batch_size * 3 * (pow(10, 9) + pow(10, 8))); -} -TEST_F(TestBlockSpill, TestDecimalNullable) { - int batch_size = 3; // rows in a block - int batch_num = 3; - int total_rows = batch_size * batch_num + 1; - - vectorized::DataTypePtr decimal_data_type(doris::vectorized::create_decimal(27, 9, true)); - auto base_col = vectorized::make_nullable(decimal_data_type->create_column()); - auto* nullable_col = (vectorized::ColumnNullable*)base_col.get(); - for (int i = 0; i < total_rows; ++i) { - if ((i + 1) % batch_size == 0) { - nullable_col->insert_data(nullptr, 0); - } else { - __int128_t value = __int128_t(i * pow(10, 9) + i * pow(10, 8)); - nullable_col->insert_data((const char*)&value, sizeof(value)); - } - } - auto data_type = vectorized::make_nullable(decimal_data_type); - vectorized::ColumnWithTypeAndName type_and_name(nullable_col->get_ptr(), data_type, - "spill_block_test_decimal_nullable"); - vectorized::Block block({type_and_name}); - - vectorized::BlockSpillWriterUPtr spill_block_writer; - static_cast<void>(block_spill_manager->get_writer(batch_size, spill_block_writer, profile_)); - auto st = spill_block_writer->write(block); - static_cast<void>(spill_block_writer->close()); - EXPECT_TRUE(st.ok()); - - vectorized::BlockSpillReaderUPtr spill_block_reader; - static_cast<void>(block_spill_manager->get_reader(spill_block_writer->get_id(), - spill_block_reader, profile_)); - - vectorized::Block block_read; - bool eos = false; - - for (int i = 0; i < batch_num; ++i) { - st = spill_block_reader->read(&block_read, &eos); - EXPECT_TRUE(st.ok()); - - EXPECT_EQ(block_read.rows(), batch_size); - auto column = block_read.get_by_position(0).column; - auto* real_column = (vectorized::ColumnNullable*)column.get(); - const auto& decimal_col = (vectorized::ColumnDecimal<vectorized::Decimal< - vectorized::Int128>>&)(real_column->get_nested_column()); - for (size_t j = 0; j < batch_size; ++j) { - if ((j + 1) % batch_size == 0) { - ASSERT_TRUE(real_column->is_null_at(j)); - } else { - __int128_t value = __int128_t((j + i * batch_size) * (pow(10, 9) + pow(10, 8))); - EXPECT_EQ(decimal_col.get_element(j).value, value); - } - } - } - - st = spill_block_reader->read(&block_read, &eos); - static_cast<void>(spill_block_reader->close()); - EXPECT_TRUE(st.ok()); - - EXPECT_EQ(block_read.rows(), 1); - auto column = block_read.get_by_position(0).column; - auto* real_column = (vectorized::ColumnNullable*)column.get(); - const auto& decimal_col = - (vectorized::ColumnDecimal< - vectorized::Decimal<vectorized::Int128>>&)(real_column->get_nested_column()); - EXPECT_EQ(decimal_col.get_element(0).value, batch_size * 3 * (pow(10, 9) + pow(10, 8))); -} -std::string convert_bitmap_to_string(BitmapValue& bitmap); -TEST_F(TestBlockSpill, TestBitmap) { - int batch_size = 3; // rows in a block - int batch_num = 3; - int total_rows = batch_size * batch_num + 1; - - vectorized::DataTypePtr bitmap_data_type(std::make_shared<vectorized::DataTypeBitMap>()); - auto bitmap_column = bitmap_data_type->create_column(); - std::vector<BitmapValue>& container = - ((vectorized::ColumnBitmap*)bitmap_column.get())->get_data(); - std::vector<std::string> expected_bitmap_str; - for (int i = 0; i < total_rows; ++i) { - BitmapValue bv; - for (int j = 0; j <= i; ++j) { - bv.add(j); - } - expected_bitmap_str.emplace_back(convert_bitmap_to_string(bv)); - container.push_back(bv); - } - vectorized::ColumnWithTypeAndName type_and_name(bitmap_column->get_ptr(), bitmap_data_type, - "spill_block_test_bitmap"); - vectorized::Block block({type_and_name}); - - vectorized::BlockSpillWriterUPtr spill_block_writer; - static_cast<void>(block_spill_manager->get_writer(batch_size, spill_block_writer, profile_)); - auto st = spill_block_writer->write(block); - static_cast<void>(spill_block_writer->close()); - EXPECT_TRUE(st.ok()); - - vectorized::BlockSpillReaderUPtr spill_block_reader; - static_cast<void>(block_spill_manager->get_reader(spill_block_writer->get_id(), - spill_block_reader, profile_)); - - vectorized::Block block_read; - bool eos = false; - - for (int i = 0; i < batch_num; ++i) { - st = spill_block_reader->read(&block_read, &eos); - EXPECT_TRUE(st.ok()); - - EXPECT_EQ(block_read.rows(), batch_size); - auto column = block_read.get_by_position(0).column; - auto* real_column = (vectorized::ColumnBitmap*)column.get(); - for (size_t j = 0; j < batch_size; ++j) { - auto bitmap_str = convert_bitmap_to_string(real_column->get_element(j)); - EXPECT_EQ(bitmap_str, expected_bitmap_str[j + i * batch_size]); - } - } - - st = spill_block_reader->read(&block_read, &eos); - static_cast<void>(spill_block_reader->close()); - EXPECT_TRUE(st.ok()); - - EXPECT_EQ(block_read.rows(), 1); - auto column = block_read.get_by_position(0).column; - auto* real_column = (vectorized::ColumnBitmap*)column.get(); - auto bitmap_str = convert_bitmap_to_string(real_column->get_element(0)); - EXPECT_EQ(bitmap_str, expected_bitmap_str[3 * batch_size]); -} -} // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org