This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d2922aa4031 [bugfix](arrowflight) should call done run in on_xxx 
method to make work in async mode (#60282)
d2922aa4031 is described below

commit d2922aa4031818bed041a271d1315846c08182d4
Author: yiguolei <[email protected]>
AuthorDate: Thu Jan 29 08:51:37 2026 +0800

    [bugfix](arrowflight) should call done run in on_xxx method to make work in 
async mode (#60282)
    
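    The Arrow Flight fetch path works like the FE fetching data from the BE:
    if no data is available yet, the request is added to a waiting queue and
    answered later. However, fetch_arrow_data wrapped `done` in a
    brpc::ClosureGuard, so once the request had been added to the waiting
    queue and the work-pool task returned, the guard invoked done->Run()
    automatically, before any data had been filled in.
    At that point the response received by the client is undefined, yet its
    status is unexpectedly OK. As a result, the client attempts to read the
    block value, leading to a dirty read.

    Fix: remove the ClosureGuard, pass `done` into GetArrowResultBatchCtx,
    and call done->Run() at the end of on_data, on_close and on_failure,
    i.e. only after the response has actually been populated.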
---
 be/src/service/internal_service.cpp                |  3 +--
 be/src/vec/sink/varrow_flight_result_writer.cpp    |  4 +++
 be/src/vec/sink/varrow_flight_result_writer.h      |  4 ++-
 .../vec/sink/arrow_result_block_buffer_test.cpp    | 29 +++++++++++++++++-----
 be/test/vec/sink/get_result_batch_test.cpp         |  6 +++--
 5 files changed, 35 insertions(+), 11 deletions(-)

diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index f68716eebc8..f118e8b4122 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -671,8 +671,7 @@ void 
PInternalService::fetch_arrow_data(google::protobuf::RpcController* control
                                         PFetchArrowDataResult* result,
                                         google::protobuf::Closure* done) {
     bool ret = _arrow_flight_work_pool.try_offer([request, result, done]() {
-        brpc::ClosureGuard closure_guard(done);
-        auto ctx = vectorized::GetArrowResultBatchCtx::create_shared(result);
+        auto ctx = vectorized::GetArrowResultBatchCtx::create_shared(result, 
done);
         TUniqueId unique_id = UniqueId(request->finst_id()).to_thrift(); // 
query_id or instance_id
         std::shared_ptr<vectorized::ArrowFlightResultBlockBuffer> arrow_buffer;
         auto st = ExecEnv::GetInstance()->result_mgr()->find_buffer(unique_id, 
arrow_buffer);
diff --git a/be/src/vec/sink/varrow_flight_result_writer.cpp 
b/be/src/vec/sink/varrow_flight_result_writer.cpp
index b8280785546..6c9c393ae4a 100644
--- a/be/src/vec/sink/varrow_flight_result_writer.cpp
+++ b/be/src/vec/sink/varrow_flight_result_writer.cpp
@@ -33,6 +33,7 @@ namespace doris::vectorized {
 void GetArrowResultBatchCtx::on_failure(const Status& status) {
     DCHECK(!status.ok()) << "status is ok, errmsg=" << status;
     status.to_protobuf(_result->mutable_status());
+    _done->Run();
 }
 
 void GetArrowResultBatchCtx::on_close(int64_t packet_seq, int64_t /* 
returned_rows */) {
@@ -40,6 +41,7 @@ void GetArrowResultBatchCtx::on_close(int64_t packet_seq, 
int64_t /* returned_ro
     status.to_protobuf(_result->mutable_status());
     _result->set_packet_seq(packet_seq);
     _result->set_eos(true);
+    _done->Run();
 }
 
 Status GetArrowResultBatchCtx::on_data(const 
std::shared_ptr<vectorized::Block>& block,
@@ -72,6 +74,8 @@ Status GetArrowResultBatchCtx::on_data(const 
std::shared_ptr<vectorized::Block>&
         _result->clear_block();
     }
     st.to_protobuf(_result->mutable_status());
+
+    _done->Run();
     return Status::OK();
 }
 
diff --git a/be/src/vec/sink/varrow_flight_result_writer.h 
b/be/src/vec/sink/varrow_flight_result_writer.h
index f84c3a12fa8..d1f5e21404a 100644
--- a/be/src/vec/sink/varrow_flight_result_writer.h
+++ b/be/src/vec/sink/varrow_flight_result_writer.h
@@ -35,7 +35,8 @@ class GetArrowResultBatchCtx {
 public:
     using ResultType = vectorized::Block;
     ENABLE_FACTORY_CREATOR(GetArrowResultBatchCtx)
-    GetArrowResultBatchCtx(PFetchArrowDataResult* result) : _result(result) {}
+    GetArrowResultBatchCtx(PFetchArrowDataResult* result, 
google::protobuf::Closure* done)
+            : _result(result), _done(done) {}
 #ifdef BE_TEST
     GetArrowResultBatchCtx() = default;
 #endif
@@ -53,6 +54,7 @@ private:
     int32_t _max_msg_size = std::numeric_limits<int32_t>::max();
 #endif
     PFetchArrowDataResult* _result = nullptr;
+    google::protobuf::Closure* _done = nullptr;
 };
 
 class ArrowFlightResultBlockBuffer final : public 
ResultBlockBuffer<GetArrowResultBatchCtx> {
diff --git a/be/test/vec/sink/arrow_result_block_buffer_test.cpp 
b/be/test/vec/sink/arrow_result_block_buffer_test.cpp
index 48718361185..7fb1d8adf1f 100644
--- a/be/test/vec/sink/arrow_result_block_buffer_test.cpp
+++ b/be/test/vec/sink/arrow_result_block_buffer_test.cpp
@@ -34,6 +34,14 @@
 
 namespace doris::vectorized {
 
+class MockClosure : public google::protobuf::Closure {
+public:
+    MockClosure() {}
+    MockClosure(std::function<void()> cb) : _cb(cb) {}
+    void Run() override { _cb(); }
+
+    std::function<void()> _cb;
+};
 class ArrowResultBlockBufferTest : public ::testing::Test {
 public:
     ArrowResultBlockBufferTest() = default;
@@ -44,8 +52,9 @@ class MockGetArrowResultBatchCtx : public 
GetArrowResultBatchCtx {
 public:
     ENABLE_FACTORY_CREATOR(MockGetArrowResultBatchCtx)
     MockGetArrowResultBatchCtx(std::function<void()> fail_cb, 
std::function<void()> close_cb,
-                               std::function<void()> data_cb, 
PFetchArrowDataResult* result)
-            : GetArrowResultBatchCtx(result),
+                               std::function<void()> data_cb, 
PFetchArrowDataResult* result,
+                               google::protobuf::Closure* done)
+            : GetArrowResultBatchCtx(result, done),
               _fail_cb(fail_cb),
               _close_cb(close_cb),
               _data_cb(data_cb) {}
@@ -78,9 +87,11 @@ TEST_F(ArrowResultBlockBufferTest, 
TestArrowResultBlockBuffer) {
     ArrowFlightResultBlockBuffer buffer(TUniqueId(), &state, schema, 
buffer_size);
     buffer.set_dependency(ins_id, dep);
     PFetchArrowDataResult presult;
+
+    MockClosure done([&]() -> void { std::cout << "cb" << std::endl; });
     std::shared_ptr<GetArrowResultBatchCtx> ctx = 
MockGetArrowResultBatchCtx::create_shared(
             [&]() -> void { fail = true; }, [&]() -> void { close = true; },
-            [&]() -> void { data = true; }, &presult);
+            [&]() -> void { data = true; }, &presult, &done);
 
     {
         auto num_rows = 2;
@@ -201,9 +212,11 @@ TEST_F(ArrowResultBlockBufferTest, 
TestCancelArrowResultBlockBuffer) {
     ArrowFlightResultBlockBuffer buffer(TUniqueId(), &state, schema, 
buffer_size);
     buffer.set_dependency(ins_id, dep);
     PFetchArrowDataResult presult;
+
+    MockClosure done([&]() -> void { std::cout << "cb" << std::endl; });
     std::shared_ptr<GetArrowResultBatchCtx> ctx = 
MockGetArrowResultBatchCtx::create_shared(
             [&]() -> void { fail = true; }, [&]() -> void { close = true; },
-            [&]() -> void { data = true; }, &presult);
+            [&]() -> void { data = true; }, &presult, &done);
 
     {
         EXPECT_TRUE(buffer.get_batch(ctx).ok());
@@ -273,9 +286,11 @@ TEST_F(ArrowResultBlockBufferTest, TestErrorClose) {
     ArrowFlightResultBlockBuffer buffer(TUniqueId(), &state, schema, 
buffer_size);
     buffer.set_dependency(ins_id, dep);
     PFetchArrowDataResult presult;
+
+    MockClosure done([&]() -> void { std::cout << "cb" << std::endl; });
     std::shared_ptr<GetArrowResultBatchCtx> ctx = 
MockGetArrowResultBatchCtx::create_shared(
             [&]() -> void { fail = true; }, [&]() -> void { close = true; },
-            [&]() -> void { data = true; }, &presult);
+            [&]() -> void { data = true; }, &presult, &done);
 
     {
         EXPECT_TRUE(buffer.get_batch(ctx).ok());
@@ -330,9 +345,11 @@ TEST_F(ArrowResultBlockBufferTest, 
TestArrowResultSerializeFailure) {
     ArrowFlightResultBlockBuffer buffer(TUniqueId(), &state, schema, 
buffer_size);
     buffer.set_dependency(ins_id, dep);
     PFetchArrowDataResult presult;
+
+    MockClosure done([&]() -> void { std::cout << "cb" << std::endl; });
     std::shared_ptr<GetArrowResultBatchCtx> ctx = 
MockGetArrowResultBatchCtx::create_shared(
             [&]() -> void { fail = true; }, [&]() -> void { close = true; },
-            [&]() -> void { data = true; }, &presult);
+            [&]() -> void { data = true; }, &presult, &done);
 
     {
         auto num_rows = 2;
diff --git a/be/test/vec/sink/get_result_batch_test.cpp 
b/be/test/vec/sink/get_result_batch_test.cpp
index 82eff9dd289..b820bd43ecf 100644
--- a/be/test/vec/sink/get_result_batch_test.cpp
+++ b/be/test/vec/sink/get_result_batch_test.cpp
@@ -40,10 +40,10 @@ public:
 
 class MockClosure : public google::protobuf::Closure {
 public:
+    MockClosure() {}
     MockClosure(std::function<void()> cb) : _cb(cb) {}
     void Run() override { _cb(); }
 
-private:
     std::function<void()> _cb;
 };
 
@@ -126,7 +126,9 @@ TEST_F(GetResultBatchCtxTest, TestGetResultBatchCtx) {
 
 TEST_F(GetResultBatchCtxTest, TestGetArrowResultBatchCtx) {
     PFetchArrowDataResult result;
-    auto ctx = GetArrowResultBatchCtx::create_shared(&result);
+    MockClosure closure;
+    closure._cb = [&]() { std::cout << "cb" << std::endl; };
+    auto ctx = GetArrowResultBatchCtx::create_shared(&result, &closure);
 
     {
         // on_failure


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to