This is an automated email from the ASF dual-hosted git repository.

jacktengg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c90825da938 [refactor](spill) Delete not used code (#36218)
c90825da938 is described below

commit c90825da93856e9c82529e6d53e2eb9d00c2d8ba
Author: TengJianPing <18241664+jackte...@users.noreply.github.com>
AuthorDate: Thu Jun 13 16:32:14 2024 +0800

    [refactor](spill) Delete not used code (#36218)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
---
 be/src/pipeline/exec/aggregation_sink_operator.h |   1 -
 be/src/runtime/block_spill_manager.cpp           | 147 -------
 be/src/runtime/block_spill_manager.h             |  64 ---
 be/src/runtime/exec_env.h                        |   3 -
 be/src/runtime/exec_env_init.cpp                 |   5 -
 be/src/vec/core/block_spill_reader.cpp           | 161 -------
 be/src/vec/core/block_spill_reader.h             |  86 ----
 be/src/vec/core/block_spill_writer.cpp           | 161 -------
 be/src/vec/core/block_spill_writer.h             |  95 -----
 be/test/vec/core/block_spill_test.cpp            | 507 -----------------------
 10 files changed, 1230 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h 
b/be/src/pipeline/exec/aggregation_sink_operator.h
index add6712453f..10a81199140 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -20,7 +20,6 @@
 #include <stdint.h>
 
 #include "pipeline/exec/operator.h"
-#include "runtime/block_spill_manager.h"
 #include "runtime/exec_env.h"
 
 namespace doris::pipeline {
diff --git a/be/src/runtime/block_spill_manager.cpp 
b/be/src/runtime/block_spill_manager.cpp
deleted file mode 100644
index 8c2438f19b3..00000000000
--- a/be/src/runtime/block_spill_manager.cpp
+++ /dev/null
@@ -1,147 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/block_spill_manager.h"
-
-#include <fmt/format.h>
-#include <glog/logging.h>
-
-#include <algorithm>
-#include <boost/uuid/random_generator.hpp>
-#include <boost/uuid/uuid_io.hpp>
-#include <numeric>
-#include <random>
-
-#include "io/fs/file_system.h"
-#include "io/fs/local_file_system.h"
-#include "util/time.h"
-#include "vec/core/block_spill_reader.h"
-#include "vec/core/block_spill_writer.h"
-
-namespace doris {
-static const std::string BLOCK_SPILL_DIR = "spill";
-static const std::string BLOCK_SPILL_GC_DIR = "spill_gc";
-BlockSpillManager::BlockSpillManager(const std::vector<StorePath>& paths) : 
_store_paths(paths) {}
-
-Status BlockSpillManager::init() {
-    for (const auto& path : _store_paths) {
-        auto dir = fmt::format("{}/{}", path.path, BLOCK_SPILL_GC_DIR);
-        bool exists = true;
-        RETURN_IF_ERROR(io::global_local_filesystem()->exists(dir, &exists));
-        if (!exists) {
-            
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(dir));
-        }
-
-        dir = fmt::format("{}/{}", path.path, BLOCK_SPILL_DIR);
-        RETURN_IF_ERROR(io::global_local_filesystem()->exists(dir, &exists));
-        if (!exists) {
-            
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(dir));
-        } else {
-            auto suffix = ToStringFromUnixMillis(UnixMillis());
-            auto gc_dir = fmt::format("{}/{}/{}", path.path, 
BLOCK_SPILL_GC_DIR, suffix);
-            RETURN_IF_ERROR(io::global_local_filesystem()->rename(dir, 
gc_dir));
-            
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(dir));
-        }
-    }
-
-    return Status::OK();
-}
-
-void BlockSpillManager::gc(int64_t max_file_count) {
-    if (max_file_count < 1) {
-        return;
-    }
-    bool exists = true;
-    int64_t count = 0;
-    for (const auto& path : _store_paths) {
-        std::string gc_root_dir = fmt::format("{}/{}", path.path, 
BLOCK_SPILL_GC_DIR);
-
-        std::error_code ec;
-        exists = std::filesystem::exists(gc_root_dir, ec);
-        if (ec || !exists) {
-            continue;
-        }
-        std::vector<io::FileInfo> dirs;
-        auto st = io::global_local_filesystem()->list(gc_root_dir, false, 
&dirs, &exists);
-        if (!st.ok()) {
-            continue;
-        }
-        for (const auto& dir : dirs) {
-            if (dir.is_file) {
-                continue;
-            }
-            std::string abs_dir = fmt::format("{}/{}", gc_root_dir, 
dir.file_name);
-            std::vector<io::FileInfo> files;
-            st = io::global_local_filesystem()->list(abs_dir, true, &files, 
&exists);
-            if (!st.ok()) {
-                continue;
-            }
-            if (files.empty()) {
-                
static_cast<void>(io::global_local_filesystem()->delete_directory(abs_dir));
-                if (count++ == max_file_count) {
-                    return;
-                }
-                continue;
-            }
-            for (const auto& file : files) {
-                auto abs_file_path = fmt::format("{}/{}", abs_dir, 
file.file_name);
-                
static_cast<void>(io::global_local_filesystem()->delete_file(abs_file_path));
-                if (count++ == max_file_count) {
-                    return;
-                }
-            }
-        }
-    }
-}
-
-Status BlockSpillManager::get_writer(int32_t batch_size, 
vectorized::BlockSpillWriterUPtr& writer,
-                                     RuntimeProfile* profile) {
-    int64_t id;
-    std::vector<int> indices(_store_paths.size());
-    std::iota(indices.begin(), indices.end(), 0);
-    std::shuffle(indices.begin(), indices.end(), std::mt19937 
{std::random_device {}()});
-
-    std::string path = _store_paths[indices[0]].path + "/" + BLOCK_SPILL_DIR;
-    std::string unique_name = 
boost::uuids::to_string(boost::uuids::random_generator()());
-    path += "/" + unique_name;
-    {
-        std::lock_guard<std::mutex> l(lock_);
-        id = id_++;
-        id_to_file_paths_[id] = path;
-    }
-
-    writer.reset(new vectorized::BlockSpillWriter(id, batch_size, path, 
profile));
-    return writer->open();
-}
-
-Status BlockSpillManager::get_reader(int64_t stream_id, 
vectorized::BlockSpillReaderUPtr& reader,
-                                     RuntimeProfile* profile, bool 
delete_after_read) {
-    std::string path;
-    {
-        std::lock_guard<std::mutex> l(lock_);
-        CHECK(id_to_file_paths_.end() != id_to_file_paths_.find(stream_id));
-        path = id_to_file_paths_[stream_id];
-    }
-    reader.reset(new vectorized::BlockSpillReader(stream_id, path, profile, 
delete_after_read));
-    return reader->open();
-}
-
-void BlockSpillManager::remove(int64_t stream_id) {
-    std::lock_guard<std::mutex> l(lock_);
-    id_to_file_paths_.erase(stream_id);
-}
-} // namespace doris
diff --git a/be/src/runtime/block_spill_manager.h 
b/be/src/runtime/block_spill_manager.h
deleted file mode 100644
index 5601d58aed1..00000000000
--- a/be/src/runtime/block_spill_manager.h
+++ /dev/null
@@ -1,64 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <stdint.h>
-
-#include <memory>
-#include <mutex>
-#include <string>
-#include <unordered_map>
-#include <vector>
-
-#include "common/status.h"
-#include "olap/options.h"
-#include "vec/core/block_spill_reader.h"
-#include "vec/core/block_spill_writer.h"
-
-namespace doris {
-class RuntimeProfile;
-
-namespace vectorized {
-
-using BlockSpillWriterUPtr = std::unique_ptr<BlockSpillWriter>;
-using BlockSpillReaderUPtr = std::unique_ptr<BlockSpillReader>;
-} // namespace vectorized
-
-class BlockSpillManager {
-public:
-    BlockSpillManager(const std::vector<StorePath>& paths);
-
-    Status init();
-
-    Status get_writer(int32_t batch_size, vectorized::BlockSpillWriterUPtr& 
writer,
-                      RuntimeProfile* profile);
-
-    Status get_reader(int64_t stream_id, vectorized::BlockSpillReaderUPtr& 
reader,
-                      RuntimeProfile* profile, bool delete_after_read = true);
-
-    void remove(int64_t streamid_);
-
-    void gc(int64_t max_file_count);
-
-private:
-    std::vector<StorePath> _store_paths;
-    std::mutex lock_;
-    int64_t id_ = 0;
-    std::unordered_map<int64_t, std::string> id_to_file_paths_;
-};
-} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index ef6671d96bc..2bba483249d 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -81,7 +81,6 @@ class LoadStreamMapPool;
 class StreamLoadExecutor;
 class RoutineLoadTaskExecutor;
 class SmallFileMgr;
-class BlockSpillManager;
 class BackendServiceClient;
 class TPaloBrokerServiceClient;
 class PBackendService_Stub;
@@ -218,7 +217,6 @@ public:
     LoadChannelMgr* load_channel_mgr() { return _load_channel_mgr; }
     std::shared_ptr<NewLoadStreamMgr> new_load_stream_mgr() { return 
_new_load_stream_mgr; }
     SmallFileMgr* small_file_mgr() { return _small_file_mgr; }
-    BlockSpillManager* block_spill_mgr() { return _block_spill_mgr; }
     doris::vectorized::SpillStreamManager* spill_stream_mgr() { return 
_spill_stream_mgr; }
     GroupCommitMgr* group_commit_mgr() { return _group_commit_mgr; }
 
@@ -395,7 +393,6 @@ private:
     HeartbeatFlags* _heartbeat_flags = nullptr;
     vectorized::ScannerScheduler* _scanner_scheduler = nullptr;
 
-    BlockSpillManager* _block_spill_mgr = nullptr;
     // To save meta info of external file, such as parquet footer.
     FileMetaCache* _file_meta_cache = nullptr;
     std::unique_ptr<MemTableMemoryLimiter> _memtable_memory_limiter;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 8d654c8d09b..50ef300412c 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -57,7 +57,6 @@
 #include "pipeline/pipeline_tracing.h"
 #include "pipeline/task_queue.h"
 #include "pipeline/task_scheduler.h"
-#include "runtime/block_spill_manager.h"
 #include "runtime/broker_mgr.h"
 #include "runtime/cache/result_cache.h"
 #include "runtime/client_cache.h"
@@ -293,7 +292,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths,
     _routine_load_task_executor = new RoutineLoadTaskExecutor(this);
     RETURN_IF_ERROR(_routine_load_task_executor->init());
     _small_file_mgr = new SmallFileMgr(this, config::small_file_dir);
-    _block_spill_mgr = new BlockSpillManager(store_paths);
     _group_commit_mgr = new GroupCommitMgr(this);
     _memtable_memory_limiter = std::make_unique<MemTableMemoryLimiter>();
     _load_stream_map_pool = std::make_unique<LoadStreamMapPool>();
@@ -571,8 +569,6 @@ Status ExecEnv::_init_mem_env() {
               << PrettyPrinter::print(inverted_index_cache_limit, TUnit::BYTES)
               << ", origin config value: " << 
config::inverted_index_query_cache_limit;
 
-    RETURN_IF_ERROR(_block_spill_mgr->init());
-
     return Status::OK();
 }
 
@@ -676,7 +672,6 @@ void ExecEnv::destroy() {
     SAFE_DELETE(_load_channel_mgr);
 
     SAFE_DELETE(_spill_stream_mgr);
-    SAFE_DELETE(_block_spill_mgr);
     SAFE_DELETE(_inverted_index_query_cache);
     SAFE_DELETE(_inverted_index_searcher_cache);
     SAFE_DELETE(_lookup_connection_cache);
diff --git a/be/src/vec/core/block_spill_reader.cpp 
b/be/src/vec/core/block_spill_reader.cpp
deleted file mode 100644
index fe1ee13e830..00000000000
--- a/be/src/vec/core/block_spill_reader.cpp
+++ /dev/null
@@ -1,161 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "vec/core/block_spill_reader.h"
-
-#include <gen_cpp/Types_types.h>
-#include <gen_cpp/data.pb.h>
-#include <glog/logging.h>
-#include <unistd.h>
-
-#include <algorithm>
-
-#include "io/file_factory.h"
-#include "io/fs/file_reader.h"
-#include "io/fs/local_file_system.h"
-#include "runtime/block_spill_manager.h"
-#include "runtime/exec_env.h"
-#include "util/slice.h"
-#include "vec/core/block.h"
-
-namespace doris {
-namespace io {
-class FileSystem;
-} // namespace io
-
-namespace vectorized {
-void BlockSpillReader::_init_profile() {
-    read_time_ = ADD_TIMER(profile_, "ReadTime");
-    deserialize_time_ = ADD_TIMER(profile_, "DeserializeTime");
-    read_bytes_ = ADD_COUNTER(profile_, "ReadBytes", TUnit::BYTES);
-    read_block_num_ = ADD_COUNTER(profile_, "ReadBlockNum", TUnit::UNIT);
-}
-
-Status BlockSpillReader::open() {
-    io::FileSystemProperties system_properties;
-    system_properties.system_type = TFileType::FILE_LOCAL;
-
-    io::FileDescription file_description;
-    file_description.path = file_path_;
-    file_reader_ = 
DORIS_TRY(FileFactory::create_file_reader(system_properties, file_description,
-                                                             
io::FileReaderOptions::DEFAULT));
-
-    size_t file_size = file_reader_->size();
-
-    Slice result((char*)&block_count_, sizeof(size_t));
-
-    // read block count
-    size_t bytes_read = 0;
-    RETURN_IF_ERROR(file_reader_->read_at(file_size - sizeof(size_t), result, 
&bytes_read));
-
-    // read max sub block size
-    result.data = (char*)&max_sub_block_size_;
-    RETURN_IF_ERROR(file_reader_->read_at(file_size - sizeof(size_t) * 2, 
result, &bytes_read));
-
-    size_t buff_size = std::max(block_count_ * sizeof(size_t), 
max_sub_block_size_);
-    read_buff_.reset(new char[buff_size]);
-
-    // read block start offsets
-    size_t read_offset = file_size - (block_count_ + 2) * sizeof(size_t);
-    result.data = read_buff_.get();
-    result.size = block_count_ * sizeof(size_t);
-
-    RETURN_IF_ERROR(file_reader_->read_at(read_offset, result, &bytes_read));
-    DCHECK(bytes_read == block_count_ * sizeof(size_t));
-
-    block_start_offsets_.resize(block_count_ + 1);
-    for (size_t i = 0; i < block_count_; ++i) {
-        block_start_offsets_[i] = *(size_t*)(result.data + i * sizeof(size_t));
-    }
-    block_start_offsets_[block_count_] = file_size - (block_count_ + 2) * 
sizeof(size_t);
-
-    return Status::OK();
-}
-
-void BlockSpillReader::seek(size_t block_index) {
-    DCHECK(file_reader_ != nullptr);
-    DCHECK_LT(block_index, block_count_);
-    read_block_index_ = block_index;
-}
-
-// The returned block is owned by BlockSpillReader and is
-// destroyed when reading next block.
-Status BlockSpillReader::read(Block* block, bool* eos) {
-    DCHECK(file_reader_);
-    block->clear();
-
-    if (read_block_index_ >= block_count_) {
-        *eos = true;
-        return Status::OK();
-    }
-
-    size_t bytes_to_read =
-            block_start_offsets_[read_block_index_ + 1] - 
block_start_offsets_[read_block_index_];
-
-    if (bytes_to_read == 0) {
-        ++read_block_index_;
-        COUNTER_UPDATE(read_block_num_, 1);
-        return Status::OK();
-    }
-    Slice result(read_buff_.get(), bytes_to_read);
-
-    size_t bytes_read = 0;
-
-    {
-        SCOPED_TIMER(read_time_);
-        
RETURN_IF_ERROR(file_reader_->read_at(block_start_offsets_[read_block_index_], 
result,
-                                              &bytes_read));
-    }
-    DCHECK(bytes_read == bytes_to_read);
-    COUNTER_UPDATE(read_bytes_, bytes_read);
-    COUNTER_UPDATE(read_block_num_, 1);
-
-    if (bytes_read > 0) {
-        PBlock pb_block;
-        BlockUPtr new_block = nullptr;
-        {
-            SCOPED_TIMER(deserialize_time_);
-            if (!pb_block.ParseFromArray(result.data, result.size)) {
-                return Status::InternalError("Failed to read spilled block");
-            }
-            new_block = Block::create_unique();
-            RETURN_IF_ERROR(new_block->deserialize(pb_block));
-        }
-        block->swap(*new_block);
-    } else {
-        block->clear_column_data();
-    }
-
-    ++read_block_index_;
-
-    return Status::OK();
-}
-
-Status BlockSpillReader::close() {
-    if (!file_reader_) {
-        return Status::OK();
-    }
-    ExecEnv::GetInstance()->block_spill_mgr()->remove(stream_id_);
-    file_reader_.reset();
-    if (delete_after_read_) {
-        
static_cast<void>(io::global_local_filesystem()->delete_file(file_path_));
-    }
-    return Status::OK();
-}
-
-} // namespace vectorized
-} // namespace doris
diff --git a/be/src/vec/core/block_spill_reader.h 
b/be/src/vec/core/block_spill_reader.h
deleted file mode 100644
index d982d586a1a..00000000000
--- a/be/src/vec/core/block_spill_reader.h
+++ /dev/null
@@ -1,86 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <stddef.h>
-#include <stdint.h>
-
-#include <memory>
-#include <string>
-#include <vector>
-
-#include "common/status.h"
-#include "io/fs/file_reader_writer_fwd.h"
-#include "util/runtime_profile.h"
-
-namespace doris {
-namespace vectorized {
-class Block;
-
-// Read data spilled to local file.
-class BlockSpillReader {
-public:
-    BlockSpillReader(int64_t stream_id, const std::string& file_path, 
RuntimeProfile* profile,
-                     bool delete_after_read = true)
-            : stream_id_(stream_id),
-              file_path_(file_path),
-              delete_after_read_(delete_after_read),
-              profile_(profile) {
-        _init_profile();
-    }
-
-    ~BlockSpillReader() { static_cast<void>(close()); }
-
-    Status open();
-
-    Status close();
-
-    Status read(Block* block, bool* eos);
-
-    void seek(size_t block_index);
-
-    int64_t get_id() const { return stream_id_; }
-
-    std::string get_path() const { return file_path_; }
-
-    size_t block_count() const { return block_count_; }
-
-private:
-    void _init_profile();
-
-    int64_t stream_id_;
-    std::string file_path_;
-    bool delete_after_read_;
-    io::FileReaderSPtr file_reader_;
-
-    size_t block_count_ = 0;
-    size_t read_block_index_ = 0;
-    size_t max_sub_block_size_ = 0;
-    std::unique_ptr<char[]> read_buff_;
-    std::vector<size_t> block_start_offsets_;
-
-    RuntimeProfile* profile_ = nullptr;
-    RuntimeProfile::Counter* read_time_ = nullptr;
-    RuntimeProfile::Counter* deserialize_time_ = nullptr;
-    RuntimeProfile::Counter* read_bytes_ = nullptr;
-    RuntimeProfile::Counter* read_block_num_ = nullptr;
-};
-
-using BlockSpillReaderUPtr = std::unique_ptr<BlockSpillReader>;
-} // namespace vectorized
-} // namespace doris
diff --git a/be/src/vec/core/block_spill_writer.cpp 
b/be/src/vec/core/block_spill_writer.cpp
deleted file mode 100644
index 92fe34a3eb0..00000000000
--- a/be/src/vec/core/block_spill_writer.cpp
+++ /dev/null
@@ -1,161 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "vec/core/block_spill_writer.h"
-
-#include <gen_cpp/Metrics_types.h>
-#include <gen_cpp/Types_types.h>
-#include <gen_cpp/data.pb.h>
-#include <gen_cpp/segment_v2.pb.h>
-#include <unistd.h>
-
-#include <algorithm>
-
-#include "agent/be_exec_version_manager.h"
-#include "io/file_factory.h"
-#include "runtime/exec_env.h"
-#include "runtime/thread_context.h"
-#include "vec/columns/column.h"
-#include "vec/core/column_with_type_and_name.h"
-
-namespace doris {
-namespace vectorized {
-void BlockSpillWriter::_init_profile() {
-    write_bytes_counter_ = ADD_COUNTER(profile_, "WriteBytes", TUnit::BYTES);
-    write_timer_ = ADD_TIMER(profile_, "WriteTime");
-    serialize_timer_ = ADD_TIMER(profile_, "SerializeTime");
-    write_blocks_num_ = ADD_COUNTER(profile_, "WriteBlockNum", TUnit::UNIT);
-}
-
-Status BlockSpillWriter::open() {
-    file_writer_ = DORIS_TRY(FileFactory::create_file_writer(
-            TFileType::FILE_LOCAL, ExecEnv::GetInstance(), {}, {}, file_path_,
-            {
-                    .write_file_cache = false,
-                    .sync_file_data = false,
-            }));
-    is_open_ = true;
-    return Status::OK();
-}
-
-Status BlockSpillWriter::close() {
-    if (!is_open_) {
-        return Status::OK();
-    }
-
-    is_open_ = false;
-
-    tmp_block_.clear_column_data();
-
-    meta_.append((const char*)&max_sub_block_size_, 
sizeof(max_sub_block_size_));
-    meta_.append((const char*)&written_blocks_, sizeof(written_blocks_));
-
-    Status status;
-    // meta: block1 offset, block2 offset, ..., blockn offset, n
-    {
-        SCOPED_TIMER(write_timer_);
-        status = file_writer_->append(meta_);
-    }
-    if (!status.ok()) {
-        unlink(file_path_.c_str());
-        return status;
-    }
-
-    RETURN_IF_ERROR(file_writer_->close());
-    file_writer_.reset();
-    return Status::OK();
-}
-
-Status BlockSpillWriter::write(const Block& block) {
-    auto rows = block.rows();
-    // file format: block1, block2, ..., blockn, meta
-    if (rows <= batch_size_) {
-        return _write_internal(block);
-    } else {
-        if (is_first_write_) {
-            is_first_write_ = false;
-            tmp_block_ = block.clone_empty();
-        }
-
-        const auto& src_data = block.get_columns_with_type_and_name();
-
-        for (size_t row_idx = 0; row_idx < rows;) {
-            tmp_block_.clear_column_data();
-
-            auto& dst_data = tmp_block_.get_columns_with_type_and_name();
-
-            size_t block_rows = std::min(rows - row_idx, batch_size_);
-            RETURN_IF_CATCH_EXCEPTION({
-                for (size_t col_idx = 0; col_idx < block.columns(); ++col_idx) 
{
-                    
dst_data[col_idx].column->assume_mutable()->insert_range_from(
-                            *src_data[col_idx].column, row_idx, block_rows);
-                }
-            });
-
-            RETURN_IF_ERROR(_write_internal(tmp_block_));
-
-            row_idx += block_rows;
-        }
-        return Status::OK();
-    }
-}
-Status BlockSpillWriter::_write_internal(const Block& block) {
-    size_t uncompressed_bytes = 0, compressed_bytes = 0;
-    size_t written_bytes = 0;
-
-    Status status;
-    std::string buff;
-
-    if (block.rows() > 0) {
-        PBlock pblock;
-        {
-            SCOPED_TIMER(serialize_timer_);
-            status = 
block.serialize(BeExecVersionManager::get_newest_version(), &pblock,
-                                     &uncompressed_bytes, &compressed_bytes,
-                                     
segment_v2::CompressionTypePB::NO_COMPRESSION);
-            if (!status.ok()) {
-                unlink(file_path_.c_str());
-                return status;
-            }
-            pblock.SerializeToString(&buff);
-        }
-
-        {
-            SCOPED_TIMER(write_timer_);
-            status = file_writer_->append(buff);
-            written_bytes = buff.size();
-        }
-
-        if (!status.ok()) {
-            unlink(file_path_.c_str());
-            return status;
-        }
-    }
-
-    max_sub_block_size_ = std::max(max_sub_block_size_, written_bytes);
-
-    meta_.append((const char*)&total_written_bytes_, sizeof(size_t));
-    COUNTER_UPDATE(write_bytes_counter_, written_bytes);
-    COUNTER_UPDATE(write_blocks_num_, 1);
-    total_written_bytes_ += written_bytes;
-    ++written_blocks_;
-
-    return Status::OK();
-}
-
-} // namespace vectorized
-} // namespace doris
diff --git a/be/src/vec/core/block_spill_writer.h 
b/be/src/vec/core/block_spill_writer.h
deleted file mode 100644
index 86533a99966..00000000000
--- a/be/src/vec/core/block_spill_writer.h
+++ /dev/null
@@ -1,95 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <stddef.h>
-#include <stdint.h>
-
-#include <memory>
-#include <string>
-
-#include "common/status.h"
-#include "io/fs/file_writer.h"
-#include "util/runtime_profile.h"
-#include "vec/core/block.h"
-
-namespace doris {
-namespace vectorized {
-
-// Write a block to a local file.
-//
-// The block may be logically splitted to small sub blocks in the single file,
-// which can be read back one small block at a time by BlockSpillReader::read.
-//
-// Split to small blocks is necessary for Sort node, which need to merge 
multiple
-// spilled big sorted blocks into a bigger sorted block. A small block is read 
from each
-// spilled block file each time.
-class BlockSpillWriter {
-public:
-    BlockSpillWriter(int64_t id, size_t batch_size, const std::string& 
file_path,
-                     RuntimeProfile* profile)
-            : stream_id_(id), batch_size_(batch_size), file_path_(file_path), 
profile_(profile) {
-        _init_profile();
-    }
-
-    ~BlockSpillWriter() {
-        if (nullptr != file_writer_ && file_writer_->state() != 
io::FileWriter::State::CLOSED) {
-            std::ignore = file_writer_->close();
-        }
-    }
-
-    Status open();
-
-    Status close();
-
-    Status write(const Block& block);
-
-    int64_t get_id() const { return stream_id_; }
-
-    size_t get_written_bytes() const { return total_written_bytes_; }
-
-private:
-    void _init_profile();
-
-    Status _write_internal(const Block& block);
-
-private:
-    bool is_open_ = false;
-    int64_t stream_id_;
-    size_t batch_size_;
-    size_t max_sub_block_size_ = 0;
-    std::string file_path_;
-    std::unique_ptr<doris::io::FileWriter> file_writer_;
-
-    size_t written_blocks_ = 0;
-    size_t total_written_bytes_ = 0;
-    std::string meta_;
-
-    bool is_first_write_ = true;
-    Block tmp_block_;
-
-    RuntimeProfile* profile_ = nullptr;
-    RuntimeProfile::Counter* write_bytes_counter_ = nullptr;
-    RuntimeProfile::Counter* serialize_timer_ = nullptr;
-    RuntimeProfile::Counter* write_timer_ = nullptr;
-    RuntimeProfile::Counter* write_blocks_num_ = nullptr;
-};
-
-using BlockSpillWriterUPtr = std::unique_ptr<BlockSpillWriter>;
-} // namespace vectorized
-} // namespace doris
diff --git a/be/test/vec/core/block_spill_test.cpp 
b/be/test/vec/core/block_spill_test.cpp
deleted file mode 100644
index af30479e10a..00000000000
--- a/be/test/vec/core/block_spill_test.cpp
+++ /dev/null
@@ -1,507 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <gen_cpp/PaloInternalService_types.h>
-#include <gtest/gtest-message.h>
-#include <gtest/gtest-test-part.h>
-#include <stdint.h>
-#include <unistd.h>
-
-#include <cmath>
-#include <iostream>
-#include <memory>
-#include <string>
-#include <utility>
-#include <vector>
-
-#include "common/status.h"
-#include "gtest/gtest_pred_impl.h"
-#include "io/fs/local_file_system.h"
-#include "olap/options.h"
-#include "runtime/block_spill_manager.h"
-#include "runtime/exec_env.h"
-#include "runtime/runtime_state.h"
-#include "util/bitmap_value.h"
-#include "vec/columns/column.h"
-#include "vec/columns/column_complex.h"
-#include "vec/columns/column_decimal.h"
-#include "vec/columns/column_nullable.h"
-#include "vec/columns/column_string.h"
-#include "vec/columns/column_vector.h"
-#include "vec/common/string_ref.h"
-#include "vec/core/block.h"
-#include "vec/core/block_spill_reader.h"
-#include "vec/core/block_spill_writer.h"
-#include "vec/core/column_with_type_and_name.h"
-#include "vec/core/types.h"
-#include "vec/data_types/data_type.h"
-#include "vec/data_types/data_type_bitmap.h"
-#include "vec/data_types/data_type_decimal.h"
-#include "vec/data_types/data_type_nullable.h"
-#include "vec/data_types/data_type_number.h"
-#include "vec/data_types/data_type_string.h"
-
-namespace doris {
-class RuntimeProfile;
-
-static const uint32_t MAX_PATH_LEN = 1024;
-
-static const std::string TMP_DATA_DIR = "block_spill_test";
-
-std::string test_data_dir;
-std::shared_ptr<BlockSpillManager> block_spill_manager;
-
-class TestBlockSpill : public testing::Test {
-public:
-    TestBlockSpill() : runtime_state_(TQueryGlobals()) {
-        profile_ = runtime_state_.runtime_profile();
-    }
-    static void SetUpTestSuite() {
-        char buffer[MAX_PATH_LEN];
-        EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr);
-        test_data_dir = std::string(buffer) + "/" + TMP_DATA_DIR;
-        std::cout << "test data dir: " << test_data_dir << "\n";
-        auto st = 
io::global_local_filesystem()->delete_directory(test_data_dir);
-        ASSERT_TRUE(st.ok()) << st;
-        st = io::global_local_filesystem()->create_directory(test_data_dir);
-        ASSERT_TRUE(st.ok()) << st;
-
-        std::vector<StorePath> paths;
-        paths.emplace_back(test_data_dir, -1);
-        block_spill_manager = std::make_shared<BlockSpillManager>(paths);
-        static_cast<void>(block_spill_manager->init());
-    }
-
-    static void TearDownTestSuite() {
-        
static_cast<void>(io::global_local_filesystem()->delete_directory(test_data_dir));
-    }
-
-protected:
-    void SetUp() {
-        env_ = ExecEnv::GetInstance();
-        env_->_block_spill_mgr = block_spill_manager.get();
-    }
-
-    void TearDown() {}
-
-private:
-    ExecEnv* env_ = nullptr;
-    RuntimeState runtime_state_;
-    RuntimeProfile* profile_;
-};
-
-TEST_F(TestBlockSpill, TestInt) {
-    int batch_size = 3; // rows in a block
-    int batch_num = 3;
-    int total_rows = batch_size * batch_num;
-    auto col1 = vectorized::ColumnVector<int>::create();
-    auto col2 = vectorized::ColumnVector<int>::create();
-    auto& data1 = col1->get_data();
-    for (int i = 0; i < total_rows; ++i) {
-        data1.push_back(i);
-    }
-    auto& data2 = col2->get_data();
-    data2.push_back(0);
-
-    vectorized::DataTypePtr 
data_type(std::make_shared<vectorized::DataTypeInt32>());
-    vectorized::ColumnWithTypeAndName type_and_name1(col1->get_ptr(), 
data_type,
-                                                     "spill_block_test_int");
-    vectorized::Block block1({type_and_name1});
-
-    vectorized::ColumnWithTypeAndName type_and_name2(col2->get_ptr(), 
data_type,
-                                                     "spill_block_test_int");
-    vectorized::Block block2({type_and_name2});
-
-    vectorized::BlockSpillWriterUPtr spill_block_writer;
-    static_cast<void>(block_spill_manager->get_writer(batch_size, 
spill_block_writer, profile_));
-    static_cast<void>(spill_block_writer->write(block1));
-    static_cast<void>(spill_block_writer->write(block2));
-    static_cast<void>(spill_block_writer->close());
-
-    vectorized::BlockSpillReaderUPtr spill_block_reader;
-    
static_cast<void>(block_spill_manager->get_reader(spill_block_writer->get_id(),
-                                                      spill_block_reader, 
profile_));
-
-    vectorized::Block block_read;
-    bool eos = false;
-
-    for (int i = 0; i < batch_num; ++i) {
-        static_cast<void>(spill_block_reader->read(&block_read, &eos));
-        EXPECT_EQ(block_read.rows(), batch_size);
-        auto column = block_read.get_by_position(0).column;
-        auto* real_column = (vectorized::ColumnVector<int>*)column.get();
-        for (size_t j = 0; j < batch_size; ++j) {
-            EXPECT_EQ(real_column->get_int(j), j + i * batch_size);
-        }
-    }
-
-    static_cast<void>(spill_block_reader->read(&block_read, &eos));
-    static_cast<void>(spill_block_reader->close());
-
-    EXPECT_EQ(block_read.rows(), 1);
-    auto column = block_read.get_by_position(0).column;
-    auto* real_column = (vectorized::ColumnVector<int>*)column.get();
-    EXPECT_EQ(real_column->get_int(0), 0);
-}
-
-TEST_F(TestBlockSpill, TestIntNullable) {
-    int batch_size = 3; // rows in a block
-    int batch_num = 3;
-    int total_rows = batch_size * batch_num + 1;
-    auto vec = vectorized::ColumnVector<int>::create();
-    auto nullable_vec = vectorized::make_nullable(std::move(vec));
-    auto* raw_nullable_vec = (vectorized::ColumnNullable*)nullable_vec.get();
-    for (int i = 0; i < total_rows; ++i) {
-        if ((i + 1) % batch_size == 0) {
-            raw_nullable_vec->insert_data(nullptr, 0);
-        } else {
-            raw_nullable_vec->insert_data((const char*)&i, 4);
-        }
-    }
-    auto data_type = 
vectorized::make_nullable(std::make_shared<vectorized::DataTypeInt32>());
-    vectorized::ColumnWithTypeAndName type_and_name(nullable_vec->get_ptr(), 
data_type,
-                                                    
"spill_block_test_int_nullable");
-    vectorized::Block block({type_and_name});
-
-    vectorized::BlockSpillWriterUPtr spill_block_writer;
-    static_cast<void>(block_spill_manager->get_writer(batch_size, 
spill_block_writer, profile_));
-    static_cast<void>(spill_block_writer->write(block));
-    static_cast<void>(spill_block_writer->close());
-
-    vectorized::BlockSpillReaderUPtr spill_block_reader;
-    
static_cast<void>(block_spill_manager->get_reader(spill_block_writer->get_id(),
-                                                      spill_block_reader, 
profile_));
-
-    vectorized::Block block_read;
-    bool eos = false;
-
-    for (int i = 0; i < batch_num; ++i) {
-        static_cast<void>(spill_block_reader->read(&block_read, &eos));
-
-        EXPECT_EQ(block_read.rows(), batch_size);
-        auto column = block_read.get_by_position(0).column;
-        auto* real_column = (vectorized::ColumnNullable*)column.get();
-        const auto& int_column =
-                (const 
vectorized::ColumnVector<int>&)(real_column->get_nested_column());
-        for (size_t j = 0; j < batch_size; ++j) {
-            if ((j + 1) % batch_size == 0) {
-                ASSERT_TRUE(real_column->is_null_at(j));
-            } else {
-                EXPECT_EQ(int_column.get_int(j), j + i * batch_size);
-            }
-        }
-    }
-
-    static_cast<void>(spill_block_reader->read(&block_read, &eos));
-    static_cast<void>(spill_block_reader->close());
-
-    EXPECT_EQ(block_read.rows(), 1);
-    auto column = block_read.get_by_position(0).column;
-    auto* real_column = (vectorized::ColumnNullable*)column.get();
-    const auto& int_column =
-            (const 
vectorized::ColumnVector<int>&)(real_column->get_nested_column());
-    EXPECT_EQ(int_column.get_int(0), batch_size * 3);
-}
-TEST_F(TestBlockSpill, TestString) {
-    int batch_size = 3; // rows in a block
-    int batch_num = 3;
-    int total_rows = batch_size * batch_num + 1;
-    auto strcol = vectorized::ColumnString::create();
-    for (int i = 0; i < total_rows; ++i) {
-        std::string is = std::to_string(i);
-        strcol->insert_data(is.c_str(), is.size());
-    }
-    vectorized::DataTypePtr 
string_type(std::make_shared<vectorized::DataTypeString>());
-    vectorized::ColumnWithTypeAndName test_string(strcol->get_ptr(), 
string_type,
-                                                  "spill_block_test_string");
-    vectorized::Block block({test_string});
-
-    vectorized::BlockSpillWriterUPtr spill_block_writer;
-    static_cast<void>(block_spill_manager->get_writer(batch_size, 
spill_block_writer, profile_));
-    Status st = spill_block_writer->write(block);
-    static_cast<void>(spill_block_writer->close());
-    EXPECT_TRUE(st.ok());
-
-    vectorized::BlockSpillReaderUPtr spill_block_reader;
-    
static_cast<void>(block_spill_manager->get_reader(spill_block_writer->get_id(),
-                                                      spill_block_reader, 
profile_));
-
-    vectorized::Block block_read;
-    bool eos = false;
-
-    for (int i = 0; i < batch_num; ++i) {
-        st = spill_block_reader->read(&block_read, &eos);
-        EXPECT_TRUE(st.ok());
-
-        EXPECT_EQ(block_read.rows(), batch_size);
-        auto column = block_read.get_by_position(0).column;
-        auto* real_column = (vectorized::ColumnString*)column.get();
-        for (size_t j = 0; j < batch_size; ++j) {
-            EXPECT_EQ(real_column->get_data_at(j), StringRef(std::to_string(j 
+ i * batch_size)));
-        }
-    }
-
-    static_cast<void>(spill_block_reader->read(&block_read, &eos));
-    static_cast<void>(spill_block_reader->close());
-
-    EXPECT_EQ(block_read.rows(), 1);
-    auto column = block_read.get_by_position(0).column;
-    auto* real_column = (vectorized::ColumnString*)column.get();
-    EXPECT_EQ(real_column->get_data_at(0), StringRef(std::to_string(batch_size 
* 3)));
-}
-TEST_F(TestBlockSpill, TestStringNullable) {
-    int batch_size = 3; // rows in a block
-    int batch_num = 3;
-    int total_rows = batch_size * batch_num + 1;
-    auto strcol = vectorized::ColumnString::create();
-    auto nullable_vec = vectorized::make_nullable(std::move(strcol));
-    auto* raw_nullable_vec = (vectorized::ColumnNullable*)nullable_vec.get();
-    for (int i = 0; i < total_rows; ++i) {
-        if ((i + 1) % batch_size == 0) {
-            raw_nullable_vec->insert_data(nullptr, 0);
-        } else {
-            std::string is = std::to_string(i);
-            raw_nullable_vec->insert_data(is.c_str(), is.size());
-        }
-    }
-    auto data_type = 
vectorized::make_nullable(std::make_shared<vectorized::DataTypeString>());
-    vectorized::ColumnWithTypeAndName type_and_name(nullable_vec->get_ptr(), 
data_type,
-                                                    
"spill_block_test_string_nullable");
-    vectorized::Block block({type_and_name});
-
-    vectorized::BlockSpillWriterUPtr spill_block_writer;
-    static_cast<void>(block_spill_manager->get_writer(batch_size, 
spill_block_writer, profile_));
-    Status st = spill_block_writer->write(block);
-    static_cast<void>(spill_block_writer->close());
-    EXPECT_TRUE(st.ok());
-
-    vectorized::BlockSpillReaderUPtr spill_block_reader;
-    
static_cast<void>(block_spill_manager->get_reader(spill_block_writer->get_id(),
-                                                      spill_block_reader, 
profile_));
-
-    vectorized::Block block_read;
-    bool eos = false;
-
-    for (int i = 0; i < batch_num; ++i) {
-        st = spill_block_reader->read(&block_read, &eos);
-        EXPECT_TRUE(st.ok());
-
-        EXPECT_EQ(block_read.rows(), batch_size);
-        auto column = block_read.get_by_position(0).column;
-        auto* real_column = (vectorized::ColumnNullable*)column.get();
-        const auto& string_column =
-                (const 
vectorized::ColumnString&)(real_column->get_nested_column());
-        for (size_t j = 0; j < batch_size; ++j) {
-            if ((j + 1) % batch_size == 0) {
-                ASSERT_TRUE(real_column->is_null_at(j));
-            } else {
-                EXPECT_EQ(string_column.get_data_at(j),
-                          StringRef(std::to_string(j + i * batch_size)));
-            }
-        }
-    }
-
-    st = spill_block_reader->read(&block_read, &eos);
-    static_cast<void>(spill_block_reader->close());
-    EXPECT_TRUE(st.ok());
-
-    EXPECT_EQ(block_read.rows(), 1);
-    auto column = block_read.get_by_position(0).column;
-    auto* real_column = (vectorized::ColumnNullable*)column.get();
-    const auto& string_column = (const 
vectorized::ColumnString&)(real_column->get_nested_column());
-    EXPECT_EQ(string_column.get_data_at(0), 
StringRef(std::to_string(batch_size * 3)));
-}
-TEST_F(TestBlockSpill, TestDecimal) {
-    int batch_size = 3; // rows in a block
-    int batch_num = 3;
-    int total_rows = batch_size * batch_num + 1;
-
-    vectorized::DataTypePtr 
decimal_data_type(doris::vectorized::create_decimal(27, 9, true));
-    auto decimal_column = decimal_data_type->create_column();
-    auto& decimal_data = 
((vectorized::ColumnDecimal<vectorized::Decimal<vectorized::Int128>>*)
-                                  decimal_column.get())
-                                 ->get_data();
-    for (int i = 0; i < total_rows; ++i) {
-        __int128_t value = __int128_t(i * pow(10, 9) + i * pow(10, 8));
-        decimal_data.push_back(value);
-    }
-    vectorized::ColumnWithTypeAndName test_decimal(decimal_column->get_ptr(), 
decimal_data_type,
-                                                   "spill_block_test_decimal");
-    vectorized::Block block({test_decimal});
-
-    vectorized::BlockSpillWriterUPtr spill_block_writer;
-    static_cast<void>(block_spill_manager->get_writer(batch_size, 
spill_block_writer, profile_));
-    auto st = spill_block_writer->write(block);
-    static_cast<void>(spill_block_writer->close());
-    EXPECT_TRUE(st.ok());
-
-    vectorized::BlockSpillReaderUPtr spill_block_reader;
-    
static_cast<void>(block_spill_manager->get_reader(spill_block_writer->get_id(),
-                                                      spill_block_reader, 
profile_));
-
-    vectorized::Block block_read;
-    bool eos = false;
-
-    for (int i = 0; i < batch_num; ++i) {
-        st = spill_block_reader->read(&block_read, &eos);
-        EXPECT_TRUE(st.ok());
-
-        EXPECT_EQ(block_read.rows(), batch_size);
-        auto column = block_read.get_by_position(0).column;
-        auto* real_column =
-                
(vectorized::ColumnDecimal<vectorized::Decimal<vectorized::Int128>>*)column.get();
-        for (size_t j = 0; j < batch_size; ++j) {
-            __int128_t value = __int128_t((j + i * batch_size) * (pow(10, 9) + 
pow(10, 8)));
-            EXPECT_EQ(real_column->get_element(j).value, value);
-        }
-    }
-
-    st = spill_block_reader->read(&block_read, &eos);
-    static_cast<void>(spill_block_reader->close());
-    EXPECT_TRUE(st.ok());
-
-    EXPECT_EQ(block_read.rows(), 1);
-    auto column = block_read.get_by_position(0).column;
-    auto* real_column =
-            
(vectorized::ColumnDecimal<vectorized::Decimal<vectorized::Int128>>*)column.get();
-    EXPECT_EQ(real_column->get_element(0).value, batch_size * 3 * (pow(10, 9) 
+ pow(10, 8)));
-}
-TEST_F(TestBlockSpill, TestDecimalNullable) {
-    int batch_size = 3; // rows in a block
-    int batch_num = 3;
-    int total_rows = batch_size * batch_num + 1;
-
-    vectorized::DataTypePtr 
decimal_data_type(doris::vectorized::create_decimal(27, 9, true));
-    auto base_col = 
vectorized::make_nullable(decimal_data_type->create_column());
-    auto* nullable_col = (vectorized::ColumnNullable*)base_col.get();
-    for (int i = 0; i < total_rows; ++i) {
-        if ((i + 1) % batch_size == 0) {
-            nullable_col->insert_data(nullptr, 0);
-        } else {
-            __int128_t value = __int128_t(i * pow(10, 9) + i * pow(10, 8));
-            nullable_col->insert_data((const char*)&value, sizeof(value));
-        }
-    }
-    auto data_type = vectorized::make_nullable(decimal_data_type);
-    vectorized::ColumnWithTypeAndName type_and_name(nullable_col->get_ptr(), 
data_type,
-                                                    
"spill_block_test_decimal_nullable");
-    vectorized::Block block({type_and_name});
-
-    vectorized::BlockSpillWriterUPtr spill_block_writer;
-    static_cast<void>(block_spill_manager->get_writer(batch_size, 
spill_block_writer, profile_));
-    auto st = spill_block_writer->write(block);
-    static_cast<void>(spill_block_writer->close());
-    EXPECT_TRUE(st.ok());
-
-    vectorized::BlockSpillReaderUPtr spill_block_reader;
-    
static_cast<void>(block_spill_manager->get_reader(spill_block_writer->get_id(),
-                                                      spill_block_reader, 
profile_));
-
-    vectorized::Block block_read;
-    bool eos = false;
-
-    for (int i = 0; i < batch_num; ++i) {
-        st = spill_block_reader->read(&block_read, &eos);
-        EXPECT_TRUE(st.ok());
-
-        EXPECT_EQ(block_read.rows(), batch_size);
-        auto column = block_read.get_by_position(0).column;
-        auto* real_column = (vectorized::ColumnNullable*)column.get();
-        const auto& decimal_col = 
(vectorized::ColumnDecimal<vectorized::Decimal<
-                                           
vectorized::Int128>>&)(real_column->get_nested_column());
-        for (size_t j = 0; j < batch_size; ++j) {
-            if ((j + 1) % batch_size == 0) {
-                ASSERT_TRUE(real_column->is_null_at(j));
-            } else {
-                __int128_t value = __int128_t((j + i * batch_size) * (pow(10, 
9) + pow(10, 8)));
-                EXPECT_EQ(decimal_col.get_element(j).value, value);
-            }
-        }
-    }
-
-    st = spill_block_reader->read(&block_read, &eos);
-    static_cast<void>(spill_block_reader->close());
-    EXPECT_TRUE(st.ok());
-
-    EXPECT_EQ(block_read.rows(), 1);
-    auto column = block_read.get_by_position(0).column;
-    auto* real_column = (vectorized::ColumnNullable*)column.get();
-    const auto& decimal_col =
-            (vectorized::ColumnDecimal<
-                    
vectorized::Decimal<vectorized::Int128>>&)(real_column->get_nested_column());
-    EXPECT_EQ(decimal_col.get_element(0).value, batch_size * 3 * (pow(10, 9) + 
pow(10, 8)));
-}
-std::string convert_bitmap_to_string(BitmapValue& bitmap);
-TEST_F(TestBlockSpill, TestBitmap) {
-    int batch_size = 3; // rows in a block
-    int batch_num = 3;
-    int total_rows = batch_size * batch_num + 1;
-
-    vectorized::DataTypePtr 
bitmap_data_type(std::make_shared<vectorized::DataTypeBitMap>());
-    auto bitmap_column = bitmap_data_type->create_column();
-    std::vector<BitmapValue>& container =
-            ((vectorized::ColumnBitmap*)bitmap_column.get())->get_data();
-    std::vector<std::string> expected_bitmap_str;
-    for (int i = 0; i < total_rows; ++i) {
-        BitmapValue bv;
-        for (int j = 0; j <= i; ++j) {
-            bv.add(j);
-        }
-        expected_bitmap_str.emplace_back(convert_bitmap_to_string(bv));
-        container.push_back(bv);
-    }
-    vectorized::ColumnWithTypeAndName type_and_name(bitmap_column->get_ptr(), 
bitmap_data_type,
-                                                    "spill_block_test_bitmap");
-    vectorized::Block block({type_and_name});
-
-    vectorized::BlockSpillWriterUPtr spill_block_writer;
-    static_cast<void>(block_spill_manager->get_writer(batch_size, 
spill_block_writer, profile_));
-    auto st = spill_block_writer->write(block);
-    static_cast<void>(spill_block_writer->close());
-    EXPECT_TRUE(st.ok());
-
-    vectorized::BlockSpillReaderUPtr spill_block_reader;
-    
static_cast<void>(block_spill_manager->get_reader(spill_block_writer->get_id(),
-                                                      spill_block_reader, 
profile_));
-
-    vectorized::Block block_read;
-    bool eos = false;
-
-    for (int i = 0; i < batch_num; ++i) {
-        st = spill_block_reader->read(&block_read, &eos);
-        EXPECT_TRUE(st.ok());
-
-        EXPECT_EQ(block_read.rows(), batch_size);
-        auto column = block_read.get_by_position(0).column;
-        auto* real_column = (vectorized::ColumnBitmap*)column.get();
-        for (size_t j = 0; j < batch_size; ++j) {
-            auto bitmap_str = 
convert_bitmap_to_string(real_column->get_element(j));
-            EXPECT_EQ(bitmap_str, expected_bitmap_str[j + i * batch_size]);
-        }
-    }
-
-    st = spill_block_reader->read(&block_read, &eos);
-    static_cast<void>(spill_block_reader->close());
-    EXPECT_TRUE(st.ok());
-
-    EXPECT_EQ(block_read.rows(), 1);
-    auto column = block_read.get_by_position(0).column;
-    auto* real_column = (vectorized::ColumnBitmap*)column.get();
-    auto bitmap_str = convert_bitmap_to_string(real_column->get_element(0));
-    EXPECT_EQ(bitmap_str, expected_bitmap_str[3 * batch_size]);
-}
-} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to