This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 9b14720107b [improve](stream-load) add observability on receiving HTTP request (#41136) 9b14720107b is described below commit 9b14720107bb02d1ab940086e3a2d7f35c2a3f0f Author: hui lai <1353307...@qq.com> AuthorDate: Thu Sep 26 11:48:34 2024 +0800 [improve](stream-load) add observability on receiving HTTP request (#41136) pick https://github.com/apache/doris/pull/30432 and https://github.com/apache/doris/pull/40735 --------- Co-authored-by: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com> --- be/src/http/action/stream_load.cpp | 14 +++++++++++++- be/src/runtime/stream_load/stream_load_context.cpp | 2 ++ be/src/runtime/stream_load/stream_load_context.h | 2 ++ .../runtime/stream_load/stream_load_executor.cpp | 22 +++++++++++++++++++++- 4 files changed, 38 insertions(+), 2 deletions(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 1b0eadd47c3..3557d750ed4 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -75,6 +75,8 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_requests_total, MetricUnit:: DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_duration_ms, MetricUnit::MILLISECONDS); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(streaming_load_current_processing, MetricUnit::REQUESTS); +bvar::LatencyRecorder g_stream_load_receive_data_latency_ms("stream_load_receive_data_latency_ms"); + static constexpr size_t MIN_CHUNK_SIZE = 64 * 1024; static const string CHUNK = "chunked"; @@ -188,8 +190,10 @@ int StreamLoadAction::on_header(HttpRequest* req) { LOG(INFO) << "new income streaming load request." << ctx->brief() << ", db=" << ctx->db << ", tbl=" << ctx->table; + ctx->begin_receive_and_read_data_cost_nanos = MonotonicNanos(); auto st = _on_header(req, ctx); + if (!st.ok()) { ctx->status = std::move(st); if (ctx->need_rollback) { @@ -340,7 +344,15 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) { } ctx->receive_bytes += remove_bytes; } - ctx->read_data_cost_nanos += (MonotonicNanos() - start_read_data_time); + int64_t read_data_time = MonotonicNanos() - start_read_data_time; + int64_t last_receive_and_read_data_cost_nanos = ctx->receive_and_read_data_cost_nanos; + ctx->read_data_cost_nanos += read_data_time; + ctx->receive_and_read_data_cost_nanos = + MonotonicNanos() - ctx->begin_receive_and_read_data_cost_nanos; + g_stream_load_receive_data_latency_ms + << (ctx->receive_and_read_data_cost_nanos - last_receive_and_read_data_cost_nanos - + read_data_time) / + 1000000; } void StreamLoadAction::free_handler_ctx(std::shared_ptr<void> param) { diff --git a/be/src/runtime/stream_load/stream_load_context.cpp b/be/src/runtime/stream_load/stream_load_context.cpp index f381ba097db..2e9aa4fad9f 100644 --- a/be/src/runtime/stream_load/stream_load_context.cpp +++ b/be/src/runtime/stream_load/stream_load_context.cpp @@ -100,6 +100,8 @@ std::string StreamLoadContext::to_json() const { writer.Int64(read_data_cost_nanos / 1000000); writer.Key("WriteDataTimeMs"); writer.Int(write_data_cost_nanos / 1000000); + writer.Key("ReceiveDataTimeMs"); + writer.Int((receive_and_read_data_cost_nanos - read_data_cost_nanos) / 1000000); writer.Key("CommitAndPublishTimeMs"); writer.Int64(commit_and_publish_txn_cost_nanos / 1000000); diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index 0e821956470..334e9ad6af2 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -194,6 +194,8 @@ public: int64_t pre_commit_txn_cost_nanos = 0; int64_t read_data_cost_nanos = 0; int64_t write_data_cost_nanos = 0; + int64_t receive_and_read_data_cost_nanos = 0; + int64_t begin_receive_and_read_data_cost_nanos = 0; std::string error_url = ""; // if label already be used, set existing job's status here diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 19d25e9ffa1..75386d7aa6b 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -69,7 +69,7 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte // submit this params #ifndef BE_TEST ctx->start_write_data_nanos = MonotonicNanos(); - LOG(INFO) << "begin to execute job. label=" << ctx->label << ", txn_id=" << ctx->txn_id + LOG(INFO) << "begin to execute stream load. label=" << ctx->label << ", txn_id=" << ctx->txn_id << ", query_id=" << print_id(ctx->put_result.params.params.query_id); Status st; if (ctx->put_result.__isset.params) { @@ -144,6 +144,16 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte this->commit_txn(ctx.get()); } } + + LOG(INFO) << "finished to execute stream load. label=" << ctx->label + << ", txn_id=" << ctx->txn_id + << ", query_id=" << print_id(ctx->put_result.params.params.query_id) + << ", receive_data_cost_ms=" + << (ctx->receive_and_read_data_cost_nanos - + ctx->read_data_cost_nanos) / + 1000000 + << ", read_data_cost_ms=" << ctx->read_data_cost_nanos / 1000000 + << ", write_data_cost_ms=" << ctx->write_data_cost_nanos / 1000000; }); } else { st = _exec_env->fragment_mgr()->exec_plan_fragment( @@ -217,6 +227,16 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte this->commit_txn(ctx.get()); } } + + LOG(INFO) << "finished to execute stream load. label=" << ctx->label + << ", txn_id=" << ctx->txn_id + << ", query_id=" << print_id(ctx->put_result.params.params.query_id) + << ", receive_data_cost_ms=" + << (ctx->receive_and_read_data_cost_nanos - + ctx->read_data_cost_nanos) / + 1000000 + << ", read_data_cost_ms=" << ctx->read_data_cost_nanos / 1000000 + << ", write_data_cost_ms=" << ctx->write_data_cost_nanos / 1000000; }); } if (!st.ok()) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org