yiguolei commented on code in PR #35386: URL: https://github.com/apache/doris/pull/35386#discussion_r1615185821
########## be/src/olap/parallel_scanner_builder.cpp: ########## @@ -32,13 +32,138 @@ template <typename ParentType> Status ParallelScannerBuilder<ParentType>::build_scanners(std::list<VScannerSPtr>& scanners) { RETURN_IF_ERROR(_load()); if (_is_dup_mow_key) { + if (_split_by_segment_size) { + return _build_scanners_by_segment_size(scanners); + } return _build_scanners_by_rowid(scanners); } else { // TODO: support to split by key range return Status::NotSupported("split by key range not supported yet."); } } +template <typename ParentType> +Status ParallelScannerBuilder<ParentType>::_build_scanners_by_segment_size( + std::list<VScannerSPtr>& scanners) { + DCHECK_GE(_bytes_per_scanner, 0); + + for (auto&& [tablet, version] : _tablets) { + DCHECK(_all_rowsets.contains(tablet->tablet_id())); + auto& rowsets = _all_rowsets[tablet->tablet_id()]; + + TabletReader::ReadSource reader_source_with_delete_info; + + if (config::is_cloud_mode()) { + // FIXME(plat1ko): Avoid pointer cast + ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(*tablet); + } + + if (!_state->skip_delete_predicate()) { + RETURN_IF_ERROR(tablet->capture_rs_readers( + {0, version}, &reader_source_with_delete_info.rs_splits, false)); + reader_source_with_delete_info.fill_delete_predicates(); + } + + TabletReader::ReadSource read_source; + + size_t bytes_collected = 0; + for (auto& rowset : rowsets) { + auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset); + RowsetReaderSharedPtr reader; + RETURN_IF_ERROR(beta_rowset->create_reader(&reader)); + const auto rowset_id = beta_rowset->rowset_id(); + + if (beta_rowset->num_rows() == 0) { + continue; + } + + DCHECK(_segments_size.contains(rowset_id)); + auto& segments_size = _segments_size[rowset_id]; + + int segment_start = 0; + auto split = RowSetSplits(reader->clone()); + + const auto segments_count = segments_size.size(); + for (size_t i = 0; i != segments_count; ++i) { + const auto segment_size = segments_size[i]; + const 
bool is_last_segment = i == (segments_count - 1); + const size_t next_segment_size = is_last_segment ? 0 : segments_size[i + 1]; + + RowRanges row_ranges; + + row_ranges.add({0, 0}); + DCHECK_EQ(row_ranges.count(), 0); + split.segment_row_ranges.emplace_back(std::move(row_ranges)); + + bytes_collected += segment_size; + + bool need_more_segments = bytes_collected < _bytes_per_scanner; + if (need_more_segments) { + bool has_less_bytes = bytes_collected <= _bytes_per_scanner * 0.5; + if (is_last_segment) { // last segment of this rowset. + need_more_segments = has_less_bytes; + } else if (has_less_bytes) { + need_more_segments = + (bytes_collected + next_segment_size) <= _bytes_per_scanner * 1.2; + } else { + need_more_segments = + (bytes_collected + next_segment_size) <= _bytes_per_scanner; + } + } + + if (!need_more_segments) { + split.segment_offsets.first = segment_start, + split.segment_offsets.second = i + 1; + + DCHECK_EQ(split.segment_offsets.second - split.segment_offsets.first, + split.segment_row_ranges.size()); + + read_source.rs_splits.emplace_back(std::move(split)); + + scanners.emplace_back( + _build_scanner(tablet, version, _key_ranges, + {std::move(read_source.rs_splits), + reader_source_with_delete_info.delete_predicates})); + + read_source = TabletReader::ReadSource(); + split = RowSetSplits(reader->clone()); + row_ranges = RowRanges(); + + segment_start = i + 1; + bytes_collected = 0; + } + } + + if (bytes_collected > 0) { + split.segment_offsets.first = segment_start; + split.segment_offsets.second = segments_size.size(); + DCHECK_GT(split.segment_offsets.second, split.segment_offsets.first); + DCHECK_EQ(split.segment_row_ranges.size(), + split.segment_offsets.second - split.segment_offsets.first); + read_source.rs_splits.emplace_back(std::move(split)); + } + } // end `for (auto& rowset : rowsets)` + + if (bytes_collected > 0) { + DCHECK_GT(read_source.rs_splits.size(), 0); Review Comment: Is this useful? I think it maybe never hit this code. 
because of line 137. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org