This is an automated email from the ASF dual-hosted git repository.

jasonmfehr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 3910e924d IMPALA-13237: [Patch 7] - Lock ClientRequestState during Opentelemetry Traces
3910e924d is described below

commit 3910e924d406709419f654eb5af30e22da11f9c5
Author: jasonmfehr <[email protected]>
AuthorDate: Tue Aug 12 17:05:34 2025 -0700

    IMPALA-13237: [Patch 7] - Lock ClientRequestState during Opentelemetry Traces
    
    Updates the SpanManager class so it takes the ClientRequestState lock
    when reading from that object.
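A minimal Python sketch of the locking contract this patch documents in span-manager.h: the End* functions take child_span_mu_ and then the ClientRequestState lock, while EndChildSpanPlanning() takes only child_span_mu_ because its caller already holds the ClientRequestState lock. The classes below (ClientRequestStateStub, SpanManagerSketch) are hypothetical stand-ins, not Impala code. Python's threading.Lock is non-reentrant like std::mutex, which is why the planning variant must not try to acquire the ClientRequestState lock a second time.

```python
import threading


class ClientRequestStateStub:
    """Hypothetical stand-in for ClientRequestState: fields guarded by one lock."""

    def __init__(self, request_pool, user):
        self.lock = threading.Lock()  # non-reentrant, like std::mutex
        self.request_pool = request_pool
        self.effective_user = user


class SpanManagerSketch:
    """Hypothetical sketch of the lock discipline; not Impala's SpanManager."""

    def __init__(self, crs):
        self.crs = crs
        self.child_span_mu = threading.Lock()
        self.ended = []  # attribute snapshots of ended child spans

    def _do_end_child_span(self):
        # Caller must already hold child_span_mu and crs.lock.
        self.ended.append({"request_pool": self.crs.request_pool,
                           "user_name": self.crs.effective_user})

    def end_child_span(self):
        # Acquire child_span_mu, then the ClientRequestState lock, mirroring the
        # order used by the End* functions in this patch. Released in reverse.
        with self.child_span_mu, self.crs.lock:
            self._do_end_child_span()

    def end_child_span_planning(self):
        # The caller already holds crs.lock, so only child_span_mu is taken here.
        # threading.Lock does not track its owner, so this only checks it is held.
        assert self.crs.lock.locked()
        with self.child_span_mu:
            self._do_end_child_span()


if __name__ == "__main__":
    crs = ClientRequestStateStub("default-pool", "alice")
    mgr = SpanManagerSketch(crs)
    mgr.end_child_span()               # takes both locks itself
    with crs.lock:                     # planning caller already holds the CRS lock
        mgr.end_child_span_planning()
    print(mgr.ended)
```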
    
    Updates startup flag otel_trace_span_processor to be hidden. Manual
    testing revealed that setting this flag to "simple" (which uses
    SimpleSpanProcessor when forwarding OpenTelemetry traces) causes the
    SpanManager object to block until the destination OpenTelemetry
    collector receives the request and responds. Thus, network slowness
    or an overloaded OpenTelemetry collector will block the entire query
    processing flow since SpanManager will hold the ClientRequestState
    lock throughout the duration of the communication with the
    OpenTelemetry collector. Since the SimpleSpanProcessor is useful in
    testing, this flag was changed to hidden to avoid incorrect usage in
    production.
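To illustrate why the simple processor is harmful while a lock is held, here is a simplified Python model. The classes and functions are hypothetical and are not the opentelemetry-cpp API: the "simple" variant exports inside on_end(), so the caller waits out the full collector delay, while the "batch" variant only enqueues the span and lets a background thread export it. The real BatchSpanProcessor also batches by size and schedule, which this sketch omits.

```python
import queue
import threading
import time


def slow_export(span, exported, delay_s):
    """Hypothetical exporter: sleeps to simulate a slow OTel collector round trip."""
    time.sleep(delay_s)
    exported.append(span)


class SimpleProcessorSketch:
    """Exports synchronously: on_end() returns only after the export completes."""

    def __init__(self, exported, delay_s):
        self.exported = exported
        self.delay_s = delay_s

    def on_end(self, span):
        slow_export(span, self.exported, self.delay_s)


class BatchProcessorSketch:
    """Enqueues spans; a background thread performs the slow export."""

    def __init__(self, exported, delay_s):
        self.exported = exported
        self.delay_s = delay_s
        self.pending = queue.Queue()
        self.worker = threading.Thread(target=self._run, daemon=True)
        self.worker.start()

    def on_end(self, span):
        self.pending.put(span)  # returns without waiting for the collector

    def _run(self):
        while True:
            span = self.pending.get()
            if span is None:  # shutdown sentinel
                return
            slow_export(span, self.exported, self.delay_s)

    def shutdown(self):
        self.pending.put(None)
        self.worker.join()  # queued spans ahead of the sentinel are exported first


def seconds_blocked(processor, lock):
    """Time a caller spends in on_end() while holding `lock`, which stands in
    for the ClientRequestState lock."""
    start = time.monotonic()
    with lock:
        processor.on_end("span")
    return time.monotonic() - start
```

With a 0.2 s simulated collector delay, the simple variant holds the lock for at least 0.2 s per span, while the batch variant releases it almost immediately, consistent with the manual testing described later in this message.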
    
    Previously, when generating span attribute values on OpenTelemetry
    traces for queries, data was read from ClientRequestState without
    holding its lock, even though the documentation in
    client-request-state.h states that reading most fields requires
    holding that lock.
    
    An examination of the opentelemetry-cpp SDK code revealed that the
    ClientRequestState lock must be held until the StartSpan() and
    EndSpan() functions complete, because span attribute keys and values
    are deep copied from the source nostd::string_view objects during
    these functions.
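The lifetime issue can be modeled in Python with a memoryview, which, like nostd::string_view, borrows bytes rather than owning them. This is an analogy under stated assumptions (GuardedBuffer, attribute_by_view() and attribute_by_copy() are hypothetical), not how the SDK is implemented: a view taken under the lock still observes writes made after the lock is released, whereas a copy made while the lock is held is a stable snapshot.

```python
import threading


class GuardedBuffer:
    """Hypothetical mutable field guarded by a lock, standing in for a
    ClientRequestState string such as the request pool."""

    def __init__(self, text):
        self.lock = threading.Lock()
        self.data = bytearray(text, "utf-8")

    def set(self, text):
        new = text.encode("utf-8")
        with self.lock:
            # Same-length overwrite only: a bytearray with live memoryviews
            # cannot be resized.
            assert len(new) == len(self.data)
            self.data[:] = new


def attribute_by_view(buf):
    """Borrow the bytes (like a string_view), then release the lock."""
    with buf.lock:
        view = memoryview(buf.data)
    return view  # still points at the shared, mutable buffer


def attribute_by_copy(buf):
    """Deep copy the bytes while the lock is held, analogous to the copy the
    commit message describes during StartSpan() and EndSpan()."""
    with buf.lock:
        return bytes(buf.data)
```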
    
    Testing accomplished by running the test_otel_trace.py custom cluster
    tests as regression tests. Additionally, manual testing with
    intentionally delayed network communication to an OpenTelemetry
    collector demonstrated that the StartSpan() and EndSpan() functions
    do not block waiting on the OpenTelemetry collector if the batch span
    processor is used. However, these functions do block if the simple
    span processor is used.
    
    Additionally, a cause of flaky tests was addressed. The custom
    cluster tests wait until JSON objects for all traces are written to
    the output file. Since each trace JSON object is written on its own
    line in the output file, this wait is accomplished by checking the
    number of lines in the output file. Occasionally, a trace was only
    partially written to the file when the line count check passed, so
    the trace was incomplete when the assertion code loaded it. In these
    situations, the test failed because a partial JSON object cannot be
    loaded. The fix is to wait both for the expected line count and for
    the last line to end with a newline character. This ensures that the
    JSON representing each trace is fully written to the file before the
    assertion code loads it.
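A self-contained Python sketch of the completion check described above. The helpers trace_file_complete() and wait_for_complete_file() are hypothetical; the patch itself implements this with count_lines(), file_ends_with_char() and retry() in tests/common/file_utils.py. A file whose second JSON object is only partially written satisfies the line count but fails the trailing-newline check.

```python
import os
import tempfile
import time


def trace_file_complete(path, expected_lines):
    """True if the file has exactly `expected_lines` lines and ends with a newline."""
    with open(path, "rb") as f:
        lines = f.readlines()  # a partially written last line still counts as a line
    return (len(lines) == expected_lines
            and len(lines) > 0
            and lines[-1].endswith(b"\n"))


def wait_for_complete_file(path, expected_lines, max_attempts=3, sleep_time_s=1,
                           backoff=2):
    """Polls with exponential backoff; returns False if the attempts run out."""
    for attempt in range(max_attempts):
        if os.path.isfile(path) and trace_file_complete(path, expected_lines):
            return True
        if attempt + 1 < max_attempts:
            time.sleep(sleep_time_s)
            sleep_time_s *= backoff
    return False
```

Appending the closing `}` and newline to such a partially written file makes it pass both checks.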
    
    Generated-by: Github Copilot (Claude Sonnet 3.7)
    Change-Id: I649bdb6f88176995d45f7d10db898188bbe0b609
    Reviewed-on: http://gerrit.cloudera.org:8080/23294
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/observe/otel-flags-trace.cc     |  15 +++--
 be/src/observe/otel.cc                 |   5 +-
 be/src/observe/otel.h                  |   2 +-
 be/src/observe/span-manager.cc         | 119 +++++++++++++++++++++++----------
 be/src/observe/span-manager.h          |  40 ++++++-----
 be/src/service/client-request-state.cc |   8 ++-
 be/src/service/impala-server.cc        |   2 +-
 tests/common/file_utils.py             |  61 +++++++++++++++--
 8 files changed, 184 insertions(+), 68 deletions(-)

diff --git a/be/src/observe/otel-flags-trace.cc b/be/src/observe/otel-flags-trace.cc
index b144736c8..65718fe82 100644
--- a/be/src/observe/otel-flags-trace.cc
+++ b/be/src/observe/otel-flags-trace.cc
@@ -26,6 +26,7 @@
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <gutil/strings/split.h>
+#include <gutil/strings/substitute.h>
 #include <opentelemetry/sdk/trace/batch_span_processor_options.h>
 
 #include "common/status.h"
@@ -173,7 +174,8 @@ DEFINE_string(otel_trace_tls_cipher_suites, "", "List of allowed TLS cipher suit
     "using TLS 1.3, default to the value of Impala’s tls_ciphersuites startup flag.");
 
 DEFINE_bool(otel_trace_tls_insecure_skip_verify, false, "If set to true, skips "
-    "verification of collector’s TLS certificate.");
+    "verification of collector’s TLS certificate. This should only be set to true for "
+    "development / testing");
 //
 // End of TLS related flags.
 //
@@ -204,11 +206,12 @@ DEFINE_validator(otel_trace_retry_policy_backoff_multiplier, ge_one);
 //
 // Start of Span Processor flags
 //
-static const string SPAN_PROCESSOR_HELP = "The span processor implementation to use for "
-    "exporting spans to the OTel Collector. Supported values: '"
-    + impala::SPAN_PROCESSOR_BATCH + "' and '" + impala::SPAN_PROCESSOR_SIMPLE + "'.";
-DEFINE_string(otel_trace_span_processor, impala::SPAN_PROCESSOR_BATCH.c_str(),
-    SPAN_PROCESSOR_HELP.c_str());
+// This flag is hidden because simple span processor blocks the query processing while
+// communicating with the OTel collector.
+DEFINE_string_hidden(otel_trace_span_processor, impala::SPAN_PROCESSOR_BATCH.c_str(),
+    strings::Substitute("The span processor implementation to use for exporting spans to "
+    "the OTel Collector. Supported values: '$0' and '$1'.", impala::SPAN_PROCESSOR_BATCH,
+    impala::SPAN_PROCESSOR_SIMPLE).c_str());
 DEFINE_validator(otel_trace_span_processor, [](const char* flagname,
     const string& value) {
   const std::string trimmed = boost::algorithm::trim_copy(value);
diff --git a/be/src/observe/otel.cc b/be/src/observe/otel.cc
index 456c8030d..40a870d11 100644
--- a/be/src/observe/otel.cc
+++ b/be/src/observe/otel.cc
@@ -268,6 +268,9 @@ Status init_otel_tracer() {
     }
   } else {
     VLOG(2) << "Using SimpleSpanProcessor for OTel spans";
+    LOG(WARNING) << "Setting --otel_trace_span_processor=simple blocks the query "
+    "processing thread while exporting spans to the OTel collector. This will cause "
+    "significant performance degradation and is not recommended for production use.";
     processor = make_unique<SimpleSpanProcessor>(move(exporter));
   }
 
@@ -289,7 +292,7 @@ void shutdown_otel_tracer() {
   provider_.reset();
 }
 
-shared_ptr<SpanManager> build_span_manager(const ClientRequestState* crs) {
+shared_ptr<SpanManager> build_span_manager(ClientRequestState* crs) {
   DCHECK(provider_) << "OpenTelemetry tracer was not initialized.";
 
   return make_shared<SpanManager>(
diff --git a/be/src/observe/otel.h b/be/src/observe/otel.h
index 35e79a810..f5591d6cb 100644
--- a/be/src/observe/otel.h
+++ b/be/src/observe/otel.h
@@ -52,6 +52,6 @@ Status init_otel_tracer();
 void shutdown_otel_tracer();
 
 // Builds a SpanManager instance for the given query.
-std::shared_ptr<SpanManager> build_span_manager(const ClientRequestState*);
+std::shared_ptr<SpanManager> build_span_manager(ClientRequestState*);
 
 } // namespace impala
diff --git a/be/src/observe/span-manager.cc b/be/src/observe/span-manager.cc
index 38f42d820..dbd0ae8d2 100644
--- a/be/src/observe/span-manager.cc
+++ b/be/src/observe/span-manager.cc
@@ -122,7 +122,7 @@ static inline void debug_log_span(const TimedSpan* span, const string& span_name
 } // function debug_log_span
 
 SpanManager::SpanManager(nostd::shared_ptr<trace::Tracer> tracer,
-    const ClientRequestState* client_request_state) : tracer_(std::move(tracer)),
+    ClientRequestState* client_request_state) : tracer_(std::move(tracer)),
     client_request_state_(client_request_state),
     query_id_(PrintId(client_request_state_->query_id())) {
   child_span_type_ = ChildSpanType::NONE;
@@ -130,15 +130,20 @@ SpanManager::SpanManager(nostd::shared_ptr<trace::Tracer> tracer,
   DCHECK(client_request_state_ != nullptr) << "Cannot start root span without a valid "
       "client request state.";
 
-  root_ = make_shared<TimedSpan>(tracer_, query_id_, ATTR_QUERY_START_TIME, ATTR_RUNTIME,
-      OtelAttributesMap{
-        {ATTR_CLUSTER_ID, FLAGS_cluster_id},
-        {ATTR_QUERY_ID, query_id_},
-        {ATTR_REQUEST_POOL, client_request_state_->request_pool()},
-        {ATTR_SESSION_ID, PrintId(client_request_state_->session_id())},
-        {ATTR_USER_NAME, client_request_state_->effective_user()}
-      },
-      trace::SpanKind::kServer);
+  {
+    lock_guard<mutex> crs_lock(*(client_request_state_->lock()));
+
+    root_ = make_shared<TimedSpan>(tracer_, query_id_, ATTR_QUERY_START_TIME,
+        ATTR_RUNTIME,
+        OtelAttributesMap{
+          {ATTR_CLUSTER_ID, FLAGS_cluster_id},
+          {ATTR_QUERY_ID, query_id_},
+          {ATTR_REQUEST_POOL, client_request_state_->request_pool()},
+          {ATTR_SESSION_ID, PrintId(client_request_state_->session_id())},
+          {ATTR_USER_NAME, client_request_state_->effective_user()}
+        },
+        trace::SpanKind::kServer);
+  }
 
   scope_ = make_unique<trace::Scope>(root_->SetActive());
   debug_log_span(root_.get(), "Root", query_id_, true);
@@ -180,21 +185,32 @@ void SpanManager::StartChildSpanInit() {
   ChildSpanBuilder(ChildSpanType::INIT,
       {
         {ATTR_CLUSTER_ID, FLAGS_cluster_id},
-        {ATTR_DEFAULT_DB, client_request_state_->default_db()},
-        {ATTR_QUERY_ID, query_id_},
-        {ATTR_QUERY_STRING, client_request_state_->redacted_sql()},
-        {ATTR_REQUEST_POOL, client_request_state_->request_pool()},
-        {ATTR_SESSION_ID, PrintId(client_request_state_->session_id())},
-        {ATTR_USER_NAME, client_request_state_->effective_user()}
+        {ATTR_QUERY_ID, query_id_}
       });
+
+  {
+    lock_guard<mutex> crs_lock(*(client_request_state_->lock()));
+
+    current_child_->SetAttribute(ATTR_DEFAULT_DB,
+        client_request_state_->default_db());
+    current_child_->SetAttribute(ATTR_QUERY_STRING,
+        client_request_state_->redacted_sql());
+    current_child_->SetAttribute(ATTR_REQUEST_POOL,
+        client_request_state_->request_pool());
+    current_child_->SetAttribute(ATTR_SESSION_ID,
+        PrintId(client_request_state_->session_id()));
+    current_child_->SetAttribute(ATTR_USER_NAME,
+        client_request_state_->effective_user());
+  }
 } // function StartChildSpanInit
 
 void SpanManager::EndChildSpanInit() {
   lock_guard<mutex> l(child_span_mu_);
+  lock_guard<mutex> crs_lock(*(client_request_state_->lock()));
   DoEndChildSpanInit();
 } // function EndChildSpanInit
 
-void SpanManager::DoEndChildSpanInit(const Status* cause) {
+inline void SpanManager::DoEndChildSpanInit(const Status* cause) {
   DCHECK_CHILD_SPAN_TYPE(ChildSpanType::INIT);
 
   EndChildSpan(
@@ -212,10 +228,11 @@ void SpanManager::StartChildSpanSubmitted() {
 
 void SpanManager::EndChildSpanSubmitted() {
   lock_guard<mutex> l(child_span_mu_);
+  lock_guard<mutex> crs_lock(*(client_request_state_->lock()));
   DoEndChildSpanSubmitted();
 } // function EndChildSpanSubmitted
 
-void SpanManager::DoEndChildSpanSubmitted(const Status* cause) {
+inline void SpanManager::DoEndChildSpanSubmitted(const Status* cause) {
   DCHECK_CHILD_SPAN_TYPE(ChildSpanType::SUBMITTED);
   EndChildSpan(cause);
 } // function DoEndChildSpanSubmitted
@@ -230,7 +247,7 @@ void SpanManager::EndChildSpanPlanning() {
   DoEndChildSpanPlanning();
 } // function EndChildSpanPlanning
 
-void SpanManager::DoEndChildSpanPlanning(const Status* cause) {
+inline void SpanManager::DoEndChildSpanPlanning(const Status* cause) {
   DCHECK_CHILD_SPAN_TYPE(ChildSpanType::PLANNING);
   EndChildSpan(
       cause,
@@ -241,16 +258,16 @@ void SpanManager::DoEndChildSpanPlanning(const Status* cause) {
 
 void SpanManager::StartChildSpanAdmissionControl() {
   lock_guard<mutex> l(child_span_mu_);
-  ChildSpanBuilder(ChildSpanType::ADMISSION_CONTROL,
-      {{ATTR_REQUEST_POOL, client_request_state_->request_pool()}});
+  ChildSpanBuilder(ChildSpanType::ADMISSION_CONTROL);
 } // function StartChildSpanAdmissionControl
 
 void SpanManager::EndChildSpanAdmissionControl() {
   lock_guard<mutex> l(child_span_mu_);
+  lock_guard<mutex> crs_lock(*(client_request_state_->lock()));
   DoEndChildSpanAdmissionControl();
 } // function EndChildSpanAdmissionControl
 
-void SpanManager::DoEndChildSpanAdmissionControl(const Status* cause) {
+inline void SpanManager::DoEndChildSpanAdmissionControl(const Status* cause) {
   if (IsClosing()) {
     // If we are already closing, silently return as some cases (such as FIRST_FETCH)
     // will end the admission control phase even though the query already finished.
@@ -259,14 +276,24 @@ void SpanManager::DoEndChildSpanAdmissionControl(const Status* cause) {
 
   DCHECK_CHILD_SPAN_TYPE(ChildSpanType::ADMISSION_CONTROL);
 
-  const bool was_queued = client_request_state_->admission_control_client()->WasQueued();
+  bool was_queued = false;
+  const string* adm_result = nullptr;
+
+  if (LIKELY(client_request_state_->summary_profile() != nullptr)) {
+      adm_result =
+          client_request_state_->summary_profile()->GetInfoString("Admission result");
+  }
+
+  if (LIKELY(client_request_state_->admission_control_client() != nullptr)) {
+    was_queued = client_request_state_->admission_control_client()->WasQueued();
+  }
 
   EndChildSpan(
       cause,
       OtelAttributesMap{
         {ATTR_QUEUED, was_queued},
-        {ATTR_ADM_RESULT,
-            *client_request_state_->summary_profile()->GetInfoString("Admission result")}
+        {ATTR_ADM_RESULT, (adm_result == nullptr ? "" : *adm_result)},
+        {ATTR_REQUEST_POOL, client_request_state_->request_pool()}
       });
 } // function DoEndChildSpanAdmissionControl
 
@@ -279,15 +306,16 @@ void SpanManager::StartChildSpanQueryExecution() {
    return; // <-- EARLY RETURN
   }
 
-  ChildSpanBuilder(ChildSpanType::QUERY_EXEC, {}, true);
+  ChildSpanBuilder(ChildSpanType::QUERY_EXEC, true);
 } // function StartChildSpanQueryExecution
 
 void SpanManager::EndChildSpanQueryExecution() {
   lock_guard<mutex> l(child_span_mu_);
+  lock_guard<mutex> crs_lock(*(client_request_state_->lock()));
   DoEndChildSpanQueryExecution();
 }  // function EndChildSpanQueryExecution
 
-void SpanManager::DoEndChildSpanQueryExecution(const Status* cause) {
+inline void SpanManager::DoEndChildSpanQueryExecution(const Status* cause) {
   if (IsClosing()) {
     // If we are already closing, silently return as some cases (such as FIRST_FETCH)
     // will end the query execution phase even though the query already failed.
@@ -316,14 +344,28 @@ void SpanManager::StartChildSpanClose(const Status* cause) {
   // In an error scenario, another child span may still be active since the normal code
   // path was interrupted and thus the correct end child span function was not called.
   // In this case, we must first end the current child span.
-  EndActiveChildSpan(cause);
+  if (UNLIKELY(current_child_)) {
+    DCHECK(cause != nullptr) << "Child span '" << child_span_type_ << "' is active when "
+        "starting the Close span, a non-null cause must be provided to indicate why the "
+        "query processing flow is being interrupted.";
+    DCHECK(!cause->ok()) << "Child span '" << child_span_type_ << "' is active when "
+        "starting the Close span, a non-OK cause must be provided to indicate why the "
+        "query processing flow is being interrupted.";
+
+    {
+      lock_guard<mutex> crs_lock(*(client_request_state_->lock()));
+      EndActiveChildSpan(cause);
+    }
+  }
 
   ChildSpanBuilder(ChildSpanType::CLOSE);
 } // function StartChildSpanClose
 
 void SpanManager::EndChildSpanClose() {
-  lock_guard<mutex> l(child_span_mu_);
   DCHECK_CHILD_SPAN_TYPE(ChildSpanType::CLOSE);
+
+  lock_guard<mutex> l(child_span_mu_);
+  lock_guard<mutex> crs_lock(*(client_request_state_->lock()));
   EndChildSpan();
 
   // Set all root span attributes to avoid dereferencing the client_request_state_ in the
@@ -362,7 +404,7 @@ void SpanManager::EndChildSpanClose() {
   }
 } // function EndChildSpanClose
 
-void SpanManager::ChildSpanBuilder(const ChildSpanType& span_type,
+inline void SpanManager::ChildSpanBuilder(const ChildSpanType& span_type,
     OtelAttributesMap&& additional_attributes, bool running) {
   DCHECK(client_request_state_ != nullptr) << "Cannot start child span without a valid "
       "client request state.";
@@ -373,10 +415,12 @@ void SpanManager::ChildSpanBuilder(const ChildSpanType& span_type,
         "another child span '$1' is still active trace_id=\"$2\" span_id=\"$3\"\n$4",
         to_string(span_type), to_string(child_span_type_), root_->TraceId(),
         root_->SpanId(), GetStackTrace());
-    DCHECK(!current_child_) << "Cannot start a new child span while one is already "
-        << "active.";
+    DCHECK(false) << "Should not start a new child span while one is already active.";
 
-    EndActiveChildSpan();
+    {
+      lock_guard<mutex> crs_lock(*(client_request_state_->lock()));
+      EndActiveChildSpan();
+    }
   }
 
   const string full_span_name = query_id_ + " - " + to_string(span_type);
@@ -392,12 +436,15 @@ void SpanManager::ChildSpanBuilder(const ChildSpanType& span_type,
   debug_log_span(current_child_.get(), to_string(span_type), query_id_, true);
 } // function ChildSpanBuilder
 
-void SpanManager::EndChildSpan(const Status* cause,
+inline void SpanManager::ChildSpanBuilder(const ChildSpanType& span_type, bool running) {
+  ChildSpanBuilder(span_type, {}, running);
+} // function ChildSpanBuilder
+
+inline void SpanManager::EndChildSpan(const Status* cause,
     const OtelAttributesMap& additional_attributes) {
   DCHECK(client_request_state_ != nullptr) << "Cannot end child span without a valid "
       "client request state.";
 
-
   if (LIKELY(current_child_)) {
     for (const auto& a : additional_attributes) {
       current_child_->SetAttribute(a.first, a.second);
@@ -440,7 +487,7 @@ void SpanManager::EndChildSpan(const Status* cause,
   }
 } // function EndChildSpan
 
-void SpanManager::EndActiveChildSpan(const Status* cause) {
+inline void SpanManager::EndActiveChildSpan(const Status* cause) {
   switch (child_span_type_) {
     case ChildSpanType::INIT:
       DoEndChildSpanInit(cause);
diff --git a/be/src/observe/span-manager.h b/be/src/observe/span-manager.h
index 969a11f40..b47b0c161 100644
--- a/be/src/observe/span-manager.h
+++ b/be/src/observe/span-manager.h
@@ -57,7 +57,7 @@ class SpanManager {
 public:
   SpanManager(
       opentelemetry::nostd::shared_ptr<opentelemetry::trace::Tracer> tracer,
-      const ClientRequestState* client_request_state);
+      ClientRequestState* client_request_state);
   ~SpanManager();
 
   // Adds an event to the currently active child span. If no child span is active,
@@ -88,14 +88,19 @@ public:
   void StartChildSpanClose(const Status* cause = nullptr);
 
   // Functions to end child spans. If no child span is active, logs a warning and does
-  // nothing else.
+  // nothing else. These functions take ownership of child_span_mu_ and
+  // client_request_state_->lock().
   void EndChildSpanInit();
   void EndChildSpanSubmitted();
-  void EndChildSpanPlanning();
   void EndChildSpanAdmissionControl();
   void EndChildSpanQueryExecution();
   void EndChildSpanClose();
 
+  // Function to end the Planning child span. This function takes ownership of
+  // child_span_mu_ BUT NOT client_request_state_->lock(). The code calling this function
+  // already holds client_request_state_->lock().
+  void EndChildSpanPlanning();
+
 private:
   // Tracer instance used to construct spans.
   opentelemetry::nostd::shared_ptr<opentelemetry::trace::Tracer> tracer_;
@@ -104,7 +109,7 @@ private:
   std::unique_ptr<opentelemetry::trace::Scope> scope_;
 
   // ClientRequestState for the query this SpanManager is tracking.
-  const ClientRequestState* client_request_state_;
+  ClientRequestState* client_request_state_;
 
   // Convenience constant string the string representation of the Query ID for the query
   // this SpanManager is tracking.
@@ -123,38 +128,39 @@ private:
   ChildSpanType child_span_type_;
 
   // Helper method that builds a child span and populates it with common attributes plus
-  // the specified additional attributes. Does not take ownership of the child span mutex.
-  // Callers must already hold the child_span_mu_ lock.
-  void ChildSpanBuilder(const ChildSpanType& span_type,
+  // the specified additional attributes.
+  // Callers must hold child_span_mu_ but MUST NOT HOLD client_request_state_->lock().
+  inline void ChildSpanBuilder(const ChildSpanType& span_type,
       OtelAttributesMap&& additional_attributes = {}, bool running = false);
+  inline void ChildSpanBuilder(const ChildSpanType& span_type, bool running);
 
   // Internal helper functions to perform the actual work of ending child spans.
-  // Callers must already hold the child_span_mu_ lock.
+  // Callers must already hold child_span_mu_ and client_request_state_->lock().
   //
   // Parameters:
   //   cause - See comments on StartChildSpanClose().
-  void DoEndChildSpanInit(const Status* cause = nullptr);
-  void DoEndChildSpanSubmitted(const Status* cause = nullptr);
-  void DoEndChildSpanPlanning(const Status* cause = nullptr);
-  void DoEndChildSpanAdmissionControl(const Status* cause = nullptr);
-  void DoEndChildSpanQueryExecution(const Status* cause = nullptr);
+  inline void DoEndChildSpanInit(const Status* cause = nullptr);
+  inline void DoEndChildSpanSubmitted(const Status* cause = nullptr);
+  inline void DoEndChildSpanPlanning(const Status* cause = nullptr);
+  inline void DoEndChildSpanAdmissionControl(const Status* cause = nullptr);
+  inline void DoEndChildSpanQueryExecution(const Status* cause = nullptr);
 
   // Properly closes the active child span by calling the appropriate End method for the
   // active child span type. If no child span is active, does nothing.
-  // Callers must already hold the child_span_mu_ lock.
+  // Callers must already hold child_span_mu_ and client_request_state_->lock().
   //
   // Parameters:
   //   cause - See comments on StartChildSpanClose().
-  void EndActiveChildSpan(const Status* cause = nullptr);
+  inline void EndActiveChildSpan(const Status* cause = nullptr);
 
   // Helper method to end a child span and populate its common attributes.
-  // Callers must already hold the child_span_mu_ lock.
+  // Callers must already hold child_span_mu_ and client_request_state_->lock().
   //
   // Parameters:
   //   cause - See comments on StartChildSpanClose().
   //   additional_attributes - Span specific attributes that will be set on the span
   //                           before ending it.
-  void EndChildSpan(const Status* cause = nullptr,
+  inline void EndChildSpan(const Status* cause = nullptr,
       const OtelAttributesMap& additional_attributes = {});
 
   // Returns true if the Close child span is active.
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index a5b30d36c..2cad22c66 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -1144,7 +1144,13 @@ Status ClientRequestState::ExecEventProcessorCmd() {
 
 void ClientRequestState::Finalize(const Status* cause) {
   if (otel_trace_query()) {
-    // No need to end previous child span since this function takes care of it.
+    // In a non-error case, end the query execution span since it will be the active span.
+    if (cause == nullptr || cause->ok()) {
+      otel_span_manager_->EndChildSpanQueryExecution();
+    }
+
+    // No need to end previous child span in an error case. This function silently closes
+    // the active child span if there is one.
     otel_span_manager_->StartChildSpanClose(cause);
   }
 
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 645679bab..b4fdb61f2 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1425,8 +1425,8 @@ Status ImpalaServer::ExecuteInternal(const TQueryCtx& query_ctx,
     if ((*query_handle)->otel_trace_query()) {
       (*query_handle)->otel_span_manager()->EndChildSpanPlanning();
     }
-
   }
+
   VLOG(2) << "Execution request: "
           << ThriftDebugString((*query_handle)->exec_request());
 
diff --git a/tests/common/file_utils.py b/tests/common/file_utils.py
index 2bb17db53..2f0b70a7c 100644
--- a/tests/common/file_utils.py
+++ b/tests/common/file_utils.py
@@ -208,13 +208,64 @@ def count_lines(file_path):
     return sum(1 for _ in file.readlines())
 
 
+def file_ends_with_char(file_path, char="\n"):
+  """
+  Checks if a file ends with a specified character.
+
+  Args:
+    file_path: Path to the file to check.
+    char: The character to check for at the end of the file (default is newline).
+
+  Returns:
+    bool: True if the file ends with the specified character, False otherwise.
+
+  Raises:
+    AssertionError: If char is not a single character.
+  """
+  assert isinstance(char, str)
+
+  char = char.encode('utf-8')
+  assert len(char) == 1, "char parameter must be a single character, got: {}".format(char)
+
+  assert os.path.isfile(file_path), "File does not exist: {}".format(file_path)
+  if os.path.getsize(file_path) == 0:
+    return False
+
+  with open(file_path, 'rb') as f:
+    # Move to the last character of the file
+    f.seek(-1, os.SEEK_END)
+    last_char = f.read(1)
+    # Check if the last character matches the provided character
+    return last_char == char
+
+
 def wait_for_file_line_count(file_path, expected_line_count, max_attempts=3,
-    sleep_time_s=1, backoff=2):
-  """Waits until the given file contains the expected number of lines or until the
-      timeout is reached. Fails an assert if the timeout is reached before the expected
-      number of lines is found."""
+    sleep_time_s=1, backoff=2, last_char="\n"):
+  """
+  Waits until the given file contains the expected number of lines or until the timeout is
+  reached. Fails an assert if the timeout is reached before the expected number of lines
+  is found.
+
+  Args:
+    file_path: Path to the file to check.
+    expected_line_count: Expected number of lines in the file.
+    max_attempts: Maximum number of attempts to check the file (default is 3).
+    sleep_time_s: Time to wait between attempts in seconds (default is 1).
+    backoff: Backoff factor for exponential backoff (default is 2).
+    last_char: Optional character that the file should end with (default is newline). If
+               None, the file is not checked for a specific ending character.
+
+  Raises:
+    AssertionError: If the file does not reach the expected line count within the given
+                    number of attempts or if the file does not end with the specified
+                    character (if provided).
+  """
   def assert_trace_file_lines():
-    return count_lines(file_path) == expected_line_count
+    ret = count_lines(file_path) == expected_line_count
+    if last_char is not None:
+      ret = ret and file_ends_with_char(file_path, last_char)
+
+    return ret
 
   assert retry(assert_trace_file_lines, max_attempts, sleep_time_s, backoff), \
       "File '{}' did not reach expected line count of '{}'. actual line count: '{}'" \
