This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 8a6c5d8d7a8 branch-4.1: [refactor](be) Remove scanner eos flag #63578 (#63731)
8a6c5d8d7a8 is described below

commit 8a6c5d8d7a8135b13b2c29649261b13a6ed67c37
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu May 28 13:30:49 2026 +0800

    branch-4.1: [refactor](be) Remove scanner eos flag #63578 (#63731)
    
    Cherry-picked from #63578
    
    Co-authored-by: Jerry Hu <[email protected]>
---
 be/src/exec/scan/scanner.cpp                       | 53 ++++++++++------------
 be/src/exec/scan/scanner.h                         |  1 -
 be/test/exec/scan/scanner_late_arrival_rf_test.cpp | 52 ++++++++++++++++++++-
 3 files changed, 75 insertions(+), 31 deletions(-)

diff --git a/be/src/exec/scan/scanner.cpp b/be/src/exec/scan/scanner.cpp
index 4fc0d445616..9304fc6860a 100644
--- a/be/src/exec/scan/scanner.cpp
+++ b/be/src/exec/scan/scanner.cpp
@@ -81,39 +81,36 @@ Status Scanner::get_block_after_projects(RuntimeState* 
state, Block* block, bool
     
SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().vscanner_get_block);
     auto& row_descriptor = _local_state->_parent->row_descriptor();
     if (_output_row_descriptor) {
-        if (_alreay_eos) {
-            *eos = true;
-            _padding_block.swap(_origin_block);
-        } else {
-            
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
-            const auto min_batch_size = std::max(state->batch_size() / 2, 1);
-            const auto block_max_bytes = state->preferred_block_size_bytes();
-            while (_padding_block.rows() < min_batch_size &&
-                   _padding_block.bytes() < block_max_bytes && !*eos) {
-                RETURN_IF_ERROR(get_block(state, &_origin_block, eos));
-                if (_origin_block.rows() >= min_batch_size) {
-                    break;
-                }
+        
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
+        const auto min_batch_size = std::max(state->batch_size() / 2, 1);
+        const auto block_max_bytes = state->preferred_block_size_bytes();
+        while (_padding_block.rows() < min_batch_size && 
_padding_block.bytes() < block_max_bytes &&
+               !*eos) {
+            RETURN_IF_ERROR(get_block(state, &_origin_block, eos));
+            if (*eos) {
+                // For the final block, merge any padding directly and return 
eos in this call.
+                // The merged tail can be larger than the target batch, but 
each source block is
+                // already bounded by the lower scanner.
+                RETURN_IF_ERROR(_merge_padding_block());
+                
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
+                break;
+            }
+            if (_origin_block.rows() >= min_batch_size) {
+                break;
+            }
 
-                if (_origin_block.rows() + _padding_block.rows() <= 
state->batch_size() &&
-                    _origin_block.bytes() + _padding_block.bytes() <= 
block_max_bytes) {
-                    RETURN_IF_ERROR(_merge_padding_block());
-                    
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
-                } else {
-                    if (_origin_block.rows() < _padding_block.rows()) {
-                        _padding_block.swap(_origin_block);
-                    }
-                    break;
+            if (_origin_block.rows() + _padding_block.rows() <= 
state->batch_size() &&
+                _origin_block.bytes() + _padding_block.bytes() <= 
block_max_bytes) {
+                RETURN_IF_ERROR(_merge_padding_block());
+                
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
+            } else {
+                if (_origin_block.rows() < _padding_block.rows()) {
+                    _padding_block.swap(_origin_block);
                 }
+                break;
             }
         }
 
-        // first output the origin block change eos = false, next time output 
padding block
-        // set the eos to true
-        if (*eos && !_padding_block.empty() && !_origin_block.empty()) {
-            _alreay_eos = true;
-            *eos = false;
-        }
         if (_origin_block.empty() && !_padding_block.empty()) {
             _padding_block.swap(_origin_block);
         }
diff --git a/be/src/exec/scan/scanner.h b/be/src/exec/scan/scanner.h
index 4a8a7739ff8..0f5b7ffa850 100644
--- a/be/src/exec/scan/scanner.h
+++ b/be/src/exec/scan/scanner.h
@@ -216,7 +216,6 @@ protected:
     std::vector<VExprContextSPtrs> _intermediate_projections;
     Block _origin_block;
     Block _padding_block;
-    bool _alreay_eos = false;
 
     VExprContextSPtrs _common_expr_ctxs_push_down;
 
diff --git a/be/test/exec/scan/scanner_late_arrival_rf_test.cpp 
b/be/test/exec/scan/scanner_late_arrival_rf_test.cpp
index f1e21ebc4c3..0d31b694951 100644
--- a/be/test/exec/scan/scanner_late_arrival_rf_test.cpp
+++ b/be/test/exec/scan/scanner_late_arrival_rf_test.cpp
@@ -18,6 +18,9 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include <list>
+
+#include "common/object_pool.h"
 #include "core/data_type/data_type_factory.hpp"
 #include "core/data_type/data_type_number.h"
 #include "exec/operator/mock_scan_operator.h"
@@ -28,6 +31,10 @@
 #include "exec/scan/scanner.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
+#include "testutil/column_helper.h"
+#include "testutil/mock/mock_descriptors.h"
+#include "testutil/mock/mock_runtime_state.h"
+#include "testutil/mock/mock_slot_ref.h"
 
 namespace doris {
 
@@ -40,11 +47,22 @@ public:
                 RuntimeProfile* profile)
             : Scanner(state, local_state, limit, profile) {}
 
+    void add_block(Block block) { _blocks.push_back(std::move(block)); }
+
 protected:
-    Status _get_block_impl(RuntimeState* /*state*/, Block* /*block*/, bool* 
eof) override {
-        *eof = true;
+    Status _get_block_impl(RuntimeState* /*state*/, Block* block, bool* eof) 
override {
+        if (_blocks.empty()) {
+            *eof = true;
+            return Status::OK();
+        }
+        *eof = false;
+        block->swap(_blocks.front());
+        _blocks.pop_front();
         return Status::OK();
     }
+
+private:
+    std::list<Block> _blocks;
 };
 
 class ScannerLateArrivalRfTest : public RuntimeFilterTest {
@@ -117,4 +135,34 @@ TEST_F(ScannerLateArrivalRfTest, 
applied_rf_num_advances_after_late_arrival) {
     ASSERT_TRUE(scanner->_conjuncts.empty());
 }
 
+TEST(ScannerProjectionTest, 
merges_padding_block_when_limit_eos_without_extra_flag) {
+    ObjectPool pool;
+    auto data_type = std::make_shared<DataTypeInt32>();
+    auto row_descriptor = MockRowDescriptor({data_type}, &pool);
+
+    MockRuntimeState state;
+    state._batch_size = 6;
+
+    auto op = std::make_shared<MockScanOperatorX>();
+    op->_row_descriptor = row_descriptor;
+    op->_output_row_descriptor =
+            std::make_unique<MockRowDescriptor>(std::vector<DataTypePtr> 
{data_type}, &pool);
+    op->_output_tuple_desc = 
op->_output_row_descriptor->tuple_descriptors()[0];
+
+    auto local_state = std::make_shared<MockScanLocalState>(&state, op.get());
+    local_state->_projections = MockSlotRef::create_mock_contexts(0, 
data_type);
+
+    RuntimeProfile profile("scanner");
+    TestScanner scanner(&state, local_state.get(), 7, &profile);
+    ASSERT_TRUE(scanner.init(&state, {}).ok());
+    scanner.add_block(ColumnHelper::create_block<DataTypeInt32>({0, 1}));
+    scanner.add_block(ColumnHelper::create_block<DataTypeInt32>({2, 3, 4, 5, 
6}));
+
+    Block first_output;
+    bool eos = false;
+    ASSERT_TRUE(scanner.get_block_after_projects(&state, &first_output, 
&eos).ok());
+    EXPECT_TRUE(eos);
+    EXPECT_EQ(first_output.rows(), 7);
+}
+
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to