dataroaring commented on code in PR #19490:
URL: https://github.com/apache/doris/pull/19490#discussion_r1195200855


##########
be/src/vec/olap/vertical_merge_iterator.h:
##########
@@ -299,6 +299,58 @@ class VerticalHeapMergeIterator : public RowwiseIterator {
     std::vector<RowLocation> _block_row_locations;
 };
 
+// --------------- VerticalFifoMergeIterator ------------- //
+class VerticalFifoMergeIterator : public RowwiseIterator {
+public:
+    // VerticalFifoMergeIterator takes the ownership of input iterators
+    VerticalFifoMergeIterator(std::vector<RowwiseIteratorUPtr>&& iters,
+                              std::vector<bool> iterator_init_flags,
+                              std::vector<RowsetId> rowset_ids, size_t 
ori_return_cols,
+                              KeysType keys_type, int32_t seq_col_idx,
+                              RowSourcesBuffer* row_sources_buf)
+            : _origin_iters(std::move(iters)),
+              _iterator_init_flags(iterator_init_flags),
+              _rowset_ids(rowset_ids),
+              _ori_return_cols(ori_return_cols),
+              _keys_type(keys_type),
+              _seq_col_idx(seq_col_idx),
+              _row_sources_buf(row_sources_buf) {}
+
+    ~VerticalFifoMergeIterator() override {}
+
+    Status init(const StorageReadOptions& opts) override;
+    Status next_batch(Block* block) override;
+    const Schema& schema() const override { return *_schema; }
+    uint64_t merged_rows() const override { return _merged_rows; }
+    Status current_block_row_locations(std::vector<RowLocation>* 
block_row_locations) override {
+        DCHECK(_record_rowids);
+        *block_row_locations = _block_row_locations;
+        return Status::OK();
+    }
+
+private:
+    int _get_size(Block* block) { return block->rows(); }
+
+private:
+    // It will be released after '_merge_heap' has been built.
+    std::vector<RowwiseIteratorUPtr> _origin_iters;
+    std::vector<bool> _iterator_init_flags;
+    std::vector<RowsetId> _rowset_ids;
+    size_t _ori_return_cols;
+
+    const Schema* _schema = nullptr;
+
+    VerticalMergeIteratorContext* _cur_iter_ctx;

Review Comment:
   please use unique_ptr



##########
be/src/vec/olap/vertical_merge_iterator.cpp:
##########
@@ -504,6 +504,103 @@ Status VerticalHeapMergeIterator::init(const 
StorageReadOptions& opts) {
     return Status::OK();
 }
 
+//  ----------------  VerticalFifoMergeIterator  -------------  //
+Status VerticalFifoMergeIterator::next_batch(Block* block) {
+    size_t row_idx = 0;
+    std::vector<RowSource> tmp_row_sources;
+    if (UNLIKELY(_record_rowids)) {
+        _block_row_locations.resize(_block_row_max);
+    }
+    while (_get_size(block) < _block_row_max) {
+        if (_cur_iter_ctx == nullptr) {
+            VLOG_NOTICE << "_merge_list empty";
+            break;
+        }
+
+        auto ctx = _cur_iter_ctx;
+        tmp_row_sources.emplace_back(ctx->order(), false);
+
+        // Fifo only for duplicate no key
+        ctx->add_cur_batch();
+        if (UNLIKELY(_record_rowids)) {
+            _block_row_locations[row_idx] = ctx->current_row_location();
+        }
+        row_idx++;
+        if (ctx->is_cur_block_finished() || row_idx >= _block_row_max) {
+            // current block finished, ctx not advance
+            // so copy start_idx = (_index_in_block - _cur_batch_num + 1)
+            ctx->copy_rows(block, false);
+        }
+
+        RETURN_IF_ERROR(ctx->advance());
+        if (!ctx->valid()) {
+            _cur_iter_ctx = nullptr;
+            // push next iterator in same rowset into heap
+            auto cur_order = ctx->order();
+            while (cur_order + 1 < _iterator_init_flags.size()) {
+                auto& next_iter = _origin_iters[cur_order + 1];
+                VerticalMergeIteratorContext* next_ctx = new 
VerticalMergeIteratorContext(

Review Comment:
   please use unique_ptr



##########
be/src/vec/olap/vertical_merge_iterator.cpp:
##########
@@ -504,6 +504,103 @@ Status VerticalHeapMergeIterator::init(const 
StorageReadOptions& opts) {
     return Status::OK();
 }
 
+//  ----------------  VerticalFifoMergeIterator  -------------  //
+Status VerticalFifoMergeIterator::next_batch(Block* block) {
+    size_t row_idx = 0;
+    std::vector<RowSource> tmp_row_sources;
+    if (UNLIKELY(_record_rowids)) {
+        _block_row_locations.resize(_block_row_max);
+    }
+    while (_get_size(block) < _block_row_max) {
+        if (_cur_iter_ctx == nullptr) {
+            VLOG_NOTICE << "_merge_list empty";
+            break;
+        }
+
+        auto ctx = _cur_iter_ctx;
+        tmp_row_sources.emplace_back(ctx->order(), false);
+
+        // Fifo only for duplicate no key
+        ctx->add_cur_batch();
+        if (UNLIKELY(_record_rowids)) {
+            _block_row_locations[row_idx] = ctx->current_row_location();
+        }
+        row_idx++;
+        if (ctx->is_cur_block_finished() || row_idx >= _block_row_max) {
+            // current block finished, ctx not advance
+            // so copy start_idx = (_index_in_block - _cur_batch_num + 1)
+            ctx->copy_rows(block, false);
+        }
+
+        RETURN_IF_ERROR(ctx->advance());
+        if (!ctx->valid()) {
+            _cur_iter_ctx = nullptr;
+            // push next iterator in same rowset into heap
+            auto cur_order = ctx->order();
+            while (cur_order + 1 < _iterator_init_flags.size()) {

Review Comment:
   A for loop deliever bette readability.



##########
be/src/vec/olap/vertical_merge_iterator.h:
##########
@@ -299,6 +299,58 @@ class VerticalHeapMergeIterator : public RowwiseIterator {
     std::vector<RowLocation> _block_row_locations;
 };
 
+// --------------- VerticalFifoMergeIterator ------------- //
+class VerticalFifoMergeIterator : public RowwiseIterator {
+public:
+    // VerticalFifoMergeIterator takes the ownership of input iterators
+    VerticalFifoMergeIterator(std::vector<RowwiseIteratorUPtr>&& iters,
+                              std::vector<bool> iterator_init_flags,
+                              std::vector<RowsetId> rowset_ids, size_t 
ori_return_cols,
+                              KeysType keys_type, int32_t seq_col_idx,
+                              RowSourcesBuffer* row_sources_buf)
+            : _origin_iters(std::move(iters)),
+              _iterator_init_flags(iterator_init_flags),
+              _rowset_ids(rowset_ids),
+              _ori_return_cols(ori_return_cols),
+              _keys_type(keys_type),
+              _seq_col_idx(seq_col_idx),
+              _row_sources_buf(row_sources_buf) {}
+
+    ~VerticalFifoMergeIterator() override {}
+
+    Status init(const StorageReadOptions& opts) override;
+    Status next_batch(Block* block) override;
+    const Schema& schema() const override { return *_schema; }
+    uint64_t merged_rows() const override { return _merged_rows; }
+    Status current_block_row_locations(std::vector<RowLocation>* 
block_row_locations) override {
+        DCHECK(_record_rowids);
+        *block_row_locations = _block_row_locations;
+        return Status::OK();
+    }
+
+private:
+    int _get_size(Block* block) { return block->rows(); }
+
+private:
+    // It will be released after '_merge_heap' has been built.
+    std::vector<RowwiseIteratorUPtr> _origin_iters;
+    std::vector<bool> _iterator_init_flags;
+    std::vector<RowsetId> _rowset_ids;
+    size_t _ori_return_cols;
+
+    const Schema* _schema = nullptr;
+
+    VerticalMergeIteratorContext* _cur_iter_ctx;
+    int _block_row_max = 0;
+    KeysType _keys_type;
+    int32_t _seq_col_idx = -1;
+    RowSourcesBuffer* _row_sources_buf;
+    uint32_t _merged_rows = 0;
+    StorageReadOptions _opts;
+    bool _record_rowids = false;

Review Comment:
   please use unique_ptr.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to