This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 2684cc03c1e branch-3.1: [Feature](orc-reader) Implement new merge io 
facility for orc reader. (#52085)
2684cc03c1e is described below

commit 2684cc03c1ec0702134ead5bdc7071477bdd566f
Author: Qi Chen <[email protected]>
AuthorDate: Wed Jun 25 14:33:56 2025 +0800

    branch-3.1: [Feature](orc-reader) Implement new merge io facility for orc 
reader. (#52085)
    
    Cherry-pick main PR: #45966, Fix bugs PR: #50185 #51102
---
 .gitmodules                                        |   2 +-
 be/src/apache-orc                                  |   2 +-
 be/src/vec/exec/format/orc/orc_file_reader.cpp     | 106 +++++++++++++
 be/src/vec/exec/format/orc/orc_file_reader.h       |  88 +++++++++++
 be/src/vec/exec/format/orc/vorc_reader.cpp         | 167 ++++++++++++++++----
 be/src/vec/exec/format/orc/vorc_reader.h           |  86 ++++++++++-
 .../vec/exec/format/orc/orc_file_reader_test.cpp   | 170 +++++++++++++++++++++
 build.sh                                           |   2 +-
 .../hive/test_orc_merge_io_input_streams.out       | Bin 0 -> 123 bytes
 .../hive/test_orc_merge_io_input_streams.groovy    |  52 +++++++
 10 files changed, 641 insertions(+), 34 deletions(-)

diff --git a/.gitmodules b/.gitmodules
index 80afde96cd7..ead4b2380bf 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -19,7 +19,7 @@
 [submodule "be/src/apache-orc"]
        path = be/src/apache-orc
        url = https://github.com/apache/doris-thirdparty.git
-       branch = orc-for-doris-21
+       branch = orc
 [submodule "be/src/clucene"]
        path = be/src/clucene
        url = https://github.com/apache/doris-thirdparty.git
diff --git a/be/src/apache-orc b/be/src/apache-orc
index 0182042e141..18fb8e2c288 160000
--- a/be/src/apache-orc
+++ b/be/src/apache-orc
@@ -1 +1 @@
-Subproject commit 0182042e141250802b1a6c1d7a5317b0055c776b
+Subproject commit 18fb8e2c2888a3518bf2bbd905f60772f4754739
diff --git a/be/src/vec/exec/format/orc/orc_file_reader.cpp 
b/be/src/vec/exec/format/orc/orc_file_reader.cpp
new file mode 100644
index 00000000000..6f1411563e7
--- /dev/null
+++ b/be/src/vec/exec/format/orc/orc_file_reader.cpp
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/orc/orc_file_reader.h"
+
+#include "util/runtime_profile.h"
+
+namespace doris {
+namespace vectorized {
+
+OrcMergeRangeFileReader::OrcMergeRangeFileReader(RuntimeProfile* profile,
+                                                 io::FileReaderSPtr 
inner_reader,
+                                                 io::PrefetchRange range)
+        : _profile(profile), _inner_reader(std::move(inner_reader)), 
_range(std::move(range)) {
+    _size = _inner_reader->size();
+    _statistics.apply_bytes += range.end_offset - range.start_offset;
+    if (_profile != nullptr) {
+        const char* random_profile = "MergedSmallIO";
+        ADD_TIMER_WITH_LEVEL(_profile, random_profile, 1);
+        _copy_time = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "CopyTime", 
random_profile, 1);
+        _read_time = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "ReadTime", 
random_profile, 1);
+        _request_io =
+                ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "RequestIO", 
TUnit::UNIT, random_profile, 1);
+        _merged_io =
+                ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "MergedIO", 
TUnit::UNIT, random_profile, 1);
+        _request_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, 
"RequestBytes", TUnit::BYTES,
+                                                      random_profile, 1);
+        _merged_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "MergedBytes", 
TUnit::BYTES,
+                                                     random_profile, 1);
+        _apply_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "ApplyBytes", 
TUnit::BYTES,
+                                                    random_profile, 1);
+    }
+}
+
+Status OrcMergeRangeFileReader::read_at_impl(size_t offset, Slice result, 
size_t* bytes_read,
+                                             const io::IOContext* io_ctx) {
+    auto request_size = result.size;
+
+    _statistics.request_io++;
+    _statistics.request_bytes += request_size;
+
+    if (request_size == 0) {
+        *bytes_read = 0;
+        return Status::OK();
+    }
+
+    if (_cache == nullptr) {
+        auto range_size = _range.end_offset - _range.start_offset;
+        _cache = std::make_unique<char[]>(range_size);
+
+        {
+            SCOPED_RAW_TIMER(&_statistics.read_time);
+            Slice cache_slice = {_cache.get(), range_size};
+            RETURN_IF_ERROR(
+                    _inner_reader->read_at(_range.start_offset, cache_slice, 
bytes_read, io_ctx));
+            _statistics.merged_io++;
+            _statistics.merged_bytes += *bytes_read;
+        }
+
+        if (*bytes_read != range_size) [[unlikely]] {
+            return Status::InternalError(
+                    "OrcMergeRangeFileReader use inner reader read bytes {} 
not eq expect size {}",
+                    *bytes_read, range_size);
+        }
+
+        _current_start_offset = _range.start_offset;
+    }
+
+    SCOPED_RAW_TIMER(&_statistics.copy_time);
+    int64_t buffer_offset = offset - _current_start_offset;
+    memcpy(result.data, _cache.get() + buffer_offset, request_size);
+    *bytes_read = request_size;
+    return Status::OK();
+}
+
+void OrcMergeRangeFileReader::_collect_profile_before_close() {
+    if (_profile != nullptr) {
+        COUNTER_UPDATE(_copy_time, _statistics.copy_time);
+        COUNTER_UPDATE(_read_time, _statistics.read_time);
+        COUNTER_UPDATE(_request_io, _statistics.request_io);
+        COUNTER_UPDATE(_merged_io, _statistics.merged_io);
+        COUNTER_UPDATE(_request_bytes, _statistics.request_bytes);
+        COUNTER_UPDATE(_merged_bytes, _statistics.merged_bytes);
+        COUNTER_UPDATE(_apply_bytes, _statistics.apply_bytes);
+        if (_inner_reader != nullptr) {
+            _inner_reader->collect_profile_before_close();
+        }
+    }
+}
+
+} // namespace vectorized
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/exec/format/orc/orc_file_reader.h 
b/be/src/vec/exec/format/orc/orc_file_reader.h
new file mode 100644
index 00000000000..d9d90f3e6e4
--- /dev/null
+++ b/be/src/vec/exec/format/orc/orc_file_reader.h
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "io/fs/buffered_reader.h"
+#include "io/fs/file_reader.h"
+
+namespace doris {
+namespace vectorized {
+
+class OrcMergeRangeFileReader : public io::FileReader {
+public:
+    struct Statistics {
+        int64_t copy_time = 0;
+        int64_t read_time = 0;
+        int64_t request_io = 0;
+        int64_t merged_io = 0;
+        int64_t request_bytes = 0;
+        int64_t merged_bytes = 0;
+        int64_t apply_bytes = 0;
+    };
+
+    OrcMergeRangeFileReader(RuntimeProfile* profile, io::FileReaderSPtr 
inner_reader,
+                            io::PrefetchRange range);
+
+    ~OrcMergeRangeFileReader() override = default;
+
+    Status close() override {
+        if (!_closed) {
+            _closed = true;
+        }
+        return Status::OK();
+    }
+
+    const io::Path& path() const override { return _inner_reader->path(); }
+
+    size_t size() const override { return _size; }
+
+    bool closed() const override { return _closed; }
+
+    // for test only
+    const Statistics& statistics() const { return _statistics; }
+
+protected:
+    Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+                        const io::IOContext* io_ctx) override;
+
+    void _collect_profile_before_close() override;
+
+private:
+    RuntimeProfile::Counter* _copy_time = nullptr;
+    RuntimeProfile::Counter* _read_time = nullptr;
+    RuntimeProfile::Counter* _request_io = nullptr;
+    RuntimeProfile::Counter* _merged_io = nullptr;
+    RuntimeProfile::Counter* _request_bytes = nullptr;
+    RuntimeProfile::Counter* _merged_bytes = nullptr;
+    RuntimeProfile::Counter* _apply_bytes = nullptr;
+
+    RuntimeProfile* _profile;
+    io::FileReaderSPtr _inner_reader;
+    io::PrefetchRange _range;
+
+    std::unique_ptr<char[]> _cache;
+    int64_t _current_start_offset = -1;
+
+    size_t _size;
+    bool _closed = false;
+
+    Statistics _statistics;
+};
+
+} // namespace vectorized
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp 
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 4391917cf29..c50e1fa8883 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -71,6 +71,7 @@
 #include "vec/data_types/data_type_map.h"
 #include "vec/data_types/data_type_nullable.h"
 #include "vec/data_types/data_type_struct.h"
+#include "vec/exec/format/orc/orc_file_reader.h"
 #include "vec/exec/format/orc/orc_memory_pool.h"
 #include "vec/exec/format/table/transactional_hive_common.h"
 #include "vec/exprs/vbloom_predicate.h"
@@ -141,6 +142,34 @@ void ORCFileInputStream::read(void* buf, uint64_t length, 
uint64_t offset) {
     }
 }
 
+void StripeStreamInputStream::read(void* buf, uint64_t length, uint64_t 
offset) {
+    _statistics->fs_read_calls++;
+    _statistics->fs_read_bytes += length;
+    SCOPED_RAW_TIMER(&_statistics->fs_read_time);
+    uint64_t has_read = 0;
+    char* out = reinterpret_cast<char*>(buf);
+    while (has_read < length) {
+        if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
+            throw orc::ParseError("stop");
+        }
+        size_t loop_read;
+        Slice result(out + has_read, length - has_read);
+        Status st = _inner_reader->read_at(offset + has_read, result, 
&loop_read, _io_ctx);
+        if (!st.ok()) {
+            throw orc::ParseError(
+                    strings::Substitute("Failed to read $0: $1", _file_name, 
st.to_string()));
+        }
+        if (loop_read == 0) {
+            break;
+        }
+        has_read += loop_read;
+    }
+    if (has_read != length) {
+        throw orc::ParseError(strings::Substitute("Try to read $0 bytes from 
$1, actually read $2",
+                                                  length, has_read, 
_file_name));
+    }
+}
+
 OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state,
                      const TFileScanRangeParams& params, const TFileRangeDesc& 
range,
                      size_t batch_size, const std::string& ctz, io::IOContext* 
io_ctx,
@@ -249,7 +278,8 @@ Status OrcReader::_create_file_reader() {
                 _profile, _system_properties, _file_description, 
reader_options,
                 io::DelegateReader::AccessMode::RANDOM, _io_ctx));
         _file_input_stream = std::make_unique<ORCFileInputStream>(
-                _scan_range.path, std::move(inner_reader), &_statistics, 
_io_ctx, _profile);
+                _scan_range.path, std::move(inner_reader), &_statistics, 
_io_ctx, _profile,
+                _orc_once_max_read_bytes, _orc_max_merge_distance_bytes);
     }
     if (_file_input_stream->getLength() == 0) {
         return Status::EndOfFile("empty orc file: " + _scan_range.path);
@@ -299,6 +329,13 @@ Status OrcReader::init_reader(
     }
     _slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts;
     _obj_pool = std::make_shared<ObjectPool>();
+
+    if (_state != nullptr) {
+        _orc_tiny_stripe_threshold_bytes = 
_state->query_options().orc_tiny_stripe_threshold_bytes;
+        _orc_once_max_read_bytes = 
_state->query_options().orc_once_max_read_bytes;
+        _orc_max_merge_distance_bytes = 
_state->query_options().orc_max_merge_distance_bytes;
+    }
+
     {
         SCOPED_RAW_TIMER(&_statistics.create_reader_time);
         RETURN_IF_ERROR(_create_file_reader());
@@ -904,18 +941,6 @@ Status OrcReader::set_fill_columns(
 
         int64_t range_end_offset = _range_start_offset + _range_size;
 
-        // If you set "orc_tiny_stripe_threshold_bytes" = 0, the use tiny 
stripes merge io optimization will not be used.
-        int64_t orc_tiny_stripe_threshold_bytes = 8L * 1024L * 1024L;
-        int64_t orc_once_max_read_bytes = 8L * 1024L * 1024L;
-        int64_t orc_max_merge_distance_bytes = 1L * 1024L * 1024L;
-
-        if (_state != nullptr) {
-            orc_tiny_stripe_threshold_bytes =
-                    _state->query_options().orc_tiny_stripe_threshold_bytes;
-            orc_once_max_read_bytes = 
_state->query_options().orc_once_max_read_bytes;
-            orc_max_merge_distance_bytes = 
_state->query_options().orc_max_merge_distance_bytes;
-        }
-
         bool all_tiny_stripes = true;
         std::vector<io::PrefetchRange> tiny_stripe_ranges;
 
@@ -928,7 +953,7 @@ Status OrcReader::set_fill_columns(
                 !all_stripes_needed[i]) {
                 continue;
             }
-            if (strip_info->getLength() > orc_tiny_stripe_threshold_bytes) {
+            if (strip_info->getLength() > _orc_tiny_stripe_threshold_bytes) {
                 all_tiny_stripes = false;
                 break;
             }
@@ -938,8 +963,8 @@ Status OrcReader::set_fill_columns(
         if (all_tiny_stripes && number_of_stripes > 0) {
             std::vector<io::PrefetchRange> prefetch_merge_ranges =
                     
io::PrefetchRange::merge_adjacent_seq_ranges(tiny_stripe_ranges,
-                                                                 
orc_max_merge_distance_bytes,
-                                                                 
orc_once_max_read_bytes);
+                                                                 
_orc_max_merge_distance_bytes,
+                                                                 
_orc_once_max_read_bytes);
             auto range_finder =
                     
std::make_shared<io::LinearProbeRangeFinder>(std::move(prefetch_merge_ranges));
 
@@ -2485,17 +2510,23 @@ MutableColumnPtr 
OrcReader::_convert_dict_column_to_string_column(
 
 void ORCFileInputStream::beforeReadStripe(
         std::unique_ptr<orc::StripeInformation> current_strip_information,
-        std::vector<bool> selected_columns) {
+        const std::vector<bool>& selected_columns,
+        std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& 
streams) {
     if (_is_all_tiny_stripes) {
         return;
     }
     if (_file_reader != nullptr) {
         _file_reader->collect_profile_before_close();
     }
-    // Generate prefetch ranges, build stripe file reader.
+    for (const auto& stripe_stream : _stripe_streams) {
+        if (stripe_stream != nullptr) {
+            stripe_stream->collect_profile_before_close();
+        }
+    }
+    _stripe_streams.clear();
+
     uint64_t offset = current_strip_information->getOffset();
-    std::vector<io::PrefetchRange> prefetch_ranges;
-    size_t total_io_size = 0;
+    std::unordered_map<orc::StreamId, io::PrefetchRange> prefetch_ranges;
     for (uint64_t stream_id = 0; stream_id < 
current_strip_information->getNumberOfStreams();
          ++stream_id) {
         std::unique_ptr<orc::StreamInformation> stream =
@@ -2503,19 +2534,91 @@ void ORCFileInputStream::beforeReadStripe(
         uint32_t columnId = stream->getColumnId();
         uint64_t length = stream->getLength();
         if (selected_columns[columnId]) {
-            total_io_size += length;
             doris::io::PrefetchRange prefetch_range = {offset, offset + 
length};
-            prefetch_ranges.emplace_back(std::move(prefetch_range));
+            orc::StreamId streamId(stream->getColumnId(), stream->getKind());
+            prefetch_ranges.emplace(std::move(streamId), 
std::move(prefetch_range));
         }
         offset += length;
     }
-    size_t num_columns = std::count_if(selected_columns.begin(), 
selected_columns.end(),
-                                       [](bool selected) { return selected; });
-    if (total_io_size / num_columns < io::MergeRangeFileReader::SMALL_IO) {
-        // The underlying page reader will prefetch data in column.
-        _file_reader.reset(new io::MergeRangeFileReader(_profile, 
_inner_reader, prefetch_ranges));
-    } else {
-        _file_reader = _inner_reader;
+    _build_input_stripe_streams(prefetch_ranges, streams);
+}
+
+void ORCFileInputStream::_build_input_stripe_streams(
+        const std::unordered_map<orc::StreamId, io::PrefetchRange>& ranges,
+        std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& 
streams) {
+    if (ranges.empty()) {
+        return;
+    }
+
+    std::unordered_map<orc::StreamId, io::PrefetchRange> small_ranges;
+    std::unordered_map<orc::StreamId, io::PrefetchRange> large_ranges;
+
+    for (const auto& range : ranges) {
+        if (range.second.end_offset - range.second.start_offset <= 
_orc_once_max_read_bytes) {
+            small_ranges.emplace(range.first, range.second);
+        } else {
+            large_ranges.emplace(range.first, range.second);
+        }
+    }
+
+    _build_small_ranges_input_stripe_streams(small_ranges, streams);
+    _build_large_ranges_input_stripe_streams(large_ranges, streams);
+}
+
+void ORCFileInputStream::_build_small_ranges_input_stripe_streams(
+        const std::unordered_map<orc::StreamId, io::PrefetchRange>& ranges,
+        std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& 
streams) {
+    std::vector<io::PrefetchRange> all_ranges;
+    all_ranges.reserve(ranges.size());
+    std::transform(ranges.begin(), ranges.end(), 
std::back_inserter(all_ranges),
+                   [](const auto& pair) { return pair.second; });
+    std::sort(all_ranges.begin(), all_ranges.end(),
+              [](const auto& a, const auto& b) { return a.start_offset < 
b.start_offset; });
+
+    auto merged_ranges = io::PrefetchRange::merge_adjacent_seq_ranges(
+            all_ranges, _orc_max_merge_distance_bytes, 
_orc_once_max_read_bytes);
+
+    // Sort ranges by start_offset for efficient searching
+    std::vector<std::pair<orc::StreamId, io::PrefetchRange>> 
sorted_ranges(ranges.begin(),
+                                                                           
ranges.end());
+    std::sort(sorted_ranges.begin(), sorted_ranges.end(), [](const auto& a, 
const auto& b) {
+        return a.second.start_offset < b.second.start_offset;
+    });
+
+    for (const auto& merged_range : merged_ranges) {
+        auto merge_range_file_reader =
+                std::make_shared<OrcMergeRangeFileReader>(_profile, 
_file_reader, merged_range);
+
+        // Use binary search to find the starting point in sorted_ranges
+        auto it =
+                std::lower_bound(sorted_ranges.begin(), sorted_ranges.end(),
+                                 merged_range.start_offset, [](const auto& 
pair, uint64_t offset) {
+                                     return pair.second.start_offset < offset;
+                                 });
+
+        // Iterate from the found starting point
+        for (; it != sorted_ranges.end() && it->second.start_offset < 
merged_range.end_offset;
+             ++it) {
+            if (it->second.end_offset <= merged_range.end_offset) {
+                auto stripe_stream_input_stream = 
std::make_shared<StripeStreamInputStream>(
+                        getName(), merge_range_file_reader, _statistics, 
_io_ctx, _profile);
+                streams.emplace(it->first, stripe_stream_input_stream);
+                _stripe_streams.emplace_back(stripe_stream_input_stream);
+            }
+        }
+    }
+}
+
+void ORCFileInputStream::_build_large_ranges_input_stripe_streams(
+        const std::unordered_map<orc::StreamId, io::PrefetchRange>& ranges,
+        std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& 
streams) {
+    for (const auto& range : ranges) {
+        auto stripe_stream_input_stream = 
std::make_shared<StripeStreamInputStream>(
+                getName(), _file_reader, _statistics, _io_ctx, _profile);
+        streams.emplace(range.first,
+                        std::make_shared<StripeStreamInputStream>(getName(), 
_file_reader,
+                                                                  _statistics, 
_io_ctx, _profile));
+        _stripe_streams.emplace_back(stripe_stream_input_stream);
     }
 }
 
@@ -2523,7 +2626,13 @@ void ORCFileInputStream::_collect_profile_before_close() 
{
     if (_file_reader != nullptr) {
         _file_reader->collect_profile_before_close();
     }
+    for (const auto& stripe_stream : _stripe_streams) {
+        if (stripe_stream != nullptr) {
+            stripe_stream->collect_profile_before_close();
+        }
+    }
 }
+
 void OrcReader::_execute_filter_position_delete_rowids(IColumn::Filter& 
filter) {
     if (_position_delete_ordered_rowids == nullptr) {
         return;
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h 
b/be/src/vec/exec/format/orc/vorc_reader.h
index 0f09d4e7e28..4ebfb68a22f 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -629,16 +629,73 @@ private:
     std::unordered_map<std::string, std::string> _table_col_to_file_col;
     //support iceberg position delete .
     std::vector<int64_t>* _position_delete_ordered_rowids = nullptr;
+
+    // If you set "orc_tiny_stripe_threshold_bytes" = 0, the use tiny stripes 
merge io optimization will not be used.
+    int64_t _orc_tiny_stripe_threshold_bytes = 8L * 1024L * 1024L;
+    int64_t _orc_once_max_read_bytes = 8L * 1024L * 1024L;
+    int64_t _orc_max_merge_distance_bytes = 1L * 1024L * 1024L;
+};
+
+class StripeStreamInputStream : public orc::InputStream, public 
ProfileCollector {
+public:
+    StripeStreamInputStream(const std::string& file_name, io::FileReaderSPtr 
inner_reader,
+                            OrcReader::Statistics* statistics, const 
io::IOContext* io_ctx,
+                            RuntimeProfile* profile)
+            : _file_name(file_name),
+              _inner_reader(inner_reader),
+              _statistics(statistics),
+              _io_ctx(io_ctx),
+              _profile(profile) {}
+
+    ~StripeStreamInputStream() override {
+        if (_inner_reader != nullptr) {
+            _inner_reader->collect_profile_before_close();
+        }
+    }
+
+    uint64_t getLength() const override { return _inner_reader->size(); }
+
+    uint64_t getNaturalReadSize() const override { return 
config::orc_natural_read_size_mb << 20; }
+
+    void read(void* buf, uint64_t length, uint64_t offset) override;
+
+    const std::string& getName() const override { return _file_name; }
+
+    RuntimeProfile* profile() const { return _profile; }
+
+    void beforeReadStripe(
+            std::unique_ptr<orc::StripeInformation> current_strip_information,
+            const std::vector<bool>& selected_columns,
+            std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& 
streams) override {}
+
+protected:
+    void _collect_profile_at_runtime() override {};
+    void _collect_profile_before_close() override {
+        if (_inner_reader != nullptr) {
+            _inner_reader->collect_profile_before_close();
+        }
+    };
+
+private:
+    const std::string& _file_name;
+    io::FileReaderSPtr _inner_reader;
+    // Owned by OrcReader
+    OrcReader::Statistics* _statistics = nullptr;
+    const io::IOContext* _io_ctx = nullptr;
+    RuntimeProfile* _profile = nullptr;
 };
 
 class ORCFileInputStream : public orc::InputStream, public ProfileCollector {
 public:
     ORCFileInputStream(const std::string& file_name, io::FileReaderSPtr 
inner_reader,
                        OrcReader::Statistics* statistics, const io::IOContext* 
io_ctx,
-                       RuntimeProfile* profile)
+                       RuntimeProfile* profile, int64_t 
orc_once_max_read_bytes,
+                       int64_t orc_max_merge_distance_bytes)
             : _file_name(file_name),
               _inner_reader(inner_reader),
               _file_reader(inner_reader),
+              _orc_once_max_read_bytes(orc_once_max_read_bytes),
+              _orc_max_merge_distance_bytes(orc_max_merge_distance_bytes),
               _statistics(statistics),
               _io_ctx(io_ctx),
               _profile(profile) {}
@@ -647,6 +704,12 @@ public:
         if (_file_reader != nullptr) {
             _file_reader->collect_profile_before_close();
         }
+        for (const auto& stripe_stream : _stripe_streams) {
+            if (stripe_stream != nullptr) {
+                stripe_stream->collect_profile_before_close();
+            }
+        }
+        _stripe_streams.clear();
     }
 
     uint64_t getLength() const override { return _file_reader->size(); }
@@ -658,7 +721,9 @@ public:
     const std::string& getName() const override { return _file_name; }
 
     void beforeReadStripe(std::unique_ptr<orc::StripeInformation> 
current_strip_information,
-                          std::vector<bool> selected_columns) override;
+                          const std::vector<bool>& selected_columns,
+                          std::unordered_map<orc::StreamId, 
std::shared_ptr<InputStream>>&
+                                  stripe_streams) override;
 
     void set_all_tiny_stripes() { _is_all_tiny_stripes = true; }
 
@@ -671,10 +736,27 @@ protected:
     void _collect_profile_before_close() override;
 
 private:
+    void _build_input_stripe_streams(
+            const std::unordered_map<orc::StreamId, io::PrefetchRange>& ranges,
+            std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& 
streams);
+
+    void _build_small_ranges_input_stripe_streams(
+            const std::unordered_map<orc::StreamId, io::PrefetchRange>& ranges,
+            std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& 
streams);
+
+    void _build_large_ranges_input_stripe_streams(
+            const std::unordered_map<orc::StreamId, io::PrefetchRange>& ranges,
+            std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& 
streams);
+
     const std::string& _file_name;
     io::FileReaderSPtr _inner_reader;
     io::FileReaderSPtr _file_reader;
     bool _is_all_tiny_stripes = false;
+    int64_t _orc_once_max_read_bytes;
+    int64_t _orc_max_merge_distance_bytes;
+
+    std::vector<std::shared_ptr<StripeStreamInputStream>> _stripe_streams;
+
     // Owned by OrcReader
     OrcReader::Statistics* _statistics = nullptr;
     const io::IOContext* _io_ctx = nullptr;
diff --git a/be/test/vec/exec/format/orc/orc_file_reader_test.cpp 
b/be/test/vec/exec/format/orc/orc_file_reader_test.cpp
new file mode 100644
index 00000000000..9e1003c397f
--- /dev/null
+++ b/be/test/vec/exec/format/orc/orc_file_reader_test.cpp
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/orc/orc_file_reader.h"
+
+#include <gtest/gtest.h>
+
+#include "io/fs/local_file_system.h"
+#include "util/slice.h"
+
+namespace doris {
+namespace vectorized {
+
+class MockFileReader : public io::FileReader {
+public:
+    MockFileReader() = default;
+    ~MockFileReader() override = default;
+
+    Status close() override {
+        _closed = true;
+        return Status::OK();
+    }
+
+    const io::Path& path() const override { return _path; }
+
+    size_t size() const override { return _data.size(); }
+
+    bool closed() const override { return _closed; }
+
+    void set_data(const std::string& data) { _data = data; }
+
+protected:
+    Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+                        const io::IOContext* io_ctx) override {
+        if (offset >= _data.size()) {
+            *bytes_read = 0;
+            return Status::OK();
+        }
+        *bytes_read = std::min(result.size, _data.size() - offset);
+        memcpy(result.data, _data.data() + offset, *bytes_read);
+        return Status::OK();
+    }
+
+private:
+    std::string _data;
+    bool _closed = false;
+    io::Path _path = "/tmp/mock";
+};
+
+class OrcMergeRangeFileReaderTest : public testing::Test {
+protected:
+    void SetUp() override { _mock_reader = std::make_shared<MockFileReader>(); 
}
+
+    std::shared_ptr<MockFileReader> _mock_reader;
+};
+
+TEST_F(OrcMergeRangeFileReaderTest, basic_init) {
+    std::string test_data(1024, 'A');
+    _mock_reader->set_data(test_data);
+
+    io::PrefetchRange range {0, 1024};
+    OrcMergeRangeFileReader reader(nullptr, _mock_reader, range);
+    EXPECT_EQ(1024, reader.size());
+    EXPECT_FALSE(reader.closed());
+}
+
+TEST_F(OrcMergeRangeFileReaderTest, read_with_cache) {
+    std::string test_data(1024, 'A');
+    _mock_reader->set_data(test_data);
+
+    io::PrefetchRange range {0, 1024};
+    const size_t test_size = 128;
+
+    OrcMergeRangeFileReader reader(nullptr, _mock_reader, range);
+
+    // Read from cache
+    char buffer[test_size];
+    Slice result(buffer, test_size);
+    size_t bytes_read = 0;
+
+    // Read from start
+    ASSERT_TRUE(reader.read_at(0, result, &bytes_read, nullptr).ok());
+    EXPECT_EQ(bytes_read, test_size);
+    EXPECT_EQ(std::string(buffer, test_size), std::string(test_size, 'A'));
+
+    // Read from middle
+    ASSERT_TRUE(reader.read_at(512, result, &bytes_read, nullptr).ok());
+    EXPECT_EQ(bytes_read, test_size);
+    EXPECT_EQ(std::string(buffer, test_size), std::string(test_size, 'A'));
+
+    // Verify statistics
+    EXPECT_EQ(reader.statistics().merged_io, 1);
+    EXPECT_EQ(reader.statistics().merged_bytes, 1024);
+}
+
+TEST_F(OrcMergeRangeFileReaderTest, read_empty_data) {
+    _mock_reader->set_data("");
+
+    io::PrefetchRange range {0, 1024};
+    OrcMergeRangeFileReader reader(nullptr, _mock_reader, range);
+
+    char buffer[128];
+    Slice result(buffer, 128);
+    size_t bytes_read = 0;
+
+    ASSERT_FALSE(reader.read_at(0, result, &bytes_read, nullptr).ok());
+    EXPECT_EQ(bytes_read, 0);
+}
+
+TEST_F(OrcMergeRangeFileReaderTest, close) {
+    std::string test_data(1024, 'A');
+    _mock_reader->set_data(test_data);
+
+    io::PrefetchRange range {0, 1024};
+    OrcMergeRangeFileReader reader(nullptr, _mock_reader, range);
+    ASSERT_FALSE(reader.closed());
+
+    ASSERT_TRUE(reader.close().ok());
+    ASSERT_TRUE(reader.closed());
+}
+
+TEST_F(OrcMergeRangeFileReaderTest, multiple_reads_from_cache) {
+    std::string test_data;
+    for (int i = 0; i < 1024; i++) {
+        test_data.push_back(i % 256);
+    }
+    _mock_reader->set_data(test_data);
+
+    io::PrefetchRange range {0, 1024};
+    OrcMergeRangeFileReader reader(nullptr, _mock_reader, range);
+
+    // Perform multiple reads with different sizes and offsets
+    const std::vector<std::pair<size_t, size_t>> read_patterns = {
+            {0, 128},   // Start, 128 bytes
+            {256, 64},  // Middle, 64 bytes
+            {1000, 24}, // Near end, 24 bytes
+            {512, 256}, // Middle, large read
+    };
+
+    for (const auto& pattern : read_patterns) {
+        std::vector<char> buffer(pattern.second);
+        Slice result(buffer.data(), pattern.second);
+        size_t bytes_read = 0;
+
+        ASSERT_TRUE(reader.read_at(pattern.first, result, &bytes_read, 
nullptr).ok());
+        EXPECT_EQ(bytes_read, pattern.second);
+        EXPECT_EQ(memcmp(buffer.data(), test_data.data() + pattern.first, 
pattern.second), 0);
+    }
+
+    // Verify that we only did one actual read
+    EXPECT_EQ(reader.statistics().merged_io, 1);
+    EXPECT_EQ(reader.statistics().merged_bytes, 1024);
+}
+
+} // namespace vectorized
+} // namespace doris
diff --git a/build.sh b/build.sh
index 594e990821f..88e9ee4c0b0 100755
--- a/build.sh
+++ b/build.sh
@@ -565,7 +565,7 @@ FE_MODULES="$(
 
 # Clean and build Backend
 if [[ "${BUILD_BE}" -eq 1 ]]; then
-    update_submodule "be/src/apache-orc" "apache-orc" 
"https://github.com/apache/doris-thirdparty/archive/refs/heads/orc-for-doris-21.tar.gz";
+    update_submodule "be/src/apache-orc" "apache-orc" 
"https://github.com/apache/doris-thirdparty/archive/refs/heads/orc.tar.gz";
     update_submodule "be/src/clucene" "clucene" 
"https://github.com/apache/doris-thirdparty/archive/refs/heads/clucene-3.0.tar.gz";
     if [[ -e "${DORIS_HOME}/gensrc/build/gen_cpp/version.h" ]]; then
         rm -f "${DORIS_HOME}/gensrc/build/gen_cpp/version.h"
diff --git 
a/regression-test/data/external_table_p2/hive/test_orc_merge_io_input_streams.out
 
b/regression-test/data/external_table_p2/hive/test_orc_merge_io_input_streams.out
new file mode 100644
index 00000000000..fa812f2afc2
Binary files /dev/null and 
b/regression-test/data/external_table_p2/hive/test_orc_merge_io_input_streams.out
 differ
diff --git 
a/regression-test/suites/external_table_p2/hive/test_orc_merge_io_input_streams.groovy
 
b/regression-test/suites/external_table_p2/hive/test_orc_merge_io_input_streams.groovy
new file mode 100644
index 00000000000..8406bf0b423
--- /dev/null
+++ 
b/regression-test/suites/external_table_p2/hive/test_orc_merge_io_input_streams.groovy
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_orc_merge_io_input_streams", 
"p2,external,hive,external_remote,external_remote_hive") {
+
+    String enabled = context.config.otherConfigs.get("enableExternalHudiTest")
+    //hudi hive use same catalog in p2.
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable test")
+        return;
+    }
+
+    String props = context.config.otherConfigs.get("hudiEmrCatalog")
+    String hms_catalog_name = "test_orc_merge_io_input_streams"
+
+    sql """drop catalog if exists ${hms_catalog_name};"""
+    sql """
+        CREATE CATALOG IF NOT EXISTS ${hms_catalog_name}
+        PROPERTIES (
+            ${props}
+            ,'hive.version' = '3.1.3'
+        );
+    """
+
+    logger.info("catalog " + hms_catalog_name + " created")
+    sql """switch ${hms_catalog_name};"""
+    logger.info("switched to catalog " + hms_catalog_name)
+    sql """ use regression;"""
+
+    sql """ set dry_run_query=true; """
+
+    qt_1 """  SELECT trace_id as trace_id_sub,created_time FROM 
test_orc_merge_io_input_streams_table
+              where dt = replace(date_sub('2025-04-16', 1), '-', '') and 
trace_id='1210647803'; """
+    qt_2 """ select * from test_orc_merge_io_input_streams_table ; """
+
+    sql """drop catalog ${hms_catalog_name};"""
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to