jacktengg commented on code in PR #61212:
URL: https://github.com/apache/doris/pull/61212#discussion_r2937827243


##########
be/src/exec/spill/spill_file_reader.cpp:
##########
@@ -0,0 +1,239 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/spill/spill_file_reader.h"
+
+#include <glog/logging.h>
+
+#include <algorithm>
+
+#include "common/cast_set.h"
+#include "common/exception.h"
+#include "core/block/block.h"
+#include "exec/spill/spill_file_manager.h"
+#include "io/file_factory.h"
+#include "io/fs/file_reader.h"
+#include "io/fs/local_file_system.h"
+#include "runtime/exec_env.h"
+#include "runtime/query_context.h"
+#include "runtime/runtime_state.h"
+#include "util/debug_points.h"
+#include "util/slice.h"
+namespace doris {
+#include "common/compile_check_begin.h"
+namespace io {
+class FileSystem;
+} // namespace io
+
+SpillFileReader::SpillFileReader(RuntimeState* state, RuntimeProfile* profile,
+                                 std::string spill_dir, size_t part_count)
+        : _spill_dir(std::move(spill_dir)),
+          _part_count(part_count),
+          _resource_ctx(state->get_query_ctx()->resource_ctx()) {
+    // Internalize counter setup
+    RuntimeProfile* custom_profile = profile->get_child("CustomCounters");
+    DCHECK(custom_profile != nullptr);
+    _read_file_timer = custom_profile->get_counter("SpillReadFileTime");
+    _deserialize_timer = 
custom_profile->get_counter("SpillReadDerializeBlockTime");
+    _read_block_count = custom_profile->get_counter("SpillReadBlockCount");
+    _read_block_data_size = custom_profile->get_counter("SpillReadBlockBytes");
+    _read_file_size = custom_profile->get_counter("SpillReadFileBytes");
+    _read_rows_count = custom_profile->get_counter("SpillReadRows");
+    _read_file_count = custom_profile->get_counter("SpillReadFileCount");
+}
+
+Status SpillFileReader::open() {
+    if (_is_open || _part_count == 0) {
+        return Status::OK();
+    }
+    RETURN_IF_ERROR(_open_part(0));
+    _is_open = true;
+    return Status::OK();
+}
+
+Status SpillFileReader::_open_part(size_t part_index) {
+    _close_current_part();
+
+    _current_part_index = part_index;
+    _part_opened = true;
+    std::string part_path = _spill_dir + "/" + std::to_string(part_index);
+
+    SCOPED_TIMER(_read_file_timer);
+    COUNTER_UPDATE(_read_file_count, 1);
+    RETURN_IF_ERROR(io::global_local_filesystem()->open_file(part_path, 
&_file_reader));
+
+    size_t file_size = _file_reader->size();
+    DCHECK(file_size >= 16); // max_sub_block_size + block count
+
+    Slice result((char*)&_part_block_count, sizeof(size_t));
+
+    // read block count
+    size_t bytes_read = 0;
+    RETURN_IF_ERROR(_file_reader->read_at(file_size - sizeof(size_t), result, 
&bytes_read));
+    DCHECK(bytes_read == 8);
+
+    // read max sub block size
+    bytes_read = 0;
+    result.data = (char*)&_part_max_sub_block_size;
+    RETURN_IF_ERROR(_file_reader->read_at(file_size - sizeof(size_t) * 2, 
result, &bytes_read));
+    DCHECK(bytes_read == 8);
+
+    // The buffer is used for two purposes:
+    // 1. Reading the block start offsets array (needs _part_block_count * 
sizeof(size_t) bytes)
+    // 2. Reading a single block's serialized data (needs up to 
_part_max_sub_block_size bytes)
+    // We must ensure the buffer is large enough for either case, so take the 
maximum.
+    size_t buff_size = std::max(_part_block_count * sizeof(size_t), 
_part_max_sub_block_size);
+    if (buff_size > _read_buff.size()) {
+        _read_buff.reserve(buff_size);
+    }
+
+    // Read the block start offsets array from the end of the file.
+    // The file layout (from end backwards) is:
+    //   [block count (size_t)]
+    //   [max sub block size (size_t)]
+    //   [block start offsets array (_part_block_count * size_t)]
+    // So the offsets array starts at:
+    //   file_size - (_part_block_count + 2) * sizeof(size_t)
+    size_t read_offset = file_size - (_part_block_count + 2) * sizeof(size_t);
+    result.data = _read_buff.data();
+    result.size = _part_block_count * sizeof(size_t);
+
+    RETURN_IF_ERROR(_file_reader->read_at(read_offset, result, &bytes_read));
+    DCHECK(bytes_read == _part_block_count * sizeof(size_t));
+
+    _block_start_offsets.resize(_part_block_count + 1);
+    for (size_t i = 0; i < _part_block_count; ++i) {
+        _block_start_offsets[i] = *(size_t*)(result.data + i * sizeof(size_t));
+    }
+    _block_start_offsets[_part_block_count] = file_size - (_part_block_count + 
2) * sizeof(size_t);
+
+    _part_read_block_index = 0;
+    return Status::OK();
+}
+
+void SpillFileReader::_close_current_part() {
+    if (_file_reader) {
+        (void)_file_reader->close();
+        _file_reader.reset();
+    }
+    _part_block_count = 0;
+    _part_read_block_index = 0;
+    _part_max_sub_block_size = 0;
+    _block_start_offsets.clear();
+    _part_opened = false;
+}
+
+Status SpillFileReader::read(Block* block, bool* eos) {
+    DBUG_EXECUTE_IF("fault_inject::spill_file::read_next_block", {
+        return Status::InternalError("fault_inject spill_file read_next_block 
failed");
+    });
+    block->clear_column_data();
+
+    if (_part_count == 0) {
+        *eos = true;
+        return Status::OK();
+    }
+
+    // Advance to next part if current part is exhausted
+    while (_part_read_block_index >= _part_block_count) {
+        size_t next_part = _part_opened ? _current_part_index + 1 : 0;
+        if (next_part >= _part_count) {
+            *eos = true;
+            return Status::OK();
+        }
+        RETURN_IF_ERROR(_open_part(next_part));
+    }
+
+    size_t bytes_to_read = _block_start_offsets[_part_read_block_index + 1] -
+                           _block_start_offsets[_part_read_block_index];
+
+    if (bytes_to_read == 0) {
+        ++_part_read_block_index;
+        *eos = false;
+        return Status::OK();
+    }
+
+    Slice result(_read_buff.data(), bytes_to_read);
+    size_t bytes_read = 0;
+    {
+        SCOPED_TIMER(_read_file_timer);
+        
RETURN_IF_ERROR(_file_reader->read_at(_block_start_offsets[_part_read_block_index],
 result,
+                                              &bytes_read));
+    }
+    DCHECK(bytes_read == bytes_to_read);
+
+    if (bytes_read > 0) {
+        COUNTER_UPDATE(_read_file_size, bytes_read);
+        
ExecEnv::GetInstance()->spill_file_mgr()->update_spill_read_bytes(bytes_read);
+        if (_resource_ctx) {
+            
_resource_ctx->io_context()->update_spill_read_bytes_from_local_storage(bytes_read);
+        }
+        COUNTER_UPDATE(_read_block_count, 1);
+        {
+            SCOPED_TIMER(_deserialize_timer);
+            if (!_pb_block.ParseFromArray(result.data, 
cast_set<int>(result.size))) {
+                return Status::InternalError("Failed to read spilled block");
+            }
+            size_t uncompressed_size = 0;
+            int64_t uncompressed_time = 0;
+            RETURN_IF_ERROR(block->deserialize(_pb_block, &uncompressed_size, 
&uncompressed_time));
+        }
+        COUNTER_UPDATE(_read_block_data_size, block->bytes());
+        COUNTER_UPDATE(_read_rows_count, block->rows());
+    } else {
+        block->clear_column_data();
+    }
+
+    ++_part_read_block_index;
+    *eos = false;
+    return Status::OK();
+}
+
+void SpillFileReader::seek(size_t block_index) {
+    auto st = _seek_to_block(block_index);
+    DCHECK(st.ok()) << "SpillFileReader::seek failed, block_index=" << 
block_index
+                    << ", error=" << st.to_string();

Review Comment:
   status should not be ignored.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to