This is an automated email from the ASF dual-hosted git repository.
jasonmfehr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 3910e924d IMPALA-13237: [Patch 7] - Lock ClientRequestState during
Opentelemetry Traces
3910e924d is described below
commit 3910e924d406709419f654eb5af30e22da11f9c5
Author: jasonmfehr <[email protected]>
AuthorDate: Tue Aug 12 17:05:34 2025 -0700
IMPALA-13237: [Patch 7] - Lock ClientRequestState during Opentelemetry
Traces
Updates the SpanManager class so it takes the ClientRequestState lock
when reading from that object.
Updates startup flag otel_trace_span_processor to be hidden. Manual
testing revealed that setting this flag to "simple" (which uses
SimpleSpanProcessor when forwarding OpenTelemetry traces) causes the
SpanManager object to block until the destination OpenTelemetry
collector receives the request and responds. Thus, network slowness
or an overloaded OpenTelemetry collector will block the entire query
processing flow since SpanManager will hold the ClientRequestState
lock throughout the duration of the communication with the
OpenTelemetry collector. Since the SimpleSpanProcessor is useful in
testing, this flag was changed to hidden to avoid incorrect usage in
production.
Previously, when generating span attribute values on OpenTelemetry
traces for queries, data was read from ClientRequestState without
holding its lock. The documentation in client-request-state.h
specifically states that reading most fields requires holding its lock.
An examination of the opentelemetry-cpp SDK code revealed the
ClientRequestState lock must be held until the StartSpan() and
EndSpan() functions complete. The reason is that span attribute keys
and values are deep copied from the source nostd::string_view objects
during these functions.
Testing was accomplished by running the test_otel_trace.py custom cluster
tests as regression tests. Additionally, manual testing with
intentionally delayed network communication to an OpenTelemetry
collector demonstrated that the StartSpan() and EndSpan() functions
do not block waiting on the OpenTelemetry collector if the batch span
processor is used. However, these functions do block if the simple
span processor is used.
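The difference between the two processors can be illustrated with a
minimal Python sketch. This is an analogy only, not the opentelemetry-cpp
API: RecordingExporter, SimpleProcessor, BatchProcessor, and
end_span_under_lock are hypothetical names invented for this example. The
simple variant exports on the calling thread, so any lock the caller
holds stays held for the whole export, while the batch variant only
enqueues the span and exports from a worker thread.

```python
import queue
import threading


class RecordingExporter:
    """Hypothetical stand-in for a collector client; records the exporting thread."""

    def __init__(self):
        self.export_threads = []

    def export(self, span):
        self.export_threads.append(threading.current_thread().name)


class SimpleProcessor:
    """Exports on the caller's thread, so the caller waits for the export."""

    def __init__(self, exporter):
        self._exporter = exporter

    def on_end(self, span):
        self._exporter.export(span)


class BatchProcessor:
    """Queues spans and exports them from a background worker thread."""

    def __init__(self, exporter):
        self._exporter = exporter
        self._queue = queue.Queue()
        self._worker = threading.Thread(target=self._run, name="otel-batch-worker")
        self._worker.start()

    def on_end(self, span):
        # Returns as soon as the span is queued; no export happens here.
        self._queue.put(span)

    def _run(self):
        while True:
            span = self._queue.get()
            if span is None:  # sentinel posted by shutdown()
                return
            self._exporter.export(span)

    def shutdown(self):
        self._queue.put(None)
        self._worker.join()


def end_span_under_lock(processor, crs_lock, span):
    # The query lock is held for the full duration of on_end().
    with crs_lock:
        processor.on_end(span)
```

With SimpleProcessor, a slow export() would keep crs_lock held for as
long as the export takes. With BatchProcessor, the lock is released as
soon as the span is queued.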
Additionally, a cause of flaky tests was addressed. The custom
cluster tests wait until JSON objects for all traces are written to
the output file. Since each trace JSON object is written on its own
line in the output file, this wait is accomplished by checking the
number of lines in the output file. Occasionally, a trace would be
partially written to the file, which satisfied the line count check
even though the trace was not fully written out when the assertion
code loaded it. In these situations, the test failed because a partial
JSON object cannot be loaded. The fix is to wait both for the
expected line count and for the last line to end with a newline
character. This fix ensures that the JSON representing the trace is
fully written to the file before the assert code loads it.
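The race and its fix can be sketched in a few self-contained lines of
Python. This is a simplified illustration under stated assumptions, not
the code in tests/common/file_utils.py: file_is_complete and
wait_for_complete_file are hypothetical names, and the retry loop is a
plain stand-in for the test framework's retry helper.

```python
import os
import time


def file_is_complete(path, expected_lines, last_char=b"\n"):
    """True only if the file has expected_lines lines and ends with last_char."""
    if not os.path.isfile(path) or os.path.getsize(path) == 0:
        return False
    with open(path, "rb") as f:
        line_count = sum(1 for _ in f)
        # A partially written final line still counts as a line, so also
        # check that the very last byte is the line terminator.
        f.seek(-1, os.SEEK_END)
        return line_count == expected_lines and f.read(1) == last_char


def wait_for_complete_file(path, expected_lines, max_attempts=3,
                           sleep_time_s=1, backoff=2):
    """Polls with exponential backoff until the file is complete."""
    for attempt in range(max_attempts):
        if file_is_complete(path, expected_lines):
            return True
        if attempt < max_attempts - 1:
            time.sleep(sleep_time_s)
            sleep_time_s *= backoff
    return False
```

A file containing one full JSON line followed by a truncated second
object has two lines but does not end with a newline, so the check
keeps waiting instead of handing a partial object to the JSON loader.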
Generated-by: Github Copilot (Claude Sonnet 3.7)
Change-Id: I649bdb6f88176995d45f7d10db898188bbe0b609
Reviewed-on: http://gerrit.cloudera.org:8080/23294
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/observe/otel-flags-trace.cc | 15 +++--
be/src/observe/otel.cc | 5 +-
be/src/observe/otel.h | 2 +-
be/src/observe/span-manager.cc | 119 +++++++++++++++++++++++----------
be/src/observe/span-manager.h | 40 ++++++-----
be/src/service/client-request-state.cc | 8 ++-
be/src/service/impala-server.cc | 2 +-
tests/common/file_utils.py | 61 +++++++++++++++--
8 files changed, 184 insertions(+), 68 deletions(-)
diff --git a/be/src/observe/otel-flags-trace.cc b/be/src/observe/otel-flags-trace.cc
index b144736c8..65718fe82 100644
--- a/be/src/observe/otel-flags-trace.cc
+++ b/be/src/observe/otel-flags-trace.cc
@@ -26,6 +26,7 @@
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <gutil/strings/split.h>
+#include <gutil/strings/substitute.h>
#include <opentelemetry/sdk/trace/batch_span_processor_options.h>
#include "common/status.h"
@@ -173,7 +174,8 @@ DEFINE_string(otel_trace_tls_cipher_suites, "", "List of allowed TLS cipher suit
"using TLS 1.3, default to the value of Impala’s tls_ciphersuites startup flag.");
DEFINE_bool(otel_trace_tls_insecure_skip_verify, false, "If set to true, skips "
- "verification of collector’s TLS certificate.");
+ "verification of collector’s TLS certificate. This should only be set to false for "
+ "development / testing");
//
// End of TLS related flags.
//
@@ -204,11 +206,12 @@ DEFINE_validator(otel_trace_retry_policy_backoff_multiplier, ge_one);
//
// Start of Span Processor flags
//
-static const string SPAN_PROCESSOR_HELP = "The span processor implementation to use for "
- "exporting spans to the OTel Collector. Supported values: '"
- + impala::SPAN_PROCESSOR_BATCH + "' and '" + impala::SPAN_PROCESSOR_SIMPLE + "'.";
-DEFINE_string(otel_trace_span_processor, impala::SPAN_PROCESSOR_BATCH.c_str(),
- SPAN_PROCESSOR_HELP.c_str());
+// This flag is hidden because simple span processor blocks the query processing while
+// communicating with the OTel collector.
+DEFINE_string_hidden(otel_trace_span_processor, impala::SPAN_PROCESSOR_BATCH.c_str(),
+ strings::Substitute("The span processor implementation to use for exporting spans to "
+ "the OTel Collector. Supported values: '$0' and '$1'.", impala::SPAN_PROCESSOR_BATCH,
+ impala::SPAN_PROCESSOR_SIMPLE).c_str());
DEFINE_validator(otel_trace_span_processor, [](const char* flagname,
const string& value) {
const std::string trimmed = boost::algorithm::trim_copy(value);
diff --git a/be/src/observe/otel.cc b/be/src/observe/otel.cc
index 456c8030d..40a870d11 100644
--- a/be/src/observe/otel.cc
+++ b/be/src/observe/otel.cc
@@ -268,6 +268,9 @@ Status init_otel_tracer() {
}
} else {
VLOG(2) << "Using SimpleSpanProcessor for OTel spans";
+ LOG(WARNING) << "Setting --otel_trace_span_processor=simple blocks the query "
+ "processing thread while exporting spans to the OTel collector. This will cause "
+ "significant performance degradation and is not recommended for production use.";
processor = make_unique<SimpleSpanProcessor>(move(exporter));
}
@@ -289,7 +292,7 @@ void shutdown_otel_tracer() {
provider_.reset();
}
-shared_ptr<SpanManager> build_span_manager(const ClientRequestState* crs) {
+shared_ptr<SpanManager> build_span_manager(ClientRequestState* crs) {
DCHECK(provider_) << "OpenTelemetry tracer was not initialized.";
return make_shared<SpanManager>(
diff --git a/be/src/observe/otel.h b/be/src/observe/otel.h
index 35e79a810..f5591d6cb 100644
--- a/be/src/observe/otel.h
+++ b/be/src/observe/otel.h
@@ -52,6 +52,6 @@ Status init_otel_tracer();
void shutdown_otel_tracer();
// Builds a SpanManager instance for the given query.
-std::shared_ptr<SpanManager> build_span_manager(const ClientRequestState*);
+std::shared_ptr<SpanManager> build_span_manager(ClientRequestState*);
} // namespace impala
diff --git a/be/src/observe/span-manager.cc b/be/src/observe/span-manager.cc
index 38f42d820..dbd0ae8d2 100644
--- a/be/src/observe/span-manager.cc
+++ b/be/src/observe/span-manager.cc
@@ -122,7 +122,7 @@ static inline void debug_log_span(const TimedSpan* span, const string& span_name
} // function debug_log_span
SpanManager::SpanManager(nostd::shared_ptr<trace::Tracer> tracer,
- const ClientRequestState* client_request_state) : tracer_(std::move(tracer)),
+ ClientRequestState* client_request_state) : tracer_(std::move(tracer)),
client_request_state_(client_request_state),
query_id_(PrintId(client_request_state_->query_id())) {
child_span_type_ = ChildSpanType::NONE;
@@ -130,15 +130,20 @@ SpanManager::SpanManager(nostd::shared_ptr<trace::Tracer> tracer,
DCHECK(client_request_state_ != nullptr) << "Cannot start root span without a valid "
"client request state.";
- root_ = make_shared<TimedSpan>(tracer_, query_id_, ATTR_QUERY_START_TIME, ATTR_RUNTIME,
- OtelAttributesMap{
- {ATTR_CLUSTER_ID, FLAGS_cluster_id},
- {ATTR_QUERY_ID, query_id_},
- {ATTR_REQUEST_POOL, client_request_state_->request_pool()},
- {ATTR_SESSION_ID, PrintId(client_request_state_->session_id())},
- {ATTR_USER_NAME, client_request_state_->effective_user()}
- },
- trace::SpanKind::kServer);
+ {
+ lock_guard<mutex> crs_lock(*(client_request_state_->lock()));
+
+ root_ = make_shared<TimedSpan>(tracer_, query_id_, ATTR_QUERY_START_TIME,
+ ATTR_RUNTIME,
+ OtelAttributesMap{
+ {ATTR_CLUSTER_ID, FLAGS_cluster_id},
+ {ATTR_QUERY_ID, query_id_},
+ {ATTR_REQUEST_POOL, client_request_state_->request_pool()},
+ {ATTR_SESSION_ID, PrintId(client_request_state_->session_id())},
+ {ATTR_USER_NAME, client_request_state_->effective_user()}
+ },
+ trace::SpanKind::kServer);
+ }
scope_ = make_unique<trace::Scope>(root_->SetActive());
debug_log_span(root_.get(), "Root", query_id_, true);
@@ -180,21 +185,32 @@ void SpanManager::StartChildSpanInit() {
ChildSpanBuilder(ChildSpanType::INIT,
{
{ATTR_CLUSTER_ID, FLAGS_cluster_id},
- {ATTR_DEFAULT_DB, client_request_state_->default_db()},
- {ATTR_QUERY_ID, query_id_},
- {ATTR_QUERY_STRING, client_request_state_->redacted_sql()},
- {ATTR_REQUEST_POOL, client_request_state_->request_pool()},
- {ATTR_SESSION_ID, PrintId(client_request_state_->session_id())},
- {ATTR_USER_NAME, client_request_state_->effective_user()}
+ {ATTR_QUERY_ID, query_id_}
});
+
+ {
+ lock_guard<mutex> crs_lock(*(client_request_state_->lock()));
+
+ current_child_->SetAttribute(ATTR_DEFAULT_DB,
+ client_request_state_->default_db());
+ current_child_->SetAttribute(ATTR_QUERY_STRING,
+ client_request_state_->redacted_sql());
+ current_child_->SetAttribute(ATTR_REQUEST_POOL,
+ client_request_state_->request_pool());
+ current_child_->SetAttribute(ATTR_SESSION_ID,
+ PrintId(client_request_state_->session_id()));
+ current_child_->SetAttribute(ATTR_USER_NAME,
+ client_request_state_->effective_user());
+ }
} // function StartChildSpanInit
void SpanManager::EndChildSpanInit() {
lock_guard<mutex> l(child_span_mu_);
+ lock_guard<mutex> crs_lock(*(client_request_state_->lock()));
DoEndChildSpanInit();
} // function EndChildSpanInit
-void SpanManager::DoEndChildSpanInit(const Status* cause) {
+inline void SpanManager::DoEndChildSpanInit(const Status* cause) {
DCHECK_CHILD_SPAN_TYPE(ChildSpanType::INIT);
EndChildSpan(
@@ -212,10 +228,11 @@ void SpanManager::StartChildSpanSubmitted() {
void SpanManager::EndChildSpanSubmitted() {
lock_guard<mutex> l(child_span_mu_);
+ lock_guard<mutex> crs_lock(*(client_request_state_->lock()));
DoEndChildSpanSubmitted();
} // function EndChildSpanSubmitted
-void SpanManager::DoEndChildSpanSubmitted(const Status* cause) {
+inline void SpanManager::DoEndChildSpanSubmitted(const Status* cause) {
DCHECK_CHILD_SPAN_TYPE(ChildSpanType::SUBMITTED);
EndChildSpan(cause);
} // function DoEndChildSpanSubmitted
@@ -230,7 +247,7 @@ void SpanManager::EndChildSpanPlanning() {
DoEndChildSpanPlanning();
} // function EndChildSpanPlanning
-void SpanManager::DoEndChildSpanPlanning(const Status* cause) {
+inline void SpanManager::DoEndChildSpanPlanning(const Status* cause) {
DCHECK_CHILD_SPAN_TYPE(ChildSpanType::PLANNING);
EndChildSpan(
cause,
@@ -241,16 +258,16 @@ void SpanManager::DoEndChildSpanPlanning(const Status* cause) {
void SpanManager::StartChildSpanAdmissionControl() {
lock_guard<mutex> l(child_span_mu_);
- ChildSpanBuilder(ChildSpanType::ADMISSION_CONTROL,
- {{ATTR_REQUEST_POOL, client_request_state_->request_pool()}});
+ ChildSpanBuilder(ChildSpanType::ADMISSION_CONTROL);
} // function StartChildSpanAdmissionControl
void SpanManager::EndChildSpanAdmissionControl() {
lock_guard<mutex> l(child_span_mu_);
+ lock_guard<mutex> crs_lock(*(client_request_state_->lock()));
DoEndChildSpanAdmissionControl();
} // function EndChildSpanAdmissionControl
-void SpanManager::DoEndChildSpanAdmissionControl(const Status* cause) {
+inline void SpanManager::DoEndChildSpanAdmissionControl(const Status* cause) {
if (IsClosing()) {
// If we are already closing, silently return as some cases (such as FIRST_FETCH)
// will end the admission control phase even though the query already finished.
@@ -259,14 +276,24 @@ void SpanManager::DoEndChildSpanAdmissionControl(const Status* cause) {
DCHECK_CHILD_SPAN_TYPE(ChildSpanType::ADMISSION_CONTROL);
- const bool was_queued = client_request_state_->admission_control_client()->WasQueued();
+ bool was_queued = false;
+ const string* adm_result = nullptr;
+
+ if (LIKELY(client_request_state_->summary_profile() != nullptr)) {
+ adm_result =
+ client_request_state_->summary_profile()->GetInfoString("Admission result");
+ }
+
+ if (LIKELY(client_request_state_->admission_control_client() != nullptr)) {
+ was_queued = client_request_state_->admission_control_client()->WasQueued();
+ }
EndChildSpan(
cause,
OtelAttributesMap{
{ATTR_QUEUED, was_queued},
- {ATTR_ADM_RESULT,
- *client_request_state_->summary_profile()->GetInfoString("Admission result")}
+ {ATTR_ADM_RESULT, (adm_result == nullptr ? "" : *adm_result)},
+ {ATTR_REQUEST_POOL, client_request_state_->request_pool()}
});
} // function DoEndChildSpanAdmissionControl
@@ -279,15 +306,16 @@ void SpanManager::StartChildSpanQueryExecution() {
return; // <-- EARLY RETURN
}
- ChildSpanBuilder(ChildSpanType::QUERY_EXEC, {}, true);
+ ChildSpanBuilder(ChildSpanType::QUERY_EXEC, true);
} // function StartChildSpanQueryExecution
void SpanManager::EndChildSpanQueryExecution() {
lock_guard<mutex> l(child_span_mu_);
+ lock_guard<mutex> crs_lock(*(client_request_state_->lock()));
DoEndChildSpanQueryExecution();
} // function EndChildSpanQueryExecution
-void SpanManager::DoEndChildSpanQueryExecution(const Status* cause) {
+inline void SpanManager::DoEndChildSpanQueryExecution(const Status* cause) {
if (IsClosing()) {
// If we are already closing, silently return as some cases (such as FIRST_FETCH)
// will end the query execution phase even though the query already failed.
@@ -316,14 +344,28 @@ void SpanManager::StartChildSpanClose(const Status* cause) {
// In an error scenario, another child span may still be active since the normal code
// path was interrupted and thus the correct end child span function was not called.
// In this case, we must first end the current child span.
- EndActiveChildSpan(cause);
+ if (UNLIKELY(current_child_)) {
+ DCHECK(cause != nullptr) << "Child span '" << child_span_type_ << "'is active when "
+ "starting the Close span, a non-null cause must be provided to indicate why the "
+ "query processing flow is being interrupted.";
+ DCHECK(!cause->ok()) << "Child span '" << child_span_type_ << "'is active when "
+ "starting the Close span, a non-OK cause must be provided to indicate why the "
+ "query processing flow is being interrupted.";
+
+ {
+ lock_guard<mutex> crs_lock(*(client_request_state_->lock()));
+ EndActiveChildSpan(cause);
+ }
+ }
ChildSpanBuilder(ChildSpanType::CLOSE);
} // function StartChildSpanClose
void SpanManager::EndChildSpanClose() {
- lock_guard<mutex> l(child_span_mu_);
DCHECK_CHILD_SPAN_TYPE(ChildSpanType::CLOSE);
+
+ lock_guard<mutex> l(child_span_mu_);
+ lock_guard<mutex> crs_lock(*(client_request_state_->lock()));
EndChildSpan();
// Set all root span attributes to avoid dereferencing the client_request_state_ in the
@@ -362,7 +404,7 @@ void SpanManager::EndChildSpanClose() {
}
} // function EndChildSpanClose
-void SpanManager::ChildSpanBuilder(const ChildSpanType& span_type,
+inline void SpanManager::ChildSpanBuilder(const ChildSpanType& span_type,
OtelAttributesMap&& additional_attributes, bool running) {
DCHECK(client_request_state_ != nullptr) << "Cannot start child span without a valid "
"client request state.";
@@ -373,10 +415,12 @@ void SpanManager::ChildSpanBuilder(const ChildSpanType& span_type,
"another child span '$1' is still active trace_id=\"$2\" span_id=\"$3\"\n$4",
to_string(span_type), to_string(child_span_type_), root_->TraceId(),
root_->SpanId(), GetStackTrace());
- DCHECK(!current_child_) << "Cannot start a new child span while one is already "
- << "active.";
+ DCHECK(false) << "Should not start a new child span while one is already active.";
- EndActiveChildSpan();
+ {
+ lock_guard<mutex> crs_lock(*(client_request_state_->lock()));
+ EndActiveChildSpan();
+ }
}
const string full_span_name = query_id_ + " - " + to_string(span_type);
@@ -392,12 +436,15 @@ void SpanManager::ChildSpanBuilder(const ChildSpanType& span_type,
debug_log_span(current_child_.get(), to_string(span_type), query_id_, true);
} // function ChildSpanBuilder
-void SpanManager::EndChildSpan(const Status* cause,
+inline void SpanManager::ChildSpanBuilder(const ChildSpanType& span_type, bool running) {
+ ChildSpanBuilder(span_type, {}, running);
+} // function ChildSpanBuilder
+
+inline void SpanManager::EndChildSpan(const Status* cause,
const OtelAttributesMap& additional_attributes) {
DCHECK(client_request_state_ != nullptr) << "Cannot end child span without a valid "
"client request state.";
-
if (LIKELY(current_child_)) {
for (const auto& a : additional_attributes) {
current_child_->SetAttribute(a.first, a.second);
@@ -440,7 +487,7 @@ void SpanManager::EndChildSpan(const Status* cause,
}
} // function EndChildSpan
-void SpanManager::EndActiveChildSpan(const Status* cause) {
+inline void SpanManager::EndActiveChildSpan(const Status* cause) {
switch (child_span_type_) {
case ChildSpanType::INIT:
DoEndChildSpanInit(cause);
diff --git a/be/src/observe/span-manager.h b/be/src/observe/span-manager.h
index 969a11f40..b47b0c161 100644
--- a/be/src/observe/span-manager.h
+++ b/be/src/observe/span-manager.h
@@ -57,7 +57,7 @@ class SpanManager {
public:
SpanManager(
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Tracer> tracer,
- const ClientRequestState* client_request_state);
+ ClientRequestState* client_request_state);
~SpanManager();
// Adds an event to the currently active child span. If no child span is active,
@@ -88,14 +88,19 @@ public:
void StartChildSpanClose(const Status* cause = nullptr);
// Functions to end child spans. If no child span is active, logs a warning and does
- // nothing else.
+ // nothing else. These functions take ownership of child_span_mu_ and
+ // client_request_state_->lock().
void EndChildSpanInit();
void EndChildSpanSubmitted();
- void EndChildSpanPlanning();
void EndChildSpanAdmissionControl();
void EndChildSpanQueryExecution();
void EndChildSpanClose();
+ // Function to end the Planning child span. This function takes ownership of
+ // child_span_mu_ BUT NOT client_request_state_->lock(). The code calling this function
+ // already holds client_request_state_->lock().
+ void EndChildSpanPlanning();
+
private:
// Tracer instance used to construct spans.
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Tracer> tracer_;
@@ -104,7 +109,7 @@ private:
std::unique_ptr<opentelemetry::trace::Scope> scope_;
// ClientRequestState for the query this SpanManager is tracking.
- const ClientRequestState* client_request_state_;
+ ClientRequestState* client_request_state_;
// Convenience constant string the string representation of the Query ID for the query
// this SpanManager is tracking.
@@ -123,38 +128,39 @@ private:
ChildSpanType child_span_type_;
// Helper method that builds a child span and populates it with common attributes plus
- // the specified additional attributes. Does not take ownership of the child span mutex.
- // Callers must already hold the child_span_mu_ lock.
- void ChildSpanBuilder(const ChildSpanType& span_type,
+ // the specified additional attributes.
+ // Callers must hold child_span_mu_ but MUST NOT HOLD client_requst_state_->lock().
+ inline void ChildSpanBuilder(const ChildSpanType& span_type,
OtelAttributesMap&& additional_attributes = {}, bool running = false);
+ inline void ChildSpanBuilder(const ChildSpanType& span_type, bool running);
// Internal helper functions to perform the actual work of ending child spans.
- // Callers must already hold the child_span_mu_ lock.
+ // Callers must already hold child_span_mu_ and client_request_state_->lock().
//
// Parameters:
// cause - See comments on StartChildSpanClose().
- void DoEndChildSpanInit(const Status* cause = nullptr);
- void DoEndChildSpanSubmitted(const Status* cause = nullptr);
- void DoEndChildSpanPlanning(const Status* cause = nullptr);
- void DoEndChildSpanAdmissionControl(const Status* cause = nullptr);
- void DoEndChildSpanQueryExecution(const Status* cause = nullptr);
+ inline void DoEndChildSpanInit(const Status* cause = nullptr);
+ inline void DoEndChildSpanSubmitted(const Status* cause = nullptr);
+ inline void DoEndChildSpanPlanning(const Status* cause = nullptr);
+ inline void DoEndChildSpanAdmissionControl(const Status* cause = nullptr);
+ inline void DoEndChildSpanQueryExecution(const Status* cause = nullptr);
// Properly closes the active child span by calling the appropriate End method for the
// active child span type. If no child span is active, does nothing.
- // Callers must already hold the child_span_mu_ lock.
+ // Callers must already hold child_span_mu_ and client_request_state_->lock().
//
// Parameters:
// cause - See comments on StartChildSpanClose().
- void EndActiveChildSpan(const Status* cause = nullptr);
+ inline void EndActiveChildSpan(const Status* cause = nullptr);
// Helper method to end a child span and populate its common attributes.
- // Callers must already hold the child_span_mu_ lock.
+ // Callers must already hold child_span_mu_ and client_request_state_->lock().
//
// Parameters:
// cause - See comments on StartChildSpanClose().
// additional_attributes - Span specific attributes that will be set on the span
// before ending it.
- void EndChildSpan(const Status* cause = nullptr,
+ inline void EndChildSpan(const Status* cause = nullptr,
const OtelAttributesMap& additional_attributes = {});
// Returns true if the Close child span is active.
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index a5b30d36c..2cad22c66 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -1144,7 +1144,13 @@ Status ClientRequestState::ExecEventProcessorCmd() {
void ClientRequestState::Finalize(const Status* cause) {
if (otel_trace_query()) {
- // No need to end previous child span since this function takes care of it.
+ // In a non-error case, end the query execution span since it will be the active span.
+ if (cause == nullptr || cause->ok()) {
+ otel_span_manager_->EndChildSpanQueryExecution();
+ }
+
+ // No need to end previous child span in an error case. This function silently closes
+ // the active child span if there is one.
otel_span_manager_->StartChildSpanClose(cause);
}
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 645679bab..b4fdb61f2 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1425,8 +1425,8 @@ Status ImpalaServer::ExecuteInternal(const TQueryCtx& query_ctx,
if ((*query_handle)->otel_trace_query()) {
(*query_handle)->otel_span_manager()->EndChildSpanPlanning();
}
-
}
+
VLOG(2) << "Execution request: "
<< ThriftDebugString((*query_handle)->exec_request());
diff --git a/tests/common/file_utils.py b/tests/common/file_utils.py
index 2bb17db53..2f0b70a7c 100644
--- a/tests/common/file_utils.py
+++ b/tests/common/file_utils.py
@@ -208,13 +208,64 @@ def count_lines(file_path):
return sum(1 for _ in file.readlines())
+def file_ends_with_char(file_path, char="\n"):
+ """
+ Checks if a file ends with a specified character.
+
+ Args:
+ file_path: Path to the file to check.
+ char: The character to check for at the end of the file (default is newline)/
+
+ Returns:
+ bool: True if the file ends with the specified character, False otherwise.
+
+ Raises:
+ AssertionError: If char is not a single character.
+ """
+ assert isinstance(char, str)
+
+ char = char.encode('utf-8')
+ assert len(char) == 1, "char parameter must be a single character, got: {}".format(char)
+
+ assert os.path.isfile(file_path), "File does not exist: {}".format(file_path)
+ if os.path.getsize(file_path) == 0:
+ return False
+
+ with open(file_path, 'rb') as f:
+ # Move to the last character of the file
+ f.seek(-1, os.SEEK_END)
+ last_char = f.read(1)
+ # Check if the last character matches the provided character
+ return last_char == char
+
+
def wait_for_file_line_count(file_path, expected_line_count, max_attempts=3,
- sleep_time_s=1, backoff=2):
- """Waits until the given file contains the expected number of lines or until the
- timeout is reached. Fails an assert if the timeout is reached before the expected
- number of lines is found."""
+ sleep_time_s=1, backoff=2, last_char="\n"):
+ """
+ Waits until the given file contains the expected number of lines or until the timeout is
+ reached. Fails an assert if the timeout is reached before the expected number of lines
+ is found.
+
+ Args:
+ file_path: Path to the file to check.
+ expected_line_count: Expected number of lines in the file.
+ max_attempts: Maximum number of attempts to check the file (default is 3).
+ sleep_time_s: Time to wait between attempts in seconds (default is 1).
+ backoff: Backoff factor for exponential backoff (default is 2).
+ last_char: Optional character that the file should end with (default is newline). If
+ None, the file is not checked for a specific ending character.
+
+ Raises:
+ AssertionError: If the file does not reach the expected line count within the given
+ number of attempts or if the file does not end with the specified
+ character (if provided).
+ """
def assert_trace_file_lines():
- return count_lines(file_path) == expected_line_count
+ ret = count_lines(file_path) == expected_line_count
+ if last_char is not None:
+ ret = ret and file_ends_with_char(file_path, last_char)
+
+ return ret
assert retry(assert_trace_file_lines, max_attempts, sleep_time_s, backoff), \
"File '{}' did not reach expected line count of '{}'. actual line count: '{}'" \