github-actions[bot] commented on code in PR #43281: URL: https://github.com/apache/doris/pull/43281#discussion_r1835606862
########## be/src/service/arrow_flight/arrow_flight_batch_reader.cpp: ########## @@ -18,52 +18,276 @@ #include "service/arrow_flight/arrow_flight_batch_reader.h" #include <arrow/status.h> +#include <arrow/type.h> +#include <gen_cpp/internal_service.pb.h> + +#include <utility> -#include "arrow/builder.h" #include "runtime/exec_env.h" +#include "runtime/memory/mem_tracker_limiter.h" #include "runtime/result_buffer_mgr.h" +#include "runtime/thread_context.h" +#include "service/backend_options.h" +#include "util/arrow/block_convertor.h" #include "util/arrow/row_batch.h" #include "util/arrow/utils.h" +#include "util/brpc_client_cache.h" +#include "util/ref_count_closure.h" +#include "util/string_util.h" +#include "vec/core/block.h" + +namespace doris::flight { + +constexpr size_t BRPC_CONTROLLER_TIMEOUT_MS = 60 * 1000; + +ArrowFlightBatchReaderBase::ArrowFlightBatchReaderBase( + const std::shared_ptr<QueryStatement>& statement) + : _statement(statement) {} + +std::shared_ptr<arrow::Schema> ArrowFlightBatchReaderBase::schema() const { + return _schema; +} -namespace doris { -namespace flight { +arrow::Status ArrowFlightBatchReaderBase::_return_invalid_status(const std::string& msg) { + std::string status_msg = + fmt::format("ArrowFlightBatchReader {}, packet_seq={}, result={}:{}, finistId={}", msg, + _packet_seq, _statement->result_addr.hostname, _statement->result_addr.port, + print_id(_statement->query_id)); + LOG(WARNING) << status_msg; + return arrow::Status::Invalid(status_msg); +} -std::shared_ptr<arrow::Schema> ArrowFlightBatchReader::schema() const { - return schema_; +ArrowFlightBatchReaderBase::~ArrowFlightBatchReaderBase() { + VLOG_NOTICE << fmt::format( + "ArrowFlightBatchReader finished, packet_seq={}, result_addr={}:{}, finistId={}, " + "convert_arrow_batch_timer={}, deserialize_block_timer={}, peak_memory_usage={}", + _packet_seq, _statement->result_addr.hostname, _statement->result_addr.port, + print_id(_statement->query_id), _convert_arrow_batch_timer, _deserialize_block_timer, + _mem_tracker->peak_consumption()); } -ArrowFlightBatchReader::ArrowFlightBatchReader(std::shared_ptr<QueryStatement> statement, - std::shared_ptr<arrow::Schema> schema) - : statement_(std::move(statement)), schema_(std::move(schema)) {} +ArrowFlightBatchLocalReader::ArrowFlightBatchLocalReader( + const std::shared_ptr<QueryStatement>& statement, + const std::shared_ptr<arrow::Schema>& schema, + const std::shared_ptr<MemTrackerLimiter>& mem_tracker) + : ArrowFlightBatchReaderBase(statement) { + _schema = schema; + _mem_tracker = mem_tracker; +} -arrow::Result<std::shared_ptr<ArrowFlightBatchReader>> ArrowFlightBatchReader::Create( - const std::shared_ptr<QueryStatement>& statement_) { +arrow::Result<std::shared_ptr<ArrowFlightBatchLocalReader>> ArrowFlightBatchLocalReader::Create( + const std::shared_ptr<QueryStatement>& statement) { + DCHECK(statement->result_addr.hostname == BackendOptions::get_localhost()); // Make sure that FE send the fragment to BE and creates the BufferControlBlock before returning ticket // to the ADBC client, so that the schema and control block can be found. - auto schema = ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(statement_->query_id); - if (schema == nullptr) { - ARROW_RETURN_NOT_OK(arrow::Status::Invalid(fmt::format( - "Client not found arrow flight schema, maybe query has been canceled, queryid: {}", - print_id(statement_->query_id)))); + std::shared_ptr<arrow::Schema> schema; + RETURN_ARROW_STATUS_IF_ERROR( + ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(statement->query_id, &schema)); + std::shared_ptr<MemTrackerLimiter> mem_tracker; + RETURN_ARROW_STATUS_IF_ERROR(ExecEnv::GetInstance()->result_mgr()->find_mem_tracker( + statement->query_id, &mem_tracker)); + + std::shared_ptr<ArrowFlightBatchLocalReader> result( + new ArrowFlightBatchLocalReader(statement, schema, mem_tracker)); + return result; +} + +arrow::Status ArrowFlightBatchLocalReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* out) { + // parameter *out not nullptr + *out = nullptr; + SCOPED_ATTACH_TASK(_mem_tracker); + std::shared_ptr<vectorized::Block> result; + auto st = ExecEnv::GetInstance()->result_mgr()->fetch_arrow_data(_statement->query_id, &result, + _timezone_obj); + st.prepend("ArrowFlightBatchLocalReader fetch arrow data failed"); + ARROW_RETURN_NOT_OK(to_arrow_status(st)); + if (result == nullptr) { + // eof, normal path end + return arrow::Status::OK(); + } + + { + // convert one batch + SCOPED_ATOMIC_TIMER(&_convert_arrow_batch_timer); + st = convert_to_arrow_batch(*result, _schema, arrow::default_memory_pool(), out, + _timezone_obj); + st.prepend("ArrowFlightBatchLocalReader convert block to arrow batch failed"); + ARROW_RETURN_NOT_OK(to_arrow_status(st)); + } + + _packet_seq++; + if (*out != nullptr) { + VLOG_NOTICE << "ArrowFlightBatchLocalReader read next: " << (*out)->num_rows() << ", " + << (*out)->num_columns() << ", packet_seq: " << _packet_seq; + } + return arrow::Status::OK(); +} + +ArrowFlightBatchRemoteReader::ArrowFlightBatchRemoteReader( + const std::shared_ptr<QueryStatement>& statement, + const std::shared_ptr<PBackendService_Stub>& stub) + : ArrowFlightBatchReaderBase(statement), _brpc_stub(stub), _block(nullptr) { + _mem_tracker = MemTrackerLimiter::create_shared( + MemTrackerLimiter::Type::QUERY, + fmt::format("ArrowFlightBatchRemoteReader#QueryId={}", print_id(_statement->query_id))); +} + +arrow::Result<std::shared_ptr<ArrowFlightBatchRemoteReader>> ArrowFlightBatchRemoteReader::Create( + const std::shared_ptr<QueryStatement>& statement) { + std::shared_ptr<PBackendService_Stub> stub = + ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client( + statement->result_addr); + if (!stub) { + std::string msg = fmt::format( + "ArrowFlightBatchRemoteReader get rpc stub failed, result_addr={}:{}, finistId={}", + statement->result_addr.hostname, statement->result_addr.port, + print_id(statement->query_id)); + LOG(WARNING) << msg; + return arrow::Status::Invalid(msg); } - std::shared_ptr<ArrowFlightBatchReader> result(new ArrowFlightBatchReader(statement_, schema)); + + std::shared_ptr<ArrowFlightBatchRemoteReader> result( + new ArrowFlightBatchRemoteReader(statement, stub)); + ARROW_RETURN_NOT_OK(result->init_schema()); return result; } -arrow::Status ArrowFlightBatchReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* out) { - // *out not nullptr +arrow::Status ArrowFlightBatchRemoteReader::_fetch_data(bool first_fetch_for_init) { Review Comment: warning: function '_fetch_data' exceeds recommended size/complexity thresholds [readability-function-size] ```cpp arrow::Status ArrowFlightBatchRemoteReader::_fetch_data(bool first_fetch_for_init) { ^ ``` <details> <summary>Additional context</summary> **be/src/service/arrow_flight/arrow_flight_batch_reader.cpp:154:** 88 lines including whitespace and comments (threshold 80) ```cpp arrow::Status ArrowFlightBatchRemoteReader::_fetch_data(bool first_fetch_for_init) { ^ ``` </details> -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org