github-actions[bot] commented on code in PR #43281:
URL: https://github.com/apache/doris/pull/43281#discussion_r1835606862


##########
be/src/service/arrow_flight/arrow_flight_batch_reader.cpp:
##########
@@ -18,52 +18,276 @@
 #include "service/arrow_flight/arrow_flight_batch_reader.h"
 
 #include <arrow/status.h>
+#include <arrow/type.h>
+#include <gen_cpp/internal_service.pb.h>
+
+#include <utility>
 
-#include "arrow/builder.h"
 #include "runtime/exec_env.h"
+#include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/result_buffer_mgr.h"
+#include "runtime/thread_context.h"
+#include "service/backend_options.h"
+#include "util/arrow/block_convertor.h"
 #include "util/arrow/row_batch.h"
 #include "util/arrow/utils.h"
+#include "util/brpc_client_cache.h"
+#include "util/ref_count_closure.h"
+#include "util/string_util.h"
+#include "vec/core/block.h"
+
+namespace doris::flight {
+
+constexpr size_t BRPC_CONTROLLER_TIMEOUT_MS = 60 * 1000;
+
+ArrowFlightBatchReaderBase::ArrowFlightBatchReaderBase(
+        const std::shared_ptr<QueryStatement>& statement)
+        : _statement(statement) {}
+
+std::shared_ptr<arrow::Schema> ArrowFlightBatchReaderBase::schema() const {
+    return _schema;
+}
 
-namespace doris {
-namespace flight {
+arrow::Status ArrowFlightBatchReaderBase::_return_invalid_status(const std::string& msg) {
+    std::string status_msg =
+            fmt::format("ArrowFlightBatchReader {}, packet_seq={}, result={}:{}, finistId={}", msg,
+                        _packet_seq, _statement->result_addr.hostname, _statement->result_addr.port,
+                        print_id(_statement->query_id));
+    LOG(WARNING) << status_msg;
+    return arrow::Status::Invalid(status_msg);
+}
 
-std::shared_ptr<arrow::Schema> ArrowFlightBatchReader::schema() const {
-    return schema_;
+ArrowFlightBatchReaderBase::~ArrowFlightBatchReaderBase() {
+    VLOG_NOTICE << fmt::format(
+            "ArrowFlightBatchReader finished, packet_seq={}, result_addr={}:{}, finistId={}, "
+            "convert_arrow_batch_timer={}, deserialize_block_timer={}, peak_memory_usage={}",
+            _packet_seq, _statement->result_addr.hostname, _statement->result_addr.port,
+            print_id(_statement->query_id), _convert_arrow_batch_timer, _deserialize_block_timer,
+            _mem_tracker->peak_consumption());
 }
 
-ArrowFlightBatchReader::ArrowFlightBatchReader(std::shared_ptr<QueryStatement> statement,
-                                               std::shared_ptr<arrow::Schema> schema)
-        : statement_(std::move(statement)), schema_(std::move(schema)) {}
+ArrowFlightBatchLocalReader::ArrowFlightBatchLocalReader(
+        const std::shared_ptr<QueryStatement>& statement,
+        const std::shared_ptr<arrow::Schema>& schema,
+        const std::shared_ptr<MemTrackerLimiter>& mem_tracker)
+        : ArrowFlightBatchReaderBase(statement) {
+    _schema = schema;
+    _mem_tracker = mem_tracker;
+}
 
-arrow::Result<std::shared_ptr<ArrowFlightBatchReader>> ArrowFlightBatchReader::Create(
-        const std::shared_ptr<QueryStatement>& statement_) {
+arrow::Result<std::shared_ptr<ArrowFlightBatchLocalReader>> ArrowFlightBatchLocalReader::Create(
+        const std::shared_ptr<QueryStatement>& statement) {
+    DCHECK(statement->result_addr.hostname == BackendOptions::get_localhost());
     // Make sure that FE send the fragment to BE and creates the BufferControlBlock before returning ticket
     // to the ADBC client, so that the schema and control block can be found.
-    auto schema = ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(statement_->query_id);
-    if (schema == nullptr) {
-        ARROW_RETURN_NOT_OK(arrow::Status::Invalid(fmt::format(
-                "Client not found arrow flight schema, maybe query has been canceled, queryid: {}",
-                print_id(statement_->query_id))));
+    std::shared_ptr<arrow::Schema> schema;
+    RETURN_ARROW_STATUS_IF_ERROR(
+            ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(statement->query_id, &schema));
+    std::shared_ptr<MemTrackerLimiter> mem_tracker;
+    RETURN_ARROW_STATUS_IF_ERROR(ExecEnv::GetInstance()->result_mgr()->find_mem_tracker(
+            statement->query_id, &mem_tracker));
+
+    std::shared_ptr<ArrowFlightBatchLocalReader> result(
+            new ArrowFlightBatchLocalReader(statement, schema, mem_tracker));
+    return result;
+}
+
+arrow::Status ArrowFlightBatchLocalReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* out) {
+    // parameter *out not nullptr
+    *out = nullptr;
+    SCOPED_ATTACH_TASK(_mem_tracker);
+    std::shared_ptr<vectorized::Block> result;
+    auto st = ExecEnv::GetInstance()->result_mgr()->fetch_arrow_data(_statement->query_id, &result,
+                                                                     _timezone_obj);
+    st.prepend("ArrowFlightBatchLocalReader fetch arrow data failed");
+    ARROW_RETURN_NOT_OK(to_arrow_status(st));
+    if (result == nullptr) {
+        // eof, normal path end
+        return arrow::Status::OK();
+    }
+
+    {
+        // convert one batch
+        SCOPED_ATOMIC_TIMER(&_convert_arrow_batch_timer);
+        st = convert_to_arrow_batch(*result, _schema, arrow::default_memory_pool(), out,
+                                    _timezone_obj);
+        st.prepend("ArrowFlightBatchLocalReader convert block to arrow batch failed");
+        ARROW_RETURN_NOT_OK(to_arrow_status(st));
+    }
+
+    _packet_seq++;
+    if (*out != nullptr) {
+        VLOG_NOTICE << "ArrowFlightBatchLocalReader read next: " << (*out)->num_rows() << ", "
+                    << (*out)->num_columns() << ", packet_seq: " << _packet_seq;
+    }
+    return arrow::Status::OK();
+}
+
+ArrowFlightBatchRemoteReader::ArrowFlightBatchRemoteReader(
+        const std::shared_ptr<QueryStatement>& statement,
+        const std::shared_ptr<PBackendService_Stub>& stub)
+        : ArrowFlightBatchReaderBase(statement), _brpc_stub(stub), _block(nullptr) {
+    _mem_tracker = MemTrackerLimiter::create_shared(
+            MemTrackerLimiter::Type::QUERY,
+            fmt::format("ArrowFlightBatchRemoteReader#QueryId={}", print_id(_statement->query_id)));
+}
+
+arrow::Result<std::shared_ptr<ArrowFlightBatchRemoteReader>> ArrowFlightBatchRemoteReader::Create(
+        const std::shared_ptr<QueryStatement>& statement) {
+    std::shared_ptr<PBackendService_Stub> stub =
+            ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
+                    statement->result_addr);
+    if (!stub) {
+        std::string msg = fmt::format(
+                "ArrowFlightBatchRemoteReader get rpc stub failed, result_addr={}:{}, finistId={}",
+                statement->result_addr.hostname, statement->result_addr.port,
+                print_id(statement->query_id));
+        LOG(WARNING) << msg;
+        return arrow::Status::Invalid(msg);
     }
-    std::shared_ptr<ArrowFlightBatchReader> result(new ArrowFlightBatchReader(statement_, schema));
+
+    std::shared_ptr<ArrowFlightBatchRemoteReader> result(
+            new ArrowFlightBatchRemoteReader(statement, stub));
+    ARROW_RETURN_NOT_OK(result->init_schema());
     return result;
 }
 
-arrow::Status ArrowFlightBatchReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* out) {
-    // *out not nullptr
+arrow::Status ArrowFlightBatchRemoteReader::_fetch_data(bool first_fetch_for_init) {

Review Comment:
   warning: function '_fetch_data' exceeds recommended size/complexity thresholds [readability-function-size]
   ```cpp
   arrow::Status ArrowFlightBatchRemoteReader::_fetch_data(bool first_fetch_for_init) {
                                               ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/service/arrow_flight/arrow_flight_batch_reader.cpp:154:** 88 lines including whitespace and comments (threshold 80)
   ```cpp
   arrow::Status ArrowFlightBatchRemoteReader::_fetch_data(bool first_fetch_for_init) {
                                               ^
   ```
   
   </details>
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to