morningman commented on code in PR #45966:
URL: https://github.com/apache/doris/pull/45966#discussion_r1955522599
##########
be/src/vec/exec/format/orc/vorc_reader.cpp:
##########
@@ -2660,45 +2685,112 @@ MutableColumnPtr
OrcReader::_convert_dict_column_to_string_column(
void ORCFileInputStream::beforeReadStripe(
std::unique_ptr<orc::StripeInformation> current_strip_information,
- std::vector<bool> selected_columns) {
+ std::vector<bool> selected_columns,
+ std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>&
streams) {
if (_is_all_tiny_stripes) {
return;
}
if (_file_reader != nullptr) {
_file_reader->collect_profile_before_close();
}
- // Generate prefetch ranges, build stripe file reader.
+ for (const auto& stripe_stream : _stripe_streams) {
+ if (stripe_stream != nullptr) {
+ stripe_stream->collect_profile_before_close();
+ }
+ }
+ _stripe_streams.clear();
+
uint64_t offset = current_strip_information->getOffset();
- std::vector<io::PrefetchRange> prefetch_ranges;
- size_t total_io_size = 0;
+ std::unordered_map<orc::StreamId, io::PrefetchRange> prefetch_ranges;
for (uint64_t stream_id = 0; stream_id <
current_strip_information->getNumberOfStreams();
++stream_id) {
std::unique_ptr<orc::StreamInformation> stream =
current_strip_information->getStreamInformation(stream_id);
uint32_t columnId = stream->getColumnId();
uint64_t length = stream->getLength();
if (selected_columns[columnId]) {
- total_io_size += length;
doris::io::PrefetchRange prefetch_range = {offset, offset +
length};
- prefetch_ranges.emplace_back(std::move(prefetch_range));
+ orc::StreamId streamId(stream->getColumnId(), stream->getKind());
+ prefetch_ranges.emplace(std::move(streamId),
std::move(prefetch_range));
}
offset += length;
}
- size_t num_columns = std::count_if(selected_columns.begin(),
selected_columns.end(),
- [](bool selected) { return selected; });
- if (total_io_size / num_columns < io::MergeRangeFileReader::SMALL_IO) {
- // The underlying page reader will prefetch data in column.
- _file_reader.reset(new io::MergeRangeFileReader(_profile,
_inner_reader, prefetch_ranges));
- } else {
- _file_reader = _inner_reader;
+ _build_input_stripe_streams(prefetch_ranges, streams);
+}
+
+void ORCFileInputStream::_build_input_stripe_streams(
+ const std::unordered_map<orc::StreamId, io::PrefetchRange>& ranges,
+ std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>&
streams) {
+ if (ranges.empty()) {
+ return;
+ }
+
+ std::unordered_map<orc::StreamId, io::PrefetchRange> small_ranges;
+ std::unordered_map<orc::StreamId, io::PrefetchRange> large_ranges;
+
+ for (const auto& range : ranges) {
+ if (range.second.end_offset - range.second.start_offset <=
_orc_once_max_read_bytes) {
+ small_ranges.emplace(range.first, range.second);
+ } else {
+ large_ranges.emplace(range.first, range.second);
+ }
+ }
+
+ _build_small_ranges_input_stripe_streams(small_ranges, streams);
+ _build_large_ranges_input_stripe_streams(large_ranges, streams);
+}
+
+void ORCFileInputStream::_build_small_ranges_input_stripe_streams(
Review Comment:
We can optimize this logic by sorting the ranges first, so there is no need to traverse
all of the original ranges each time:
```
// Builds one StripeStreamInputStream per small ORC stream range. All stream
// ranges that lie entirely inside the same merged range share a single
// OrcMergeRangeFileReader created for that merged range.
//
// ranges:  per-stream byte ranges (start_offset/end_offset), keyed by StreamId.
// streams: output map; receives one StripeStreamInputStream per stream whose
//          range is fully covered by some merged range. Every stream created
//          here is also appended to _stripe_streams.
//
// NOTE(review): `merged_ranges` is not declared in this snippet (it is neither
// a parameter nor a local, and members here use a leading underscore), so the
// step that builds the merged ranges appears to have been omitted from the
// suggestion. It must be computed before the outer loop, presumably by
// coalescing entries of sorted_ranges. TODO confirm against the PR's helper.
void ORCFileInputStream::_build_small_ranges_input_stripe_streams(
const std::unordered_map<orc::StreamId, io::PrefetchRange>& ranges,
std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>&
streams) {
// Copy the hash-map entries into a vector ordered by start_offset; the
// std::lower_bound call below requires this ordering.
std::vector<std::pair<orc::StreamId, io::PrefetchRange>>
sorted_ranges(ranges.begin(), ranges.end());
std::sort(sorted_ranges.begin(), sorted_ranges.end(), [](const auto& a,
const auto& b) {
return a.second.start_offset < b.second.start_offset;
});
for (const auto& merged_range : merged_ranges) {
// One shared reader per merged range; every stream placed in this merged
// range below reads through it.
auto merge_range_file_reader =
std::make_shared<OrcMergeRangeFileReader>(_profile,
_file_reader, merged_range);
// Binary search for the first stream range whose start_offset is not
// before merged_range.start_offset. The comparator takes
// (element, value), as std::lower_bound's heterogeneous form requires.
auto it = std::lower_bound(sorted_ranges.begin(),
sorted_ranges.end(), merged_range.start_offset,
[](const auto& pair, uint64_t offset) {
return pair.second.start_offset <
offset;
});
// Walk forward while ranges still start inside this merged range. Because
// sorted_ranges is ordered by start_offset, no later entry can start
// inside it once this condition fails.
for (; it != sorted_ranges.end() && it->second.start_offset <
merged_range.end_offset; ++it) {
// Only ranges fully contained in the merged range get a stream here.
// NOTE(review): a range that starts inside but ends past
// merged_range.end_offset is skipped and gets no entry in `streams`
// from this function. This is presumably impossible if merged ranges
// are unions of whole stream ranges; confirm.
if (it->second.end_offset <= merged_range.end_offset) {
auto stripe_stream_input_stream =
std::make_shared<StripeStreamInputStream>(
getName(), merge_range_file_reader, _statistics,
_io_ctx, _profile);
streams.emplace(it->first, stripe_stream_input_stream);
_stripe_streams.emplace_back(stripe_stream_input_stream);
}
}
}
}
```
##########
be/src/vec/exec/format/orc/vorc_reader.cpp:
##########
@@ -2660,45 +2685,112 @@ MutableColumnPtr
OrcReader::_convert_dict_column_to_string_column(
void ORCFileInputStream::beforeReadStripe(
std::unique_ptr<orc::StripeInformation> current_strip_information,
- std::vector<bool> selected_columns) {
+ std::vector<bool> selected_columns,
Review Comment:
```suggestion
const std::vector<bool>& selected_columns,
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]