github-actions[bot] commented on code in PR #43281: URL: https://github.com/apache/doris/pull/43281#discussion_r1837993210
########## be/src/runtime/buffer_control_block.cpp: ########## @@ -191,56 +267,132 @@ void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) { ctx->on_data(result, _packet_num); _packet_num++; - _update_dependency(); return; } if (_is_close) { ctx->on_close(_packet_num, _query_statistics.get()); - _update_dependency(); return; } // no ready data, push ctx to waiting list _waiting_rpc.push_back(ctx); - _update_dependency(); } -Status BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* result) { +Status BufferControlBlock::get_arrow_batch(std::shared_ptr<vectorized::Block>* result, + cctz::time_zone& timezone_obj) { std::unique_lock<std::mutex> l(_lock); + Defer defer {[&]() { _update_dependency(); }}; if (!_status.ok()) { return _status; } if (_is_cancelled) { return Status::Cancelled("Cancelled"); } - while (_arrow_flight_batch_queue.empty() && !_is_cancelled && !_is_close) { - _arrow_data_arrival.wait_for(l, std::chrono::seconds(1)); + while (_arrow_flight_result_batch_queue.empty() && !_is_cancelled && !_is_close) { + _arrow_data_arrival.wait_for(l, std::chrono::milliseconds(20)); } if (_is_cancelled) { return Status::Cancelled("Cancelled"); } - if (!_arrow_flight_batch_queue.empty()) { - *result = std::move(_arrow_flight_batch_queue.front()); - _arrow_flight_batch_queue.pop_front(); + if (!_arrow_flight_result_batch_queue.empty()) { + *result = std::move(_arrow_flight_result_batch_queue.front()); + _arrow_flight_result_batch_queue.pop_front(); + timezone_obj = _timezone_obj; + for (auto it : _instance_rows_in_queue.front()) { _instance_rows[it.first] -= it.second; } _instance_rows_in_queue.pop_front(); _packet_num++; - _update_dependency(); return Status::OK(); } // normal path end if (_is_close) { - _update_dependency(); + std::stringstream ss; + _profile.pretty_print(&ss); + VLOG_NOTICE << fmt::format( + "BufferControlBlock finished, fragment_id={}, is_close={}, is_cancelled={}, " + "packet_num={}, peak_memory_usage={}, profile={}", + print_id(_fragment_id), _is_close, _is_cancelled, _packet_num, + _mem_tracker->peak_consumption(), ss.str()); return Status::OK(); } return Status::InternalError("Get Arrow Batch Abnormal Ending"); } +void BufferControlBlock::get_arrow_batch(GetArrowResultBatchCtx* ctx) { + std::unique_lock<std::mutex> l(_lock); + SCOPED_ATTACH_TASK(_mem_tracker); + Defer defer {[&]() { _update_dependency(); }}; + if (!_status.ok()) { + ctx->on_failure(_status); + return; + } + if (_is_cancelled) { + ctx->on_failure(Status::Cancelled("Cancelled")); + return; + } + + if (!_arrow_flight_result_batch_queue.empty()) { + auto block = _arrow_flight_result_batch_queue.front(); + _arrow_flight_result_batch_queue.pop_front(); + for (auto it : _instance_rows_in_queue.front()) { + _instance_rows[it.first] -= it.second; + } + _instance_rows_in_queue.pop_front(); + + ctx->on_data(block, _packet_num, _be_exec_version, _fragement_transmission_compression_type, + _timezone, _serialize_batch_ns_timer, _uncompressed_bytes_counter, + _compressed_bytes_counter); + _packet_num++; + return; + } + + // normal path end + if (_is_close) { + ctx->on_close(_packet_num); + std::stringstream ss; + _profile.pretty_print(&ss); + VLOG_NOTICE << fmt::format( + "BufferControlBlock finished, fragment_id={}, is_close={}, is_cancelled={}, " + "packet_num={}, peak_memory_usage={}, profile={}", + print_id(_fragment_id), _is_close, _is_cancelled, _packet_num, + _mem_tracker->peak_consumption(), ss.str()); + return; + } + // no ready data, push ctx to waiting list + _waiting_arrow_result_batch_rpc.push_back(ctx); +} + +void BufferControlBlock::register_arrow_schema(const std::shared_ptr<arrow::Schema>& arrow_schema) { + std::lock_guard<std::mutex> l(_lock); + _arrow_schema = arrow_schema; +} + +Status BufferControlBlock::find_arrow_schema(std::shared_ptr<arrow::Schema>* arrow_schema) { Review Comment: warning: method 'find_arrow_schema' can be made const [readability-make-member-function-const] ```suggestion Status BufferControlBlock::find_arrow_schema(std::shared_ptr<arrow::Schema>* arrow_schema) const { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org