yiguolei commented on code in PR #35386:
URL: https://github.com/apache/doris/pull/35386#discussion_r1615185821


##########
be/src/olap/parallel_scanner_builder.cpp:
##########
@@ -32,13 +32,138 @@ template <typename ParentType>
 Status 
ParallelScannerBuilder<ParentType>::build_scanners(std::list<VScannerSPtr>& 
scanners) {
     RETURN_IF_ERROR(_load());
     if (_is_dup_mow_key) {
+        if (_split_by_segment_size) {
+            return _build_scanners_by_segment_size(scanners);
+        }
         return _build_scanners_by_rowid(scanners);
     } else {
         // TODO: support to split by key range
         return Status::NotSupported("split by key range not supported yet.");
     }
 }
 
+template <typename ParentType>
+Status ParallelScannerBuilder<ParentType>::_build_scanners_by_segment_size(
+        std::list<VScannerSPtr>& scanners) {
+    DCHECK_GE(_bytes_per_scanner, 0);
+
+    for (auto&& [tablet, version] : _tablets) {
+        DCHECK(_all_rowsets.contains(tablet->tablet_id()));
+        auto& rowsets = _all_rowsets[tablet->tablet_id()];
+
+        TabletReader::ReadSource reader_source_with_delete_info;
+
+        if (config::is_cloud_mode()) {
+            // FIXME(plat1ko): Avoid pointer cast
+            
ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(*tablet);
+        }
+
+        if (!_state->skip_delete_predicate()) {
+            RETURN_IF_ERROR(tablet->capture_rs_readers(
+                    {0, version}, &reader_source_with_delete_info.rs_splits, 
false));
+            reader_source_with_delete_info.fill_delete_predicates();
+        }
+
+        TabletReader::ReadSource read_source;
+
+        size_t bytes_collected = 0;
+        for (auto& rowset : rowsets) {
+            auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset);
+            RowsetReaderSharedPtr reader;
+            RETURN_IF_ERROR(beta_rowset->create_reader(&reader));
+            const auto rowset_id = beta_rowset->rowset_id();
+
+            if (beta_rowset->num_rows() == 0) {
+                continue;
+            }
+
+            DCHECK(_segments_size.contains(rowset_id));
+            auto& segments_size = _segments_size[rowset_id];
+
+            int segment_start = 0;
+            auto split = RowSetSplits(reader->clone());
+
+            const auto segments_count = segments_size.size();
+            for (size_t i = 0; i != segments_count; ++i) {
+                const auto segment_size = segments_size[i];
+                const bool is_last_segment = i == (segments_count - 1);
+                const size_t next_segment_size = is_last_segment ? 0 : 
segments_size[i + 1];
+
+                RowRanges row_ranges;
+
+                row_ranges.add({0, 0});
+                DCHECK_EQ(row_ranges.count(), 0);
+                split.segment_row_ranges.emplace_back(std::move(row_ranges));
+
+                bytes_collected += segment_size;
+
+                bool need_more_segments = bytes_collected < _bytes_per_scanner;
+                if (need_more_segments) {
+                    bool has_less_bytes = bytes_collected <= 
_bytes_per_scanner * 0.5;
+                    if (is_last_segment) { // last segment of this rowset.
+                        need_more_segments = has_less_bytes;
+                    } else if (has_less_bytes) {
+                        need_more_segments =
+                                (bytes_collected + next_segment_size) <= 
_bytes_per_scanner * 1.2;
+                    } else {
+                        need_more_segments =
+                                (bytes_collected + next_segment_size) <= 
_bytes_per_scanner;
+                    }
+                }
+
+                if (!need_more_segments) {
+                    split.segment_offsets.first = segment_start,
+                    split.segment_offsets.second = i + 1;
+
+                    DCHECK_EQ(split.segment_offsets.second - 
split.segment_offsets.first,
+                              split.segment_row_ranges.size());
+
+                    read_source.rs_splits.emplace_back(std::move(split));
+
+                    scanners.emplace_back(
+                            _build_scanner(tablet, version, _key_ranges,
+                                           {std::move(read_source.rs_splits),
+                                            
reader_source_with_delete_info.delete_predicates}));
+
+                    read_source = TabletReader::ReadSource();
+                    split = RowSetSplits(reader->clone());
+                    row_ranges = RowRanges();
+
+                    segment_start = i + 1;
+                    bytes_collected = 0;
+                }
+            }
+
+            if (bytes_collected > 0) {
+                split.segment_offsets.first = segment_start;
+                split.segment_offsets.second = segments_size.size();
+                DCHECK_GT(split.segment_offsets.second, 
split.segment_offsets.first);
+                DCHECK_EQ(split.segment_row_ranges.size(),
+                          split.segment_offsets.second - 
split.segment_offsets.first);
+                read_source.rs_splits.emplace_back(std::move(split));
+            }
+        } // end `for (auto& rowset : rowsets)`
+
+        if (bytes_collected > 0) {
+            DCHECK_GT(read_source.rs_splits.size(), 0);

Review Comment:
   Is this useful? I think this code may never be hit, because of line 137.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to